sparkRDD编程 最后更新时间:2022年04月18日 ## spark RDD编程 ![image-20220417094827722.png](https://www.yidaimingjvn.xyz/usr/uploads/2022/04/1766673455.png) spark-shell2.x版本中,默认将SparkContext类实例化为了sc,将SparkSession实例化为了spark。使用时直接调用sc和spark即可。 ### 创建RDD #### 1. 从内存中创建 在spark中,可以使用parallelize方法和makeRDD两种方法从内存中创建RDD。 * parallelize方法是将seq(即序列)转化为RDD * makeRDD方法是从已有的RDD创建为新的RDD ##### (1) parallelize parallelize(seq, num) 该方法可以将seq集合转化为RDD。 参数解释: - seq表示要转化集合。(seq是一种序列。所谓序列是指一类具有一定长度的可迭代访问的对象,其中每一个元素均带一个固定索引,该索引从0开始计数) - num表示分区数。不设分区,则默认为该application分配到的资源的CPU数。 格式: ```scala val 常量名 = sc.parallelize(seq, num); ``` 示例: ```scala var datas = Array(1,2,3,4,5); val lx = sc.parallelize(datas); println(lx.partitions.size) // --> 8 var datas = Array(1,2,3,4,5); val lx = sc.parallelize(datas,3); println(lx.partitions.size) // --> 3 ``` 提示:使用```rdd.partitions.size```方法可以输出rdd所用分区数。 注意:这里第一次输出8,是因为分配了cpu的8个核心资源。 ##### (2)makeRDD makeRDD(seq) 该方法是从已有的RDD创建为新的RDD。 该方法创建的RDD的分区数取决于集合元素。 参数解释: - seq表示要转化集合。(seq是一种序列。所谓序列是指一类具有一定长度的可迭代访问的对象,其中每一个元素均带一个固定索引,该索引从0开始计数) 格式: ```scala val 常量名 = sc.makeRDD(seq) ``` 示例: ```scala var datas = Array(1,2,3,4,5); val lx = sc.makeRDD(datas); ``` #### 从外存中创建 在spark中,使用```textFile```方法可以从外存中读取数据集。该方法支持多种类型的数据集,如目录、文本文件、压缩文件、和通配符匹配的文件等,并且允许设定分区个数。 textFile(path, num) 参数解释: - path就是外存文件的获取路径。默认使用hdfs协议,访问本地时需要使用file协议。 - num即分区数。 ##### (1)从HDFS文件创建RDD 这种方法最常用。textFile默认读取HDFS中的文件。 格式: ```scala val 常量名 = sc.textFile("文件路径") ``` ```scala val 常量名 = sc.textFile("hdfs://文件路径") ``` 示例: ```scala val tests = sc.textFile("/lxyd/test.txt") ``` ```scala val tests = sc.textFile("hdfs://主节点:端口号/lxyd/test.txt") ``` ![image-20220417113955153.png](https://www.yidaimingjvn.xyz/usr/uploads/2022/04/1791267655.png) ![image-20220417113035551.png](https://www.yidaimingjvn.xyz/usr/uploads/2022/04/1232459857.png) ##### (2)从本地创建RDD 同样通过textFile方法读取文件。 格式: ```scala val 常量名 = sc.textFile("file://文件路径") ``` 示例: ```scala val tests = sc.textFile("file:///lxyd/test.txt") ``` ![image-20220417114033536.png](https://www.yidaimingjvn.xyz/usr/uploads/2022/04/546574770.png) ### 转换 spark提供了大量的转换RDD用的方法。 #### map map(fun) 该方法通过参数中的函数将RDD中的各个元素处理后,返回一个新的RDD。 参数解释: - fun是一个函数表达式。map会将调用map方法的RDD中的元素一一作为参数传递给fun,fun返回的值又会被map组合为一个新的RDD返回。 格式: ```scala val 常量名 = rdd.map(fun) ``` 示例: ```scala val ageArr = Array(12,13,13,15) val age = sc.parallelize(ageArr) val newAge = age.map(x=>x+1) newAge.collect // --> (12,14,14,16) ``` ![image-20220417155749820.png](https://www.yidaimingjvn.xyz/usr/uploads/2022/04/2450734715.png) #### flatMap flatMap(fun) 该方法与map方法类似。将元素处理,然后再转换为同级别,最后组成一个新的RDD返回。 参数解释: - fun是一个函数表达式。flatMap会将调用采用类似map的方法将RDD中的元素一一作为参数传递给fun,fun返回的值又会被flatMap转换为同级,组合为一个新的RDD返回。 格式: ```scala val 常量名 = rdd.flatMap(fun) ``` 示例:(采用map于其做比较) ```scala var wordsArr = Array("holle world", "I is test", "我只是 一个 测试") val words = sc.parallelize(wordsArr) val newWords = words.map(x=>x.split(" ")) newWords.collect // -->Array(Array(holle, world), Array(I, is, test), Array(我只是, 一个, 测试)) ``` ```scala var wordsArr = Array("holle world", "I is test", "我只是 一个 测试") val words = sc.parallelize(wordsArr) val newWords = words.flatMap(x=>x.split(" ")) newWords.collect // -->Array(holle, world, I, is, test, 我只是, 一个, 测试) ``` ![image-20220417161258775.png](https://www.yidaimingjvn.xyz/usr/uploads/2022/04/1734862339.png) #### filter filter(fun) 该方法用于过滤RDD中不需要的元素。 参数解释: - fun是一个函数表达式。filter会将调用该方法的RDD的元素一一作为参数传入fun,符合fun函数条件的元素会被处理后返回给filter,filter会将最后留下的元素组合为一个新的RDD并返回。 格式: ```scala val 常量名 = rdd.filter(fun) ``` 示例: ```scala var ageArr = Array(12,13,14,14,51,15) val age = sc.parallelize(ageArr) val newAge = age.filter(x=>x<20) newAge.collect // -->Array(12, 13, 14, 14, 15) ``` #### distinct distinct() 该方法可以去除RDD中的重复值,并返回一个新的RDD 格式: ```scala val 常量名 = rdd.distinct() ``` 示例: ```scala var actorArr = Array("kirito", "kirito", "asina", "asina") val actor = sc.parallelize(actorArr) val newActor = actor.distinct() newActor.collect // -->Array(kirito, asina) ``` #### union union(RDD) 该方法用于合并多个RDD,并返回一个新的RDD。 参数解释: - RDD即直接填入一个RDD即可。(一次只能填入一个) 格式: ```scala val 常量名 = rdd.union(rdd) ``` 示例: ```scala var ageoArr = Array(12,13,14) var agetArr = Array(15,16,17) val ageo = sc.parallelize(ageoArr) val aget = sc.parallelize(agetArr) val newAge = ageo.union(aget) newAge.collect // --> Array(12, 13, 14, 15, 16, 17) ``` #### intersection intersection(RDD) 该方法用于获取两个RDD之间相同的元素,并返回一个新的RDD 参数解释: - RDD即直接填入一个RDD即可。(一次只能填入一个) 格式: ```scala val 常量名 = rdd.intersection(rdd) ``` 示例: ```scala var ageoArr = Array(12,14,15,13,12,12) var agetArr = Array(12,12,15) val ageo = sc.parallelize(ageoArr) val aget = sc.parallelize(agetArr) val newAge = ageo.intersection(aget) newAge.collect // --> Array(12,15) ``` #### subtract subtract(RDD) 该方法的调用,调用该方法的RDD会被删去已在参数中RDD中出现过的元素,并将剩下的元素组成新的RDD返回 参数解释: - RDD即直接填入一个RDD即可。(一次只能填入一个) 格式: ```scala val 常量名 = rdd.subtract(rdd) ``` 示例: ```scala var ageoArr = Array(12,13,14) var agetArr = Array(12,15,16) val ageo = sc.parallelize(ageoArr) val aget = sc.parallelize(agetArr) val newAgeo = ageo.subtract(aget) newAgeo.collect // --> Array(13,14) val newAget = aget.subtract(ageo) newAget.collect // --> Array(15,16) ``` #### cartesian cartesian(RDD) 该方法会将两个RDD中元素组合为笛卡尔积,并组成新的RDD返回。 假设RDDA有4个元素,RDDB有4个元素,RDDA中的每一个元素都会与RDDB中的每个元素组成一组,最后会返回16个元素。 参数解释: - RDD即直接填入一个RDD即可。(一次只能填入一个) 格式: ```scala val 常量名 = rdd.cartesian(rdd) ``` 示例: ```scala var ageoArr = Array(1,2,3,2) var agetArr = Array(1,5,7,9) val ageo = sc.parallelize(ageoArr) val aget = sc.parallelize(agetArr) val newAge = ageo.cartesian(aget) newAge.collect // -->Array((1,1), (1,5), (1,7), (1,9), (2,1), (2,5), (2,7), (2,9), (3,1), (3,5), (3,7), (3,9), (2,1), (2,5), (2,7), (2,9)) ``` reduce reduce(fun) 该方法会将RDD中的元素一一取出,进行处理,并直接返回结果。 参数解释: - fun表示函数表达式。默认有两个参数,用于处理并返回。 格式: ```scala val 常量名 = rdd.reduce(fun) ``` 示例: ```scala var ageArr = Array(1,2,3,4,5,6,7) val age = sc.parallelize(ageArr) val newAge = age.reduce((a,b)=>a*b) println(newAge) // --> 5040 ``` #### reduceByKey reduceByKey(fun) 当转化为RDD的数据集为键值对形式的时候,该方法会将相同键的元素的值传入fun函数中,fun处理完毕后,将新的元素组合为RDD并返回。 参数解释: - fun表示函数表达式。默认有两个参数,用于处理并返回。 格式: ```scala val 常量名 = rdd.reduceByKey(fun) ``` 示例: ```scala var actorArr = Array(("kirito",11),("kirito",11),("asina",12),("asina",14)) val actor = sc.parallelize(actorArr) val newActor = actor.reduceByKey((a,b)=>a*b) newActor.collect // --> Array((kirito,121), (asina,168)) ``` #### groupByKey groupByKey() 当转化为RDD的数据集为键值对形式的时候,该方法会将相同键的元素的值划分为一组(以Iterable的方式存放),并组合为新的RDD返回。 格式: ```scala val 常量名 = rdd.groupByKey() ``` 示例: ```scala var actorArr = Array(("kirito",1),("kirito",12),("asina",43),("asina",32)) val actor = sc.parallelize(actorArr) val newActor = actor.groupByKey() newActor.collect // --> Array((kirito,CompactBuffer(1, 12)), (asina,CompactBuffer(43, 32))) ``` #### join join(RDD) 当转化为RDD的数据集为键值对形式的时候,该方法会提取两个RDD键相同的值组合在一起,组合成一个新的RDD并返回。 参数解释: - RDD即直接填入一个RDD即可。(一次只能填入一个) 格式: ```scala val 常量名 = rdd.join(rdd) ``` 示例: ```scala var lxoArr = Array(("a",1),("b",2),("c",3)) var lxtArr = Array(("a",2),("d",2),("e",3)) val lxo = sc.parallelize(lxoArr) val lxt = sc.parallelize(lxtArr) val newLx= lxo.join(lxt) newLx.collect // --> Array(("a",(1,2))) ``` #### leftOuterJoin RDDA leftOuterJoin RDDB 对两个RDD进行左连接。设RDD1对RDD2进行左连接,则将两个RDD键相同的值组合在一起,连同RDD1的其他键值对一起组合成一个新的RDD并返回。 格式: ```scala val 常量名 = rdd1 leftOuterJoin rdd2 ``` 示例: ```scala var lxoArr = Array(("a",1),("b",2),("c",3)) var lxtArr = Array(("a",2),("d",2),("e",3)) val lxo = sc.parallelize(lxoArr) val lxt = sc.parallelize(lxtArr) val newLxo = lxo leftOuterJoin lxt newLxo.collect // --> Array((a,(1,Some(2))), (b,(2,None)), (c,(3,None))) val newLxt = lxt leftOuterJoin lxo newLxt.collect // --> Array((a,(2,Some(1))), (d,(2,None)), (e,(3,None))) ``` #### rightOuterJoin RDDA rightOuterJoin RDDB 对两个RDD进行左连接。设RDD1对RDD2进行左连接,则将两个RDD键相同的值组合在一起,连同RDD2的其他键值对一起组合成一个新的RDD并返回。 格式: ```scala val 常量名 = rdd rightOuterJoin rdd ``` 示例: ```scala var lxoArr = Array(("a",1),("b",2),("c",3)) var lxtArr = Array(("a",2),("d",2),("e",3)) val lxo = sc.parallelize(lxoArr) val lxt = sc.parallelize(lxtArr) val newLxo = lxo rightOuterJoin lxt newLxo.collect // -->Array((a,(Some(1),2)), (d,(None,2)), (e,(None,3))) val newLxt = lxt rightOuterJoin lxo newLxt.collect // -->Array((a,(Some(2),1)), (b,(None,2)), (c,(None,3))) ``` #### zip zip(RDD) 该方法可以将两个长度一致的RDD压缩到一起,以键值对的形式组合为一个新的RDD并返回。 参数解释: - RDD即直接填入一个RDD即可。(一次只能填入一个) 注意: - 该方法的两个rdd长度必须一致 格式: ```scala val 常量名 = rdd.zip(rdd) ``` 示例: ```scala val nameArr = Array("a","b","c","d","e") val ageArr = Array(1,2,3,4,5) val name = sc.parallelize(nameArr) val age = sc.parallelize(ageArr) val actor = name.zip(age) actor.collect // --> Array((a,1), (b,2), (c,3), (d,4), (e,5)) ``` #### combineByKey combineByKey(createCombiner,mergeValue,mergeCombiners) 该方法用于将相同键的数据聚合,并允许返回类型与输入数据不同的返回值。 参数解释: - createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作) - mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)(如果分区太多,每个分区只有一个参数就不会执行该操作) - mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行) 注意: 这玩意儿一定要注意分区,如果分区数高于元素数量,则会直接执行mergeCombiners。 格式: ```scala val 常量名 = actor.combineByKey( V=>C, (获取C的变量名:Int,获取V的变量名:Int)=>C, (获取新C值的变量名:Int,获取C值的变量名:Int)=>C ) ``` 示例: ```scala val nameArr = Array("a","a","a","a","c","c") val ageArr = Array(1,2,3,4,5,6) val name = sc.parallelize(nameArr) val age = sc.parallelize(ageArr) val actor = name.zip(age) val newActor = actor.combineByKey( x=>{println(s"x是$x,x+1=${x+1}");x+1}, (x:Int,acc:Int)=>{println(s"$acc + $x");acc+x}, (x:Int,acc:Int)=>{println(s"$acc - $x");acc-x} ) newActor.collect // --> x是1,x+1=2 // --> x是3,x+1=4 // --> x是6,x+1=7 // --> x是2,x+1=3 // --> x是4,x+1=5 // --> x是5,x+1=6 // --> 3 - 2 // --> 7 - 6 // --> 4 - 1 // --> 5 - 3 // --> Array[(String, Int)] = Array((a,2), (c,1)) ``` ```scala val nameArr = Array("a","a","a","a","c","c") val ageArr = Array(1,2,3,4,5,6) val name = sc.parallelize(nameArr,3) val age = sc.parallelize(ageArr,3) val actor = name.zip(age) val newActor = actor.combineByKey( x=>{println(s"x是$x,x+1=${x+1}");x+1}, (x:Int,acc:Int)=>{println(s"$acc + $x");acc+x}, (x:Int,acc:Int)=>{println(s"$acc - $x");acc-x} ) newActor.collect // --> x是5,x+1=6 // --> x是3,x+1=4 // --> x是1,x+1=2 // --> 6 + 6 // --> 4 + 4 // --> 2 + 2 // --> 8 - 4 // --> Array[(String, Int)] = Array((c,12), (a,4)) ``` 代码解释: 这里被分为三个分区。第一个分区是(a,1)(a,2)第二个分区是(a,3),(a,4)。第一个分区执行的时候先传入(a,1),然后执行```x=>{println(s"x是$x,x+1=${x+1}");x+1}``` 这个加1的操作,变成了2,这里只会执行一次。然后会执行第二条 ```(x:Int,acc:Int)=>{println(s"$acc + $x");acc+x}```,这里会把上一条的2和(a,2)里的2一起作为参数传进来,变成4。然后这个分区没有别的元素了,所以就直接返回出来。然后是第二个分区也执行一遍相同的操作。所以一号分区的值是4,二号分区的值是8。接着就会执行第三条也就是分区之间的操作 ```(x:Int,acc:Int)=>{println(s"$acc - $x");acc-x}```,8-4,得到最终结果4。 ### 查询 #### collect collect() 该方法是一个动作操作,会把RDD所有元素转换为数组并返回到Driver端,适用于小数据处理。 格式: ```scala var 常量名 = rdd.collect ``` 示例: ```scala var ageArr = Array(1,2,3,4,5) val age = sc.parallelize(ageArr) var a = age.collect println(a(2)) // --> 3 ``` #### take take(num) 该方法可以获取RDD的前num个元素,并返回数组 参数解释: - num表示需要返回的元素个数。(超出范围只按最大范围算) 格式: ```scala var 常量名 = rdd.take(num) ``` 示例: ```scala var ageArr = Array(1,2,3,4,5) val age = sc.parallelize(ageArr) val a = age.take(3) // --> Array(1,2,3) for(b:Int<-a){ println(b) } // --> 1 2 3 ``` ```scala var ageArr = Array(1,2,3,4,5) val age = sc.parallelize(ageArr) val a = age.take(6) // --> Array(1,2,3) for(b:Int<-a){ println(b) } // --> 1 2 3 4 5 ``` #### lookup lookup(args) 该方法用于获取键值对类型RDD中键匹配的所有值。 参数解释: - args表示键值对中的键。 格式: ```scala var 变量名 = rdd.lookup(args) ``` 示例: ```scala var actorArr = Array(("a",1),("b",2),("a",3),("c",4)) val actor = sc.parallelize(actorArr) val a = actor.lookup("a") // -->WrappedArray(1, 3) println(a) for(b:Int<-a){ println(b) } // --> 1 3 ``` ### 排序 #### sortBy sortBy(f(T)=>K,ascending,numPartitions) 该方法可以对标准RDD进行排序。 参数解释: - f(T)=>k,f(T)表示要被排序对象的每一个元素,右边则是返回元素中要进行排序的值。 - ascending,表示排序的顺序,参数为true或是flase,默认是true即升序。 - numPartitions,表示排序后的RDD的分区个数,默认排序分区后的分区个数和排序之前的个数相同,即为this.partitions.size 格式: ```scala val 常量名 = rdd.sortBy(f(T)=>k,ascending,numPartitions) ``` 示例: ```scala var ageArr = Array(3,45,13,88,13) val age = sc.parallelize(ageArr) val newAge = age.sortBy(x=>x,true,3) newAge.collect // --> Array(3, 13, 13, 45, 88) newAge.partitions.size // --> 3 ``` ```scala var ageArr = Array((3,1),(43,4),(3,4),(23,5),(23,9)) val age = sc.parallelize(ageArr,3) val newAge = age.sortBy(x=>x._1,true) newAge.collect // --> Array((3,1), (3,4), (23,5), (23,9), (43,4)) newAge.partitions.size // --> 3 ``` ### 写入磁盘 #### json
Comments | NOTHING