小工具      在线工具  汉语词典  css  js  c++  java

Spark核心RDD详解(设计与运行原理,分区,创建,转换,行动与持久化)

大数据,spark,大数据,hadoop 额外说明

收录于:152天前

RDD设计背景与概念


在实际应用中,有很多迭代算法(如机器学习、图算法等)和交互式数据挖掘工具。这些应用场景的共同点是中间结果,即某一阶段的输出,在不同的计算阶段之间被复用。结果作为下一阶段的输入。然而目前的MapReduce框架将中间结果写入HDFS,这带来了大量的数据复制、磁盘IO和序列化开销。尽管Pregel等图计算框架也将结果存储在内存中,但这些框架只能支持一些特定的计算模式,并不能提供通用的数据抽象。 RDD的出现就是为了满足这种需求。它提供了一个抽象的数据架构。我们不必担心底层数据的分布式特性。我们只需要将具体的应用逻辑表达为一系列的转换过程,以及不同RDD之间的转换。操作形成依赖关系,可以流水线化,从而避免了中间结果的存储,大大减少了数据复制、磁盘IO和序列化开销。


RDD 是分布式对象的集合。它本质上是一个只读分区记录集合。每个RDD可以分为多个分区。每个分区都是一个数据集碎片,一个RDD的不同分区可以保存到集群中。可以在不同的节点上进行并行计算,从而可以在集群中的不同节点上进行并行计算。 RDD提供了高度受限的共享内存模型,即RDD是不能直接修改的只读记录分区的集合。 RDD只能基于稳定物理存储中的数据集创建,或者通过在其他RDD上执行来创建。新的 RDD 是通过某些转换操作(例如 map、join 和 groupBy)创建的。 RDD提供了丰富的操作集来支持常见的数据操作,分为“Action”和“Transformation”两种类型。前者用于执行计算并指定输出的形式,后者指定RDD。之间的相互依赖关系。两类操作的主要区别在于,转换操作(如map、filter、groupBy、join等)接受RDD并返回RDD,而操作操作(如count、collect等)接受RDD但返回RDD非RDD(即输出一个值或结果)。 RDD提供的转换接口非常简单。它们是map、filter、groupBy、join等粗粒度的数据转换操作,而不是对某个数据项的细粒度修改。因此,RDD更适合对数据集中的元素执行相同操作的批处理应用,但不适合需要异步且细粒度状态的应用,例如Web应用系统、增量网络爬虫等。其中,这种粗粒度的转换接口设计会让人直观地认为RDD的功能非常有限,不够强大。然而,RDD实际上已被实践证明可以很好地应用于许多并行计算应用中。它可以具备很多现有计算框架(如MapReduce、SQL、Pregel等)的表达能力,可以应用于这些框架无法处理的任务。交互式数据挖掘应用程序。

资料来源。1


RDD处理流程

Spark用Scala语言实现了RDD API,程序员可以通过调用API来实现对RDD的各种操作。 RDD的典型执行流程如下:

  1. RDD是通过读取外部数据源(或内存中的集合)创建的;
  2. RDD经过一系列“转换”操作,每次都会生成不同的RDD供下一次“转换”使用;
  3. 最后一个RDD经“行动”操作进行处理,并输出到外部数据源(或者变成Scala集合或标量)。
    需要说明的是,RDD采用了惰性调用,即在RDD的执行过程中(如图9-8所示),真正的计算发生在RDD的“行动”操作,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

如图1所示,创建了word.txt文件,内容如上;图2加载文件和视频统计逻辑;图3打印并生成RDD内容;图 5 显示每个生成的对象都是一个 RDD。

可以看出,Spark应用基本上就是一系列基于RDD的计算操作。 RDD按顺序使用并且相互依赖。最后一个操作是持久化 RDD,将其保存在内存或磁盘中。 (内存是从scala程序继承的)。

总结:

Spark的核心构建在统一的抽象RDD之上,使得Spark的各个组件可以无缝集成,在同一个应用程序中完成大数据计算任务。

计算一个RDD集合的执行流程如下:

  1. 创建这个Spark程序的执行上下文,即创建一个SparkContext对象; (用于操作 Spark 的 Scala 上下文连接)。
  2. 通过SparkContext从外部数据源读取数据创建RDD对象;
  3. RDD的转换(构建RDD之间的依赖关系,形成DAG图。此时没有发生真正的计算,只记录转换的轨迹;)
  4. 动作类型的操作会触发实际计算并将结果保留在内存中。

RDD分区

