This example uses Kafka 0.11.0.0.

Producer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerFirst {

    private final static String TOPIC = "first";

    public static void main(String[] args) {

        Properties props = new Properties();
        // Broker ip:port; for a cluster, list the addresses separated by commas
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.122.132:9092");
        // Serializer class for message keys
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Serializer class for message values
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Send one message to the "first" topic
        producer.send(new ProducerRecord<String, String>(TOPIC, "producer: message sent"));
        System.out.println("sent");
        producer.close();

    }
}
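
send() is asynchronous, so the producer above only learns about delivery problems when close() flushes the buffer. Below is a minimal sketch of checking the broker's acknowledgement, using the same broker address and topic as above; the class name KafkaProducerAckDemo and the message texts are only for illustration.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerAckDemo {

    private final static String TOPIC = "first";

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.122.132:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Option 1: block until the broker acknowledges the record.
        RecordMetadata meta = producer.send(
                new ProducerRecord<String, String>(TOPIC, "sent synchronously")).get();
        System.out.println("acked: partition=" + meta.partition() + ", offset=" + meta.offset());

        // Option 2: attach a callback and keep sending without blocking.
        producer.send(new ProducerRecord<String, String>(TOPIC, "sent with a callback"),
                new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();    // delivery failed
                        } else {
                            System.out.println("acked at offset " + metadata.offset());
                        }
                    }
                });

        producer.close();   // flushes any buffered records before exiting
    }
}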

Consumer:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Properties;

public class KafkaConsumerFirst {

    private final static String TOPIC = "first";

    public static void main(String[] args) {

        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.122.132:9092");
        prop.put("group.id", "group-1");
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", "1000");
        // auto.offset.reset=earliest consumes from the beginning; latest consumes only new messages
        prop.put("auto.offset.reset", "latest");
        //prop.put("session.timeout.ms", "30000");
        //prop.put("partition.assignment.strategy", "range");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        ArrayList<String> list = new ArrayList<String>();
        list.add(TOPIC);
        consumer.subscribe(list);
        // Alternatively, consume a single partition directly instead of subscribing:
        // TopicPartition partition = new TopicPartition(TOPIC, 0);
        // consumer.assign(Arrays.asList(partition));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset:" + record.offset());
                System.out.println("value:" + record.value());
            }
        }
    }
}
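
The commented-out lines in the consumer hint at reading one partition directly instead of joining a consumer group. Below is a minimal sketch of that variant using assign() and seekToBeginning(), with the same broker address and topic; the class name KafkaConsumerAssignDemo is only illustrative.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerAssignDemo {

    private final static String TOPIC = "first";

    public static void main(String[] args) {

        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.122.132:9092");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // No group.id is needed for manual assignment as long as offsets are never committed.
        prop.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

        // Read partition 0 of the topic, starting from the earliest available offset.
        TopicPartition partition = new TopicPartition(TOPIC, 0);
        consumer.assign(Arrays.asList(partition));
        consumer.seekToBeginning(Arrays.asList(partition));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset:" + record.offset() + " value:" + record.value());
            }
        }
    }
}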

The problem:

Creating topics and sending messages with the shell scripts works from inside the Linux VM,
but sending messages through the API from an external machine fails.
Cause:
Hostname and port the broker will advertise to producers and consumers. If not set, it uses the value for “listeners” if configured. Otherwise, it will use the value returned from java.net.InetAddress.getCanonicalHostName().
Fix:
In config/server.properties, change
#advertised.listeners=PLAINTEXT://your.host.name:9092
to
advertised.listeners=PLAINTEXT://192.168.122.132:9092   (the VM's IP, i.e. the address the clients above connect to)
and restart the broker so the change takes effect.
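
Once the broker is back up, a quick way to confirm that an external client can actually reach the advertised address is to list the topics with the AdminClient introduced in 0.11.0.0. This is only a sketch; the class name KafkaConnectivityCheck is illustrative, and the address is the one used throughout this post.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class KafkaConnectivityCheck {

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.122.132:9092");

        AdminClient admin = AdminClient.create(props);
        // If advertised.listeners points to an address the client cannot reach,
        // this call fails with a timeout instead of printing the topic names.
        System.out.println(admin.listTopics().names().get());
        admin.close();
    }
}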
