什么是Kafka Global Table (GlobalKTable)
在kafka中,有三个重要的抽象,分别为KStream,KTable和GlobalKTable。本文主要讲解GlobalTable。Global table存在的理由Kafka根据数据的key来分区,一般情况下相同的key会存入相同的分区中,如果使用两个KTable来进行join操作,那么join的结果需要进一步在硬盘中进行分区操作。而硬盘的不断寻址读写操作会严重影响性能。在流操作应用中,经常..
在kafka中,有三个重要的抽象,分别为KStream,KTable和GlobalKTable。本文主要讲解GlobalTable。
1. Global table存在的理由
Kafka根据数据的key来分区,一般情况下相同的key会存入相同的分区中,如果使用两个KTable来进行join操作,因为两个表的数据在不同的分区中,甚至在不同的机器中,那么在进行join操作的时候需要进行类似于hadoop中的shuffle的操作,也就是数据分发。而硬盘的不断寻址读写操作和网络延迟会严重影响性能。
如果在流操作应用中,使用join操作来合并两张表,并且其中的一个表的数据集很小,而且改动不频繁。针对这个提出了优化方案:global table。
2. Global Table
Global table 是一个高层的抽象。假设A join B,其中A和B是两个数据集,其中B大小不大,并且存在global table中。A在kafka系统中被根据其key分区。在join操作时,B会整个儿复制到A分区相应的Kafka节点上面,这样所有的B数据都能够被A直接使用到。并且这个join操作不是基于A和B两个数据集的key的,B数据集并没有被分区。Join操作就不存在“shuffle”的过程,这样子就提高了系统的性能。
如果不使用global table,而是使用KTable,其操作图是下面这个样子的:
那么使用global table之后,其为:
也就是少了红圈中的内容。
而对应的代码分别为如下所示:
代码1:使用KTable
final KStream<Long, Purchase> purchases = builder.stream("purchase");
final KTable<Long, Customer> customers = builder.table("customer", "customer-store");
final KTable<Long, Product> products = builder.table("product", "product-store");
// re-key purchases stream on customerId
purchases.map((key, purchase) -> KeyValue.pair(purchase.customerId(), purchase))
// join to customers. This will create an intermediate topic to repartition
// the purchases stream based on the customerId
.leftJoin(customers, EnrichedPurchase::new)
// re-key enrichedPurchase based on productId
.map((key, enrichedPurchase) ->
KeyValue.pair(enrichedPurchase.productId(), enrichedPurchase))
// join to products. This will create an intermediate topic to repartition
// the previous intermediate topic based on productId
.leftJoin(products, EnrichedPurchase::withProduct);
代码2: 使用global table
final KStream<Long, Purchase> purchases = builder.stream("purchase");
final GlobalKTable<Long, Customer> customers = builder.globalTable("customer", "customer-store");
final GlobalKTable<Long, Product> products = builder.globalTable("product", "product-store");
// join to the customers table by providing a mapping to the customerId
purchases.leftJoin(customers,
((key, purchase) -> KeyValue.pair(purchase.customerId(), purchase),
EnrichedPurchase::new)
// join to the products table by providing a mapping to the productId
.leftJoin(products,
KeyValue.pair(enrichedPurchase.productId(), enrichedPurchase),
EnrichedPurchase::withProduct);
结束! 谢谢!
参考:
更多推荐
所有评论(0)