From a6df787813cb7305af75a4f95d0c0c4b4fc350a2 Mon Sep 17 00:00:00 2001 From: lz <905769775@qq.com> Date: Thu, 7 Jul 2022 15:30:02 +0800 Subject: [PATCH] review --- .../main/java/com/atguigu/app/BaseAppV1.java | 2 +- .../app/dws/DwsInteractionReviewWindow.java | 128 ++++++++++++++++++ .../atguigu/bean/InteractionReviewBean.java | 31 +++++ 3 files changed, 160 insertions(+), 1 deletion(-) create mode 100644 edu/src/main/java/com/atguigu/app/dws/DwsInteractionReviewWindow.java create mode 100644 edu/src/main/java/com/atguigu/bean/InteractionReviewBean.java diff --git a/edu/src/main/java/com/atguigu/app/BaseAppV1.java b/edu/src/main/java/com/atguigu/app/BaseAppV1.java index 819f935..f032868 100644 --- a/edu/src/main/java/com/atguigu/app/BaseAppV1.java +++ b/edu/src/main/java/com/atguigu/app/BaseAppV1.java @@ -40,7 +40,7 @@ public abstract class BaseAppV1 { DataStreamSource stream = env.addSource(FlinkSourceUtil.getKafkaSource( ckGroupIdJobName, topic, - new Long[]{2000L, 2000L})); //可在此处指定每个分区的偏移量起始位置,从0分区开始指定 + new Long[]{1000L, 1000L})); //可在此处指定每个分区的偏移量起始位置,从0分区开始指定 handle(env, stream); diff --git a/edu/src/main/java/com/atguigu/app/dws/DwsInteractionReviewWindow.java b/edu/src/main/java/com/atguigu/app/dws/DwsInteractionReviewWindow.java new file mode 100644 index 0000000..1d1ae73 --- /dev/null +++ b/edu/src/main/java/com/atguigu/app/dws/DwsInteractionReviewWindow.java @@ -0,0 +1,128 @@ +package com.atguigu.app.dws; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.atguigu.app.BaseAppV1; +import com.atguigu.bean.InteractionReviewBean; +import com.atguigu.common.Constant; +import com.atguigu.util.*; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.sql.Connection; +import java.time.Duration; + +public class DwsInteractionReviewWindow extends BaseAppV1 { + public static void main(String[] args) { + new DwsInteractionReviewWindow().init( + 3009, + 2, + "DwsInteractionReviewWindow", + Constant.TOPIC_DWD_INTERACTION_REVIEW + ); + } + + @Override + public void handle(StreamExecutionEnvironment env, DataStreamSource stream) { + SingleOutputStreamOperator beanStream = parsetToPojo(stream.map(JSON::parseObject)); + SingleOutputStreamOperator streamWithoutDim = windowAndAggregate(beanStream); + SingleOutputStreamOperator streamWithDim = joinDim(streamWithoutDim); + writeToClickHouse(streamWithDim); + } + + private void writeToClickHouse(SingleOutputStreamOperator stream) { + stream.addSink(FlinkSinkUtil.getClickHoseSink("dws_interaction_review_window", InteractionReviewBean.class)); + } + + private SingleOutputStreamOperator joinDim(SingleOutputStreamOperator stream) { + return stream + .map(new RichMapFunction() { + private Connection phoenixConn; + @Override + public void open(Configuration parameters) throws Exception { + phoenixConn = JdbcUtil.getPhoenixConnection(); + } + + @Override + public void close() throws Exception { + if (phoenixConn != null) { + phoenixConn.close(); + } + } + + @Override + public InteractionReviewBean map(InteractionReviewBean bean) throws Exception { + JSONObject courseInfo = DimUtil2.readDimFromPhoenix(phoenixConn, "dim_course_info", bean.getCourseId()); + bean.setCourseName(courseInfo.getString("COURSE_NAME")); + return bean; + } + }); + + } + + private SingleOutputStreamOperator windowAndAggregate( + SingleOutputStreamOperator stream) { + return stream + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(3)) + .withTimestampAssigner((bean, ts) -> bean.getTs()) + ) + .keyBy(InteractionReviewBean::getCourseId) + .window(TumblingEventTimeWindows.of(Time.seconds(5))) + .reduce( + new ReduceFunction() { + @Override + public InteractionReviewBean reduce(InteractionReviewBean value1, + InteractionReviewBean value2) throws Exception { + return value1; + } + }, + new ProcessWindowFunction() { + @Override + public void process(String key, + Context ctx, + Iterable elements, + Collector out) throws Exception { + InteractionReviewBean bean = elements.iterator().next(); + bean.setStt(DateFormatUtil.toYmdHms(ctx.window().getStart())); + bean.setEdt(DateFormatUtil.toYmdHms(ctx.window().getEnd())); + bean.setTs(System.currentTimeMillis()); + + out.collect(bean); + } + } + ); + } + + private SingleOutputStreamOperator parsetToPojo(SingleOutputStreamOperator stream) { + return stream.map(new MapFunction() { + @Override + public InteractionReviewBean map(JSONObject value) throws Exception { + InteractionReviewBean bean = InteractionReviewBean.builder() + .userId(value.getString("user_id")) + .courseId(value.getString("course_id")) + .reviewStars(value.getString("review_stars")) + .ts(value.getLong("ts") * 1000) + .build(); + + return bean; + } + }); + } +} + diff --git a/edu/src/main/java/com/atguigu/bean/InteractionReviewBean.java b/edu/src/main/java/com/atguigu/bean/InteractionReviewBean.java new file mode 100644 index 0000000..d86c466 --- /dev/null +++ b/edu/src/main/java/com/atguigu/bean/InteractionReviewBean.java @@ -0,0 +1,31 @@ +package com.atguigu.bean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.HashSet; +import java.util.Set; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class InteractionReviewBean { + // 窗口起始时间 + String stt; + // 窗口结束时间 + String edt; + // course_id + String courseId; + // course名称 + String courseName; + //用户ID + String userId; + // 评分 + String reviewStars; + // 时间戳 + Long ts; + +} -- Gitee