安装依赖

核心依赖

composer require workerman/gateway-worker

如果想在 php 后端推送消息给客户端,还需要安装

composer require workerman/gatewayclient

创建thinkphp自定义指令

生成自定义指令

php think make:command Workerman workerman

执行完,会在app/command目录下生成一个Workerman.php的文件

注册自定义指令

在根目录的config/console.php添加一行

<?php
// +----------------------------------------------------------------------
// | 控制台配置
// +----------------------------------------------------------------------
return [
    // 指令定义
    'commands' => [
        // 添加这行
        'workerman'     =>  'app\command\Workerman',
    ],
];

实现功能

配置指令

打开app/command/Workerman.php文件

<?php
declare (strict_types = 1);

namespace app\command;

use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use Workerman\Worker;

class Workerman extends Command
{
    protected function configure(): void
    {
        $this->setName("workerman")
            ->addArgument("action", Argument::OPTIONAL, "start|stop|restart|reload|status|connections", "start")
            ->addOption("mode", "m", Option::VALUE_OPTIONAL, "Run the workerman server in daemon mode.")
            ->setDescription("the workerman command");
    }

    protected function execute(Input $input, Output $output): void
    {
        $action = $input->getArgument("action");
        $mode = $input->getOption("mode");

        global $argv;

        $argv = [];

        array_unshift($argv, "think", $action);
        if ($mode == "d") {
            $argv[] = "-d";
        } else if ($mode == "g") {
            $argv[] = "-g";
        }
    }
}

完成后我们就可以使用 workerman 的执行了,一些常用的指令

php think workerman start

php think workerman stop

php think workerman reload

在后台启动服务php think workerman start d

初始化Gateway服务

打开app/command/Workerman.php文件,添加startGateway方法

<?php
protected function startGateway(): void
{
  // 暴露给客户端连接的地址,使用 websocket 协议
  $gateway = new Gateway("websocket://0.0.0.0:10002");
  // Gateway 进程的名称
  $gateway->name = "WebSocketGateway";
  // Gateway进程的数量,一般一台服务器设置1-2个足够,设置多了对性能有一定影响
  $gateway->count = 2;
  // lanIp是Gateway所在服务器的内网IP,默认填写127.0.0.1即可
  $gateway->lanIp = "127.0.0.1";
  // 心跳发送间隔
  $gateway->pingInterval = 30;
  // 心跳无响应检测次数
  $gateway->pingNotResponseLimit = 2;
  // 心跳数据格式
  $gateway->pingData = "{\"type\": \"ping\"}";
  // 设置 Register 服务地址
  $gateway->registerAddress = '127.0.0.1:1236';
}

这样就启动了一个 Gateway 服务。更多的配置项参考配置

初始化BusinessWorker服务

打开app/command/Workerman.php文件,添加startBusinessWorker方法

protected function startBusinessWorker(): void
{
  $businessWorker = new BusinessWorker();
  // BusinessWorker进程的名称
  $businessWorker->name = "WebSocketBusinessWorker";
  // BusinessWorker进程的数量,以便充分利用多cpu资源
  $businessWorker->count = 4;
  // 设置 Register 服务地址
  $businessWorker->registerAddress = "127.0.0.1:1236";
  // 设置使用哪个类来处理业务,业务类至少要实现onMessage静态方法,onConnect和onClose静态方法可以不用实现
  $businessWorker->eventHandler = SocketEvents::class;
}

同时,我们需要一个类来处理消息业务,在app/command,下新建一个src目录,并且在该目录下新建SocketEvents.php文件来处理消息业务

<?php
namespace app\command\src;
use think\console\Output;

class SocketEvents {
    /**
     * 客户端连接的回调
     * @param string $client_id
     * @return void
     */
    public static function onConnect(string $client_id): void
    {
        $output = new Output();
        $output->writeln("客户端: $client_id 已连接");
    }

