Confluent Platform 的快速上手
什么是 Confluent Platform?先说下什么是 Confluent ? Confluent由ApacheKafka®的原始创建者创立的,以Kafka为技术核心的公司。Confluent提供了业界唯一的企业级事件流平台,从而为应用程序和数据基础架构带来了新的范例。Confluent Platform(平台)基于此理念开发出来, 可以很方便的建立实时的数据流和流处理应用。让用户更加关注..
什么是 Confluent Platform?
先说下什么是 Confluent ? Confluent由ApacheKafka®的原始创建者创立的,以Kafka为技术核心的公司。
Confluent提供了业界唯一的企业级事件流平台,从而为应用程序和数据基础架构带来了新的范例。Confluent Platform(平台)基于此理念开发出来, 可以很方便的建立实时的数据流和流处理应用。让用户更加关注于业务价值。
快速开始
官网提供了三种使用方式,每个人都可以根据自己实际需求选择最合适的。我因个人练习,所以使用了 Confluent Platform Quick Start (Docker)。
Confluent Platform Quick Start
Confluent Platform Quick Start using Community Components
Confluent Cloud Quick Start
Step1 使用的Docker-Compose 快速的启动所需服务
Docker-compose 对于搭建基础环境,简直不要太爽。Confluent Platform的基础环境Docker-Compose文件如下:
version: "2"
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.4.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: "true"
CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"
schema-registry:
image: confluentinc/cp-schema-registry:5.4.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
connect:
image: cnfldemos/cp-server-connect-datagen:0.2.0-5.4.0
hostname: connect
container_name: connect
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.4.1.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
control-center:
image: confluentinc/cp-enterprise-control-center:5.4.1
hostname: control-center
container_name: control-center
depends_on:
- zookeeper
- broker
- schema-registry
- connect
- ksql-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"
CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"
CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksql-server:
image: confluentinc/cp-ksql-server:5.4.1
hostname: ksql-server
container_name: ksql-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksql-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
ksql-cli:
image: confluentinc/cp-ksql-cli:5.4.1
container_name: ksql-cli
depends_on:
- broker
- connect
- ksql-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
# Downrev ksql-examples to 5.1.2 due to DEVX-798 (work around issues in 5.2.0)
image: confluentinc/ksql-examples:5.4.1
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksql-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:5.4.1
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: "broker:29092"
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
运行 docker-compose up -d
启动服务就好
可以去Github上下载最新的配置文件. github 地址为 https://github.com/confluentinc/examples, 下载 cp-all-in-one 目录下的 docker-compose.yml 文件
启动好之后,通过 docker-compose ps
可以看到正常启动的服务
Name Command State Ports
------------------------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp
connect /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp
control-center /etc/confluent/docker/run Up 0.0.0.0:9021->9021/tcp
ksql-cli /bin/sh Up
ksql-datagen bash -c echo Waiting for K ... Up
ksql-server /etc/confluent/docker/run Up (healthy) 0.0.0.0:8088->8088/tcp
rest-proxy /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
Step2 创建练习需要使用的 Topics
服务启动成功之后,进入 Confluent 控制中心。Confluent 控制 中心提供了数据流处理应用。
- 浏览器中输入 http://localhost:9021 就可以打开。
- 从集群中选择
Topics
,并且点击Add a topic
就可以添加。
- 创建一个名为
pageviews
的Topic,并且选中 Create with defaults
- 重复2、3 步骤,创建一个名为
users
Kafka 主题。
Step3 安装一个Kafka 连接器并且生成一些简单的数据
这一步中,我们选用 kafka-connect-datagen
连接器来演示,如何简单入门怎么使用Kafka 连接器。kafka-connect-datagen
连接器是 CP 自带的,并且会为 pageviews
和 users
两个主题产生一些简单数据。
-
启动一个
Kafka Connect Datagen
连接器的运行实例,以AVRO
格式将Kafka数据发送到pageviews
主题中。 -
在
Cluster
集群主界面,点击导航栏中的 Connect -
找到 DatagenConnector 连接器,并且点击 Connect 按钮
- 命名新建的连接器为
datagen-pageviews
。新建的连接器属性定义如下:- Key converter class 属性, 写入
org.apache.kafka.connect.storage.StringConverter
. - kafka.topic 属性, 写入
pageviews
. - max.interval 属性, 写入
100
. - iterations 属性, 写入
1000000000
. - quickstart 属性, 写入
pageviews
.
- Key converter class 属性, 写入
- 完成后,点击继续按钮。属性配置大概如下;
使用同样的方式创建第二个连接器,名为datagen-users。将 users
主题下的数据导入,不同的在于将前面的 max.interval 属性设置为 1000
而不是 100
。
Step4 使用KSQL 来创建和写入 Stream 和 Table
KSQL 面向Apache Kafka的一种数据流SQL引擎,非常轻量,上手简单。
创建 Stream 和 Tables
着这里,我们为Kafka中的 pageviews
主题来创建一个 Stream,为 users
主题来创建一个表(table)。
- 在Cluster界面中,点击 KSQL 导航栏,选择 KSQL Application 进入
- 在 KSQL EDITOR 界面来操作,点击工具 栏中 **Streams ** 中的 Add Stream
- 选中出现的
pageviews
主题.
- 选中你自定义的 Stream 操作
- 在 Encoding 属性中选中
AVRO
- 确保Stream中字段的类型选中如下
viewtime
的类型为BIGINT
userid
的类型为VARCHAR
pageid
的类型为VARCHAR
- 在 Encoding 属性中选中
- 点击 Save Stream 按钮就好
以下步骤为如何为 Kafka 中的 users
主题来创建 Table。
-
选中工具栏中的
Table
- 在 Encoding 属性中选中
AVRO
- 在 Key 属性中,选中
userid
. - 确保Stream中字段的类型选中如下
registertime
的类型为BIGINT
userid
的类型为VARCHAR
regionid
的类型为VARCHAR
gender
的类型为VARCHAR
- 在 Encoding 属性中选中
- 完成后 Save Table
编写查询语句
在KSQL 的编辑界面 ,在 Add query properties 中 添加一个自定义查询属性,记得将 auto.offset.reset
设置为 earliest
。还有很多参数可以设置,详情见
KSQL 的语法同标注的SQL很像。比如下面
SELECT pageid FROM pageviews EMIT CHANGES LIMIT 3;
输出的结构类似于如下:
如果我们想将前面创建的 pageviews
Stream 中的数据和 users
Table中的数据,(根据userid)右连接一下,生成新的流数据,过滤出其中 gender = 'FEMALE'
的数据,并且将新生成的流数据写入到 Kafka 中的 PAGEVIEWS_FEMALE
主题中。如下的KSQL可以实现
CREATE STREAM pageviews_female AS
SELECT users.userid AS userid, pageid, regionid, gender
FROM pageviews
LEFT JOIN users
ON pageviews.userid = users.userid WHERE gender = 'FEMALE';
运行成功后,可见如下的输出结果
在前面创建的好的 ``PAGEVIEWS_FEMALE主题下, 使用
LIKE语句创建一个满足指定的 regionid 条件的持久查询,并将该查询的结果写入名为
pageviews_enriched_r8_r9`的Kafka主题中。
CREATE STREAM pageviews_female_like_89
WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO')
AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
运行成功后,可见如下的输出结果
创建一个持久查询,当计数大于1时,将在30秒的 tumbling window 中对每个区域和性别组合的浏览量进行计数。由于该过程是分组和计数结果,因此结果是表(Table)而不是流(Stream)。该查询的结果将写入名为PAGEVIEWS_REGIONS
的Kafka主题。
CREATE TABLE pageviews_regions
AS SELECT gender, regionid , COUNT(*) AS numusers
FROM pageviews_female WINDOW TUMBLING (size 30 second)
GROUP BY gender, regionid HAVING COUNT(*) > 1;
运行成功后,可见如下的输出结果
点击,Running queries 可以看到所有正在运行的查询。
在 Editor 右侧的,点开 All available streams and tables 可以看到所有的 Table 和 Stream。选择任意一个,可以看到对应的Schema。
Step5 监控消费者滞后
导航到 Consumers 视图,点击消费者组ID来查看所有的详细视图。比如看具体的 _confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE_3
消费者组。
在此页面上,您可以查看流查询的消费者滞后值和消费值。
Step 6: 停止Docker 容器
使用完Docker后,您可以停止和删除Docker容器和映像。
-
查看所有Docker容器ID的列表。
docker container ls -aq
-
运行以下命令以停止Confluent的Docker容器:
docker container stop $(docker container ls -a -q -f "label=io.confluent.docker")
-
运行以下命令可以停止容器并修剪Docker系统。运行这些命令将删除容器,网络,卷和映像。释放磁盘空间:
docker container stop $(docker container ls -a -q -f "label=io.confluent.docker") && docker system prune -a -f --volumes
参考的翻译原文链接:https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html#step-5-monitor-consumer-lag
更多推荐
所有评论(0)