1.安装PHP 扩展

a. 安装librdkafka 库 

官网地址:https://github.com/edenhill/librdkafka 

$  git clone https://github.com/edenhill/librdkafka.git
$  ./configure
$  make
$  sudo make install

b. 安装php-rdkafka 扩展

官网地址:https://github.com/arnaud-lb/php-rdkafka

$ git clone https://github.com/arnaud-lb/php-rdkafka.git

#生成configure文件
$ phpize 

#编译安装
$ ./configure --with-php-config=/usr/local/php7/bin/php-config
$ make
$ make install 

#在php.ini 文件中配置 rdkafka扩展
$ vim /usr/local/php7/etc/php.ini
extension=rdkafka.so

#查看扩展是否生效
$php -m

2.编写producer方法

a.我在model层封装了一个kafka类

<?php
/**
 * Created by PhpStorm.
 * User: wangan
 * Date: 2018/10/17
 * Time: 10:06
 */

namespace App\Models;


class Kafka
{
    public $broker_list = 'localhost:9092';
    public $topic = 'test';
    public $partition = 0;
    protected $producer = null;
    protected $consumer = null;

    public function __construct()
    {
        if($this->broker_list){

        }

        $rk = new \RdKafka\Producer();
        if(empty($rk)){
            throw new \Exception('producer error');
        }
        $rk->setLogLevel(LOG_DEBUG);
        if(!$rk->addBrokers($this->broker_list)){
            throw new \Exception('addBrokers error');
        }
        $this->producer = $rk;
    }

    public function sendMessage($array_message = [])
    {
        $topic = $this->producer->newTopic($this->topic);
        return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($array_message));
    }

}

b.在controller中调用该方法

 public function testKafkaProducer(){
        $kafuka = new Kafka();
        $kafuka->sendMessage(['cei shi  aaa']);
 }

实现效果如下

可以看到,浏览器没调用一次,下方的消费者都会接受到一个消息。

3.编写异步消费者方法

https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