TL;DR: Use a pipeline to save the TF-IDF model fitted on the training set, and save the SVM model separately for prediction. In other words, save two models: one for feature extraction and transformation of the input, the other for prediction.

One of the big challenges in developing a text classification model is that the trained model alone is not enough for prediction if the plan is to train offline and deploy only the model. This is especially true when we extract features from the training set using the `Hashing Trick` and normalise the importance of each feature/term to a document using `Inverse Document Frequency`, since the terms that occur most frequently across documents actually carry less importance for the corpus as a whole. Together this is commonly called Term frequency-inverse document frequency (TF-IDF), which the Spark website describes as `a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus.`

If we use TF-IDF for feature vectorization in Spark, we typically implement it like this:

```scala
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt").map(_.split(" ").toSeq)

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)

// While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
// First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
```

As you can see, the trained model alone won't be enough for standalone prediction: we have to extract the features from the input document and normalise their term frequencies, and both steps depend on the training set, which we don't want to ship with real-time prediction (it is time consuming and increases the app's memory consumption).

From Spark 1.3 onwards we can automate the workflow of extraction, transformation and prediction using pipelines, and from Spark 1.6 we can save fitted pipeline models, which include every stage of the workflow. So if we want to train models offline and predict somewhere else, pipelines are the go-to solution. For example, to train and predict with logistic regression, this is how we can do it (picked from http://spark.apache.org/docs/latest/ml-pipeline.html):

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
```

This was the perfect solution, but not all classification algorithms were supported: only logistic regression and naive Bayes were. So if we wanted to use SVM or `LogisticRegressionWithLBFGS`, we were out of luck.

For these algorithm libraries to support pipelines, they have to implement a method called `fit`. To put it more precisely, pipelines work on the concept of transformers and estimators, and whatever we put inside the pipeline workflow has to be one of those. Our algorithms are estimators, since they train on (fit) data. The `fit()` method accepts a DataFrame and returns a fitted model, which is itself a transformer; for a whole pipeline, that fitted model is a `PipelineModel`. SVM doesn't support this method.

So I tried to make the existing SVM an estimator, without much success, as there seems to be a complete lack of documentation on how to create our own estimators and transformers. I would love to hear from anyone who was able to accomplish this.

While searching for alternate ways to make this work, a thought struck me after a week of cogitating: use pipelines. Sounds confusing, I know. Let me elaborate. What if we set up the pipeline stages only up to IDF model generation?
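To make concrete why the fitted IDF stage is the piece that has to travel with the classifier, here is a minimal plain-Scala sketch of hashing TF followed by IDF scaling. It is not Spark's implementation: the names `TfIdfSketch`, `IdfModel`, `fitIdf` and `bucket` are hypothetical, `String.hashCode` stands in for whatever hash function Spark uses, and the weight is the smoothed formula from Spark's documentation, `log((m + 1) / (df + 1))`. The point is only that the weights come from the training corpus, so a new document can be scaled without the corpus as long as the fitted model is available.

```scala
// Plain-Scala illustration of hashing TF + IDF; a sketch, not Spark's code.
object TfIdfSketch {
  // Hashing trick: map a term to a bucket index in [0, numFeatures).
  // Assumption: String.hashCode stands in for Spark's hash function.
  def bucket(term: String, numFeatures: Int): Int = {
    val raw = term.hashCode % numFeatures
    if (raw < 0) raw + numFeatures else raw
  }

  // Sparse term-frequency vector of one tokenized document: bucket -> count.
  def termFrequencies(doc: Seq[String], numFeatures: Int): Map[Int, Double] =
    doc.groupBy(bucket(_, numFeatures)).map { case (b, terms) => b -> terms.size.toDouble }

  // What "fitting" IDF leaves behind: corpus size and per-bucket document frequency.
  final case class IdfModel(numDocs: Int, docFreq: Map[Int, Int], numFeatures: Int) {
    // Smoothed IDF weight: log((m + 1) / (df + 1)).
    def weight(b: Int): Double =
      math.log((numDocs + 1.0) / (docFreq.getOrElse(b, 0) + 1.0))

    // Scale a new document's term frequencies using only the fitted statistics.
    def transform(doc: Seq[String]): Map[Int, Double] =
      termFrequencies(doc, numFeatures).map { case (b, tf) => b -> tf * weight(b) }
  }

  // One pass over the training corpus to count, per bucket, how many documents hit it.
  def fitIdf(corpus: Seq[Seq[String]], numFeatures: Int): IdfModel = {
    val docFreq = corpus
      .flatMap(doc => doc.map(bucket(_, numFeatures)).distinct)
      .groupBy(identity)
      .map { case (b, hits) => b -> hits.size }
    IdfModel(corpus.size, docFreq, numFeatures)
  }

  def main(args: Array[String]): Unit = {
    val corpus = Seq(Seq("spark", "hadoop"), Seq("spark", "svm"), Seq("spark"))
    val model = fitIdf(corpus, 1 << 18)
    // "spark" occurs in all 3 documents, so its weight is log(4 / 4) = 0.
    println(model.transform(Seq("spark", "svm")))
  }
}
```

Once `fitIdf` has run, `IdfModel` is all that is needed at prediction time, which is the role the saved pipeline model plays in the approach described here.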
Fitting a pipeline that stops at the IDF stage outputs a pipeline model, which can be saved along with the trained SVM model. So instead of saving only the SVM model for prediction, I used a pipeline with the stages 'tokenizing, extracting and transforming' to generate a `pipelineModel` and saved it as well.

Here are the final code snippets I used to save both the SVM and pipeline models:

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("rawFeatures")
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")

// The pipeline stops at IDF: it only extracts and transforms features.
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
val pipelineModel = pipeline.fit(training_df)
pipelineModel.save("somewhere") // Saving the pipeline model

// Convert the featurized rows into the RDD[LabeledPoint] that SVMWithSGD expects.
// Note: on Spark 2.x the "features" column holds org.apache.spark.ml.linalg.Vector,
// which needs converting with org.apache.spark.mllib.linalg.Vectors.fromML.
val t = pipelineModel.transform(training_df).select("features", "label").rdd.map(row =>
  LabeledPoint(
    row.getAs[Double]("label"),
    row.getAs[org.apache.spark.mllib.linalg.Vector]("features")))

val svm_model = new SVMWithSGD().run(t)
svm_model.clearThreshold() // predict() will return raw scores instead of 0/1 labels
svm_model.save(sc, "somewhere") // Saving SVM model for prediction (placeholder; use a separate path from the pipeline model)
```

For standalone prediction without the training data, load the two models, transform your input text into a DataFrame of extracted features with the pipeline model, and pass the features of the transformed DataFrame to the SVM model.

Note: The first two code snippets are referenced from http://spark.apache.org/docs/latest/ml-pipeline.html

PS: I would love to hear alternate solutions and corrections, if any. Thanks.
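The standalone prediction step described above (load both models, featurize the input, score it with the SVM) might look roughly like the following. This is a sketch under assumptions, not the code I ran: it assumes Spark 2.x (hence `SparkSession` and `Vectors.fromML`), the object name `StandaloneSvmPrediction` is hypothetical, and the two model paths are placeholders passed on the command line. The input column is named `text` because that is what the saved tokenizer stage expects.

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SparkSession

// Hypothetical driver; assumes Spark 2.x on the classpath.
object StandaloneSvmPrediction {
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "usage: <pipelineModelPath> <svmModelPath>")
    val pipelinePath = args(0) // placeholder: where the pipeline model was saved
    val svmPath = args(1)      // placeholder: where the SVM model was saved

    val spark = SparkSession.builder().appName("standalone-svm-prediction").getOrCreate()

    // Load the two saved models; no training data is needed from here on.
    val featurizer = PipelineModel.load(pipelinePath)
    val svmModel = SVMModel.load(spark.sparkContext, svmPath)

    // Input documents as (id, text) rows; the sample texts are illustrative.
    val input = spark.createDataFrame(Seq(
      (0L, "spark i j k"),
      (1L, "apache hadoop")
    )).toDF("id", "text")

    // Tokenize, hash and IDF-scale the input with the fitted pipeline.
    val featurized = featurizer.transform(input).select("id", "text", "features")

    featurized.collect().foreach { row =>
      // The pipeline emits ml.linalg vectors; the RDD-based SVM wants mllib.linalg ones.
      val features = Vectors.fromML(row.getAs[MLVector]("features"))
      // Raw score if the threshold was cleared before saving, otherwise a 0/1 label.
      val score = svmModel.predict(features)
      println(s"(${row.getAs[Long]("id")}, ${row.getAs[String]("text")}) --> score=$score")
    }

    spark.stop()
  }
}
```

Collecting to the driver keeps the sketch short; for larger inputs the same conversion and `predict` call could be applied inside a `map` over the featurized rows instead.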