提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

提示:这里可以添加本文要记录的大概内容:

本文主要介绍

`提示:本文只展示了Kafka直接接入TDengine方式。

一、TDengine是什么?

TDengine 是一款专为物联网、工业互联网等场景设计并优化的大数据平台,它能安全高效地将大量设备、数据采集器每天产生的高达 TB 甚至 PB 级的数据进行汇聚、存储、分析和分发,对业务运行状态进行实时监测、预警,提供实时的商业洞察。其核心模块是高性能、集群开源、云原生、极简的时序数据库 TDengine OSS。

二、使用步骤

TDengine3.0相比起之前的版本,简化了接入kafka流程步骤,省略了Confluent的安装。具体流程如下:
TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDengine Sink Connector。用户只需提供简单的配置文件,就可以将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine, 或将 TDengine 中指定数据库的数据(批量或实时)同步到 Kafka。
参考https://docs.taosdata.com/third-party/kafka/
在这里插入图片描述TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送给 Kafka Connect。TDengine Sink Connector 用于 从 Kafka Connect 接收数据并写入 TDengine。

在这里插入图片描述

1.前置条件

前提条件:

Linux 操作系统
KAFKA
已安装 Java 8 和 Maven
已安装 Git、curl、vi
已安装并启动 TDengine。如果还没有可参考安装和卸载

2.安装 TDengine Connector 插件

代码如下(示例):

git clone --branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package -Dmaven.test.skip=true
unzip -d $KAFKA_HOME/components/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip

以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 target/components/packages/ 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。上面的示例中使用了内置的插件安装路径: $KAFKA_HOME/components/。


3.配置插件

将 kafka-connect-tdengine 插件加入 $KAFKA_HOME/config/connect-distributed.properties 配置文件 plugin.path 中

plugin.path=/usr/share/java,/opt/kafka/components

4.启动kafka

zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

5.验证是否成功

输入命令

curl http://localhost:8083/connectors

如果启动成功,会得到如下输出

[]

6.TDengine Sink Connector 的使用

采用的接收协议分为三种:
InfluxDB 行协议、OpenTSDB 行协议、OpenTSDB JSON 格式协议
主要介绍OpenTSDB JSON 格式协议
使用OpenTSDB JSON 格式协议的请求参数如下:

{
	"name": "TDengineSinkConnector20",
    "config": {
	    "connector.class":"com.taosdata.kafka.connect.sink.TDengineSinkConnector",
	    "tasks.max": "4",
	    "topics": "iiot.data.center",
	    "connection.url": "jdbc:TAOS-RS://IP:端口?user=XXX&password=XXXX",
	    "connection.user": "XXX",
	    "connection.password": "XXXX",
	    "connection.database": "test",
	    "db.schemaless": "json",
	    "data.precision": "ns",
	    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
	    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
	    "errors.tolerance": "all",
	    "errors.deadletterqueue.topic.name" : "dead_letter_topic",
	    "errors.deadletterqueue.topic.replication.factor" :1
  	}
}

需要注意的是最开始使用jdbc:TAOS:配置没有起效对与指定子表名,所以这里换成了jdbc:TAOS-RS,
各协议的配置基本相同,上面配信息的db.schemaless决定采用的是什么协议。
json是OpenTSDB JSON 格式协议,OpenTSDB 行协议是telent,InfluxDB 行协议是line
另外需要注意的是因为TDengine是时序数据库,时间戳是主键,所以数据的时间戳的格式要按照
"data.precision": "ns"
定义的不然会导致数据存储不进去的问题

总结

以上就是TDengine3.0直接接入kafka的使用,有问题随时留言

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