vendor/symfony/messenger/Command/ConsumeMessagesCommand.php line 52

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\Command;
  11. use Psr\Container\ContainerInterface;
  12. use Psr\Log\LoggerInterface;
  13. use Symfony\Component\Console\Attribute\AsCommand;
  14. use Symfony\Component\Console\Command\Command;
  15. use Symfony\Component\Console\Completion\CompletionInput;
  16. use Symfony\Component\Console\Completion\CompletionSuggestions;
  17. use Symfony\Component\Console\Exception\InvalidOptionException;
  18. use Symfony\Component\Console\Exception\RuntimeException;
  19. use Symfony\Component\Console\Input\InputArgument;
  20. use Symfony\Component\Console\Input\InputInterface;
  21. use Symfony\Component\Console\Input\InputOption;
  22. use Symfony\Component\Console\Output\ConsoleOutputInterface;
  23. use Symfony\Component\Console\Output\OutputInterface;
  24. use Symfony\Component\Console\Question\ChoiceQuestion;
  25. use Symfony\Component\Console\Style\SymfonyStyle;
  26. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  27. use Symfony\Component\Messenger\EventListener\ResetServicesListener;
  28. use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
  29. use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
  30. use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
  31. use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
  32. use Symfony\Component\Messenger\RoutableMessageBus;
  33. use Symfony\Component\Messenger\Worker;
  34. /**
  35.  * @author Samuel Roze <samuel.roze@gmail.com>
  36.  */
  37. #[AsCommand(name'messenger:consume'description'Consume messages')]
  38. class ConsumeMessagesCommand extends Command
  39. {
  40.     private RoutableMessageBus $routableBus;
  41.     private ContainerInterface $receiverLocator;
  42.     private EventDispatcherInterface $eventDispatcher;
  43.     private ?LoggerInterface $logger;
  44.     private array $receiverNames;
  45.     private ?ResetServicesListener $resetServicesListener;
  46.     private array $busIds;
  47.     public function __construct(RoutableMessageBus $routableBusContainerInterface $receiverLocatorEventDispatcherInterface $eventDispatcherLoggerInterface $logger null, array $receiverNames = [], ResetServicesListener $resetServicesListener null, array $busIds = [])
  48.     {
  49.         $this->routableBus $routableBus;
  50.         $this->receiverLocator $receiverLocator;
  51.         $this->eventDispatcher $eventDispatcher;
  52.         $this->logger $logger;
  53.         $this->receiverNames $receiverNames;
  54.         $this->resetServicesListener $resetServicesListener;
  55.         $this->busIds $busIds;
  56.         parent::__construct();
  57.     }
  58.     /**
  59.      * {@inheritdoc}
  60.      */
  61.     protected function configure(): void
  62.     {
  63.         $defaultReceiverName === \count($this->receiverNames) ? current($this->receiverNames) : null;
  64.         $this
  65.             ->setDefinition([
  66.                 new InputArgument('receivers'InputArgument::IS_ARRAY'Names of the receivers/transports to consume in order of priority'$defaultReceiverName ? [$defaultReceiverName] : []),
  67.                 new InputOption('limit''l'InputOption::VALUE_REQUIRED'Limit the number of received messages'),
  68.                 new InputOption('failure-limit''f'InputOption::VALUE_REQUIRED'The number of failed messages the worker can consume'),
  69.                 new InputOption('memory-limit''m'InputOption::VALUE_REQUIRED'The memory limit the worker can consume'),
  70.                 new InputOption('time-limit''t'InputOption::VALUE_REQUIRED'The time limit in seconds the worker can handle new messages'),
  71.                 new InputOption('sleep'nullInputOption::VALUE_REQUIRED'Seconds to sleep before asking for new messages after no messages were found'1),
  72.                 new InputOption('bus''b'InputOption::VALUE_REQUIRED'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
  73.                 new InputOption('queues'nullInputOption::VALUE_REQUIRED InputOption::VALUE_IS_ARRAY'Limit receivers to only consume from the specified queues'),
  74.                 new InputOption('no-reset'nullInputOption::VALUE_NONE'Do not reset container services after each message'),
  75.             ])
  76.             ->setHelp(<<<'EOF'
  77. The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
  78.     <info>php %command.full_name% <receiver-name></info>
  79. To receive from multiple transports, pass each name:
  80.     <info>php %command.full_name% receiver1 receiver2</info>
  81. Use the --limit option to limit the number of messages received:
  82.     <info>php %command.full_name% <receiver-name> --limit=10</info>
  83. Use the --failure-limit option to stop the worker when the given number of failed messages is reached:
  84.     <info>php %command.full_name% <receiver-name> --failure-limit=2</info>
  85. Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
  86.     <info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
  87. Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached.
  88. If a message is being handled, the worker will stop after the processing is finished:
  89.     <info>php %command.full_name% <receiver-name> --time-limit=3600</info>
  90. Use the --bus option to specify the message bus to dispatch received messages
  91. to instead of trying to determine it automatically. This is required if the
  92. messages didn't originate from Messenger:
  93.     <info>php %command.full_name% <receiver-name> --bus=event_bus</info>
  94. Use the --queues option to limit a receiver to only certain queues (only supported by some receivers):
  95.     <info>php %command.full_name% <receiver-name> --queues=fasttrack</info>
  96. Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages):
  97.     <info>php %command.full_name% <receiver-name> --no-reset</info>
  98. EOF
  99.             )
  100.         ;
  101.     }
  102.     /**
  103.      * {@inheritdoc}
  104.      */
  105.     protected function interact(InputInterface $inputOutputInterface $output)
  106.     {
  107.         $io = new SymfonyStyle($input$output instanceof ConsoleOutputInterface $output->getErrorOutput() : $output);
  108.         if ($this->receiverNames && !$input->getArgument('receivers')) {
  109.             $io->block('Which transports/receivers do you want to consume?'null'fg=white;bg=blue'' 'true);
  110.             $io->writeln('Choose which receivers you want to consume messages from in order of priority.');
  111.             if (\count($this->receiverNames) > 1) {
  112.                 $io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>'implode(', '$this->receiverNames)));
  113.             }
  114.             $question = new ChoiceQuestion('Select receivers to consume:'$this->receiverNames0);
  115.             $question->setMultiselect(true);
  116.             $input->setArgument('receivers'$io->askQuestion($question));
  117.         }
  118.         if (!$input->getArgument('receivers')) {
  119.             throw new RuntimeException('Please pass at least one receiver.');
  120.         }
  121.     }
  122.     /**
  123.      * {@inheritdoc}
  124.      */
  125.     protected function execute(InputInterface $inputOutputInterface $output): int
  126.     {
  127.         $receivers = [];
  128.         foreach ($receiverNames $input->getArgument('receivers') as $receiverName) {
  129.             if (!$this->receiverLocator->has($receiverName)) {
  130.                 $message sprintf('The receiver "%s" does not exist.'$receiverName);
  131.                 if ($this->receiverNames) {
  132.                     $message .= sprintf(' Valid receivers are: %s.'implode(', '$this->receiverNames));
  133.                 }
  134.                 throw new RuntimeException($message);
  135.             }
  136.             $receivers[$receiverName] = $this->receiverLocator->get($receiverName);
  137.         }
  138.         if (null !== $this->resetServicesListener && !$input->getOption('no-reset')) {
  139.             $this->eventDispatcher->addSubscriber($this->resetServicesListener);
  140.         }
  141.         $stopsWhen = [];
  142.         if (null !== $limit $input->getOption('limit')) {
  143.             if (!is_numeric($limit) || >= $limit) {
  144.                 throw new InvalidOptionException(sprintf('Option "limit" must be a positive integer, "%s" passed.'$limit));
  145.             }
  146.             $stopsWhen[] = "processed {$limit} messages";
  147.             $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit$this->logger));
  148.         }
  149.         if ($failureLimit $input->getOption('failure-limit')) {
  150.             $stopsWhen[] = "reached {$failureLimit} failed messages";
  151.             $this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit$this->logger));
  152.         }
  153.         if ($memoryLimit $input->getOption('memory-limit')) {
  154.             $stopsWhen[] = "exceeded {$memoryLimit} of memory";
  155.             $this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
  156.         }
  157.         if (null !== $timeLimit $input->getOption('time-limit')) {
  158.             if (!is_numeric($timeLimit) || >= $timeLimit) {
  159.                 throw new InvalidOptionException(sprintf('Option "time-limit" must be a positive integer, "%s" passed.'$timeLimit));
  160.             }
  161.             $stopsWhen[] = "been running for {$timeLimit}s";
  162.             $this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit$this->logger));
  163.         }
  164.         $stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
  165.         $io = new SymfonyStyle($input$output instanceof ConsoleOutputInterface $output->getErrorOutput() : $output);
  166.         $io->success(sprintf('Consuming messages from transport%s "%s".'\count($receivers) > 's' ''implode(', '$receiverNames)));
  167.         if ($stopsWhen) {
  168.             $last array_pop($stopsWhen);
  169.             $stopsWhen = ($stopsWhen implode(', '$stopsWhen).' or ' '').$last;
  170.             $io->comment("The worker will automatically exit once it has {$stopsWhen}.");
  171.         }
  172.         $io->comment('Quit the worker with CONTROL-C.');
  173.         if (OutputInterface::VERBOSITY_VERBOSE $output->getVerbosity()) {
  174.             $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
  175.         }
  176.         $bus $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
  177.         $worker = new Worker($receivers$bus$this->eventDispatcher$this->logger);
  178.         $options = [
  179.             'sleep' => $input->getOption('sleep') * 1000000,
  180.         ];
  181.         if ($queues $input->getOption('queues')) {
  182.             $options['queues'] = $queues;
  183.         }
  184.         $worker->run($options);
  185.         return 0;
  186.     }
  187.     public function complete(CompletionInput $inputCompletionSuggestions $suggestions): void
  188.     {
  189.         if ($input->mustSuggestArgumentValuesFor('receivers')) {
  190.             $suggestions->suggestValues(array_diff($this->receiverNamesarray_diff($input->getArgument('receivers'), [$input->getCompletionValue()])));
  191.             return;
  192.         }
  193.         if ($input->mustSuggestOptionValuesFor('bus')) {
  194.             $suggestions->suggestValues($this->busIds);
  195.         }
  196.     }
  197.     private function convertToBytes(string $memoryLimit): int
  198.     {
  199.         $memoryLimit strtolower($memoryLimit);
  200.         $max ltrim($memoryLimit'+');
  201.         if (str_starts_with($max'0x')) {
  202.             $max \intval($max16);
  203.         } elseif (str_starts_with($max'0')) {
  204.             $max \intval($max8);
  205.         } else {
  206.             $max = (int) $max;
  207.         }
  208.         switch (substr(rtrim($memoryLimit'b'), -1)) {
  209.             case 't'$max *= 1024;
  210.             // no break
  211.             case 'g'$max *= 1024;
  212.             // no break
  213.             case 'm'$max *= 1024;
  214.             // no break
  215.             case 'k'$max *= 1024;
  216.         }
  217.         return $max;
  218.     }
  219. }