    /**
     * 客户端接收到消息的回调
     * @param string $client_id
     * @param mixed $message
     * @return void
     */
    public static function onMessage(string $client_id, mixed $message): void
    {
        $output = new Output();
        $output->writeln("接收到客户端[$client_id]的新消息: $message");
    }

    /**
     * 客户端断开的回调
     * @param string $client_id
     * @return void
     */
    public static function onClose(string $client_id): void
    {
        $output = new Output();
        $output->writeln("客户端: $client_id 已断开连接");
    }
}

这样,我们就启动了一个 BusinessWorker 服务。更多的配置项参考配置

启动Register服务

打开app/command/Workerman.php文件,添加startRegister方法

<?php
protected function startRegister(): void
{
  // Register 服务地址,一定是 text 协议。该端口千万不能对外暴露
  new Register("text://127.0.0.1:1236");
}

这样就启动了一个 Register 服务

启动服务

打开app/command/Workerman.php文件,添加runAll方法

<?php
protected function runAll(): void
{
  $this->startRegister();
  $this->startBusinessWorker();
  $this->startGateway();

  Worker::runAll();
}

完整代码

<?php
declare (strict_types = 1);

namespace app\command;

use app\command\src\SocketEvents;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use Workerman\Worker;

class Workerman extends Command
{
    protected function configure(): void
    {
        $this->setName("workerman")
            ->addArgument("action", Argument::OPTIONAL, "start|stop|restart|reload|status|connections", "start")
            ->addOption("mode", "m", Option::VALUE_OPTIONAL, "Run the workerman server in daemon mode.")
            ->setDescription("the workerman command");
    }

    protected function execute(Input $input, Output $output): void
    {
        $action = $input->getArgument("action");
        $mode = $input->getOption("mode");

        global $argv;

        $argv = [];

        array_unshift($argv, "think", $action);
        if ($mode == "d") {
            $argv[] = "-d";
        } else if ($mode == "g") {
            $argv[] = "-g";
        }

        $this->runAll();
    }

    protected function runAll(): void
    {
        $this->startRegister();
        $this->startBusinessWorker();
        $this->startGateway();

        Worker::runAll();
    }

    protected function startGateway(): void
    {
        $gateway = new Gateway("websocket://0.0.0.0:10002");
        $gateway->name = "WebSocketGateway";
        $gateway->count = 2;
        $gateway->lanIp = "127.0.0.1";
        $gateway->pingInterval = 30;
        $gateway->pingNotResponseLimit = 2;
        $gateway->pingData = "{\"type\": \"ping\"}";
        $gateway->registerAddress = '127.0.0.1:1236';
    }

    protected function startRegister(): void
    {
        new Register("text://127.0.0.1:1236");
    }

    protected function startBusinessWorker(): void
    {
        $businessWorker = new BusinessWorker();
        $businessWorker->name = "WebSocketBusinessWorker";
        $businessWorker->count = 4;
        $businessWorker->registerAddress = "127.0.0.1:1236";
        $businessWorker->eventHandler = SocketEvents::class;
    }
}

使用示例

启动服务

php think workerman start

后台运行

php think workerman start d

停止服务

php think workerman stop

重启服务

php think workerman reload

在php后端向客户端推送消息

<?php
use GatewayClient\Gateway;

class Message
{
  public function sendMessage(): void
  {
    // 这里使用的是GatewayClient的Gateway,不要搞错了。注册的地址是启动服务时监听的 Register 类的地址
    Gateway::$registerAddress = "127.0.0.1:1236";

    Gateway::sendToClient("客户端ID", "消息内容");
  }
}

实现原理

GatewayWorker 实现原理参照实现原理

写在最后

本文仅探讨 GatewayWorker 的使用方法。根据项目需求可选择使用 Wokerman 或是 GatewayWorker。具体参照用GatewayWorker还是Workerman?

最后修改:2025 年 02 月 04 日
如果觉得我的文章对你有用,请随意赞赏