Spark is the engine that powers cluster computing, while PySpark is the Python library for using Spark. PySpark is a great tool for performing exploratory data analysis at scale, building machine learning pipelines, and creating ETLs for a data platform. If you're already familiar with Python and libraries such as Pandas, then PySpark is a great language to learn in order to create more scalable analyses and pipelines. The goal of this post is to show how to build a machine learning model using PySpark.

How To Install PySpark

Installing PySpark is as easy as installing any other Python package (e.g. Pandas, NumPy, scikit-learn); a plain pip install pyspark is usually enough. One important thing: first make sure Java is installed on your machine. Then you can run PySpark in your Jupyter notebook.

Exploring The Data

We will use the same data set as when we built machine learning models in Python; it covers diabetes diagnoses from the National Institute of Diabetes and Digestive and Kidney Diseases. The classification goal is to predict whether a patient has diabetes (yes/no). The dataset can be downloaded from Kaggle.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ml-diabetes').getOrCreate()
df = spark.read.csv('diabetes.csv', header=True, inferSchema=True)
df.printSchema()

The dataset consists of several medical predictor variables and one target variable, Outcome. The predictors include the number of pregnancies the patient has had, their BMI, insulin level, age, and so on.

Input variables: Glucose, BloodPressure, BMI, Age, Pregnancies, Insulin, SkinThickness, DiabetesPedigreeFunction
Output variable: Outcome

Have a peek at the first five observations. A Pandas data frame is prettier to look at than Spark's DataFrame.show().

import pandas as pd
pd.DataFrame(df.take(5), columns=df.columns).transpose()

In PySpark you can also display the data as a Pandas DataFrame using toPandas().

df.toPandas()
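A quick caution about toPandas(): it collects the entire DataFrame onto the driver, which is fine for a small dataset like this one but can exhaust driver memory at scale. A minimal sketch of the safer habit is to bound the number of rows first:

# Collect only a handful of rows to the driver for display
df.limit(5).toPandas()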
Let's check whether the classes are balanced.

df.groupby('Outcome').count().toPandas()

Statistics Summary

numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()

Correlations between independent variables

from pandas.plotting import scatter_matrix

numeric_data = df.select(numeric_features).toPandas()
axs = scatter_matrix(numeric_data, figsize=(8, 8))

# Rotate axis labels and remove axis ticks
n = len(numeric_data.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

Data preparation and feature engineering

In this part we remove unnecessary columns and fill in any missing values. Finally, we select features for the machine learning models and split the data into train and test sets. Let's start the mission 👨🚀

Missing Data Handling

from pyspark.sql.functions import isnull, when, count, col

df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()
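As we are about to see, this particular dataset has no nulls; but if the check did report missing values, one option would be Spark ML's Imputer. A minimal sketch, with the column choices purely illustrative:

from pyspark.ml.feature import Imputer

# Illustrative only: fill nulls in two numeric columns with the column median
imputer = Imputer(inputCols=['Glucose', 'BMI'],
                  outputCols=['Glucose_imputed', 'BMI_imputed'],
                  strategy='median')
df_imputed = imputer.fit(df).transform(df)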
Wow!! 👌 Great news: this dataset has no missing values. 😀

Dropping Unnecessary Columns

# Start from df and drop the columns we will not use as features
dataset = df.drop('SkinThickness')
dataset = dataset.drop('Insulin')
dataset = dataset.drop('DiabetesPedigreeFunction')
dataset = dataset.drop('Pregnancies')
dataset.show()

Converting the Features into a Vector

VectorAssembler is a feature transformer that merges multiple columns into a single vector column.

# Assemble all the features with VectorAssembler
required_features = ['Glucose', 'BloodPressure', 'BMI', 'Age']

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=required_features, outputCol='features')
transformed_data = assembler.transform(dataset)
transformed_data.show()

Done!! ✌️ The features are now assembled into a single vector. 🧮
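One optional refinement, not used in the rest of this post: the tree-based models below do not need feature scaling, but logistic regression can benefit from standardized inputs. A minimal sketch of scaling the assembled vector with StandardScaler (the scaled_features column name is just an example):

from pyspark.ml.feature import StandardScaler

# Optional: scale the assembled feature vector to unit standard deviation
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
scaled_data = scaler.fit(transformed_data).transform(transformed_data)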
Train and Test Split

Randomly split the data into train and test sets, and set a seed for reproducibility.

# Split the data
(training_data, test_data) = transformed_data.randomSplit([0.8, 0.2], seed=2020)
print("Training Dataset Count: " + str(training_data.count()))
print("Test Dataset Count: " + str(test_data.count()))

Training Dataset Count: 620
Test Dataset Count: 148

Machine Learning Model Building

Random Forest Classifier

Random forest is a supervised learning algorithm used for both classification and regression, although it is mainly used for classification problems. Just as a forest is made up of trees, and more trees make a more robust forest, the random forest algorithm builds decision trees on samples of the data, gets a prediction from each of them, and selects the best solution by voting. It is an ensemble method that outperforms a single decision tree because averaging the results reduces over-fitting.

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='Outcome', featuresCol='features', maxDepth=5)
model = rf.fit(training_data)
rf_predictions = model.transform(test_data)
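Before evaluating, it can be informative to see which features the forest relies on. A quick sketch (the rounding is just for readability):

# Which of the assembled features does the forest rely on most?
importances = model.featureImportances.toArray()
for name, score in zip(required_features, importances):
    print(name, round(score, 3))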
Let's evaluate our Random Forest Classifier model.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol='Outcome', metricName='accuracy')
print('Random Forest classifier Accuracy:', multi_evaluator.evaluate(rf_predictions))

Random Forest classifier Accuracy: 0.7945205479452054 (79.5%)

Decision Tree Classifier

Decision trees are widely used since they are easy to interpret, handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions.

from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol='features', labelCol='Outcome', maxDepth=3)
dtModel = dt.fit(training_data)
dt_predictions = dtModel.transform(test_data)
dt_predictions.select('Glucose', 'BloodPressure', 'BMI', 'Age', 'Outcome').show(10)
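One nice property of a shallow tree (maxDepth=3 here) is that you can read the learned rules directly; a quick sketch:

# Print the learned split rules of the fitted tree
print(dtModel.toDebugString)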
Let's evaluate our Decision Tree model.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol='Outcome', metricName='accuracy')
print('Decision Tree Accuracy:', multi_evaluator.evaluate(dt_predictions))

Decision Tree Accuracy: 0.7876712328767124 (78.8%)

Logistic Regression Model

Logistic regression is the appropriate regression analysis to conduct when the dependent variable is dichotomous (binary). Like all regression analyses, logistic regression is a predictive analysis. It is used to describe data and to explain the relationship between one dependent binary variable and one or more nominal, ordinal, interval or ratio-level independent variables. In short, logistic regression is used when the dependent variable (the target) is categorical.

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='features', labelCol='Outcome', maxIter=10)
lrModel = lr.fit(training_data)
lr_predictions = lrModel.transform(test_data)
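Before computing accuracy, we can peek at what the model learned: lrModel exposes the fitted coefficients and a training summary. A short sketch:

# Inspect the fitted coefficients and the training-set ROC AUC
print('Coefficients:', lrModel.coefficients)
print('Intercept:', lrModel.intercept)
print('Training AUC:', lrModel.summary.areaUnderROC)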
Let's evaluate our Logistic Regression model.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol='Outcome', metricName='accuracy')
print('Logistic Regression Accuracy:', multi_evaluator.evaluate(lr_predictions))

Logistic Regression Accuracy: 0.7876712328767124 (78.8%)

Gradient-Boosted Tree Classifier Model

Gradient boosting is a machine learning technique for regression and classification problems that produces a prediction model in the form of an ensemble of weak prediction models, typically decision trees.

from pyspark.ml.classification import GBTClassifier

gb = GBTClassifier(labelCol='Outcome', featuresCol='features')
gbModel = gb.fit(training_data)
gb_predictions = gbModel.transform(test_data)

Let's evaluate our Gradient-Boosted Tree Classifier.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol='Outcome', metricName='accuracy')
print('Gradient-boosted Trees Accuracy:', multi_evaluator.evaluate(gb_predictions))

Gradient-boosted Trees Accuracy: 0.8013698630136986 (80.1%)

Conclusion

PySpark is a great language for data scientists to learn because it enables scalable analysis and ML pipelines. In fact, every step above, from feature assembly to the classifier, can be chained into a single Pipeline, roughly as sketched below.
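A minimal sketch of that idea, assuming hypothetical train_raw and test_raw splits of the data made before feature assembly (those names are illustrative and do not appear in the code above):

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier

# Chain feature assembly and the best-performing model into one estimator
pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=['Glucose', 'BloodPressure', 'BMI', 'Age'], outputCol='features'),
    GBTClassifier(labelCol='Outcome', featuresCol='features'),
])

# train_raw / test_raw: hypothetical splits of the pre-assembly dataset
pipeline_model = pipeline.fit(train_raw)
pipeline_predictions = pipeline_model.transform(test_raw)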
If you're already familiar with Python and Pandas, then much of your knowledge can be applied to Spark. To sum it up, we have learned how to build a machine learning application using PySpark. We tried four algorithms, and gradient boosting performed best on our data set.

I got the inspiration from Favio André Vázquez's GitHub repository 'first_spark_model'. The source code can be found on GitHub. I look forward to hearing feedback or questions.

Machine learning models really spark when PySpark shifts into the accelerator gear, like the cars in a Need for Speed game.

References:

1. PySpark Tutorial for Beginners: Machine Learning Example
2. Apache Spark 2.1.0
3. Machine Learning with PySpark and MLlib — Solving a Binary Classification Problem