TLDR — Use a pipeline to save the TF-IDF model generated from the training set, along with the SVM model used for prediction. So essentially save two models: one for feature extraction and transformation of the input, the other for prediction.
One of the big challenges when you develop a text classification model is that the trained model alone is not enough for prediction, especially if your plan is to train offline and deploy only the model for serving. This is particularly true when we extract features from the training set using the `Hashing Trick` and normalise the importance of a feature/term to the document using `Inverse Document Frequency`, so that the terms appearing most frequently across documents carry less weight for the whole corpus. The Spark website sums this up as: `Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus.`
If we are using TF-IDF for feature vectorization in Spark, we typically implement it like this:
```scala
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt")
  .map(_.split(" ").toSeq)

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)

// While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
// First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
```
So as you can see, the trained model alone won't be enough for standalone prediction: we also have to extract features from the input document and normalise their term frequencies, and both steps depend on the training set, which we don't want to drag into real-time prediction (it is time consuming and increases the memory consumption of the app).
Pipelines were introduced in Spark 1.3, letting us automate the workflow of extraction, transformation and prediction, and from Spark 1.6 we have been able to save pipeline models that capture the whole workflow. So if we want to train models offline and predict somewhere else, pipelines are the go-to solution. For example, this is how we can train and predict with logistic regression (picked from http://spark.apache.org/docs/latest/ml-pipeline.html):
```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
```
This would have been the perfect solution, but not all classification algorithms were supported in the pipeline API at the time; only logistic regression and naive Bayes were.
So if we want to use SVM or LogisticRegressionWithLBFGS, we are out of luck. For these algorithm libraries to support pipelines, they have to implement a method called `fit`. To put it more precisely, pipelines work on the concept of transformers and estimators, and whatever we put inside the pipeline workflow has to be one of those. Our algorithm models are estimators, since they train on (fit) data: the `fit()` method accepts a DataFrame and returns a fitted model that the pipeline chains into a `PipelineModel`. SVM doesn't provide this method.
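To make the gap concrete, here is a minimal sketch contrasting the two APIs (the wrapper methods are just for illustration): the `ml` LogisticRegression is an estimator whose `fit()` consumes a DataFrame, while the `mllib` SVM only exposes `run()` over an RDD of labeled points, so a pipeline has nothing to call on it.

```scala
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Estimator style: fit() takes a DataFrame and returns a fitted model,
// so this can sit directly inside a Pipeline as a stage.
def trainEstimator(training: DataFrame): LogisticRegressionModel =
  new LogisticRegression().setMaxIter(10).fit(training)

// mllib style: SVMWithSGD only offers run() over an RDD[LabeledPoint],
// so there is no fit() for a Pipeline to call.
def trainSvm(training: RDD[LabeledPoint]): SVMModel =
  new SVMWithSGD().run(training)
```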
So I tried to turn the existing SVM into an estimator, without much success, as there seems to be a complete lack of documentation on how to create our own estimators and transformers. I would love to hear from anyone who has been able to accomplish this.
So I was searching for alternate ways to make this work, and a thought struck me after a week of cogitating: use pipelines anyway. Sounds confusing, I know. Let me elaborate.
What if we set the pipeline stages only up to the IDF model generation? That outputs a pipeline model, which can be saved along with the trained SVM model.
So instead of saving only the SVM model for prediction, I used a pipeline covering the tokenizing, feature extraction and transformation stages to produce a `PipelineModel`, and saved that as well.
Here are the final code snippets I used to save both the SVM and pipeline models.
```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("rawFeatures")
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
val pipelineModel = pipeline.fit(training_df)
pipelineModel.save("somewhere") // Saving the pipeline model

// Transform the training set into labeled feature vectors for the SVM.
val t = pipelineModel.transform(training_df)
  .select("features", "label")
  .map(row => LabeledPoint(
    row.getAs[Double]("label"),
    row.getAs[org.apache.spark.mllib.linalg.Vector]("features")))

val svm_model = new SVMWithSGD().run(t)
svm_model.clearThreshold()
svm_model.save(sc, "somewhere") // Saving SVM model for prediction
```
For standalone prediction without the training data, load the two models, transform your input text into a DataFrame of extracted features using the pipeline model, and pass the transformed features to the SVM model.
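Here is a minimal sketch of that prediction step, assuming the same setup as the snippets above (`sc` and `sqlContext` in scope) and the placeholder save paths used earlier; the column names match the pipeline stages defined during training, and the input text is just an example.

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.linalg.Vector

// Load the feature-extraction pipeline and the trained SVM saved earlier.
val pipelineModel = PipelineModel.load("somewhere")
val svm_model = SVMModel.load(sc, "somewhere")

// Wrap the incoming text in a single-column DataFrame named "text",
// matching the input column of the tokenizer stage.
val input_df = sqlContext.createDataFrame(Seq(
  Tuple1("some raw text to classify")
)).toDF("text")

// Run the saved pipeline to get TF-IDF features, then score each row with the SVM.
// Since clearThreshold() was called at training time, these are raw scores, not 0/1 labels.
val scores = pipelineModel.transform(input_df)
  .select("features")
  .rdd
  .map(row => svm_model.predict(row.getAs[Vector]("features")))

scores.collect().foreach(println)
```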
Note:- First two code snippets are referenced from http://spark.apache.org/docs/latest/ml-pipeline.html
PS:- I would love to hear alternate solutions and corrections if any. Thanks