一、基础内容

Spark Streaming 从Kafka中接收数据,其有两种方法:(1)、使用Receivers和Kafka高层次的API;(2)、使用 Direct API,这是使用低层次的Kafka API,并没有使用到Receivers,是Spark1.3.0中开始引入。

由于本篇文章使用的是第二种 Direct API 方式,所以对其进行简单的介绍一下:其会定期地从 Kafka 的 topic+partition 中查询最新的偏移量,再根据定义的偏移量范围在每个 batch 里面处理数据。当作业需要处理的数据来临时,spark 通过调用 Kafka 的简单消费者 API 读取一定范围的数据。
和基于Receiver方式相比,这种方式主要有一些几个优点:
  (1)、简化并行。我们不需要创建多个 Kafka 输入流,然后 union 他们。而使用 directStream,Spark Streaming 将会创建和 Kafka 分区一样的 RDD 分区个数,而且会从 Kafka 并行地读取数据,也就是说Spark 分区将会和 Kafka 分区有一一对应的关系,这对我们来说很容易理解和使用;
  (2)、高效。第一种实现零数据丢失是通过将数据预先保存在 WAL 中,这将会复制一遍数据,这种方式实际上很不高效,因为这导致了数据被拷贝两次:一次是被 Kafka 复制;另一次是写到 WAL 中。但是 Direct API 方法因为没有 Receiver,从而消除了这个问题,所以不需要 WAL 日志;
  (3)、恰好一次语义(Exactly-once semantics)。通过使用 Kafka 高层次的 API 把偏移量写入 Zookeeper 中,这是读取 Kafka 中数据的传统方法。虽然这种方法可以保证零数据丢失,但是还是存在一些情况导致数据会丢失,因为在失败情况下通过 Spark Streaming 读取偏移量和 Zookeeper 中存储的偏移量可能不一致。而 Direct API 方法是通过 Kafka 低层次的 API,并没有使用到 Zookeeper,偏移量仅仅被 Spark Streaming 保存在 Checkpoint 中。这就消除了 Spark Streaming 和 Zookeeper 中偏移量的不一致,而且可以保证每个记录仅仅被 Spark Streaming 读取一次,即使是出现故障。

但是本方法唯一的坏处就是没有更新 Zookeeper 中的偏移量,所以基于 Zookeeper 的 Kafka 监控工具将会无法显示消费的状况。然而你可以通过 Spark 提供的 API 手动地将偏移量写入到 Zookeeper 中。


二、问题重现

1、先编写一个简单的Spark Streaming WordCount 程序,但是此程序必须设置 checkpoint 并且可以 Recoverable,我的测试程序如下:

public class SparkStreamingOnKafkaDirect{

    public static JavaStreamingContext createContext(){

        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");

        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));
        jsc.checkpoint("/checkpoint");

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list","192.168.1.151:1234,192.168.1.151:1235,192.168.1.151:1236");

        Set<String> topics = new HashSet<String>();
        topics.add("kafka_direct");

        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jsc, String.class,
                        String.class, StringDecoder.class,
                        StringDecoder.class, kafkaParams,
                        topics);

        JavaDStream<String> words = lines
                .flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                    public Iterable<String> call(
                            Tuple2<String, String> event)
                            throws Exception {
                        String line = event._2;
                        return Arrays.asList(line);
                    }
                });

        JavaPairDStream<String, Integer> pairs = words
                .mapToPair(new PairFunction<String, String, Integer>() {

                    public Tuple2<String, Integer> call(
                            String word) throws Exception {
                        return new Tuple2<String, Integer>(
                                word, 1);
                    }
                });

        JavaPairDStream<String, Integer> wordsCount = pairs
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer v1, Integer v2)
                            throws Exception {
                        return v1 + v2;
                    }
                });

        wordsCount.print();

        return jsc;
    }

    public static void main(String[] args) {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
              return createContext();
            }
          };

        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate("/checkpoint", factory);

        jsc.start();

        jsc.awaitTermination();
        jsc.close();
    }

}

2、准备测试环境,并记录目前topic中的信息,如下图:
这里写图片描述

从截图中可以看出,目前 kafka_topic 这个 topic ,一共有3个分区,每个分区的 Latest Offset 都是 25 。

3、运行Spark Streaming 程序,并在运行几个 batch 之后,退出程序。查看 checkpoint 目录下生成的文件,如下图:
这里写图片描述

