Loading... ## 大数据计算模式机器代表作品  ## 分布式文件系统物理结构 分布式文件系统在物理结构上是由计算机集群中的多个节点构成的,如图3-2所示。这些节点分为两类: 主节点(名称节点):文件目录的创建、删除、重命名,管理数据节点和文件块的映射关系。客户端只有访问名称节点才能找到请求的文件块所在的位置,进而到相应位置读取所需文件块。 从节点(数据节点):负责数据的存储和读取。在存储时,由名称节点分配存储位置,然后由客户端把数据直接写入相应数据节点;在读取时,客户端从名称节点获得数据节点和文件块的映射关系,到相应位置访问文件块。数据节点也要根据名称节点的命令创建、删除和复制数据块。  ## HDFS 副本存放策略 (1) - 在集群内发起写操作请求,则把第1个副本放置在发起写操作请求的数据节点上,实现就近写入数据。 - 在集群外发起写操作请求,则从集群内部挑选一台磁盘空间较为充足、CPU不太忙的数据节点,作为第1个副本的存放地。 (2)第2个副本会被放置在与第1个副本不同的机架的数据节点上。 (3)第3个副本会被放置在与第1个副本相同的机架的其他节点上。 (4)如果还有更多的副本,则继续从集群中随机选择数据节点进行存放。 ## Shuffle过程 (1)在Map端的Shuffle过程 Map任务的输出结果首先被写入缓存,当缓存满时,就启动溢写操作,把缓存中的数据写入磁盘文件,并清空缓存。当启动溢写操作时,首先需要对缓存中的数据进行分区,然后对每个分区的数据进行排序和合并,再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束之前,这些溢写文件会被归并成一个大的磁盘文件,然后通知相应的Reduce任务来“领取”属于自己处理的数据。 (2)在Reduce端的Shuffle过程 Reduce任务从Map端的不同Map机器“领取”属于自己处理的那部分数据,然后对数据进行归并后交给Reduce处理。 ## WordConunt设计思路 1. **分片(Splitting)阶段**:一个大文件被切分成多个分片,分配到多个不同的机器上。每个机器上的Map任务负责处理分配给它的那一部分数据。 2. **Map阶段**:在Map任务中,输入被默认处理为键值对形式,其中键是文件的行号,值是对应的行内容。Map任务解析每行的内容,识别出单词,并输出键值对,格式为<单词,1>,表示这个单词出现一次。 3. **Shuffle阶段**:Map阶段的输出经过排序和分区处理,转换成<key, value-list>的形式,例如<hadoop, [1, 1, 1, 1, 1]>。这表明单词"hadoop"出现了五次。 4. **Reduce阶段**:Reduce任务接收来自Shuffle阶段的中间结果,开始进行汇总计算。每个Reduce任务负责一部分数据的汇总,计算每个单词的最终频数,并将这些结果输出到分布式文件系统中。  ## Spark主要具有如下优点。 (l)Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活。 (2)Spak提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率。 (3)Spark基于DAG的任务调度执行机制,要优于MapReduce的迭代执行机制。 ## Spark各种概念之间的相互关系 在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后执行结果会返回给任务控制节点,或者写到HDFS或者其他数据库中。  ## 流计算与Hadoop Hadoop作为大数据技术的事实标准,主要依靠MapReduce和HDFS来支撑大规模的分布式存储和处理。然而,Hadoop的MapReduce模型主要适用于批量数据处理,不适合流计算的实时需求。尽管可以通过小批量处理来减少延迟,这种方法会增加处理开销和复杂度,降低系统的可维护性和用户程序的可伸缩性。因此,业界已经开发了许多专门的实时流数据处理系统来满足这些需求。 ## 1、编程实现文件合并和去重操作 对于两个输入文件,即文件 A 和文件 B,请编写 MapReduce 程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件 C。下面是输入文件和输出文件的一个样例,供参考。 输入文件A的样例如下: ``` 20150101 x 20150102 y 20150103 x 20150104 y 20150105 z 20150106 x ``` 输入文件B的样例如下: ``` 20150101 y 20150102 y 20150103 x 20150104 Z 20150105 y ``` 根据输入文件A和B合并得到的输出文件C的样例如下: ``` 20150101 x 20150101 y 20150102 y 20150103 x 20150104 y 20150104 z 20150105 y 20150105 z 20150106 x ``` 参考代码 ~~~java public class MyTest { public static class Map extends Mapper<Object, Text, Text, Text> { private static Text text = new Text(); public void map(Object key, Text value, Context context) { text = value; context.write(text, new Text("")); } } public static class Reduce extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) { context.write(key, new Text("")); } } } ~~~ ## 2、编写MapReduce程序实现对输入文件的排序 现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的一个样例供参考。 输入文件 1 的样例如下: ``` 33 37 12 40 ``` 输入文件 2 的样例如下: ``` 4 16 39 5 ``` 输入文件 3 的样例如下: ``` 1 45 25 ``` 输出结果如下: ``` 1 1 2 4 3 5 4 12 5 16 6 25 7 33 8 37 9 39 10 40 11 45 ``` ~~~java public class MyTest { public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> { private static IntWritable data = new IntWritable(); public void map(Object key, Text value, Context context) { String text = value.toString(); data.set(Integer.parseInt(text)); context.write(data, new IntWritable(1)); } } public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { private static IntWritable line_num = new IntWritable(1); public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) { for (IntWritable val : values) { context.write(line_num, key); line_num = new IntWritable(line_num.get() + 1); } } } } ~~~ ## 3、编写 MapReduce 程序实现求平均值 每个输入文件表示学生某门课程的成绩,输入文件中每行内容由两个字段组成,第一个字段是学生姓名,第二个字段是学生的成绩;编写 MapReduce 应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。 输入文件(AlgorithmScore)的样例如下: ``` XiaoMing 92 XiaoHong 87 XiaoXin 82 XiaoLi 90 ``` 输入文件(DataBaseScore)的样例如下: ``` XiaoMing 95 XiaoHong 81 XiaoXin 89 XiaoLi 85 ``` 输入文件(PythonScore)的样例如下: ``` XiaoMing 82 XiaoHong 83 XiaoXin 94 XiaoLi 91 ``` 输出结果如下: ``` XiaoHong 83.67 XiaoLi 88.67 XiaoMing 89.67 XiaoXin 88.33 ``` ~~~java public class MyTest { public static class MyMapper extends Mapper<Object, Text, Text, FloatWritable> { private Text stuName = new Text(); private FloatWritable stuScore = new FloatWritable(); public void map(Object key, Text value, Context context) { String[] line = value.toString().split(" "); this.stuName.set(line[0]); this.stuScore.set(Float.parseFloat(line[1])); context.write(this.stuName, this.stuScore); } } public static class MyReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> { private FloatWritable avgScore = new FloatWritable(); public void reduce(Text key, Iterable<FloatWritable> values, Context context) { int num = 0; float sum = 0; float avg = 0; for (FloatWritable val : values) { sum += val.get(); num++; } avg = sum / num; BigDecimal b = new BigDecimal(avg); avgScore.set(b.setScale(2, BigDecimal.ROUND_HALF_UP).floatValue()); context.write(key, avgScore); } } } ~~~ ## 1、编写 Spark 独立应用程序实现数据去重 对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并、剔除其中重复的内容并排序,并输出到终端中。下面是输入文件和输出结果的一个样例,供参考。 输入文件A的样例如下: ``` 20150101 x 20150102 y 20150103 x 20150104 y 20150105 z 20150106 x ``` 输入文件B的样例如下: ``` 20150101 y 20150102 y 20150103 x 20150104 Z 20150105 y ``` 根据输入文件A和B合并得到的输出文件C的样例如下: ``` 20150101 x 20150101 y 20150102 y 20150103 x 20150104 y 20150104 z 20150105 y 20150105 z 20150106 x ``` ~~~scala object MyTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("MyTest").setMaster("local") val sc = new SparkContext(conf) val data = sc.textFile("file:///home/hadoop/myfile") val rdd1 = data.map(line => (line, "")) val rdd2 = rdd1.groupByKey() val rdd3 = rdd2.sortByKey() val rdd4 = rdd3.keys rdd4.foreach(println) } } ~~~ ## 2、编写 Spark 独立应用程序实现整合排序 对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并、剔除其中重复的内容并排序,并输出到终端中。下面是输入文件和输出结果的一个样例,供参考。 输入文件 1 的样例如下: ``` 33 37 12 40 ``` 输入文件 2 的样例如下: ``` 4 16 39 5 ``` 输入文件 3 的样例如下: ``` 1 45 25 ``` 输出结果如下: ``` 1 1 2 4 3 5 4 12 5 16 6 25 7 33 8 37 9 39 10 40 11 45 ``` ~~~scala object MyTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("MyTest").setMaster("local") val sc = new SparkContext(conf) val data = sc.textFile("file:///home/hadoop/myfile", 1) val rdd1 = data.map(line => (line.toInt, "")) val rdd2 = rdd1.partitionBy(new HashPartitioner(1)) val rdd3 = rdd2.sortByKey() val rdd4 = rdd3.keys var index = 0 val rdd5 = rdd4.map(t => { index = index + 1 (index, t) }) rdd5.foreach(t => println(t._1 + " " + t._2)) } } ~~~ ## 3、编写 Spark 独立应用程序实现求平均值 每个输入文件表示学生某门课程的成绩,输入文件中每行内容由两个字段组成,第一个字段是学生姓名,第二个字段是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到终端中。下面是输入文件和输出结果的一个样例,供参考。 输入文件(AlgorithmScore)的样例如下: ``` XiaoMing 92 XiaoHong 87 XiaoXin 82 XiaoLi 90 ``` 输入文件(DataBaseScore)的样例如下: ``` XiaoMing 95 XiaoHong 81 XiaoXin 89 XiaoLi 85 ``` 输入文件(PythonScore)的样例如下: ``` XiaoMing 82 XiaoHong 83 XiaoXin 94 XiaoLi 91 ``` 输出结果如下: ``` XiaoHong 83.67 XiaoLi 88.67 XiaoMing 89.67 XiaoXin 88.33 ``` ~~~scala object MyTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("MyTest").setMaster("local") val sc = new SparkContext(conf) val data = sc.textFile("file:///home/hadoop/myfile") val rdd1 = data.map(line => line.split(" ")) val rdd2 = rdd1.map(t => (t(0), t(1).toInt)) val rdd3 = rdd2.mapValues(x => (x, 1)) val rdd4 = rdd3.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) val rdd5 = rdd4.mapValues(x => x._1.toDouble / x._2) rdd5.foreach(t => { val x = t._2 println(t._1 + " " + f"$x%1.2f") }) } } ~~~ 最后修改:2024 年 06 月 14 日 © 允许规范转载 打赏 赞赏作者 支付宝微信 赞 3 如果觉得我的文章对你有用,请随意赞赏