分布式秒杀
一般在具体的业务中,平台方会发布秒杀席位个数,秒杀的时间段,让各个商家报名,将自己的产品参与秒杀活动。这里将同事画的一张图放上来,大致是这么一个流程。关于秒杀原理可以参考单机秒杀系统样例
这里面关于Nginx的限流可以参考高并发之Nginx的限流
我们这里在秒杀的前一天将所有的批准参加秒杀的商家商品数据导入到redis的商品队列中,这里我们使用当当的elastic-job来实现,关于elastic-job的使用,可以参考分布式调度Elastic-Job攻略
这里是以商家的服务来当成一件商品来处理的,所以service可以理解成商品。这里有一个区位的概念,即秒杀当天商家可以报名的所有时间段,我们称之为区位,比如0点~2点,2点~4点等等。区位的实体类字段如下。
/** * 秒杀区位 */ @Data @AllArgsConstructor @NoArgsConstructor public class Location { private Long id; //区位id private LocalDate date; //日期 private TimeSegment timeSegment; //时间段 private int locationNumber; //区位数量,即可以容纳多少商家报名 private BigDecimal baseCost; //商家参与活动的最低缴纳金 private double sellPercent; //销售百分比缴纳率 private boolean releaseLocation; //是否发布 }
/** * 秒杀服务 */ @RequiredArgsConstructor @NoArgsConstructor public class Service { @Getter @Setter @NonNull private Long id; //服务id @Getter @Setter @NonNull private Long signUpId; //报名id @Getter @Setter @NonNull private String serviceCode; //服务编码 @Getter @Setter @NonNull private String serviceName; //服务名称 @Getter @Setter @NonNull private String serviceLevel; //服务分类 @Getter @Setter @NonNull private Price price; //价格,包含普通价格和秒杀价格 @Getter @Setter @NonNull private int totalCount; //参与秒杀的总数 @Getter @Setter @NonNull private int limitCount; //单个用户可以购买该服务的数量 @Getter @Setter @NonNull private TimeSegment timeSegment; //时间段 @Getter @Setter @NonNull private CheckStatus checkStatus; //平台对该秒杀的审核状态 @Getter private Lock lock = new ReentrantLock(); //重入锁 @Getter private Condition condition = lock.newCondition(); //重入锁条件 }
商家商品(服务)数据导入到redis的商品队列中
/** * 在每天的0点开始处理,查看后一天是否有秒杀活动 */ @Slf4j @Component @ElasticSimpleJob(cron="0 0 0 * * ?",jobName="loadRedis",shardingTotalCount=2,jobParameter="日期",shardingItemParameters="0=Load0,1=Load1") public class LoadDataToRedisJob implements SimpleJob { @Autowired private RedisService redisService; @Autowired private DataDao dataDao; @Override public void execute(ShardingContext shardingContext) { //获取后一天所有的秒杀区位 List<Location> locationNextDayList = dataDao.findJobToday(LocalDate.now().plusDays(1)); if (!CollectionUtils.isEmpty(locationNextDayList)) { //获取一天所有的秒杀区位时间段 List<TimeSegment> segmentList = locationNextDayList.stream().map(location -> location.getTimeSegment()) .collect(Collectors.toList()); switch (shardingContext.getShardingItem()) { case 0: //获取每个时间段内的所有参与秒杀的服务 segmentList.stream().map(timeSegment -> { List<Service> serviceInSegment = dataDao.findServiceInSegment(timeSegment); serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment)); return serviceInSegment; //扁平化所有服务,统一为一组List }).flatMap(services -> services.stream()).forEach(service -> { //以服务id以及秒杀时间段组合为主键 String key = service.getId() + service.getTimeSegment().toString(); //如果redis中存在该主键的队列,则清空队列 if (redisService.exists(key)) { for (int i = 0; i < redisService.llen(key); i++) { redisService.rpop(key); } } //清空后,根据每个服务的参与总数,将服务按总数量推送到该主键队列中 for (int i = 0; i < service.getTotalCount(); i++) { redisService.lpush(key, JSONObject.toJSONString(service)); } log.info(service.getId() + service.getTimeSegment().toString()); //以服务id+":count"组合成该服务的总数键,如果redis中存在该键,则删除 String countKey = service.getId() + ":count"; if (redisService.exists(countKey)) { redisService.del(countKey); } //重新将总数放入该键的redis中存储 redisService.set(countKey, String.valueOf(service.getTotalCount())); }); break; case 1: break; default: break; } } } }
将服务导入到redis队列后,我们需要设立一个秒杀活动开始的标识,让秒杀下单只能在秒杀活动进行中开启,不在秒杀时间内不允许下单。
/** * 给秒杀时间点设立开启标识,每天0点开始,每2小时执行一次 */ @Slf4j @Component @ElasticSimpleJob(cron="0 0 0/2 * * ?",jobName="openTimeSeg",shardingTotalCount=1,jobParameter="日期",shardingItemParameters="0=Open0") public class OpenTimeSegmentJob implements SimpleJob { @Autowired private RedisService redisService; @Autowired private DataDao dataDao; @Override public void execute(ShardingContext shardingContext) { //获取当天的所有秒杀区位 List<Location> locationToDayList = dataDao.findJobToday(LocalDate.now()); //如果当天有秒杀活动 if (!CollectionUtils.isEmpty(locationToDayList)) { //获取当前时间点,当前时间点不一定是准点 LocalDateTime now = LocalDateTime.now(); int year = now.getYear(); int month = now.getMonthValue(); int day = now.getDayOfMonth(); int hour = now.getHour(); //将当前时间拼装成整点 LocalDateTime beginDate = LocalDateTime.of(year, month, day, hour, 0, 0); //以整点时间为基准,在redis中放入开启秒杀时间段,119分钟后消失(每个时间段段为1小时59分钟,2小时的最后一分钟结束该时间段秒杀) redisService.set("TimeStart:" + new TimeSegment(beginDate, beginDate.plusMinutes(119)).toString(), "opened",7140); log.info(beginDate.toString() + "至" + beginDate.plusMinutes(119).toString() + "秒杀开始"); } } }
到了秒杀时间,我们就可以开始下单了,先定义一个秒杀单的接口
public interface SecOrder { /** * 秒杀下单 * @param secOrder * @return */ public String makeOrder(SecOrder secOrder); /** * 是否存在该订单编号的秒杀单 * @param orderNo * @return */ public boolean exitsOrder(String orderNo); /** * 修改支付状态 * @param orderNo */ public void changePayStatus(String orderNo); }
SecOrder的实现类的各属性如下,关于Delayed接口会在后面说明。
/** * 服务秒杀单 */ @Slf4j @AllArgsConstructor @NoArgsConstructor @ServiceSecOrderVersion(value = 1) public class ServiceSecOrder implements SecOrder,Delayed { @Getter @Setter private Long id; //订单id @Getter @Setter private String orderNo; //订单编号 @Getter @Setter private Service service; //订单服务内容 @Getter @Setter private TimeSegment timeSegment; //秒杀时间段 @Getter @Setter private int number; //订单数量 @Getter @Setter private BigDecimal amount; //订单金额 @Getter @Setter private AppUser user; //下单人 @Getter @Setter private int orderStatus; //订单状态 @Getter @Setter private LocalDateTime createDate; //创建日期 private long executeTime; //延迟队列时间
然后开始下秒杀订单
@Override public String makeOrder(SecOrder secOrder) { RedisService redisService = SpringBootUtil.getBean(RedisService.class); IdService idService = SpringBootUtil.getBean(IdService.class); MessageSender messageSender = SpringBootUtil.getBean(MessageSender.class); //如果当前时间在秒杀时间段之外,不允许秒杀下单 if (LocalDateTime.now().isBefore(((ServiceSecOrder)secOrder).getTimeSegment().getBeginTime()) || LocalDateTime.now().isAfter(((ServiceSecOrder)secOrder).getTimeSegment().getEndTime())) { throw new RuntimeException("不在秒杀时间段内"); } //由于测试时间的问题,此处需要屏蔽,等到实际部署时需要恢复 LocalDateTime now = LocalDateTime.now(); int year = now.getYear(); int month = now.getMonthValue(); int day = now.getDayOfMonth(); int hour = now.getHour(); LocalDateTime beginDate = LocalDateTime.of(year,month,day,hour,0,0); //从redis中检查是否有开启秒杀时间段 if (!redisService.exists("TimeStart:" + new TimeSegment(beginDate,beginDate.plusMinutes(119)).toString())) { throw new RuntimeException("当前时间段无秒杀"); } ((ServiceSecOrder)secOrder).setId(idService.genId()); if (((ServiceSecOrder)secOrder).getNumber() > ((ServiceSecOrder)secOrder).getService().getLimitCount()) { throw new RuntimeException("秒杀数量超出限购"); } AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); ((ServiceSecOrder)secOrder).setOrderNo(getCode(idService)); user.setUsername(loginAppUser.getUsername()); ((ServiceSecOrder)secOrder).setUser(user); //设置订单状态0表示未支付状态 ((ServiceSecOrder)secOrder).setOrderStatus(0); ((ServiceSecOrder)secOrder).setCreateDate(LocalDateTime.now()); //设置用户秒杀队列键名(每一种服务都有独立的用户秒杀队列) //队列名由User:+服务id+时间段组成 String key = "User:" + ((ServiceSecOrder)secOrder).getService().getId() + ((ServiceSecOrder)secOrder).getTimeSegment().toString(); //将秒杀用户id推送到该队列中 redisService.lpush(key,((ServiceSecOrder)secOrder).getUser().getId() + "");
我们将用户id推送到redis队列后就要开始匹配秒杀结果了,因为商品队列早已经在前一天就推送进去了。
/** * 秒杀结果匹配任务,每天0点开始,每2小时执行一次 */ @Slf4j @Component @RabbitListener(queues = {SecendKillMq.SECENDKILL_QUEUE + "_1", SecendKillMq.SECENDKILL_QUEUE + "_2", SecendKillMq.SECENDKILL_QUEUE + "_3", SecendKillMq.SECENDKILL_QUEUE + "_4", SecendKillMq.SECENDKILL_QUEUE + "_5", SecendKillMq.SECENDKILL_QUEUE + "_6", SecendKillMq.SECENDKILL_QUEUE + "_7", SecendKillMq.SECENDKILL_QUEUE + "_8", SecendKillMq.SECENDKILL_QUEUE + "_9", SecendKillMq.SECENDKILL_QUEUE + "_10"}) @ElasticSimpleJob(cron="0 0 0/2 * * ?",jobName="secResult",shardingTotalCount=2,jobParameter="日期",shardingItemParameters="0=SecKill0,1=SecKill1") public class SecendKillResultJob implements SimpleJob { @Autowired private RedisService redisService; @Autowired private DataDao dataDao; //为了跟MQ搭配,唤醒中断用 private List<Service> serviceList; @Override public void execute(ShardingContext shardingContext) { //获取当天所有的秒杀区位 List<Location> locationTodayList = dataDao.findJobToday(LocalDate.now()); //如果当天有秒杀活动 if (!CollectionUtils.isEmpty(locationTodayList)) { //获取一天所有的秒杀区位时间段 List<TimeSegment> segmentList = locationTodayList.stream().map(location -> location.getTimeSegment()) .collect(Collectors.toList()); switch (shardingContext.getShardingItem()) { case 0: //从所有秒杀区位时间段过滤当前秒杀时间段 this.serviceList = segmentList.stream().filter(timeSegment -> LocalDateTime.now().isAfter(timeSegment.getBeginTime()) && LocalDateTime.now().isBefore(timeSegment.getEndTime())) //将时间段转化成时间段内的秒杀服务 .map(timeSegment -> { List<Service> serviceInSegment = dataDao.findServiceInSegment(timeSegment); serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment)); return serviceInSegment; //扁平化所有的秒杀服务,将所有当前时间段内的服务放入serviceList属性中 //就是拿出当前时间段内所有参与秒杀的服务 }).flatMap(services -> services.stream()).collect(Collectors.toList()); //并行化处理所有的秒杀服务 this.serviceList.parallelStream().forEach(service -> { while (true) { try { service.getLock().lock(); String userKey = "User:" + service.getId() + service.getTimeSegment().toString(); String serviceKey = service.getId() + service.getTimeSegment().toString(); String countKey = service.getId() + ":count"; //如果下秒杀时间内没有用户下单该服务,则中断该服务的并行线程 //如果有用户下单则唤醒该并行线程 while (redisService.llen(userKey).equals(0L)) { try { log.info("用户队列无数据,开始中断"); service.getCondition().await(); LocalDateTime now = LocalDateTime.now(); if (now.isAfter(service.getTimeSegment().getEndTime())) { break; } } catch (InterruptedException e) { e.printStackTrace(); } }
到这里,如果没有用户下单,则会进行线程中断,不会去执行while (true)的无限循环。SecendKillResultJob同时又是RabbitMQ的一个消费者,同时监听了10个消息队列,监听后进行如下处理
@RabbitHandler public void receice(byte[] data, Channel channel, Message message) throws IOException { try { //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); //收到服务id Long serviceId = unSerialize(data); log.info(serviceId + ""); //如果当前秒杀服务列表不为空 if (!CollectionUtils.isEmpty(this.serviceList)) { //从服务列表中过滤出id为MQ收取的服务ID的服务 this.serviceList.stream().filter(service -> service.getId().equals(serviceId)) .forEach(service -> { log.info("存在" + service.getId()); try { service.getLock().lock(); //对该服务所在线程进行唤醒 service.getCondition().signalAll(); } finally { service.getLock().unlock(); } }); } } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail"); } } /** * 反序列化 * @param data * @return */ private Long unSerialize(byte[] data) { Input input = null; try { Kryo kryo = new Kryo(); input = new Input(new ByteArrayInputStream(data)); return kryo.readObject(input,Long.class); } finally { input.close(); } }
这样我们再回到ServiceSecOrder的makeOrder下单方法中,将用户下单的服务id异步发送到MQ中,去唤醒秒杀结果匹配任务继续执行。
/** * 服务秒杀单 */ @Slf4j @AllArgsConstructor @NoArgsConstructor @ServiceSecOrderVersion(value = 1) public class ServiceSecOrder implements SecOrder,Delayed { @Getter @Setter private Long id; //订单id @Getter @Setter private String orderNo; //订单编号 @Getter @Setter private Service service; //订单服务内容 @Getter @Setter private TimeSegment timeSegment; //秒杀时间段 @Getter @Setter private int number; //订单数量 @Getter @Setter private BigDecimal amount; //订单金额 @Getter @Setter private AppUser user; //下单人 @Getter @Setter private int orderStatus; //订单状态 @Getter @Setter private LocalDateTime createDate; //创建日期 private long executeTime; //延迟队列时间 /** * 为延迟队列准备的构造器 * @param id * @param delayTime */ public ServiceSecOrder(Long id,long delayTime) { this.id = id; //将延迟时长(单位毫秒)转化成纳秒 this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime,TimeUnit.MILLISECONDS) + System.nanoTime(); } @Override public String makeOrder(SecOrder secOrder) { RedisService redisService = SpringBootUtil.getBean(RedisService.class); IdService idService = SpringBootUtil.getBean(IdService.class); MessageSender messageSender = SpringBootUtil.getBean(MessageSender.class); //如果当前时间在秒杀时间段之外,不允许秒杀下单 if (LocalDateTime.now().isBefore(((ServiceSecOrder)secOrder).getTimeSegment().getBeginTime()) || LocalDateTime.now().isAfter(((ServiceSecOrder)secOrder).getTimeSegment().getEndTime())) { throw new RuntimeException("不在秒杀时间段内"); } //由于测试时间的问题,此处需要屏蔽,等到实际部署时需要恢复 LocalDateTime now = LocalDateTime.now(); int year = now.getYear(); int month = now.getMonthValue(); int day = now.getDayOfMonth(); int hour = now.getHour(); LocalDateTime beginDate = LocalDateTime.of(year,month,day,hour,0,0); //从redis中检查是否有开启秒杀时间段 if (!redisService.exists("TimeStart:" + new TimeSegment(beginDate,beginDate.plusMinutes(119)).toString())) { throw new RuntimeException("当前时间段无秒杀"); } ((ServiceSecOrder)secOrder).setId(idService.genId()); if (((ServiceSecOrder)secOrder).getNumber() > ((ServiceSecOrder)secOrder).getService().getLimitCount()) { throw new RuntimeException("秒杀数量超出限购"); } AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); ((ServiceSecOrder)secOrder).setOrderNo(getCode(idService)); user.setUsername(loginAppUser.getUsername()); ((ServiceSecOrder)secOrder).setUser(user); //设置订单状态0表示未支付状态 ((ServiceSecOrder)secOrder).setOrderStatus(0); ((ServiceSecOrder)secOrder).setCreateDate(LocalDateTime.now()); //设置用户秒杀队列键名(每一种服务都有独立的用户秒杀队列) //队列名由User:+服务id+时间段组成 String key = "User:" + ((ServiceSecOrder)secOrder).getService().getId() + ((ServiceSecOrder)secOrder).getTimeSegment().toString(); //将秒杀用户id推送到该队列中 redisService.lpush(key,((ServiceSecOrder)secOrder).getUser().getId() + ""); //唤醒秒杀结果匹配任务继续执行。 CompletableFuture.runAsync(() -> { messageSender.send(SecendKillMq.MQ_EXCHANGE_SECENDKILL, SecendKillMq.ROUTING_KEY_SECENDKILL, ((ServiceSecOrder)secOrder).getService().getId()); });
这里我们需要先看一段redis-lua代码,表示如果用户队列,服务队列均有数据,则将其弹出,放入一个hash中作为匹配的结果,同时扣减服务队列总数。其意义可以参考用户金额的终极解决方案--Redis Lua
@Override public Object secondKill(String userKey, String serviceKey,String userResult, String countKey) { String script = "if redis.call('llen',KEYS[1]) > 0 and redis.call('llen',KEYS[2]) > 0 " + "and tonumber(redis.call('get',KEYS[4])) > 0 then " + "redis.call('hset',KEYS[3],redis.call('rpop',KEYS[1]),redis.call('rpop',KEYS[2])) " + "return redis.call('decr',KEYS[4]) else return 0 end"; return execute(jedis -> jedis.eval(script,4,userKey,serviceKey,userResult,countKey)); }
秒杀服务线程唤醒后,继续执行
/** * 秒杀结果匹配任务,每天0点开始,每2小时执行一次 */ @Slf4j @Component @RabbitListener(queues = {SecendKillMq.SECENDKILL_QUEUE + "_1", SecendKillMq.SECENDKILL_QUEUE + "_2", SecendKillMq.SECENDKILL_QUEUE + "_3", SecendKillMq.SECENDKILL_QUEUE + "_4", SecendKillMq.SECENDKILL_QUEUE + "_5", SecendKillMq.SECENDKILL_QUEUE + "_6", SecendKillMq.SECENDKILL_QUEUE + "_7", SecendKillMq.SECENDKILL_QUEUE + "_8", SecendKillMq.SECENDKILL_QUEUE + "_9", SecendKillMq.SECENDKILL_QUEUE + "_10"}) @ElasticSimpleJob(cron="0 0 0/2 * * ?",jobName="secResult",shardingTotalCount=2,jobParameter="日期",shardingItemParameters="0=SecKill0,1=SecKill1") public class SecendKillResultJob implements SimpleJob { @Autowired private RedisService redisService; @Autowired private DataDao dataDao; //为了跟MQ搭配,唤醒中断用 private List<Service> serviceList; @Override public void execute(ShardingContext shardingContext) { //获取当天所有的秒杀区位 List<Location> locationTodayList = dataDao.findJobToday(LocalDate.now()); //如果当天有秒杀活动 if (!CollectionUtils.isEmpty(locationTodayList)) { //获取一天所有的秒杀区位时间段 List<TimeSegment> segmentList = locationTodayList.stream().map(location -> location.getTimeSegment()) .collect(Collectors.toList()); switch (shardingContext.getShardingItem()) { case 0: //从所有秒杀区位时间段过滤当前秒杀时间段 this.serviceList = segmentList.stream().filter(timeSegment -> LocalDateTime.now().isAfter(timeSegment.getBeginTime()) && LocalDateTime.now().isBefore(timeSegment.getEndTime())) //将时间段转化成时间段内的秒杀服务 .map(timeSegment -> { List<Service> serviceInSegment = dataDao.findServiceInSegment(timeSegment); serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment)); return serviceInSegment; //扁平化所有的秒杀服务,将所有当前时间段内的服务放入serviceList属性中 //就是拿出当前时间段内所有参与秒杀的服务 }).flatMap(services -> services.stream()).collect(Collectors.toList()); //并行化处理所有的秒杀服务 this.serviceList.parallelStream().forEach(service -> { while (true) { try { service.getLock().lock(); String userKey = "User:" + service.getId() + service.getTimeSegment().toString(); String serviceKey = service.getId() + service.getTimeSegment().toString(); String countKey = service.getId() + ":count"; //如果下秒杀时间内没有用户下单该服务,则中断该服务的并行线程 //如果有用户下单则唤醒该并行线程 while (redisService.llen(userKey).equals(0L)) { try { log.info("用户队列无数据,开始中断"); service.getCondition().await(); LocalDateTime now = LocalDateTime.now(); if (now.isAfter(service.getTimeSegment().getEndTime())) { break; } } catch (InterruptedException e) { e.printStackTrace(); } } log.info("中断被唤醒,继续运行"); //如果用户队列和服务队列均有数据 while (redisService.llen(userKey) > 0 && redisService.llen(serviceKey) > 0) { //匹配出秒杀结果,并扣减服务数量 redisService.secondKill(userKey, serviceKey, "UserResult" + service.getId(), countKey); } //如果服务队列为空,表示被秒杀完了,从用户队列弹出用户,告知秒杀失败 while (redisService.llen(serviceKey).equals(0L)) { redisService.hset("UserResult" + service.getId(),redisService.rpop(userKey),"秒杀失败"); if (redisService.llen(userKey).equals(0L)) { break; } } //当前时间已经超出了秒杀时间段,结束while(true)无限循环 LocalDateTime now = LocalDateTime.now(); if (now.isAfter(service.getTimeSegment().getEndTime())) { break; } } finally { service.getLock().unlock(); } } }); break; case 1: break; default: break; } } }
当有用户秒杀到服务时,或者服务被秒杀完,用户的下单需要知道自己是否秒杀成功或者秒杀失败。
/** * 服务秒杀单 */ @Slf4j @AllArgsConstructor @NoArgsConstructor @ServiceSecOrderVersion(value = 1) public class ServiceSecOrder implements SecOrder,Delayed { @Getter @Setter private Long id; //订单id @Getter @Setter private String orderNo; //订单编号 @Getter @Setter private Service service; //订单服务内容 @Getter @Setter private TimeSegment timeSegment; //秒杀时间段 @Getter @Setter private int number; //订单数量 @Getter @Setter private BigDecimal amount; //订单金额 @Getter @Setter private AppUser user; //下单人 @Getter @Setter private int orderStatus; //订单状态 @Getter @Setter private LocalDateTime createDate; //创建日期 private long executeTime; //延迟队列时间 /** * 为延迟队列准备的构造器 * @param id * @param delayTime */ public ServiceSecOrder(Long id,long delayTime) { this.id = id; //将延迟时长(单位毫秒)转化成纳秒 this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime,TimeUnit.MILLISECONDS) + System.nanoTime(); } @Override public String makeOrder(SecOrder secOrder) { RedisService redisService = SpringBootUtil.getBean(RedisService.class); IdService idService = SpringBootUtil.getBean(IdService.class); MessageSender messageSender = SpringBootUtil.getBean(MessageSender.class); //如果当前时间在秒杀时间段之外,不允许秒杀下单 if (LocalDateTime.now().isBefore(((ServiceSecOrder)secOrder).getTimeSegment().getBeginTime()) || LocalDateTime.now().isAfter(((ServiceSecOrder)secOrder).getTimeSegment().getEndTime())) { throw new RuntimeException("不在秒杀时间段内"); } //由于测试时间的问题,此处需要屏蔽,等到实际部署时需要恢复 LocalDateTime now = LocalDateTime.now(); int year = now.getYear(); int month = now.getMonthValue(); int day = now.getDayOfMonth(); int hour = now.getHour(); LocalDateTime beginDate = LocalDateTime.of(year,month,day,hour,0,0); //从redis中检查是否有开启秒杀时间段 if (!redisService.exists("TimeStart:" + new TimeSegment(beginDate,beginDate.plusMinutes(119)).toString())) { throw new RuntimeException("当前时间段无秒杀"); } ((ServiceSecOrder)secOrder).setId(idService.genId()); if (((ServiceSecOrder)secOrder).getNumber() > ((ServiceSecOrder)secOrder).getService().getLimitCount()) { throw new RuntimeException("秒杀数量超出限购"); } AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); ((ServiceSecOrder)secOrder).setOrderNo(getCode(idService)); user.setUsername(loginAppUser.getUsername()); ((ServiceSecOrder)secOrder).setUser(user); //设置订单状态0表示未支付状态 ((ServiceSecOrder)secOrder).setOrderStatus(0); ((ServiceSecOrder)secOrder).setCreateDate(LocalDateTime.now()); //设置用户秒杀队列键名(每一种服务都有独立的用户秒杀队列) //队列名由User:+服务id+时间段组成 String key = "User:" + ((ServiceSecOrder)secOrder).getService().getId() + ((ServiceSecOrder)secOrder).getTimeSegment().toString(); //将秒杀用户id推送到该队列中 redisService.lpush(key,((ServiceSecOrder)secOrder).getUser().getId() + ""); //唤醒秒杀结果匹配任务继续执行。 CompletableFuture.runAsync(() -> { messageSender.send(SecendKillMq.MQ_EXCHANGE_SECENDKILL, SecendKillMq.ROUTING_KEY_SECENDKILL, ((ServiceSecOrder)secOrder).getService().getId()); }); //从redis的匹配结果获取当前用户的秒杀结果 Future<String> future = CompletableFuture.supplyAsync(() -> { while (true) { if (redisService.hexists("UserResult" + ((ServiceSecOrder) secOrder).getService().getId(), ((ServiceSecOrder) secOrder).getUser().getId() + "")) { return redisService.hget("UserResult" + ((ServiceSecOrder) secOrder).getService().getId(), ((ServiceSecOrder) secOrder).getUser().getId() + ""); } } }); try { if (future.get(3000, TimeUnit.MILLISECONDS).equals("秒杀失败")) { log.info(((ServiceSecOrder)secOrder).getUser().getUsername() + "秒杀服务" + ((ServiceSecOrder)secOrder).getService().getServiceName() +"失败"); return ((ServiceSecOrder)secOrder).getUser().getUsername() + "秒杀服务" + ((ServiceSecOrder)secOrder).getService().getServiceName() +"失败"; } else { log.info(((ServiceSecOrder)secOrder).getUser().getUsername() + "秒杀服务" + ((ServiceSecOrder)secOrder).getService().getServiceName() +"成功"); SecOrderDao secOrderDao = SpringBootUtil.getBean(SecOrderDao.class); //秒杀成功的将秒杀单存入数据库 secOrderDao.saveServiceSecOrder((ServiceSecOrder) secOrder); //等待支付结果,5分钟后根据支付与否进行处理 //如果未支付,则将被秒杀到的服务重新入服务队列,并增加服务总数 DelayQueue<ServiceSecOrder> queue = new DelayQueue<>(); ServiceSecOrder task = new ServiceSecOrder(idService.genId(),300000); queue.offer(task); TaskExecutor taskExecutor = SpringBootUtil.getBean(TaskExecutor.class); taskExecutor.execute(new Consumer(queue,(ServiceSecOrder) secOrder)); return ((ServiceSecOrder)secOrder).getUser().getUsername() + "秒杀服务" + ((ServiceSecOrder)secOrder).getService().getServiceName() +"成功"; } } catch (Exception e) { e.printStackTrace(); log.info(((ServiceSecOrder)secOrder).getUser().getUsername() + "秒杀服务" + ((ServiceSecOrder)secOrder).getService().getServiceName() +"失败"); return ((ServiceSecOrder)secOrder).getUser().getUsername() + "秒杀服务" + ((ServiceSecOrder)secOrder).getService().getServiceName() +"失败"; } }
这里ServiceSecOrder也实现了Delayed接口,该接口需要实现的方法如下
先是必须要使用的构造器
/** * 为延迟队列准备的构造器 * @param id * @param delayTime */ public ServiceSecOrder(Long id,long delayTime) { this.id = id; //将延迟时长(单位毫秒)转化成纳秒 this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime,TimeUnit.MILLISECONDS) + System.nanoTime(); }
@Override public long getDelay(TimeUnit unit) { return unit.convert(this.executeTime - System.nanoTime(),TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { ServiceSecOrder task = (ServiceSecOrder) o; return this.id > task.id ? 1 : (this.id < task.id) ? -1 : 0; }
最后是放入线程池中执行的线程类Consumer
@AllArgsConstructor private class Consumer implements Runnable { private DelayQueue<ServiceSecOrder> queue; private ServiceSecOrder order; @Override public void run() { while (true) { try { long time = System.currentTimeMillis(); ServiceSecOrder take = queue.take(); //时间到开始执行 take.unPayedDeal(order); log.info("任务id" + take.getId()); log.info("秒杀订单id" + order.getId()); log.info("共使用" + (System.currentTimeMillis() - time)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
private void unPayedDeal(ServiceSecOrder order) { SecOrderDao secOrderDao = SpringBootUtil.getBean(SecOrderDao.class); RedisService redisService = SpringBootUtil.getBean(RedisService.class); //判断秒杀订单是否支付,如果未支付,则回退服务进服务队列,并增加可秒杀服务总数 if (secOrderDao.countServiceSecOrderHasPay(order) == 0) { String key = order.getService().getId() + order.getService().getTimeSegment().toString(); String countKey = order.getService().getId() + ":count"; redisService.unPayedBack(key,JSONObject.toJSONString(order.getService()),countKey); } }
unPayedBack也是一段redis-lua,代码如下
@Override public Object unPayedBack(String serviceKey,String serviceValue, String countKey) { String script = "redis.call('lpush',KEYS[1],ARGV[1]) " + "return redis.call('incr',KEYS[2])"; return execute(jedis -> jedis.eval(script,2,serviceKey,countKey,serviceValue)); }