代码拉取完成,页面将自动刷新
<?php
/**
* Created by PhpStorm.
* User: Administrator
* Date: 2019/6/24
* Time: 9:42
*/
namespace app\http;
use Channel\Client;
use think\facade\Db;
use think\worker\Server;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Lib\Timer;
/**
* Class Websocket
* @package app\http
* TP里面用workerman的用法测试
*/
class Websocket extends Server
{
protected $HEARTBEAT_TIME = 55;
// 全局变量,保存当前进程的客户端连接数
protected $connection_count = 0;
protected $socket = 'websocket://0.0.0.0:2345';
//protected $host = '0.0.0.0';
protected $option = ['count' => 4, 'name' => 'newWebsocket'];
protected $mysql_host = '127.0.0.1';
protected $mysql_port = '3306';
protected $mysql_user = 'root';
protected $mysql_pwd = 'root';
protected $mysql_db_name = 'workerman';
protected function init()
{
// 初始化一个Channel服务端
//$channel_server = new \Channel\Server('0.0.0.0', 2206);
//$channel_server->name = 'channel_server';
parent::init(); // TODO: Change the autogenerated stub
}
/**
* @param $worker
* 初始化workerman
*/
public function onWorkerStart($worker)
{
//保存每个用户的connection
$worker->connection_uids = array();
// global $db;
// $db = 100;
// 新增加一个属性,用来保存uid到connection的映射(uid是用户id或者客户端唯一标识)
//参考文档:http://doc.workerman.net/faq/send-data-to-client.html
//----------------------------------------初始化mysql链接-------------------------
// 将db实例存储在全局变量中(也可以存储在某类的静态成员中),参考手册http://doc.workerman.net/components/workerman-mysql.html
//global $db;
global $inner_worker;
//$db = new \Workerman\MySQL\Connection($this->mysql_host, $this->mysql_port, $this->mysql_user, $this->mysql_pwd, $this->mysql_db_name);
//self::websocketAsyncTcpConnection($worker);
//----------------------------------------内部监听其他的端口------------------------
$inner_worker = new \Workerman\Worker('tcp://0.0.0.0:2347');
/**
* 多个进程监听同一个端口(监听套接字不是继承自父进程)
* 需要开启端口复用,不然会报Address already in use错误
*/
$inner_worker->count = 4;
$inner_worker->reusePort = true;
//获取转发过来的消息
$inner_worker->onMessage = function ($connection, $message) use ($worker) {
// 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间
$connection->lastMessageTime = time();
// 解析数据
$resData = json_decode($message, true);
// 这里处理设备发来的数据 $data
// 把tcp设备消息发送数据转发给websocket,前端根据不同消息类型展示不同的数据
foreach ($worker->connections as $websocket_con) {
//发给websocket绑定的某一个用户
$websocket_con->send($message);
self::sendMessageByUid(100, 'websock我想你了',$websocket_con);
}
};
$inner_worker->onWorkerStart = function ($inner_worker) {
//---------------------定义TCP的心跳-------------------
Timer::add(1, function () use ($inner_worker) {
$time_now = time();
foreach ($inner_worker->connections as $connection) {
// 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
if (empty($connection->lastMessageTime)) {
$connection->lastMessageTime = $time_now;
continue;
}
// 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
if ($time_now - $connection->lastMessageTime > ($this->HEARTBEAT_TIME)) {
$connection->close();
} else {
$connection->send('{"type":"ping"}');
}
}
});
};
$inner_worker->onConnect = function ($connection) {
// 有新的客户端连接时,连接数+1
global $connection_count;
++$connection_count;
echo "tcp closed\n";
};
// 执行监听
$inner_worker->listen();
//------------------------------------------创建一个定时器-----------------------------
// 1秒后启动一个udp客户端,连接1234端口并发送字符串 hi
Timer::add(1, function ()use($worker) {
$udp_connection = new AsyncTcpConnection('udp://127.0.0.1:1234');
$udp_connection->onConnect = function ($udp_connection)use($worker) {
$udp_connection->send('hi');
};
$udp_connection->onMessage = function ($udp_connection, $data) {
// 收到服务端返回的数据 hello
echo "recv $data\r\n";
// 关闭连接
$udp_connection->close();
};
$udp_connection->connect();
}, null, false);
//-------------------------------------------心跳检测---------------------------------
Timer::add(1, function () use ($worker) {
$time_now = time();
foreach ($worker->connections as $connection) {
// 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
if (empty($connection->lastMessageTime)) {
$connection->lastMessageTime = $time_now;
continue;
}
// 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
if ($time_now - ($connection->lastMessageTime) > ($this->HEARTBEAT_TIME)) {
$connection->close();
} else {
$connection->send('{"type":"ping"}');
}
}
});
echo "Worker starting...\n";
}
/**
* @param $connection
* 当客户端连接时触发
* 如果业务不需此回调可以删除onConnect
* $connection->client_id 连接id
*/
public function onConnect($connection)
{
// 设置连接的onMessage回调,跟下面的onMessage方法是一样的效果,不过一般建议写成单独的方法体处理
//$connection->onMessage = function($connection, $data)
//{
//var_dump($data);
//$connection->send('receive success');
//};
global $id;
$id = $connection->id;
++$id;
// 有新的客户端连接时,连接数+1
global $connection_count;
++$connection_count;
// 给connection对象动态添加一个属性,用来保存当前连接发来多少个请求
$connection->messageCount = 0;
echo "new connection from ip " . $connection->getRemoteIp() . ":" . $connection->getRemotePort() . "\n";
}
/**
* @param $connection
* @param $data 内容
* 当客户端发来消息时触发
*/
public function onMessage($connection, $data)
{
global $inner_worker;
// global $db;
// 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间
$connection->lastMessageTime = time();
$connection->connection_uids = array();
// 解析数据
$resData = json_decode($data, true);
//--------------------每个连接接收100个请求后就不再接收数据-------------------
// $limit = 100;
// if (++$connection->messageCount > $limit) {
// $connection->pauseRecv();
// // 30秒后恢复接收数据
// Timer::add(30, function ($connection) {
// $connection->resumeRecv();
// }, array($connection), false);
// }
//找到workerman的生成的ID,就绑定用户的userID,因为只要下线了ID会变,所以在下线的时候要做下线处理
$id = $connection->id;
$type = $resData['type'];
switch ($type) {
case 'bindUser':
//处理数据表中对用的用户userID的workerman的ID
break;
case 'init':
//绑定用户到$client_id
$client_id = $connection->id;
$connection->uid = $resData['uid'];
$connection->connection_uids[$resData['uid']] = $connection;
//把uid加入数组中,设置全局(global)引用
//把websocket的消息发给tcp设备
foreach ($inner_worker->connections as $tcp_con) {
//发给tcp绑定的某一个用户
$uid = $resData['uid'];
if (isset($connection->connection_uids[$uid])) {
$tcp_con->send('300');
}
}
break;
case 'login':
$connection->uid = $resData->uid;
$connection->connection_uids[$resData->uid] = $connection;
echo "收到登录请求,uid={$resData->uid}\n";
break;
case 'send_message':
if ($resData->to == 'all') {
echo "向全部用户发送消息\n";
self::broadcast($resData->message,$connection);
} else {
echo "向用户{$resData->to}发送消息\n";
self::sendMessageByUid($resData->to, $resData->message,$connection);
}
break;
default :
// --------注意不能在最开始就发送---不然就会报错--------------
//$connection->send($db);
//测试发送数据,(表这些按照你自己的实际情况填写)前端可以收到
//$data = Db::table('tp_admin')->select();
//$connection->send(json_encode($data));
break;
}
$connection->send($data);
}
//向所有用户发送消息
protected static function broadcast($message,$connection)
{
foreach ($connection as $conn) {
$conn->send($message);
}
}
//向某个用户发送消息
protected static function sendMessageByUid($uid, $message,$connection)
{
if (isset($connection->connection_uids[$uid])) {
$conn = $connection->connection_uids[$uid];
$conn->send($message);
}
}
public function onClose($connection)
{
// 客户端关闭时,连接数-1
global $connection_count;
$connection_count--;
global $id;
$id--;
//如果当前的id下线了 就操作用户的数据表
//db('shop_user')->where([['status','=',1]])->update(['worker_id'=>0]);
echo "connection closed\n";
}
public function onError($connection, $code, $msg)
{
echo "error $code $msg\n";
}
/**
* @param $worker 这个主监听的worker,不是异步监听或者内部监听
* @throws \Exception
* 异步访问外部websocket服务,并设置以哪个本地ip及端口访问
*/
protected function websocketAsyncTcpConnection($worker)
{
global $AsyncTcpWorker;
// 设置访问对方主机的本地ip及端口(每个socket连接都会占用一个本地端口)
$context_option = array(
'socket' => array(
// ip必须是本机网卡ip,并且能访问对方主机,否则无效
'bindto' => '127.0.0.1:2346',
),
// ssl选项,参考http://php.net/manual/zh/context.ssl.php
//'ssl' => array(
// 本地证书路径。 必须是 PEM 格式,并且包含本地的证书及私钥。
//'local_cert' => '/your/path/to/pemfile',
// local_cert 文件的密码。
//'passphrase' => 'your_pem_passphrase',
// 是否允许自签名证书。
//'allow_self_signed' => true,
// 是否需要验证 SSL 证书。
//'verify_peer' => false
//)
);
// 发起异步连接
$AsyncTcpWorker = new AsyncTcpConnection('ws://127.0.0.1:2345', $context_option);
// 设置以ssl加密方式访问
//$con->transport = 'ssl';
$AsyncTcpWorker->onConnect = function ($con) {
echo 'AsyncTcpConnection start....' . "\r\n";
};
$AsyncTcpWorker->onClose = function ($con) {
// 如果连接断开,则在1秒后重连
$con->reConnect(1);
};
$AsyncTcpWorker->onMessage = function ($con, $data) use ($worker) {
foreach ($worker->connections as $connection) {
//$connection->send('110110');
}
//echo $data;
};
$AsyncTcpWorker->connect();
}
/**
* 异步访问外部tcp服务
*/
protected function tcpAsyncTcpConnection($worker)
{
// 发起异步连接
$tcp_worker = new AsyncTcpConnection('tcp://127.0.0.1:2347');
// 设置以ssl加密方式访问
//$con->transport = 'ssl';
$tcp_worker->onConnect = function ($con) {
echo 'AsyncTcpConnection start....' . "\r\n";
};
$tcp_worker->onClose = function ($con) {
// 如果连接断开,则在1秒后重连
$con->reConnect(1);
};
$tcp_worker->onMessage = function ($con, $data) use ($worker) {
foreach ($worker->connections as $connection) {
//$connection->send('110110');
}
//echo $data;
};
$tcp_worker->connect();
}
/**
* @param $connection
* 内部流量控制
* 将当前连接的数据流导入到目标连接。内置了流量控制。此方法做TCP代理非常有用
* onMessage方法里面调用
*/
protected function InternalFlowControl($connection)
{
// 建立本地80端口的异步连接
$connection_to_80 = new AsyncTcpConnection('tcp://127.0.0.1:80');
// 设置将当前客户端连接的数据导向80端口的连接
$connection->pipe($connection_to_80);
// 设置80端口连接返回的数据导向客户端连接
$connection_to_80->pipe($connection);
// 执行异步连接
$connection_to_80->connect();
}
/**
* @param $connection
* 使当前连接继续接收数据。此方法与Connection::pauseRecv配合使用,对于上传流量控制非常有用
* 这个方法在onMessage方法里面调用
*/
protected function pauseRecvConnection($connection, $limit)
{
// 每个连接接收100个请求后就不再接收数据
if (++$connection->messageCount > $limit) {
//使当前连接停止接收数据。该连接的onMessage回调将不会被触发。此方法对于上传流量控制非常有用
$connection->pauseRecv();
// 30秒后恢复接收数据
Timer::add(30, function ($connection) {
//使当前连接继续接收数据
$connection->resumeRecv();
}, array($connection), false);
}
}
/**
* @param $worker
* @param string $event_name
* 一般结合在onWorkerStart里面引用初始化操作
* 订阅主题和相关的操作 返回给客户端
*/
protected function ChannelGroupServer($worker, $event_name_title = '广播')
{
// Channel客户端连接到Channel服务端
Client::connect('127.0.0.1', 2206);
// 以自己的进程id为事件名称
$event_name = $worker->id;
// 订阅worker->id事件并注册事件处理函数
Client::on($event_name, function ($event_data) use ($worker) {
$to_connection_id = $event_data['to_connection_id'];
$message = $event_data['content'];
if (!isset($worker->connections[$to_connection_id])) {
echo "connection not exists\n";
return;
}
$to_connection = $worker->connections[$to_connection_id];
$to_connection->send($message);
});
// 收到广播事件后向当前进程内所有客户端连接发送广播数据
Client::on($event_name_title, function ($event_data) use ($worker) {
$message = $event_data['content'];
foreach ($worker->connections as $connection) {
$connection->send($message);
}
});
}
/**
* @param $data 接受客户端发来的内容
* 一般结合在onMessage里面引用初始化操作
* 发送主题相关内容到客户端
*/
protected function ChannelGroupClient($data, $event_name_title)
{
//接收的data必须格式化之后才能用
$data_connet = json_decode($data);
if (empty($data_connet['content'])) return;
// 是向某个worker进程中某个连接推送数据
if (isset($data_connet['to_worker_id']) && isset($data_connet['to_connection_id'])) {
$event_name = $data_connet['to_worker_id'];
$to_connection_id = $data_connet['to_connection_id'];
$content = $data_connet['content'];
Client::publish($event_name, array(
'to_connection_id' => $to_connection_id,
'content' => $content
));
} else {
// 是全局广播数据
$content = $data_connet['content'];
Client::publish($event_name_title, array(
'content' => $content
));
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。