本文总结自慕课网的一个教程:PHP消息队列实现及应用;这个教程非常适合新手刚接触消息队列,我觉得蛮不错。

1. 关于消息队列

1.1 基础模式

一个基础的消息队列应该是这样的:

1.2 应用场景

消息队列的应用场景大概可以是以下几种:

  • 冗余;
  • 解耦(比如业务系统和队列处理系统一方奔溃不会影响另一方);
  • 流量削峰(抢购,秒杀等场景);
  • 异步通信;
  • 拓展性;
  • 排序保证(比如做成单进程单线程单进单出);

1.3 队列介质

实现消息队列的方式有很多,这里介绍三种模式的实现:

Mysql:可靠性高、易实现,速度慢

Redis:速度快,单条大消息包时效率低

消息系统:专业性强,可靠,学习成本高(RabbitMQ)

1.4 消息处理触发机制

死循环方式读取:易实现,故障时无法及时恢复(适合比如秒杀系统)

定时任务:压分均分,有处理上限(要控制好进程,防止上一个任务还没完成就开始了下一个)

守护进程:类似于PHP-FPM和PHP-CG,需要shell基础

2. 案例

2.1 解耦案例:处理订单系统和配送系统(Mysql实现)

2.1.1 案例架构

订单系统和配送系统是解耦的,通过MySQL队列表做队列:

2.1.2 案例流程

程序的大概流程:

2.1.3 开发流程

1)创建一个示例MySQL队列表:

CREATE TABLE `order_queue`(
    `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT `id号`,
    `order_id` int(11) NOT NULL,
    `mobile` varchar(20) NOT NULL COMMENT `用户的手机号`,
    `address` varchar(100) NOT NULL COMMENT `用户的地址`,
    `created_at` datetime NOT NULL DEFAULT `0000-00-00 00:00:00` COMMENT `订单创建时间`,
    `updated_at` datetime NOT NULL DEFAULT `0000-00-00 00:00:00` COMMENT `处理完成时间`,
    `status` tinyint(2) NOT NULL COMMENT `当前状态, 0未处理, 1已处理, 2处理中`,
    PRIMARY KEY(`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

2)接受用户订单order.php

<?php
include '../include/db.php'; //引入一个DB类用来操作数据库

if (!empty($_GET['mobile'])) {
    // 订单处理流程
    // ......
    // 过滤从用户获取的数据

    $order_id = rand (10000, 99999); // 示例生成订单
    // 要插入的数据
    $insert_data = array(
        'order_id' => $order_id,
        'mobile'   => $_GET['mobile'],
        'created_time' => date('Y-m-d H:i:s', time()),
        'status' => 0
    );

    // 插入数据
    $db = DB::getIntance();
    $res = $db -> insert('order_queue', $insert_data);
    if ($res) {
        echo $insert_data['order_id']."保存成功";
    } else {
        echo '保存失败';
    }
}

3)配送处理goods.php

<?php
// 配送系统处理队列中的订单并进行标记的一个文件
include '../include/db.php'; //引入一个DB类用来操作数据库

$db = DB::getIntance();
// 1. 先把要处理的记录更新为等待处理; 这一步是为了实现一个锁的机制,防止其它程序操作数据冲突
$waiting = array('status' => 0);
$lock = array('status' => 2);
$res_lock = $db->update('order_queue', $lock, $waiting, 2);

// 2. 选择出刚刚更新的这些数据, 然后进行配送系统的处理
if ($res_lock) {
    $res = $db->selectAll('order_queue', $lock);

    // 然后由配送系统进行处理
    // ......

    // 3. 把这些处理过的程序更新为已完成
    $success = array(
        'status' => 1,
        'update_time' => date('Y-m-d H:i:s'),
    );
    $res_last = $db->update('order_queue', $success, $lock);
    if ($res_last) {
        echo 'success:'.$res_last;
    } else {
        echo 'Fail:'.$res_last;
    }
} else {
    echo 'All Finished!';
}

4)定时脚本good.sh

#!/bin/bash

date "+%G-%m-%d %H:$M:S"
cd /home/path/to/queue_demo/
php goods.php

