更新時間:2021-04-28 來源:黑馬程序員 瀏覽量:
行動算子主要是將在數據集上運行計算后的數值返回到驅動程序,從而觸發(fā)真正的計算。下面,列舉一些常用的行動算子API,如表1所示。
表1 常用的行動算子API
行動算子 | 相關說明 |
---|---|
count() | 返回數據集中的元素個數 |
first() | 返回數組的第一個元素 |
take(n) | 以數組的形式返回數組集中的前n個元素 |
reduce(func) | 通過函數func(輸入兩個參數并返回一個值)聚合數據集中的元素 |
collect() | 以數組的形式返回數據集中的所有元素 |
foreach(func) | 將數據集中的每個元素傳遞到函數func中運行 |
下面,結合具體的示例對這些行動算子API進行詳細講解。
count()主要用于返回數據集中的元素個數。假設,現有一個arrRdd,如果要統計arrRdd元素的個數,示例代碼如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5)) arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelcollectionRDD[0] at parallelize at <console>:24 scala> arrRdd.count() res0: Long = 5
上述代碼中,第1行代碼創(chuàng)建了一個RDD對象,當arrRdd調用count()操作后,返回的結果是5,說明成功獲取到了RDD數據集的元素。值得一提的是,可以將第一行代碼分解成下面兩行代碼,具體如下:
val arr = Array(1,2,3,4,5) val arrRdd = sc.parallelize(arr)
上述代碼中,第1行代碼創(chuàng)建了一個RDD對象,當arrRdd調用count()操作后,返回的結果是5,說明成功獲取到了RDD數據集的元素。值得一提的是,可以將第一行代碼分解成下面兩行代碼,具體如下:
val arr = Array(1,2,3,4,5) val arrRdd = sc.parallelize(arr)
first()
first()主要用于返回數組的第一個元素?,F有一個arrRdd,如果要獲取arrRdd中第一個元素,示例代碼如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5)) arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelcollectionRDD[0] at parallelize at <console>:24 scala> arrRdd.first() res1: Int = 1
從上述結果可以看出,當執(zhí)行arrRdd.first()操作后返回的結果是1,說明成功獲取到了第1個元素。
take(n)
take()主要用于以數組的形式返回數組集中的前n個元素?,F有一個arrRdd,如果要獲取arrRdd中的前三個元素,示例代碼如下:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5)) arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelcollectionRDD[0] at parallelize at <console>:24 scala> arrRdd.take(3) res2: Array[Int]=Array(1,2,3)
從上述代碼可以看出,執(zhí)行arrRdd.take(3)操作后返回的結果是Array(1,2,3),說明成功獲取到了RDD數據集的前3個元素。
reduce(func)
reduce()主要用于通過函數func(輸入兩個參數并返回一個值)聚合數據集中的元素。現有一個arrRdd,如果要對arrRdd中的元素進行聚合,示例代碼如下:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5)) arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelcollectionRDD[0] at parallelize at <console>:24 scala> arrRdd.reduce((a,b)=>a+b) res3: Int = 15
在上述代碼中,執(zhí)行arrRdd.reduce((a,b)=>a+b)操作后返回的結果是15,說明成功的將RDD數據集中的所有元素進行求和,結果為15。
collect()
collect()主要用于以數組的形式返回數據集中的所有元素。現有一個rdd,如果希望rdd中的元素以數組的形式輸出,示例代碼如下:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5)) arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelcollectionRDD[0] at parallelize at <console>:24 scala> arrRdd.collect() res4: Array[Int] = Array(1,2,3,4,5)
在上述代碼中,執(zhí)行arrRdd.collect()操作后返回的結果是Array(1,2,3,4,5),說明成功的將RDD數據集中的元素以數組的形式輸出。
foreach(func)
foreach()主要用于將數據集中的每個元素傳遞到函數func中運行?,F有一個arrRdd,如果希望遍歷輸出arrRdd中的元素,示例代碼如下:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5)) arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelcollectionRDD[0] at parallelize at <console>:24 scala> arrRdd.foreach(x => println(x)) 1 2 3 4 5
在上述代碼中,foreach(x => println(x))的含義是依次遍歷arrRdd中的每一個元素,把當前遍歷的元素賦值給變量x,并且通過println(x)打印出x的值。執(zhí)行arrRdd.foreach()操作后,arrRdd中的元素被依次輸出了(即RDD數據集中所有的元素被遍歷輸出)。這里的arrRdd.foreach(x => println(x))可以簡寫為arrRdd.foreach(println)。
猜你喜歡: