亿级数据从mysql迁移到mongodb辛酸历程
一,亿级数据从mysql迁移到mongodb辛酸历程
如需转载,请标明出处:https://zhenghuisheng.blog.csdn.net/article/details/140302930
1,核心业务和前期实现
在前面文章中测试了mongodb中单个文档存一亿条数据的性能测试,发现monhodb是完全符合我们业务的,因此需要将mysql中的数据迁移到mongodb中,数据迁移就是时间问题了。在数据迁移时,需要保证的是业务是不能停止的,我这边主要是保存吧用户游戏的数据,游戏一直处于投放状态,因此需要平滑的将用户的数据迁移到mongodb中。
我这边的业务是这样的,就是用户将数据存到mongodb中,比如在用户游戏过关,升级,用户信息,充值修改等等会触发一次数据上报,然后在用户登录或者充值时就会来数据库拉取一次数据,这样能保证游戏用户的数据不会丢失,即使用户将本地数据清理了,数据也能通过服务器找回。
根据上面的需求,为了是整个业务简化,这里大概使用两张模拟表搞定,一张是用户表,一张是用户数据表。
user 用户表的部分核心字段如下,在每个游戏每个用户会有一个唯一的openId,类似于抖音一样,每个用户会有一个唯一的openId,这里为了简化整个流程,后面不用该字段,直接用主键id作为唯一字段使用
CREATE TABLE `user` ( `id` bigint NOT NULL AUTO_INCREMENT, `username` varchar(64) DEFAULT NULL COMMENT '用户名', `avatar` varchar(255) DEFAULT NULL COMMENT '头像', `open_id` varchar(64) DEFAULT NULL COMMENT '用户唯一id', `created_time` datetime DEFAULT NULL COMMENT '创建时间', `updated_time` datetime DEFAULT NULL COMMENT '更新时间', `is_deleted` tinyint DEFAULT NULL COMMENT '是否删除 0:未删除 1:删除 ', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
另一张表是 **user_data ** 数据表,部分核心字段如下,parentId指的是user表中用户的账户id,并给key字段和parentId字段添加一个唯一索引
CREATE TABLE `user_data` ( `id` bigint NOT NULL AUTO_INCREMENT, `key` varchar(255) DEFAULT NULL COMMENT '上报数据的key', `value` mediumtext CHARACTER SET utf8mb4 COMMENT '上报数据的value', `parent_id` bigint DEFAULT NULL COMMENT '用户id', `created_time` datetime DEFAULT NULL COMMENT '创建时间', `updated_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `idx_parentid_key` (`key`,`parent_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
因为游戏那边不可能每次将用户所有数据上报过来,如增加100个金币,就几个json字符串,不能把该用户所有的信息每次都发给后端,因此后端根据这个key,value来存数据和读数据,并且游戏那边值存更新过的字段值,游戏后台可以根据key来进行插入进入或者更新记录。value存放的就是更新的内容,由于实际开发中这个value值可能很复杂,可能是json嵌套json,因此表中字段类型给的是 mediumtext
{"coin":100,"exp":9999}
前半年大概就是通过上面这样实现的,数据全部存储在mysql中,随着时间的推移,用户数据量达到1000多万,假设每个游戏每个用户的key/value 都有50个,那么现在data数据表现在就是将近有5亿条数据,单表5亿数据,value值还是特别大,索引也加了七八个,尤其是现在暑假玩游戏的人更多,导致每天都是高峰期,不管是具体的服务还是mysql,都达到了一个性能瓶颈,同时也影响着很多其他的业务。那么在就需要在架构上或者硬件上升级
2,分库分表考虑
首先想到的是分库分表考虑,由于在用户拉取数据时,需要通过用户openId获取到用户的账户id,然后根据用户的账户id去数据表中拉取数据,如果一个用户50个key/value,那么得把这50条数据全部找出来,返回给sdk。目前由于我们线上一个数据库磁盘容量还是可以的,所以暂时不考虑分库
根据上面的需求,首先被排除的是按时间维度去分表,首先这种方案被pass。以月为例,如果用户的某个key长时间没有更新过,那么这样就有可能出现某个用户部分的key/value 出现在这个月,部分的 key/value 出现在上一个月,不管是存数据还是读数据都会因为数据不确定在哪张表因此这种方案被pass
第二种方案就是根据游戏的维度,根据不同用户数据量的游戏拆分在不同的表中,这种方案也不靠谱,先是还要算每个游戏的数据量以及用户量,还不确定某个游戏在未来是否可能会出现爆火的情况,最后是随着时间的推移数据量的也会不断地上涨,终究不是解决方案
最后一种就是稍微可行的一种数据方案,因为先在用户表中存一条数据,然后携带用户的id再去存数据表,此时就是可以考虑建10张表,然后对用户id进行求余的方式去存用户的数据,如id为1的,那么这个用户的数据就会存储在 user_data1 表里,那么依次建立10张表,分别是user_data1,user_data2… 这样某个用户的数据也全部在一张表里,查询对每张表的压力也不会那么大
//先确定账户id select id from user where open_id = 'zhs' //如id为10006,再对10进行求余 prefix = 10006 % 10 = 6 //最后确定这个用户的数据在 user_data6中 ,然后通过user_data6将该用户全部数据查询出来 select key,value from user_data6 where parent_id = 10006
最后一种方案对于我们的业务来讲在短期内确实是可行的,通过用户id确定数据在哪张表,因为数据库表中的id采用的是自增的方案,因此根据用户id求余的方式,也可以让10张表的数据量不会相差太大,从而解决因为某张表查询带来的数据库压力。
但是最后的这种方案依旧被pass,首先因为游戏端存储的数据是非常复杂的,可能某个value就嵌套了10几个json,并且json中嵌套json,相对于这种nosql数据,使用mysql来存根本没有优势,而且还带来很多数据库相关瓶颈。
淘汰理由如下:
1,最简单的如bufferpool这个缓存池,如果偶尔,某个用户查出来的数据有1m,并且此时多来几个用户查询数据,那么bufferpool里面很快就会被撑满,甚至以前的热点数据也可能会被淘汰,这样就会造成很多业务查询的效率就会降低,以前本来只需要通过bufferpool就能读去到的数据,现在又得查磁盘才能拿到,因为用户数据量大的原因,因此这种量大的nosql的json不适合存储mysql的,既影响自身的查询效率,又影响其它的业务,到头来还是得不偿失。
2,如果考虑分库,那么将大大的加大时间和人力成本,那么肯定就是不考虑。如果不考虑那么对于水平扩容就很不友好,因为在一个库里面已经对分了10张表了,后期在单库中,如果10张表中的数据量也越来越大,那么新的瓶颈又会出现,那么是不适合的,除非一开始就分20张表,对20求余分别放入对应的20张表中。如果表分的越多,那么想一些数据的统计,做报表那么也是很头痛,包括以后数据迁移这些,肯定会麻烦的很。
总而言之,最后是没有采用分库分表这个方案的,并且什么几个方案都是本人深思熟虑后pass掉的
3,nosql的选择
在选择nosql之前,主要是参考了两个市面上比较好用的两个产品,分别是 ElasticSearch 和 Mongodb 。由于所有的业务以及对应技术选型只有本人一人负责,因此也没有过多的研究其它产品。
在经过对比和分析之后,最终选择了mongodb,首先按使用熟练度来看,本人 在以前项目上用过mongodb,因此更倾向于余使用mongodb,其次是mongodb更加使用于大数据和实时分析,同时也支持水平扩展,可以大规模的处理数据,以及扛得住高并发,在前面也做过这亿级数据量的测试和压测,发现其性能是真的很强。并且mongodb内部支持索引,增加查询效率,尤其是符合我这个业务的唯一索引
而相对于es,更加的是适用于全文检索场景,内部使用的倒排索引,结合我这业务场景似乎不占优势。
打听了很多游戏公司也是通过mongodb去存储游戏数据,并且完全符合我的这个业务场景,因此最终在确定使用这个nosql产品之后,就开始猛干了。
4,mongodb服务器购买or自己搭建
首先优先考虑使用阿里云服务器购买mongodb,因为上面不管是为了以后分片,还是现在的副本备份,都比较方便,并且可以直接通过阿里云服务器的控制台进行一些实时监测。
但是打开了阿里云服务器看到价格之后,当时对看到的价格就有点惊讶,怎么可以这么贵mongodb服务器,由于这个mongodb存数据是本人提出来的,所以看到这个价格之后,心想老板会同意买这么贵的服务器吗,看了一个8核16g3个副本2T磁盘容量的mongodb服务器,一年要8万多。不同mysql的白菜价,因为mysql早已开源,因此阿里云服务器也相对比较便宜,其次是mysql竞争对手多,如postgresql等,也让mysql便宜,而mongodb不一样,人家同类型的产品可以说是没有对手,并且没有开源,其次人家确实是这么牛逼,所以人家有资本贵,再贵也有人用,所以根据多方面综合因素最后放弃了购买
最后选择在一个8核16g的云服务上面自己搭建,后面将这个服务器磁盘容量进行扩容200g,然后将mongodb的数据挂载到新的分区上面,后面容量不够的话在考虑
5,数据从mysq迁移到mongodb
在数据迁移之前,需要考虑的事前如下,首先得加新接口给游戏端去存数据和读数据,其次是需要将旧数据迁移到新的数据库里面,此时mysql中1000万多用户,5亿多条数据。并且游戏那边是不能停止的,就是他们可以立马接入我的新接口,但是游戏是不能停止的,因为随时随刻都有人玩,换句话说就是不会让我把服务停掉,给我时间去同步数据,数据需要在不停服务的情况下进行数据迁移
由于很多业务都用了这种用户表,因此在迁移数据时,只迁移用户的数据表数据,既只需要迁移user_data这种表的数据,user表的数据不迁移,并且流程还是先查用户表的数据,随后将数据全部查询出来。在mongodb中创建一张MongodbUserData的集合,并且加上一个parent_id和key的联合唯一索引
//先确定账户id select id from user where open_id = 'zhs' //携带id去mongodb中查询数据,下面的parent_id就是什么查询的id Query query = new Query(); query.addCriteria(Criteria.where("parentId").is(id + "")); //只返回部分字段 query.fields().include("parentId").include("key"); List<MongoUserData> mongouserDataList = mongoTemplate.find(query, MongoUserData.class);
MongoUserData 实体类的信息如下,id由本地的雪花算法生成,不使用mongodb自带的uuid
@Data public class MongoUserData { @Field("_id") private String id; //账号id @Field("parentId") private String parentId; @Field("key") private String key; @Field("value") private Object value; //更新时间 @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") @Field("updatedTime") private Date updatedTime; //创建时间 @Field("createdTime") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createdTime; }
5.1,数据上报接口
因此我这边在存数据的时候,直接将用户数据作为新数据存入到mongodb里面,因为插入的数据一定是最新的数据,那么我就当做这个key是第一次存入到mongodb中,即使这个key在mysql中没同步过来,我认为mysql的就是旧数据,如下面的这个key在数据上报之后,这个coin值如果在mysql应该去执行更新操作的,将值更新为120,和我直接插入到mongodb将值设置成120也是一样的,如果第二次调用这个新的上报数据接口还有这个key就去执行更新操作
{"coin":120,"exp":100}
接下来给一段代码参考,首先将该用户全部的key从mongodb全部查询出来,随后将所有的Key存放在 keyList中
//根据parentId找出全部分类数据 Query query = new Query(); query.addCriteria(Criteria.where("parentId").is(parentId + "")); //只返回部分字段 query.fields().include("parentId").include("key"); List<MongoUserData> mongouserDataList = mongoTemplate.find(query, MongoUserData.class); List<String> keyList = new ArrayList<>(); //只需要获取全部的key keyList = mongouserDataList.stream().map(MongoUserData::getKey).collect(Collectors.toList());
然后将游戏那边最终传来的userData数据解析成key/value的形式
//转成对象 public List<MongoUserData> getMapData(Map<String, Object> map) { List<MongoUserData> list = new ArrayList<>(); for (Map.Entry<String, Object> entry : map.entrySet()) { MongoUserData mongoUserData = new MongoUserData(); mongoUserData.setKey(entry.getKey()); //以json的格式存储数据 mongoUserData.setValue(entry.getValue() + ""); list.add(mongoUserData); } return list; }
随后根据上报的key和已有全部的key进行对比,keyList存在这个key更新文档,不存在则新增这条数据
String userData = ... //将传来的json数据解析 Map<String, Object> map = JSON.parseObject(userData, Map.class); List<MongoUserData> gameList = getMapData(map); //随后就是一个简单的判断 for (MongoUserData data : gameList) { if (CollectionUtils.isEmpty(keyList) || !currentCategoryList.contains(category.getArchivesCategory())){ //mongodb插入操作... }else{ //mongodb更新操作... } }
5.2,数据拉取
数据上报比较简单,就是全部当做新用户的数据去存,key有则更新无则插入,但是在用户拉取数据的时候,可能需要将mongodb的新数据和存在mysql中的老数据全部读取出来,也是为了让整个业务都平滑的度过,在用mongodb新的拉数据的接口,尝试的去同步一下最新的数据。
为了保证只需要同步一次数据,因此在mysql中加一张中间表,用来存同步过数据的用户的id,这张表也比较简单,只需要一个存用户id的parentId,并且在该字段加上唯一索引,一定要加,不仅仅是为了查询效率。
CREATE TABLE `user_data_sync_mongo` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `parent_id` bigint(20) DEFAULT NULL COMMENT '账户id', `updated_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `idx_parentid` (`parent_id`) USING BTREE COMMENT '账户id' ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='数据同步到mongodb';
因此该用户在读数据时,需要先查这这个用户的账号id,然后查数据是否同步过,如果这张表中有数据表示同步过,没有的话则先将mysql的用户数据全部查出来
//先确定账户id select id from user where open_id = 'zhs' //随后判断是否同步过数据 select * from user_data_sync_mongo where parent_id = #{id} //查询出全部mysql用户存的用户的信息 select * from user_data where parent_id = #{id}
在上面的查询条件做一个判断,如果确定是用户信息没有被同步过,那么查询出来的数据可以全部存放在 list集合中,如通过这个findAllByParentId方法获取用户的全部在mysql中上报的数据
//service List<UserData> findAllByParentId(@Param("parentId")Integer parentId); //mapper <select id="findAllByParentId" parameterType="java.lang.Integer" resultType="UserData"> select * from user_data where parent_id = #{id} </select>
伪代码如下,先获取全部的mysql中的数据,先存放在userDataList中,如mysql中已经有50条数据
List<UserData> userDatalist = userDataMapper.findAllByParentId(10006);
随后查询该用户mongodb中的数据,代码和上面获取keyList的一样,数据上报是将keyList和上报的数据对比,查询的同步数据是keyList和mysql中查询出来的数据对比,就是mongodb中没有的key,但是在mysql中存在,那么就是需要将mysql的key/value同步到mongodb中,同步完成之后,在给中间表中插入一条数据作为标志,后面来拉取数据就不需要再次同步。并且将key/value 数据存入到map中
//map用于返回数据给前端 Map<String,Object> map = new HashMap<>(); //如果mysql中查询的数据不为空 if (CollectionUtil.isNotEmpty(userDatalist)){ //获取mysql中的旧数据,先存入map userDatalist.stream().forEach(p -> map.put(p.geKey(),p.getValue())); //新增更新标志,就是构建一个实体类, UserDataSyncMongo currentSyncMongo = buildUserDataSyncMongo(currentAccount.getId()); UserDataSyncMongoMapper.insert(currentSyncMongo); //构建需要插入的mongo实例,过滤掉mongodb中已经存在的key addMongoArchiveList = buildMongoArchiveList(userDatalist,keyList); if (CollectionUtil.isNotEmpty(addMongoArchiveList)){ //mongodb批量插入数据 mongoTemplate.insertAll(addMongoArchiveList); } }
上面代码就已经完成了mongodb数据的同步以及标志为同步,这样这个用户下一次进来的时候就不用进行第二次的数据同步。当然最重要的事情还没完成,就是将数据返回给用户,上面已经定义了一个map,存了mysql的数据,而keyList,存储的是mongodb存储的数据,那么就简单好办了,直接将mongodb的数据put到map就行,因为mongodb的数据肯定是比mysql的数据新的,所以直接put进去,如果相同的key,那么value会进行覆盖,从而保证拿到最新的数据
if (CollectionUtil.isNotEmpty(keyList)){ //获取mysql中的旧数据,先存入map keyList.stream().forEach(p -> map.put(p.geKey(),p.getValue())); } return map;
整个流程就是几点,先查用户账户信息,然后查询是否同步过,如果同步过就不会再去查mysql,直接查mongodb将全部数据返回,如果未同步数据,那么就需要先查mysql,将数据同步到mongodb,然后增加标志位,最后通过map替换将最新数据返回给用户
5.3,手动同步数据
如果开每次用户主动拉数据的时候去同步数据,那么对于用户体验来说是不友好,但是如果是提前去同步数据,如果游戏那边还没有替换成新接口,那么就算是数据同步完也不完整,因为同步完数据还是写在mysql中,一次手动同步数据的时机选在游戏那边使用新接口上线之后,保证mysql数据不再被更新
手动同步数据就是纯苦力活,思路如下:先查该游戏没有同步标志的用户数据,然后通过这些用户查询出对应的游戏数据,随后通过多线程+批量插入的方式实现插入到mongodb中,最后增加同步标志位
这里只提供思路和伪代码,代码如下,首先每次获取2000个未同步用户数据,即每次同步2000个数据,即每次同步2000*50=10万条用户数据
select u,id as userId,mongo.id as mongoId from user u left join user_data_sync_mongo mongo ON u.id = mongo.parent_id where mongo.id is null limit 2000
查出2000条用户数据之后,就是要查用户的游戏数据,需要查mysql的user_data表,因此在这2000条数据中,每20个用户做一次查询,通过in去将要同步的数据全部查询出来,最后通过parentId分组,将数据异步批量插入到mongodb中,并且将同步标志加上。
//上面的2000条数据 List<UserDataDto> list = getUserDataList(); List<UserData> categoryList = new ArrayList<>(); //切割的20条数据 List<Integer> parentIdList = new ArrayList<>(); int count = 0 for(UserDataDto userDataDto : list){ count++; parentIdList.add(userDataDto.getUserId()); if(count % 40 == 0){ //mysql每20次查询一次 //in查询,user_data 每次查询20*50条数据 categoryList = userDataMapper.selectBatchByParentId(parentIdList); if (CollectionUtil.isNotEmpty(categoryList)){ Map<String, List<MongoUserData>> mongoMap = categoryList.stream().collect(Collectors.groupingBy(MongoArchive::getParentId)); //异步批量插入 mongoMapBatchInsert(mongoMap); } parentIdList.clear(); //手动gc parentIdList = new ArrayList<>(); categoryList.clear(); categoryList = new ArrayList<>(); //mysql新增同步 lyUserArchivesCategorySyncMongoMapper.insertList(syncMongoList); syncMongoList.clear(); syncMongoList = new ArrayList<>(); } }
还有一个骚操作就是这个手动gc,因为list在外部定义,所以整个for循环中list对象都被引用不被垃圾回收,在很多源码中都有写,因为将list置为空,那么被list引用的1000个对象就不会被引用,根据java的可达性分析算法,那么里面的对象就会被回收,也不会导致内存泄漏等等。实际开发中我们几百万千万的list集合对象都是这么玩的。属于是jvm层面的一个优化
最后查看这个异步批量插入的mongoMapBatchInsert 方法,直接通过定义一个任务丢到线程池中即可
private static ThreadPoolExecutor threadPoolUtil = ThreadPoolUtil.getThreadPool(); public void mongoMapBatchInsert(Map<String, List<MongoUserData>> mongoMap){ if (CollectionUtil.isEmpty(mongoMap)) return; mongoMap.entrySet().stream().forEach(p -> { MongoSaveUserDataTask task = new MongoSaveUserDataTask(mongoTemplate,p.getValue()); threadPoolUtil.submit(task); }); }
ThreadPoolUtil 工具类是自定义的,其代码如下,由于是只需要进行大量的io操作,一次选择io密集型,将核心线程设置为空闲cpu的两倍 - 2,最大线程数设置为空闲cpu个数的2倍。在可控的情况下,即已知最大可能会出现多少个线程任务的情况下,可以将阻塞队列设置成有界的链表阻塞队列,其内部只有一个 reetrentLock 锁,其效率相对于其他阻塞队列效率更高
public class ThreadPoolUtil { public static ThreadPoolExecutor pool = null; public static synchronized ThreadPoolExecutor getThreadPool() { if (pool == null) { //获取当前机器的空闲cpu个数 int cpuNum = Runtime.getRuntime().availableProcessors(); //io密集型 int maximumPoolSize = cpuNum * 2 ; pool = new ThreadPoolExecutor( maximumPoolSize - 2, maximumPoolSize, 5L, //5s TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), //数组有界链表队列 Executors.defaultThreadFactory(), //默认的线程工厂 new ThreadPoolExecutor.AbortPolicy()); //直接抛异常,默认异常 } return pool; } }
最后就是这个线程任务类,参数通过构造方法注入,重写call方法。假设我本地6个cpu空闲,那么每次会有12个线程任务执行,每个线程插入50条数据,那么就是每次插入600条,比单线程插入快了12倍,比单线程每次插入一条数据快了600倍,所以选择异步多线程+批量插入数据效率是很高的
@Data public class MongoSaveUserDataTask implements Callable { private MongoTemplate mongoTemplate; private List<MongoUserData> mongoArchiveList; public MongoSaveUserDataTask(MongoTemplate mongoTemplate,List<MongoUserData> mongoDataList){ this.mongoTemplate = mongoTemplate; this.mongoArchiveList = mongoArchiveList; } @Override public Object call() throws Exception { //批量插入 mongoTemplate.insertAll(mongoDataList); //插入完毕睡眠20ms Thread.sleep(20); return null; } }
10万条数据同步其实也很快,分分钟就可以将数据同步完成。同步了一亿条数据也没花多少时间,而且在同步过程中,mongodb所在的服务器的cpu和内存基本在20%-30%之间
上面还有一个重要的一点,就是在用户数据拉取数据时,手动同步数据也在同时进行,就是也没有可能会出现同时同步两条数据到mongodb和标志位的情况,其实这点时不存在的,因为在mongodb中加了唯一索引,如果上报发现已经存在就直接报错,异步线程报错不影响主线程,在mysql的同步标志位表中也加了一个唯一索引,因此也不会出现这种重复插入两条数据的情况,其次这个唯一索引时得加不可的
总而言之就是同步数据只在用户读取数据和手动同步数据两个方法中会触发,并且每个用户只触发一次。后面单纯就是库里活了,重复执行手动同步,甚至可以调整每次查询用户的个数,如每次查询4000个,每50个用户查询一次游戏数据,然后再异步同步