laravel使用rabbitmq
1 . 拉取composer包
composer require vladimir-yuldashev/laravel-queue-rabbitmq
2 . 在config/queue.php 文件中配置信息
'rabbitmq' => [
'driver' => 'rabbitmq',
'queue' => env('RABBITMQ_QUEUE', 'default'),
'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,
'hosts' => [
[
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
],
],
'options' => [
'ssl_options' => [
'cafile' => env('RABBITMQ_SSL_CAFILE', null),
'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
],
'queue' => [
'job' => \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
]
],
/*
* Set to "horizon" if you wish to use Laravel Horizon.
*/
'worker' => env('RABBITMQ_WORKER', 'default'),
]
3 . 在.env文件配置
.env文件中
# 默认使用rabbitmq队列,后面启动队列监听的时候,如果不指定驱动,就默认用这个名称指定的驱动
QUEUE_CONNECTION=rabbitmq
# 使用的队列驱动
QUEUE_DRIVER=rabbitmq
# mq的ip地址
RABBITMQ_HOST=172.17.0.10
# mq的端口
RABBITMQ_PORT=5672
# mq的账号
RABBITMQ_USER=admin
# mq的密码
RABBITMQ_PASSWORD=admin
# 默认的虚拟主机
RABBITMQ_VHOST=my_vhost
# 默认队列名称
RABBITMQ_QUEUE=product
4 . 创建jobs文件
使用laravel的队列监听
php artisan make:job UpdateProduct
5 . 在UpdateProduct.php中编写生产者与消费者
生产者在把消息推送到laravel的事件监听中,初始化生产者配置,创建rabbitmq的所需要绑定的交换机,路由,队列,并且进行绑定。并且监听消费者,当有消息消费时,则从rabbitmq的队列中获取消息,消费成功进行ack
<?php
/**
* php artisan make:job UpdateProduct
*/
namespace App\Jobs;
use App\Services\RabbitmqService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class UpdateProduct implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $productKey;
/**
* UpdateProduct constructor.
* @param $data
* @throws \Exception
*/
public function __construct($data)
{
$this->productKey = "product::info::{$data->id}";
//服务生产者
RabbitmqService::push('product','exc_product','pus_product',$data);
}
/**
* 服务消费者会走到这里,把消息消费掉
* @throws \Exception
*/
public function handle()
{
RabbitmqService::pop('product',function ($message){
print_r('消费者消费消息'.PHP_EOL);
print_r(PHP_EOL);
$key = $this->productKey . ':' . date('Y-m-d H:i:s');
$input = serialize(json_decode($message,true));
$product = app('redis')->set($key,$input);
if($product){
print_r('消息消费成功');
return true;
}else{
print_r('消息消费失败');
return false;
}
});
}
/**
* 异常扑获
* @param \Exception $exception
*/
public function failed(\Exception $exception){
print_r($exception->getMessage());
}
}
6 . 创建操作rabbitmq的service
封装具体操作rabbitmq的方法
<?php
namespace App\Services;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqService
{
private static function getConnect(){
$config = [
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
];
return new AMQPStreamConnection($config['host'],$config['port'],$config['user'],$config['password'],$config['vhost']);
}
/**
* 数据插入到mq队列中(生产者)
* @param $queue .队列名称
* @param $messageBody .消息体
* @param string $exchange .交换机名称
* @param string $routing_key .设置路由
* @throws \Exception
*/
public static function push($queue,$exchange,$routing_key,$messageBody){
//获取连接
$connection = self::getConnect();
//构建通道(mq的数据存储与获取是通过通道进行数据传输的)
$channel = $connection->channel();
//监听数据,成功
$channel->set_ack_handler(function (AMQPMessage $message){
dump("数据写入成功");
});
//监听数据,失败
$channel->set_nack_handler(function (AMQPMessage $message){
dump("数据写入失败");
});
//声明一个队列
$channel->queue_declare($queue,false,true,false,false);
//指定交换机,若是路由的名称不匹配不会把数据放入队列中
$channel->exchange_declare($exchange,'direct',false,true,false);
//队列和交换器绑定/绑定队列和类型
$channel->queue_bind($queue,$exchange,$routing_key);
$config = [
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
];
//实例化消息推送类
$message = new AMQPMessage($messageBody,$config);
//消息推送到路由名称为$exchange的队列当中
$channel->basic_publish($message,$exchange,$routing_key);
//监听写入
$channel->wait_for_pending_acks();
dump('生产者已操作');
//关闭消息推送资源
$channel->close();
//关闭mq资源
$connection->close();
}
/**
* 消费者:取出消息进行消费,并返回
* @param $queue
* @param $callback
* @return bool
* @throws \Exception
*/
public static function pop($queue,$callback){
print_r('消费者中心'.PHP_EOL);
$connection = self::getConnect();
//构建消息通道
$channel = $connection->channel();
//从队列中取出消息,并且消费
$message = $channel->basic_get($queue);
if(!$message) return false;
//消息主题返回给回调函数
$res = $callback($message->body);
if($res){
print_r('ack验证'.PHP_EOL);
//ack验证,如果消费失败了,从新获取一次数据再次消费
$channel->basic_ack($message->getDeliveryTag());
}
print_r('ack消费完成'.PHP_EOL);
$channel->close();
$connection->close();
return true;
}
}
7 . 生产者控制器中派遣任务
<?php
namespace App\Http\Controllers\Api\V1;
use App\Http\Controllers\ApiResponse;
use App\Http\Controllers\Controller;
use App\Jobs\CloseOrder;
use App\Jobs\UpdateProduct;
use App\Models\Product;
use App\pool\redisPool;
use App\Services\PublicService;
use Illuminate\Http\Request;
use App\Services\ElasticsearchService;
class ProductController extends Controller
{
use ApiResponse;
/**
* 推送消息到mq中
* @param Request $request
* @return \Illuminate\Http\JsonResponse
*/
public function pushRabbitmq(Request $request){
try{
$id = $request->input('id');
if(!$id) return $this->failed('缺少店铺id');
$select = 'id,name,long_name,shop_id,create_time';
$info = Product::query()->selectRaw($select)->where('id',$id)->first();
$productJob = new UpdateProduct($info);
//派遣
$this->dispatch($productJob);
return $this->success('操作成功');
}catch (\Exception $e){
return $this->failed($e->getMessage().$e->getLine());
}
}
}
8 . 测试:
8 . 1 生产者开始推送消息
8 . 2 消费者监听消息
php artisan queue:work rabbitmq
这里分别在两个不同的服务器中部署同一份代码,分别启动消费者进行消息消费
服务器一
服务器二
两个消费者消费消息,并不是轮询的,而是看谁空闲,则由谁来消费,如果都空闲,则随机,如果消费者处理不过来,可以增加多几台服务器,一起进行消息的消费
注: 为何能在其他服务器监听到别的服务器监听到laravel发布的消息?因为在 监听队列的时候,指定了rabbitmq为驱动
本文含有隐藏内容,请 开通VIP 后查看