修改源码使kafka-console-consumer.sh支持从指定时间开始消费
目录背景解决方案方案1-用Java新开发一个的消费工具方案2-修改kafka源码,利用kafka-console-consumer.sh方案2-flinkSQL 或 kafka SQL环境准备修改代码打包测试背景 有业务方向我们提出,自从我们给kafka集群启用权限和认证之后,他们在排错过程就十分不方便了,以前他们换一个消费组就可以重新消费数据了
目录
背景
有业务方向我们提出,自从我们给kafka集群启用权限和认证之后,他们在排错过程就十分不方便了,以前他们换一个消费组就可以重新消费数据了,现在每换一个消费组都需要重新由我们给他们授权,碰到有时不在工位的时候,就只能干着急了;第二,我们kafka启用认证之后,像他们对kerberos认证不熟悉的用户,在命令行排错的时候,会浪费大量的时间去调试kerberos。于是他们向我们提出需求,能不能提供一个工具让他们可以在权限允许的范围内,自由的通过命令行消费kafka里的数据,并且可以选定起止位置。
解决方案
方案1-用Java新开发一个的消费工具
完全重新开发一个工具,将jar包提供给业务方使用。优点是自定义程度高,缺点自然是开发量大,工期长,而且会改变用户原有的使用方式,有学习成本。
方案2-修改kafka源码,利用kafka-console-consumer.sh
和方案一类似,不过是基于已有代码修改,工作量不会很大,且不会给用户带来额外的学习成本,只需要用户替换一个kafka安装目录的一个jar包即可。
方案2-flinkSQL 或 kafka SQL
功能强大,可扩展性好,这个也在我们的计划之中,不过因为有新技术调研和学习成本,本季度可能来不及了,未来联合其他部门开发一个webIDE,给用户提供窗口化可视化的flinkSQL任务提交方式。
环境准备
1、下载kafka源码(因为我用的是0.11.0.2.版本的kafka):git clone -b 0.11.0.2 https://github.com/apache/kafka.git
2、IDEA打开项目
修改代码
1.查看bin/kafka-console-consumer.sh,找到改脚本调用的具体类
// 发现这里调用的kafka.tools.ConsoleConsumer这个类
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
2、在源码里查找这个类,ctrl+shift+F,发现在/core/src/main/scala/kafka/tools这个目录下
3、新建一个工具类,用来重置offset
package kafka.tools;
import kafka.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class ResetOffsetUtils {
public static Map <TopicPartition, OffsetAndMetadata> resetByTimestamp(KafkaConsumer consumer,String topicName, long timestamp) throws Exception {
//获取指定topic的所有partition
List <PartitionInfo> partitionsToReset = consumer.partitionsFor(topicName);
Map <TopicPartition, OffsetAndMetadata> offsets = new HashMap <>();
//获取要重置到的offset
for (PartitionInfo partitionInfo : partitionsToReset) {
TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
consumer.assign(Collections.singletonList(tp));
HashMap <TopicPartition, Long> partitionsTimestampMap = new HashMap <>();
partitionsTimestampMap.put(tp, timestamp);
Map <TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(partitionsTimestampMap);
if (offsetsForTimes == null || offsetsForTimes.isEmpty()) {
Map <TopicPartition, Long> endoffsets = consumer.endOffsets(Collections.singletonList(tp));
offsets.put(tp, new OffsetAndMetadata(endoffsets.get(tp)));
} else {
offsets.put(tp, new OffsetAndMetadata(offsetsForTimes.get(tp).offset()));
}
}
return offsets;
}
public static Map <TopicPartition, OffsetAndMetadata> resetToStartOrEnd(KafkaConsumer consumer, String topicName, String startOrEnd) throws Exception {
//获取指定topic的所有partition
List <PartitionInfo> partitionsToReset = consumer.partitionsFor(topicName);
List <TopicPartition> tpToReset = new ArrayList <>();
for (PartitionInfo partitionInfo : partitionsToReset) {
tpToReset.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
consumer.assign(tpToReset);
Map <TopicPartition, OffsetAndMetadata> offsets = new HashMap <>();
Map <TopicPartition, Long> resetoffset;
if (startOrEnd.toLowerCase().equals("earliest")) {
resetoffset = consumer.beginningOffsets(tpToReset);
} else if (startOrEnd.toLowerCase().equals("latest")) {
resetoffset = consumer.endOffsets(tpToReset);
} else if (startOrEnd.toLowerCase().equals("current")) {
consumer.assign(tpToReset);
for (TopicPartition tp : tpToReset) {
offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
}
return offsets;
} else {
throw new Exception("Please use 'beginning' or 'end' to reset offset");
}
for (Map.Entry <TopicPartition, Long> entry : resetoffset.entrySet()) {
TopicPartition topicPartition = entry.getKey();
Long aLong = entry.getValue();
offsets.put(topicPartition, new OffsetAndMetadata(aLong));
}
//获取要重置到的offset
return offsets;
}
}
4、开始修改ConsoleConsumer类
package kafka.tools
import java.io.PrintStream
import java.nio.charset.StandardCharsets
import java.util
import java.util.concurrent.CountDownLatch
import java.util.{Collections, HashMap, Locale, Map, Properties, Random}
import joptsimple._
import kafka.api.OffsetRequest
import kafka.common.{MessageFormatter, StreamEndException}
import kafka.message._
import kafka.metrics.KafkaMetricsReporter
import kafka.utils._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.utils.Utils
import org.apache.log4j.Logger
import scala.collection.JavaConverters._
import scala.util.control.Breaks
/**
* Consumer that dumps messages to standard out.
*/
object ConsoleConsumer extends Logging {
var messageCount = 0
private val shutdownLatch = new CountDownLatch(1)
def main(args: Array[String]) {
val conf = new ConsumerConfig(args)
try {
run(conf)
} catch {
case e: Throwable =>
error("Unknown error when running consumer: ", e)
Exit.exit(1)
}
}
def run(conf: ConsumerConfig) {
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](getNewConsumerProps(conf))
// if (conf.useOldConsumer) {
// System.out.println("OldConsumer is not Support")
// Exit.exit(1, Option.apply("OldConsumer is not Support"))
// checkZk(conf)
// new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
// } else {
// val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
// if (conf.partitionArg.isDefined)
// new NewShinyConsumer(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, getNewConsumerProps(conf), timeoutMs)
// else
// new NewShinyConsumer(Option(conf.topicArg), None, None, Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs)
// }
addShutdownHook(consumer, conf)
try {
val startPosition = conf.startPosition
if("latest".equals(startPosition)){
val offsets = ResetOffsetUtils.resetToStartOrEnd(consumer, conf.topicArg, "latest")
consumer.commitSync(offsets)
}else if("earliest".equals(startPosition)){
val offsets = ResetOffsetUtils.resetToStartOrEnd(consumer,conf.topicArg,"earliest")
consumer.commitSync(offsets)
}else if("current".equals(startPosition)){
consumer.subscribe(Collections.singletonList(conf.topicArg))
}
else{
val offsets = ResetOffsetUtils.resetByTimestamp(consumer,conf.topicArg,startPosition.toLong)
consumer.commitSync(offsets)
}
process(conf.maxMessages, conf.formatter, consumer, System.out, conf.skipMessageOnError)
} finally {
consumer.close()
conf.formatter.close()
reportRecordCount()
// if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
if (!conf.groupIdPassed)
ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id"))
shutdownLatch.countDown()
}
}
def checkZk(config: ConsumerConfig) {
if (!checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/brokers/ids")) {
System.err.println("No brokers found in ZK.")
Exit.exit(1)
}
if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id") + "/offsets")) {
System.err.println("Found previous offset information for this group " + config.consumerProps.getProperty("group.id")
+ ". Please use --delete-consumer-offsets to delete previous offsets metadata")
Exit.exit(1)
}
}
def addShutdownHook(consumer: KafkaConsumer[Array[Byte], Array[Byte]], conf: ConsumerConfig) {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
consumer.wakeup()
shutdownLatch.await()
if (conf.enableSystestEventsLogging) {
System.out.println("shutdown_complete")
}
}
})
}
def process(maxMessages: Integer, formatter: MessageFormatter, consumer: KafkaConsumer[Array[Byte], Array[Byte]], output: PrintStream, skipMessageOnError: Boolean) {
while (messageCount < maxMessages || maxMessages == -1) {
val msgs = consumer.poll(100)
try {
import scala.collection.JavaConversions._
val loop = new Breaks
loop.breakable {
for (msg <- msgs) {
if (messageCount < maxMessages || maxMessages == -1) {
formatter.writeTo(msg, System.out)
messageCount += 1
}else{
loop.break()
}
}
}
} catch {
case e: Throwable =>
if (skipMessageOnError) {
error("Error processing message, skipping this message: ", e)
} else {
// Consumer will be closed
throw e
}
}
if (checkErr(output, formatter)) {
// Consumer will be closed
return
}
}
}
def reportRecordCount() {
System.err.println(s"Processed a total of $messageCount messages")
}
def checkErr(output: PrintStream, formatter: MessageFormatter): Boolean = {
val gotError = output.checkError()
if (gotError) {
// This means no one is listening to our output stream any more, time to shutdown
System.err.println("Unable to write to standard out, closing consumer.")
}
gotError
}
def getOldConsumerProps(config: ConsumerConfig): Properties = {
val props = new Properties
props.putAll(config.consumerProps)
props.putAll(config.extraConsumerProps)
props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
props.put("zookeeper.connect", config.zkConnectionStr)
if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + props.getProperty("group.id") + "/offsets")) {
System.err.println("Found previous offset information for this group " + props.getProperty("group.id")
+ ". Please use --delete-consumer-offsets to delete previous offsets metadata")
Exit.exit(1)
}
if (config.options.has(config.deleteConsumerOffsetsOpt))
ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id"))
if (config.timeoutMs >= 0)
props.put("consumer.timeout.ms", config.timeoutMs.toString)
props
}
def getNewConsumerProps(config: ConsumerConfig): Properties = {
val props = new Properties
props.putAll(config.consumerProps)
props.putAll(config.extraConsumerProps)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
props
}
class ConsumerConfig(args: Array[String]) {
val parser = new OptionParser(false)
val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
.withRequiredArg
.describedAs("whitelist")
.ofType(classOf[String])
val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
.withRequiredArg
.describedAs("blacklist")
.ofType(classOf[String])
val partitionIdOpt = parser.accepts("partition", "The partition to consume from. Consumption " +
"starts from the end of the partition unless '--offset' is specified.")
.withRequiredArg
.describedAs("partition")
.ofType(classOf[java.lang.Integer])
val offsetOpt = parser.accepts("offset", "The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end")
.withRequiredArg
.describedAs("consume offset")
.ofType(classOf[String])
.defaultsTo("latest")
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over.")
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
.withRequiredArg
.describedAs("consumer_prop")
.ofType(classOf[String])
val consumerConfigOpt = parser.accepts("consumer.config", s"Consumer config properties file. Note that ${consumerPropertyOpt} takes precedence over this config.")
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])
val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
.withRequiredArg
.describedAs("class")
.ofType(classOf[String])
.defaultsTo(classOf[DefaultMessageFormatter].getName)
val messageFormatterArgOpt = parser.accepts("property", "The properties to initialize the message formatter.")
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up")
val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
"start with the earliest message present in the log rather than the latest message.")
val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
.withRequiredArg
.describedAs("num_messages")
.ofType(classOf[java.lang.Integer])
val timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.")
.withRequiredArg
.describedAs("timeout_ms")
.ofType(classOf[java.lang.Integer])
val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
"skip it instead of halt.")
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
"set, the csv metrics will be output here")
.withRequiredArg
.describedAs("metrics directory")
.ofType(classOf[java.lang.String])
val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation. This is the default.")
val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED (unless old consumer is used): The server to connect to.")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
val startPositionOpt = parser.accepts("start-position", "consumer start position:earliest/latest/1592980490992")
.withRequiredArg
.describedAs("consumer start position")
.ofType(classOf[String])
val keyDeserializerOpt = parser.accepts("key-deserializer")
.withRequiredArg
.describedAs("deserializer for key")
.ofType(classOf[String])
val valueDeserializerOpt = parser.accepts("value-deserializer")
.withRequiredArg
.describedAs("deserializer for values")
.ofType(classOf[String])
val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
"Log lifecycle events of the consumer in addition to logging consumed " +
"messages. (This is specific for system tests.)")
val isolationLevelOpt = parser.accepts("isolation-level",
"Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted" +
"to read all messages.")
.withRequiredArg()
.ofType(classOf[String])
.defaultsTo("read_uncommitted")
if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
var groupIdPassed = true
val options: OptionSet = tryParse(parser, args)
val useOldConsumer = options.has(zkConnectOpt)
val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
// If using old consumer, exactly one of whitelist/blacklist/topic is required.
// If using new consumer, topic must be specified.
var topicArg: String = null
var whitelistArg: String = null
// var filterSpec: TopicFilter = null
val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
val consumerProps = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
else
new Properties()
val zkConnectionStr = options.valueOf(zkConnectOpt)
val fromBeginning = options.has(resetBeginningOpt)
val partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None
val skipMessageOnError = options.has(skipMessageOnErrorOpt)
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1
val bootstrapServer = options.valueOf(bootstrapServerOpt)
val startPosition = if (options.has(startPositionOpt)) options.valueOf(startPositionOpt) else "current"
val keyDeserializer = options.valueOf(keyDeserializerOpt)
val valueDeserializer = options.valueOf(valueDeserializerOpt)
val isolationLevel = options.valueOf(isolationLevelOpt).toString
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
if (keyDeserializer != null && !keyDeserializer.isEmpty) {
formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
}
if (valueDeserializer != null && !valueDeserializer.isEmpty) {
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
}
formatter.init(formatterArgs)
if (useOldConsumer) {
if (options.has(bootstrapServerOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.")
else if (options.has(newConsumerOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.")
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
if (topicOrFilterOpt.size != 1)
CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
topicArg = options.valueOf(topicOrFilterOpt.head)
// filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
Console.err.println("Using the ConsoleConsumer with old consumer is deprecated and will be removed " +
s"in a future major release. Consider using the new consumer by passing $bootstrapServerOpt instead of ${zkConnectOpt}.")
} else {
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
if (topicOrFilterOpt.size != 1)
CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.")
topicArg = options.valueOf(topicIdOpt)
whitelistArg = options.valueOf(whitelistOpt)
}
if (useOldConsumer && (partitionArg.isDefined || options.has(offsetOpt)))
CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.")
if (partitionArg.isDefined) {
if (!options.has(topicIdOpt))
CommandLineUtils.printUsageAndDie(parser, "The topic is required when partition is specified.")
if (fromBeginning && options.has(offsetOpt))
CommandLineUtils.printUsageAndDie(parser, "Options from-beginning and offset cannot be specified together.")
} else if (options.has(offsetOpt))
CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset is specified.")
def invalidOffset(offset: String): Nothing =
CommandLineUtils.printUsageAndDie(parser, s"The provided offset value '$offset' is incorrect. Valid values are " +
"'earliest', 'latest', or a non-negative long.")
val offsetArg =
if (options.has(offsetOpt)) {
options.valueOf(offsetOpt).toLowerCase(Locale.ROOT) match {
case "earliest" => OffsetRequest.EarliestTime
case "latest" => OffsetRequest.LatestTime
case offsetString =>
val offset =
try offsetString.toLong
catch {
case _: NumberFormatException => invalidOffset(offsetString)
}
if (offset < 0) invalidOffset(offsetString)
offset
}
}
else if (fromBeginning) OffsetRequest.EarliestTime
else OffsetRequest.LatestTime
if (!useOldConsumer)
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
if (options.has(csvMetricsReporterEnabledOpt)) {
val csvReporterProps = new Properties()
csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")
csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
if (options.has(metricsDirectoryOpt))
csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
else
csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics")
csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true")
val verifiableProps = new VerifiableProperties(csvReporterProps)
KafkaMetricsReporter.startReporters(verifiableProps)
}
//Provide the consumer with a randomly assigned group id
if (!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
groupIdPassed = false
}
def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
try
parser.parse(args: _*)
catch {
case e: OptionException =>
CommandLineUtils.printUsageAndDie(parser, e.getMessage)
}
}
}
def checkZkPathExists(zkUrl: String, path: String): Boolean = {
try {
val zk = ZkUtils.createZkClient(zkUrl, 30 * 1000, 30 * 1000)
zk.exists(path)
} catch {
case _: Throwable => false
}
}
}
class DefaultMessageFormatter extends MessageFormatter {
var printKey = false
var printValue = true
var printTimestamp = false
var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
var keyDeserializer: Option[Deserializer[_]] = None
var valueDeserializer: Option[Deserializer[_]] = None
override def init(props: Properties) {
if (props.containsKey("print.timestamp"))
printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
if (props.containsKey("print.key"))
printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
if (props.containsKey("print.value"))
printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true")
if (props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
if (props.containsKey("line.separator"))
lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("key.deserializer"))
keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("value.deserializer"))
valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
}
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
def writeSeparator(columnSeparator: Boolean): Unit = {
if (columnSeparator)
output.write(keySeparator)
else
output.write(lineSeparator)
}
def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
output.write(convertedBytes)
}
import consumerRecord._
if (printTimestamp) {
if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8))
else
output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8))
writeSeparator(printKey || printValue)
}
if (printKey) {
write(keyDeserializer, key)
writeSeparator(printValue)
}
if (printValue) {
write(valueDeserializer, value)
output.write(lineSeparator)
}
}
}
class LoggingMessageFormatter extends MessageFormatter {
private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
val logger = Logger.getLogger(getClass().getName)
override def init(props: Properties): Unit = defaultWriter.init(props)
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
import consumerRecord._
defaultWriter.writeTo(consumerRecord, output)
if (logger.isInfoEnabled)
logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
s"key:${if (key == null) "null" else new String(key, StandardCharsets.UTF_8)}, " +
s"value:${if (value == null) "null" else new String(value, StandardCharsets.UTF_8)}")
}
}
class NoOpMessageFormatter extends MessageFormatter {
override def init(props: Properties) {}
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream){}
}
class ChecksumMessageFormatter extends MessageFormatter {
private var topicStr: String = _
override def init(props: Properties) {
topicStr = props.getProperty("topic")
if (topicStr != null)
topicStr = topicStr + ":"
else
topicStr = ""
}
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
import consumerRecord._
val chksum =
if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
new Message(value, key, timestamp, timestampType, NoCompressionCodec, 0, -1, Message.MagicValue_V1).checksum
else
new Message(value, key, Message.NoTimestamp, Message.MagicValue_V0).checksum
output.println(topicStr + "checksum:" + chksum)
}
}
打包
在idea中点击gradle侧边栏,点开jar,然后生成jar包,替换掉KAFKA_HOME/libs目录下的kafka_2.11-0.11.0.2.jar即可
测试
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic test--consumer.config /root/kafka/config/consumer.properties --max-messages 10 --start-position 1592980490992
成功从指定时间戳消费到数据,修改完成!
更多推荐
所有评论(0)