大数据计算模式机器代表作品
分布式文件系统物理结构
分布式文件系统在物理结构上是由计算机集群中的多个节点构成的,如图3-2所示。这些节点分为两类:
主节点(名称节点):文件目录的创建、删除、重命名,管理数据节点和文件块的映射关系。客户端只有访问名称节点才能找到请求的文件块所在的位置,进而到相应位置读取所需文件块。
从节点(数据节点):负责数据的存储和读取。在存储时,由名称节点分配存储位置,然后由客户端把数据直接写入相应数据节点;在读取时,客户端从名称节点获得数据节点和文件块的映射关系,到相应位置访问文件块。数据节点也要根据名称节点的命令创建、删除和复制数据块。
HDFS 副本存放策略
(1)
- 在集群内发起写操作请求,则把第1个副本放置在发起写操作请求的数据节点上,实现就近写入数据。
- 在集群外发起写操作请求,则从集群内部挑选一台磁盘空间较为充足、CPU不太忙的数据节点,作为第1个副本的存放地。
(2)第2个副本会被放置在与第1个副本不同的机架的数据节点上。
(3)第3个副本会被放置在与第1个副本相同的机架的其他节点上。
(4)如果还有更多的副本,则继续从集群中随机选择数据节点进行存放。
Shuffle过程
(1)在Map端的Shuffle过程
Map任务的输出结果首先被写入缓存,当缓存满时,就启动溢写操作,把缓存中的数据写入磁盘文件,并清空缓存。当启动溢写操作时,首先需要对缓存中的数据进行分区,然后对每个分区的数据进行排序和合并,再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束之前,这些溢写文件会被归并成一个大的磁盘文件,然后通知相应的Reduce任务来“领取”属于自己处理的数据。
(2)在Reduce端的Shuffle过程
Reduce任务从Map端的不同Map机器“领取”属于自己处理的那部分数据,然后对数据进行归并后交给Reduce处理。
WordConunt设计思路
- 分片(Splitting)阶段:一个大文件被切分成多个分片,分配到多个不同的机器上。每个机器上的Map任务负责处理分配给它的那一部分数据。
- Map阶段:在Map任务中,输入被默认处理为键值对形式,其中键是文件的行号,值是对应的行内容。Map任务解析每行的内容,识别出单词,并输出键值对,格式为<单词,1>,表示这个单词出现一次。
- Shuffle阶段:Map阶段的输出经过排序和分区处理,转换成<key, value-list>的形式,例如<hadoop, [1, 1, 1, 1, 1]>。这表明单词"hadoop"出现了五次。
- Reduce阶段:Reduce任务接收来自Shuffle阶段的中间结果,开始进行汇总计算。每个Reduce任务负责一部分数据的汇总,计算每个单词的最终频数,并将这些结果输出到分布式文件系统中。
Spark主要具有如下优点。
(l)Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活。
(2)Spak提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率。
(3)Spark基于DAG的任务调度执行机制,要优于MapReduce的迭代执行机制。
Spark各种概念之间的相互关系
在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后执行结果会返回给任务控制节点,或者写到HDFS或者其他数据库中。
流计算与Hadoop
Hadoop作为大数据技术的事实标准,主要依靠MapReduce和HDFS来支撑大规模的分布式存储和处理。然而,Hadoop的MapReduce模型主要适用于批量数据处理,不适合流计算的实时需求。尽管可以通过小批量处理来减少延迟,这种方法会增加处理开销和复杂度,降低系统的可维护性和用户程序的可伸缩性。因此,业界已经开发了许多专门的实时流数据处理系统来满足这些需求。
1、编程实现文件合并和去重操作
对于两个输入文件,即文件 A 和文件 B,请编写 MapReduce 程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件 C。下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
输入文件B的样例如下:
20150101 y
20150102 y
20150103 x
20150104 Z
20150105 y
根据输入文件A和B合并得到的输出文件C的样例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
参考代码
public class MyTest {
public static class Map extends Mapper<Object, Text, Text, Text> {
private static Text text = new Text();
public void map(Object key, Text value, Context context) {
text = value;
context.write(text, new Text(""));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) {
context.write(key, new Text(""));
}
}
}
2、编写MapReduce程序实现对输入文件的排序
现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的一个样例供参考。
输入文件 1 的样例如下:
33
37
12
40
输入文件 2 的样例如下:
4
16
39
5
输入文件 3 的样例如下:
1
45
25
输出结果如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
public class MyTest {
public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
private static IntWritable data = new IntWritable();
public void map(Object key, Text value, Context context) {
String text = value.toString();
data.set(Integer.parseInt(text));
context.write(data, new IntWritable(1));
}
}
public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private static IntWritable line_num = new IntWritable(1);
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) {
for (IntWritable val : values) {
context.write(line_num, key);
line_num = new IntWritable(line_num.get() + 1);
}
}
}
}
3、编写 MapReduce 程序实现求平均值
每个输入文件表示学生某门课程的成绩,输入文件中每行内容由两个字段组成,第一个字段是学生姓名,第二个字段是学生的成绩;编写 MapReduce 应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
输入文件(AlgorithmScore)的样例如下:
XiaoMing 92
XiaoHong 87
XiaoXin 82
XiaoLi 90
输入文件(DataBaseScore)的样例如下:
XiaoMing 95
XiaoHong 81
XiaoXin 89
XiaoLi 85
输入文件(PythonScore)的样例如下:
XiaoMing 82
XiaoHong 83
XiaoXin 94
XiaoLi 91
输出结果如下:
XiaoHong 83.67
XiaoLi 88.67
XiaoMing 89.67
XiaoXin 88.33
public class MyTest {
public static class MyMapper extends Mapper<Object, Text, Text, FloatWritable> {
private Text stuName = new Text();
private FloatWritable stuScore = new FloatWritable();
public void map(Object key, Text value, Context context) {
String[] line = value.toString().split(" ");
this.stuName.set(line[0]);
this.stuScore.set(Float.parseFloat(line[1]));
context.write(this.stuName, this.stuScore);
}
}
public static class MyReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
private FloatWritable avgScore = new FloatWritable();
public void reduce(Text key, Iterable<FloatWritable> values, Context context) {
int num = 0;
float sum = 0;
float avg = 0;
for (FloatWritable val : values) {
sum += val.get();
num++;
}
avg = sum / num;
BigDecimal b = new BigDecimal(avg);
avgScore.set(b.setScale(2, BigDecimal.ROUND_HALF_UP).floatValue());
context.write(key, avgScore);
}
}
}
1、编写 Spark 独立应用程序实现数据去重
对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并、剔除其中重复的内容并排序,并输出到终端中。下面是输入文件和输出结果的一个样例,供参考。
输入文件A的样例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
输入文件B的样例如下:
20150101 y
20150102 y
20150103 x
20150104 Z
20150105 y
根据输入文件A和B合并得到的输出文件C的样例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
object MyTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MyTest").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.textFile("file:///home/hadoop/myfile")
val rdd1 = data.map(line => (line, ""))
val rdd2 = rdd1.groupByKey()
val rdd3 = rdd2.sortByKey()
val rdd4 = rdd3.keys
rdd4.foreach(println)
}
}
2、编写 Spark 独立应用程序实现整合排序
对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并、剔除其中重复的内容并排序,并输出到终端中。下面是输入文件和输出结果的一个样例,供参考。
输入文件 1 的样例如下:
33
37
12
40
输入文件 2 的样例如下:
4
16
39
5
输入文件 3 的样例如下:
1
45
25
输出结果如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
object MyTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MyTest").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.textFile("file:///home/hadoop/myfile", 1)
val rdd1 = data.map(line => (line.toInt, ""))
val rdd2 = rdd1.partitionBy(new HashPartitioner(1))
val rdd3 = rdd2.sortByKey()
val rdd4 = rdd3.keys
var index = 0
val rdd5 = rdd4.map(t => {
index = index + 1
(index, t)
})
rdd5.foreach(t => println(t._1 + " " + t._2))
}
}
3、编写 Spark 独立应用程序实现求平均值
每个输入文件表示学生某门课程的成绩,输入文件中每行内容由两个字段组成,第一个字段是学生姓名,第二个字段是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到终端中。下面是输入文件和输出结果的一个样例,供参考。
输入文件(AlgorithmScore)的样例如下:
XiaoMing 92
XiaoHong 87
XiaoXin 82
XiaoLi 90
输入文件(DataBaseScore)的样例如下:
XiaoMing 95
XiaoHong 81
XiaoXin 89
XiaoLi 85
输入文件(PythonScore)的样例如下:
XiaoMing 82
XiaoHong 83
XiaoXin 94
XiaoLi 91
输出结果如下:
XiaoHong 83.67
XiaoLi 88.67
XiaoMing 89.67
XiaoXin 88.33
object MyTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MyTest").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.textFile("file:///home/hadoop/myfile")
val rdd1 = data.map(line => line.split(" "))
val rdd2 = rdd1.map(t => (t(0), t(1).toInt))
val rdd3 = rdd2.mapValues(x => (x, 1))
val rdd4 = rdd3.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
val rdd5 = rdd4.mapValues(x => x._1.toDouble / x._2)
rdd5.foreach(t => {
val x = t._2
println(t._1 + " " + f"$x%1.2f")
})
}
}