(4) Integrating Presto with Kafka
I. Before integration
1. Start ZooKeeper
2. Start Kafka
3. List the existing Kafka topics
[root@master bin]# ./kafka-topics.sh --list --zookeeper master:2181,slaves1:2181,slaves2:2181
topic_walkCount
mytest
mytopic
4. Pick a topic and write some data to it
[root@master bin]# ./kafka-console-producer.sh --broker-list master:6667,slaves1:6667,slaves2:6667 --topic mytest
test01
test02
test03
test04
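II. Integrating Kafka
1. Create kafka.properties
On every Presto node, create the configuration file kafka.properties in the /opt/softWare/presto/ln_presto/etc/catalog directory:
connector.name=kafka
# Kafka broker addresses (three brokers here), separated by commas
kafka.nodes=master:6667,slaves1:6667,slaves2:6667
# Each entry is the name of a Kafka topic
kafka.table-names=mytest,topic_walkCount,mytopic
# The connector exposes a number of Kafka-related internal columns; this setting controls whether clients see them
kafka.hide-internal-columns=false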
2. Start Presto
Start Presto on every node:
./launcher start
3. Connect Presto to Kafka
[root@master bin]# ./presto --server master:8099 --catalog=kafka
presto> show schemas;
Schema
--------------------
default
information_schema
(2 rows)
Query 20201118_015149_00001_hiptr, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:04 [2 rows, 35B] [0 rows/s, 8B/s]
presto> use default;
USE
presto:default> show tables;
Table
-----------------
mytest
mytopic
topic_walkcount
(3 rows)
Query 20201118_015235_00003_hiptr, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:03 [3 rows, 79B] [1 rows/s, 29B/s]
presto:default> select * from mytest;
 _partition_id | _partition_offset | _message_corrupt | _message | _message_length | _key_corrupt | _key | _key_length |  _timestamp
---------------+-------------------+------------------+----------+-----------------+--------------+------+-------------+---------------
             0 |                 0 | false            | test01   |               6 | false        | NULL |           0 | 1605664501494
             0 |                 1 | false            | test02   |               6 | false        | NULL |           0 | 1605664503897
             0 |                 2 | false            | test03   |               6 | false        | NULL |           0 | 1605664506376
             0 |                 3 | false            | test04   |               6 | false        | NULL |           0 | 1605664509616
(4 rows)
Query 20201118_015514_00006_hiptr, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:02 [4 rows, 24B] [2 rows/s, 12B/s]
presto:default> desc mytest;
      Column       |  Type   | Extra |                   Comment
-------------------+---------+-------+---------------------------------------------
 _partition_id     | bigint  |       | Partition Id
 _partition_offset | bigint  |       | Offset for the message within the partition
 _message_corrupt  | boolean |       | Message data is corrupt
 _message          | varchar |       | Message text
 _message_length   | bigint  |       | Total number of message bytes
 _key_corrupt      | boolean |       | Key data is corrupt
 _key              | varchar |       | Key text
 _key_length       | bigint  |       | Total number of key bytes
 _timestamp        | bigint  |       | Offset Timestamp
(9 rows)
Query 20201118_015606_00007_hiptr, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:01 [9 rows, 819B] [7 rows/s, 723B/s]
presto:default>
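The _timestamp column in the query result is a bigint. The values look like Unix epoch timestamps in milliseconds, which is an assumption based on their magnitude, not something the output states. Under that assumption, a minimal Python sketch for turning them into readable UTC times could look like this (kafka_timestamp_to_utc is a hypothetical helper name, not part of Presto or Kafka):

```python
from datetime import datetime, timezone


def kafka_timestamp_to_utc(ts_millis: int) -> datetime:
    """Convert an epoch-milliseconds value to a timezone-aware UTC datetime.

    Assumption: the value counts milliseconds since 1970-01-01T00:00:00Z.
    """
    seconds, millis = divmod(ts_millis, 1000)
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return base.replace(microsecond=millis * 1000)


# _timestamp values copied from the "select * from mytest" output
for ts in (1605664501494, 1605664503897, 1605664506376, 1605664509616):
    print(ts, kafka_timestamp_to_utc(ts).isoformat(timespec="milliseconds"))
```

With this reading, the first message (test01) maps to 2020-11-18T01:55:01.494+00:00.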
III. Another way to write kafka.properties
1. Configuration file
connector.name=kafka
kafka.nodes=master:6667,slaves1:6667,slaves2:6667
# With this form, each full name (mydb01.mytest, mydb01.topic_walkCount, mydb02.mytopic) is the Kafka topic name; mydb01 and mydb02 are the schema names shown in Presto
kafka.table-names=mydb01.mytest,mydb01.topic_walkCount,mydb02.mytopic
kafka.hide-internal-columns=false
2. Querying from Presto
[root@master bin]# ./presto --server 192.168.230.21:8099 --catalog=kafka
presto> show schemas;
Schema
--------------------
information_schema
mydb01
mydb02
(3 rows)
Query 20201118_021745_00000_g9y3u, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:05 [3 rows, 45B] [0 rows/s, 8B/s]
presto> use mydb01;
USE
presto:mydb01> show tables;
Table
-----------------
mytest
topic_walkcount
(2 rows)
Query 20201118_021804_00002_g9y3u, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:02 [2 rows, 53B] [0 rows/s, 21B/s]
presto:mydb01> use mydb02;
USE
presto:mydb02> show tables;
Table
---------
mytopic
(1 row)
Query 20201118_021819_00005_g9y3u, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:02 [1 rows, 23B] [0 rows/s, 15B/s]
presto:mydb02>
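The outputs in this article suggest how entries in kafka.table-names map to Presto names: the part before the first dot becomes the schema, the rest becomes the table, names without a dot land in the default schema, and Presto displays everything in lower case (topic_walkCount shows up as topic_walkcount). The Python sketch below only mirrors that observed behaviour. It is not the connector's actual code, and group_table_names is a hypothetical helper name.

```python
from collections import defaultdict


def group_table_names(table_names: str, default_schema: str = "default") -> dict:
    """Group a kafka.table-names value by the schema name Presto would display.

    Returns {schema: [(table, topic), ...]}, where topic is the entry exactly
    as written in the configuration (assumed to be the Kafka topic name).
    """
    schemas = defaultdict(list)
    for topic in table_names.split(","):
        topic = topic.strip()
        if not topic:
            continue
        if "." in topic:
            # Text before the first dot is treated as the schema name
            schema, table = topic.split(".", 1)
        else:
            schema, table = default_schema, topic
        # Presto displays schema and table names in lower case
        schemas[schema.lower()].append((table.lower(), topic))
    return dict(schemas)


config_value = "mydb01.mytest,mydb01.topic_walkCount,mydb02.mytopic"
for schema, tables in group_table_names(config_value).items():
    for table, topic in tables:
        print(f"{schema}.{table} -> topic {topic}")
```

The point to take from the sketch is that the table mydb01.mytest reads from a Kafka topic literally named mydb01.mytest, not from the topic mytest.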
3. Note
With this configuration, data must be written to the topic mydb01.mytest:
[root@master bin]# ./kafka-console-producer.sh --broker-list master:6667,slaves1:6667,slaves2:6667 --topic mydb01.mytest
test
test00
test55
presto:mydb01> select * from mytest;
 _partition_id | _partition_offset | _message_corrupt | _message | _message_length | _key_corrupt | _key | _key_length |  _timestamp
---------------+-------------------+------------------+----------+-----------------+--------------+------+-------------+---------------
             0 |                 0 | false            | test     |               8 | false        | NULL |           0 | 1605667188718
             0 |                 1 | false            | test00   |               6 | false        | NULL |           0 | 1605667192709
             0 |                 2 | false            | test55   |               6 | false        | NULL |           0 | 1605667195083
(3 rows)
Query 20201118_023959_00007_vzqka, FINISHED, 2 nodes
Splits: 17 total, 17 done (100.00%)
0:02 [3 rows, 20B] [1 rows/s, 11B/s]
presto:mydb01>
Listing the topics again shows that mydb01.mytest is a separate topic from mytest:
[root@master bin]# ./kafka-topics.sh --list --zookeeper master:2181,slaves1:2181,slaves2:2181
topic_walkCount
mytest
mytopic
mydb01.mytest