diff --git a/api/src/main/java/dev/jianmu/api/controller/ViewController.java b/api/src/main/java/dev/jianmu/api/controller/ViewController.java index 0ee52f3e18eed98f8f625f8ca9fcbe72744d971d..977292ef44f1e72d3f8c5df75ecc52d4e609261a 100644 --- a/api/src/main/java/dev/jianmu/api/controller/ViewController.java +++ b/api/src/main/java/dev/jianmu/api/controller/ViewController.java @@ -7,6 +7,7 @@ import dev.jianmu.api.vo.*; import dev.jianmu.application.dsl.DslParser; import dev.jianmu.application.exception.DataNotFoundException; import dev.jianmu.application.service.*; +import dev.jianmu.application.service.internal.WorkerInternalApplication; import dev.jianmu.application.service.internal.WorkflowInternalApplication; import dev.jianmu.infrastructure.storage.StorageService; import dev.jianmu.infrastructure.storage.vo.LogVo; @@ -16,6 +17,7 @@ import dev.jianmu.secret.aggregate.Namespace; import dev.jianmu.task.aggregate.InstanceParameter; import dev.jianmu.task.aggregate.Volume; import dev.jianmu.trigger.event.TriggerEvent; +import dev.jianmu.worker.aggregate.Worker; import dev.jianmu.workflow.aggregate.definition.Workflow; import dev.jianmu.workflow.aggregate.parameter.Parameter; import dev.jianmu.workflow.aggregate.process.ProcessStatus; diff --git a/api/src/main/java/dev/jianmu/api/controller/WorkerApi.java b/api/src/main/java/dev/jianmu/api/controller/WorkerApi.java new file mode 100644 index 0000000000000000000000000000000000000000..0d1b22cb6d184baaa2beffff314c8e78890acb33 --- /dev/null +++ b/api/src/main/java/dev/jianmu/api/controller/WorkerApi.java @@ -0,0 +1,337 @@ +package dev.jianmu.api.controller; + +import dev.jianmu.api.dto.*; +import dev.jianmu.api.vo.Auth; +import dev.jianmu.api.vo.VolumeVo; +import dev.jianmu.api.vo.WorkerTaskVo; +import dev.jianmu.application.query.NodeDefApi; +import dev.jianmu.application.service.TaskInstanceApplication; +import dev.jianmu.application.service.internal.WorkerInternalApplication; +import dev.jianmu.infrastructure.GlobalProperties; +import dev.jianmu.infrastructure.storage.StorageService; +import dev.jianmu.infrastructure.worker.DeferredResultService; +import dev.jianmu.infrastructure.worker.unit.Unit; +import dev.jianmu.task.aggregate.InstanceStatus; +import dev.jianmu.worker.aggregate.Worker; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.Parameters; +import io.swagger.v3.oas.annotations.enums.ParameterIn; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import jakarta.validation.Valid; +import org.springframework.dao.CannotAcquireLockException; +import org.springframework.dao.DeadlockLoserDataAccessException; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; + +import java.io.IOException; + +/** + * @author Ethan Liu + * @class WorkerController + * @description Worker API + * @create 2021-04-21 14:40 + */ +@RestController +@RequestMapping("workers") +@Tag(name = "Worker API", description = "为Worker提供的API") +public class WorkerApi { + private final WorkerInternalApplication workerApplication; + private final DeferredResultService deferredResultService; + private final NodeDefApi nodeDefApi; + private final StorageService storageService; + private final TaskInstanceApplication taskInstanceApplication; + private final GlobalProperties globalProperties; + + public WorkerApi(WorkerInternalApplication workerApplication, + DeferredResultService deferredResultService, + NodeDefApi nodeDefApi, + StorageService storageService, + TaskInstanceApplication taskInstanceApplication, + GlobalProperties globalProperties + ) { + this.workerApplication = workerApplication; + this.deferredResultService = deferredResultService; + this.nodeDefApi = nodeDefApi; + this.storageService = storageService; + this.taskInstanceApplication = taskInstanceApplication; + this.globalProperties = globalProperties; + } + + @PutMapping("{workerId}/join") + @Operation(summary = "Worker注册接口", description = "Worker注册接口") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public void join(@PathVariable("workerId") String workerId, @RequestBody @Valid WorkerJoiningDto dto) { + var worker = Worker.Builder.aWorker() + .id(workerId) + .name(dto.getName()) + .type(dto.getType()) + .os(dto.getOs()) + .arch(dto.getArch()) + .capacity(dto.getCapacity()) + .build(); + this.workerApplication.join(worker); + } + + @PutMapping("{workerId}/online") + @Operation(summary = "通知Worker已经Online", description = "通知Worker已经Online") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public void online(@PathVariable("workerId") String workerId) { + this.workerApplication.online(workerId); + } + + @PutMapping("{workerId}/offline") + @Operation(summary = "通知Worker已经Offline", description = "通知Worker已经Offline") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public void offline(@PathVariable("workerId") String workerId) { + this.workerApplication.offline(workerId); + } + + + @GetMapping("{workerId}/ping") + @Operation(summary = "ping Server接口", description = "ping Server接口") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public void ping(@PathVariable("workerId") String workerId) { + } + + @GetMapping("kubernetes/{workerId}/tasks") + @Operation(summary = "拉取kube任务接口", description = "拉取kube任务接口") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public DeferredResult> pullKubeTasks(@PathVariable String workerId, TaskPullingDto taskPullingDto) { + var deferredResult = this.deferredResultService.newPullDeferredResult(workerId); + this.workerApplication.pullKubeTasks(workerId, taskPullingDto.getTriggerId()).ifPresent(taskInstance -> { + var unit = this.workerApplication.findUnit(taskInstance); + deferredResult.setResult(ResponseEntity.status(HttpStatus.OK).body(unit)); + }); + return deferredResult; + } + + @GetMapping("{workerId}/tasks") + @Operation(summary = "拉取任务接口", description = "拉取任务接口") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public DeferredResult> pullTasks(@PathVariable String workerId) { + var deferredResult = this.deferredResultService.newPullDeferredResult(workerId); + this.workerApplication.pullTasks(workerId).ifPresent(taskInstance -> { + if (taskInstance.isVolume()) { + deferredResult.setResult(ResponseEntity + .status(HttpStatus.OK) + .body(WorkerTaskVo.builder() + .type(WorkerTaskVo.Type.VOLUME) + .taskInstanceId(taskInstance.getBusinessId()) + .volume(VolumeVo.builder() + .name(taskInstance.getTriggerId()) + .type(taskInstance.isCreationVolume() ? VolumeVo.Type.CREATION : VolumeVo.Type.DELETION) + .build()) + .auth(this.getTaskAuth()) + .version(taskInstance.getVersion()) + .build() + )); + } else { + deferredResult.setResult(ResponseEntity + .status(HttpStatus.OK) + .body(WorkerTaskVo.builder() + .type(WorkerTaskVo.Type.TASK) + .taskInstanceId(taskInstance.getBusinessId()) + .pullStrategy(this.globalProperties.getWorker().getImagePullPolicy()) + .containerSpec(this.workerApplication.getContainerSpec(taskInstance)) + .resultFile(this.nodeDefApi.findByType(taskInstance.getDefKey()).getResultFile()) + .auth(this.getTaskAuth()) + .version(taskInstance.getVersion()) + .build() + )); + } + }); + return deferredResult; + } + + @GetMapping("{workerId}/tasks/{businessId}") + @Operation(summary = "获取任务详情接口", description = "获取任务详情接口") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public WorkerTaskVo findTaskById(@PathVariable String workerId, @PathVariable("businessId") String businessId) { + var taskInstance = this.taskInstanceApplication.findByBusinessIdAndMaxSerialNo(businessId) + .orElseThrow(() -> new RuntimeException("未找到任务:" + businessId)); + if (taskInstance.isVolume()) { + return WorkerTaskVo.builder() + .type(WorkerTaskVo.Type.VOLUME) + .taskInstanceId(taskInstance.getBusinessId()) + .volume(VolumeVo.builder() + .name(taskInstance.getTriggerId()) + .type(taskInstance.isCreationVolume() ? VolumeVo.Type.CREATION : VolumeVo.Type.DELETION) + .build()) + .auth(this.getTaskAuth()) + .version(taskInstance.getVersion()) + .build(); + } else { + return WorkerTaskVo.builder() + .type(WorkerTaskVo.Type.TASK) + .taskInstanceId(taskInstance.getBusinessId()) + .pullStrategy(this.globalProperties.getWorker().getImagePullPolicy()) + .containerSpec(this.workerApplication.getContainerSpec(taskInstance)) + .resultFile(this.nodeDefApi.findByType(taskInstance.getDefKey()).getResultFile()) + .auth(this.getTaskAuth()) + .version(taskInstance.getVersion()) + .build(); + } + } + + private Auth getTaskAuth() { + var registry = globalProperties.getWorker().getRegistry(); + return Auth.builder() + .address(registry.getAddress()) + .username(registry.getUsername()) + .password(registry.getPassword()) + .build(); + } + + @PostMapping("{workerId}/tasks/{businessId}") + @Operation(summary = "获取终止任务接口", description = "获取终止任务接口") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public DeferredResult> watchTasks(@PathVariable String workerId, @PathVariable("businessId") String businessId) { + var exist = this.deferredResultService.existWatchDeferredResult(workerId, businessId); + var deferredResult = this.deferredResultService.newWatchDeferredResult(workerId, businessId); + if (!exist) { + var taskInstance = this.taskInstanceApplication.findByBusinessIdAndMaxSerialNo(businessId) + .orElseThrow(() -> new RuntimeException("未找到任务实例,businessId: " + businessId)); + if (taskInstance.getStatus() == InstanceStatus.EXECUTION_FAILED) { + deferredResult.setResult(ResponseEntity.status(HttpStatus.OK).body(businessId)); + } + } + return deferredResult; + } + + @Retryable( + value = {DeadlockLoserDataAccessException.class, CannotAcquireLockException.class}, + maxAttempts = 5, + backoff = @Backoff(delay = 1000L, multiplier = 2), + listeners = "retryListener" + ) + @PatchMapping("{workerId}/tasks/{businessId}/accept") + @Operation(summary = "确定任务接口", description = "确定任务接口") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public WorkerTaskVo acceptTask(HttpServletResponse response, @PathVariable("workerId") String workerId, + @PathVariable("businessId") String businessId, @Valid @RequestBody TaskInstanceAcceptingDto dto) { + var taskInstance = this.workerApplication.acceptTask(response, workerId, businessId, dto.getVersion()); + if (response.getStatus() != HttpStatus.OK.value()) { + return WorkerTaskVo.builder() + .taskInstanceId(businessId) + .build(); + } + if (taskInstance.isVolume()) { + return WorkerTaskVo.builder() + .type(WorkerTaskVo.Type.VOLUME) + .taskInstanceId(taskInstance.getBusinessId()) + .volume(VolumeVo.builder() + .name(taskInstance.getTriggerId()) + .type(taskInstance.isCreationVolume() ? VolumeVo.Type.CREATION : VolumeVo.Type.DELETION) + .build()) + .auth(this.getTaskAuth()) + .version(taskInstance.getVersion() + 1) + .build(); + } else { + return WorkerTaskVo.builder() + .type(WorkerTaskVo.Type.TASK) + .taskInstanceId(taskInstance.getBusinessId()) + .pullStrategy(this.globalProperties.getWorker().getImagePullPolicy()) + .containerSpec(this.workerApplication.getContainerSpec(taskInstance)) + .resultFile(this.nodeDefApi.findByType(taskInstance.getDefKey()).getResultFile()) + .auth(this.getTaskAuth()) + .version(taskInstance.getVersion() + 1) + .build(); + } + } + + @Retryable( + value = {DeadlockLoserDataAccessException.class, CannotAcquireLockException.class}, + maxAttempts = 5, + backoff = @Backoff(delay = 1000L, multiplier = 2), + listeners = "retryListener" + ) + @PatchMapping("{workerId}/tasks/{businessId}") + @Operation(summary = "更新任务接口", description = "更新任务接口") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public void updateTaskInstance(@PathVariable("workerId") String workerId, @PathVariable("businessId") String businessId, @Valid @RequestBody TaskInstanceUpdatingDto dto) { + this.workerApplication.updateTaskInstance(workerId, businessId, dto.getStatus().name(), dto.getResultFile(), dto.getErrorMsg(), dto.getExitCode()); + } + + @PostMapping("{workerId}/tasks/{businessId}/logs") + @Operation(summary = "写入任务日志接口", description = "写入任务日志接口") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public void writeTaskLog(HttpServletRequest request, @PathVariable("workerId") String workerId, @PathVariable("businessId") String businessId) { + var taskInstance = this.taskInstanceApplication.findByBusinessIdAndMaxSerialNo(businessId) + .orElseThrow(() -> new RuntimeException("未找到任务实例, businessId:" + businessId)); + try (var writer = this.storageService.writeLog(taskInstance.getId(), false)) { + var reader = request.getReader(); + String line; + while ((line = reader.readLine()) != null) { + var list = TaskInstanceWritingLogDto.parseString(line); + list.stream() + .filter(dto -> dto.getContent() != null) + .forEach(dto -> this.workerApplication.writeTaskLog(writer, workerId, taskInstance.getId(), dto.getContent(), dto.getNumber(), dto.getTimestamp())); + } + } catch (IOException e) { + throw new RuntimeException("任务日志写入失败: " + e); + } + } + + @PostMapping("{workerId}/tasks/{businessId}/logs/batch") + @Operation(summary = "实时写入任务日志接口", description = "实时写入任务日志接口") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public void batchWriteTaskLog(HttpServletRequest request, @PathVariable("workerId") String workerId, @PathVariable("businessId") String businessId) { + var taskInstance = this.taskInstanceApplication.findByBusinessIdAndMaxSerialNo(businessId) + .orElseThrow(() -> new RuntimeException("未找到任务实例, businessId:" + businessId)); + try (var writer = this.storageService.writeLog(taskInstance.getId(), true)) { + var reader = request.getReader(); + String line; + while ((line = reader.readLine()) != null) { + if ("null".equals(line)) { + continue; + } + var list = TaskInstanceWritingLogDto.parseString(line); + list.stream() + .filter(dto -> dto.getContent() != null) + .forEach(dto -> this.workerApplication.writeTaskLog(writer, workerId, taskInstance.getId(), dto.getContent(), dto.getNumber(), dto.getTimestamp())); + } + } catch (IOException e) { + throw new RuntimeException("任务日志写入失败: " + e); + } + } + + @GetMapping("/kubernetes/{workerId}/tasks/{triggerId}") + @Operation(summary = "获取k8s运行中任务", description = "获取k8s运行中任务") + @Parameters({ + @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") + }) + public Unit findRunningTaskByTriggerId(@PathVariable("workerId") String workerId, @PathVariable("triggerId") String triggerId) { + return this.workerApplication.findRunningTaskByTriggerId(workerId, triggerId); + } +} diff --git a/api/src/main/java/dev/jianmu/api/controller/WorkerController.java b/api/src/main/java/dev/jianmu/api/controller/WorkerController.java index 15a09185c7ba3a49ab540fef30e79e74591a48bc..6828aeb268c2304507689dc36c1cb26b04d3d69b 100644 --- a/api/src/main/java/dev/jianmu/api/controller/WorkerController.java +++ b/api/src/main/java/dev/jianmu/api/controller/WorkerController.java @@ -1,67 +1,25 @@ package dev.jianmu.api.controller; -import dev.jianmu.api.dto.*; -import dev.jianmu.api.vo.Auth; -import dev.jianmu.api.vo.VolumeVo; -import dev.jianmu.api.vo.WorkerTaskVo; -import dev.jianmu.application.query.NodeDefApi; -import dev.jianmu.application.service.TaskInstanceApplication; +import dev.jianmu.api.vo.WorkerVo; +import dev.jianmu.application.service.WorkerApplication; import dev.jianmu.application.service.internal.WorkerInternalApplication; -import dev.jianmu.infrastructure.GlobalProperties; -import dev.jianmu.infrastructure.storage.StorageService; -import dev.jianmu.infrastructure.worker.DeferredResultService; -import dev.jianmu.infrastructure.worker.unit.Unit; -import dev.jianmu.task.aggregate.InstanceStatus; import dev.jianmu.worker.aggregate.Worker; import io.swagger.v3.oas.annotations.Operation; -import io.swagger.v3.oas.annotations.Parameter; -import io.swagger.v3.oas.annotations.Parameters; -import io.swagger.v3.oas.annotations.enums.ParameterIn; import io.swagger.v3.oas.annotations.tags.Tag; -import jakarta.servlet.http.HttpServletRequest; -import jakarta.servlet.http.HttpServletResponse; -import jakarta.validation.Valid; -import org.springframework.dao.CannotAcquireLockException; -import org.springframework.dao.DeadlockLoserDataAccessException; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.retry.annotation.Backoff; -import org.springframework.retry.annotation.Retryable; import org.springframework.web.bind.annotation.*; -import org.springframework.web.context.request.async.DeferredResult; -import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; -/** - * @author Ethan Liu - * @class WorkerController - * @description Worker API - * @create 2021-04-21 14:40 - */ @RestController -@RequestMapping("workers") -@Tag(name = "Worker API", description = "Worker API") +@RequestMapping("frontend/workers") +@Tag(name = "Worker API", description = "为前端提供的API") public class WorkerController { - private final WorkerInternalApplication workerApplication; - private final DeferredResultService deferredResultService; - private final NodeDefApi nodeDefApi; - private final StorageService storageService; - private final TaskInstanceApplication taskInstanceApplication; - private final GlobalProperties globalProperties; - public WorkerController(WorkerInternalApplication workerApplication, - DeferredResultService deferredResultService, - NodeDefApi nodeDefApi, - StorageService storageService, - TaskInstanceApplication taskInstanceApplication, - GlobalProperties globalProperties - ) { + private final WorkerApplication workerApplication; + + public WorkerController(WorkerApplication workerApplication) { this.workerApplication = workerApplication; - this.deferredResultService = deferredResultService; - this.nodeDefApi = nodeDefApi; - this.storageService = storageService; - this.taskInstanceApplication = taskInstanceApplication; - this.globalProperties = globalProperties; } @GetMapping("/types") @@ -70,247 +28,26 @@ public class WorkerController { return Worker.Type.values(); } - @PutMapping("{workerId}/join") - @Operation(summary = "连接Server接口", description = "连接Server接口") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public void join(@PathVariable("workerId") String workerId, @RequestBody @Valid WorkerJoiningDto dto) { - this.workerApplication.join(workerId, dto.getType(), dto.getName(), dto.getTag()); - } - - @GetMapping("{workerId}/ping") - @Operation(summary = "ping Server接口", description = "ping Server接口") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public void ping(@PathVariable("workerId") String workerId) { - } - - @GetMapping("kubernetes/{workerId}/tasks") - @Operation(summary = "拉取kube任务接口", description = "拉取kube任务接口") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public DeferredResult> pullKubeTasks(@PathVariable String workerId, TaskPullingDto taskPullingDto) { - var deferredResult = this.deferredResultService.newPullDeferredResult(workerId); - this.workerApplication.pullKubeTasks(workerId, taskPullingDto.getTriggerId()).ifPresent(taskInstance -> { - var unit = this.workerApplication.findUnit(taskInstance); - deferredResult.setResult(ResponseEntity.status(HttpStatus.OK).body(unit)); - }); - return deferredResult; - } - - @GetMapping("{workerId}/tasks") - @Operation(summary = "拉取任务接口", description = "拉取任务接口") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public DeferredResult> pullTasks(@PathVariable String workerId) { - var deferredResult = this.deferredResultService.newPullDeferredResult(workerId); - this.workerApplication.pullTasks(workerId).ifPresent(taskInstance -> { - if (taskInstance.isVolume()) { - deferredResult.setResult(ResponseEntity - .status(HttpStatus.OK) - .body(WorkerTaskVo.builder() - .type(WorkerTaskVo.Type.VOLUME) - .taskInstanceId(taskInstance.getBusinessId()) - .volume(VolumeVo.builder() - .name(taskInstance.getTriggerId()) - .type(taskInstance.isCreationVolume() ? VolumeVo.Type.CREATION : VolumeVo.Type.DELETION) - .build()) - .auth(this.getTaskAuth()) - .version(taskInstance.getVersion()) - .build() - )); - } else { - deferredResult.setResult(ResponseEntity - .status(HttpStatus.OK) - .body(WorkerTaskVo.builder() - .type(WorkerTaskVo.Type.TASK) - .taskInstanceId(taskInstance.getBusinessId()) - .pullStrategy(this.globalProperties.getWorker().getImagePullPolicy()) - .containerSpec(this.workerApplication.getContainerSpec(taskInstance)) - .resultFile(this.nodeDefApi.findByType(taskInstance.getDefKey()).getResultFile()) - .auth(this.getTaskAuth()) - .version(taskInstance.getVersion()) - .build() - )); - } - }); - return deferredResult; - } - - @GetMapping("{workerId}/tasks/{businessId}") - @Operation(summary = "获取任务详情接口", description = "获取任务详情接口") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public WorkerTaskVo findTaskById(@PathVariable String workerId, @PathVariable("businessId") String businessId) { - var taskInstance = this.taskInstanceApplication.findByBusinessIdAndMaxSerialNo(businessId) - .orElseThrow(() -> new RuntimeException("未找到任务:" + businessId)); - if (taskInstance.isVolume()) { - return WorkerTaskVo.builder() - .type(WorkerTaskVo.Type.VOLUME) - .taskInstanceId(taskInstance.getBusinessId()) - .volume(VolumeVo.builder() - .name(taskInstance.getTriggerId()) - .type(taskInstance.isCreationVolume() ? VolumeVo.Type.CREATION : VolumeVo.Type.DELETION) - .build()) - .auth(this.getTaskAuth()) - .version(taskInstance.getVersion()) - .build(); - } else { - return WorkerTaskVo.builder() - .type(WorkerTaskVo.Type.TASK) - .taskInstanceId(taskInstance.getBusinessId()) - .pullStrategy(this.globalProperties.getWorker().getImagePullPolicy()) - .containerSpec(this.workerApplication.getContainerSpec(taskInstance)) - .resultFile(this.nodeDefApi.findByType(taskInstance.getDefKey()).getResultFile()) - .auth(this.getTaskAuth()) - .version(taskInstance.getVersion()) - .build(); - } - } - - private Auth getTaskAuth() { - var registry = globalProperties.getWorker().getRegistry(); - return Auth.builder() - .address(registry.getAddress()) - .username(registry.getUsername()) - .password(registry.getPassword()) - .build(); - } - - @PostMapping("{workerId}/tasks/{businessId}") - @Operation(summary = "获取终止任务接口", description = "获取终止任务接口") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public DeferredResult> watchTasks(@PathVariable String workerId, @PathVariable("businessId") String businessId) { - var exist = this.deferredResultService.existWatchDeferredResult(workerId, businessId); - var deferredResult = this.deferredResultService.newWatchDeferredResult(workerId, businessId); - if (!exist) { - var taskInstance = this.taskInstanceApplication.findByBusinessIdAndMaxSerialNo(businessId) - .orElseThrow(() -> new RuntimeException("未找到任务实例,businessId: " + businessId)); - if (taskInstance.getStatus() == InstanceStatus.EXECUTION_FAILED) { - deferredResult.setResult(ResponseEntity.status(HttpStatus.OK).body(businessId)); - } - } - return deferredResult; - } - - @Retryable( - value = {DeadlockLoserDataAccessException.class, CannotAcquireLockException.class}, - maxAttempts = 5, - backoff = @Backoff(delay = 1000L, multiplier = 2), - listeners = "retryListener" - ) - @PatchMapping("{workerId}/tasks/{businessId}/accept") - @Operation(summary = "确定任务接口", description = "确定任务接口") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public WorkerTaskVo acceptTask(HttpServletResponse response, @PathVariable("workerId") String workerId, - @PathVariable("businessId") String businessId, @Valid @RequestBody TaskInstanceAcceptingDto dto) { - var taskInstance = this.workerApplication.acceptTask(response, workerId, businessId, dto.getVersion()); - if (response.getStatus() != HttpStatus.OK.value()) { - return WorkerTaskVo.builder() - .taskInstanceId(businessId) - .build(); - } - if (taskInstance.isVolume()) { - return WorkerTaskVo.builder() - .type(WorkerTaskVo.Type.VOLUME) - .taskInstanceId(taskInstance.getBusinessId()) - .volume(VolumeVo.builder() - .name(taskInstance.getTriggerId()) - .type(taskInstance.isCreationVolume() ? VolumeVo.Type.CREATION : VolumeVo.Type.DELETION) - .build()) - .auth(this.getTaskAuth()) - .version(taskInstance.getVersion() + 1) - .build(); - } else { - return WorkerTaskVo.builder() - .type(WorkerTaskVo.Type.TASK) - .taskInstanceId(taskInstance.getBusinessId()) - .pullStrategy(this.globalProperties.getWorker().getImagePullPolicy()) - .containerSpec(this.workerApplication.getContainerSpec(taskInstance)) - .resultFile(this.nodeDefApi.findByType(taskInstance.getDefKey()).getResultFile()) - .auth(this.getTaskAuth()) - .version(taskInstance.getVersion() + 1) - .build(); - } - } - - @Retryable( - value = {DeadlockLoserDataAccessException.class, CannotAcquireLockException.class}, - maxAttempts = 5, - backoff = @Backoff(delay = 1000L, multiplier = 2), - listeners = "retryListener" - ) - @PatchMapping("{workerId}/tasks/{businessId}") - @Operation(summary = "更新任务接口", description = "更新任务接口") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public void updateTaskInstance(@PathVariable("workerId") String workerId, @PathVariable("businessId") String businessId, @Valid @RequestBody TaskInstanceUpdatingDto dto) { - this.workerApplication.updateTaskInstance(workerId, businessId, dto.getStatus().name(), dto.getResultFile(), dto.getErrorMsg(), dto.getExitCode()); - } - - @PostMapping("{workerId}/tasks/{businessId}/logs") - @Operation(summary = "写入任务日志接口", description = "写入任务日志接口") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public void writeTaskLog(HttpServletRequest request, @PathVariable("workerId") String workerId, @PathVariable("businessId") String businessId) { - var taskInstance = this.taskInstanceApplication.findByBusinessIdAndMaxSerialNo(businessId) - .orElseThrow(() -> new RuntimeException("未找到任务实例, businessId:" + businessId)); - try (var writer = this.storageService.writeLog(taskInstance.getId(), false)) { - var reader = request.getReader(); - String line; - while ((line = reader.readLine()) != null) { - var list = TaskInstanceWritingLogDto.parseString(line); - list.stream() - .filter(dto -> dto.getContent() != null) - .forEach(dto -> this.workerApplication.writeTaskLog(writer, workerId, taskInstance.getId(), dto.getContent(), dto.getNumber(), dto.getTimestamp())); - } - } catch (IOException e) { - throw new RuntimeException("任务日志写入失败: " + e); - } - } - - @PostMapping("{workerId}/tasks/{businessId}/logs/batch") - @Operation(summary = "实时写入任务日志接口", description = "实时写入任务日志接口") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public void batchWriteTaskLog(HttpServletRequest request, @PathVariable("workerId") String workerId, @PathVariable("businessId") String businessId) { - var taskInstance = this.taskInstanceApplication.findByBusinessIdAndMaxSerialNo(businessId) - .orElseThrow(() -> new RuntimeException("未找到任务实例, businessId:" + businessId)); - try (var writer = this.storageService.writeLog(taskInstance.getId(), true)) { - var reader = request.getReader(); - String line; - while ((line = reader.readLine()) != null) { - if ("null".equals(line)) { - continue; - } - var list = TaskInstanceWritingLogDto.parseString(line); - list.stream() - .filter(dto -> dto.getContent() != null) - .forEach(dto -> this.workerApplication.writeTaskLog(writer, workerId, taskInstance.getId(), dto.getContent(), dto.getNumber(), dto.getTimestamp())); - } - } catch (IOException e) { - throw new RuntimeException("任务日志写入失败: " + e); - } - } - - @GetMapping("/kubernetes/{workerId}/tasks/{triggerId}") - @Operation(summary = "获取k8s运行中任务", description = "获取k8s运行中任务") - @Parameters({ - @Parameter(name = "X-Jianmu-Token", in = ParameterIn.HEADER, description = "认证token") - }) - public Unit findRunningTaskByTriggerId(@PathVariable("workerId") String workerId, @PathVariable("triggerId") String triggerId) { - return this.workerApplication.findRunningTaskByTriggerId(workerId, triggerId); + @DeleteMapping("{workerId}") + @Operation(summary = "Worker删除接口", description = "Worker删除接口") + public void delete(@PathVariable("workerId") String workerId) { + this.workerApplication.delete(workerId); + } + + @GetMapping("") + @Operation(summary = "查询Worker列表", description = "查询Worker列表") + public List findWorkerList() { + var workers = this.workerApplication.findAll(); + return workers.stream().map(worker -> WorkerVo.builder() + .id(worker.getId()) + .name(worker.getName()) + .type(worker.getType()) + .tags(worker.getTags()) + .os(worker.getOs()) + .arch(worker.getArch()) + .capacity(worker.getCapacity()) + .createdTime(worker.getCreatedTime()) + .build()) + .collect(Collectors.toList()); } } diff --git a/api/src/main/java/dev/jianmu/api/dto/WorkerJoiningDto.java b/api/src/main/java/dev/jianmu/api/dto/WorkerJoiningDto.java index 25c138e4cfd180aba60ade0d3a8776dde9bcdce2..35316fda20d92372a9b06262ae274ff815b166f4 100644 --- a/api/src/main/java/dev/jianmu/api/dto/WorkerJoiningDto.java +++ b/api/src/main/java/dev/jianmu/api/dto/WorkerJoiningDto.java @@ -25,4 +25,10 @@ public class WorkerJoiningDto { private String name; @Schema(required = true, description = "tag") private String tag; + @Schema(description = "capacity") + private Integer capacity; + @Schema(description = "os") + private String os; + @Schema(description = "arch") + private String arch; } diff --git a/api/src/main/java/dev/jianmu/api/vo/WorkerVo.java b/api/src/main/java/dev/jianmu/api/vo/WorkerVo.java new file mode 100644 index 0000000000000000000000000000000000000000..760849b0c70bce674241a595fc5803570df1f675 --- /dev/null +++ b/api/src/main/java/dev/jianmu/api/vo/WorkerVo.java @@ -0,0 +1,27 @@ +package dev.jianmu.api.vo; + +import dev.jianmu.worker.aggregate.Worker; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Builder +@Data +@AllArgsConstructor +@NoArgsConstructor +@Schema(description = "Worker定义VO") +public class WorkerVo { + private String id; + private String name; + private String tags; + private Integer capacity; + private String os; + private String arch; + private Worker.Type type; + private Worker.Status status; + private LocalDateTime createdTime; +} diff --git a/application/src/main/java/dev/jianmu/application/service/WorkerApplication.java b/application/src/main/java/dev/jianmu/application/service/WorkerApplication.java new file mode 100644 index 0000000000000000000000000000000000000000..760ace93a34ae284d7a1de2f29e9d1a401668135 --- /dev/null +++ b/application/src/main/java/dev/jianmu/application/service/WorkerApplication.java @@ -0,0 +1,34 @@ +package dev.jianmu.application.service; + +import dev.jianmu.task.repository.TaskInstanceRepository; +import dev.jianmu.worker.aggregate.Worker; +import dev.jianmu.worker.repository.WorkerRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +@Slf4j +@Service +public class WorkerApplication { + private final WorkerRepository workerRepository; + private final TaskInstanceRepository taskInstanceRepository; + + public WorkerApplication(WorkerRepository workerRepository, TaskInstanceRepository taskInstanceRepository) { + this.workerRepository = workerRepository; + this.taskInstanceRepository = taskInstanceRepository; + } + + public List findAll() { + return this.workerRepository.findAll(); + } + + @Transactional + public void delete(String workerId) { + this.workerRepository.findById(workerId).ifPresent(worker -> { + this.workerRepository.delete(worker); + this.taskInstanceRepository.deleteByWorkerId(worker.getId()); + }); + } +} diff --git a/application/src/main/java/dev/jianmu/application/service/internal/WorkerInternalApplication.java b/application/src/main/java/dev/jianmu/application/service/internal/WorkerInternalApplication.java index aa20f7cf6d2fa10d5a2e65c33f398132d37f72ae..e7faa0df7ce91deb4518b03b5280873c3c7a5758 100644 --- a/application/src/main/java/dev/jianmu/application/service/internal/WorkerInternalApplication.java +++ b/application/src/main/java/dev/jianmu/application/service/internal/WorkerInternalApplication.java @@ -136,21 +136,30 @@ public class WorkerInternalApplication { } @Transactional - public void join(String workerId, Worker.Type type, String name, String tag) { - if (this.workerRepository.findById(workerId).isPresent()) { - this.workerRepository.updateTag(Worker.Builder.aWorker() - .id(workerId) - .tags(tag) - .build()); - return; - } - this.workerRepository.add(Worker.Builder.aWorker() - .id(workerId) - .name(name) - .type(type) - .tags(tag) - .status(Worker.Status.ONLINE) - .build()); + public void join(Worker worker) { + this.workerRepository.findById(worker.getId()).ifPresentOrElse( + w -> this.workerRepository.updateInfo(worker), + () -> { + worker.online(); + this.workerRepository.add(worker); + } + ); + } + + @Transactional + public void online(String workerId) { + var worker = this.workerRepository.findById(workerId) + .orElseThrow(() -> new RuntimeException("未找到Worker")); + worker.online(); + this.workerRepository.updateStatus(worker); + } + + @Transactional + public void offline(String workerId) { + var worker = this.workerRepository.findById(workerId) + .orElseThrow(() -> new RuntimeException("未找到Worker")); + worker.offline(); + this.workerRepository.updateStatus(worker); } @Transactional diff --git a/infrastructure/src/main/java/dev/jianmu/infrastructure/mapper/task/TaskInstanceMapper.java b/infrastructure/src/main/java/dev/jianmu/infrastructure/mapper/task/TaskInstanceMapper.java index a56f2d9054176283a341891cddb3951ee51280ca..8c432c8907c3456c28b094a8ba9388def1c75c18 100644 --- a/infrastructure/src/main/java/dev/jianmu/infrastructure/mapper/task/TaskInstanceMapper.java +++ b/infrastructure/src/main/java/dev/jianmu/infrastructure/mapper/task/TaskInstanceMapper.java @@ -37,6 +37,9 @@ public interface TaskInstanceMapper { @Delete("delete from task_instance where trigger_id = #{triggerId}") void deleteByTriggerId(String triggerId); + @Delete("delete from task_instance where worker_id = #{workerId}") + void deleteByWorkerId(String workerId); + @Select("select * from task_instance where id = #{instanceId}") @Result(column = "serial_no", property = "serialNo") @Result(column = "def_key", property = "defKey") @@ -97,6 +100,21 @@ public interface TaskInstanceMapper { @Result(column = "end_time", property = "endTime") List findByTriggerId(String triggerId); + @Select("select * from task_instance where worker_id = #{workerId}") + @Result(column = "serial_no", property = "serialNo") + @Result(column = "def_key", property = "defKey") + @Result(column = "node_info", property = "nodeInfo", typeHandler = NodeInfoTypeHandler.class) + @Result(column = "async_task_ref", property = "asyncTaskRef") + @Result(column = "workflow_ref", property = "workflowRef") + @Result(column = "workflow_version", property = "workflowVersion") + @Result(column = "business_id", property = "businessId") + @Result(column = "trigger_id", property = "triggerId") + @Result(column = "worker_id", property = "workerId") + @Result(column = "_version", property = "version") + @Result(column = "start_time", property = "startTime") + @Result(column = "end_time", property = "endTime") + List findByWorkerId(String workerId); + @Select("select * from task_instance where status = 'RUNNING'") @Result(column = "serial_no", property = "serialNo") @Result(column = "def_key", property = "defKey") diff --git a/infrastructure/src/main/java/dev/jianmu/infrastructure/mapper/task/WorkerMapper.java b/infrastructure/src/main/java/dev/jianmu/infrastructure/mapper/task/WorkerMapper.java index eca0c2817786f1c29c3322a8cd67b821825fad63..1decbacb9047e0941e493a768561b7b00547734e 100644 --- a/infrastructure/src/main/java/dev/jianmu/infrastructure/mapper/task/WorkerMapper.java +++ b/infrastructure/src/main/java/dev/jianmu/infrastructure/mapper/task/WorkerMapper.java @@ -24,8 +24,12 @@ public interface WorkerMapper { @Update("update worker set status = #{status} where id = #{id}") void updateStatus(Worker worker); - @Update("update worker set tags = #{tags} where id = #{id}") - void updateTag(Worker worker); + @Update("update worker set name = #{name}, tags = #{tags}, capacity = #{capacity}, os = #{os}, arch = #{arch} where id = #{id}") + void updateInfo(Worker worker); + + @Select("select * from worker") + @Result(column = "created_time", property = "createdTime") + List findAll(); @Select("select * from worker where id = #{workerId}") @Result(column = "created_time", property = "createdTime") @@ -41,6 +45,7 @@ public interface WorkerMapper { "") @Result(column = "created_time", property = "createdTime") List findByTypeInAndCreatedTimeLessThan(@Param("types") List types, @Param("createdTime") LocalDateTime createdTime); + @Select(" + + \ No newline at end of file diff --git a/ui/vite.config.ts b/ui/vite.config.ts index 5364e29ef234069d6240ee7a901557d92df3d599..a0b5c02b8ea3135ac3c99575bcc5f3d174f757a4 100644 --- a/ui/vite.config.ts +++ b/ui/vite.config.ts @@ -25,7 +25,7 @@ export default ({ command, mode }: ConfigEnv): UserConfigExport => { // 会话 '/auth': { target, changeOrigin }, // worker - '/workers': { target, changeOrigin }, + '/frontend/workers': { target, changeOrigin }, // 密钥管理 '/secrets': { target, changeOrigin }, // 流程定义 diff --git a/worker-core/src/main/java/dev/jianmu/worker/aggregate/Worker.java b/worker-core/src/main/java/dev/jianmu/worker/aggregate/Worker.java index ba05e9a7b97b6464a8d5e5298868f7f9a7350df2..ab2121ee110cfb9cd0afa3a589e620bb8228734d 100644 --- a/worker-core/src/main/java/dev/jianmu/worker/aggregate/Worker.java +++ b/worker-core/src/main/java/dev/jianmu/worker/aggregate/Worker.java @@ -31,6 +31,14 @@ public class Worker { private Status status; private final LocalDateTime createdTime = LocalDateTime.now(); + public void online() { + this.status = Status.ONLINE; + } + + public void offline() { + this.status = Status.OFFLINE; + } + public String getId() { return id; } diff --git a/worker-core/src/main/java/dev/jianmu/worker/repository/WorkerRepository.java b/worker-core/src/main/java/dev/jianmu/worker/repository/WorkerRepository.java index 2a84cf2e3fec2488a053a519a193d2eb65c1c07d..adc65ec9a9d4665d5b3b7271c8b41e64cee6a7d2 100644 --- a/worker-core/src/main/java/dev/jianmu/worker/repository/WorkerRepository.java +++ b/worker-core/src/main/java/dev/jianmu/worker/repository/WorkerRepository.java @@ -19,7 +19,9 @@ public interface WorkerRepository { void updateStatus(Worker worker); - void updateTag(Worker worker); + void updateInfo(Worker worker); + + List findAll(); Optional findById(String workerId);