本阶段计划
- RxJava响应式编程
- Al生成题目优化方案设计(SSE技术)
- Al评分优化
- 分库分表实战
RxJava响应式编程
响应式编程
响应式编程(Reactive Programming)是一种编程范式,主要关注于数据流和变化传播。它提供了一种声明式的方法来处理异步数据流,使程序可以自动对数据变化作出反应,从而简化异步编程的复杂性。响应式编程通常用于处理UI更新、事件处理、实时数据流和并发操作。
核心概念:
- 数据流(Data Streams):
数据流是响应式编程的基础。数据流可以是从鼠标点击事件、网络请求响应、传感器数据等产生的一系列值。数据流可以是有限的(有开始和结束)或无限的(持续不断产生)。 - 观察者(Observer):
观察者是订阅数据流的实体,当数据流产生新值时,观察者会接收到通知并处理这些值。 - 被观察者(Observable):
被观察者是数据流的来源,它是一个可以被观察的对象。它会在数据流中产生新的值并通知所有订阅它的观察者。 - 订阅(Subscription):
订阅是观察者与被观察者之间的连接,订阅后,观察者会收到被观察者的数据流。 - 操作符(Operators):
操作符是用于创建、转换、过滤、合并和组合数据流的函数。例如,常见的操作符有 map、filter、merge 和 combineLatest 等。 - 异步处理:
响应式编程是异步的,即操作不会阻塞线程,而是通过回调或其他机制在未来某个时间点处理结果。这提高了应用的响应性和性能。 - 变化传播:
当数据源发生变化时,响应式编程模型会自动将变化传播到依赖这些数据源的地方。这种传播是自动的,不需要显式调用。
RxJava
RxJava 是一个事件驱动框架,基于观察者模式实现,分别有观察者和被观察者两个角色,被观察者会实时传输数据流,观察者可以观测到这些数据流。
观察者就是 Observer,被观察者是 Observable 和 Flowable。
0bsenabe 适合处理相对较小的、可控的、不会迅速产生大量数据的大场景,它不具备杯压处理能力,也就是说,当数据生产速度超过消费速度时,可能会导致内存溢出等问题。
Flowable 是针对背压(反向压力)问题而设计的可观测类型,背压问题出现于数据生产速度超过数据消费速度的场景。
被观察者.subscribe(观察者)
,它们之间就建立的订阅关系,被观察者传输的数据或者发出的事件会被观察者观察到。
// 建立 SSE 连接对象,0 表示永不超时 SseEmitter sseEmitter = new SseEmitter(0L); // AI 生成 Flowable<ModelData> modelDataFlowable = aiManager.doStreamRequest(GENERATE_QUESTION_SYSTEM_MESSAGE, userMessage, null); // 左括号计数器,除了默认值外,当回归为 0 时,表示左括号等于右括号,可以截取 AtomicInteger counter = new AtomicInteger(0); StringBuilder stringBuilder = new StringBuilder(); modelDataFlowable .observeOn(Schedulers.io()) .map(modelData -> modelData.getChoices().get(0).getDelta().getContent()) .map(message -> message.replaceAll("\\s", "")) .filter(StrUtil::isNotBlank) .flatMap(message -> { List<Character> characterList = new ArrayList<>(); for (char c : message.toCharArray()) { characterList.add(c); } return Flowable.fromIterable(characterList); }) .doOnNext(c -> {完整取到题目的逻辑}) .doOnError((e) -> log.error("sse error", e)) .doOnComplete(sseEmitter::complete) .subscribe();
tips:智谱AI 用的就是flowable,默认已经引入了 RxJava
AI 生成题目优化
需求分析:
原先Al生成题目的场景响应较慢,如果题目数过多,容易产生请求超时,并且界面上没有响应,用户体验不佳。
需要流式化改造Al生成题目接口,将已经生成的题目一道一道实时地返回给前端,而不是让前端请求一直阻塞等待全部生成完毕再一起返回, 提升用户体验且避免请求超时。
技术方案
首先智谱 Al 为我们提供了流式响应的支持,数据已经可以一点一点地返回给后端了,那么我们要思考的问题是如何让后端接收到的一点一点的内容实时返回给前端。
几种主流的前后端实时通讯实现方案:
- 轮询(前端主动调用)
前端间隔一定时间就调用后端提供的结果接口,后端处理一些结果就累加放置在缓存中。 - SSE (后端推送给前端)
前端发送请求并和后端建立连接后,后端可以实时推送数据给前端,无需前端自主轮询。 - WebSocket
全双工协议,前端能实时推送数据给后端(或者从后端缓存拿数据), 后端也可以实时推送数据给前端。
对比:
主动轮询,缺点很明显,轮询的时间不好预估。
WebSocket 和SSE虽然都能实现服务端推送,但Websocket 会更复杂些,且是二进制协议, 调试不方便。AI 对话只需要服务器单向推送即可,不需要使用双向通信,所以选择文本格式的SSE。
开发流程
- 前端向后端发送普通HTTP请求
前端页面的 AI抽屉 中绑定实时生成的按钮如下:
<a-button :loading="submitting" type="primary" html-type="submit" style="width: 120px" > {{ submitting ? "生成中" : "一键生成" }} </a-button> <a-button :loading="sseSubmitting" style="width: 120px" @click="handleSSESubmit" > {{ sseSubmitting ? "生成中" : "实时生成" }} </a-button>
注意:
1)由于两个 button
处于同一个 form
中,因此第二个 button
没有加html-type="submit"
,否则会出现在前端中点击其中一个 提交按钮 却导致触发两个提交按钮事件的情况。
2)与上期实现的提交事件不同,这里不能调用生成的api(上期我们使用 Axios 实现了aiGenerateQuestionUsingPost
),因为 Axios 用于一次性的 HTTP 请求和响应,每次请求完成后连接关闭。而 SSE 需要保持长时间的连接,以便服务器可以不断推送数据。因此 Axios 默认不支持 SSE 这类长时间保持连接以接收服务器推送的事件,那么我们使用浏览器内置的 EventSource 对象用于处理 SSE 连接:
const handleSSESubmit = async () => { if (!props.appId) { return; } sseSubmitting.value = true; const eventSource = new EventSource( "http://localhost:8101/api/question/ai_generate/sse" + `?appId=${props.appId}&optionNumber=${form.optionNumber}&questionNumber=${form.questionNumber}` ); ... };
- 后端创建SSE连接对象(SSE 必须是 Get 请求),为后续的推送做准备,然后流式调用 智谱Al ,获取到数据流,使用RxJava订阅数据流:
... // 建立 SSE 连接对象,0 表示永不超时 SseEmitter sseEmitter = new SseEmitter(0L); // AI 生成 Flowable<ModelData> modelDataFlowable = aiManager.doStreamRequest(GENERATE_QUESTION_SYSTEM_MESSAGE, userMessage, null); ...
- 异步:基于RxJava实时获取到智谱Al的数据,并持续将数据拼接为字符串,当拼接出一道完整题目时,通过SSE推送给前端。
modelDataFlowable .observeOn(Schedulers.io()) .map(modelData -> modelData.getChoices().get(0).getDelta().getContent()) .map(message -> message.replaceAll("\\s", "")) .filter(StrUtil::isNotBlank) // 字符串转换 .flatMap(message -> {...}) // 异步拼接JSON单题数据,并利用SSE推送至前端 .doOnNext(c -> {...括号匹配算法}) .doOnError((e) -> log.error("sse error", e)) // 监听flowable完成事件,并开启订阅 .doOnComplete(sseEmitter::complete) .subscribe();
observeOn:指定数据流的观察者(即后续操作)将在指定的调度器上运行,这里是 I/O 线程池。
Schedulers.io():这是一个 RxJava 提供的内置调度器,用于 I/O 密集型操作。它会管理一个线程池,专门用于处理 I/O 任务(如网络请求、文件读写等)。
4) 前端每获取一道题目,立刻插入到表单项中
const onAiGenerateSuccessSSE = (result: API.QuestionContentDTO) => { questionContent.value = [...questionContent.value, result]; };
AI评分优化
优化点分析
目前的Al评分功能存在2个问题:
1)我们提供 AI 调用需要费用,用户对同样的题目做出同样的选择,理论会得到一致的解答,不需要每次都询问AI。
2)Al评分的响应时间较长,效率有待提升。
显然,答案必然是“缓存"。只要提到“响应慢”、“数据可复用", 就要想到缓存。
技术选型分析
缓存的技术选型上,一般是本地缓存和 Redis 分布式缓存。
对于我们的缓存需求,哪怕是多机部署,每台服务器上分别缓存也是ok的,不用保证多台机器缓存间的一致性,所以采用Caffeine本地缓存。
缓存设计(缓存哪些内容?如何设计缓存的key / value结构?)
- 缓存key设计
我们在上述分析:用户对同样的题目做出同样的选择,理论会得到一致的解答
所以可以将 应用id和用户的答案列表 作为 key 。
但答案列表可能很长,可以利用哈希算法(md5) 来压缩key,节省空间。
注意:如果是分布式缓存,还需要在key开头拼接业务前缀。此处我们可以单独为每个业务创建本地缓存,相互隔离,所以key可以简单一些。 - 缓存value设计
缓存 Al 回答的结果,为了可读性可以存JSON结构,为了压缩空间可以存二进制等其他结构。 - 缓存过期时间设置(必须)
过期时间根据实际业务场景和缓存空间的大小、数据的一致性的要求设置合适即可。
缓存击穿问题解决
缓存击穿问题:大量请求并发访问热点数据,刚好热点数据过期,会直接绕过缓存,命中数据库或 Al 接口。
在 AI 场景因接口限流,Al 应该不会崩溃,但是token (钱)浪费了,而且搞不好平台会以为你的服务器是攻击者,把你的IP封禁。
在数据库场景,所有请求打到数据库上,数据库可能直接宕机。
因此,我们需要避免缓存击穿,一种常见的解决方式就是加锁。如果服务部署在多个机器上,就必须要使用分布式锁。
我们这里使用 Redisson 客户端,它为 Redis 提供了多种数据结构的支持,并提供了线程安全的操作,简化了在Java中使用 Redis 的复杂度,并对Redis的一些功能进行了增强。
开发流程
- 引入所需依赖(本地缓存 + redisson 分布式锁)
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.21.0</version> </dependency> <dependency> <groupId>com.github.ben-manes.caffeine</groupId> <artifactId>caffeine</artifactId> <version>2.9.2</version> </dependency>
- 本地缓存构建
private final Cache<String, String> answerCacheMap = Caffeine.newBuilder().initialCapacity(1024) .expireAfterAccess(8L, TimeUnit.MINUTES) .build();
- 配置 Redisson 客户端,读取 Redis 配置
@Configuration @ConfigurationProperties(prefix = "spring.redis") @Data public class RedissonConfig { private String host; private Integer port; private Integer database; private String password; @Bean public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer() .setAddress("redis://" + host + ":" + port) .setDatabase(database) .setPassword(password); return Redisson.create(config); } }
- 使用本地缓存的评分方法构建
public UserAnswer doScore(List<String> choices, App app) throws Exception { Long appId = app.getId(); String jsonStr = JSONUtil.toJsonStr(choices); String cacheKey = buildCacheKey(appId, jsonStr); String answerJson = answerCacheMap.getIfPresent(cacheKey); // 如果有缓存,直接返回 if (StrUtil.isNotBlank(answerJson)) { UserAnswer userAnswer = JSONUtil.toBean(answerJson, UserAnswer.class); userAnswer.setAppId(appId); userAnswer.setAppType(app.getAppType()); userAnswer.setScoringStrategy(app.getScoringStrategy()); userAnswer.setChoices(jsonStr); return userAnswer; } // 没有缓存 // 定义锁 RLock lock = redissonClient.getLock(AI_ANSWER_LOCK + cacheKey); try { // 竞争锁 boolean res = lock.tryLock(3, 15, TimeUnit.SECONDS); // 没抢到锁,强行返回 if (!res){ return null; } // 抢到锁,执行业务逻辑 // 1. 根据 id 查询到题目 并 调用 AI 获取结果 ... String json = result.substring(start, end + 1); //结果 // 2. 缓存结果 answerCacheMap.put(cacheKey, json); // 3. 构造返回值,填充答案对象的属性 ... } finally { // 释放锁操作,非常关键,所以放在finally中 if (lock != null && lock.isLocked()) { if (lock.isHeldByCurrentThread()) { // 判断是否是自己的锁 lock.unlock(); } } } }
分库分表
从数据库层面去思考,user_ answer 用户答题记录表可能最先遇到瓶颈。因为一个用户可以对同个应用多次答题,也可以在多个应用多次答题,理论上如果用户足够大,那么这个表肯定是最先遇到瓶颈的。除了清理数据外,常见的一种优化方案是分库分表。
在电商网站的场景中,使用人数不断增加,用户数不断增加,订单数也日益增长,此时就应该把用户库和订单库拆开来,这样就能降低数据库的压力,且对业务而言数据分的也更清晰,并且理论上订单数会远大于用户数,还可以针对订单库单一升配。
关于订单表的分表,可以按时间维度,比如order. 202401、order _202402来拆分,如果每天的订单量很大,还可以更细粒度。
分。
分库分表的组件选型
shardingsphere(ShardingSphere 的仓库 Star 是远多于另外两个的!!)
Mycat2
cobar
Sharding-JDBC 相比而言功能更丰富,还支持读写分离、数据脱敏、分布式事务等等。
ShardingSphere不仅支持嵌入式的Sharding-JDBC,还支持Sharding-Proxy (独立代理服务)和Sharding-Sidecar (服务网格模式)。
再者ShardingSphere非常活跃,社区庞大且资料丰富,项目迭代也非常快。
因此本项目选择ShardingSphere内的Sharding-JDBC。
ShardingSphere 的使用步骤
- 引入依赖
<dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId> <version>5.2.0</version> </dependency>
- 复制原表的 DDL 表结构语句,更改表名,创建新表
- 在 application.yml 中添加配置