大数据计算模式机器代表作品

image-20240614080939571

分布式文件系统物理结构

分布式文件系统在物理结构上是由计算机集群中的多个节点构成的,如图3-2所示。这些节点分为两类:

主节点(名称节点):文件目录的创建、删除、重命名,管理数据节点和文件块的映射关系。客户端只有访问名称节点才能找到请求的文件块所在的位置,进而到相应位置读取所需文件块。

从节点(数据节点):负责数据的存储和读取。在存储时,由名称节点分配存储位置,然后由客户端把数据直接写入相应数据节点;在读取时,客户端从名称节点获得数据节点和文件块的映射关系,到相应位置访问文件块。数据节点也要根据名称节点的命令创建、删除和复制数据块。

image-20240614081938181

HDFS 副本存放策略

(1)

  • 在集群内发起写操作请求,则把第1个副本放置在发起写操作请求的数据节点上,实现就近写入数据。
  • 在集群外发起写操作请求,则从集群内部挑选一台磁盘空间较为充足、CPU不太忙的数据节点,作为第1个副本的存放地。

(2)第2个副本会被放置在与第1个副本不同的机架的数据节点上。
(3)第3个副本会被放置在与第1个副本相同的机架的其他节点上。
(4)如果还有更多的副本,则继续从集群中随机选择数据节点进行存放。

Shuffle过程

(1)在Map端的Shuffle过程
Map任务的输出结果首先被写入缓存,当缓存满时,就启动溢写操作,把缓存中的数据写入磁盘文件,并清空缓存。当启动溢写操作时,首先需要对缓存中的数据进行分区,然后对每个分区的数据进行排序和合并,再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束之前,这些溢写文件会被归并成一个大的磁盘文件,然后通知相应的Reduce任务来“领取”属于自己处理的数据。

(2)在Reduce端的Shuffle过程

Reduce任务从Map端的不同Map机器“领取”属于自己处理的那部分数据,然后对数据进行归并后交给Reduce处理。

WordConunt设计思路

  1. 分片(Splitting)阶段:一个大文件被切分成多个分片,分配到多个不同的机器上。每个机器上的Map任务负责处理分配给它的那一部分数据。
  2. Map阶段:在Map任务中,输入被默认处理为键值对形式,其中键是文件的行号,值是对应的行内容。Map任务解析每行的内容,识别出单词,并输出键值对,格式为<单词,1>,表示这个单词出现一次。
  3. Shuffle阶段:Map阶段的输出经过排序和分区处理,转换成<key, value-list>的形式,例如<hadoop, [1, 1, 1, 1, 1]>。这表明单词"hadoop"出现了五次。
  4. Reduce阶段:Reduce任务接收来自Shuffle阶段的中间结果,开始进行汇总计算。每个Reduce任务负责一部分数据的汇总,计算每个单词的最终频数,并将这些结果输出到分布式文件系统中。

image-20240614081406011

Spark主要具有如下优点。

(l)Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活。
(2)Spak提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率。
(3)Spark基于DAG的任务调度执行机制,要优于MapReduce的迭代执行机制。

Spark各种概念之间的相互关系

在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后执行结果会返回给任务控制节点,或者写到HDFS或者其他数据库中。

image-20240614081554772

流计算与Hadoop

Hadoop作为大数据技术的事实标准,主要依靠MapReduce和HDFS来支撑大规模的分布式存储和处理。然而,Hadoop的MapReduce模型主要适用于批量数据处理,不适合流计算的实时需求。尽管可以通过小批量处理来减少延迟,这种方法会增加处理开销和复杂度,降低系统的可维护性和用户程序的可伸缩性。因此,业界已经开发了许多专门的实时流数据处理系统来满足这些需求。

1、编程实现文件合并和去重操作

对于两个输入文件,即文件 A 和文件 B,请编写 MapReduce 程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件 C。下面是输入文件和输出文件的一个样例,供参考。

输入文件A的样例如下:

20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x

输入文件B的样例如下:

20150101 y
20150102 y
20150103 x
20150104 Z
20150105 y

根据输入文件A和B合并得到的输出文件C的样例如下:

20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x

参考代码

public class MyTest {

    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static Text text = new Text();

        public void map(Object key, Text value, Context context) {
            text = value;
            context.write(text, new Text(""));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) {
            context.write(key, new Text(""));
        }
    }

}

2、编写MapReduce程序实现对输入文件的排序

现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的一个样例供参考。

输入文件 1 的样例如下:

33
37
12
40

输入文件 2 的样例如下:

4
16
39
5

输入文件 3 的样例如下:

1
45
25

输出结果如下:

1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
public class MyTest {
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context) {
            String text = value.toString();
            data.set(Integer.parseInt(text));
            context.write(data, new IntWritable(1));
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable line_num = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) {
            for (IntWritable val : values) {
                context.write(line_num, key);
                line_num = new IntWritable(line_num.get() + 1);
            }
        }
    }
}

