Loading config/services.yaml +20 −3 Original line number Diff line number Diff line Loading @@ -35,6 +35,23 @@ services: App\Modules\Admin\Infrastructure\Consumer\AdminConsumer: arguments: $topic: admin_topic $groupId: group1 $name: admin_topic $messageFactory: '@App\Modules\Admin\Infrastructure\Factory\AdminMessageFactory' $groupId: admin_consumer $name: admin_consumer App\Modules\Admin\Infrastructure\Consumer\AdminRoleConsumer: arguments: $topic: admin_topic $groupId: admin_role_consumer $name: admin_role_consumer App\Modules\User\Infrastructure\Consumer\UserConsumer: arguments: $topic: user_topic $groupId: user_consumer $name: user_consumer App\Modules\User\Infrastructure\Consumer\UserRoleConsumer: arguments: $topic: user_topic $groupId: user_role_consumer $name: user_role_consumer No newline at end of file src/Modules/Admin/Infrastructure/Consumer/AdminConsumer.php +6 −37 Original line number Diff line number Diff line Loading @@ -4,28 +4,13 @@ declare(strict_types=1); namespace App\Modules\Admin\Infrastructure\Consumer; use App\Shared\Infrastructure\Message\MessageFactory; use App\Modules\Admin\Infrastructure\Factory\AdminMessageFactory; use App\Shared\Infrastructure\Consumer\BaseConsumer; use RdKafka\Message; use SimPod\Kafka\Clients\Consumer\ConsumerConfig; use SimPod\Kafka\Clients\Consumer\KafkaConsumer; use SimPod\KafkaBundle\Kafka\Clients\Consumer\NamedConsumer; use SimPod\KafkaBundle\Kafka\Configuration; use Symfony\Component\Messenger\MessageBusInterface; final class AdminConsumer implements NamedConsumer class AdminConsumer extends BaseConsumer { private const TIMEOUT_MS = 2000; public function __construct( private readonly Configuration $configuration, private readonly string $topic, private readonly string $groupId, private readonly string $name, private readonly MessageBusInterface $messageBus, private readonly MessageFactory $messageFactory, ) { } public function run(): void { $kafkaConsumer = new KafkaConsumer($this->getConfig()); Loading @@ -36,29 +21,13 @@ final class AdminConsumer implements NamedConsumer self::TIMEOUT_MS, function (Message $message) use ($kafkaConsumer): void { $data = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR); $this->messageBus->dispatch($this->messageFactory->create($data)); $result = $this->messageBus->dispatch(AdminMessageFactory::create($data)); var_dump($result); $kafkaConsumer->commit($message); } ); } } public function getName(): string { return $this->name; } private function getConfig(): ConsumerConfig { $config = new ConsumerConfig(); $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, $this->configuration->getBootstrapServers()); $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false); $config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getClientIdWithHostname()); $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest'); $config->set(ConsumerConfig::GROUP_ID_CONFIG, $this->groupId); return $config; } } src/Modules/Admin/Infrastructure/Consumer/AdminRoleConsumer.php +26 −1 Original line number Diff line number Diff line <?php declare(strict_types=1); namespace App\Modules\Admin\Infrastructure\Consumer; class AdminRoleConsumer use App\Modules\Admin\Infrastructure\Factory\AdminRoleMessageFactory; use App\Shared\Infrastructure\Consumer\BaseConsumer; use RdKafka\Message; use SimPod\Kafka\Clients\Consumer\KafkaConsumer; class AdminRoleConsumer extends BaseConsumer { public function run(): void { $kafkaConsumer = new KafkaConsumer($this->getConfig()); $kafkaConsumer->subscribe([$this->topic]); while (true) { $kafkaConsumer->start( self::TIMEOUT_MS, function (Message $message) use ($kafkaConsumer): void { $data = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR); $result = $this->messageBus->dispatch(AdminRoleMessageFactory::create($data)); var_dump($result); $kafkaConsumer->commit($message); } ); } } } No newline at end of file src/Modules/Admin/Infrastructure/Factory/AdminMessageFactory.php +18 −10 Original line number Diff line number Diff line Loading @@ -4,22 +4,30 @@ declare(strict_types=1); namespace App\Modules\Admin\Infrastructure\Factory; use App\Modules\Admin\Infrastructure\Message\Admin\CreateAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\GetAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\Command\CreateAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\Command\DeleteAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\Command\UpdateAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\Query\GetAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\Query\GetListAdminsMessage; use App\Shared\Infrastructure\Message\MessageFactory; use App\Shared\Infrastructure\Message\MessageInterface; class AdminMessageFactory extends MessageFactory { public function create(array $data): MessageInterface private static array $types = [ 'create-admin' => CreateAdminMessage::class, 'get-admin' => GetAdminMessage::class, 'update-admin' => UpdateAdminMessage::class, 'delete-admin' => DeleteAdminMessage::class, 'get-list-admins' => GetListAdminsMessage::class, ]; public static function create(array $data): ?MessageInterface { switch ($data['cmd']) { case 'create-admin': return new CreateAdminMessage($data); case 'get-admin': return new GetAdminMessage($data); default: break; if (isset(self::$types[$data['cmd']]) && class_exists(self::$types[$data['cmd']])) { return new self::$types[$data['cmd']]($data); } return null; } } src/Modules/Admin/Infrastructure/Factory/AdminRoleMessageFactory.php +29 −2 Original line number Diff line number Diff line <?php declare(strict_types=1); namespace App\Modules\Admin\Infrastructure\Factory; class AdminRoleMessageFactory use App\Modules\Admin\Infrastructure\Message\AdminRole\Command\AttachRoleToAdminMessage; use App\Modules\Admin\Infrastructure\Message\AdminRole\Command\CreateAdminRoleMessage; use App\Modules\Admin\Infrastructure\Message\AdminRole\Command\DeleteAdminRoleMessage; use App\Modules\Admin\Infrastructure\Message\AdminRole\Command\DetachRoleFromAdminMessage; use App\Modules\Admin\Infrastructure\Message\AdminRole\Command\UpdateAdminRoleMessage; use App\Modules\Admin\Infrastructure\Message\AdminRole\Query\GetListAdminRolesMessage; use App\Shared\Infrastructure\Message\MessageFactory; use App\Shared\Infrastructure\Message\MessageInterface; class AdminRoleMessageFactory extends MessageFactory { private static array $types = [ 'attach-role-to-admin' => AttachRoleToAdminMessage::class, 'detach-role-from-admin' => DetachRoleFromAdminMessage::class, 'create-admin-role' => CreateAdminRoleMessage::class, 'update-admin-role' => UpdateAdminRoleMessage::class, 'delete-admin-role' => DeleteAdminRoleMessage::class, 'get-list-admin-roles' => GetListAdminRolesMessage::class, ]; public static function create(array $data): ?MessageInterface { if (isset(self::$types[$data['cmd']]) && class_exists(self::$types[$data['cmd']])) { return new self::$types[$data['cmd']]($data); } return null; } } Loading
config/services.yaml +20 −3 Original line number Diff line number Diff line Loading @@ -35,6 +35,23 @@ services: App\Modules\Admin\Infrastructure\Consumer\AdminConsumer: arguments: $topic: admin_topic $groupId: group1 $name: admin_topic $messageFactory: '@App\Modules\Admin\Infrastructure\Factory\AdminMessageFactory' $groupId: admin_consumer $name: admin_consumer App\Modules\Admin\Infrastructure\Consumer\AdminRoleConsumer: arguments: $topic: admin_topic $groupId: admin_role_consumer $name: admin_role_consumer App\Modules\User\Infrastructure\Consumer\UserConsumer: arguments: $topic: user_topic $groupId: user_consumer $name: user_consumer App\Modules\User\Infrastructure\Consumer\UserRoleConsumer: arguments: $topic: user_topic $groupId: user_role_consumer $name: user_role_consumer No newline at end of file
src/Modules/Admin/Infrastructure/Consumer/AdminConsumer.php +6 −37 Original line number Diff line number Diff line Loading @@ -4,28 +4,13 @@ declare(strict_types=1); namespace App\Modules\Admin\Infrastructure\Consumer; use App\Shared\Infrastructure\Message\MessageFactory; use App\Modules\Admin\Infrastructure\Factory\AdminMessageFactory; use App\Shared\Infrastructure\Consumer\BaseConsumer; use RdKafka\Message; use SimPod\Kafka\Clients\Consumer\ConsumerConfig; use SimPod\Kafka\Clients\Consumer\KafkaConsumer; use SimPod\KafkaBundle\Kafka\Clients\Consumer\NamedConsumer; use SimPod\KafkaBundle\Kafka\Configuration; use Symfony\Component\Messenger\MessageBusInterface; final class AdminConsumer implements NamedConsumer class AdminConsumer extends BaseConsumer { private const TIMEOUT_MS = 2000; public function __construct( private readonly Configuration $configuration, private readonly string $topic, private readonly string $groupId, private readonly string $name, private readonly MessageBusInterface $messageBus, private readonly MessageFactory $messageFactory, ) { } public function run(): void { $kafkaConsumer = new KafkaConsumer($this->getConfig()); Loading @@ -36,29 +21,13 @@ final class AdminConsumer implements NamedConsumer self::TIMEOUT_MS, function (Message $message) use ($kafkaConsumer): void { $data = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR); $this->messageBus->dispatch($this->messageFactory->create($data)); $result = $this->messageBus->dispatch(AdminMessageFactory::create($data)); var_dump($result); $kafkaConsumer->commit($message); } ); } } public function getName(): string { return $this->name; } private function getConfig(): ConsumerConfig { $config = new ConsumerConfig(); $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, $this->configuration->getBootstrapServers()); $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false); $config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getClientIdWithHostname()); $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest'); $config->set(ConsumerConfig::GROUP_ID_CONFIG, $this->groupId); return $config; } }
src/Modules/Admin/Infrastructure/Consumer/AdminRoleConsumer.php +26 −1 Original line number Diff line number Diff line <?php declare(strict_types=1); namespace App\Modules\Admin\Infrastructure\Consumer; class AdminRoleConsumer use App\Modules\Admin\Infrastructure\Factory\AdminRoleMessageFactory; use App\Shared\Infrastructure\Consumer\BaseConsumer; use RdKafka\Message; use SimPod\Kafka\Clients\Consumer\KafkaConsumer; class AdminRoleConsumer extends BaseConsumer { public function run(): void { $kafkaConsumer = new KafkaConsumer($this->getConfig()); $kafkaConsumer->subscribe([$this->topic]); while (true) { $kafkaConsumer->start( self::TIMEOUT_MS, function (Message $message) use ($kafkaConsumer): void { $data = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR); $result = $this->messageBus->dispatch(AdminRoleMessageFactory::create($data)); var_dump($result); $kafkaConsumer->commit($message); } ); } } } No newline at end of file
src/Modules/Admin/Infrastructure/Factory/AdminMessageFactory.php +18 −10 Original line number Diff line number Diff line Loading @@ -4,22 +4,30 @@ declare(strict_types=1); namespace App\Modules\Admin\Infrastructure\Factory; use App\Modules\Admin\Infrastructure\Message\Admin\CreateAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\GetAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\Command\CreateAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\Command\DeleteAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\Command\UpdateAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\Query\GetAdminMessage; use App\Modules\Admin\Infrastructure\Message\Admin\Query\GetListAdminsMessage; use App\Shared\Infrastructure\Message\MessageFactory; use App\Shared\Infrastructure\Message\MessageInterface; class AdminMessageFactory extends MessageFactory { public function create(array $data): MessageInterface private static array $types = [ 'create-admin' => CreateAdminMessage::class, 'get-admin' => GetAdminMessage::class, 'update-admin' => UpdateAdminMessage::class, 'delete-admin' => DeleteAdminMessage::class, 'get-list-admins' => GetListAdminsMessage::class, ]; public static function create(array $data): ?MessageInterface { switch ($data['cmd']) { case 'create-admin': return new CreateAdminMessage($data); case 'get-admin': return new GetAdminMessage($data); default: break; if (isset(self::$types[$data['cmd']]) && class_exists(self::$types[$data['cmd']])) { return new self::$types[$data['cmd']]($data); } return null; } }
src/Modules/Admin/Infrastructure/Factory/AdminRoleMessageFactory.php +29 −2 Original line number Diff line number Diff line <?php declare(strict_types=1); namespace App\Modules\Admin\Infrastructure\Factory; class AdminRoleMessageFactory use App\Modules\Admin\Infrastructure\Message\AdminRole\Command\AttachRoleToAdminMessage; use App\Modules\Admin\Infrastructure\Message\AdminRole\Command\CreateAdminRoleMessage; use App\Modules\Admin\Infrastructure\Message\AdminRole\Command\DeleteAdminRoleMessage; use App\Modules\Admin\Infrastructure\Message\AdminRole\Command\DetachRoleFromAdminMessage; use App\Modules\Admin\Infrastructure\Message\AdminRole\Command\UpdateAdminRoleMessage; use App\Modules\Admin\Infrastructure\Message\AdminRole\Query\GetListAdminRolesMessage; use App\Shared\Infrastructure\Message\MessageFactory; use App\Shared\Infrastructure\Message\MessageInterface; class AdminRoleMessageFactory extends MessageFactory { private static array $types = [ 'attach-role-to-admin' => AttachRoleToAdminMessage::class, 'detach-role-from-admin' => DetachRoleFromAdminMessage::class, 'create-admin-role' => CreateAdminRoleMessage::class, 'update-admin-role' => UpdateAdminRoleMessage::class, 'delete-admin-role' => DeleteAdminRoleMessage::class, 'get-list-admin-roles' => GetListAdminRolesMessage::class, ]; public static function create(array $data): ?MessageInterface { if (isset(self::$types[$data['cmd']]) && class_exists(self::$types[$data['cmd']])) { return new self::$types[$data['cmd']]($data); } return null; } }