Python创建制作者

让我们准备对 pykafka 作一些基本了解。只需 5 行代码,我们就可以轻松地向 Kafka 主题生成消息。

from pykafka import KafkaClient

client = KafkaClient(hosts='localhost:9092')
topic = client.topics['topicname']
producer = topic.get_sync_producer()
producer.produce('test message')

一旦我们对 Kafka Producer 有了基本的了解,我们就可以开始生成我们的总线数据了。我使用 geojson.io 的折线工具生成了一些随机总线坐标,并将它们作为文件保存到我的 PC。

现在我们准备好构建我们的消息并将它们生成到 Kafka 主题。

其次,我们将遍历坐标数组(第 10 行)并为每个坐标集。

Python创建消费者

现在用 Python 编写一个 Kafka 消费者了。我们将再次使用 pykafka 客户端,并再次从 pykafka 消费者的基本解释开始。

前两行对您来说应该很熟悉。导入 KafkaClient(第 1 行)后,我们指定 Kafka Broker 的位置并将其分配给客户端变量(第 3 行)。

现在,一旦我们对 pykafka 消费者有了基本的了解,让我们用 Flask 封装一个 API,我们可以从浏览器调用它。

最后,我们从 get_messages 函数返回消费的事件作为事件流。

创建实时交互式前端

我们将了解如何使用服务器发送事件从我们的 Javascript 前端调用 Python Kafka Consumer API,并将它们实时显示在地图上。

我们首先启动我们的 index.html。基本上,它包括生成基本地图的三个步骤。

首先,我们需要创建一个新的地图对象,并为其赋予与我们 div 的 id 相同的名称(mapid)。 setView() 中的三个参数是纬度、经度和缩放因子值(第 1 行)。

我们使用函数参数 e 获取事件消息。 我们可以使用 e.data 访问事件数据,并将其转换为 JSON 对象 obj(第 7 行)。 我们可以将其登录到控制台以查看结构,但它会提醒您我们在生产者中定义的结构(第 8 行)。

源代码

详情参阅 - 亚图跨际

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