vendor/enqueue/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php line 39

Open in your IDE?
  1. <?php
  2. namespace Enqueue\Symfony\Client\DependencyInjection;
  3. use Enqueue\Client\CommandSubscriberInterface;
  4. use Enqueue\Client\Route;
  5. use Enqueue\Client\RouteCollection;
  6. use Enqueue\Symfony\DiUtils;
  7. use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
  8. use Symfony\Component\DependencyInjection\ContainerBuilder;
  /**
   * Compiler pass that turns services tagged "enqueue.command_subscriber" into
   * command routes and appends them to each client's route collection service.
   *
   * For every client name listed in the "enqueue.clients" parameter the pass
   * inspects the tagged services, asks each subscriber class for its subscribed
   * command(s) via the static getSubscribedCommand() method, builds Route
   * objects of type Route::COMMAND and merges their array form into argument 0
   * of that client's "route_collection" service definition.
   */
  final class BuildCommandSubscriberRoutesPass implements CompilerPassInterface
  {
      /**
       * Builds the command routes for every configured client.
       *
       * @param ContainerBuilder $container container being compiled; must define the
       *                                    "enqueue.clients" parameter and, per client,
       *                                    a "route_collection" service definition
       *
       * @throws \LogicException when the "enqueue.clients" parameter is missing, a client's
       *                         route collection service is not defined, a tagged service is
       *                         created by a factory, its class cannot be found or does not
       *                         implement CommandSubscriberInterface, or the value returned
       *                         by getSubscribedCommand() is empty or malformed
       */
      public function process(ContainerBuilder $container): void
      {
          if (!$container->hasParameter('enqueue.clients')) {
              throw new \LogicException('The "enqueue.clients" parameter must be set.');
          }

          $names = $container->getParameter('enqueue.clients');
          $defaultName = $container->getParameter('enqueue.default_client');
          $tag = 'enqueue.command_subscriber';

          foreach ($names as $name) {
              $diUtils = DiUtils::create(ClientFactory::MODULE, $name);

              $routeCollectionId = $diUtils->format('route_collection');
              if (!$container->hasDefinition($routeCollectionId)) {
                  throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId));
              }

              // Routes are collected per client and merged into the service definition below.
              $routeCollection = new RouteCollection([]);

              foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) {
                  $processorDefinition = $container->getDefinition($serviceId);
                  if ($processorDefinition->getFactory()) {
                      throw new \LogicException('The command subscriber tag could not be applied to a service created by factory.');
                  }

                  // A definition without an explicit class uses the service id as the class name.
                  $processorClass = $processorDefinition->getClass() ?? $serviceId;
                  if (!class_exists($processorClass)) {
                      throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass));
                  }

                  if (!is_subclass_of($processorClass, CommandSubscriberInterface::class)) {
                      throw new \LogicException(sprintf('The processor must implement "%s" interface to be used with the tag "%s"', CommandSubscriberInterface::class, $tag));
                  }

                  foreach ($tagAttributes as $tagAttribute) {
                      // A tag without a "client" attribute belongs to the default client;
                      // the special value "all" registers the subscriber for every client.
                      $client = $tagAttribute['client'] ?? $defaultName;
                      if ($client !== $name && 'all' !== $client) {
                          continue;
                      }

                      /** @var class-string<CommandSubscriberInterface> $processorClass */
                      $commands = $processorClass::getSubscribedCommand();

                      if (empty($commands)) {
                          throw new \LogicException('Command subscriber must return something.');
                      }

                      // A bare command name is shorthand for a one-element list.
                      if (is_string($commands)) {
                          $commands = [$commands];
                      }

                      if (!is_array($commands)) {
                          throw new \LogicException('Command subscriber configuration is invalid. Should be an array or string.');
                      }

                      // 0.8 command subscriber
                      if (isset($commands['processorName'])) {
                          // "@" keeps the deprecation silent unless an error handler opts in.
                          @trigger_error('The command subscriber 0.8 syntax is deprecated since Enqueue 0.9.', E_USER_DEPRECATED);

                          $source = $commands['processorName'];
                          // The processor is always the service itself. This line previously read
                          // $params['processorName'], but $params is never assigned in this branch:
                          // it was either undefined (falling back to $serviceId) or a leftover
                          // from a previously processed subscriber's loop below.
                          $processor = $serviceId;

                          // Everything that is not a reserved key is passed through as a route option.
                          $options = $commands;
                          unset(
                              $options['processorName'],
                              $options['queueName'],
                              $options['queueNameHardcoded'],
                              $options['exclusive'],
                              $options['topic'],
                              $options['source'],
                              $options['source_type'],
                              $options['processor'],
                              $options['options']
                          );

                          $options['processor_service_id'] = $serviceId;

                          if (isset($commands['queueName'])) {
                              $options['queue'] = $commands['queueName'];
                          }

                          if (isset($commands['queueNameHardcoded']) && $commands['queueNameHardcoded']) {
                              $options['prefix_queue'] = false;
                          }

                          $routeCollection->add(new Route($source, Route::COMMAND, $processor, $options));

                          continue;
                      }

                      // A single associative definition is normalised to a list of definitions.
                      if (isset($commands['command'])) {
                          $commands = [$commands];
                      }

                      foreach ($commands as $params) {
                          if (is_string($params)) {
                              $routeCollection->add(new Route($params, Route::COMMAND, $serviceId, ['processor_service_id' => $serviceId]));
                          } elseif (is_array($params)) {
                              $source = $params['command'] ?? null;
                              $processor = $params['processor'] ?? $serviceId;

                              // Reserved keys are stripped; the remainder become route options.
                              unset($params['command'], $params['source'], $params['source_type'], $params['processor'], $params['options']);
                              $options = $params;
                              $options['processor_service_id'] = $serviceId;

                              $routeCollection->add(new Route($source, Route::COMMAND, $processor, $options));
                          } else {
                              throw new \LogicException(sprintf(
                                  'Command subscriber configuration is invalid for "%s::getSubscribedCommand()". "%s"',
                                  $processorClass,
                                  json_encode($processorClass::getSubscribedCommand())
                              ));
                          }
                      }
                  }
              }

              // Append the new routes to whatever the definition already carries in argument 0.
              $rawRoutes = $routeCollection->toArray();

              $routeCollectionService = $container->getDefinition($routeCollectionId);
              $routeCollectionService->replaceArgument(0, array_merge(
                  $routeCollectionService->getArgument(0),
                  $rawRoutes
              ));
          }
      }
  }