RabbitMQ的安装与基本使用(windows+php版本)

RabbitMQ的安装与基本使用

一、安装erlang

RabbitMQ服务端代码是基于并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。

1.1 下载

Erlang官网:http://www.erlang.org/downloads

1.2 配置环境变量

在这里插入图片描述
变量名:ES_HOME
变量值:erlang 的安装路径

1.3 配置 erlang 可执行命令 path

在这里插入图片描述

1.4 查看 erlang 是否安装成功

在这里插入图片描述

二、安装 RabbitMQ

RabbitMQ 官网下载地址:http://www.rabbitmq.com/download.html
在这里插入图片描述
双击下载后的.exe文件,傻瓜式"下一步"即可。

RabbitMQ 安装好后接下来安装 RabbitMQ-Plugins:

  • 打开命令行cd,输入RabbitMQ的sbin目录
  • 输入 rabbitmq-plugins enable rabbitmq_management 命令进行安装
  • 打开sbin目录,双击 rabbitmq-server.bat

访问可视化界面:http://localhost:15672
用户名和密码默认都是:guest

三、安装 php 的 amqp 扩展

3.1 查看 php 的 thread safty 状态

在这里插入图片描述

3.2 根据php版本及thread satety状态,下载稳定版的dll

amqp 的扩展下载地址:http://pecl.php.net/package/amqp
在这里插入图片描述
在这里插入图片描述

注意:php版本,X86 和X64 根据自己情况 而定 ,NTS 和 TS 就是那个thread safty 的状态

3.3 将 php_amqp.dll 文件放到 php 目录的 ext 文件夹下

在这里插入图片描述

3.4 将 rabbitmq.4.dll 文件放到 php 根目录

在这里插入图片描述

3.5 编辑 php.ini 文件,新增内容:
extension=php_amqp.dll
3.6 重启服务器,查看 amqp 扩展是否安装成功在这里插入图片描述

四、MQ 基本使用(自己看代码吧,demo比较简单)

4.1 目录结构

在这里插入图片描述

文件名描述
config.php配置文件
BaseMQ.phpMQ基类
ProductMQ.php生产者类
ConsumerMQ.php消费者类
TestUserConsumer.phpUser消费者测试实例
TestCompanyConsumer.phpCompany消费者测试实例
4.2 文件说明

config.php

<?php
/**
 * Created by PhpStorm
 * User: Jason
 * Date: 2023-02-06
 * Time: 11:23
 */

return [
    // 配置
    'config' => [
        'host' => '127.0.0.1',
        'port' => '5672',
        'vhost' => '/',
        'login' => 'guest',
        'password' => 'guest',
    ],
    // 交换机
    'exchange' => 'hello',
    // 路由key
    'route_keys' => ['user', 'company']
];

BaseMQ.php

<?php
/**
* Created by PhpStorm
* User: Jason
* Date: 2023-02-08
* Time: 14:50
*/

class BaseMQ
{
   /**
    * 连接对象
    *
    * @var
    */
   protected $AMQPConnection;

   /**
    * 管道
    *
    * @var
    */
   protected $AMQPChannel;

   /**
    * 交换机
    *
    * @var
    */
   protected $AMQPExchange;

   /**
    * 队列
    *
    * @var
    */
   protected $AMQPQueue;

   /**
    * 报文对象
    *
    * @var
    */
   protected $AMQPEnvelope;

   /**
    * 配置
    *
    * @var
    */
   protected $conf;

   /**
    * 交换机名称
    *
    * @var
    */
   protected $exchangeName;

   /**
    * BaseMQ constructor.
    * @param $conf
    * @param $exchangeName
    * @throws AMQPConnectionException
    */
   public function __construct($conf, $exchangeName)
   {
       $this->conf = $conf;

       $this->exchangeName = $exchangeName;

       $this->connect();
   }

   /**
    * 连接
    *
    * @throws AMQPConnectionException
    */
   protected function connect()
   {
       $this->AMQPConnection = New AMQPConnection($this->conf);
       if (!$this->AMQPConnection->connect()) throw new \AMQPConnectionException("Cannot connect to the broker!\n");
   }

   /**
    * 关闭连接
    */
   protected function close()
   {
       $this->AMQPConnection->disconnect();
   }

   /**
    * 创建管道
    *
    * @return AMQPChannel
    * @throws AMQPConnectionException
    */
   protected function createChannel()
   {
       if (!$this->AMQPChannel) {
           $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
       }

       return $this->AMQPChannel;
   }

   /**
    * 创建交换机
    *
    * @return AMQPExchange
    * @throws AMQPConnectionException
    * @throws AMQPExchangeException
    */
   protected function createExchange()
   {
       if (!$this->AMQPExchange) {
           $this->AMQPExchange = new \AMQPExchange($this->createChannel());
           $this->AMQPExchange->setName($this->exchangeName);
       }

       return $this->AMQPExchange;
   }

   /**
    * 创建队列
    *
    * @return AMQPQueue
    * @throws AMQPConnectionException
    * @throws AMQPQueueException
    */
   public function createQueue()
   {
       if (!$this->AMQPQueue) {
           $this->AMQPQueue = new \AMQPQueue($this->createChannel());
       }

       return $this->AMQPQueue;
   }

