Integration steps

  1. Install and start ZooKeeper
  2. Install and start Kafka
  3. Create the message producer
  4. Create the message consumer

1. Install and start ZooKeeper

In zoo.cfg, configure the data and log directories:
dataDir=D:\Sofeware\apache-zookeeper-3.5.5-bin\tmp
dataLogDir=D:\Sofeware\apache-zookeeper-3.5.5-bin\logs
Double-click zkServer.cmd to start the server.
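To confirm ZooKeeper is reachable before moving on, a quick check can be written against the zkclient library that the POM below already pulls in. This is a minimal sketch, assuming zkclient 0.3; the 5000 ms timeout is an arbitrary choice:

import org.I0Itec.zkclient.ZkClient;

public class ZkConnectivityCheck {
    public static void main(String[] args) {
        // Connect to the local ZooKeeper; the constructor throws if the server
        // cannot be reached within the 5000 ms timeout (arbitrary value).
        ZkClient zkClient = new ZkClient("localhost:2181", 5000);
        System.out.println("ZooKeeper is up, root children: " + zkClient.getChildren("/"));
        zkClient.close();
    }
}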

2. Install and start Kafka

Download and unzip the Kafka distribution.
In server.properties, configure the ZooKeeper connection address:
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

Start Kafka: cd into the installation directory and run the server start script:

D:\Sofeware\kafka_2.11-2.3.0>.\bin\windows\kafka-server-start.bat .\config\server.properties

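By default the broker auto-creates topics on first use (auto.create.topics.enable=true), so the alarm topic used throughout this article does not strictly need to be created by hand. To create it explicitly, e.g. to control the partition count, the AdminClient from kafka-clients can be used; a minimal sketch, with 1 partition and replication factor 1 chosen for a single local broker:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateAlarmTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1: only suitable for a single local broker
            NewTopic alarm = new NewTopic("alarm", 1, (short) 1);
            admin.createTopics(Collections.singletonList(alarm)).all().get();
        }
    }
}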

3. Create the message producer

applicationContext.xml:
Load the kafka.properties file and import producer-kafka.xml:

<!-- Load the property file -->
	<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
		<property name="ignoreResourceNotFound" value="true" />
		<property name="locations">
			<list>
				<value>classpath:conf/kafka.properties</value>
			</list>
		</property>
	</bean>
<!-- Import the Kafka producer configuration -->
	<import resource="producer-kafka.xml" />

kafka.properties:

################# kafka producer ##################
# Broker list (may be a cluster)
kafka.producer.bootstrap.servers = localhost:9092

# How many broker acknowledgments are required per record
kafka.producer.acks = all

# Number of retries on send failure
kafka.producer.retries = 3

# How long to wait for more records before sending a batch (ms)
kafka.producer.linger.ms = 10

# Producer buffer memory in bytes (the Kafka default is 33554432, i.e. 32 MB; reduced here)
kafka.producer.buffer.memory = 40960

# Batch size in bytes: when multiple records are sent to the same partition, the producer
# batches them into fewer requests, which helps both client and server performance
kafka.producer.batch.size = 4096

kafka.producer.defaultTopic = alarm

kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer

kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer
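Before wiring Spring, these settings can be smoke-tested with a plain KafkaProducer. A minimal sketch, with the values hard-coded rather than read from kafka.properties:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("linger.ms", 10);
        props.put("batch.size", 4096);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Synchronous send: get() blocks until the broker acknowledges the record
            producer.send(new ProducerRecord<>("alarm", "test-key", "hello from smoke test")).get();
            System.out.println("message acknowledged");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}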

producer-kafka.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

	<!--<context:property-placeholder location="classpath:kafka/kafka.properties" />-->
	<!-- Define the producer parameters -->
	<bean id="producerProperties" class="java.util.HashMap">
		<constructor-arg>
			<map>
				<!-- Kafka broker address(es); may be a cluster -->
				<entry key="bootstrap.servers" value="${kafka.producer.bootstrap.servers}" />
				<!-- Retrying may cause the broker to receive duplicate messages; the client default is 0 -->
				<entry key="retries" value="${kafka.producer.retries}" />
				<!-- Upper bound, in bytes, on each batch sent to a partition -->
				<entry key="batch.size" value="${kafka.producer.batch.size}" />
				<!-- How long to wait for more records before sending a batch; default 0 ms -->
				<entry key="linger.ms" value="${kafka.producer.linger.ms}" />
				<!-- Total memory the producer may use to buffer records; if records are produced faster than they can be sent, the producer blocks or throws an exception -->
				<entry key="buffer.memory" value="${kafka.producer.buffer.memory}" />
				<!-- How many acknowledgments the producer requires the server to send back per record (acks) -->
				<entry key="acks" value="${kafka.producer.acks}" />
				<entry key="key.serializer"
					   value="${kafka.producer.key.serializer}" />
				<entry key="value.serializer"
					   value="${kafka.producer.value.serializer}"/>
			</map>
		</constructor-arg>
	</bean>

	<!-- producerFactory bean used to create the KafkaTemplate -->
	<bean id="producerFactory"
		  class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
		<constructor-arg>
			<ref bean="producerProperties" />
		</constructor-arg>
	</bean>

	<!-- 3. Define the producer listener -->
	<bean id="kafkaProducerListener" class="com.bdxh.rpc.service.KafkaProducerListener" />

	<!-- KafkaTemplate bean: inject this bean and call its send methods to publish messages -->
	<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg ref="producerFactory" />
		<constructor-arg name="autoFlush" value="true" />
		<!-- Default topic for sendDefault() -->
		<property name="defaultTopic" value="${kafka.producer.defaultTopic}" />
		<!-- Wire in the listener defined above; without this the callbacks never fire -->
		<property name="producerListener" ref="kafkaProducerListener" />
	</bean>
</beans>
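For comparison, the same wiring can be expressed programmatically; a minimal sketch of the equivalent setup under spring-kafka 1.3.x, with the property values hard-coded instead of read from kafka.properties:

import java.util.HashMap;
import java.util.Map;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class ProducerWiringSketch {

    @SuppressWarnings("unchecked")
    public static KafkaTemplate<String, String> buildTemplate() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put("acks", "all");
        configs.put("retries", 3);
        configs.put("batch.size", 4096);
        configs.put("linger.ms", 10);
        configs.put("buffer.memory", 40960);
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(configs);
        // autoFlush=true mirrors the constructor-arg in the XML above
        KafkaTemplate<String, String> template = new KafkaTemplate<>(factory, true);
        template.setDefaultTopic("alarm");
        template.setProducerListener(new KafkaProducerListener());
        return template;
    }
}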

Create the KafkaProducerListener.java listener class referenced in producer-kafka.xml:


import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import org.springframework.kafka.support.ProducerListener;

public class KafkaProducerListener implements ProducerListener {

    protected final Logger logger = Logger.getLogger(KafkaProducerListener.class.getName());

    public KafkaProducerListener(){

    }

    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        logger.info("----------------- Kafka send succeeded");
        logger.info("----------topic:" + topic);
        logger.info("----------partition:" + partition);
        logger.info("----------key:" + key);
        logger.info("----------value:" + value);
        logger.info("----------RecordMetadata:" + recordMetadata);
        logger.info("----------------- Kafka send finished");
    }

    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception e) {
        logger.error("----------------- Kafka send failed");
        logger.error("----------topic:" + topic);
        logger.error("----------partition:" + partition);
        logger.error("----------key:" + key);
        logger.error("----------value:" + value);
        // Log through the logger (with stack trace) instead of printStackTrace()
        logger.error("----------------- Kafka send failure details", e);
    }

    /**
     * Whether this listener wants onSuccess() notifications.
     * Must return true, otherwise the success logging above is never invoked.
     */
    @Override
    public boolean isInterestedInSuccess() {
        return true;
    }
}

Finally, add the POM dependencies:

<!-- zookeeper -->
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.5</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>dubbo</artifactId>
			<version>2.5.3</version>
			<scope>compile</scope>
			<exclusions>
				<exclusion>
					<artifactId>spring</artifactId>
					<groupId>org.springframework</groupId>
				</exclusion>
			</exclusions>
		</dependency>

		<!-- kafka -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>1.0.1</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>1.3.5.RELEASE</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka-clients</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

Write a controller method for testing:

@Resource
	// Key/value types are String to match the StringSerializer configured above
	private KafkaTemplate<String, String> kafkaTemplate;

	/**
	 * Send a message - test
	 */
	@RequestMapping("/sendTest")
	@ResponseBody
	public BaseResp sendTest(HttpServletRequest request) {
		// sendDefault() publishes to the template's defaultTopic ("alarm")
		kafkaTemplate.sendDefault("Im from producer !");
		System.out.println("send success !");
		return BaseResp.success("send success");
	}
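sendDefault() returns a future and does not block; besides the global KafkaProducerListener, a per-send callback can be attached to the returned future. A hedged sketch (the /sendWithCallback mapping and the device-1 key are made up for illustration):

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

	@RequestMapping("/sendWithCallback")
	@ResponseBody
	public BaseResp sendWithCallback(HttpServletRequest request) {
		// Explicit topic and key instead of the template's defaultTopic
		ListenableFuture<SendResult<String, String>> future =
				kafkaTemplate.send("alarm", "device-1", "Im from producer with callback !");
		future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
			@Override
			public void onSuccess(SendResult<String, String> result) {
				// RecordMetadata carries the partition and offset assigned by the broker
				System.out.println("acked at offset " + result.getRecordMetadata().offset());
			}

			@Override
			public void onFailure(Throwable ex) {
				ex.printStackTrace();
			}
		});
		return BaseResp.success("send submitted");
	}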

4. Create the message consumer

applicationContext.xml:
Load the kafka.properties file and import consumer-kafka.xml:

<!-- Load the property file -->
	<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
		<property name="ignoreResourceNotFound" value="true" />
		<property name="locations">
			<list>
				<value>classpath:conf/kafka.properties</value>
			</list>
		</property>
	</bean>
	<!-- Import the consumer-kafka.xml configuration -->
	<import resource="consumer-kafka.xml" />

kafka.properties:

################# kafka consumer ##################
kafka.consumer.bootstrap.servers = localhost:9092

# If true, the consumer's offsets are committed periodically in the background
kafka.consumer.enable.auto.commit = true

# Auto-commit interval, effective when enable.auto.commit=true
kafka.consumer.auto.commit.interval.ms=1000

# Consumer group ID. Within one group, each message is consumed by only one member;
# separate groups each receive every message (publish-subscribe)
kafka.consumer.group.id = ebike-alarm

# Alarm topic
kafka.alarm.topic = alarm

# Timeout used to detect consumer failures under Kafka's group management
kafka.consumer.session.timeout.ms = 30000

kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
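As with the producer, these settings can be verified outside Spring with a plain KafkaConsumer; a minimal sketch, values hard-coded for brevity:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "ebike-alarm");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("alarm"));
            while (true) {
                // poll(long) is the kafka-clients 1.0.x API; newer clients take a Duration
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}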

consumer-kafka.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
	<!-- 1. Define the consumer parameters -->
	<!--<context:property-placeholder location="classpath*:kafka/kafka.properties" />-->
	<bean id="consumerProperties" class="java.util.HashMap">
		<constructor-arg>
			<map>
				<!-- Kafka broker address(es) -->
				<entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}" />
				<!-- Consumer group ID; consumers with the same group.id belong to the same group -->
				<entry key="group.id" value="${kafka.consumer.group.id}" />
				<!-- If true, the consumer's offset is committed periodically in the background and used as the starting point after a restart -->
				<entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}" />
				<!-- Timeout used to detect consumer failures under Kafka's group management -->
				<entry key="session.timeout.ms" value="${kafka.consumer.session.timeout.ms}" />
				<entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}" />
				<entry key="retry.backoff.ms" value="100" />
				<entry key="key.deserializer"
					   value="${kafka.consumer.key.deserializer}" />
				<entry key="value.deserializer"
					   value="${kafka.consumer.value.deserializer}" />
			</map>
		</constructor-arg>
	</bean>

	<!-- 2. Create the consumerFactory bean -->
	<bean id="consumerFactory"
		  class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
		<constructor-arg>
			<ref bean="consumerProperties" />
		</constructor-arg>
	</bean>

	<!-- 3. Bean for the concrete message listener class -->
	<bean id="kafkaConsumerService" class="com.bdxh.ebike.api.controller.KafkaConsumerMessageListener" />

	<!-- 4. Consumer container configuration -->
	<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
		<!-- topic -->
		<constructor-arg name="topics">
			<list>
				<value>${kafka.alarm.topic}</value>
			</list>
		</constructor-arg>
		<property name="messageListener" ref="kafkaConsumerService" />
	</bean>
	<!-- 5. Concurrent message listener container; the init-method invokes doStart() -->
	<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
		<constructor-arg ref="consumerFactory" />
		<constructor-arg ref="containerProperties" />
	</bean>
</beans>
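For reference, an equivalent programmatic wiring of the container; a sketch assuming the spring-kafka 1.3.x API used above and the KafkaConsumerMessageListener defined next:

import java.util.HashMap;
import java.util.Map;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

public class ConsumerWiringSketch {
    public static ConcurrentMessageListenerContainer<String, Object> buildContainer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put("group.id", "ebike-alarm");
        configs.put("enable.auto.commit", "true");
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        DefaultKafkaConsumerFactory<String, Object> factory =
                new DefaultKafkaConsumerFactory<>(configs);
        ContainerProperties containerProps = new ContainerProperties("alarm");
        containerProps.setMessageListener(new KafkaConsumerMessageListener());

        ConcurrentMessageListenerContainer<String, Object> container =
                new ConcurrentMessageListenerContainer<>(factory, containerProps);
        container.setConcurrency(1); // one consumer thread; raise up to the partition count
        return container;            // call container.start() once the context is ready
    }
}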

Create the KafkaConsumerMessageListener.java listener class referenced in consumer-kafka.xml:


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.kafka.listener.MessageListener;

public class KafkaConsumerMessageListener implements MessageListener<String,Object> {

    private Logger logger = Logger.getLogger(KafkaConsumerMessageListener.class.getName());

    public KafkaConsumerMessageListener(){

    }

    /**
     * Message handling - log the received record
     * @param record the record delivered by the listener container
     */
    @Override
    public void onMessage(ConsumerRecord<String, Object> record) {
        logger.info("============= Kafka message received =============");

        String topic = record.topic();
        String key = record.key();
        Object value = record.value();
        long offset = record.offset();
        int partition = record.partition();

        /*if (ConstantKafka.KAFKA_TOPIC1.equals(topic)){
            doSaveLogs(value.toString());
        }*/

        logger.info("-------------topic:" + topic);
        logger.info("-------------value:" + value);
        logger.info("-------------key:" + key);
        logger.info("-------------offset:" + offset);
        logger.info("-------------partition:" + partition);
        logger.info("============= Kafka message handled =============");
    }

}
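With enable.auto.commit=true, offsets can be committed before a record is fully processed. If at-least-once handling matters, a manual-acknowledgment variant is an option; a sketch assuming spring-kafka 1.3.x, with enable.auto.commit set to false and the container ack mode switched to MANUAL_IMMEDIATE:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

// Requires enable.auto.commit=false and, on the ContainerProperties bean:
// containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
public class ManualAckListener implements AcknowledgingMessageListener<String, Object> {

    @Override
    public void onMessage(ConsumerRecord<String, Object> record, Acknowledgment ack) {
        try {
            // ... process the record ...
            ack.acknowledge(); // commit the offset only after successful processing
        } catch (Exception e) {
            // Not acknowledging leaves the offset uncommitted, so the record
            // is redelivered after a rebalance or restart
        }
    }
}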

Consumer POM dependencies (same as the producer's):

<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.5</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>dubbo</artifactId>
			<version>2.5.3</version>
			<scope>compile</scope>
			<exclusions>
				<exclusion>
					<artifactId>spring</artifactId>
					<groupId>org.springframework</groupId>
				</exclusion>
			</exclusions>
		</dependency>

Everything is now configured. Time for a test:

  1. Start ZooKeeper
  2. Start Kafka
  3. Start the producer project
  4. Start the consumer project
  5. Call the producer's test controller --> /sendTest
    Result: the producer log shows the send succeeding and the consumer log shows the received record; the message was produced and consumed successfully.

Done.
