7 Star 33 Fork 4

corn-core / open-delay-queue

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
README.md 4.02 KB
一键复制 编辑 原始数据 按行查看 历史
玉米 提交于 2021-03-09 14:12 . update README.md.

DelayQueue延迟消息队列

介绍

架构:

SpringCloud-Stream + RocketMQ + RocksDB(可以使用Nacos,Eureka作为注册中心,作为单独的延迟消息服务使用)

基于Apache RocketMQ的一种延迟消息队列。

由于Apache RocketMQ不支持自定义延迟消息队列,为了支持延时消息队列特开此项目进行对Apache RocketMQ进行补充。以至于达到可以实现自定义延迟的一种方式。

相关Apache RocketMQ消息队列:

TOPIC_SYSTEM_QUEUE_REALTIME: 实时消息队列(当消息到期后,此队列消息增加1个)

TOPIC_SYSTEM_QUEUE_DELAY: 延迟队列(用于接收延迟消息请求)

OpenDelayQueue 调用流程

1.0.0支持功能项:

1、支持水平扩展DelayServer

2、支持自定义延迟消息,可以精确到毫秒级。

请求过程:

1、应用调用请求

2、发起延时消息

3、应用方将消息写入 TOPIC_SYSTEM_QUEUE_DELAY

4、延时服务方监听TOPIC_SYSTEM_QUEUE_DELAY 将消息读取出来写入RocksDB

5、延时服务方循环遍历RocksDB检测延时消息是否需要发送

6、如果需要发送则将消息放入TOPIC_SYSTEM_QUEUE_REALTIME

7、应用监听TOPIC_SYSTEM_QUEUE_REALTIME 进行业务操作。

如何扩展?

完全沿袭Apache RocketMQ高可用、高性能、低延迟、高可靠的优点,如果RocketMQ性能不够,则增加RocketMQ Broker节点。

如果是消息消费性能不够,则将此服务复制一份,消费组配置成TOPIC_SYSTEM_QUEUE_DELAY 同一个即可。

当然可以改造成Eureka服务,或者Nacos服务,使其Restful也支持高可用。

OpenDelayQueue 高可用

如何调用

提供2种模式,一种是Restful接口,一种是SpringCloudStream

Restful接口模式:
非延迟消息:

GET 1.1 http://delay-server:port/message/push/normal

参数:

id: 消息ID(唯一)

message: 消息内容

例子:

GET 1.1 http://delay-server:port/message/push/normal?id=1&message=Hello World

延迟消息(时间戳):

GET 1.1 http://delay-server:port/message/push/delay

参数:

id: 消息ID(唯一)

message: 消息内容

timestamp:发送时间

例子:

GET 1.1 http://delay-server:port/message/push/delay?id=1&message=Hello World&timestamp=未来发出消息的时间戳

延迟消息字(符串日期):

GET 1.1 http://delay-server:port/message/push/delay/pattern

参数:

id: 消息ID(唯一)

message: 消息内容

date:字符串日期

pattern:格式化规则

例子:

GET 1.1 http://delay-server:port/message/push/delay/pattern?id=1&message=Hello World&date=2021-03-03 11:00:00&pattern=yyyy-MM-dd HH:mm:ss

SpringCloudStream模式

配置消息生产通道Topic, Topic TOPIC_SYSTEM_QUEUE_DELAY

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          #新版使用该地址
          name-server: rocketmq-server:9876
      bindings:
        #发送通道
        system_topic_queue_realtime_output:
          #目的地:主题(Topic)
          destination: TOPIC_SYSTEM_QUEUE_REALTIME
        #发送通道
        system_topic_queue_delay_output:
          #目的地:主题(Topic)
          destination: TOPIC_SYSTEM_QUEUE_DELAY
        #接收通道
        system_topic_queue_delay_input:
          #目的地:主题(Topic)
          destination: TOPIC_SYSTEM_QUEUE_DELAY
          #MessageConvert 类型, JSON协议
          content-type: application/json
          #消费组
          group: TOPIC_SYSTEM_QUEUE_DELAY_GROUP
  application:
    name: omuao-delay-queue
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
  mvc:
    date-format: yyyy-MM-dd HH:mm:ss
  servlet:
    multipart:
      max-file-size: 10MB
      max-request-size: 10MB
Java
1
https://gitee.com/corn-cob/open-delay-queue.git
git@gitee.com:corn-cob/open-delay-queue.git
corn-cob
open-delay-queue
open-delay-queue
master

搜索帮助