Installation

JDK 1.7 or later is required.

Confluent 4.0 download: http://packages.confluent.io/archive/4.0/confluent-oss-4.0.0-2.11.zip
Confluent 3.3 download: http://packages.confluent.io/archive/3.3/confluent-oss-3.3.0-2.11.tar.gz

Simply extract the archive and it is ready to use.

Add the bin directory to your PATH:

export PATH=<path-to-confluent>/bin:$PATH
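For example, if you extracted the 4.0 archive into your home directory (the path below is only an illustration; adjust it to wherever you unpacked):

$ export PATH=$HOME/confluent-4.0.0/bin:$PATH   # hypothetical install location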

Startup

$ confluent start
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]

View the log file directory

$ confluent current
# /tmp/confluent.BnzjkBY7
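That directory holds the data and log files for every service, one subdirectory per service. This version of the CLI can also page through a service's log for you; both commands below assume the layout that confluent start creates:

$ confluent log connect        # page through the Connect worker's log
$ ls $(confluent current)      # one subdirectory per service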

List the bundled connectors

$ confluent list connectors
Bundled Predefined Connectors (edit configuration under etc/):
  elasticsearch-sink
  file-source
  file-sink
  jdbc-source
  jdbc-sink
  hdfs-sink
  s3-sink

Using a local file as a source

The properties file is ./etc/kafka/connect-file-source.properties:

# User defined connector instance name.
name=file-source
# The class implementing the connector
connector.class=FileStreamSource
# Maximum number of tasks to run for this connector instance
tasks.max=1
# The input file (path relative to worker's working directory)
# This is the only setting specific to the FileStreamSource
file=test.txt
# The output topic in Kafka
topic=connect-test

Generate some test data:

$ for i in {1..3}; do echo "log line $i"; done > test.txt

Load the file source connector

$ confluent load file-source
{
  "name": "file-source",
  "config": {
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "file": "test.txt",
    "topics": "connect-test",
    "name": "file-source"
  },
  "tasks": []
}
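confluent load is a thin wrapper around the Kafka Connect REST API. A roughly equivalent raw request, assuming the Connect worker listens on its default port 8083, would be:

$ curl -X POST -H "Content-Type: application/json" \
    --data '{"name": "file-source", "config": {"connector.class": "FileStreamSource", "tasks.max": "1", "file": "test.txt", "topic": "connect-test"}}' \
    http://localhost:8083/connectors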

List the loaded connectors

$ confluent status connectors
[
  "file-source"
]

Check the status of a specific connector

This is a handy way to troubleshoot a connector, and you will use it often.

$ confluent status file-source
{
  "name": "file-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.10.1:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "192.168.10.1:8083"
    }
  ]
}
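The same information is available directly from the REST API (default port 8083 assumed again), which is convenient for scripted health checks:

$ curl http://localhost:8083/connectors/file-source/status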

Consume the messages from the command line

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
  "log line 1"
  "log line 2"
  "log line 3"

Writing data out to a file with a sink connector

Use the file-sink connector; its configuration lives in ./etc/kafka/connect-file-sink.properties. Note that a sink takes a comma-separated topics list, whereas the FileStreamSource above takes a single topic:

# User defined name for the connector instance
name=file-sink
# Name of the connector class to be run
connector.class=FileStreamSink
# Max number of tasks to spawn for this connector instance
tasks.max=1
# Output file name relative to worker's current working directory
# This is the only property specific to the FileStreamSink connector
file=test.sink.txt
# Comma-separated input topic list
topics=connect-test

Load the file sink connector

$ confluent load file-sink
{
  "name": "file-sink",
  "config": {
    "connector.class": "FileStreamSink",
    "tasks.max": "1",
    "file": "test.sink.txt",
    "topics": "connect-test",
    "name": "file-sink"
  },
  "tasks": []
}

Check its status

$ confluent status file-sink
{
  "name": "file-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.10.1:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "192.168.10.1:8083"
    }
  ]
}

Now there are two connectors:

$ confluent status connectors
[
  "file-source",
  "file-sink"
]

Watch the output

$ tail -f test.sink.txt

Append some more data:

$ for i in {4..1000}; do echo "log line $i"; done >> test.txt

The new lines come through:

"log line 1"
"log line 2"
"log line 3"
"log line 4"
"log line 5"
 ...

Unload the connectors

$ confluent unload file-source
$ confluent unload file-sink
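
Like load, unload maps onto the Connect REST API; the equivalent raw calls (default port 8083 assumed) are:

$ curl -X DELETE http://localhost:8083/connectors/file-source
$ curl -X DELETE http://localhost:8083/connectors/file-sink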

Stop the Connect service

$ confluent stop connect
Stopping connect
connect is [DOWN]

Shut down Confluent

$ confluent stop
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]

Wipe out all traces

$ confluent destroy
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
Deleting: /tmp/confluent.BnzjkBY7

References

https://docs.confluent.io/current/connect/quickstart.html