   /**
    * 报文对象
    *
    * @return AMQPEnvelope
    */
   public function envelope()
   {
       if (!$this->AMQPEnvelope) {
           $this->AMQPEnvelope = new \AMQPEnvelope();
       }

       return $this->AMQPEnvelope;
   }
}

ProductMQ.php

<?php
/**
* Created by PhpStorm
* User: Jason
* Date: 2023-02-08
* Time: 15:04
*/

require_once 'BaseMQ.php';

class ProductMQ extends BaseMQ
{
   /**
    * 路由key数组
    *
    * @var
    */
   private $routeKeys;

   /**
    * ProductMQ constructor.
    * @param $conf
    * @param $exchangeName
    * @param $routeKeys
    * @throws AMQPConnectionException
    */
   public function __construct($conf, $exchangeName, $routeKeys)
   {
       $this->routeKeys = $routeKeys;

       parent::__construct($conf, $exchangeName);
   }

   /**
    * 发布
    *
    * @param string $message
    * @throws AMQPChannelException
    * @throws AMQPConnectionException
    * @throws AMQPExchangeException
    */
   public function run(string $message)
   {
       // 创建管道
       $ch = $this->createChannel();

       // 创建交换机
       $ex = $this->createExchange();

       // 开启事务,推送多个路由key
       $ch->startTransaction();
       $count = 0;
       foreach ($this->routeKeys as $routeKey) {
           // 发布到多个路由key上
           $sendEd = $ex->publish($message, $routeKey);
           $sendEd && $count++;
       }
       if ($count != count($this->routeKeys)) {
           $ch->rollbackTransaction();
       }
       $ch->commitTransaction();

       // 关闭连接
       $this->close();
   }
}

// 测试
try {
   $config = require 'config.php';

   $sender = (new ProductMQ($config['config'], $config['exchange'], $config['route_keys']));

   $sender->run('hello' . rand(1, 10));

} catch (Exception $exception) {
   echo 'failed:' . $exception->getMessage();
}

ConsumerMQ.php

<?php
/**
* Created by PhpStorm
* User: Jason
* Date: 2023-02-08
* Time: 15:15
*/

require_once 'BaseMQ.php';

class ConsumerMQ extends BaseMQ
{
   /**
    * 路由key
    *
    * @var
    */
   private $routeKey;

   /**
    * 队列名
    *
    * @var
    */
   private $queueName;

   /**
    * $queueName
    *
    * ConsumerMQ constructor.
    * @param $conf
    * @param $exchangeName
    * @param $routerKey
    * @param $queueName
    * @throws AMQPConnectionException
    */
   public function __construct($conf, $exchangeName, $routerKey, $queueName)
   {
       $this->routeKey = $routerKey;

       $this->queueName = $queueName;

       parent::__construct($conf, $exchangeName);
   }

   /**
    * 消费
    *
    * @throws AMQPChannelException
    * @throws AMQPConnectionException
    * @throws AMQPEnvelopeException
    * @throws AMQPExchangeException
    * @throws AMQPQueueException
    */
   public function run()
   {
       // 创建交换机
       $ex = $this->createExchange();
       // 设置交换机类型: direct
       $ex->setType(AMQP_EX_TYPE_DIRECT);
       // 设置交换机持久化
       $ex->setFlags(AMQP_DURABLE);
       // 声明
       $exStatus = $ex->declareExchange();
       echo "Exchange Status:" . $exStatus . "\n";

       // 创建队列
       $queue = $this->createQueue();
       // 设置队列名称
       $queue->setName($this->queueName);
       // 设置队列持久化
       $queue->setFlags(AMQP_DURABLE);
       // 声明
       $queueMessageCount = $queue->declareQueue();
       echo "Queue Message Count:" . $queueMessageCount . "\n";

       // 队列绑定到交换机路由key
       $queue->bind($this->exchangeName, $this->routeKey);

       // 阻塞模式接收消息
       echo "Message:\n";
       while (true) {
           $queue->consume(function ($envelope, $queue) {
               $msg = $envelope->getBody();

               // todo:处理消息
               echo $this->routeKey . ' receive message:' . $msg . "\n";

               // 手动发送ACK应答
               $queue->ack($envelope->getDeliveryTag());
           });
       }

       $this->close();
   }
}

TestUserConsumer.php

<?php
/**
 * Created by PhpStorm
 * User: Jason
 * Date: 2023-02-08
 * Time: 15:59
 */

require_once 'ConsumerMQ.php';

// 测试: 监听user路由消息
$config = require 'config.php';
try {
    $routeKey = 'company';
    $queueName = 'hello_company';

    (new ConsumerMQ($config, $config['exchange'], $routeKey, $queueName))->run();
} catch (\Exception $exception) {
    echo $exception->getMessage();
}

TestCompanyConsumer.php

<?php
/**
 * Created by PhpStorm
 * User: Jason
 * Date: 2023-02-08
 * Time: 15:59
 */

require_once 'ConsumerMQ.php';

// 测试: 监听user路由消息
$config = require 'config.php';
try {
    $routeKey = 'user';
    $queueName = 'hello_user';
    (new ConsumerMQ($config, $config['exchange'], $routeKey, $queueName))->run();

} catch (\Exception $exception) {
    echo $exception->getMessage();
}
4.3 测试展示

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述