From 9122493d2400c5506bb21d3f4fe5cb3d9b3ff86c Mon Sep 17 00:00:00 2001 From: daihw <928874202@qq.com> Date: Tue, 7 Jun 2022 10:04:22 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=9C=AA=E5=AE=8C=E6=88=90=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E6=97=A5=E5=BF=97=E8=AE=A2=E9=98=85=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../TaskInstanceEventHandler.java | 9 ++- .../WorkflowInstanceEventHandler.java | 10 ++- .../storage/FileSystemStorageService.java | 62 ++++++++++++------- .../storage/MonitoringFileService.java | 9 ++- 4 files changed, 62 insertions(+), 28 deletions(-) diff --git a/api/src/main/java/dev/jianmu/api/eventhandler/TaskInstanceEventHandler.java b/api/src/main/java/dev/jianmu/api/eventhandler/TaskInstanceEventHandler.java index 9621c3831..86b604622 100644 --- a/api/src/main/java/dev/jianmu/api/eventhandler/TaskInstanceEventHandler.java +++ b/api/src/main/java/dev/jianmu/api/eventhandler/TaskInstanceEventHandler.java @@ -7,6 +7,7 @@ import dev.jianmu.application.service.internal.WorkerApplication; import dev.jianmu.infrastructure.docker.TaskFailedEvent; import dev.jianmu.infrastructure.docker.TaskFinishedEvent; import dev.jianmu.infrastructure.docker.TaskRunningEvent; +import dev.jianmu.infrastructure.storage.MonitoringFileService; import dev.jianmu.task.event.TaskInstanceCreatedEvent; import dev.jianmu.task.event.TaskInstanceFailedEvent; import dev.jianmu.task.event.TaskInstanceRunningEvent; @@ -31,15 +32,17 @@ public class TaskInstanceEventHandler { private final TaskInstanceInternalApplication taskInstanceInternalApplication; private final AsyncTaskInstanceInternalApplication asyncTaskInstanceInternalApplication; private final WorkerApplication workerApplication; + private final MonitoringFileService monitoringFileService; public TaskInstanceEventHandler( TaskInstanceInternalApplication taskInstanceInternalApplication, AsyncTaskInstanceInternalApplication asyncTaskInstanceInternalApplication, - WorkerApplication workerApplication - ) { + WorkerApplication workerApplication, + MonitoringFileService monitoringFileService) { this.taskInstanceInternalApplication = taskInstanceInternalApplication; this.asyncTaskInstanceInternalApplication = asyncTaskInstanceInternalApplication; this.workerApplication = workerApplication; + this.monitoringFileService = monitoringFileService; } @EventListener @@ -55,6 +58,7 @@ public class TaskInstanceEventHandler { } else { this.taskInstanceInternalApplication.executeFailed(taskResultDto.getTaskInstanceId()); } + this.monitoringFileService.clearCallbackByLogId(taskFinishedEvent.getTaskId()); } @EventListener @@ -71,6 +75,7 @@ public class TaskInstanceEventHandler { MDC.put("triggerId", taskFailedEvent.getTriggerId()); logger.info("task {} is failed, due to: {}", taskFailedEvent.getTaskId(), taskFailedEvent.getErrorMsg()); this.taskInstanceInternalApplication.executeFailed(taskFailedEvent.getTaskId()); + this.monitoringFileService.clearCallbackByLogId(taskFailedEvent.getTaskId()); } @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) diff --git a/api/src/main/java/dev/jianmu/api/eventhandler/WorkflowInstanceEventHandler.java b/api/src/main/java/dev/jianmu/api/eventhandler/WorkflowInstanceEventHandler.java index 29feaa0fb..5986cf0c8 100644 --- a/api/src/main/java/dev/jianmu/api/eventhandler/WorkflowInstanceEventHandler.java +++ b/api/src/main/java/dev/jianmu/api/eventhandler/WorkflowInstanceEventHandler.java @@ -4,6 +4,7 @@ import dev.jianmu.application.command.WorkflowStartCmd; import dev.jianmu.application.service.internal.AsyncTaskInstanceInternalApplication; import dev.jianmu.application.service.internal.WorkerApplication; import dev.jianmu.application.service.internal.WorkflowInternalApplication; +import dev.jianmu.infrastructure.storage.MonitoringFileService; import dev.jianmu.workflow.aggregate.process.WorkflowInstance; import dev.jianmu.workflow.event.process.ProcessEndedEvent; import dev.jianmu.workflow.event.process.ProcessNotRunningEvent; @@ -31,17 +32,19 @@ public class WorkflowInstanceEventHandler { private final AsyncTaskInstanceInternalApplication asyncTaskInstanceInternalApplication; private final WorkerApplication workerApplication; private final ApplicationEventPublisher publisher; + private final MonitoringFileService monitoringFileService; public WorkflowInstanceEventHandler( WorkflowInternalApplication workflowInternalApplication, AsyncTaskInstanceInternalApplication asyncTaskInstanceInternalApplication, WorkerApplication workerApplication, - ApplicationEventPublisher publisher - ) { + ApplicationEventPublisher publisher, + MonitoringFileService monitoringFileService) { this.workflowInternalApplication = workflowInternalApplication; this.asyncTaskInstanceInternalApplication = asyncTaskInstanceInternalApplication; this.workerApplication = workerApplication; this.publisher = publisher; + this.monitoringFileService = monitoringFileService; } @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) @@ -81,6 +84,7 @@ public class WorkflowInstanceEventHandler { log.info(event.toString()); this.asyncTaskInstanceInternalApplication.terminateByTriggerId(event.getTriggerId()); log.info("-----------------------------------------------------"); + this.monitoringFileService.clearCallbackByLogId(event.getTriggerId()); } @EventListener @@ -90,6 +94,7 @@ public class WorkflowInstanceEventHandler { log.info(event.toString()); this.workerApplication.cleanupWorkspace(event.getTriggerId()); log.info("-----------------------------------------------------"); + this.monitoringFileService.clearCallbackByLogId(event.getTriggerId()); } @EventListener @@ -99,5 +104,6 @@ public class WorkflowInstanceEventHandler { log.info(event.toString()); this.workerApplication.cleanupWorkspace(event.getTriggerId()); log.info("-----------------------------------------------------"); + this.monitoringFileService.clearCallbackByLogId(event.getTriggerId()); } } diff --git a/infrastructure/src/main/java/dev/jianmu/infrastructure/storage/FileSystemStorageService.java b/infrastructure/src/main/java/dev/jianmu/infrastructure/storage/FileSystemStorageService.java index ef1c17c5a..af7d8dd2a 100644 --- a/infrastructure/src/main/java/dev/jianmu/infrastructure/storage/FileSystemStorageService.java +++ b/infrastructure/src/main/java/dev/jianmu/infrastructure/storage/FileSystemStorageService.java @@ -1,8 +1,11 @@ package dev.jianmu.infrastructure.storage; import dev.jianmu.infrastructure.SseTemplate; -import dev.jianmu.infrastructure.storage.vo.ConsumerVo; import dev.jianmu.infrastructure.storage.vo.LogVo; +import dev.jianmu.task.aggregate.InstanceStatus; +import dev.jianmu.task.repository.TaskInstanceRepository; +import dev.jianmu.workflow.aggregate.process.ProcessStatus; +import dev.jianmu.workflow.repository.WorkflowInstanceRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.ApplicationArguments; @@ -40,13 +43,22 @@ public class FileSystemStorageService implements StorageService, ApplicationRunn private final Path rootLocation; private final Path webhookRootLocation; private final Path workflowLocation; + private final WorkflowInstanceRepository workflowInstanceRepository; + private final TaskInstanceRepository taskInstanceRepository; - public FileSystemStorageService(SseTemplate template, MonitoringFileService monitoringFileService, StorageProperties properties) { + public FileSystemStorageService(SseTemplate template, + MonitoringFileService monitoringFileService, + StorageProperties properties, + WorkflowInstanceRepository workflowInstanceRepository, + TaskInstanceRepository taskInstanceRepository + ) { this.template = template; this.monitoringFileService = monitoringFileService; this.rootLocation = Paths.get(properties.getFilepath(), taskFilepath); this.webhookRootLocation = Paths.get(properties.getFilepath(), webhookFilepath); this.workflowLocation = Paths.get(properties.getFilepath(), workflowFilepath); + this.workflowInstanceRepository = workflowInstanceRepository; + this.taskInstanceRepository = taskInstanceRepository; } @Override @@ -81,11 +93,25 @@ public class FileSystemStorageService implements StorageService, ApplicationRunn @Override public SseEmitter readLog(String logFileName, int size, boolean isTask) { + boolean isComplete; + if (isTask) { + isComplete = this.taskInstanceRepository.findById(logFileName) + .stream().noneMatch(taskInstance -> taskInstance.getStatus() == InstanceStatus.WAITING || taskInstance.getStatus() == InstanceStatus.RUNNING); + } else { + isComplete = this.workflowInstanceRepository.findByTriggerId(logFileName) + .stream().noneMatch(workflowInstance -> workflowInstance.getStatus() == ProcessStatus.RUNNING || workflowInstance.getStatus() == ProcessStatus.SUSPENDED); + } var fullName = logFileName + LogfilePostfix; + String filePath = (isTask ? this.rootLocation : this.workflowLocation) + File.separator + fullName; var sseEmitter = this.template.newSseEmitter(); + if (isComplete) { + this.firstReadLog(Paths.get(filePath), new AtomicLong(1), sseEmitter, size); + return sseEmitter; + } + // 订阅未完成日志 var consumerVo = this.monitoringFileService.listen(fullName, ((file, counter) -> { try { - Files.lines(file).skip(counter.intValue() - 2) + Files.lines(file).skip(counter.intValue()) .forEach(line -> { this.template.sendMessage(SseEmitter.event() .id(String.valueOf(counter.get())) @@ -96,31 +122,21 @@ public class FileSystemStorageService implements StorageService, ApplicationRunn logger.trace("Could not read log file", e); } })); - String filePath = (isTask ? this.rootLocation : this.workflowLocation) + File.separator + fullName; - this.firstReadLog(Paths.get(filePath), consumerVo, sseEmitter, size); + this.firstReadLog(Paths.get(filePath), consumerVo.getCounter(), sseEmitter, size); return sseEmitter; } - private BufferedReader execCmd(String cmd) { - try { - var process = Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}); - return new BufferedReader(new InputStreamReader(new BufferedInputStream(process.getInputStream()))); - } catch (IOException e) { - throw new StorageException("Could not execution command", e); - } - } - - private void firstReadLog(Path path, ConsumerVo consumerVo, SseEmitter sseEmitter, int size) { + private void firstReadLog(Path path, AtomicLong counter, SseEmitter sseEmitter, int size) { try { var countLine = Files.lines(path).count(); - consumerVo.getCounter().set(countLine - size); + counter.set(countLine - size); Files.lines(path) - .skip(countLine > size ? countLine - size -1 : 0) + .skip(countLine > size ? countLine - size : 0) .forEach(line -> this.template.sendMessage(SseEmitter.event() - .id(String.valueOf(consumerVo.getCounter().incrementAndGet())) + .id(String.valueOf(counter.incrementAndGet())) .data(line), sseEmitter) ); - consumerVo.getCounter().set(countLine); + counter.set(countLine); } catch (IOException e) { logger.trace("Could not read log file", e); } @@ -133,12 +149,12 @@ public class FileSystemStorageService implements StorageService, ApplicationRunn var lineNum = new AtomicLong(line - 1); try { Files.lines(Paths.get(filePath)) - .skip(line == 1 ? 0 : line - 2) + .skip(line - 1) .limit(size) .forEach(str -> list.add(LogVo.builder() - .lastEventId(String.valueOf(lineNum.incrementAndGet())) - .data(str) - .build()) + .lastEventId(String.valueOf(lineNum.incrementAndGet())) + .data(str) + .build()) ); } catch (IOException e) { logger.trace("Could not read log file", e); diff --git a/infrastructure/src/main/java/dev/jianmu/infrastructure/storage/MonitoringFileService.java b/infrastructure/src/main/java/dev/jianmu/infrastructure/storage/MonitoringFileService.java index e99014543..d68702f83 100644 --- a/infrastructure/src/main/java/dev/jianmu/infrastructure/storage/MonitoringFileService.java +++ b/infrastructure/src/main/java/dev/jianmu/infrastructure/storage/MonitoringFileService.java @@ -26,6 +26,7 @@ import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; @Service @Slf4j public class MonitoringFileService implements DisposableBean { + private static final Map keyPathMap = new ConcurrentHashMap<>(); private static final String LogfilePostfix = ".log"; private final Map> callbackMap = new ConcurrentHashMap<>(); @@ -37,6 +38,10 @@ public class MonitoringFileService implements DisposableBean { this.monitoringTaskDirectory = taskPath; this.monitoringWorkflowDirectory = workflowPath; this.watchService = FileSystems.getDefault().newWatchService(); + var taskKey = this.monitoringTaskDirectory.register(this.watchService, ENTRY_MODIFY); + var workflowKey = this.monitoringWorkflowDirectory.register(this.watchService, ENTRY_MODIFY); + keyPathMap.put(taskKey, this.monitoringTaskDirectory); + keyPathMap.put(workflowKey, this.monitoringWorkflowDirectory); this.monitoringWorkflowDirectory.register(this.watchService, ENTRY_MODIFY); ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(this::monitor); @@ -54,7 +59,8 @@ public class MonitoringFileService implements DisposableBean { try { var key = this.watchService.take(); for (final WatchEvent event : key.pollEvents()) { - final Path changed = this.monitoringWorkflowDirectory.resolve((Path) event.context()); + var path = keyPathMap.get(key); + final Path changed = path.resolve((Path) event.context()); final String fileName = changed.getFileName().toString(); var set = this.callbackMap.get(fileName); if (event.kind() == ENTRY_MODIFY && set != null) { @@ -67,6 +73,7 @@ public class MonitoringFileService implements DisposableBean { boolean isKeyStillValid = key.reset(); if (!isKeyStillValid) { log.trace("monitor - key is no longer valid: " + key); + keyPathMap.remove(key); } } catch (ClosedWatchServiceException ex) { log.trace(""); -- Gitee