From 18c2b1cb826e8bcba95299172521589b90a346ee Mon Sep 17 00:00:00 2001 From: Irisah Date: Fri, 8 Jul 2022 09:14:05 +0800 Subject: [PATCH] update --- .../app/dws/DwsTradeSourceProvinceOrder.java | 6 --- ...dow.java => DwsTrafficSourceUVWindow.java} | 43 +++++++++---------- .../bean/TradeSourceProvinceOrder.java | 6 ++- ...niqueVisitor.java => TrafficSourceUV.java} | 23 +++++++--- .../java/com/atguigu/common/Constant.java | 2 +- 5 files changed, 44 insertions(+), 36 deletions(-) rename edu/src/main/java/com/atguigu/app/dws/{DwsTrafficUniqueVisitorWindow.java => DwsTrafficSourceUVWindow.java} (62%) rename edu/src/main/java/com/atguigu/bean/{TrafficUniqueVisitor.java => TrafficSourceUV.java} (72%) diff --git a/edu/src/main/java/com/atguigu/app/dws/DwsTradeSourceProvinceOrder.java b/edu/src/main/java/com/atguigu/app/dws/DwsTradeSourceProvinceOrder.java index f6c1bc4..d895812 100644 --- a/edu/src/main/java/com/atguigu/app/dws/DwsTradeSourceProvinceOrder.java +++ b/edu/src/main/java/com/atguigu/app/dws/DwsTradeSourceProvinceOrder.java @@ -4,22 +4,16 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.app.BaseAppV3; import com.atguigu.bean.TradeSourceProvinceOrder; -import com.atguigu.bean.TrafficUniqueVisitor; import com.atguigu.common.Constant; import com.atguigu.util.DateFormatUtil; import com.atguigu.util.FlinkSinkUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; diff --git a/edu/src/main/java/com/atguigu/app/dws/DwsTrafficUniqueVisitorWindow.java b/edu/src/main/java/com/atguigu/app/dws/DwsTrafficSourceUVWindow.java similarity index 62% rename from edu/src/main/java/com/atguigu/app/dws/DwsTrafficUniqueVisitorWindow.java rename to edu/src/main/java/com/atguigu/app/dws/DwsTrafficSourceUVWindow.java index 8043e42..7d08b81 100644 --- a/edu/src/main/java/com/atguigu/app/dws/DwsTrafficUniqueVisitorWindow.java +++ b/edu/src/main/java/com/atguigu/app/dws/DwsTrafficSourceUVWindow.java @@ -3,8 +3,7 @@ package com.atguigu.app.dws; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.app.BaseAppV3; -import com.atguigu.bean.TradeSourceProvinceOrder; -import com.atguigu.bean.TrafficUniqueVisitor; +import com.atguigu.bean.TrafficSourceUV; import com.atguigu.common.Constant; import com.atguigu.util.DateFormatUtil; import com.atguigu.util.FlinkSinkUtil; @@ -12,9 +11,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; @@ -32,13 +29,13 @@ import java.util.Map; * @Description: todo * @Create_time: 2022/7/6 20:41 */ -public class DwsTrafficUniqueVisitorWindow extends BaseAppV3 { - private static final String APPNAME = "DwsTrafficUniqueVisitorWindow"; +public class DwsTrafficSourceUVWindow extends BaseAppV3 { + private static final String APPNAME = "DwsTrafficSourceUVWindow"; public static void main(String[] args) { Map map = new HashMap<>(); - map.put(Constant.TOPIC_DWD_TRAFFIC_UV, new Long[]{}); - new DwsTrafficUniqueVisitorWindow().init( + map.put(Constant.TOPIC_DWD_TRAFFIC_UV, new Long[]{0L, 0L}); + new DwsTrafficSourceUVWindow().init( 12001, 2, APPNAME, @@ -50,43 +47,45 @@ public class DwsTrafficUniqueVisitorWindow extends BaseAppV3 { public void handle(StreamExecutionEnvironment env, Map> streams) { streams .get(Constant.TOPIC_DWD_TRAFFIC_UV) - .map(new MapFunction() { + .map(new MapFunction() { @Override - public TrafficUniqueVisitor map(String json) throws Exception { + public TrafficSourceUV map(String json) throws Exception { JSONObject jsonObject = JSON.parseObject(json); String userId = jsonObject.getJSONObject("common").getString("uid"); + String source = jsonObject.getJSONObject("common").getString("sc"); Long ts = jsonObject.getLong("ts"); - return new TrafficUniqueVisitor("", "", 0L, ts, new HashSet<>(Collections.singleton(userId))); + return new TrafficSourceUV("", "", source, 0L, ts, new HashSet<>(Collections.singleton(userId))); } }) .assignTimestampsAndWatermarks( WatermarkStrategy - .forBoundedOutOfOrderness(Duration.ofSeconds(3)) + .forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((bean, ts) -> bean.getTs()) ) - .windowAll(TumblingEventTimeWindows.of(Time.seconds(10))) + .keyBy(TrafficSourceUV::getSource) + .window(TumblingEventTimeWindows.of(Time.seconds(10))) .reduce( - new ReduceFunction() { + new ReduceFunction() { @Override - public TrafficUniqueVisitor reduce(TrafficUniqueVisitor v1, TrafficUniqueVisitor v2) throws Exception { + public TrafficSourceUV reduce(TrafficSourceUV v1, TrafficSourceUV v2) throws Exception { v1.getUserIdSet().addAll(v2.getUserIdSet()); - return v1; + return null; } }, - new AllWindowFunction() { + new ProcessWindowFunction() { @Override - public void apply(TimeWindow window, Iterable values, Collector out) throws Exception { - TrafficUniqueVisitor value = values.iterator().next(); + public void process(String s, Context context, Iterable elements, Collector out) throws Exception { + TrafficSourceUV value = elements.iterator().next(); value.setCount((long) value.getUserIdSet().size()); - value.setStt(DateFormatUtil.toYmdHms(window.getStart())); - value.setEdt(DateFormatUtil.toYmdHms(window.getEnd())); + value.setStt(DateFormatUtil.toYmdHms(context.window().getStart())); + value.setEdt(DateFormatUtil.toYmdHms(context.window().getEnd())); value.setTs(System.currentTimeMillis()); out.collect(value); } } ) .addSink( - FlinkSinkUtil.getClickHoseSink(Constant.DWS_TRAFFIC_UV_WINDOW, TrafficUniqueVisitor.class) + FlinkSinkUtil.getClickHoseSink(Constant.DWS_TRAFFIC_SOURCE_UV_WINDOW, TrafficSourceUV.class) ); } } \ No newline at end of file diff --git a/edu/src/main/java/com/atguigu/bean/TradeSourceProvinceOrder.java b/edu/src/main/java/com/atguigu/bean/TradeSourceProvinceOrder.java index a88f034..6204438 100644 --- a/edu/src/main/java/com/atguigu/bean/TradeSourceProvinceOrder.java +++ b/edu/src/main/java/com/atguigu/bean/TradeSourceProvinceOrder.java @@ -1,5 +1,7 @@ package com.atguigu.bean; +import com.atguigu.annotation.NotSink; + import java.util.Set; /** @@ -24,9 +26,9 @@ public class TradeSourceProvinceOrder { private Long ts; - @NoSink + @NotSink private Set orderIdSet; - @NoSink + @NotSink private Set userIdSet; public TradeSourceProvinceOrder() { diff --git a/edu/src/main/java/com/atguigu/bean/TrafficUniqueVisitor.java b/edu/src/main/java/com/atguigu/bean/TrafficSourceUV.java similarity index 72% rename from edu/src/main/java/com/atguigu/bean/TrafficUniqueVisitor.java rename to edu/src/main/java/com/atguigu/bean/TrafficSourceUV.java index 53ef07d..874b967 100644 --- a/edu/src/main/java/com/atguigu/bean/TrafficUniqueVisitor.java +++ b/edu/src/main/java/com/atguigu/bean/TrafficSourceUV.java @@ -1,5 +1,7 @@ package com.atguigu.bean; +import com.atguigu.annotation.NotSink; + import java.util.Set; /** @@ -7,22 +9,24 @@ import java.util.Set; * @Description: todo * @Create_time: 2022/7/6 18:27 */ -public class TrafficUniqueVisitor { +public class TrafficSourceUV { private String stt; private String edt; + private String source; private Long count; private Long ts; - @NoSink + @NotSink private Set userIdSet; - public TrafficUniqueVisitor() { + public TrafficSourceUV() { } - public TrafficUniqueVisitor(String stt, String edt, Long count, Long ts, Set userIdSet) { + public TrafficSourceUV(String stt, String edt, String source, Long count, Long ts, Set userIdSet) { this.stt = stt; this.edt = edt; + this.source = source; this.count = count; this.ts = ts; this.userIdSet = userIdSet; @@ -44,6 +48,14 @@ public class TrafficUniqueVisitor { this.edt = edt; } + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + public Long getCount() { return count; } @@ -70,9 +82,10 @@ public class TrafficUniqueVisitor { @Override public String toString() { - return "TrafficUniqueVisitor{" + + return "TrafficSourceUV{" + "stt='" + stt + '\'' + ", edt='" + edt + '\'' + + ", source='" + source + '\'' + ", count=" + count + ", ts=" + ts + ", userIdSet=" + userIdSet + diff --git a/edu/src/main/java/com/atguigu/common/Constant.java b/edu/src/main/java/com/atguigu/common/Constant.java index 980c847..3928f5a 100644 --- a/edu/src/main/java/com/atguigu/common/Constant.java +++ b/edu/src/main/java/com/atguigu/common/Constant.java @@ -25,6 +25,6 @@ public class Constant { public static final String TOPIC_DWD_ORDER_DETAIL = "dwd_trade_order_detail"; public static final String TOPIC_DWD_INTERACTION_COMMENT = "dwd_interaction_comment"; public static final String TOPIC_DWD_INTERACTION_REVIEW = "dwd_interaction_review"; - public static final String DWS_TRAFFIC_UV_WINDOW = "dws_traffic_uv_window"; + public static final String DWS_TRAFFIC_SOURCE_UV_WINDOW = "dws_traffic_source_uv_window"; public static final String DWS_TRADE_SOURCE_PROVINCE_ORDER_WINDOW = "dws_trade_source_province_order_window"; } -- Gitee