diff --git a/edu/src/main/java/com/atguigu/annotation/NotSink.java b/edu/src/main/java/com/atguigu/annotation/NotSink.java
new file mode 100644
index 0000000000000000000000000000000000000000..a108d758dd31681ffb4be1bed5adae5b024a78ce
--- /dev/null
+++ b/edu/src/main/java/com/atguigu/annotation/NotSink.java
@@ -0,0 +1,13 @@
+package com.atguigu.annotation;
+// Tells Java where this annotation may be applied
+// Retention (lifecycle) of the annotation
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface NotSink {
+}
diff --git a/edu/src/main/java/com/atguigu/app/BaseAppV1.java b/edu/src/main/java/com/atguigu/app/BaseAppV1.java
index c4bc36d7114b0e7921d86bc22f81dfec963c5883..819f93504cfe874a461dc1b49403111de06cdc6b 100644
--- a/edu/src/main/java/com/atguigu/app/BaseAppV1.java
+++ b/edu/src/main/java/com/atguigu/app/BaseAppV1.java
@@ -21,7 +21,7 @@ public abstract class BaseAppV1 {
         System.setProperty("HADOOP_USER_NAME","atguigu");
         Configuration conf = new Configuration();
         conf.setInteger("rest.port",port);
-        conf.setString("flink.hadoop.dfs.client.use.datanode.hostname","true"); //
+        //conf.setString("flink.hadoop.dfs.client.use.datanode.hostname","true"); //
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
         env.setParallelism(p);
diff --git a/edu/src/main/java/com/atguigu/app/dws/Dws_DwsTrafficSourceKeywordPageViewWindow.java b/edu/src/main/java/com/atguigu/app/dws/Dws_DwsTrafficSourceKeywordPageViewWindow.java
new file mode 100644
index 0000000000000000000000000000000000000000..8f71dcae1bd2a785a0ef800bba31daeb35cc9857
--- /dev/null
+++ b/edu/src/main/java/com/atguigu/app/dws/Dws_DwsTrafficSourceKeywordPageViewWindow.java
@@ -0,0 +1,80 @@
+package com.atguigu.app.dws;
+
+import com.atguigu.app.BaseSQLApp;
+import com.atguigu.bean.KeywordBean;
+import com.atguigu.common.Constant;
+import com.atguigu.util.FlinkSinkUtil;
+import com.atguigu.util.IkAnalyzer;
+import com.atguigu.util.SQLUtil;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+public class Dws_DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {
+    public static void main(String[] args) {
+        new Dws_DwsTrafficSourceKeywordPageViewWindow().init(
+            3001,
+            2,
+            "Dws_DwsTrafficSourceKeywordPageViewWindow",
+            10
+        );
+    }
+
+    @Override
+    protected void handle(StreamExecutionEnvironment env,
+                          StreamTableEnvironment tEnv) {
+        // 1. Read the page log data: DDL statement
+        tEnv.executeSql("create table page_log(" +
+            "page map<string,string>," +
+            "ts bigint," +
+            "et as to_timestamp_ltz(ts,3)," +
+            "watermark for et as et - interval '3' second " +
+            ")" + SQLUtil.getKafkaSourceDDL(Constant.TOPIC_DWD_TRAFFIC_PAGE, "Dws_DwsTrafficSourceKeywordPageViewWindow"));
+
+        // 2. Filter out the search records and extract the search keyword
+        Table t1 = tEnv.sqlQuery("select " +
+            "page['item'] keyword," +
+            "et " +
+            "from page_log " +
+            "where page['page_id']='course_list' " +
+            "and page['item_type']='keyword' ");
+        tEnv.createTemporaryView("t1", t1);
+
+        // 3. Tokenize the keyword with a custom table function
+        tEnv.createTemporaryFunction("ik_analyzer", IkAnalyzer.class);
+        Table t2 = tEnv.sqlQuery("select " +
+            "kw," +
+            "et " +
+            "from t1 " +
+            "join lateral table(ik_analyzer(keyword)) on true ");
+        tEnv.createTemporaryView("t2", t2);
+        // 4. Windowed aggregation
+        //    Window styles: group windows, windowing TVFs, over windows
+        //    group windows: tumbling, sliding, session
+        //    TVFs: tumbling, sliding, cumulative
+        Table resultTable = tEnv.sqlQuery("select " +
+            "date_format(window_start,'yyyy-MM-dd HH:mm:ss') stt," +
+            "date_format(window_end,'yyyy-MM-dd HH:mm:ss') edt," +
+            "kw keyword," +
+            "count(*) keywordCount," +
+            "unix_timestamp() * 1000 as ts " +
+            "from table(tumble(table t2, descriptor(et), interval '5' second ))" +
+            "group by kw, window_start, window_end");
+
+        // 5. Write the result to ClickHouse
+        //    custom stream sink
+        //    backed by a JDBC connection
+        //    the bean field names must match the table column names so the reflection-based writer can map them
+        tEnv
+            .toRetractStream(resultTable, KeywordBean.class)
+            .filter(t -> t.f0)
+            .map(t -> t.f1)
+            .addSink(FlinkSinkUtil.getClickHoseSink("dws_traffic_source_keyword_page_view_window", KeywordBean.class));
+
+        try {
+            env.execute();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/edu/src/main/java/com/atguigu/bean/KeywordBean.java b/edu/src/main/java/com/atguigu/bean/KeywordBean.java
new file mode 100644
index 0000000000000000000000000000000000000000..fc812a1338880700589f5052a5bfa192f3758626
--- /dev/null
+++ b/edu/src/main/java/com/atguigu/bean/KeywordBean.java
@@ -0,0 +1,16 @@
+package com.atguigu.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class KeywordBean {
+    private String stt;
+    private String edt;
+    private String keyword;
+    private Long keywordCount;
+    private Long ts;
+}
diff --git a/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java b/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java
index c31e0ef0dfb2f787dcc6133ab804201b3ae71c71..173133d6bd7415bf464174ffe942d472f062748d 100644
--- a/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java
+++ b/edu/src/main/java/com/atguigu/util/FlinkSinkUtil.java
@@ -1,5 +1,7 @@
 package com.atguigu.util;
 
+import com.atguigu.annotation.NotSink;
+import com.atguigu.bean.KeywordBean;
 import com.atguigu.common.Constant;
 import com.atguigu.sink.PhoenixSink;
 import com.atguigu.bean.NoSink;
@@ -38,20 +40,20 @@ public class FlinkSinkUtil {
         return new FlinkKafkaProducer<String>(
-            "default",
-            new KafkaSerializationSchema<String>() {
-                @Override
-                public ProducerRecord<byte[], byte[]> serialize(String element,
-                                                                @Nullable Long timestamp) {
-                    return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
-                }
-            },
-            props,
-            FlinkKafkaProducer.Semantic.EXACTLY_ONCE
+                "default",
+                new KafkaSerializationSchema<String>() {
+                    @Override
+                    public ProducerRecord<byte[], byte[]> serialize(String element,
+                                                                    @Nullable Long timestamp) {
+                        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
+                    }
+                },
+                props,
+                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
         );
     }
 
-    public static PhoenixSink getPhoenixSink() {
+    public static PhoenixSink getPhoenixSink() {
         return new PhoenixSink();
     }
 
@@ -64,26 +66,18 @@ public class FlinkSinkUtil {
         Field[] fields = tClass.getDeclaredFields();
-        /*String names = "";
-        for (Field field : fields) {
-            String name = field.getName();
-            names += name + ",";
-        }
-        names = names.substring(0, names.length() - 1);*/
-        String names = Stream
-            .of(fields)
-            .filter(field -> {
-                NoSink notSink = field.getAnnotation(NoSink.class);
-                // keep the field when the annotation is absent
-                return notSink == null;
-            } ) // filter out the fields that should not be written
-            .map(field -> {
-                String name = field.getName();
-                return CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, name); // camelCase -> snake_case
-            })
-            .collect(Collectors.joining(","));
-
+        String names = Stream
+            .of(fields)
+            .filter(field -> {
+                NotSink notSink = field.getAnnotation(NotSink.class);
+                // keep the field when the annotation is absent
+                return notSink == null;
+            })
+            .map(field -> {
+                String name = field.getName();
+                return CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, name);
+            })
+            .collect(Collectors.joining(","));
         String sql = "insert into " + table + "(" + names + ")values(" + names.replaceAll("[^,]+", "?") + ")";
         System.out.println("clickhouse insert statement: " + sql);
@@ -92,42 +86,41 @@ private static <T> SinkFunction<T> getJdbcSink(String driver, String url, String user, String password, String sql) {
-
         return JdbcSink.sink(
-            sql,
-            new JdbcStatementBuilder<T>() {
-                @Override
-                public void accept(PreparedStatement ps,
-                                   T t) throws SQLException {
-                    //TODO fill the placeholders according to the sql statement
-                    // insert into a(stt,edt,source,keyword,keyword_count,ts)values(?,?,?,?,?,?)
-                    Class tClass = t.getClass();
-                    Field[] fields = tClass.getDeclaredFields();
-                    try {
-                        for (int i = 0, position = 1; i < fields.length; i++) {
-                            Field field = fields[i];
-                            if (field.getAnnotation(NoSink.class) == null) {
-                                field.setAccessible(true);
-                                Object v = field.get(t);
-                                ps.setObject(position++, v);
-                            }
-                        }
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            },
-            new JdbcExecutionOptions.Builder()
-                .withBatchSize(1024)
-                .withBatchIntervalMs(2000)
-                .withMaxRetries(3)
-                .build(),
-            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
-                .withDriverName(driver)
-                .withUrl(url)
-                .withUsername(user)
-                .withPassword(password)
-                .build()
+                sql,
+                new JdbcStatementBuilder<T>() {
+                    @Override
+                    public void accept(PreparedStatement ps,
+                                       T t) throws SQLException {
+                        //TODO fill the placeholders according to the sql statement
+                        // insert into a(stt,edt,keyword,keyword_count,ts)values(?,?,?,?,?)
+                        Class tClass = t.getClass();
+                        Field[] fields = tClass.getDeclaredFields();
+                        try {
+                            for (int i = 0, position = 1; i < fields.length; i++) {
+                                Field field = fields[i];
+                                if (field.getAnnotation(NotSink.class) == null) {
+                                    field.setAccessible(true);
+                                    Object v = field.get(t);
+                                    ps.setObject(position++, v);
+                                }
+                            }
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                },
+                new JdbcExecutionOptions.Builder()
+                        .withBatchSize(1024)
+                        .withBatchIntervalMs(2000)
+                        .withMaxRetries(3)
+                        .build(),
+                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                        .withDriverName(driver)
+                        .withUrl(url)
+                        .withUsername(user)
+                        .withPassword(password)
+                        .build()
         );
     }
 }
diff --git a/edu/src/main/java/com/atguigu/util/IkAnalyzer.java b/edu/src/main/java/com/atguigu/util/IkAnalyzer.java
new file mode 100644
index 0000000000000000000000000000000000000000..31c8c158916ad32e47e28c99ef0ce4b2a60d0b49
--- /dev/null
+++ b/edu/src/main/java/com/atguigu/util/IkAnalyzer.java
@@ -0,0 +1,21 @@
+package com.atguigu.util;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+@FunctionHint(output = @DataTypeHint("row<kw string>"))
+public class IkAnalyzer extends TableFunction<Row> {
+    public void eval(String keyword) {
+        // tokenize the keyword
+        // e.g. 小米手机 -> [小米, 手机]
+        List<String> kws = IkUtil.split(keyword);
+        // emit one row per string in the list
+        for (String kw : kws) {
+            collect(Row.of(kw));
+        }
+    }
+}
diff --git a/edu/src/main/java/com/atguigu/util/IkUtil.java b/edu/src/main/java/com/atguigu/util/IkUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..7c73a0cd645dc3de45da40fea23ac70dceeb8660
--- /dev/null
+++ b/edu/src/main/java/com/atguigu/util/IkUtil.java
@@ -0,0 +1,48 @@
+package com.atguigu.util;
+
+import org.wltea.analyzer.core.IKSegmenter;
+import org.wltea.analyzer.core.Lexeme;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+public class IkUtil {
+    // Use the IK analyzer to tokenize the given string
+    //TODO
+    public static List<String> split(String keyword) {
+        List<String> result = new ArrayList<>();
+        // String -> Reader ?
+
+        // in-memory character stream: StringReader
+        StringReader reader = new StringReader(keyword);
+        IKSegmenter ikSegmenter = new IKSegmenter(reader, true);
+
+        Lexeme next = null;
+        try {
+            next = ikSegmenter.next();
+            while (next != null) {
+                String kw = next.getLexemeText();
+                result.add(kw);
+
+                next = ikSegmenter.next();
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        // the list may contain duplicates, so deduplicate it
+        HashSet<String> set = new HashSet<>(result);
+        result.clear();
+        result.addAll(set);
+
+        return result;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(split("java,python,多线程,前端,数据库,大数据,hadoop,flink"));
+    }
+}
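
Reviewer note (not part of the commit): the reflection in FlinkSinkUtil.getClickHoseSink builds the ClickHouse insert statement from the bean's declared fields, skipping anything annotated with @NotSink and converting camelCase names to snake_case columns. Below is a minimal standalone sketch of what that produces for KeywordBean; the class and nested bean here are hypothetical stand-ins for illustration only, and it assumes Guava's CaseFormat is on the classpath (FlinkSinkUtil already uses it).

import com.google.common.base.CaseFormat;

import java.lang.reflect.Field;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ClickHouseSqlPreview {
    // mirror of com.atguigu.bean.KeywordBean added in this commit; no field carries @NotSink
    static class KeywordBean {
        String stt;
        String edt;
        String keyword;
        Long keywordCount;
        Long ts;
    }

    public static void main(String[] args) {
        Field[] fields = KeywordBean.class.getDeclaredFields();
        // camelCase field names -> snake_case column names, joined with commas
        String names = Stream.of(fields)
                .map(f -> CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, f.getName()))
                .collect(Collectors.joining(","));
        String table = "dws_traffic_source_keyword_page_view_window";
        // every column name collapses to a '?' placeholder
        String sql = "insert into " + table + "(" + names + ")values(" + names.replaceAll("[^,]+", "?") + ")";
        System.out.println(sql);
        // expected: insert into dws_traffic_source_keyword_page_view_window(stt,edt,keyword,keyword_count,ts)values(?,?,?,?,?)
    }
}

The JdbcStatementBuilder then fills those placeholders positionally from the same filtered field list, which is why the field names and order in KeywordBean have to line up with the columns of dws_traffic_source_keyword_page_view_window.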