SwooleShell.php:

<?php
    App::uses('AppShell', 'Console/Command');

/**
 * Class SwooleShell
 * @property KafkaTask $Kafka
 * @property PermissionMigrateTask $PermissionMigrate
 */
class SwooleShell extends AppShell
{
	/**
	 * 设置可选参数
	 * @return ConsoleOptionParser
	 */
	public function getOptionParser()
	{
		$parser = parent::getOptionParser();
		$parser->addOption('daemon', array(
			'short' => 'd',
			'help' => '以守护进程运行',
			'boolean' => true,
			'default' => true
		));
		return $parser;
	}
	
	public function startup()
	{
		$this->out('基于Swoole扩展的shell程序');
		$this->hr();
		if (!extension_loaded('swoole')) {
			$this->err('Swoole扩展没有安装');
			$this->hr();
			$this->_stop();
		}
		$logPath = Configure::read('Log.path') ?: LOGS;
		if (!is_dir($logPath)) {
			mkdir($logPath);
			if (!file_exists($logPath . DS . 'swoole.log')) {
				file_put_contents($logPath . DS . 'swoole.log', '');
			}
		}
		$this->out('Swoole版本:' . SWOOLE_VERSION);
		$this->hr();
		$this->out('PHP版本:' . PHP_VERSION);
		$this->hr();
	}

	public function main()
	{
		$command = null;
		if ($this->command) {
			$command = $this->command;
		} else {
			$this->out('支持的命令');
			$this->out('[p]PublishTask          --开启异步下发任务');
			$this->out('[q]Queue                --开启任务队列功能');
			$this->out('[e]Exit                 --退出');
			$this->hr();
			$input = strtoupper($this->in('选择要执行的命令', ['p', 'q', 'e'], 'e'));
			switch ($input) {
				case 'p':
					$command = 'Publish';
					break;
				case 'Q':
					$command = 'Queue';
					break;
				default:
					$this->out('退出');
					$this->_stop();
			}
			$this->hr();
		}
		try {
			$this->out('执行命令:"' . $command . '"');
			$this->hr();
			/**
			 * @var $task AsyncTask
			 */
			$task = $this->Tasks->load($command);
			$passArgs = $this->args;
			array_shift($passArgs);
			$task->args = $passArgs;
			$task->params = $this->params;
			$task->execute();
			if ($this->command) {
				$this->out('退出');
				$this->_stop();
			} else {
				$this->out('结束');
				$this->hr();
				$this->command = null;
				$this->main();
			}
		} catch (Exception $exception) {
			$this->log($exception->__toString(), 'kafka');
			$this->err('退出,错误的命令:"' . $this->command . '"');
			$this->hr();
			$this->_stop();
		}
	}
}

Bash下执行命令:

首先项目用的是cakephp框架,在文件根目录下执行上述bash模式下命令,开启swoole脚本,同时第三个参数MPC指定到MpcTask.php文件,也是SwooleShell.php中main函数里的$command变量。

MpcTask.php:

<?php

App::uses('ProcessTask', 'Console/Command/Task');
App::uses('Tenant', 'Model');
App::uses('InterfaceModel', 'Model');
App::uses('Controller', 'Controller');

/**
 * @link https://wiki.swoole.com/wiki/index/prid-1
 */
class MpcTask extends ProcessTask
{
	public function process()
	{

		
	}
}

ProcessTask.php:

<?php

use Swoole\Process;

App::uses('AppShell', 'Console/Command');
App::uses('Controller', 'Controller');
App::uses('ConnectionManager', 'Model');

/**
 * 继承并实现方法以开发基于swoole的多进程应用
 * Class ProcessTask
 */
abstract class ProcessTask extends AppShell
{
	/**
	 * 程序是否以守护者进程方式运行
	 * @var bool
	 */
	public $daemon = false;
	/**
	 * 主进程pid
	 * @var int
	 */
	public $masterPid = 0;
	/**
	 * 最大进程数
	 * @var int
	 */
	public $maxPrecess = 1;
	/**
	 * 子进程信息
	 * @var array
	 */
	public $workers = [];

