<?php namespace IQDEV\ElasticSearch\Indexer; use Elastic\Elasticsearch\Client; use IQDEV\ElasticSearch\Configuration; use Psr\Log\LoggerInterface; final class IndexRunner { private Client $esClient; private EsHelperEndpoint $helper; public function __construct( Client $esClient, Configuration $configuration, LoggerInterface $logger ) { $this->esClient = $esClient; $this->helper = new EsHelperEndpoint($esClient, $configuration, $logger); } public function run(IndexProvider $indexProvider) { $this->helper->create(); if ($indexProvider->getBatchSize() !== null && $indexProvider->getBatchSize() > 0) { $counter = 0; $params = ['body' => []]; foreach ($indexProvider->get() as $index) { if ($index instanceof DeleteIndex) { if (!empty($params['body'])) { $this->esClient->bulk($params); $params = ['body' => []]; $counter = 0; } $this->esClient->delete($index->es()); continue; } if ($index instanceof UpdateIndex) { if (!empty($params['body'])) { $this->esClient->bulk($params); $params = ['body' => []]; $counter = 0; } $this->esClient->update($index->es()); continue; } if (!$index instanceof BulkIndex) { continue; } $esIndex = $index->es(); foreach ($esIndex as $indexItem) { $params['body'][] = $indexItem; } if (++$counter >= $indexProvider->getBatchSize()) { $this->esClient->bulk($params); $params = ['body' => []]; $counter = 0; } } if (!empty($params['body'])) { $this->esClient->bulk($params); } } else { foreach ($indexProvider->get() as $index) { if ($index instanceof DeleteIndex) { $this->esClient->delete($index->es()); continue; } if ($index instanceof AddIndex) { $this->esClient->index($index->es()); continue; } if ($index instanceof UpdateIndex) { $this->esClient->update($index->es()); continue; } } } } }