延时消息(定时消息)指的在 分布式异步消息场景 下,生产端发送一条消息,希望在指定延时或者指定时间点被消费端消费到,而不是立刻被消费。
延时消息适用的业务场景非常的广泛,在分布式系统环境下,延时消息的功能一般会在下沉到中间件层,通常是 MQ 中内置这个功能或者内聚成一个公共基础服务。
本文旨在探讨常见延时消息的实现方案以及方案设计的优缺点。
这里讨论的外部存储指的是在 MQ 本身自带的存储以外又引入的其他的存储系统。
基于外部存储的方案本质上都是一个套路,将 MQ 和 延时模块 区分开来,延时消息模块是一个独立的服务/进程。延时消息先保留到其他存储介质中,然后在消息到期时再投递到 MQ。当然还有一些细节性的设计,比如消息进入的延时消息模块时已经到期则直接投递这类的逻辑,这里不展开讨论。
下述方案不同的是,采用了不同的存储系统。
基于关系型数据库(如MySQL)延时消息表的方式来实现。
CREATE TABLE `delay_msg` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`delivery_time` DATETIME NOT NULL COMMENT '投递时间',
`payloads` blob COMMENT '消息内容',
PRIMARY KEY (`id`),
KEY `time_index` (`delivery_time`)
)
通过定时线程定时扫描到期的消息,然后进行投递。定时线程的扫描间隔理论上就是你延时消息的最小时间精度。
优点:
缺点:
RocksDB 的方案其实就是在上述方案上选择了比较合适的存储介质。
RocksDB 在笔者之前的文章中有聊过,LSM 树根更适合大量写入的场景。滴滴开源的DDMQ中的延时消息模块 Chronos 就是采用了这个方案。
DDMQ 这个项目简单来说就是在 RocketMQ 外面加了一层统一的代理层,在这个代理层就可以做一些功能维度的扩展。延时消息的逻辑就是代理层实现了对延时消息的转发,如果是延时消息,会先投递到 RocketMQ 中 Chronos 专用的 topic 中。延时消息模块 Chronos 消费得到延时消息转出到 RocksDB,后面就是类似的逻辑了,定时扫描到期的消息,然后往 RocketMQ 中投递。
这个方案老实说是一个比较重要的方案。因为基于 RocksDB 来实现的话,从数据可用性的角度考虑,你还需要自己去处理多副本的数据同步等逻辑。
优点:
缺点:
再来聊聊 Redis 的方案。下面放一个比较完善的方案。
本方案来源于: https://www.cnblogs.com/lylife/p/7881950.html
这个方案选用 Redis 存储在我看来有以下几点考虑,
但是这个方案其实也有需要斟酌的地方,上述方案通过创建多个 Delayed Queue 来满足对于并发性能的要求,但这也带来了多个 Delayed Queue 如何在多个节点情况下均匀分配,并且很可能出现到期消息并发重复处理的情况,是否要引入分布式锁之类的并发控制设计?
在量不大的场景下,上述方案的架构其实可以蜕化成主从架构,只允许主节点来处理任务,从节点只做容灾备份。实现难度更低更可控。
上述几个方案中,都通过线程定时扫描的方案来获取到期的消息。
定时线程的方案在消息量较少的时候,会浪费资源,在消息量非常多的时候,又会出现因为扫描间隔设置不合理导致延时时间不准确的问题。可以借助 JDK Timer 类中的思想,通过 wait-notify 来节省 CPU 资源。
获取中最近的延时消息,然后wait(执行时间-当前时间),这样就不需要浪费资源到达时间时会自动响应,如果有新的消息进入,并且比我们等待的消息还要小,那么直接notify唤醒,重新获取这个更小的消息,然后又wait,如此循环。
再来讲讲目前自带延时消息功能的开源MQ,它们是如何实现的
RocketMQ 开源版本支持延时消息,但是只支持 18 个 Level 的延时,并不支持任意时间。只不过这个 Level 在 RocketMQ 中可以自定义的,所幸来说对普通业务算是够用的。默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。
通俗地讲,设定了延时 Level 的消息会被暂存在名为 SCHEDULE_TOPIC_XXXX 的topic中,并根据 level 存入特定的queue,queueId = delayTimeLevel – 1,**即一个queue只存相同延时的消息,保证具有相同发送延时的消息能够顺序消费。**broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
下面是整个实现方案的示意图,红色代表投递延时消息,紫色代表定时调度到期的延时消息:
优点:
缺点:
Pulsar 支持“任意时间”的延时消息,但实现方式和 RocketMQ 不同。
通俗的讲,Pulsar 的延时消息会直接进入到客户端发送指定的 Topic 中,然后在堆外内存中创建一个基于时间的优先级队列,来维护延时消息的索引信息。延时时间最短的会放在头上,时间越长越靠后。在进行消费逻辑时候,再判断是否有到期需要投递的消息,如果有就从队列里面拿出,根据延时消息的索引查询到对应的消息进行消费。
如果节点崩溃,在这个 broker 节点上的 Topics 会转移到其他可用的 broker 上,上面提到的这个优先级队列也会被重建。
下面是 Pulsar 公众号中对于 Pulsar 延时消息的示意图。
乍一看会觉得这个方案其实非常简单,还能支持任意时间的消息。但是这个方案有几个比较大的问题
对于前面第一点和第二点的问题,社区也设计了解决方案,在队列中加入时间分区,Broker 只加载当前较近的时间片的队列到内存,其余时间片分区持久化磁盘,示例图如下图所示:
但是目前,这个方案并没有对应的版本。可以在实际使用时,规定只能使用较小时间跨度的延时消息,来减少前两点缺陷的影响。
至于第三个方案,估计是比较难解决的,需要在数据存储层将延时消息和正常消息区分开来,单独存储延时消息。
QMQ提供任意时间的延时/定时消息,你可以指定消息在未来两年内(可配置)任意时间内投递。
把 QMQ 放到最后,是因为我觉得 QMQ 是目前开源 MQ 中延时消息设计最合理的。里面设计的核心简单来说就是 多级时间轮 + 延时加载 + 延时消息单独磁盘存储 。
如果对时间轮不熟悉的可以阅读笔者的这篇文章 从 Kafka 看时间轮算法设计
QMQ的延时/定时消息使用的是两层 hash wheel 来实现的。第一层位于磁盘上,每个小时为一个刻度(默认为一个小时一个刻度,可以根据实际情况在配置里进行调整),每个刻度会生成一个日志文件(schedule log),因为QMQ支持两年内的延时消息(默认支持两年内,可以进行配置修改),则最多会生成 2 * 366 * 24 = 17568 个文件(如果需要支持的最大延时时间更短,则生成的文件更少)。 第二层在内存中,当消息的投递时间即将到来的时候,会将这个小时的消息索引(索引包括消息在schedule log中的offset和size)从磁盘文件加载到内存中的hash wheel上,内存中的hash wheel则是以500ms为一个刻度 。
总结一下设计上的亮点:
本文汇总了目前业界常见的延时消息方案,并且讨论了各个方案的优缺点。希望对读者有所启发。
原文 https://ricstudio.top/archives/delay-msg-designs
oxmail邮箱是不少网友都在使用的邮箱,不过,很多foxmail邮箱的人性化功能却不为人知。比如说,foxmail邮箱的密送功能、定时发送邮件功能等。今天,小编就给大家分享一下关于foxmail邮箱定时发送邮件的设置方法。那么,foxmail邮箱的定时发送功能怎么开启呢?一起来看看今天的foxmail邮箱使用方法就知道了!
1、首先我们将邮件写好,点击右上角菜单==定时发送;
2、出现定时发送设置选项,设置您需要定时发送邮件的发送时间;
3、设置好了之后,点击发送邮件会自动到草稿箱里面,等待 定时的时间进行邮件发送操作;
4、点击邮件,查看可以看到 设置好的定时发送邮件的信息如下:
avaScript 是一种异步的、事件驱动的语言,这在处理诸如网络请求、文件操作或定时任务等操作时非常有用。传统上,JavaScript 使用回调函数来处理异步操作,但这可能会导致所谓的“回调地狱”。ES6 引入了 Promises 来帮助解决这个问题,而 ES2017 则引入了 async/await,进一步简化了异步编程。
Async/await 是基于 Promises 的,它允许我们以同步的方式编写异步代码,使得代码更加清晰易懂。下面,我们将通过几个例子来演示如何在实际中使用 async/await。
在这个例子中,我们将使用 async/await 来异步加载一张图片,并将其显示在页面上。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Async/Await 示例:异步加载图片</title>
</head>
<body>
<button id="loadImageButton">加载图片</button>
<div id="imageContainer"></div>
<script>
async function loadImage(url) {
return new Promise((resolve, reject) => {
const image = new Image();
image.onload = () => resolve(image);
image.onerror = () => reject(new Error('图片加载失败'));
image.src = url;
});
}
document.getElementById('loadImageButton').addEventListener('click', async function() {
try {
const image = await loadImage('https://via.placeholder.com/150');
document.getElementById('imageContainer').appendChild(image);
} catch (error) {
console.error(error);
}
});
</script>
</body>
</html>
在这个例子中,loadImage 函数返回一个 Promise 对象,我们通过 async 关键字声明了一个异步函数,并在事件监听器中使用 await 关键字等待图片加载完成。
在这个例子中,我们将使用 async/await 来异步获取网络数据,并在页面上显示。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Async/Await 示例:异步获取数据</title>
</head>
<body>
<button id="fetchDataButton">获取数据</button>
<pre id="dataContainer"></pre>
<script>
async function fetchData(url) {
const response = await fetch(url);
if (!response.ok) {
throw new Error('网络请求失败');
}
return response.json();
}
document.getElementById('fetchDataButton').addEventListener('click', async function() {
try {
const data = await fetchData('https://mock.apifox.com/m1/2209590-0-default/pet/findByStatus');
document.getElementById('dataContainer').textContent = JSON.stringify(data, null, 2);
} catch (error) {
console.error(error);
}
});
</script>
</body>
</html>
在这个例子中,fetchData 函数中使用 await 关键字等待 fetch API 的响应,并处理成功和失败的情况。
Async/await 使得异步代码的错误处理变得更加直观。我们可以使用传统的 try/catch 语句来捕获和处理错误。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Async/Await 示例:错误处理</title>
</head>
<body>
<button id="errorHandlingButton">执行操作</button>
<div id="resultContainer"></div>
<script>
async function riskyOperation() {
throw new Error('出错了!');
}
document.getElementById('errorHandlingButton').addEventListener('click', async function() {
try {
await riskyOperation();
document.getElementById('resultContainer').textContent = '操作成功';
} catch (error) {
document.getElementById('resultContainer').textContent = error.message;
}
});
</script>
</body>
</html>
在这个例子中,我们故意在 riskyOperation 函数中抛出一个错误,然后在事件监听器中使用 try/catch 来捕获这个错误。
Async/await 提供了一种更加直观和简洁的方式来处理 JavaScript 中的异步操作。通过这些例子,我们可以看到它如何帮助我们以更加同步的方式编写异步代码,同时保持代码的可读性和可维护性。随着 JavaScript 语言的不断发展,我们期待未来会有更多的新特性来进一步简化异步编程。
*请认真填写需求信息,我们会在24小时内与您取得联系。