Two weeks ago, I described an architecture for Domain Scraper with this diagram
and used it to rationalize the design of putting the various fetchers and the unshortening pool together into separate applications. Then, last week I described how I wrote a new Fetcher module for reddit posts.
Now I should be able to put it all together. But as I tried, I found that the current implementation of the unshortening pool doesn’t support this.
We will correct this as part of this post.
What’s the problem?
Here’s the runner for our processes:
defmodule Main do
  def run do
    Twitter.Server.get
    |> UnshorteningPool.map
    |> Stream.map(fn x -> IO.inspect x end)
    |> Enum.take(10)
    |> Enum.to_list
  end
end
It starts up the server for the Twitter fetchers, maps the Tweets through the UnshorteningPool, and prints out some of the results. So, I thought I would just add another copy of this code to handle reddit, somehow put the two into separate processes, and be done!
But then I realized that this doesn’t really work the way I want it to. The problem is that I have this one big pipeline, which looks like this:
But as I mentioned I want this:
So I think I need to be able to write:
spawn_link fn ->
  Twitter.Server.get
  |> UnshorteningPool.map
end

spawn_link fn ->
  Reddit.Server.get
  |> UnshorteningPool.map
end

spawn_link fn ->
  HackerNews.Server.get
  |> UnshorteningPool.map
end

UnshorteningPool.output_stream
|> Stream.map(fn x -> IO.inspect x end)
|> Enum.take(30)
|> Enum.to_list
This would be a change in the interface to UnshorteningPool so that the map operation just accepts data, and the output Stream is retrieved in a separate operation (output_stream/0). And we can no longer use a single pipeline. Given this usage pattern, map doesn’t seem like the right operation anymore. Now, into is starting to seem like the right abstraction. That is, the servers push into the pool and we pull the (combined) results out separately. The map operation makes me think it should all be one pipeline of computation.
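Elixir’s standard library already has this push-into-a-sink shape: Enum.into/2 feeds any enumerable into any Collectable. As a rough analogy (using a MapSet as the sink instead of the pool), multiple sources can be poured into one collection:

```elixir
# Two independent sources pushed into a single shared sink,
# rather than one end-to-end mapped pipeline.
sink = MapSet.new()
sink = Enum.into([1, 2, 3], sink)
sink = Enum.into([3, 4, 5], sink)

sink |> MapSet.to_list |> Enum.sort |> IO.inspect
# [1, 2, 3, 4, 5]
```

The duplicate 3 shows the sink is shared: the two sources merge into one result, which is exactly the fan-in behavior I want from the pool.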
Another way to think about this is that the design issue is actually in UnshorteningPool.Mapper, in that it creates an output BlockingQueue for each call to map_through_pool. For my use case I want multiple input Streams but only a single output Stream.
Plan
My plan to fix this is to restructure UnshorteningPool to:
- Create a single output queue that I can add to the supervision tree
- Replace map and map_through_pool with a new interface
- Build up processes and output
This would let me write something like this:
defmodule Main do
  def run do
    spawn_link fn ->
      Twitter.Server.get
      |> Enum.into(UnshorteningPool)
    end

    spawn_link fn ->
      Reddit.Server.get
      |> Enum.into(UnshorteningPool)
    end

    spawn_link fn ->
      HackerNews.Server.get
      |> Enum.into(UnshorteningPool)
    end

    UnshorteningPool.output_stream
    |> Stream.map(fn x -> IO.inspect x end)
    |> Enum.take(30)
    |> Enum.to_list
  end
end
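One wrinkle with this sketch: for Enum.into(UnshorteningPool) to work, something would have to implement the Collectable protocol for the pool. Since protocols dispatch on a data type rather than a module name, one way to do it is with a small handle struct. This is only a sketch of the idea — the UnshorteningPool.Sink struct and push/1 function are hypothetical, not part of the actual code:

```elixir
defmodule UnshorteningPool.Sink do
  # Hypothetical handle struct so Collectable has a type to dispatch on.
  defstruct []
end

defimpl Collectable, for: UnshorteningPool.Sink do
  # Collectable.into/1 returns an initial accumulator and a collector
  # function that is called once per element pushed in.
  def into(sink) do
    collector = fn
      _acc, {:cont, item} ->
        # Hypothetical function that hands one item to the pool.
        UnshorteningPool.push(item)
        sink

      _acc, :done -> sink
      _acc, :halt -> :ok
    end

    {sink, collector}
  end
end
```

In the end a plain function (UnshorteningPool.collect/1, shown below) sidesteps the protocol entirely, which avoids needing a struct just for dispatch.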
But, I think there is one more question to answer: where should the processes be spawned?
Previously, UnshorteningPool.map kicked off a new process. Above, I create them in run. Ideally, I would create them as workers in the supervision tree so that they could be restarted if anything goes wrong. This means that my into implementation doesn’t need to spawn. But it does leave us with one more TODO item:
- Build producer app that supervises all the producers
Implementation
So, here’s what I ended up with. It’s smaller than I expected:
defmodule Main do
  def run do
    UnshorteningPool.output_stream
    |> Stream.map(fn x -> IO.inspect x end)
    |> Enum.take(30)
    |> Enum.to_list
  end
end
Main.run just has to take the output of the UnshorteningPool and, for now, print it out.
So you may be wondering where the fetchers are started. Well, in order to have them all supervised, they need to go in the supervision tree:
So, there’s a new Producer app that is set up like this:
defmodule Producer do
  use Application

  # See http://elixir-lang.org/docs/stable/elixir/Application.html
  # for more information on OTP Applications
  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      worker(Producer.Worker, [Reddit.Server], id: :producer_reddit),
      worker(Producer.Worker, [Twitter.Server], id: :producer_twitter),
    ]

    # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Producer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
And Producer.Worker is:
defmodule Producer.Worker do
  use ExActor.GenServer

  # Just start up the evaluation
  defstart start_link(module) do
    pid = spawn_link fn ->
      module.get
      |> UnshorteningPool.collect
    end
    initial_state(pid)
  end
end
This is a GenServer that takes the place of the anonymous process in the snippet above:
spawn_link fn ->
  Twitter.Server.get
  |> Enum.into(UnshorteningPool)
end
As a GenServer we can install this process into the supervision tree.
Next, let’s look at the changes to UnshorteningPool that were necessary to support this.
Unshortening Pool
First, we can see that the new interface to the pool is called UnshorteningPool.collect/1. Here’s the implementation:
def collect(enum), do: UnshorteningPool.Mapper.collect(enum, pool_name)
Recall that UnshorteningPool only provides the top-level interfaces but delegates a lot of the work of mapping input to the UnshorteningPool.Mapper module. This is done to better organize the code. So collect/1 looks up the pool name and passes things on to Mapper:
defmodule UnshorteningPool.Mapper do
  def collect(enum, pool), do: stream_through_pool(enum, pool)

  def map_through_pool(enum, pool) do
    enum
    |> resource(pool)
    |> extract_and_checkin(pool)
  end

  defp resource(enum, pool) do
    Stream.resource(
      fn ->
        spawn_link fn -> stream_through_pool(enum, pool) end
      end,
      fn _ -> {[BlockingQueue.pop(UnshorteningPool.output_queue)], nil} end,
      fn _ -> true end
    )
  end

  defp stream_through_pool(enum, pool) do
    enum
    |> Stream.map(fn x -> {x, :poolboy.checkout(pool)} end)
    |> Stream.map(fn {x, worker} -> UnshorteningPool.Worker.work(worker, x) end)
    |> Stream.run
  end

  defp extract_and_checkin(stream, pool) do
    Stream.map stream, fn {worker, result} ->
      :poolboy.checkin(pool, worker)
      result
    end
  end

  def output_stream(pool) do
    BlockingQueue.pop_stream(UnshorteningPool.output_queue)
    |> extract_and_checkin(pool)
  end
end
Here we can see that Mapper.collect/2 reuses the function stream_through_pool/2, which is also used by map_through_pool. The difference between collect and map is mostly in not building up and returning an output Stream.
The other significant change to Mapper is that map_through_pool used to create a new output queue every time. Now, we use UnshorteningPool.output_queue/0 to look up the right queue to use. As we can see:
def output_queue, do: :output_queue
this function just returns a globally registered name for the output queue.
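Per the plan, the queue registered under this name can be started in UnshorteningPool’s own supervision tree. Here is a sketch of what that might look like — the child spec is my guess, assuming BlockingQueue.start_link takes a maximum size plus GenServer options, and with the pool’s other children elided:

```elixir
defmodule UnshorteningPool do
  use Application

  def output_queue, do: :output_queue

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      # Start the single shared output queue under its registered name
      # so producers and consumers can all find it.
      worker(BlockingQueue, [:infinity, [name: output_queue]]),
      # ... the poolboy worker pool would also be supervised here ...
    ]

    opts = [strategy: :one_for_one, name: UnshorteningPool.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```

Supervising the queue alongside the pool means a crashed queue is restarted under the same name, so producers and consumers can reconnect without any coordination.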
There is also a new function:
def output_stream(pool) do
  BlockingQueue.pop_stream(UnshorteningPool.output_queue)
  |> extract_and_checkin(pool)
end
We see this function used in Main.run to retrieve the Stream of all results from the UnshorteningPool. This function builds up the Stream of results from the queue, then extracts results from the worker processes and checks the poolboy workers back in, like this:
defp extract_and_checkin(stream, pool) do
  Stream.map stream, fn {worker, result} ->
    :poolboy.checkin(pool, worker)
    result
  end
end
This code is also shared with the map/1 path.
Next Steps
This phase of the project went really well. I fixed some interfaces and was then able to assemble everything. The design is looking really good now.
I did need to make a minor change to BlockingQueue to allow it to take a name in its start_link options. This allowed UnshorteningPool to use a globally registered name for its queue. You can pick this change up in the most recent version of BlockingQueue.
Next, I need to start looking at aggregation of the results. I’ll start on that next week.