代码拉取完成,页面将自动刷新
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @see https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Lwz\HyperfRocketMQ;
use Hyperf\Di\Annotation\AnnotationCollector;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\ProcessManager;
use Lwz\HyperfRocketMQ\Annotation\Consumer as ConsumerAnnotation;
use Lwz\HyperfRocketMQ\Message\ConsumerMessageInterface;
use Psr\Container\ContainerInterface;
class ConsumerManager
{
private ContainerInterface $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
public function run()
{
$classes = AnnotationCollector::getClassesByAnnotation(ConsumerAnnotation::class);
/**
* @var string $class
* @var ConsumerAnnotation $annotation
*/
foreach ($classes as $class => $annotation) {
$instance = make($class);
if (! $instance instanceof ConsumerMessageInterface) {
continue;
}
$annotation->poolName && $instance->setPoolName($annotation->poolName);
$annotation->topic && $instance->setTopic($annotation->topic);
$annotation->groupId && $instance->setGroupId($annotation->groupId);
$annotation->messageTag && $instance->setMessageTag($annotation->messageTag);
$annotation->numOfMessage && $instance->setNumOfMessage($annotation->numOfMessage);
$annotation->waitSeconds && $instance->setWaitSeconds($annotation->waitSeconds);
$annotation->enable && $instance->setEnable($instance->isEnable());
$annotation->maxConsumption && $instance->setMaxConsumption($annotation->maxConsumption);
$annotation->openCoroutine && $instance->setOpenCoroutine($annotation->openCoroutine);
property_exists($instance, 'container') && $instance->container = $this->container;
$nums = $annotation->processNums;
$process = $this->createProcess($instance);
$process->nums = (int) $nums;
$process->name = $annotation->name . '-' . $instance->getMessageTag();
ProcessManager::register($process);
}
}
private function createProcess(ConsumerMessageInterface $consumerMessage): AbstractProcess
{
return new class($this->container, $consumerMessage) extends AbstractProcess {
/**
* @var \Hyperf\Amqp\Consumer
*/
private $consumer;
/**
* @var ConsumerMessageInterface
*/
private $consumerMessage;
public function __construct(ContainerInterface $container, ConsumerMessageInterface $consumerMessage)
{
parent::__construct($container);
$this->consumer = $container->get(Consumer::class);
$this->consumerMessage = $consumerMessage;
}
public function handle(): void
{
$this->consumer->consume($this->consumerMessage);
}
public function getConsumerMessage(): ConsumerMessageInterface
{
return $this->consumerMessage;
}
public function isEnable($server): bool
{
return $this->consumerMessage->isEnable();
}
};
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。