安装依赖
核心依赖
composer require workerman/gateway-worker
如果想在 php 后端推送消息给客户端,还需要安装
composer require workerman/gatewayclient
创建thinkphp自定义指令
生成自定义指令
php think make:command Workerman workerman
执行完,会在app/command
目录下生成一个Workerman.php
的文件
注册自定义指令
在根目录的config/console.php
添加一行
<?php
// +----------------------------------------------------------------------
// | 控制台配置
// +----------------------------------------------------------------------
return [
// 指令定义
'commands' => [
// 添加这行
'workerman' => 'app\command\Workerman',
],
];
实现功能
配置指令
打开app/command/Workerman.php
文件
<?php
declare (strict_types = 1);
namespace app\command;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use Workerman\Worker;
class Workerman extends Command
{
protected function configure(): void
{
$this->setName("workerman")
->addArgument("action", Argument::OPTIONAL, "start|stop|restart|reload|status|connections", "start")
->addOption("mode", "m", Option::VALUE_OPTIONAL, "Run the workerman server in daemon mode.")
->setDescription("the workerman command");
}
protected function execute(Input $input, Output $output): void
{
$action = $input->getArgument("action");
$mode = $input->getOption("mode");
global $argv;
$argv = [];
array_unshift($argv, "think", $action);
if ($mode == "d") {
$argv[] = "-d";
} else if ($mode == "g") {
$argv[] = "-g";
}
}
}
完成后我们就可以使用 workerman 的执行了,一些常用的指令
php think workerman start
php think workerman stop
php think workerman reload
在后台启动服务php think workerman start d
初始化Gateway服务
打开app/command/Workerman.php
文件,添加startGateway
方法
<?php
protected function startGateway(): void
{
// 暴露给客户端连接的地址,使用 websocket 协议
$gateway = new Gateway("websocket://0.0.0.0:10002");
// Gateway 进程的名称
$gateway->name = "WebSocketGateway";
// Gateway进程的数量,一般一台服务器设置1-2个足够,设置多了对性能有一定影响
$gateway->count = 2;
// lanIp是Gateway所在服务器的内网IP,默认填写127.0.0.1即可
$gateway->lanIp = "127.0.0.1";
// 心跳发送间隔
$gateway->pingInterval = 30;
// 心跳无响应检测次数
$gateway->pingNotResponseLimit = 2;
// 心跳数据格式
$gateway->pingData = "{\"type\": \"ping\"}";
// 设置 Register 服务地址
$gateway->registerAddress = '127.0.0.1:1236';
}
这样就启动了一个 Gateway 服务。更多的配置项参考配置
初始化BusinessWorker服务
打开app/command/Workerman.php
文件,添加startBusinessWorker
方法
protected function startBusinessWorker(): void
{
$businessWorker = new BusinessWorker();
// BusinessWorker进程的名称
$businessWorker->name = "WebSocketBusinessWorker";
// BusinessWorker进程的数量,以便充分利用多cpu资源
$businessWorker->count = 4;
// 设置 Register 服务地址
$businessWorker->registerAddress = "127.0.0.1:1236";
// 设置使用哪个类来处理业务,业务类至少要实现onMessage静态方法,onConnect和onClose静态方法可以不用实现
$businessWorker->eventHandler = SocketEvents::class;
}
同时,我们需要一个类来处理消息业务,在app/command
,下新建一个src
目录,并且在该目录下新建SocketEvents.php
文件来处理消息业务
<?php
namespace app\command\src;
use think\console\Output;
class SocketEvents {
/**
* 客户端连接的回调
* @param string $client_id
* @return void
*/
public static function onConnect(string $client_id): void
{
$output = new Output();
$output->writeln("客户端: $client_id 已连接");
}
/**
* 客户端接收到消息的回调
* @param string $client_id
* @param mixed $message
* @return void
*/
public static function onMessage(string $client_id, mixed $message): void
{
$output = new Output();
$output->writeln("接收到客户端[$client_id]的新消息: $message");
}
/**
* 客户端断开的回调
* @param string $client_id
* @return void
*/
public static function onClose(string $client_id): void
{
$output = new Output();
$output->writeln("客户端: $client_id 已断开连接");
}
}
这样,我们就启动了一个 BusinessWorker 服务。更多的配置项参考配置
启动Register服务
打开app/command/Workerman.php
文件,添加startRegister
方法
<?php
protected function startRegister(): void
{
// Register 服务地址,一定是 text 协议。该端口千万不能对外暴露
new Register("text://127.0.0.1:1236");
}
这样就启动了一个 Register 服务
启动服务
打开app/command/Workerman.php
文件,添加runAll
方法
<?php
protected function runAll(): void
{
$this->startRegister();
$this->startBusinessWorker();
$this->startGateway();
Worker::runAll();
}
完整代码
<?php
declare (strict_types = 1);
namespace app\command;
use app\command\src\SocketEvents;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use Workerman\Worker;
class Workerman extends Command
{
protected function configure(): void
{
$this->setName("workerman")
->addArgument("action", Argument::OPTIONAL, "start|stop|restart|reload|status|connections", "start")
->addOption("mode", "m", Option::VALUE_OPTIONAL, "Run the workerman server in daemon mode.")
->setDescription("the workerman command");
}
protected function execute(Input $input, Output $output): void
{
$action = $input->getArgument("action");
$mode = $input->getOption("mode");
global $argv;
$argv = [];
array_unshift($argv, "think", $action);
if ($mode == "d") {
$argv[] = "-d";
} else if ($mode == "g") {
$argv[] = "-g";
}
$this->runAll();
}
protected function runAll(): void
{
$this->startRegister();
$this->startBusinessWorker();
$this->startGateway();
Worker::runAll();
}
protected function startGateway(): void
{
$gateway = new Gateway("websocket://0.0.0.0:10002");
$gateway->name = "WebSocketGateway";
$gateway->count = 2;
$gateway->lanIp = "127.0.0.1";
$gateway->pingInterval = 30;
$gateway->pingNotResponseLimit = 2;
$gateway->pingData = "{\"type\": \"ping\"}";
$gateway->registerAddress = '127.0.0.1:1236';
}
protected function startRegister(): void
{
new Register("text://127.0.0.1:1236");
}
protected function startBusinessWorker(): void
{
$businessWorker = new BusinessWorker();
$businessWorker->name = "WebSocketBusinessWorker";
$businessWorker->count = 4;
$businessWorker->registerAddress = "127.0.0.1:1236";
$businessWorker->eventHandler = SocketEvents::class;
}
}
使用示例
启动服务
php think workerman start
后台运行
php think workerman start d
停止服务
php think workerman stop
重启服务
php think workerman reload
在php后端向客户端推送消息
<?php
use GatewayClient\Gateway;
class Message
{
public function sendMessage(): void
{
// 这里使用的是GatewayClient的Gateway,不要搞错了。注册的地址是启动服务时监听的 Register 类的地址
Gateway::$registerAddress = "127.0.0.1:1236";
Gateway::sendToClient("客户端ID", "消息内容");
}
}
实现原理
GatewayWorker 实现原理参照实现原理
写在最后
本文仅探讨 GatewayWorker 的使用方法。根据项目需求可选择使用 Wokerman 或是 GatewayWorker。具体参照用GatewayWorker还是Workerman?