diff --git a/flink-cdc-job/src/main/java/cn/chongho/inf/flink/job/FlinkCdcSqlTemplateJob.java b/flink-cdc-job/src/main/java/cn/chongho/inf/flink/job/FlinkCdcSqlTemplateJob.java
index f69a0395785b72b6c6ac251e21f0b4efd9b3da6d..692d1de7efef5c71191088844b7376511146107e 100644
--- a/flink-cdc-job/src/main/java/cn/chongho/inf/flink/job/FlinkCdcSqlTemplateJob.java
+++ b/flink-cdc-job/src/main/java/cn/chongho/inf/flink/job/FlinkCdcSqlTemplateJob.java
@@ -2,6 +2,7 @@ package cn.chongho.inf.flink.job;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
@@ -31,6 +32,9 @@ public class FlinkCdcSqlTemplateJob {
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+ env.setStateBackend(new HashMapStateBackend());
+ env.getCheckpointConfig().setCheckpointStorage(parameterTool.get("checkpoint.path", "file:///tmp/flink/checkpoints"));
+
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
diff --git a/flink-web-console/flink-admin.sql b/flink-web-console/flink-admin.sql
index 14a42b65ced493b092531fd80821b17885b9d8dd..3e55be680f946a90bbb12c1105a2945c56713d66 100644
--- a/flink-web-console/flink-admin.sql
+++ b/flink-web-console/flink-admin.sql
@@ -166,6 +166,7 @@ DROP TABLE IF EXISTS `job`;
CREATE TABLE `job` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`job_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
+ `job_type` tinyint(4) NULL DEFAULT 1,
`jar_id` int(11) NULL DEFAULT NULL,
`entry_class` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`args` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
@@ -173,6 +174,7 @@ CREATE TABLE `job` (
`target_db_id` bigint(20) NULL DEFAULT NULL COMMENT '落库数据库',
`savepoint_path` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`job_id` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
+ `job_type` tinyint(4) NULL DEFAULT 1,
`status` int(11) NULL DEFAULT 0,
`enable_flag` tinyint(4) NULL DEFAULT 1,
`flink_colony_id` bigint(20) NULL DEFAULT NULL COMMENT '任务运行集群id',
diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/config/MybatisConfiguration.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/config/MybatisConfiguration.java
index 055a4054c6443d6f43b80f8ff1e77db23f79ac38..5c2580365f308bc9b6e0205b8a9653ec0937c1ec 100644
--- a/flink-web-console/src/main/java/cn/chongho/inf/flink/config/MybatisConfiguration.java
+++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/config/MybatisConfiguration.java
@@ -80,6 +80,7 @@ public class MybatisConfiguration implements TransactionManagementConfigurer {
}
@Bean
+ @Override
public PlatformTransactionManager annotationDrivenTransactionManager() {
logger.info("事物配置");
return new DataSourceTransactionManager(dataSource);
diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/config/RestTemplateConfig.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/config/RestTemplateConfig.java
index 4a64c3326dab74257a03cb4027c359023cedd3e8..420ef3fdda8a38af3434aaad34b1c40f62976709 100644
--- a/flink-web-console/src/main/java/cn/chongho/inf/flink/config/RestTemplateConfig.java
+++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/config/RestTemplateConfig.java
@@ -24,9 +24,6 @@ public class RestTemplateConfig {
restTemplate.setErrorHandler(new DefaultResponseErrorHandler(){
@Override
public void handleError(ClientHttpResponse response) throws IOException {
-// if(response.getStatusCode() != HttpStatus.INTERNAL_SERVER_ERROR) {
-// super.handleError(response);
-// }else{
StringBuilder sb = new StringBuilder();
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(response.getBody()));
@@ -34,7 +31,7 @@ public class RestTemplateConfig {
sb.append(line);
}
throw new RestClientException(sb.toString());
-// }
+
}
});
return restTemplate;
@@ -46,9 +43,7 @@ public class RestTemplateConfig {
restTemplate.setErrorHandler(new DefaultResponseErrorHandler(){
@Override
public void handleError(ClientHttpResponse response) throws IOException {
-// if(response.getStatusCode() != HttpStatus.INTERNAL_SERVER_ERROR) {
-// super.handleError(response);
-// }else{
+
StringBuilder sb = new StringBuilder();
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(response.getBody()));
@@ -56,7 +51,7 @@ public class RestTemplateConfig {
sb.append(line);
}
throw new RestClientException(sb.toString());
-// }
+
}
});
return restTemplate;
diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/constants/Constant.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/constants/Constant.java
index 0bcec093c9d6101b8997a5968951771520a661a0..bade439c1e25af392b2169ab0233ecdbc853459d 100644
--- a/flink-web-console/src/main/java/cn/chongho/inf/flink/constants/Constant.java
+++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/constants/Constant.java
@@ -40,10 +40,14 @@ public class Constant {
ENABLE;
}
+ /**
+ * 数据库类型。
+ */
public enum DbType {
MYSQL,
ORACLE,
ELASTICSEARCH,
+ HOLO,
;
}
@@ -208,12 +212,15 @@ public class Constant {
}
/**
- * 数据权限,数据类型
+ * 数据权限,数据类型。
*/
public enum DataType {
JOB(1),
CDCJOB(2);
+ /**
+ * 任务类型.
+ */
private Integer value;
public Integer getValue() {
diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java
index 6859250d2faa38b871776fc66e8a81f2612e0d75..7b4411effcfb3f9d73b0233d3c7fb58e67372e60 100644
--- a/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java
+++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java
@@ -12,6 +12,7 @@ package cn.chongho.inf.flink.controller;
import cn.chongho.inf.flink.model.*;
import cn.chongho.inf.flink.service.*;
+import cn.chongho.inf.flink.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
@@ -53,8 +54,7 @@ public class AdminController {
List