通过clickhouse中的kafka引擎将数据读取到clickhouse的kafka数据管道中,再通过物化视图将数据持久化到指定表中。

具体步骤如下:

1. 首先创建一张kafka引擎的表,让其充当数据管道:

create table kafka_queue
(
 id   Int16,
 code String,
 name String
  ) engine = Kafka ()
  settings
  kafka_broker_list = 'ip1:9092,ip2:9092,ip3:9092',
  kafka_topic_list = 'topicName',
  kafka_group_name = 'clickhouse',
  kafka_format = 'JSONEachRow',
  kafka_skip_broken_messages = 10;

2. 接着,新建一张持久化的表,也就是面向终端用户的查询表,这里是有MergeTree表引擎:

create table kafka_table (
    id Int16,
    code String,
    name String,
    insert_time DateTime DEFAULT now(),
    update_time Date  DEFAULT toDate(now())
    ) engine = MergeTree () order by id;

3. 最后,创建一张物化视图,用于将数据从kafka_queue同步到kafka_table:

create materialized view consumer TO kafka_table as
    select id,code,name from kafka_queue;

完成!

当向kafka主题中发送消息时,Json数据会映射到clickhouse的kafka_queue表中,再经过物化视图consumer将kafka_queue中的数据持久化到kafka_table表中。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