1 Star 0 Fork 0

三仓库,镜像/php-amqp

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
amqpexchange_publish_confirms_consume.phpt 2.58 KB
一键复制 编辑 原始数据 按行查看 历史
Tim Bezhashvyly 提交于 2018-03-07 22:23 +08:00 . #313: Fix exception message (#314)
--TEST--
AMQPExchange::publish() - publish in conform mode and handle conforms with AMQPQueue::consume() method
--SKIPIF--
<?php if (!extension_loaded("amqp")) print "skip"; ?>
--FILE--
<?php
function exception_error_handler($errno, $errstr, $errfile, $errline)
{
echo $errstr, PHP_EOL;
}
set_error_handler('exception_error_handler');
$cnn = new AMQPConnection();
$cnn->setReadTimeout(2);
$cnn->connect();
$ch = new AMQPChannel($cnn);
$ch->confirmSelect();
$ex = new AMQPExchange($ch);
$ex->setName("exchange-" . microtime(true));
$ex->setType(AMQP_EX_TYPE_FANOUT);
$ex->setFlags(AMQP_AUTODELETE);
$ex->declareExchange();
echo $ex->publish('message 1', 'routing.key', AMQP_MANDATORY) ? 'true' : 'false', PHP_EOL;
// Create a new queue
$q = new AMQPQueue($ch);
$q->setName('queue-' . microtime(true));
$q->setFlags(AMQP_AUTODELETE);
$q->declareQueue();
$msg = $q->get();
var_dump($msg);
try {
$q->consume(function() use ($q) {
echo 'Message returned', PHP_EOL;
var_dump(func_get_args());
return false;
});
} catch(Exception $e) {
echo get_class($e), "({$e->getCode()}): ", $e->getMessage(). PHP_EOL;
}
echo $ex->publish('message 2', 'routing.key', AMQP_MANDATORY) ? 'true' : 'false', PHP_EOL;
/* callback(int $reply_code, string $reply_text, string $exchange, string $routing_key, AMQPBasicProperties $properties, string $body); */
$ch->setReturnCallback(function ($reply_code, $reply_text, $exchange, $routing_key, AMQPBasicProperties $properties, $body) {
echo 'Message returned: ', $reply_text, ', message body:', $body, PHP_EOL;
});
$cnt = 1;
$ch->setConfirmCallback(function ($delivery_tag, $multiple) use(&$cnt) {
echo 'Message acked', PHP_EOL;
var_dump(func_get_args());
return --$cnt > 0;
}, function ($delivery_tag, $multiple, $requeue) {
echo 'Message nacked', PHP_EOL;
var_dump(func_get_args());
return false;
});
try {
$q->consume(function() use ($q) {
echo 'Received message', PHP_EOL;
var_dump(func_get_args());
return false;
});
} catch(Exception $e) {
echo get_class($e), "({$e->getCode()}): ", $e->getMessage(). PHP_EOL;
}
$q->delete();
$ex->delete();
?>
--EXPECTF--
true
bool(false)
Unhandled basic.return method from server received. Use AMQPChannel::setReturnCallback() to process it.
Unhandled basic.ack method from server received. Use AMQPChannel::setConfirmCallback() to process it.
AMQPQueueException(0): Consumer timeout exceed
true
Message returned: NO_ROUTE, message body:message 2
Message acked
array(2) {
[0]=>
int(2)
[1]=>
bool(false)
}
AMQPQueueException(0): Consumer timeout exceed
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/third-repo/php-amqp.git
git@gitee.com:third-repo/php-amqp.git
third-repo
php-amqp
php-amqp
master

搜索帮助