vendor/shopware/core/Framework/DataAbstractionLayer/Indexing/EntityIndexerRegistry.php line 130

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\DataAbstractionLayer\Indexing;
  3. use Shopware\Core\Framework\Context;
  4. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  5. use Shopware\Core\Framework\DataAbstractionLayer\Indexing\MessageQueue\IterateEntityIndexerMessage;
  6. use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
  7. use Shopware\Core\Framework\Event\ProgressFinishedEvent;
  8. use Shopware\Core\Framework\Event\ProgressStartedEvent;
  9. use Shopware\Core\Framework\Log\Package;
  10. use Shopware\Core\Framework\MessageQueue\Handler\AbstractMessageHandler;
  11. use Shopware\Core\Framework\Struct\ArrayStruct;
  12. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  13. use Symfony\Component\Messenger\MessageBusInterface;
  14. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  15. /**
  16.  * @deprecated tag:v6.5.0 - reason:remove-subscriber - EntityIndexerRegistry will not implement EventSubscriberInterface anymore, it will also become final and internal in v6.5.0
  17.  */
  18. #[Package('core')]
  19. class EntityIndexerRegistry extends AbstractMessageHandler implements EventSubscriberInterface
  20. {
  21.     public const EXTENSION_INDEXER_SKIP 'indexer-skip';
  22.     /**
  23.      * @deprecated tag:v6.5.0 - `$context->addExtension(EntityIndexerRegistry::USE_INDEXING_QUEUE, ...)` will be ignored, use `context->addState(EntityIndexerRegistry::USE_INDEXING_QUEUE)` instead
  24.      */
  25.     public const USE_INDEXING_QUEUE 'use-queue-indexing';
  26.     /**
  27.      * @deprecated tag:v6.5.0 - `$context->addExtension(EntityIndexerRegistry::DISABLE_INDEXING, ...)` will be ignored, use `context->addState(EntityIndexerRegistry::DISABLE_INDEXING)` instead
  28.      */
  29.     public const DISABLE_INDEXING 'disable-indexing';
  30.     /**
  31.      * @var iterable<EntityIndexer>
  32.      */
  33.     private iterable $indexer;
  34.     private MessageBusInterface $messageBus;
  35.     private bool $working false;
  36.     private EventDispatcherInterface $dispatcher;
  37.     /**
  38.      * @internal
  39.      *
  40.      * @param iterable<EntityIndexer> $indexer
  41.      */
  42.     public function __construct(iterable $indexerMessageBusInterface $messageBusEventDispatcherInterface $dispatcher)
  43.     {
  44.         $this->indexer $indexer;
  45.         $this->messageBus $messageBus;
  46.         $this->dispatcher $dispatcher;
  47.     }
  48.     /**
  49.      * @deprecated tag:v6.5.0 - reason:remove-subscriber - will be removed in v6.5.0, event handling is done in `EntityIndexingSubscriber`
  50.      */
  51.     public static function getSubscribedEvents(): array
  52.     {
  53.         return [];
  54.     }
  55.     public static function getHandledMessages(): iterable
  56.     {
  57.         return [
  58.             EntityIndexingMessage::class,
  59.             IterateEntityIndexerMessage::class,
  60.         ];
  61.     }
  62.     /**
  63.      * @param list<string> $skip
  64.      * @param list<string> $only
  65.      */
  66.     public function index(bool $useQueue, array $skip = [], array $only = []): void
  67.     {
  68.         foreach ($this->indexer as $indexer) {
  69.             if (\in_array($indexer->getName(), $skiptrue)) {
  70.                 continue;
  71.             }
  72.             if (\count($only) > && !\in_array($indexer->getName(), $onlytrue)) {
  73.                 continue;
  74.             }
  75.             $offset null;
  76.             $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $indexer->getTotal()));
  77.             while ($message $indexer->iterate($offset)) {
  78.                 $message->setIndexer($indexer->getName());
  79.                 $message->addSkip(...$skip);
  80.                 $this->sendOrHandle($message$useQueue);
  81.                 $offset $message->getOffset();
  82.                 try {
  83.                     $count \is_array($message->getData()) ? \count($message->getData()) : 1;
  84.                     $this->dispatcher->dispatch(new ProgressAdvancedEvent($count));
  85.                 } catch (\Exception $e) {
  86.                 }
  87.             }
  88.             $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
  89.         }
  90.     }
  91.     public function refresh(EntityWrittenContainerEvent $event): void
  92.     {
  93.         $context $event->getContext();
  94.         if ($this->working) {
  95.             return;
  96.         }
  97.         $this->working true;
  98.         if ($this->disabled($context)) {
  99.             $this->working false;
  100.             return;
  101.         }
  102.         $useQueue $this->useQueue($context);
  103.         foreach ($this->indexer as $indexer) {
  104.             $message $indexer->update($event);
  105.             if (!$message) {
  106.                 continue;
  107.             }
  108.             $message->setIndexer($indexer->getName());
  109.             self::addSkips($message$context);
  110.             $this->sendOrHandle($message$useQueue);
  111.         }
  112.         $this->working false;
  113.     }
  114.     public static function addSkips(EntityIndexingMessage $messageContext $context): void
  115.     {
  116.         if (!$context->hasExtension(self::EXTENSION_INDEXER_SKIP)) {
  117.             return;
  118.         }
  119.         /** @var ArrayStruct<string, mixed> $skip */
  120.         $skip $context->getExtension(self::EXTENSION_INDEXER_SKIP);
  121.         $message->addSkip(...$skip->all());
  122.     }
  123.     /**
  124.      * @param mixed $message
  125.      */
  126.     public function handle($message): void
  127.     {
  128.         if ($message instanceof EntityIndexingMessage) {
  129.             $indexer $this->getIndexer($message->getIndexer());
  130.             if ($indexer) {
  131.                 $indexer->handle($message);
  132.             }
  133.             return;
  134.         }
  135.         if ($message instanceof IterateEntityIndexerMessage) {
  136.             $next $this->iterateIndexer($message->getIndexer(), $message->getOffset(), true$message->getSkip());
  137.             if (!$next) {
  138.                 return;
  139.             }
  140.             $this->messageBus->dispatch(new IterateEntityIndexerMessage($message->getIndexer(), $next->getOffset(), $message->getSkip()));
  141.         }
  142.     }
  143.     /**
  144.      * @param list<string> $indexer
  145.      * @param list<string> $skip
  146.      */
  147.     public function sendIndexingMessage(array $indexer = [], array $skip = []): void
  148.     {
  149.         if (empty($indexer)) {
  150.             $indexer = [];
  151.             foreach ($this->indexer as $loop) {
  152.                 $indexer[] = $loop->getName();
  153.             }
  154.         }
  155.         if (empty($indexer)) {
  156.             return;
  157.         }
  158.         foreach ($indexer as $name) {
  159.             if (\in_array($name$skiptrue)) {
  160.                 continue;
  161.             }
  162.             $this->messageBus->dispatch(new IterateEntityIndexerMessage($namenull$skip));
  163.         }
  164.     }
  165.     public function has(string $name): bool
  166.     {
  167.         return $this->getIndexer($name) !== null;
  168.     }
  169.     public function getIndexer(string $name): ?EntityIndexer
  170.     {
  171.         foreach ($this->indexer as $indexer) {
  172.             if ($indexer->getName() === $name) {
  173.                 return $indexer;
  174.             }
  175.         }
  176.         return null;
  177.     }
  178.     private function useQueue(Context $context): bool
  179.     {
  180.         return $context->hasExtension(self::USE_INDEXING_QUEUE) || $context->hasState(self::USE_INDEXING_QUEUE);
  181.     }
  182.     private function disabled(Context $context): bool
  183.     {
  184.         return $context->hasExtension(self::DISABLE_INDEXING) || $context->hasState(self::DISABLE_INDEXING);
  185.     }
  186.     private function sendOrHandle(EntityIndexingMessage $messagebool $useQueue): void
  187.     {
  188.         if ($useQueue || $message->forceQueue()) {
  189.             $this->messageBus->dispatch($message);
  190.             return;
  191.         }
  192.         $this->handle($message);
  193.     }
  194.     /**
  195.      * @param array<string, string>|null $offset
  196.      * @param list<string> $skip
  197.      */
  198.     private function iterateIndexer(string $name, ?array $offsetbool $useQueue, array $skip): ?EntityIndexingMessage
  199.     {
  200.         $indexer $this->getIndexer($name);
  201.         if (!$indexer instanceof EntityIndexer) {
  202.             throw new \RuntimeException(sprintf('Entity indexer with name %s not found'$name));
  203.         }
  204.         $message $indexer->iterate($offset);
  205.         if (!$message) {
  206.             return null;
  207.         }
  208.         $message->setIndexer($indexer->getName());
  209.         $message->addSkip(...$skip);
  210.         $this->sendOrHandle($message$useQueue);
  211.         return $message;
  212.     }
  213. }