ksql介绍
参考:ksql官网查询:select * frommy_table emit changes;创建查询:CREATE STREAM AS SELECT or CREATE TABLE AS SELECT终止查询:非持久化查询:ctrl+c,持久化查询:terminate query_id;ksql语法pull querykafka主题非持久化SELECT select_expr [, ...]FR
参考:ksql官网
查询:select * from my_table emit changes;
创建查询:CREATE STREAM AS SELECT
or CREATE TABLE AS SELECT
终止查询:
- 非持久化查询:ctrl+c,
- 持久化查询:terminate query_id;
ksql语法
pull query
kafka主题非持久化
SELECT select_expr [, ...]
FROM aggregate_table
WHERE key_column=key
[AND window_bounds];
从实时表中pull当前值并终止。该语句的结果不会保留在Kafka主题中,而只会在控制台中打印出来。
通过pull查询,可以获取实例化视图的当前状态。
因为随着新事件的到来,物化视图会进行增量更新,所以pull查询的运行可预见地延时低。
它们非常适合请求/响应流。有关异步应用程序流,请参见push查询。
通过向ksqlDB REST API发送HTTP请求来执行请求查询,并且该API会以单个响应进行响应。
WHERE子句必须包含一个要检索的主键,如果对实例化表进行窗口化处理,则可以选择在WINDOWSTART上包含边界。
SELECT * FROM pageviews_by_region
WHERE regionId = 'Region_1'
AND 1570051876000 <= WINDOWSTART AND WINDOWSTART <= 1570138276000;
也可以使用 ISO-8601日期格式表达时间。AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWSTART <= '2019-10-03T21:31:16'
如果没有WINDOWSTART则返回所有windows的行
push query
SELECT select_expr [, ...]
FROM from_item
[ LEFT JOIN join_table ON join_criteria ]
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression ]
[ HAVING having_expression ]
EMIT CHANGES
[ LIMIT count ];
将连续的更新流推送到ksqlDB流或表。
该语句的结果不会保留在Kafka主题中,只会在控制台中打印出来,或返回给客户端。
要停止在CLI中启动的推送查询,请按Ctrl + C。
通过CLI或通过向ksqlDB REST API发送HTTP请求来执行推送查询,然后该API发送回不确定长度的分块响应。
push查询使您可以订阅更改信息,从而可以实时响应新的信息。它们非常适合异步应用程序流。
推式查询可以使用所有可用的SQL功能,当对持久性查询进行原型设计或从CLI运行临时查询时,此功能很有用。但是与持久查询不同,推送查询不共享。如果多个客户端提交相同的push查询,则ksqlDB将计算每个客户的独立结果。
SELECT * FROM pageviews
WHERE ROWTIME >= 1510923225000
AND ROWTIME <= 1510923228000
EMIT CHANGES;
SELECT * FROM pageviews EMIT CHANGES LIMIT 5;
如果想获取更久远的数据,设置SET 'auto.offset.reset' = 'earliest';
WINDOW子句可控制如何将具有相同键的输入记录分组到窗口中,以进行聚合或联接之类的操作。Windows中每个记录键都被记录。
窗口化为数据添加了两个附加的系统列,它们提供了窗口边界:WINDOWSTART和WINDOWEND。
ksqlDB支持以下WINDOW类型。
翻转窗口
滚动窗口根据记录的时间戳将输入记录分组为固定大小的非重叠窗口。
您必须指定滚动窗口的窗口大小。滚动窗口是跳跃窗口的一种特殊情况,其中窗口大小等于前进间隔。
SELECT windowstart, windowend, item_id, SUM(quantity)
FROM orders
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY item_id
EMIT CHANGES;
跳跃窗口
跳窗将输入记录根据记录的时间戳分为固定大小(可能)重叠的窗口。
您必须指定窗口大小和跳跃窗口的前进间隔。
SELECT windowstart, windowend, item_id, SUM(quantity) FROM orders WINDOW HOPPING (SIZE 20 SECONDS, ADVANCE BY 5 SECONDS) GROUP BY item_id EMIT CHANGES;
会话窗口
会话窗口将输入记录分组为所谓的会话。
您必须为会话窗口指定会话不活动间隔参数。
例如,假设将闲置时间间隔设置为5分钟。
如果对于给定的记录键(例如“ alice”),超过5分钟没有新输入数据到达,则“ alice”的当前会话关闭,并且将来任何“ alice”的新到达数据将标记新会话的开始。
SELECT windowstart, windowend, item_id, SUM(quantity)
FROM orders
WINDOW SESSION (20 SECONDS)
GROUP BY item_id
EMIT CHANGES;
以下语句显示了如何创建具有会话窗口的推式查询。
乱序事件
窗口结束后最多两个小时接受事件。
宽限期结束后到达的事件将被删除,并且不包括在汇总结果中。
SELECT orderzip_code, TOPK(order_total, 5) FROM orders
WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 2 HOURS)
GROUP BY order_zipcode
EMIT CHANGES;
问题总结:
消息到达失败,通过消息数据判断。DESCRIBE EXTENDED GOOD_RATINGS;
可判断有串行化错误。
select 查询没有返回结果
- 查看查询语句中的源话题
DESCRIBE EXTENDED PAGEVIEWS;
- 验证源话题有数据涌入
- 验证源话题有新数据涌入
SET 'auto.offset.reset'='earliest';
再写查询语句 - 没有串行化deserialization错误
DESCRIBE EXTENDED pageviews;
检查VALUE_FORMAT
ksqldb命令窗口没有连接到ksqldb服务器(端口、配置被占用)
不能通过windowed聚合的输出创建流(不支持结构化keys)
主题未被真正删除delete.topic.enable=true
avro shema重复主题导致错误
Confluent Replicator在复制期间重命名主题。
如果存在关联的Avro模式,复制完成后,它们不会自动与重命名的主题匹配。
将PRINT语句用于复制的主题将显示Avro架构ID存在于架构注册表中。
ksqlDB可以反序列化Avro消息,但是CREATE STREAM语句失败,并出现反序列化错误。
解决方案是针对该主题的复制主题名称手动注册Avro模式。 curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $(curl -s http://localhost:8081/subjects/pageviews-value/versions/latest | jq '.schema')}" http://localhost:8081/subjects/pageviews.replica-value/versions
Snappy编码的邮件不会解压缩-Dorg.xerial.snappy.tempdir=/path/to/newtmp
检查ksqldb日志confluent local log connect
更多推荐
所有评论(0)