六、Kafka Stream的介绍与使用
一、物理层概念1.1 物理层的功能物理层解决如何来连接各种计算机的传输媒体(电缆、光纤电缆等)上传输数据比特流,而不是指具体的传输媒体。物理层主要任务:确定与传输媒体接口有关的一些特性 定义标准1.2 物理层四大特性机械特性: 定义物理连接的特性,规定物理连接时所采用的规格、接口形状、引线数目、引脚数量和排列情况等电器特性:规定传输二进制位时,线路上信号的电压范围、阻抗匹配、传输速率和距离限制等。
本篇内容可以很好的帮助和理解Kafka stream的原理,这便于我们更好的使用它,内含一个搭建Kafka stream的实例,便于我们更好的掌握使用
一、Kafka Stream 介绍
1 、概述
Kafka Streams是一个客户端程序库,用于处理和分析存储在Kafka中的数据,并将得到的数据写回Kafka或发送到外部系统。Kafka Stream基于一个重要的流处理概念。如正确的区分事件时间和处理时间,窗口支持,以及简单而有效的应用程序状态管理。Kafka Streams的入口门槛很低: 你可以快速的编写和在单台机器上运行一个小规模的概念证明(proof-of-concept);而你只需要运行你的应用程序部署到多台机器上,以扩展高容量的生产负载。Kafka Stream利用kafka的并行模型来透明的处理相同的应用程序作负载平衡。
Kafka Stream 的亮点:
- 设计一个简单的、轻量级的客户端库,可以很容易地嵌入在任何java应用程序与任何现有应用程序封装集成。
- Apache Kafka本身作为内部消息层,没有外部系统的依赖,还有,它使用kafka的分区模型水平扩展处理,并同时保证有序。
- 支持本地状态容错,非常快速、高效的状态操作(如join和窗口的聚合)。
- 采用 one-recored-at-a-time(一次一个消息) 处理以实现低延迟,并支持基于事件时间(event-time)的窗口操作。
- 提供必要的流处理原语(primitive),以及一个 高级别的Steram DSL 和 低级别的Processor API。
2 、核心概念
我们首先总结Kafka Streams的关键概念。
Stream处理拓扑
- 流是Kafka Stream提出的最重要的抽象概念:它表示一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。
- 通过Kafka Streams编写一个或多个的计算逻辑的处理器拓扑。其中处理器拓扑是一个由流(边缘)连接的流处理(节点)的图。
- 流处理器是处理器拓扑中的一个节点;它表示一个处理的步骤,用来转换流中的数据(从拓扑中的上游处理器一次接受一个输入消息,并且随后产生一个或多个输出消息到其下游处理器中)。
在拓扑中有两个特别的处理器:
- 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
- Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。
Kafka streams提供2种方式来定义流处理器拓扑:Kafka Streams DSL提供了更常用的数据转换操作,如map和filter;低级别Processor API允许开发者定义和连接自定义的处理器,以及和状态仓库交互。
处理器拓扑仅仅是流处理代码的逻辑抽象。
3、使用示例
注意:如果是搭建的集群的话,需要把各个broker都启动起来
3.1 启动相关配置
启动各个zookerper、broker、producer
创建所需要的topic
3.2 引入项目依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com</groupId>
<artifactId>KafkaTest02</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>KafkaTest02</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.3 创建一个测试类
这个类用于将输入到主题中的字段进行整理再输入到另一个主题中
package com.kafkatest02;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class StreamSample {
//定义数据来源的topic和输出到的topic
//注意这两个主题需要提前创建
public static final String INPUT_TOPIC = "countin";
public static final String OUTPUT_TOPIC = "countout";
//创建一个配置对象
static Properties getStreamsConfig() {
final Properties props = new Properties();
//新建一个Kafka消费者组
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
//监听broker的端口地址
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
//获取topic中最早的消息
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
static void createWordCountStream(final StreamsBuilder builder) {
final KStream<String, String> source = builder.stream(INPUT_TOPIC);
final KTable<String, Long> counts = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.groupBy((key, value) -> value)
.count();
// need to override value serde to Long type
counts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
}
public static void main(final String[] args) {
final Properties props = getStreamsConfig();
final StreamsBuilder builder = new StreamsBuilder();
createWordCountStream(builder);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (final Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
3.4编写消费者类
用于输出整理后输入到主题中的字段数据
package com.kafkatest02;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.Arrays;
import java.util.Properties;
public class UserLogConsumer {
@KafkaListener(topics = {"countout"})
public void consumer(){
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9091");
props.setProperty("group.id", "group01"); //注意这里是重新创建个分组,如果已存在则会报错
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.commit.interval.ms", "3000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(Arrays.asList(new TopicPartition("countout", 0)));
consumer.seek(new TopicPartition("countout", 0), 0);//不改变当前offset
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.println(record.value()+"~~~~~~~~~~~~~~~~~~~~~~~~~~~"+record.key());
}
consumer.commitAsync();
}
}
public static void main(String[] args) {
UserLogConsumer userLogConsumer=new UserLogConsumer();
userLogConsumer.consumer();
}
}
3.5编写消息生产者类
package com.kafkatest02.kafka_Stream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Properties;
import java.util.Scanner;
@Component
public class UserLogProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 发送数据
* @param topic
*/
public void sendLog(String topic,String msg){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topic, msg));
producer.close();
}
public static void main(String[] args) {
Scanner sc=new Scanner(System.in);
UserLogProducer producer=new UserLogProducer();
String s="";
do {
s=sc.nextLine();
producer.sendLog("countin",s);
}while(s!="n"||"n".equals(s));
}
}
3.6 启动三个类,并启动Kafka管理工具进行查看效果
在UserLogProducer的控制台中输入消息,在Kafka管理工具上进行查看效果
更多推荐
所有评论(0)