From d47eddcf8ce86d0f231880dc22c07151569959b4 Mon Sep 17 00:00:00 2001 From: Irisah Date: Wed, 6 Jul 2022 20:59:34 +0800 Subject: [PATCH 1/2] add DwsTradeSourceProvinceOrder add DwsTrafficUniqueVisitorWindow add BaseAppV3 pending optimize --- .../main/java/com/atguigu/app/BaseAppV3.java | 62 ++++++++ .../com/atguigu/app/dwd/DwdTradeCartAdd.java | 2 +- .../atguigu/app/dwd/DwdTradeOrderDetail.java | 2 +- .../atguigu/app/dwd/DwdTradePaySucDetail.java | 2 +- .../app/dws/DwsTradeSourceProvinceOrder.java | 106 +++++++++++++ .../dws/DwsTrafficUniqueVisitorWindow.java | 92 +++++++++++ .../bean/TradeSourceProvinceOrder.java | 143 ++++++++++++++++++ .../atguigu/bean/TrafficUniqueVisitor.java | 81 ++++++++++ .../java/com/atguigu/common/Constant.java | 2 + .../com/atguigu/util/FlinkSourceUtil.java | 2 +- 10 files changed, 490 insertions(+), 4 deletions(-) create mode 100644 edu/src/main/java/com/atguigu/app/BaseAppV3.java create mode 100644 edu/src/main/java/com/atguigu/app/dws/DwsTradeSourceProvinceOrder.java create mode 100644 edu/src/main/java/com/atguigu/app/dws/DwsTrafficUniqueVisitorWindow.java create mode 100644 edu/src/main/java/com/atguigu/bean/TradeSourceProvinceOrder.java create mode 100644 edu/src/main/java/com/atguigu/bean/TrafficUniqueVisitor.java diff --git a/edu/src/main/java/com/atguigu/app/BaseAppV3.java b/edu/src/main/java/com/atguigu/app/BaseAppV3.java new file mode 100644 index 0000000..aef6e8d --- /dev/null +++ b/edu/src/main/java/com/atguigu/app/BaseAppV3.java @@ -0,0 +1,62 @@ +package com.atguigu.app; + + +import com.atguigu.util.FlinkSourceUtil; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.*; + +public abstract class BaseAppV3 { + /** + * 指定消费的topic与offsets + */ + public void init(int port, int p, String ckGroupIdJobName, Map topicsAndOffsets) { + System.setProperty("HADOOP_USER_NAME", "atguigu"); + Configuration conf = new Configuration(); + conf.setInteger("rest.port", port); + conf.setString("flink.hadoop.dfs.client.use.datanode.hostname","true"); // + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + env.setParallelism(p); + + env.enableCheckpointing(3000); + env.setStateBackend(new HashMapStateBackend()); + env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop302:8020/edu/" + ckGroupIdJobName); + + env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); + env.getCheckpointConfig().setCheckpointTimeout(60 * 1000); + env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + + Map> streams = new HashMap<>(); + for (Map.Entry entry : topicsAndOffsets.entrySet()) { + String topic = entry.getKey(); + Long[] offsets = entry.getValue(); + DataStreamSource stream; + if (offsets.length == 0) { + stream = env.addSource(FlinkSourceUtil.getKafkaSource(ckGroupIdJobName, topic)); + } else { + stream = env.addSource(FlinkSourceUtil.getKafkaSource(ckGroupIdJobName, topic, offsets)); + } + streams.put(topic, stream); + } + + handle(env, streams); + + try { + env.execute(ckGroupIdJobName); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public abstract void handle(StreamExecutionEnvironment env, + Map> streams); + +} diff --git a/edu/src/main/java/com/atguigu/app/dwd/DwdTradeCartAdd.java b/edu/src/main/java/com/atguigu/app/dwd/DwdTradeCartAdd.java index 88c4b3c..15ae6a0 100644 --- a/edu/src/main/java/com/atguigu/app/dwd/DwdTradeCartAdd.java +++ b/edu/src/main/java/com/atguigu/app/dwd/DwdTradeCartAdd.java @@ -33,7 +33,7 @@ public class DwdTradeCartAdd extends BaseSQLApp { protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) { readOdsDb(tEnv, APPNAME); - Long[] startOffsets = new Long[]{0L}; + Long[] startOffsets = new Long[]{0L,0L}; Table sessionIdAndSc = BaseSourceUtil.readOdsLog(env, tEnv, APPNAME, startOffsets); tEnv.createTemporaryView("session_sc", sessionIdAndSc); diff --git a/edu/src/main/java/com/atguigu/app/dwd/DwdTradeOrderDetail.java b/edu/src/main/java/com/atguigu/app/dwd/DwdTradeOrderDetail.java index 4a8ef55..3f9e5f9 100644 --- a/edu/src/main/java/com/atguigu/app/dwd/DwdTradeOrderDetail.java +++ b/edu/src/main/java/com/atguigu/app/dwd/DwdTradeOrderDetail.java @@ -32,7 +32,7 @@ public class DwdTradeOrderDetail extends BaseSQLApp { readOdsDb(tEnv, APPNAME); // 从ods_log获取session_id与sc_code - Long[] startOffsets = new Long[]{0L}; + Long[] startOffsets = new Long[]{0L,0L}; Table sessionIdAndSc = BaseSourceUtil.readOdsLog(env, tEnv, APPNAME, startOffsets); tEnv.createTemporaryView("session_sc", sessionIdAndSc); // tEnv.sqlQuery("select sessionId,sc from session_sc").execute().print(); diff --git a/edu/src/main/java/com/atguigu/app/dwd/DwdTradePaySucDetail.java b/edu/src/main/java/com/atguigu/app/dwd/DwdTradePaySucDetail.java index 7ba385d..797dd57 100644 --- a/edu/src/main/java/com/atguigu/app/dwd/DwdTradePaySucDetail.java +++ b/edu/src/main/java/com/atguigu/app/dwd/DwdTradePaySucDetail.java @@ -46,7 +46,7 @@ public class DwdTradePaySucDetail extends BaseSQLApp { // tEnv.sqlQuery("select session_id,id,pt from order_info").execute().print(); // 从ods_log获取session_id与sc_code - Long[] startOffsets = new Long[]{0L}; + Long[] startOffsets = new Long[]{0L,0L}; Table sessionIdAndSc = BaseSourceUtil.readOdsLog(env, tEnv, APPNAME, startOffsets); tEnv.createTemporaryView("session_sc", sessionIdAndSc); // tEnv.sqlQuery("select sessionId,sc from session_sc").execute().print(); diff --git a/edu/src/main/java/com/atguigu/app/dws/DwsTradeSourceProvinceOrder.java b/edu/src/main/java/com/atguigu/app/dws/DwsTradeSourceProvinceOrder.java new file mode 100644 index 0000000..f6c1bc4 --- /dev/null +++ b/edu/src/main/java/com/atguigu/app/dws/DwsTradeSourceProvinceOrder.java @@ -0,0 +1,106 @@ +package com.atguigu.app.dws; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.atguigu.app.BaseAppV3; +import com.atguigu.bean.TradeSourceProvinceOrder; +import com.atguigu.bean.TrafficUniqueVisitor; +import com.atguigu.common.Constant; +import com.atguigu.util.DateFormatUtil; +import com.atguigu.util.FlinkSinkUtil; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +/** + * @Author: Iris_Liu + * @Description: todo + * @Create_time: 2022/7/6 18:13 + */ +public class DwsTradeSourceProvinceOrder extends BaseAppV3 { + + private static final String APPNAME = "DwsTradeSourceProvinceOrder"; + + public static void main(String[] args) { + Map map = new HashMap<>(); + map.put(Constant.TOPIC_DWD_ORDER_DETAIL, new Long[]{}); + new DwsTradeSourceProvinceOrder().init( + 12000, + 2, + APPNAME, + map + ); + } + + @Override + public void handle(StreamExecutionEnvironment env, Map> streams) { + streams + .get(Constant.TOPIC_DWD_ORDER_DETAIL) + .map(new MapFunction() { + @Override + public TradeSourceProvinceOrder map(String json) throws Exception { + JSONObject jsonObject = JSON.parseObject(json); + return new TradeSourceProvinceOrder( + "", "", + jsonObject.getString("source_name"), + jsonObject.getString("province_name"), + 0L, 0L, + jsonObject.getDoubleValue("split_final_amount"), + jsonObject.getLong("ts"), + new HashSet<>(Collections.singleton(jsonObject.getString("order_id"))), + new HashSet<>(Collections.singleton(jsonObject.getString("user_id"))) + ); + } + }) + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(3)) + .withTimestampAssigner((bean, ts) -> bean.getTs()) + ) + .keyBy(bean -> bean.getSource() + bean.getProvince()) + .window(TumblingEventTimeWindows.of(Time.seconds(10))) + .reduce( + new ReduceFunction() { + @Override + public TradeSourceProvinceOrder reduce(TradeSourceProvinceOrder v1, TradeSourceProvinceOrder v2) throws Exception { + v1.setAmount(v1.getAmount() + v2.getAmount()); + v1.getOrderIdSet().addAll(v2.getOrderIdSet()); + v1.getUserIdSet().addAll(v2.getUserIdSet()); + return v1; + } + }, new ProcessWindowFunction() { + @Override + public void process(String s, Context context, Iterable elements, Collector out) throws Exception { + TradeSourceProvinceOrder value = elements.iterator().next(); + value.setStt(DateFormatUtil.toYmdHms(context.window().getStart())); + value.setEdt(DateFormatUtil.toYmdHms(context.window().getEnd())); + value.setCount((long) value.getUserIdSet().size()); + value.setTimes((long) value.getOrderIdSet().size()); + value.setTs(context.currentProcessingTime()); + out.collect(value); + } + } + ) + .addSink( + FlinkSinkUtil.getClickHoseSink(Constant.DWS_TRADE_SOURCE_PROVINCE_ORDER_WINDOW, TradeSourceProvinceOrder.class) + ); + } +} \ No newline at end of file diff --git a/edu/src/main/java/com/atguigu/app/dws/DwsTrafficUniqueVisitorWindow.java b/edu/src/main/java/com/atguigu/app/dws/DwsTrafficUniqueVisitorWindow.java new file mode 100644 index 0000000..8043e42 --- /dev/null +++ b/edu/src/main/java/com/atguigu/app/dws/DwsTrafficUniqueVisitorWindow.java @@ -0,0 +1,92 @@ +package com.atguigu.app.dws; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.atguigu.app.BaseAppV3; +import com.atguigu.bean.TradeSourceProvinceOrder; +import com.atguigu.bean.TrafficUniqueVisitor; +import com.atguigu.common.Constant; +import com.atguigu.util.DateFormatUtil; +import com.atguigu.util.FlinkSinkUtil; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +/** + * @Author: Iris_Liu + * @Description: todo + * @Create_time: 2022/7/6 20:41 + */ +public class DwsTrafficUniqueVisitorWindow extends BaseAppV3 { + private static final String APPNAME = "DwsTrafficUniqueVisitorWindow"; + + public static void main(String[] args) { + Map map = new HashMap<>(); + map.put(Constant.TOPIC_DWD_TRAFFIC_UV, new Long[]{}); + new DwsTrafficUniqueVisitorWindow().init( + 12001, + 2, + APPNAME, + map + ); + } + + @Override + public void handle(StreamExecutionEnvironment env, Map> streams) { + streams + .get(Constant.TOPIC_DWD_TRAFFIC_UV) + .map(new MapFunction() { + @Override + public TrafficUniqueVisitor map(String json) throws Exception { + JSONObject jsonObject = JSON.parseObject(json); + String userId = jsonObject.getJSONObject("common").getString("uid"); + Long ts = jsonObject.getLong("ts"); + return new TrafficUniqueVisitor("", "", 0L, ts, new HashSet<>(Collections.singleton(userId))); + } + }) + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(3)) + .withTimestampAssigner((bean, ts) -> bean.getTs()) + ) + .windowAll(TumblingEventTimeWindows.of(Time.seconds(10))) + .reduce( + new ReduceFunction() { + @Override + public TrafficUniqueVisitor reduce(TrafficUniqueVisitor v1, TrafficUniqueVisitor v2) throws Exception { + v1.getUserIdSet().addAll(v2.getUserIdSet()); + return v1; + } + }, + new AllWindowFunction() { + @Override + public void apply(TimeWindow window, Iterable values, Collector out) throws Exception { + TrafficUniqueVisitor value = values.iterator().next(); + value.setCount((long) value.getUserIdSet().size()); + value.setStt(DateFormatUtil.toYmdHms(window.getStart())); + value.setEdt(DateFormatUtil.toYmdHms(window.getEnd())); + value.setTs(System.currentTimeMillis()); + out.collect(value); + } + } + ) + .addSink( + FlinkSinkUtil.getClickHoseSink(Constant.DWS_TRAFFIC_UV_WINDOW, TrafficUniqueVisitor.class) + ); + } +} \ No newline at end of file diff --git a/edu/src/main/java/com/atguigu/bean/TradeSourceProvinceOrder.java b/edu/src/main/java/com/atguigu/bean/TradeSourceProvinceOrder.java new file mode 100644 index 0000000..a88f034 --- /dev/null +++ b/edu/src/main/java/com/atguigu/bean/TradeSourceProvinceOrder.java @@ -0,0 +1,143 @@ +package com.atguigu.bean; + +import java.util.Set; + +/** + * @Author: Iris_Liu + * @Description: todo + * @Create_time: 2022/7/6 18:16 + */ +public class TradeSourceProvinceOrder { + private String stt; + private String edt; + private String source; + private String province; + + // 下单人数 + private Long count; + + // 下单次数 + private Long times; + + // 下单金额 + private Double amount; + + private Long ts; + + @NoSink + private Set orderIdSet; + @NoSink + private Set userIdSet; + + public TradeSourceProvinceOrder() { + } + + public TradeSourceProvinceOrder(String stt, String edt, String source, String province, Long count, Long times, Double amount, Long ts, Set orderIdSet, Set userIdSet) { + this.stt = stt; + this.edt = edt; + this.source = source; + this.province = province; + this.count = count; + this.times = times; + this.amount = amount; + this.ts = ts; + this.orderIdSet = orderIdSet; + this.userIdSet = userIdSet; + } + + public String getStt() { + return stt; + } + + public void setStt(String stt) { + this.stt = stt; + } + + public String getEdt() { + return edt; + } + + public void setEdt(String edt) { + this.edt = edt; + } + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + public String getProvince() { + return province; + } + + public void setProvince(String province) { + this.province = province; + } + + public Long getCount() { + return count; + } + + public void setCount(Long count) { + this.count = count; + } + + public Long getTimes() { + return times; + } + + public void setTimes(Long times) { + this.times = times; + } + + public Double getAmount() { + return amount; + } + + public void setAmount(Double amount) { + this.amount = amount; + } + + public Long getTs() { + return ts; + } + + public void setTs(Long ts) { + this.ts = ts; + } + + public Set getOrderIdSet() { + return orderIdSet; + } + + public void setOrderIdSet(Set orderIdSet) { + this.orderIdSet = orderIdSet; + } + + public Set getUserIdSet() { + return userIdSet; + } + + public void setUserIdSet(Set userIdSet) { + this.userIdSet = userIdSet; + } + + @Override + public String toString() { + return "TradeSourceProvinceOrder{" + + "stt='" + stt + '\'' + + ", edt='" + edt + '\'' + + ", source='" + source + '\'' + + ", province='" + province + '\'' + + ", count=" + count + + ", times=" + times + + ", amount=" + amount + + ", ts=" + ts + + ", orderIdSet=" + orderIdSet + + ", userIdSet=" + userIdSet + + '}'; + } +} diff --git a/edu/src/main/java/com/atguigu/bean/TrafficUniqueVisitor.java b/edu/src/main/java/com/atguigu/bean/TrafficUniqueVisitor.java new file mode 100644 index 0000000..53ef07d --- /dev/null +++ b/edu/src/main/java/com/atguigu/bean/TrafficUniqueVisitor.java @@ -0,0 +1,81 @@ +package com.atguigu.bean; + +import java.util.Set; + +/** + * @Author: Iris_Liu + * @Description: todo + * @Create_time: 2022/7/6 18:27 + */ +public class TrafficUniqueVisitor { + private String stt; + private String edt; + private Long count; + + private Long ts; + + @NoSink + private Set userIdSet; + + public TrafficUniqueVisitor() { + } + + public TrafficUniqueVisitor(String stt, String edt, Long count, Long ts, Set userIdSet) { + this.stt = stt; + this.edt = edt; + this.count = count; + this.ts = ts; + this.userIdSet = userIdSet; + } + + public String getStt() { + return stt; + } + + public void setStt(String stt) { + this.stt = stt; + } + + public String getEdt() { + return edt; + } + + public void setEdt(String edt) { + this.edt = edt; + } + + public Long getCount() { + return count; + } + + public void setCount(Long count) { + this.count = count; + } + + public Long getTs() { + return ts; + } + + public void setTs(Long ts) { + this.ts = ts; + } + + public Set getUserIdSet() { + return userIdSet; + } + + public void setUserIdSet(Set userIdSet) { + this.userIdSet = userIdSet; + } + + @Override + public String toString() { + return "TrafficUniqueVisitor{" + + "stt='" + stt + '\'' + + ", edt='" + edt + '\'' + + ", count=" + count + + ", ts=" + ts + + ", userIdSet=" + userIdSet + + '}'; + } +} diff --git a/edu/src/main/java/com/atguigu/common/Constant.java b/edu/src/main/java/com/atguigu/common/Constant.java index 6d7a3bc..980c847 100644 --- a/edu/src/main/java/com/atguigu/common/Constant.java +++ b/edu/src/main/java/com/atguigu/common/Constant.java @@ -25,4 +25,6 @@ public class Constant { public static final String TOPIC_DWD_ORDER_DETAIL = "dwd_trade_order_detail"; public static final String TOPIC_DWD_INTERACTION_COMMENT = "dwd_interaction_comment"; public static final String TOPIC_DWD_INTERACTION_REVIEW = "dwd_interaction_review"; + public static final String DWS_TRAFFIC_UV_WINDOW = "dws_traffic_uv_window"; + public static final String DWS_TRADE_SOURCE_PROVINCE_ORDER_WINDOW = "dws_trade_source_province_order_window"; } diff --git a/edu/src/main/java/com/atguigu/util/FlinkSourceUtil.java b/edu/src/main/java/com/atguigu/util/FlinkSourceUtil.java index 0e17f92..34b314e 100644 --- a/edu/src/main/java/com/atguigu/util/FlinkSourceUtil.java +++ b/edu/src/main/java/com/atguigu/util/FlinkSourceUtil.java @@ -27,7 +27,7 @@ public class FlinkSourceUtil { } private static FlinkKafkaConsumer getKafkaConsumer(String topic, Properties props) { - return new FlinkKafkaConsumer( + return new FlinkKafkaConsumer<>( topic, /*new SimpleStringSchema()*/ // 自定义反序列化器 -- Gitee From f3039ed468da1a86385d958db216ebda7c55e559 Mon Sep 17 00:00:00 2001 From: Irisah Date: Wed, 6 Jul 2022 21:00:49 +0800 Subject: [PATCH 2/2] update --- edu/src/main/java/com/atguigu/app/BaseAppV3.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/edu/src/main/java/com/atguigu/app/BaseAppV3.java b/edu/src/main/java/com/atguigu/app/BaseAppV3.java index aef6e8d..6f3e733 100644 --- a/edu/src/main/java/com/atguigu/app/BaseAppV3.java +++ b/edu/src/main/java/com/atguigu/app/BaseAppV3.java @@ -13,7 +13,7 @@ import java.util.*; public abstract class BaseAppV3 { /** - * 指定消费的topic与offsets + * 指定消费的topic与offsets,如果不想指定offsets可直接传new Long[]{} */ public void init(int port, int p, String ckGroupIdJobName, Map topicsAndOffsets) { System.setProperty("HADOOP_USER_NAME", "atguigu"); -- Gitee