之前的Kafka学习笔记,我们介绍了Kafka的基本特性以及windows环境下Kafka的安装和使用,详情可以参见如下两篇博客:

kafka学习笔记(一)–初识kafka

kafka学习笔记(二)–windows环境下kafka2.1的安装和使用

因为Kafka的源码使用Scala和Java两种语言实现的,所以本篇博客的开发的案例中使用了
Java和Sacla两种语言

为了保持代码简洁性, Scala版本和Java版本,只在producer端提供详尽的注释,消费端只对不同于生产端的代码提供注释

本篇博客涉及到的所有代码,均已经进行脱敏处理上传到github
github连接地址:
本博客涉及到的java程序链接:
本篇博客涉及到的scala程序链接:
需要的童鞋可以自行登录github下载~

本篇博客要点如下:

一. 开发环境准备

pom.xml文件需要导入如下配置:

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>
    </dependencies>
二. 生产端开发
2.1 Scala版本
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

/**
  * @author xmr
  * @date 2019/4/28 13:35
  * @description
  */
object KafkaProducerTestScala {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    // 指定broker的地址清单,地址格式为 host : port
    properties.put("bootstrap.servers", "10.213.32.96:9092,10.213.32.97:9092,10.213.32.98:9092")
    // 使用该类将键对象序列化为字节数组
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // 使用该类将值对象序列化为字节数组
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // 添加消费组(非必须操作,但生产端要和消费端保持一致)
    properties.put("group.id", "g2")
    // 创建生产者对象
    val producer=new KafkaProducer[String, String](properties)
    // 将要发送的消息,我这里选用的是json格式
    val json = "{\n    \"K_LOG_SOURCE\": \"posp\",\n    \"K_LOG_STATE\": \"25\",\n    \"K_LOG_DESC\": \"交易耗时\",\n    \"K_LOG_ERROR\": \"原交易不存在\",\n    \"K_LOG_QUERY\": {\n    \"LOG_NO\": \"367706019798\",\n    \"T42_MERC_ID\": \"826440397038005\",\n    \"TXN_TM\": \"20190428161147\",\n    \"AC_DT\": \"20190409\",\n    \"T41_TRM_NO\": \"J6010457\",\n    \"TXN_CD\": \"2020003\",\n    \"CORG_NO\": null\n    },\n    \"K_LOG_MS_END\": 1554806798155,\n    \"K_LOG_MS_START\": 1554806747000,\n    \"K_LOG_METHOD\": \"hessian\"\n    }"
    println(json)
    // 创建ProducerRecord对象, 传入参数是主题名, 和要发送的消息内容
    val rcd=new ProducerRecord[String, String]("posp_trade_time", json)
    // 发送消息
    producer.send(rcd)
    // 这里必须要调结束,否则kafka那边收不到消息
    producer.close()

  }
}

2.2 Java版本
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author xmr
 * @date 2019/4/28 13:41
 * @description Kafka生产端(Java版)
 */
public class KafkaProducerTestJava {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 指定broker的地址清单,地址格式为 host : port
        properties.put("bootstrap.servers", "10.213.32.96:9092,10.213.32.97:9092,10.213.32.98:9092");
        // 使用该类将键对象序列化为字节数组
        properties.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
        // 使用该类将值对象序列化为字节数组
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 添加消费组(非必须操作,但生产端要和消费端保持一致)
        properties.put("group.id", "g2");
        // 创建生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);
        String json = "{\n    \"K_LOG_SOURCE\": \"posp\",\n    \"K_LOG_STATE\": \"25\",\n    \"K_LOG_DESC\": \"交易耗时\",\n    \"K_LOG_ERROR\": \"原交易不存在\",\n    \"K_LOG_QUERY\": {\n    \"LOG_NO\": \"367706019798\",\n    \"T42_MERC_ID\": \"826440397038005\",\n    \"TXN_TM\": \"20190428161147\",\n    \"AC_DT\": \"20190409\",\n    \"T41_TRM_NO\": \"J6010457\",\n    \"TXN_CD\": \"2020003\",\n    \"CORG_NO\": null\n    },\n    \"K_LOG_MS_END\": 1554806798155,\n    \"K_LOG_MS_START\": 1554806747000,\n    \"K_LOG_METHOD\": \"hessian\"\n    }";
        System.out.println(json);
        // 创建ProducerRecord对象, 传入参数是主题名, 和要发送的消息内容
        ProducerRecord producerRecord = new ProducerRecord<String, String>("posp_trade_time", json);
        // 发送消息 (消息先被放进缓冲区,然后使用单独的线程发送到服务器端)
        kafkaProducer.send(producerRecord);
        // 关闭
        kafkaProducer.close();
    }
}

三. Kafka消费端开发
3.1 Scala版本

import java.util
import java.util.{Arrays, Properties}
import scala.collection.JavaConversions._

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}

/**
  * @author xmr
  * @date 2019/4/28 13:55
  * @description
  */
object KafkaConsumerTestScala extends Thread{
  def main(args: Array[String]): Unit = {

    start()
  }

  override def run() {
    val properties = new Properties()
    properties.put("bootstrap.servers", "10.213.32.96:9092,10.213.32.97:9092,10.213.32.98:9092")
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("group.id", "g2")
    // 创建消费端对象
    val  kafkaConsumer = new KafkaConsumer[String,String](properties)
    // 订阅主题
    kafkaConsumer.subscribe(util.Arrays.asList("posp_trade_time"))
    // 消息轮询,消费端的核心
    while (true) {
      // 持续进行轮询,返回记录列表, 传递的参数是超时时间
      val records: ConsumerRecords[String, String] = kafkaConsumer.poll(1000)
      for (record <- records) {
        System.out.println("消费者消费到数据: " + record)
      }
      // 提交最后一个返回的偏移量
      kafkaConsumer.commitAsync()
    }
  }
}

3.2 Java版本

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author xmr
 * @date 2019/4/28 13:56
 * @description Kafka消费端(Java版)
 */
public class KafkaConsumerTestJava extends Thread{
    public static void main(String[] args) {
        KafkaConsumerTestJava kafkaConsumerTestJava = new KafkaConsumerTestJava();
        kafkaConsumerTestJava.start();
    }
    @Override
    public void run() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.213.32.96:9092,10.213.32.97:9092,10.213.32.98:9092");
        properties.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "g2");
        // 创建消费端对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题
        kafkaConsumer.subscribe(Arrays.asList("posp_trade_time"));
        // 消息轮询,消费端的核心
        while (true) {
            // 持续进行轮询,返回记录列表, 传递的参数是超时时间
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            // 对获取到的记录进行处理
            for(final ConsumerRecord record: records) {
                System.out.println("消费者消费到数据: " + record);
            }
            // 提交最后一个返回的偏移量
            kafkaConsumer.commitAsync();
        }

    }
}

四. 程序运行结果

首先开启消费端轮询监控
然后开启生产端生产数据
生产端开启之后,将会在消费端监测到数据:
在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