Sometimes we encounter problems processing big files in Node.js because the main thread (responsible for the event loop) can get blocked. For this reason, I decided to write about how to use streams in Node.js with a fun example. Let's go!

## Application

A Node.js application that downloads a huge CSV file from Google Cloud Storage, processes the data, transforms it into a JSON file, extracts some data insights, and finally compresses the file and uploads it back to Cloud Storage. (We are also going to create a terminal progress bar to visualize the progress of the process.)

## Main principles

- **Streams**: Interface to work with chunks of data.
- **Readable Streams**: Interface to consume data from a stream.
- **Writable Streams**: Interface that acts as a destination we write stream data to.
- **Duplex Streams**: Implement both interfaces, Readable and Writable.
- **Transform Streams**: A Duplex Stream used to modify the data during the process.
- **PassThrough**: A Transform Stream, useful when we need to check or test something during the stream pipeline process.
- **Pipeline**: Useful to handle the whole stream process.

## Steps to reproduce

1. Download the file as a stream
2. Create a progress bar
3. Transform each CSV line into a JavaScript object
4. Get information from the file
5. Convert the data into JSON format
6. Compress the data
7. Upload the converted file

## Requirements

- You can find the CSV file here
- We are going to use Node version 16.16
- You need a Google account to access Google Cloud Services

## Cloud Storage Service

We need to install the Cloud Storage library to work with the Google Cloud service:

```bash
npm install @google-cloud/storage
```

If you need help configuring the Cloud Storage service on your Google account, there are many tutorials that can help with that; it's not the purpose of this tutorial.

Let's create our first file, `cloudStorageFileService.js`, in the `src` folder to work with our storage.

`src/cloudStorageFileService.js`

```js
const { Storage } = require('@google-cloud/storage')
const path = require('path')

const serviceKey = path.join(__dirname, '../gkeys.json')

class CloudStorageFileService {
  // (1)
  constructor() {
    this.storage = new Storage({
      projectId: 'my-project-id',
      keyFilename: serviceKey
    })
  }

  // (2)
  async downloadFile(bucketName, fileName) {
    return await this.storage
      .bucket(bucketName)
      .file(fileName)
      .createReadStream()
  }

  // (3)
  async uploadFile(bucketName, destFileName) {
    return await this.storage
      .bucket(bucketName)
      .file(destFileName)
      .createWriteStream()
  }

  // (4)
  async getFileSize(bucketName, fileName) {
    const [metadata] = await this.storage
      .bucket(bucketName)
      .file(fileName)
      .getMetadata()
    return metadata.size
  }
}

module.exports = CloudStorageFileService
```

From the code sections above:

1. Basic configuration to use the Cloud Storage service, such as the project id and the path to your Google Cloud credentials.
2. Google Cloud Storage provides us a Readable Stream for downloading files. We can use it to download our huge file as a stream so the main Node.js thread doesn't get stuck.
3. Google Cloud Storage also provides a Writable Stream to upload files.
4. The last function returns the file size, which we are going to use to create our progress bar.

## Progress Bar

Here we are extending `PassThrough` because we don't need to apply any modification to the stream; we only need to get some data to create our progress bar. It is important to know that streams implement the `EventEmitter` interface, so we can work with `emit` to emit events and `on` to listen to them.
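As a quick illustration of that (a minimal sketch, not part of the project code), emitting and listening to a custom event on a stream looks like this:

```js
const { PassThrough } = require('stream')

// Any stream is also an EventEmitter: on() registers listeners,
// emit() fires our own custom events.
const stream = new PassThrough()

stream.on('data', (chunk) => stream.emit('my-progress', chunk.length))
stream.on('my-progress', (length) => console.log(`received ${length} bytes`))

stream.write('hello') // logs: received 5 bytes
```

The `ProgressPass` class below uses exactly this mechanism, with a custom `progress` event.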
`src/progressPass.js`

```js
const { PassThrough } = require('stream')

class ProgressPass extends PassThrough {
  // (1)
  constructor(fileSize, options = {}) {
    super({ ...options })
    this.on('data', this.processData)
    this.on('progress', this.showResult)
    this.on('close', this.finishProgress)
    this.bytesRead = 0
    this.progress = 0
    this.fileSize = fileSize
    this.createProgressBar()
  }

  // (2)
  processData(data) {
    this.bytesRead += data.length
    this.progress = (this.bytesRead / this.fileSize) * 100
    this.emit('progress', Math.floor(this.progress))
  }

  // (3)
  createProgressBar() {
    process.stdout.write("\x1B[?25l")
    process.stdout.write('[')
    for (let i = 1; i <= 101; i++) {
      process.stdout.write('-')
    }
    process.stdout.write(']')
  }

  // (4)
  showResult(progress) {
    process.stdout.cursorTo(progress + 1)
    process.stdout.write('=')
    process.stdout.cursorTo(105)
    process.stdout.write(`${progress}%`)
  }

  finishProgress() {
    process.stdout.write("\x1B[?25h")
    process.stdout.write("\n")
  }
}

module.exports = ProgressPass
```

From the code sections above:

1. Initializing our progress bar. The file size parameter passed to the constructor will be used to calculate the progress percentage. The `data` event is emitted when a new chunk comes into the current stream, `progress` is our custom event that we emit to update the progress bar, and `close` is emitted when there is no more data to pass through the stream.
2. Every time new data comes in, we add the chunk's length to the total of bytes read and emit the `progress` event to update the progress bar.
3. Creating an unfilled progress bar like: `[------------------------------------------------]`
4. When the `progress` event happens, we update the progress bar with the current percentage: `[===========-------------------------------------] 25%`

## Transforming a CSV Line Into a JavaScript Object

Now it's time to get every line of our CSV file and convert it to a JS object. The intention is to make the data easier to manipulate and, at the final step of the process, convert it to a JSON file.

The strategy is to convert the binary chunk into text and read every line of it. To get the values, we need to split the lines using `,`.

When we read using streams, by default each chunk has 16 KB (we can modify that), so the split points can fall in the middle of a line, similar to the example below:

```
chunk 1: value1, value2, val
chunk 2: ue3, value4, value5
```

Keep in mind we need to handle this kind of thing and temporarily save the incomplete piece of the previous chunk before splitting the text.

To handle the transformation we are going to use a Transform Stream. As I mentioned, it's useful when we want to transform our data chunks.
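Before looking at the real Transform, here is a tiny standalone sketch of that idea (an illustration only, with hypothetical names; the project's `ObjectTransform` below handles the tail slightly differently):

```js
let tail = ''

// Split a text chunk into complete lines, keeping any unfinished
// trailing piece in `tail` so the next chunk can complete it.
function splitIntoLines(textChunk) {
  const lines = (tail + textChunk).split('\n')
  tail = lines.pop() // the last piece may be an incomplete line
  return lines
}

console.log(splitIntoLines('value1,value2,val'))    // []  ("value1,value2,val" kept as tail)
console.log(splitIntoLines('ue3\nvalue4,value5\n')) // [ 'value1,value2,value3', 'value4,value5' ]
```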
`src/objectTransform.js`

```js
const { Transform } = require('stream')

class ObjectTransform extends Transform {
  // (1)
  constructor(options = {}) {
    super({ ...options })
    this.headerLine = true
    this.keys = []
    this.tailChunk = ''
  }

  // (2)
  _transform(chunk, encoding, callback) {
    const stringChunks = chunk.toString('utf8')
    const lines = stringChunks.split('\n')

    for (const line of lines) {
      const lineString = (this.tailChunk + line)
      let values = lineString.split(',')

      if (this.headerLine) {
        this.keys = values
        this.headerLine = false
        continue
      }

      if (values.length !== this.keys.length || lineString[lineString.length - 1] === ',') {
        this.tailChunk = line
      } else {
        const chunkObject = {}
        this.keys.forEach((element, index) => {
          chunkObject[element] = values[index]
        })

        this.tailChunk = ''
        this.push(`${JSON.stringify(chunkObject)}`)
      }
    }

    callback()
  }

  // (3)
  _flush(callback) {
    callback()
  }
}

module.exports = ObjectTransform
```

From the code sections above:

1. We use a flag to detect the header line and save its values as the keys of our JS objects. The `tailChunk` is for saving incomplete CSV lines.
2. The magic happens here. We read every line, split the text, check whether the line is complete, and convert it to a JS object.
3. `_flush` is called when there is no more data to be processed.

## Extracting Information

Now that we have JS objects, we can extract some data from the chunks. Here again we are going to use a PassThrough Stream, because we only want to check some information.

`src/monitorTransform.js`

```js
const { PassThrough } = require('stream')

class MonitorTransform extends PassThrough {
  constructor(options = {}) {
    super({ ...options })
    this.on('data', this.processData)
    this.on('close', this.showResult)
    this.totalCrimes = 0
    this.boroughTotal = new Map()
    this.monthTotal = new Map()
    this.yearTotal = new Map()
  }

  // (1)
  processData(data) {
    const row = JSON.parse(data.toString())
    const rowCrimeQuantity = Number(row.value) || 0
    const currentBoroughTotal = Number(this.boroughTotal.get(row.borough)) || 0
    const currentMonthTotal = Number(this.monthTotal.get(row.month)) || 0
    const currentYearTotal = Number(this.yearTotal.get(row.year)) || 0

    this.totalCrimes += rowCrimeQuantity
    this.boroughTotal.set(row.borough, currentBoroughTotal + rowCrimeQuantity)
    this.monthTotal.set(row.month, currentMonthTotal + rowCrimeQuantity)
    this.yearTotal.set(row.year, currentYearTotal + rowCrimeQuantity)
  }

  // (2)
  showResult() {
    console.log(this.totalCrimes)
    console.log(this.boroughTotal)
    console.log(this.monthTotal)
    console.log(this.yearTotal)
  }
}

module.exports = MonitorTransform
```

From the code sections above:

1. When a `data` event happens, we can process the chunk and get whatever we want from it.
2. When there is no more data to process, we show the extracted information.

## Transforming the File to a JSON File

All the chunks are in a JS object format, but if we simply save them one after the other, the result won't be a valid JSON file. We need to handle this by transforming the data into an array of objects using a Transform Stream.

`src/jsonTransform.js`

```js
const { Transform } = require('stream')

class JsonTransform extends Transform {
  constructor(options = {}) {
    super({ ...options })
    this.once('data', this.startJson)
    this.firstLine = true
  }

  // (1)
  startJson() {
    this.push('[')
  }

  // (2)
  _transform(chunk, encoding, callback) {
    const row = JSON.parse(chunk.toString())
    const newChunk = this.firstLine ?
      `${JSON.stringify(row)}` : `,${JSON.stringify(row)}`
    this.push(newChunk)

    if (this.firstLine) {
      this.firstLine = false
    }

    callback()
  }

  // (3)
  _flush(callback) {
    this.push(']')
    callback()
  }
}

module.exports = JsonTransform
```

From the code section above:

1. Our array should start with `[`, and it happens only on the first chunk. Here we are using `once` instead of `on` to listen to the `data` event only one time.
2. Here we add a `,` to separate our JS objects.
3. At the end of the file we need to close the array with `]`.

## Compressing the Chunks

We don't need to exert a lot of effort for this step because Node.js has a core library to help. The `zlib` module from Node.js gives us a Transform Stream (created here with `createGzip`) that compresses chunks of data during the streaming process. We only need to add it to our stream pipeline.

## Creating the Stream Pipeline Process

The pipeline needs at least a Readable and a Writable Stream, and it can have as many Transform Streams and PassThrough Streams as we want.

To handle the pipeline process we are going to use the Builder design pattern, to control the creation of our pipeline and throw some errors when something is missing.

`src/fileProcessor.js`

```js
const { pipeline } = require('stream/promises')

class FileProcessor {
  constructor() {
    this.readableStream = null
    this.transforms = []
    this.writableStream = null
  }

  // (1)
  setReadable(readableStream) {
    this.readableStream = readableStream
    return this
  }

  // (2)
  addTransforms(transformsStream) {
    this.transforms = transformsStream
    return this
  }

  // (3)
  setWritable(writableStream) {
    this.writableStream = writableStream
    return this
  }

  // (4)
  async execute() {
    try {
      if (!this.readableStream) {
        throw Error('Readable stream not implemented')
      }
      if (!this.writableStream) {
        throw Error('Writable stream not implemented')
      }
      await pipeline(this.readableStream, ...this.transforms, this.writableStream)
    } catch (error) {
      console.log(error)
    }
  }
}

module.exports = FileProcessor
```

From the code section above:

1. Setting our Readable Stream; in this case it should be the download function from Google Cloud Storage.
2. Adding Transforms to our pipeline. All the Transform and PassThrough Streams should be added as an array.
3. The Writable Stream is the upload function from Google Cloud Storage.
4. Executing the pipeline, but only if the Readable and Writable Streams are set. The `pipeline` function needs at least these two streams.
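To make that minimum contract concrete, here is a small standalone sketch (assuming local files `input.csv` and `input.csv.gz`; it is not part of the project code) with only a Readable, one Transform (`createGzip`), and a Writable:

```js
const { pipeline } = require('node:stream/promises')
const { createGzip } = require('node:zlib')
const { createReadStream, createWriteStream } = require('node:fs')

// Readable -> Transform (gzip) -> Writable: the smallest useful pipeline.
;(async () => {
  try {
    await pipeline(
      createReadStream('input.csv'),
      createGzip(),
      createWriteStream('input.csv.gz')
    )
  } catch (error) {
    console.error(error)
  }
})()
```

The project's `FileProcessor.execute()` does the same thing, only with the Cloud Storage streams and the list of transforms set through the builder methods.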
## Joining Everything

It's time to import everything we've created, define the constants, and execute the entire app.

`src/index.js`

```js
// (1)
const FileProcessor = require('./fileProcessor')
const JsonTransform = require('./jsonTransform')
const MonitorTransform = require('./monitorTransform')
const ObjectTransform = require('./objectTransform')
const { createGzip } = require('node:zlib')
const CloudStorageFileService = require('./cloudStorageFileService')
const ProgressPass = require('./progressPass')

const fileProcessor = new FileProcessor()
const cloudFileService = new CloudStorageFileService()

// (2)
const gzip = createGzip()

// (3)
const bucketName = 'myfileuploads'
const fileName = 'london_crime_by_lsoa2.csv'
const destFileName = 'london_crime_by_lsoa2.tar.gz'

// (4)
;(async () => {
  try {
    const fileSize = await cloudFileService.getFileSize(bucketName, fileName)
    await fileProcessor
      .setReadable(await cloudFileService.downloadFile(bucketName, fileName))
      .addTransforms([
        new ProgressPass(fileSize),
        new ObjectTransform(),
        new MonitorTransform(),
        new JsonTransform(),
        gzip
      ])
      .setWritable(await cloudFileService.uploadFile(bucketName, destFileName))
      .execute()
  } catch (e) {
    console.log(e)
  }
})()
```

From the code sections above:

1. Here are all the things we've created: services, Transform Streams, and PassThrough Streams.
2. As I mentioned, this is the Node.js library we use to compress data.
3. We need to define our Cloud Storage bucket and file.
4. This is our final function to execute everything together, defining the Readable, the Transforms, and the Writable.

## Takeaways

- You can use this approach to import and export data between databases or to generate reports.
- Streams are also useful to process and treat audio and video files and for file conversion.

You can take a look at the entire code here.

Also published here.