kafka监听和处理信息
第一种方式public async Task SendMessageAsync(string topic, string message){var guid = Guid.NewGuid().ToString("N");var topics = new List<string>{...
第一种方式
/// <summary>
/// Publishes <paramref name="message"/> to <paramref name="topic"/> through the shared producer,
/// retrying up to three times (0.5 s apart) when the produce call throws a
/// <c>ProduceException</c> or the delivery result is not <c>Persisted</c>.
/// </summary>
/// <param name="topic">Target topic; must be one of the topics listed in the Kafka settings.</param>
/// <param name="message">Message payload. A freshly generated GUID is used as the message key.</param>
/// <returns>
/// A task that completes when the send has succeeded, been rejected, or exhausted its retries.
/// Failures are logged, never thrown.
/// </returns>
public async Task SendMessageAsync(string topic, string message)
{
    var guid = Guid.NewGuid().ToString("N");
    var topics = new List<string>
    {
        this._kafkaSetting.ProducerTopics.DeviceTopic,
        this._kafkaSetting.ProducerTopics.StorageTopic,
        this._kafkaSetting.ProducerTopics.TaskTopic,
        this._kafkaSetting.ConsumerTopics.StorageTopic,
        this._kafkaSetting.ConsumerTopics.TaskTopic
    };

    // Refuse to publish to a topic that is not part of the configuration.
    if (!topics.Contains(topic))
    {
        this._logger?.LogError($"guid:{guid} SendMessageAsync error: ' topic:{topic} error");
        return;
    }

    this._logger?.LogDebug($"guid:{guid} SendMessageAsync begin: ' topic:{topic},message:{message}'");
    var msg = new Message<string, string>
    {
        Key = guid,
        Value = message
    };

    var outcome = await Policy.Handle<ProduceException<string, string>>(ex =>
        {
            this._logger?.LogError(ex, $"guid:{guid} SendMessageAsync error");
            return true;
        })
        .OrResult<DeliveryResult<string, string>>(r =>
        {
            if (r.Status == PersistenceStatus.Persisted)
            {
                this._logger?.LogDebug($"guid:{guid} SendMessageAsync end");
                return false;
            }

            this._logger?.LogError($"guid:{guid} SendMessageAsync error:because PersistenceStatus.NotPersisted or PersistenceStatus.PossiblyPersisted");
            return true;
        })
        .WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(0.5),
            // With OrResult the first argument is a DelegateResult, not an exception;
            // its Exception property is null when the retry was triggered by a bad result.
            (attempt, tm) => { this._logger?.LogError(attempt.Exception, $"guid:{guid} Send message retry {tm}"); })
        .ExecuteAndCaptureAsync(() => this._producer.ProduceAsync(topic, msg));

    // ExecuteAndCaptureAsync swallows every exception into the PolicyResult, so report
    // the final failure explicitly instead of returning as if the send had succeeded.
    if (outcome.Outcome != OutcomeType.Successful)
    {
        this._logger?.LogError(outcome.FinalException, $"guid:{guid} SendMessageAsync failed after retries, topic:{topic}");
    }
}
第二种方式
/// <summary>
/// 发送消息
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <returns></returns>
//public Task SendMessageAsync(string topic, string message)
//{
// if (this._kafkaEnable)
// {
// Task _sendMessage()
// {
// var config = new Dictionary<string, string> {{"bootstrap.servers", this._brokerUrl}};
// var builder = new ProducerBuilder<string, string>(config);
// builder.SetKeySerializer(new AplKeySerialize<string>());
// builder.SetValueSerializer(new AplKeySerialize<string>());
// var msg = new Message<string, string>();
// msg.Key = this.Index.ToString();
// msg.Value = message;
// using (var producer = builder.Build())
// {
// var result = producer.ProduceAsync(topic, msg)
// .ContinueWith(task => { this._logger?.LogInformation($"Sent kafka.{topic} Partition: {task.Result.Partition}, Offset: {task.Result.Offset}"); });
// // producer.Flush(TimeSpan.FromSeconds(10));
// return result;
// }
// }
// return Policy.Handle<Exception>()
// .WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(0.5), (ex, tm) => { this._logger.LogError($"Send message to {topic} fail wait {tm}", ex); }).ExecuteAndCaptureAsync(_sendMessage);
// }
// return Task.CompletedTask;
//}
监听
第一种方式
/// <summary>
/// Starts a long-running background task that polls <paramref name="topic"/> once per second
/// and passes every non-empty message to <c>HandleMessage</c>.
/// </summary>
/// <param name="topic">Topic to read from; partition 0 is assigned explicitly.</param>
/// <param name="ct">Token that prevents the task from starting and stops the polling loop once cancelled.</param>
/// <returns>The task running the polling loop.</returns>
public Task CreateConsume(string topic, CancellationToken ct)
    => Task.Factory.StartNew(() =>
    {
        try
        {
            var config = new Dictionary<string, string>
            {
                { "group.id", "bgi_lims_consumer" },
                { "bootstrap.servers", _brokerUrl },
                { "auto.offset.reset", "latest" }
            };

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                // NOTE(review): this pins the consumer to partition 0 starting at offset 0, so
                // "auto.offset.reset=latest" presumably never applies and the partition is replayed
                // from the beginning on every start - confirm this is intended.
                consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset(topic, 0, 0) });
                try
                {
                    // Re-check the token between polls so the loop can actually be stopped;
                    // the one-second poll timeout bounds how long cancellation takes to be seen.
                    while (!ct.IsCancellationRequested)
                    {
                        var msg = consumer.Consume(TimeSpan.FromSeconds(1));
                        if (msg != null && !string.IsNullOrEmpty(msg.Value))
                        {
                            _logger?.LogInformation($"Receive meesage Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} Message:{msg.Value}");
                            HandleMessage(msg);
                        }
                    }
                }
                finally
                {
                    // Close before Dispose so the consumer shuts down cleanly.
                    consumer.Close();
                }
            }
        }
        catch (OperationCanceledException ex)
        {
            _logger?.LogError(ex, "Monitor consume is fail.");
        }
    }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
