Kafka Consumer and Producer
The producer is as follows:
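import java.io.IOException;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProduce1 {
    public static void main(String[] args) throws IOException {
        String topic = "test";
        // Load the producer settings from producer.properties on the classpath.
        Properties prop = new Properties();
        prop.load(KafkaProduce1.class.getClassLoader().getResourceAsStream("producer.properties"));
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(prop));
        // A keyed message: topic, key, payload.
        KeyedMessage<String, String> message =
                new KeyedMessage<String, String>(topic, "key1", "message1234");
        producer.send(message);
        producer.close();
    }
}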
The producer.properties configuration is as follows:
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=hadoop01:9092
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.StringEncoder
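The `host1:port1,host2:port2` format described in the comments above is easy to get wrong by hand, so a small pre-flight check can help. The following is only a sketch: `BrokerListParser` and `Broker` are hypothetical names, not part of Kafka, and the code checks only the shape of the string, not whether a broker is reachable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Kafka): splits a "host1:port1,host2:port2" list. */
final class BrokerListParser {

    /** One broker entry: host name plus port. */
    static final class Broker {
        final String host;
        final int port;

        Broker(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Parses the value of metadata.broker.list; throws IllegalArgumentException on bad input. */
    static List<Broker> parse(String brokerList) {
        List<Broker> brokers = new ArrayList<Broker>();
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray or trailing commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            // Integer.parseInt throws NumberFormatException, a subclass of IllegalArgumentException.
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            brokers.add(new Broker(host, port));
        }
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("broker list is empty");
        }
        return brokers;
    }
}
```

For example, `BrokerListParser.parse(prop.getProperty("metadata.broker.list"))` could be called right after loading the properties file and before constructing the `ProducerConfig`.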
Kafka consumer:
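import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class KafkaConsumer1 {
    public static void main(String[] args) throws IOException {
        String topic = "test";
        // Load the consumer settings from consumer.properties on the classpath.
        Properties prop = new Properties();
        prop.load(KafkaConsumer1.class.getClassLoader().getResourceAsStream("consumer.properties"));
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        // The 1 here means a single consumer stream for this topic; with more than one,
        // each stream has to be consumed in its own thread.
        topicMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream =
                connector.createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> helloMsg = messageStream.get(topic);
        for (KafkaStream<byte[], byte[]> he : helloMsg) {
            ConsumerIterator<byte[], byte[]> iterator = he.iterator();
            while (iterator.hasNext()) {
                MessageAndMetadata<byte[], byte[]> next = iterator.next();
                // Compute the key per message so a null key does not reuse the previous one.
                String key = (next.key() != null) ? new String(next.key()) : "";
                System.out.println(String.format("key:%s msg:%s partitions:%s offset:%s",
                        key,
                        new String(next.message()),
                        next.partition(),
                        next.offset()));
            }
        }
    }
}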
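The comment on `topicMap.put(topic, 1)` notes that more than one stream calls for multiple threads. The one-thread-per-stream pattern can be sketched as below. To keep the example runnable without a Kafka cluster, a plain `Iterator<String>` stands in for the iterator of a `KafkaStream<byte[],byte[]>`; `StreamWorkers` and `consumeAll` are hypothetical names, and the 10-second wait is an arbitrary choice for the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: Iterator<String> stands in for a KafkaStream iterator. */
final class StreamWorkers {

    /** Drains every stream on its own thread and returns everything that was read. */
    static List<String> consumeAll(List<Iterator<String>> streams) {
        final ConcurrentLinkedQueue<String> seen = new ConcurrentLinkedQueue<String>();
        // One worker thread per stream; the list must not be empty.
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        for (int i = 0; i < streams.size(); i++) {
            final int id = i;
            final Iterator<String> it = streams.get(i);
            pool.submit(new Runnable() {
                public void run() {
                    // Each worker owns exactly one stream and never touches another.
                    while (it.hasNext()) {
                        seen.add("stream-" + id + ":" + it.next());
                    }
                }
            });
        }
        pool.shutdown(); // accept no new tasks; submitted workers keep running
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return new ArrayList<String>(seen);
    }
}
```

In the consumer above, the same shape would apply with `topicMap.put(topic, n)` and one worker per element of `helloMsg`. The stand-in iterators here are finite, so the workers end on their own; a real consumer would normally keep running until the connector is shut down.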
The consumer.properties configuration is as follows:
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=test-consumer-group
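The producer publishes messages to a broker, so it only needs the broker address.
The consumer, on the other hand, reads its current progress (offset) information from ZooKeeper, so it needs the ZooKeeper connection information.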
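The difference between the two files can be turned into a quick check before any client is created. This is a minimal sketch: `RequiredSettings` is a hypothetical helper, and the key lists are taken only from the two properties files shown above (`metadata.broker.list` for the producer; `zookeeper.connect` and `group.id` for the consumer), not from an exhaustive list of Kafka settings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check; key names come from the two properties files above. */
final class RequiredSettings {
    static final List<String> PRODUCER_KEYS = Arrays.asList("metadata.broker.list");
    static final List<String> CONSUMER_KEYS = Arrays.asList("zookeeper.connect", "group.id");

    /** Returns the required keys that are absent or blank in props. */
    static List<String> missing(Properties props, List<String> required) {
        List<String> missing = new ArrayList<String>();
        for (String key : required) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Parses properties-file text held in memory, so the sketch needs no file on disk. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props;
    }
}
```

Calling `RequiredSettings.missing(prop, RequiredSettings.CONSUMER_KEYS)` on a producer-only file returns both consumer keys, which makes a mixed-up properties file visible early.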