黑马头条日记 | 分布式任务调度平台XXL-JOB —— XXL之力一举完成热点文章定时计算

张开发
2026/4/4 23:06:08 15 分钟阅读
黑马头条日记 | 分布式任务调度平台XXL-JOB —— XXL之力一举完成热点文章定时计算
一、引文APP端用户在不同频道来回切换时该如何向用户推送内容呢直觉来说就是从数据库按照文章发布时间倒序查询但这样操作会有两个明显的问题第一个是用户频繁切换频道对于数据库查询压力过大一旦用户访问量上去数据库可能就扛不住如此大量的请求。第二个是查询返回的结果是按照文章发布时间倒序排序只能说明新的文章会显示在前面它的质量是不确定的我们想要看到的是把一些热点文章放在前面这也是用户乐意看到的。综合上述考虑最终拟定的解决方案是把热点数据点赞、评论、阅读、收藏四要素决定存入Redis进行展示。本章我们主要讨论定时任务计算热点文章。二、SpringTask提到定时任务相信大家都会第一时间想到SpringTask框架曾几何时我们在苍穹外卖中就曾用到该技术来处理一直派送中的异常订单状态以及用户十五分钟内未支付修改状态已取消。但是它只适用于内网小服务开发对于黑马头条这种大体量的微服务架构就难免有些捉襟见肘了。集群环境任务的重复执行问题、cron表达式定义在代码之中修改不方便、定时任务失败了无法重试也没有统计、如果任务量过大不能有效的分片执行。以下是SpringTask与XXL-JOB的详细对比对比项Xxl-JobSpring Task (Scheduled)任务管理界面有Web UI可手动触发/暂停/停止无需写代码或调用接口任务分片支持分片广播一键横向扩展不支持失败重试可配置重试次数和间隔不支持需自行实现任务依赖不支持DAG依赖需自研或升级到 v2.3不支持HA/多实例注册中心统一调度抢锁执行多实例会重复执行需SchedulerLock等执行日志平台统一查看各实例日志分散告警通知邮件/钉钉等插件需自行实现配置变更运行时改Cron无需重启需改代码或刷新上下文你的使用场景ComputeHotArticleJob— 适合因为需管理、可重跑ScheduleApplication— 通用延迟任务三、XXL-JOB介绍XXL-JOB之前我们需要了解何为分布式任务调度。首先关于分布式架构简单来说就是把我们的单体应用拆分为若干服务服务之间通过网络交互完成业务处理也就是我们说的微服务架构。在这种分布式架构环境下一个服务往往会部署多个实例来执行相关业务那么此时我们执行任务调度就是所谓的分布式任务调度。这么做是有一定好处的将任务调度程序分布式构建这样就可以具有分布式系统的特点并且提高任务的调度处理能力1、并行任务调度并行任务调度实现靠多线程如果有大量任务需要调度此时光靠多线程就会有瓶颈了因为一台计算机CPU的处理能力是有限的。但是如果将任务调度程序分布式部署每个结点还可以部署为集群这样就可以让多台计算机共同去完成任务调度我们可以将任务分割为若干个分片由不同的实例并行执行来提高任务调度的处理效率。2、高可用若某一个实例宕机不影响其他实例来执行任务。3、弹性扩容当集群中增加实例就可以提高并执行任务的处理效率。4、任务管理与监测对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况从而做出快速的应急处理响应。但该方案实施中存在一个典型的并发问题如果定时任务重复执行了咋办当任务调度以集群方式部署同一个任务调度可能会执行多次例如电商系统定期发放优惠券就可能重复发放优惠券对公司造成损失信用卡还款提醒就会重复执行多次给用户造成烦恼所以我们需要控制相同的任务在多个运行实例上只执行一次。常见解决方案1.分布式锁多个实例在任务执行前首先需要获取锁如果获取失败那么就证明有其他服务已经在运行如果获取成功那么证明没有服务在运行定时任务那么就可以执行。2.ZooKeeper选举利用ZooKeeper对Leader实例执行定时任务执行定时任务的时候判断自己是否是Leader如果不是则不执行如果是则执行业务逻辑这样也能达到目的。1.简介针对分布式任务调度的需求市场上出现了很多的产品TBSchedule淘宝推出的一款非常优秀的高性能分布式调度框架目前被应用于阿里、京东、支付宝、国美等很多互联网企业的流程调度系统中。但是已经多年未更新文档缺失严重缺少维护。XXL-Job大众点评的分布式任务调度平台是一个轻量级分布式任务调度平台, 其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线开箱即用。Elastic-job当当网借鉴TBSchedule并基于quartz 二次开发的弹性分布式任务调度系统功能丰富强大采用zookeeper实现分布式协调具有任务高可用以及分片功能。Saturn 唯品会开源的一个分布式任务调度平台基于Elastic-job可以全域统一配置统一监控具有任务高可用以及分片功能。2.执行器触发任务调度时将会自动发现注册成功的执行器通常每个任务都会绑定一个执行器因此可以根据执行器对任务进行分组。属性介绍如下AppName: 是每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用;名称: 执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性;排序: 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表;注册方式调度中心获取执行器地址的方式自动注册执行器自动进行执行器注册调度中心通过底层注册表可以动态发现执行器机器地址手动录入人工手动录入执行器的地址信息多地址逗号分隔供调度中心使用机器地址注册方式为手动录入时有效支持人工维护执行器的地址信息注册地址格式可参考“http://127.0.0.1:9999/”为执行器内嵌服务地址3.调度配置无该类型不会主动触发调度CRON该类型将会通过CRON触发任务调度固定速度该类型将会以固定速度触发任务调度按照固定的间隔时间周期性触发4.任务配置运行模式BEAN模式任务以JobHandler方式维护在执行器端需要结合 JobHandler 属性匹配执行器中任务JobHandler运行模式为 BEAN模式 时生效对应执行器中新开发的JobHandler类XxlJob(demoJobHandler)注解自定义的value值执行参数任务执行所需的参数5.阻塞策略阻塞处理策略调度过于密集执行器来不及处理时的处理策略单机串行默认调度请求进入单机执行器后调度请求进入FIFO(First Input First Output)队列并以串行方式运行丢弃后续调度调度请求进入单机执行器后发现执行器存在运行的调度任务本次请求将会被丢弃并标记为失败覆盖之前调度调度请求进入单机执行器后发现执行器存在运行的调度任务将会终止运行中的调度任务并清空队列然后运行本地调度任务6.路由策略当执行器集群部署时提供丰富的路由策略包括FIRST第一个固定选择第一个机器LAST最后一个固定选择最后一个机器ROUND轮询RANDOM随机随机选择在线的机器CONSISTENT_HASH一致性HASH每个任务按照Hash算法固定选择某一台机器且所有任务均匀散列在不同机器上。LEAST_FREQUENTLY_USED最不经常使用使用频率最低的机器优先被选举LEAST_RECENTLY_USED最近最久未使用最久未使用的机器优先被选举FAILOVER故障转移按照顺序依次进行心跳检测第一个心跳检测成功的机器选定为目标执行器并发起调度BUSYOVER忙碌转移按照顺序依次进行空闲检测第一个空闲检测成功的机器选定为目标执行器并发起调度SHARDING_BROADCAST(分片广播)广播触发对应集群中所有机器执行一次任务同时系统自动传递分片参数可根据分片参数开发分片任务**四、热点文章计算1.前置XXL-JOB配置Nacos配置如下xxl: job: admin: addresses: http://192.168.200.130:8888/xxl-job-admin // 控制台地址 executor: appname: leadnews-hot-article-executor // 执行器唯一标识 port: 9999XXL-JOB配置类主要配置任务执行器Bean交给容器管理。Configuration public class XxlJobConfig { private Logger logger LoggerFactory.getLogger(XxlJobConfig.class); Value(${xxl.job.admin.addresses}) private String adminAddresses; Value(${xxl.job.executor.appname}) private String appname; Value(${xxl.job.executor.port}) private int port; Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info( xxl-job config init.); XxlJobSpringExecutor xxlJobSpringExecutor new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setPort(port); return xxlJobSpringExecutor; } }任务类关键是XxlJob注解value值就是控制台配置的JobHandler定时执行hotArticleService里面的业务逻辑即可。Component Slf4j public class ComputeHotArticleJob { Autowired private HotArticleService hotArticleService; XxlJob(computeHotArticleJob) public void handle(){ log.info(热文章分值计算调度任务开始执行...); hotArticleService.computeHotArticle(); log.info(热文章分值计算调度任务结束...); } }在xxl-job-admin中新建执行器和任务2.业务逻辑XxlJob 管理界面/CRON表达式│▼ComputeHotArticleJob.handle()XxlJob(computeHotArticleJob)│▼HotArticleServiceImpl.computeHotArticle()├─ ① apArticleMapper.findArticleListByLast5days(5天前的日期)│ SQL: 查询 createdTime (now - 5天) 的文章│├─ ② computeHotArticle(ListApArticle)│ ├─ 遍历每篇文章│ └─ computeScore(apArticle) ← 核心评分公式│ score views (权重1)│ likes * HOT_ARTICLE_LIKE_WEIGHT (权重3)│ comment * HOT_ARTICLE_COMMENT_WEIGHT (权重5)│ collection* HOT_ARTICLE_COLLECTION_WEIGHT(权重8)│└─ ③ cacheTagToRedis(ListHotArticleVo)├─ wemediaClient.getChannels() 获取所有频道├─ 按频道过滤 → sortAndCache(channelId, top30)└─ sortAndCache(DEFAULT_TAG, top30) 全局推荐└─ cacheService.set(key, json) 写入RedisService Slf4j Transactional public class HotArticleServiceImpl implements HotArticleService { Autowired private ApArticleMapper apArticleMapper; /** * 计算热点文章 */ Override public void computeHotArticle() { //1.查询前5天的文章数据 Date dateParam DateTime.now().minusDays(5).toDate(); ListApArticle apArticleList apArticleMapper.findArticleListByLast5days(dateParam); //2.计算文章的分值 ListHotArticleVo hotArticleVoList computeHotArticle(apArticleList); //3.为每个频道缓存30条分值较高的文章 cacheTagToRedis(hotArticleVoList); } Autowired private IWemediaClient wemediaClient; Autowired private CacheService cacheService; /** * 为每个频道缓存30条分值较高的文章 * param hotArticleVoList */ private void cacheTagToRedis(ListHotArticleVo hotArticleVoList) { //每个频道缓存30条分值较高的文章 ResponseResult responseResult wemediaClient.getChannels(); if(responseResult.getCode().equals(200)){ String channelJson JSON.toJSONString(responseResult.getData()); ListWmChannel wmChannels JSON.parseArray(channelJson, WmChannel.class); //检索出每个频道的文章 if(wmChannels ! null wmChannels.size() 0){ for (WmChannel wmChannel : wmChannels) { ListHotArticleVo hotArticleVos hotArticleVoList.stream().filter(x - x.getChannelId().equals(wmChannel.getId())).collect(Collectors.toList()); //给文章进行排序取30条分值较高的文章存入redis key频道id value30条分值较高的文章 sortAndCache(hotArticleVos, ArticleConstants.HOT_ARTICLE_FIRST_PAGE wmChannel.getId()); } } } //设置推荐数据 //给文章进行排序取30条分值较高的文章存入redis key频道id value30条分值较高的文章 sortAndCache(hotArticleVoList, ArticleConstants.HOT_ARTICLE_FIRST_PAGEArticleConstants.DEFAULT_TAG); } /** * 排序并且缓存数据 * param hotArticleVos * param key */ private void sortAndCache(ListHotArticleVo hotArticleVos, String key) { hotArticleVos hotArticleVos.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList()); if (hotArticleVos.size() 30) { hotArticleVos hotArticleVos.subList(0, 30); } cacheService.set(key, JSON.toJSONString(hotArticleVos)); } /** * 计算文章分值 * param apArticleList * return */ private ListHotArticleVo computeHotArticle(ListApArticle apArticleList) { ListHotArticleVo hotArticleVoList new ArrayList(); if(apArticleList ! null apArticleList.size() 0){ for (ApArticle apArticle : apArticleList) { HotArticleVo hot new HotArticleVo(); BeanUtils.copyProperties(apArticle,hot); Integer score computeScore(apArticle); hot.setScore(score); hotArticleVoList.add(hot); } } return hotArticleVoList; } /** * 计算文章的具体分值 * param apArticle * return */ private Integer computeScore(ApArticle apArticle) { Integer scere 0; if(apArticle.getLikes() ! null){ scere apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT; } if(apArticle.getViews() ! null){ scere apArticle.getViews(); } if(apArticle.getComment() ! null){ scere apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT; } if(apArticle.getCollection() ! null){ scere apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT; } return scere; } }

更多文章