|开源微服务编排框架:Netflix Conductor( 二 )
SCHEDULED:待调度 , task放到队列中还没有被poll出来执行时的状态 IN_PROGRESS:执行中 , 被poll出来执行但还没有完成时的状态 COMPLETED:执行完成 FAILED:执行失败 CANCELLED:被中止时为此状态 , 一般出现在两种情况:手动中止流程时 , 正在运行中的task会被置为此状态;多个fork分支 , 当某个分支的task失败时 , 其它分支中正在运行的task会被置为此状态;
2 任务队列
任务的执行(同步的系统任务除外)都会先添加到任务队列中 , 是典型的生产者消费者模式 。
任务队列 , 是一个带有延迟、优先级功能的队列; 每种类型的Task是一个单独的队列 , 此外 , 如果配置了domain、isolationGroup , 还会拆分成多个队列实现执行隔离; decider service是生产者 , 其根据流程配置与当前执行情况 , 解析出可执行的task后 , 添加到队列; 任务执行器(SystemTaskWorker、Worker)是消费者 , 其长轮询对应的队列 , 从队列中获取任务执行; 队列接口可插拔 , conductor提供了Dynomite 、MySQL、PostgreSQL的实现 。
3 核心功能实现机制
conductor调度的核心是decider service , 其根据当前流程运行的状态 , 解析出将要执行的任务列表 , 将任务入队交给worker执行 。
decide主要流程简化如下 , 详细代码见WorkflowExecutor.java的decide方法:
其中 , 调度任务处理流程简化如下 , 详细代码见WorkflowExecutor.java的scheduleTask方法:
decide的触发时机
最主要的触发时机:
【|开源微服务编排框架:Netflix Conductor】
新启动执行时 , 会触发decide操作 系统任务执行完成时 , 会触发decide操作 Workder任务通过ExecutionService更新任务状态时 , 会触发decide操作 流程控制节点的实现机制 1)TaskTaskMapper 对于每一个Task来说 , 都有Task和TaskMapper两部分: Task:任务的执行逻辑代码 , 它的作用是Task的执行 TaskMapper:任务的映射逻辑代码 , 它通过Task的定义配置、当前实例的执行状态等信息 , 返回实际需要执行的Task列表 对于一般的任务来说 , TaskMapper返回的是就是Task本身 , 补充一些执行实例的状态信息 。 但是对于控制节点来说 , 会有不同的逻辑 。2)条件分支(SWITCH)的实现机制 SWITCH用于根据条件判断 , 执行不同的分支 。实际上 , 该节点的Task不做任何操作 , TaskMapper根据分支条件 , 判断出要走的分之后 , 返回对应分支的第一个Task 。SwitchTaskMapper.java getMappedTasks方法关键代码: // 待调度的Task list , 最终返回结果ListTasktasksToBeScheduled = new LinkedList();// evalResult是分支条件变量的值(case)// decisionCases是一个Map结构 , key为分支的case值 , value为对应分支的任务定义list(分支内的任务定义会有多个)// 根据分支变量的实际值 , 获取对应分支的任务定义listListWorkflowTaskselectedTasks = taskToSchedule.getDecisionCases().get(evalResult);// default的逻辑:如果获取不到对应的分支或者分支为空 , 则用默认的分支if (selectedTasks == null || selectedTasks.isEmpty()) { selectedTasks = taskToSchedule.getDefaultCase();if (selectedTasks != null!selectedTasks.isEmpty()) { // 获取分支的第一个(下标0)task , 返回给decider service去做调度(decider会把任务添加到队列里 , 交给worker去执行) WorkflowTask selectedTask = selectedTasks.get(0); // 调用了deciderService的getTasksToBeScheduled方法 , 此方法里又获取到TaskMapper调用了getMappedTasks 。 这里采用了递归调用的方式 , 解析嵌套的Task ListTaskcaseTasks = taskMapperContext.getDeciderService() .getTasksToBeScheduled(workflowInstance selectedTask retryCount taskMapperContext.getRetryTaskId()); tasksToBeScheduled.addAll(caseTasks); switchTask.getInputData().put(\"hasChildren\" \"true\");return tasksToBeScheduled; 3)并行(FORK)的实现机制 FORK用于开启多个并行分支 。实际上 , 该节点的Task不做任何操作 , TaskMapper返回所有并行分支的第一个Task 。 ForkJoinTaskMapper.java getMappedTasks关键代码: // 待调度的Task list , 最终返回结果ListTasktasksToBeScheduled = new LinkedList();// 配置中的所有fork分支ListListWorkflowTaskforkTasks = taskToSchedule.getForkTasks();for (ListWorkflowTaskwfts : forkTasks) { // 每个分支取第一个Task WorkflowTask wft = wfts.get(0); // 调用了deciderService的getTasksToBeScheduled方法 , 此方法里又获取到TaskMapper调用了getMappedTasks 。 这里采用了递归调用的方式 , 解析嵌套的Task ListTasktasks2 = taskMapperContext.getDeciderService() .getTasksToBeScheduled(workflowInstance wft retryCount); tasksToBeScheduled.addAll(tasks2);return tasksToBeScheduled; 总的来说 , 分支(SWITCH)、并行(FORK)节点本身没有执行逻辑 , 其通过TaskMapper返回到实际要执行的Task , 然后交给Decider Service处理 。重试的实现机制 重试和其延迟时间设置 , 都是借助任务队列的功能实现的 。重试:将任务重新添加到任务队列 重试的延迟时间:添加到任务队列时设置延迟时间 , 延迟时间过后 , 任务才能在队列中被poll出来执行 五 完整性保障机制 由于调度过程中可能会出现因机器重启、网络异常、JVM崩溃等偶发情况 , 这些会导致的decide过程意外终止 , 流程执行不完整 , 展现出如流程一直运行中(实际已经没有在调度) , 或者其它状态错误等异常现象 。1 WorkflowReconciler 针对这种情况 , conductor有一个WorkflowReconciler , 会定期尝试decide所有正在运行中的流程 , 修复流程执行的一致性 。 此外 , 它还有一个作用是校验流程超时时间 。2 decideQueue 那么WorkflowReconciler是如何获取到当前运行中的流程呢 , 答案是decideQueue 。 decideQueue和任务队列相同 , 也是一个具有延迟功能的队列 , 其存放的是正在执行中的流程的实例id 。 在任务开始执行时(包括新启动执行、重试执行、恢复执行、重跑执行等) , 会将实例id push到decideQueue中;在执行结束(成功、失败)时 , 会从decideQueue中删除实例id 。3 ExecutionLockService WorkflowReconciler会定期尝试decide所有正在运行中的流程用于超时判断、维护流程一致性 。 但是流程本身正常执行也会触发decide , 如果同一个执行同时触发两个decide , 可能会导致状态混乱 , 执行卡住等问题 。conductor采用了锁来解决这个问题 , 其提供了单机LocalOnlyLock(基于信号量实现)、redis分布式锁(基于redission实现)、zookeeper分布式锁三种实现 。decide方法中最开始会尝试获取锁 , 如果获取失败则直接返回 。 通过锁来保障不会对同一个流程实例并发执行decide 。if (!executionLockService.acquireLock(workflowId)) { return false; 由于锁是可配置的 , 可能会导致一个误区:单台机器的话不用配置锁 。 其实单机也是需要配置锁的 , 因为WorkflowReconciler和流程正常执行会产生冲突 , 可能会导致偶发的流程状态混乱问题 。参考:Github: https://github.com/Netflix/conductor官方文档:https://netflix.github.io/conductor/WorkflowReconciler:https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.javaWorkflowSystemTask:https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java?spm=ata.21736010.0.0.2b501a3cYnrSfTfile=WorkflowSystemTask.java 作者 | 夜阳 原文链接:https://developer.aliyun.com/article/818136?utm_content=g_1000311143 本文为阿里云原创内容 , 未经允许不得转载 。
- 微信|个人收款码与商业收款码有什么不一样
- m都是大片!微软 Skype 支持将必应 Bing 图片设为通话虚拟背景
- 芯片|上市仅4个月,跌价1000元,微云台主摄+6nm芯片+4400mAh
- 京东正式上线“年礼无忧”服务
- 显示器|微信新功能开始!长语音可以暂停
- 短信|关于5G消息,中国移动取得新进展,微信该做准备了
- 打脸!华为在美国,用专利把英特尔、苹果、微软、高通打败了
- 自驾游|儿子将母亲忘在服务区 开出40公里仍不知 网友:心大
- 微信上线“语音暂停”功能
- 微信聊天最令人头疼的场景是什么?一定有人会说是对方发来一连串语音还都是超过30秒的长消息...|终于!微信上线万众期待的新功能!网友:总算等到了