一、添加maven依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.2</version>
</dependency>
		
<!-- 注:kafka_2.11 是包含 broker 的服务端完整包;仅用客户端收发消息时,上面的 kafka-clients 已经足够,可按需移除此依赖 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.2</version>
</dependency>

二、java代码

(1)生产者

package com.ldy.kafka.service;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Service;

/**
 * Kafka message producer (0.10.x client API).
 *
 * <p>NOTE(review): a new {@link KafkaProducer} is created and closed on every
 * send. {@code KafkaProducer} is thread-safe and intended to be long-lived, so
 * for any real throughput a single shared instance is preferable — kept
 * per-call here to preserve the original behavior.
 */
@Service
public class KafkaMsgProducer {

	// Shared producer configuration, built once at class-load time.
	private static final Properties props = new Properties();
	private static final String servers = "11.2.3.4:9092,11.2.3.5:9092,11.2.3.6:9092,12.2.3.7:9092,11.2.3.8:9092";

	static {
		props.put("bootstrap.servers", servers);
		// "all": wait until the full in-sync replica set acknowledges the record.
		props.put("acks", "all");
		// BUG FIX: the key was "retries " (trailing space), so the setting was
		// silently ignored and the default of 0 retries applied.
		props.put("retries", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	}

	/**
	 * Sends a value-only (key-less) message to the given topic.
	 *
	 * @param topic target topic name
	 * @param msg   message payload
	 */
	public static void sendMsg(String topic, String msg) {
		// Producer extends Closeable; try-with-resources both flushes pending
		// sends (close() flushes) and releases sockets even if send() throws,
		// replacing the manual null-checked finally block.
		try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
			producer.send(new ProducerRecord<String, String>(topic, msg));
		} catch (Exception e) {
			// Best-effort send: failures are reported but not propagated,
			// matching the original contract.
			e.printStackTrace();
		}
	}

	/**
	 * Sends a keyed message to the given topic. The key determines the target
	 * partition under the default partitioner.
	 *
	 * @param topic target topic name
	 * @param key   record key
	 * @param value record value
	 */
	public static void sendMsg(String topic, String key, String value) {
		try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
			producer.send(new ProducerRecord<String, String>(topic, key, value));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/** Ad-hoc smoke test: sends ten key-less messages to "test_topic". */
	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			sendMsg("test_topic", "content" + i);
		}
	}

}

(2)消费者

package com.ldy.kafka.service;

import java.util.Arrays;
import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @类名: KafkaMsgConsumer<br>
 * @描述: kafka消息消费者(0.10.x版本)<br>
 * @创建者: lidongyang<br>
 * @修改时间: 2018年7月14日 上午10:35:23<br>
 */
public class KafkaMsgConsumer {

	private static Properties props = new Properties();
	private static String servers = "11.2.3.4:9092,11.2.3.5:9092,11.2.3.6:9092,12.2.3.7:9092,11.2.3.8:9092";

	static {
		props.put("bootstrap.servers", servers);
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	}

	/**
	 * @方法名: receiveMsg<br>
	 * @描述: 接收消息<br>
	 * @创建者: lidongyang<br>
	 * @修改时间: 2018年7月14日 上午10:44:18<br>
	 * @param topic
	 * @param groupId
	 */
	public static void receiveMsg(String topic, String groupId) {
		if (StringUtils.isNotBlank(groupId)) {
			props.put("group.id", groupId);
		}

		try {
			@SuppressWarnings("resource")
			Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
			// 配置topic
			consumer.subscribe(Arrays.asList(topic));
			while (true) {
				// 得到ConsumerRecords实例
				ConsumerRecords<String, String> records = consumer.poll(100);
				for (ConsumerRecord<String, String> record : records) {
					// 直接通过record.offset()得到offset的值
					System.out.println("offset = " + record.offset());
					System.out.println("key = " + record.key() + ",value=" + record.value());
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		String topic = "test_topic_ldy";
		String groupId = "group1";
		receiveMsg(topic, groupId);
	}

}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