Then I had to manually split my list of pages into chunks of 80 elements and process each chunk in parallel, finally reducing the resulting lists back into one larger list to pass it on to the next steps in the Workflow. The code gets convoluted like this:
    def images_sources(pages_list) do
      pages_list
      |> chunk(@maximum_fetches)
      |> Enum.reduce([], fn pages_chunk, acc ->
        result = pages_chunk
          |> Enum.map(&(Task.async(fn -> Page.image(&1) end)))
          |> Enum.map(&(Task.await(&1, @http_timeout)))
          |> Enum.map(fn {:ok, image} -> image end)
        acc ++ result
      end)
    end
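To try the batching idea without hitting any website, here is a minimal, self-contained sketch. It is not the project's code: `BatchSketch` and `fake_image/1` are hypothetical stand-ins for the Workflow and `Page.image/1`, and I'm assuming the standard `Enum.chunk_every/2` in place of the `chunk` helper.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-in for Page.image/1: no HTTP, just builds a fake result.
  def fake_image(page), do: {:ok, "image-for-#{page}"}

  # Processes the list in batches: every batch runs in parallel, but the
  # next batch only starts after the whole previous batch has been awaited.
  def images_sources(pages, batch_size) do
    pages
    |> Enum.chunk_every(batch_size)
    |> Enum.flat_map(fn batch ->
      batch
      |> Enum.map(fn page -> Task.async(fn -> fake_image(page) end) end)
      |> Enum.map(fn task -> Task.await(task, 5_000) end)
      |> Enum.map(fn {:ok, image} -> image end)
    end)
  end
end

IO.inspect(BatchSketch.images_sources(Enum.to_list(1..10), 4))
```

Note that one slow page holds up its entire batch, which is part of why this approach wastes time.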
Now I was able to reimplement this aspect and the same code now looks like this:
    def images_sources(pages_list) do
      pages_list
      |> Enum.map(&Worker.page_image/1)
      |> Enum.map(&Task.await(&1, @await_timeout_ms))
      |> Enum.map(fn {:ok, image} -> image end)
    end
Wow! Now this is a big improvement and it's way more obvious what it is doing.
Best of all: downloading a Manga the size of Akira (around 2,200 pages long) took less than 50 seconds. And this is not because Elixir is super fast, it's because MangaReader can't keep up if I extend the Pool size. It's hitting at a constant rate of 50 concurrent connections!
This makes my 4-core machine, sitting on a 40 Mbps connection, use between 30% and 40% of CPU and no more than around 3.5 Mbps of bandwidth. If MangaReader could keep up we could easily fetch all pages 2 or 3 times faster without breaking a sweat.
It was fast with the previous strategy, but I guess it got at least twice as fast as a bonus. But how did I accomplish that?
Open Telecom Platform
In the previous article I also said that I didn't want to dive into "OTP and GenServers" just yet. But if you're a beginner like me you probably didn't understand what this means.
OTP is what makes Erlang (and consequently, Elixir) different from pretty much every other language platform but, maybe, Java.
Many languages today run many tasks concurrently through convoluted Reactor patterns (Node.js, EventMachine/Ruby, Tornado/Twisted/Python, etc) or through (cleaner) green threads (Scala, Go).
But none of this matters. It's not difficult to launch millions of lightweight processes, but it is not trivial to actually CONTROL them all. It doesn't matter how fast you can exhaust your hardware if you can't control it. Then you end up with millions of dumb minions wreaking havoc without an adult to coordinate them.
Erlang solved this problem decades ago through the development of OTP, initially called Open Systems, within Ericsson in 1995. By itself OTP is a subject that can easily fill an entire fat book and you will still not be able to call yourself an expert.
Just so I don't get too boring here:
- Start with this very brief summary;
- Then read Elixir's tutorial on Supervisors and the page on Processes first if you haven't already;
- Finally, you can go further with The Elixir & OTP Guidebook from Manning Publications.
Now, below is my personal point of view. As I am a beginner myself, let me know in the comments section below if I got some concept wrong.
OTP is a collection of technologies and frameworks. The part that interests us the most is to understand that this is a sophisticated collection of patterns to achieve the Nirvana of highly reliable, highly scalable, distributed systems. You know? That thing every new platform promises you but fails to actually deliver.
For our very simple intents and purposes, let's pick up what I said before: it's trivial to fire up millions of small processes. We call them "workers". OTP provides the means to control them: Supervisors. And then it also provides the concept of Supervisor Trees (Supervisors that supervise other Supervisors). This is the gist of it.
Supervisors are responsible for starting up the workers and also for recovering from failures in those workers (which is why in Erlang/Elixir we don't do ugly try/catch stuff: let the error be raised, let the worker crash, and let the Supervisor deal with it). Then we can configure the Supervisor to handle faulty workers by, for example, restarting them.
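A minimal sketch of that restart behaviour, using only the standard library. `FlakyWorker` and `RestartDemo` are hypothetical names made up for this illustration: we start one worker under a Supervisor, kill it on purpose, and check that a fresh process took its place.

```elixir
defmodule FlakyWorker do
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:ok, :ready}

  @impl true
  def handle_call(:ping, _from, state), do: {:reply, :pong, state}
end

defmodule RestartDemo do
  # Starts a supervisor with a single worker, kills the worker and
  # returns {old_pid, new_pid, reply_from_the_new_worker}.
  def run do
    {:ok, sup} = Supervisor.start_link([FlakyWorker], strategy: :one_for_one)
    old_pid = worker_pid(sup)
    Process.exit(old_pid, :kill)
    new_pid = wait_for_restart(sup, old_pid)
    {old_pid, new_pid, GenServer.call(new_pid, :ping)}
  end

  defp worker_pid(sup) do
    [{_id, pid, :worker, _modules}] = Supervisor.which_children(sup)
    pid
  end

  # Polls the supervisor until it reports a live child different from the old one.
  defp wait_for_restart(sup, old_pid) do
    case worker_pid(sup) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(10)
        wait_for_restart(sup, old_pid)
    end
  end
end

{old_pid, new_pid, reply} = RestartDemo.run()
IO.inspect({old_pid != new_pid, reply})
```

Nobody wrote any recovery code inside the worker itself; the restart comes entirely from the Supervisor's configuration.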
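We already touched this OTP stuff before. An Elixir Task is just a high-level abstraction over processes: Task.async/1 spawns a process that is linked to and monitored by the caller, and Task.await/2 waits for its reply. If you want your tasks to live under a supervisor, there is Task.Supervisor for that.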
There are so many subjects and details that it's difficult to even get started. One concept that is important to know is about state. There is no global state! (Yay, no JavaScript globals nightmare.) Each function has its own state and that's it. There is no concept of an "object" that holds state and then methods that can modify that state.
But there is the concept of Erlang processes. Now, a process does have state: it's a lightweight piece of state that exists only at runtime. To execute a function in a separate, parallel process, you can just do:
    iex> spawn fn -> 1 + 2 end
    #PID<0.43.0>
Different from an object, a process does not have a set of methods that access its inner "this" or "self" state. Instead each process has a mailbox. When you start (or "spawn" in Erlang lingo) a new process, it returns a pid (process ID). You can now send messages to the process through its pid, and the process can choose to respond to incoming messages and send responses back to the pid that sent them. This is how you can send a message to the IEx console itself and then receive it from its mailbox:
    iex> send self(), {:hello, "world"}
    {:hello, "world"}
    iex> receive do
    ...>   {:hello, msg} -> msg
    ...>   {:world, msg} -> "won't match"
    ...> end
    "world"
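The IEx example sends a message to itself. As a small sketch of the two-process version described above (the `MailboxSketch` module and the message shapes are made up for illustration), a spawned process waits on its mailbox and replies to whoever sent the message:

```elixir
defmodule MailboxSketch do
  # Spawns a process, sends it a message carrying our own pid,
  # and waits for the reply in our own mailbox.
  def greet(name) do
    pid =
      spawn(fn ->
        receive do
          {:hello, from, who} -> send(from, {:reply, "hello, #{who}"})
        end
      end)

    send(pid, {:hello, self(), name})

    receive do
      {:reply, text} -> text
    after
      1_000 -> :timeout
    end
  end
end

IO.puts(MailboxSketch.greet("world"))
```

The only way the two processes talk is through messages; neither can reach into the other's variables.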
In essence, it's almost like an "object" that holds state. Each process has its own garbage collector, so when it dies it's individually collected. And each process is isolated from other processes, they don't bleed state out, which makes them much easier to reason about.
The Getting Started page on Processes from the Elixir website shows examples of what I just explained and I recommend you follow it thoroughly.
In summary, a process can hold internal state by blocking indefinitely while it waits for an incoming message in its mailbox and then recursing into itself with the new state! This is a mindblowing concept at first.
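Here is a minimal sketch of that trick, with a hypothetical `CounterLoop` module: the "state" is nothing more than the argument of a function that keeps calling itself after every message.

```elixir
defmodule CounterLoop do
  # Starts a process whose only job is to run loop/1 forever.
  def start(initial), do: spawn(fn -> loop(initial) end)

  # Blocks on the mailbox, then recurses with the (possibly new) state.
  defp loop(count) do
    receive do
      :increment ->
        loop(count + 1)

      {:get, from} ->
        send(from, {:count, count})
        loop(count)
    end
  end

  # Asks the process for its current state and waits for the answer.
  def get(pid) do
    send(pid, {:get, self()})

    receive do
      {:count, count} -> count
    after
      1_000 -> :timeout
    end
  end
end

pid = CounterLoop.start(0)
send(pid, :increment)
send(pid, :increment)
IO.inspect(CounterLoop.get(pid))
```

Messages between two given processes arrive in the order they were sent, so the `:get` is handled after both increments.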
But a simple process on its own is just too damn weak. This is where you get OTP's GenServer, which is a much more accomplished process. OTP exposes Behaviours for you to implement in order to add your own code, and it takes care of the dirty infrastructure stuff so you don't have to.
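To make the comparison concrete, here is a sketch of a tiny counter written as a GenServer. `CounterServer` is a hypothetical module, not part of the downloader; the receive loop and the reply plumbing are handled by the Behaviour, and we only write the callbacks.

```elixir
defmodule CounterServer do
  use GenServer

  # Public API: thin wrappers around the GenServer calls.
  def start_link(initial), do: GenServer.start_link(__MODULE__, initial)
  def increment(pid), do: GenServer.cast(pid, :increment)
  def get(pid), do: GenServer.call(pid, :get)

  # Callbacks: each one receives the current state and returns the next one.
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:get, _from, count), do: {:reply, count, count}
end

{:ok, pid} = CounterServer.start_link(0)
CounterServer.increment(pid)
CounterServer.increment(pid)
IO.inspect(CounterServer.get(pid))
```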
Deferring the Heavy Workflow calls to a Worker
All that being said, we know that in the Workflow we implemented before, we have trouble with the Page.image/1 and Workflow.download_image/2 functions. This is why we made them asynchronous processes and waited for batches of 80 calls every time.
Now, let's start by moving away this logic to a GenServer Worker, for example, in the ex_manga_downloadr/pool_management/worker.ex file:
    defmodule PoolManagement.Worker do
      use GenServer
      use ExMangaDownloadr.MangaReader

      @timeout_ms 1_000_000
      @transaction_timeout_ms 1_000_000 # larger just to be safe

      def start_link(_) do
        GenServer.start_link(__MODULE__, nil, [])
      end

      def handle_call({:page_image, page_link}, _from, state) do
        {:reply, Page.image(page_link), state}
      end

      def handle_call({:page_download_image, image_data, directory}, _from, state) do
        {:reply, Page.download_image(image_data, directory), state}
      end
    end
I first moved the Workflow.download_image/2 to Page.download_image/2 just for consistency's sake. But this is a GenServer in a nutshell. We have some setup in the start_link/1 function and then we have to implement handle_call/3 functions to handle each kind of message it might receive. We tell them apart by pattern matching on the message.
As a convention, we can add public functions that are just prettier versions that call each handle_call/3:
    def page_image(page_link) do
      Task.async fn ->
        :poolboy.transaction :worker_pool, fn(server) ->
          GenServer.call(server, {:page_image, page_link}, @timeout_ms)
        end, @transaction_timeout_ms
      end
    end

    def page_download_image(image_data, directory) do
      Task.async fn ->
        :poolboy.transaction :worker_pool, fn(server) ->
          GenServer.call(server, {:page_download_image, image_data, directory}, @timeout_ms)
        end, @transaction_timeout_ms
      end
    end
But we are not just calling the previous handle_call/3 functions. First there is the Task.async/1 we were already using in the Workflow functions to make the parallel batches. But inside the Task calls there is this other strange thing: :poolboy.
Controlling Pools of Processes through Poolboy
The entire OTP ordeal I wrote here was just an introduction so I could show off Poolboy.
Repeating myself: it's trivial to fire up millions of processes. OTP is how we control failures in those processes. But there is another problem: the computation within each process may be so heavy that we can either bring down the machine or, in our case, do a Distributed Denial of Service (DDoS) against the poor MangaReader website.
My initial idea was to just do parallel requests in batches. But the logic is convoluted.
Instead, we can use a process pool! It queues up our requests for new processes. Whenever a process finishes it is returned to the pool and a new computation can take over the available process. This is how pools work (you probably have an intuition of how it works from traditional database connection pools). Pools and queues are useful software constructs to deal with limited resources.
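To see the "limited resource" idea in isolation, here is a sketch that does not use Poolboy at all. I'm swapping in the standard library's `Task.async_stream/3` with `max_concurrency`, which also caps how many jobs run at once; `BoundedSketch`, `fake_fetch/2` and the Agent-based tracker are hypothetical and exist only to measure the peak concurrency.

```elixir
defmodule BoundedSketch do
  # Hypothetical stand-in for an HTTP fetch. It records how many fetches
  # are in flight so we can check the limit afterwards.
  def fake_fetch(page, tracker) do
    Agent.update(tracker, fn {current, peak} -> {current + 1, max(peak, current + 1)} end)
    Process.sleep(20)
    Agent.update(tracker, fn {current, peak} -> {current - 1, peak} end)
    {:ok, "image-for-#{page}"}
  end

  # Runs every page through fake_fetch/2 with at most `limit` running at once.
  # Returns {images, peak_number_of_simultaneous_fetches}.
  def run(pages, limit) do
    {:ok, tracker} = Agent.start_link(fn -> {0, 0} end)

    images =
      pages
      |> Task.async_stream(&fake_fetch(&1, tracker), max_concurrency: limit, timeout: 5_000)
      |> Enum.map(fn {:ok, {:ok, image}} -> image end)

    {_current, peak} = Agent.get(tracker, & &1)
    Agent.stop(tracker)
    {images, peak}
  end
end

{images, peak} = BoundedSketch.run(Enum.to_list(1..20), 5)
IO.inspect({length(images), peak})
```

As with a pool, a new fetch starts as soon as any running one finishes, instead of waiting for a whole batch to complete.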
By doing this we can drop the chunking of the large list into batches and write the code as if every element of the list were processed in parallel at once. Recall the initial version:
    pages_list
    |> chunk(@maximum_fetches)
    |> Enum.reduce([], fn pages_chunk, acc ->
      result = pages_chunk
        |> Enum.map(&(Task.async(fn -> Page.image(&1) end)))
        |> Enum.map(&(Task.await(&1, @http_timeout)))
        |> Enum.map(fn {:ok, image} -> image end)
      acc ++ result
    end)
Now, removing the chunking and reducing logic:
    pages_list
    |> Enum.map(&(Task.async(fn -> Page.image(&1) end)))
    |> Enum.map(&(Task.await(&1, @http_timeout)))
    |> Enum.map(fn {:ok, image} -> image end)
And finally, replacing the direct Task.async/1 call for the GenServer worker we just implemented above:
    pages_list
    |> Enum.map(&Worker.page_image/1)
    |> Enum.map(&Task.await(&1, @await_timeout_ms))
    |> Enum.map(fn {:ok, image} -> image end)
Now, Poolboy will require a Supervisor that monitors our Worker. Let's put it under ex_manga_downloadr/pool_management/supervisor.ex:
    defmodule PoolManagement.Supervisor do
      use Supervisor

      def start_link do
        Supervisor.start_link(__MODULE__, [])
      end

      def init([]) do
        pool_options = [
          name: {:local, :worker_pool},
          worker_module: PoolManagement.Worker,
          size: 50,
          max_overflow: 0
        ]

        children = [
          :poolboy.child_spec(:worker_pool, pool_options, [])
        ]

        supervise(children, strategy: :one_for_one)
      end
    end
More OTP goodness here. We had a rogue Worker, now we have a responsible Supervisor deferring responsibility to Poolboy. We start with a pool that can hold a maximum of 50 processes (with no overflow). This number comes from trial and error again. And the Supervisor will use a strategy of :one_for_one, which means that if a child dies, the Supervisor restarts just that one.
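If you want to poke at a module-based Supervisor without installing Poolboy, here is a sketch with a fixed set of stub workers. `StubWorker` and `StubPoolSupervisor` are hypothetical, and this is not a pool (there is no checkout or queueing). I'm also assuming a recent Elixir, where `Supervisor.init/2` is the replacement for the older `supervise/2` helper.

```elixir
defmodule StubWorker do
  use GenServer

  def start_link(id), do: GenServer.start_link(__MODULE__, id)

  @impl true
  def init(id), do: {:ok, id}
end

defmodule StubPoolSupervisor do
  use Supervisor

  def start_link(size), do: Supervisor.start_link(__MODULE__, size)

  @impl true
  def init(size) do
    # One child spec per worker; each child needs a unique id.
    children =
      for n <- 1..size do
        Supervisor.child_spec({StubWorker, n}, id: {:stub_worker, n})
      end

    # :one_for_one restarts only the child that died.
    Supervisor.init(children, strategy: :one_for_one)
  end
end

{:ok, sup} = StubPoolSupervisor.start_link(5)
IO.inspect(Supervisor.count_children(sup))
```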
Now, we must add Poolboy to the mix.exs as a dependency and run mix deps.get to fetch it:
    defp deps do
      [
        ...
        {:poolboy, github: "devinus/poolboy", tag: "1.5.1"},
        ...
      ]
    end
In the same mix.exs we make the main Application (surprise: which is already a supervised OTP application) start the PoolManagement.Supervisor for us:
    def application do
      [applications: [:logger, :httpotion, :porcelain],
       mod: {PoolManagement, []}]
    end
But we also need to have this PoolManagement module for it to call. We may call it pool_management.ex:
    defmodule PoolManagement do
      use Application

      def start(_type, _args) do
        PoolManagement.Supervisor.start_link
      end
    end
Summary
Let's summarize:
- the ExMangaDownloadr application will start up and fire up the PoolManagement application;
- the PoolManagement application fires up the PoolManagement.Supervisor;
- the PoolManagement.Supervisor fires up Poolboy and assigns PoolManagement.Worker to it, configuring its pool size to 50 and registering the pool under the name :worker_pool;
- now we start to fetch and parse through the Manga's chapters and pages until the ExMangaDownloadr.Workflow.images_sources/1 is called;
- it will call the PoolManagement.Worker.page_image/1 function which in turn fires up a Task.async/1, calling :poolboy.transaction(:worker_pool, fn -> ... end);
- if a process is available in Poolboy's pool it starts right away; otherwise it waits for a process to become available, for at most the @transaction_timeout_ms timeout;
- the Workflow maps over the entire pages_list, creating one async Task for each page in the list, so we end up with a ginormous list of Tasks, and then we Task.await/2 for them all to return.
Now, this application is much more reliable and faster, as it fires up a new HTTP connection as soon as a previous one responds and returns its process back to the pool. Instead of firing up 80 connections at a time, in batches, we start with 50 at the same time and then we fire up one more for each process returned to the pool.
Through trial and error I set the @http_timeout to wait at most 60 seconds. I also set @timeout_ms, which is the time to wait for the GenServer worker's handle_call to return, and @transaction_timeout_ms, which is the time Poolboy waits for a process to become available in the pool, both to around 16 minutes (1,000,000 ms).
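Just to check the arithmetic on those millisecond values, a tiny sketch (the `TimeoutSketch` helper is hypothetical):

```elixir
defmodule TimeoutSketch do
  # Converts a timeout in milliseconds to minutes, rounded to one decimal place.
  def to_minutes(ms), do: Float.round(ms / 60_000, 1)
end

IO.inspect(TimeoutSketch.to_minutes(1_000_000)) # 16.7
IO.inspect(TimeoutSketch.to_minutes(60_000))    # 1.0
```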
This is putting 25 years of Erlang experience in the Telecom industry to good use!
And to make it crystal clear: OTP is the thing that sets Erlang/Elixir apart from all the rest. It's not the same thing, but imagine if the standard were to write everything in Java as an EJB, ready to run inside a JEE Container. What comes to mind is: heavy.
In Erlang, an OTP application is lightweight: you can just build and use it ad hoc, without bureaucracy, without having to set up complicated servers. In our case it's a very simple command line tool and, within it, the entire power of a JEE Container! Think about it.