4、现在对上面的程序做一些改动,具体的改动如下:

        JavaDStream<Word> words = lines
                .flatMap(new FlatMapFunction<Tuple2<String, String>, Word>() {
                    public Iterable<Word> call(
                            Tuple2<String, String> event)
                            throws Exception {
                        String line = event._2;
                        return Arrays.asList(new Word(line));
                    }
                });

        JavaPairDStream<String, Integer> pairs = words
                .mapToPair(new PairFunction<Word, String, Integer>() {

                    public Tuple2<String, Integer> call(
                            Word word) throws Exception {
                        return new Tuple2<String, Integer>(
                                word.getWord(), 1);
                    }
                });

5、再次运行程序,看运行的效果,我的效果如下:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/07/31 19:23:35 INFO CheckpointReader: Checkpoint files found: file:/checkpoint/checkpoint-1469964000000,file:/checkpoint/checkpoint-1469964000000.bk,file:/checkpoint/checkpoint-1469963970000,file:/checkpoint/checkpoint-1469963970000.bk
16/07/31 19:23:35 INFO CheckpointReader: Attempting to load checkpoint from file file:/checkpoint/checkpoint-1469964000000
16/07/31 19:23:35 WARN CheckpointReader: Error reading checkpoint from file file:/checkpoint/checkpoint-1469964000000
java.io.InvalidClassException: com.bixkjwfnh.spark.streaming.SparkStreamingOnKafkaDirect$2; local class incompatible: stream classdesc serialVersionUID = -6382155631557363180, local class serialVersionUID = 2116323532858154515
    at java.io.ObjectStreamClass.initNonProxy(Unknown Source)
......
    at com.bixkjwfnh.spark.streaming.SparkStreamingOnKafkaDirect.main(SparkStreamingOnKafkaDirect.java:85)
16/07/31 19:23:35 INFO CheckpointReader: Attempting to load checkpoint from file file:/checkpoint/checkpoint-1469964000000.bk
16/07/31 19:23:35 WARN CheckpointReader: Error reading checkpoint from file file:/checkpoint/checkpoint-1469964000000.bk
java.io.InvalidClassException: com.bixkjwfnh.spark.streaming.SparkStreamingOnKafkaDirect$2; local class incompatible: stream classdesc serialVersionUID = -6382155631557363180, local class serialVersionUID = 2116323532858154515
    at java.io.ObjectStreamClass.initNonProxy(Unknown Source)
......
    at com.bixkjwfnh.spark.streaming.SparkStreamingOnKafkaDirect.main(SparkStreamingOnKafkaDirect.java:85)
16/07/31 19:23:35 INFO CheckpointReader: Attempting to load checkpoint from file file:/checkpoint/checkpoint-1469963970000
16/07/31 19:23:35 WARN CheckpointReader: Error reading checkpoint from file file:/checkpoint/checkpoint-1469963970000
java.io.InvalidClassException: com.bixkjwfnh.spark.streaming.SparkStreamingOnKafkaDirect$2; local class incompatible: stream classdesc serialVersionUID = -6382155631557363180, local class serialVersionUID = 2116323532858154515
    at java.io.ObjectStreamClass.initNonProxy(Unknown Source)
......
    at com.bixkjwfnh.spark.streaming.SparkStreamingOnKafkaDirect.main(SparkStreamingOnKafkaDirect.java:85)
16/07/31 19:23:35 INFO CheckpointReader: Attempting to load checkpoint from file file:/checkpoint/checkpoint-1469963970000.bk
16/07/31 19:23:35 WARN CheckpointReader: Error reading checkpoint from file file:/checkpoint/checkpoint-1469963970000.bk
java.io.InvalidClassException: com.bixkjwfnh.spark.streaming.SparkStreamingOnKafkaDirect$2; local class incompatible: stream classdesc serialVersionUID = -6382155631557363180, local class serialVersionUID = 2116323532858154515
    at java.io.ObjectStreamClass.initNonProxy(Unknown Source)
......
    at com.bixkjwfnh.spark.streaming.SparkStreamingOnKafkaDirect.main(SparkStreamingOnKafkaDirect.java:85)
Exception in thread "main" org.apache.spark.SparkException: Failed to read checkpoint from directory /checkpoint
    at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:367)
......
    at com.bixkjwfnh.spark.streaming.SparkStreamingOnKafkaDirect.main(SparkStreamingOnKafkaDirect.java:85)
Caused by: java.io.InvalidClassException: com.bixkjwfnh.spark.streaming.SparkStreamingOnKafkaDirect$2; local class incompatible: stream classdesc serialVersionUID = -6382155631557363180, local class serialVersionUID = 2116323532858154515
    at java.io.ObjectStreamClass.initNonProxy(Unknown Source)
......
    at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:350)
    ... 4 more
16/07/31 19:23:35 INFO ShutdownHookManager: Shutdown hook called

