<!-- java程序的Kafka依赖 (Kafka dependency for Java programs) -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.10.0.0</version>
    </dependency>







/*消费者*/
public class KafKaConsumer {

  private final ConsumerConnector consumer;

    private KafKaConsumer() {
        Properties properties = new Properties();
        //zooKeeper配置
       properties.put("zookeeper.connect", "127.0.0.1:2181");

        //group代表一个消费组
        properties.put("group.id", "lingroup");
        //properties.put("zookeeper.sync.time.ms", "2000");
        properties.put("rebalance.max.retries", "5");
        properties.put("rebalance.backoff.ms", "1200");


        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "smallest");

        //序列化类
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
    }

    void consume(){
        Map<String,Integer> topicCountMap=new HashMap<String, Integer>();
        topicCountMap.put(KafKaProducer.TOPIC,new Integer(1));
        StringDecoder keyDecoder=new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder=new StringDecoder(new VerifiableProperties());
        Map<String,List<KafkaStream<String,String>>> consumerMap=consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);

         KafkaStream stream=consumerMap.get(KafKaProducer.TOPIC).get(0);
        ConsumerIterator<String,String> iterator=stream.iterator();
         while (iterator.hasNext()){


             System.out.println("接受消息>>>"+iterator.next().message());

    }

    }

  public static void main(String[] args){
      new KafKaConsumer().consume();
  }

}




生产者 (Producer):

public class KafKaProducer {
    private final Producer<String,String> producer;
    public final static String TOPIC="linlin";

    private KafKaProducer(){
        Properties properties=new Properties();
        //此处设置kafka的端口
        properties.put("metadata.broker.list","127.0.0.1:9092");
       // properties.put("zk.connect","127.0.0.1:2181");
        //配置value的序列化类
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        properties.put("key.serializer.class","kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "-1");

        producer=new Producer<String, String>(new ProducerConfig(properties));
    }


    void produce(){
        int messageNo=1000;
        final int count=10000;
        while (messageNo<count){
            String key=String.valueOf(messageNo);
            String data ="hello kafka message"+key;
            producer.send(new KeyedMessage<String, String>(TOPIC,key,data));
            System.out.println("发送消息:"+data);
            messageNo++;
        }


    }

  public static void main(String[] args){
       new KafKaProducer().produce();
  }

}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