I’ve been writing Ruby code for years, and like me, you may reach out for familiar tools like Redis or Sidekiq to process background jobs and data. I’d like to show you why sometimes, pure Elixir is more than enough to get the job done.
This is the story of how I replaced Redis/Exq with pure Elixir and the Erlang :queue class.
A huge thank you to omgneering for his videos on GenServer. If you’re having trouble understanding GenServer, this will be a tremendous help.
I built Magnetissimo as a learning exercise to really understand Elixir and see what it takes to ship production ready code. And while the version on Github right now works, it’s lacking in very important areas I set out in the initial goals.
Goals:
It wasn’t easy for people to run the project, even developers had questions and I wanted zero-friction.
The less steps there are in running Magnetissimo the higher the adoption rate would be.
I found my solution in Erlang’s queue class, and in Elixir’s GenServer.
This was step one towards that ultimate goal.
The first thing I did was make Crawlers, and create a worker for each ofthose crawlers. All supervised by my supervisor.
children = [
supervisor(Magnetissimo.Repo, []),
supervisor(Magnetissimo.Endpoint, []),worker(Magnetissimo.Crawler.ThePirateBay, []),worker(Magnetissimo.Crawler.EZTV, []),worker(Magnetissimo.Crawler.LimeTorrents, []),worker(Magnetissimo.Crawler.Leetx, []),worker(Magnetissimo.Crawler.Demonoid, []),]
Each crawler is actually a GenServer implementation. For example, here’s ThePirateBay version of the crawler.
defmodule Magnetissimo.Crawler.ThePirateBay douse GenServeralias Magnetissimo.Torrentalias Magnetissimo.Crawler.Helper
def start_link doqueue = initial_queueGenServer.start_link(__MODULE__, queue)end
def init(queue) doschedule_work(){:ok, queue}end
defp schedule_work doProcess.send_after(self(), :work, 1 * 1 * 300) # 5 secondsend
def handle_info(:work, queue) docase :queue.out(queue) do{{_value, item}, queue_2} ->queue = queue_2queue = process(item, queue)_ ->IO.puts "Queue is empty - restarting queue."queue = initial_queueendschedule_work(){:noreply, queue}end
def process({:page_link, url}, queue) doIO.puts "Downloading page: #{url}"html_body = Helper.download(url)if html_body != nil dotorrents = torrent_links(html_body)queue = Enum.reduce(torrents, queue, fn torrent, queue ->:queue.in({:torrent_link, torrent}, queue)end)endqueueend
def process({:torrent_link, url}, queue) doIO.puts "Downloading torrent: #{url}"html_body = Helper.download(url)if html_body != nil dotorrent_struct = torrent_information(html_body)Torrent.save_torrent(torrent_struct)endqueueend
def initial_queue dourls = for i <- 1..6, j <- 1..50 do{:page_link, "https://thepiratebay.org/browse/#{i}00/#{j}/3"}end:queue.from_list(urls)end
def torrent_links(html_body) dohtml_body|> Floki.find(".detName a")|> Floki.attribute("href")|> Enum.map(fn(url) -> "https://thepiratebay.org" <> url end)end
def torrent_information(html_body) doname = html_body|> Floki.find("#title")|> Floki.text|> String.trim|> HtmlEntities.decode
magnet = html\_body
|> Floki.find(".download a")
|> Floki.attribute("href")
|> Enum.filter(fn(url) -> String.starts\_with?(url, "magnet:") end)
|> Enum.at(0)
size = html\_body
|> Floki.find("#detailsframe #details .col1 dd")
|> Enum.at(2)
|> Floki.text
|> String.split(<<194, 160>>)
|> Enum.at(2)
|> String.replace("(", "")
{seeders, \_} = html\_body
|> Floki.find("#detailsframe #details .col2 dd")
|> Enum.at(2)
|> Floki.text
|> Integer.parse
{leechers, \_} = html\_body
|> Floki.find("#detailsframe #details .col2 dd")
|> Enum.at(3)
|> Floki.text
|> Integer.parse
%{
name: name,
magnet: magnet,
size: size,
website\_source: "thepiratebay",
seeders: seeders,
leechers: leechers
}
endend
Initial_queue is a function that creates an Erlang :queue object with the initial urls that sprawl out and link to other pagination pages or individual torrent links.
Each element in my :queue is a tuple, with two parts:
{:torrent_link, “some_url”}{:page_link, "some_url"}
Using function pattern matching in the process methods above, I can easily either parse a pagination link or parse an individual torrent page.
The schedule_work function then schedules the next item to be processed.
The end result is more cohesive code, with less indirection. It’s much easier now to add a new crawler to the project. It’s also easier to know what exactly is running. Less chances for bugs, and more predictable growth behavior. One downside with this approach is volatility. If my app shuts down, I will lose the queue to process. But I’m comfortable with that for this particular project.
One potential upgrade will be to change from handle_info, to an async handle_call.
My next step is going to be using Distillery to build a single deployable executable so end users can just run it and have Magnetissimo start service on localhost.