1 Star 7 Fork 2

鲸歌 / memory_message_queue

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MulanPSL-2.0

Java基于内存的消息队列实现

先看测试情况

1648028706108

需求背景

需求来源于我写的一个动态任务调度框架:https://gitee.com/hyxl-520/auto-job ,该框架需要实现对运行的任务的日志记录。如何有序、高效的采集获得的日志是一个问题,因此我拟采用基于内存的消息队列。本博客只讨论消息队列实现方式,不讨论动态任务调度框架的使用方式。

实现的功能

  1. 支持可阻塞的消息生产和消费。
  2. 支持TTL(即消息可设置过期时长)。
  3. 支持单播和多播。
  4. 支持发布监听和过期监听

拟采用的数据结构

根据需求和功能,拟采用Java的LinkedBlockingQueue,具体原因如下:

  1. LinkedBlockingQueue具有可阻塞、线程安全的优点。
  2. LinkedBlockingQueue具有链式结构,由于要实现TTL,即会经常的删除消息队列中的已过期消息,相比ArrayBlockingQueue,链式结构删除复杂度更低

完整数据结构如下:

Map<String, MessageQueue<MessageEntry<M>>> messageQueues;

public static class MessageQueue<M> {
        private final BlockingQueue<M> messageQueue;

        public MessageQueue() {
            messageQueue = new LinkedBlockingQueue<>();
        }


        public MessageQueue(int maxLength) {
            if (maxLength <= 0) {
                throw new IllegalArgumentException("最大容量应该为非负数");
            }
            messageQueue = new LinkedBlockingDeque<>(maxLength);
        }

        public M takeMessageSync() throws InterruptedException {
            return messageQueue.take();
        }

        public M takeMessage() {
            return messageQueue.poll();
        }

        public M readMessage() {
            return messageQueue.peek();
        }

        public M tryReadMessage() {
            return messageQueue.element();
        }

        public boolean removeIf(Predicate<? super M> predicate) {
            if (predicate == null) {
                return false;
            }
            return messageQueue.removeIf(predicate);
        }

        public int length() {
            return messageQueue.size();
        }

        public boolean remove(M message) {
            if (message == null) {
                return false;
            }
            return messageQueue.remove(message);
        }

        public Iterator<M> iterator() {
            return messageQueue.iterator();
        }

        private boolean publishMessageSync(M message) throws InterruptedException {
            if (message == null) {
                return false;
            }
            try {
                messageQueue.put(message);
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                throw e;
            }
        }

