vendor/symfony/messenger/Command/FailedMessagesRetryCommand.php line 44

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\Command;
  11. use Psr\Log\LoggerInterface;
  12. use Symfony\Component\Console\Attribute\AsCommand;
  13. use Symfony\Component\Console\Exception\RuntimeException;
  14. use Symfony\Component\Console\Input\InputArgument;
  15. use Symfony\Component\Console\Input\InputInterface;
  16. use Symfony\Component\Console\Input\InputOption;
  17. use Symfony\Component\Console\Output\ConsoleOutputInterface;
  18. use Symfony\Component\Console\Output\OutputInterface;
  19. use Symfony\Component\Console\Style\SymfonyStyle;
  20. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  21. use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
  22. use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
  23. use Symfony\Component\Messenger\Exception\LogicException;
  24. use Symfony\Component\Messenger\MessageBusInterface;
  25. use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
  26. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  27. use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
  28. use Symfony\Component\Messenger\Worker;
  29. use Symfony\Contracts\Service\ServiceProviderInterface;
  30. /**
  31.  * @author Ryan Weaver <ryan@symfonycasts.com>
  32.  */
  33. #[AsCommand(name'messenger:failed:retry'description'Retry one or more messages from the failure transport')]
  34. class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
  35. {
  36.     private EventDispatcherInterface $eventDispatcher;
  37.     private MessageBusInterface $messageBus;
  38.     private ?LoggerInterface $logger;
  39.     public function __construct(?string $globalReceiverNameServiceProviderInterface $failureTransportsMessageBusInterface $messageBusEventDispatcherInterface $eventDispatcherLoggerInterface $logger null)
  40.     {
  41.         $this->eventDispatcher $eventDispatcher;
  42.         $this->messageBus $messageBus;
  43.         $this->logger $logger;
  44.         parent::__construct($globalReceiverName$failureTransports);
  45.     }
  46.     /**
  47.      * {@inheritdoc}
  48.      */
  49.     protected function configure(): void
  50.     {
  51.         $this
  52.             ->setDefinition([
  53.                 new InputArgument('id'InputArgument::IS_ARRAY'Specific message id(s) to retry'),
  54.                 new InputOption('force'nullInputOption::VALUE_NONE'Force action without confirmation'),
  55.                 new InputOption('transport'nullInputOption::VALUE_OPTIONAL'Use a specific failure transport'self::DEFAULT_TRANSPORT_OPTION),
  56.             ])
  57.             ->setHelp(<<<'EOF'
  58. The <info>%command.name%</info> retries message in the failure transport.
  59.     <info>php %command.full_name%</info>
  60. The command will interactively ask if each message should be retried
  61. or discarded.
  62. Some transports support retrying a specific message id, which comes
  63. from the <info>messenger:failed:show</info> command.
  64.     <info>php %command.full_name% {id}</info>
  65. Or pass multiple ids at once to process multiple messages:
  66. <info>php %command.full_name% {id1} {id2} {id3}</info>
  67. EOF
  68.             )
  69.         ;
  70.     }
  71.     /**
  72.      * {@inheritdoc}
  73.      */
  74.     protected function execute(InputInterface $inputOutputInterface $output): int
  75.     {
  76.         $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
  77.         $io = new SymfonyStyle($input$output instanceof ConsoleOutputInterface $output->getErrorOutput() : $output);
  78.         $io->comment('Quit this command with CONTROL-C.');
  79.         if (!$output->isVeryVerbose()) {
  80.             $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
  81.         }
  82.         $failureTransportName $input->getOption('transport');
  83.         if (self::DEFAULT_TRANSPORT_OPTION === $failureTransportName) {
  84.             $this->printWarningAvailableFailureTransports($io$this->getGlobalFailureReceiverName());
  85.         }
  86.         if ('' === $failureTransportName || null === $failureTransportName) {
  87.             $failureTransportName $this->interactiveChooseFailureTransport($io);
  88.         }
  89.         $failureTransportName self::DEFAULT_TRANSPORT_OPTION === $failureTransportName $this->getGlobalFailureReceiverName() : $failureTransportName;
  90.         $receiver $this->getReceiver($failureTransportName);
  91.         $this->printPendingMessagesMessage($receiver$io);
  92.         $io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>'$failureTransportName));
  93.         $shouldForce $input->getOption('force');
  94.         $ids $input->getArgument('id');
  95.         if (=== \count($ids)) {
  96.             if (!$input->isInteractive()) {
  97.                 throw new RuntimeException('Message id must be passed when in non-interactive mode.');
  98.             }
  99.             $this->runInteractive($failureTransportName$io$shouldForce);
  100.             return 0;
  101.         }
  102.         $this->retrySpecificIds($failureTransportName$ids$io$shouldForce);
  103.         $io->success('All done!');
  104.         return 0;
  105.     }
  106.     private function runInteractive(string $failureTransportNameSymfonyStyle $iobool $shouldForce)
  107.     {
  108.         $receiver $this->failureTransports->get($failureTransportName);
  109.         $count 0;
  110.         if ($receiver instanceof ListableReceiverInterface) {
  111.             // for listable receivers, find the messages one-by-one
  112.             // this avoids using get(), which for some less-robust
  113.             // transports (like Doctrine), will cause the message
  114.             // to be temporarily "acked", even if the user aborts
  115.             // handling the message
  116.             while (true) {
  117.                 $ids = [];
  118.                 foreach ($receiver->all(1) as $envelope) {
  119.                     ++$count;
  120.                     $id $this->getMessageId($envelope);
  121.                     if (null === $id) {
  122.                         throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.'$failureTransportName));
  123.                     }
  124.                     $ids[] = $id;
  125.                 }
  126.                 // break the loop if all messages are consumed
  127.                 if (=== \count($ids)) {
  128.                     break;
  129.                 }
  130.                 $this->retrySpecificIds($failureTransportName$ids$io$shouldForce);
  131.             }
  132.         } else {
  133.             // get() and ask messages one-by-one
  134.             $count $this->runWorker($failureTransportName$receiver$io$shouldForce);
  135.         }
  136.         // avoid success message if nothing was processed
  137.         if (<= $count) {
  138.             $io->success('All failed messages have been handled or removed!');
  139.         }
  140.     }
  141.     private function runWorker(string $failureTransportNameReceiverInterface $receiverSymfonyStyle $iobool $shouldForce): int
  142.     {
  143.         $count 0;
  144.         $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io$receiver$shouldForce, &$count) {
  145.             ++$count;
  146.             $envelope $messageReceivedEvent->getEnvelope();
  147.             $this->displaySingleMessage($envelope$io);
  148.             $shouldHandle $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?');
  149.             if ($shouldHandle) {
  150.                 return;
  151.             }
  152.             $messageReceivedEvent->shouldHandle(false);
  153.             $receiver->reject($envelope);
  154.         };
  155.         $this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
  156.         $worker = new Worker(
  157.             [$failureTransportName => $receiver],
  158.             $this->messageBus,
  159.             $this->eventDispatcher,
  160.             $this->logger
  161.         );
  162.         try {
  163.             $worker->run();
  164.         } finally {
  165.             $this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
  166.         }
  167.         return $count;
  168.     }
  169.     private function retrySpecificIds(string $failureTransportName, array $idsSymfonyStyle $iobool $shouldForce)
  170.     {
  171.         $receiver $this->getReceiver($failureTransportName);
  172.         if (!$receiver instanceof ListableReceiverInterface) {
  173.             throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.'$failureTransportName));
  174.         }
  175.         foreach ($ids as $id) {
  176.             $envelope $receiver->find($id);
  177.             if (null === $envelope) {
  178.                 throw new RuntimeException(sprintf('The message "%s" was not found.'$id));
  179.             }
  180.             $singleReceiver = new SingleMessageReceiver($receiver$envelope);
  181.             $this->runWorker($failureTransportName$singleReceiver$io$shouldForce);
  182.         }
  183.     }
  184. }