发送字节数组到 Kafka

测试 Kafka 连通性

往 Kafka 里面写数据时, 默认在不存在那个topic 时,会自动创建

测试能否连上某台机器的 kafka

1
2
3
yum install telnet
yum install telnet-server
telnet 10.10.10.35 9092

如果

1
2
3
Trying 10.10.10.35...
Connected to 10.10.10.35.
Escape character is '^]'.

说明可以连通。

同样适用于 zoomkeeper:

1
2
3
4
telnet 10.0.201.83 2181
Trying 10.0.201.83...
Connected to 10.0.201.83.
Escape character is '^]'.

从指定位置消费 Kafka

1
pip install kafka-python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import gzip
from kafka import KafkaConsumer
from kafka import TopicPartition

consumer = KafkaConsumer(bootstrap_servers='10.30.10.15:9092')
partition = TopicPartition('dc-diagnostic-report', 0)
start = 8833
end = 8835
consumer.assign([partition])
consumer.seek(partition, start)

i=start
for msg in consumer:
if msg.offset > end:
break
else:
print msg
try:
outF = file( '/data/app/tar/' + str(i) + ".gz", 'wb')
outF.write(msg.value)
outF.close()
i+=1
print i
except Exception,e:
print e

发送字节数组到 Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package wm.helper.util;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.*;
import java.util.Properties;

public class WmKafkaProducer {
public static void main(String[] args) throws IOException, InterruptedException {

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); // 值的序列化

//生产者发送消息
String topic = "dc-diagnostic-report";
Producer<String, byte[]> procuder = new org.apache.kafka.clients.producer.KafkaProducer<String,byte[]>(props);

// 发送发字节数组
byte[] byfiles = getBytes("/Users/ohmycloud/work/cihon/sxw/wm-telematics/data/dtc.gz");
ProducerRecord<String, byte[]> msgtar = new ProducerRecord<String, byte[]>(topic, "LL2274082JW100128", byfiles);
procuder.send(msgtar);
procuder.close(); // 主动关闭生产者
System.out.println("消息发送完成。");
}

/**
* 获得指定文件的byte数组
*/
public static byte[] getBytes(String filePath){

byte[] buffer = null;
try {
File file = new File(filePath);
FileInputStream fis = new FileInputStream(file);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1000);
byte[] b = new byte[1000];
int n;
while ((n = fis.read(b)) != -1) {
bos.write(b, 0, n);
}
fis.close();
bos.close();
buffer = bos.toByteArray();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return buffer;
}
}