PHP使用Redis的List,列表命令实现消息队列

使用Redis的List(列表)命令实现消息队列,生产者使用lPush命令发布消息,消费者使用rpoplpush命令获取消息,同时将消息放入监听队列,如果处理超时,监听者将把消息弹回消息队列

1.用到的List(列表)命令

命令作用
lPush将一个或多个值插入到列表头部
rpoplpush弹出列表最后一个值,同时插入到另一个列表头部,并返回该值
lRem删除列表内的给定值
lIndex按索引获取列表内的值

2.队列的组成

名称职责
生产者发布消息
消费者获取并处理消息
监听者监听超时的消息,弹回原消息队列,确保消费者挂掉后或处理失败后消息能被其他消费者处理

3.php实现代码

生产者Producter.php

<?php
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    $redis = new Redis();
    $redis->connect('ip', 6379);
    //向列表中push10条消息
    for ($i = 0;$i < 10;$i++){
        //为消息生成唯一标识
        $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true);
        $ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data')));
        var_dump($ret);
    }

} catch (Exception $e){
    echo $e->getMessage();
}

消费者Consumer.php

<?php
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    //声明监听者队列-list的键名
    $watchQueueKey = 'watchQueueKey';
    $redis = new Redis();
    $redis->connect('ip', 6379);
    //队列先进先出,弹出最先加入的消息,同时放入监听队列
    while (true){
        $ret = $redis->rpoplpush($queueKey, $watchQueueKey);
        if ($ret === false){
            sleep(1);
        } else {
            $retArray = json_decode($ret, true);
            //将唯一id写入缓存设置有效期
            $redis->setex($retArray['uniqid'], 60, 0);
            //模拟失败
            $rand = mt_rand(0,9);
            if ($rand < 3){
                echo "failure:".$ret."\n";
            } else {
                //todo
                //处理成功移除消息
                $redis->lRem($watchQueueKey, $ret, 0);
                echo "success:".$ret."\n";
            }
        }
    }

} catch (Exception $e){
    echo $e->getMessage();
}

监听者Watcher.php

<?php
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    //声明监听者队列-list的键名
    $watchQueueKey = 'watchQueueKey';
    $redis = new Redis();
    $redis->connect('ip', 6379);

    while (true){
        //取出列表尾部的一个值
        $ret = $redis->lIndex($watchQueueKey, -1);
        //如果不存在则休眠1秒
        if ($ret === false){
            sleep(1);
        } else {
            $retArray = json_decode($ret, true);
            $idCache = $redis->get($retArray['uniqid']);
            if ($idCache === false){
                //如果已过期,表示任务超时,弹回原队列
                $redis->rpoplpush($watchQueueKey, $queueKey);
                echo "rpoplpush:".$ret."\n";
            } else {
                //处理中,继续等待
                sleep(1);
            }
        }
    }

} catch (Exception $e){
    echo $e->getMessage();
}

4.执行队列

开启监听者php Watcher.php

开启消费者php Consumer.php

执行生产者php Producter.php

生产者输出

int(1)
int(2)
int(3)
int(4)
int(5)
int(6)
int(7)
int(8)
int(9)
int(10)

监听者输出

rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

消费者输出

success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"}
failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"}
success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"}
failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"}
success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"}
failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

我们看到消费者第一次执行时失败的消息,超时后又被弹回了消息队列,消费者有了再次执行的机会,监听者的职责就是确保消费者执行失败或挂掉后消息还能再弹回原队列得到再次执行

转自:https://www.jmsite.cn/blog-615.html