领导看了我写的关闭超时订单,让我出门左转!

发表于 2年以前  | 总阅读数:344 次

前几天领导突然宣布几年前停用的电商项目又重新启动了,带着复杂的心情仔细赏阅“儿时”的代码,心中的酸楚只有自己能够体会。

这不,昨天又被领导叫进了“小黑屋”,让我把代码重构下进行升级。看到这么“可爱”的代码,心中一万只“xx马”疾驰而过。

让我最深恶痛觉的就是里边竟然用定时任务实现了“关闭超时订单”的功能,现在想来,哭笑不得。我们先分析一波为什么大家都在抵制用定时任务来实现该功能。

定时任务

关闭超时订单是在创建订单之后的一段时间内未完成支付而关闭订单的操作,该功能一般要求每笔订单的超时时间是一致的。

如果我们使用定时任务来进行该操作,很难把握定时任务轮询的时间间隔:

  • 时间间隔足够小,在误差允许的范围内可以达到我们说的时间一致性问题,但是频繁扫描数据库,执行定时任务,会造成网络IO和磁盘IO的消耗,对实时交易造成一定的冲击;
  • 时间间隔比较大,由于每个订单创建的时间不一致,所以上边的一致性要求很难达到,举例如下:

假设30分钟订单超时自动关闭,定时任务的执行间隔时间为30分钟:

  1. 我们在第5分钟进行下单操作;
  2. 当时间来到第30分钟时,定时任务执行一次,但是我们的订单未满足条件,不执行;
  3. 当时间来到第35分钟时,订单达到关闭条件,但是定时任务未执行,所以不执行;
  4. 当时间来到第60分钟时,开始执行我们的订单关闭操作,而此时,误差达到25分钟。

经此种种,我们需要舍弃该方式。

延时队列

为了满足领导的需求,我便将手伸向了消息队列:RabbitMQ。尽管它本身并没有提供延时队列的功能,但是我们可以利用它的存活时间和死信交换机的特性来间接实现。

首先我们先来简单介绍下什么是存活时间?什么是死信交换机?

存活时间

存活时间的全拼是Time To Live,简称 TTL。它既支持对消息本身进行设置(延迟队列的关键),又支持对队列进行设置(该队列中所有消息存在相同的过期时间)。

  • 对消息本身进行设置:即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的;
  • 对队列进行设置:一旦消息过期,就会从队列中抹去;

如果同时使用这两种方法,那么以过期时间的那个数值为准。当消息达到过期时间还没有被消费,那么该消息就“死了”,我们把它称为 死信 消息。

消息变为死信的条件:

  • 消息被拒绝(basic.reject/basic.nack),并且requeue=false;
  • 消息的过期时间到期了;
  • 队列达到最大长度;

队列设置注意事项

  1. 队列中该属性的设置要在第一次声明队列的时候设置才有效,如果队列一开始已存在且没有这个属性,则要删掉队列再重新声明才可以;
  2. 队列的 ttl 只能被设置为某个固定的值,一旦设置后则不能更改,否则会抛出异常;

死信交换机

死信交换机全拼Dead-Letter-Exchange,简称DLX

当消息在一个队列中变成死信之后,如果这个消息所在的队列设置了x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换机上,这个交换机就称之为死信交换机,与这个死信交换器绑定的队列就是死信队列。

  • x-dead-letter-exchange:出现死信之后将死信重新发送到指定交换机;
  • x-dead-letter-routing-key:出现死信之后将死信重新按照指定的routing-key发送,如果不设置默认使用消息本身的routing-key

死信队列与普通队列的区别就是它的RoutingKeyExchange需要作为参数,绑定到正常的队列上。

实战教学

先来张图感受下我们的整体思路

  1. 生产者发送带有 ttl 的消息放入交换机路由到延时队列中;
  2. 在延时队列中绑定死信交换机与死信转发的routing-key
  3. 等延时队列中的消息达到延时时间之后变成死信转发到死信交换机并路由到死信队列中;
  4. 最后供消费者消费。

我们在[上文] 的基础上进行代码实现:

配置类

@Configuration
public class DelayQueueRabbitConfig {

    public static final String DLX_QUEUE = "queue.dlx";//死信队列
    public static final String DLX_EXCHANGE = "exchange.dlx";//死信交换机
    public static final String DLX_ROUTING_KEY = "routingkey.dlx";//死信队列与死信交换机绑定的routing-key

    public static final String ORDER_QUEUE = "queue.order";//订单的延时队列
    public static final String ORDER_EXCHANGE = "exchange.order";//订单交换机
    public static final String ORDER_ROUTING_KEY = "routingkey.order";//延时队列与订单交换机绑定的routing-key

