在kafka中,有三个重要的抽象,分别为KStream,KTable和GlobalKTable。本文主要讲解GlobalTable。

1. Global table存在的理由

Kafka根据数据的key来分区,一般情况下相同的key会存入相同的分区中,如果使用两个KTable来进行join操作,因为两个表的数据在不同的分区中,甚至在不同的机器中,那么在进行join操作的时候需要进行类似于hadoop中的shuffle的操作,也就是数据分发。而硬盘的不断寻址读写操作和网络延迟会严重影响性能。

如果在流操作应用中,使用join操作来合并两张表,并且其中的一个表的数据集很小,而且改动不频繁。针对这个提出了优化方案:global table。

2. Global Table

Global table 是一个高层的抽象。假设A join B,其中A和B是两个数据集,其中B大小不大,并且存在global table中。A在kafka系统中被根据其key分区。在join操作时,B会整个儿复制到A分区相应的Kafka节点上面,这样所有的B数据都能够被A直接使用到。并且这个join操作不是基于A和B两个数据集的key的,B数据集并没有被分区。Join操作就不存在“shuffle”的过程,这样子就提高了系统的性能。

如果不使用global table,而是使用KTable,其操作图是下面这个样子的:

在这里插入图片描述

那么使用global table之后,其为:
在这里插入图片描述
也就是少了红圈中的内容。

而对应的代码分别为如下所示:
代码1:使用KTable

final KStream<Long, Purchase> purchases = builder.stream("purchase");
final KTable<Long, Customer> customers = builder.table("customer", "customer-store");
final KTable<Long, Product> products = builder.table("product", "product-store");
 
     
// re-key purchases stream on customerId
purchases.map((key, purchase) -> KeyValue.pair(purchase.customerId(), purchase))
        // join to customers. This will create an intermediate topic to repartition
        // the purchases stream based on the customerId
        .leftJoin(customers, EnrichedPurchase::new)
        // re-key enrichedPurchase based on productId
        .map((key, enrichedPurchase) ->
                   KeyValue.pair(enrichedPurchase.productId(), enrichedPurchase))
        // join to products. This will create an intermediate topic to repartition
        // the previous intermediate topic based on productId
        .leftJoin(products, EnrichedPurchase::withProduct);

代码2: 使用global table

final KStream<Long, Purchase> purchases = builder.stream("purchase");
final GlobalKTable<Long, Customer> customers = builder.globalTable("customer", "customer-store");
final GlobalKTable<Long, Product> products = builder.globalTable("product", "product-store");
 
 
// join to the customers table by providing a mapping to the customerId
purchases.leftJoin(customers,
                   ((key, purchase) -> KeyValue.pair(purchase.customerId(), purchase),
                      EnrichedPurchase::new)
          // join to the products table by providing a mapping to the productId
          .leftJoin(products,
                    KeyValue.pair(enrichedPurchase.productId(), enrichedPurchase),
                    EnrichedPurchase::withProduct);

结束! 谢谢!

参考:

文章1
文章2

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