Commit deacece4 authored by Адлан Шамавов's avatar Адлан Шамавов
Browse files

SYM-3 | Поправил консюмеры

parent f4190d84
Loading
Loading
Loading
Loading
+38 −0
Original line number Diff line number Diff line
<?php

declare(strict_types=1);

namespace App\Kafka\Command;

use App\Kafka\Service\KafkaService;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'app:create-admin-role', description: 'Hello PhpStorm')]
class CreateAdminRoleCommand extends Command
{
    public function __construct(private readonly KafkaService $kafkaService)
    {
        parent::__construct();
    }

    protected function configure(): void
    {
        $this
            ->addArgument('name', InputArgument::REQUIRED)
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $data = $input->getArguments();
        $data['cmd'] = 'create-admin-role';

        $this->kafkaService->send('admin_topic', $data);

        return Command::SUCCESS;
    }
}
+7 −3
Original line number Diff line number Diff line
@@ -21,12 +21,16 @@ class AdminConsumer extends BaseConsumer
                self::TIMEOUT_MS,
                function (Message $message) use ($kafkaConsumer): void {
                    $data = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR);
                    $result = $this->messageBus->dispatch(AdminMessageFactory::create($data));

                    $msg = AdminMessageFactory::create($data);
                    if ($msg !== null) {
                        $result = $this->messageBus->dispatch($msg);

                        var_dump($result);

                        $kafkaConsumer->commit($message);
                    }
                }
            );
        }
    }
+7 −3
Original line number Diff line number Diff line
@@ -21,12 +21,16 @@ class AdminRoleConsumer extends BaseConsumer
                self::TIMEOUT_MS,
                function (Message $message) use ($kafkaConsumer): void {
                    $data = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR);
                    $result = $this->messageBus->dispatch(AdminRoleMessageFactory::create($data));

                    $msg = AdminRoleMessageFactory::create($data);
                    if ($msg !== null) {
                        $result = $this->messageBus->dispatch($msg);

                        var_dump($result);

                        $kafkaConsumer->commit($message);
                    }
                }
            );
        }
    }
+8 −4
Original line number Diff line number Diff line
@@ -21,12 +21,16 @@ class UserConsumer extends BaseConsumer
                self::TIMEOUT_MS,
                function (Message $message) use ($kafkaConsumer): void {
                    $data = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR);
                    $result = $this->messageBus->dispatch(UserMessageFactory::create($data));

                    $msg = UserMessageFactory::create($data);
                    if ($msg !== null) {
                        $result = $this->messageBus->dispatch($msg);

                        var_dump($result);

                        $kafkaConsumer->commit($message);
                    }
                }
            );
        }
    }
+8 −4
Original line number Diff line number Diff line
@@ -21,12 +21,16 @@ class UserRoleConsumer extends BaseConsumer
                self::TIMEOUT_MS,
                function (Message $message) use ($kafkaConsumer): void {
                    $data = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR);
                    $result = $this->messageBus->dispatch(UserRoleMessageFactory::create($data));

                    $msg = UserRoleMessageFactory::create($data);
                    if ($msg !== null) {
                        $result = $this->messageBus->dispatch($msg);

                        var_dump($result);

                        $kafkaConsumer->commit($message);
                    }
                }
            );
        }
    }