diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index 6b539d01b1628db882ed979d4cb67cec2cba05d4..5ba687d74bceb93de580f992cee20fa20e3cb28f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -84,7 +84,7 @@ public class ProcessDefinitionController extends BaseController { @RequestParam(value = "processDefinitionJson", required = true) String json, @RequestParam(value = "locations", required = true) String locations, @RequestParam(value = "connects", required = true) String connects, - @RequestParam(value = "description", required = false) String description) throws JsonProcessingException { + @RequestParam(value = "description", required = false) String description) throws Exception { logger.info("login user {}, create process definition, project name: {}, process definition name: {}, " + "process_definition_json: {}, desc: {} locations:{}, connects:{}", @@ -111,7 +111,7 @@ public class ProcessDefinitionController extends BaseController { @ApiException(COPY_PROCESS_DEFINITION_ERROR) public Result copyProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, - @RequestParam(value = "processId", required = true) int processId) throws JsonProcessingException { + @RequestParam(value = "processId", required = true) int processId) throws Exception { logger.info("copy process definition, login user:{}, project name:{}, process definition id:{}", loginUser.getUserName(), projectName, processId); Map result = processDefinitionService.copyProcessDefinition(loginUser, projectName, processId); @@ -175,7 +175,7 @@ public class ProcessDefinitionController extends BaseController { @RequestParam(value = "processDefinitionJson", required = true) String processDefinitionJson, @RequestParam(value = "locations", required = false) String locations, @RequestParam(value = "connects", required = false) String connects, - @RequestParam(value = "description", required = false) String description) { + @RequestParam(value = "description", required = false) String description) throws Exception { logger.info("login user {}, update process define, project name: {}, process define name: {}, " + "process_definition_json: {}, desc: {}, locations:{}, connects:{}", @@ -322,6 +322,23 @@ public class ProcessDefinitionController extends BaseController { return returnDataList(result); } + /** + * encapsulation treeview structure (dependence) + * + * @param id process definition id + * @return tree view json data + */ + @ApiOperation(value = "viewTreeByDepend", notes = "VIEW_TREE_DEPEND_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") + }) + @GetMapping(value = "/view-tree-depend") + @ResponseStatus(HttpStatus.OK) + public Result viewTreeByDepend(@RequestParam("processId") Integer id) { + Map result = processDefinitionService.viewTreeByDepend(id); + return returnDataList(result); + } + /** * get tasks list by process definition id * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index 10dc8e4ce5d4a9f84aa606031671a8383428a99b..717398b3bfa702e86d659066a1b70f338880531d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -169,7 +169,7 @@ public class ProcessInstanceController extends BaseController { @RequestParam(value = "locations", required = false) String locations, @RequestParam(value = "connects", required = false) String connects, @RequestParam(value = "flag", required = false) Flag flag - ) throws ParseException { + ) throws Exception { logger.info("updateProcessInstance process instance, login user:{}, project name:{}, process instance json:{}," + "process instance id:{}, schedule time:{}, sync define:{}, flag:{}, locations:{}, connects:{}", loginUser.getUserName(), projectName, processInstanceJson, processInstanceId, scheduleTime, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java index cc9e0f657f804c5d57988466cbbf72c53cfadfba..5609eb06bf5a38c9cc524a7b9ec0b0149c64bd9e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java @@ -243,7 +243,7 @@ public class ProjectController extends BaseController { @ApiException(IMPORT_PROCESS_DEFINE_ERROR) public Result importProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("file") MultipartFile file, - @RequestParam("projectName") String projectName) { + @RequestParam("projectName") String projectName) throws Exception { logger.info("import process definition by id, login user:{}, project: {}", loginUser.getUserName(), projectName); Map result = processDefinitionService.importProcessDefinition(loginUser, file, projectName); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index e23ed0002a4dc770714d6d341918576fb35a9765..310ef3ad391ed55f515ffe20109dd14868da213e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -39,8 +39,12 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; +import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; +import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.dao.utils.DagHelper; @@ -59,11 +63,13 @@ import javax.servlet.http.HttpServletResponse; import java.io.BufferedOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.sql.SQLException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID; +import static org.apache.dolphinscheduler.common.Constants.COMMA; /** * process definition service @@ -101,6 +107,9 @@ public class ProcessDefinitionService extends BaseDAGService { @Autowired private ProcessService processService; + @Autowired + private DataSourceMapper dataSourceMapper; + /** * create process definition * @@ -112,7 +121,7 @@ public class ProcessDefinitionService extends BaseDAGService { * @param locations locations for nodes * @param connects connects for nodes * @return create result code - * @throws JsonProcessingException JsonProcessingException + * @throws SQLException SQLException */ public Map createProcessDefinition(User loginUser, String projectName, @@ -120,7 +129,7 @@ public class ProcessDefinitionService extends BaseDAGService { String processDefinitionJson, String desc, String locations, - String connects) throws JsonProcessingException { + String connects) throws SQLException { Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); @@ -144,7 +153,7 @@ public class ProcessDefinitionService extends BaseDAGService { processDefine.setReleaseState(ReleaseState.OFFLINE); processDefine.setProjectId(project.getId()); processDefine.setUserId(loginUser.getId()); - processDefine.setProcessDefinitionJson(processDefinitionJson); + processDefine.setProcessDefinitionJson(refreshTaskNodeDependParams(processDefinitionJson)); processDefine.setDescription(desc); processDefine.setLocations(locations); processDefine.setConnects(connects); @@ -172,6 +181,84 @@ public class ProcessDefinitionService extends BaseDAGService { return result; } + public String refreshTaskNodeDependParams(String processDefinitionJson) throws SQLException { + if (StringUtils.isEmpty(processDefinitionJson)) { + return processDefinitionJson; + } + + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + + if (CollectionUtils.isEmpty(processData.getTasks())) { + return processDefinitionJson; + } + + String taskNodeName = ""; + try { + for (TaskNode taskNode : processData.getTasks()) { + taskNodeName = taskNode.getName(); + if (taskNode.getType().equals(TaskType.SQL.toString())) { + logger.info("task node [{}] set depend params", taskNode.getName()); + SqlParameters sqlParameters = (SqlParameters) TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); + initSqlNodeDependParams(sqlParameters); + taskNode.setParams(JSONUtils.toJsonString(sqlParameters)); + } else if (taskNode.getType().equals(TaskType.DATAX.toString())) { + logger.info("task node [{}] set depend params", taskNode.getName()); + DataxParameters etlParameters = (DataxParameters) TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); + initEtlNodeDependParams(etlParameters); + taskNode.setParams(JSONUtils.toJsonString(etlParameters)); + } + } + } catch (SQLException e) { + throw new SQLException(String.format("node [%s] refresh depend parameter fail : %s", taskNodeName, e.getMessage())); + } + + return JSONUtils.toJsonString(processData); + } + + private void initSqlNodeDependParams(SqlParameters parameters) throws SQLException { + DataSource dataSource = dataSourceMapper.selectById(parameters.getDatasource()); + if (dataSource == null) { + return; + } + + BaseDataSource dataSourceForm = DataSourceFactory.getDatasource(dataSource.getType(), dataSource.getConnectionParams()); + String[] hostsPorts = JdbcUtils.getHostsAndPort(dataSourceForm.getAddress()); + + List selectTableList = SqlUtils.resolveSqlSelectTables(DbType.valueOf(parameters.getType()), parameters.getSql()); + List insertTableList = SqlUtils.resolveSqlInsertTables(DbType.valueOf(parameters.getType()), parameters.getSql()); + + selectTableList.removeAll(insertTableList); + + parameters.setDependNodeKeys(DependUnionKeyUtils.buildDependTableUnionKey(hostsPorts[0], dataSourceForm.getDatabase(), selectTableList)); + + if (CollectionUtils.isNotEmpty(insertTableList)) { + parameters.setTargetNodeKeys(DependUnionKeyUtils.buildTargetTableUnionKey(hostsPorts[0], dataSourceForm.getDatabase(), insertTableList)); + } + } + + private void initEtlNodeDependParams(DataxParameters parameters) throws SQLException { + List tableList = SqlUtils.resolveSqlSelectTables(DbType.valueOf(parameters.getDsType()), parameters.getSql()); + if (CollectionUtils.isEmpty(tableList)) { + return; + } + + DataSource dataSource = dataSourceMapper.selectById(parameters.getDataSource()); + DataSource dataTarget = dataSourceMapper.selectById(parameters.getDataTarget()); + if (dataSource == null || dataTarget == null) { + return; + } + + BaseDataSource dataSourceForm = DataSourceFactory.getDatasource(dataSource.getType(), dataSource.getConnectionParams()); + String[] hostsPorts = JdbcUtils.getHostsAndPort(dataSourceForm.getAddress()); + + + BaseDataSource dataTargetForm = DataSourceFactory.getDatasource(dataTarget.getType(), dataTarget.getConnectionParams()); + String[] targetHostsPorts = JdbcUtils.getHostsAndPort(dataTargetForm.getAddress()); + + parameters.setDependNodeKeys(DependUnionKeyUtils.buildDependTableUnionKey(hostsPorts[0], dataSourceForm.getDatabase(), tableList)); + parameters.setTargetNodeKeys(DependUnionKeyUtils.buildTargetTableUnionKey(targetHostsPorts[0], dataTargetForm.getDatabase(), parameters.getTargetTable())); + } + /** * get resource ids * @@ -300,7 +387,7 @@ public class ProcessDefinitionService extends BaseDAGService { * @param processId process definition id * @return copy result code */ - public Map copyProcessDefinition(User loginUser, String projectName, Integer processId) throws JsonProcessingException { + public Map copyProcessDefinition(User loginUser, String projectName, Integer processId) throws SQLException { Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); @@ -342,7 +429,7 @@ public class ProcessDefinitionService extends BaseDAGService { */ public Map updateProcessDefinition(User loginUser, String projectName, int id, String name, String processDefinitionJson, String desc, - String locations, String connects) { + String locations, String connects) throws SQLException { Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); @@ -376,7 +463,7 @@ public class ProcessDefinitionService extends BaseDAGService { processDefine.setName(name); processDefine.setReleaseState(ReleaseState.OFFLINE); processDefine.setProjectId(project.getId()); - processDefine.setProcessDefinitionJson(processDefinitionJson); + processDefine.setProcessDefinitionJson(refreshTaskNodeDependParams(processDefinitionJson)); processDefine.setDescription(desc); processDefine.setLocations(locations); processDefine.setConnects(connects); @@ -752,7 +839,7 @@ public class ProcessDefinitionService extends BaseDAGService { * @return import process */ @Transactional(rollbackFor = Exception.class) - public Map importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { + public Map importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) throws Exception { Map result = new HashMap<>(5); String processMetaJson = FileUtils.file2String(file); List processMetaList = JSONUtils.toList(processMetaJson, ProcessMeta.class); @@ -1265,7 +1352,17 @@ public class ProcessDefinitionService extends BaseDAGService { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); return result; } - DAG dag = genDagGraph(processDefinition); + DAG dag = genDagGraph(processDefinition, false); + + TreeViewDto parentTreeViewDto = buildTreeViewDto(processId, limit, dag); + + result.put(Constants.DATA_LIST, parentTreeViewDto); + result.put(Constants.STATUS, Status.SUCCESS); + result.put(Constants.MSG, Status.SUCCESS.getMsg()); + return result; + } + + private TreeViewDto buildTreeViewDto(Integer processId, Integer limit, DAG dag) { /** * nodes that is running */ @@ -1276,30 +1373,31 @@ public class ProcessDefinitionService extends BaseDAGService { */ Map> waitingRunningNodeMap = new ConcurrentHashMap<>(); - /** - * List of process instances - */ - List processInstanceList = processInstanceMapper.queryByProcessDefineId(processId, limit); - - for (ProcessInstance processInstance : processInstanceList) { - processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(), processInstance.getEndTime())); - } - - if (limit > processInstanceList.size()) { - limit = processInstanceList.size(); - } - TreeViewDto parentTreeViewDto = new TreeViewDto(); parentTreeViewDto.setName("DAG"); parentTreeViewDto.setType(""); - // Specify the process definition, because it is a TreeView for a process definition - for (int i = limit - 1; i >= 0; i--) { - ProcessInstance processInstance = processInstanceList.get(i); + // List of process instances + List processInstanceList = null; + if (limit > 0) { + processInstanceList = processInstanceMapper.queryByProcessDefineId(processId, limit); + + for (ProcessInstance processInstance : processInstanceList) { + processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(), processInstance.getEndTime())); + } + + if (limit > processInstanceList.size()) { + limit = processInstanceList.size(); + } + + // Specify the process definition, because it is a TreeView for a process definition + for (int i = limit - 1; i >= 0; i--) { + ProcessInstance processInstance = processInstanceList.get(i); - Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime(); - parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), "", processInstance.getState().toString() - , processInstance.getStartTime(), endTime, processInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); + Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime(); + parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), "", processInstance.getState().toString() + , processInstance.getStartTime(), endTime, processInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); + } } List parentTreeViewDtoList = new ArrayList<>(); @@ -1323,30 +1421,33 @@ public class ProcessDefinitionService extends BaseDAGService { treeViewDto.setType(taskNode.getType()); - //set treeViewDto instances - for (int i = limit - 1; i >= 0; i--) { - ProcessInstance processInstance = processInstanceList.get(i); - TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName); - if (taskInstance == null) { - treeViewDto.getInstances().add(new Instance(-1, "not running", "null")); - } else { - Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); - Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); - - int subProcessId = 0; - /** - * if process is sub process, the return sub id, or sub id=0 - */ - if (taskInstance.getTaskType().equals(TaskType.SUB_PROCESS.name())) { - String taskJson = taskInstance.getTaskJson(); - taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); - subProcessId = Integer.parseInt(JSONUtils.parseObject( - taskNode.getParams()).path(CMDPARAM_SUB_PROCESS_DEFINE_ID).asText()); + if (CollectionUtils.isNotEmpty(processInstanceList)) { + //set treeViewDto instances + for (int i = limit - 1; i >= 0; i--) { + ProcessInstance processInstance = processInstanceList.get(i); + TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName); + if (taskInstance == null) { + treeViewDto.getInstances().add(new Instance(-1, "not running", "null")); + } else { + Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); + Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); + + int subProcessId = 0; + /** + * if process is sub process, the return sub id, or sub id=0 + */ + if (taskInstance.getTaskType().equals(TaskType.SUB_PROCESS.name())) { + String taskJson = taskInstance.getTaskJson(); + taskNode = JSON.parseObject(taskJson, TaskNode.class); + subProcessId = Integer.parseInt(JSON.parseObject( + taskNode.getParams()).getString(CMDPARAM_SUB_PROCESS_DEFINE_ID)); + } + treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskType(), taskInstance.getState().toString() + , taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); } - treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskType(), taskInstance.getState().toString() - , taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); } } + for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { pTreeViewDto.getChildren().add(treeViewDto); } @@ -1373,12 +1474,9 @@ public class ProcessDefinitionService extends BaseDAGService { waitingRunningNodeMap.clear(); } } - result.put(Constants.DATA_LIST, parentTreeViewDto); - result.put(Constants.STATUS, Status.SUCCESS); - result.put(Constants.MSG, Status.SUCCESS.getMsg()); - return result; - } + return parentTreeViewDto; + } /** * Generate the DAG Graph based on the process definition id @@ -1386,25 +1484,146 @@ public class ProcessDefinitionService extends BaseDAGService { * @param processDefinition process definition * @return dag graph */ - private DAG genDagGraph(ProcessDefinition processDefinition) { + private DAG genDagGraph(ProcessDefinition processDefinition, boolean isResetDepend) { String processDefinitionJson = processDefinition.getProcessDefinitionJson(); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - //check process data - if (null != processData) { - List taskNodeList = processData.getTasks(); - processDefinition.setGlobalParamList(processData.getGlobalParams()); - ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); + ProcessDag processDag = DagHelper.getProcessDag(processData.getTasks()); - // Generate concrete Dag to be executed - return DagHelper.buildDagGraph(processDag); + if (isResetDepend) { + // analyse dependence + resetDagTaskNodesByDataLineage(processDag, processDefinition.getId()); } - return new DAG<>(); + DAG dag = new DAG<>(); + + /** + * Add the ndoes + */ + if (CollectionUtils.isNotEmpty(processDag.getNodes())) { + for (TaskNode node : processDag.getNodes()) { + dag.addNode(node.getName(), node); + } + } + + /** + * Add the edges + */ + if (CollectionUtils.isNotEmpty(processDag.getEdges())) { + for (TaskNodeRelation edge : processDag.getEdges()) { + dag.addEdge(edge.getStartNode(), edge.getEndNode()); + } + } + + return dag; } + /** + * Encapsulates the TreeView structure (dependence) + * + * @param processId process definition id + * @return tree view json data + * @throws Exception exception + */ + public Map viewTreeByDepend(Integer processId) { + Map result = new HashMap<>(); + + ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + if (processDefinition == null) { + logger.info("process define not exists"); + throw new RuntimeException("process define not exists"); + } + DAG dag = genDagGraph(processDefinition, true); + + TreeViewDto parentTreeViewDto = buildTreeViewDto(processId, -1, dag); + + result.put(Constants.DATA_LIST, parentTreeViewDto); + result.put(Constants.STATUS, Status.SUCCESS); + result.put(Constants.MSG, Status.SUCCESS.getMsg()); + return result; + } + + /** + * reset dag task nodes by data lineage + * @param processDag + * @param processDefinitionId + */ + private void resetDagTaskNodesByDataLineage(ProcessDag processDag, int processDefinitionId) { + Map existedTaskNodeMap = new HashMap<>(); + + for(TaskNode taskNode : processDag.getNodes()) { + if (taskNode.isForbidden()) { + continue; + } + + existedTaskNodeMap.put(processDefinitionId + taskNode.getName(), taskNode); + } + + int nodeSize = processDag.getNodes().size(); + for(int i = 0; i < nodeSize; i++) { + if (processDag.getNodes().get(i).isForbidden()) { + continue; + } + + analyseNodeDependByTableLineage(processDag, processDag.getNodes().get(i), processDag.getNodes().get(i), existedTaskNodeMap); + } + } + + /** + * analyse node depend by table lineage + * @param processDag + * @param analyseNode + * @param postNode + * @param existedTaskNodeMap + */ + private void analyseNodeDependByTableLineage(ProcessDag processDag, TaskNode analyseNode, TaskNode postNode, Map existedTaskNodeMap) { + // exist depend tag + AbstractParameters parameters = TaskParametersUtils.getParameters(analyseNode.getType(), analyseNode.getParams()); + if (!parameters.isCheckDepend()) { + return; + } + + // query all dependent processes + String[] dependNodeKeys = parameters.getDependNodeKeys().split(COMMA); + List processDefinitionList = processService.queryDependDefinitionList(dependNodeKeys); + if (CollectionUtils.isEmpty(processDefinitionList)) { + return; + } + + for (ProcessDefinition processDefinition : processDefinitionList) { + ProcessData processData = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson(), ProcessData.class); + List dependTaskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); + + for (TaskNode realNode : dependTaskNodeList) { + if (!realNode.isForbidden() + && !analyseNode.getName().equals(realNode.getName()) + && DependUnionKeyUtils.existDependRelation(realNode, dependNodeKeys)) { + + // check if the depend node exists + if (existedTaskNodeMap.containsKey(processDefinition.getId() + realNode.getName())) { + TaskNode dependNode = existedTaskNodeMap.get(processDefinition.getId() + realNode.getName()); + processDag.getEdges().add(new TaskNodeRelation(dependNode.getName(), postNode.getName())); + postNode.setPreTasks(JSONUtils.toJsonString(new String[]{dependNode.getName()})); + continue; + } + + // new depend node + TaskNode dependNode = TaskNodeUtils.buildDependTaskNode(processDefinition.getName(), realNode.getName(), 0, 0); + + // add node relation + processDag.getNodes().add(dependNode); + processDag.getEdges().add(new TaskNodeRelation(dependNode.getName(), postNode.getName())); + postNode.setPreTasks(JSONUtils.toJsonString(new String[]{dependNode.getName()})); + + existedTaskNodeMap.put(processDefinition.getId() + realNode.getName(), dependNode); + + analyseNodeDependByTableLineage(processDag, realNode, dependNode, existedTaskNodeMap); + } + } + } + } /** * whether the graph has a ring diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 8b393a7fe6b354cd39cb670fe0770e87377a4b14..3984c618603b81a38c8e4605fc61220c1e1e6800 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -63,9 +63,6 @@ import static org.apache.dolphinscheduler.common.Constants.*; @Service public class ProcessInstanceService extends BaseDAGService { - - private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class); - @Autowired ProjectMapper projectMapper; @@ -93,8 +90,6 @@ public class ProcessInstanceService extends BaseDAGService { @Autowired LoggerService loggerService; - - @Autowired UsersService usersService; @@ -143,7 +138,7 @@ public class ProcessInstanceService extends BaseDAGService { */ public Map queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId, String startDate, String endDate, - String searchVal, String executorName,ExecutionStatus stateType, String host, + String searchVal, String executorName, ExecutionStatus stateType, String host, Integer pageNo, Integer pageSize) { Map result = new HashMap<>(5); @@ -333,11 +328,11 @@ public class ProcessInstanceService extends BaseDAGService { * @param locations locations * @param connects connects * @return update result code - * @throws ParseException parse exception for json parse + * @throws Exception exception */ public Map updateProcessInstance(User loginUser, String projectName, Integer processInstanceId, String processInstanceJson, String scheduleTime, Boolean syncDefine, - Flag flag, String locations, String connects) throws ParseException { + Flag flag, String locations, String connects) throws Exception { Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); @@ -394,14 +389,14 @@ public class ProcessInstanceService extends BaseDAGService { if(tenant != null){ processInstance.setTenantCode(tenant.getTenantCode()); } - processInstance.setProcessInstanceJson(processInstanceJson); + processInstance.setProcessInstanceJson(processDefinitionService.refreshTaskNodeDependParams(processInstanceJson)); processInstance.setGlobalParams(globalParams); } int update = processService.updateProcessInstance(processInstance); int updateDefine = 1; if (Boolean.TRUE.equals(syncDefine) && StringUtils.isNotEmpty(processInstanceJson)) { - processDefinition.setProcessDefinitionJson(processInstanceJson); + processDefinition.setProcessDefinitionJson(processInstance.getProcessInstanceJson()); processDefinition.setGlobalParams(originDefParams); processDefinition.setLocations(locations); processDefinition.setConnects(connects); @@ -478,15 +473,12 @@ public class ProcessInstanceService extends BaseDAGService { return checkResult; } ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); - List taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId); if (null == processInstance) { putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } - - // delete database cascade int delete = processService.deleteWorkProcessInstanceById(processInstanceId); processService.deleteAllSubWorkProcessByParentId(processInstanceId); @@ -600,11 +592,10 @@ public class ProcessInstanceService extends BaseDAGService { ganttDto.setTaskNames(nodeList); List taskList = new ArrayList<>(); - for (String node : nodeList) { - TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstanceId, node); - if (taskInstance == null) { - continue; - } + + // query node task and virtual task + List taskInstanceList = taskInstanceMapper.findALlTaskListByProcessInstanceId(processInstanceId); + for (TaskInstance taskInstance : taskInstanceList) { Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); Task task = new Task(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index d4fa2307eca4d9c1e85e88de58fc73ee01127de7..4d615b6899de8e87e0fe124229531a7dc999c091 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -23,11 +23,13 @@ import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.http.entity.ContentType; import org.json.JSONException; @@ -46,6 +48,7 @@ import org.springframework.web.multipart.MultipartFile; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.lang.reflect.Method; import java.text.MessageFormat; import java.util.*; @@ -737,9 +740,9 @@ public class ProcessDefinitionServiceTest { * @param loginUser login user * @param currentProjectName current project name * @param processMetaJson process meta json - * @throws IOException IO exception + * @throws Exception exception */ - private void improssProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException { + private void improssProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws Exception { //check null FileUtils.writeStringToFile(new File("/tmp/task.json"),processMetaJson); @@ -760,7 +763,7 @@ public class ProcessDefinitionServiceTest { } @Test - public void testUpdateProcessDefinition () { + public void testUpdateProcessDefinition () throws Exception { User loginUser = new User(); loginUser.setId(1); loginUser.setUserType(UserType.ADMIN_USER); @@ -876,4 +879,54 @@ public class ProcessDefinitionServiceTest { result.put(Constants.MSG, status.getMsg()); } } + + /** + * Method: resetDagTaskNodesByDataLineage(List taskNodeList, + * List startNodeNameList) + */ + @Test + public void testResetDagTaskNodesByDataLineage() + throws Exception { + try { + Method method = ProcessDefinitionService.class.getDeclaredMethod("resetDagTaskNodesByDataLineage", ProcessDag.class, int.class); + method.setAccessible(true); + + String processInstanceJson = "{\"tasks\":[{\"id\":\"tasks-32619\",\"name\":\"t_ds_version_copy1\",\"desc\":null,\"type\":\"SQL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":[],\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy1#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy2#table_depend\",\"type\":\"MYSQL\",\"datasource\":5,\"sql\":\"INSERT INTO t_ds_version_copy1\\nselect * from t_ds_version_copy2\",\"sqlType\":1,\"udfs\":\"\",\"showType\":\"TABLE\",\"connParams\":\"\",\"preStatements\":[\"truncate table t_ds_version_copy1\"],\"postStatements\":[],\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"targetTable\":null,\"dtType\":null,\"dataTarget\":0,\"resourceFilesList\":[],\"localParametersMap\":{}},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}},{\"id\":\"tasks-12393\",\"name\":\"t_ds_version_copy\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy1#table_depend,127.0.0.1:escheduler:t_ds_version_copy5#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"SELECT * FROM t_ds_version_copy1\\nUNION\\nselect * FROM t_ds_version_copy5\",\"targetTable\":\"t_ds_version_copy\",\"preStatements\":[\"truncate table t_ds_version_copy\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[\"t_ds_version_copy1\"],\"extras\":null,\"depList\":[\"t_ds_version_copy1\"],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"; + ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class); + ProcessDag processDag = DagHelper.getProcessDag(processData.getTasks()); + + ProcessDefinition processDefinition_32619 = new ProcessDefinition(); + processDefinition_32619.setId(2); + processDefinition_32619.setProcessDefinitionJson("{\"tasks\":[{\"id\":\"tasks-3877\",\"name\":\"4 and 3 -> 2\",\"desc\":null,\"type\":\"SQL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":[],\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy2#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy4#table_depend,127.0.0.1:escheduler:t_ds_version_copy3#table_depend\",\"type\":\"MYSQL\",\"datasource\":5,\"sql\":\"insert into t_ds_version_copy2\\nselect * from t_ds_version_copy4\\nUNION\\nselect * from t_ds_version_copy3\",\"sqlType\":1,\"udfs\":\"\",\"showType\":\"TABLE\",\"connParams\":\"\",\"preStatements\":[\"truncate table t_ds_version_copy2\"],\"postStatements\":[],\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"targetTable\":null,\"dtType\":null,\"dataTarget\":0,\"resourceFilesList\":[],\"localParametersMap\":{}},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"); + Mockito.when(processService.queryDependDefinitionList(new String[]{"127.0.0.1:escheduler:t_ds_version_copy2#table_depend"})).thenReturn(Arrays.asList(processDefinition_32619)); + + ProcessDefinition processDefinition_12393_1 = new ProcessDefinition(); + processDefinition_12393_1.setId(3); + processDefinition_12393_1.setProcessDefinitionJson("{\"tasks\":[{\"id\":\"tasks-34351\",\"name\":\"t_ds_version_copy5\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy6#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy6\",\"targetTable\":\"t_ds_version_copy5\",\"preStatements\":[\"truncate table t_ds_version_copy6\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}},{\"id\":\"tasks-43031\",\"name\":\"t_ds_version_copy4\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy4#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy5\",\"targetTable\":\"t_ds_version_copy4\",\"preStatements\":[\"truncate table t_ds_version_copy5\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"); + + ProcessDefinition processDefinition_12393_2 = new ProcessDefinition(); + processDefinition_12393_2.setId(1); + processDefinition_12393_2.setProcessDefinitionJson("{\"tasks\":[{\"id\":\"tasks-32619\",\"name\":\"t_ds_version_copy1\",\"desc\":null,\"type\":\"SQL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":[],\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy1#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy2#table_depend\",\"type\":\"MYSQL\",\"datasource\":5,\"sql\":\"INSERT INTO t_ds_version_copy1\\nselect * from t_ds_version_copy2\",\"sqlType\":1,\"udfs\":\"\",\"showType\":\"TABLE\",\"connParams\":\"\",\"preStatements\":[\"truncate table t_ds_version_copy1\"],\"postStatements\":[],\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"targetTable\":null,\"dtType\":null,\"dataTarget\":0,\"resourceFilesList\":[],\"localParametersMap\":{}},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}},{\"id\":\"tasks-12393\",\"name\":\"t_ds_version_copy\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy1#table_depend,127.0.0.1:escheduler:t_ds_version_copy5#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"SELECT * FROM t_ds_version_copy1\\nUNION\\nselect * FROM t_ds_version_copy5\",\"targetTable\":\"t_ds_version_copy\",\"preStatements\":[\"truncate table t_ds_version_copy\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[\"t_ds_version_copy1\"],\"extras\":null,\"depList\":[\"t_ds_version_copy1\"],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"); + Mockito.when(processService.queryDependDefinitionList(new String[]{"127.0.0.1:escheduler:t_ds_version_copy1#table_depend", "127.0.0.1:escheduler:t_ds_version_copy5#table_depend"})).thenReturn(Arrays.asList(processDefinition_12393_1, processDefinition_12393_2)); + + ProcessDefinition processDefinition_3877 = new ProcessDefinition(); + processDefinition_3877.setId(3); + processDefinition_3877.setProcessDefinitionJson("{\"tasks\":[{\"id\":\"tasks-34351\",\"name\":\"t_ds_version_copy5\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy6#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy6\",\"targetTable\":\"t_ds_version_copy5\",\"preStatements\":[\"truncate table t_ds_version_copy6\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}},{\"id\":\"tasks-43031\",\"name\":\"t_ds_version_copy4\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy4#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy5\",\"targetTable\":\"t_ds_version_copy4\",\"preStatements\":[\"truncate table t_ds_version_copy5\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"); + Mockito.when(processService.queryDependDefinitionList(new String[]{"127.0.0.1:escheduler:t_ds_version_copy4#table_depend", "127.0.0.1:escheduler:t_ds_version_copy3#table_depend"})).thenReturn(Arrays.asList(processDefinition_3877)); + + ProcessDefinition processDefinition_43031 = new ProcessDefinition(); + processDefinition_43031.setId(3); + processDefinition_43031.setProcessDefinitionJson("{\"tasks\":[{\"id\":\"tasks-34351\",\"name\":\"t_ds_version_copy5\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy6#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy6\",\"targetTable\":\"t_ds_version_copy5\",\"preStatements\":[\"truncate table t_ds_version_copy6\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}},{\"id\":\"tasks-43031\",\"name\":\"t_ds_version_copy4\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy4#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy5\",\"targetTable\":\"t_ds_version_copy4\",\"preStatements\":[\"truncate table t_ds_version_copy5\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"); + Mockito.when(processService.queryDependDefinitionList(new String[]{"127.0.0.1:escheduler:t_ds_version_copy5#table_depend"})).thenReturn(Arrays.asList(processDefinition_43031)); + + // task 34351 + Mockito.when(processService.queryDependDefinitionList(new String[]{"127.0.0.1:escheduler:t_ds_version_copy6#table_depend"})).thenReturn(new ArrayList<>()); + + method.invoke(processDefinitionService, processDag, 1); + + Assert.assertEquals(5, processDag.getNodes().size()); + } catch(Exception e) { + Assert.fail(e.getMessage()); + } + } } \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index b35614335c89f0df52de5d7c06c758a8e50f5df3..d700e007d4651e3fce5e53cad123df482ff8f65f 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -281,7 +281,7 @@ public class ProcessInstanceServiceTest { } @Test - public void testUpdateProcessInstance() throws ParseException { + public void testUpdateProcessInstance() throws Exception { String projectName = "project_test1"; User loginUser = getAdminUser(); Map result = new HashMap<>(5); diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index 28acb21fd5ca06ff616ffba100a77bffbc7dc703..3cda8841a305d6078fd76a36331b9fee7487e3ab 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -590,6 +590,12 @@ janino ${codehaus.janino.version} + + + com.alibaba + druid + + com.github.rholder guava-retrying diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index fc099600261def25657cf76234fb9c7c9c2dfcf2..e8700efda7440ca6f4a7edddb83828a66139537c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -240,6 +240,15 @@ public final class Constants { */ public static final String EQUAL_SIGN = "="; + /** + * dot . + */ + public static final String DOT = "."; + + /** + * SHARP + */ + public static final String SHARP = "#"; public static final String WORKER_MAX_CPULOAD_AVG = "worker.max.cpuload.avg"; @@ -386,6 +395,11 @@ public final class Constants { */ public static final int RPC_PORT = 50051; + /** + * normal running task + */ + public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL"; + /** * forbid running task */ @@ -966,4 +980,9 @@ public final class Constants { */ public static final String PLUGIN_JAR_SUFFIX = ".jar"; + /** + * default worker group id + */ + public static final int DEFAULT_WORKER_GROUP_ID = -1; + } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java index 1d28a759c06084d5019031601551fa44353f449e..306ba01eafe72227c68a504dfca4dae2c3763f5a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java @@ -75,4 +75,8 @@ public enum DbType { } throw new IllegalArgumentException("invalid type : " + type); } + + public String getDesc() { + return this.descp; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index 5ab970e69b1c908eecd9441a3e2e614559533f0b..9b6431eb25897c48583ea1e197f0879ac917fe51 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -16,328 +16,329 @@ */ package org.apache.dolphinscheduler.common.model; +import com.fasterxml.jackson.annotation.JsonRawValue; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.*; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import java.io.IOException; import java.util.List; import java.util.Objects; public class TaskNode { - /** - * task node id - */ - private String id; - - /** - * task node name - */ - private String name; - - /** - * task node description - */ - private String desc; - - /** - * task node type - */ - private String type; - - /** - * the run flag has two states, NORMAL or FORBIDDEN - */ - private String runFlag; - - /** - * the front field - */ - private String loc; - - /** - * maximum number of retries - */ - private int maxRetryTimes; - - /** - * Unit of retry interval: points - */ - private int retryInterval; - - /** - * params information - */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String params; - - /** - * inner dependency information - */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String preTasks; - - /** - * users store additional information - */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String extras; - - /** - * node dependency list - */ - private List depList; - - /** - * outer dependency information - */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String dependence; - - - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String conditionResult; - - /** - * task instance priority - */ - private Priority taskInstancePriority; - - /** - * worker group - */ - private String workerGroup; - - - /** - * task time out - */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String timeout; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getDesc() { - return desc; - } - - public void setDesc(String desc) { - this.desc = desc; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getParams() { - return params; - } - - public void setParams(String params) { - this.params = params; - } - - public String getPreTasks() { - return preTasks; - } - - public void setPreTasks(String preTasks) throws IOException { - this.preTasks = preTasks; - this.depList = JSONUtils.toList(preTasks, String.class); - } - - public String getExtras() { - return extras; - } - - public void setExtras(String extras) { - this.extras = extras; - } - - public List getDepList() { - return depList; - } - - public void setDepList(List depList) throws JsonProcessingException { - this.depList = depList; - this.preTasks = JSONUtils.toJsonString(depList); - } - - public String getLoc() { - return loc; - } - - public void setLoc(String loc) { - this.loc = loc; - } - - public String getRunFlag(){ - return runFlag; - } - - public void setRunFlag(String runFlag) { - this.runFlag = runFlag; - } - - public Boolean isForbidden(){ - return (StringUtils.isNotEmpty(this.runFlag) && - this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN)); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TaskNode taskNode = (TaskNode) o; - return Objects.equals(name, taskNode.name) && - Objects.equals(desc, taskNode.desc) && - Objects.equals(type, taskNode.type) && - Objects.equals(params, taskNode.params) && - Objects.equals(preTasks, taskNode.preTasks) && - Objects.equals(extras, taskNode.extras) && - Objects.equals(runFlag, taskNode.runFlag) && - Objects.equals(dependence, taskNode.dependence) && - Objects.equals(workerGroup, taskNode.workerGroup) && - Objects.equals(conditionResult, taskNode.conditionResult) && - - CollectionUtils.equalLists(depList, taskNode.depList); - } - - @Override - public int hashCode() { - return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag); - } - - public String getDependence() { - return dependence; - } - - public void setDependence(String dependence) { - this.dependence = dependence; - } - - public int getMaxRetryTimes() { - return maxRetryTimes; - } - - public void setMaxRetryTimes(int maxRetryTimes) { - this.maxRetryTimes = maxRetryTimes; - } - - public int getRetryInterval() { - return retryInterval; - } - - public void setRetryInterval(int retryInterval) { - this.retryInterval = retryInterval; - } - - public Priority getTaskInstancePriority() { - return taskInstancePriority; - } - - public void setTaskInstancePriority(Priority taskInstancePriority) { - this.taskInstancePriority = taskInstancePriority; - } - - public String getTimeout() { - return timeout; - } - - public void setTimeout(String timeout) { - this.timeout = timeout; - } - - /** - * get task time out parameter - * @return task time out parameter - */ - public TaskTimeoutParameter getTaskTimeoutParameter() { - if(StringUtils.isNotEmpty(this.getTimeout())){ - String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name()); - String taskTimeout = this.getTimeout().replace(formatStr,TaskTimeoutStrategy.WARNFAILED.name()); - return JSONUtils.parseObject(taskTimeout,TaskTimeoutParameter.class); - } - return new TaskTimeoutParameter(false); - } - - public boolean isConditionsTask(){ - return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType()); - } - - @Override - public String toString() { - return "TaskNode{" + - "id='" + id + '\'' + - ", name='" + name + '\'' + - ", desc='" + desc + '\'' + - ", type='" + type + '\'' + - ", runFlag='" + runFlag + '\'' + - ", loc='" + loc + '\'' + - ", maxRetryTimes=" + maxRetryTimes + - ", retryInterval=" + retryInterval + - ", params='" + params + '\'' + - ", preTasks='" + preTasks + '\'' + - ", extras='" + extras + '\'' + - ", depList=" + depList + - ", dependence='" + dependence + '\'' + - ", taskInstancePriority=" + taskInstancePriority + - ", timeout='" + timeout + '\'' + - ", workerGroup='" + workerGroup + '\'' + - '}'; - } - - public String getWorkerGroup() { - return workerGroup; - } - - public void setWorkerGroup(String workerGroup) { - this.workerGroup = workerGroup; - } - - public String getConditionResult() { - return conditionResult; - } - - public void setConditionResult(String conditionResult) { - this.conditionResult = conditionResult; - } + /** + * task node id + */ + private String id; + + /** + * task node name + */ + private String name; + + /** + * task node description + */ + private String desc; + + /** + * task node type + */ + private String type; + + /** + * the run flag has two states, NORMAL or FORBIDDEN + */ + private String runFlag; + + /** + * the front field + */ + private String loc; + + /** + * maximum number of retries + */ + private int maxRetryTimes; + + /** + * Unit of retry interval: points + */ + private int retryInterval; + + /** + * params information + */ + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String params; + + /** + * inner dependency information + */ + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String preTasks; + + /** + * users store additional information + */ + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String extras; + + /** + * node dependency list + */ + private List depList; + + /** + * outer dependency information + */ + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String dependence; + + + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String conditionResult; + + /** + * task instance priority + */ + private Priority taskInstancePriority; + + /** + * worker group + */ + private String workerGroup; + + + /** + * task time out + */ + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String timeout; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDesc() { + return desc; + } + + public void setDesc(String desc) { + this.desc = desc; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + @JsonRawValue + public String getParams() { + return params; + } + + public void setParams(String params) { + this.params = params; + } + + public String getPreTasks() { + return preTasks; + } + + public void setPreTasks(String preTasks) { + this.preTasks = preTasks; + this.depList = JSONUtils.toList(preTasks, String.class); + } + + public String getExtras() { + return extras; + } + + public void setExtras(String extras) { + this.extras = extras; + } + + public List getDepList() { + return depList; + } + + public void setDepList(List depList) { + this.depList = depList; + this.preTasks = JSONUtils.toJsonString(depList); + } + + public String getLoc() { + return loc; + } + + public void setLoc(String loc) { + this.loc = loc; + } + + public String getRunFlag() { + return runFlag; + } + + public void setRunFlag(String runFlag) { + this.runFlag = runFlag; + } + + public Boolean isForbidden() { + return (StringUtils.isNotEmpty(this.runFlag) && + this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskNode taskNode = (TaskNode) o; + return Objects.equals(name, taskNode.name) && + Objects.equals(desc, taskNode.desc) && + Objects.equals(type, taskNode.type) && + Objects.equals(params, taskNode.params) && + Objects.equals(preTasks, taskNode.preTasks) && + Objects.equals(extras, taskNode.extras) && + Objects.equals(runFlag, taskNode.runFlag) && + Objects.equals(dependence, taskNode.dependence) && + Objects.equals(workerGroup, taskNode.workerGroup) && + Objects.equals(conditionResult, taskNode.conditionResult) && + + CollectionUtils.equalLists(depList, taskNode.depList); + } + + @Override + public int hashCode() { + return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag); + } + + public String getDependence() { + return dependence; + } + + public void setDependence(String dependence) { + this.dependence = dependence; + } + + public int getMaxRetryTimes() { + return maxRetryTimes; + } + + public void setMaxRetryTimes(int maxRetryTimes) { + this.maxRetryTimes = maxRetryTimes; + } + + public int getRetryInterval() { + return retryInterval; + } + + public void setRetryInterval(int retryInterval) { + this.retryInterval = retryInterval; + } + + public Priority getTaskInstancePriority() { + return taskInstancePriority; + } + + public void setTaskInstancePriority(Priority taskInstancePriority) { + this.taskInstancePriority = taskInstancePriority; + } + + public String getTimeout() { + return timeout; + } + + public void setTimeout(String timeout) { + this.timeout = timeout; + } + + /** + * get task time out parameter + * + * @return task time out parameter + */ + public TaskTimeoutParameter getTaskTimeoutParameter() { + if (StringUtils.isNotEmpty(this.getTimeout())) { + String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name()); + String taskTimeout = this.getTimeout().replace(formatStr, TaskTimeoutStrategy.WARNFAILED.name()); + return JSONUtils.parseObject(taskTimeout, TaskTimeoutParameter.class); + } + return new TaskTimeoutParameter(false); + } + + public boolean isConditionsTask() { + return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType()); + } + + @Override + public String toString() { + return "TaskNode{" + + "id='" + id + '\'' + + ", name='" + name + '\'' + + ", desc='" + desc + '\'' + + ", type='" + type + '\'' + + ", runFlag='" + runFlag + '\'' + + ", loc='" + loc + '\'' + + ", maxRetryTimes=" + maxRetryTimes + + ", retryInterval=" + retryInterval + + ", params='" + params + '\'' + + ", preTasks='" + preTasks + '\'' + + ", extras='" + extras + '\'' + + ", depList=" + depList + + ", dependence='" + dependence + '\'' + + ", taskInstancePriority=" + taskInstancePriority + + ", timeout='" + timeout + '\'' + + ", workerGroup='" + workerGroup + '\'' + + '}'; + } + + public String getWorkerGroup() { + return workerGroup; + } + + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } + + public String getConditionResult() { + return conditionResult; + } + + public void setConditionResult(String conditionResult) { + this.conditionResult = conditionResult; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java index 929516c86bcf788649ca19b3af1a68824bb28db5..9b837e44c6115b6a972275b5670fa082971f4501 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java @@ -16,8 +16,11 @@ */ package org.apache.dolphinscheduler.common.task; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.utils.StringUtils; import java.util.LinkedHashMap; import java.util.List; @@ -28,43 +31,94 @@ import java.util.Map; */ public abstract class AbstractParameters implements IParameters { - @Override - public abstract boolean checkParameters(); - - @Override - public abstract List getResourceFilesList(); - - /** - * local parameters - */ - public List localParams; - - /** - * get local parameters list - * @return Property list - */ - public List getLocalParams() { - return localParams; - } - - public void setLocalParams(List localParams) { - this.localParams = localParams; - } - - /** - * get local parameters map - * @return parameters map - */ - public Map getLocalParametersMap() { - if (localParams != null) { - Map localParametersMaps = new LinkedHashMap<>(); - - for (Property property : localParams) { - localParametersMaps.put(property.getProp(),property); + @Override + public abstract boolean checkParameters(); + + @Override + public abstract List getResourceFilesList(); + + /** + * local parameters + */ + public List localParams; + + /** + * check depend flag + */ + private int checkDependFlag; + + /** + * target node keys + */ + private String targetNodeKeys; + + /** + * depend node keys + */ + private String dependNodeKeys; + + /** + * get local parameters list + * + * @return Property list + */ + public List getLocalParams() { + return localParams; + } + + public void setLocalParams(List localParams) { + this.localParams = localParams; + } + + public int getCheckDependFlag() { + return checkDependFlag; + } + + public void setCheckDependFlag(int checkDependFlag) { + this.checkDependFlag = checkDependFlag; + } + + public String getTargetNodeKeys() { + return targetNodeKeys; + } + + public void setTargetNodeKeys(String targetNodeKeys) { + this.targetNodeKeys = targetNodeKeys; + } + + public String getDependNodeKeys() { + return dependNodeKeys; + } + + public void setDependNodeKeys(String dependNodeKeys) { + this.dependNodeKeys = dependNodeKeys; + } + + /** + * get local parameters map + * + * @return parameters map + */ + @JsonIgnore + public Map getLocalParametersMap() { + if (localParams != null) { + Map localParametersMaps = new LinkedHashMap<>(); + + for (Property property : localParams) { + localParametersMaps.put(property.getProp(), property); + } + return localParametersMaps; + } + return null; + } + + public boolean isCheckDepend() { + if (this.getCheckDependFlag() == Flag.NO.ordinal() + || StringUtils.isEmpty(this.getDependNodeKeys())) { + return false; } - return localParametersMaps; - } - return null; - } + + return true; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DependUnionKeyUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DependUnionKeyUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..98e3631bc411bb25cd4101f6fb270d8a2a0a6292 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DependUnionKeyUtils.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.AbstractParameters; + +import java.util.Arrays; +import java.util.List; + +public class DependUnionKeyUtils { + + private static final String MARK_WORD_TABLE_DEPEND = Constants.SHARP + "table_depend"; + + private static final String MARK_WORD_TABLE_TARGET = Constants.SHARP + "table_target"; + + public static String clearIllegalChars(String str) { + if (StringUtils.isEmpty(str)) { + return str; + } + + str = str.replace(" ", ""); + str = str.replace("\t", ""); + str = str.replace("\n", ""); + str = str.replace(",", ","); + return str; + } + + public static String removeMarkWord(String str) { + if (StringUtils.isEmpty(str)) { + return str; + } + + str = str.replace(MARK_WORD_TABLE_DEPEND, ""); + str = str.replace(MARK_WORD_TABLE_TARGET, ""); + return str; + } + + public static String buildDependTableUnionKey(String host, String database, String tableName) { + return buildUnionKey(host, database, tableName, MARK_WORD_TABLE_DEPEND); + } + + public static String buildDependTableUnionKey(String host, String database, List tableList) { + return buildUnionKey(host, database, tableList, MARK_WORD_TABLE_DEPEND); + } + + public static String buildTargetTableUnionKey(String host, String database, String tableName) { + return buildUnionKey(host, database, tableName, MARK_WORD_TABLE_TARGET); + } + + public static String buildTargetTableUnionKey(String host, String database, List tableList) { + return buildUnionKey(host, database, tableList, MARK_WORD_TABLE_TARGET); + } + + public static String buildTargetTableUnionKey(String host, String database, String[] tableArray) { + return buildUnionKey(host, database, Arrays.asList(tableArray), MARK_WORD_TABLE_TARGET); + } + + public static String buildUnionKey(String host, String database, List tableList, String markWord) { + if (CollectionUtils.isEmpty(tableList)) { + return null; + } + + String[] unionKeys = new String[tableList.size()]; + for (int i = 0; i < tableList.size(); i++) { + unionKeys[i] = buildUnionKey(host, database, tableList.get(i), markWord); + } + + return StringUtils.join(unionKeys, Constants.COMMA); + } + + public static String buildUnionKey(String host, String database, String tableName, String markWord) { + if (StringUtils.isEmpty(tableName)) { + return null; + } + + if (tableName.indexOf(Constants.DOT) > 0) { + database = tableName.split("\\" + Constants.DOT)[0]; + tableName = tableName.split("\\" + Constants.DOT)[1]; + } + + return String.format("%s:%s:%s%s", host, database, tableName, markWord); + } + + public static boolean existDependRelation(TaskNode taskNode, String[] dependNodeKeys) { + AbstractParameters parameters = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); + return existDependRelation(parameters.getTargetNodeKeys(), dependNodeKeys); + } + + public static boolean existDependRelation(String targetNodeKey, String[] dependNodeKeys) { + if (StringUtils.isEmpty(targetNodeKey) || dependNodeKeys == null) { + return false; + } + + for (String dependKey : dependNodeKeys) { + if (targetNodeKey.indexOf(dependKey.replace(MARK_WORD_TABLE_DEPEND, MARK_WORD_TABLE_TARGET)) > -1) { + return true; + } + } + + return false; + } + + public static String[] replaceMarkWordToTarget(String[] dependNodeKeys) { + if (dependNodeKeys == null) { + return null; + } + + String[] targetNodeKeys = new String[dependNodeKeys.length]; + for (int i = 0; i < dependNodeKeys.length; i++) { + targetNodeKeys[i] = replaceMarkWordToTarget(dependNodeKeys[i]); + } + return targetNodeKeys; + } + + public static String replaceMarkWordToTarget(String dependNodeKey) { + return dependNodeKey.replace(MARK_WORD_TABLE_DEPEND, MARK_WORD_TABLE_TARGET); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JdbcUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JdbcUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..46afb3a45c95d37a8d28090ea3da66756e993092 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JdbcUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import org.apache.dolphinscheduler.common.Constants; + +/** + * jdbc utils + */ +public class JdbcUtils { + + public static String[] getHostsAndPort(String address) { + String[] result = new String[2]; + String[] tmpArray = address.split(Constants.DOUBLE_SLASH); + String hostsAndPorts = tmpArray[tmpArray.length - 1]; + StringBuilder hosts = new StringBuilder(); + String[] hostPortArray = hostsAndPorts.split(Constants.COMMA); + String port = hostPortArray[0].split(Constants.COLON)[1]; + for (String hostPort : hostPortArray) { + hosts.append(hostPort.split(Constants.COLON)[0]).append(Constants.COMMA); + } + hosts.deleteCharAt(hosts.length() - 1); + result[0] = hosts.toString(); + result[1] = port; + return result; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SqlUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SqlUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..73c8559389ab7e3da8e3c2429324c4806d50c4f0 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SqlUtils.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import com.alibaba.druid.sql.SQLUtils; +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.parser.Token; +import com.alibaba.druid.sql.visitor.SchemaStatVisitor; +import com.alibaba.druid.stat.TableStat; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.datanucleus.store.rdbms.exceptions.UnsupportedDataTypeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class SqlUtils { + + public static final Logger logger = LoggerFactory.getLogger(SqlUtils.class); + + /** + * resolve sql statement select table names + * + * @param dbType database type + * @param sql select sql + * @return table List + * @throws UnsupportedDataTypeException + */ + public static List resolveSqlSelectTables(DbType dbType, String sql) throws SQLException { + return resolveSqlTables(dbType, sql).get(Token.SELECT); + } + + /** + * resolve sql statement insert table names + * + * @param dbType database type + * @param sql select sql + * @return table List + * @throws UnsupportedDataTypeException + */ + public static List resolveSqlInsertTables(DbType dbType, String sql) throws SQLException { + return resolveSqlTables(dbType, sql).get(Token.INSERT); + } + + /** + * resolve sql statement update table names + * + * @param dbType database type + * @param sql select sql + * @return table List + * @throws UnsupportedDataTypeException + */ + public static List resolveSqlUpdateTables(DbType dbType, String sql) throws SQLException { + return resolveSqlTables(dbType, sql).get(Token.UPDATE); + } + + /** + * resolve sql statement delete table names + * + * @param dbType database type + * @param sql select sql + * @return table List + * @throws UnsupportedDataTypeException + */ + public static List resolveSqlDeleteTables(DbType dbType, String sql) throws SQLException { + return resolveSqlTables(dbType, sql).get(Token.DELETE); + } + + /** + * resolve sql statement table names + * + * @param dbType database type + * @param sql sql + * @return table map Token:List + * @throws UnsupportedDataTypeException + */ + public static Map> resolveSqlTables(DbType dbType, String sql) throws SQLException { + Map> tableMap = new HashMap<>(); + tableMap.put(Token.SELECT, new ArrayList<>()); + tableMap.put(Token.INSERT, new ArrayList<>()); + tableMap.put(Token.UPDATE, new ArrayList<>()); + tableMap.put(Token.DELETE, new ArrayList<>()); + + try { + List stmtList = SQLUtils.parseStatements(sql, dbType.getDesc()); + + for (SQLStatement stmt : stmtList) { + SchemaStatVisitor visitor = SQLUtils.createSchemaStatVisitor(dbType.getDesc()); + stmt.accept(visitor); + + if (visitor.getTables() != null) { + for (TableStat.Name name : visitor.getTables().keySet()) { + TableStat tableStat = visitor.getTables().get(name); + if (tableStat.getSelectCount() > 0) { + tableMap.get(Token.SELECT).add(name.getName()); + } + + if (tableStat.getInsertCount() > 0) { + tableMap.get(Token.INSERT).add(name.getName()); + } + + if (tableStat.getUpdateCount() > 0) { + tableMap.get(Token.UPDATE).add(name.getName()); + } + + if (tableStat.getDeleteCount() > 0) { + tableMap.get(Token.DELETE).add(name.getName()); + } + } + } + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + throw new SQLException(e); + } + + return tableMap; + } + + public static String[] convertKeywordsColumns(DbType dbType, String[] columns) { + if (columns == null) { + return null; + } + + String[] toColumns = new String[columns.length]; + for (int i = 0; i < columns.length; i++) { + toColumns[i] = doConvertKeywordsColumn(dbType, columns[i]); + } + + return toColumns; + } + + public static String doConvertKeywordsColumn(DbType dbType, String column) { + if (column == null) { + return column; + } + + column = column.trim(); + column = column.replace("`", ""); + column = column.replace("\"", ""); + column = column.replace("'", ""); + + switch (dbType) { + case MYSQL: + return String.format("`%s`", column); + case POSTGRESQL: + return String.format("\"%s\"", column); + case ORACLE: + return String.format("\"%s\"", column); + case SQLSERVER: + return String.format("`%s`", column); + default: + return column; + } + } + +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskNodeUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskNodeUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..bcfb05477241848c188ddd338bfb88bdaca157d5 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskNodeUtils.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import org.apache.commons.lang.math.RandomUtils; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CycleEnum; +import org.apache.dolphinscheduler.common.enums.DependentRelation; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TaskNodeUtils { + + public static String buildTaskId() { + return String.format("%s-%d", "tasks", 10000 + RandomUtils.nextInt(9999)); + } + + public static TaskNode buildDependTaskNode(String dependProcessName, String dependNodeName, int retryTimes, int retryInterval) { + TaskNode node = new TaskNode(); + node.setId(buildTaskId()); + node.setName(String.format("%s | %s", dependProcessName, dependNodeName)); + node.setType(TaskType.DEPENDENT.toString()); + node.setRunFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL); + node.setMaxRetryTimes(retryTimes); + node.setRetryInterval(retryInterval); + node.setTimeout("{\"strategy\": \"\", \"interval\": null, \"enable\": false}"); + node.setTaskInstancePriority(Priority.HIGH); + return node; + } + + public static DependentItem buildDependentItem(int processDefinitionId, String nodeName) { + DependentItem dependentItem = new DependentItem(); + dependentItem.setDefinitionId(processDefinitionId); + dependentItem.setDepTasks(nodeName); + dependentItem.setCycle(CycleEnum.DAY.toString().toLowerCase()); + dependentItem.setDateValue("today"); + return dependentItem; + } + + public static void addNodeDependentItem(TaskNode taskNode, int dependProcessId, String dependNodeName) { + addNodeDependentItem(taskNode, buildDependentItem(dependProcessId, dependNodeName)); + } + + public static void addNodeDependentItem(TaskNode taskNode, DependentItem dependentItem) { + DependentParameters dependentParameters; + if (StringUtils.isEmpty(taskNode.getDependence())) { + dependentParameters = new DependentParameters(); + } else { + dependentParameters = JSONUtils.parseObject(taskNode.getDependence(), DependentParameters.class); + } + + List dependTaskList; + if (CollectionUtils.isEmpty(dependentParameters.getDependTaskList())) { + dependTaskList = new ArrayList<>(); + } else { + dependTaskList = dependentParameters.getDependTaskList(); + } + + DependentTaskModel dependentTaskModel = new DependentTaskModel(); + List dependItemList = new ArrayList<>(); + dependItemList.add(dependentItem); + dependentTaskModel.setDependItemList(dependItemList); + dependentTaskModel.setRelation(DependentRelation.AND); + dependTaskList.add(dependentTaskModel); + dependentParameters.setDependTaskList(dependTaskList); + dependentParameters.setRelation(DependentRelation.AND); + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); + } + + public static void addNodeDepList(TaskNode taskNode, TaskNode dependNode) { + if (CollectionUtils.isEmpty(taskNode.getDepList())) { + taskNode.setDepList(Arrays.asList(dependNode.getName())); + } else { + if (!taskNode.getDepList().contains(dependNode.getName())) { + taskNode.getDepList().add(dependNode.getName()); + } + } + } + +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ValidUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ValidUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..7ad23fc806a5a168eb624561adc87cc77afb0d99 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ValidUtils.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +public class ValidUtils { + + public static void notNull(Object obj, String message) { + if (obj == null) { + throw new RuntimeException(message); + } + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependUnionKeyUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependUnionKeyUtilsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..55c3e09b6cba6e3dd1d1f7803d818fd8a11cead6 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependUnionKeyUtilsTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.sql.SqlParameters; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class DependUnionKeyUtilsTest { + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testClearIllegalChars() { + Assert.assertEquals("abc,", DependUnionKeyUtils.clearIllegalChars("a b \t c \n,")); + Assert.assertEquals(null, DependUnionKeyUtils.clearIllegalChars(null)); + } + + @Test + public void testRemoveMarkWord() { + Assert.assertEquals("127.0.0.1:db1:table1", DependUnionKeyUtils.removeMarkWord("127.0.0.1:db1:table1#table_depend")); + Assert.assertEquals("127.0.0.1:db1:table2", DependUnionKeyUtils.removeMarkWord("127.0.0.1:db1:table2#table_target")); + Assert.assertEquals(null, DependUnionKeyUtils.removeMarkWord(null)); + } + + @Test + public void testBuildDependTableUnionKey1() { + Assert.assertEquals("127.0.0.1:db1:table1#table_depend", DependUnionKeyUtils.buildDependTableUnionKey("127.0.0.1", "db1", "table1")); + } + + @Test + public void testBuildDependTableUnionKey2() { + Assert.assertEquals("127.0.0.1:db1:table1#table_depend", DependUnionKeyUtils.buildDependTableUnionKey("127.0.0.1", null, "db1.table1")); + } + + @Test + public void testBuildDependTableUnionKey3() { + Assert.assertEquals("127.0.0.1:db1:table1#table_depend,127.0.0.1:db1:table2#table_depend", DependUnionKeyUtils.buildDependTableUnionKey("127.0.0.1", "db1", Arrays.asList("table1", "table2"))); + + List list = null; + Assert.assertEquals(null, DependUnionKeyUtils.buildDependTableUnionKey("127.0.0.1", "db1", list)); + Assert.assertEquals("127.0.0.1:db1:table1#table_depend,", DependUnionKeyUtils.buildDependTableUnionKey("127.0.0.1", "db1", Arrays.asList("table1", null))); + } + + @Test + public void testBuildTargetTableUnionKey1() { + Assert.assertEquals("127.0.0.1:db1:table1#table_target", DependUnionKeyUtils.buildTargetTableUnionKey("127.0.0.1", "db1", "table1")); + } + + @Test + public void testBuildTargetTableUnionKey2() { + Assert.assertEquals("127.0.0.1:db1:table1#table_target", DependUnionKeyUtils.buildTargetTableUnionKey("127.0.0.1", null, "db1.table1")); + } + + @Test + public void testBuildTargetTableUnionKey3() { + Assert.assertEquals("127.0.0.1:db1:table1#table_target,127.0.0.1:db1:table2#table_target", DependUnionKeyUtils.buildTargetTableUnionKey("127.0.0.1", "db1", Arrays.asList("table1", "table2"))); + } + + @Test + public void testBuildTargetTableUnionKey4() { + Assert.assertEquals("127.0.0.1:db1:table1#table_target,127.0.0.1:db1:table2#table_target", DependUnionKeyUtils.buildTargetTableUnionKey("127.0.0.1", "db1", new String[]{"table1", "table2"})); + } + + @Test + public void testExistDependRelation1() { + TaskNode taskNode = new TaskNode(); + taskNode.setType(TaskType.SQL.toString()); + SqlParameters sqlParameters = new SqlParameters(); + sqlParameters.setTargetNodeKeys("127.0.0.1:db1:table1#table_target"); + taskNode.setParams(JSONUtils.toJsonString(sqlParameters)); + Assert.assertTrue(DependUnionKeyUtils.existDependRelation(taskNode, new String[]{"127.0.0.1:db1:table1#table_depend"})); + + Assert.assertTrue(!DependUnionKeyUtils.existDependRelation(taskNode, null)); + } + + @Test + public void testExistDependRelation2() { + Assert.assertTrue(DependUnionKeyUtils.existDependRelation("127.0.0.1:db1:table1#table_target", new String[]{"127.0.0.1:db1:table1#table_depend"})); + Assert.assertTrue(!DependUnionKeyUtils.existDependRelation("127.0.0.1:db1:table2#table_target", new String[]{"127.0.0.1:db1:table1#table_depend"})); + Assert.assertTrue(!DependUnionKeyUtils.existDependRelation("127.0.0.1:db1:table2#table_target", null)); + } + + @Test + public void testReplaceMarkWordToTarget1() { + Assert.assertArrayEquals(new String[]{"127.0.0.1:db1:table1#table_target"}, DependUnionKeyUtils.replaceMarkWordToTarget(new String[]{"127.0.0.1:db1:table1#table_depend"})); + + String[] array = null; + Assert.assertArrayEquals(null, DependUnionKeyUtils.replaceMarkWordToTarget(array)); + } + + @Test + public void testReplaceMarkWordToTarget2() { + Assert.assertEquals("127.0.0.1:db1:table1#table_target", DependUnionKeyUtils.replaceMarkWordToTarget("127.0.0.1:db1:table1#table_depend")); + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SqlUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SqlUtilsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..b5d1912dfd14a584fdca5877e7bd6e262a8fde28 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SqlUtilsTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + + +import org.apache.dolphinscheduler.common.enums.DbType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +/** + * SqlUtils Tester. + * + * @since + * + *
+ * 五月 29, 2020
+ *        
+ * + * @version 1.0 + */ +public class SqlUtilsTest { + + @Before + public void before() + throws Exception {} + + @After + public void after() + throws Exception {} + + /** + * Method: resolveSqlSelectTables(DbType dbType, String sql) + */ + @Test + public void testResolveSqlSelectTables() + throws Exception { + List tableList = SqlUtils.resolveSqlSelectTables(DbType.MYSQL, "select a from t1"); + Assert.assertNotNull(tableList); + Assert.assertTrue(tableList.size() == 1); + Assert.assertEquals("t1", tableList.get(0)); + } + + /** + * Method: resolveSqlInsertTables(DbType dbType, String sql) + */ + @Test + public void testResolveSqlInsertTables() + throws Exception { + List tableList = SqlUtils.resolveSqlInsertTables(DbType.MYSQL, "insert into t1(a) values (1)"); + Assert.assertNotNull(tableList); + Assert.assertTrue(tableList.size() == 1); + Assert.assertEquals("t1", tableList.get(0)); + } + + /** + * Method: resolveSqlUpdateTables(DbType dbType, String sql) + */ + @Test + public void testResolveSqlUpdateTables() + throws Exception { + List tableList = SqlUtils.resolveSqlUpdateTables(DbType.MYSQL, "update t1 set a = 1"); + Assert.assertNotNull(tableList); + Assert.assertTrue(tableList.size() == 1); + Assert.assertEquals("t1", tableList.get(0)); + } + + /** + * Method: resolveSqlDeleteTables(DbType dbType, String sql) + */ + @Test + public void testResolveSqlDeleteTables() + throws Exception { + List tableList = SqlUtils.resolveSqlDeleteTables(DbType.MYSQL, "delete from t1 where a = 1"); + Assert.assertNotNull(tableList); + Assert.assertTrue(tableList.size() == 1); + Assert.assertEquals("t1", tableList.get(0)); + } + + /** + * Method: convertKeywordsColumns(DbType dbType, String[] columns) + */ + @Test + public void testConvertKeywordsColumns() + throws Exception { + String[] fromColumns = new String[]{"`select`", "from", "\"where\"", " table "}; + String[] targetColumns = new String[]{"`select`", "`from`", "`where`", "`table`"}; + + String[] toColumns = SqlUtils.convertKeywordsColumns(DbType.MYSQL, fromColumns); + + assertTrue(fromColumns.length == toColumns.length); + + for (int i = 0; i < toColumns.length; i++) { + assertEquals(targetColumns[i], toColumns[i]); + } + } + + /** + * Method: doConvertKeywordsColumn(DbType dbType, String column) + */ + @Test + public void testDoConvertKeywordsColumn() + throws Exception { + assertEquals("`select`", SqlUtils.doConvertKeywordsColumn(DbType.MYSQL, " \"`select`\" ")); + assertEquals("\"select\"", SqlUtils.doConvertKeywordsColumn(DbType.POSTGRESQL, " \"`select`\" ")); + assertEquals("`select`", SqlUtils.doConvertKeywordsColumn(DbType.SQLSERVER, " \"`select`\" ")); + assertEquals("\"select\"", SqlUtils.doConvertKeywordsColumn(DbType.ORACLE, " \"`select`\" ")); + assertEquals("select", SqlUtils.doConvertKeywordsColumn(DbType.DB2, " \"`select`\" ")); + } + +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ValidUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ValidUtilsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..5123528e1d5eabb5825cdf83e742d1e83f87eb77 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ValidUtilsTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +/** + * ValidUtils Tester. + * + * @since + * + *
+ * 五月 29, 2020
+ *        
+ * + * @version 1.0 + */ +public class ValidUtilsTest { + + @Before + public void before() + throws Exception {} + + @After + public void after() + throws Exception {} + + /** + * Method: notNull(Object obj, String message) + */ + @Test + public void testNotNull() + throws Exception { + try { + ValidUtils.notNull(null, "test null"); + } catch (RuntimeException exception) {} + + try { + ValidUtils.notNull(new Object(), "test not null"); + } catch (RuntimeException exception) { + Assert.fail(exception.getMessage()); + } catch (Exception exception) { + Assert.fail(exception.getMessage()); + } + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java index b75bb58b7d4f3c577bf4ff18e2b5520632f30a86..409d79b948d0a1598118c5d3580fb097059a3c57 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java @@ -102,4 +102,11 @@ public interface ProcessDefinitionMapper extends BaseMapper { */ @MapKey("id") List> listResources(); + + /** + * query definition by target node keys + * @param targetNodeKeys target node keys + * @return process definition list + */ + List queryDefinitionByTargetNodeKeys(@Param("targetNodeKeys") String[] targetNodeKeys); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index ac23b25c9c9142655de71aa281ab41928deca8d5..6a114d6944e88501810bcd6d5d5d08cb20aee160 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -69,4 +69,6 @@ public interface TaskInstanceMapper extends BaseMapper { @Param("startTime") Date startTime, @Param("endTime") Date endTime ); + + List findALlTaskListByProcessInstanceId(@Param("processInstanceId") Integer processInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 4418cce7d8484139e80fb3044f114bb44764ae16..106292be55b7b6d7c7a99eaca514dca7e8c72cbe 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -186,10 +186,27 @@ public class DagHelper { TaskDependType depNodeType) throws Exception { ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - List taskNodeList = new ArrayList<>(); - if (null != processData) { - taskNodeList = processData.getTasks(); + if (processData == null) { + return null; } + + List taskNodeList = processData.getTasks(); + return generateFlowDag(taskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType); + } + + /** + * generate dag by start nodes and recovery nodes + * @param taskNodeList taskNodeList + * @param startNodeNameList startNodeNameList + * @param recoveryNodeNameList recoveryNodeNameList + * @param depNodeType depNodeType + * @return process dag + * @throws Exception if error throws Exception + */ + public static ProcessDag generateFlowDag(List taskNodeList, + List startNodeNameList, + List recoveryNodeNameList, + TaskDependType depNodeType) throws Exception { List destTaskNodeList = generateFlowNodeListByStartNode(taskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType); if (destTaskNodeList.isEmpty()) { return null; diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml index 0cabf800cd8998ee8efdc01cfeadf0ab2329f98a..d352f87e5b76dae1cf466d5184f677076040a965 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml @@ -96,4 +96,13 @@ FROM t_ds_process_definition WHERE release_state = 1 and resource_ids is not null and resource_ids != '' + + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 143761bf8c93165a2ed7b7d23cb4827a289eb14f..db95115c38b603b32b538ea76d03b3ce194de45c 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -129,4 +129,11 @@ order by instance.start_time desc + + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java index ec995df51ce858e64801471c04a611fa00861942..6b3f294415fbf6c71f5e3b03f6d5c2975a150c18 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java @@ -230,4 +230,10 @@ public class ProcessDefinitionMapperTest { List> maps = processDefinitionMapper.listResources(); Assert.assertNotNull(maps); } + + @Test + public void testFindALlTaskListByProcessInstanceId(){ + List processDefinitionList = processDefinitionMapper.queryDefinitionByTargetNodeKeys(new String[]{"127.0.0.1:db1:t1"}); + Assert.assertEquals(0, processDefinitionList == null ? 0 : processDefinitionList.size()); + } } \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index b224067a296c46f907a4487704e120f50dadc9a9..0d9a845f12d3efd190b2f50524b0c5ce2ffeb027 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -297,4 +297,13 @@ public class TaskInstanceMapperTest { Assert.assertNotEquals(taskInstanceIPage.getTotal(), 0); } + + /** + * test find aLl task list by process instanceId + */ + @Test + public void testFindALlTaskListByProcessInstanceId() { + List taskInstanceList = taskInstanceMapper.findALlTaskListByProcessInstanceId(0); + Assert.assertEquals(0, taskInstanceList == null ? 0 : taskInstanceList.size()); + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 5b4b5daef1a819ed0a49bd3bccf22de3f47e9d00..2478d2997527d47f435935e97904ff6a5aded3d7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -52,6 +52,12 @@ public class MasterConfig { @Value("${master.listen.port:5678}") private int listenPort; + @Value("${data.lineage.depend.retryTimes:240}") + private int dataLineageDependRetryTimes; + + @Value("${data.lineage.depend.retryInterval:2}") + private int dataLineageDependRetryInterval; + public int getListenPort() { return listenPort; } @@ -126,4 +132,20 @@ public class MasterConfig { public void setMasterReservedMemory(double masterReservedMemory) { this.masterReservedMemory = masterReservedMemory; } + + public int getDataLineageDependRetryTimes() { + return dataLineageDependRetryTimes; + } + + public void setDataLineageDependRetryTimes(int dataLineageDependRetryTimes) { + this.dataLineageDependRetryTimes = dataLineageDependRetryTimes; + } + + public int getDataLineageDependRetryInterval() { + return dataLineageDependRetryInterval; + } + + public void setDataLineageDependRetryInterval(int dataLineageDependRetryInterval) { + this.dataLineageDependRetryInterval = dataLineageDependRetryInterval; + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 211da1a4336df1972364d46d6d250a38c72bb0fe..e6acb92bf51d65688a12d54b83437f610cc55dc8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -24,13 +24,12 @@ import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -356,9 +355,13 @@ public class MasterExecThread implements Runnable { // generate process to get DAG info List recoveryNameList = getRecoveryNodeNameList(); List startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); - ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(), + + ProcessData processData = JSONUtils.parseObject(processInstance.getProcessInstanceJson(), ProcessData.class); + resetDagTaskNodesByDataLineage(processData.getTasks(), startNodeNameList); + + ProcessDag processDag = generateFlowDag(processData.getTasks(), startNodeNameList, recoveryNameList, processInstance.getTaskDependType()); - if(processDag == null){ + if (processDag == null) { logger.error("processDag is null"); return; } @@ -366,6 +369,111 @@ public class MasterExecThread implements Runnable { dag = DagHelper.buildDagGraph(processDag); } + /** + * reset dag task nodes by data lineage + * @param taskNodeList + * @param startNodeNameList + */ + private void resetDagTaskNodesByDataLineage(List taskNodeList, List startNodeNameList) { + Map existedTaskNodeMap = new HashMap<>(); + + for (TaskNode taskNode : taskNodeList) { + if (taskNode.isForbidden()) { + continue; + } + + // clear pre task depend of start nodes + if (processInstance.getTaskDependType() != TaskDependType.TASK_PRE + && startNodeNameList.contains(taskNode.getName())) { + taskNode.setPreTasks(null); + } + + existedTaskNodeMap.put(processInstance.getProcessDefinitionId() + taskNode.getName(), taskNode); + } + + int nodeSize = taskNodeList.size(); + for (int i = 0; i < nodeSize; i++) { + if (taskNodeList.get(i).isForbidden()) { + continue; + } + + analyseNodeDependByTableLineage(taskNodeList, taskNodeList.get(i), taskNodeList.get(i), existedTaskNodeMap); + } + } + + /** + * analyse node depend by table lineage + * @param lineageNodeList + * @param analyseNode + * @param postNode + * @param existedTaskNodeMap + */ + private void analyseNodeDependByTableLineage(List lineageNodeList, TaskNode analyseNode, + TaskNode postNode, Map existedTaskNodeMap) { + // exist depend tag + AbstractParameters parameters = TaskParametersUtils.getParameters(analyseNode.getType(), + analyseNode.getParams()); + if (!parameters.isCheckDepend()) { + return; + } + + logger.info("task {} start analyse table depend : {}", analyseNode.getName(), parameters.getDependNodeKeys()); + + // query all dependent processes + String[] dependNodeKeys = parameters.getDependNodeKeys().split(COMMA); + List processDefinitionList = processService.queryDependDefinitionList(dependNodeKeys); + if (CollectionUtils.isEmpty(processDefinitionList)) { + return; + } + + for (ProcessDefinition processDefinition : processDefinitionList) { + ProcessData processData = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson(), + ProcessData.class); + List dependTaskNodeList = (processData.getTasks() == null) ? + new ArrayList<>() : processData.getTasks(); + + for (TaskNode realNode : dependTaskNodeList) { + /** + * 1. non forbidden + * 2. non oneself node + * 3. exist depend relation + */ + if (!realNode.isForbidden() + && !analyseNode.getName().equals(realNode.getName()) + && DependUnionKeyUtils.existDependRelation(realNode, dependNodeKeys)) { + + // check if the depend node exists + if (existedTaskNodeMap.containsKey(processDefinition.getId() + realNode.getName())) { + TaskNode dependNode = existedTaskNodeMap.get(processDefinition.getId() + realNode.getName()); + TaskNodeUtils.addNodeDependentItem(dependNode, processDefinition.getId(), realNode.getName()); + TaskNodeUtils.addNodeDepList(postNode, dependNode); + logger.info("new depend relation : {} -> {}", dependNode.getName(), postNode.getName()); + continue; + } + + // new depend node + TaskNode dependNode = TaskNodeUtils.buildDependTaskNode(processDefinition.getName(), + realNode.getName(), masterConfig.getDataLineageDependRetryTimes(), + masterConfig.getDataLineageDependRetryInterval()); + + // cache node + existedTaskNodeMap.put(processDefinition.getId() + realNode.getName(), dependNode); + + // add node relation + TaskNodeUtils.addNodeDependentItem(dependNode, processDefinition.getId(), realNode.getName()); + TaskNodeUtils.addNodeDepList(postNode, dependNode); + + lineageNodeList.add(dependNode); + + logger.info("new depend relation : {} -> {}", dependNode.getName(), postNode.getName()); + + // analyse current node table lineage + analyseNodeDependByTableLineage(lineageNodeList, realNode, dependNode, existedTaskNodeMap); + } + } + } + } + /** * init task queue */ @@ -1218,17 +1326,17 @@ public class MasterExecThread implements Runnable { /** * generate flow dag - * @param processDefinitionJson process definition json + * @param taskNodeList task node list * @param startNodeNameList start node name list * @param recoveryNodeNameList recovery node name list * @param depNodeType depend node type * @return ProcessDag process dag * @throws Exception exception */ - public ProcessDag generateFlowDag(String processDefinitionJson, + public ProcessDag generateFlowDag(List taskNodeList, List startNodeNameList, List recoveryNodeNameList, - TaskDependType depNodeType)throws Exception{ - return DagHelper.generateFlowDag(processDefinitionJson, startNodeNameList, recoveryNodeNameList, depNodeType); + TaskDependType depNodeType)throws Exception { + return DagHelper.generateFlowDag(taskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java index 930098919bdff65b2783828cc3d1ec987f2790ce..8a716e91b44cbdfa91679b4d736b28cff773a735 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java @@ -89,41 +89,4 @@ public class DataxUtils { } } - public static String[] convertKeywordsColumns(DbType dbType, String[] columns) { - if (columns == null) { - return null; - } - - String[] toColumns = new String[columns.length]; - for (int i = 0; i < columns.length; i++ ) { - toColumns[i] = doConvertKeywordsColumn(dbType, columns[i]); - } - - return toColumns; - } - - public static String doConvertKeywordsColumn(DbType dbType, String column) { - if (column == null) { - return column; - } - - column = column.trim(); - column = column.replace("`", ""); - column = column.replace("\"", ""); - column = column.replace("'", ""); - - switch (dbType) { - case MYSQL: - return String.format("`%s`", column); - case POSTGRESQL: - return String.format("\"%s\"", column); - case ORACLE: - return String.format("\"%s\"", column); - case SQLSERVER: - return String.format("`%s`", column); - default: - return column; - } - } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index a14a5d64f591023ee43f7343f83ac723a92ba28f..0789c88fb43d51659de13ffc2deb91f074932eb6 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -443,9 +443,9 @@ public class DataxTask extends AbstractTask { columnNames = tryExecuteSqlResolveColumnNames(dataSourceCfg, sql); } - notNull(columnNames, String.format("parsing sql columns failed : %s", sql)); + ValidUtils.notNull(columnNames, String.format("parsing sql columns failed : %s", sql)); - return DataxUtils.convertKeywordsColumns(dtType, columnNames); + return SqlUtils.convertKeywordsColumns(dtType, columnNames); } /** @@ -463,7 +463,7 @@ public class DataxTask extends AbstractTask { try { SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql); - notNull(parser, String.format("database driver [%s] is not support", dbType.toString())); + ValidUtils.notNull(parser, String.format("database driver [%s] is not support", dbType.toString())); SQLStatement sqlStatement = parser.parseStatement(); SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement; @@ -479,7 +479,7 @@ public class DataxTask extends AbstractTask { selectItemList = block.getSelectList(); } - notNull(selectItemList, + ValidUtils.notNull(selectItemList, String.format("select query type [%s] is not support", sqlSelect.getQuery().toString())); columnNames = new String[selectItemList.size()]; @@ -559,10 +559,4 @@ public class DataxTask extends AbstractTask { return dataXParameters; } - private void notNull(Object obj, String message) { - if (obj == null) { - throw new RuntimeException(message); - } - } - } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/resources/master.properties b/dolphinscheduler-server/src/main/resources/master.properties index 2f75aa50ad4a04df14e8a99f89143848e998b6c7..ce00eb66dca6b66251b11518988b096c06ad193c 100644 --- a/dolphinscheduler-server/src/main/resources/master.properties +++ b/dolphinscheduler-server/src/main/resources/master.properties @@ -38,4 +38,10 @@ #master.reserved.memory=0.3 # master listen port -#master.listen.port=5678 \ No newline at end of file +#master.listen.port=5678 + +# data lineage depend retryTimes +#data.lineage.depend.retryTimes=240 + +# data lineage depend retryInterval (minute) +data.lineage.depend.retryInterval=1 \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java index 5875d2175800f4f5ef90c16397d3b9aa8436a592..2bf945557e7e04ff63531ab3f0124442a54fa9b2 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Schedule; @@ -33,6 +34,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.context.ApplicationContext; @@ -51,6 +53,7 @@ import static org.powermock.api.mockito.PowerMockito.mock; */ @RunWith(PowerMockRunner.class) @PrepareForTest({MasterExecThread.class}) +@PowerMockIgnore({"javax.script.*", "javax.management.*"}) public class MasterExecThreadTest { private MasterExecThread masterExecThread; @@ -152,4 +155,53 @@ public class MasterExecThreadTest { return schedulerList; } + /** + * Method: resetDagTaskNodesByDataLineage(List taskNodeList, + * List startNodeNameList) + */ + @Test + public void testResetDagTaskNodesByDataLineage() + throws Exception { + try { + Method method = MasterExecThread.class.getDeclaredMethod("resetDagTaskNodesByDataLineage", List.class, List.class); + method.setAccessible(true); + + String processInstanceJson = "{\"tasks\":[{\"id\":\"tasks-32619\",\"name\":\"t_ds_version_copy1\",\"desc\":null,\"type\":\"SQL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":[],\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy1#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy2#table_depend\",\"type\":\"MYSQL\",\"datasource\":5,\"sql\":\"INSERT INTO t_ds_version_copy1\\nselect * from t_ds_version_copy2\",\"sqlType\":1,\"udfs\":\"\",\"showType\":\"TABLE\",\"connParams\":\"\",\"preStatements\":[\"truncate table t_ds_version_copy1\"],\"postStatements\":[],\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"targetTable\":null,\"dtType\":null,\"dataTarget\":0,\"resourceFilesList\":[],\"localParametersMap\":{}},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}},{\"id\":\"tasks-12393\",\"name\":\"t_ds_version_copy\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy1#table_depend,127.0.0.1:escheduler:t_ds_version_copy5#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"SELECT * FROM t_ds_version_copy1\\nUNION\\nselect * FROM t_ds_version_copy5\",\"targetTable\":\"t_ds_version_copy\",\"preStatements\":[\"truncate table t_ds_version_copy\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[\"t_ds_version_copy1\"],\"extras\":null,\"depList\":[\"t_ds_version_copy1\"],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"; + ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class); + + ProcessDefinition processDefinition_32619 = new ProcessDefinition(); + processDefinition_32619.setId(2); + processDefinition_32619.setProcessDefinitionJson("{\"tasks\":[{\"id\":\"tasks-3877\",\"name\":\"4 and 3 -> 2\",\"desc\":null,\"type\":\"SQL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":[],\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy2#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy4#table_depend,127.0.0.1:escheduler:t_ds_version_copy3#table_depend\",\"type\":\"MYSQL\",\"datasource\":5,\"sql\":\"insert into t_ds_version_copy2\\nselect * from t_ds_version_copy4\\nUNION\\nselect * from t_ds_version_copy3\",\"sqlType\":1,\"udfs\":\"\",\"showType\":\"TABLE\",\"connParams\":\"\",\"preStatements\":[\"truncate table t_ds_version_copy2\"],\"postStatements\":[],\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"targetTable\":null,\"dtType\":null,\"dataTarget\":0,\"resourceFilesList\":[],\"localParametersMap\":{}},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"); + Mockito.when(processService.queryDependDefinitionList(new String[]{"127.0.0.1:escheduler:t_ds_version_copy2#table_depend"})).thenReturn(Arrays.asList(processDefinition_32619)); + + ProcessDefinition processDefinition_12393_1 = new ProcessDefinition(); + processDefinition_12393_1.setId(3); + processDefinition_12393_1.setProcessDefinitionJson("{\"tasks\":[{\"id\":\"tasks-34351\",\"name\":\"t_ds_version_copy5\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy6#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy6\",\"targetTable\":\"t_ds_version_copy5\",\"preStatements\":[\"truncate table t_ds_version_copy6\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}},{\"id\":\"tasks-43031\",\"name\":\"t_ds_version_copy4\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy4#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy5\",\"targetTable\":\"t_ds_version_copy4\",\"preStatements\":[\"truncate table t_ds_version_copy5\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"); + + ProcessDefinition processDefinition_12393_2 = new ProcessDefinition(); + processDefinition_12393_2.setId(1); + processDefinition_12393_2.setProcessDefinitionJson("{\"tasks\":[{\"id\":\"tasks-32619\",\"name\":\"t_ds_version_copy1\",\"desc\":null,\"type\":\"SQL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":[],\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy1#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy2#table_depend\",\"type\":\"MYSQL\",\"datasource\":5,\"sql\":\"INSERT INTO t_ds_version_copy1\\nselect * from t_ds_version_copy2\",\"sqlType\":1,\"udfs\":\"\",\"showType\":\"TABLE\",\"connParams\":\"\",\"preStatements\":[\"truncate table t_ds_version_copy1\"],\"postStatements\":[],\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"targetTable\":null,\"dtType\":null,\"dataTarget\":0,\"resourceFilesList\":[],\"localParametersMap\":{}},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}},{\"id\":\"tasks-12393\",\"name\":\"t_ds_version_copy\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy1#table_depend,127.0.0.1:escheduler:t_ds_version_copy5#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"SELECT * FROM t_ds_version_copy1\\nUNION\\nselect * FROM t_ds_version_copy5\",\"targetTable\":\"t_ds_version_copy\",\"preStatements\":[\"truncate table t_ds_version_copy\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[\"t_ds_version_copy1\"],\"extras\":null,\"depList\":[\"t_ds_version_copy1\"],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"); + Mockito.when(processService.queryDependDefinitionList(new String[]{"127.0.0.1:escheduler:t_ds_version_copy1#table_depend", "127.0.0.1:escheduler:t_ds_version_copy5#table_depend"})).thenReturn(Arrays.asList(processDefinition_12393_1, processDefinition_12393_2)); + + ProcessDefinition processDefinition_3877 = new ProcessDefinition(); + processDefinition_3877.setId(3); + processDefinition_3877.setProcessDefinitionJson("{\"tasks\":[{\"id\":\"tasks-34351\",\"name\":\"t_ds_version_copy5\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy6#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy6\",\"targetTable\":\"t_ds_version_copy5\",\"preStatements\":[\"truncate table t_ds_version_copy6\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}},{\"id\":\"tasks-43031\",\"name\":\"t_ds_version_copy4\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy4#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy5\",\"targetTable\":\"t_ds_version_copy4\",\"preStatements\":[\"truncate table t_ds_version_copy5\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"); + Mockito.when(processService.queryDependDefinitionList(new String[]{"127.0.0.1:escheduler:t_ds_version_copy4#table_depend", "127.0.0.1:escheduler:t_ds_version_copy3#table_depend"})).thenReturn(Arrays.asList(processDefinition_3877)); + + ProcessDefinition processDefinition_43031 = new ProcessDefinition(); + processDefinition_43031.setId(3); + processDefinition_43031.setProcessDefinitionJson("{\"tasks\":[{\"id\":\"tasks-34351\",\"name\":\"t_ds_version_copy5\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy6#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy6\",\"targetTable\":\"t_ds_version_copy5\",\"preStatements\":[\"truncate table t_ds_version_copy6\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}},{\"id\":\"tasks-43031\",\"name\":\"t_ds_version_copy4\",\"desc\":null,\"type\":\"DATAX\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"localParams\":null,\"checkDependFlag\":1,\"targetNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy4#table_target\",\"dependNodeKeys\":\"127.0.0.1:escheduler:t_ds_version_copy5#table_depend\",\"customConfig\":0,\"json\":null,\"dsType\":\"MYSQL\",\"dataSource\":5,\"dtType\":\"MYSQL\",\"dataTarget\":5,\"sql\":\"select id, version from t_ds_version_copy5\",\"targetTable\":\"t_ds_version_copy4\",\"preStatements\":[\"truncate table t_ds_version_copy5\"],\"postStatements\":[],\"jobSpeedByte\":0,\"jobSpeedRecord\":1000,\"resourceFilesList\":[],\"localParametersMap\":null},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"timeout\":{\"enable\":false,\"strategy\":\"\"},\"forbidden\":false,\"conditionsTask\":false,\"taskTimeoutParameter\":{\"enable\":false,\"strategy\":null,\"interval\":0}}],\"globalParams\":[],\"timeout\":0,\"tenantId\":-1}"); + Mockito.when(processService.queryDependDefinitionList(new String[]{"127.0.0.1:escheduler:t_ds_version_copy5#table_depend"})).thenReturn(Arrays.asList(processDefinition_43031)); + + // task 34351 + Mockito.when(processService.queryDependDefinitionList(new String[]{"127.0.0.1:escheduler:t_ds_version_copy6#table_depend"})).thenReturn(new ArrayList<>()); + + method.invoke(masterExecThread, processData.getTasks(), new ArrayList<>()); + + Assert.assertEquals(5, processData.getTasks().size()); + } catch(Exception e) { + Assert.fail(e.getMessage()); + } + } + } \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java index 2720bb8a2806f7f28e630a53a52ff17d0a7b7424..f4cb33ec2059fb6ba421e5575c7b1d68a441d854 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java @@ -73,36 +73,4 @@ public class DataxUtilsTest { assertTrue(DataxUtils.getSqlStatementParser(DbType.DB2, "select 1") == null); } - /** - * - * Method: convertKeywordsColumns(DbType dbType, String[] columns) - * - */ - @Test - public void testConvertKeywordsColumns() throws Exception { - String[] fromColumns = new String[]{"`select`", "from", "\"where\"", " table "}; - String[] targetColumns = new String[]{"`select`", "`from`", "`where`", "`table`"}; - - String[] toColumns = DataxUtils.convertKeywordsColumns(DbType.MYSQL, fromColumns); - - assertTrue(fromColumns.length == toColumns.length); - - for (int i = 0; i < toColumns.length; i++) { - assertEquals(targetColumns[i], toColumns[i]); - } - } - - /** - * - * Method: doConvertKeywordsColumn(DbType dbType, String column) - * - */ - @Test - public void testDoConvertKeywordsColumn() throws Exception { - assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.MYSQL, " \"`select`\" ")); - assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.POSTGRESQL, " \"`select`\" ")); - assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.SQLSERVER, " \"`select`\" ")); - assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.ORACLE, " \"`select`\" ")); - assertEquals("select", DataxUtils.doConvertKeywordsColumn(DbType.DB2, " \"`select`\" ")); - } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 60c17f7f18394b876b3bc3a4af14b97da1fe19f1..f0502ece95851198f101c13d17409b5b192bf5ee 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1812,4 +1812,18 @@ public class ProcessService { taskInstance.getId()); } + /** + * query depend process definition list + * @param dependNodeKeys depend node keys + * @return process definition list + */ + public List queryDependDefinitionList(String[] dependNodeKeys){ + if(dependNodeKeys == null || dependNodeKeys.length == 0) { + return null; + } + + String[] targetNodeKeys = DependUnionKeyUtils.replaceMarkWordToTarget(dependNodeKeys); + return processDefineMapper.queryDefinitionByTargetNodeKeys(targetNodeKeys); + } + } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue index f1c9b757bdac1f23df5cb387e90c315575ea3cb7..c146e2144e669a2f710afcade7ae09bd2235fd57 100755 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue @@ -138,6 +138,18 @@ + +
+ {{$t('Check Dependency')}} +
+
+ +
+
+ + diff --git a/dolphinscheduler-ui/src/js/conf/home/router/index.js b/dolphinscheduler-ui/src/js/conf/home/router/index.js index b4236f3685117e68eaba6f33dce7cfe46e1889df..ce39987b275e6c308389c48f4e52e843d8d426b6 100644 --- a/dolphinscheduler-ui/src/js/conf/home/router/index.js +++ b/dolphinscheduler-ui/src/js/conf/home/router/index.js @@ -115,6 +115,14 @@ const router = new Router({ meta: { title: `${i18n.$t('Scheduled task list')}` } + }, + { + path: '/projects/definition/depend/tree/:id', + name: 'definition-tree-view-depend-index', + component: resolve => require(['../pages/projects/pages/definition/pages/tree/dependIndex'], resolve), + meta: { + title: `${i18n.$t('DependView')}` + } } ] }, diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js index 09d51b91dd24ed570a44387ab47ee9933ba194b8..dfdb3a8657e20c37bb1e9b3d66fa72eeb83bc0ad 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js @@ -677,6 +677,18 @@ export default { }) }) }, + /** + * tree chart (depend) + */ + getViewTreeByDepend ({ state }, payload) { + return new Promise((resolve, reject) => { + io.get(`projects/${state.projectName}/process/view-tree-depend`, payload, res => { + resolve(res.data) + }).catch(e => { + reject(e) + }) + }) + }, /** * gantt chart */ diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index c8d82d42467f82f7918230a577bb8406470f152e..97b0dfe2386abd6bb0fb4cb4051577c23b87cf72 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -606,5 +606,7 @@ export default { 'Socket Timeout':'Socket Timeout', 'Connect timeout be a positive integer': 'Connect timeout be a positive integer', 'Socket Timeout be a positive integer': 'Socket Timeout be a positive integer', - 'ms':'ms' + 'ms':'ms', + 'DependView': 'DependView', + 'Check Dependency': 'Check Dependency' } diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index e65449092e3904067f1e7187f762f65ba4e0ccde..bcabfd79c5082f8e5e02d022a0aa5f61f505b907 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -606,5 +606,7 @@ export default { 'Socket Timeout':'Socket超时', 'Connect timeout be a positive integer': '连接超时必须为数字', 'Socket Timeout be a positive integer': 'Socket超时必须为数字', - 'ms':'毫秒' + 'ms':'毫秒', + 'DependView': '依赖关系图', + 'Check Dependency': '检测依赖' } diff --git a/pom.xml b/pom.xml index 57269748cfcc2bcb285818354c74592afc4d6a5b..1f8910675bd4a1f8077fbf80ea685c5e77b2410f 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 3.2.0 2.0.1 5.0.5 - 1.1.14 + 1.1.21 1.4.200 1.6 1.1.1 @@ -849,6 +849,9 @@ **/plugin/model/AlertDataTest.java **/plugin/model/AlertInfoTest.java **/plugin/utils/PropertyUtilsTest.java + **/common/utils/DependUnionKeyUtilsTest.java + **/common/utils/SqlUtilsTest.java + **/common/utils/ValidUtilsTest.java diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index f045bb275cb04aba1491873cc4139443190f1b1d..a820db04ac085a595303cb5457028cb4a8b18428 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -42,7 +42,7 @@ datanucleus-api-jdo-4.2.1.jar datanucleus-core-4.1.6.jar datanucleus-rdbms-4.1.7.jar derby-10.14.2.0.jar -druid-1.1.14.jar +druid-1.1.21.jar gson-2.8.5.jar guava-20.0.jar guice-3.0.jar