分区的概念
分区是RDD内部并行计算的一个计算单元,RDD的数据集在逻辑上被划分为多个分片,每一个分片称为分区,分区的格式决定了并行计算的粒度,而每个分区的数值计算都是在一个任务中进行的,因此任务的个数,也是由RDD(准确来说是作业最后一个RDD)的分区数决定。

分区优势
RDD 是一种分布式的数据集,数据源多种多样,而且数据量也很大,在存储这些海量数据时,也是按照块来存的,当RDD读取这些数据进行操作时,实际上是对每个分区中的数据进行操作,每一个分区的数据可以并行操作,分区可以提高并行度。

RDD分区原则

【spark】RDD的分区

RDD特性

  1. 数据集的基本单位是一组分区或分区列表。每个分片将由一个计算任务处理,分片的数量决定了并行度。用户在创建RDD时可以指定RDD分片的数量。如果未指定,将使用默认值(默认值 2)。

  2. 一个函数将应用于每个分区。 Spark中RDD的计算是基于分区的,函数会应用到每个分区。

  3. 一个 RDD 依赖于多个其他 RDD。 RDD的每次转换都会生成一个新的RDD,因此RDD之间会形成管道式的依赖关系。当部分分区数据丢失时,Spark可以通过这种依赖关系重新计算丢失的分区数据,而不是重新计算RDD的所有分区。 (Spark的容错机制)

  4. K-V 类型的 RDD 将具有 Partitioner 功能。非键值RDD的Parititioner的值为None。 Partitioner函数决定了RDD本身的分区数量,也决定了父RDD Shuffle输出时的分区数量。

  5. 每个RDD维护一个列表,每个Partition的位置(首选位置)都存储在列表中。

RDD转换(transformation)和行动(action)

RDD操作分为两类:transformation和action

转换:通过操作将一个RDD转换为另一个RDD

操作:评估或输出 RDD

所有这些操作主要针对两种类型的 RDD:

1)数值RDD(value型)

2)键值对RDD   (K-V型)

RDD 上的所有转换操作都是延迟执行的,Spark 仅在发生操作操作时才会执行它们。

在这里插入图片描述

转换运算符是一个对 RDD 进行操作的接口函数。其作用是将一个或多个RDD转化为新的RDD。使用Spark进行数据计算。使用创建算子生成RDD后,数据处理算法设计和程序编写中最关键的部分就是使用变换算子对原始数据生成的RDD进行一步一步的变换,最终得到想要的计算结果。结果。

动作是触发调度的操作符,它返回结果或将数据写入外部存储器。

RDD API

创建RDD
  1. 从外部存储系统中的数据集创建
//本地加载数据创建RDD

val baseRdd = sc.textFile("file:///wordcount/input/words.txt")

//加载hdfs文件
val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")

  1. 通过使用运算符转换现有 RDD 来生成新的 RDD

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

  1. 创建现有的 Scala 集合
  • 方法1 : sc.parallelize(Array(1,2,3,4,5,6,7,8))

  • 方法2 : sc.makeRDD(List(1,2,3,4,5,6,7,8))

makeRDD方法底层调用了parallelize方法,调用parallelize()方法的时候,不指定分区数的时候,使用系统给出的分区数,而调用makeRDD()方法的时候,会为每个集合对象创建最佳分区

在这里插入图片描述
在这里插入图片描述

  1. 通过消息队列(如kafka、rabbitMQ)创建RDD

主要用于流处理应用(我还没学过)。

RDD转换

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
图片来源2

RDD行动

在这里插入图片描述

其他参考资料:
Spark的RDD运算符-转换运算符

Spark和RDD的知识整理和总结

RDD操作详解(三)Action操作

Spark的RDD动作算子(Action)全集

RDD持久化

RDD数据持久化的作用是什么?

1、将多次使用的RDD缓存起来,缓存在内存中。当频繁使用时,会直接从内存中读取缓存的数据,无需重新计算。

2、将RDD结果写入硬盘(容错机制)。当RDD丢失数据,或者从属RDD丢失数据时,可以使用持久化到硬盘的数据来恢复。

除了将 RDD 的最终目的地作为集合标量返回之外,RDD 还可以存储在外部文件系统或数据库中。 Spark与Hadoop完全兼容,因此Spark也支持MapReduce支持的文件或数据库类型的读写。

写入HDFS

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

写缓存

RDD通过cache()或者persist()将前面的计算结果缓存,默认情况下会把数据缓存在JVM的堆内存中。cache() 不需要传参,persist()需要设置持久化级别。

