type
status
date
slug
summary
tags
category
icon
password
RDD创建
在Spark中创建RDD的创建方式可以分为四种
1. 从集合(内存)中创建RDD
从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法
2. 从外部存储(文件)创建 RDD
由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集, 比如 HDFS、HBase 等。
3. 从其他 RDD 创建
主要是通过一个 RDD 运算完后,再产生新的 RDD。
4. 直接创建 RDD(new)
使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。
RDD并行度与分区
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能 够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里 的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
- 读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的 Spark 核心源码如下:
- 读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数 据读取的规则有些差异,具体 Spark 核心源码如下
RDD转换算子/RDD行动算子
转换算子:
名称 | 类型 | 函数签名 | 函数说明 | 示例 |
map | Value类型 | def map[U: ClassTag](f: T => U): RDD[U] | 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。 | val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(
num => {
num * 2
}
)
val dataRDD2: RDD[String] = dataRDD1.map(
num => {
"" + num
}
) |
mapPartitions | Value类型 | def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。 | val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
datas => {
datas.filter(_==2)
}
) |
mapPartitionsWithIndex | Value类型 | def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] | 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。 | val dataRDD1 = dataRDD.mapPartitionsWithIndex(
(index, datas) => {
datas.map(index, _)
}
) |
flatMap | Value类型 | def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] | 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射 | val dataRDD = sparkContext.makeRDD(List(
List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
list => list
) |
glom | Value类型 | def glom(): RDD[Array[T]] | 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变 | val dataRDD = sparkContext.makeRDD(List(
1,2,3,4
),1)
val dataRDD1:RDD[Array[Int]] = dataRDD.glom() |
groupBy | Value类型 | def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] | 将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中一个组的数据在一个分区中,但是并不是说一个分区中只有一个组 | val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(
_%2
) |
filter | Value类型 | def filter(f: T => Boolean): RDD[T] | 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃.
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。 | val dataRDD = sparkContext.makeRDD(List(
1,2,3,4
),1)
val dataRDD1 = dataRDD.filter(_%2 == 0) |
sample | Value类型 | def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] | 根据指定的规则从数据集中抽取数据 | val dataRDD = sparkContext.makeRDD(List( 1,2,3,4 ),1)// 抽取数据不放回(伯努利算法)// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不 要// 第一个参数:抽取的数据是否放回,false:不放回// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;// 第三个参数:随机数种子 val dataRDD1 = dataRDD.sample(false, 0.5)// 抽取数据放回(泊松算法)// 第一个参数:抽取的数据是否放回,true:放回;false:不放回// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数// 第三个参数:随机数种子val dataRDD2 = dataRDD.sample(true, 2) |
distinct | Value类型 | def distinct()(implicit ord: Ordering[T] = null): RDD[T]def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] | 将数据集中重复的数据去重 | val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),1)
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2) |
coalesce | Value类型 | def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] | 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本 | val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2),2)val dataRDD1 = dataRDD.repartition(4) |
sortBy | Value类型 | def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] | 该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程 | val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2),2)val dataRDD1 = dataRDD.sortBy(num=>num, false, 4) |
intersection | 双Value类型 | def intersection(other: RDD[T]): RDD[T] | 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD | val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2) |
union | 双Value类型 | def union(other: RDD[T]): RDD[T] | 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD | val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))val dataRDD = dataRDD1.union(dataRDD2) |
subtract | 双Value类型 | def subtract(other: RDD[T]): RDD[T] | 以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集 | val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))val dataRDD = dataRDD1.subtract(dataRDD2) |
zip | 双Value类型 | def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] | 将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。 | val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))val dataRDD = dataRDD1.zip(dataRDD2) |
partitionBy | Key-Value类型 | def partitionBy(partitioner: Partitioner): RDD[(K, V)] | 将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner | val rdd: RDD[(Int, String)] =sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)import org.apache.spark.HashPartitionerval rdd2: RDD[(Int, String)] =rdd.partitionBy(new HashPartitioner(2)) |
reduceByKey | Key-Value类型 | def reduceByKey(func: (V, V) => V): RDD[(K, V)]def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] | 可以将数据按照相同的 Key 对 Value 进行聚合 | val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))val dataRDD2 = dataRDD1.reduceByKey(_+_)val dataRDD3 = dataRDD1.reduceByKey(_+_, 2) |
groupByKey | Key-Value类型 | def groupByKey(): RDD[(K, Iterable[V])]def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] | 将数据源的数据根据 key 对 value 进行分组 | val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))val dataRDD2 = dataRDD1.groupByKey()val dataRDD3 = dataRDD1.groupByKey(2)val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2)) |
aggregateByKey | Key-Value类型 | def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] | 将数据根据不同的规则进行分区内计算和分区间计算 | val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_) |
foldByKey | Key-Value类型 | def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] | 当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey | val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))val dataRDD2 = dataRDD1.foldByKey(0)(_+_) |
combineByKey | Key-Value类型 | def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] | 最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。 | val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),("a", 95), ("b", 98))val input: RDD[(String, Int)] = sc.makeRDD(list, 2)val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey( (_, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) |
sortByKey | Key-Value类型 | def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] | 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的 | val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false) |
join | Key-Value类型 | def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD | val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))rdd.join(rdd1).collect().foreach(println) |
leftOuterJoin | Key-Value类型 | def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] | 类似于 SQL 语句的左外连接 | val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2) |
cogroup | Key-Value类型 | def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable | val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =dataRDD1.cogroup(dataRDD2) |
行动算子:
名称 | 函数签名 | 函数说明 | 示例 |
reduce | def reduce(f: (T, T) => T): T | 聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据 | val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))// 聚合数据val reduceResult: Int = rdd.reduce(_+_) |
collect | def collect(): Array[T] | 在驱动程序中,以数组 Array 的形式返回数据集的所有元素 | val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))// 收集数据到 Driverrdd.collect().foreach(println) |
count | def count(): Long | 返回 RDD 中元素的个数 | val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))// 返回 RDD 中元素的个数val countResult: Long = rdd.count() |
first | def first(): T | 返回 RDD 中的第一个元素 | val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))// 返回 RDD 中元素的个数val firstResult: Int = rdd.first()println(firstResult) |
take | def take(num: Int): Array[T] | 返回一个由 RDD 的前 n 个元素组成的数组 | val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))// 返回 RDD 中元素的个数val takeResult: Array[Int] = rdd.take(2)println(takeResult.mkString(",")) |
takeOrdered | def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] | 返回该 RDD 排序后的前 n 个元素组成的数组 | val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))// 返回 RDD 中元素的个数val result: Array[Int] = rdd.takeOrdered(2) |
aggregate | def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U | 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合 | val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)// 将该 RDD 所有元素相加得到结果//val result: Int = rdd.aggregate(0)(_ + _, _ + _)val result: Int = rdd.aggregate(10)(_ + _, _ + _) |
fold | def fold(zeroValue: T)(op: (T, T) => T): T | 折叠操作,aggregate 的简化版操作 | val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))val foldResult: Int = rdd.fold(0)(_+_) |
countByKey | def countByKey(): Map[K, Long] | 统计每种 key 的个数 | val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2,"b"), (3, "c"), (3, "c")))// 统计每种 key 的个数val result: collection.Map[Int, Long] = rdd.countByKey() |
save 相关算子 | def saveAsTextFile(path: String): Unitdef saveAsObjectFile(path: String): Unitdef saveAsSequenceFile( path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit | 将数据保存到不同格式的文件中 | // 保存成 Text 文件rdd.saveAsTextFile("output")// 序列化成对象保存到文件rdd.saveAsObjectFile("output1")// 保存成 Sequencefile 文件rdd.map((_,1)).saveAsSequenceFile("output2") |
foreach | def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))} | 分布式遍历 RDD 中的每一个元素,调用指定函数 | val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))// 收集后打印rdd.map(num=>num).collect().foreach(println)println("****************")// 分布式打印rdd.foreach(println) |
- 作者:DewarTsang
- 链接:https://funtalk.top/article/b49c1017-36ad-429d-83bf-7429cac8fb31
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。