背景

需要将redis 某个/类频道推送的数据写入到kafka topic中。

实现

一、编写kafka source connector代码和配置文件

我这里是在github上找了一个别人已经实现了的kafka-redis-connector,选取了source的部分。git地址
连接器配置文件xx.properties介绍:

name=定义连接器名字
connector.class=连接器的类名(例如:com.curtain.source.RedisSourceConnector或者RedisSourceConnector)
tasks.max=连接器创建任务的最大数
topic=要写入的kafka主题
redis.uri=redis地址(例如:redis://127.0.0.1:6379)
redis.cluster.enabled=false 不开启集群
redis.channels=订阅的redis频道
redis.channels.pattern.enabled=true 开启模糊订阅

二、打包
  1. 首先package该项目,会有一个该项目的jar包。
  2. 其次收集该项目所依赖的jars。如图:在这里插入图片描述
  3. 将这些收集到的jar包放在一个文件夹下,编写一个tar.bat文件将这些jar包全部解压。tar.bat文件内容如下:
@echo off
 
cd %1
for /r %cd% %%i in (*.jar) do (
	jar xf  %%i
)
 
pause
  1. 解压完之后,删除所有的.jar文件、META-INF文件夹下的.rsa 和.sf的文件。
  2. 打开cmd命令行,输入jar -cvfM connect-redis.jar . 打出一个包含所有依赖的超级jar
三、上传文件

将jar包上传到connect-standalone.properties文件里plugin.path所指的路径下,默认是/usr/share/java。再上传连接器的配置(位置没有要求,最好和别的kafka配置放一起)。

四、启动连接器

执行命令

bin/connect-standalone.sh config/connect-standalone.properties connector1.properties

这样,我们就启动了一个kafka redis source connector啦。

20220825更新

pom文件中加入一下的代码就可以直接package就行啦,不用自己收集所依赖的jar了。

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>none</mainClass>     <!-- 取消查找本项目下的Main方法:为了解决Unable to find main class的问题 -->
                    <classifier>execute</classifier>    <!-- 为了解决依赖模块找不到此模块中的类或属性 -->
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
          <!-- Maven Assembly Plugin -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4.1</version>
            <configuration>
              <!-- get all project dependencies -->
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <!-- bind to the packaging phase -->
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
    </build>
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