PHP简易延时队列的实现流程详解

需求说明

  • 当用户申请售后,商家未在n小时内处理,系统自动进行退款。
  • 商家拒绝后,用户可申请客服介入,客服x天内超时未处理,系统自动退款。
  • 用户收到货物,x天自动确认收货
  • 等等需要延时操作的流程……

设计思路

  • 设计一张队列表,记录所有队列的参数,执行状态,重试次数
  • 将创建队列的id 存于redis 中,使用zset有序集合。按照时间戳进行排序
  • 使用croontab定时任务每分钟执行一次

实现

新建队列表

CREATE TABLE `delay_queue` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `params` varchar(512) DEFAULT NULL,
  `message` varchar(255) DEFAULT '' COMMENT '执行结果',
  `ext_string` varchar(255) DEFAULT '' COMMENT '扩展字符串,可用于快速检索。取消该队列',
  `retry_times` int(2) DEFAULT '0' COMMENT '重试次数',
  `status` int(2) NOT NULL DEFAULT '1' COMMENT '1 待执行, 10 执行成功, 20 执行失败,30取消执行',
  `created` datetime DEFAULT NULL,
  `modified` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `ext_idx` (`ext_string`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

部分队列的操作方法,新增队列、取消队列、队列执行成功、队列执行失败、队列重试【重试时间间隔抄的微信支付的异步通知时间】

class DelayQueueService
{
    // 重试时间,最大重试次数 15
    private static $retryTimes = [
        15, 15, 30, 3 * 60, 10 * 60, 20 * 60, 30 * 60, 30 * 60, 30 * 60, 60 * 60,
        3 * 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 6 * 60 * 60, 6 * 60 * 60,
    ];
        /**
         * @description 增加队列至redis
         * @param $queueId
         * @param int $delay 需要延迟执行的时间。单位秒
         * @return void
         */
        public function addDelayQueue($queueId, int $delay)
        {
            $time = time() + $delay;
            $redis = RedisService::getInstance();
            $redis->zAdd("delay_queue_job", $time, $queueId);
        }
        // 取消redis 队列
        public function cancelDelayQueue($ext)
        {
            $row = $query->getRow(); // 使用ext_string 快速检索到相应的记录
            if ($row) {
                $redis = RedisService::getInstance();
                $redis->zRem('delay_queue_job', $row->id);
                $row->status = DelayQueueTable::STATUS_CANCEL;
                $table->save($row);
            }
        }
        /**
         * @description 执行成功
         * @return void
         */
        public static function success($id, $message = null)
        {
            $table->update([
                'status' => DelayQueueTable::STATUS_SUCCESS,
                'message' => $message ?? '',
                'modified' => date('Y-m-d H:i:s'),
            ], [
                'id' => $id,
            ]);
        }
        /**
         * @description 执行失败
         * @return void
         */
        public static function failed($id, $message = null)
        {
            $table->updateAll([
                'status' => DelayQueueTable::STATUS_FAILED,
                'message' => $message ?? '',
                'modified' => date('Y-m-d H:i:s'),
            ], [
                'id' => $id,
            ]);
        }
        /**
         * @description 失败队列重试,最大重试15次
         * @param $id
         * @return void
         */
        public static function retry($id)
        {
            $info = self::getById($id);
            if (!$info) {
                return;
            }
            $retryTimes = ++$info['retry_times'];
            if ($retryTimes > 15) {
                return;
            }
            $entity = [
                'params' => $info['params'],
                'ext_string' => $info['ext_string'],
                'retry_times' => $retryTimes,
            ];
            $queueId = $table->save($entity);
            self::addDelayQueue($queueId, self::$retryTimes[$retryTimes - 1]);
        }
}

在命令行进行任务的运行

public function execute(Arguments $args, ConsoleIo $io)
{
    $startTimestamp = strtotime("-1 days");
    $now = time();
    $redis = RedisService::getInstance();
    $queueIds = $redis->zRangeByScore('delay_queue_job', $startTimestamp, $now);
    if ($queueIds) {
        foreach ($queueIds as $id) {
            $info = // 按照队列id 获取相应的信息
            if ($info['status'] === DelayQueueTable::STATUS_PADDING) {
                $params = unserialize($info['params']); // 创建记录的时候,需要试用serialize 将类名,方法,参数序列化
                $class = $params['class'];
                $method = $params['method'];
                $data = $params['data'];
                try {
                    call_user_func_array([$class, $method], [$data]);
                    $redis->zRem('delay_queue_job', $id);
                    $msg = date('Y-m-d H:i:s') . " [info] success: $id";
                    DelayQueueService::success($id, $msg);
                    $io->success($msg);
                } catch (Exception $e) {
                    $msg = date('Y-m-d H:i:s') . " [error] {$e->getMessage()}";
                        DelayQueueService::failed($id, $msg);
                        // 自定义异常code,不进行队列重试
                        if (10000 != $e->getCode()) {
                            DelayQueueService::retry($id);
                        }
                        $io->error($msg);
                }
            }
        }
    }
}

最后说点

  • 我这边的系统对实时性要求不高,所以直接使用的是linuxcrond 服务,每分钟运行一次。如需精确到秒级,可写一个shell,一分钟循环执行<=60
  • 因为目前的数据较少,延时队列加入的只有小部分。所以就在command 里面直接执行更新操作了,后期如果队列多,且有比较耗时的操作,可考虑把耗时操作单独放置一个队列中。本方法只用于将数据塞进队列。

附上 shell 脚本 一分钟执行60次

#!/bin/bash
step=2 #间隔的秒数,不能大于60
for (( i = 0; i < 60; i=(i+step) )); do
   echo $i # do something
   sleep $step
done 

原文地址:https://blog.csdn.net/qq_39059866/article/details/126470332