代码拉取完成,页面将自动刷新
package com;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class TicketsByC {
public static void main(String[] args) throws Exception {
// spark://master:7077
// yarn-client
// 设置Spark应用程序的配置,这里指定应用名称为"TicketsCount",并在本地模式下运行,使用3个线程
SparkConf conf = new SparkConf().setAppName("TicketsCount").setMaster("local[3]");
// 初始化JavaSparkContext对象,它是Java Spark程序的主要入口点
JavaSparkContext sc = new JavaSparkContext(conf);
/**
* 数据初步清理,主要是过滤出各个机场个总票数
*/
// 读取HDFS上的CSV文件
JavaRDD<String> lines = sc.textFile("hdfs://master:9000/user/data/region.csv", 1);
// 过滤出(第二列或第三列)以'C'开头的行
lines = lines.filter(line ->
line.split(",")[1].substring(0, 1).equals("C")||
line.split(",")[2].substring(0,1).equals("C")
);
// 将过滤后的行转换为键值对RDD,键为机场代码(第二列),值为票数(第四列)
JavaPairRDD<String, Integer> pair = lines.mapToPair(line ->
new Tuple2<>(line.split(",")[1], Integer.parseInt(line.split(",")[3]))
);
// 按键(机场代码)进行归约操作,统计每个机场的总票数
JavaPairRDD<String, Integer> counts = pair.reduceByKey((v1, v2) -> (v1 + v2));
/**
* 数据二次清理,进行排序
*/
// 数据二次处理,先将键值对中的键和值互换,以便按票数排序
JavaPairRDD<Integer, String> counts_r = counts.mapToPair(t -> new Tuple2<>(t._2, t._1));
// 按票数(现在作为键)进行降序排序
JavaPairRDD<Integer, String> sorted_counts_r = counts_r.sortByKey(false);
// 再次将键值对中的键和值互换回来,以机场代码作为键
JavaPairRDD<String, Integer> sortedcounts = sorted_counts_r.mapToPair(t -> new Tuple2<>(t._2, t._1));
// 打印排序后的结果
sortedcounts.foreach(t -> {
System.out.println(t._1 + "appears " + t._2 + " times.");
});
// 将排序后的结果保存到HDFS上
sortedcounts.saveAsTextFile("hdfs://master:9000/out/view1");
sc.close();
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。