前端每隔15秒就发起一次请求,将播放记录写入数据库。
但问题是,提交播放记录的业务太复杂了,其中涉及到大量的数据库操作:
如何进行优化
单机并发能力 变同步为异步 合并写请求
提高单机并发:优化SQL,尽量走索引,避免双重for循环,添加缓存
提高单机并发能力
变同步为异步
合并写请求
合并写请求方案其实是参考高并发读的优化思路:当读数据库并发较高时,我们可以把数据缓存到Redis,这样就无需访问数据库,大大减少数据库压力,减少响应时间。
既然读数据可以建立缓存,那么写数据可以不可以也缓存到Redis呢?
答案是肯定的,合并写请求就是指当写数据库并发较高时,不再直接写到数据库。而是先将数据缓存到Redis,然后定期将缓存中的数据批量写入数据库。
提交学习记录业务优化
记录是否已经存在放入缓存中(因为每次提交都会去数据库查询),更新学习记录时间放入缓存中
对提交学习记录进行改造,每隔15秒进行一次提交对数据库压力太大,考虑到只有最后一次提交才有效,所以我们对存在数据库操作的地方进行优化
而播放进度信息,不管更新多少次,下一次续播肯定是从最后的一次播放进度开始续播。也就是说我们只需要记住最后一次即可。因此可以采用合并写方案来降低数据库写的次数和频率,而异步写做不到。
如何设计缓存字段?
用户学习视频的过程中,可能会在多个视频之间来回跳转,这就会导致频繁的创建缓存、缓存过期,影响到最终的业务性能。该如何解决呢?
使用hash key解决
实际操作中可以直接把实体类转化为JSON 当做value存入
但是存在一定问题
但问题来了,我们怎么知道哪一次提交是最后一次提交呢?
只要用户一直在提交记录,Redis中的播放进度就会一直变化。如果Redis中的播放进度不变,肯定是停止了播放,是最后一次提交。
因此,我们只要能判断Redis中的播放进度是否变化即可。怎么判断呢?
核心思想
每当前端提交播放记录时,我们可以设置一个延迟任务并保存这次提交的进度。等待20秒后(因为前端每15秒提交一次,20秒就是等待下一次提交),检查Redis中的缓存的进度与任务中的进度是否一致。(把数据缓存到redis中,同时设置一个20秒的延迟任务,20秒后执行这个任务,执行这个任务的时候再一次跟redis中的时间比对,如果一样则更新数据库,否则跳过)
不一致:说明持续在提交,无需处理
一致:说明是最后一次提交(暂停了或者离开播放了),(提交延迟任务)更新学习记录、更新课表最近学习小节和时间到数据库中
延迟任务方案
DelayQueue的用法
package com.tianji.learning.utils; import lombok.Data; import java.time.Duration; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; @Data public class DelayTask<D> implements Delayed { private D data; private long deadlineNanos; public DelayTask(D data, Duration delayTime) { this.data = data; this.deadlineNanos = System.nanoTime() + delayTime.toNanos(); } @Override public long getDelay(TimeUnit unit) { return unit.convert(Math.max(0, deadlineNanos - System.nanoTime()), TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { long l = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); if(l > 0){ return 1; }else if(l < 0){ return -1; }else { return 0; } } }
接下来就可以创建延迟任务,交给延迟队列保存:
package com.tianji.learning.utils; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.concurrent.DelayQueue; @Slf4j class DelayTaskTest { @Test void testDelayQueue() throws InterruptedException { // 1.初始化延迟队列 DelayQueue<DelayTask<String>> queue = new DelayQueue<>(); // 2.向队列中添加延迟执行的任务 log.info("开始初始化延迟任务。。。。"); queue.add(new DelayTask<>("延迟任务3", Duration.ofSeconds(3))); queue.add(new DelayTask<>("延迟任务1", Duration.ofSeconds(1))); queue.add(new DelayTask<>("延迟任务2", Duration.ofSeconds(2))); // TODO 3.尝试执行任务 } }
执行任务
package com.tianji.learning.utils; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.concurrent.DelayQueue; @Slf4j class DelayTaskTest { @Test void testDelayQueue() throws InterruptedException { // 1.初始化延迟队列 DelayQueue<DelayTask<String>> queue = new DelayQueue<>(); // 2.向队列中添加延迟执行的任务 log.info("开始初始化延迟任务。。。。"); queue.add(new DelayTask<>("延迟任务3", Duration.ofSeconds(3))); queue.add(new DelayTask<>("延迟任务1", Duration.ofSeconds(1))); queue.add(new DelayTask<>("延迟任务2", Duration.ofSeconds(2))); // 3.尝试执行任务 while (true) { DelayTask<String> task = queue.take(); log.info("开始执行延迟任务:{}", task.getData()); } } }
开始改造
封装的工具类
延迟队列里面放的就是这一个个DelayTask<T>
@Data public class DelayTask<D> implements Delayed { private D data; private long deadlineNanos; public DelayTask(D data, Duration delayTime) { this.data = data; this.deadlineNanos = System.nanoTime() + delayTime.toNanos(); } @Override public long getDelay(TimeUnit unit) { return unit.convert(Math.max(0, deadlineNanos - System.nanoTime()), TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { long l = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); if(l > 0){ return 1; }else if(l < 0){ return -1; }else { return 0; } } }
需要开启另一个线程来执行任务
@Slf4j @RequiredArgsConstructor @Component public class LearningRecordDelayTaskHandler { private final StringRedisTemplate redisTemplate; private final DelayQueue<DelayTask<RecordTaskData>> queue = new DelayQueue<>(); private final static String RECORD_KEY_TEMPLATE = "learning:record:{}"; private final LearningRecordMapper recordMapper; private final ILearningLessonService lessonService; private static volatile boolean begin = true; // 项目启动后 当前类实例化 属性输入之后 方法就会运行 一般用来做初始化工作 @PostConstruct public void init(){ CompletableFuture.runAsync(this::handleDelayTask); log.debug("开启 新线程执行handleDelayTask方法");// 开启 新线程执行handleDelayTask方法 } @PreDestroy // 当前类是实例 销毁之前该方法执行 public void destroy(){ log.debug("关闭学习记录处理的延迟任务"); begin = false; } private void handleDelayTask(){ while (begin){ try { // 1.尝试获取任务 take是阻塞方法 DelayTask<RecordTaskData> task = queue.take(); log.debug("获取到要处理的播放记录任务"); RecordTaskData data = task.getData(); // 2.读取Redis缓存 LearningRecord record = readRecordCache(data.getLessonId(), data.getSectionId()); log.debug("获取到要处理的播放记录任务,任务数据{} 缓存中的数据{}",data,record); if (record == null) { continue; } // 3.比较数据 if(!Objects.equals(data.getMoment(), record.getMoment())){ // 4.如果不一致,播放进度在变化,无需持久化 continue; } // 5.如果一致,证明用户离开了视频,需要持久化 // 5.1.更新学习记录 record.setFinished(null); recordMapper.updateById(record); // 5.2.更新课表 LearningLesson lesson = new LearningLesson(); lesson.setId(data.getLessonId()); lesson.setLatestSectionId(data.getSectionId()); lesson.setLatestLearnTime(LocalDateTime.now()); lessonService.updateById(lesson); log.debug("准备持久化学习记录信息"); } catch (Exception e) { log.error("处理播放记录任务发生异常", e); } } } public void addLearningRecordTask(LearningRecord record){ // 1.添加数据到Redis缓存 writeRecordCache(record); // 2.提交延迟任务到延迟队列 DelayQueue queue.add(new DelayTask<>(new RecordTaskData(record), Duration.ofSeconds(20))); } public void writeRecordCache(LearningRecord record) { log.debug("更新学习记录的缓存数据"); try { // 1.数据转换 String json = JsonUtils.toJsonStr(new RecordCacheData(record)); // 2.写入Redis String key = StringUtils.format(RECORD_KEY_TEMPLATE, record.getLessonId()); redisTemplate.opsForHash().put(key, record.getSectionId().toString(), json); // 3.添加缓存过期时间 redisTemplate.expire(key, Duration.ofMinutes(1)); } catch (Exception e) { log.error("更新学习记录缓存异常", e); } } public LearningRecord readRecordCache(Long lessonId, Long sectionId){ try { // 1.读取Redis数据 String key = StringUtils.format(RECORD_KEY_TEMPLATE, lessonId); Object cacheData = redisTemplate.opsForHash().get(key, sectionId.toString()); if (cacheData == null) { return null; } // 2.数据检查和转换 return JsonUtils.toBean(cacheData.toString(), LearningRecord.class); } catch (Exception e) { log.error("缓存读取异常", e); return null; } } public void cleanRecordCache(Long lessonId, Long sectionId){ // 删除数据 String key = StringUtils.format(RECORD_KEY_TEMPLATE, lessonId); redisTemplate.opsForHash().delete(key, sectionId.toString()); } // 缓存实体类 @Data @NoArgsConstructor private static class RecordCacheData{ private Long id; private Integer moment; private Boolean finished; public RecordCacheData(LearningRecord record) { this.id = record.getId(); this.moment = record.getMoment(); this.finished = record.getFinished(); } } // 任务实体类 @Data @NoArgsConstructor private static class RecordTaskData{ private Long lessonId; private Long sectionId; private Integer moment; public RecordTaskData(LearningRecord record) { this.lessonId = record.getLessonId(); this.sectionId = record.getSectionId(); this.moment = record.getMoment(); } } }
使用线程池来处理任务
目前我们的延迟任务执行还是单线程模式,大家将其改造为线程池模式,
拉取方法 还是使用哪个开辟的线程去拉取,但是拉取完之后的执行让线程池里面的来执行