巧用 Redis,实现微博 Feed 流功能!( 二 )


  • 「发布Feed:」不用区分是否大V , 所有用户的流程都一样,都是三步 。
  • 「读取Feed流:」不需要第一步,也不需要第三步,只需要第二步即可 , 将之前的2 + N(N是关注的大V个数) 次网络开销减少为 1 次网络开销 。读取延时大幅降级 。
6.两种模式总结推拉结合存在一个弊端,就是刷新自己的Feed流时,大V的个人页Timeline 的读压力会很大 。
如何解决:
  • 不使用大V/普通用户的优化方式,使用对活跃粉丝采用推模式,非活跃粉丝采用拉模式 。
  • 完全使用推模式就可以彻底解决这个问题,但是会带来存储量增大,大V Feed 发送总时间增大,从发给第一个粉丝到发给最后一个粉丝可能要几分钟时间 。
四、实现笔主主要采用纯推模式实现了一个普通企业基本可用的 Feed 流系统,下面介绍一下具体的实现代码 , 主要包括3大个部分:
  • 初始化 Feed 流 。
  • 关注的用户发布/删除 feed,该用户的粉丝更新自己的Feed流 。
  • 用户新增/取消关注,更新自己的Feed流 。
1.初始化 Feed 流当用户第一进来刷新Feed 流,且 Feed 流还不存在时 , 我们需要进行初始化,初始化的具体代码如下:核心思想就是从数据库中load出 feed 信息,塞到 zSet 中 , 然后分页返回 。
/** * 获取关注的人的信息流 */public List<FeedDto> listFocusFeed(Long userId, Integer page, Integer size) {String focusFeedKey = "focusFeedKey" + userId;// 如果 zset 为空,先初始化if (!zSetRedisTemplate.exists(focusFeedKey)) {initFocusIdeaSet(userId);}// 如果 zset 存在,但是存在 0 值Double zscore = zSetRedisTemplate.zscore(focusFeedKey, "0");if (zscore != null && zscore > 0) {return null;}//分页int offset = (page - 1) * size;long score = System.currentTimeMillis();// 按 score 值从大到小从 zSet 中取出 FeedId 集合List<String> list = zSetRedisTemplate.zrevrangeByScore(focusFeedKey, score, 0, offset, size);List<FeedDto> result = new ArrayList<>();if (QlchatUtil.isNotEmpty(list)) {for (String s : list) {// 根据 feedId 从缓存中 load 出 feedFeedDto feedDto = this.loadCache(Long.valueOf(s));if (feedDto != null) {result.add(feedDto);}}}return result;}/** * 初始化关注的人的信息流 zSet */private void initFocusFeedSet( Long userId) {String focusFeedKey = "focusFeedKey" + userId;zSetRedisTemplate.del(focusIdeaKey);// 从数据库中加载当前用户关注的人发布过的 FeedList<Feed> list = this.feedMApper.listFocusFeed(userId);if (QlchatUtil.isEmpty(list)) {//保存0,避免空数据频繁查库zSetRedisTemplate.zadd(focusFeedKey, 1, "0");zSetRedisTemplate.expire(focusFeedKey, RedisKeyConstants.ONE_MINUTE * 5);return;}// 遍历 FeedList,把 FeedId 存到 zSet 中for (Feed feed : list) {zSetRedisTemplate.zadd(focusFeedKey, feed.getCreateTime().getTime(), feed.getId().toString());}zSetRedisTemplate.expire(focusFeedKey, 60 * 60 * 60);}2.关注的用户发布/删除新的 feed每当用户发布/删除新的 feed,我们需要更新该用户所有的粉丝的 Feed流,该步骤一般比较耗时,所以建议异步处理,为了避免一次性load出太多的粉丝数据,这里采用循环分页查询 。为了避免粉丝的 Feed流过大,我们会限制 Feed 流的长度为1000 , 当Feed流长度超过1000时,会移除最旧的 Feed 。
/** * 新增/删除 feed时,处理粉丝 feed 流 * * @param userId 新增/删除 feed的用户id * @param feedId 新增/删除 的feedId * @param typefeed_add = 新增feed feed_sub = 删除feed */public void handleFeed(Long userId, Long feedId, String type) {Integer currentPage = 1;Integer size = 1000;List<FansDto> fansDtos;while (true) {Page page = new Page();page.setSize(size);page.setPage(currentPage);fansDtos = this.fansService.listFans(userId, page);for (FansDto fansDto : fansDtos) {String focusFeedKey = "focusFeedKey" + userId;// 如果粉丝 zSet 不存在 , 退出if (!this.zSetRedisTemplate.exists(focusFeedKey)) {continue;}// 新增Feedif ("feed_add".equals(type)) {this.removeOldestZset(focusFeedKey);zSetRedisTemplate.zadd(focusFeedKey, System.currentTimeMillis(), feedId);}// 删除Feedelse if ("feed_sub".equals(type)) {zSetRedisTemplate.zrem(focusFeedKey, feedId);}}if (fansDtos.size() < size) {break;}currentPage++;}}/** * 删除 zSet 中最旧的数据 */private void removeOldestZset(String focusFeedKey){// 如果当前 zSet 大于1000,删除最旧的数据if (this.zSetRedisTemplate.zcard(focusFeedKey) >= 1000) {// 获取当前 zSet 中 score 值最小的List<String> zrevrange = this.zSetRedisTemplate.zrevrange(focusFeedKey, -1, -1, String.class);if (QlchatUtil.isNotEmpty(zrevrange)) {this.zSetRedisTemplate.zrem(focusFeedKey, zrevrange.get(0));}}}


推荐阅读