First, add the required dependencies to the pom file. This example uses Kafka 0.8.2.0 built for Scala 2.10.4; be careful to import the correct versions so they match your Kafka cluster.

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.just.shinelon</groupId>
  <artifactId>SparkSql_Proj</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>

  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>

    <!-- Kafka dependency -->
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.0</version>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>2.10.4</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>2.10.4</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

Collect the configuration properties that will be used throughout the example into a constants class.
KafkaProperties.java:

package cn.just.spark.kafka.producer;

/**
 * Configuration property constants
 */
public class KafkaProperties {
    public static final String ZK="hadoop-senior.shinelon.com:2181";      //ZooKeeper address
    public static final String TOPIC="topic01";                          //topic name
    public static final String BROKER_LIST="hadoop-senior.shinelon.com:9092";    //broker list
    public static final String GROUP_ID="test_group01";                 //used by the consumer
}
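
If you prefer not to hard-code these values, they can also be loaded from an external properties file at runtime. The sketch below assumes a kafka.properties file on the classpath; the file name and the KafkaPropertiesLoader class are my own additions for illustration, not part of the original project.

package cn.just.spark.kafka.producer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Hypothetical alternative to the constants class: load the same settings
 * from a kafka.properties file found on the classpath.
 */
public class KafkaPropertiesLoader {
    public static Properties load() {
        InputStream in = KafkaPropertiesLoader.class
                .getClassLoader().getResourceAsStream("kafka.properties");
        if (in == null) {
            throw new IllegalStateException("kafka.properties not found on classpath");
        }
        Properties props = new Properties();
        try {
            props.load(in);        //reads key=value pairs, e.g. a broker list entry
            return props;
        } catch (IOException e) {
            throw new RuntimeException("Failed to read kafka.properties", e);
        } finally {
            try { in.close(); } catch (IOException ignored) { }
        }
    }
}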

KafkaProducer.java:

package cn.just.spark.kafka.producer;



import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Kafka Java API: Producer
 */
public class KafkaProducer extends Thread{

    public String topic;

    public Producer<Integer,String> producer;

    public KafkaProducer(String topic){
        this.topic=topic;

        Properties properties=new Properties();

        properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        //request.required.acks controls the producer acknowledgment handshake:
        //0 means the producer does not wait for any acknowledgment from the broker
        //1 means the producer waits for the partition leader to acknowledge before continuing
        //-1 means all in-sync replicas must acknowledge before continuing; this is the
        //strictest setting and gives the best durability, so no data is lost
        properties.put("request.required.acks","1");

        ProducerConfig config=new ProducerConfig(properties);

        producer=new Producer<Integer, String>(config);
    }

    @Override
    public void run() {
        int messageId=1;
        while(true){
            String message="kafkaProducer"+messageId;
            producer.send(new KeyedMessage<Integer, String>(topic,message));
            System.out.println(message);
            messageId++;

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
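
The producer above sends one message per call. The 0.8 javaapi Producer can also accept a List of KeyedMessage objects, and setting producer.type to async makes the client buffer and batch sends on a background thread. A minimal sketch reusing the constants above (the BatchKafkaProducer class name and the batch size are illustrative choices, not from the original code):

package cn.just.spark.kafka.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Sketch: batched, asynchronous sending with the same 0.8 producer API.
 */
public class BatchKafkaProducer {
    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        properties.put("request.required.acks","1");
        properties.put("producer.type","async");      //buffer sends on a background thread
        properties.put("batch.num.messages","200");   //flush after this many buffered messages

        Producer<Integer,String> producer=new Producer<Integer, String>(new ProducerConfig(properties));

        List<KeyedMessage<Integer,String>> batch=new ArrayList<KeyedMessage<Integer, String>>();
        for(int i=0;i<10;i++){
            batch.add(new KeyedMessage<Integer, String>(KafkaProperties.TOPIC,"batched-"+i));
        }
        producer.send(batch);      //the javaapi Producer also accepts a List of messages
        producer.close();          //flush buffered messages and release sockets
    }
}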

KafkaConsumer.java:

package cn.just.spark.kafka.consumer;

import cn.just.spark.kafka.producer.KafkaProperties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumer extends Thread{

    public String topic;
    public KafkaConsumer(String topic){
        this.topic=topic;
    }

    public ConsumerConnector getConnection(){

        Properties properties=new Properties();
        properties.put("group.id", KafkaProperties.GROUP_ID);
        properties.put("zookeeper.connect",KafkaProperties.ZK);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }


    @Override
    public void run() {
        ConsumerConnector consumer=getConnection();

        Map<String, Integer> topicMap=new HashMap<String, Integer>();
        topicMap.put(topic,1);     //consume the topic with a single KafkaStream

        //key: the topic name
        //value: the List of KafkaStream data streams for that topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream= consumer.createMessageStreams(topicMap);

        //grab the stream that we will read incoming messages from
        KafkaStream<byte[], byte[]> stream=messageStream.get(topic).get(0);        //get(0) matches the single stream requested above

        ConsumerIterator<byte[], byte[]> it=stream.iterator();
        while(it.hasNext()){
            String message=new String(it.next().message());
            System.out.println("received message: "+message);
        }
    }
}
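
topicMap.put(topic,1) asks the high-level consumer for a single KafkaStream, so all messages are read on one thread. For a topic with several partitions you can request more streams and read each on its own thread. A minimal sketch under that assumption (the MultiThreadKafkaConsumer class name and the thread count are my own illustrative choices):

package cn.just.spark.kafka.consumer;

import cn.just.spark.kafka.producer.KafkaProperties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Sketch: consume one topic with several KafkaStreams, one thread per stream.
 */
public class MultiThreadKafkaConsumer {
    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.put("group.id",KafkaProperties.GROUP_ID);
        properties.put("zookeeper.connect",KafkaProperties.ZK);
        ConsumerConnector consumer=Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        int numThreads=3;      //should not exceed the topic's partition count
        Map<String, Integer> topicMap=new HashMap<String, Integer>();
        topicMap.put(KafkaProperties.TOPIC,numThreads);

        Map<String, List<KafkaStream<byte[], byte[]>>> streams=consumer.createMessageStreams(topicMap);

        ExecutorService pool=Executors.newFixedThreadPool(numThreads);
        for(final KafkaStream<byte[], byte[]> stream : streams.get(KafkaProperties.TOPIC)){
            pool.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it=stream.iterator();
                    while(it.hasNext()){
                        System.out.println(Thread.currentThread().getName()
                                +" received message: "+new String(it.next().message()));
                    }
                }
            });
        }
    }
}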

With the producer and consumer written, create a main class to test them:
KafkaProducerApp.java:

package cn.just.spark.kafka.producer;


import cn.just.spark.kafka.consumer.KafkaConsumer;

public class KafkaProducerApp {
    //psvm: the IDE shortcut that generates this main method
    public static void main(String[] args) {
        new KafkaProducer(KafkaProperties.TOPIC).start();

        new KafkaConsumer(KafkaProperties.TOPIC).start();
    }
}
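
Both threads loop forever, so in a real application you would also release the clients on exit. The sketch below adds a JVM shutdown hook; producer.close() is the standard 0.8 cleanup call (the producer field is public above), while shutting down the consumer would require extending KafkaConsumer to keep a reference to its ConsumerConnector and call shutdown() on it, which is my own suggested change rather than part of the original code.

package cn.just.spark.kafka.producer;

import cn.just.spark.kafka.consumer.KafkaConsumer;

/**
 * Sketch: the same test driver, plus a shutdown hook that closes the producer.
 */
public class KafkaAppWithCleanup {
    public static void main(String[] args) {
        final KafkaProducer producerThread=new KafkaProducer(KafkaProperties.TOPIC);
        producerThread.start();

        new KafkaConsumer(KafkaProperties.TOPIC).start();

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                producerThread.producer.close();   //flush pending messages and close sockets
                //a ConsumerConnector.shutdown() call would go here as well, see the note above
            }
        });
    }
}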

The test results are shown below: both the console consumer client and the consumer program we wrote receive the data generated by the producer.

(screenshot of the test output)
