Skip to content
Snippets Groups Projects
IndexRunner.php 2.58 KiB
<?php

namespace IQDEV\ElasticSearch\Indexer;

use Elastic\Elasticsearch\Client;
use IQDEV\ElasticSearch\Configuration;
use Psr\Log\LoggerInterface;

final class IndexRunner
{
    private Client $esClient;
    private EsHelperEndpoint $helper;

    public function __construct(
        Client          $esClient,
        Configuration   $configuration,
        LoggerInterface $logger
    )
    {
        $this->esClient = $esClient;
        $this->helper = new EsHelperEndpoint($esClient, $configuration, $logger);
    }

    public function run(IndexProvider $indexProvider)
    {
        $this->helper->create();

        if ($indexProvider->getBatchSize() !== null && $indexProvider->getBatchSize() > 0) {
            $counter = 0;
            $params = ['body' => []];
            foreach ($indexProvider->get() as $index) {
                if ($index instanceof DeleteIndex) {
                    if (!empty($params['body'])) {
                        $this->esClient->bulk($params);
                        $params = ['body' => []];
                        $counter = 0;
                    }
                    $this->esClient->delete($index->es());
                    continue;
                }

                if ($index instanceof UpdateIndex) {
                    if (!empty($params['body'])) {
                        $this->esClient->bulk($params);
                        $params = ['body' => []];
                        $counter = 0;
                    }
                    $this->esClient->update($index->es());
                    continue;
                }

                if (!$index instanceof BulkIndex) {
                    continue;
                }
                $esIndex = $index->es();
                foreach ($esIndex as $indexItem) {
                    $params['body'][] = $indexItem;
                }

                if (++$counter >= $indexProvider->getBatchSize()) {
                    $this->esClient->bulk($params);
                    $params = ['body' => []];
                    $counter = 0;
                }
            }

            if (!empty($params['body'])) {
                $this->esClient->bulk($params);
            }
        } else {
            foreach ($indexProvider->get() as $index) {
                if ($index instanceof DeleteIndex) {
                    $this->esClient->delete($index->es());
                    continue;
                }

                if (!($index instanceof AddIndex) && !($index instanceof UpdateIndex)) {
                    continue;
                }

                $this->esClient->index($index->es());
            }
        }
    }
}