Confluent Connect: Writing to Elasticsearch and ClickHouse
1 Elasticsearch connection test
1.1 Start Confluent
/home/kafka/.local/confluent/bin/confluent start
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.swpIapNw
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
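The service states can be re-checked at any time with the same (development-only) CLI:
/home/kafka/.local/confluent/bin/confluent status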
1.2 Add the configuration
vim /home/kafka/.local/confluent/etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties
name=iot-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=road_traffic
key.ignore=true
connection.url=http://10.0.165.8:9200
type.name=iot-kafka-connect
batch.size=1
flush.timeout.ms=200000
topic.schema.ignore=road_traffic
schema.ignore=true
retry.backoff.ms=3000
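Before loading the connector, it is worth confirming that the Elasticsearch endpoint named in connection.url is actually reachable:
curl http://10.0.165.8:9200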
1.3 Add the connector
$ bin/confluent load iot-elasticsearch-sink -d etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Warning: Install 'jq' to add support for parsing JSON
{"name":"iot-elasticsearch-sink","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"road_traffic","key.ignore":"true","connection.url":"http://10.0.165.8:9200","type.name":"iot-kafka-connect","batch.size":"1","flush.timeout.ms":"200000","topic.schema.ignore":"road_traffic","schema.ignore":"true","retry.backoff.ms":"3000","name":"iot-elasticsearch-sink"},"tasks":[],"type":"sink"}
Check the status:
$ bin/confluent status connectors
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
["iot-elasticsearch-sink"]
$ bin/confluent status iot-elasticsearch-sink
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
{"name":"iot-elasticsearch-sink","connector":{"state":"RUNNING","worker_id":"10.0.165.9:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.0.165.8:8083"}],"type":"sink"}
1.4 Create the Kafka topic
/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --create --replication-factor 2 --partitions 2 --topic road_traffic
Verify that the topic was created:
/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --list
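To also check the partition and replica assignment:
/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --describe --topic road_traffic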
1.5 Produce data
(1) Add the following dependencies:
<groupId>org.example</groupId>
<artifactId>Manufacturing_data</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <scala.binary.version>2.11</scala.binary.version>
    <kafka.version>1.0.0</kafka.version>
    <avro.version>1.8.0</avro.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.binary.version}</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.66</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-tools</artifactId>
        <version>${avro.version}</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
(2) The Confluent artifacts cannot be found on Maven Central, so they have to be added by hand; otherwise the build fails because io.confluent.kafka.serializers.KafkaAvroSerializer cannot be found.
After unpacking confluent-4.0.0, its share/java/ directory contains the jars for each Confluent component. We need common-config-4.0.0.jar and common-utils-4.0.0.jar from the confluent-common directory, all jars whose names start with jackson, plus kafka-schema-registry-client-4.0.0.jar and kafka-avro-serializer-4.0.0.jar from the kafka-serde-tools directory. Copy them into a new lib directory under the module, then right-click it and choose Add as Library…:
common-config-4.0.0.jar
common-utils-4.0.0.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.1.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.1.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jackson-xc-1.9.13.jar
kafka-avro-serializer-4.0.0.jar
kafka-schema-registry-client-4.0.0.jar
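Alternatively, instead of copying jars by hand, the same artifacts can be resolved from Confluent's own Maven repository; a sketch of the pom additions (version 4.0.0 to match the distribution used here):
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>4.0.0</version>
</dependency>
kafka-avro-serializer should pull in kafka-schema-registry-client, common-config, common-utils and the jackson jars transitively.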
The producer code is as follows:
import java.io.File
import java.util.Properties

import com.alibaba.fastjson.JSON
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

case class RoadTraffic(status: String, avgMeasuredTime: Int, avgSpeed: Int, extID: String,
                       medianMeasuredTime: Int, timestamp: Long, vehicleCount: Int, id: Long,
                       perort_id: String, process_time: Long)

object KafkaToTraffic {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers", "10.0.165.8:9092,10.0.165.9:9092")
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("schema.registry.url", "http://10.0.165.8:8081")

    // Parse the Avro schema
    val schema: Schema = new Schema.Parser().parse(new File("E:\\working\\ideaWorking\\iot_road_traffic\\src\\main\\resources\\RoadTraffic.avsc"))
    val avroRecord: GenericData.Record = new GenericData.Record(schema)

    // Create a Kafka producer
    val producer: KafkaProducer[String, GenericRecord] = new KafkaProducer[String, GenericRecord](props)

    val str = "{\"status\":\"OK\",\"avgMeasuredTime\":\"53\",\"avgSpeed\":\"58\",\"extID\":\"724\",\"medianMeasuredTime\":\"53\",\"TIMESTAMP\":\"2014-04-25T19:35:00\",\"vehicleCount\":\"1\",\"id\":\"8961146\",\"perort_id\":\"179444\",\"process_time\":\"1593386110\"}"
    val roadTraffic = JSON.parseObject(str, classOf[RoadTraffic])
    println(roadTraffic)

    // Fill the Avro record field by field
    avroRecord.put("status", roadTraffic.status)
    avroRecord.put("avgMeasuredTime", roadTraffic.avgMeasuredTime)
    avroRecord.put("avgSpeed", roadTraffic.avgSpeed)
    avroRecord.put("extID", roadTraffic.extID)
    avroRecord.put("medianMeasuredTime", roadTraffic.medianMeasuredTime)
    avroRecord.put("timestamp", roadTraffic.timestamp)
    avroRecord.put("vehicleCount", roadTraffic.vehicleCount)
    avroRecord.put("id", roadTraffic.id)
    avroRecord.put("perort_id", roadTraffic.perort_id)
    avroRecord.put("process_time", roadTraffic.process_time)

    try {
      val record = new ProducerRecord[String, GenericRecord]("road_traffic", avroRecord)
      println(record.toString)
      producer.send(record).get()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    producer.close()
  }
}
The schema file RoadTraffic.avsc:
{
  "type": "record",
  "name": "traffic",
  "fields": [
    {"name": "status", "type": "string"},
    {"name": "avgMeasuredTime", "type": "int"},
    {"name": "avgSpeed", "type": "int"},
    {"name": "extID", "type": "string"},
    {"name": "medianMeasuredTime", "type": "int"},
    {"name": "timestamp", "type": "long"},
    {"name": "vehicleCount", "type": "int"},
    {"name": "id", "type": "long"},
    {"name": "perort_id", "type": "string"},
    {"name": "process_time", "type": "long"}
  ]
}
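After the producer has run once, KafkaAvroSerializer should have registered this schema with the Schema Registry under the default subject name road_traffic-value; a quick sanity check:
curl http://10.0.165.8:8081/subjects
curl http://10.0.165.8:8081/subjects/road_traffic-value/versions/latest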
1.6 Check the results
After loading iot-elasticsearch-sink, start the producer; an index with the same name as the topic is created automatically in Elasticsearch. Query ES:
$ curl -XGET http://10.0.165.8:9200/road_traffic/_search
{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"road_traffic","_type":"iot-kafka-connect","_id":"road_traffic+0+0","_score":1.0,"_source":{"status":"OK","avgMeasuredTime":53,"avgSpeed":58,"extID":"724","medianMeasuredTime":53,"timestamp":1398425700000,"vehicleCount":1,"id":8961146,"perort_id":"179444","process_time":1593386110}}]}}
2 ClickHouse connection test
The connection to ClickHouse goes through JDBC.
2.1 Add the configuration
vim /home/kafka/.local/confluent/etc/kafka-connect-jdbc/iot-clickhouse-sink.properties
name=iot-clickhouse-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=road_traffic
connection.url=jdbc:clickhouse://10.0.50.1:8123/iot
connection.user=default
auto.create=false
insert.mode=INSERT
table.name.format=traffic_all
errors.log.enable=true
db.timezone=Asia/Shanghai
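As with Elasticsearch, it is worth confirming first that the ClickHouse HTTP endpoint in connection.url is reachable; its /ping handler answers with "Ok.":
curl http://10.0.50.1:8123/ping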
2.2 Add the JDBC driver jar
To connect to ClickHouse over JDBC, the ClickHouse driver jar, clickhouse-jdbc-0.2.4.jar, has to be added to the /home/kafka/.local/confluent/share/java/kafka-connect-jdbc directory:
$ ll
total 10952
-rw-r--r-- 1 kafka kafka 20437 Mar 27 08:37 audience-annotations-0.5.0.jar
-rw-r--r-- 1 root root 211574 Jun 29 12:30 clickhouse-jdbc-0.2.4.jar
-rw-r--r-- 1 kafka kafka 20903 Mar 27 08:37 common-utils-5.2.4.jar
-rw-r--r-- 1 kafka kafka 87325 Mar 27 08:37 jline-0.9.94.jar
-rw-r--r-- 1 kafka kafka 317816 Mar 27 08:37 jtds-1.3.1.jar
-rw-r--r-- 1 kafka kafka 223878 Mar 27 08:37 kafka-connect-jdbc-5.2.4.jar
-rw-r--r-- 1 kafka kafka 1292696 Mar 27 08:37 netty-3.10.6.Final.jar
-rw-r--r-- 1 kafka kafka 927447 Mar 27 08:37 postgresql-42.2.10.jar
-rw-r--r-- 1 kafka kafka 41203 Mar 27 08:37 slf4j-api-1.7.25.jar
-rw-r--r-- 1 kafka kafka 7064881 Mar 27 08:37 sqlite-jdbc-3.25.2.jar
-rw-r--r-- 1 kafka kafka 74798 Mar 27 08:37 zkclient-0.10.jar
-rw-r--r-- 1 kafka kafka 906708 Mar 27 08:37 zookeeper-3.4.13.jar
Note: Confluent must be restarted after adding the jar, otherwise the connector fails with: java.sql.SQLException: No suitable driver found for jdbc:clickhouse://10.0.50.1:8123/iot
2.3 Create the database and table in ClickHouse
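A minimal DDL sketch that matches the Avro schema above, assuming the usual two-level layout of a local MergeTree table traffic on every node plus a Distributed table traffic_all for the sink to write to (the cluster name iot_cluster and the ORDER BY key are hypothetical placeholders):
CREATE DATABASE IF NOT EXISTS iot ON CLUSTER iot_cluster;

-- Local table, created on every node (hypothetical engine settings)
CREATE TABLE iot.traffic ON CLUSTER iot_cluster
(
    status             String,
    avgMeasuredTime    Int32,
    avgSpeed           Int32,
    extID              String,
    medianMeasuredTime Int32,
    timestamp          Int64,
    vehicleCount       Int32,
    id                 Int64,
    perort_id          String,
    process_time       Int64
)
ENGINE = MergeTree()
ORDER BY (id, timestamp);

-- Distributed table matching table.name.format=traffic_all
CREATE TABLE iot.traffic_all ON CLUSTER iot_cluster
AS iot.traffic
ENGINE = Distributed(iot_cluster, iot, traffic, rand());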
2.4 Add the connector
Since the Elasticsearch test already wrote a record to the road_traffic topic, we can test directly:
[kafka@fbi-local-08 confluent]$ bin/confluent load iot-clickhouse-sink -d etc/kafka-connect-jdbc/iot-clickhouse-sink.properties
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Warning: Install 'jq' to add support for parsing JSON
{"name":"iot-clickhouse-sink","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"road_traffic","connection.url":"jdbc:clickhouse://10.0.50.1:8123/iot","connection.user":"default","auto.create":"false","insert.mode":"INSERT","table.name.format":"traffic_all","errors.log.enable":"true","db.timezone":"Asia/Shanghai","name":"iot-clickhouse-sink"},"tasks":[],"type":"sink"}
$ bin/confluent status iot-clickhouse-sink
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
{"name":"iot-clickhouse-sink","connector":{"state":"RUNNING","worker_id":"10.0.165.8:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"10.0.165.8:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Table \"traffic_all\" is missing and auto-creation is disabled\n\tat io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:88)\n\tat io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)\n\tat io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:85)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)\n\t... 10 more\n"}],"type":"sink"}
No error occurs when the table in iot-clickhouse-sink.properties is changed to the local table traffic; the failure only shows up against the distributed table traffic_all.
The fix is as follows:
Modify the kafka-connect-jdbc-5.2.4 source to add ClickHouse support, upload the recompiled jar to /home/kafka/.local/confluent/share/java/kafka-connect-jdbc, and delete the original kafka-connect-jdbc-5.2.4.jar. A sketch of such a change is shown below.
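In kafka-connect-jdbc 5.2.4, dialects are looked up from the JDBC subprotocol through a ServiceLoader registry, so the change essentially amounts to a new dialect class plus a service registration. A minimal sketch of the idea, modeled on the dialects already present in that codebase (an illustration of the approach, not the exact patch that was applied):

package io.confluent.connect.jdbc.dialect;

import org.apache.kafka.common.config.AbstractConfig;

// Minimal ClickHouse dialect: inherits all behavior from GenericDatabaseDialect
// and claims the "clickhouse" JDBC subprotocol, giving us a place to override
// behavior for jdbc:clickhouse:// URLs.
public class ClickHouseDatabaseDialect extends GenericDatabaseDialect {

  // Discovered via ServiceLoader: this class name must also be listed in
  // src/main/resources/META-INF/services/io.confluent.connect.jdbc.dialect.DatabaseDialectProvider
  public static class Provider extends DatabaseDialectProvider.SubprotocolBasedProvider {
    public Provider() {
      super(ClickHouseDatabaseDialect.class.getSimpleName(), "clickhouse");
    }

    @Override
    public DatabaseDialect create(AbstractConfig config) {
      return new ClickHouseDatabaseDialect(config);
    }
  }

  public ClickHouseDatabaseDialect(AbstractConfig config) {
    super(config);
  }
}

Depending on why the lookup of the distributed table failed, the table-existence check inherited from GenericDatabaseDialect may need overriding as well. Rebuild the jar (for example with mvn package -DskipTests) and replace it as described above.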