Elixir: Domain Driven Design with Actor Model

Written by naveennegi | Published 2017/05/13
Tech Story Tags: software-architecture | elixir | ddd


A couple of months back, I was listening to Eric Evans's podcast on Software Engineering Radio, where he mentioned that the actor model is a great fit for modeling aggregate roots in domain-driven design. I was like, what, wait? Aren't OO languages the only way to model concepts like aggregates, sagas, domain services, repositories or bounded contexts in domain-driven design?

After hours of googling, days of reading and weeks of asking questions on forums, I realized that modeling DDD concepts is more explicit and easier in Elixir because of the language constructs it provides: for example, a GenServer for each aggregate root, an OTP application for each Bounded Context, and an umbrella app that puts all of these Bounded Contexts together.

What is an Aggregate Root?

Aggregate is a pattern in domain-driven design: a cluster of domain objects that can be treated as a single unit.

Let’s say, you are writing a blog engine and you have identified some domain models like

  1. Blog
  2. Post
  3. Comment
  4. Like

A Blog contains multiple Posts, and a Post contains multiple Comments and Likes. While defining relationships between domain models, it is good practice to keep the object graph acyclic, that is, free of cycles. For example, a blog might have posts but a post should not contain a reference to its blog; similarly, a post might contain comments but a comment should not contain a reference to its post.

Blog, Post, Comment and Like can be considered a collection of objects that defines an aggregate in our application. The next step is to pick one of these models, promote it to root (called the aggregate root), and make sure that all operations on and access to these models happen through the aggregate root.

In our case we can promote the Blog model to aggregate root and make sure that we only access Post as Blog.Post, or Comment as Blog.Post.Comment. This approach guarantees that our models are modified only via the aggregate root and that we are not breaking any invariant.
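To make the idea concrete, here is a minimal sketch of a Blog aggregate built from plain structs. The BlogSketch modules, their fields and the "post ids are unique" invariant are all assumptions made up for illustration; they are not part of the application described later. The point is only that every change to a Post or Comment goes through a function on the root.

```elixir
defmodule BlogSketch.Comment do
  defstruct [:author, :text]

  def new(author, text), do: %__MODULE__{author: author, text: text}
end

defmodule BlogSketch.Post do
  defstruct id: nil, title: nil, comments: []

  def new(id, title), do: %__MODULE__{id: id, title: title}
end

defmodule BlogSketch.Blog do
  alias BlogSketch.{Comment, Post}

  # Posts are kept in a map keyed by post id; nothing points back at the blog.
  defstruct name: nil, posts: %{}

  def new(name), do: %__MODULE__{name: name}

  # The root enforces the (hypothetical) invariant that post ids are unique.
  def add_post(%__MODULE__{posts: posts} = blog, %Post{id: id} = post) do
    if Map.has_key?(posts, id) do
      {:error, :duplicate_post}
    else
      {:ok, %{blog | posts: Map.put(posts, id, post)}}
    end
  end

  # Comments can only be added via the blog, so a comment can never be
  # attached to a post the blog does not own.
  def add_comment(%__MODULE__{posts: posts} = blog, post_id, %Comment{} = comment) do
    case Map.fetch(posts, post_id) do
      {:ok, post} ->
        updated = %{post | comments: post.comments ++ [comment]}
        {:ok, %{blog | posts: Map.put(posts, post_id, updated)}}

      :error ->
        {:error, :post_not_found}
    end
  end
end

alias BlogSketch.{Blog, Comment, Post}

{:ok, blog} = Blog.new("notes") |> Blog.add_post(Post.new(1, "Hello"))
{:ok, blog} = Blog.add_comment(blog, 1, Comment.new("Erin", "Nice post"))
{:error, :post_not_found} = Blog.add_comment(blog, 2, Comment.new("Erin", "Lost"))
```

Because the structs only point downwards (blog to posts, post to comments), the object graph stays acyclic.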

DDD and Phoenix 1.3

If you have read about the latest changes in the Phoenix framework (1.3), you will have noticed that it forces developers to think about their models upfront and encourages concepts like Bounded Context and aggregate: the user needs to specify a context (in line with the Bounded Context in DDD) to access a resource (there are no models anymore). For example, Blog, Post, Comment and Like can be grouped together and accessed through a Blog context. For detailed information, take a look at this keynote by Chris McCord at ElixirConf 2017.

Phoenix is awesome, but it is not my application (look here if not convinced). I like to think of Phoenix as only a web interface for my application: it should be responsible for exposing endpoints to interact with my application, with minimal business logic. Following this approach means moving my business logic out of the Phoenix app and putting it somewhere else. To achieve this I will use an umbrella project and carve the Bounded Contexts of our application into separate OTP applications within a single umbrella project.

Aggregate root as Elixir Process

Let’s consider an application that stores and retrieves user information such as name, age, gender and educational details, so we have the following models.

NOTE: If you want to look at the entire repository, you can find it here.

  • BasicInfo module containing name, age and gender fields.

defmodule Bhaduli.User.BasicInfo do
  defstruct [name: nil, age: nil, gender: nil]

  def new(basic_info) do
    %Bhaduli.User.BasicInfo{name: basic_info.name, age: basic_info.age, gender: basic_info.gender}
  end
end

  • EducationalDetails module containing graduation, senior_secondary and intermediate fields

defmodule Bhaduli.User.EducationalDetails do
  defstruct graduation: nil, senior_secondary: nil, intermediate: nil

  def new(educational_details) do
    struct(Bhaduli.User.EducationalDetails, educational_details)
  end
end
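One detail worth knowing about the struct/2 call used above: it silently ignores keys that are not fields of the struct. The sketch below contrasts it with struct!/2, which raises instead. The Sketch.EducationalDetails module and its new!/1 function are hypothetical names used only for this illustration.

```elixir
defmodule Sketch.EducationalDetails do
  defstruct graduation: nil, senior_secondary: nil, intermediate: nil

  # struct/2 silently drops keys that are not fields of the struct.
  def new(attrs), do: struct(__MODULE__, attrs)

  # struct!/2 raises a KeyError on unknown keys instead.
  def new!(attrs), do: struct!(__MODULE__, attrs)
end

# The misspelled key is dropped without any error.
details = Sketch.EducationalDetails.new(%{graduation: "BSc", gradutaion: "typo"})
```

Which of the two fits better depends on whether a misspelled key coming from a caller should be an error or not.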

  • User module containing the user_id, basic_info and educational_details fields.

defmodule Bhaduli.User do

alias Bhaduli.User.{BasicInfo, EducationalDetails}

defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]

...

User Module as Aggregate root

I have got three models: BasicInfo, EducationalDetails and User. I am going to make User my aggregate root and expose functions to save and retrieve EducationalDetails and BasicInfo, as shown below.

defmodule Bhaduli.User do
  alias Bhaduli.User.{BasicInfo, EducationalDetails}

  defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]

  def update(id, %BasicInfo{} = basic_info) do
  end

  def update(id, %EducationalDetails{} = educational_details) do
  end

  def get_basic_info(id) do
  end

  def get_educational_details(id) do
  end

  def get(id) do
  end
end

So we have got a basic skeleton for the User aggregate, but we still haven’t done anything interesting. Like I said earlier, I want to model my aggregate root as a long-running Elixir process, and the way to do that is to use GenServer.

But before we do that, I want to explain how the entire flow should work. Each user will be an Elixir process, for example:

  1. If a new user is created with user_id “Erin”, then we create a new process with user_id “Erin”.
  2. If we try to get a user (user_id “Mustang”) for the first time, we fetch it from the database and spawn a new process with the given user_id. Now we have a total of two user processes, one each for “Erin” and “Mustang”.
  3. If an existing user (user_id “Envy”) is updated and a process for that user has not been created yet, then we spawn a new process, load its state from the database and update it. Now we have three processes, one each for “Erin”, “Mustang” and “Envy”.
  4. We keep one process for each user and keep it loaded in memory. If the user process is already running, we read from it and return; otherwise we go to the database and spawn a new process.
  5. We add a supervisor for user processes, so that every time a process crashes we create a new one and rehydrate it from the database.
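The steps above can be sketched as a small "look up, otherwise start" function. This is only an illustration under a few assumptions: it uses the DynamicSupervisor and keyword-style Registry.start_link/1 APIs of newer Elixir versions rather than the Elixir 1.4 APIs used in the rest of this post, and FlowSketch.FakeRepo is a hard-coded stand-in for the database. All FlowSketch names are hypothetical.

```elixir
defmodule FlowSketch.FakeRepo do
  # Stand-in for the database: a hard-coded map of stored users.
  @stored %{"Mustang" => %{name: "Roy", age: 29}}

  def get(user_id) do
    case Map.fetch(@stored, user_id) do
      {:ok, data} -> {:ok, data}
      :error -> {:error, :not_found}
    end
  end
end

defmodule FlowSketch.UserProcess do
  use GenServer

  @registry FlowSketch.Registry
  @supervisor FlowSketch.UserSupervisor

  def start_link(user_id) do
    GenServer.start_link(__MODULE__, user_id, name: via(user_id))
  end

  # Returns the pid of the process for user_id, starting one if needed.
  def ensure_started(user_id) do
    case Registry.lookup(@registry, user_id) do
      [{pid, _}] ->
        {:ok, pid}

      [] ->
        case DynamicSupervisor.start_child(@supervisor, {__MODULE__, user_id}) do
          {:ok, pid} -> {:ok, pid}
          # Another caller started the process between our lookup and start.
          {:error, {:already_started, pid}} -> {:ok, pid}
        end
    end
  end

  def get(user_id) do
    {:ok, pid} = ensure_started(user_id)
    GenServer.call(pid, :get)
  end

  defp via(user_id), do: {:via, Registry, {@registry, user_id}}

  @impl true
  def init(user_id) do
    # Hydrate from the repository; unknown users start with empty data.
    data =
      case FlowSketch.FakeRepo.get(user_id) do
        {:ok, data} -> data
        {:error, :not_found} -> %{}
      end

    {:ok, %{user_id: user_id, data: data}}
  end

  @impl true
  def handle_call(:get, _from, state), do: {:reply, state, state}
end

{:ok, _} = Registry.start_link(keys: :unique, name: FlowSketch.Registry)
{:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: FlowSketch.UserSupervisor)

# First access spawns the process; later accesses reuse it.
mustang = FlowSketch.UserProcess.get("Mustang")
```

For brevity this sketch loads the data synchronously inside init; the post itself defers the load with a cast, as described further down.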

Process Registry

Even though I have outlined the overall flow of the application, there are a few things that need to be addressed first.

  1. Whenever a new process is created, we get back a PID. Since we are creating one process for each user, we will soon have thousands of processes. Anyone who wants to get user data would have to know the PID of each user process in order to interact with it.
  2. Having a supervisor for user processes makes our application fault tolerant, but it also adds complexity: every time a process crashes, the supervisor creates a new one, and this new process has a different PID than the one that crashed.

We could hand-roll our own process registration mechanism, but it would soon get hard to track all of these processes, as we intend to create thousands of them. On top of that, since the supervisor creates a new process for every crashed one, propagating the new PID to the client would add more complexity.

Luckily, Elixir has a module called Registry, which allows us to map a PID to any given string or atom so that we don’t have to deal with cryptic PIDs. In our case we want the client to be able to interact with our application using only the user_id, so we map each PID to a user_id.

A registry can be started with Registry.start_link/3. Once the registry is started, we need some mechanism to map our user_id to the PID of the spawned process, and the way to do that is to implement a function that returns a :via tuple based on a value we provide.

Our via_tuple function, along with the Registry module, allows us to spawn a process for a given user_id and make subsequent calls to the process using the same id rather than a PID.

Our via_tuple function

def via_tuple(user_id) do
  {:via, Registry, {:user_process_registry, user_id}}
end

GenServer provides the plumbing to make use of this :via tuple and automatically handles the registration of our name (user_id) for the actual process.
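Here is a small, self-contained sketch of that plumbing. It assumes a newer Elixir version, where Registry.start_link/1 takes a keyword list, and it uses an Agent instead of a GenServer only to keep the example short (Agent accepts the same :name option). ViaSketch.Registry is a hypothetical registry name.

```elixir
{:ok, _} = Registry.start_link(keys: :unique, name: ViaSketch.Registry)

via = fn user_id -> {:via, Registry, {ViaSketch.Registry, user_id}} end

# Starting the process with a :via name registers it under "Erin".
{:ok, pid} = Agent.start_link(fn -> %{user_id: "Erin"} end, name: via.("Erin"))

# The same tuple can be used wherever a pid would otherwise be needed.
state = Agent.get(via.("Erin"), fn s -> s end)

# Registry.lookup/2 maps the key back to the pid of the registered process.
[{^pid, _}] = Registry.lookup(ViaSketch.Registry, "Erin")
```

Note that the :via tuple works directly as the target of a call, so a separate lookup is only needed when the PID itself is wanted.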

We will use this via_tuple function when spawning a new user process (User module).

# User.ex

def start_link(name) do
  GenServer.start_link(__MODULE__, %Bhaduli.User{user_id: name}, name: via_tuple(name))
end

Aggregate Root as GenServer

To make the User aggregate an Elixir process, I am going to use the GenServer behaviour in the User module.

In our main application file, we start the registry. Here we start a registry with unique keys, name it `user_process_registry`, and supervise it.

defmodule Bhaduli do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      supervisor(Registry, [:unique, :user_process_registry]),
      supervisor(Bhaduli.UserSupervisor, [])
    ]

    opts = [strategy: :one_for_one, name: Bhaduli.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Next is the User supervisor. We define a Supervisor for the User module, giving it the simple_one_for_one strategy and the module name.

defmodule Bhaduli.UserSupervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, nil, name: :user_sup)
  end

  def init(_) do
    children = [worker(Bhaduli.User, [])]
    supervise(children, strategy: :simple_one_for_one)
  end
end
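The following sketch shows why the registry matters once a supervisor is involved: after a crash the worker comes back with a new PID, but under the same key. It is an illustration only and swaps in DynamicSupervisor, which newer Elixir versions offer in place of the :simple_one_for_one strategy used above. The RestartSketch modules and the await_new_pid/3 polling helper are hypothetical.

```elixir
defmodule RestartSketch.Worker do
  use GenServer

  def start_link(user_id) do
    name = {:via, Registry, {RestartSketch.Registry, user_id}}
    GenServer.start_link(__MODULE__, user_id, name: name)
  end

  @impl true
  def init(user_id), do: {:ok, user_id}
end

defmodule RestartSketch do
  # Polls the registry until a pid other than old_pid is registered for key.
  def await_new_pid(key, old_pid, attempts \\ 100)

  def await_new_pid(_key, _old_pid, 0), do: {:error, :timeout}

  def await_new_pid(key, old_pid, attempts) do
    case Registry.lookup(RestartSketch.Registry, key) do
      [{pid, _}] when pid != old_pid ->
        {:ok, pid}

      _ ->
        Process.sleep(10)
        await_new_pid(key, old_pid, attempts - 1)
    end
  end
end

{:ok, _} = Registry.start_link(keys: :unique, name: RestartSketch.Registry)
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, old_pid} = DynamicSupervisor.start_child(sup, {RestartSketch.Worker, "Erin"})

# Simulate a crash; the supervisor restarts the worker under the same key.
Process.exit(old_pid, :kill)
{:ok, new_pid} = RestartSketch.await_new_pid("Erin", old_pid)
```

A client that only ever addresses the worker as "Erin" never notices that the PID changed.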

Now we will use the GenServer behaviour in the User module and alias BasicInfo and EducationalDetails.

defmodule Bhaduli.User do

use GenServer

alias Bhaduli.User.{BasicInfo, EducationalDetails}

...

Next we will define a User struct to contain user_id, basic_info and educational_details.

defmodule Bhaduli.User do

use GenServer

alias Bhaduli.User.{BasicInfo, EducationalDetails}

@registry :user_process_registry

defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]

Let’s write a start_link/1 function in the User module, which the User supervisor calls when starting a user process. It passes three arguments to GenServer.start_link/3:

  1. __MODULE__, the module that implements the GenServer callbacks.
  2. A User struct as the initial argument, which is handed to init.
  3. The name option, an alias for the spawned process that lets us reach it without knowing its PID. We use the via_tuple function here because GenServer knows how to use a :via tuple and automatically registers our alias (user_id) for the actual process.

# User.ex

def start_link(name) do
  GenServer.start_link(__MODULE__, %Bhaduli.User{user_id: name}, name: via_tuple(name))
end

Now comes the interesting part: we define an init callback, which is called when the GenServer is started. It is this callback that is responsible for triggering the load of the User state from the database.

# User.ex

def init(%Bhaduli.User{} = user) do
  GenServer.cast(self(), {:populate_user})
  {:ok, user}
end

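The cast sent from init is handled by a handle_cast callback. This callback goes to UserRepository and fetches the data from the database. If a user is found, the repository returns it and it becomes the process state; otherwise the repository returns an error and the state is left as it was.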

# User.ex

def handle_cast({:populate_user}, user) do
  case Bhaduli.UserRepository.get(user.user_id) do
    {:error, _} -> {:noreply, user}
    {:ok, user} -> {:noreply, user}
  end
end

It is important to note that GenServer.start_link does not return until init returns. For this reason init only casts a message to itself: the cast is handled by handle_cast after init has returned, so init doesn’t have to wait for the data to load. A synchronous GenServer.call to self() inside init would not work at all, because a process cannot answer its own call.
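Newer Elixir versions (1.7 and later, on OTP 21 or later) offer a dedicated mechanism for this kind of deferred initialization: init can return a {:continue, term} instruction, and the handle_continue/2 callback then runs right after init, before any other message is processed. With a cast to self(), a message from another process could in principle arrive before the cast, because the name is registered before init runs. The sketch below is not the approach used in this post, just an alternative; ContinueSketch.FakeRepo is a hypothetical stand-in for UserRepository.

```elixir
defmodule ContinueSketch.FakeRepo do
  # Stand-in for a slow database lookup.
  def get("Mustang"), do: {:ok, %{name: "Roy"}}
  def get(_other), do: {:error, :not_found}
end

defmodule ContinueSketch.User do
  use GenServer

  def start_link(user_id), do: GenServer.start_link(__MODULE__, user_id)

  def get(pid), do: GenServer.call(pid, :get)

  @impl true
  def init(user_id) do
    # Return immediately; the load happens in handle_continue/2, which runs
    # before any message already waiting in the mailbox.
    {:ok, %{user_id: user_id, data: nil}, {:continue, :populate_user}}
  end

  @impl true
  def handle_continue(:populate_user, state) do
    case ContinueSketch.FakeRepo.get(state.user_id) do
      {:ok, data} -> {:noreply, %{state | data: data}}
      {:error, _reason} -> {:noreply, state}
    end
  end

  @impl true
  def handle_call(:get, _from, state), do: {:reply, state, state}
end

{:ok, pid} = ContinueSketch.User.start_link("Mustang")
loaded = ContinueSketch.User.get(pid)
```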

Once we have done all this plumbing we can get the PID of any process for a given user_id from Registry as shown below.

[{pid, _}] = Registry.lookup(@registry, id)
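Note that this pattern match raises a MatchError when no process is registered for the id, because Registry.lookup/2 then returns an empty list. A minimal sketch of a safer lookup follows; LookupSketch and whereis/1 are hypothetical names, and the registry is started with the keyword-style Registry.start_link/1 of newer Elixir versions.

```elixir
defmodule LookupSketch do
  @registry LookupSketch.Registry

  # Turns the list returned by Registry.lookup/2 into a tagged tuple.
  def whereis(user_id) do
    case Registry.lookup(@registry, user_id) do
      [{pid, _value}] -> {:ok, pid}
      [] -> {:error, :not_found}
    end
  end
end

{:ok, _} = Registry.start_link(keys: :unique, name: LookupSketch.Registry)

# Register the current process under "Erin" just to have something to find.
{:ok, _} = Registry.register(LookupSketch.Registry, "Erin", nil)

{:ok, found} = LookupSketch.whereis("Erin")
{:error, :not_found} = LookupSketch.whereis("Envy")
```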

Getting basic_info for a given user_id

def get_basic_info(id) do
  [{pid, _}] = Registry.lookup(@registry, id)
  GenServer.call(pid, {:basic_info})
end

We use handle_call for getting basic_info because we are going to wait for the reply. The interesting thing to note here is that we are not making a database call (guess why?).

def handle_call({:basic_info}, _from, state) do
  {:reply, state.basic_info, state}
end

Similarly for educational_details

def get_educational_details(id) do
  [{pid, _}] = Registry.lookup(@registry, id)
  GenServer.call(pid, {:educational_details})
end

def handle_call({:educational_details}, _from, state) do
  {:reply, state.educational_details, state}
end

For updating basic_info and educational_details we are going to use handle_cast, as we don’t need to wait for a response from the GenServer callback.

def update(id, %BasicInfo{} = basic_info) do
  [{pid, _}] = Registry.lookup(@registry, id)
  GenServer.cast(pid, {:update_basic_info, basic_info})
end

def update(id, %EducationalDetails{} = educational_details) do
  [{pid, _}] = Registry.lookup(@registry, id)
  GenServer.cast(pid, {:update_educational_details, educational_details})
end

# Callbacks

def handle_cast({:update_basic_info, basic_info}, user) do
  user = %Bhaduli.User{user | basic_info: basic_info}
  {:noreply, user}
end

def handle_cast({:update_educational_details, educational_details}, user) do
  user = %Bhaduli.User{user | educational_details: educational_details}
  {:noreply, user}
end
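Since updates are casts, update/2 returns :ok before the process has applied the change. A caller that reads right after writing still sees its own update, because messages sent from one process to another are delivered in order. The sketch below shows this with a stripped-down GenServer; CastSketch is a hypothetical module, not part of the application.

```elixir
defmodule CastSketch do
  use GenServer

  def start_link(initial), do: GenServer.start_link(__MODULE__, initial)
  def update(pid, value), do: GenServer.cast(pid, {:update, value})
  def get(pid), do: GenServer.call(pid, :get)

  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast({:update, value}, _state), do: {:noreply, value}

  @impl true
  def handle_call(:get, _from, state), do: {:reply, state, state}
end

{:ok, pid} = CastSketch.start_link(:old)

# The cast returns :ok right away, without waiting for the process.
:ok = CastSketch.update(pid, :new)

# The call is queued behind the cast, so it sees the updated state.
:new = CastSketch.get(pid)
```

The trade-off is that a cast gives the caller no confirmation: GenServer.cast returns :ok even if the update is never applied.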

Putting it all together.

# User.ex
defmodule Bhaduli.User do
  use GenServer
  alias Bhaduli.User.{BasicInfo, EducationalDetails}

  @registry :user_process_registry

  defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]

  def start_link(name) do
    GenServer.start_link(__MODULE__, %Bhaduli.User{user_id: name}, name: via_tuple(name))
  end

  def init(%Bhaduli.User{} = user) do
    GenServer.cast(self(), {:populate_user})
    {:ok, user}
  end

  def update(id, %BasicInfo{} = basic_info) do
    [{pid, _}] = Registry.lookup(@registry, id)
    GenServer.cast(pid, {:update_basic_info, basic_info})
  end

  def update(id, %EducationalDetails{} = educational_details) do
    [{pid, _}] = Registry.lookup(@registry, id)
    GenServer.cast(pid, {:update_educational_details, educational_details})
  end

  def get_basic_info(id) do
    [{pid, _}] = Registry.lookup(@registry, id)
    GenServer.call(pid, {:basic_info})
  end

  def get_educational_details(id) do
    [{pid, _}] = Registry.lookup(@registry, id)
    GenServer.call(pid, {:educational_details})
  end

  def get(id) do
    case Registry.lookup(@registry, id) do
      [{pid, _}] -> GenServer.call(pid, {})
      [] -> {:error, :not_found}
    end
  end

  def handle_cast({:populate_user}, user) do
    case Bhaduli.UserRepository.get(user.user_id) do
      {:error, _} -> {:noreply, user}
      {:ok, user} -> {:noreply, user}
    end
  end

  def handle_cast({:update_basic_info, basic_info}, user) do
    user = %Bhaduli.User{user | basic_info: basic_info}
    {:noreply, user}
  end

  def handle_cast({:update_educational_details, educational_details}, user) do
    user = %Bhaduli.User{user | educational_details: educational_details}
    {:noreply, user}
  end

  def handle_call({:basic_info}, _from, state) do
    {:reply, state.basic_info, state}
  end

  def handle_call({:educational_details}, _from, state) do
    {:reply, state.educational_details, state}
  end

  def handle_call({}, _from, state) do
    {:reply, state, state}
  end

  def via_tuple(user_id) do
    {:via, Registry, {:user_process_registry, user_id}}
  end
end

The code for this can be found here. This application is part of an umbrella project, and in the next post I’ll show how it is used from a Phoenix application.

About Author


Naveen Negi is a software engineer based out of Wolfsburg, Germany. He believes in software craftsmanship. He is also an avid blogger, reader, and polyglot developer. In his free time he explores different spiritual paradigms, and the one that entices him most is the Samakhya school of philosophy.

LinkedIn profile: http://linkedin.com/in/nav301186


Published by HackerNoon on 2017/05/13