From 257eab64b8e06113da7176492fd8199825d994ea Mon Sep 17 00:00:00 2001 From: lz <905769775@qq.com> Date: Tue, 5 Jul 2022 17:59:38 +0800 Subject: [PATCH 1/4] dwdcomment --- .../atguigu/app/PhoenixSink/PhoenixSink.java | 98 ------------------- .../java/com/atguigu/common/Constant.java | 1 + 2 files changed, 1 insertion(+), 98 deletions(-) delete mode 100644 edu/src/main/java/com/atguigu/app/PhoenixSink/PhoenixSink.java diff --git a/edu/src/main/java/com/atguigu/app/PhoenixSink/PhoenixSink.java b/edu/src/main/java/com/atguigu/app/PhoenixSink/PhoenixSink.java deleted file mode 100644 index a79b7b6..0000000 --- a/edu/src/main/java/com/atguigu/app/PhoenixSink/PhoenixSink.java +++ /dev/null @@ -1,98 +0,0 @@ -package com.atguigu.app.PhoenixSink; - -import com.alibaba.druid.pool.DruidDataSource; -import com.alibaba.druid.pool.DruidPooledConnection; -import com.alibaba.fastjson.JSONObject; -import com.atguigu.util.DruidDSUtil; -import com.atguigu.bean.TableProcess; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import redis.clients.jedis.Jedis; - -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class PhoenixSink extends RichSinkFunction> { - - private DruidPooledConnection conn; - private Jedis redisClient; - - @Override - public void open(Configuration parameters) throws Exception { - // 用连接池 - DruidDataSource druidDataSource = DruidDSUtil.getDruidDataSource(); - conn = druidDataSource.getConnection(); - - //redisClient = RedisUtil.getRedisClient(); - - } - - @Override - public void close() throws Exception { - if (conn != null) { - conn.close(); // 如果是从连接池获取的连接, close是归还 - } - } - - @Override - public void invoke(Tuple2 value, - Context context) throws Exception { - // 1. 写数据到phoenix中 - writeToPhoenix(value); - // 2. 更新缓存或删除缓存 - delCache(value); - - } - - private void delCache(Tuple2 value) { - - JSONObject data = value.f0; - TableProcess tp = value.f1; - - - // key: 表名:id -// "update".equals(tp.get) - if ("update".equals(tp.getOperate_type())) { - System.out.println("开始删除...."); - String key = tp.getSinkTable() + ":" + data.getString("id"); // - redisClient.del(key); - // 删除的时候, key不存在怎么办? - } - - } - - private void writeToPhoenix(Tuple2 value) throws SQLException { - JSONObject data = value.f0; - TableProcess tp = value.f1; - - // upsert into t(a,b,c)values(?,?,?) - // 1. 拼接sql TODO - StringBuilder sql = new StringBuilder("upsert into "); - sql - .append(tp.getSinkTable()) - .append("(") - .append(tp.getSinkColumns()) - .append(")values(") - .append(tp.getSinkColumns().replaceAll("[^,]+", "?")) - .append(")"); - System.out.println("插入语句: " + sql.toString()); - // 2. 根据sql得到 预处理语句 - PreparedStatement ps = conn.prepareStatement(sql.toString()); - // 3. 给占位符赋值 TODO - String[] columnNames = tp.getSinkColumns().split(","); - for (int i = 0; i < columnNames.length; i++) { - String columnName = columnNames[i]; - Object v = data.get(columnName); - - ps.setString(i + 1,v == null ? null : v.toString()); // v == null null + "" = "null" "" null - } - - // 4. 执行 - ps.execute(); - // 5. 提交 - conn.commit(); - // 6. 关闭ps - ps.close(); - } -} diff --git a/edu/src/main/java/com/atguigu/common/Constant.java b/edu/src/main/java/com/atguigu/common/Constant.java index 69a7be8..58424de 100644 --- a/edu/src/main/java/com/atguigu/common/Constant.java +++ b/edu/src/main/java/com/atguigu/common/Constant.java @@ -23,4 +23,5 @@ public class Constant { public static final String TOPIC_DWD_TRAFFIC_UV = "dwd_traffic_uv"; public static final String TOPIC_DWD_USER_REGISTER = "dwd_user_register"; public static final String TOPIC_DWD_ORDER_DETAIL = "dwd_trade_order_detail"; + public static final String TOPIC_DWD_INTERACTION_COMMENT = "dwd_interaction_comment"; } -- Gitee From 376ae0e2bff7a31b98ff05a5dea90c7cdcf8f318 Mon Sep 17 00:00:00 2001 From: lz <905769775@qq.com> Date: Tue, 5 Jul 2022 18:19:31 +0800 Subject: [PATCH 2/4] dwdcomment --- .../app/dwd/db/DwdInteractionComment.java | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionComment.java diff --git a/edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionComment.java b/edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionComment.java new file mode 100644 index 0000000..03510d8 --- /dev/null +++ b/edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionComment.java @@ -0,0 +1,57 @@ +package com.atguigu.app.dwd.db; + +import com.atguigu.app.BaseSQLApp; +import com.atguigu.common.Constant; +import com.atguigu.util.SQLUtil; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + + +public class DwdInteractionComment extends BaseSQLApp { + public static void main(String[] args) { + new DwdInteractionComment().init( + 2015, + 2, + "DwdInteractionComment", + 10 + ); + } + + @Override + protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) { + //读取ods_db数据 + readOdsDb(tEnv, "DwdInteractionComment"); + + //过滤出评价表 + Table commentInfo = tEnv.sqlQuery("select " + + "data['id'] id, " + + "data['user_id'] user_id, " + + "data['chapter_id'] chapter_id, " + + "data['course_id'] course_id, " + + "data['comment_txt'] comment_txt, " + + "date_format(data['create_time'],'yyyy-MM-dd') date_id, " + + "data['create_time'] create_time, " + + "cast(ts as string) ts " + + "from ods_db " + + "where `database`='gmall' " + + "and `table`='comment_info' " + + "and `type`='insert' " + + "and data['deleted']='0' " ); + + // 结果写入到Kafka中 + tEnv.executeSql("create table dwd_interaction_comment( " + + "id string, " + + "user_id string, " + + "chapter_id string, " + + "course_id string, " + + "comment_txt string, " + + "data_id string, " + + "create_time string, " + + "ts string " + + ")" + SQLUtil.getKafkaSinkDDL(Constant.TOPIC_DWD_INTERACTION_COMMENT)); + + commentInfo.executeInsert("dwd_interaction_comment"); + + } +} -- Gitee From be1fec1c08da676f13d598ea9d950fb103c133f3 Mon Sep 17 00:00:00 2001 From: lz <905769775@qq.com> Date: Tue, 5 Jul 2022 18:31:33 +0800 Subject: [PATCH 3/4] dwdcomment --- .../com/atguigu/app/dwd/db/DwdInteractionComment.java | 8 ++++---- edu/src/main/java/com/atguigu/common/Constant.java | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionComment.java b/edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionComment.java index 03510d8..70e73db 100644 --- a/edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionComment.java +++ b/edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionComment.java @@ -13,7 +13,7 @@ public class DwdInteractionComment extends BaseSQLApp { new DwdInteractionComment().init( 2015, 2, - "DwdInteractionComment", + "DwdInteractionReview", 10 ); } @@ -21,10 +21,10 @@ public class DwdInteractionComment extends BaseSQLApp { @Override protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) { //读取ods_db数据 - readOdsDb(tEnv, "DwdInteractionComment"); + readOdsDb(tEnv, "DwdInteractionReview"); //过滤出评价表 - Table commentInfo = tEnv.sqlQuery("select " + + Table reviewInfo = tEnv.sqlQuery("select " + "data['id'] id, " + "data['user_id'] user_id, " + "data['chapter_id'] chapter_id, " + @@ -51,7 +51,7 @@ public class DwdInteractionComment extends BaseSQLApp { "ts string " + ")" + SQLUtil.getKafkaSinkDDL(Constant.TOPIC_DWD_INTERACTION_COMMENT)); - commentInfo.executeInsert("dwd_interaction_comment"); + reviewInfo.executeInsert("dwd_interaction_review"); } } diff --git a/edu/src/main/java/com/atguigu/common/Constant.java b/edu/src/main/java/com/atguigu/common/Constant.java index 58424de..6b96518 100644 --- a/edu/src/main/java/com/atguigu/common/Constant.java +++ b/edu/src/main/java/com/atguigu/common/Constant.java @@ -24,4 +24,5 @@ public class Constant { public static final String TOPIC_DWD_USER_REGISTER = "dwd_user_register"; public static final String TOPIC_DWD_ORDER_DETAIL = "dwd_trade_order_detail"; public static final String TOPIC_DWD_INTERACTION_COMMENT = "dwd_interaction_comment"; + public static final String TOPIC_DWD_INTERACTION_REVIEW = "dwd_interaction_review"; } -- Gitee From e3ce6a31a79112d7a36a05ef22a26f95b94e062a Mon Sep 17 00:00:00 2001 From: lz <905769775@qq.com> Date: Tue, 5 Jul 2022 18:37:54 +0800 Subject: [PATCH 4/4] dwdreview --- .../app/dwd/db/DwdInteractionReview.java | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionReview.java diff --git a/edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionReview.java b/edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionReview.java new file mode 100644 index 0000000..5794015 --- /dev/null +++ b/edu/src/main/java/com/atguigu/app/dwd/db/DwdInteractionReview.java @@ -0,0 +1,57 @@ +package com.atguigu.app.dwd.db; + +import com.atguigu.app.BaseSQLApp; +import com.atguigu.common.Constant; +import com.atguigu.util.SQLUtil; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + + +public class DwdInteractionReview extends BaseSQLApp { + public static void main(String[] args) { + new DwdInteractionReview().init( + 2015, + 2, + "DwdInteractionComment", + 10 + ); + } + + @Override + protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) { + //读取ods_db数据 + readOdsDb(tEnv, "DwdInteractionComment"); + + //过滤出课程评价表 + Table commentInfo = tEnv.sqlQuery("select " + + "data['id'] id, " + + "data['user_id'] user_id, " + + "data['chapter_id'] chapter_id, " + + "data['course_id'] course_id, " + + "data['comment_txt'] comment_txt, " + + "date_format(data['create_time'],'yyyy-MM-dd') date_id, " + + "data['create_time'] create_time, " + + "cast(ts as string) ts " + + "from ods_db " + + "where `database`='gmall' " + + "and `table`='comment_info' " + + "and `type`='insert' " + + "and data['deleted']='0' " ); + + // 结果写入到Kafka中 + tEnv.executeSql("create table dwd_interaction_comment( " + + "id string, " + + "user_id string, " + + "chapter_id string, " + + "course_id string, " + + "comment_txt string, " + + "data_id string, " + + "create_time string, " + + "ts string " + + ")" + SQLUtil.getKafkaSinkDDL(Constant.TOPIC_DWD_INTERACTION_REVIEW)); + + commentInfo.executeInsert("dwd_interaction_comment"); + + } +} -- Gitee