Two weeks ago, I described an architecture for Domain Scraper with this diagram
and used it to rationalize the design of putting the various fetchers and the unshortening pool together into separate applications. Then, last week I described how I wrote a new Fetcher module for reddit posts.
Now I should be able to put it all together. But as I tried, I found that the current implementation of the unshortening pool doesn't support this.
We will correct this as part of this post.
What’s the problem?
Here's the runner for our processes:
It starts up the server for the Twitter fetchers, maps the Tweets through the
UnshorteningPool, and prints out some of the results. So, I thought I could just add another copy of this code to handle reddit, somehow put the two into separate processes, and be done!
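The actual runner lives in the Domain Scraper project; a stand-in sketch of the same shape, with fake fetcher and pool modules so it runs on its own, looks roughly like this:

```elixir
# Stand-in sketch only: the real TwitterFetcher and UnshorteningPool are
# part of Domain Scraper and are not reproduced here.
defmodule TwitterFetcher do
  # Stand-in: yields a few fake tweets containing shortened links.
  def stream, do: Stream.map(1..3, fn i -> "tweet #{i} http://t.co/#{i}" end)
end

defmodule UnshorteningPool do
  # Stand-in for the pool's map/1: tag each tweet instead of really
  # unshortening the links it contains.
  def map(stream), do: Stream.map(stream, fn tweet -> {tweet, :unshortened} end)
end

# The single pipeline: fetch, map through the pool, print some results.
TwitterFetcher.stream()
|> UnshorteningPool.map()
|> Enum.take(2)
|> IO.inspect()
```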
But then I realized that this doesn't really work the way I want it to. The problem is that I have one big pipeline, which looks like this:
But as I mentioned I want this:
So I think I need to be able to write:
This would be a change in the interface to UnshorteningPool so that the map operation just accepts data and the output Stream is retrieved in a separate operation (output_stream/0). And we can no longer use a single pipeline. Given this usage pattern, map doesn't really seem like the right operation anymore. Now, into is starting to seem like the right abstraction. That is, the servers push into the pool and we pull the (combined) results out separately. The map operation makes me think it should all be one pipeline of computation.
Another way to think about this is that the design issue is actually in UnshorteningPool.Mapper: it creates an output BlockingQueue for each call to map_through_pool. For my use case I want multiple input Streams but only a single output queue.
My plan to fix this is to restructure UnshorteningPool to:
- Create a single output queue that I can add to the supervision tree
- Replace map_through_pool with a new interface
- Build up processes and output
This would let me write something like this:
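In sketch form, the shape I had in mind looks like this, with stand-ins so it runs on its own: plain spawned processes play the fetchers, and the current process's mailbox plays the pool's shared output queue:

```elixir
# Stand-in sketch of "servers push in, results are pulled out separately".
parent = self()

# Two "fetchers" (just fixed lists here) push into the shared "pool"
# (the parent's mailbox stands in for the output queue).
for source <- [["http://t.co/a"], ["http://redd.it/b"]] do
  spawn(fn -> Enum.each(source, &send(parent, {:result, &1})) end)
end

# Pull the combined results out in one place.
results =
  Stream.repeatedly(fn ->
    receive do
      {:result, url} -> url
    after
      500 -> nil  # stop when no more results arrive
    end
  end)
  |> Stream.take_while(& &1)
  |> Enum.sort()

IO.inspect(results)
```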
But, I think there is one more question to answer: where should the processes be spawned?
UnshorteningPool.map kicked off a new process. Above, I create them in
run. Ideally, I would create them as workers in the supervision tree so that they could be restarted if anything goes wrong. This means that my
into implementation doesn’t need to spawn. But it does leave us with one more TODO item:
- Build a producer app that supervises all the producers
So, here’s what I ended up with. It’s smaller than what I expected:
Main.run just has to take the output of the
UnshorteningPool and, for now, print it out.
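Reduced to that description, Main.run is tiny. In this sketch, output_stream/0 is stubbed with a fixed list so the snippet runs on its own:

```elixir
# Stub: the real output_stream/0 returns a Stream of results from the pool's
# shared output queue.
defmodule UnshorteningPool do
  def output_stream, do: ["http://example.com/a", "http://example.com/b"]
end

defmodule Main do
  # Take the pool's output and, for now, just print it.
  def run do
    UnshorteningPool.output_stream()
    |> Enum.each(&IO.puts/1)
  end
end

Main.run()
```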
So you may be wondering where the fetchers are started. Well, in order to have them all supervised they need to go in the supervision tree:
So, there’s a new
Producer app that is set up like this:
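The tree's shape can be sketched as one supervised worker per fetcher. Here, long-running Task stand-ins replace the real producer processes so the sketch runs as-is:

```elixir
# One supervised child per producer; Tasks stand in for the real
# Producer workers, which would stream fetched posts into the pool.
children = [
  Supervisor.child_spec({Task, fn -> Process.sleep(:infinity) end},
    id: :twitter_producer),
  Supervisor.child_spec({Task, fn -> Process.sleep(:infinity) end},
    id: :reddit_producer)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
IO.inspect(length(Supervisor.which_children(sup)))
```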
This is a GenServer that takes the place of the anonymous process in the snippet above:
As a GenServer we can install this process into the supervision tree.
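The real Producer module isn't reproduced in this excerpt, but a minimal sketch of such a GenServer might look like this: init/1 sends itself a message so the supervisor isn't blocked, and handle_info then does the streaming work (the fetch and sink functions here are hypothetical parameters):

```elixir
# Minimal sketch of a supervisable producer GenServer.
defmodule Producer do
  use GenServer

  # fetch.() returns a Stream of items; sink.(stream) pushes them into the pool.
  def start_link(fetch, sink) do
    GenServer.start_link(__MODULE__, {fetch, sink})
  end

  def init(state) do
    # Schedule the work instead of doing it in init, so the supervisor
    # starting this child isn't blocked.
    send(self(), :produce)
    {:ok, state}
  end

  def handle_info(:produce, {fetch, sink} = state) do
    fetch.() |> sink.()
    {:noreply, state}
  end
end

parent = self()

{:ok, _pid} =
  Producer.start_link(
    fn -> Stream.map(1..3, &"item #{&1}") end,
    fn stream -> Enum.each(stream, &send(parent, {:item, &1})) end
  )

first = receive do: ({:item, item} -> item)
IO.inspect(first)
```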
Next, let’s look at the changes to
UnshorteningPool that were necessary to support this.
First, we can see that the new interface to the pool is called
UnshorteningPool.collect/1. Here’s the implementation:
UnshorteningPool only provides the top-level interfaces but delegates a lot of the work of mapping input to the
UnshorteningPool.Mapper module. This is done to better organize the code. So
collect/1 looks up the pool name and passes things on to Mapper.collect/2.
Here we can see that
Mapper.collect/2 reuses the function
stream_through_pool/2, which is also used by
map_through_pool. The difference from
map is mostly in not building up and returning an output Stream.
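That delegation can be sketched like so, with stand-ins for the parts not shown here: the real stream_through_pool/2 checks items out to poolboy workers, while this version just tags them, and the pool name is an assumption:

```elixir
defmodule UnshorteningPool.Mapper do
  # Shared by map_through_pool and collect: run items through the pool's
  # workers (stand-in: tag each item instead of unshortening).
  def stream_through_pool(stream, _pool) do
    Stream.map(stream, fn item -> {item, :unshortened} end)
  end

  # collect/2 drives the stream but, unlike map_through_pool, does not build
  # and return an output Stream; results land on the shared output queue.
  def collect(stream, pool) do
    stream |> stream_through_pool(pool) |> Stream.run()
  end
end

defmodule UnshorteningPool do
  def collect(stream), do: UnshorteningPool.Mapper.collect(stream, pool_name())
  defp pool_name, do: :unshortening_pool  # name assumed for the sketch
end

IO.inspect(UnshorteningPool.collect(["http://t.co/x"]))
```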
The other significant change to
Mapper is that
map_through_pool used to create a new output queue every time. Now, we use
UnshorteningPool.output_queue/0 to look up the right queue to use. As we can see, this function just returns a globally registered name for the output queue.
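Presumably something along these lines, where the exact registered name is an assumption:

```elixir
defmodule UnshorteningPool do
  # Return the globally registered name of the single shared output queue.
  def output_queue, do: {:global, :unshortening_pool_output_queue}
end

IO.inspect(UnshorteningPool.output_queue())
```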
There is also a new function,
output_stream/0. We see this function used in
Main.run to retrieve the
Stream of all results from the
UnshorteningPool. This function builds up the
Stream of results from the queue, then extracts results from the processes and checks the poolboy workers back in, like this:
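That shape can be sketched with runnable stand-ins: an Agent holding a list plays the BlockingQueue, and a no-op checkin/1 stands in for :poolboy.checkin/2:

```elixir
defmodule PoolSketch do
  # Build a Stream of results off the queue; for each {worker, result} pair,
  # check the worker back in and emit the result.
  def output_stream(queue) do
    Stream.repeatedly(fn -> pop(queue) end)
    |> Stream.take_while(&(&1 != :empty))
    |> Stream.map(fn {worker, result} ->
      checkin(worker)  # stand-in for :poolboy.checkin/2
      result
    end)
  end

  # Stand-in pop: the real BlockingQueue.pop blocks until an item arrives;
  # this one just stops when the list is empty.
  defp pop(queue) do
    Agent.get_and_update(queue, fn
      [] -> {:empty, []}
      [head | tail] -> {head, tail}
    end)
  end

  defp checkin(_worker), do: :ok
end

{:ok, queue} = Agent.start_link(fn -> [{:worker1, "http://example.com/"}] end)
results = PoolSketch.output_stream(queue) |> Enum.to_list()
IO.inspect(results)
```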
This code is also shared with the map_through_pool implementation.
This phase of the project went really well. I fixed some interfaces and was then able to assemble everything. The design is looking really good now.
I did need to make a minor change to BlockingQueue to allow it to take a name in its start_link options. This allowed
UnshorteningPool to use a globally registered name for its queue. You can pick this change up in the most recent version of BlockingQueue.
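BlockingQueue itself isn't vendored here, so a bare GenServer can show the same pattern: threading a name: option through start_link so other processes can find the queue by a global name (the queue name here is an assumption):

```elixir
defmodule NamedQueue do
  use GenServer

  # Pass the caller's options (including name:) straight to GenServer.
  def start_link(options \\ []) do
    GenServer.start_link(__MODULE__, :ok, options)
  end

  def init(:ok), do: {:ok, []}
end

{:ok, pid} = NamedQueue.start_link(name: {:global, :output_queue})
IO.inspect(:global.whereis_name(:output_queue) == pid)
```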
Next, I need to start looking at aggregation of the results. I’ll start on that next week.