<!-- pom.xml: kafka-clients dependency -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
// MyConsumer.java
package kafka;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

/**
 * @author zhangweiwen
 * @date 2018/1/10
 */
public class MyConsumer extends Thread {
    private final String groupId;
    private final String topic = "test";

    public MyConsumer(String groupId) {
        this.groupId = groupId;
    }

    @Override
    public void run() {
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");
        props.put("group.id", groupid);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(topic));
        // No partitions are assigned yet at this point, so seeking to the beginning
        // with an empty collection has no effect; see the rebalance-listener sketch
        // after this class for a version that seeks once partitions are assigned.
        consumer.seekToBeginning(new ArrayList<TopicPartition>());

        // print every topic this consumer is allowed to see
        Map<String, List<PartitionInfo>> listTopics = consumer.listTopics();
        for (Map.Entry<String, List<PartitionInfo>> entry : listTopics.entrySet()) {
            System.out.println("topic: " + entry.getKey());
        }

        while(true) {
            // wait up to 1000 ms for new records
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
            }
            // Alternatively, read the records partition by partition:
//              for (TopicPartition partition : records.partitions()) {
//                  List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
//                  for (ConsumerRecord<String, String> record : partitionRecords) {
//                      System.out.println("fetched from partition " + partition.partition() + ", "+ record.offset() + ": " + record.value());
//                  }
//              }

        }

    }


}
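Note that seekToBeginning only affects partitions that are already assigned, which is not the case right after subscribe. If the goal is to re-read the topic from the first offset, one way is to perform the seek inside a ConsumerRebalanceListener once the assignment is known. A minimal sketch of that approach, assuming the same consumer properties as above (the class and method names here are illustrative, not part of the original code):

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

// Hypothetical helper: subscribes to a topic and rewinds to the earliest offset
// as soon as partitions have actually been assigned to this consumer.
public class SeekingConsumerSketch {

    public static KafkaConsumer<String, String> subscribeFromBeginning(Properties props, String topic) {
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // nothing to do before the rebalance
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // the assignment is now known, so the seek actually takes effect
                consumer.seekToBeginning(partitions);
            }
        });
        return consumer;
    }
}

The listener fires on every rebalance, so the consumer rewinds again whenever partitions are reassigned; drop the seek if that is not the intent.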

// MyProducer.java
package kafka;


import org.apache.kafka.clients.producer.*;

import java.util.Properties;


public class MyProducer extends Thread {

    @Override
    public void run() {

        try {

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");
            props.put("acks", "0");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            int i = 0;
            while(true) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            // the send failed; metadata may not carry meaningful values
                            e.printStackTrace();
                        } else {
                            System.out.println("message sent to partition " + metadata.partition() + ", offset: " + metadata.offset());
                        }
                    }
                });
                i++;
                Thread.sleep(1000);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
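With acks=0 the producer never learns whether a write actually succeeded. When a confirmed write is needed, send() returns a Future<RecordMetadata> that can be waited on. A small synchronous-send sketch, assuming the same brokers, serializers, and "test" topic as above (the class name is illustrative):

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

// Hypothetical variant of MyProducer: blocks on each send with acks=all.
public class MySyncProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");
        props.put("acks", "all"); // wait for the in-sync replicas to acknowledge each write
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // send() returns a Future; get() blocks until the broker acknowledges
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<String, String>("test", "key", "hello kafka")).get();
            System.out.println("written to partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        } finally {
            producer.close();
        }
    }
}

Blocking on every send trades throughput for a per-record delivery guarantee; the callback style used in MyProducer keeps sends asynchronous.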
// HelloKafkaP.java
import kafka.MyProducer;

/**
 * @author zhangweiwen
 * @date 2018/1/10
 */
public class HelloKafkaP {
    public static void main(String[] args) {

        new MyProducer().start();

    }


}

// HelloKafka.java
import kafka.MyConsumer;

/**
 * @author zhangweiwen
 * @date 2018/1/10
 */
public class HelloKafka {
    public static void main(String[] args) {

        new MyConsumer("test-consumer-group").start();

    }


}

// HelloKafka2.java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author zhangweiwen
 * @date 2018/1/10
 */
public class HelloKafka2 {
    public static void main(String[] args) throws Exception{
        String kafkaConnect = "localhost:9091,localhost:9092,localhost:9093";
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnect);

        AdminClient admin = AdminClient.create(config);

        // Only needed by the createTopics call below, which is left commented out;
        // a runnable version is sketched after this class.
        Map<String, String> configs = new HashMap<String, String>();
        int partitions = 1;
        short replication = 1;

//        CreateTopicsResult result = admin.createTopics(Arrays.asList(new NewTopic("mynewtopic", partitions, replication).configs(configs)));


        // list the topics currently known to the cluster
        ListTopicsResult result = admin.listTopics();
        for (Map.Entry<String, TopicListing> entry : result.namesToListings().get().entrySet()) {
            System.out.println("topic: " + entry.getValue().name());
        }

        admin.close();
    }


}
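To actually create a topic rather than leaving createTopics commented out, the AdminClient introduced in 0.11 can be used roughly as follows. The topic name "mynewtopic" and the 1-partition / replication-factor-1 settings come from the commented line above; the rest is a sketch, not the post's original code:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

// Sketch of topic creation with the AdminClient API.
public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");

        AdminClient admin = AdminClient.create(config);
        try {
            // 1 partition, replication factor 1, as in the commented-out line above
            NewTopic topic = new NewTopic("mynewtopic", 1, (short) 1);
            CreateTopicsResult result = admin.createTopics(Arrays.asList(topic));
            // all() returns a future that completes once every requested topic exists
            result.all().get();
            System.out.println("topic created: " + topic.name());
        } finally {
            admin.close();
        }
    }
}

result.all().get() blocks until the broker confirms creation, so failures (for example, the topic already exists) surface as an ExecutionException.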
