• 企业门户APP
  • 频率限制

    频率限制器是一个用于避免消息推送频率超过推送平台限制的工具包。它被整合到 PushProvider 类中,每个 PushProvider 对象都会创建一个 RateLimiter 对象。

    在推送服务中,RateLimiter 位于此处: 频率限制器所在位置

    频率限制算法

    频率限制器的实际实现比较复杂,但大体流程简单,算法简单叙述如下:

    1. 创建一个普通队列 MQ 存放消息,一个延迟队列 DQ 存放令牌。延迟队列具有 Java 的 DelayQueue 类提供的功能;
    2. 创建 N 个令牌,N = 每个限制周期可推送次数;
    3. 等待来自 MQ 队列的消息,然后取出 1 条消息;
    4. 等待来自 DQ 队列的令牌,直到可以取出 1 个令牌;
    5. 执行推送;
    6. 推送完成之后,向队列 DQ 写入 1 个令牌,延迟时间 = 频率限制周期;
    7. 回到步骤 3。

    该算法可以实现:在任意一个长度为频率限制周期的时间段中,执行的推送次数最多正好等于每个周期的最大次数。

    在实际的实现中,我们不使用 DelayQueue,队列的读取和写入可以批量进行,推送流程可以并发执行。

    开发指南

    频率限制器的主要逻辑位于 org.hzero.hippius.ratelimiter.RateLimiter 中。

    调用 start() 方法会开启推送线程,调用 stop() 方法则会在当前推送线程完成以后关闭推送线程。

    频率限制器不使用依赖注入来创建,所以无法直接注入 Spring 的 Bean。在 org.hzero.hippius.pusher.provider.BasePushProvider 中注入的 SpringContext 用于给频率限制器提供获取 Spring Bean 对象的能力。虽然它会被 IDE 提示未被使用,但您不能删除它。

    org.hzero.hippius.ratelimiter 包中包含几个子包,主要有:

    其中的代码关键步骤和方法都有详细的注释,如需要了解具体实现细节,可以从 RateLimiter 类入手了解整个流程。一般情况下,只需要关心 RateLimiter 中的 start()execute() 方法。

    目前消息队列和令牌桶都是基于 Redis 实现的。将来会加入 RabbitMQ 和 Kafka 的支持。

    execute 方法工作流程如图所示:

    推送整体流程

    令牌桶使用前需要把令牌数量调整到和每周期最大次数一致。相关的逻辑在 org.hzero.hippius.ratelimiter.tokenbucket.RedisTokenBucket 的构造方法中。

    使用方法

    创建

    使用管理器创建:

    @Autowired
    protected RateLimiterManager rateLimiterManager;
    
    RateLimiter rateLimiter = rateLimiterManager.createOrGetRateLimiter("RATE_LIMITER_NAME", config.getInterval(),
            config.getCount(), config.getBatchSize(), true);
    rateLimiter.regExecutor("executorName", this::executor);
    

    createOrGetRateLimiter() 的原型如下:

    /**
     * 获取限流器,如果没有则创建
     * @param name 限流器,队列和令牌的名称,在RabbitMQ/Redis/Kafka中使用
     * @param interval 限流周期
     * @param count 每周期最大次数
     * @param msgBatchSize 每次获取消息的最大数量
     * @param oneTargetOneTokenMode 是否使用每个推送目标消耗一个令牌的模式
     * @return 限流器
     */
    public RateLimiter createOrGetRateLimiter(String name, long interval, long count, long msgBatchSize, boolean oneTargetOneTokenMode)
    

    regExecutor() 的原型如下:

    /**
     * 注册限流执行器
     * @param name 执行器名称
     * @param executor 执行器逻辑
     */
    public void regExecutor(String name, Consumer<ISplittable> executor);
    

    编写一个DTO类,用来存储要传递给执行器的数据。需要包含一个 List 类型的接收者列表参数。以下称为 目标列表

    频率限制主要用于推送和消息发送中。为了实现可以用参数配置接口允许的最大接收者数量,以及按接收者数量计算发送频率,在限流器中传递的数据应该实现 ISplittable<T> 接口。

    /**
     * 必须实现此接口,才能使用 限制单个接口最大目标数量 和 按接收者数量限流的模式
     *
     * 限流器使用此接口来分割接收者列表。在限流请求参数中*必须*要包含一个List属性,表示一组接收者。
     * 然后实现下面定义的接口。
     *
     * 限流器执行逻辑中,直接从上述List属性里取得目标列表,即可得到符合接口要求的数据。
     *
     * 如果无需使用,继承 BaseSplittable 类即可
     *
     * @author 陈文驰 wenchi.chen@hand-china.com
     */
    public interface ISplittable<T> {
        /**
         * 按内容数量切割成多组数据
         *
         * 实现方法:调用 Helper.partition(目标列表, maxCount) 分割目标列表
         * 然后组装成完整的参数 T
         * 加入List中返回
         *
         * @param maxCount 每组包含最大的元素个数
         * @return 切割后的每一组
         */
        List<ISplittable<T>> splitByElementCount(int maxCount);
    
        /**
         * 切割出指定下标前的部分
         *
         * 实现方法:
         * if (index > 目标列表.size()) {
         *     index = 目标列表.size();
         * }
         * List<目标类型> head = 目标列表.subList(0, index);
         * List<目标类型> part = new ArrayList<>(head);
         * head.clear();
         * 复制当前对象,替换目标列表为 part,返回
         *
         * @param index 切割点
         * @return 切下的部分
         */
        ISplittable<T> cut(int index);
    
        /**
         * 获取内容数量
         *
         * 实现方法: return 目标列表.size();
         *
         * @return 内容数量
         */
        int size();
    }
    

    其中,目标列表 应为可变的 List 类型,不需要支持并发写入。在执行器逻辑中,取出 目标列表 的内容来构造消息发送数据即可满足限流要求。

    若需求过于简单,无法实现此接口,请直接继承 BaseSplittable<T> 类,然后将 oneTargetOneTokenMode 设为 false。此时,频率限制参数的最大目标数参数无任何意义。

    然后编写执行器:

    private void executor(ISplittable<自定义DTO> splittable) {
        自定义DTO dto = (自定义DTO) splittable;
        // 在这里写上需要限制频率的逻辑
    }
    

    调用

    当您需要调用执行器逻辑时,使用下面的代码:

    自定义DTO dto = new 自定义DTO();
    dto.setXXXX("xxxx");
    rateLimiter.addMessage("executorName", dto);
    

    这里的 executorName 与之前的 regExecutor(String name, Consumer<ISplittable> executor) 方法中指定的必须完全一致。实际上,你可以添加多个执行器,分别处理不同的逻辑。