diff --git a/edu/src/main/java/com/atguigu/app/dwd/db/DwdUserRegister.java b/edu/src/main/java/com/atguigu/app/dwd/db/DwdUserRegister.java index 916f6aa0e480af3412008d26df730702a327a313..97245890998150525b0ceb0afacb4bdcd04b90b2 100644 --- a/edu/src/main/java/com/atguigu/app/dwd/db/DwdUserRegister.java +++ b/edu/src/main/java/com/atguigu/app/dwd/db/DwdUserRegister.java @@ -47,5 +47,6 @@ public class DwdUserRegister extends BaseSQLApp { userInfo.executeInsert("dwd_user_register"); + } } diff --git a/edu/src/main/java/com/atguigu/app/dws/DwsUserRegisterWindow.java b/edu/src/main/java/com/atguigu/app/dws/DwsUserRegisterWindow.java new file mode 100644 index 0000000000000000000000000000000000000000..1c7f42685a3d30207bdb9ace79241ef7e0dc168f --- /dev/null +++ b/edu/src/main/java/com/atguigu/app/dws/DwsUserRegisterWindow.java @@ -0,0 +1,78 @@ +package com.atguigu.app.dws; + +import com.alibaba.fastjson.JSON; +import com.atguigu.app.BaseAppV1; +import com.atguigu.bean.UserRegisterBean; +import com.atguigu.common.Constant; +import com.atguigu.util.DateFormatUtil; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.time.Duration; + +/** + * @ClassName DwsUserRegisterWindow + * @Author Chris + * @Description Dws层用户注册汇总表 + * @Date 2022/7/6 8:35 + **/ + +public class DwsUserRegisterWindow extends BaseAppV1 { + public static void main(String[] args) { + new DwsUserRegisterWindow().init( + 2003, + 2, + "DwsUserRegisterWindow", + Constant.TOPIC_DWD_USER_REGISTER + ); + } + + @Override + public void handle(StreamExecutionEnvironment env, DataStreamSource stream) { + stream + .map(json -> new UserRegisterBean( + "", + "", + 1L, + JSON.parseObject(json).getLong("ts") * 1000 + )) + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(3)) + .withTimestampAssigner((userBean, ts) -> userBean.getTs())) + .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) + .reduce( + new ReduceFunction() { + @Override + public UserRegisterBean reduce(UserRegisterBean value1, + UserRegisterBean value2) throws Exception { + value1.setRegisterCt(value1.getRegisterCt() + value2.getRegisterCt()); + return value1; + } + + }, + new AllWindowFunction() { + @Override + public void apply(TimeWindow window, + Iterable values, + Collector out) throws Exception { + UserRegisterBean bean = values.iterator().next(); + bean.setStt(DateFormatUtil.toYmdHms(window.getStart())); + bean.setEdt(DateFormatUtil.toYmdHms(window.getEnd())); + bean.setTs(System.currentTimeMillis()); + + out.collect(bean); + } + } + ) + ;// TODO: 写入clickHouse + + } +} diff --git a/edu/src/main/java/com/atguigu/bean/UserRegisterBean.java b/edu/src/main/java/com/atguigu/bean/UserRegisterBean.java new file mode 100644 index 0000000000000000000000000000000000000000..e5fa929a9f6b80354db2a2a98041f91db7972eb7 --- /dev/null +++ b/edu/src/main/java/com/atguigu/bean/UserRegisterBean.java @@ -0,0 +1,19 @@ +package com.atguigu.bean; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class UserRegisterBean { + // 窗口起始时间 + String stt; + // 窗口终止时间 + String edt; + // 注册用户数 + Long registerCt; + // 时间戳 + Long ts; +} \ No newline at end of file diff --git a/edu/src/main/java/com/atguigu/common/Constant.java b/edu/src/main/java/com/atguigu/common/Constant.java index 6b96518b0a7e59040b393c1275f0478869f8150e..6d7a3bc938d38165a86346c307d97604a659ba9b 100644 --- a/edu/src/main/java/com/atguigu/common/Constant.java +++ b/edu/src/main/java/com/atguigu/common/Constant.java @@ -8,7 +8,7 @@ public class Constant { public static final String TOPIC_ODS_LOG = "ods_log"; public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver"; - public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop302:8123/gmall2022"; + public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop302:8123/gmall"; public static final int REDIS_DIM_TTL = 2 * 24 * 60 * 60; //维度ttl:2天 public static final String TOPIC_DWD_TRAFFIC_PAGE = "dwd_traffic_page"; diff --git a/edu/src/main/java/com/atguigu/util/AtguiguUtil.java b/edu/src/main/java/com/atguigu/util/AtguiguUtil.java index 4b26e7e08c14072ef53387db7a4d4b1fdd4d87bc..31c74e1bff143b5f51a1d7fc1c49beb1669f1468 100644 --- a/edu/src/main/java/com/atguigu/util/AtguiguUtil.java +++ b/edu/src/main/java/com/atguigu/util/AtguiguUtil.java @@ -13,9 +13,6 @@ import java.util.List; **/ public class AtguiguUtil { - public static void main(String[] args) { - - } public static List toList(Iterable elements) { List list = new ArrayList<>(); diff --git a/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java b/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java index 46b16a4f9fe25ef9e298ef257946a5081b3a5de6..c31e0ef0dfb2f787dcc6133ab804201b3ae71c71 100644 --- a/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java +++ b/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java @@ -2,14 +2,25 @@ package com.atguigu.util; import com.atguigu.common.Constant; import com.atguigu.sink.PhoenixSink; +import com.atguigu.bean.NoSink; +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcSink; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.shaded.guava18.com.google.common.base.CaseFormat; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.kafka.clients.producer.ProducerRecord; import javax.annotation.Nullable; +import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * @ClassName FlinkSinkUtil @@ -43,4 +54,80 @@ public class FlinkSinkUtil { public static PhoenixSink getPhoenixSink() { return new PhoenixSink(); } + + public static SinkFunction getClickHoseSink(String table, Class tClass) { + //使用jdbcSink封装一个clickhouse sink + String driver = Constant.CLICKHOUSE_DRIVER; + String url = Constant.CLICKHOUSE_URL; + // insert into table(age, name, sex) values(?,?,?) + // 使用反射, 找到pojo中的属性名 + + Field[] fields = tClass.getDeclaredFields(); + + /*String names = ""; + for (Field field : fields) { + String name = field.getName(); + names += name + ","; + } + names = names.substring(0, names.length() - 1);*/ + + String names = Stream + .of(fields) + .filter(field -> { + NoSink notSink = field.getAnnotation(NoSink.class); + // 没有注解的时候, 属性保留下来 + return notSink == null; + } ) // 过滤掉不需要的字段 + .map(field -> { + String name = field.getName(); + return CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, name); // 驼峰转成下划线 + }) + .collect(Collectors.joining(",")); + + + String sql = "insert into " + table + "(" + names + ")values(" + names.replaceAll("[^,]+", "?") + ")"; + System.out.println("clickhosue 插入语句:" + sql); + return getJdbcSink(driver, url, null, null, sql); + } + + private static SinkFunction getJdbcSink(String driver, String url, String user, String password, + String sql) { + + return JdbcSink.sink( + sql, + new JdbcStatementBuilder() { + @Override + public void accept(PreparedStatement ps, + T t) throws SQLException { + //TODO 要根据sql语句 + // insert into a(stt,edt,source,keyword,keyword_count,ts)values(?,?,?,?,?,?) + Class tClass = t.getClass(); + Field[] fields = tClass.getDeclaredFields(); + try { + for (int i = 0, position = 1; i < fields.length; i++) { + Field field = fields[i]; + if (field.getAnnotation(NoSink.class) == null) { + field.setAccessible(true); + Object v = field.get(t); + ps.setObject(position++, v); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }, + new JdbcExecutionOptions.Builder() + .withBatchSize(1024) + .withBatchIntervalMs(2000) + .withMaxRetries(3) + .build(), + new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withDriverName(driver) + .withUrl(url) + .withUsername(user) + .withPassword(password) + .build() + ); + } }