Flume抽取Oracle中的数据到Kafka
1.1 Flume的安装1)下载Flume 从Flume官网(http://flume.apache.org/download.html)下载对应版本的Flume,这里使用的是Flume的版本是1.7。2)解压Flume 新建flume目录,将下载的Flume上传到该目录下,执行如下命令进行解压:tar -zxvf ./apache-flume-1.7....
1.1 Flume的安装
1)下载Flume
从Flume官网(http://flume.apache.org/download.html)下载对应版本的Flume,这里使用的是Flume的版本是1.7。
2)解压Flume
新建flume目录,将下载的Flume上传到该目录下,执行如下命令进行解压:
tar -zxvf ./apache-flume-1.7.0-bin.tar.gz -C ./
3)解压后进入Flume的lib目录下,将Oracle数据库的驱动包拷贝到此目录下。
1.2 Flume对接Kafka
Flume的使用非常的简单,只需要在编辑一个配置文件即可。配置文件可以自定义,这里命名为flume-topic-oracle.conf。
- 声明source、channel、sink并声明source类型。
- 声明数据库连接、用户名、密码等。
- 设置自动提交、Oracle方言、数据库驱动等。
- 设置查询间隔、Flume状态文件位置及名称等。
其中,Flume状态文件会在启动Flume的时候自动创建,里面保存了查询语句、最后一次查询的最大索引号以及数据库链接等信息,如下图所示:
Flume的状态文件在每一次启动Flume的时候都会自动创建一个,理论上不需要删除,但经过测试,在不删除的情况下,导入到Inceptor的数据有时会存在重复的情况。所以建议在每次停止Flume服务之后将Flume状态文件删除。
- 设置查询的SQL语句以及从哪条数据开始查询。
- 设置分批参数以及c3p0连接池参数。
- 设置channel为内存模式。
- 设置sink的类型
sink可以设置为输出到本地、Kafka或者HDFS等。此处,设置输出到Kafka,方便后续Flume、Kafka、Slipstream的整合。
- 连接source、channel、sink
1.3 测试Flume+Kafka
Flume相应的配置文件写好之后,可以直接在Flume文件夹下执行如下命令启动Flume进行测试:
bin/flume-ng agent --name a1 --conf ./conf --conf-file ./conf/flume-topic-oracle.conf -Dflume.root.logger=INFO,console -Djava.security.auth.login.config=/opt/flume/apache-flume-1.7.0-bin/conf/jaas.conf
但是,通常情况下,配置文件写好之后,在不变动的情况下,一般不会停止Flume服务,上述命令,会进去一个Flume运行的前台界面,这是最好使用前台转后台的方法启动Flume,如下:
nohup bin/flume-ng agent --name a1 --conf ./conf --conf-file ./conf/flume-topic-oracle.conf -Dflume.root.logger=INFO,console -Djava.security.auth.login.config=/opt/flume/apache-flume-1.7.0-bin/conf/jaas.conf &
其中,--conf指向参数存在的目录,--conf-file指向2.2中配置的参数。
启动Flume之后,向Oracle对应表中插入一条数据,如下图所示:
在集群中,启动Kafka消费者,可以发现,消费者中可以接收到刚刚插入的数据。
至此,Flume对接Kafka成功。
更多推荐
所有评论(0)