第一种方式 

public async Task SendMessageAsync(string topic, string message)
        {
            var guid = Guid.NewGuid().ToString("N");
            var topics = new List<string>
            {
                this._kafkaSetting.ProducerTopics.DeviceTopic,
                this._kafkaSetting.ProducerTopics.StorageTopic,
                this._kafkaSetting.ProducerTopics.TaskTopic,
                this._kafkaSetting.ConsumerTopics.StorageTopic,
                this._kafkaSetting.ConsumerTopics.TaskTopic
            };
            if (!topics.Contains(topic))
            {
                this._logger?.LogError($"guid:{guid} SendMessageAsync error: ' topic:{topic} error");
                return;
            }

            this._logger?.LogDebug($"guid:{guid} SendMessageAsync begin: ' topic:{topic},message:{message}'");
            var msg = new Message<string, string>
            {
                Key = guid,
                Value = message
            };
            await Policy.Handle<ProduceException<string, string>>(ex =>
                {
                    this._logger.LogError(ex, $"guid:{guid} SendMessageAsync error");
                    return true;
                })
                .OrResult<DeliveryResult<string, string>>(r =>
                {
                    if (r.Status == PersistenceStatus.Persisted)
                    {
                        this._logger.LogDebug($"guid:{guid}   SendMessageAsync end");
                        return false;
                    }

                    this._logger.LogError($"guid:{guid}  SendMessageAsync error:because PersistenceStatus.NotPersisted or PersistenceStatus.PossiblyPersisted");
                    return true;
                })
                .WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(0.5),
                    (ex, tm) => { this._logger.LogError($"guid:{guid}  Send message retry {tm}", ex); })
                .ExecuteAndCaptureAsync(() => this._producer.ProduceAsync(topic, msg));
        }

第二种方式

        / <summary>
        / 发送消息
        / </summary>
        / <param name="topic"></param>
        / <param name="message"></param>
        / <returns></returns>
        //public Task SendMessageAsync(string topic, string message)
        //{
        //    if (this._kafkaEnable)
        //    {
        //        Task _sendMessage()
        //        {
        //            var config = new Dictionary<string, string> {{"bootstrap.servers", this._brokerUrl}};
        //            var builder = new ProducerBuilder<string, string>(config);
        //            builder.SetKeySerializer(new AplKeySerialize<string>());
        //            builder.SetValueSerializer(new AplKeySerialize<string>());
        //            var msg = new Message<string, string>();
        //            msg.Key = this.Index.ToString();
        //            msg.Value = message;
        //            using (var producer = builder.Build())
        //            {
        //                var result = producer.ProduceAsync(topic, msg)
        //                    .ContinueWith(task => { this._logger?.LogInformation($"Sent kafka.{topic} Partition: {task.Result.Partition}, Offset: {task.Result.Offset}"); });
        //                // producer.Flush(TimeSpan.FromSeconds(10));
        //                return result;
        //            }
        //        }

        //        return Policy.Handle<Exception>()
        //            .WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(0.5), (ex, tm) => { this._logger.LogError($"Send message to {topic} fail wait {tm}", ex); }).ExecuteAndCaptureAsync(_sendMessage);
        //    }

        //    return Task.CompletedTask;
        //}

 

 

监听

一种方式

 public Task CreateConsume(string topic, CancellationToken ct)
             => Task.Factory.StartNew(() =>
             {
                 try
                 {
                     string brokerList = _brokerUrl;
                     var topics = topic;

                     var config = new Dictionary<string, string>
                     {
                         { "group.id", "bgi_lims_consumer" },
                         { "bootstrap.servers", brokerList },
                         { "auto.offset.reset", "latest" }
                     };
                     using (var consumer = new ConsumerBuilder<Ignore, string>(config.ToList()).Build())
                     {
                         consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset(topics, 0, 0) });
                         while (true)
                         {
                             var msg = consumer.Consume(TimeSpan.FromSeconds(1));
                             if (msg != null && !string.IsNullOrEmpty(msg.Value))
                             {
                                 _logger?.LogInformation($"Receive meesage Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} Message:{msg.Value}");
                                 HandleMessage(msg);
                             }
                         }
                     }
                 }
                 catch (OperationCanceledException ex)
                 {
                     _logger.LogError(ex, "Monitor consume is fail.");
                 }
             }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);

 

第二种方式


        public void StartAsync(KafkaSetting kafkaSetting)
        {
            Task.Run(() =>
            {
                try
                {
                    var topics = new List<string> { kafkaSetting.ConsumerTopics.TaskTopic, kafkaSetting.ConsumerTopics.StorageTopic };
                    var config = new ConsumerConfig
                    {
                        BootstrapServers = kafkaSetting.IpPort,
                        GroupId = kafkaSetting.ConsumerGroupId,
                        //EnableAutoCommit = false,
                        //StatisticsIntervalMs = 5000,
                        AutoOffsetReset = AutoOffsetReset.Latest,
                        EnablePartitionEof = true
                    };

                    using (var consumer = new ConsumerBuilder<string, string>(config)
                        .SetErrorHandler((_, e) => this._logger?.LogError($"consumer:{topics} error handler :{JsonConvert.SerializeObject(e)}"))
                        .Build())
                    {
                        consumer.Subscribe(topics);
                        while (true)
                        {
                            //var consumeResult = consumer.Consume(TimeSpan.FromSeconds(1));
                            var consumeResult = consumer.Consume();
                            if (consumeResult.IsPartitionEOF)
                                continue;

                            this._logger?.LogInformation($"guid:{consumeResult.Key} Receive meesage from Topic: {consumeResult.Topic} Partition: {consumeResult.Partition} Offset: {consumeResult.Offset} Message:{consumeResult.Value}");
                            this.HandleMessageAsync(consumeResult);
                        }
                    }
                }
                catch (Exception ex)
                {
                    this._logger.LogError(ex, "Monitor consume is fail.");
                }
            });
        }
 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