整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

Redis实战:20w缓存数据如何保持热度不减退?

言: 在处理大规模数据集时,保持热数据的高性能和可用性是一个关键问题。特别是在拥有 200 万数据中,有 20w 数据存储在 Redis 中的情况下,我们需要采取一些策略和技术来确保这些热数据的热度不减退。本文将介绍一些应对挑战的方法,帮助我们保持数据的热度,使其永不减退。

数据热度的重要性

数据热度指的是数据被频繁访问和使用的程度。热数据通常是应用程序的关键部分,对性能和用户体验有重要影响。因此,保持热数据的热度对于应用程序的成功至关重要。

保持热度不减退

处理大规模数据集带来了存储、查询和更新的效率挑战。而在这 200 万数据中,我们需要特别关注的是存储在 Redis 中的 20w 热数据。这些热数据需要保持高性能和可用性,避免热度减退。下面将介绍一些策略和技术来解决这些挑战。

策略和技术

1. 数据分析和预测

通过利用数据分析工具和算法,我们可以识别和预测热数据。通过分析历史数据的访问模式和趋势,我们可以预测未来的热数据。这样可以帮助我们更好地针对这些数据做出决策,例如优先存储在高性能的 Redis 中。

2. 缓存策略

利用缓存技术,我们可以将热数据存储在高性能的内存中,例如 Redis。通过使用 LRU(最近最少使用)或 LFU(最不常用)等缓存替换算法,我们可以保证热数据的高命中率,从而提高性能和响应时间。以下是一个使用 Redis 缓存的示例代码:

import redis

# 创建Redis连接
r = redis.Redis(host='localhost', port=6379, db=0)

# 设置热数据
r.set('hot_data_1', 'value_1')
r.set('hot_data_2', 'value_2')
r.set('hot_data_3', 'value_3')

# 获取热数据
data_1 = r.get('hot_data_1')
data_2 = r.get('hot_data_2')

3. 数据预加载

在系统启动或低峰期,我们可以预先加载热数据到缓存中。这样可以避免热数据第一次访问时的延迟和性能问题,提前准备好热数据的访问路径。以下是一个简单的数据预加载示例代码:

import redis

# 创建Redis连接
r = redis.Redis(host='localhost', port=6379, db=0)

# 预加载热数据
def preload_hot_data():
    hot_data = ['hot_data_1', 'hot_data_2', 'hot_data_3']
    for data in hot_data:
        value = fetch_data_from_database(data)  # 从数据库获取数据
        r.set(data, value)

# 执行数据预加载
preload_hot_data()

4. 数据更新和同步

保持热数据与底层数据的一致性非常重要。我们可以采用实时或异步的方式将底层数据的变更同步到热数据中。一种常见的方法是使用发布-订阅模式(Pub-Sub)来实现数据更新的实时通知机制。当底层数据发生变化时,即时更新热数据,以保持其与底层数据的一致性。以下是一个使用 Pub-Sub 模式进行数据更新和同步的示例代码:

import redis
import json

# 创建Redis连接
r = redis.Redis(host='localhost', port=6379, db=0)

# 订阅底层数据的更新通知
def update_hot_data(channel, message):
    data = json.loads(message)
    hot_data = data['hot_data']
    value = fetch_data_from_database(hot_data)  # 从数据库获取最新数据
    r.set(hot_data, value)

# 创建订阅者
pubsub = r.pubsub()

# 订阅底层数据的更新通知
pubsub.subscribe('data_updates')

# 监听数据更新通知
for message in pubsub.listen():
    if message['type'] == 'message':
        update_hot_data(message['channel'].decode('utf-8'), message['data'].decode('utf-8'))

5. 水平扩展和负载均衡

随着数据规模的增长,我们可能需要进行水平扩展和负载均衡,以应对高并发和大规模数据的挑战。通过将热数据分片存储在多个 Redis 节点上,并使用负载均衡算法将请求分发到这些节点上,我们可以提高系统的性能和可伸缩性。以下是一个使用 Redis Cluster 进行水平扩展和负载均衡的示例代码:

from rediscluster import RedisCluster

# Redis Cluster节点配置
startup_nodes = [
    {'host': 'node1', 'port': 6379},
    {'host': 'node2', 'port': 6379},
    {'host': 'node3', 'port': 6379}
]