        private boolean publishMessage(M message) {
            if (message == null) {
                return false;
            }
            try {
                return messageQueue.offer(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }

    }

public static class MessageEntry<M> {
        long messageId;
        M message;
        long expiringTime;

        public MessageEntry(long messageId, M message, long expiringTime) {
            this.messageId = messageId;
            this.message = message;
            this.expiringTime = expiringTime;
        }

        public MessageEntry() {
        }
    }

Map采用ConcurrentHashMap,确保线程安全。其中MessageQueue封装了LinkedBlockingQueue,作为容器的内部类,其中生产者方法为私有,保证容器可以访问其生产者方法,但是消费者在外部订阅后只能访问消费者的方法。

MessageEntry是消息条目的封装,除了消息内容,还包含消息ID以及过期时间。

模块设计

1648028706108

1648028706108

IMessageQueueContextIMessageQueueContext是消息容器,所有的消息都将放在这个类里面,其实现了IProducerICounsumer接口,即提供了生产者和消费者的所有功能。并且通过创建守护线程实现了TTL功能。过期监听策略分为两种(ExpirationListenerPolicy):

  1. 单线程遍历所有topic的所有消息
  2. 每个topic并发遍历

前者适合主题少、总消息数少的情况,后者适合主题较多、消息总数较多的情况。

使用工厂模式,主要配置属性有:

	/**
     * 默认过期时间:ms,对所有消息设置,-1则表示消息均为永久性消息,除非消费者取出,否则将会一直存在。谨慎使用!
     */
    private long defaultExpiringTime = -1;

    /**
     * 是否允许设置单个消息的过期时间
     */
    private boolean isAllowSetEntryExpired = false;

    /**
     * 允许的最大主题数
     */
    private int allowMaxTopicCount = 255;

    /**
     * 允许每个队列的最大消息数
     */
    private int allowMaxMessageCountPerQueue;

    /**
     * 过期监听器的策略
     */
    private ExpirationListenerPolicy listenerPolicy;

通过修改属性值可以构建出单播和多播的生产者消费者。

一个消息容器可供多个生产者和消费者使用。

提供topic过期功能,当消费者订阅后会统计一个订阅数目,当消费者取消订阅时如果当前topic的订阅数和消息堆积数为0,若指定时间内没有生产者生产消息将会移除该主题。

MessageProducerMessageProducer为生产者,实现了IProducer接口(其实最终实现来自于IMessageQueueContext类),其提供如图上的方法。构建时需要指定所使用的消息容器。

MessageConsumerMessageConsumer为消费者,实现了IConsumer接口(其实最终实现来自于IMessageQueueContext类)。对于消息消费强烈建议不要使用如下参数列表String topic的方法,这个方法将不能使用消息容器提供的topic过期功能,建议先订阅指定队列,再从该队列获取消息,生产者对于队列订阅也提供了阻塞功能。

拓展

消息容器的队列可采用多种实现,如采用DelayQueue可以实现死信队列。

完整源代码

IMessageQueueContext

import java.util.List;

/**
 * 消息上下文抽象接口
 *
 * @Auther Huang Yongxiang
 * @Date 2022/03/20 9:57
 */
public interface IMessageQueueContext<M> {

    int length(String topic);

    /**
     * 使得消息立即过期
     *
     * @param topic 主题
     * @return void
     * @throws ErrorExpiredException 过期时发生异常抛出
     * @author Huang Yongxiang
     * @date 2022/3/20 11:31
     */
    void expire(String topic, MessageEntry<M> messageEntry) throws ErrorExpiredException;

    /**
     * 摧毁消息容器并启动垃圾清理
     *
     * @return void
     * @author Huang Yongxiang
     * @date 2022/3/22 14:33
     */
    void destroy();

    /**
     * 添加一个消息发布监听器,监听器的操作不会影响消息的正常发布
     *
     * @param topic    要添加监听器的主题
     * @param listener 监听器
     * @return void
     * @author Huang Yongxiang
     * @date 2022/11/7 9:43
     */
    void addMessagePublishedListener(String topic, MessagePublishedListener<M> listener);

    /**
     * 移除主题下的所有消息发布监听器
     *
     * @param topic 要移除的主题
     * @return java.util.List<com.example.autojob.skeleton.framework.mq.MessagePublishedListener < M>>
     * 移除的监听器列表,如果移除失败或不存在,返回空列表
     * @author Huang Yongxiang
     * @date 2022/11/7 9:53
     */
    List<MessagePublishedListener<M>> removeAllMessagePublishedListener(String topic);

    boolean removeMessagePublishedListener(String topic, String listenerName);

    /**
     * 添加一个消息过期监听器
     *
     * @param topic    主题
     * @param listener 监听器
     * @return void
     * @author Huang Yongxiang
     * @date 2022/11/7 9:54
     */
    void addMessageExpiredListener(String topic, MessageExpiredListener<M> listener);

    /**
     * 移除所有的消息过期监听器
     *
     * @param topic 主题
     * @return java.util.List<com.example.autojob.skeleton.framework.mq.MessageExpiredListener < M>>
     * @author Huang Yongxiang
     * @date 2022/11/7 9:55
     */
    List<MessageExpiredListener<M>> removeAllMessageExpiredListener(String topic);

    boolean removeMessageExpiredListener(String topic, String listenerName);
}

IConsumer

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 消费者抽象接口
 *
 * @Auther Huang Yongxiang
 * @Date 2022/03/21 14:20
 */
public interface IConsumer<M> {
    /**
     * 阻塞的获取一条消息,可以决定是否将该消息取出,即移出队列
     *
     * @param topic     主题
     * @param isTakeout 是否移出队列,当为false时该方法将退化成非阻塞的
     * @return M
     * @author Huang Yongxiang
     * @date 2022/3/20 10:55
     */
    M takeMessageBlock(final String topic, final boolean isTakeout);

    M takeMessageNoBlock(final String topic, final boolean isTakeout);

    List<M> takeAllMessageNoBlock(final String topic, final boolean isTakeout);

    /**
     * 取出符合正则表达式的所有主题的第一条消息,该方法为阻塞方法,某个主题如果不存在消息将会阻塞等待
     *
     * @param regexTopic 正则表达式
     * @param isTakeout  是否取出消息
     * @return java.util.List<M>
     * @author Huang Yongxiang
     * @date 2022/11/7 9:58
     */
    List<M> takeMessageByRegexTopicBlock(final String regexTopic, final boolean isTakeout);

    List<M> takeMessageByRegexTopicNoBlock(final String regexTopic, final boolean isTakeout);


    MessageQueue<MessageEntry<M>> subscriptionMessage(String topic);

    /**
     * 阻塞的尝试订阅指定消息队列,如果存在则立即返回,否则将会等待指定时长,若期间创建则会立即返回,否则等
     * 到结束返回null
     *
     * @param topic 主题
     * @param wait  要阻塞获取的时长
     * @param unit  wait的时间单位
     * @return com.example.autojob.skeleton.framework.mq.MessageQueue<com.example.autojob.skeleton.framework.mq.MessageEntry < M>>
     * @author Huang Yongxiang
     * @date 2022/3/22 14:32
     */
    MessageQueue<MessageEntry<M>> subscriptionMessage(String topic, long wait, TimeUnit unit);

    void unsubscribe(String topic);

    void unsubscribe(String topic, long wait, TimeUnit unit);
}

IProducer

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 消息消费者抽象接口
 *
 * @Auther Huang Yongxiang
 * @Date 2022/03/21 14:19
 */
public interface IProducer<M> {

    /**
     * 注册一个消息队列
     *
     * @param topic 主题名
     * @return boolean
     * @author Huang Yongxiang
     * @date 2022/3/20 9:59
     */
    boolean registerMessageQueue(String topic);

    /**
     * 注册一个消息队列,队列的数据结构由参数指定
     *
     * @param topic 主题
     * @param queue 队列数据结构类型
     * @return boolean
     * @author JingGe(* ^ ▽ ^ *)
     * @date 2023/5/26 10:05
     */
    boolean registerMessageQueue(String topic, BlockingQueue<MessageEntry<M>> queue);

    boolean hasTopic(String topic);

    boolean hasRegexTopic(String regexTopic);

    boolean removeMessageQueue(String topic);

    /**
     * 非阻塞的发布一条消息,当容量已满时立即返回
     *
     * @param message 消息内容
     * @param topic   队列主题
     * @return boolean
     * @author Huang Yongxiang
     * @date 2022/3/20 9:59
     */
    boolean publishMessageNoBlock(final M message, final String topic);

    /**
     * 非阻塞的发布一条消息,同时指定其过期时间,当容量已满时立即返回
     *
     * @param message      消息内容
     * @param topic        主题
     * @param expiringTime 过期时长
     * @param unit         时间单位
     * @return boolean
     * @author Huang Yongxiang
     * @date 2022/3/20 10:09
     */
    boolean publishMessageNoBlock(final M message, final String topic, final long expiringTime, final TimeUnit unit);

    /**
     * 阻塞的发布一条消息,当容量已满时等待空出
     *
     * @param message 消息内容
     * @param topic   队列主题
     * @return boolean
     * @author Huang Yongxiang
     * @date 2022/3/20 10:03
     */
    boolean publishMessageBlock(final M message, final String topic);

    boolean publishMessageBlock(final M message, final String topic, final long expiringTime, final TimeUnit unit);
}

ErrorExpiredException

/**
 * 消息过期异常类
 *
 * @Auther Huang Yongxiang
 * @Date 2022/03/20 11:32
 */
public class ErrorExpiredException extends RuntimeException{
    public ErrorExpiredException() {
        super();
    }

    public ErrorExpiredException(String message) {
        super(message);
    }

    public ErrorExpiredException(String message, Throwable cause) {
        super(message, cause);
    }

    public ErrorExpiredException(Throwable cause) {
        super(cause);
    }

    protected ErrorExpiredException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}

ExpirationListenerPolicy

/**
 * 过期策略
 *
 * @Auther Huang Yongxiang
 * @Date 2022/03/21 9:30
 */
public enum ExpirationListenerPolicy {
    /**
     * 单线程监听过期
     */
    SINGLE_THREAD,
    /**
     * 按照主题并发监听过期,总消息数目过多时采取该方式可以使得效率更高,注意topic过多会占用大量线程资源
     */
    TOPIC_CONCURRENCY
}

MessageConsumer

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 消费者
 *
 * @Auther Huang Yongxiang
 * @Date 2022/03/21 12:53
 */
public class MessageConsumer<M> implements IConsumer<M> {
    private MessageQueueContext<M> messageQueueContext;

    public MessageConsumer(MessageQueueContext<M> messageQueueContext) {
        this.messageQueueContext = messageQueueContext;
    }


    @Override
    public M takeMessageBlock(String topic, boolean isTakeout) {
        if (messageQueueContext == null) {
            throw new NullPointerException("消息容器为空");
        }
        return messageQueueContext.takeMessageBlock(topic, isTakeout);
    }

    @Override
    public M takeMessageNoBlock(String topic, boolean isTakeout) {
        if (messageQueueContext == null) {
            throw new NullPointerException("消息容器为空");
        }
        return messageQueueContext.takeMessageNoBlock(topic, isTakeout);
    }

    @Override
    public List<M> takeAllMessageNoBlock(String topic, boolean isTakeout) {
        return messageQueueContext.takeAllMessageNoBlock(topic, isTakeout);
    }

    @Override
    public List<M> takeMessageByRegexTopicBlock(String regexTopic, boolean isTakeout) {
        return messageQueueContext.takeMessageByRegexTopicBlock(regexTopic, isTakeout);
    }

    @Override
    public List<M> takeMessageByRegexTopicNoBlock(String regexTopic, boolean isTakeout) {
        return messageQueueContext.takeMessageByRegexTopicNoBlock(regexTopic, isTakeout);
    }

    @Override
    public MessageQueue<MessageEntry<M>> subscriptionMessage(String topic) {
        return messageQueueContext.subscriptionMessage(topic);
    }

    @Override
    public MessageQueue<MessageEntry<M>> subscriptionMessage(String topic, long wait, TimeUnit unit) {
        return messageQueueContext.subscriptionMessage(topic, wait, unit);
    }

    @Override
    public void unsubscribe(String topic) {
        messageQueueContext.unsubscribe(topic);
    }

    @Override
    public void unsubscribe(String topic, long wait, TimeUnit unit) {
        messageQueueContext.unsubscribe(topic, wait, unit);
    }

    public MessageQueueContext<M> getMessageQueueContext() {
        return messageQueueContext;
    }

    public MessageConsumer<M> setMessageQueueContext(MessageQueueContext<M> messageQueueContext) {
        this.messageQueueContext = messageQueueContext;
        return this;
    }
}

MessageProducer

import lombok.Data;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 生产者
 *
 * @Auther Huang Yongxiang
 * @Date 2022/03/21 12:54
 */
@Data
public class MessageProducer<M> implements IProducer<M> {
    private MessageQueueContext<M> queueContext;

    public MessageProducer(final MessageQueueContext<M> messageQueueContext) {
        this.queueContext = messageQueueContext;
    }


    public MessageQueueContext<M> getMessageQueueContext() {
        return queueContext;
    }


    @Override
    public boolean registerMessageQueue(String topic) {
        return queueContext.registerMessageQueue(topic);
    }

    @Override
    public boolean registerMessageQueue(String topic, BlockingQueue<MessageEntry<M>> queue) {
        return queueContext.registerMessageQueue(topic, queue);
    }

    @Override
    public boolean hasTopic(String topic) {
        return queueContext.hasTopic(topic);
    }

    @Override
    public boolean hasRegexTopic(String regexTopic) {
        return queueContext.hasRegexTopic(regexTopic);
    }

    @Override
    public boolean removeMessageQueue(String topic) {
        return queueContext.removeMessageQueue(topic);
    }

    @Override
    public boolean publishMessageNoBlock(M message, String topic) {
        return queueContext.publishMessageNoBlock(message, topic);
    }

    @Override
    public boolean publishMessageNoBlock(M message, String topic, long expiringTime, TimeUnit unit) {
        return queueContext.publishMessageNoBlock(message, topic, expiringTime, unit);
    }

    @Override
    public boolean publishMessageBlock(M message, String topic) {
        return queueContext.publishMessageBlock(message, topic);
    }

    @Override
    public boolean publishMessageBlock(M message, String topic, long expiringTime, TimeUnit unit) {
        return queueContext.publishMessageBlock(message, topic, expiringTime, unit);
    }

    public MessageProducer<M> setMessageQueueContext(MessageQueueContext<M> queueContext) {
        this.queueContext = queueContext;
        return this;
    }
}

MessageQueueContext

import util.id.IdGenerator;
import util.RegexUtil;
import util.StringUtils;
import util.thread.NamedThreadFactory;
import util.thread.ScheduleTaskUtil;
import util.thread.SyncHelper;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * 内存消息队列的context
 *
 * @Auther Huang Yongxiang
 * @Date 2022/03/18 17:14
 */
@Slf4j
public class MessageQueueContext<M> implements IMessageQueueContext<M>, IProducer<M>, IConsumer<M> {

    /**
     * 默认过期时间:ms,对所有消息设置,-1则表示消息均为永久性消息,除非消费者取出,否则将会一直存在。谨慎使用!
     */
    private long defaultExpiringTime = -1;

    /**
     * 是否允许设置单个消息的过期时间
     */
    private boolean isAllowSetEntryExpired = false;

    /**
     * 允许的最大主题数
     */
    private int allowMaxTopicCount = 255;

    /**
     * 允许每个队列的最大消息数
     */
    private int allowMaxMessageCountPerQueue;

    /**
     * 过期监听器的策略
     */
    private ExpirationListenerPolicy listenerPolicy;

    //存储消息的数据结构
    private Map<String, MessageQueue<MessageEntry<M>>> messageQueues;

    private final Map<String, List<MessagePublishedListener<M>>> publishedListenersMap = new ConcurrentHashMap<>();

    private final Map<String, List<MessageExpiredListener<M>>> expiredListenersMap = new ConcurrentHashMap<>();

    private boolean isOpenListener = false;

    /**
     * 守护线程
     */
    private ScheduledExecutorService executorService;

    /**
     * 各个主题的订阅数
     */
    private Map<String, AtomicLong> subscriptionCount;

    private ThreadPoolExecutor concurrencyThreads;


    public static Builder<Object> builder() {
        return new Builder<>();
    }

    private MessageQueueContext() {

    }

    @PostConstruct
    public void init() {

    }

    @Override
    public boolean registerMessageQueue(String topic) {
        return registerMessageQueue(topic, new LinkedBlockingQueue<>(allowMaxMessageCountPerQueue));
    }

    @Override
    public boolean registerMessageQueue(String topic, BlockingQueue<MessageEntry<M>> queue) {
        if (StringUtils.isEmpty(topic)) {
            log.error("创建队列失败,主题为空");
            return false;
        }
        if (messageQueues.containsKey(topic)) {
            return false;
        }
        if (messageQueues.size() >= allowMaxTopicCount) {
            log.error("当前消息容器最大支持{}个主题", allowMaxTopicCount);
            return false;
        }
        try {
            MessageQueue<MessageEntry<M>> messageQueue = new MessageQueue<>(queue);
            messageQueues.put(topic, messageQueue);
            if (!isOpenListener) {
                synchronized (MessageQueueContext.class) {
                    if (!isOpenListener) {
                        if (listenerPolicy == ExpirationListenerPolicy.SINGLE_THREAD) {
                            scheduleExpiringCheckSingleThread();
                        } else {
                            scheduleExpiringCheckTopicConcurrency();
                        }
                        isOpenListener = true;
                    }
                }
            }
            if (subscriptionCount == null) {
                subscriptionCount = new ConcurrentHashMap<>();
            }
            subscriptionCount.put(topic, new AtomicLong(0));
            return true;
        } catch (Exception e) {
            log.error("创建队列发生异常:{}", e.getMessage());
        }
        return false;
    }

    @Override
    public boolean hasTopic(String topic) {
        return messageQueues.containsKey(topic);
    }

    @Override
    public boolean hasRegexTopic(String regexTopic) {
        return messageQueues
                .keySet()
                .stream()
                .anyMatch(topic -> RegexUtil.isMatch(topic, regexTopic));
    }

    @Override
    public boolean removeMessageQueue(String topic) {
        try {
            messageQueues.remove(topic);
            return true;
        } catch (Exception e) {
            log.error("移除消息队列时发生异常:{}", e.getMessage());
        }
        return false;
    }

    @Override
    public boolean publishMessageNoBlock(M message, String topic, long expiringTime, TimeUnit unit) {
        if (!isAllowSetEntryExpired) {
            log.error("不允许设置单个消息的过期时间");
            return false;
        }
        if (!messageQueues.containsKey(topic)) {
            log.error("发布非阻塞消息失败,所要发布到的队列:{}不存在", topic);
            return false;
        }
        if (expiringTime <= 0 || unit == null) {
            log.error("非法过期时间");
            return false;
        }
        if (message == null) {
            log.error("禁止发布空消息");
            return false;
        }
        MessageEntry<M> messageEntry = new MessageEntry<>();
        messageEntry.setMessageId(IdGenerator.getNextIdAsLong());
        messageEntry.setMessage(message);
        messageEntry.setExpiringTime(unit.toMillis(expiringTime) + System.currentTimeMillis());
        if (messageQueues
                .get(topic)
                .publishMessage(messageEntry)) {
            executeMessagePublishedListeners(topic, message);
            return true;
        }
        return false;
    }

    @Override
    public boolean publishMessageBlock(M message, String topic, long expiringTime, TimeUnit unit) {
        if (!isAllowSetEntryExpired) {
            log.error("不允许设置单个消息的过期时间");
            return false;
        }
        if (!messageQueues.containsKey(topic)) {
            log.error("发布非阻塞消息失败,所要发布到的队列:{}不存在", topic);
            return false;
        }
        if (expiringTime <= 0 || unit == null) {
            log.error("非法过期时间");
            return false;
        }
        if (message == null) {
            log.error("禁止发布空消息");
            return false;
        }
        MessageEntry<M> messageEntry = new MessageEntry<>();
        messageEntry.setMessageId(IdGenerator.getNextIdAsLong());
        messageEntry.setMessage(message);
        messageEntry.setExpiringTime(unit.toMillis(expiringTime) + System.currentTimeMillis());
        try {
            if (messageQueues
                    .get(topic)
                    .publishMessageSync(messageEntry)) {
                executeMessagePublishedListeners(topic, message);
                return true;
            }
        } catch (InterruptedException e) {
            log.warn("发布可阻塞消息发生异常,等待时被异常占用:{}", e.getMessage());
        }
        return false;
    }


    /**
     * 阻塞的获取一条消息,可以决定是否将该消息取出,即移出队列,当多播时最好不要移出队列
     *
     * @param topic     主题
     * @param isTakeout 是否移出队列,当为false时该方法将退化成非阻塞的
     * @return M
     * @author Huang Yongxiang
     * @date 2022/3/20 10:55
     */
    @Override
    public M takeMessageBlock(String topic, boolean isTakeout) {
        if (!messageQueues.containsKey(topic)) {
            return null;
        }
        if (isTakeout) {
            try {
                return messageQueues
                        .get(topic)
                        .takeMessageSync().message;
            } catch (InterruptedException e) {
                log.warn("可阻塞获取消息发生异常,等待时被异常占用:{}", e.getMessage());
            }
            return null;
        }
        return messageQueues
                .get(topic)
                .readMessage().message;
    }

    @Override
    public M takeMessageNoBlock(String topic, boolean isTakeout) {
        if (!hasTopic(topic)) {
            return null;
        }
        if (messageQueues
                .get(topic)
                .length() == 0) {
            return null;
        }
        if (isTakeout) {
            return messageQueues
                    .get(topic)
                    .takeMessage().message;
        }
        return messageQueues
                .get(topic)
                .readMessage().message;
    }

    @Override
    public List<M> takeAllMessageNoBlock(String topic, boolean isTakeout) {
        if (!hasTopic(topic)) {
            return Collections.emptyList();
        }
        if (messageQueues
                .get(topic)
                .length() == 0) {
            return Collections.emptyList();
        }
        List<M> messages = new LinkedList<>();
        try {
            if (!isTakeout) {
                return messageQueues.get(topic).messageQueue
                        .stream()
                        .map(item -> item.message)
                        .collect(Collectors.toList());
            } else {
                while (true) {
                    M message = takeMessageNoBlock(topic, true);
                    if (message != null) {
                        messages.add(message);
                    } else {
                        break;
                    }
                }
            }
        } catch (Exception e) {
            log.error("获取全部消息时发生异常:{}", e.getMessage());
            return Collections.emptyList();
        }
        return messages;
    }

    @Override
    public List<M> takeMessageByRegexTopicBlock(String regexTopic, boolean isTakeout) {
        if (!hasRegexTopic(regexTopic)) {
            return null;
        }
        List<String> topics = messageQueues
                .keySet()
                .stream()
                .filter(topic -> RegexUtil.isMatch(topic, regexTopic))
                .collect(Collectors.toList());
        log.debug("获取到正则主题:{}的匹配主题{}个", regexTopic, topics.size());
        if (topics.size() == 0) {
            return Collections.emptyList();
        }
        List<M> messages = new LinkedList<>();
        for (String topic : topics) {
            messages.add(takeMessageBlock(topic, isTakeout));
        }
        return messages;
    }

    @Override
    public List<M> takeMessageByRegexTopicNoBlock(String regexTopic, boolean isTakeout) {
        if (!hasRegexTopic(regexTopic)) {
            return null;
        }
        List<String> topics = messageQueues
                .keySet()
                .stream()
                .filter(topic -> RegexUtil.isMatch(topic, regexTopic))
                .collect(Collectors.toList());
        log.debug("获取到正则主题:{}的匹配主题{}个", regexTopic, topics.size());
        if (topics.size() == 0) {
            return null;
        }
        List<M> messages = new LinkedList<>();
        for (String topic : topics) {
            M m = takeMessageNoBlock(topic, isTakeout);
            if (m == null) {
                continue;
            }
            messages.add(m);
        }
        return messages;
    }

    @Override
    public MessageQueue<MessageEntry<M>> subscriptionMessage(String topic) {
        MessageQueue<MessageEntry<M>> messageQueue = messageQueues.get(topic);
        if (messageQueue != null && subscriptionCount != null) {
            AtomicLong atomicLong = subscriptionCount.get(topic);
            atomicLong.incrementAndGet();
        }
        return messageQueue;
    }

    @Override
    @SuppressWarnings("unchecked")
    public MessageQueue<MessageEntry<M>> subscriptionMessage(String topic, long wait, TimeUnit unit) {
        MessageQueue<MessageEntry<M>> messageQueue = subscriptionMessage(topic);
        if (messageQueue != null) {
            return messageQueue;
        }
        //进行阻塞获取
        ScheduledFuture<Object> future = ScheduleTaskUtil
                .build(false, "subscriptionBlock")
                .EOneTimeTask(() -> {
                    long blockTime = unit.toMillis(wait);
                    int i = 0;
                    try {
                        do {
                            if (hasTopic(topic)) {
                                return messageQueues.get(topic);
                            }
                            Thread.sleep(1);
                        } while (i++ <= blockTime);
                        return null;
                    } catch (Exception e) {
                        log.error("阻塞订阅时发生异常:{}", e.getMessage());
                    }
                    return null;
                }, 1, TimeUnit.MILLISECONDS);
        try {
            return (MessageQueue<MessageEntry<M>>) future.get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("阻塞订阅时发生异常:{}", e.getMessage());
        }
        return null;
    }

    @Override
    public void unsubscribe(String topic) {
        unsubscribe(topic, 5, TimeUnit.SECONDS);
    }

    @Override
    public void unsubscribe(String topic, long wait, TimeUnit unit) {
        if (subscriptionCount != null) {
            AtomicLong atomicLong = subscriptionCount.get(topic);
            atomicLong.decrementAndGet();
            if (atomicLong.get() < 0) {
                atomicLong.set(0);
            }
            //当有队列取消订阅,且目前消息数为0,则对指定队列监视5秒,5秒后依然没有生产者发布消息则直接移除主题
            if (atomicLong.get() == 0 && length(topic) == 0) {
                long w = unit.toMillis(wait);
                log.debug("主题为{}的消息队列目前订阅数为0且积压消息为0,当{}ms后若无生产者发布消息将自动删除该主题队列", topic, w);
                Thread thread = new Thread(() -> {
                    try {
                        int i = 0;
                        boolean flag = true;
                        do {
                            if (length(topic) > 0) {
                                flag = false;
                                break;
                            }
                            SyncHelper.sleepQuietly(1, TimeUnit.MILLISECONDS);
                        } while (i++ <= w);
                        if (flag) {
                            log.debug("主题:{}自动删除完成", topic);
                            removeMessageQueue(topic);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                thread.setDaemon(true);
                thread.setName("unsubListener");
                thread.start();
            }
        }
    }

    @Override
    public int length(String topic) {
        if (messageQueues.containsKey(topic)) {
            return messageQueues
                    .get(topic)
                    .length();
        }
        return 0;
    }

    @Override
    public void expire(String topic, MessageEntry<M> messageEntry) throws ErrorExpiredException {
        if (messageEntry == null || !messageQueues.containsKey(topic)) {
            throw new IllegalArgumentException("参数有误,ID非法或主题不存在");
        }
        try {
            boolean flag = messageQueues
                    .get(topic)
                    .remove(messageEntry);
            if (!flag) {
                throw new ErrorExpiredException("移出失败");
            }
        } catch (Exception e) {
            log.error("过期时发生异常");
            throw new ErrorExpiredException(e.getMessage());
        }
    }

    @Override
    public void destroy() {
        messageQueues = null;
        if (isOpenListener) {
            try {
                executorService.shutdown();
                isOpenListener = false;
            } catch (Exception e) {
                log.error("关闭守护线程发生异常:{}", e.getMessage());
            }
        }
        System.gc();
    }

    @Override
    public void addMessagePublishedListener(String topic, MessagePublishedListener<M> listener) {
        if (listener == null) {
            throw new NullPointerException();
        }
        if (hasTopic(topic)) {
            if (!this.publishedListenersMap.containsKey(topic)) {
                synchronized (publishedListenersMap) {
                    if (!this.publishedListenersMap.containsKey(topic)) {
                        this.publishedListenersMap.put(topic, new ArrayList<>());
                    }
                }
            }
            publishedListenersMap
                    .get(topic)
                    .add(listener);
        } else {
            throw new IllegalArgumentException("不存在相关主题:" + topic);
        }
    }

    @Override
    public List<MessagePublishedListener<M>> removeAllMessagePublishedListener(String topic) {
        if (publishedListenersMap.containsKey(topic)) {
            return publishedListenersMap.remove(topic);
        }
        return null;
    }

    @Override
    public boolean removeMessagePublishedListener(String topic, String listenerName) {
        if (publishedListenersMap.containsKey(topic) && !StringUtils.isEmpty(listenerName)) {
            return publishedListenersMap
                    .get(topic)
                    .removeIf(listener -> listener
                            .listenerName()
                            .equals(listenerName));
        }
        return false;
    }

    @Override
    public void addMessageExpiredListener(String topic, MessageExpiredListener<M> listener) {
        if (listener == null) {
            throw new NullPointerException();
        }
        if (hasTopic(topic)) {
            if (!this.expiredListenersMap.containsKey(topic)) {
                synchronized (expiredListenersMap) {
                    if (!this.expiredListenersMap.containsKey(topic)) {
                        this.expiredListenersMap.put(topic, new ArrayList<>());
                    }
                }
            }
            expiredListenersMap
                    .get(topic)
                    .add(listener);
        } else {
            throw new IllegalArgumentException("不存在相关主题:" + topic);
        }
    }

    @Override
    public boolean removeMessageExpiredListener(String topic, String listenerName) {
        if (expiredListenersMap.containsKey(topic) && !StringUtils.isEmpty(listenerName)) {
            return expiredListenersMap
                    .get(topic)
                    .removeIf(listener -> listener
                            .listenerName()
                            .equals(listenerName));
        }
        return false;
    }

    @Override
    public List<MessageExpiredListener<M>> removeAllMessageExpiredListener(String topic) {
        if (expiredListenersMap.containsKey(topic)) {
            return expiredListenersMap.remove(topic);
        }
        return null;
    }


    /**
     * <p>根据迭代器位置来使得一个元素过期,由于迭代器的弱一致性,多线程环境下可能会出现使用迭代器时
     * 发生插入\删除操作,由于该消息队列对于操作严格从队尾执行,因此对于插入修改能检测到,但是由于
     * 删除从队首进行,可能发生当迭代器获取下一个元素时为空,这时应该立即停止遍历,等待下一次</p>
     *
     * @param iterator 迭代器
     * @return void
     * @author Huang Yongxiang
     * @date 2022/3/21 11:42
     */
    public void expire(Iterator<MessageEntry<M>> iterator) throws ErrorExpiredException {
        if (iterator == null) {
            throw new ErrorExpiredException("过期失败,迭代器为空");
        }
        try {
            iterator.remove();
        } catch (UnsupportedOperationException e) {
            throw new ErrorExpiredException("过期失败,该迭代器不支持移除操作");
        } catch (IllegalStateException e) {
            throw new ErrorExpiredException("过期失败,可能发生删除操作");
        }
    }

    private void executeMessagePublishedListeners(String topic, M message) {
        if (publishedListenersMap.containsKey(topic)) {
            publishedListenersMap
                    .get(topic)
                    .forEach(listener -> {
                        try {
                            listener.onMessagePublished(message, messageQueues.get(topic));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
        }
    }

    private void executeMessageExpiredListeners(String topic, M message) {
        if (expiredListenersMap.containsKey(topic)) {
            expiredListenersMap
                    .get(topic)
                    .forEach(listener -> {
                        try {
                            listener.onMessageExpired(message);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
        }
    }

    @Override
    public boolean publishMessageNoBlock(M message, String topic) {
        if (!messageQueues.containsKey(topic)) {
            log.error("发布非阻塞消息失败,所要发布到的队列主题:{}不存在", topic);
            return false;
        }
        if (message == null) {
            log.error("禁止发布空消息");
            return false;
        }
        MessageEntry<M> messageEntry = new MessageEntry<>();
        messageEntry.setMessageId(IdGenerator.getNextIdAsLong());
        messageEntry.setMessage(message);
        messageEntry.setExpiringTime(defaultExpiringTime > 0 ? defaultExpiringTime + System.currentTimeMillis() : -1);
        if (messageQueues
                .get(topic)
                .publishMessage(messageEntry)) {
            executeMessagePublishedListeners(topic, message);
            return true;
        }
        return false;
    }

    public boolean publishMessageBlock(M message, String topic) {
        if (!messageQueues.containsKey(topic)) {
            log.error("发布阻塞消息失败,所要发布到的队列主题:{}不存在", topic);
            return false;
        }
        if (message == null) {
            log.error("禁止发布空消息");
            return false;
        }
        try {
            MessageEntry<M> messageEntry = new MessageEntry<>();
            messageEntry.setMessageId(IdGenerator.getNextIdAsLong());
            messageEntry.setMessage(message);
            messageEntry.setExpiringTime(defaultExpiringTime > 0 ? defaultExpiringTime + System.currentTimeMillis() : -1);
            if (messageQueues
                    .get(topic)
                    .publishMessageSync(messageEntry)) {
                executeMessagePublishedListeners(topic, message);
            } else {
                return false;
            }
        } catch (InterruptedException e) {
            log.warn("发布可阻塞消息发生异常,等待时被异常占用:{}", e.getMessage());
        }
        return false;
    }

    private void scheduleExpiringCheckSingleThread() {
        executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "ExpiringCheckSingleThread");
            thread.setDaemon(true);
            return thread;
        });
        executorService.scheduleAtFixedRate(() -> {
            for (Map.Entry<String, MessageQueue<MessageEntry<M>>> entry : messageQueues.entrySet()) {
                for (Iterator<MessageEntry<M>> it = entry
                        .getValue()
                        .iterator(); it.hasNext(); ) {
                    MessageEntry<M> message = it.next();
                    //如果达到过期时间则通知其过期
                    if (message.expiringTime >= 0 && message.expiringTime <= System.currentTimeMillis()) {
                        try {
                            log.debug("messageId:{},消息内容:{},已过期", message.messageId, message.message);
                            expire(it);
                            executeMessageExpiredListeners(entry.getKey(), message.message);
                        } catch (ErrorExpiredException e) {
                            log.error("主题:{},消息ID:{}过期失败:{}", entry.getKey(), message.getMessageId(), e.getMessage());
                        }
                    }
                }
            }
        }, 1, 1, TimeUnit.MILLISECONDS);
    }

    private void scheduleExpiringCheckTopicConcurrency() {
        executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "ExpiringCheckTopicConcurrencyThread");
            thread.setDaemon(true);
            return thread;
        });
        executorService.scheduleAtFixedRate(() -> {
            for (Map.Entry<String, MessageQueue<MessageEntry<M>>> entry : messageQueues.entrySet()) {
                Runnable work = () -> {
                    try {
                        for (Iterator<MessageEntry<M>> it = entry
                                .getValue()
                                .iterator(); it.hasNext(); ) {
                            MessageEntry<M> message = it.next();
                            //如果达到过期时间则通知其过期
                            if (message.expiringTime >= 0 && message.expiringTime <= System.currentTimeMillis()) {
                                try {
                                    log.debug("messageId:{}已过期", message.messageId);
                                    expire(it);
                                    executeMessageExpiredListeners(entry.getKey(), message.message);
                                } catch (ErrorExpiredException e) {
                                    log.error("主题:{},消息ID:{}过期失败:{}", entry.getKey(), message.getMessageId(), e.getMessage());
                                }
                            }
                        }
                    } catch (Exception ignored) {
                    }
                };
                concurrencyThreads.submit(work);
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);
    }


    @Setter
    @Accessors(chain = true)
    public static class Builder<M> {
        /**
         * 默认过期时间,对所有消息设置,-1则表示消息均为永久性消息,除非消费者取出,否则将会一直存在。谨慎使用!
         */
        private long defaultExpiringTime = -1;

        /**
         * 是否允许设置单个消息的过期时间
         */
        private boolean isAllowSetEntryExpired = false;

        /**
         * 允许的最大主题数
         */
        private int allowMaxTopicCount = 255;

        /**
         * 允许每个队列的最大消息数
         */
        private int allowMaxMessageCountPerQueue = 1000;

        /**
         * 过期监听器的策略
         */
        private ExpirationListenerPolicy listenerPolicy = ExpirationListenerPolicy.SINGLE_THREAD;

        public Builder<M> setDefaultExpiringTime(long defaultExpiringTime, TimeUnit unit) {
            if (unit == TimeUnit.MICROSECONDS) {
                log.error("最小支持毫秒级");
                throw new IllegalArgumentException("最小支持毫秒级");
            }
            this.defaultExpiringTime = unit.toMillis(defaultExpiringTime);
            return this;
        }


        public <M1 extends M> MessageQueueContext<M1> build() {
            MessageQueueContext<M1> messageQueueContext = new MessageQueueContext<>();
            messageQueueContext.messageQueues = new ConcurrentHashMap<>(this.allowMaxTopicCount);
            messageQueueContext.isAllowSetEntryExpired = this.isAllowSetEntryExpired;
            messageQueueContext.allowMaxMessageCountPerQueue = this.allowMaxMessageCountPerQueue;
            messageQueueContext.defaultExpiringTime = this.defaultExpiringTime;
            messageQueueContext.allowMaxTopicCount = this.allowMaxTopicCount;
            messageQueueContext.listenerPolicy = this.listenerPolicy;
            if (this.listenerPolicy == ExpirationListenerPolicy.TOPIC_CONCURRENCY) {
                messageQueueContext.concurrencyThreads = new ThreadPoolExecutor(3, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000), new NamedThreadFactory("messageQueueExpire"));
            }
            return messageQueueContext;
        }

    }


}

IMessageListener

/**
 * 消息监听器抽象接口
 *
 * @Author Huang Yongxiang
 * @Date 2022/11/04 17:04
 * @Email 1158055613@qq.com
 */
public interface IMessageListener {
    String listenerName();
}

MessagePublishedListener

/**
 * 消息发布监听器,当有消息发布到消息队列时将会执行相关逻辑
 *
 * @Author Huang Yongxiang
 * @Date 2022/11/02 14:52
 * @Email 1158055613@qq.com
 */
public interface MessagePublishedListener<M> extends IMessageListener {
    void onMessagePublished(M message, MessageQueue<MessageEntry<M>> queue);

    default String listenerName() {
        return "defaultMessagePublishedListener";
    }
}

MessageExpiredListener

/**
 * 消息过期监听器
 *
 * @Author Huang Yongxiang
 * @Date 2022/11/02 15:18
 * @Email 1158055613@qq.com
 */
public interface MessageExpiredListener<M> extends IMessageListener {
    void onMessageExpired(M message);

    default String listenerName() {
        return "defaultMessageExpiredListener";
    }
}

MessageEntry

/**
 * 消息条目
 *
 * @author JingGe(* ^ ▽ ^ *)
 * @date 2023-05-26 9:45
 * @email 1158055613@qq.com
 */
public class MessageEntry<M> {
    long messageId;
    M message;
    long expiringTime;

    public MessageEntry(long messageId, M message, long expiringTime) {
        this.messageId = messageId;
        this.message = message;
        this.expiringTime = expiringTime;
    }

    public MessageEntry() {
    }

    MessageEntry<M> setMessageId(long messageId) {
        this.messageId = messageId;
        return this;
    }

    MessageEntry<M> setMessage(M message) {
        this.message = message;
        return this;
    }

    MessageEntry<M> setExpiringTime(long expiringTime) {
        this.expiringTime = expiringTime;
        return this;
    }

    public long getMessageId() {
        return messageId;
    }

    public M getMessage() {
        return message;
    }

    public long getExpiringTime() {
        return expiringTime;
    }
}

MessageQueue

import lombok.extern.slf4j.Slf4j;

import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

/**
 * 消息队列
 *
 * @author JingGe(* ^ ▽ ^ *)
 * @date 2023-05-26 9:44
 * @email 1158055613@qq.com
 */
@Slf4j
public class MessageQueue<M> {
    final BlockingQueue<M> messageQueue;

    public MessageQueue() {
        messageQueue = new LinkedBlockingQueue<>();
    }


    public MessageQueue(int maxLength) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("最大容量应该为非负数");
        }
        messageQueue = new LinkedBlockingDeque<>(maxLength);
    }

    public MessageQueue(BlockingQueue<M> messageQueue) {
        this.messageQueue = messageQueue;
    }

    public M takeMessageSync() throws InterruptedException {
        return messageQueue.take();
    }

    public void clear() {
        messageQueue.clear();
    }

    public M takeMessage() {
        return messageQueue.poll();
    }

    public M readMessage() {
        return messageQueue.peek();
    }

    public M tryReadMessage() {
        return messageQueue.element();
    }

    public boolean removeIf(Predicate<? super M> predicate) {
        if (predicate == null) {
            return false;
        }
        return messageQueue.removeIf(predicate);
    }

    public int length() {
        return messageQueue.size();
    }

    public boolean remove(M message) {
        if (message == null) {
            return false;
        }
        return messageQueue.remove(message);
    }

    public Iterator<M> iterator() {
        return messageQueue.iterator();
    }

    public boolean publishMessageSync(M message) throws InterruptedException {
        if (message == null) {
            return false;
        }
        try {
            messageQueue.put(message);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

    public boolean publishMessage(M message) {
        if (message == null) {
            return false;
        }
        try {
            return messageQueue.offer(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}
木兰宽松许可证, 第2版 木兰宽松许可证, 第2版 2020年1月 http://license.coscl.org.cn/MulanPSL2 您对“软件”的复制、使用、修改及分发受木兰宽松许可证,第2版(“本许可证”)的如下条款的约束: 0. 定义 “软件”是指由“贡献”构成的许可在“本许可证”下的程序和相关文档的集合。 “贡献”是指由任一“贡献者”许可在“本许可证”下的受版权法保护的作品。 “贡献者”是指将受版权法保护的作品许可在“本许可证”下的自然人或“法人实体”。 “法人实体”是指提交贡献的机构及其“关联实体”。 “关联实体”是指,对“本许可证”下的行为方而言,控制、受控制或与其共同受控制的机构,此处的控制是指有受控方或共同受控方至少50%直接或间接的投票权、资金或其他有价证券。 1. 授予版权许可 每个“贡献者”根据“本许可证”授予您永久性的、全球性的、免费的、非独占的、不可撤销的版权许可,您可以复制、使用、修改、分发其“贡献”,不论修改与否。 2. 授予专利许可 每个“贡献者”根据“本许可证”授予您永久性的、全球性的、免费的、非独占的、不可撤销的(根据本条规定撤销除外)专利许可,供您制造、委托制造、使用、许诺销售、销售、进口其“贡献”或以其他方式转移其“贡献”。前述专利许可仅限于“贡献者”现在或将来拥有或控制的其“贡献”本身或其“贡献”与许可“贡献”时的“软件”结合而将必然会侵犯的专利权利要求,不包括对“贡献”的修改或包含“贡献”的其他结合。如果您或您的“关联实体”直接或间接地,就“软件”或其中的“贡献”对任何人发起专利侵权诉讼(包括反诉或交叉诉讼)或其他专利维权行动,指控其侵犯专利权,则“本许可证”授予您对“软件”的专利许可自您提起诉讼或发起维权行动之日终止。 3. 无商标许可 “本许可证”不提供对“贡献者”的商品名称、商标、服务标志或产品名称的商标许可,但您为满足第4条规定的声明义务而必须使用除外。 4. 分发限制 您可以在任何媒介中将“软件”以源程序形式或可执行形式重新分发,不论修改与否,但您必须向接收者提供“本许可证”的副本,并保留“软件”中的版权、商标、专利及免责声明。 5. 免责声明与责任限制 “软件”及其中的“贡献”在提供时不带任何明示或默示的担保。在任何情况下,“贡献者”或版权所有者不对任何人因使用“软件”或其中的“贡献”而引发的任何直接或间接损失承担责任,不论因何种原因导致或者基于何种法律理论,即使其曾被建议有此种损失的可能性。 6. 语言 “本许可证”以中英文双语表述,中英文版本具有同等法律效力。如果中英文版本存在任何冲突不一致,以中文版为准。 条款结束 如何将木兰宽松许可证,第2版,应用到您的软件 如果您希望将木兰宽松许可证,第2版,应用到您的新软件,为了方便接收者查阅,建议您完成如下三步: 1, 请您补充如下声明中的空白,包括软件名、软件的首次发表年份以及您作为版权人的名字; 2, 请您在软件包的一级目录下创建以“LICENSE”为名的文件,将整个许可证文本放入该文件中; 3, 请将如下声明文本放入每个源文件的头部注释中。 Copyright (c) [Year] [name of copyright holder] [Software Name] is licensed under Mulan PSL v2. You can use this software according to the terms and conditions of the Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at: http://license.coscl.org.cn/MulanPSL2 THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details. Mulan Permissive Software License,Version 2 Mulan Permissive Software License,Version 2 (Mulan PSL v2) January 2020 http://license.coscl.org.cn/MulanPSL2 Your reproduction, use, modification and distribution of the Software shall be subject to Mulan PSL v2 (this License) with the following terms and conditions: 0. Definition Software means the program and related documents which are licensed under this License and comprise all Contribution(s). Contribution means the copyrightable work licensed by a particular Contributor under this License. Contributor means the Individual or Legal Entity who licenses its copyrightable work under this License. Legal Entity means the entity making a Contribution and all its Affiliates. Affiliates means entities that control, are controlled by, or are under common control with the acting entity under this License, ‘control’ means direct or indirect ownership of at least fifty percent (50%) of the voting power, capital or other securities of controlled or commonly controlled entity. 1. Grant of Copyright License Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable copyright license to reproduce, use, modify, or distribute its Contribution, with modification or not. 2. Grant of Patent License Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable (except for revocation under this Section) patent license to make, have made, use, offer for sale, sell, import or otherwise transfer its Contribution, where such patent license is only limited to the patent claims owned or controlled by such Contributor now or in future which will be necessarily infringed by its Contribution alone, or by combination of the Contribution with the Software to which the Contribution was contributed. The patent license shall not apply to any modification of the Contribution, and any other combination which includes the Contribution. If you or your Affiliates directly or indirectly institute patent litigation (including a cross claim or counterclaim in a litigation) or other patent enforcement activities against any individual or entity by alleging that the Software or any Contribution in it infringes patents, then any patent license granted to you under this License for the Software shall terminate as of the date such litigation or activity is filed or taken. 3. No Trademark License No trademark license is granted to use the trade names, trademarks, service marks, or product names of Contributor, except as required to fulfill notice requirements in Section 4. 4. Distribution Restriction You may distribute the Software in any medium with or without modification, whether in source or executable forms, provided that you provide recipients with a copy of this License and retain copyright, patent, trademark and disclaimer statements in the Software. 5. Disclaimer of Warranty and Limitation of Liability THE SOFTWARE AND CONTRIBUTION IN IT ARE PROVIDED WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED. IN NO EVENT SHALL ANY CONTRIBUTOR OR COPYRIGHT HOLDER BE LIABLE TO YOU FOR ANY DAMAGES, INCLUDING, BUT NOT LIMITED TO ANY DIRECT, OR INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES ARISING FROM YOUR USE OR INABILITY TO USE THE SOFTWARE OR THE CONTRIBUTION IN IT, NO MATTER HOW IT’S CAUSED OR BASED ON WHICH LEGAL THEORY, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. 6. Language THIS LICENSE IS WRITTEN IN BOTH CHINESE AND ENGLISH, AND THE CHINESE VERSION AND ENGLISH VERSION SHALL HAVE THE SAME LEGAL EFFECT. IN THE CASE OF DIVERGENCE BETWEEN THE CHINESE AND ENGLISH VERSIONS, THE CHINESE VERSION SHALL PREVAIL. END OF THE TERMS AND CONDITIONS How to Apply the Mulan Permissive Software License,Version 2 (Mulan PSL v2) to Your Software To apply the Mulan PSL v2 to your work, for easy identification by recipients, you are suggested to complete following three steps: i Fill in the blanks in following statement, including insert your software name, the year of the first publication of your software, and your name identified as the copyright owner; ii Create a file named “LICENSE” which contains the whole context of this License in the first directory of your software package; iii Attach the statement to the appropriate annotated syntax at the beginning of each source file. Copyright (c) [Year] [name of copyright holder] [Software Name] is licensed under Mulan PSL v2. You can use this software according to the terms and conditions of the Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at: http://license.coscl.org.cn/MulanPSL2 THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details.

简介

基于java的内存消息队列,支持TTL,支持单播和多播。 展开 收起
Java
MulanPSL-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
1
https://gitee.com/hyxl-520/memory_message_queue.git
git@gitee.com:hyxl-520/memory_message_queue.git
hyxl-520
memory_message_queue
memory_message_queue
master

搜索帮助