The Right Way to Do NLP by Apache Spark
Have you ever wondered if you could say what’s in a book without actually reading it? What if you could draw a map of all the important characters, places, events, and the relations among them? This is where Natural Language Processing (NLP) and text mining techniques can help us understand natural language data in a new and different way.
OK! Maybe “reading a book” is not a good example as everyone should read at least two to four books per month!
This is a series of articles exploring the “Mueller Report” using the Spark NLP library, which is built on top of Apache Spark, and pre-trained models powered by TensorFlow and BERT.
These articles are purely educational for those interested in learning how to do NLP by using Apache Spark.
First part: Perform NLP tasks and annotate the “Mueller Report” by using pre-trained pipelines and models provided by Spark NLP.
Second part: Use models trained with BERT, train a POS tagger model in Spark NLP, clean the data, and extract keywords/phrases by POS and NER chunking.
Third part: Graph algorithms by GraphFrames, clustering and topic modeling by Spark ML, and network visualization by Gephi.
(Extracting keywords from the Mueller Report by using Spark NLP)
Report on the Investigation into Russian Interference in the 2016 Presidential Election
Commonly known as the “Mueller report”
(Robert Mueller, 2012)
The original report was released by the U.S. Department of Justice (the original file). For those of us who are not familiar with this investigation:
After years of investigating, the Department of Justice released a redacted copy of special counsel Robert Mueller’s report Thursday. The report is nearly 400 pages and covers subjects ranging from questions about Russian interference in the 2016 US presidential election to whether President Donald Trump obstructed justice. CNN, taken from this article.
For more information about the Mueller report, you can either do your own research or check this Wikipedia page.
The issue with the original PDF file is that it’s not really a text PDF! It’s a scanned image wrapped in a PDF, so its text can’t be searched or selected.
Section 508 requires your PDF to be accessible to users of assistive technology — like screen readers or Braille displays.
Not sure whether it’s a federal crime to release a PDF like that or what you can make of this message by Dept. of Justice (maybe if you email them and ask they actually send you the real version):
The Department recognizes that these documents may not yet be in an accessible format.
If you have a disability and the format of any material on the site interferes with your ability to access some information, please email the Department of Justice webmaster. To enable us to respond in a manner that will be of most help to you, please indicate the nature of the accessibility problem, your preferred format (electronic format (ASCII, etc.), standard print, large print, etc.), the web address of the requested material, and your full contact information, so we can reach you if questions arise while fulfilling your request.
Regardless, the first challenge everyone has faced and been working on was to make this PDF file searchable/selectable by going through a series of OCR tools and techniques. Though, depending on the quality, size, and resolution this may not be as accurate as one can hope. In addition, the redacted parts can contribute to some meaningless texts.
NOTE: We’ll get back to this issue in a minute. First, let’s have a look at our chosen open-source NLP library to process the Mueller Report file.
Spark NLP by John Snow Labs
What is Spark NLP?
Spark NLP is a text processing library built on top of Apache Spark and its Spark ML library. It provides simple, performant and accurate NLP annotations for machine learning pipelines, that scale easily in a distributed environment.
There are some eye-catching phrases that got my attention the first time I read an article on Databricks introducing Spark NLP about a year ago. I love Apache Spark and I learned Scala (and am still learning it) just for that purpose. Back then I wrote my own Stanford CoreNLP wrapper for Apache Spark. I wanted to stay in the Scala ecosystem so I avoided Python libraries such as spaCy, NLTK, etc.
However, I faced many issues since I was dealing with large-scale datasets. Also, I couldn’t seamlessly integrate my NLP code into Spark ML pipelines. I can sum up my problems by quoting some parts from the same blog post:
Any integration between the two frameworks (Spark and another library) means that every object has to be serialized, go through inter-process communication in both ways, and copied at least twice in memory.
We see the same issue when using spaCy with Spark: Spark is highly optimized for loading & transforming data, but running an NLP pipeline requires copying all the data outside the Tungsten optimized format, serializing it, pushing it to a Python process, running the NLP pipeline (this bit is lightning fast), and then re-serializing the results back to the JVM process.
This naturally kills any performance benefits you would get from Spark’s caching or execution planner, requires at least twice the memory, and doesn’t improve with scaling. Using CoreNLP eliminates the copying to another process, but still requires copying all text from the data frames and copying the results back in.
So I was really excited when I saw there was an NLP library built on top of Apache Spark and it natively extends the Spark ML Pipeline. I could finally build NLP pipelines in Apache Spark!
Spark NLP is open source and has been released under the Apache 2.0 license. It is written in Scala but it supports Java and Python as well. It has no dependencies on other NLP or ML libraries. Spark NLP’s annotators provide rule-based algorithms, machine learning, and deep learning by using TensorFlow. For a more detailed comparison between Spark NLP and other open source NLP libraries, you can read this blog post.
As a native extension of the Spark ML API, the library offers the capability to train, customize and save models so they can run on a cluster, other machines or saved for later. It is also easy to extend and customize models and pipelines, as we’ll do here.
The library covers many NLP tasks, such as:
For the full list of annotators, models, and pipelines you can read their online documentation.
Full disclosure: I am one of the contributors!
Installing Spark NLP
- Spark NLP 2.0.3 release
- Apache Spark 2.4.1
- Apache Zeppelin release 0.8.2
- Local setup with MacBook Pro/macOS
- Cluster setup by Cloudera/CDH 6.2 with 40 servers
- Programming language: Scala (but no worries, the Python APIs in Spark and Spark NLP are very similar to the Scala ones)
I will explain how to set up Spark NLP for my environment. Nevertheless, if you wish to try something different, you can always find out more about how to use Spark NLP either by visiting the main public repository or by having a look at the showcase repository, which has lots of examples.
Let’s get started! To use Spark NLP in Apache Zeppelin you have two options. Either use Spark Packages or you can build a Fat JAR yourself and just load it as an external JAR inside Spark session. Why don’t I show you both?
First, with Spark Package:
1. Either add this to your conf/zeppelin-env.sh:
# set options to pass spark-submit command
export SPARK_SUBMIT_OPTIONS="--packages com.johnsnowlabs.nlp:spark-nlp_2.11:2.0.3"
2. Or, add it to Generic Inline ConfInterpreter (at the beginning of your notebook before starting your Spark Session):
%spark.conf
# spark.jars.packages can be used for adding packages into spark interpreter
spark.jars.packages com.johnsnowlabs.nlp:spark-nlp_2.11:2.0.3
Second, loading an external JAR:
To build a Fat JAR all you need to do is:
$ git clone https://github.com/JohnSnowLabs/spark-nlp
$ cd spark-nlp
$ sbt assembly
Then you can follow one of the two ways I mentioned to add this external JAR. For the first option, change “--packages” to “--jars” and point it at the path of the assembled JAR. For the second option, use “spark.jars” instead of “spark.jars.packages”.
Start Spark with Spark NLP
Now we can start using Spark NLP 2.0.3 with Zeppelin 0.8.2 and Spark 2.4.1 by importing Spark NLP annotators:
import com.johnsnowlabs.nlp.base._
import com.johnsnowlabs.nlp.annotator._
import org.apache.spark.ml.Pipeline
Apache Zeppelin is going to start a new Spark session that comes with Spark NLP regardless of whether you used Spark Package or an external JAR.
Read the Mueller Report PDF file
Remember the issue about the PDF file not being a real PDF? Well, we have 3 options here:
- You can either use any OCR tools/libraries you prefer to generate a PDF or a Text file.
- Or you can use already searchable and selectable PDF files created by the community.
- Or you can just use Spark NLP!
Spark NLP comes with an OCR package that can read both PDF files and scanned images. However, I mixed option 2 with option 3. (I needed to install Tesseract 4.x+ for image-based OCR on my entire cluster so I got a bit lazy)
You can download these two PDF files from Scribd:
Of course, you can just download the Text version and read it by Spark. However, I would like to show you how to use the OCR that comes with Spark NLP.
Spark NLP OCR:
Let’s create a helper function for everything related to OCR:
import com.johnsnowlabs.nlp.util.io.OcrHelper

val ocrHelper = new OcrHelper()
Now we need to read the PDF and create a Dataset from its content. The OCR in Spark NLP creates one row per page:
(DataFrame created by reading the PDF file)
As you can see, I’m loading Volume I of the report as a PDF into a Dataset. I do this locally just to show that you don’t always need a cluster to use Apache Spark and Spark NLP!
TIP 1: If the PDF was actually a scanned image, we could have used these settings (but not in our use case, we found a selectable PDF):
ocrHelper.setPreferredMethod("image")
ocrHelper.setFallbackMethod(false)
ocrHelper.setMinSizeBeforeFallback(0)
TIP 2: You can simply convert a Spark Dataset into a DataFrame if needed by calling .toDF() on it.
Spark NLP Pipelines and Models
NLP by Machine Learning and Deep Learning
Now it’s time to do some NLP tasks. As I mentioned at the beginning, we would like to use already pre-trained pipelines and models provided by Spark NLP in Part I. These are some of the pipelines and models that are available:
(Spark NLP pre-trained Pipelines and Models (full list))
However, I would like to use a pipeline called “explain_document_dl” first. Let’s see how we can download this pipeline, use it to annotate some inputs, and what exactly does it offer:
import com.johnsnowlabs.nlp.pretrained.PretrainedPipeline

val pipeline = PretrainedPipeline("explain_document_dl", "en")

// This DataFrame has one sentence for testing
val testData = Seq(
  "Donald Trump is the 45th President of the United States"
).toDS.toDF("text")

// Let's use our pre-trained pipeline to predict the test dataset
pipeline.transform(testData).show
Here is the result of .show():
(Spark NLP pre-trained “explain_document_dl” pipeline)
I know! There’s a lot going on in this pipeline. Let’s start with the NLP annotators we have in the “explain_document_dl” pipeline:
- WordEmbeddings (GloVe 6B 100)
- NerConverter (chunking)
To my knowledge, there are some annotators inside this pipeline which are using Deep Learning powered by TensorFlow for their supervised learning. For instance, you will notice these lines when you are loading this pipeline:
pipeline: com.johnsnowlabs.nlp.pretrained.PretrainedPipeline = PretrainedPipeline(explain_document_dl,en,public/models)
adding (ner-dl/mac/_sparse_feature_cross_op.so,ner-dl/mac/_lstm_ops.so)
loading to tensorflow
For simplicity, I’ll select a bunch of columns separately so we can actually see some results:
(Spark NLP pre-trained “explain_document_dl” pipeline)
So this is a very complete NLP pipeline. It covers the NLP tasks you find in other NLP libraries and even a few more, such as spell checking. But this might be a bit heavy if you are just looking for one or two NLP tasks such as POS or NER.
Let’s try another pre-trained pipeline called “entity_recognizer_dl”:
import com.johnsnowlabs.nlp.pretrained.PretrainedPipeline

val pipeline = PretrainedPipeline("entity_recognizer_dl", "en")

val testData = Seq(
  "Donald Trump is the 45th President of the United States"
).toDS.toDF("text")

// Let's use our pre-trained pipeline to predict the test dataset
pipeline.transform(testData).show
As you can see, using a pre-trained pipeline is very easy. You just need to change its name and Spark NLP will download the pipeline and cache it locally. What is inside this pipeline?
- NER chunk
Let’s walk through what is happening with the NER model in both of these pipelines. The Named Entity Recognition (NER) uses Word Embeddings (GloVe or BERT) for training. I can quote one of the main maintainers of the project about what it is:
NerDLModel is the result of a training process, originated by NerDLApproach SparkML estimator. This estimator is a TensorFlow DLmodel. It follows a Bi-LSTM with Convolutional Neural Networks scheme, utilizing word embeddings for token and sub-token analysis.
You can read this full article about the use of TensorFlow graphs and how Spark NLP uses it to train its NER models:
Back to our pipeline, NER chunk will extract chunks of Named Entities. For instance, if you have Donald -> I-PER and Trump -> I-PER, it will result in Donald Trump. Take a look at this example:
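To make the chunking idea concrete, here is a minimal plain-Scala sketch of how consecutive tokens that carry the same entity tag can be merged into one chunk. This is not Spark NLP’s NerConverter implementation; the names NerChunkSketch, Tagged, and Chunk are hypothetical, and the rule that adjacent tokens with the same entity type belong together is a simplifying assumption.

```scala
// Illustrative only: NOT Spark NLP's NerConverter, just the idea behind NER chunking.
object NerChunkSketch {
  final case class Tagged(token: String, tag: String) // tag such as "I-PER", "B-LOC" or "O"
  final case class Chunk(text: String, entity: String)

  // Strip the "I-" / "B-" prefix; "O" means the token is outside any entity.
  private def entityOf(tag: String): Option[String] =
    if (tag == "O") None else Some(tag.drop(2))

  // Assumption: adjacent tokens with the same entity type form one chunk,
  // except that a "B-" tag always starts a new chunk.
  def merge(tagged: Seq[Tagged]): List[Chunk] = {
    val (done, open) =
      tagged.foldLeft((List.empty[Chunk], Option.empty[Chunk])) {
        case ((acc, current), Tagged(token, tag)) =>
          (entityOf(tag), current) match {
            // Same entity type continues: extend the open chunk.
            case (Some(e), Some(c)) if c.entity == e && !tag.startsWith("B-") =>
              (acc, Some(c.copy(text = c.text + " " + token)))
            // A new entity starts: close the open chunk (if any) and open a new one.
            case (Some(e), _) =>
              (current.toList ::: acc, Some(Chunk(token, e)))
            // Token outside any entity: close the open chunk (if any).
            case (None, _) =>
              (current.toList ::: acc, None)
          }
      }
    // Chunks were accumulated in reverse order, so restore document order.
    (open.toList ::: done).reverse
  }
}
```

Calling NerChunkSketch.merge on the tagged tokens of the test sentence above would group Donald and Trump into a single PER chunk, and a pair of tokens tagged I-LOC into a single LOC chunk.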
(Spark NLP pre-trained “entity_recognizer_dl” pipeline)
Custom Pipelines
Personally, I would prefer to build my own NLP pipelines when I am dealing with pre-trained models. This way, I have full control over what types of annotators I want to use, whether I want ML or DL models, use my own trained models in the mix, customize the inputs/outputs of each annotator, integrate Spark ML functions, and so much more!
Is it possible to create your own NLP pipeline but still take advantage of pre-trained models?
The answer is yes! Let’s look at one example:
val document = new DocumentAssembler()
  .setInputCol("text")
  .setOutputCol("document")

val sentence = new SentenceDetector()
  .setInputCols(Array("document"))
  .setOutputCol("sentence")
  .setExplodeSentences(true)

val token = new Tokenizer()
  .setInputCols(Array("document"))
  .setOutputCol("token")

val normalized = new Normalizer()
  .setInputCols(Array("token"))
  .setOutputCol("normalized")

val pos = PerceptronModel.pretrained()
  .setInputCols("sentence", "normalized")
  .setOutputCol("pos")

val chunker = new Chunker()
  .setInputCols(Array("document", "pos"))
  .setOutputCol("pos_chunked")
  .setRegexParsers(Array(
    "<DT>?<JJ>*<NN>"
  ))

val embeddings = WordEmbeddingsModel.pretrained()
  .setOutputCol("embeddings")

val ner = NerDLModel.pretrained()
  .setInputCols("document", "normalized", "embeddings")
  .setOutputCol("ner")

val nerConverter = new NerConverter()
  .setInputCols("document", "token", "ner")
  .setOutputCol("ner_chunked")

val pipeline = new Pipeline().setStages(Array(
  document,
  sentence,
  token,
  normalized,
  pos,
  chunker,
  embeddings,
  ner,
  nerConverter
))
That’s it! Pretty easy and Sparky. The important part is that you can set which inputs you want for each annotator. For instance, for POS tagging, I can use tokens, stemmed tokens, lemmatized tokens, or normalized tokens. This can change the results of the annotators. The same goes for NerDLModel. I chose normalized tokens for both the POS and NER models since I am guessing my dataset is a bit messy and requires some cleaning.
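The regex passed to the Chunker, "<DT>?<JJ>*<NN>", reads as an optional determiner, any number of adjectives, then one noun. The following plain-Scala sketch shows what such a pattern selects from a POS-tagged sentence. It is only an illustration under my own assumptions, not the Chunker’s actual code; PosChunkSketch and its chunks method are hypothetical names, and the tagged sentence is made up for the example.

```scala
// Illustrative only: applies a tag pattern such as "<DT>?<JJ>*<NN>" to (token, POS tag) pairs.
object PosChunkSketch {
  def chunks(tokens: Seq[(String, String)], pattern: String): List[String] = {
    // Encode the tag sequence as one string, e.g. "<DT><JJ><NN><VBD>".
    val tags = tokens.map { case (_, tag) => s"<$tag>" }
    // Start offset of every tag in the encoded string (plus the end offset),
    // so that regex match positions can be mapped back to token indices.
    val offsets = tags.scanLeft(0)(_ + _.length)
    val encoded = tags.mkString

    // Wrap each <TAG> in a non-capturing group so ?, * and + apply to whole tags.
    val regex = pattern.replaceAll("<([^>]+)>", "(?:<$1>)").r

    regex.findAllMatchIn(encoded).map { m =>
      val first = offsets.indexOf(m.start)
      val last  = offsets.indexOf(m.end)
      tokens.slice(first, last).map(_._1).mkString(" ")
    }.toList
  }
}
```

For the hypothetical tagged sentence The/DT special/JJ counsel/NN released/VBD a/DT report/NN, the pattern picks out “The special counsel” and “a report”. A plural noun tagged NNS would not match, because the pattern only names NN.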
Let’s use our customized pipeline. If you know anything about Spark ML pipeline, it has two stages. One is fitting which is where you train the models inside your pipeline. The second is predicting your new data by transforming it into a new DataFrame.
val nlpPipelineModel = pipeline.fit(muellerFirstVol)
val nlpPipelinePrediction = nlpPipelineModel.transform(muellerFirstVol)
The .fit() is for decoration here as everything already comes pre-trained. We don’t have to train anything so the .transform() is where we use the models inside our pipeline to create a new DataFrame with all the predictions. But if we did have our own models or Spark ML functions which required training then the .fit() would take some time to train the models.
On a local machine, this took about 3 seconds to run. My laptop has a Core i9, 32 GB of memory, and a Vega 20 (if this matters at all), so it is a pretty good machine.
(Apache Spark on Local machine)
This example is nowhere near a Big Data scenario where you are dealing with millions of records, sentences, or words. In fact, it’s not even small data. However, we are using Apache Spark for a reason! Let’s run this in a cluster where we can distribute our tasks.
For instance, a while back I had a bigger and much more complicated Spark NLP pipeline to process the entire French Great National Debate, which is called “Le Grand Débat National”.
(Politoscope project: https://politoscope.org/2019/03/gdn-preliminaires/)
In the end, I was able to run my Spark NLP pipeline in a cluster on millions of sentences generated by over 250 thousand users. These types of NLP projects range from very hard to almost impossible when you are stuck on one machine.
Back to our own demo! All we need to do in the cluster is to repartition the DataFrame from 1 partition (since it’s 1 file) to something like 60 (depending on how many executors, cores per executor, etc.). This way Spark can distribute the tasks to more executors and run them in parallel:
muellerFirstVol.rdd.getNumPartitions
val newMuellerFirstVolDF = muellerFirstVol.repartition(60)

// Now this runs in parallel
val nlpPipelineModel = pipeline.fit(newMuellerFirstVolDF)
val nlpPipelinePrediction = nlpPipelineModel.transform(newMuellerFirstVolDF)
NOTE: The reason I created a new DataFrame is that RDDs are immutable by nature, so you can’t just change their number of partitions. However, you can create a new RDD (DataFrame) with a different number of partitions.
This time it took 0.4 seconds instead of 3 seconds to run the pipeline on a cluster. Maybe a few seconds faster in one job is not something to even notice, but you can apply this to tens of thousands of PDFs or millions of records where we can take advantage of Apache Spark’s distributed engine.
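As for where a number like 60 can come from: the Spark tuning guide suggests roughly 2 to 3 tasks per CPU core in the cluster. Here is a tiny sketch of that rule of thumb. The PartitionSketch name and the executor and core counts in the usage note are hypothetical; they are not the actual layout of the cluster used in this article.

```scala
// Illustrative only: a rule-of-thumb partition count, not a Spark API.
object PartitionSketch {
  // Spark's tuning guide recommends roughly 2-3 tasks per CPU core.
  def suggestedPartitions(executors: Int, coresPerExecutor: Int, tasksPerCore: Int = 2): Int = {
    require(
      executors > 0 && coresPerExecutor > 0 && tasksPerCore > 0,
      "executors, coresPerExecutor and tasksPerCore must all be positive"
    )
    executors * coresPerExecutor * tasksPerCore
  }
}
```

With, say, 10 executors of 3 cores each and 2 tasks per core, PartitionSketch.suggestedPartitions(10, 3) gives 60. Treat the result as a starting point and tune it against your own data size and cluster.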
(Apache Spark on a cluster)
Now let’s have a look at the results of our customized pipelines. What I would like to do, is a simple grouping on chunks from NER model:
(Spark NLP: NER chunking on Mueller Report)
As you can see, it needs some data cleaning to exclude mistaken entities such as “P.”, which we will do in the second part. If we create a co-occurrence matrix of these named entities in the first volume of the Mueller Report, we can visualize them in Gephi, as I will explain in the third part:
(Spark NLP: Mueller Report Named Entities co-occurrence graph)
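Until the third part covers the real workflow, here is a small plain-Scala sketch of what counting entity co-occurrences can look like. It assumes each inner sequence holds the entities found in one unit of text (a page or a sentence), which may differ from what the third part actually does. CooccurrenceSketch and the sample entities are hypothetical. The Source, Target, and Weight column names follow the header that Gephi’s spreadsheet importer recognises for edge lists.

```scala
// Illustrative only: counts how often two entities appear in the same unit of text.
object CooccurrenceSketch {
  def edges(units: Seq[Seq[String]]): Map[(String, String), Int] =
    units
      .flatMap { entities =>
        // distinct: count a pair once per unit; sorted: (a, b) and (b, a) collapse.
        val distinct = entities.distinct.sorted
        for {
          i <- distinct.indices
          j <- (i + 1) until distinct.size
        } yield (distinct(i), distinct(j))
      }
      .groupBy(identity)
      .map { case (pair, hits) => pair -> hits.size }

  // Render an undirected edge list as CSV, heaviest edges first.
  // Note: entity names containing commas would need quoting, which this sketch skips.
  def toCsv(counts: Map[(String, String), Int]): String =
    ("Source,Target,Weight" +: counts.toSeq
      .sortBy { case ((a, b), n) => (-n, a, b) }
      .map { case ((a, b), n) => s"$a,$b,$n" }).mkString("\n")
}
```

Feeding it three hypothetical pages, one mentioning Trump, Russia, and Mueller, one mentioning Trump and Russia, and one mentioning only Mueller, yields a weight of 2 for the Russia and Trump edge and 1 for the other two.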
What comes next:
Congratulations! Now you know how to use Spark NLP pre-trained pipelines and models to perform NLP tasks in Apache Spark. This gives you the advantage of a distributed engine in Apache Spark to run large-scale NLP jobs over thousands of CPU cores.
Keep in mind this was a very fast and easy way to start with Spark NLP. In the next part, I would like to experiment with a NER model that is trained with BERT word embeddings instead of GloVe, train my own POS tagger model in Spark NLP from Universal Dependencies, run some data cleaning, and finally extract some keywords/phrases by POS and NER chunking.
- Spark NLP on GitHub
- Spark NLP Website
- Spark NLP examples
- Spark NLP Slack
- Spark NLP vs. other NLP libraries
- Spark NLP vs spaCy
- Spark NLP powered by TensorFlow
Please leave a comment here or tweet at me on Twitter if you have any questions.
Found this article interesting? Please follow me (Maziyar Panahi) on Medium for future articles and please share! 👏