一、Centos7 部署kafka

二、kafka客户端用C#实现生产者(Producer)和消费者(Consumer)

先添加“Confluent.Kafka”的引用

生产者代码实现,此处默认的topic是“test”,topic的添加过程在上一篇文章“Centos7 部署kafka”中。

using Confluent.Kafka;



static void Main(string[] args)
{
    Console.WriteLine("请输入消息内容");
    using (var producer = new KafkaProducer())
    {
        while (true)
        {
            string message = Console.ReadLine();
            try
            {
                //topic名称是test
                var result = producer.ProduceAsync("test",
                new Confluent.Kafka.Message<string, string>() { Key = Guid.NewGuid().ToString(), Value = message })
                    .GetAwaiter().GetResult();
                Console.WriteLine($"offset:{result.Offset.Value},partition:{result.Partition.Value}");
            }
            catch (ProduceException<string, string> e)
            {
                Console.WriteLine($"失败的消息: {e.Message} [{e.Error.Code}]");
                continue;
            }

        }
    }
}
class KafkaProducer : IDisposable
{
    private ProducerConfig _config = new ProducerConfig();
    private IProducer<string, string> _producer;
    public KafkaProducer(string server = null)
    {
        if (string.IsNullOrEmpty(server))
        {
            //这里可以添加更多的Kafka集群,比如
            //server=" server ="192.168.1.129:9092,192.168.1.133:9092,192.168.1.134:9092";";                   
            server = "192.168.0.129:9092";

        }
        _config.BootstrapServers = server;
        _producer = new ProducerBuilder<string, string>(_config).Build();

    }

    public async Task<DeliveryResult<string, string>> ProduceAsync(string topic, Message<string, string> message)
    {
        return await _producer.ProduceAsync(topic, message);

    }

    public void Dispose()
    {
        _producer?.Dispose();
    }
}

消费者代码实现

using Confluent.Kafka;



static void Main(string[] args)
{
    Console.WriteLine("默认只关注test主题的消息)");
    using (var consumer = new KafkaConsumer())
    {
        while (true)
        {
            consumer.Consume(a =>
            {
                if (a == null)
                {
                    Console.WriteLine("暂无消息");
                }
                else
                {
                    Console.WriteLine($"Key:{a.Message.Key},Value:{a.Message.Value}");
                }
            });
        }
    }
}

class KafkaConsumer : IDisposable
{
    private IConsumer<string, string> _consumer;
    public KafkaConsumer(string server = null)
    {
        if (string.IsNullOrEmpty(server))
        {
            server = "192.168.0.129:9092";
        }
        var config = new ConsumerConfig
        {
            GroupId = "TestGroupone",
            BootstrapServers = server,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        _consumer = new ConsumerBuilder<string, string>(config).Build();
        //topic名称默认是test
        _consumer.Subscribe("test");

    }

    public void Consume(Action<ConsumeResult<string, string>> action = null)
    {
        var consumerResult = _consumer.Consume(TimeSpan.FromSeconds(2));
        action?.Invoke(consumerResult);
    }

    public void Dispose()
    {
        _consumer?.Dispose();
    }
}

运行结果 

本文代码实现参考:https://blog.csdn.net/qq_43629223/article/details/107510400

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