RDD merupakan struktur data fundamental di Apache Spark. RDD cenderung lebih sesuai jika ingin menggunakan akses aras rendah terhadap data. Akses lebih mudah bisa dilakukan dengan menggunakan DataFrame. Meskipun demikian, seringkali kita tetap memerlukan RDD sehingga harus memahami berbagai proses transformasi serta action yang bisa dilakukan. Manual dokumentasi Apache Spark menyediakan penjelasan tetapi seringkali dirasakan masih kurang, salah satu di antara kekurangjelasan tersebut adalah zero value yang ada pada action aggregate. Perhatikan source code berikut ini:
from pyspark import SparkContext
sc = SparkContext()
listRDD = sc.parallelize([1,2,3,4,5,3,2])
print(listRDD)
for element in listRDD.collect():
print(element)
print("Jumlah partisi: ", listRDD.getNumPartitions())
# apa yang dilakukan thdp urutan2 data
seqOp = (lambda x, y: x + y)
# apa yang dilakukan setelah kombinasi
combOp = (lambda x, y: x + y)
# try this: change zero value to 1 or 2, what's the conclusion
# if we use masterr[4]?
agg0=listRDD.aggregate(0, seqOp, combOp)
agg1=listRDD.aggregate(1, seqOp, combOp)
agg2=listRDD.aggregate(2, seqOp, combOp)
print(agg0) # output 20
print(agg1) # output 25
print(agg2) # output 30
Manual untuk aggregate adalah sebagai berikut:
RDD.aggregate(zeroValue: U, seqOp: Callable[[U, T], U], combOp: Callable[[U, U], U]) → U
Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral “zero value.”
The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U
Pertanyaannya adalah: mengapa hasil source code di atas adalah seperti berikut ini? darimana asal 20,25,30? apa keterkaitan dengan zeroValue?
$ spark-submit --master local[4] 03_actions.py
ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:289
1
2
3
4
5
3
2
Jumlah partisi: 4
20
25
30
$
Jawabannya sederhana saja sebetulnya, mari kita lihat jika menggunakan angka 1 sebagai zeroValue:
- Bagi menjadi beberapa partisi, tempatkan data ke beberapa partisi. Partisi sesuai dengan parameter
--master local[4]
yang dituliskan di shell. - Untuk setiap partisi, tambahkan 1 (zeroValue). Ini merupakan hasil definisi parameter
seqOp
padaagg0=listRDD.aggregate(0, seqOp, combOp)
. Jadi, proses ini kurang lebih sebagai berikut:
partisi1
1
2
+1
partisi2
3
4
+1
partisi3
5
3
+1
partisi4
2
0 (karena tidak ada)
+1
Hasil total = 24
Setelah selesai dengan seqOp
, data dikombinasikan kemudian ditambah dengan apa yang didefinisikan di combOp
:
Hasil total = 24
+1
Hasil akhir = 25
Sila dicoba untuk zeroValue 2.