Skip to content
Snippets Groups Projects
IndexRunner.php 2.71 KiB
Newer Older
Pavel's avatar
Pavel committed
<?php

namespace IQDEV\ElasticSearch\Indexer;

use Elastic\Elasticsearch\Client;
use IQDEV\ElasticSearch\Configuration;
Pavel's avatar
Pavel committed
use Psr\Log\LoggerInterface;
Pavel's avatar
Pavel committed

final class IndexRunner
{
    private Client $esClient;
Pavel's avatar
Pavel committed
    private EsHelperEndpoint $helper;
Pavel's avatar
Pavel committed

    public function __construct(
Pavel's avatar
Pavel committed
        Client          $esClient,
        Configuration   $configuration,
        LoggerInterface $logger
    )
    {
Pavel's avatar
Pavel committed
        $this->esClient = $esClient;
Pavel's avatar
Pavel committed
        $this->helper = new EsHelperEndpoint($esClient, $configuration, $logger);
Pavel's avatar
Pavel committed
    }

    public function run(IndexProvider $indexProvider)
    {
Pavel's avatar
Pavel committed
        $this->helper->create();

        if ($indexProvider->getBatchSize() !== null && $indexProvider->getBatchSize() > 0) {
            $counter = 0;
            $params = ['body' => []];
            foreach ($indexProvider->get() as $index) {
                if ($index instanceof DeleteIndex) {
                    if (!empty($params['body'])) {
                        $this->esClient->bulk($params);
                        $params = ['body' => []];
                        $counter = 0;
                    }
                    $this->esClient->delete($index->es());
                    continue;
                }

                if ($index instanceof UpdateIndex) {
                    if (!empty($params['body'])) {
                        $this->esClient->bulk($params);
                        $params = ['body' => []];
                        $counter = 0;
                    }
                    $this->esClient->update($index->es());
                    continue;
                }

                if (!$index instanceof BulkIndex) {
                    continue;
                }
                $esIndex = $index->es();
                foreach ($esIndex as $indexItem) {
                    $params['body'][] = $indexItem;
                }
Pavel's avatar
Pavel committed

Pavel's avatar
Pavel committed
                if (++$counter >= $indexProvider->getBatchSize()) {
                    $this->esClient->bulk($params);
                    $params = ['body' => []];
                    $counter = 0;
                }
Pavel's avatar
Pavel committed
            }

Pavel's avatar
Pavel committed
            if (!empty($params['body'])) {
                $this->esClient->bulk($params);
            }
        } else {
            foreach ($indexProvider->get() as $index) {
                if ($index instanceof DeleteIndex) {
                    $this->esClient->delete($index->es());
                    continue;
                }

                if ($index instanceof AddIndex) {
                    $this->esClient->index($index->es());
Pavel's avatar
Pavel committed
                    continue;
                }

                if ($index instanceof UpdateIndex) {
                    $this->esClient->update($index->es());
                    continue;
                }
Pavel's avatar
Pavel committed
            }
Pavel's avatar
Pavel committed
        }
    }
}