	/**
	 * 设置可选参数
	 * @return ConsoleOptionParser
	 */
	public function getOptionParser()
	{
		$parser = parent::getOptionParser();
		$parser->addOption('daemon', array(
			'short' => 'd',
			'help' => '以守护进程运行',
			'boolean' => true,
			'default' => true
		));
		return $parser;
	}

	/**
	 * 主程序入口
	 */
	public function execute()
	{
		try {
			if ($this->daemon) {
				Process::daemon();
			}
			$this->setProcessName(sprintf('php-ps:%s', 'master'));
			$this->masterPid = posix_getpid();
			$this->run();
			$this->processWait();
		} catch (Exception $exception) {
			$this->log($exception->__toString(), 'kafka');
			$this->err($exception->getMessage());
		}
	}


	/**
	 * 进程池入口
	 */
	public function run()
	{
		for ($i = 0; $i < $this->maxPrecess; $i++) {
			$this->CreateProcess($i);
		}
	}

	/**
	 * 创建新进程
	 * @param $index
	 * @return int
	 */
	public function CreateProcess($index)
	{
		$process = new Process(function (Process $worker) use ($index) {
			$this->setProcessName(sprintf('php-ps:%s', $index));
			$this->log($worker->pid . ':开启进程', 'kafka');
			$this->printToScreen($worker->pid . ':开启进程');
			try {
				//外层循环,使kafka出现问题时能重新初始化
				while (true) {
					//检测父进程是否已退出,父进程退出则子进程退出
					$this->checkMasterPid($worker);
					$this->process();
					//一分钟后重新初始化kafka消费者
					sleep(10);
				}
			} catch (Exception $exception) {
				$this->log($worker->pid . ':' . $exception->__toString(), 'kafka');
				$this->printToScreen($worker->pid . ':' . $exception->__toString(), 'error');
				sleep(10);
			}
			$this->log($worker->pid . ':进程结束', 'kafka');
			$this->printToScreen($worker->pid . ':进程结束');
			$worker->exit(0);
		}, false, false);
		$pid = $process->start();
		$this->workers[$index] = $pid;
		return $pid;
	}

	/**
	 * 检测父进程是否已退出
	 * @param Process $worker
	 */
	public function checkMasterPid(Process &$worker)
	{
		if (!Swoole\Process::kill($this->masterPid, 0)) {
			$this->printToScreen($worker->pid . ':父进程已退出,子进程退出');
			$this->log($worker->pid . ':父进程已退出,子进程退出', 'kafka');
			$worker->exit(0);
		}
	}

	/**
	 * 重启进程
	 * @param $ret
	 * @throws Exception
	 */
	public function rebootProcess($ret)
	{
		$index = array_search($ret['pid'], $this->workers);
		if ($index !== false) {
			$newPid = $this->CreateProcess($index);
			$this->log($newPid . ':重启进程', 'kafka');
			$this->printToScreen($newPid . ':重启进程');
		} else {
			throw new \Exception('rebootProcess Error: no pid');
		}
	}

	/**
	 * 处理僵尸进程,并重启进程
	 * @throws Exception
	 */
	public function processWait()
	{
		while (true) {
			if (count($this->workers)) {
				$ret = Process::wait();
				if ($ret) {
					$this->rebootProcess($ret);
				}
			} else {
				break;
			}
		}
	}

	/**
	 * 设置进程名
	 * @param $name
	 */
	public function setProcessName($name)
	{
		if (function_exists('cli_set_process_title')) {
			cli_set_process_title($name);
		} else {
			swoole_set_process_name($name);
		}
	}

	/**
	 * 输出到屏幕
	 * @param $message
	 * @param string $type
	 * @param bool $newLine
	 */
	public function printToScreen($message, $type = 'out', $newLine = true)
	{
		if (!$this->daemon) {
			if ($type == 'error') {
				$this->err($message, $newLine);
			} else {
				$this->out($message, $newLine);
			}
		}
	}

	/**
	 * 用户的逻辑
	 * @return mixed
	 */
	abstract public function process();
}

我们只需要在子类(MpcTask.php)中去重写父类的process方法就可实现开启多进程,同时在进程中书写自己的业务代码。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