Loading config/services.yaml +3 −2 Original line number Diff line number Diff line Loading @@ -34,6 +34,7 @@ services: App\Modules\Admin\Infrastructure\Consumer\AdminConsumer: arguments: $topic: !php/const App\Kafka\Service\KafkaService::SEND_MESSAGE_TOPIC $topic: admin_topic $groupId: group1 $name: main_topic $name: admin_topic $messageFactory: '@App\Modules\Admin\Infrastructure\Factory\AdminMessageFactory' src/Kafka/Command/CreateAdminCommand.php +1 −1 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ class CreateAdminCommand extends Command $data = $input->getArguments(); $data['cmd'] = 'create-admin'; $this->kafkaService->send(KafkaService::SEND_MESSAGE_TOPIC, $data); $this->kafkaService->send('admin_topic', $data); return Command::SUCCESS; } Loading src/Kafka/Command/GetAdminCommand.php +1 −1 Original line number Diff line number Diff line Loading @@ -29,7 +29,7 @@ class GetAdminCommand extends Command $data = $input->getArguments(); $data['cmd'] = 'get-admin'; $this->kafkaService->send(KafkaService::SEND_MESSAGE_TOPIC, $data); $this->kafkaService->send('admin_topic', $data); return Command::SUCCESS; } Loading src/Modules/Admin/Infrastructure/Consumer/AdminConsumer.php +3 −3 Original line number Diff line number Diff line Loading @@ -4,7 +4,7 @@ declare(strict_types=1); namespace App\Modules\Admin\Infrastructure\Consumer; use App\Modules\Admin\Infrastructure\Message\AdminMessageFactory; use App\Shared\Infrastructure\Message\MessageFactory; use RdKafka\Message; use SimPod\Kafka\Clients\Consumer\ConsumerConfig; use SimPod\Kafka\Clients\Consumer\KafkaConsumer; Loading @@ -22,13 +22,13 @@ final class AdminConsumer implements NamedConsumer private readonly string $groupId, private readonly string $name, private readonly MessageBusInterface $messageBus, private readonly MessageFactory $messageFactory, ) { } public function run(): void { $kafkaConsumer = new KafkaConsumer($this->getConfig()); $kafkaConsumer->subscribe([$this->topic]); while (true) { Loading @@ -36,7 +36,7 @@ final class AdminConsumer implements NamedConsumer self::TIMEOUT_MS, function (Message $message) use ($kafkaConsumer): void { $data = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR); $this->messageBus->dispatch(AdminMessageFactory::create($data)); $this->messageBus->dispatch($this->messageFactory->create($data)); $kafkaConsumer->commit($message); } Loading src/Modules/Admin/Infrastructure/Consumer/AdminRoleConsumer.php 0 → 100644 +8 −0 Original line number Diff line number Diff line <?php namespace App\Modules\Admin\Infrastructure\Consumer; class AdminRoleConsumer { } No newline at end of file Loading
config/services.yaml +3 −2 Original line number Diff line number Diff line Loading @@ -34,6 +34,7 @@ services: App\Modules\Admin\Infrastructure\Consumer\AdminConsumer: arguments: $topic: !php/const App\Kafka\Service\KafkaService::SEND_MESSAGE_TOPIC $topic: admin_topic $groupId: group1 $name: main_topic $name: admin_topic $messageFactory: '@App\Modules\Admin\Infrastructure\Factory\AdminMessageFactory'
src/Kafka/Command/CreateAdminCommand.php +1 −1 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ class CreateAdminCommand extends Command $data = $input->getArguments(); $data['cmd'] = 'create-admin'; $this->kafkaService->send(KafkaService::SEND_MESSAGE_TOPIC, $data); $this->kafkaService->send('admin_topic', $data); return Command::SUCCESS; } Loading
src/Kafka/Command/GetAdminCommand.php +1 −1 Original line number Diff line number Diff line Loading @@ -29,7 +29,7 @@ class GetAdminCommand extends Command $data = $input->getArguments(); $data['cmd'] = 'get-admin'; $this->kafkaService->send(KafkaService::SEND_MESSAGE_TOPIC, $data); $this->kafkaService->send('admin_topic', $data); return Command::SUCCESS; } Loading
src/Modules/Admin/Infrastructure/Consumer/AdminConsumer.php +3 −3 Original line number Diff line number Diff line Loading @@ -4,7 +4,7 @@ declare(strict_types=1); namespace App\Modules\Admin\Infrastructure\Consumer; use App\Modules\Admin\Infrastructure\Message\AdminMessageFactory; use App\Shared\Infrastructure\Message\MessageFactory; use RdKafka\Message; use SimPod\Kafka\Clients\Consumer\ConsumerConfig; use SimPod\Kafka\Clients\Consumer\KafkaConsumer; Loading @@ -22,13 +22,13 @@ final class AdminConsumer implements NamedConsumer private readonly string $groupId, private readonly string $name, private readonly MessageBusInterface $messageBus, private readonly MessageFactory $messageFactory, ) { } public function run(): void { $kafkaConsumer = new KafkaConsumer($this->getConfig()); $kafkaConsumer->subscribe([$this->topic]); while (true) { Loading @@ -36,7 +36,7 @@ final class AdminConsumer implements NamedConsumer self::TIMEOUT_MS, function (Message $message) use ($kafkaConsumer): void { $data = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR); $this->messageBus->dispatch(AdminMessageFactory::create($data)); $this->messageBus->dispatch($this->messageFactory->create($data)); $kafkaConsumer->commit($message); } Loading
src/Modules/Admin/Infrastructure/Consumer/AdminRoleConsumer.php 0 → 100644 +8 −0 Original line number Diff line number Diff line <?php namespace App\Modules\Admin\Infrastructure\Consumer; class AdminRoleConsumer { } No newline at end of file