Spark Parallelize

To parallelize a Collection in the driver program, Spark provides the SparkContext.parallelize() method. When parallelize() is applied to a Collection, a new distributed dataset (RDD) is created with the specified number of partitions, and the elements of the collection are copied to it.

Following is the syntax of SparkContext’s parallelize() method.

public <T> RDD<T> parallelize(scala.collection.Seq<T> seq, int numSlices, scala.reflect.ClassTag<T> evidence$1)
seq – Mandatory. The collection of items to parallelize.
numSlices – Optional. An integer value: the number of partitions the data would be parallelized to.
evidence$1 – A ClassTag for the element type, supplied implicitly by the Scala compiler. Java programs typically use JavaSparkContext.parallelize() instead, which does not require it.

The parallelize() method creates N partitions if N is specified; otherwise Spark sets N based on the Spark cluster (or local master configuration) the driver program is running on.

parallelize() method returns an RDD.

Note: parallelize() acts lazily. The collection is not actually copied until an action is performed on the RDD. If the collection is modified after the call to parallelize() and before the first action on the RDD, the RDD may reflect the modified collection, not the state the collection had at the line SparkContext.parallelize(Collection). To avoid this, pass a copy of the collection.
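
To see this caveat in action, the following sketch (the class name is illustrative) parallelizes a mutable list and then modifies it before the first action. As the Spark API documentation notes, the resulting RDD may reflect the modified collection, so parallelize a copy if you need a snapshot.

ParallelizeLazinessExample.java

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ParallelizeLazinessExample {

	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("Parallelize Laziness")
		                                .setMaster("local[2]");
		JavaSparkContext sc = new JavaSparkContext(sparkConf);

		List<Integer> collection = new ArrayList<>();
		collection.add(1);
		collection.add(2);

		JavaRDD<Integer> rdd = sc.parallelize(collection, 2);

		// modify the collection before any action on the RDD
		collection.add(3);

		// the action may see the modified collection, i.e. [1, 2, 3]
		System.out.println(rdd.collect());

		// to get a snapshot instead, parallelize a copy:
		// JavaRDD<Integer> snapshot = sc.parallelize(new ArrayList<>(collection));

		sc.close();
	}
}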

Use the parallelize() method only when the order of elements does not matter, because once the collection is split into partitions, transformations and actions run on the partitions in parallel.
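
If the order of elements does matter, prefer an action like collect(), which gathers the partitions in order and returns the elements in their original order, over foreach(), whose print order depends on which partition runs first. A minimal sketch (the class name is illustrative):

ParallelizeOrderExample.java

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class ParallelizeOrderExample {

	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("Parallelize Order")
		                                .setMaster("local[2]");
		JavaSparkContext sc = new JavaSparkContext(sparkConf);

		List<Integer> collection = Arrays.asList(1, 2, 3, 4, 5, 6);
		JavaRDD<Integer> rdd = sc.parallelize(collection, 2);

		// collect() returns the elements in their original order: [1, 2, 3, 4, 5, 6]
		System.out.println(rdd.collect());

		// foreach() runs on the partitions in parallel, so print order is not guaranteed
		rdd.foreach(new VoidFunction<Integer>() {
			@Override
			public void call(Integer number) {
				System.out.println(number);
			}
		});

		sc.close();
	}
}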

Examples – Spark Parallelize

In the following examples, we shall parallelize a Collection of elements to an RDD with a specified number of partitions.

Following is a Java application that demonstrates SparkContext.parallelize().

SparkParallelizeExample.java

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class SparkParallelizeExample {

	public static void main(String[] args) {
		// configure spark
		SparkConf sparkConf = new SparkConf().setAppName("Print Elements of RDD")
		                                .setMaster("local[2]").set("spark.executor.memory","2g");
		// start a spark context
		JavaSparkContext sc = new JavaSparkContext(sparkConf);
		
		// sample collection
		List<Integer> collection = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
		
		// parallelize the collection to two partitions
		JavaRDD<Integer> rdd = sc.parallelize(collection, 2);
		
		System.out.println("Number of partitions : "+rdd.getNumPartitions());
		
		rdd.foreach(new VoidFunction<Integer>() {
			@Override
			public void call(Integer number) {
				System.out.println(number);
			}
		});

		sc.close();
	}
}

Output

Number of partitions : 2

6
1
7
2
8
3
9
4
10
5

Please observe in the output that, when printing the elements of an RDD with two partitions, the two partitions are processed in parallel, which is why the elements do not appear in their original order.

In the following example, we will not specify the number of partitions to the parallelize() method.

ParallelizeExample.java

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class ParallelizeExample {

	public static void main(String[] args) {
		// configure spark
		SparkConf sparkConf = new SparkConf().setAppName("Print Elements of RDD")
		                                .setMaster("local[4]").set("spark.executor.memory","2g");
		// start a spark context
		JavaSparkContext sc = new JavaSparkContext(sparkConf);
		
		// sample collection
		List<Integer> collection = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
		
		// parallelize the collection without specifying the number of partitions
		JavaRDD<Integer> rdd = sc.parallelize(collection);
		
		System.out.println("Number of partitions : "+rdd.getNumPartitions());
		
		rdd.foreach(new VoidFunction<Integer>() {
			@Override
			public void call(Integer number) {
				System.out.println(number);
			}
		});

		sc.close();
	}
}

When the number of partitions is not specified, parallelize() uses the default parallelism of the master. In local mode this is the number of threads given while configuring the Spark master: with setMaster("local[4]") the master can use 4 threads, so the RDD is created with 4 partitions.
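
You can inspect this default directly: JavaSparkContext exposes a defaultParallelism() method, which is the value parallelize() falls back to when numSlices is not given. A minimal sketch (the class name is illustrative):

DefaultParallelismExample.java

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class DefaultParallelismExample {

	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("Default Parallelism")
		                                .setMaster("local[4]");
		JavaSparkContext sc = new JavaSparkContext(sparkConf);

		// with local[4], this prints 4
		System.out.println("Default parallelism : " + sc.defaultParallelism());

		// so an RDD parallelized without numSlices gets 4 partitions
		System.out.println("Number of partitions : "
				+ sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)).getNumPartitions());

		sc.close();
	}
}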

Conclusion

In this Spark Tutorial on Spark Parallelize, we have learnt how to parallelize a collection to a distributed dataset (RDD) in the driver program.