In this article, we cover how to use the pipeline pattern in Python data engineering projects.
Let's get into it!
The functional pipeline is a design pattern mostly used in the functional programming paradigm, where data flows through a sequence of stages and the output of each stage is the input of the next. Each stage can be thought of as a filter that transforms the data in some way.
This pattern is well suited to map, filter, and reduce operations. It also leads to cleaner, more readable, and more maintainable code in data engineering projects.
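To make the idea concrete, here is a minimal sketch of the pattern in plain Python, built on functools.reduce. The names make_pipeline, keep_even, square, and total are hypothetical and chosen only for illustration; they cover one filter, one map, and one reduce stage.

```python
from functools import reduce


def make_pipeline(*stages):
    """Return a function that feeds its input through each stage in order."""
    def run(data):
        # reduce threads the running value through the stages, left to right
        return reduce(lambda value, stage: stage(value), stages, data)
    return run


# Hypothetical stages: one filter, one map, one reduce
def keep_even(numbers):
    return [n for n in numbers if n % 2 == 0]


def square(numbers):
    return [n * n for n in numbers]


def total(numbers):
    return sum(numbers)


sum_of_even_squares = make_pipeline(keep_even, square, total)
result = sum_of_even_squares([1, 2, 3, 4])
print(result)  # 2*2 + 4*4 = 20
```

Each stage only needs to accept the previous stage's output, so stages can be reordered, added, or removed without touching the others.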
For example, let's take an input text that has to go through a series of transformations: removing spaces, removing special characters, and converting to lowercase.
These pipeline functions are simplified to demonstrate the use case. In a real-life scenario, they would be a lot more complex.
Let's create the simple transformation functions.
# pipeline_functions.py
import re


def remove_spaces(string):
    output = string.replace(' ', '')
    print(f"""{remove_spaces.__name__}() ==> {output}""")
    return output


def remove_special_chars(string):
    output = re.sub("[^A-Za-z0-9]", "", string)
    print(f"""{remove_special_chars.__name__}() ==> {output}""")
    return output


def lowercase(string):
    output = string.lower()
    print(f"""{lowercase.__name__}() ==> {output}""")
    return output
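For comparison, here is a rough sketch of applying these steps by hand, without any pipeline helper. The three functions are repeated as compact, print-free copies only so that the snippet runs on its own.

```python
import re


# Compact, print-free copies of the three steps so this sketch is self-contained
def remove_spaces(string):
    return string.replace(' ', '')


def remove_special_chars(string):
    return re.sub("[^A-Za-z0-9]", "", string)


def lowercase(string):
    return string.lower()


# Nested calls read inside-out: the innermost function runs first
nested = lowercase(remove_special_chars(remove_spaces("Hello World!")))

# Step-by-step calls read top-down but reassign a temporary name at every step
step = remove_spaces("Hello World!")
step = remove_special_chars(step)
chained = lowercase(step)

print(nested, chained)
```

Both forms give the same result, but the order of the steps is fixed in the code itself. A pipeline keeps that order in a list instead, which is easier to read and to change.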
fastcore is a utility library that extends Python with features that make code faster to write, easier to read, and more maintainable. It borrows ideas from other languages such as Julia, Ruby, and Haskell, and adds functional programming patterns, simplified parallel processing, and a lot more.
For our pipeline implementation, we will use fastcore's transform module.
Do check out the documentation: https://fastcore.fast.ai/
$ pip install fastcore
# main.py
from fastcore.transform import Pipeline
from pipeline_functions import remove_spaces, remove_special_chars, lowercase


def main(input_string):
    # Creates a pipeline with a list of functions
    pipe = Pipeline([remove_spaces, remove_special_chars, lowercase])
    # Invokes pipeline
    output = pipe(input_string)
    print(f"""output ==> {output}""")


if __name__ == '__main__':
    text = input("Enter input string: ")
    main(text)
$ python main.py
Enter input string: Hello World!
remove_spaces() ==> HelloWorld!
remove_special_chars() ==> HelloWorld
lowercase() ==> helloworld
output ==> helloworld
As you can see, the input text is passed through the pipeline functions from left to right; each step transforms the text, and the last step returns the final output. We can go one step further and make the pipeline dynamic by choosing the pipeline functions at runtime. Because the pipeline is then described by a plain list of function names, that list can also be serialized and persisted for later use.
# main_dynamic.py
import sys
from fastcore.transform import Pipeline
from pipeline_functions import remove_spaces, remove_special_chars, lowercase


def main(input_string, pipe_funcs):
    # Creates a pipeline by looking up each function name in globals()
    pipe = Pipeline([globals()[func] for func in pipe_funcs])
    # Invokes pipeline
    output = pipe(input_string)
    print(f"""output ==> {output}""")


if __name__ == '__main__':
    text = input("Enter input string: ")
    # Function names are taken from the command-line arguments
    funcs = sys.argv[1:]
    main(text, funcs)
$ python main_dynamic.py remove_spaces lowercase
Enter input string: Hello World 123$
remove_spaces() ==> HelloWorld123$
lowercase() ==> helloworld123$
output ==> helloworld123$
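The point about serializing the function list can be sketched as follows. This is only an illustration under a few assumptions: it uses a plain-Python stand-in built on functools.reduce rather than fastcore's Pipeline, and REGISTRY and build_pipeline are hypothetical names. The explicit registry replaces the globals() lookup, so an unknown or mistyped name fails with a clear error instead of a KeyError, and only the listed functions can be called.

```python
import json
import re
from functools import reduce


# Compact, print-free copies of the three steps so this sketch is self-contained
def remove_spaces(string):
    return string.replace(' ', '')


def remove_special_chars(string):
    return re.sub("[^A-Za-z0-9]", "", string)


def lowercase(string):
    return string.lower()


# Hypothetical allow-list mapping stage names to functions
REGISTRY = {
    func.__name__: func
    for func in (remove_spaces, remove_special_chars, lowercase)
}


def build_pipeline(names):
    """Resolve stage names to functions, rejecting unknown names."""
    unknown = [name for name in names if name not in REGISTRY]
    if unknown:
        raise ValueError(f"Unknown pipeline functions: {unknown}")
    stages = [REGISTRY[name] for name in names]
    # Plain-Python stand-in for a pipeline: apply the stages left to right
    return lambda data: reduce(lambda value, stage: stage(value), stages, data)


# The pipeline definition is just a list of names, so it round-trips through JSON
saved = json.dumps(["remove_spaces", "lowercase"])
pipe = build_pipeline(json.loads(saved))
print(pipe("Hello World 123$"))  # helloworld123$
```

The JSON string could just as well be written to a file or a database row and loaded again later to rebuild the same pipeline.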
The pipeline pattern makes complex data processing operations in data engineering components easier to write and to follow, and the fastcore utility makes it even more convenient.
If you found this tutorial helpful or have any suggestions, do let me know!
Previously published at https://sureshdsk.dev/pipeline-pattern-in-python-data-engineering