Partitioning
Partitioning is how Spark splits a dataset across machines.
Properties of partitions:
- Partitions never span multiple machines. That is, tuples in the same partition are guaranteed to be on the same machine.
- Each machine contains one or more partitions.
- The number of partitions is configurable.
Spark provides two kinds of partitioning:
- Hash partitioning
- Range partitioning
Hash partitioning
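Hash partitioning tries to spread data evenly across the partitions based on the key.
For a tuple (k, v), the partition is decided by:
val p = k.hashCode() % numPartitions
Because hashCode() can be negative, Spark's HashPartitioner takes a non-negative modulo so that p always lies in [0, numPartitions).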
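The formula above can be tried out without a cluster. The following is a minimal sketch in plain Scala, not Spark's own code: the object name, the helper names, and the sample keys are assumptions made for illustration, and sending null keys to partition 0 is a choice made here to mirror HashPartitioner.

```scala
object HashPartitionSketch {
  // Scala's % can return a negative result for a negative left operand,
  // so shift negative remainders back into [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Partition index for a key; null keys are sent to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    // Hypothetical keys, including a negative one.
    val keys = Seq(1, 201, 420, 460, 600, -7)
    keys.foreach { k =>
      println(s"key $k -> partition ${partitionFor(k, numPartitions)}")
    }
  }
}
```

With four partitions, the keys 1, 201, and -7 land in partition 1, while 420, 460, and 600 land in partition 0. Evenly spaced integer keys like these do not necessarily spread evenly under hashing.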
Range partitioning
Using a range partitioner, keys are partitioned according to:
- an ordering for keys
- a set of sorted ranges of keys
For example, given the keys List(1, 201, 420, 460, 600), we can spread them over a set of ranges: Range(1, 200), Range(200, 400), Range(400, 800). Key 1 falls in the first range, 201 in the second, and 420, 460, and 600 in the third.
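The lookup in this example can be sketched in plain Scala. This is a simplified model rather than Spark's RangePartitioner, which chooses its boundaries by sampling the RDD. The object name and the hard-coded boundaries are assumptions taken from the ranges above, read as half-open intervals the way Scala's Range(a, b) is.

```scala
object RangePartitionSketch {
  // Sorted lower bounds of every range except the first (illustrative values).
  // Range 0: keys below 200, range 1: 200 to 399, range 2: 400 and above.
  val boundaries: Vector[Int] = Vector(200, 400)

  // The partition index is the number of boundaries the key has reached.
  def partitionFor(key: Int): Int = boundaries.count(_ <= key)

  def main(args: Array[String]): Unit = {
    val keys = List(1, 201, 420, 460, 600)
    keys.groupBy(partitionFor).toSeq.sortBy(_._1).foreach { case (p, ks) =>
      println(s"partition $p: ${ks.mkString(", ")}")
    }
  }
}
```

Because the ranges follow the key ordering, neighbouring keys end up in the same partition, which is what makes range partitioning a natural fit for sorted output.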
Apply custom partitioning in Spark
Create a pair RDD, build a partitioner, and apply it with partitionBy:
import org.apache.spark.RangePartitioner
val pairs = purchaseRdd.map(p => (p.id, p.price))
val rangePartitioner = new RangePartitioner(8, pairs) // Spark samples the keys to pick balanced ranges for us.
val partitioned = pairs.partitionBy(rangePartitioner).persist()
Calling persist() here is important. Without it, the data would be partitioned (and shuffled) again every time the partitioned RDD is used, so it is better to keep it in memory.
Transformation
- Some transformations partition the data automatically. For example, sortByKey uses range partitioning by default, and groupByKey uses hash partitioning. Others include join (and the outer joins, e.g. leftOuterJoin and rightOuterJoin), reduceByKey, groupWith, foldByKey, combineByKey, etc.
- Some transformations produce a partitioned result only if they are applied to an already partitioned RDD, in which case they keep the parent's partitioner. For example, mapValues, flatMapValues, filter.
- Some transformations never produce a partitioned result, e.g. map, flatMap. These transformations may change the keys, so Spark cannot assume the old partitioning still holds. Attention: if we apply them to a partitioned RDD, the partitioner is lost!
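Whether a transformation kept the partitioner can be checked through the partitioner field of an RDD. The sketch below assumes a local Spark installation; the object name, the sample (id, price) data, and the local[2] master are assumptions made for illustration.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerPreservationSketch {
  // Reports, for each derived RDD, whether it still carries a partitioner.
  def partitionerFlags(sc: SparkContext): Seq[(String, Boolean)] = {
    // Hypothetical (id, price) sample data.
    val pairs = sc.parallelize(Seq((1, 10.0), (2, 20.0), (3, 30.0), (1, 5.0)))
    val partitioned = pairs.partitionBy(new HashPartitioner(4)).persist()

    // mapValues and filter cannot change keys, so the partitioner is kept.
    val viaMapValues = partitioned.mapValues(_ * 2)
    val viaFilter = partitioned.filter { case (_, price) => price > 8.0 }
    // map may change keys, so the result has no partitioner.
    val viaMap = partitioned.map { case (id, price) => (id, price * 2) }

    Seq(
      "partitionBy" -> partitioned.partitioner.isDefined,
      "mapValues" -> viaMapValues.partitioner.isDefined,
      "filter" -> viaFilter.partitioner.isDefined,
      "map" -> viaMap.partitioner.isDefined
    )
  }

  def main(args: Array[String]): Unit = {
    // Local mode is an assumption so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("partitioner-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try partitionerFlags(sc).foreach { case (name, kept) =>
      println(s"$name keeps a partitioner: $kept")
    } finally sc.stop()
  }
}
```

Note that the map call here leaves every key unchanged and still loses the partitioner. Spark does not inspect the function, so mapValues is the safer choice whenever only the values change.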
Advantage of partitioning
Partitioning can improve performance considerably.
Previously, we saw that reduceByKey improves performance by reducing values on the same node before shuffling.
Partitioning shares the same idea: it reduces shuffling, too.
Therefore, the best practice is to apply reduceByKey on a partitioned RDD.
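// Without partitioning
val cost = purchases
  .map(p => (p.id, (1, p.price)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .count()
// Command took 4.65s

// With range partitioning; RangePartitioner needs a pair RDD, so key the data first
import org.apache.spark.RangePartitioner
val pairs = purchases.map(p => (p.id, p.price))
val rangePartitioner = new RangePartitioner(8, pairs)
val partitionedPurchases = pairs.partitionBy(rangePartitioner).persist()
val partitionedCost = partitionedPurchases
  .mapValues(price => (1, price)) // mapValues keeps the partitioner; map would drop it
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .count()
// Command took 1.79s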
With these timings, the partitioned version is about 2.6 times faster (4.65s vs. 1.79s).