# 创建Redis Cluster连接
r = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# 设置热数据
r.set('hot_data_1', 'value_1')
r.set('hot_data_2', 'value_2')
r.set('hot_data_3', 'value_3')

# 获取热数据
data_1 = r.get('hot_data_1')
data_2 = r.get('hot_data_2')

6. 监控和调优:

对于处理大规模数据集的系统,监控和调优是保持热数据热度的关键。我们可以使用监控工具来实时监测热数据的访问情况、性能指标和系统负载。根据监控数据,我们可以进行性能优化和资源调配,以保持热数据的高性能。以下是一个使用 Redis 的监控和调优示例代码:

import redis

# 创建Redis连接
r = redis.Redis(host='localhost', port=6379, db=0)

# 获取热数据的访问次数
def get_hot_data_access_count():
    hot_data_access_count = {}
    keys = r.keys('hot_data_*')
    for key in keys:
        count = r.get(key)
        hot_data_access_count[key] = count
    return hot_data_access_count

# 监控热数据的访问情况
def monitor_hot_data():
    while True:
        hot_data_access_count = get_hot_data_access_count()
        # 打印热数据的访问次数
        print(hot_data_access_count)
        # 做一些性能优化和资源调配的决策

结论

通过采取数据分析和预测、缓存策略、数据预加载、数据更新和同步、水平扩展和负载均衡以及监控和调优等策略和技术,我们可以有效应对保持热度不减退的挑战。这些方法可以帮助我们保持热数据的高性能和可用性,提供出色的用户体验和系统性能。

参考文献:

  • Smith, J. (2020). Scaling Redis at Twitter. Retrieved from https://blog.twitter.com/engineering/en_us/topics/infrastructure/2020/scaling-redis-at-twitter.html
  • Redis Labs. (n.d.). Redis as a Caching Layer. Retrieved from https://redislabs.com/solutions/use-cases/caching/

以上是一份关于保持热度不减退的详细文章,其中包含了更多的解释和代码示例。如果您有任何进一步的问题或需要更多帮助,请随时提问。

在前面

我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的服务,kafka和RabbitMQ等;

奈何这兄弟一直不给力;

虽然 Redis 的Pub/Sub 是实现了发布/订阅的,但这家伙最坑的是:丢数据

由于Pub/Sub 只是简单的实现了发布订阅模式,简单的沟通起生产者和消费者,当接收生产者的数据后并立即推送或者说转发给订阅消费者,并不会做任何的持久化、存储操作。由此:

  1. 消费者(客户端)掉线;
  2. ​ 消费者未订阅(所以使用的时候一定记得先订阅再生产);
  3. ​ 服务端宕机;
  4. ​ 消费者消费不过来,消息堆积(生产数据受数据缓冲区限制);

以上情况都会导致生产数据的丢失,基于上坑,据我所知大家很少使用Pub/Sub ;

不过官方的哨兵集群通信的时候就是用的Pub/Sub;

然后,各路大佬结合队列、阻塞等等实现了各种各样的方案,主要是使用:BLPOP+LPUSH 的实现

这里就不一一展开了,有兴趣请看叶老板文章;

可能是各种实现都会带来各种的问题,redis的官方也看到了社区的挣扎。终于,到了Redis5.0,官方带来了消息队列的实现:Stream

Redis Stream介绍

简单来说Redis Stream 就是想用Redis 做消息队列的最佳推荐;

XADD--发布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 #再发一条
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631628884174-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631628890025-0"

其中的'*'表示让 Redis 自动生成唯一的消息 ID,格式是 「时间戳-自增序号」

XREAD--订阅消息

订阅消息

XREAD COUNT 5 STREAMS stream1 0-0 
127.0.0.1:6379> XREAD COUNT 5 STREAMS stream1 0-0 
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"

'0-0' 表示从开头读取

如果需继续拉取下一条,需传入上一条消息的id

阻塞等待消息

XREAD COUNT 5 BLOCK 50000 STREAMS stream1 1631628890025-0

阻塞等待消息id ‘1631628890025-0’ 后的消息

50000 阻塞时间(毫秒) ‘0’ 表示无限期阻塞

从到这里就可以看出 Pub/Sub多端订阅的最大优点,Stream也是支持的。有的同学很快就发现问题了,这里多端订阅后,没有消息确认ACK机制。

没错,因为现在所有的消费者都是订阅共同的消息,多端订阅,如果某个客户端ACK某条消息后,其他端消费不了,就实现不了多端消费了。

