Background processing using Elixir, GenServer and the Erlang queue module

by Sergio Tapia, January 2nd, 2017

I’ve been writing Ruby code for years, and like me, you may reach for familiar tools like Redis or Sidekiq to process background jobs and data. I’d like to show you why, sometimes, pure Elixir is more than enough to get the job done.

This is the story of how I replaced Redis/Exq with pure Elixir and the Erlang :queue module.

A huge thank you to omgneering for his videos on GenServer. If you’re having trouble understanding GenServer, this will be a tremendous help.

I built Magnetissimo as a learning exercise to really understand Elixir and see what it takes to ship production-ready code. The version on GitHub right now works, but it falls short on some important goals I set out at the start.

Goals:

  • Crawl multiple index sites for torrents and magnet links. —Working!
  • Run without ceremony. No pointless configuration needed. —Eh… kinda?
  • High performance, leveraging Elixir’s GenServer and Erlang’s BEAM VM. —Working!
  • Unit tested for correctness. —Working!

It wasn’t easy for people to run the project. Even developers had questions, and I wanted zero friction.

The fewer steps it takes to run Magnetissimo, the higher the adoption rate will be.

I found my solution in Erlang’s queue module and in Elixir’s GenServer.

This was step one towards that ultimate goal.


The first thing I did was write the crawlers and create a worker for each of them, all supervised by my application's supervisor.

    children = [
      # Start the Ecto repository
      supervisor(Magnetissimo.Repo, []),
      # Start the endpoint when the application starts
      supervisor(Magnetissimo.Endpoint, []),
      worker(Magnetissimo.Crawler.ThePirateBay, []),
      worker(Magnetissimo.Crawler.EZTV, []),
      worker(Magnetissimo.Crawler.LimeTorrents, []),
      worker(Magnetissimo.Crawler.Leetx, []),
      worker(Magnetissimo.Crawler.Demonoid, []),
    ]
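The supervisor/2 and worker/2 helpers come from Supervisor.Spec, which later Elixir releases deprecate in favor of child specs. Here is a rough sketch of the same kind of tree in that newer style. DemoCrawler and the :crawler_a / :crawler_b names are hypothetical stand-ins, not part of Magnetissimo.

```elixir
defmodule DemoCrawler do
  # Hypothetical stand-in for a crawler such as Magnetissimo.Crawler.EZTV.
  use GenServer

  def start_link(name) do
    GenServer.start_link(__MODULE__, name, name: name)
  end

  @impl true
  def init(name), do: {:ok, name}
end

children = [
  # Each child needs a unique id when the same module is started twice.
  Supervisor.child_spec({DemoCrawler, :crawler_a}, id: :crawler_a),
  Supervisor.child_spec({DemoCrawler, :crawler_b}, id: :crawler_b)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
IO.inspect(Supervisor.count_children(sup))
```

With one module per crawler, as in Magnetissimo, each entry could simply be the module name, because `use GenServer` generates a default child spec.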

Each crawler is actually a GenServer implementation. For example, here’s the ThePirateBay version of the crawler.




    defmodule Magnetissimo.Crawler.ThePirateBay do
      use GenServer
      alias Magnetissimo.Torrent
      alias Magnetissimo.Crawler.Helper

      def start_link do
        queue = initial_queue()
        GenServer.start_link(__MODULE__, queue)
      end

      def init(queue) do
        schedule_work()
        {:ok, queue}
      end

      defp schedule_work do
        Process.send_after(self(), :work, 1 * 1 * 300) # 300 milliseconds
      end

      # Callbacks

      def handle_info(:work, queue) do
        # Bind the result of the case; assignments made inside its
        # branches are not visible out here.
        queue =
          case :queue.out(queue) do
            {{:value, item}, rest} ->
              process(item, rest)

            {:empty, _} ->
              IO.puts "Queue is empty - restarting queue."
              initial_queue()
          end

        schedule_work()
        {:noreply, queue}
      end

      def process({:page_link, url}, queue) do
        IO.puts "Downloading page: #{url}"
        html_body = Helper.download(url)

        if html_body != nil do
          # Append every torrent link found on the page to the queue.
          html_body
          |> torrent_links()
          |> Enum.reduce(queue, fn torrent, acc ->
            :queue.in({:torrent_link, torrent}, acc)
          end)
        else
          queue
        end
      end

      def process({:torrent_link, url}, queue) do
        IO.puts "Downloading torrent: #{url}"
        html_body = Helper.download(url)

        if html_body != nil do
          torrent_struct = torrent_information(html_body)
          Torrent.save_torrent(torrent_struct)
        end

        queue
      end

      # Parser functions

      def initial_queue do
        urls = for i <- 1..6, j <- 1..50 do
          {:page_link, "https://thepiratebay.org/browse/#{i}00/#{j}/3"}
        end

        :queue.from_list(urls)
      end

      def torrent_links(html_body) do
        html_body
        |> Floki.find(".detName a")
        |> Floki.attribute("href")
        |> Enum.map(fn(url) -> "https://thepiratebay.org" <> url end)
      end

      def torrent_information(html_body) do
        name = html_body
          |> Floki.find("#title")
          |> Floki.text()
          |> String.trim()
          |> HtmlEntities.decode()

        magnet = html_body
          |> Floki.find(".download a")
          |> Floki.attribute("href")
          |> Enum.filter(fn(url) -> String.starts_with?(url, "magnet:") end)
          |> Enum.at(0)

        # <<194, 160>> is a UTF-8 non-breaking space.
        size = html_body
          |> Floki.find("#detailsframe #details .col1 dd")
          |> Enum.at(2)
          |> Floki.text()
          |> String.split(<<194, 160>>)
          |> Enum.at(2)
          |> String.replace("(", "")

        {seeders, _} = html_body
          |> Floki.find("#detailsframe #details .col2 dd")
          |> Enum.at(2)
          |> Floki.text()
          |> Integer.parse()

        {leechers, _} = html_body
          |> Floki.find("#detailsframe #details .col2 dd")
          |> Enum.at(3)
          |> Floki.text()
          |> Integer.parse()

        %{
          name: name,
          magnet: magnet,
          size: size,
          website_source: "thepiratebay",
          seeders: seeders,
          leechers: leechers
        }
      end
    end

initial_queue is a function that builds an Erlang :queue holding the initial URLs. Those pages in turn link out to further pagination pages or to individual torrent pages.
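For readers new to the :queue module, here is a minimal sketch of the three calls the crawler relies on. The URLs are placeholders I made up for illustration.

```elixir
# Build a queue from a list; items leave in insertion order (FIFO).
queue = :queue.from_list([{:page_link, "page-1"}, {:page_link, "page-2"}])

# :queue.in/2 appends to the rear and returns a new queue.
queue = :queue.in({:torrent_link, "torrent-1"}, queue)

# :queue.out/1 pops from the front and returns either
# {{:value, item}, rest} or {:empty, queue}.
{{:value, first}, rest} = :queue.out(queue)
IO.inspect(first)             # {:page_link, "page-1"}
IO.inspect(:queue.len(rest))  # 2

{:empty, _} = :queue.out(:queue.new())
```

Queues are immutable like every other Erlang term, so each call returns a new queue rather than changing the old one. That is why the GenServer has to carry the queue along as its state.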

Each element in my :queue is a tuple, with two parts:


    {:torrent_link, "some_url"}
    {:page_link, "some_url"}

By pattern matching in the heads of the process functions above, I can easily handle either a pagination link or an individual torrent page.
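Stripped of the downloading and parsing, the dispatch looks roughly like this. DispatchSketch and the fake links it generates are hypothetical, there only to show the two clauses side by side.

```elixir
defmodule DispatchSketch do
  # A page link fans out into torrent links appended to the queue.
  def process({:page_link, url}, queue) do
    # Hypothetical stand-in for downloading and parsing the page.
    links = ["#{url}/torrent/1", "#{url}/torrent/2"]

    Enum.reduce(links, queue, fn link, acc ->
      :queue.in({:torrent_link, link}, acc)
    end)
  end

  # A torrent link is a leaf: handle it and return the queue unchanged.
  def process({:torrent_link, url}, queue) do
    IO.puts("Would save torrent from #{url}")
    queue
  end
end

queue = DispatchSketch.process({:page_link, "https://example.com/page"}, :queue.new())
IO.inspect(:queue.to_list(queue))
```

The tag in the first tuple position picks the clause, so supporting a new kind of work item means adding one more clause.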

The schedule_work function then sends the process a :work message after a short delay, which triggers processing of the next item.
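Here is a small, self-contained sketch of that self-scheduling loop. TickSketch, its interval_ms parameter and its processed/1 helper are my own assumptions for the demo. Unlike the crawler, it stops ticking once the queue is empty instead of refilling it.

```elixir
defmodule TickSketch do
  use GenServer

  # interval_ms is a hypothetical parameter; the crawler hard-codes its delay.
  def start_link(items, interval_ms) do
    GenServer.start_link(__MODULE__, {:queue.from_list(items), interval_ms, []})
  end

  # Returns the items handled so far, in processing order.
  def processed(pid), do: GenServer.call(pid, :processed)

  @impl true
  def init({_queue, interval_ms, _done} = state) do
    schedule_work(interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_info(:work, {queue, interval_ms, done}) do
    case :queue.out(queue) do
      {{:value, item}, rest} ->
        # Handle one item, then ask to be woken up again.
        schedule_work(interval_ms)
        {:noreply, {rest, interval_ms, [item | done]}}

      {:empty, _} ->
        # Nothing left: stop scheduling.
        {:noreply, {queue, interval_ms, done}}
    end
  end

  @impl true
  def handle_call(:processed, _from, {_queue, _interval_ms, done} = state) do
    {:reply, Enum.reverse(done), state}
  end

  defp schedule_work(interval_ms) do
    Process.send_after(self(), :work, interval_ms)
  end
end

{:ok, pid} = TickSketch.start_link([:a, :b, :c], 10)
Process.sleep(200)
IO.inspect(TickSketch.processed(pid))
```

Because the next message is only scheduled after the current item has been handled, at most one item is in flight per crawler, and the delay acts as a crude rate limit.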

The end result is more cohesive code with less indirection. It’s now much easier to add a new crawler to the project, and easier to know exactly what is running. There are fewer chances for bugs, and growth behavior is more predictable. One downside of this approach is volatility: if my app shuts down, I lose the queue of pending work. But I’m comfortable with that for this particular project.

One potential upgrade would be to move from handle_info to handle_cast, GenServer's asynchronous callback (handle_call is the synchronous one).
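For context, GenServer.call blocks the caller until the server replies, while GenServer.cast returns immediately. Here is a sketch of what an asynchronous enqueue might look like. CastSketch, enqueue/2 and pending/1 are hypothetical names, not code from Magnetissimo.

```elixir
defmodule CastSketch do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :queue.new())

  # GenServer.cast/2 returns :ok right away; the caller does not wait.
  def enqueue(pid, item), do: GenServer.cast(pid, {:enqueue, item})

  # GenServer.call/2 blocks until the server replies.
  def pending(pid), do: GenServer.call(pid, :pending)

  @impl true
  def init(queue), do: {:ok, queue}

  @impl true
  def handle_cast({:enqueue, item}, queue) do
    {:noreply, :queue.in(item, queue)}
  end

  @impl true
  def handle_call(:pending, _from, queue) do
    {:reply, :queue.to_list(queue), queue}
  end
end

{:ok, pid} = CastSketch.start_link()
:ok = CastSketch.enqueue(pid, {:page_link, "https://example.com/1"})
IO.inspect(CastSketch.pending(pid))
```

Messages from one sender to the same process arrive in order, so the call above sees the item that was cast just before it.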

My next step is to use Distillery to build a single deployable release, so end users can just run it and have Magnetissimo start serving on localhost.