5)设置corntab定时任务

设定为每一分钟执行goods.sh一次并记录到log.log中:

*/1 * * * * /home/path/to/queue_demo/goods.sh >> /home/path/to/queue_demo/log.log 2>&1

创建log文件:

touch /home/path/to/queue_demo/log.log

6)运行,测试

调用order.php接收用户的订单信息;查看mysql表中是否插入数据;

定时任务已执行,查看数据status是否改变;

进行测试时,实时查看log文件:

tail -f log.log

2.2 流量削峰案例:通过 Redis 的 List 类型实现秒杀

2.2.1 了解 Redis 的 list 类型数据

Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。一个列表最多可以包含 4294967295 个元素 (每个列表超过40亿个元素)。

常用命令:

  • LPUSH/LPUSHX:将值插入到(/存在的)列表头部
  • RPUSH/RPUSHX:将值插入到(/存在的)列表尾部
  • LPOP:移出并获取列表的第一个元素
  • RPOP:移出并获取列表的最后一个元素
  • LTRIM:保留指定区间内的元素
  • LLEN:获取列表长度
  • LSET:通过索引设置列表元素的值
  • LINDEX:通过索引获取列表中的元素
  • LRANGE:获取列表指定范围的元素

2.2.2 案例架构

2.2.3 代码设计

  • 秒杀程序把请求写入Redis。(Uid,time_stamp)
  • 检查Redis已存放数据的长度,超出上限直接丢弃。(比如秒杀限制为100个,超过100个的数据直接丢弃返回秒杀已结束)
  • 死循环处理存入Redis的数据库并入库。

2.2.4 开发流程

1)创建一个示例MySQL秒杀表:

CREATE TABLE `redis_queue`(
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `uid` int(11) NOT NULL DEFAULT `0`,
    `time_stamp` varchar(24) NOT NULL,    
    PRIMARY KEY(`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

2)接收用户请求的user.php

<?php
// 加载redis组件
$redis = new Redis();
$redis -> connect('127.0.0.1', 6379);
$redis_name = 'miaosha';

// 接收用户的id
$uid = $_GET['uid'];
// 获取一下redis里面已有的数量
$num = $redis->lLen($redis_name);
// 如果当天人数少于10的时候,则加入这个队列
if ($num < 10) {
    $redis->rPush($redis_name, $uid.'%'.microtime());
    echo $uid.'秒杀成功';
} else{
    // 如果当天人数已经达到了10个人,则返回秒杀已完成
    echo '秒杀已结束';
}

$redis->close();

3)处理队列的入库程序

<?php
include '../include/db.php';

// 加载redis组件
$redis = new Redis();
$redis -> connect('127.0.0.1', 6379);
$redis_name = 'miaosha';
$db= DB::getIntance();

// 死循环
while (1) {
    // 从队列最左取出一个值来
    $user = $redis->lPop($redis_name);
    // 然后判断这个值是否存在
    if (!$user || $user=='nil') {
        sleep(2);
        continue;
    }
    // 切割出时间
    $user_arr = explode('%', $user);
    $insert_data = array(
        'uid' => $user_arr[0],
        'time_stamp' => $user_arr[1], 
        );
    // 保存到数据库中
    $res = $db->insert('redis_queue', $insert_data);
    // 数据库插入失败的时候的回滚机制
    if (!$res) {
        $redis->rPush($redis_name, $user);
    }
    sleep(2);
}
//释放redis
$redis -> close();

3. 其它消息系统

3.1 RabbitMQ

3.1.1 关于RabbitMQ

官网:RabbitMQ
文档:RabbitMQ Documentation

3.1.2 RabbitMQ架构和原理

RabbitMQ完整的实现了AMQP、集群简化、持久化、跨平台。

3.1.3 RabbitMQ使用

1) 安装RabbitMQ(rabbitmq-server、php-amqplib)

2)生产者向消息通道发送消息

3)消费者处理消息

3.1.4 工作队列模式

更多模式参考:http://previous.rabbitmq.com/v3_5_7/getstarted.html