由此,引出 分组:GROUP

GROUP--订阅分组消息(多端订阅)

同样先发布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631629080208-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631629084083-0"

XGROUP CREATE 创建分组

创建分组1

XGROUP CREATE stream1 group1 0-0  
127.0.0.1:6379> XGROUP CREATE stream1 group1 0-0  
OK

‘0-0’ 表示从开头读取

'>' 表示读取最新,未被消费过的消息

XREADGROUP--分组读取

分组 group1

XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  

consumer1 消费者名称, redis服务器会记住第一次使用的消费者名称;


127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
(nil)

同样

‘0-0’ 表示从开头读取

'>' 表示读取最新,未被消费过的消息 (可以看到命令执行第二遍已经读不到新消息了)

分组 group2

127.0.0.1:6379> XGROUP CREATE stream1 group2 0-0  
OK
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19

可以看到可以读到同样的消息,多端订阅没有问题;

当然分组也支持阻塞读取:

#和XREAD一样
XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0 

#分组阻塞
XREADGROUP GROUP group2 consumer1 COUNT 5 BLOCK 0 STREAMS stream1 > 

‘0’ 表示无限期阻塞,单位(毫秒)

XPENDING--待处理消息

消息使用XREADGROUP 读取后会进入待处理条目列表(PEL);

我们看看:

 XPENDING stream1 group2
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 4
2) "1631628884174-0"
3) "1631629084083-0"
4) 1) 1) "consumer1"
      2) "4"

表示:

  1. (integer) 4 //表示当前消费者组的待处理消息的数量
  2. "1631628884174-0" //消息最大id
  3. "1631629084083-0" //最小id
      1. "consumer1" // 消费者名称
      2. "4" //消费者待处理消息数量

XACK--删除已处理消息(消息确认机制)

我们已经知道group2待处理消息有4条,我们从头读取看看:

XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"

假设最后一条消息 ‘1631629084083-0’ 我已处理完成

127.0.0.1:6379> XACK stream1 group2 1631629084083-0
(integer) 1

再看:

127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 3
2) "1631628884174-0"
3) "1631629080208-0"
4) 1) 1) "consumer1"
      2) "3"

可以清楚看到goroup2 待处理消息剩下3条;

这时 Redis 已经把这条消息标记为「处理完成」不再追踪;

Stream在Asp.net Core中的使用

private static string _connstr = "172.16.3.119:6379";
private static string _keyStream = "stream1";
private static string _nameGrourp = "group1";
private static string _nameConsumer = "consumer1";

发布:

csRedis.XAdd(_keyStream, "*", ("name", "message1"));

订阅:

static async Task CsRedisStreamConsumer()
{
    Console.WriteLine("CsRedis StreamConsumer start!");

    var csRedis = new CSRedis.CSRedisClient(_connstr);
    csRedis.XAdd(_keyStream, "*", ("name", "message1"));

    try
    {
        csRedis.XGroupCreate(_keyStream, _nameGrourp);
    }
    catch { }

    (string key, (string id, string[] items)[] data)[] product;
    (string Pid, string Platform, string Time) data = (null, null, null);

    while (true)
    {
        try
        {
            product = csRedis.XReadGroup(_nameGrourp, _nameConsumer, 1, 10000, (_keyStream, ">"));
            if (product?.Length > 0 == true && product[0].data?.Length > 0 == true)
            {
                Console.WriteLine($"message-id:{product.FirstOrDefault().data.FirstOrDefault().id}");

                product.FirstOrDefault().data.FirstOrDefault().items.ToList().ForEach(value =>
                {
                    Console.WriteLine($"    {value}");
                });

                //csRedis.XAck(_keyStream, _nameGrourp, product[0].data[0].id);
            }
        }
        catch (Exception)
        {
            //throw;
        }
    }
}

CSRedisCore

这里的超时报错可通过修改连接参数:syncTimeout 解决

CSRedisCore支持阻塞读取;

StackExchange.Redis

发布:

db.StreamAdd(_keyStream, "name", "message1", "*");

订阅:

