src/EventSubscriber/RMQSubscriber.php line 137

Open in your IDE?
  1. <?php
  2. namespace App\EventSubscriber;
  3. use App\Entity\User;
  4. use App\Event\CompanyCreatedEvent;
  5. use App\Event\CompanyUpdatedEvent;
  6. use App\Event\DepartmentCreatedEvent;
  7. use App\Event\DepartmentUpdatedEvent;
  8. use App\Event\UserCreatedEvent;
  9. use App\Event\UserUpdatedEvent;
  10. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  11. use Symfony\Component\Serializer\Normalizer\NormalizableInterface;
  12. use Symfony\Component\Serializer\Normalizer\NormalizerInterface;
  13. use PhpAmqpLib\Connection\AMQPStreamConnection;
  14. use PhpAmqpLib\Message\AMQPMessage;
  15. use Psr\Log\LoggerInterface;
  16. use Symfony\Component\DependencyInjection\ContainerInterface;
  /**
   * Event subscriber that publishes the user attached to user/department/company
   * events to the RabbitMQ "user" queue.
   *
   * Publishing is best-effort: a failed connection is logged and leaves the
   * subscriber in a "not connected" state instead of throwing.
   */
  class RMQSubscriber implements EventSubscriberInterface
  {
      /**
       * Kill switch for the RabbitMQ integration. While false, toConnect() and
       * exec() return immediately: no connection is opened and nothing is published.
       */
      private const PUBLISHING_ENABLED = false;

      /**
       * Prefix put in front of every log message written by this subscriber.
       */
      private const LOG_PREFIX = "[##Rabbitmq##]: ";

      /**
       * @var AMQPStreamConnection
       */
      private $connection;

      /**
       * @var \PhpAmqpLib\Channel\AMQPChannel
       */
      private $channel;

      /**
       * @var NormalizerInterface|null
       */
      private $normalizer;

      /**
       * @var ContainerInterface
       */
      private $container;

      /**
       * True once toConnect() has opened the connection and declared the queues.
       *
       * @var bool
       */
      private $connect = false;

      /**
       * @var LoggerInterface
       */
      private $logger;

      /**
       * Stores the collaborators and immediately attempts to connect.
       *
       * @param NormalizerInterface|null $normalizer Used to turn a User into the message payload.
       * @param ContainerInterface       $container  Source of the rmqHost/rmqPort/rmqUser/rmqPw parameters.
       * @param LoggerInterface          $logger     Receives connection and publish diagnostics.
       */
      public function __construct(
          ?NormalizerInterface $normalizer,
          ContainerInterface $container,
          LoggerInterface $logger
      ) {
          $this->normalizer = $normalizer;
          $this->container = $container;
          $this->logger = $logger;
          $this->toConnect();
      }

      /**
       * Opens the AMQP connection and declares the "user" and "department" queues.
       *
       * Any failure while connecting or declaring is logged and recorded in
       * $this->connect; it is not rethrown. Does nothing while
       * PUBLISHING_ENABLED is false.
       */
      public function toConnect()
      {
          if (!self::PUBLISHING_ENABLED) {
              return;
          }

          $host = $this->container->getParameter("rmqHost");
          $port = $this->container->getParameter("rmqPort");
          $user = $this->container->getParameter("rmqUser");
          $pw = $this->container->getParameter("rmqPw");

          try {
              $this->connection = new AMQPStreamConnection($host, $port, $user, $pw);
              $this->channel = $this->connection->channel();

              $this->channel->queue_declare('user', false, false, false, false);
              $this->channel->queue_declare('department', false, false, false, false);

              $this->connect = true;
              $this->logger->info(self::LOG_PREFIX . "connect success");
          } catch (\Throwable $th) {
              $this->connect = false;
              $this->logger->error(self::LOG_PREFIX . $th->getMessage());
          }
      }

      /**
       * Publishes the user carried by a "user.created" event.
       */
      public function onUserCreated(UserCreatedEvent $event)
      {
          $this->exec($event->getUser());
      }

      /**
       * Publishes the user carried by a "user.updated" event.
       */
      public function onUserUpdated(UserUpdatedEvent $event)
      {
          $this->exec($event->getUser());
      }

      /**
       * Publishes the user carried by a "department.created" event.
       */
      public function onDepartmentCreated(DepartmentCreatedEvent $event)
      {
          $this->exec($event->getUser());
      }

      /**
       * Publishes the user carried by a "department.updated" event.
       */
      public function onDepartmentUpdated(DepartmentUpdatedEvent $event)
      {
          $this->exec($event->getUser());
      }

      /**
       * Publishes the user carried by a "company.created" event.
       */
      public function onCompanyCreated(CompanyCreatedEvent $event)
      {
          $this->exec($event->getUser());
      }

      /**
       * Publishes the user carried by a "company.updated" event.
       */
      public function onCompanyUpdated(CompanyUpdatedEvent $event)
      {
          $this->exec($event->getUser());
      }

      /**
       * Normalizes the user (serialization group "rmq:user"), wraps it in a
       * meta/data envelope and publishes it as JSON to the "user" queue.
       *
       * Returns without publishing when publishing is disabled, when no user
       * was supplied, when the subscriber is not connected, or when the
       * payload cannot be JSON-encoded.
       *
       * @param User|null $user User to publish; null is ignored.
       */
      private function exec(?User $user = null)
      {
          if (!self::PUBLISHING_ENABLED) {
              return;
          }

          // Must come before any $user->getName() call below: events may carry no user.
          if (is_null($user)) {
              return;
          }

          if (!$this->connect) {
              $this->logger->error(self::LOG_PREFIX . "not connect for publish user " . $user->getName());
              return;
          }

          $data = $this->normalizer->normalize($user, null, ["groups" => ["rmq:user"]]);
          $content = [
              "meta" => [
                  "source" => $this->getSource()
              ],
              "data" => $data
          ];

          $dataString = json_encode($content);
          if ($dataString === false) {
              // json_encode() signals failure with false; never publish that as a message body.
              $this->logger->error(
                  self::LOG_PREFIX . "cannot encode user " . $user->getName() . ": " . json_last_error_msg()
              );
              return;
          }

          $this->logger->notice(self::LOG_PREFIX . "publish user " . $user->getName());
          $msg = new AMQPMessage($dataString);
          // Empty exchange name with routing key 'user'.
          $this->channel->basic_publish($msg, '', 'user');
      }

      /**
       * Returns the identifier placed in the "meta.source" field of every message.
       *
       * @return string
       */
      public function getSource()
      {
          return "baadhi";
      }

      /**
       * Maps event names to the handler methods of this subscriber.
       *
       * @return array
       */
      public static function getSubscribedEvents()
      {
          return [
              'user.created' => 'onUserCreated',
              'user.updated' => 'onUserUpdated',
              'department.created' => 'onDepartmentCreated',
              'department.updated' => 'onDepartmentUpdated',
              'company.created' => 'onCompanyCreated',
              'company.updated' => 'onCompanyUpdated'
          ];
      }
  }