Redis与RabbitMQ配合使用多线程(多消费者)处理消息

avatar
作者
筋斗云
阅读量:0

引言

并发引起的服务器崩溃是非常常见的现象,为了解决这一问题,目前流行使用缓存数据库与消息队列搭配使用。在最近的项目中也是使用到这一手段,本篇文章通过一个案例为大家展示该套方案如何使用。

案例描述与流程

本案例是一个经典的并发下单的案例。在Redis中存在一条key为Apple,Value为10000的数据,为防止超卖问题的发生使用Redisson分布式锁避免超卖(在Redis解决超卖Demo这篇文章中已经讲过),在一个线程拿到锁并且符合下单条件则直接返回下单成功同时发送消息,使用AMQP监听队列消息,通过线程池创建多个线程作为消费者进行底层DB的更新。

环境准备

创建模块名为Redis

yml配置文件的编写
server:   port: 9000 spring:   application:     name: redis   datasource:     driver-class-name: com.mysql.jdbc.Driver     url: jdbc:mysql://localhost:3306/user?useSSL=false     username: root     password: 123456    redis:     host: 192.168.136.130     port: 6379     password: 123456     lettuce:       pool:         max-active: 10         max-idle: 10         min-idle: 1         time-between-eviction-runs: 10s    rabbitmq:     host: 192.168.136.130  #MQ地址     port: 5672        #端口     virtual-host: /   #虚拟主机     username: demo   #用户密码     password: 123321     connection-timeout: 1s     template:       retry:  #重试机制         enabled: true         initial-interval: 1000ms         multiplier: 1         max-attempts: 3     publisher-confirm-type: correlated     publisher-returns: true  
Controller Test类的编写

 设置路径为  /testAddsetxAddFinally

 @RestController @RequestMapping("/") public class Test {     @Autowired     private RedisTemplate redisTemplate;     @Autowired     private RabbitTemplate rabbitTemplate;        /*使用setnx锁,同时给锁释放过期时间,自动释放锁     * */      @RequestMapping("testAddsetxAddFinally")     String cherkAndReduceStockAddSetnxAddFinally()     {          Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock-stock", "0000",2, TimeUnit.SECONDS);         //获取锁失败,停止50ms,递归调用         if (!lock){             try {                 Thread.sleep(3000);                 this.cherkAndReduceStockAddSetnxAddFinally();             } catch (InterruptedException e) {                 e.printStackTrace();             }          }else {             try {                 String stock = redisTemplate.opsForValue().get("Apple").toString();                 if(stock!=null&&stock.length()!=0)                 {                     Integer valueOf = Integer.valueOf(stock);                     if (valueOf>0)                     {                         redisTemplate.opsForValue().set("Apple",String.valueOf(--valueOf));                         //推送MQ                         String queue="demo.queue";                          //123456为用户id    1为商品id                         String masg="123456:1";                         rabbitTemplate.convertAndSend(queue,masg);                         return "抢购成功!";                                             }else {                         System.out.println("商品售罄!!!");                         return "商品售罄!!!";                     }                 }             }finally {                 redisTemplate.delete("lock-stock");             }          }         return "";     }  } 
 编写RedisConfig类序列化存储
@Configuration public class RedisConfig {     @Bean     public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory)     {         //缓存序列化配置避免存储乱码          RedisTemplate<String,Object> redisTemplate=new RedisTemplate<>();         redisTemplate.setConnectionFactory(factory);         redisTemplate.setKeySerializer(new StringRedisSerializer());         redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());         return  redisTemplate;     } }

 

创建Consumer模块

 yml文件
server:   port: 9004 spring:   application:     name: redis   datasource:     driver-class-name: com.mysql.jdbc.Driver     url: jdbc:mysql://localhost:3306/user?useSSL=false     username: root     password: 123456    redis:     host: 192.168.136.130     port: 6379     password: 123456     lettuce:       pool:         max-active: 10         max-idle: 10         min-idle: 1         time-between-eviction-runs: 10s    rabbitmq:     host: 192.168.136.130     port: 5672     virtual-host: /     username: demo     password: 123321     connection-timeout: 1s     template:       retry:         enabled: true         initial-interval: 1000ms         multiplier: 1         max-attempts: 3     publisher-confirm-type: correlated     publisher-returns: true  
编写order实体类
package cn.itcast.mq.pojo;  import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import org.springframework.data.relational.core.mapping.Table;  @Data @TableName("orderlist") public class order {     //用户id     @TableField("userId")     private String userId;     //商品id     private String  id;      public order(String userId, String id) {         this.userId = userId;         this.id = id;     } } 

注意对应关系

 编写orderMapper
package cn.itcast.mq.mapper;  import cn.itcast.mq.pojo.order; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.ibatis.annotations.Mapper;  @Mapper public interface orderMapper extends BaseMapper<order> { } 
编写orderService
package cn.itcast.mq.service;  import cn.itcast.mq.pojo.order; import com.baomidou.mybatisplus.extension.service.IService;  public interface orderService extends IService<order> { } 
编写Iml实现类
package cn.itcast.mq.service;  import cn.itcast.mq.mapper.orderMapper; import cn.itcast.mq.pojo.order; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;  import org.springframework.stereotype.Service;  @Service public class orderServiceImpl extends ServiceImpl<orderMapper, order> implements orderService{     } 
构建Listerner线程池,构建容器工厂

使用@RabbitListener注解指定消费方法,默认情况是单线程监听队列,可以观察当队列有多个任务时消费端每次只消费一个消息,单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源,可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。

package cn.itcast.mq.thread;   import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync;  @Configuration @EnableAsync public class ThreadPoolConfig {      @Bean("customContainerFactory")      public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {          SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();          factory.setConcurrentConsumers(10); //设置线程数          factory.setMaxConcurrentConsumers(10); //最大线程数          configurer.configure(factory, connectionFactory);          return factory;      }     }  
编写MQListener监听队列
package cn.itcast.mq.listeners;   import cn.itcast.mq.pojo.order; import cn.itcast.mq.service.orderService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener;  import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component;  @Component @Slf4j public class MqListener {       @Autowired     private orderService orderService;       //声明队列    mq的容器工厂     @RabbitListener(queues="demo.queue",containerFactory = "customContainerFactory")     public void listenSimpleQueue(String msg)     {         //拆分消息         String[] split = msg.split(":");         order order = new order(split[0], split[1]);         System.out.println(order.toString());         //保存MYSQL         orderService.save(order);          //测试是否多个消费者         System.out.println("线程" + Thread.currentThread().getName() + " 执行异步任务" );     }  } 

RabbitMQ的准备

创建demo.queue队列

 创建demo用户并且配置虚拟主机

 进行测试

启动Redis和Consumer服务

 使用JMeter压测12000个用户

 开始压测

查看队列

观察Consumer控制台,一万条消息瞬间执行完成!

 查看MySQL orderlist表,有一万条数据

 查看Redis 数据库并没有出现超卖问题,案例成功!!

 附加

解决RabbitMQ消息堆积的方案有三种

  • 增加更多消费者,提高消息速度。(本案例采用这一种)
  • 在消费者中开启线程池加快消息处理速度。
  • 扩大队列容积,提高堆积上限,采用惰性队列。

 总结

通过本次演示的案例,希望大家可以掌握并且多加练习,在日常的开发中缓存数据库和异步队列是必备的手段,同时也是大家找工作时的一个亮点。本文如有不妥之处希望大家指正!!!

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!