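Contents

I. Kafka: Create the Topic and Producer

II. Produce Data to Kafka

III. Configure a DataSource in Apache Druid

1) Start

2) Connect

3) Parse Data

4) Parse Time

5) Transform [optional]

6) Filter [optional]

7) Configure Schema [key step]

8) Partition

9) Tune

10) Publish

11) Edit JSON spec

Case 1: Sales Data Query Example

Case 2: Uploading the Server System Time (Milliseconds) as the Timestamp

Configure the Apache Druid DataSource

Case 3: Uploading the Server System Time (Seconds) as the Timestamp

Configure the Apache Druid DataSource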


I. Kafka: Create the Topic and Producer

1. Create the topic

kafka-topics.sh --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 1 --partitions 1 --topic fast_sales

2. Create a producer

kafka-console-producer.sh --broker-list node-01:9092,node-02:9092,node-03:9092 --topic fast_sales

3. Create a consumer

kafka-console-consumer.sh --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --topic fast_sales --group topic_test1_g1

II. Produce Data to Kafka

{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"2020-08-08T01:03.00z","category":"家电","areaName":"北京","monye":"1550"}

{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"2020-08-08T01:03.01z","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"2020-08-08T01:04.01z","category":"手机","areaName":"深圳","monye":"2200"}
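The sample records are one JSON object per line, which is the shape the console producer reads from standard input. A minimal Python sketch for generating such lines follows. The helper name `make_record` is hypothetical, and the sketch assumes the timestamps are meant as ISO 8601 UTC instants with colon-separated seconds (for example `2020-08-08T01:03:00Z`).

```python
import json
from datetime import datetime, timezone


def make_record(ts: datetime, category: str, area_name: str, money: int) -> str:
    """Serialize one sales event as a single JSON line (hypothetical helper)."""
    record = {
        # Assumed form: ISO 8601 in UTC, e.g. 2020-08-08T01:03:00Z
        "timestamp": ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "category": category,
        "areaName": area_name,
        # The field name "monye" and the string-typed amount mirror the sample data.
        "monye": str(money),
    }
    # ensure_ascii=False keeps the Chinese values readable instead of \u escapes.
    return json.dumps(record, ensure_ascii=False, separators=(",", ":"))


if __name__ == "__main__":
    event_time = datetime(2020, 8, 8, 1, 3, 0, tzinfo=timezone.utc)
    print(make_record(event_time, "手机", "北京", 1450))
```

Each printed line can be pasted into, or piped to, the console producer started in step 2.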

III. Configure a DataSource in Apache Druid

1) Start

2) Connect

3) Parse Data

4) Parse Time

5) Transform [optional]

6) Filter [optional]

7) Configure Schema [key step]

8) Partition

9) Tune

10) Publish

Max parse exceptions: 2147483647

11) Edit JSON spec

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "fast_sales",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "iso"
        },
        "dimensionsSpec": {
          "dimensions": [
            "areaName",
            "category"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "count"
      },
      {
        "type": "longSum",
        "name": "sum_monye",
        "fieldName": "monye",
        "expression": null
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "MINUTE",
      "rollup": true,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/usr/local/imply-3.0.4/var/tmp/1609509057384-0",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": true,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": true,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "fast_sales",
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT3600S",
    "consumerProperties": {
      "bootstrap.servers": "node-01:9092,node-02:9092,node-03:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "fast_sales",
    "useEarliestSequenceNumber": false,
    "type": "kafka"
  },
  "context": null,
  "suspended": false
}
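With `rollup` set to `true` and `queryGranularity` set to `MINUTE`, rows that fall in the same minute and share the same dimension values are stored as a single row, with `count` and `sum_monye` pre-aggregated. The Python sketch below imitates that behaviour on the sample data. It only illustrates the idea and is not Druid's implementation; it also assumes the sample timestamps are meant as `01:03:00Z`, `01:03:01Z`, and `01:04:01Z`.

```python
from collections import defaultdict
from datetime import datetime

# Sample events: (timestamp, category, areaName, monye).
# Timestamps use the assumed colon-separated ISO 8601 form.
EVENTS = [
    ("2020-08-08T01:03:00Z", "手机", "北京", "1450"),
    ("2020-08-08T01:03:00Z", "手机", "北京", "1450"),
    ("2020-08-08T01:03:00Z", "家电", "北京", "1550"),
    ("2020-08-08T01:03:00Z", "手机", "深圳", "1000"),
    ("2020-08-08T01:03:01Z", "手机", "深圳", "2000"),
    ("2020-08-08T01:04:01Z", "手机", "深圳", "2200"),
]


def rollup_by_minute(events):
    """Group events by (minute, areaName, category) and aggregate count and sum."""
    groups = defaultdict(lambda: {"count": 0, "sum_monye": 0})
    for ts, category, area, money in events:
        parsed = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")
        # Truncate to the minute, as queryGranularity MINUTE would.
        minute = parsed.replace(second=0).strftime("%Y-%m-%dT%H:%MZ")
        key = (minute, area, category)
        groups[key]["count"] += 1
        groups[key]["sum_monye"] += int(money)
    return dict(groups)


if __name__ == "__main__":
    for key, metrics in sorted(rollup_by_minute(EVENTS).items()):
        print(key, metrics)
```

Under these assumptions the six input events collapse into four stored rows, which is why the queries later in this post use `SUM("count")` to recover the number of ingested records.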

Case 1: Sales Data Query Example

1) Data source

2) Recall the data sent to Kafka earlier:

{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"2020-08-08T01:03.00z","category":"家电","areaName":"北京","monye":"1550"}

{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"2020-08-08T01:03.01z","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"2020-08-08T01:04.01z","category":"手机","areaName":"深圳","monye":"2200"}

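-- Query all rows

-- Query rows by time range

-- Count the total number of ingested records

-- Group by region and product category, then compute total sales

-- Group by region, then compute total spending

-- Group by product category, then compute total spending

-- Filter by time range first, then group by region and product category and compute total spending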


Case 2: Uploading the Server System Time (Milliseconds) as the Timestamp

1. Create the topic

kafka-topics.sh --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 1 --partitions 1 --topic fast_sales_test_timestamp

2. Create a producer

kafka-console-producer.sh --broker-list node-01:9092,node-02:9092,node-03:9092 --topic fast_sales_test_timestamp

3. Create a consumer

kafka-console-consumer.sh --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --topic fast_sales_test_timestamp --group topic_test1_g1

4. Produce data to Kafka

-- Add the millisecond offset for 8 hours (28800000) to the server's system timestamp; the wall-clock times below are in UTC+8
1609549913324 (2021-01-02 09:11:53) + 28800000 = 1609578713324 (2021-01-02 17:11:53)
1609550385297 (2021-01-02 09:19:45) + 28800000 = 1609579185297 (2021-01-02 17:19:45)
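The offset arithmetic can be checked with a few lines of Python. This is a sketch under two assumptions: the server clock is in UTC+8, and the goal of the shift is for the UTC rendering of the shifted value to read the same as the original local wall-clock time. The names `shift_by_8_hours` and `render` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone

OFFSET_MS = 8 * 60 * 60 * 1000  # 8 hours in milliseconds
UTC_PLUS_8 = timezone(timedelta(hours=8))  # assumed server time zone


def shift_by_8_hours(epoch_ms: int) -> int:
    """Add the 8-hour offset to an epoch-millisecond timestamp."""
    return epoch_ms + OFFSET_MS


def render(epoch_ms: int, tz) -> str:
    """Format an epoch-millisecond timestamp as wall-clock time in the given zone."""
    return datetime.fromtimestamp(epoch_ms // 1000, tz).strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    for original in (1609549913324, 1609550385297):
        shifted = shift_by_8_hours(original)
        print(original, render(original, UTC_PLUS_8), "local")
        print(shifted, render(shifted, timezone.utc), "UTC after shift")
```

The shifted value rendered in UTC shows the same clock reading as the original value rendered in UTC+8, which matches the time literals used in the queries for this case.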

{"timestamp":"1609549913324","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609549913324","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609549913324","category":"家电","areaName":"北京","monye":"1550"}

{"timestamp":"1609550385297","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"1609550385297","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"1609550385297","category":"手机","areaName":"深圳","monye":"2200"}

============================== Data after adding the 8-hour offset =================================
{"timestamp":"1609578713324","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609578713324","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609578713324","category":"家电","areaName":"北京","monye":"1550"}

{"timestamp":"1609579185297","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"1609579185297","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"1609579185297","category":"手机","areaName":"深圳","monye":"2200"}

Configure the Apache Druid DataSource

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "fast_sales_test_timestamp",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "millis"
        },
        "dimensionsSpec": {
          "dimensions": [
            "areaName",
            "category"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "count"
      },
      {
        "type": "longSum",
        "name": "sum_monye",
        "fieldName": "monye",
        "expression": null
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "MINUTE",
      "rollup": true,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/usr/local/imply-3.0.4/var/tmp/1609509057384-0",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": true,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": true,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "fast_sales_test_timestamp",
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT3600S",
    "consumerProperties": {
      "bootstrap.servers": "node-01:9092,node-02:9092,node-03:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "fast_sales_test_timestamp",
    "useEarliestSequenceNumber": false,
    "type": "kafka"
  },
  "context": null,
  "suspended": false
}

 


Query examples:

SELECT * FROM "fast_sales_test_timestamp" -- query all rows
-- SELECT * FROM "fast_sales_test_timestamp" WHERE __time <= '2021-01-02T09:11:00.000Z' -- query rows by time range
-- SELECT * FROM "fast_sales_test_timestamp" WHERE __time > '2021-01-02T09:11:00.000Z' -- query rows by time range
-- SELECT SUM("count") FROM "fast_sales_test_timestamp" -- total number of ingested records
-- SELECT areaName, category, SUM(sum_monye) FROM "fast_sales_test_timestamp" GROUP BY areaName, category -- group by region and product category, compute total spending
-- SELECT areaName, SUM(sum_monye) FROM "fast_sales_test_timestamp" GROUP BY areaName -- group by region, compute total spending
-- SELECT category, SUM(sum_monye) FROM "fast_sales_test_timestamp" GROUP BY category -- group by product category, compute total spending
-- SELECT SUM(sum_monye) FROM "fast_sales_test_timestamp" -- compute total spending
-- SELECT areaName, category, SUM(sum_monye) FROM "fast_sales_test_timestamp"
-- WHERE __time <= '2021-01-02T09:11:00.000Z' GROUP BY areaName, category -- filter by time range first, then group by region and product category and compute total spending
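Besides the web console, the same statements can be sent to Druid's SQL HTTP endpoint (`/druid/v2/sql`) as a JSON body with a `query` field. The sketch below only uses the Python standard library. The host and port in `DRUID_SQL_URL` are assumptions about this cluster, and `build_sql_request` and `run_sql` are hypothetical helper names.

```python
import json
import urllib.request

# Assumption: a Druid Router (or Broker) is reachable at this address.
DRUID_SQL_URL = "http://node-01:8888/druid/v2/sql"

QUERY = (
    "SELECT areaName, category, SUM(sum_monye) "
    'FROM "fast_sales_test_timestamp" '
    "WHERE __time <= '2021-01-02T09:11:00.000Z' "
    "GROUP BY areaName, category"
)


def build_sql_request(sql: str, url: str = DRUID_SQL_URL) -> urllib.request.Request:
    """Build a POST request carrying one SQL statement as a JSON body."""
    body = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_sql(sql: str, url: str = DRUID_SQL_URL):
    """Send the statement and return the decoded JSON result rows."""
    with urllib.request.urlopen(build_sql_request(sql, url)) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    # Only show what would be sent; call run_sql(QUERY) against a live cluster.
    request = build_sql_request(QUERY)
    print(request.full_url)
    print(request.data.decode("utf-8"))
```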


Case 3: Uploading the Server System Time (Seconds) as the Timestamp

1. Create the topic

kafka-topics.sh --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 1 --partitions 1 --topic fast_sales_test_second

2. Create a producer

kafka-console-producer.sh --broker-list node-01:9092,node-02:9092,node-03:9092 --topic fast_sales_test_second

3. Create a consumer

kafka-console-consumer.sh --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --topic fast_sales_test_second --group topic_test1_g1

4. Produce data to Kafka

-- Add the offset for 8 hours (28800 seconds) to the server's system timestamp; the wall-clock times below are in UTC+8

1609555315 (2021-01-02 10:41:55) + 28800 = 1609584115 (2021-01-02 18:41:55)
1609555530 (2021-01-02 10:45:30) + 28800 = 1609584330 (2021-01-02 18:45:30)

{"timestamp":"1609555315","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609555315","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609555315","category":"家电","areaName":"北京","monye":"1550"}

{"timestamp":"1609555530","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"1609555530","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"1609555530","category":"手机","areaName":"深圳","monye":"2200"}

-- Data after adding the 8-hour (28800-second) offset to the server's system timestamp

{"timestamp":"1609584115","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609584115","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609584115","category":"家电","areaName":"北京","monye":"1550"}

{"timestamp":"1609584330","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"1609584330","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"1609584330","category":"手机","areaName":"深圳","monye":"2200"}

Configure the Apache Druid DataSource

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "fast_sales_test_second",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "posix"
        },
        "dimensionsSpec": {
          "dimensions": [
            "areaName",
            "category"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "sum_monye",
        "type": "longSum",
        "fieldName": "monye"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "MINUTE",
      "rollup": true,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/usr/local/imply-3.0.4/var/tmp/1609509057384-0",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": true,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": true,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "fast_sales_test_second",
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT3600S",
    "consumerProperties": {
      "bootstrap.servers": "node-01:9092,node-02:9092,node-03:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "fast_sales_test_second",
    "useEarliestSequenceNumber": false,
    "type": "kafka"
  },
  "context": null,
  "suspended": false
}
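The spec above names the Kafka topic in two places, `ioConfig.topic` and `ioConfig.stream`, and copying a spec from one case to the next makes it easy to leave an old name behind. The sketch below is a hypothetical pre-flight check written for this post, not a Druid feature: it loads a spec and reports when the two fields disagree.

```python
import json


def check_spec(spec: dict) -> list:
    """Return human-readable problems found in a Kafka supervisor spec."""
    problems = []
    io_config = spec.get("ioConfig", {})
    topic = io_config.get("topic")
    stream = io_config.get("stream")
    # Both fields should point at the same Kafka topic when both are present.
    if topic is not None and stream is not None and topic != stream:
        problems.append(f"ioConfig.topic ({topic}) != ioConfig.stream ({stream})")
    return problems


if __name__ == "__main__":
    # Trimmed-down spec with only the fields this check looks at.
    spec_text = """
    {
      "type": "kafka",
      "ioConfig": {
        "topic": "fast_sales_test_second",
        "stream": "fast_sales_test_second",
        "type": "kafka"
      }
    }
    """
    print(check_spec(json.loads(spec_text)) or "no problems found")
```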

Query examples:

SELECT * FROM "fast_sales_test_second" -- query all rows
-- SELECT * FROM "fast_sales_test_second" WHERE __time <= '2021-01-02T10:41:00.000Z' -- query rows by time range
-- SELECT * FROM "fast_sales_test_second" WHERE __time > '2021-01-02T10:41:00.000Z' -- query rows by time range
-- SELECT SUM("count") FROM "fast_sales_test_second" -- total number of ingested records
-- SELECT areaName, category, SUM(sum_monye) FROM "fast_sales_test_second" GROUP BY areaName, category -- group by region and product category, compute total spending
-- SELECT areaName, SUM(sum_monye) FROM "fast_sales_test_second" GROUP BY areaName -- group by region, compute total spending
-- SELECT category, SUM(sum_monye) FROM "fast_sales_test_second" GROUP BY category -- group by product category, compute total spending
-- SELECT SUM(sum_monye) FROM "fast_sales_test_second" -- compute total spending
-- SELECT areaName, category, SUM(sum_monye) FROM "fast_sales_test_second"
-- WHERE __time <= '2021-01-02T10:41:00.000Z' GROUP BY areaName, category -- filter by time range first, then group by region and product category and compute total spending

 


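Case 4: Uploading the Server System Time Formatted as 20210101082805 as the Timestamp

This approach did not work. Pass the time as an epoch timestamp instead, which also makes it easier to correct the time afterwards.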
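If the upstream system only has a compact string such as `20210101082805`, one option in line with the advice above is to convert it to an epoch timestamp before producing the record. A minimal sketch, assuming the string is laid out as year, month, day, hour, minute, second and is expressed in UTC+8. The time zone is an assumption, since the string itself does not carry one, and `compact_to_epoch_ms` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

UTC_PLUS_8 = timezone(timedelta(hours=8))  # assumed zone of the compact string


def compact_to_epoch_ms(value: str, tz=UTC_PLUS_8) -> int:
    """Convert a yyyyMMddHHmmss string in the given zone to epoch milliseconds."""
    local = datetime.strptime(value, "%Y%m%d%H%M%S").replace(tzinfo=tz)
    return int(local.timestamp()) * 1000


if __name__ == "__main__":
    print(compact_to_epoch_ms("20210101082805"))
```

The resulting value can then be sent in the `timestamp` field and parsed with the `millis` format as in Case 2.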

