一、集成之前

1、启动zookeeper

2、启动kafka

3、查看kafka有哪些主题

[root@master bin]# ./kafka-topics.sh --list --zookeeper master:2181,slaves1:2181,slaves2:2181

topic_walkCount
mytest
mytopic

4、选择一个主题进行数据写入

[root@master bin]# ./kafka-console-producer.sh --broker-list master:6667,slaves1:6667,slaves2:6667 --topic mytest
test01
test02
test03
test04

二、集成kafka

1、创建kafka.properties

在presto的每个节点的/opt/softWare/presto/ln_presto/etc/catalog目录下创建kafka.properties配置文件

connector.name=kafka
# Kafka broker 列表,多个 broker 之间用逗号分隔
kafka.nodes=master:6667,slaves1:6667,slaves2:6667
# 每一项都对应 kafka 中的一个 topic
kafka.table-names=mytest,topic_walkCount,mytopic
# 这个 connector 带有很多 kafka 相关的内部字段,此项配置是否在客户端隐藏它们(false 表示显示)
kafka.hide-internal-columns=false

2、启动presto

在每个节点启动presto

./launcher start

3、presto连接kafka

[root@master bin]# ./presto --server master:8099 --catalog=kafka
presto> show schemas;
       Schema       
--------------------
 default            
 information_schema 
(2 rows)

Query 20201118_015149_00001_hiptr, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:04 [2 rows, 35B] [0 rows/s, 8B/s]

presto> use default;
USE
presto:default> show tables;
      Table      
-----------------
 mytest          
 mytopic         
 topic_walkcount 
(3 rows)

Query 20201118_015235_00003_hiptr, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:03 [3 rows, 79B] [1 rows/s, 29B/s]
presto:default> select * from mytest;
 _partition_id | _partition_offset | _message_corrupt | _message | _message_length | _key_corrupt | _key | _key_length |  _timestamp   
---------------+-------------------+------------------+----------+-----------------+--------------+------+-------------+---------------
             0 |                 0 | false            | test01   |               6 | false        | NULL |           0 | 1605664501494 
             0 |                 1 | false            | test02   |               6 | false        | NULL |           0 | 1605664503897 
             0 |                 2 | false            | test03   |               6 | false        | NULL |           0 | 1605664506376 
             0 |                 3 | false            | test04   |               6 | false        | NULL |           0 | 1605664509616 
(4 rows)

Query 20201118_015514_00006_hiptr, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:02 [4 rows, 24B] [2 rows/s, 12B/s]

presto:default> desc mytest;
      Column       |  Type   | Extra |                   Comment                   
-------------------+---------+-------+---------------------------------------------
 _partition_id     | bigint  |       | Partition Id                                
 _partition_offset | bigint  |       | Offset for the message within the partition 
 _message_corrupt  | boolean |       | Message data is corrupt                     
 _message          | varchar |       | Message text                                
 _message_length   | bigint  |       | Total number of message bytes               
 _key_corrupt      | boolean |       | Key data is corrupt                         
 _key              | varchar |       | Key text                                    
 _key_length       | bigint  |       | Total number of key bytes                   
 _timestamp        | bigint  |       | Offset Timestamp                            
(9 rows)

Query 20201118_015606_00007_hiptr, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:01 [9 rows, 819B] [7 rows/s, 723B/s]

presto:default>

三、kafka.properties的另一种写法

1、配置文件

connector.name=kafka
kafka.nodes=master:6667,slaves1:6667,slaves2:6667
# 此时 mydb01.mytest、mydb01.topic_walkCount、mydb02.mytopic 各自整体就是一个 topic 名;点号前的 mydb01、mydb02 即是 presto 中显示的库名
kafka.table-names=mydb01.mytest,mydb01.topic_walkCount,mydb02.mytopic
kafka.hide-internal-columns=false

2、presto查询 

[root@master bin]# ./presto --server 192.168.230.21:8099 --catalog=kafka
presto> show schemas;
       Schema       
--------------------
 information_schema 
 mydb01             
 mydb02             
(3 rows)

Query 20201118_021745_00000_g9y3u, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:05 [3 rows, 45B] [0 rows/s, 8B/s]

presto> use mydb01;
USE
presto:mydb01> show tables;
      Table      
-----------------
 mytest          
 topic_walkcount 
(2 rows)

Query 20201118_021804_00002_g9y3u, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:02 [2 rows, 53B] [0 rows/s, 21B/s]

presto:mydb01> use mydb02;
USE
presto:mydb02> show tables;
  Table  
---------
 mytopic 
(1 row)

Query 20201118_021819_00005_g9y3u, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:02 [1 rows, 23B] [0 rows/s, 15B/s]

presto:mydb02>

3、注意

此时 kafka 中的 topic 名是完整的 mydb01.mytest(而不是 mytest),因此写数据时要往 mydb01.mytest 主题写:

[root@master bin]# ./kafka-console-producer.sh --broker-list master:6667,slaves1:6667,slaves2:6667 --topic mydb01.mytest
test
test00
test55

然后在 presto 中查询:

presto:mydb01> select * from mytest;
 _partition_id | _partition_offset | _message_corrupt | _message | _message_length | _key_corrupt | _key | _key_length |  _timestamp   
---------------+-------------------+------------------+----------+-----------------+--------------+------+-------------+---------------
             0 |                 0 | false            | test   |               8 | false        | NULL |           0 | 1605667188718 
             0 |                 1 | false            | test00   |               6 | false        | NULL |           0 | 1605667192709 
             0 |                 2 | false            | test55   |               6 | false        | NULL |           0 | 1605667195083 
(3 rows)

Query 20201118_023959_00007_vzqka, FINISHED, 2 nodes
Splits: 17 total, 17 done (100.00%)
0:02 [3 rows, 20B] [1 rows/s, 11B/s]

presto:mydb01>
[root@master bin]# ./kafka-topics.sh --list --zookeeper master:2181,slaves1:2181,slaves2:2181

topic_walkCount
mytest
mytopic
mydb01.mytest

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