In this tutorial, you will learn how to use KMeans clustering in Spark MLlib with Java.
Example Program of KMeans Clustering using Spark MLlib in Java
KMeans Clustering using Spark MLlib in Java – KMeans is a clustering algorithm: it assigns each given observation/experiment/vector to one of the clusters. The chosen cluster is the one whose mean vector (center) is least distant from the observation/experiment/vector.
Clustering
Training data is a text file in which each row contains space separated values of features or dimensional values. Example training data is given below:
2.9 1.4 6.8
9.0 0.0 9.0
Each row in the above sample training data is:
- a three-dimensional observation/experiment/vector, or
- an observation with three features (whose values are continuous), or
- an experiment with three state variables.
The number of dimensions/features/state-variables can be any positive integer.
The content of the training data file is shown below:
0.0 0.6 0.0
0.1 0.1 0.1
0.5 0.2 0.2
2.5 1.2 6.3
2.1 1.8 6.2
2.9 1.4 6.8
9.0 0.0 9.0
9.1 0.1 9.1
9.2 0.2 9.2
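Each row is parsed by splitting on spaces and converting every token to a double. A minimal plain-Java sketch of that parsing step (the full Spark program below does the same thing inside a map transformation):

```java
import java.util.Arrays;

public class ParseRowSketch {
    // Parse one space separated row of the training file into a double[] vector.
    public static double[] parseRow(String row) {
        String[] tokens = row.split(" ");
        double[] values = new double[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            values[i] = Double.parseDouble(tokens[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        // Parses the last sample row into a three-dimensional vector
        System.out.println(Arrays.toString(parseRow("2.9 1.4 6.8"))); // [2.9, 1.4, 6.8]
    }
}
```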
Java KMeans Program
The Java program demonstrating the KMeans clustering machine learning algorithm using Spark MLlib is given below.
JavaKMeansExample.java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

/**
 * KMeans Clustering using Spark MLlib in Java
 * @author arjun
 */
public class JavaKMeansExample {
    public static void main(String[] args) {
        System.out.println("KMeans Clustering using Spark MLlib in Java . . .");

        // hadoop home dir [path to bin folder containing winutils.exe]
        System.setProperty("hadoop.home.dir", "D:\\hadoop\\");

        SparkConf conf = new SparkConf().setAppName("JavaKMeansExample")
                .setMaster("local[2]")
                .set("spark.executor.memory", "3g")
                .set("spark.driver.memory", "3g");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Load and parse the training data
        String path = "data/kMeansTrainingData.txt";
        JavaRDD<String> data = jsc.textFile(path);
        JavaRDD<Vector> parsedData = data.map(s -> {
            String[] sarray = s.split(" ");
            double[] values = new double[sarray.length];
            for (int i = 0; i < sarray.length; i++) {
                values[i] = Double.parseDouble(sarray[i]);
            }
            return Vectors.dense(values);
        });
        parsedData.cache();

        // Cluster the data into three clusters using KMeans
        int numClusters = 3;
        int numIterations = 20;
        KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

        System.out.println("\n*****Training*****");
        int clusterNumber = 0;
        for (Vector center : clusters.clusterCenters()) {
            System.out.println("Cluster center for Cluster " + (clusterNumber++) + " : " + center);
        }

        double cost = clusters.computeCost(parsedData.rdd());
        System.out.println("\nCost: " + cost);

        // Evaluate clustering by computing Within Set Sum of Squared Errors
        double WSSSE = clusters.computeCost(parsedData.rdd());
        System.out.println("Within Set Sum of Squared Errors = " + WSSSE);

        // Delete the model left over from a previous run, if any
        try {
            FileUtils.forceDelete(new File("KMeansModel"));
            System.out.println("\nDeleting old model completed.");
        } catch (FileNotFoundException e) {
            // no old model present; nothing to delete
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Save and load model
        clusters.save(jsc.sc(), "KMeansModel");
        System.out.println("\nModel saved to KMeansModel/");
        KMeansModel sameModel = KMeansModel.load(jsc.sc(), "KMeansModel");

        // Prediction for test vectors
        System.out.println("\n*****Prediction*****");
        System.out.println("[9.0, 0.6, 9.0] belongs to cluster " + sameModel.predict(Vectors.dense(9.0, 0.6, 9.0)));
        System.out.println("[0.2, 0.5, 0.4] belongs to cluster " + sameModel.predict(Vectors.dense(0.2, 0.5, 0.4)));
        System.out.println("[2.8, 1.6, 6.0] belongs to cluster " + sameModel.predict(Vectors.dense(2.8, 1.6, 6.0)));

        jsc.stop();
    }
}
Output
KMeans Clustering using Spark MLlib in Java . . .
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
*****Training*****
Cluster center for Cluster 0 : [2.5,1.4666666666666668,6.433333333333334]
Cluster center for Cluster 1 : [0.19999999999999998,0.29999999999999993,0.1]
Cluster center for Cluster 2 : [9.1,0.1,9.1]
Cost: 1.0733333333333688
Within Set Sum of Squared Errors = 1.0733333333333688
Deleting old model completed.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Model saved to KMeansModel/
*****Prediction*****
[9.0, 0.6, 9.0] belongs to cluster 2
[0.2, 0.5, 0.4] belongs to cluster 1
[2.8, 1.6, 6.0] belongs to cluster 0
Let us see in detail what has been generated during training:
The cluster generation part
*****Training*****
Cluster center for Cluster 0 : [2.5,1.4666666666666668,6.433333333333334]
Cluster center for Cluster 1 : [0.19999999999999998,0.29999999999999993,0.1]
Cluster center for Cluster 2 : [9.1,0.1,9.1]
Based on the training data and the hyperparameter numClusters = 3, the algorithm has found three clusters: Cluster 0, Cluster 1 and Cluster 2. The center of each cluster, which is the mean of the vectors assigned to it, has been calculated and is shown in the above block.
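A cluster center is simply the component-wise mean of the vectors that belong to that cluster. The following plain-Java sketch (no Spark required) illustrates this for Cluster 2, whose members are the last three rows of the training data:

```java
import java.util.Arrays;

public class CenterAsMeanSketch {
    // Component-wise mean of a set of equal-length vectors.
    public static double[] mean(double[][] points) {
        double[] m = new double[points[0].length];
        for (double[] p : points) {
            for (int i = 0; i < m.length; i++) {
                m[i] += p[i] / points.length;
            }
        }
        return m;
    }

    public static void main(String[] args) {
        // The three training rows that end up in Cluster 2
        double[][] cluster2 = {
            {9.0, 0.0, 9.0},
            {9.1, 0.1, 9.1},
            {9.2, 0.2, 9.2}
        };
        // Matches the reported center [9.1, 0.1, 9.1] (up to floating point)
        System.out.println(Arrays.toString(mean(cluster2)));
    }
}
```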
The cost
In this algorithm, cost is a metric that shows the price paid for the chosen cluster centers. It is the sum of squared distances from each member of a cluster to the center of that cluster, summed over all clusters; this is also called the Within Set Sum of Squared Errors (WSSSE). A lower cost indicates tighter clusters and better prediction accuracy.
Cost: 1.0733333333333688
Within Set Sum of Squared Errors = 1.0733333333333688
Cost : 1.0733333.. =
[sum of squared distances from the center of Cluster 0 to the members of Cluster 0] +
[sum of squared distances from the center of Cluster 1 to the members of Cluster 1] +
[sum of squared distances from the center of Cluster 2 to the members of Cluster 2]
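This cost can be checked by hand in plain Java: assign each training vector to its nearest center and sum the squared distances. A sketch, assuming the three centers reported by the training run above:

```java
public class WssseSketch {
    // Squared Euclidean distance between two vectors.
    public static double sqDist(double[] a, double[] b) {
        double d = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            d += diff * diff;
        }
        return d;
    }

    // WSSSE: for each point, squared distance to its nearest center, summed.
    public static double wssse(double[][] points, double[][] centers) {
        double total = 0.0;
        for (double[] p : points) {
            double best = Double.MAX_VALUE;
            for (double[] c : centers) {
                best = Math.min(best, sqDist(p, c));
            }
            total += best;
        }
        return total;
    }

    public static void main(String[] args) {
        double[][] trainingData = {
            {0.0, 0.6, 0.0}, {0.1, 0.1, 0.1}, {0.5, 0.2, 0.2},
            {2.5, 1.2, 6.3}, {2.1, 1.8, 6.2}, {2.9, 1.4, 6.8},
            {9.0, 0.0, 9.0}, {9.1, 0.1, 9.1}, {9.2, 0.2, 9.2}
        };
        // Centers reported by the training run above
        double[][] centers = {
            {2.5, 1.4666666666666668, 6.433333333333334},
            {0.2, 0.3, 0.1},
            {9.1, 0.1, 9.1}
        };
        System.out.println("WSSSE = " + wssse(trainingData, centers)); // ~1.0733
    }
}
```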
Save KMeans model to local
The KMeans clustering model generated during training can be saved to the local filesystem and later reloaded for prediction.
Prediction
For the specified input vectors, the KMeans model generated during training has predicted the cluster each of them belongs to, as shown below:
*****Prediction*****
[9.0, 0.6, 9.0] belongs to cluster 2
[0.2, 0.5, 0.4] belongs to cluster 1
[2.8, 1.6, 6.0] belongs to cluster 0
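Prediction simply returns the index of the nearest cluster center. A plain-Java sketch of that rule, using the centers from the training run above, reproduces the three assignments:

```java
public class PredictSketch {
    // Index of the center nearest to the point (by squared Euclidean distance).
    public static int predict(double[] point, double[][] centers) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int c = 0; c < centers.length; c++) {
            double d = 0.0;
            for (int i = 0; i < point.length; i++) {
                double diff = point[i] - centers[c][i];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Centers reported by the training run above
        double[][] centers = {
            {2.5, 1.4666666666666668, 6.433333333333334},
            {0.2, 0.3, 0.1},
            {9.1, 0.1, 9.1}
        };
        System.out.println(predict(new double[]{9.0, 0.6, 9.0}, centers)); // 2
        System.out.println(predict(new double[]{0.2, 0.5, 0.4}, centers)); // 1
        System.out.println(predict(new double[]{2.8, 1.6, 6.0}, centers)); // 0
    }
}
```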
Conclusion
In this Apache Spark Tutorial, we have seen how to train a clustering model using the KMeans algorithm, save the model as a local file, and use the model for prediction.