第二种方式
/// <summary>
/// Starts a background task that subscribes to the task and storage consumer topics from
/// <paramref name="kafkaSetting"/> and passes every received message to <c>HandleMessageAsync</c>.
/// </summary>
/// <param name="kafkaSetting">Supplies the broker address, the consumer group id and the topic names.</param>
/// <remarks>
/// The method returns immediately and the background task is not exposed to the caller.
/// Errors are logged rather than thrown.
/// </remarks>
public void StartAsync(KafkaSetting kafkaSetting)
{
    Task.Run(() =>
    {
        try
        {
            var topics = new List<string> { kafkaSetting.ConsumerTopics.TaskTopic, kafkaSetting.ConsumerTopics.StorageTopic };
            // Interpolating the list itself would print its type name, not the topic names.
            var topicNames = string.Join(",", topics);
            var config = new ConsumerConfig
            {
                BootstrapServers = kafkaSetting.IpPort,
                GroupId = kafkaSetting.ConsumerGroupId,
                //EnableAutoCommit = false,
                //StatisticsIntervalMs = 5000,
                AutoOffsetReset = AutoOffsetReset.Latest,
                EnablePartitionEof = true
            };
            using (var consumer = new ConsumerBuilder<string, string>(config)
                .SetErrorHandler((_, e) => this._logger?.LogError($"consumer:{topicNames} error handler :{JsonConvert.SerializeObject(e)}"))
                .Build())
            {
                consumer.Subscribe(topics);
                try
                {
                    while (true)
                    {
                        ConsumeResult<string, string> consumeResult;
                        try
                        {
                            //var consumeResult = consumer.Consume(TimeSpan.FromSeconds(1));
                            consumeResult = consumer.Consume();
                        }
                        catch (ConsumeException ex) when (!ex.Error.IsFatal)
                        {
                            // A non-fatal consume error must not stop the consumer for good;
                            // fatal errors fall through to the outer catch.
                            this._logger?.LogError(ex, $"consumer:{topicNames} consume error: {ex.Error.Reason}");
                            continue;
                        }

                        // End-of-partition markers carry no message.
                        if (consumeResult.IsPartitionEOF)
                        {
                            continue;
                        }

                        this._logger?.LogInformation($"guid:{consumeResult.Key} Receive meesage from Topic: {consumeResult.Topic} Partition: {consumeResult.Partition} Offset: {consumeResult.Offset} Message:{consumeResult.Value}");
                        // NOTE(review): the result of HandleMessageAsync is not awaited, so if it
                        // returns a Task its failures are presumably unobserved - confirm.
                        this.HandleMessageAsync(consumeResult);
                    }
                }
                finally
                {
                    // Leave the consumer group promptly instead of waiting for the session timeout.
                    consumer.Close();
                }
            }
        }
        catch (Exception ex)
        {
            this._logger?.LogError(ex, "Monitor consume is fail.");
        }
    });
}
更多推荐
所有评论(0)