Joseph Kain bio photo

Joseph Kain

Professional Software Engineer learning Elixir.

Twitter LinkedIn Github

Two weeks ago, I described an architecture for Domain Scraper with this diagram

""

and used it to rationalize the design of putting the various fetchers and the unshortening pool together into separate applications. Then, last week I described how I wrote a new Fetcher module for reddit posts.

Now I should be able to put it all together but, as I tried this I found that the current implementation of unshortening pool doesn’t achieve this.

We will correct this as part of this post.

What’s the problem?

Here the runner for our processes:

defmodule Main do
  def run do
    Twitter.Server.get
    |> UnshorteningPool.map
    |> Stream.map(fn x -> IO.inspect x end)
    |> Enum.take(10)
    |> Enum.to_list
  end
end

It starts up the server for the Twitter fetchers, maps the Tweets through the UnshorteningPool, and prints out some of the results. So, I thought to just add another copy of this code to handle reddit. I would somehow put them into separate processes and be done!

But, then I realized that this doesn’t really work the way I want it to. The problem is that I have this one big pipeline which looks like this

""

But as I mentioned I want this:

""

So I think I need to be able to write:

spawn_link fn ->
  Twitter.Server.get
  |> UnshorteningPool.map
end

spawn_link fn ->
  Reddit.Server.get
  |> UnshorteningPool.map
end  

spawn_link fn ->
  HackerNews.Server.get
  |> UnshorteningPool.map
end

UnshorteningPool.output_stream
|> Stream.map(fn x -> IO.inspect x end)
|> Enum.take(30)
|> Enum.to_list

This would be a change in the interface to UnshorteningPool so that the map operation just accepts data and the output Stream is retrieved in a separate operation (output_stream/0). And we can no longer use a single pipeline. Given this usage pattern, map actually doesn’t seem like the right operation anymoreNow, into is stating to seem like the right abstraction. That is, the servers push into the pool and and we pull the (combined) results out separately. The map operation makes me thing it should all be one pipeline of computation.

Another way to think about this is, that the design issue is actually in UnshorteningPool.Mapper in that it creates an output BlockingQueue for each call to map_through_pool. For my use case I want multiple input Streams but only a single output Stream.

Plan

My plan to fix this is to restructure UnshorteningPool to

  • Create a single output queue, I can add it to the supervision tree
  • Replace map and map_through_pool with a new interface
  • Build up processes and output

This would let me write something like this:

defmodule Main do
  def run do
    spawn_link fn ->
      Twitter.Server.get
      |> Enum.into(UnshorteningPool)
    end

    spawn_link fn ->
      Reddit.Server.get
      |> Enum.into(UnshorteningPool)
    end  

    spawn_link fn ->
      HackerNews.Server.get
      |> Enum.into(UnshorteningPool)
    end

    UnshorteningPool.output_stream
    |> Stream.map(fn x -> IO.inspect x end)
    |> Enum.take(30)
    |> Enum.to_list
  end
end

But, I think there is one more question to answer: where should the processes be spawned?

Previously, UnshorteningPool.map kicked off a new process. Above, I create them in run. Ideally, I would create them as workers in the supervision tree so that they could be restarted if anything goes wrong. This means that my into implementation doesn’t need to spawn. But it does leave us with one more TODO item:

  • Build producer app that supervises all the producers

Implementation

So, here’s what I ended up with. It’s smaller than what I expected:

defmodule Main do
  def run do
    UnshorteningPool.output_stream
    |> Stream.map(fn x -> IO.inspect x end)
    |> Enum.take(30)
    |> Enum.to_list
  end
end

Main.run just has to take the output of the UnshorteningPool and for now just print it out.

So you may be wondering where the fetchers are started. Well, in order to have them all supervised they need to go in the supervision tree:

So, there’s a new Producer app that sets up like this:

defmodule Producer do
  use Application

  # See http://elixir-lang.org/docs/stable/elixir/Application.html
  # for more information on OTP Applications
  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      worker(Producer.Worker, [Reddit.Server], id: :producer_reddit),
      worker(Producer.Worker, [Twitter.Server], id: :producer_twitter),
    ]

    # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Producer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

And Producer.Worker is:

defmodule Producer.Worker do
  use ExActor.GenServer

  # Just start up the evaluation
  defstart start_link(module) do
    pid = spawn_link fn ->
      module.get
      |> UnshorteningPool.collect
    end

    initial_state(pid)
  end
end

This is a GenServer that takes the place of the anonymous process in the snippet above:

spawn_link fn ->
  Twitter.Server.get
  |> Enum.into(UnshorteningPool)
end

As a GenServer we can install this process into the supervision tree.

Next, let’s look at the changes to UnshorteningPool that were necessary to support this.

Unshortening Pool

First, we can see hat the new interface to the pool is called UnshorteningPool.collect/1. Here’s the implementation:

def collect(enum), do: UnshorteningPool.Mapper.collect(enum, pool_name)

Recall that UnshorteningPool only provides the top level interfaces but delegates a lot of the work of mapping input to the UnshorteningPool.Mapper module. This is done to better organize the code. So collect/1 looks up the pool name and passes things on to Mapper:

defmodule UnshorteningPool.Mapper do
  def collect(enum, pool), do: stream_through_pool(enum, pool)

  def map_through_pool(enum, pool) do
    enum
    |> resource(pool)
    |> extract_and_checkin(pool)
  end

  defp resource(enum, pool) do
    Stream.resource(
      fn ->
        spawn_link fn -> stream_through_pool(enum, pool) end
      end,

      fn _ -> {[BlockingQueue.pop(UnshorteningPool.output_queue)], nil} end,
      fn _ -> true end
    )
  end

  defp stream_through_pool(enum, pool) do
    enum
    |> Stream.map(fn x -> {x, :poolboy.checkout(pool)} end)
    |> Stream.map(fn {x, worker} -> UnshorteningPool.Worker.work(worker, x) end)
    |> Stream.run
  end

  defp extract_and_checkin(stream, pool) do
    Stream.map stream, fn {worker, result} ->
      :poolboy.checkin(pool, worker)
      result
    end
  end

  def output_stream(pool) do
    BlockingQueue.pop_stream(UnshorteningPool.output_queue)
    |> extract_and_checkin(pool)
  end
end

Here we can see that Mapper.collect/2 reuses the function stream_through_pool/2 which is also used by map_through_pool. The difference between collect and map is is mostly in not building up and returning an output Stream.

The other significant change to Mapper is that map_through_pool used to create a new pool every time. Now, we UnshorteningPool.output_queue/0 to lookup the right queue to use. As we can see:

def output_queue, do: :output_queue

this function just returns a globally registered name for the an output queue.

There is also a new function

def output_stream(pool) do
  BlockingQueue.pop_stream(UnshorteningPool.output_queue)
  |> extract_and_checkin(pool)
end

We see this function used in Main.run to retrieve the Stream of all results from the UnshorteningPool. This function builds a up the Stream of results from the queue and then extracts results from the processes and checks the poolboy workers back in, like this:

defp extract_and_checkin(stream, pool) do
  Stream.map stream, fn {worker, result} ->
    :poolboy.checkin(pool, worker)
    result
  end
end

This code is also shared with the map/1 path.

Next Steps

This phase of the project went really well. I fixed some interfaces and was then able to assemble everything. The design is looking really good now.

I did need to make a minor change to BlockingQueue to allow it to take a name definition in start_link options. This allowed UnshorteningPool to use a global registered name for its queue. You can pick this change up in the most recent version of BlockingQueue.

Next, I need to start looking at aggregation of the results. I’ll start on that next week.