Transformations操作

 

map (f:T=>U)

 

将原来RDD中的每个数据项通过map算子中的用户自定义函数f映射, 转变为一个新的元素,然后返回一个新的RDD,这个返回的数据集是分布式的RDD数据集.

 

flatMap(f:T=>U)

 

将原来RDD中的每个数据项通过map算子中的用户自定义函数f映射, 转变为一个集合, 并将生成的每个集合合并为一个集合, 然后返回一个新的RDD, 这个返回的数据集是分布式的RDD数据集.

 

注意: 和map差不多,但是flatMap会将一个元素扁平化成一个数组, 生成的是多个结果.

 

filter(f:T=>Bool)

 

对调用filter算子的RDD数据集中的每个元素, 分别作用于此函数f映射,

如果此函数f的返回值为true,则保留此元素, 如果为false, 则过滤掉此元素.

最终返回一个新的RDD, 此RDD必然是原RDD的子集.

 

 

reduceByKey(f:(V,V)=>V, [numTasks])

 

就是将元素为KV对的RDD中相同Key的元素的Value进行函数f的映射操作,因此,相同Key的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

 

sortByKey([ascending],[numTasks])

按照key来进行排序, ascending是boolean类型, ascending为true是升序, ascending为false是降序

 

union(otherDataset):

union顾名思义就是联合的意思,它将2个RDD中的每个元素都联合起来,生成1个新的RDD。

使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同,并不进行去重操作,保存所有元素。如果想去重,可以使用distinct()。

++符号相当于uion函数操作。

 

 

distinct([numTasks]):

 

distinct将RDD中的元素进行去重操作。

distinct的操作其实是把原RDD进行map操作, 根据原来的KEY-VALUE生成为key,

value使用null来替换, 并对新生成的RDD执行reduceByKey操作, 这个reduceByKey的操作中, 传入的x, y都是null, 执行完reducebykey的操作后, 这个时候新的RDD就只包含一个结果值(其实就是一个null), 最后执行下map操作, 这个操作返回的是RDD的第一个值, 第一个值就是原始rdd的key-value.

执行reduceByKey操作默认的分区器是Hash分区器.

这个功能在执行时也需要做shuffle的操作.也就是说, distinct的操作是根据key与value一起计算不重复的结果.

只有两个记录中key与value都不重复才算是不重复的数据.

如果将数据封装成对象,这个时候就要使用reduceByKey()了,因为distinct只能过滤数字和字符串的值.

distinct的源码也是这样实现的。

 

 

 

groupBy

 

将元素通过函数f生成相应的Key,数据就转化为Key-Value格式,之后将相同Key的元素分为一组。

 

 

groupByKey(numTasks):

 

返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist.

和groupby类似,但是需要有key,也就是说输入需要是key-value形式

如果是seq类型的可以通过keyby方法给加key

 

 

join(otherDataset, [numTasks]):

当两个KV的dataset(K,V)和(K,W)进行关联操作时, 返回的是(K,(V,W))的dataset, numTasks为并发的任务数

 

 

cogroup

 

cartesian(otherDataset):笛卡尔积, 就是m*n

 

subtract

相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素,需要shuffle.

 

 

mapPartitions(func):

 

和map很像,但是map是每个element 元素作用于函数 f, 而mapPartitions是每个partition分区作用于函数 f .

函数f获取到每个分区的迭代器,通过迭代器对每个分区的元素进行操作。

 

 

 

缓存操作

 

 

cache

将RDD数据原样存入内存cache将RDD元素从磁盘缓存到内存,相当于persist(MEMORY_ONLY)函数的功能。

 

persist,

对RDD数据进行持久化操作

持久化RDD的同时切断Lineage ,修改了RDD的meta info中的storagelevel函数对RDD进行缓存操作。数据缓存在哪里由StorageLevel枚举类型确定。

有几种类型的组合

DISK代表磁盘

MEMORY代表内存

SER代表数据是否进行序列化存储

StorageLevel是枚举类型,代表存储模式

MEMORY_AND_DISK_SER代表数据可以存储在内存和磁盘,并且以序列化的方式存储。

 

 

Checkpoint:

持久化RDD的同时切断Lineage ,修改了RDD的meta info中的lineage,返回经过修改的RDD对象自身而非新的RDD对象,属lazy操作.

 

 

 

Actions操作

 

只有Actions算子才能触发 spark 任务的真正的执行.

 

 

foreach(func):

对RDD dataset中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint。

 

 

reduce(f:(X,Y)=>T)

 

 

将RDD中元素前两个传给输入函数f,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数f,直到最后只有一个值为止。

说白了就是聚集,但是传入的函数是两个参数输入返回一个值.

这个函数必须是满足交换律和结合律的.

reduce函数相当于对RDD中的元素进行reduceLeft函数的操作。

 

 

 

 

collect():

将分布式的RDD返回为一个单机的scala Array数组, 一般在filter之后数据集足够小的时候使用。相当于toArray(),toArray()已经过时不推荐使用。

 

collectAsMap

对(K,V)型的RDD数据返回一个单机HashMap。对于重复K的RDD元素,后面的元素覆盖前面的元素。

 

count():

返回整个RDD dataset中element元素的个数

 

first():

返回RDD dataset中的第一个元素

first相当于top( 1)

 

take(n):

返回RDD dataset的前n个elements单机的scala Array数组

 

saveAsTextFile(path):

 

函数将dataset数据输出到本地文件系统或是存储到HDFS的指定目录中。

将RDD中的每个元素映射为(Null,x.toString),然后再将其写入HDFS。

 

 

saveAsSequenceFile(path):

只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统.

 

saveAsObjectFile

将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为

(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。

 

 

countByKey():返回的是key对应个数的一个map,作用于一个RDD

 

 

top

可返回最大的k个元素。

 

 

takeOrdered返回最小的k个元素