The producer is as follows:

import java.io.IOException;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProduce1 {

    public static void main(String[] args) throws IOException {

        String topic = "test";

        // Load broker and serializer settings from the classpath.
        Properties prop = new Properties();
        prop.load(KafkaProduce1.class.getClassLoader().getResourceAsStream("producer.properties"));

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(prop));

        // KeyedMessage: topic, key, payload. The key also drives partition assignment.
        KeyedMessage<String, String> message =
                new KeyedMessage<String, String>(topic, "key1", "message1234");

        producer.send(message);
        producer.close();
    }
}
 

producer.properties is configured as follows:

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=hadoop01:9092


# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none

# message encoder
serializer.class=kafka.serializer.StringEncoder


The Kafka consumer:


import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class KafkaConsumer1 {

    public static void main(String[] args) throws IOException {

        String topic = "test";

        // Load ZooKeeper and consumer-group settings from the classpath.
        Properties prop = new Properties();
        prop.load(KafkaConsumer1.class.getClassLoader().getResourceAsStream("consumer.properties"));

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        // The 1 here means a single consumer stream; to consume with more
        // streams, raise the count and run each stream in its own thread.
        topicMap.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream =
                connector.createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> helloMsg = messageStream.get(topic);

        String key = "";
        for (KafkaStream<byte[], byte[]> stream : helloMsg) {
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            while (iterator.hasNext()) {
                MessageAndMetadata<byte[], byte[]> next = iterator.next();

                if (null != next.key())
                    key = new String(next.key());

                System.out.println(String.format("key:%s msg:%s partition:%s offset:%s",
                        key,
                        new String(next.message()),
                        next.partition(),
                        next.offset()));
            }
        }
    }
}
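The comment on topicMap.put above notes that with more than one stream you need one thread per stream, since each KafkaStream's iterator blocks independently. The threading pattern itself can be sketched without a live broker; in the sketch below a BlockingQueue stands in for each KafkaStream, and the class and method names are illustrative, not Kafka API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MultiStreamSketch {

    // One worker thread per "stream", mirroring topicMap.put(topic, N) with N streams.
    static List<String> drainAll(List<BlockingQueue<String>> streams) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        final List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        for (final BlockingQueue<String> stream : streams) {
            pool.submit(new Runnable() {
                public void run() {
                    String msg;
                    while ((msg = stream.poll()) != null) { // each thread drains only its own stream
                        seen.add(msg);
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> s1 = new LinkedBlockingQueue<String>();
        BlockingQueue<String> s2 = new LinkedBlockingQueue<String>();
        s1.add("m1");
        s1.add("m2");
        s2.add("m3");
        // Two streams, three messages total, drained by two worker threads.
        System.out.println(drainAll(Arrays.asList(s1, s2)).size()); // prints 3
    }
}
```

With the real consumer, each task would wrap `stream.iterator()` and loop on `hasNext()` instead of polling a queue.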



consumer.properties is configured as follows:

zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group


The producer only needs to publish messages to a broker, so it is enough to point it at the broker addresses (metadata.broker.list).

The consumer, on the other hand, retrieves its current consumption progress from ZooKeeper, so it must be given the ZooKeeper connection string.
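For reference, this old high-level consumer stores that progress in ZooKeeper under a per-group node, so with the group.id configured above the committed offset for partition 0 of topic "test" lives at a path of the form /consumers/test-consumer-group/offsets/test/0, which you can inspect by hand with the ZooKeeper shell.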



