<?php
namespace app\util;

use \RdKafka\Conf;
use \RdKafka\Producer;

class Kafka{

    public $kafkaConfig;
    public $kafkaResource;

    //初始化
    public function __construct(){

        $this->connect();
    }

    /**
     * 连接
     */
    private function connect(){

        $this->kafkaConfig = new Conf();
        $this->kafkaConfig->set('log_level', (string) LOG_DEBUG);
        $this->kafkaConfig->set('debug', 'all');
        $this->kafkaResource = new Producer($this->kafkaConfig);
        $this->kafkaResource->addBrokers("127.0.0.1:9092");
    }

    /**
     * 发布消息
     */
    public function publishMessage($message = ""){

        $topic = $this->kafkaResource->newTopic("EASY_DATA");
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        $this->kafkaResource->poll(0);
        $this->kafkaResource->flush(10000);
        return true;
    }

    /**
     * 销毁
     */
    public function __destruct(){

        // $this->channel->close();
        // $this->connection->close();
    }
}

kafka使用代码如上

<?php
namespace app\service;

use app\util\Kafka;

class KafkaService extends \think\Service
{
    /**
     * 注册服务
     */
    public function register()
    {
        $this->app->bind('kafka', Kafka::class);
    }

    /**
     * 执行服务
     */
    public function boot() {


    } 
}

kafka容器服务

<?php

use app\AppService;

// 系统服务定义文件
// 服务在完成全局初始化之后执行
return [
    AppService::class,
    '\app\service\KafkaService',
];

kafka注册服务

public function test(Request $request){

        $res = app('kafka')->publishMessage('{"driver":2,"user_id":3,"ip":"127.0.0.1","type":7,"joinid":1}');

        var_dump($res);
    }

kafka的应用

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