diff --git a/application/src/main/java/dev/jianmu/application/service/internal/WorkerInternalApplication.java b/application/src/main/java/dev/jianmu/application/service/internal/WorkerInternalApplication.java index 0057c499b754afe4a1aa0e1d2afc41322ac7969f..af91d191814c6636187432f729cd6b8d0e817b96 100644 --- a/application/src/main/java/dev/jianmu/application/service/internal/WorkerInternalApplication.java +++ b/application/src/main/java/dev/jianmu/application/service/internal/WorkerInternalApplication.java @@ -22,6 +22,7 @@ import dev.jianmu.task.aggregate.TaskInstance; import dev.jianmu.task.event.TaskInstanceCreatedEvent; import dev.jianmu.task.repository.InstanceParameterRepository; import dev.jianmu.task.repository.TaskInstanceRepository; +import dev.jianmu.trigger.event.TriggerEvent; import dev.jianmu.trigger.repository.TriggerEventRepository; import dev.jianmu.worker.aggregate.Worker; import dev.jianmu.worker.repository.WorkerRepository; @@ -161,6 +162,8 @@ public class WorkerInternalApplication { .ifPresent(workflowInstance -> { // 分发worker List workerTags = this.getWorkerTag(workflowInstance); + logger.info("triggerId:{} instanceId:{} tags: {}", workflowInstance.getTriggerId(), + workflowInstance.getId(), workerTags); var workers = workerTags.isEmpty() ? this.workerRepository.findByTypeInAndCreatedTimeLessThan( List.of(Worker.Type.DOCKER, Worker.Type.KUBERNETES), @@ -185,27 +188,32 @@ public class WorkerInternalApplication { } private List getWorkerTag(WorkflowInstance workflowInstance) { - // 因为trigger参数与workflow的全局参数并不一致,该方法无法在workflow实体中进行处理 - var context = new ElContext(); var workflow = this.workflowRepository.findByRefAndVersion(workflowInstance.getWorkflowRef(), workflowInstance.getWorkflowVersion()) .orElseThrow(() -> new RuntimeException(String.format("无法找到对应的流程定义: %s, %s", workflowInstance.getWorkflowRef(), workflowInstance.getWorkflowVersion()))); - List tags = workflow.getTags(); + + + // 查询参数源 + var eventParameters = this.triggerEventRepository.findById(workflowInstance.getTriggerId()) + .map(TriggerEvent::getParameters) + .orElseGet(List::of); + // 创建表达式上下文 + var context = new ElContext(); + // 全局参数加入上下文 workflow.getGlobalParameters() .forEach(globalParameter -> context.add( "global", globalParameter.getName(), - Parameter - .Type.getTypeByName(globalParameter.getType()) - .newParameter(globalParameter.getValue()))); - - this.triggerEventRepository.findById(workflowInstance.getTriggerId()) - .ifPresent(triggerEvent -> triggerEvent.getParameters() - .forEach(triggerEventParameter -> context.add( - "trigger", - triggerEventParameter.getName(), - Parameter - .Type.getTypeByName(triggerEventParameter.getType()) - .newParameter(triggerEventParameter.getValue())))); + Parameter.Type.getTypeByName(globalParameter.getType()).newParameter(globalParameter.getValue())) + ); + // 事件参数加入上下文 + var eventParams = eventParameters.stream() + .map(eventParameter -> Map.entry(eventParameter.getName(), eventParameter.getParameterId())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + var eventParamValues = this.parameterRepository.findByIds(new HashSet<>(eventParams.values())); + var eventMap = this.parameterDomainService.matchParameters(eventParams, eventParamValues); + // 事件参数scope为event + eventMap.forEach((key, val) -> context.add("trigger", key, val)); + return workflow.getTags().stream().filter(StringUtils::hasText).map(tag -> { Expression el = this.expressionLanguage.parseExpression("`" + tag + "`"); EvaluationResult result = this.expressionLanguage.evaluateExpression(el, context);