Background processing using Elixir, GenServer and the Erlang queue module

Written by sergiocodes | Published 2017/01/02
Tech Story Tags: elixir | erlang | programming | functional-programming | phoenix-framework


I’ve been writing Ruby code for years, and like me, you may reach for familiar tools like Redis and Sidekiq to process background jobs and data. I’d like to show you why sometimes, pure Elixir is more than enough to get the job done.

This is the story of how I replaced Redis/Exq with pure Elixir and the Erlang :queue module.

A huge thank you to omgneering for his videos on GenServer. If you’re having trouble understanding GenServer, they will be a tremendous help.

I built Magnetissimo as a learning exercise to really understand Elixir and see what it takes to ship production-ready code. And while the version on GitHub right now works, it’s lacking in very important areas I set out in the initial goals.

Goals:

  • Crawl multiple index sites for torrents and magnet links. —Working!
  • Run without ceremony. No pointless configuration needed. —Eh… kinda?
  • High performance, leveraging Elixir’s GenServer and Erlang’s BEAM VM. —Working!
  • Unit tested for correctness. —Working!

It wasn’t easy for people to run the project; even developers had questions, and I wanted zero friction.

The fewer steps it takes to run Magnetissimo, the higher the adoption rate will be.

I found my solution in Erlang’s :queue module and in Elixir’s GenServer.

This was step one towards that ultimate goal.

The first thing I did was make crawlers and create a worker for each of those crawlers, all supervised by my supervisor.

    children = [
      # Start the Ecto repository
      supervisor(Magnetissimo.Repo, []),
      # Start the endpoint when the application starts
      supervisor(Magnetissimo.Endpoint, []),
      worker(Magnetissimo.Crawler.ThePirateBay, []),
      worker(Magnetissimo.Crawler.EZTV, []),
      worker(Magnetissimo.Crawler.LimeTorrents, []),
      worker(Magnetissimo.Crawler.Leetx, []),
      worker(Magnetissimo.Crawler.Demonoid, []),
    ]
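
For context, here’s a minimal sketch of where that children list lives in a Phoenix 1.2-era application module. The Supervisor.Spec helpers and the one_for_one strategy are standard conventions of that era; the exact module and option names are my assumptions based on the project’s naming.

    # A sketch of the surrounding application module (assumed structure):
    defmodule Magnetissimo do
      use Application

      def start(_type, _args) do
        import Supervisor.Spec

        children = [
          supervisor(Magnetissimo.Repo, []),
          supervisor(Magnetissimo.Endpoint, []),
          worker(Magnetissimo.Crawler.ThePirateBay, []),
          # ...one worker per crawler...
        ]

        # If one crawler crashes, restart only that crawler.
        opts = [strategy: :one_for_one, name: Magnetissimo.Supervisor]
        Supervisor.start_link(children, opts)
      end
    end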

Each crawler is actually a GenServer implementation. For example, here’s the ThePirateBay version of the crawler.

    defmodule Magnetissimo.Crawler.ThePirateBay do
      use GenServer
      alias Magnetissimo.Torrent
      alias Magnetissimo.Crawler.Helper

      def start_link do
        queue = initial_queue
        GenServer.start_link(__MODULE__, queue)
      end

      def init(queue) do
        schedule_work()
        {:ok, queue}
      end

      defp schedule_work do
        Process.send_after(self(), :work, 1 * 1 * 300) # 300 milliseconds
      end

      # Callbacks

      def handle_info(:work, queue) do
        queue =
          case :queue.out(queue) do
            {{:value, item}, queue_2} ->
              process(item, queue_2)
            _ ->
              IO.puts "Queue is empty - restarting queue."
              initial_queue
          end
        schedule_work()
        {:noreply, queue}
      end

      def process({:page_link, url}, queue) do
        IO.puts "Downloading page: #{url}"
        html_body = Helper.download(url)
        if html_body != nil do
          torrents = torrent_links(html_body)
          Enum.reduce(torrents, queue, fn torrent, queue ->
            :queue.in({:torrent_link, torrent}, queue)
          end)
        else
          queue
        end
      end

      def process({:torrent_link, url}, queue) do
        IO.puts "Downloading torrent: #{url}"
        html_body = Helper.download(url)
        if html_body != nil do
          torrent_struct = torrent_information(html_body)
          Torrent.save_torrent(torrent_struct)
        end
        queue
      end

      # Parser functions

      def initial_queue do
        urls = for i <- 1..6, j <- 1..50 do
          {:page_link, "https://thepiratebay.org/browse/#{i}00/#{j}/3"}
        end
        :queue.from_list(urls)
      end

      def torrent_links(html_body) do
        html_body
        |> Floki.find(".detName a")
        |> Floki.attribute("href")
        |> Enum.map(fn(url) -> "https://thepiratebay.org" <> url end)
      end

      def torrent_information(html_body) do
        name = html_body
          |> Floki.find("#title")
          |> Floki.text
          |> String.trim
          |> HtmlEntities.decode

        magnet = html_body
          |> Floki.find(".download a")
          |> Floki.attribute("href")
          |> Enum.filter(fn(url) -> String.starts_with?(url, "magnet:") end)
          |> Enum.at(0)

        # <<194, 160>> is a UTF-8 non-breaking space, used as the separator here.
        size = html_body
          |> Floki.find("#detailsframe #details .col1 dd")
          |> Enum.at(2)
          |> Floki.text
          |> String.split(<<194, 160>>)
          |> Enum.at(2)
          |> String.replace("(", "")

        {seeders, _} = html_body
          |> Floki.find("#detailsframe #details .col2 dd")
          |> Enum.at(2)
          |> Floki.text
          |> Integer.parse

        {leechers, _} = html_body
          |> Floki.find("#detailsframe #details .col2 dd")
          |> Enum.at(3)
          |> Floki.text
          |> Integer.parse

        %{
          name: name,
          magnet: magnet,
          size: size,
          website_source: "thepiratebay",
          seeders: seeders,
          leechers: leechers
        }
      end
    end

initial_queue is a function that creates an Erlang :queue preloaded with the initial URLs, which fan out to other pagination pages or individual torrent links.

Each element in my :queue is a two-element tuple:

    {:page_link, "some_url"}
    {:torrent_link, "some_url"}
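
If you haven’t used Erlang’s :queue before, here’s a quick sketch of the FIFO API the crawler relies on; the URLs are placeholders:

    # :queue basics (try this in iex):
    q = :queue.new()
    q = :queue.in({:page_link, "https://example.com/page/1"}, q)
    q = :queue.in({:torrent_link, "https://example.com/torrent/1"}, q)

    # The oldest element comes out first, along with the updated queue.
    {{:value, item}, q} = :queue.out(q)
    # item == {:page_link, "https://example.com/page/1"}

    # Taking from an empty queue returns :empty instead of a value.
    {:empty, _q} = :queue.out(:queue.new())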

Using function pattern matching in the process functions above, I can easily parse either a pagination link or an individual torrent page.

The schedule_work function then schedules the next :work message, so the next item in the queue gets processed.

The end result is more cohesive code, with less indirection. It’s much easier now to add a new crawler to the project. It’s also easier to know exactly what is running. Fewer chances for bugs, and more predictable growth behavior. One downside of this approach is volatility: if my app shuts down, I will lose the queue of work to process. But I’m comfortable with that for this particular project.

One potential upgrade will be to change from handle_info to an asynchronous callback such as handle_cast.
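
Here’s a hedged sketch of what that might look like with handle_cast, assuming the same process/2 and initial_queue functions; a real version would still need something like Process.send_after to pace the crawler:

    # A cast-driven work loop (sketch only; it would drain the queue
    # as fast as possible without additional pacing):
    def handle_cast(:work, queue) do
      queue =
        case :queue.out(queue) do
          {{:value, item}, rest} -> process(item, rest)
          {:empty, _} -> initial_queue
        end

      GenServer.cast(self(), :work) # immediately schedule the next item
      {:noreply, queue}
    end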

My next step is going to be using Distillery to build a single deployable executable, so end users can just run it and have Magnetissimo start serving on localhost.
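
For reference, here’s a minimal sketch of what that setup might involve with Distillery 1.x; the version constraint and commands reflect a standard setup and are my assumptions, not the project’s actual config:

    # In mix.exs, add Distillery as a dependency (hypothetical snippet):
    defp deps do
      [{:distillery, "~> 1.0"}]
    end

    # Then, from the shell:
    #   mix release.init            # generates rel/config.exs
    #   MIX_ENV=prod mix release    # builds the release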

