From d591b3bb804ed5ef45297b8e5bdd9134d687e5f0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cno-bug-is-great=E2=80=9D?= <“804257656@qq.com>
Date: Wed, 6 Jul 2022 18:21:11 +0800
Subject: [PATCH] keyword

---
 .../java/com/atguigu/annotation/NotSink.java  |  13 +++
 .../main/java/com/atguigu/app/BaseAppV1.java  |   2 +-
 ...DwsTrafficSourceKeywordPageViewWindow.java |  80 +++++++++++++
 .../java/com/atguigu/bean/KeywordBean.java    |  16 +++
 .../java/com/atguigu/common/Constant.java     |   2 +-
 .../java/com/atguigu/util/FlinkSinkUtil.java  | 105 +++++++++++++++---
 .../java/com/atguigu/util/IkAnalyzer.java     |  21 ++++
 .../main/java/com/atguigu/util/IkUtil.java    |  48 ++++++++
 8 files changed, 272 insertions(+), 15 deletions(-)
 create mode 100644 edu/src/main/java/com/atguigu/annotation/NotSink.java
 create mode 100644 edu/src/main/java/com/atguigu/app/dws/Dws_DwsTrafficSourceKeywordPageViewWindow.java
 create mode 100644 edu/src/main/java/com/atguigu/bean/KeywordBean.java
 create mode 100644 edu/src/main/java/com/atguigu/util/IkAnalyzer.java
 create mode 100644 edu/src/main/java/com/atguigu/util/IkUtil.java

diff --git a/edu/src/main/java/com/atguigu/annotation/NotSink.java b/edu/src/main/java/com/atguigu/annotation/NotSink.java
new file mode 100644
index 0000000..a108d75
--- /dev/null
+++ b/edu/src/main/java/com/atguigu/annotation/NotSink.java
@@ -0,0 +1,13 @@
+package com.atguigu.annotation;
+// @Target tells Java where this annotation may be used
+// @Retention controls the annotation's lifecycle (kept at runtime so reflection can see it)
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface NotSink {
+}
diff --git a/edu/src/main/java/com/atguigu/app/BaseAppV1.java b/edu/src/main/java/com/atguigu/app/BaseAppV1.java
index ae33f33..6e10b4d 100644
--- a/edu/src/main/java/com/atguigu/app/BaseAppV1.java
+++ b/edu/src/main/java/com/atguigu/app/BaseAppV1.java
@@ -21,7 +21,7 @@ public abstract class BaseAppV1 {
         System.setProperty("HADOOP_USER_NAME","atguigu");
         Configuration conf = new Configuration();
         conf.setInteger("rest.port",port);
-        conf.setString("flink.hadoop.dfs.client.use.datanode.hostname","true"); //
+        //conf.setString("flink.hadoop.dfs.client.use.datanode.hostname","true"); //
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
 
         env.setParallelism(p);
diff --git a/edu/src/main/java/com/atguigu/app/dws/Dws_DwsTrafficSourceKeywordPageViewWindow.java b/edu/src/main/java/com/atguigu/app/dws/Dws_DwsTrafficSourceKeywordPageViewWindow.java
new file mode 100644
index 0000000..8f71dca
--- /dev/null
+++ b/edu/src/main/java/com/atguigu/app/dws/Dws_DwsTrafficSourceKeywordPageViewWindow.java
@@ -0,0 +1,80 @@
+package com.atguigu.app.dws;
+
+import com.atguigu.app.BaseSQLApp;
+import com.atguigu.bean.KeywordBean;
+import com.atguigu.common.Constant;
+import com.atguigu.util.FlinkSinkUtil;
+import com.atguigu.util.IkAnalyzer;
+import com.atguigu.util.SQLUtil;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+public class Dws_DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {
+    public static void main(String[] args) {
+        new Dws_DwsTrafficSourceKeywordPageViewWindow().init(
+            3001,
+            2,
+            "Dws_DwsTrafficSourceKeywordPageViewWindow",
+            10
+        );
+    }
+
+    @Override
+    protected void handle(StreamExecutionEnvironment env,
+                          StreamTableEnvironment tEnv) {
+        // 1. Read the page log data: create the source table with a DDL statement
+        tEnv.executeSql("create table page_log(" +
+            "page map<string, string>," +
+            "ts bigint," +
+            "et as to_timestamp_ltz(ts,3)," +
+            "watermark for et as et - interval '3' second " +
+            ")" + SQLUtil.getKafkaSourceDDL(Constant.TOPIC_DWD_TRAFFIC_PAGE, "Dws_DwsTrafficSourceKeywordPageViewWindow"));
+
+        // 2. Filter out the search records and take the search keyword
+        Table t1 = tEnv.sqlQuery("select " +
+            "page['item'] keyword," +
+            "et " +
+            "from page_log " +
+            "where page['page_id']='course_list' " +
+            "and page['item_type']='keyword' ");
+        tEnv.createTemporaryView("t1", t1);
+
+        // 3. Split the keywords into words with a user-defined table function
+        tEnv.createTemporaryFunction("ik_analyzer", IkAnalyzer.class);
+        Table t2 = tEnv.sqlQuery("select " +
+            "kw," +
+            "et " +
+            "from t1 " +
+            "join lateral table(ik_analyzer(keyword)) on true ");
+        tEnv.createTemporaryView("t2", t2);
+        // 4. Windowed aggregation
+        //    options: group windows, windowing TVFs, over windows
+        //    group windows: tumble, hop, session
+        //    TVFs: tumble, hop, cumulate
+        Table resultTable = tEnv.sqlQuery("select " +
+            "date_format(window_start,'yyyy-MM-dd HH:mm:ss') stt," +
+            "date_format(window_end,'yyyy-MM-dd HH:mm:ss') edt," +
+            "kw keyword," +
+            "count(*) keywordCount," +
+            "unix_timestamp() * 1000 as ts " +
+            "from table(tumble(table t2, descriptor(et), interval '5' second)) " +
+            "group by kw, window_start, window_end");
+
+        // 5. Write the result to ClickHouse
+        //    custom sink for the stream
+        //    backed by a JDBC connection
+        //    the bean field names must match the table column names so the reflection-based write works
+        tEnv
+            .toRetractStream(resultTable, KeywordBean.class)
+            .filter(t -> t.f0)
+            .map(t -> t.f1)
+            .addSink(FlinkSinkUtil.getClickHoseSink("dws_traffic_source_keyword_page_view_window", KeywordBean.class));
+
+        try {
+            env.execute();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/edu/src/main/java/com/atguigu/bean/KeywordBean.java b/edu/src/main/java/com/atguigu/bean/KeywordBean.java
new file mode 100644
index 0000000..fc812a1
--- /dev/null
+++ b/edu/src/main/java/com/atguigu/bean/KeywordBean.java
@@ -0,0 +1,16 @@
+package com.atguigu.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class KeywordBean {
+    private String stt;
+    private String edt;
+    private String keyword;
+    private Long keywordCount;
+    private Long ts;
+}
diff --git a/edu/src/main/java/com/atguigu/common/Constant.java b/edu/src/main/java/com/atguigu/common/Constant.java
index 6b96518..6d7a3bc 100644
--- a/edu/src/main/java/com/atguigu/common/Constant.java
+++ b/edu/src/main/java/com/atguigu/common/Constant.java
@@ -8,7 +8,7 @@ public class Constant {
     public static final String TOPIC_ODS_LOG = "ods_log";
 
     public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
-    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop302:8123/gmall2022";
+    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop302:8123/gmall";
 
     public static final int REDIS_DIM_TTL = 2 * 24 * 60 * 60; // dimension TTL: 2 days
     public static final String TOPIC_DWD_TRAFFIC_PAGE = "dwd_traffic_page";
diff --git a/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java b/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java
index 46b16a4..0c68224 100644
--- a/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java
+++ b/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java
@@ -1,15 +1,27 @@
 package com.atguigu.util;
 
+import com.atguigu.annotation.NotSink;
+import com.atguigu.bean.KeywordBean;
 import com.atguigu.common.Constant;
 import com.atguigu.sink.PhoenixSink;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.base.CaseFormat;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * @ClassName FlinkSinkUtil
@@ -27,20 +39,87 @@ public class FlinkSinkUtil {
 
         return new FlinkKafkaProducer<String>(
-            "default",
-            new KafkaSerializationSchema<String>() {
-                @Override
-                public ProducerRecord<byte[], byte[]> serialize(String element,
-                                                                @Nullable Long timestamp) {
-                    return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
-                }
-            },
-            props,
-            FlinkKafkaProducer.Semantic.EXACTLY_ONCE
+                "default",
+                new KafkaSerializationSchema<String>() {
+                    @Override
+                    public ProducerRecord<byte[], byte[]> serialize(String element,
+                                                                    @Nullable Long timestamp) {
+                        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
+                    }
+                },
+                props,
+                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
         );
     }
 
-    public static PhoenixSink getPhoenixSink() {
+    public static PhoenixSink getPhoenixSink() {
         return new PhoenixSink();
-    }
-}
+    }
+
+    public static <T> SinkFunction<T> getClickHoseSink(String table, Class<T> tClass) {
+        // use JdbcSink to build a ClickHouse sink
+        String driver = Constant.CLICKHOUSE_DRIVER;
+        String url = Constant.CLICKHOUSE_URL;
+        // target statement shape: insert into table(age, name, sex) values(?,?,?)
+        // use reflection to find the POJO field names
+
+        Field[] fields = tClass.getDeclaredFields();
+
+        String names = Stream
+            .of(fields)
+            .filter(field -> {
+                NotSink notSink = field.getAnnotation(NotSink.class);
+                // keep the field only when it is not annotated with @NotSink
+                return notSink == null;
+            })
+            .map(field -> {
+                String name = field.getName();
+                return CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, name);
+            })
+            .collect(Collectors.joining(","));
+
+        String sql = "insert into " + table + "(" + names + ")values(" + names.replaceAll("[^,]+", "?") + ")";
+        System.out.println("ClickHouse insert statement: " + sql);
+        return getJdbcSink(driver, url, null, null, sql);
+    }
+
+    private static <T> SinkFunction<T> getJdbcSink(String driver, String url, String user, String password,
+                                                   String sql) {
+        return JdbcSink.sink(
+            sql,
+            new JdbcStatementBuilder<T>() {
+                @Override
+                public void accept(PreparedStatement ps,
+                                   T t) throws SQLException {
+                    // TODO: bind the parameters in the same order as the columns in the SQL statement, e.g.
+                    // insert into a(stt,edt,keyword,keyword_count,ts)values(?,?,?,?,?)
+                    Class<?> tClass = t.getClass();
+                    Field[] fields = tClass.getDeclaredFields();
+                    try {
+                        for (int i = 0, position = 1; i < fields.length; i++) {
+                            Field field = fields[i];
+                            if (field.getAnnotation(NotSink.class) == null) {
+                                field.setAccessible(true);
+                                Object v = field.get(t);
+                                ps.setObject(position++, v);
+                            }
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            },
+            new JdbcExecutionOptions.Builder()
+                .withBatchSize(1024)
+                .withBatchIntervalMs(2000)
+                .withMaxRetries(3)
+                .build(),
+            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                .withDriverName(driver)
+                .withUrl(url)
+                .withUsername(user)
+                .withPassword(password)
+                .build()
+        );
+    }
+}
\ No newline at end of file
diff --git a/edu/src/main/java/com/atguigu/util/IkAnalyzer.java b/edu/src/main/java/com/atguigu/util/IkAnalyzer.java
new file mode 100644
index 0000000..31c8c15
--- /dev/null
+++ b/edu/src/main/java/com/atguigu/util/IkAnalyzer.java
@@ -0,0 +1,21 @@
+package com.atguigu.util;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+@FunctionHint(output = @DataTypeHint("row<kw string>"))
+public class IkAnalyzer extends TableFunction<Row> {
+    public void eval(String keyword) {
+        // split the keyword into words
+        // e.g. "小米手机" -> "小米", "手机"
+        List<String> kws = IkUtil.split(keyword);
+        // emit one output row per word in the list
+        for (String kw : kws) {
+            collect(Row.of(kw));
+        }
+    }
+}
diff --git a/edu/src/main/java/com/atguigu/util/IkUtil.java b/edu/src/main/java/com/atguigu/util/IkUtil.java
new file mode 100644
index 0000000..7c73a0c
--- /dev/null
+++ b/edu/src/main/java/com/atguigu/util/IkUtil.java
@@ -0,0 +1,48 @@
+package com.atguigu.util;
+
+import org.wltea.analyzer.core.IKSegmenter;
+import org.wltea.analyzer.core.Lexeme;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+public class IkUtil {
+    // use the IK analyzer to split the given string into words
+    //TODO
+    public static List<String> split(String keyword) {
+        List<String> result = new ArrayList<>();
+        // String -> Reader
+
+        // in-memory character stream: StringReader
+        StringReader reader = new StringReader(keyword);
+        IKSegmenter ikSegmenter = new IKSegmenter(reader, true);
+
+        Lexeme next = null;
+        try {
+            next = ikSegmenter.next();
+            while (next != null) {
+                String kw = next.getLexemeText();
+                result.add(kw);
+
+                next = ikSegmenter.next();
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+
+        // the list may contain duplicate words, so de-duplicate it
+        HashSet<String> set = new HashSet<>(result);
+        result.clear();
+        result.addAll(set);
+
+        return result;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(split("java,python,多线程,前端,数据库,大数据,hadoop,flink"));
+    }
+}
-- 
Gitee
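
For reference, getClickHoseSink derives the ClickHouse column list from the KeywordBean field names via reflection and CaseFormat. The sketch below is illustrative only (the class name ClickHouseSqlPreview and its standalone main method are not part of the patch); it repeats the same Stream/CaseFormat calls used in FlinkSinkUtil to show the INSERT statement the sink should end up executing.

// Illustrative sketch: prints the statement getClickHoseSink builds for KeywordBean,
// assuming the same snake_case conversion and the table name used in the DWS app above.
import com.atguigu.bean.KeywordBean;
import org.apache.flink.shaded.guava18.com.google.common.base.CaseFormat;

import java.lang.reflect.Field;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ClickHouseSqlPreview {
    public static void main(String[] args) {
        Field[] fields = KeywordBean.class.getDeclaredFields();
        String names = Stream.of(fields)
                .map(f -> CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, f.getName()))
                .collect(Collectors.joining(","));
        String sql = "insert into dws_traffic_source_keyword_page_view_window("
                + names + ")values(" + names.replaceAll("[^,]+", "?") + ")";
        // Expected output:
        // insert into dws_traffic_source_keyword_page_view_window(stt,edt,keyword,keyword_count,ts)values(?,?,?,?,?)
        System.out.println(sql);
    }
}

The ClickHouse table therefore needs columns named stt, edt, keyword, keyword_count and ts, matching the bean fields as the comment in Dws_DwsTrafficSourceKeywordPageViewWindow requires.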