 /**
     * 定义死信队列
     **/
    @Bean
    public Queue dlxQueue(){
        return new Queue(DLX_QUEUE,true);
    }

    /**
     * 定义死信交换机
     **/
    @Bean
    public DirectExchange dlxExchange(){
        return new DirectExchange(DLX_EXCHANGE, true, false);
    }


    /**
     * 死信队列和死信交换机绑定
     * 设置路由键:routingkey.dlx
     **/
    @Bean
    Binding bindingDLX(){
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
    }


    /**
     * 订单延时队列
     * 设置队列里的死信转发到的DLX名称
     * 设置死信在转发时携带的 routing-key 名称
     **/
    @Bean
    public Queue orderQueue() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", DLX_EXCHANGE);
        params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        return new Queue(ORDER_QUEUE, true, false, false, params);
    }

    /**
     * 订单交换机
     **/
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(ORDER_EXCHANGE, true, false);
    }

    /**
     * 把订单队列和订单交换机绑定在一起
     **/
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
    }
}

发送消息

@RequestMapping("/order")
public class OrderSendMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage")
    public String sendMessage(){

        String delayTime = "10000";
        //将消息携带路由键值
        rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                "发送消息!",message->{
            message.getMessageProperties().setExpiration(delayTime);
            return message;
        });
        return "ok";
    }

}

消费消息

@Component
@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)//监听队列名称
public class OrderMQReciever {

    @RabbitHandler
    public void process(String message){
        System.out.println("OrderMQReciever接收到的消息是:"+ message);
    }
}

测试

通过调用接口,发现10秒之后才会消费消息

问题升级

由于开发环境和测试环境使用的是同一个交换机和队列,所以发送的延时时间都是30分钟。但是为了在测试环境让测试同学方便测试,故手动将测试环境的时间改为了1分钟。

问题复现

接着问题就来了:延时时间为1分钟的消息并没有立即被消费,而是等30分钟的消息被消费完之后才被消费了。至于原因,我们下边再分析,先用代码来给大家复现下该问题。

@GetMapping("/sendManyMessage")
public String sendManyMessage(){
    send("延迟消息睡10秒",10000+"");
    send("延迟消息睡2秒",2000+"");
    send("延迟消息睡5秒",5000+"");
    return "ok";
}

private void send(String msg, String delayTime){
 rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, 
                                  DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                                  msg,message->{
                                      message.getMessageProperties().setExpiration(delayTime);
                                      return message;
                                  });
}

执行结果如下:

OrderMQReciever接收到的消息是:延迟消息睡10秒
OrderMQReciever接收到的消息是:延迟消息睡2秒
OrderMQReciever接收到的消息是:延迟消息睡5秒

原因就是延时队列也满足队列先进先出的特征,当10秒的消息未出队列时,后边的消息不能顺利出队,造成后边的消息阻塞了,未能达到精准延时。

问题解决

我们可以利用x-delay-message插件来解决该问题

消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒)

  1. 生产者发送消息到交换机时,并不会立即进入,而是先将消息持久化到 Mnesia(一个分布式数据库管理系统);
  2. 插件将会尝试确认消息是否过期;
  3. 如果消息过期,消息会通过 x-delayed-type 类型标记的交换机投递至目标队列,供消费者消费;

实践

官网下载:https://www.rabbitmq.com/community-plugins.html

我这边使用的是v3.8.0.ez,将文件下载下来放到服务器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路径下,执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。

出现如图所示,代表安装成功。

配置类

@Configuration
public class XDelayedMessageConfig {

    public static final String DIRECT_QUEUE = "queue.direct";//队列
    public static final String DELAYED_EXCHANGE = "exchange.delayed";//延迟交换机
    public static final String ROUTING_KEY = "routingkey.bind";//绑定的routing-key

    /**
     * 定义队列
     **/
    @Bean
    public Queue directQueue(){
        return new Queue(DIRECT_QUEUE,true);
    }

    /**
     * 定义延迟交换机
     * args:根据该参数进行灵活路由,设置为“direct”,意味着该插件具有与直连交换机具有相同的路由行为,
     * 如果想要不同的路由行为,可以更换现有的交换类型如:“topic”
     * 交换机类型为 x-delayed-message
     **/
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
    }

    /**
     * 队列和延迟交换机绑定
     **/
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
    }

}

发送消息

