1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
import org.apache.spark.sql.SparkSession
import scala.util.Try
object Application {
def main(args : Array[String]) {
val spark = SparkSession.builder().appName("SparkApp").master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val scorebcp = "/mnt/c/Users/mxtao/Desktop/score.nb"
val genderbcp = "/mnt/c/Users/mxtao/Desktop/gender.nb"
// 读取文件,过滤无效数据
val score = sc.textFile(scorebcp)
.map(_.split("\t"))
.filter(_.length == 5) // 过滤掉字段有缺失的数据
.map(a => (a(0), a(1), a(2), a(3), a(4))) // 把数组转换成元组
.filter(t => t._1.nonEmpty && t._2.nonEmpty && t._3.nonEmpty && t._4.nonEmpty && t._5.nonEmpty) // 保证必要字段不为空
.filter(t => Try(t._3.toFloat).isSuccess && Try(t._4.toFloat).isSuccess && Try(t._5.toFloat).isSuccess) // 保证成绩字段是合法的
.map(t => (t._1, t._2, t._3.toFloat, t._4.toFloat, t._5.toFloat)) // 转换成 (学号, 姓名, 语文, 数学, 英语) 形式
val gender = sc.textFile(genderbcp)
.map(_.split("\t"))
.filter(_.length == 2)
.map(a => (a(0), a(1)))
.filter(t => t._1.nonEmpty && t._2.nonEmpty)
// 按总成绩排名从高到底输出学号和姓名
score.map(t => (t._1, t._2, t._3 + t._4 + t._5)) // 转换成 (学号, 姓名, 总成绩) 形式
.sortBy(_._3, ascending = false) // 以总成绩为key,按从低到高排序
.map(t => s"${t._1}\t${t._2}") // 格式化数据,为输出做准备
.collect() // 将分布在各个节点的数据拿到本地
.foreach(println) // 打印到控制台
// 找出有不及格科目的同学
score.filter(t => t._3 < 60.0 || t._4 < 60.0 || t._5 < 60.0)
.collect()
.foreach(println)
// 分别找出各个科目的第一名
score.sortBy(_._3, ascending = false)
.take(1)
.foreach(println)
score.sortBy(_._4, ascending = false)
.take(1)
.foreach(println)
score.sortBy(_._5, ascending = false)
.take(1)
.foreach(println)
// 分别找出男生和女生的第一名
val score2 = score.map(t => (t._1, (t._1, t._2, t._3 + t._4 + t._5))) // 转换成二元组,为join做准备,此处转换成了 (学号, (学号, 姓名, 总成绩)) 形式
.join(gender) // 与gender数据集(结构为:(学号,性别))关联碰撞,策略为丢弃所有未关联数据,结果结构为: (学号, ((学号, 姓名, 总成绩), 性别))
.map(_._2) // 丢掉不再使用的key,然后结构变为:((学号, 姓名, 总成绩), 性别)
score2.filter(_._2 == "男")
.map(_._1)
.sortBy(_._3, ascending = false)
.take(1)
.foreach(println)
score2.filter(_._2 == "女")
.map(_._1)
.sortBy(_._3, ascending = false)
.take(1)
.foreach(println)
spark.close()
}
}
|