Sometimes we run into problems processing big files in Node.js because the main thread (responsible for the event loop) can block. For this reason, I decided to write about how to use streams in Node.js with a fun example. Let's go!
Application: a Node.js application that downloads a huge CSV file from Google Cloud Storage, processes the data, transforms it into a JSON file, extracts some data insights, and finally compresses the file and uploads it back to Cloud Storage. (We are also going to create a terminal progress bar to visualize the progress.)
Main principles:
Streams: Interface to work with chunks of data.
Readable Streams: Interface from which data is consumed (the data source).
Writable Streams: Interface to which data is written (the data destination).
Duplex Streams: Streams that implement both interfaces, Readable and Writable.
Transform Streams: A Duplex Stream used to modify the data as it passes through.
PassThrough: A Transform Stream that passes the data through unchanged, useful when we need to inspect or test something during the stream pipeline.
Pipeline: A utility that wires all the streams together and handles errors and cleanup for the whole process (see the small sketch right after this list).
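Here is a minimal sketch of how these pieces fit together, independent of the application we are going to build. The in-memory strings and the ./output.txt path are just placeholders for illustration.

const fs = require('fs')
const { Readable, Transform, PassThrough, pipeline } = require('stream')

// Readable: the data source (three in-memory strings here)
const source = Readable.from(['foo\n', 'bar\n', 'baz\n'])

// Transform: modifies each chunk as it passes through
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

// PassThrough: forwards chunks untouched, but lets us peek at them
const spy = new PassThrough()
spy.on('data', (chunk) => console.log(`saw a chunk of ${chunk.length} bytes`))

// Pipeline: wires source -> transforms -> destination and handles errors
pipeline(source, upperCase, spy, fs.createWriteStream('./output.txt'), (err) => {
  if (err) console.error('Pipeline failed', err)
})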
Steps to reproduce:
Download file stream
Create a progress bar
Transform each CSV line into a JavaScript object
Get information from the file
Convert the data into JSON format
Compress the data
Upload the new converted file
You can find the CSV file here
We are going to use Node.js version 16.16
You need to have a Google account to access Google Cloud Services.
We need to install the Cloud Storage library to interact with the Google Cloud service.
npm install @google-cloud/storage
If you need help configuring the Cloud Storage service on your Google account, there are many tutorials that cover it; that is not the purpose of this article.
Let's create our first file, cloudStorageFileService.js, in the src folder to work with our storage.
src/cloudStorageFileService.js
const { Storage } = require('@google-cloud/storage')
const path = require('path')

const serviceKey = path.join(__dirname, '../gkeys.json')

class CloudStorageFileService {
  // (1)
  constructor() {
    this.storage = new Storage({
      projectId: 'my-project-id',
      keyFilename: serviceKey
    })
  }

  // (2)
  async downloadFile(bucketName, fileName) {
    return await this.storage
      .bucket(bucketName)
      .file(fileName)
      .createReadStream()
  }

  // (3)
  async uploadFile(bucketName, destFileName) {
    return await this.storage
      .bucket(bucketName)
      .file(destFileName)
      .createWriteStream()
  }

  // (4)
  async getFileSize(bucketName, fileName) {
    const [metadata] = await this.storage
      .bucket(bucketName)
      .file(fileName)
      .getMetadata()
    return metadata.size
  }
}

module.exports = CloudStorageFileService
From the code sections above:
1. We configure the Storage client with our project ID and the service account key file (gkeys.json).
2. downloadFile returns a Readable Stream for the file stored in the bucket.
3. uploadFile returns a Writable Stream that sends data back to the bucket.
4. getFileSize reads the file metadata to get the total size in bytes, which we will need for the progress bar.
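If you want a quick sanity check that your credentials and bucket are wired up correctly (optional), you could pipe the download straight into a local file. The bucket and file names below are the same ones we will use later in index.js.

const fs = require('fs')
const CloudStorageFileService = require('./cloudStorageFileService')

;(async () => {
  const cloudFileService = new CloudStorageFileService()

  // Print the remote file size reported by the bucket metadata
  const size = await cloudFileService.getFileSize('myfileuploads', 'london_crime_by_lsoa2.csv')
  console.log(`Remote file size: ${size} bytes`)

  // Pipe the download stream into a local copy of the file
  const readStream = await cloudFileService.downloadFile('myfileuploads', 'london_crime_by_lsoa2.csv')
  readStream.pipe(fs.createWriteStream('./london_crime_by_lsoa2.csv'))
})()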
Now let's create the progress bar. Here we are extending PassThrough because we don't need to apply any modification to the stream, only to collect some data to build our progress bar. It is important to know that Streams implement the Event Emitter interface, so we can use emit to fire events and on to listen to them.
src/progressPass.js
const { PassThrough } = require('stream')

class ProgressPass extends PassThrough {
  // (1)
  constructor(fileSize, options = {}) {
    super({ ...options })
    this.on('data', this.processData)
    this.on('progress', this.showResult)
    this.on('close', this.finishProgress)
    this.bytesRead = 0
    this.progress = 0
    this.fileSize = fileSize
    this.createProgressBar()
  }

  // (2)
  processData(data) {
    this.bytesRead += data.length
    this.progress = (this.bytesRead / this.fileSize) * 100
    this.emit('progress', Math.floor(this.progress))
  }

  // (3)
  createProgressBar() {
    process.stdout.write("\x1B[?25l")
    process.stdout.write('[')
    for (let i = 1; i <= 101; i++) {
      process.stdout.write('-')
    }
    process.stdout.write(']')
  }

  // (4)
  showResult(progress) {
    process.stdout.cursorTo(progress + 1)
    process.stdout.write('=')
    process.stdout.cursorTo(105)
    process.stdout.write(`${progress}%`)
  }

  finishProgress() {
    process.stdout.write("\x1B[?25h")
    process.stdout.write("\n")
  }
}

module.exports = ProgressPass
From the code sections above:
1. Initializing our progress bar: the data event is emitted when a new chunk arrives at the current stream, progress is our custom event that we emit to update the progress bar, and close is the event emitted when there is no more data to pass through the stream.
2. Every time new data arrives, we add the chunk's length to the total of bytes read and emit the progress event to update the progress bar.
3. Creating an unfilled progress bar like:
[------------------------------------------------]
4. When the progress event happens, we update the progress bar with the current percentage:
[===========-------------------------------------] 25%
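To try the progress bar on its own, before wiring up Cloud Storage, you could pipe any large local file through it. The ./sample.csv path below is just a placeholder.

const fs = require('fs')
const ProgressPass = require('./progressPass')

// './sample.csv' is a placeholder for any large local file
const filePath = './sample.csv'
const { size } = fs.statSync(filePath)

// Copy the file through ProgressPass so we can watch the bar fill up
fs.createReadStream(filePath)
  .pipe(new ProgressPass(size))
  .pipe(fs.createWriteStream('./sample-copy.csv'))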
Now it's time to take every line of our CSV file and convert it into a JS object. The intention is to make the data easier to manipulate and, at the final step of the process, convert it to a JSON file.
The strategy is to convert each binary chunk into text and read it line by line. To get the values, we split each line on the , character.
When we read using streams, each chunk has 16 KB by default (we can modify this), so a chunk boundary can fall in the middle of a line, as in the example below:
chunk 1:
value1, value2, val
chunk 2:
ue3, value4, value5
Keep in mind that we need to handle this case by temporarily saving the leftover piece of the previous chunk before splitting the text.
To handle the transformation we are going to use a Transform Stream. As I mentioned, it's useful when we want to modify our data chunks.
src/objectTransform.js
const { Transform } = require('stream')

class ObjectTransform extends Transform {
  // (1)
  constructor(options = {}) {
    super({ ...options })
    this.headerLine = true
    this.keys = []
    this.tailChunk = ''
  }

  // (2)
  _transform(chunk, encoding, callback) {
    const stringChunks = chunk.toString('utf8')
    const lines = stringChunks.split('\n')

    for (const line of lines) {
      const lineString = (this.tailChunk + line)
      let values = lineString.split(',')

      if (this.headerLine) {
        this.keys = values
        this.headerLine = false
        continue
      }

      if (values.length !== this.keys.length || lineString[lineString.length - 1] === ',') {
        this.tailChunk = line
      } else {
        const chunkObject = {}
        this.keys.forEach((element, index) => {
          chunkObject[element] = values[index]
        })

        this.tailChunk = ''
        this.push(`${JSON.stringify(chunkObject)}`)
      }
    }

    callback()
  }

  // (3)
  _flush(callback) {
    callback()
  }
}

module.exports = ObjectTransform
From the code sections above:
1. tailChunk is used to save the incomplete CSV lines between chunks.
2. _transform converts each chunk to text, splits it into lines, and turns every complete line into a JS object.
3. _flush is called when there is no more data to be processed.
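To see what ObjectTransform emits without involving Cloud Storage, you could feed it a tiny in-memory CSV. The sample rows below are made up just for illustration.

const { Readable } = require('stream')
const ObjectTransform = require('./objectTransform')

// Made-up CSV sample with a header line and two data lines
const csv = 'borough,month,year,value\nCamden,1,2016,3\nHackney,2,2016,5\n'

Readable.from([csv])
  .pipe(new ObjectTransform())
  .on('data', (chunk) => console.log(chunk.toString()))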
Now that we have JS objects, we can extract some data from the chunks. Here again, we are going to use a PassThrough Stream because we only want to check some information.
src/monitorTransform.js
const { PassThrough } = require('stream')

class MonitorTransform extends PassThrough {
  constructor(options = {}) {
    super({ ...options })
    this.on('data', this.processData)
    this.on('close', this.showResult)
    this.totalCrimes = 0
    this.boroughTotal = new Map()
    this.monthTotal = new Map()
    this.yearTotal = new Map()
  }

  // (1)
  processData(data) {
    const row = JSON.parse(data.toString())
    const rowCrimeQuantity = Number(row.value) || 0
    const currentBoroughTotal = Number(this.boroughTotal.get(row.borough)) || 0
    const currentMonthTotal = Number(this.monthTotal.get(row.month)) || 0
    const currentYearTotal = Number(this.yearTotal.get(row.year)) || 0

    this.totalCrimes += rowCrimeQuantity
    this.boroughTotal.set(row.borough, currentBoroughTotal + rowCrimeQuantity)
    this.monthTotal.set(row.month, currentMonthTotal + rowCrimeQuantity)
    this.yearTotal.set(row.year, currentYearTotal + rowCrimeQuantity)
  }

  // (2)
  showResult() {
    console.log(this.totalCrimes)
    console.log(this.boroughTotal)
    console.log(this.monthTotal)
    console.log(this.yearTotal)
  }
}

module.exports = MonitorTransform
From the code sections above:
1. When the data event happens, we can process the chunk and extract whatever we want from it; here we accumulate totals per borough, month, and year.
2. When the stream closes, showResult prints the aggregated totals.
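A quick way to exercise MonitorTransform in isolation is to feed it a couple of stringified rows in the same shape ObjectTransform produces; the rows below are made up.

const { Readable } = require('stream')
const MonitorTransform = require('./monitorTransform')

// Made-up rows in the format ObjectTransform pushes downstream
const rows = [
  '{"borough":"Camden","month":"1","year":"2016","value":"3"}',
  '{"borough":"Hackney","month":"2","year":"2016","value":"5"}'
]

// When the stream closes it logs the totals (8 crimes plus the per-borough/month/year Maps)
Readable.from(rows)
  .pipe(new MonitorTransform())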
All the chunks are in a JS object format, but if we save the file like this it won't be valid JSON. We need to handle that by transforming the output into an array of objects using a Transform Stream.
src/jsonTransform.js
const { Transform } = require('stream')

class JsonTransform extends Transform {
  constructor(options = {}) {
    super({ ...options })
    this.once('data', this.startJson)
    this.firstLine = true
  }

  // (1)
  startJson() {
    this.push('[')
  }

  // (2)
  _transform(chunk, encoding, callback) {
    const row = JSON.parse(chunk.toString())
    const newChunk = this.firstLine ? `${JSON.stringify(row)}` : `,${JSON.stringify(row)}`
    this.push(newChunk)

    if (this.firstLine) {
      this.firstLine = false
    }

    callback()
  }

  // (3)
  _flush(callback) {
    this.push(']')
    callback()
  }
}

module.exports = JsonTransform
From the code section above:
1. We push the opening [ only for the first chunk. Here we are using once instead of on, to listen to the data event only one time.
2. We prefix every object after the first with a , to separate our JS objects.
3. When there is no more data, _flush pushes the closing ].
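Building on the little CSV sample from before, you could chain ObjectTransform and JsonTransform to write a small JSON array to a local file; ./sample.json is a placeholder path.

const fs = require('fs')
const { Readable } = require('stream')
const ObjectTransform = require('./objectTransform')
const JsonTransform = require('./jsonTransform')

// Made-up CSV rows; the result is written as a JSON array to ./sample.json
const csv = 'borough,month,year,value\nCamden,1,2016,3\nHackney,2,2016,5\n'

Readable.from([csv])
  .pipe(new ObjectTransform())
  .pipe(new JsonTransform())
  .pipe(fs.createWriteStream('./sample.json'))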
We don't need to exert a lot of effort for this step because Node.js has a core library to help.
The zlib module from Node.js provides Transform Streams that compress chunks of data during the streaming process. We only need to add one to our stream pipeline.
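As a small standalone example, this is how createGzip fits into a pipeline that compresses a local file; ./input.txt is a placeholder for any file.

const fs = require('node:fs')
const { createGzip } = require('node:zlib')
const { pipeline } = require('node:stream/promises')

;(async () => {
  await pipeline(
    fs.createReadStream('./input.txt'),    // Readable: the original file
    createGzip(),                          // Transform: compresses each chunk
    fs.createWriteStream('./input.txt.gz') // Writable: the compressed file
  )
})()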
The pipeline process needs to have at least a Readable and a Writable Stream, and it can have however many Transform Streams and PassThrough Streams we want in between.
To handle the pipeline process we are going to use the Builder design pattern, to control how our pipeline is assembled and to throw some errors when something is missing.
src/fileProcessor.js
const { pipeline } = require('stream/promises')

class FileProcessor {
  constructor() {
    this.readableStream = null
    this.transforms = []
    this.writableStream = null
  }

  // (1)
  setReadable(readableStream) {
    this.readableStream = readableStream
    return this
  }

  // (2)
  addTransforms(transformsStream) {
    this.transforms = transformsStream
    return this
  }

  // (3)
  setWritable(writableStream) {
    this.writableStream = writableStream
    return this
  }

  // (4)
  async execute() {
    try {
      if (!this.readableStream) {
        throw Error('Readable stream not implemented')
      }

      if (!this.writableStream) {
        throw Error('Writable stream not implemented')
      }

      await pipeline(this.readableStream, ...this.transforms, this.writableStream)
    } catch (error) {
      console.log(error)
    }
  }
}

module.exports = FileProcessor
From the code section above:
1. setReadable defines the Readable Stream (the source).
2. addTransforms receives the list of Transform and PassThrough Streams.
3. setWritable defines the Writable Stream (the destination).
4. execute checks that we have both a Readable and a Writable Stream, because the pipeline function must have at least these two, and then runs the whole pipeline.
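Before wiring in Cloud Storage, you could exercise the builder with plain local file streams; the paths below are placeholders.

const fs = require('fs')
const { createGzip } = require('node:zlib')
const FileProcessor = require('./fileProcessor')

// Gzip a local file using the same builder we will use for the real app
new FileProcessor()
  .setReadable(fs.createReadStream('./local.csv'))
  .addTransforms([createGzip()])
  .setWritable(fs.createWriteStream('./local.csv.gz'))
  .execute()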
It's time to import everything we've created, define the constants, and execute the entire app.
src/index.js
// (1)
const FileProcessor = require('./fileProcessor')
const JsonTransform = require('./jsonTransform')
const MonitorTransform = require('./monitorTransform')
const ObjectTransform = require('./objectTransform')
const { createGzip } = require('node:zlib')
const CloudStorageFileService = require('./cloudStorageFileService')
const ProgressPass = require('./progressPass')

const fileProcessor = new FileProcessor()
const cloudFileService = new CloudStorageFileService()

// (2)
const gzip = createGzip()

// (3)
const bucketName = 'myfileuploads'
const fileName = 'london_crime_by_lsoa2.csv'
const destFileName = 'london_crime_by_lsoa2.tar.gz'

// (4)
;(async () => {
  try {
    const fileSize = await cloudFileService.getFileSize(bucketName, fileName)
    await fileProcessor
      .setReadable(await cloudFileService.downloadFile(bucketName, fileName))
      .addTransforms([new ProgressPass(fileSize), new ObjectTransform(), new MonitorTransform(), new JsonTransform(), gzip])
      .setWritable(await cloudFileService.uploadFile(bucketName, destFileName))
      .execute()
  } catch (e) {
    console.log(e)
  }
})()
From the code sections above:
1. We import all the classes we created.
2. We create the gzip Transform Stream from zlib.
3. We define the bucket and file names used on Cloud Storage.
4. We get the file size, build the pipeline with all our streams through the FileProcessor, and execute the whole process.
You can take a look at the entire code here
Also published here