Sergio Tapia

@sergiocodes

Background processing using Elixir, GenServer and the Erlang queue module

I’ve been writing Ruby code for years, and like me, you may reach for familiar tools like Redis or Sidekiq to process background jobs and data. I’d like to show you why pure Elixir is sometimes more than enough to get the job done.

This is the story of how I replaced Redis/Exq with pure Elixir and the Erlang :queue module.

A huge thank you to omgneering for his videos on GenServer. If you’re having trouble understanding GenServer, they will be a tremendous help.

I built Magnetissimo as a learning exercise to really understand Elixir and see what it takes to ship production-ready code. And while the version on GitHub right now works, it falls short in some important areas I set out in the initial goals.

Goals:

  • Crawl multiple index sites for torrents and magnet links. — Working!
  • Run without ceremony. No pointless configuration needed. —Eh… kinda?
  • High performance, leveraging Elixir’s GenServer and Erlang’s BEAM VM. — Working!
  • Unit tested for correctness. —Working!

It wasn’t easy for people to run the project. Even developers had questions, and I wanted zero friction.

The fewer steps it takes to run Magnetissimo, the higher the adoption rate will be.

I found my solution in Erlang’s queue module, and in Elixir’s GenServer.

This was step one towards that ultimate goal.
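For readers who have not used Erlang’s :queue before, here is a minimal sketch of the handful of calls this post relies on. The URLs and job tuples are placeholder values chosen for illustration, not Magnetissimo’s real data.

```elixir
# :queue is an immutable FIFO: every operation returns a new queue.
q0 = :queue.new()
q1 = :queue.in({:page_link, "https://example.com/page/1"}, q0)
q2 = :queue.in({:torrent_link, "https://example.com/torrent/42"}, q1)

# :queue.out/1 returns the oldest item together with the remaining queue.
{{:value, first}, q3} = :queue.out(q2)
{{:value, second}, q4} = :queue.out(q3)

# Taking from an empty queue returns {:empty, queue} instead of raising.
{:empty, _} = :queue.out(q4)

IO.inspect(first)
IO.inspect(second)
```

Because the queue is plain immutable data, it can live directly in a GenServer’s state with no external service involved.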

The first thing I did was build the crawlers and create a worker for each of
them, all supervised by the application’s supervisor.

children = [
  # Start the Ecto repository
  supervisor(Magnetissimo.Repo, []),
  # Start the endpoint when the application starts
  supervisor(Magnetissimo.Endpoint, []),
  # One worker per crawler
  worker(Magnetissimo.Crawler.ThePirateBay, []),
  worker(Magnetissimo.Crawler.EZTV, []),
  worker(Magnetissimo.Crawler.LimeTorrents, []),
  worker(Magnetissimo.Crawler.Leetx, []),
  worker(Magnetissimo.Crawler.Demonoid, [])
]
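The supervisor/2 and worker/2 helpers above come from Supervisor.Spec, which newer Elixir releases deprecate in favour of child specifications. As a rough sketch of the same idea in that style, the following supervises two copies of a stand-in crawler. Demo.Crawler and the names :crawler_a and :crawler_b are hypothetical and are not part of Magnetissimo.

```elixir
defmodule Demo.Crawler do
  # Hypothetical stand-in for a real crawler; it only holds its name as state.
  use GenServer

  def start_link(name) do
    GenServer.start_link(__MODULE__, name, name: name)
  end

  @impl true
  def init(name), do: {:ok, name}
end

children = [
  # Each child needs a unique id, so the default id (the module name) is overridden.
  Supervisor.child_spec({Demo.Crawler, :crawler_a}, id: :crawler_a),
  Supervisor.child_spec({Demo.Crawler, :crawler_b}, id: :crawler_b)
]

# :one_for_one restarts only the child that crashed.
{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
IO.inspect(Supervisor.count_children(sup))
```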

Each crawler is actually a GenServer implementation. For example, here’s the ThePirateBay version of the crawler.

defmodule Magnetissimo.Crawler.ThePirateBay do
  use GenServer
  alias Magnetissimo.Torrent
  alias Magnetissimo.Crawler.Helper

  def start_link do
    # The queue of URLs to crawl is the GenServer's entire state
    GenServer.start_link(__MODULE__, initial_queue())
  end

  def init(queue) do
    schedule_work()
    {:ok, queue}
  end

  defp schedule_work do
    # Send ourselves a :work message in 300 ms
    Process.send_after(self(), :work, 300)
  end

  # Callbacks

  def handle_info(:work, queue) do
    # Variables rebound inside `case` do not leak out of it,
    # so bind the result of the whole expression instead.
    queue =
      case :queue.out(queue) do
        {{:value, item}, rest} ->
          process(item, rest)

        {:empty, _} ->
          IO.puts("Queue is empty - restarting queue.")
          initial_queue()
      end

    schedule_work()
    {:noreply, queue}
  end

  def process({:page_link, url}, queue) do
    IO.puts("Downloading page: #{url}")
    html_body = Helper.download(url)

    # Return the updated queue from `if`; rebinding inside the block would be lost.
    if html_body != nil do
      Enum.reduce(torrent_links(html_body), queue, fn torrent, acc ->
        :queue.in({:torrent_link, torrent}, acc)
      end)
    else
      queue
    end
  end

  def process({:torrent_link, url}, queue) do
    IO.puts("Downloading torrent: #{url}")
    html_body = Helper.download(url)

    if html_body != nil do
      torrent_struct = torrent_information(html_body)
      Torrent.save_torrent(torrent_struct)
    end

    queue
  end

  # Parser functions

  def initial_queue do
    urls =
      for i <- 1..6, j <- 1..50 do
        {:page_link, "https://thepiratebay.org/browse/#{i}00/#{j}/3"}
      end

    :queue.from_list(urls)
  end

  def torrent_links(html_body) do
    html_body
    |> Floki.find(".detName a")
    |> Floki.attribute("href")
    |> Enum.map(fn url -> "https://thepiratebay.org" <> url end)
  end

  def torrent_information(html_body) do
    name =
      html_body
      |> Floki.find("#title")
      |> Floki.text()
      |> String.trim()
      |> HtmlEntities.decode()

    magnet =
      html_body
      |> Floki.find(".download a")
      |> Floki.attribute("href")
      |> Enum.filter(fn url -> String.starts_with?(url, "magnet:") end)
      |> Enum.at(0)

    # <<194, 160>> is the UTF-8 encoding of a non-breaking space
    size =
      html_body
      |> Floki.find("#detailsframe #details .col1 dd")
      |> Enum.at(2)
      |> Floki.text()
      |> String.split(<<194, 160>>)
      |> Enum.at(2)
      |> String.replace("(", "")

    {seeders, _} =
      html_body
      |> Floki.find("#detailsframe #details .col2 dd")
      |> Enum.at(2)
      |> Floki.text()
      |> Integer.parse()

    {leechers, _} =
      html_body
      |> Floki.find("#detailsframe #details .col2 dd")
      |> Enum.at(3)
      |> Floki.text()
      |> Integer.parse()

    %{
      name: name,
      magnet: magnet,
      size: size,
      website_source: "thepiratebay",
      seeders: seeders,
      leechers: leechers
    }
  end
end

initial_queue is a function that builds an Erlang :queue from the initial URLs, which then fan out to further pagination pages or individual torrent links.
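To make the size of that starting queue concrete, here is a small sketch of the same comprehension shape. The example.com URL pattern is a placeholder; only the two-generator structure mirrors the crawler.

```elixir
# Two generators produce every {i, j} combination: 6 categories x 50 pages.
urls =
  for i <- 1..6, j <- 1..50 do
    {:page_link, "https://example.com/browse/#{i}00/#{j}"}
  end

queue = :queue.from_list(urls)

# The head of the list becomes the front of the queue.
{{:value, head}, _rest} = :queue.out(queue)

IO.inspect(:queue.len(queue))
IO.inspect(head)
```

The queue starts with 300 page links, and the first one handed out is the first element the comprehension generated.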

Each element in my :queue is a tuple, with two parts:

{:torrent_link, "some_url"}
{:page_link, "some_url"}

Using pattern matching in the process function clauses above, I can easily either parse a pagination link or parse an individual torrent page.
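Stripped of the downloading and parsing, the dispatch looks roughly like the sketch below. Demo.Dispatch and the two made-up torrent links are hypothetical stand-ins; a real crawler would fetch the page and extract its links.

```elixir
defmodule Demo.Dispatch do
  # Each clause matches on the tag in the first element of the tuple.
  def process({:page_link, url}, queue) do
    # Pretend the page listed two torrents and enqueue a job for each.
    found = [url <> "/torrent/1", url <> "/torrent/2"]

    Enum.reduce(found, queue, fn link, acc ->
      :queue.in({:torrent_link, link}, acc)
    end)
  end

  def process({:torrent_link, _url}, queue) do
    # A torrent page produces no new work, so the queue comes back unchanged.
    queue
  end
end

queue = :queue.from_list([{:page_link, "https://example.com/p1"}])
{{:value, item}, rest} = :queue.out(queue)
queue = Demo.Dispatch.process(item, rest)

IO.inspect(:queue.to_list(queue))
```

Supporting a new kind of job means adding another clause with a new tag; the code that pulls items off the queue does not change.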

The schedule_work function then schedules the next item to be processed.
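The self-scheduling loop can be seen in isolation in this minimal sketch. Demo.Ticker, its 50 ms interval, and the remaining/1 helper are assumptions made for the example and do not appear in Magnetissimo.

```elixir
defmodule Demo.Ticker do
  use GenServer

  # Assumed interval in milliseconds, kept short so the example finishes quickly.
  @interval 50

  def start_link(items), do: GenServer.start_link(__MODULE__, :queue.from_list(items))
  def remaining(pid), do: GenServer.call(pid, :remaining)

  @impl true
  def init(queue) do
    schedule_work()
    {:ok, queue}
  end

  @impl true
  def handle_info(:work, queue) do
    # Drop one item per tick; an empty queue is left as it is.
    queue =
      case :queue.out(queue) do
        {{:value, _item}, rest} -> rest
        {:empty, empty} -> empty
      end

    # Re-arm the timer so the loop keeps going.
    schedule_work()
    {:noreply, queue}
  end

  @impl true
  def handle_call(:remaining, _from, queue), do: {:reply, :queue.len(queue), queue}

  defp schedule_work, do: Process.send_after(self(), :work, @interval)
end

{:ok, pid} = Demo.Ticker.start_link([:a, :b, :c])
Process.sleep(400)
IO.inspect(Demo.Ticker.remaining(pid))
```

Each tick handles exactly one item, so the interval doubles as a simple rate limit on how fast the crawler hits a site.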

The end result is more cohesive code, with less indirection. It’s much easier now to add a new crawler to the project. It’s also easier to know exactly what is running. There are fewer chances for bugs, and growth is more predictable. One downside of this approach is volatility: the queue lives only in memory, so if my app shuts down, I lose whatever was left to process. But I’m comfortable with that for this particular project.

One potential upgrade would be to move from handle_info to an asynchronous handle_cast (handle_call, by contrast, is synchronous and blocks the caller until it gets a reply).
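For reference when weighing that change: GenServer.cast returns immediately without waiting for the server, while GenServer.call blocks until the server replies. The sketch below shows both against a queue held in state. Demo.Jobs and its enqueue/2 and size/1 functions are hypothetical and only illustrate the two callback styles.

```elixir
defmodule Demo.Jobs do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :queue.new())

  # Asynchronous: returns :ok right away, the server enqueues the job later.
  def enqueue(pid, job), do: GenServer.cast(pid, {:enqueue, job})

  # Synchronous: the caller waits for the reply.
  def size(pid), do: GenServer.call(pid, :size)

  @impl true
  def init(queue), do: {:ok, queue}

  @impl true
  def handle_cast({:enqueue, job}, queue), do: {:noreply, :queue.in(job, queue)}

  @impl true
  def handle_call(:size, _from, queue), do: {:reply, :queue.len(queue), queue}
end

{:ok, pid} = Demo.Jobs.start_link()
:ok = Demo.Jobs.enqueue(pid, {:page_link, "https://example.com/p1"})
:ok = Demo.Jobs.enqueue(pid, {:page_link, "https://example.com/p2"})

# Messages from one process arrive in order, so both casts are handled before this call.
IO.inspect(Demo.Jobs.size(pid))
```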

My next step is going to be using Distillery to build a single deployable release so end users can just run it and have Magnetissimo start serving on localhost.
