How to Create a Dynamic Pipeline Route in Go

Written by yudaph | Published 2022/05/05
Tech Story Tags: golang | goroutines | pipeline | go | programming | coding | software-development | data-pipeline


After watching some video tutorials about Go, I wondered whether it was possible to create a pipeline with dynamic routes. Inspired by the Builder Design Pattern and the Optional Pattern in Go, I came up with an idea to make it happen.

How to do it?

Imagine we are in a car factory where one car model comes in several types. Cars of the same model share some manufacturing steps, such as building the chassis and the body, and then each type gets a different set of features. In the pipeline we will create, every car passes through the “Base” and “Body” pipelines; after “Body”, the car can either be built directly or have features added according to its type.

Pipeline package

First we will create a car struct with several fields: base, body, feature A, feature B, and feature C. After that, we will create several methods to fill those fields.
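
The listing for this step is not reproduced here, so the following is only a minimal sketch of what the car struct and its setters might look like. The setter names (setBase, setBody, setFeatureA, and so on) are taken from the pipeline code later in this post; the bool field types and the main function are assumptions made for illustration.

```go
package main

import "fmt"

// car holds one flag per manufacturing step.
// Assumption: the fields are plain bools; the post only names them.
type car struct {
	base     bool
	body     bool
	featureA bool
	featureB bool
	featureC bool
}

// Setters the pipeline stages call to fill the fields.
func (c *car) setBase(v bool)     { c.base = v }
func (c *car) setBody(v bool)     { c.body = v }
func (c *car) setFeatureA(v bool) { c.featureA = v }
func (c *car) setFeatureB(v bool) { c.featureB = v }
func (c *car) setFeatureC(v bool) { c.featureC = v }

func main() {
	var c car
	c.setBase(true)
	c.setBody(true)
	c.setFeatureB(true)
	fmt.Printf("%+v\n", c)
}
```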

After that we define a variable named “Process”, inspired by the Optional Pattern, and a struct “CarBuilder” that acts as the builder for the car: it embeds the car fields and holds the list of pipelines the car still has to visit. Then we create a “nextProcess” method to move to the next pipeline and a “Build” method to return the car we built.

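// Process wraps a channel in a function that sends a builder to it,
// so a route can be written as a list of channels to visit.
var Process = func(channel chan *CarBuilder) func(builder *CarBuilder) {
	return func(builder *CarBuilder) {
		channel <- builder
	}
}

// NextProcess is one hop of a route.
type NextProcess func(builder *CarBuilder)

// CarBuilder embeds the car being built and carries its remaining route.
type CarBuilder struct {
	car
	Next []NextProcess
}

// nextProcess pops the first hop off the route and sends the builder to it.
// It panics on an empty route, so every route must end with a final hop.
func (c *CarBuilder) nextProcess() {
	next := c.Next[0]
	c.Next = c.Next[1:]
	next(c)
}

// Build returns the finished car.
func (c *CarBuilder) Build() car {
	return c.car
}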
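
To make the routing concrete, here is a small self-contained sketch that moves one builder along a two-hop route and shows how the route shrinks at each hop. It is an illustration rather than the code from the repository: car is stubbed out as an empty struct, Process is written as a plain function instead of a variable, and remainingHops is a hypothetical helper that exists only to observe the route.

```go
package main

import "fmt"

// car is stubbed out; only the routing matters for this sketch.
type car struct{}

type NextProcess func(builder *CarBuilder)

type CarBuilder struct {
	car
	Next []NextProcess
}

// Process wraps a channel in a function that sends the builder to it.
func Process(channel chan *CarBuilder) NextProcess {
	return func(builder *CarBuilder) { channel <- builder }
}

// nextProcess pops the first hop off the route and follows it.
func (c *CarBuilder) nextProcess() {
	next := c.Next[0]
	c.Next = c.Next[1:]
	next(c)
}

// remainingHops is a hypothetical helper used only to observe the route.
func remainingHops(c *CarBuilder) int { return len(c.Next) }

func main() {
	// Buffered so the sends below do not block without a receiver.
	featureA := make(chan *CarBuilder, 1)
	build := make(chan *CarBuilder, 1)

	b := &CarBuilder{Next: []NextProcess{Process(featureA), Process(build)}}

	b.nextProcess() // first hop: featureA
	got := <-featureA
	fmt.Println("arrived at featureA, hops left:", remainingHops(got))

	got.nextProcess() // second hop: build
	got = <-build
	fmt.Println("arrived at build, hops left:", remainingHops(got))
}
```

The builder carries its own route, so the stages never need to know where a car goes next.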

The last stage is defining the pipelines. We will create 5 pipelines, namely “BaseBuilder”, “BodyBuilder”, “FeatureABuilder”, “FeatureBBuilder”, and “FeatureCBuilder”, which create the basis of the car model or add features to it.

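// BaseBuilder sets the base and always hands the car to the next channel.
// Ranging over the channel lets the worker exit cleanly if it is closed.
func BaseBuilder(channel <-chan *CarBuilder, nextChannel chan<- *CarBuilder) {
	for carBuilder := range channel {
		carBuilder.setBase(true)
		nextChannel <- carBuilder
	}
}

// BodyBuilder sets the body, then lets the car follow its own route.
func BodyBuilder(channel <-chan *CarBuilder) {
	for carBuilder := range channel {
		carBuilder.setBody(true)
		carBuilder.nextProcess()
	}
}

// FeatureABuilder adds feature A, then forwards the car along its route.
func FeatureABuilder(channel <-chan *CarBuilder) {
	for carBuilder := range channel {
		carBuilder.setFeatureA(true)
		carBuilder.nextProcess()
	}
}

// FeatureBBuilder adds feature B, then forwards the car along its route.
func FeatureBBuilder(channel <-chan *CarBuilder) {
	for carBuilder := range channel {
		carBuilder.setFeatureB(true)
		carBuilder.nextProcess()
	}
}

// FeatureCBuilder adds feature C, then forwards the car along its route.
func FeatureCBuilder(channel <-chan *CarBuilder) {
	for carBuilder := range channel {
		carBuilder.setFeatureC(true)
		carBuilder.nextProcess()
	}
}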
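
The feature pipelines above differ only in which setter they call. One possible way to remove the repetition, which is not part of the original code, is a single worker function that receives the step to apply as a parameter. In the sketch below, stage and runDemo are hypothetical names, and the car struct is cut down to two assumed bool fields.

```go
package main

import "fmt"

// car is reduced to two assumed fields for this sketch.
type car struct{ body, featureA bool }

type NextProcess func(*CarBuilder)

type CarBuilder struct {
	car
	Next []NextProcess
}

func Process(ch chan *CarBuilder) NextProcess {
	return func(b *CarBuilder) { ch <- b }
}

func (c *CarBuilder) nextProcess() {
	next := c.Next[0]
	c.Next = c.Next[1:]
	next(c)
}

// stage is a hypothetical generic worker: it applies apply to every builder
// it receives, then forwards the builder along its own route.
// It returns when in is closed.
func stage(in <-chan *CarBuilder, apply func(*CarBuilder)) {
	for b := range in {
		apply(b)
		b.nextProcess()
	}
}

// runDemo pushes n builders through body -> featureA -> out and returns them.
func runDemo(n int) []*CarBuilder {
	body := make(chan *CarBuilder, n)
	featureA := make(chan *CarBuilder, n)
	out := make(chan *CarBuilder, n)

	go stage(body, func(b *CarBuilder) { b.body = true })
	go stage(featureA, func(b *CarBuilder) { b.featureA = true })

	for i := 0; i < n; i++ {
		body <- &CarBuilder{Next: []NextProcess{Process(featureA), Process(out)}}
	}
	built := make([]*CarBuilder, 0, n)
	for i := 0; i < n; i++ {
		built = append(built, <-out)
	}
	// Every builder has arrived, so no stage is still sending.
	close(body)
	close(featureA)
	return built
}

func main() {
	for _, b := range runDemo(3) {
		fmt.Println(b.body, b.featureA)
	}
}
```

Whether this is worth it depends on how much the stages diverge later; separate functions are easier to read if each stage ends up doing real work of its own.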

Main Function

To use it in the main function (main.go), we first define a sync.WaitGroup to wait for all processes to finish, and a channel for each pipeline. Each channel gets a large buffer so that the pipelines rarely block on each other :D

var wg sync.WaitGroup
baseChan := make(chan *pipe.CarBuilder, 1000)
bodyChan := make(chan *pipe.CarBuilder, 1000)
featureAChan := make(chan *pipe.CarBuilder, 1000)
featureBChan := make(chan *pipe.CarBuilder, 1000)
featureCChan := make(chan *pipe.CarBuilder, 1000)
buildChan := make(chan *pipe.CarBuilder, 1000)

We also prepare a list of the car types we want and the features available in each type, which makes it easier to produce cars on a large scale. Each “NextProcess” slice lists, in order, the channels of the pipelines a car of that type must visit after “Body”, ending with the build channel.

typeStandard := []pipe.NextProcess{
	pipe.Process(buildChan),
}

typeA := []pipe.NextProcess{
	pipe.Process(featureAChan),
	pipe.Process(featureCChan),
	pipe.Process(buildChan),
}

typeB := []pipe.NextProcess{
	pipe.Process(featureAChan),
	pipe.Process(featureBChan),
	pipe.Process(buildChan),
}

typeC := []pipe.NextProcess{
	pipe.Process(featureBChan),
	pipe.Process(featureCChan),
	pipe.Process(buildChan),
}

typeFullFeature := []pipe.NextProcess{
	pipe.Process(featureAChan),
	pipe.Process(featureBChan),
	pipe.Process(featureCChan),
	pipe.Process(buildChan),
}

features := [][]pipe.NextProcess{
	typeStandard, typeA, typeB, typeC, typeFullFeature,
}

After that, we define the worker pool. I use 4 workers for each pipeline because I saw no significant improvement with more than that on my device. We can also define a car counter to record how many cars have been made.

var carCount int64
for i := 1; i <= 4; i++ {
	go pipe.BaseBuilder(baseChan, bodyChan)
	go pipe.BodyBuilder(bodyChan)
	go pipe.FeatureABuilder(featureAChan)
	go pipe.FeatureBBuilder(featureBChan)
	go pipe.FeatureCBuilder(featureCChan)
	go func() {
		for readyToBuild := range buildChan {
			readyToBuild.Build()
			// Count the car before calling Done, so the total is
			// complete by the time wg.Wait returns.
			atomic.AddInt64(&carCount, 1)
			wg.Done()
		}
	}()
}

Next we define how many cars we want to build and their types. In this case the loop below prepares 1,000,000 cars, cycling through the five types.

var testBuilds []pipe.CarBuilder
for i := 0; i < 1000000; i++ {
   testBuilds = append(testBuilds, pipe.CarBuilder{
      Next: features[i%5],
   })
}
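
Note that every car of the same type shares one route slice from features. As far as I can tell this is safe here, because nextProcess only reslices the builder's own copy of the slice header and never writes to the shared backing array. The sketch below shows that behaviour with strings in place of NextProcess values; builder and advance are hypothetical stand-ins for CarBuilder and nextProcess.

```go
package main

import "fmt"

// builder stands in for CarBuilder; route plays the role of Next.
type builder struct {
	route []string
}

// advance mimics nextProcess: take the first hop, then reslice.
func (b *builder) advance() string {
	hop := b.route[0]
	b.route = b.route[1:] // changes only this builder's slice header
	return hop
}

func main() {
	shared := []string{"featureA", "featureC", "build"}

	first := builder{route: shared}
	second := builder{route: shared}

	first.advance()
	first.advance()

	// first has one hop left; second and the shared route are untouched.
	fmt.Println(len(first.route), len(second.route), len(shared))
	fmt.Println(second.advance())
}
```

This would stop being safe if a stage appended to Next or overwrote its elements; in that case each builder would need its own copy of the route.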

Finally, put the cars we defined above one by one into the first pipeline “BaseBuilder” and let the magic happen 😁

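defer func() func() {
	start := time.Now()
	// The returned function runs when main returns, after wg.Wait.
	return func() { fmt.Println(time.Since(start), atomic.LoadInt64(&carCount)) }
}()()
for _, testBuild := range testBuilds {
	wg.Add(1)
	t := testBuild // copy, so each builder sent has its own address
	baseChan <- &t
}
wg.Wait()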
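
For readers who want to try the idea without cloning the repository, here is a scaled-down, single-file sketch of the whole flow. It is not the original program: the car struct is an assumed four-flag version, there is one worker per stage instead of four, only two routes are defined, and worker and run are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type car struct{ base, body, featureA, featureB bool }

type NextProcess func(*CarBuilder)

type CarBuilder struct {
	car
	Next []NextProcess
}

func Process(ch chan *CarBuilder) NextProcess {
	return func(b *CarBuilder) { ch <- b }
}

func (c *CarBuilder) nextProcess() {
	next := c.Next[0]
	c.Next = c.Next[1:]
	next(c)
}

// worker applies apply to each incoming builder and forwards it on its route.
func worker(in <-chan *CarBuilder, apply func(*CarBuilder)) {
	for b := range in {
		apply(b)
		b.nextProcess()
	}
}

// run builds n cars, alternating between two routes, and returns how many
// were finished and how many of those received feature A.
func run(n int) (int64, int64) {
	var wg sync.WaitGroup
	var built, withA int64
	baseCh := make(chan *CarBuilder, 16)
	bodyCh := make(chan *CarBuilder, 16)
	aCh := make(chan *CarBuilder, 16)
	bCh := make(chan *CarBuilder, 16)
	buildCh := make(chan *CarBuilder, 16)

	go func() { // base stage always forwards to body
		for b := range baseCh {
			b.base = true
			bodyCh <- b
		}
	}()
	go worker(bodyCh, func(b *CarBuilder) { b.body = true })
	go worker(aCh, func(b *CarBuilder) { b.featureA = true })
	go worker(bCh, func(b *CarBuilder) { b.featureB = true })
	go func() { // final stage: count finished cars
		for b := range buildCh {
			if b.featureA {
				atomic.AddInt64(&withA, 1)
			}
			atomic.AddInt64(&built, 1)
			wg.Done()
		}
	}()

	routes := [][]NextProcess{
		{Process(buildCh)},                             // standard
		{Process(aCh), Process(bCh), Process(buildCh)}, // features A and B
	}
	for i := 0; i < n; i++ {
		wg.Add(1)
		baseCh <- &CarBuilder{Next: routes[i%2]}
	}
	wg.Wait()
	// All cars are finished, so no stage is still sending; stop the workers.
	for _, ch := range []chan *CarBuilder{baseCh, bodyCh, aCh, bCh, buildCh} {
		close(ch)
	}
	return atomic.LoadInt64(&built), atomic.LoadInt64(&withA)
}

func main() {
	built, withA := run(10)
	fmt.Println(built, withA) // prints "10 5"
}
```

The small buffers work here because the routes only ever move forward (base, body, A, B, build), so a blocked sender is always waiting on a stage that can still make progress.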

What is the key to creating a dynamic pipeline route in Go?

The key is the “Process” variable and the “nextProcess” method. “Process” takes the channel of the pipeline we want to visit and returns a function that sends a CarBuilder pointer to that channel. The “nextProcess” method takes the first of those functions from the builder's “Next” list, removes it from the list, and calls it with the CarBuilder pointer, which sends the builder to the next pipeline.

var Process = func(channel chan *CarBuilder) func(builder *CarBuilder) {
	return func(builder *CarBuilder) {
		channel <- builder
	}
}

type NextProcess func(builder *CarBuilder)

type CarBuilder struct {
	car
	Next []NextProcess
}

func (c *CarBuilder) nextProcess() {
	next := c.Next[0]
	c.Next = c.Next[1:]
	next(c)
}

func (c *CarBuilder) Build() car {
	return c.car
}

I hope some of you find this helpful. If you’re interested in seeing all the source code, you can find it on https://github.com/yudaph/dynamic-pipeline.


Feel free to leave suggestions or criticisms in the comments section!


Thank you!



Published by HackerNoon on 2022/05/05