RabbitMQ的安装与基本使用(windows+php版本)
RabbitMQ的安装与基本使用
一、安装erlang
RabbitMQ服务端代码是基于并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。
1.1 下载
Erlang官网:http://www.erlang.org/downloads
1.2 配置环境变量
变量名:ES_HOME
变量值:erlang 的安装路径
1.3 配置 erlang 可执行命令 path
1.4 查看 erlang 是否安装成功
二、安装 RabbitMQ
RabbitMQ 官网下载地址:http://www.rabbitmq.com/download.html
双击下载后的.exe文件,傻瓜式"下一步"即可。
RabbitMQ 安装好后接下来安装 RabbitMQ-Plugins:
- 打开命令行cd,输入RabbitMQ的sbin目录
- 输入 rabbitmq-plugins enable rabbitmq_management 命令进行安装
- 打开sbin目录,双击 rabbitmq-server.bat
访问可视化界面:http://localhost:15672
用户名和密码默认都是:guest
三、安装 php 的 amqp 扩展
3.1 查看 php 的 thread safty 状态
3.2 根据php版本及thread satety状态,下载稳定版的dll
amqp 的扩展下载地址:http://pecl.php.net/package/amqp
注意:php版本,X86 和X64 根据自己情况 而定 ,NTS 和 TS 就是那个thread safty 的状态
3.3 将 php_amqp.dll 文件放到 php 目录的 ext 文件夹下
3.4 将 rabbitmq.4.dll 文件放到 php 根目录
3.5 编辑 php.ini 文件,新增内容:
extension=php_amqp.dll
3.6 重启服务器,查看 amqp 扩展是否安装成功
四、MQ 基本使用(自己看代码吧,demo比较简单)
4.1 目录结构
文件名 | 描述 |
---|---|
config.php | 配置文件 |
BaseMQ.php | MQ基类 |
ProductMQ.php | 生产者类 |
ConsumerMQ.php | 消费者类 |
TestUserConsumer.php | User消费者测试实例 |
TestCompanyConsumer.php | Company消费者测试实例 |
4.2 文件说明
config.php
<?php
/**
* Created by PhpStorm
* User: Jason
* Date: 2023-02-06
* Time: 11:23
*/
return [
// 配置
'config' => [
'host' => '127.0.0.1',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest',
],
// 交换机
'exchange' => 'hello',
// 路由key
'route_keys' => ['user', 'company']
];
BaseMQ.php
<?php
/**
* Created by PhpStorm
* User: Jason
* Date: 2023-02-08
* Time: 14:50
*/
class BaseMQ
{
/**
* 连接对象
*
* @var
*/
protected $AMQPConnection;
/**
* 管道
*
* @var
*/
protected $AMQPChannel;
/**
* 交换机
*
* @var
*/
protected $AMQPExchange;
/**
* 队列
*
* @var
*/
protected $AMQPQueue;
/**
* 报文对象
*
* @var
*/
protected $AMQPEnvelope;
/**
* 配置
*
* @var
*/
protected $conf;
/**
* 交换机名称
*
* @var
*/
protected $exchangeName;
/**
* BaseMQ constructor.
* @param $conf
* @param $exchangeName
* @throws AMQPConnectionException
*/
public function __construct($conf, $exchangeName)
{
$this->conf = $conf;
$this->exchangeName = $exchangeName;
$this->connect();
}
/**
* 连接
*
* @throws AMQPConnectionException
*/
protected function connect()
{
$this->AMQPConnection = New AMQPConnection($this->conf);
if (!$this->AMQPConnection->connect()) throw new \AMQPConnectionException("Cannot connect to the broker!\n");
}
/**
* 关闭连接
*/
protected function close()
{
$this->AMQPConnection->disconnect();
}
/**
* 创建管道
*
* @return AMQPChannel
* @throws AMQPConnectionException
*/
protected function createChannel()
{
if (!$this->AMQPChannel) {
$this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
}
return $this->AMQPChannel;
}
/**
* 创建交换机
*
* @return AMQPExchange
* @throws AMQPConnectionException
* @throws AMQPExchangeException
*/
protected function createExchange()
{
if (!$this->AMQPExchange) {
$this->AMQPExchange = new \AMQPExchange($this->createChannel());
$this->AMQPExchange->setName($this->exchangeName);
}
return $this->AMQPExchange;
}
/**
* 创建队列
*
* @return AMQPQueue
* @throws AMQPConnectionException
* @throws AMQPQueueException
*/
public function createQueue()
{
if (!$this->AMQPQueue) {
$this->AMQPQueue = new \AMQPQueue($this->createChannel());
}
return $this->AMQPQueue;
}
/**
* 报文对象
*
* @return AMQPEnvelope
*/
public function envelope()
{
if (!$this->AMQPEnvelope) {
$this->AMQPEnvelope = new \AMQPEnvelope();
}
return $this->AMQPEnvelope;
}
}
ProductMQ.php
<?php
/**
* Created by PhpStorm
* User: Jason
* Date: 2023-02-08
* Time: 15:04
*/
require_once 'BaseMQ.php';
class ProductMQ extends BaseMQ
{
/**
* 路由key数组
*
* @var
*/
private $routeKeys;
/**
* ProductMQ constructor.
* @param $conf
* @param $exchangeName
* @param $routeKeys
* @throws AMQPConnectionException
*/
public function __construct($conf, $exchangeName, $routeKeys)
{
$this->routeKeys = $routeKeys;
parent::__construct($conf, $exchangeName);
}
/**
* 发布
*
* @param string $message
* @throws AMQPChannelException
* @throws AMQPConnectionException
* @throws AMQPExchangeException
*/
public function run(string $message)
{
// 创建管道
$ch = $this->createChannel();
// 创建交换机
$ex = $this->createExchange();
// 开启事务,推送多个路由key
$ch->startTransaction();
$count = 0;
foreach ($this->routeKeys as $routeKey) {
// 发布到多个路由key上
$sendEd = $ex->publish($message, $routeKey);
$sendEd && $count++;
}
if ($count != count($this->routeKeys)) {
$ch->rollbackTransaction();
}
$ch->commitTransaction();
// 关闭连接
$this->close();
}
}
// 测试
try {
$config = require 'config.php';
$sender = (new ProductMQ($config['config'], $config['exchange'], $config['route_keys']));
$sender->run('hello' . rand(1, 10));
} catch (Exception $exception) {
echo 'failed:' . $exception->getMessage();
}
ConsumerMQ.php
<?php
/**
* Created by PhpStorm
* User: Jason
* Date: 2023-02-08
* Time: 15:15
*/
require_once 'BaseMQ.php';
class ConsumerMQ extends BaseMQ
{
/**
* 路由key
*
* @var
*/
private $routeKey;
/**
* 队列名
*
* @var
*/
private $queueName;
/**
* $queueName
*
* ConsumerMQ constructor.
* @param $conf
* @param $exchangeName
* @param $routerKey
* @param $queueName
* @throws AMQPConnectionException
*/
public function __construct($conf, $exchangeName, $routerKey, $queueName)
{
$this->routeKey = $routerKey;
$this->queueName = $queueName;
parent::__construct($conf, $exchangeName);
}
/**
* 消费
*
* @throws AMQPChannelException
* @throws AMQPConnectionException
* @throws AMQPEnvelopeException
* @throws AMQPExchangeException
* @throws AMQPQueueException
*/
public function run()
{
// 创建交换机
$ex = $this->createExchange();
// 设置交换机类型: direct
$ex->setType(AMQP_EX_TYPE_DIRECT);
// 设置交换机持久化
$ex->setFlags(AMQP_DURABLE);
// 声明
$exStatus = $ex->declareExchange();
echo "Exchange Status:" . $exStatus . "\n";
// 创建队列
$queue = $this->createQueue();
// 设置队列名称
$queue->setName($this->queueName);
// 设置队列持久化
$queue->setFlags(AMQP_DURABLE);
// 声明
$queueMessageCount = $queue->declareQueue();
echo "Queue Message Count:" . $queueMessageCount . "\n";
// 队列绑定到交换机路由key
$queue->bind($this->exchangeName, $this->routeKey);
// 阻塞模式接收消息
echo "Message:\n";
while (true) {
$queue->consume(function ($envelope, $queue) {
$msg = $envelope->getBody();
// todo:处理消息
echo $this->routeKey . ' receive message:' . $msg . "\n";
// 手动发送ACK应答
$queue->ack($envelope->getDeliveryTag());
});
}
$this->close();
}
}
TestUserConsumer.php
<?php
/**
* Created by PhpStorm
* User: Jason
* Date: 2023-02-08
* Time: 15:59
*/
require_once 'ConsumerMQ.php';
// 测试: 监听user路由消息
$config = require 'config.php';
try {
$routeKey = 'company';
$queueName = 'hello_company';
(new ConsumerMQ($config, $config['exchange'], $routeKey, $queueName))->run();
} catch (\Exception $exception) {
echo $exception->getMessage();
}
TestCompanyConsumer.php
<?php
/**
* Created by PhpStorm
* User: Jason
* Date: 2023-02-08
* Time: 15:59
*/
require_once 'ConsumerMQ.php';
// 测试: 监听user路由消息
$config = require 'config.php';
try {
$routeKey = 'user';
$queueName = 'hello_user';
(new ConsumerMQ($config, $config['exchange'], $routeKey, $queueName))->run();
} catch (\Exception $exception) {
echo $exception->getMessage();
}
4.3 测试展示