PHP+RabbitMQ实现消息队列的完整代码(java中消息队列rabbitmq)

网友投稿 254 2022-07-21

为什么使用RabbitMq而不是ActiveMq或者RocketMq?

首先,从业务上来讲,我并不要求消息的100%接受率,并且,我需要结合php开发,RabbitMq相较RocketMq,延迟较低(微妙级)。至于ActiveMq,貌似问题较多。RabbitMq对各种语言的支持较好,所以选择RabbitMq。

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.

php扩展地址

: http://pecl.php.net/package/amqp

具体以官网为准

http://rabbitmq.com/getstarted.html

config.php

 //配置  'host' => [

  'host' => '127.0.0.1',

  'port' => '5672',

  'login' => 'guest',

  'password' => 'guest',

  'vhost'=>'/',

 ],

 //交换机  'exchange'=>'word',

 //路由  'routes' => [],

];

BaseMQ.php

 * Created by PhpStorm.

 * User: pc

 * Date: 2019/07/13

 * Time: 14:11

 */ namespace MyObjSummary\rabbitMQ; /** Member

 *  AMQPChannel

 *  AMQPConnection

 *  AMQPEnvelope

 *  AMQPExchange

 *  AMQPQueue

 * Class BaseMQ

 * @package MyObjSummary\rabbitMQ

 */ class BaseMQ {

 /** MQ Channel

  * @var \AMQPChannel

  */  public $AMQPChannel ;

 /** MQ Link

  * @var \AMQPConnection

  */  public $AMQPConnection ;

 /** MQ Envelope

  * @var \AMQPEnvelope

  */  public $AMQPEnvelope ;

 /** MQ Exchange

  * @var \AMQPExchange

  */  public $AMQPExchange ;

 /** MQ Queue

  * @var \AMQPQueue

  */  public $AMQPQueue ;

 /** conf

  * @var   */  public $conf ;

 /** exchange

  * @var   */  public $exchange ;

 /** link

  * BaseMQ constructor.

  * @throws \AMQPConnectionException

  */  public function __construct()  {

  $conf = require 'config.php' ;

  if(!$conf)

   throw new \AMQPConnectionException('config error!');

  $this->conf  = $conf['host'] ;

  $this->exchange = $conf['exchange'] ;

  $this->AMQPConnection = new \AMQPConnection($this->conf);

  if (!$this->AMQPConnection->connect())

   throw new \AMQPConnectionException("Cannot connect to the broker!\n");

 }

 /**

  * close link

  */  public function close()  {

  $this->AMQPConnection->disconnect();

 }

 /** Channel

  * @return \AMQPChannel

  * @throws \AMQPConnectionException

  */  public function channel()  {

  if(!$this->AMQPChannel) {

   $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);

  }

  return $this->AMQPChannel;

 }

 /** Exchange

  * @return \AMQPExchange

  * @throws \AMQPConnectionException

  * @throws \AMQPExchangeException

  */  public function exchange()  {

  if(!$this->AMQPExchange) {

   $this->AMQPExchange = new \AMQPExchange($this->channel());

   $this->AMQPExchange->setName($this->exchange);

  }

  return $this->AMQPExchange ;

 }

 /** queue

  * @return \AMQPQueue

  * @throws \AMQPConnectionException

  * @throws \AMQPQueueException

  */  public function queue()  {

  if(!$this->AMQPQueue) {

   $this->AMQPQueue = new \AMQPQueue($this->channel());

  }

  return $this->AMQPQueue ;

 }

 /** Envelope

  * @return \AMQPEnvelope

  */  public function envelope()  {

  if(!$this->AMQPEnvelope) {

   $this->AMQPEnvelope = new \AMQPEnvelope();

  }

  return $this->AMQPEnvelope;

 }

}

ProductMQ.php

 private $routes = ['hello','word']; //路由key  /**

  * ProductMQ constructor.

  * @throws \AMQPConnectionException

  */  public function __construct()  {

  parent::__construct();

 }

 /** 只控制发送成功 不接受消费者是否收到

  * @throws \AMQPChannelException

  * @throws \AMQPConnectionException

  * @throws \AMQPExchangeException

  */  public function run()  {

  //频道   $channel = $this->channel();

  //创建交换机对象   $ex = $this->exchange();

  //消息内容   $message = 'product message '.rand(1,99999);

  //开始事务   $channel->startTransaction();

  $sendEd = true ;

  foreach ($this->routes as $route) {

   $sendEd = $ex->publish($message, $route) ;

   echo "Send Message:".$sendEd."\n";

  }

  if(!$sendEd) {

   $channel->rollbackTransaction();

  }

  $channel->commitTransaction(); //提交事务   $this->close();

  die ;

 }

} try{

 (new ProductMQ())->run();

}catch (\Exception $exception){

 var_dump($exception->getMessage()) ;

}

ConsumerMQ.php

 private $q_name = 'hello'; //队列名  private $route = 'hello'; //路由key  /**

  * ConsumerMQ constructor.

  * @throws \AMQPConnectionException

  */  public function __construct()  {

  parent::__construct();

 }

 /** 接受消息 如果终止 重连时会有消息

  * @throws \AMQPChannelException

  * @throws \AMQPConnectionException

  * @throws \AMQPExchangeException

  * @throws \AMQPQueueException

  */  public function run()  {

  //创建交换机   $ex = $this->exchange();

  $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型   $ex->setFlags(AMQP_DURABLE); //持久化   //echo "Exchange Status:".$ex->declare()."\n";   //创建队列   $q = $this->queue();

  //var_dump($q->declare());exit();   $q->setName($this->q_name);

  $q->setFlags(AMQP_DURABLE); //持久化   //echo "Message Total:".$q->declareQueue()."\n";   //绑定交换机与队列,并指定路由键   echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";

  //阻塞模式接收消息   echo "Message:\n";

  while(True){

   $q->consume(function ($envelope,$queue){

    $msg = $envelope->getBody();

    echo $msg."\n"; //处理消息     $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答    });

   //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答   }

  $this->close();

 }

} try{

 (new ConsumerMQ)->run();

}catch (\Exception $exception){

 var_dump($exception->getMessage()) ;

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:PHP RBAC权限控制实现思路,你会了吗(php是什么意思)
下一篇:php中Session使用方法详解,你会了吗(php销毁session)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~