1 Elasticsearch connection test

1.1 Start Confluent

/home/kafka/.local/confluent/bin/confluent start

This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.swpIapNw
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

1.2 Add the connector configuration

vim /home/kafka/.local/confluent/etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties

name=iot-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=road_traffic
key.ignore=true
connection.url=http://10.0.165.8:9200
type.name=iot-kafka-connect
batch.size=1
flush.timeout.ms=200000
topic.schema.ignore=road_traffic
schema.ignore=true
retry.backoff.ms=3000
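
Before loading the connector it is worth confirming that the Elasticsearch node referenced by connection.url is reachable; a plain GET against the root endpoint returns the cluster name and version:

curl http://10.0.165.8:9200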

1.3 Load the connector

bin/confluent load iot-elasticsearch-sink -d etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties

$ bin/confluent load iot-elasticsearch-sink -d etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

Warning: Install 'jq' to add support for parsing JSON
{"name":"iot-elasticsearch-sink","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"road_traffic","key.ignore":"true","connection.url":"http://10.0.165.8:9200","type.name":"iot-kafka-connect","batch.size":"1","flush.timeout.ms":"200000","topic.schema.ignore":"road_traffic","schema.ignore":"true","retry.backoff.ms":"3000","name":"iot-elasticsearch-sink"},"tasks":[],"type":"sink"}

Check the status:

$ bin/confluent status connectors
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
["iot-elasticsearch-sink"]

$ bin/confluent status iot-elasticsearch-sink
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
{"name":"iot-elasticsearch-sink","connector":{"state":"RUNNING","worker_id":"10.0.165.9:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.0.165.8:8083"}],"type":"sink"}
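
The same information is also available directly from the Kafka Connect REST API, independent of the dev CLI (the worker address below is taken from the worker_id in the output above):

curl http://10.0.165.8:8083/connectors/iot-elasticsearch-sink/status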

1.4 Create the Kafka topic

/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --create --replication-factor 2 --partitions 2 --topic road_traffic

Verify that the topic was created:

/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --list
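
To also check the partition and replica assignment:

/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --describe --topic road_traffic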

1.5 Produce data

(1) Add the following dependencies to the project's pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>Manufacturing_data</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>1.0.0</kafka.version>
        <avro.version>1.8.0</avro.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.66</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>${avro.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

(2) The Confluent jars cannot be found on Maven Central, so they have to be added manually; otherwise the build fails with io.confluent.kafka.serializers.KafkaAvroSerializer not found.

After unpacking confluent-4.0.0, its share/java/ directory contains the jars of the individual Confluent components. We need common-config-4.0.0.jar, common-utils-4.0.0.jar and all jars whose names start with jackson from the confluent-common directory, plus kafka-schema-registry-client-4.0.0.jar and kafka-avro-serializer-4.0.0.jar from the kafka-serde-tools directory.
Copy them into a new lib directory under the module, then right-click and choose "Add as Library…". The jars needed are:

common-config-4.0.0.jar
common-utils-4.0.0.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.1.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.1.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jackson-xc-1.9.13.jar
kafka-avro-serializer-4.0.0.jar
kafka-schema-registry-client-4.0.0.jar
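
Alternatively, if the build machine has internet access, the same classes can be resolved from the Confluent Maven repository instead of copying jars by hand. A sketch of the extra pom.xml entries (the repository URL and the kafka-avro-serializer coordinates are the standard Confluent ones, not taken from the setup above):

    <!-- next to <dependencies> in pom.xml -->
    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <!-- inside the existing <dependencies> block -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>4.0.0</version>
    </dependency>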

The producer code is as follows:

import java.io.File
import java.util.Properties

import com.alibaba.fastjson.JSON
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

case class RoadTraffic(status:String,avgMeasuredTime:Int,avgSpeed: Int, extID:String,
                       medianMeasuredTime: Int, timestamp: Long,vehicleCount:Int,id:Long,perort_id:String,process_time:Long)

object KafkaToTraffic {

  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers","10.0.165.8:9092,10.0.165.9:9092")
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("schema.registry.url", "http://10.0.165.8:8081");

    // Parse the Avro schema
    val schema:Schema = new Schema.Parser().parse(new File("E:\\working\\ideaWorking\\iot_road_traffic\\src\\main\\resources\\RoadTraffic.avsc"));
    //val recordInjection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
    val avroRecord:GenericData.Record = new GenericData.Record(schema)


    // Create a Kafka producer
    val producer: KafkaProducer[String,GenericRecord] = new KafkaProducer(props)

    val str= "{\"status\":\"OK\",\"avgMeasuredTime\":\"53\",\"avgSpeed\":\"58\",\"extID\":\"724\",\"medianMeasuredTime\":\"53\",\"TIMESTAMP\":\"2014-04-25T19:35:00\",\"vehicleCount\":\"1\",\"id\":\"8961146\",\"perort_id\":\"179444\",\"process_time\":\"1593386110\"}"
    val roadTraffic = JSON.parseObject(str, classOf[RoadTraffic])
    System.out.println(roadTraffic)
    avroRecord.put("status", roadTraffic.status);
    avroRecord.put("avgMeasuredTime", roadTraffic.avgMeasuredTime);
    avroRecord.put("avgSpeed", roadTraffic.avgSpeed);
    avroRecord.put("extID", roadTraffic.extID);
    avroRecord.put("medianMeasuredTime", roadTraffic.medianMeasuredTime);
    avroRecord.put("timestamp", roadTraffic.timestamp);
    avroRecord.put("vehicleCount", roadTraffic.vehicleCount);
    avroRecord.put("id", roadTraffic.id);
    avroRecord.put("perort_id", roadTraffic.perort_id);
    avroRecord.put("process_time", roadTraffic.process_time);

    try {
      val record = new ProducerRecord[String, GenericRecord]("road_traffic", avroRecord)
      System.out.println(record.toString)
      producer.send(record).get()

    } catch {
      case e: Exception => e.printStackTrace()
    }
    producer.close();


  }
}

RoadTraffic.avsc

{
    "type": "record",
    "name": "traffic",
    "fields": [
        {"name": "status", "type": "string"},
        {"name": "avgMeasuredTime",  "type": "int"},
        {"name": "avgSpeed",  "type": "int"},
        {"name": "extID", "type": "string"},
        {"name": "medianMeasuredTime", "type": "int"},
        {"name": "timestamp", "type": "long"},
        {"name": "vehicleCount", "type": "int"},
        {"name": "id", "type": "long"},
        {"name": "perort_id", "type": "string"},
        {"name": "process_time", "type": "long"}
    ]
}
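
Before looking at Elasticsearch, the record written by the producer can be checked directly on the topic with the Avro console consumer that ships with Confluent (broker and schema-registry addresses as used above):

/home/kafka/.local/confluent/bin/kafka-avro-console-consumer --bootstrap-server 10.0.165.8:9092 --topic road_traffic --from-beginning --property schema.registry.url=http://10.0.165.8:8081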

1.6 Check the results

With iot-elasticsearch-sink loaded, starting the producer automatically creates an index in Elasticsearch with the same name as the topic. Query Elasticsearch:

curl -XGET http://10.0.165.8:9200/road_traffic/_search

$ curl -XGET http://10.0.165.8:9200/road_traffic/_search
{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"road_traffic","_type":"iot-kafka-connect","_id":"road_traffic+0+0","_score":1.0,"_source":{"status":"OK","avgMeasuredTime":53,"avgSpeed":58,"extID":"724","medianMeasuredTime":53,"timestamp":1398425700000,"vehicleCount":1,"id":8961146,"perort_id":"179444","process_time":1593386110}}]}}

2 ClickHouse connection test

ClickHouse is connected via JDBC.

2.1 Add the connector configuration

vim /home/kafka/.local/confluent/etc/kafka-connect-jdbc/iot-clickhouse-sink.properties

name=iot-clickhouse-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=road_traffic
connection.url=jdbc:clickhouse://10.0.50.1:8123/iot
connection.user=default
auto.create=false
insert.mode=INSERT
table.name.format=traffic_all
errors.log.enable=true
db.timezone=Asia/Shanghai
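
Before loading the connector it is worth confirming that the ClickHouse HTTP interface referenced by connection.url responds; ClickHouse answers a plain GET on port 8123 with "Ok.":

curl http://10.0.50.1:8123/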

2.2 Add the ClickHouse JDBC driver jar

To connect to ClickHouse over JDBC, the ClickHouse JDBC driver jar (clickhouse-jdbc-0.2.4.jar) has to be added to the /home/kafka/.local/confluent/share/java/kafka-connect-jdbc directory:

$ ll
total 10952
-rw-r--r-- 1 kafka kafka   20437 Mar 27 08:37 audience-annotations-0.5.0.jar
-rw-r--r-- 1 root  root   211574 Jun 29 12:30 clickhouse-jdbc-0.2.4.jar
-rw-r--r-- 1 kafka kafka   20903 Mar 27 08:37 common-utils-5.2.4.jar
-rw-r--r-- 1 kafka kafka   87325 Mar 27 08:37 jline-0.9.94.jar
-rw-r--r-- 1 kafka kafka  317816 Mar 27 08:37 jtds-1.3.1.jar
-rw-r--r-- 1 kafka kafka  223878 Mar 27 08:37 kafka-connect-jdbc-5.2.4.jar
-rw-r--r-- 1 kafka kafka 1292696 Mar 27 08:37 netty-3.10.6.Final.jar
-rw-r--r-- 1 kafka kafka  927447 Mar 27 08:37 postgresql-42.2.10.jar
-rw-r--r-- 1 kafka kafka   41203 Mar 27 08:37 slf4j-api-1.7.25.jar
-rw-r--r-- 1 kafka kafka 7064881 Mar 27 08:37 sqlite-jdbc-3.25.2.jar
-rw-r--r-- 1 kafka kafka   74798 Mar 27 08:37 zkclient-0.10.jar
-rw-r--r-- 1 kafka kafka  906708 Mar 27 08:37 zookeeper-3.4.13.jar

Note: Confluent has to be restarted afterwards, otherwise the connector fails with: java.sql.SQLException: No suitable driver found for jdbc:clickhouse://10.0.50.1:8123/iot
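
With the dev CLI, restarting just the connect service should be enough for the worker to pick up the new driver jar (a full confluent stop / confluent start also works):

/home/kafka/.local/confluent/bin/confluent stop connect
/home/kafka/.local/confluent/bin/confluent start connect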

2.3 Create the database and table in ClickHouse
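
Because auto.create=false, the target table has to exist before the connector is loaded. A minimal DDL sketch: the database iot, local table traffic and distributed table traffic_all match the names used elsewhere in this test, while the cluster name iot_cluster, the MergeTree engine and the ORDER BY key are assumptions; the column types simply mirror the Avro schema:

CREATE DATABASE IF NOT EXISTS iot ON CLUSTER iot_cluster;

-- local table, one per node (engine and sorting key are assumptions)
CREATE TABLE iot.traffic ON CLUSTER iot_cluster
(
    status              String,
    avgMeasuredTime     Int32,
    avgSpeed            Int32,
    extID               String,
    medianMeasuredTime  Int32,
    timestamp           Int64,
    vehicleCount        Int32,
    id                  Int64,
    perort_id           String,
    process_time        Int64
) ENGINE = MergeTree() ORDER BY id;

-- distributed table that table.name.format points at
CREATE TABLE iot.traffic_all ON CLUSTER iot_cluster AS iot.traffic
ENGINE = Distributed(iot_cluster, iot, traffic, rand());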

2.4 Load the connector

Since a record was already written to the road_traffic topic during the Elasticsearch test, we can load the connector and test directly with that data:

bin/confluent load iot-clickhouse-sink -d etc/kafka-connect-jdbc/iot-clickhouse-sink.properties

[kafka@fbi-local-08 confluent]$ bin/confluent load iot-clickhouse-sink -d etc/kafka-connect-jdbc/iot-clickhouse-sink.properties
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

Warning: Install 'jq' to add support for parsing JSON
{"name":"iot-clickhouse-sink","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"road_traffic","connection.url":"jdbc:clickhouse://10.0.50.1:8123/iot","connection.user":"default","auto.create":"false","insert.mode":"INSERT","table.name.format":"traffic_all","errors.log.enable":"true","db.timezone":"Asia/Shanghai","name":"iot-clickhouse-sink"},"tasks":[],"type":"sink"}

$  bin/confluent status iot-clickhouse-sink
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

{"name":"iot-clickhouse-sink","connector":{"state":"RUNNING","worker_id":"10.0.165.8:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"10.0.165.8:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Table \"traffic_all\" is missing and auto-creation is disabled\n\tat io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:88)\n\tat io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)\n\tat io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:85)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)\n\t... 10 more\n"}],"type":"sink"}

When the table in iot-clickhouse-sink.properties is changed to the local table traffic, the task no longer fails.
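
After editing iot-clickhouse-sink.properties the connector has to be reloaded for the change to take effect; with the dev CLI this is roughly:

bin/confluent unload iot-clickhouse-sink
bin/confluent load iot-clickhouse-sink -d etc/kafka-connect-jdbc/iot-clickhouse-sink.properties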

Workaround for the distributed table:
Modify the kafka-connect-jdbc-5.2.4 source code to add support for ClickHouse connections, upload the rebuilt jar to /home/kafka/.local/confluent/share/java/kafka-connect-jdbc, and delete the original kafka-connect-jdbc-5.2.4.jar.
