<?php
namespace App\EventSubscriber;
use App\Entity\User;
use App\Event\CompanyCreatedEvent;
use App\Event\CompanyUpdatedEvent;
use App\Event\DepartmentCreatedEvent;
use App\Event\DepartmentUpdatedEvent;
use App\Event\UserCreatedEvent;
use App\Event\UserUpdatedEvent;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Serializer\Normalizer\NormalizableInterface;
use Symfony\Component\Serializer\Normalizer\NormalizerInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
 * Event subscriber that forwards user data to RabbitMQ.
 *
 * Every subscribed event (user, department and company created/updated) is
 * handled the same way: the user carried by the event is normalized with the
 * "rmq:user" serialization group, wrapped in a small envelope and published
 * to the "user" queue.
 *
 * The whole integration is guarded by the private $enabled switch, which is
 * currently false, so in its present state this subscriber performs no I/O.
 */
class RMQSubscriber implements EventSubscriberInterface
{
    /**
     * Connection to the broker; only set after a successful toConnect().
     *
     * @var AMQPStreamConnection
     */
    private $connection;

    /**
     * Channel opened on $connection; only set after a successful toConnect().
     *
     * @var \PhpAmqpLib\Channel\AMQPChannel
     */
    private $channel;

    /**
     * Normalizer used to turn a User into a plain array. May be null, because
     * the constructor accepts null for it.
     *
     * @var NormalizerInterface|null
     */
    private $normalizer;

    /**
     * Container the RabbitMQ connection parameters are read from.
     *
     * @var ContainerInterface
     */
    private $container;

    /**
     * True once toConnect() has opened the connection and declared the queues.
     *
     * @var bool
     */
    private $connect = false;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Master switch for the RabbitMQ integration.
     *
     * While this is false, toConnect() opens no connection and exec()
     * publishes nothing, so every event handler is a no-op.
     * NOTE(review): presumably switched off on purpose - confirm before
     * setting this to true.
     *
     * @var bool
     */
    private $enabled = false;

    /**
     * Stores the collaborators and attempts to connect to the broker.
     *
     * NOTE(review): the signature is kept as-is for callers; an optional
     * parameter placed before required ones is effectively required.
     *
     * @param NormalizerInterface|null $normalizer normalizer for User objects
     * @param ContainerInterface       $container  source of the rmq* parameters
     * @param LoggerInterface          $logger     receives connection/publish logs
     */
    public function __construct(
        NormalizerInterface $normalizer = null,
        ContainerInterface $container,
        LoggerInterface $logger
    ) {
        $this->normalizer = $normalizer;
        $this->container = $container;
        $this->logger = $logger;
        $this->toConnect();
    }

    /**
     * Opens the AMQP connection and declares the "user" and "department" queues.
     *
     * Does nothing while the integration is disabled. Connection failures are
     * logged and leave $connect false instead of propagating, so constructing
     * the subscriber never fails because the broker is unreachable.
     *
     * @return void
     */
    public function toConnect()
    {
        if (! $this->enabled) {
            return;
        }

        // Read outside the try block on purpose: a missing parameter is a
        // configuration error and is not swallowed like a broker failure.
        $host = $this->container->getParameter("rmqHost");
        $port = $this->container->getParameter("rmqPort");
        $user = $this->container->getParameter("rmqUser");
        $pw = $this->container->getParameter("rmqPw");

        try {
            $this->connection = new AMQPStreamConnection($host, $port, $user, $pw);
            $this->channel = $this->connection->channel();
            // passive, durable, exclusive and auto_delete are all false.
            $this->channel->queue_declare('user', false, false, false, false);
            // Declared here, although nothing in this class publishes to it.
            $this->channel->queue_declare('department', false, false, false, false);
            $this->connect = true;
            $this->logger->info("[##Rabbitmq##]: ". "connect success");
        } catch (\Throwable $th) {
            $this->connect = false;
            $this->logger->error("[##Rabbitmq##]: ". $th->getMessage());
        }
    }

