控制 Redis stream 的消息数量

2021年09月16日 阅读数:3
这篇文章主要向大家介绍控制 Redis stream 的消息数量,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

控制 Redis stream 的消息数量

Intro

Redis Stream 是 Redis 5.0 引入的一个新的类型,以前咱们介绍过使用 Redis Stream 来实现消息队列,能够参考以前的文章 使用 Redis Stream 实现消息队列,而 Stream 的消息会持久化地内存中,若是咱们不控制消息数量的话,可能会出现大量的消息存在内存里致使过大的内存占用,Redis Stream 5.0 开始支持根据 Max Length 来控制 Stream 的长度(消息数量),从 6.2 开始支持根据消息 Id 来控制 Stream 的长度,默认地消息 Id 是一个时间戳,因此使用默认地 Id 也能够理解为按时间来控制 Stream 长度,下面咱们来看使用示例吧git

Redis 语法

控制 Stream 消息长度有两个 Redis 命令,一个是 XTRIM 只作 Trim 操做,把不知足要求的消息去除,另一个是 XADD 在添加 Stream 消息的同时作 Trim 操做,简化还要多一步 Trim 的操做,将添加消息和控制消息长度能够合并为一个操做github

XTRIM 语法:web

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]

使用示例:redis

XTRIM mystream MAXLEN 1000
XTRIM mystream MINID 649085820

XTRIM mystream MAXLEN ~ 1000(Nearly trim,不许确,可能有些消息该删掉的会保留下来,可是执行效率会比 `=`(Exactly Trim) 高一些)
XTRIM mystream MINID = 649085820

Trimming the stream can be done using one of these strategies:微信

  • MAXLEN: Evicts entries as long as the stream's length exceeds the specified  threshold, where  threshold is a positive integer.
  • MINID: Evicts entries with IDs lower than  threshold, where  threshold is a stream ID.

XADD 语法:async

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

使用示例:编辑器

redis> XADD mystream * name Sara surname OConnor
"1631546114612-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1631546114612-1"

redis> XADD mystream MAXLEN ~ 1000 field1 value1
redis> XADD mystream MINID ~ 1631546460687 field1 value1

XADD 容许用户在向 Stream 里添加消息的时候控制消息的长度返回值是消息ID,默认是一个时间戳,语法如上,能够 Trim 也能够 不Trim,能够根据须要选择url

Prepare

先来准备一些帮助类和公共方法,下面的示例是基于 StackExchange.Redis 来实现的spa

RedisHelper,获取 Redis 链接.net

internal static class RedisHelper
{
    private static readonly IConnectionMultiplexer ConnectionMultiplexer = StackExchange.Redis.ConnectionMultiplexer.Connect("127.0.0.1:6379");

    public static IDatabase GetRedisDb(int dbIndex = 0)
    {
        return ConnectionMultiplexer.GetDatabase(dbIndex);
    }
}

AddStreamMessage,向指定 stream 中添加若干条消息

private static async Task AddStreamMessage(string key, int msgCount, Action action=null)
{
    var redis = RedisHelper.GetRedisDb();
    for (var i = 0; i < msgCount; i++)
    {
        await redis.StreamAddAsync(key, "messages"$"val-{i}");
        action?.Invoke();
    }
}

Max-Length

根据 MaxLength 来控制 Stream 长度示例

var streamKey = $"stream-{nameof(MaxLengthTrim)}";
await AddStreamMessage(streamKey, 10);
var redis = RedisHelper.GetRedisDb();
Console.WriteLine(await redis.StreamLengthAsync(streamKey));

// trim directly
await redis.StreamTrimAsync(streamKey, 5);
Console.WriteLine(await redis.StreamLengthAsync(streamKey));

// add with trim
await redis.StreamAddAsync(streamKey, StreamMessageField, "Test", maxLength: 3);
Console.WriteLine(await redis.StreamLengthAsync(streamKey));

await redis.KeyDeleteAsync(streamKey);

输出结果以下:

Min-ID

根据 Min-ID 来控制 Stream 消息长度,是 Redis 6.2 新引入的功能,目前 StackExchange.Redis 尚未专门的 API 来支持这个功能,不过咱们能够经过 Execute 来执行 Redis 命令,一般这些 Redis 客户端库都会支持直接调用 Redis 命令,根据 MinID 控制 Stream 长度示例以下:

private const string StreamAddCommandName = "XADD";
private const string StreamTrimCommandName = "XTRIM";

private const string StreamAddAutoMsgId = "*";

private const string StreamTrimByMinIdName = "MINID";

private const string StreamTrimOperator = "=";

private const string StreamMessageField = "message";

private static async Task MinMsgIdTrim()
{
    var streamKey = $"stream-{nameof(MaxLengthTrim)}";
    await AddStreamMessage(streamKey, 10, () => Thread.Sleep(1000));

    var redis = RedisHelper.GetRedisDb();
    var minId = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(5)).ToUnixTimeMilliseconds();
    Console.WriteLine(await redis.StreamLengthAsync(streamKey));

    // https://redis.io/commands/xtrim
    // trim directly
    await redis.ExecuteAsync(
        StreamTrimCommandName, 
        streamKey,
        StreamTrimByMinIdName,
        StreamTrimOperator, // optional
        minId
    );
    Console.WriteLine(await redis.StreamLengthAsync(streamKey));
    minId = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(2)).ToUnixTimeMilliseconds();

    // https://redis.io/commands/xadd
    // add with trim
    var result = redis.Execute(
        StreamAddCommandName, 
        streamKey, 
        StreamTrimByMinIdName, 
        StreamTrimOperator, // optional
        minId,
        StreamAddAutoMsgId, 
        StreamMessageField, 
        "Test"
    );
    Console.WriteLine(await redis.StreamLengthAsync(streamKey));

    await redis.KeyDeleteAsync(streamKey);
}

上述代码输出结果以下:

More

本文主要介绍了控制 Redis Stream 的消息长度,除了介绍 Redis 自己的命令以外,也是介绍一下如何使用 StackExchange.Redis 实现调用没有 API 支持的 Redis 命令,Redis 6.2 以后支持了不少新的特性,可是不少库都还太支持,了解如何原生调用 Redis 命令有些时候会颇有帮助

References

  • https://redis.io/commands/xadd
  • https://redis.io/commands/xtrim
  • https://github.com/WeihanLi/SamplesInPractice/blob/master/RedisSample/StreamTrimSample.cs


本文分享自微信公众号 - dotNET跨平台(opendotnet)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。