Watch 1 Star 2 Fork 1

my_teste / PHP Simple NIO Server PHPMIT

Create your Gitee Account
Explore and code with more than 5 million developers,Free private repositories !:)
Sign up
利用socket_select实现的一个简单的NIO服务。 spread retract

Clone or download
nio_server.php 12.50 KB
Copy Edit Web IDE Raw Blame History
<?php
$port = 8080;
$socket = @stream_socket_server("tcp://0.0.0.0:$port", $errno, $errMsg);
if ($socket === false) {
throw new \RuntimeException("fail to listen on port: {$port}!");
}
fwrite(STDOUT, "socket server listen on port: {$port}" . PHP_EOL);
stream_set_blocking($socket, false);
$clients = [];
$changed = [];
$cartCheck = array();
while (true) {
checkMessage();
fwrite(STDOUT, "\nnew read message\n");
accept();
handleMessage();
}
function checkMessage() {
global $socket, $changed, $clients;
$changed = array_merge([$socket], $clients);
$write = null;
$except = null;
stream_select($changed, $write, $except, null);
}
function accept() {
global $socket, $changed, $clients;
if (!in_array($socket, $changed)) {
return;
}
while ($client = @stream_socket_accept($socket, 0)) {
$clients[] = $client;
fwrite(STDOUT, "client:" . (int)$client . " connected.\n");
stream_set_blocking($client, false);
$key = array_search($client, $changed);
unset($changed[$key]);
}
}
/**
* @param $host
* @param $port
* @param $method
* @param $data
* @param $socket
* @param bool $noBlocking
* @return bool|false|string
*/
function checkClient($host, $port, $method, $data, &$socket, $noBlocking = true)
{
fwrite(STDOUT, 'request other client: ' . json_encode(func_get_args()) . "\n");
$socket = @stream_socket_client("tcp://{$host}:{$port}", $errno, $errMsg);
if ($socket === false) {
throw new \RuntimeException("unable to create socket: " . $errMsg);
}
fwrite(STDOUT, "\nconnect to server: [{$host}:{$port}]...\n");
$message = json_encode([
"method" => $method,
"data" => $data
]);
fwrite(STDOUT, "send to server: $message\n");
$len = @fwrite($socket, $message);
if ($len === 0) {
fwrite(STDOUT, "socket closed\n");
return false;
}
if ($noBlocking) {
// 非阻塞方式, 不等结果,之间返回调用成功
return true;
} else {
// 阻塞方式,等待响应,读取后返回
$msg = @fread($socket, 4096);
if ($msg) {
fwrite(STDOUT, "receive server: $msg\n");
// 获取到返回结果后,再返回
return json_decode($msg, true);
} elseif (feof($socket)) {
fwrite(STDOUT, "socket closed\n");
return false;
}
return true;
}
}
/**
* 发起请求
*
* @param integer $productId 产品ID
* @param resource $socket 套接字
* @param bool $noBlocking 阻塞与否
*
* @return bool|false|string
*/
function checkInventory($productId, &$socket, $noBlocking = true)
{
// client.php
$host = "127.0.0.1";
$port = 8081;
$data = array('productId' => $productId);
return checkClient($host, $port, 'inventory', $data, $socket, $noBlocking);
}
/**
* 检查产品可售
*
* @param $productId
* @param $socket
* @param bool $noBlocking
* @return bool|false|string
*/
function checkProduct($productId, &$socket, $noBlocking = true)
{
// client.php
$host = "127.0.0.1";
$port = 8082;
$data = array('productId' => $productId);
return checkClient($host, $port, 'product', $data, $socket, $noBlocking);
}
/**
* 检查促销信息
*
* @param $productId
* @param $socket
* @param bool $noBlocking
* @return bool|false|string
*/
function checkPromo($productId, &$socket, $noBlocking = true)
{
// client.php
$host = "127.0.0.1";
$port = 8083;
$data = array('productId' => $productId);
return checkClient($host, $port, 'promo', $data, $socket, $noBlocking);
}
/**
* 三项检查是否已完成
*
* @param $oneCartCheck
* @param $reProductId
*/
function checkAllComplete($oneCartCheck, $reProductId)
{
global $cartCheck, $clients, $changed;
if (isset($oneCartCheck['inventory']) && isset($oneCartCheck['product']) && isset($oneCartCheck['promo'])) {
// 这里定义了一个cartCheck 全局变量,减产完毕,就会被设置为 true。三项为true就表示检查完毕
$checkRe = $oneCartCheck['inventory'] && ($oneCartCheck['product']) && ($oneCartCheck['promo']);
$requestSocket = $oneCartCheck['requestSocket'];
$requestKey = array_search($requestSocket, $clients);
// $requestSocket = $clients[$requestKey];
$reData = array('method' => 'cart', 'data' => array('product_id' => $reProductId), 're' => $checkRe, 'msg' => 'suc');
$msg = json_encode($reData);
fwrite($requestSocket, "response " . (int)$requestSocket . ": $msg");
fclose($requestSocket);
unset($clients[$requestKey], $cartCheck[$reProductId]);
fwrite(STDOUT, "complete response " . (int)$requestSocket . ", time " . date('Y-m-d H:i:s'));
}
}
/**
* 响应来自客户加购请求
*
* @param $socket
* @param $method
* @param $productId
* @param $re
* @return bool
*/
function responseClient($socket, $productId, $re)
{
global $clients;
$reMsg = array('method' => 'cart', 'data' => array('product_id' => $productId), 're' => $re, 'msg' => 'suc');
$data = json_encode($reMsg);
// 这里响应了,发起 加购 请求的客户端,
fwrite($socket, $data);
$key = array_search($socket, $clients);
// 关闭连接,
fclose($socket);
// 响应客户端后,就把它的socket从 select 去移除
unset($clients[$key]);
return true;
}
/**
* 消息处理
*/
function handleMessage() {
global $changed, $clients, $cartCheck;
foreach ($changed as $key => $client) {
while (true) {
$msg = @fread($client, 1024);
// $msg = 1;
if ($msg) {
fwrite(STDOUT, "receive client " . (int)$client . " message: $msg\n");
$json = json_decode($msg, true);
if ($json) {
$method = $json["method"];
if ('cart' == $method) {
$noBlocking = boolval($json['noBlocking']);
$productId = $json['data']['productId'];
fwrite(STDOUT, "start check cart , product_id: $productId, time" . date('Y-m-d H:i:s', time()) . "\n");
if ($noBlocking) {
// 非阻塞方式调用其他服务
$cartCheck[$productId] = array(
'requestSocket' => $client, // 这里记录发起亲求的socket
);
$inventorySocket = $productSocket = $promoSocket = null;
$requestSuc = checkInventory($productId, $inventorySocket, $noBlocking);
if (!$requestSuc) {
continue;
}
$clients[] = $inventorySocket;
$requestSuc = checkProduct($productId, $productSocket, $noBlocking);
if (!$requestSuc) {
continue;
}
$clients[] = $productSocket;
$requestSuc = checkPromo($productId, $promoSocket, $noBlocking);
if (!$requestSuc) {
continue;
}
$clients[] = $promoSocket;
// 这就完成了 3个检查请求
fwrite(STDOUT, "completely send check requests, product_id: $productId, time" . date('Y-m-d H:i:s', time()) . "\n");
} else {
// 阻塞方式调用其他服务
$cartCheck[$productId] = array(
'requestSocket' => null, // 这里记录发起亲求的socket
);
$checkAllSuc = false;
$inventorySocket = $productSocket = $promoSocket = null;
// 依次发起三个检查请求
$requestInventorySuc = checkInventory($productId, $inventorySocket, $noBlocking);
if ($requestInventorySuc) {
if ($requestInventorySuc['data']['re']) {
$requestProductSuc = checkProduct($productId, $productSocket, $noBlocking);
if ($requestProductSuc) {
if ($requestProductSuc['data']['re']) {
$requestPromoSuc = checkPromo($productId, $promoSocket, $noBlocking);
if (isset($requestPromoSuc['data']['re'])) {
$checkAllSuc = true;
}
}
}
}
// 如果检查库存失败,后面流程就不用继续了
}
responseClient($client, $productId, $checkAllSuc);
fwrite(STDOUT, "completely send check requests, product_id: $productId, time" . date('Y-m-d H:i:s', time()) . "\n");
break;
}
} elseif ('inventory' == $method) {
fwrite(STDOUT, "receive inventory's client " . (int)$client . ". \n");
$data = $json['data'];
if (isset($data['productId'])) {
$reProductId = $data['productId'];
if (isset($cartCheck[$reProductId])) {
$cartCheck[$reProductId]['inventory'] = $data['re'];
}
}
// 响应第三方服务
responseClient($client, $reProductId, true);
// 检查是否所有的信息已完成
$oneCartCheck = $cartCheck[$reProductId];
checkAllComplete($oneCartCheck, $reProductId);
break;
} elseif ('product' == $method) {
fwrite(STDOUT, "receive product's client " . (int)$client . ". \n");
$data = $json['data'];
if (isset($data['productId'])) {
$reProductId = $data['productId'];
if (isset($cartCheck[$reProductId])) {
$cartCheck[$reProductId]['product'] = $data['re'];
}
}
// 响应第三方服务
responseClient($client, $reProductId, true);
// 检查是否所有的信息已完成
$oneCartCheck = $cartCheck[$reProductId];
checkAllComplete($oneCartCheck, $reProductId);
break;
} elseif ('promo' == $method) {
fwrite(STDOUT, "receive promo's client " . (int)$client . ". \n");
$data = $json['data'];
if (isset($data['productId'])) {
$reProductId = $data['productId'];
if (isset($cartCheck[$reProductId])) {
$cartCheck[$reProductId]['promo'] = $data['re'];
}
}
// 响应第三方服务
responseClient($client, $reProductId, true);
// 检查是否所有的信息已完成
$oneCartCheck = $cartCheck[$reProductId];
checkAllComplete($oneCartCheck, $reProductId);
break;
} else {
foreach ($clients as $cl) {
@fwrite($cl, "message from " . (int)$client . ": $msg");
}
}
}
} else {
if (feof($client)) {
// fwrite(STDOUT, "\nclient " . (int)$client . " closed.\n");
// fclose($client);
// $key = array_search($client, $clients);
// unset($clients[$key]);
sleep(1);
}
break;
}
}
}
}

Comment ( 0 )

Sign in for post a comment

PHP
1
https://gitee.com/xupaul/php-nio-server.git
git@gitee.com:xupaul/php-nio-server.git
xupaul
php-nio-server
PHP Simple NIO Server
master

Search

231008 48f1a665 1899542 231017 9a6720c6 1899542