static async Task StackExchangeRedisStreamConsumer()
{
    Console.WriteLine("StackExchangeRedis StreamConsumer start!");

    var redis = ConnectionMultiplexer.Connect(_connstr);
    var db = redis.GetDatabase();

    try
    {
        ///初始化方式1
        //db.StreamAdd(_keyStream, "name", "message1", "*");
        //db.StreamCreateConsumerGroup(_keyStream, _nameGrourp);

        //方式2
        db.StreamCreateConsumerGroup(_keyStream, _nameGrourp, StreamPosition.NewMessages);
    }
    catch { }

    StreamEntry[] data = null;

    while (true)
    {
        data = db.StreamReadGroup(_keyStream, _nameGrourp, _nameConsumer, ">", count: 1, noAck: true);

        if (data?.Length > 0 == true)
        {
            Console.WriteLine($"message-id:{data.FirstOrDefault().Id}");

            data.FirstOrDefault().Values.ToList().ForEach(c =>
            {
                Console.WriteLine($"    {c.Name}:{c.Value}");
            });

            db.StreamAcknowledge(_keyStream, _nameGrourp, data.FirstOrDefault().Id);
        }
    }
}

StackExchange.Redis 有点比较坑的是不存在阻塞读取;理由:https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing

QA

Q:Stream是否支持AOF、RDB持久化?

A:支持,其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

Q:Stream是否还是会丢数据?若是,何种情况下?;

A:会;1、AOF是定时写盘的,如果数据还在内存中时redis服务宕机就会;2、主从切换时(从库还未同步完成主库发来的数据,就被提成主库)

总结

技术中有的时候没有“银弹”,只有更适合的技术,汝之蜜糖彼之砒霜;

很多时候的技术选型都是个比较麻烦的东西,对选型人的要求很高;你可能不是只需要熟悉其中的一种路线,而是要踩过各种各样的坑,再根据当前受限的环境,选择比较适合目前需求/团队的;

回到Stream上,我认为目前Stream能满足挺大部分队列需求;

特别是“在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的更专业的mq,比如kafka和RabbitMQ的时候”

当然,最终决定需要用更专业的mq与否的,还是需求;

# 揭秘神器:Node.js开发者必备的Redis客户端可视化管理工具

### 引言:Redis与Node.js的不解之缘

在现代Web开发领域,尤其是Node.js生态中,Redis凭借其高性能、内存存储、丰富的数据结构以及实时性等特性,成为了开发者手中不可或缺的缓存和数据存储利器。然而,如何高效、直观地管理和操作Redis数据库,对开发者来说至关重要。本文将为你揭示一款专为Node.js开发者设计的Redis客户端可视化管理工具——Redis Desktop Manager(简称RDM),助你在开发过程中如虎添翼。

### 一、Redis Desktop Manager:功能强大且易用

. 界面友好,操作直观

Redis Desktop Manager以其简洁明了的界面设计和人性化的交互体验,让开发者能轻松实现Redis数据的可视化管理。通过图形化界面,无论是查看键值对、执行命令,还是监控Redis服务器状态,都变得简单易行。

2. 支持多平台及多种Redis版本

RDM支持Windows、macOS和Linux三大主流操作系统,并兼容Redis 2.x至最新版本,确保无论在哪种开发环境下,都能提供一致且高效的使用体验。

3. 多数据库连接与分组管理

RDM允许用户同时连接并管理多个Redis实例,支持自定义数据库分组,方便在不同项目间快速切换,极大提高了工作效率。

### 二、实战演练:使用Redis Desktop Manager进行数据操作

1. 连接Redis服务

javascript
// 假设你的Redis服务器配置如下:
{
  host: 'localhost',
  port: 6379,
  password: 'your_password'
}

首先,打开RDM,点击“Add New Connection”,输入Redis服务器地址、端口、密码等相关信息,即可成功连接到Redis服务。

2. 数据库操作示例

- **插入键值对**:在RDM左侧键空间列表中选择一个数据库,然后在右侧操作区域直接输入`SET key value`,例如`SET myKey myValue`,点击执行按钮或按回车即可。

- **查询键值**:查询`myKey`对应的值,只需输入`GET myKey`并执行。

- **其他高级操作**:如集合(Set)、有序集合(ZSet)、哈希表(Hash)等复杂数据类型的增删查改,均可在RDM的命令行界面便捷完成。

### 三、进阶功能:深度探索与性能监控

1. 数据备份与恢复

RDM提供了便捷的数据导出导入功能,可以将Redis中的数据以RDB或AOF格式备份,并在需要时恢复,保障数据安全。

2. 实时监控Redis性能