持久性级别为(首先在此列出,稍后将详细讨论):

  • 仅内存
  • 内存和磁盘
  • MEMORY_ONLY_SER
  • 内存_和_磁盘_SER;
  • 仅磁盘
  • MEMORY_ONLY_2
  • MEMORY_AND_DISK_2

cache()底层调用persist()并将持久化级别设置为MEMORY_ONLY,这意味着cache()和persist(StorageLevel.MEMORY_ONLY)是相同的。

可以使用 unpersist() 方法手动从缓存中删除持久 RDD。

写入HDFS(Spark自带的方法)

将数据写入HDFS并使用HDFS永久存储。

操作流程:

  • 设置持久存储路径

  • 调用checkpoint()进行数据的保存SparkContext.setCheckpointDir("HDFS的目录")

  • 调用持久化方法RDD.checkpoint()

写入本地目录

wordCount1.saveAsTextFile("file:///home/master/hadoop/files/...")

在这里插入图片描述

在这里插入图片描述

总结:

  1. Persist和Cache将数据保存在内存中

  2. Checkpoint将数据保存在HDFS中

  3. 程序结束或者手动调用unpersist方法后,Persist和Cache都会被清除。

  4. 检查点永久存储不会被删除。

相关参考:Spark和RDD的知识整理和总结


    厦门大学林子宇教授——Spark入门:RDD的设计与运行原理 Spark RDD 操作:转换和操作
. . .

相关推荐

额外说明

Matlab产生正弦mif文件

离散化采集点 离散化采样点,将连续的正弦波信号进行离散化;应用matlab软件进行实现; 将离散化后的正弦波一个周期存储到Ram中。 3.1 将离散后的数据进行定点化,Ram的规格是256x8,数据规格1bit符号和7比特小数位。 3.2 创建一个ram

额外说明

在简历上写上“精通”后,我被有工作经验的面试官噎住了。

前言 如果有真才实学,写个精通可以让面试官眼前一亮! 如果是瞎写?基本就要被狠狠地虐一把里! 最近在面试,我现在十分后悔在简历上写了“精通”二字… 先给大家看看我简历上的技能列表: 熟悉软件测试理论基础,熟悉软件测试的流程、方法,具备测试用例需求分析和设

额外说明

【Three.js入门】处理动画、尺寸自适应、双击进入/退出全屏(Clock跟踪时间,Gsap动画库,自适应画面,进入/退出全屏)

个人简介 -个人主页: 前端杂货铺 -‍♂️学习方向: 主攻前端方向,也会涉及到服务端 -个人状态: 在校大学生一枚,已拿多个前端 offer(秋招) -未来打算: 为中国的工业软件事业效力n年 -推荐学习:-前端面试宝典 -Vue2 -Vue3 -Vu

额外说明

Java实训项目3:GUI学生信息管理系统 - 涉及知识点

文章目录 五、涉及知识点 (一)Java基本语法 1、数据类型 2、变量与常量 3、运算符与表达式 (二)Java流程控制 1、选择结构 2、循环结构 (三)Java面向对象编程 1、封装 2、继承 3、多态 4、接口 5、内部类 6、异常处理 (四)J

额外说明

mysql的auto_incremnet操作 解决Invalid default value for ‘xxx‘的问题 查询表结构

目录 auto_increment的作用 auto_increment引发的删除问题 复现问题 查询表结构 查询未删前的数据 删除数据 批量插入数据 再次查询数据 解决问题 删除uid字段 再新建uid字段 查询新建后的数据 Invalid defaul

额外说明

大数据学习笔记21:MR案例——分区全排序

文章目录 一、提出任务 二、准备工作 1、启动hadoop服务 2、上传数据文件到HDFS 3、创建Maven项目PartitionSort 4、修改pom.xml文件,添加依赖 5、创建log4j.properties文件 三、完成任务 1、创建Sor

额外说明

MarkerManager and Sidebar

markermanager  的应用.  <!   <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"     "http://www.w3.org/TR/xhtml1/DTD/xhtml1-

额外说明

Java实现List集合去重的5种方式

通过HashSet去重(不保证顺序) public static List<String> getUserList() { List<String> userList = new ArrayList<>();

额外说明

Greenplum GPKafka【部署 01】使用GPKafka实现Kafka数据导入Greenplum数据库完整流程分享(扩展安装文件网盘分享)

分享资源地址及文件列表: 链接:https://pan.baidu.com/s/1XVTxKLkOYrL4pCZpFfs-Tg 提取码:sq90 包含文件: # 命令执行 gpkafka # 扩展安装 gpss.control gpss--1.0.s

ads via 小工具