@RestController
@RequestMapping("/delayed")
public class DelayedSendMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendManyMessage")
    public String sendManyMessage(){

        send("延迟消息睡10秒",10000);
        send("延迟消息睡2秒",2000);
        send("延迟消息睡5秒",5000);
        return "ok";
    }

    private void send(String msg, Integer delayTime){
        //将消息携带路由键值
        rabbitTemplate.convertAndSend(
                XDelayedMessageConfig.DELAYED_EXCHANGE,
                XDelayedMessageConfig.ROUTING_KEY,
                msg,
                message->{
                    message.getMessageProperties().setDelay(delayTime);
                    return message;
                });
    }
}

消费消息

@Component
@RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)//监听队列名称
public class DelayedMQReciever {


    @RabbitHandler
    public void process(String message){
        System.out.println("DelayedMQReciever接收到的消息是:"+ message);
    }
}

测试

DelayedMQReciever接收到的消息是:延迟消息睡2秒
DelayedMQReciever接收到的消息是:延迟消息睡5秒
DelayedMQReciever接收到的消息是:延迟消息睡10秒

这样我们的问题就顺利解决了。

局限性

延迟的消息存储在一个Mnesia表中,当前节点上只有一个磁盘副本,它们将在节点重启后存活。

虽然触发计划交付的计时器不会持久化,但它将在节点启动时的插件激活期间重新初始化。显然,集群中只有一个预定消息的副本意味着丢失该节点或禁用其上的插件将丢失驻留在该节点上的消息。

该插件的当前设计并不适合延迟消息数量较多的场景(如数万条或数百万条),另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积。

本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/jxkhvGfYFe43JXP5W6zZeg

 相关推荐

刘强东夫妇:“移民美国”传言被驳斥

京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。

发布于:1年以前  |  808次阅读  |  详细内容 »

博主曝三大运营商,将集体采购百万台华为Mate60系列

日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。

发布于:1年以前  |  770次阅读  |  详细内容 »

ASML CEO警告:出口管制不是可行做法,不要“逼迫中国大陆创新”

据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。

发布于:1年以前  |  756次阅读  |  详细内容 »

抖音中长视频App青桃更名抖音精选,字节再发力对抗B站

今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。

发布于:1年以前  |  648次阅读  |  详细内容 »

威马CDO:中国每百户家庭仅17户有车

日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。

发布于:1年以前  |  589次阅读  |  详细内容 »

研究发现维生素 C 等抗氧化剂会刺激癌症生长和转移

近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。

发布于:1年以前  |  449次阅读  |  详细内容 »

苹果据称正引入3D打印技术,用以生产智能手表的钢质底盘

据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。

发布于:1年以前  |  446次阅读  |  详细内容 »

千万级抖音网红秀才账号被封禁

9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...

发布于:1年以前  |  445次阅读  |  详细内容 »

亚马逊股东起诉公司和贝索斯,称其在购买卫星发射服务时忽视了 SpaceX

9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。

发布于:1年以前  |  444次阅读  |  详细内容 »

苹果上线AppsbyApple网站,以推广自家应用程序

据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。

发布于:1年以前  |  442次阅读  |  详细内容 »

特斯拉美国降价引发投资者不满:“这是短期麻醉剂”

特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。

发布于:1年以前  |  441次阅读  |  详细内容 »

光刻机巨头阿斯麦:拿到许可,继续对华出口

据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。

发布于:1年以前  |  437次阅读  |  详细内容 »

马斯克与库克首次隔空合作:为苹果提供卫星服务

近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。

发布于:1年以前  |  430次阅读  |  详细内容 »

𝕏(推特)调整隐私政策,可拿用户发布的信息训练 AI 模型

据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。

发布于:1年以前  |  428次阅读  |  详细内容 »

荣耀CEO谈华为手机回归:替老同事们高兴,对行业也是好事

9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI操控无人机能力超越人类冠军

《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI生成的蘑菇科普书存在可致命错误

近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。

发布于:1年以前  |  420次阅读  |  详细内容 »

社交媒体平台𝕏计划收集用户生物识别数据与工作教育经历

社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”

发布于:1年以前  |  411次阅读  |  详细内容 »

国产扫地机器人热销欧洲,国产割草机器人抢占欧洲草坪

2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。

发布于:1年以前  |  406次阅读  |  详细内容 »

罗永浩吐槽iPhone15和14不会有区别,除了序列号变了

罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。

发布于:1年以前  |  398次阅读  |  详细内容 »
 相关文章
Android插件化方案 5年以前  |  237269次阅读
vscode超好用的代码书签插件Bookmarks 2年以前  |  8108次阅读
 目录