3、编写 MapReduce 程序实现求平均值

每个输入文件表示学生某门课程的成绩,输入文件中每行内容由两个字段组成,第一个字段是学生姓名,第二个字段是学生的成绩;编写 MapReduce 应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

输入文件(AlgorithmScore)的样例如下:

XiaoMing 92
XiaoHong 87
XiaoXin 82
XiaoLi 90

输入文件(DataBaseScore)的样例如下:

XiaoMing 95
XiaoHong 81
XiaoXin 89
XiaoLi 85

输入文件(PythonScore)的样例如下:

XiaoMing 82
XiaoHong 83
XiaoXin 94
XiaoLi 91

输出结果如下:

XiaoHong 83.67
XiaoLi 88.67
XiaoMing 89.67
XiaoXin 88.33
public class MyTest {

    public static class MyMapper extends Mapper<Object, Text, Text, FloatWritable> {
        private Text stuName = new Text();
        private FloatWritable stuScore = new FloatWritable();

        public void map(Object key, Text value, Context context) {
            String[] line = value.toString().split(" ");
            this.stuName.set(line[0]);
            this.stuScore.set(Float.parseFloat(line[1]));
            context.write(this.stuName, this.stuScore);
        }
    }

    public static class MyReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
        private FloatWritable avgScore = new FloatWritable();

        public void reduce(Text key, Iterable<FloatWritable> values, Context context) {
            int num = 0;
            float sum = 0;
            float avg = 0;
            for (FloatWritable val : values) {
                sum += val.get();
                num++;
            }
            avg = sum / num;
            BigDecimal b = new BigDecimal(avg);
            avgScore.set(b.setScale(2, BigDecimal.ROUND_HALF_UP).floatValue());
            context.write(key, avgScore);
        }
    }
}

1、编写 Spark 独立应用程序实现数据去重

对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并、剔除其中重复的内容并排序,并输出到终端中。下面是输入文件和输出结果的一个样例,供参考。
输入文件A的样例如下:

20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x

输入文件B的样例如下:

20150101 y
20150102 y
20150103 x
20150104 Z
20150105 y

根据输入文件A和B合并得到的输出文件C的样例如下:

20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
object MyTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyTest").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.textFile("file:///home/hadoop/myfile")
      
    val rdd1 = data.map(line => (line, ""))
    val rdd2 = rdd1.groupByKey()
    val rdd3 = rdd2.sortByKey()
    val rdd4 = rdd3.keys

    rdd4.foreach(println)
  }
}

2、编写 Spark 独立应用程序实现整合排序

对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并、剔除其中重复的内容并排序,并输出到终端中。下面是输入文件和输出结果的一个样例,供参考。

输入文件 1 的样例如下:

33
37
12
40

输入文件 2 的样例如下:

4
16
39
5

输入文件 3 的样例如下:

1
45
25

输出结果如下:

1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
object MyTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyTest").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.textFile("file:///home/hadoop/myfile", 1)
      
    val rdd1 = data.map(line => (line.toInt, ""))
    val rdd2 = rdd1.partitionBy(new HashPartitioner(1))
    val rdd3 = rdd2.sortByKey()
    val rdd4 = rdd3.keys

    var index = 0
    val rdd5 = rdd4.map(t => {
      index = index + 1
      (index, t)
    })

    rdd5.foreach(t => println(t._1 + " " + t._2))
  }
}

3、编写 Spark 独立应用程序实现求平均值

每个输入文件表示学生某门课程的成绩,输入文件中每行内容由两个字段组成,第一个字段是学生姓名,第二个字段是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到终端中。下面是输入文件和输出结果的一个样例,供参考。
输入文件(AlgorithmScore)的样例如下:

XiaoMing 92
XiaoHong 87
XiaoXin 82
XiaoLi 90

输入文件(DataBaseScore)的样例如下:

XiaoMing 95
XiaoHong 81
XiaoXin 89
XiaoLi 85

输入文件(PythonScore)的样例如下:

XiaoMing 82
XiaoHong 83
XiaoXin 94
XiaoLi 91

输出结果如下:

XiaoHong 83.67
XiaoLi 88.67
XiaoMing 89.67
XiaoXin 88.33
object MyTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyTest").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.textFile("file:///home/hadoop/myfile")
      
    val rdd1 = data.map(line => line.split(" "))
    val rdd2 = rdd1.map(t => (t(0), t(1).toInt))
    val rdd3 = rdd2.mapValues(x => (x, 1))
    val rdd4 = rdd3.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    val rdd5 = rdd4.mapValues(x => x._1.toDouble / x._2)

    rdd5.foreach(t => {
      val x = t._2
      println(t._1 + " " + f"$x%1.2f")
    })
  }
}
最后修改:2024 年 06 月 14 日
如果觉得我的文章对你有用,请随意赞赏