从报错的信息中,可以看出,其是因为checkpoint目录下的内容没有办法正常的反序列化导致的,最后程序退出。

如果在没有任何措施的情况下,出现了上面的错误,那问题就出来了,是什么问题呢?
问题是:我现在要从 topic 中 partition 的哪些位置开始任何呢!!! 如果我能知道上一次每个partition中的数据都取到什么位置了,那该多好啊!!!

三、问题解决

正如上面提到的,”如果我能知道上一次每个partition中的数据都取到什么位置了,那该多好啊!!!“,所以我们就先把 Streaming 获取的每个partition的位置信息保存到数据库中。
其实次处使用的解决方法,在官方网站中也提到过了,

You can also start consuming from any arbitrary offset using other variations of KafkaUtils.createDirectStream. Furthermore, if you want to access the Kafka offsets consumed in each batch, you can do the following.

final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

 directKafkaStream.transformToPair(
   new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
     @Override
     public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
       OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
       offsetRanges.set(offsets);
       return rdd;
     }
   }
 ).map(
   ...
 ).foreachRDD(
   new Function<JavaPairRDD<String, String>, Void>() {
     @Override
     public Void call(JavaPairRDD<String, String> rdd) throws IOException {
       for (OffsetRange o : offsetRanges.get()) {
         System.out.println(
           o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
         );
       }
       ...
       return null;
     }
   }
 );

那我们就采用此方法将 topic 中的 partition 的 offset 保存到mysql 数据库中。

1、根据官网上的内容修改现有代码,并可以将offset保存到mysql数据库中。

public class SparkStreamingOnKafkaDirect{

    public static JavaStreamingContext createContext(){
        final Map<String, String> params = new HashMap<String, String>();
        params.put("driverClassName", "com.mysql.jdbc.Driver");
        params.put("url", "jdbc:mysql://192.168.1.151:3306/hive");
        params.put("username", "hive");
        params.put("password", "hive");

        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");

        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));
        jsc.checkpoint("/checkpoint");

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list","192.168.1.151:1234,192.168.1.151:1235,192.168.1.151:1236");

        Set<String> topics = new HashSet<String>();
        topics.add("kafka_direct");

        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jsc, String.class,
                        String.class, StringDecoder.class,
                        StringDecoder.class, kafkaParams,
                        topics);

        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

        JavaDStream<String> words = lines.transformToPair(
                new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
                    @Override
                    public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                      OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                      offsetRanges.set(offsets);
                      return rdd;
                    }
                  }
                ).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                    public Iterable<String> call(
                            Tuple2<String, String> event)
                            throws Exception {
                        String line = event._2;
                        return Arrays.asList(line);
                    }
                });

        JavaPairDStream<String, Integer> pairs = words
                .mapToPair(new PairFunction<String, String, Integer>() {

                    public Tuple2<String, Integer> call(
                            String word) throws Exception {
                        return new Tuple2<String, Integer>(
                                word, 1);
                    }
                });

        JavaPairDStream<String, Integer> wordsCount = pairs
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer v1, Integer v2)
                            throws Exception {
                        return v1 + v2;
                    }
                });

        lines.foreachRDD(new VoidFunction<JavaPairRDD<String,String>>(){
            @Override
            public void call(JavaPairRDD<String, String> t) throws Exception {
                DataSource ds = DruidDataSourceFactory.createDataSource(params);
                Connection conn = ds.getConnection();
                Statement stmt = conn.createStatement();
                for (OffsetRange offsetRange : offsetRanges.get()) {
                    stmt.executeUpdate("update kafka_offsets set offset ='"
                            + offsetRange.untilOffset() + "'  where topic='"
                            + offsetRange.topic() + "' and partition='"
                            + offsetRange.partition() + "'");
                }
                conn.close();
            }

        });

        wordsCount.print();

        return jsc;
    }

    public static void main(String[] args) {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
              return createContext();
            }
          };

        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate("/checkpoint", factory);

        jsc.start();

        jsc.awaitTermination();
        jsc.close();
    }

}

2、准备环境
2.1、数据库
这里写图片描述

至于里面的字段,一看就应该明白。

2.2、最新的offset
这里写图片描述

3、运行Spark Streaming 程序(注意:要先清空 checkpoint 目录下的内容),在程序运行的过程中查看 mysql 数据库表中的字段值如下图:
第一次 Job:
这里写图片描述
表字段值:
这里写图片描述

第三次Job:
这里写图片描述
表字段值:
这里写图片描述
kafka Manager 中的图:
这里写图片描述

从截图中可以看出,完全没有问题。

其实我们完全可以将此 offset 回写 Zookeeper 中,这样就同步到以在Kafka Manager 中进行监控了。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