    /**
     * Publishes the user that was just created.
     *
     * @param UserCreatedEvent $event
     *
     * @return void
     */
    public function onUserCreated(UserCreatedEvent $event)
    {
        $this->exec($event->getUser());
    }

    /**
     * Publishes the user that was just updated.
     *
     * @param UserUpdatedEvent $event
     *
     * @return void
     */
    public function onUserUpdated(UserUpdatedEvent $event)
    {
        $this->exec($event->getUser());
    }

    /**
     * Publishes the user returned by the department-created event.
     *
     * @param DepartmentCreatedEvent $event
     *
     * @return void
     */
    public function onDepartmentCreated(DepartmentCreatedEvent $event)
    {
        $this->exec($event->getUser());
    }

    /**
     * Publishes the user returned by the department-updated event.
     *
     * @param DepartmentUpdatedEvent $event
     *
     * @return void
     */
    public function onDepartmentUpdated(DepartmentUpdatedEvent $event)
    {
        $this->exec($event->getUser());
    }

    /**
     * Publishes the user returned by the company-created event.
     *
     * @param CompanyCreatedEvent $event
     *
     * @return void
     */
    public function onCompanyCreated(CompanyCreatedEvent $event)
    {
        $this->exec($event->getUser());
    }

    /**
     * Publishes the user returned by the company-updated event.
     *
     * @param CompanyUpdatedEvent $event
     *
     * @return void
     */
    public function onCompanyUpdated(CompanyUpdatedEvent $event)
    {
        $this->exec($event->getUser());
    }

    /**
     * Normalizes a user and publishes it as JSON to the "user" queue.
     *
     * The message body has the shape
     * {"meta": {"source": <getSource()>}, "data": <normalized user>}.
     * Nothing is published when the integration is disabled, when no user is
     * given, when the broker connection is not established, when no
     * normalizer is available, or when the payload cannot be JSON-encoded.
     *
     * @param User|null $user user to publish; null is silently ignored
     *
     * @return void
     */
    private function exec(User $user = null)
    {
        if (! $this->enabled) {
            return;
        }

        // Must come before any $user->getName() call below, otherwise a null
        // user would trigger a fatal error inside the logging statements.
        if (is_null($user)) {
            return;
        }

        if (! $this->connect) {
            $this->logger->error("[##Rabbitmq##]: ". "not connect for publish user ". $user->getName());
            return;
        }

        // The constructor accepts a null normalizer, so guard against it here.
        if (is_null($this->normalizer)) {
            $this->logger->error("[##Rabbitmq##]: ". "no normalizer for publish user ". $user->getName());
            return;
        }

        $data = $this->normalizer->normalize($user, null, ["groups" => ["rmq:user"]]);
        $content = [
            "meta" => [
                "source" => $this->getSource(),
            ],
            "data" => $data,
        ];

        $dataString = json_encode($content);
        if ($dataString === false) {
            // Never hand a boolean to AMQPMessage as the message body.
            $this->logger->error(
                "[##Rabbitmq##]: ". "cannot encode user ". $user->getName(). ": ". json_last_error_msg()
            );
            return;
        }

        $this->logger->notice("[##Rabbitmq##]: ". "publish user ". $user->getName());
        // Empty exchange name with routing key "user".
        $this->channel->basic_publish(new AMQPMessage($dataString), '', 'user');
    }

    /**
     * Returns the identifier placed in the "meta.source" field of every message.
     *
     * @return string
     */
    public function getSource()
    {
        return "baadhi";
    }

    /**
     * Maps the event names this subscriber listens to onto its handler methods.
     *
     * @return array
     */
    public static function getSubscribedEvents()
    {
        return [
            'user.created' => 'onUserCreated',
            'user.updated' => 'onUserUpdated',
            'department.created' => 'onDepartmentCreated',
            'department.updated' => 'onDepartmentUpdated',
            'company.created' => 'onCompanyCreated',
            'company.updated' => 'onCompanyUpdated'
        ];
    }
}