diff --git a/edu/src/main/java/com/atguigu/app/dws/DwsTrafficPageViewWindow.java b/edu/src/main/java/com/atguigu/app/dws/DwsTrafficPageViewWindow.java new file mode 100644 index 0000000000000000000000000000000000000000..6540994d132c9305adbecb9d7ff55fab9af895b6 --- /dev/null +++ b/edu/src/main/java/com/atguigu/app/dws/DwsTrafficPageViewWindow.java @@ -0,0 +1,140 @@ +package com.atguigu.app.dws; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.atguigu.app.BaseAppV1; +import com.atguigu.bean.TrafficHomeDetailPageViewBean; +import com.atguigu.common.Constant; +import com.atguigu.util.DateFormatUtil; +import com.atguigu.util.FlinkSinkUtil; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.time.Duration; + +/** + * @ClassName DwsTrafficPageViewWindow + * @Author Chris + * @Description TODO Dws层页面浏览汇总表 + * @Date 2022/7/6 10:32 + **/ + +public class DwsTrafficPageViewWindow extends BaseAppV1 { + public static void main(String[] args) { + new DwsTrafficPageViewWindow().init( + 2005, + 2, + "DwsTrafficPageViewWindow", + Constant.TOPIC_DWD_TRAFFIC_PAGE + ); + } + + @Override + public void handle(StreamExecutionEnvironment env, DataStreamSource stream) { + //找到page数据中用户的一条home访问记录和一条course_detail访问记录(如果有的话) + SingleOutputStreamOperator beanStream = findUv(stream); + //开窗聚合,在上一步已经去过重了,所以不用keyBy,windowAll聚合 + SingleOutputStreamOperator resultStream = windowAndReduce(beanStream); + + //写出到clickHouse + writeToClickHouse(resultStream); + } + + private void writeToClickHouse(SingleOutputStreamOperator resultStream) { + resultStream.addSink(FlinkSinkUtil.getClickHoseSink("dws_traffic_page_view_window", TrafficHomeDetailPageViewBean.class)); + } + + private SingleOutputStreamOperator windowAndReduce(SingleOutputStreamOperator beanStream) { + return beanStream + .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) + .reduce( + new ReduceFunction() { + @Override + public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean bean1, + TrafficHomeDetailPageViewBean bean2) throws Exception { + bean1.setHomeUvCt(bean1.getHomeUvCt() + bean2.getHomeUvCt()); + bean1.setCourseDetailUvCt(bean1.getCourseDetailUvCt() + bean2.getCourseDetailUvCt()); + return bean1; + } + }, + new AllWindowFunction() { + @Override + public void apply(TimeWindow window, + Iterable values, + Collector out) throws Exception { + TrafficHomeDetailPageViewBean bean = values.iterator().next(); + bean.setStt(DateFormatUtil.toYmdHms(window.getStart())); + bean.setEdt(DateFormatUtil.toYmdHms(window.getEnd())); + bean.setTs(System.currentTimeMillis()); + System.out.println(bean); + out.collect(bean); + } + } + ); + } + + private SingleOutputStreamOperator findUv(DataStreamSource stream) { + + return stream + .map(JSON::parseObject) + .filter(obj -> { + String pageId = obj.getJSONObject("page").getString("page_id"); + return "home".equals(pageId) || "course_detail".equals(pageId); + }) + .assignTimestampsAndWatermarks( + WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) + .withTimestampAssigner((obj, ts) -> obj.getLong("ts"))) + .keyBy(obj -> obj.getJSONObject("common").getString("mid")) + .process(new KeyedProcessFunction() { + + private ValueState homeState; + private ValueState courseDetailState; + + @Override + public void open(Configuration parameters) throws Exception { + homeState = getRuntimeContext().getState(new ValueStateDescriptor("homeState", String.class)); + courseDetailState = getRuntimeContext().getState(new ValueStateDescriptor("courseDetailState", String.class)); + } + + @Override + public void processElement(JSONObject obj, + Context ctx, + Collector out) throws Exception { + + String pageId = obj.getJSONObject("page").getString("page_id"); + Long ts = obj.getLong("ts"); + String today = DateFormatUtil.toDate(ts); + + long homeUvCt = 0L; + long courseDetailUvCt = 0L; + + if ("home".equals(pageId) && !today.equals(homeState.value())) { + homeUvCt = 1L; + homeState.update(today); + } else if ("course_detail".equals(pageId) && !today.equals(courseDetailState.value())) { + courseDetailUvCt = 1L; + courseDetailState.update(today); + } + + if (homeUvCt + courseDetailUvCt == 1) { + out.collect(new TrafficHomeDetailPageViewBean("", "", homeUvCt, courseDetailUvCt, ts)); + } + + } + }); + + + } +} diff --git a/edu/src/main/java/com/atguigu/app/dws/DwsUserRegisterWindow.java b/edu/src/main/java/com/atguigu/app/dws/DwsUserRegisterWindow.java index 1c7f42685a3d30207bdb9ace79241ef7e0dc168f..73d1349a9ae6b4ef725925582566bd53f85d294c 100644 --- a/edu/src/main/java/com/atguigu/app/dws/DwsUserRegisterWindow.java +++ b/edu/src/main/java/com/atguigu/app/dws/DwsUserRegisterWindow.java @@ -5,6 +5,7 @@ import com.atguigu.app.BaseAppV1; import com.atguigu.bean.UserRegisterBean; import com.atguigu.common.Constant; import com.atguigu.util.DateFormatUtil; +import com.atguigu.util.FlinkSinkUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -72,7 +73,7 @@ public class DwsUserRegisterWindow extends BaseAppV1 { } } ) - ;// TODO: 写入clickHouse + .addSink(FlinkSinkUtil.getClickHoseSink("dws_user_register_window",UserRegisterBean.class)); } } diff --git a/edu/src/main/java/com/atguigu/bean/TrafficHomeDetailPageViewBean.java b/edu/src/main/java/com/atguigu/bean/TrafficHomeDetailPageViewBean.java new file mode 100644 index 0000000000000000000000000000000000000000..9929cf26d39f4f071f368d2279f0809d3564ff6f --- /dev/null +++ b/edu/src/main/java/com/atguigu/bean/TrafficHomeDetailPageViewBean.java @@ -0,0 +1,23 @@ +package com.atguigu.bean; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class TrafficHomeDetailPageViewBean { + // 窗口起始时间 + String stt; + + // 窗口结束时间 + String edt; + + // 首页独立访客数 + Long homeUvCt; + + // 商品详情页独立访客数 + Long courseDetailUvCt; + + // 时间戳 + Long ts; +} \ No newline at end of file