Apache Spark: Repartition vs Coalesce
Repartition can increase or decrease the number of partitions, whereas Coalesce can only decrease it. Coalesce is usually a cheaper operation than Repartition: Coalesce merges existing partitions and so minimizes data movement between the nodes, while Repartition shuffles all of the data over the network.
Partitions
What are partitions?
The data in Spark DataFrames and RDDs is split into smaller chunks called partitions. By default, Spark derives the number of partitions from the cluster’s configuration (for example, the number of available cores or the spark.default.parallelism setting). The partitions are distributed across the nodes of the cluster, and each partition can be processed by a separate task, which is what makes distributed, parallel processing possible.
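To make the idea concrete without a cluster, here is a minimal plain-Scala sketch (no Spark involved) in which a hypothetical in-memory dataset of 20 integers is split into four partitions, each partition is summed independently on its own Future, and the partial results are combined. It only mirrors, in spirit, how an action such as RDD.reduce first works within each partition and then merges the per-partition results; PartitionedSum and parallelSum are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PartitionedSum {
  // Hypothetical in-memory dataset of 20 integers split into 4 partitions.
  // Each inner Vector stands in for one partition; no Spark is involved.
  val partitions: Vector[Vector[Int]] =
    Vector((1 to 5).toVector, (6 to 10).toVector, (11 to 15).toVector, (16 to 20).toVector)

  // Sum every partition independently on its own Future (a stand-in for a task),
  // then combine the per-partition results into the final answer.
  def parallelSum(parts: Vector[Vector[Int]]): Int = {
    val perPartition: Vector[Future[Int]] = parts.map(p => Future(p.sum))
    val partials: Vector[Int] = Await.result(Future.sequence(perPartition), 10.seconds)
    partials.sum
  }
}
```

The four partial sums (15, 40, 65, 90) add up to 210, the same result as summing all 20 integers sequentially.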
Check the partitions
Spark RDD has built-in methods to obtain the number of partitions and the contents of each partition.
Open a spark-shell.
$ spark-shell --master yarn
Create an RDD with 20 integers.
scala> val rdd = sc.parallelize(1 to 20)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24
Use RDD’s getNumPartitions method to get the number of partitions.
scala> rdd.getNumPartitions
res18: Int = 2
In this example, the RDD has 2 partitions. You may get a different number of partitions depending on the default configuration of your spark-shell.
RDD’s glom method returns a new RDD in which the elements of each partition are gathered into a single array. Combined with collect, it returns an Array whose items are the contents of the individual partitions.
scala> rdd.glom.collect
res19: Array[Array[Int]] = Array(
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
)
In the above output, we can see there are 2 arrays, each corresponding to a partition. Each partition has 10 items.
Manually provide the number of partitions
In the previous example, the number of partitions was set automatically by Spark. The parallelize method takes an optional second argument, numSlices, that sets the number of partitions when the RDD is created.
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
Let’s create another RDD, called rdd_4p, using the parallelize method and passing 4 as the numSlices argument.
scala> val rdd_4p = sc.parallelize(1 to 20, 4)
rdd_4p: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24
scala> rdd_4p.getNumPartitions
res12: Int = 4
scala> rdd_4p.glom.collect
res15: Array[Array[Int]] = Array(
Array(1, 2, 3, 4, 5),
Array(6, 7, 8, 9, 10),
Array(11, 12, 13, 14, 15),
Array(16, 17, 18, 19, 20))
Since we passed 4 as the second argument to the parallelize method, the RDD was created with 4 partitions, each holding 5 of the 20 integers.
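The four slices of five integers shown above follow from the way parallelize cuts a sequence into contiguous ranges. The plain-Scala sketch below models that slicing without Spark. It assumes the boundary rule used by Spark’s ParallelCollectionRDD for ordinary sequences, where slice i covers positions i * n / numSlices up to (i + 1) * n / numSlices with integer division; SliceModel and slice are hypothetical names for illustration.

```scala
object SliceModel {
  // Hypothetical model of how parallelize cuts a sequence into numSlices
  // contiguous chunks. Assumption: slice i covers positions
  // [i * n / numSlices, (i + 1) * n / numSlices), using integer division.
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    val n = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * n) / numSlices).toInt
      val end = (((i + 1) * n) / numSlices).toInt
      data.slice(start, end)
    }
  }
}
```

For example, SliceModel.slice((1 to 20).toList, 4) yields the same four groups as rdd_4p.glom.collect above. When the length does not divide evenly (10 elements into 3 slices), the slices hold 3, 3 and 4 elements.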
We will use rdd_4p, the RDD that we created with 4 partitions, in the following illustrations to explain Repartition and Coalesce methods.
Repartition
What is repartition?
Repartition changes the number of partitions and balances the data across them. Repartition can be used to increase or decrease the number of partitions. Repartition always shuffles all the data over the network.
Repartition by the number of partitions
RDD’s repartition method takes an integer argument, numPartitions, and returns a new RDD with numPartitions partitions, balancing the data across them.
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
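Because repartition redistributes every element, each output partition may receive data from every input partition. The sketch below is a simplified, hypothetical plain-Scala model of that behavior: each input partition deals its elements round-robin across all output partitions. It is an illustration under stated assumptions, not Spark’s code: the model starts each input partition’s round-robin at the partition’s own index, whereas Spark starts at a pseudo-random position per input partition, which is why the real outputs in this article look scrambled and are not perfectly balanced. RepartitionModel is a hypothetical name.

```scala
object RepartitionModel {
  // Simplified model of repartition(numPartitions): every input partition deals
  // its elements round-robin across ALL output partitions (a full shuffle).
  // Assumption: the round-robin for input partition `index` starts at `index`;
  // Spark picks a pseudo-random start instead, so exact layouts differ.
  def repartition[T](parts: Seq[Seq[T]], numPartitions: Int): Seq[Seq[T]] = {
    require(numPartitions >= 1, "numPartitions must be positive")
    val buckets = Vector.fill(numPartitions)(Vector.newBuilder[T])
    for ((part, index) <- parts.zipWithIndex; (elem, k) <- part.zipWithIndex) {
      buckets((index + k) % numPartitions) += elem
    }
    buckets.map(_.result())
  }
}
```

With the four partitions of rdd_4p as input, the model splits the data 10/10 into two partitions, and every output partition contains elements from all four inputs, which is the defining property of a shuffle.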
Repartition to create fewer partitions
We will now see how repartition can create fewer partitions.
Call repartition on rdd_4p with 2 as the numPartitions argument. Then look at the number of partitions and the contents of each partition of the new RDD (let’s call it rdd_2p_repartitioned).
scala> val rdd_2p_repartitioned = rdd_4p.repartition(2)
rdd_2p_repartitioned: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at repartition at <console>:25
scala> rdd_2p_repartitioned.getNumPartitions
res19: Int = 2
scala> rdd_2p_repartitioned.glom.collect
res20: Array[Array[Int]] = Array(
Array(1, 3, 5, 16, 18, 20, 7, 9, 11, 13, 15),
Array(6, 8, 10, 12, 14, 2, 4, 17, 19)
)
As we can see, rdd_2p_repartitioned was created with 2 partitions and the data was redistributed between the 2 partitions.
The table below compares the contents of each partition of the original RDD (rdd_4p), which has 4 partitions, with the new RDD after repartitioning to 2 partitions. You can observe that some of the data that was in partition 1 of the original RDD (2 and 4) moved to partition 2 after repartitioning, and that each new partition holds elements from all four original partitions. Repartitioning resulted in a shuffle of all the data.
Partition | RDD with 4 Partitions | RDD Repartitioned with 2 Partitions |
Partition 1 | Array(1, 2, 3, 4, 5) | Array(1, 3, 5, 16, 18, 20, 7, 9, 11, 13, 15) |
Partition 2 | Array(6, 7, 8, 9, 10) | Array(6, 8, 10, 12, 14, 2, 4, 17, 19) |
Partition 3 | Array(11, 12, 13, 14, 15) | |
Partition 4 | Array(16, 17, 18, 19, 20) | |
Repartition to create more partitions
Let’s now use repartition to create more partitions.
Call repartition on rdd_4p with 8 as the numPartitions argument. Then look at the number of partitions and the contents of each partition of the new RDD (let’s call it rdd_8p_repartitioned).
scala> val rdd_8p_repartitioned = rdd_4p.repartition(8)
rdd_8p_repartitioned: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[34] at repartition at <console>:25
scala> rdd_8p_repartitioned.getNumPartitions
res21: Int = 8
scala> rdd_8p_repartitioned.glom.collect
res22: Array[Array[Int]] = Array(
Array(17, 3, 12),
Array(4, 13, 18),
Array(6, 19, 5, 14),
Array(15, 7, 20),
Array(8),
Array(9),
Array(10, 1),
Array(2, 11, 16)
)
As we can see, rdd_8p_repartitioned was created with 8 partitions and the data was redistributed amongst the 8 partitions.
The table below compares the contents of each partition of the original RDD (rdd_4p), which has 4 partitions, with the new RDD after repartitioning to 8 partitions. You can observe that the contents of the partitions before and after repartitioning are completely different: repartition shuffled all the data to spread it across the 8 new partitions. The spread is only approximately even; in this run the new partitions hold between 1 and 4 elements.
Partition | RDD with 4 Partitions | RDD Repartitioned with 8 Partitions |
Partition 1 | Array(1, 2, 3, 4, 5) | Array(17, 3, 12) |
Partition 2 | Array(6, 7, 8, 9, 10) | Array(4, 13, 18) |
Partition 3 | Array(11, 12, 13, 14, 15) | Array(6, 19, 5, 14) |
Partition 4 | Array(16, 17, 18, 19, 20) | Array(15, 7, 20) |
Partition 5 | | Array(8) |
Partition 6 | | Array(9) |
Partition 7 | | Array(10, 1) |
Partition 8 | | Array(2, 11, 16) |
Repartition by columns
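Spark’s Dataset API has overloaded repartition methods that partition the data by one or more column expressions, such as repartition(partitionExprs: Column*) and repartition(numPartitions: Int, partitionExprs: Column*). These are outside the scope of this article; refer to the official Spark Dataset documentation for more information.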
Coalesce
What is coalesce?
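The coalesce method takes an integer argument, numPartitions, and returns a new RDD with numPartitions partitions. With its default setting (shuffle = false), coalesce can only reduce the number of partitions: it minimizes data movement by merging existing partitions instead of shuffling their contents. When numPartitions is larger than the current number of partitions, coalesce leaves the partitioning unchanged. (RDD’s coalesce also accepts shuffle = true; in fact, repartition(numPartitions) is implemented as coalesce(numPartitions, shuffle = true).)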
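The coalesced layouts in the examples that follow can be reproduced with a simple grouping rule: consecutive parent partitions are merged into numPartitions groups, and the target is capped at the current number of partitions. The sketch below is a hypothetical plain-Scala model of that rule, assuming (as in this spark-shell session) that the parent partitions have no data-locality preferences; when they do, Spark also takes locality into account when grouping, so real layouts can differ. CoalesceModel is a hypothetical name.

```scala
object CoalesceModel {
  // Simplified model of coalesce(numPartitions) without a shuffle, assuming the
  // parent partitions carry no locality preferences: consecutive parent
  // partitions are merged into groups, and no partition is split apart.
  def coalesce[T](parts: Seq[Seq[T]], numPartitions: Int): Seq[Seq[T]] = {
    require(numPartitions >= 1, "numPartitions must be positive")
    val prev = parts.length
    val target = math.min(numPartitions, prev) // cannot grow without a shuffle
    (0 until target).map { i =>
      val start = (i.toLong * prev / target).toInt
      val end = ((i + 1).toLong * prev / target).toInt
      parts.slice(start, end).flatten // merge whole parent partitions in order
    }
  }
}
```

Asking this model for 8 partitions from 4 returns the 4 original partitions unchanged, matching the behavior described above.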
Coalesce to create fewer partitions
On rdd_4p, the RDD with 4 partitions, call coalesce with 2 as the numPartitions argument. Then look at the contents of each partition of the new RDD.
scala> val rdd_2p_coalesced = rdd_4p.coalesce(2)
rdd_2p_coalesced: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[36] at coalesce at <console>:25
scala> rdd_2p_coalesced.getNumPartitions
res23: Int = 2
scala> rdd_2p_coalesced.glom.collect
res24: Array[Array[Int]] = Array(
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
)
rdd_2p_coalesced has 2 partitions and the data is distributed amongst the two partitions.
The table below compares the contents of each partition of the original RDD (rdd_4p), which has 4 partitions, with the new RDD after coalescing to 2 partitions. You can observe that coalesce merged whole, adjacent partitions: partitions 1 and 2 were combined into the new partition 1, and partitions 3 and 4 into the new partition 2. The elements of each original partition stayed together and in order; unlike repartition, coalesce did not redistribute individual elements across all partitions.
Partition | RDD with 4 Partitions | RDD Coalesced with 2 Partitions |
Partition 1 | Array(1, 2, 3, 4, 5) | Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) |
Partition 2 | Array(6, 7, 8, 9, 10) | Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20) |
Partition 3 | Array(11, 12, 13, 14, 15) | |
Partition 4 | Array(16, 17, 18, 19, 20) | |
Coalesce cannot create more partitions
Coalesce cannot create more partitions. We can confirm that with the following test.
On rdd_4p, the RDD with 4 partitions, call coalesce with 8 as the numPartitions argument. We use 8 in this example, but any number greater than 4 would do.
scala> val rdd_8p_coalesced = rdd_4p.coalesce(8)
rdd_8p_coalesced: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[38] at coalesce at <console>:25
scala> rdd_8p_coalesced.getNumPartitions
res25: Int = 4
scala> rdd_8p_coalesced.glom.collect
res26: Array[Array[Int]] = Array(
Array(1, 2, 3, 4, 5),
Array(6, 7, 8, 9, 10),
Array(11, 12, 13, 14, 15),
Array(16, 17, 18, 19, 20)
)
As we can see, the number of partitions and the contents of the partitions of rdd_8p_coalesced are identical to rdd_4p.
The table below shows that coalesce does not modify the partitions when numPartitions is larger than the current number of partitions.
Partition | RDD with 4 Partitions | RDD Coalesced with 8 Partitions |
Partition 1 | Array(1, 2, 3, 4, 5) | Array(1, 2, 3, 4, 5) |
Partition 2 | Array(6, 7, 8, 9, 10) | Array(6, 7, 8, 9, 10) |
Partition 3 | Array(11, 12, 13, 14, 15) | Array(11, 12, 13, 14, 15) |
Partition 4 | Array(16, 17, 18, 19, 20) | Array(16, 17, 18, 19, 20) |
Verdict
When to use Repartition over Coalesce?
Use Repartition method over Coalesce when you need to
- Increase the number of partitions
- Distribute the data more evenly amongst the partitions
- Shuffle all the data across the partitions
- Partition a DataFrame by one or more columns
When to use Coalesce over Repartition?
Use Coalesce method over Repartition when you need to
- Decrease the number of partitions with minimal shuffling of data
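The rules of thumb above can be written down as a small decision function. This is a hypothetical helper, not part of Spark: ResizePlanner, plan, and the evenSpread flag are illustrative names, and partitioning by columns is left out because it is outside the scope of this article.

```scala
object ResizePlanner {
  sealed trait Plan
  case object KeepAsIs extends Plan
  case object UseCoalesce extends Plan
  case object UseRepartition extends Plan

  // Hypothetical helper encoding the verdict above.
  // evenSpread = true means the caller wants the data rebalanced, not just merged.
  def plan(current: Int, target: Int, evenSpread: Boolean): Plan =
    if (target > current || evenSpread) UseRepartition // coalesce without shuffle cannot grow or rebalance
    else if (target < current) UseCoalesce             // shrink with minimal data movement
    else KeepAsIs
}
```

In Spark itself you would then call rdd.coalesce(target) or rdd.repartition(target) according to the returned plan.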
Conclusion
Repartition offers more capabilities than Coalesce, but when it comes to reducing the number of partitions, Coalesce is more efficient. Coalesce reduces the partitions with minimal data movement, while Repartition shuffles all the data. The table below shows that when we reduced the partitions to 2, Repartition redistributed elements from all 4 original partitions across both new partitions, whereas Coalesce simply merged whole original partitions (partitions 1 and 2 into the new partition 1, partitions 3 and 4 into the new partition 2) without splitting any of them.
Partition | RDD with 4 Partitions | RDD Repartitioned with 2 Partitions | RDD Coalesced with 2 Partitions |
Partition 1 | Array(1, 2, 3, 4, 5) | Array(1, 3, 5, 16, 18, 20, 7, 9, 11, 13, 15) | Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) |
Partition 2 | Array(6, 7, 8, 9, 10) | Array(6, 8, 10, 12, 14, 2, 4, 17, 19) | Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20) |
Partition 3 | Array(11, 12, 13, 14, 15) | | |
Partition 4 | Array(16, 17, 18, 19, 20) | | |
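The comparison in this table can be checked mechanically. The sketch below copies the partition contents printed earlier in this article and, for each output partition, lists which original partitions (0-based indices) contributed elements to it. SourceAnalysis and sources are hypothetical names; the data is taken verbatim from the spark-shell outputs above.

```scala
object SourceAnalysis {
  // Partition contents copied from the outputs printed in this article.
  val original: Seq[Seq[Int]] = Seq(1 to 5, 6 to 10, 11 to 15, 16 to 20)
  val repartitioned: Seq[Seq[Int]] = Seq(
    Seq(1, 3, 5, 16, 18, 20, 7, 9, 11, 13, 15),
    Seq(6, 8, 10, 12, 14, 2, 4, 17, 19))
  val coalesced: Seq[Seq[Int]] = Seq(1 to 10, 11 to 20)

  // For each partition in `after`, return the set of 0-based indices of the
  // partitions in `before` that contributed at least one element to it.
  def sources(before: Seq[Seq[Int]], after: Seq[Seq[Int]]): Seq[Set[Int]] = {
    // Map each element to the index of the partition it started in.
    val home: Map[Int, Int] =
      (for ((part, idx) <- before.zipWithIndex; x <- part) yield x -> idx).toMap
    after.map(part => part.map(home).toSet)
  }
}
```

Every repartitioned partition draws from all four original partitions, which is what a full shuffle implies, while each coalesced partition draws from exactly two adjacent original partitions.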