Joseph Kain

Professional Software Engineer learning Elixir.

I’ve been exploring application design in Elixir and want to continue that course of learning by actually building an application. I’ve been putting together an idea for an application, which I will describe in more detail in a future post. One part of its architecture requires a queue of work. In this post I will describe how I built this queue.

This queue is different from many other work queues. Most queues take in work from one or more sources and have jobs consumed by one or more worker processes (usually in a pool). My queue, BlockingQueue, is different. It has one producer and one consumer. It also has a limited size. The purpose of the queue is to cover the latency of operations between the consumer and producer processes.

[Edit 6/24/2015 - As @pel_daniel points out in the comments I had an off by one error in my guard. I’ve fixed this in the code samples below as well as in the blocking_queue GitHub repo.]

Using ExActor

BlockingQueue will be implemented as a GenServer and I wanted to use ExActor to simplify the GenServer implementation.

I also plan to use TDD to develop BlockingQueue.

BlockingQueue has two operations:

  1. push - adds an item to the end of a list, blocks if the queue is full
  2. pop - removes and returns the item from the head of the list, blocks if the queue is empty

Creating a new BlockingQueue should require a maximum depth. Let’s write a test for this.

test "BlockingQueue is started with a maximum depth" do
  {:ok, _pid } = BlockingQueue.start_link(5)
end

1) test BlockingQueue is started with a maximum depth (BlockingQueueTest)
   test/blocking_queue_test.exs:4
   ** (UndefinedFunctionError) undefined function: BlockingQueue.start_link/1
   stacktrace:
     (p2) BlockingQueue.start_link(5)
     test/blocking_queue_test.exs:5

.

Finished in 0.02 seconds (0.02s on load, 0.00s on tests)
2 tests, 1 failures

Randomized with seed 382862

Next, we create a simple implementation:

defmodule BlockingQueue do
  use ExActor.GenServer

  defstart start_link(n), do: initial_state(n)
end

Since we required a maximum depth we hang on to it as the state even though at the moment we don’t have a use for it.

At this point the test passes.

Type Specs

I want to add typespecs since I’ve been writing about Elixir type specs recently.

@spec start_link(integer) :: on_start
defstart start_link(n), do: initial_state(n)

Where on_start is

@type on_start :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term}

At this point Dialyzer runs clean.
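
As an aside on the on_start type: GenServer exports an equivalent type, GenServer.on_start(), so the spec does not have to carry its own copy. Here is a minimal sketch of that idea. It uses plain GenServer rather than ExActor, and the module name OnStartSketch is hypothetical.

```elixir
defmodule OnStartSketch do
  use GenServer

  # GenServer.on_start() is the documented return type of GenServer.start_link/3.
  @spec start_link(integer) :: GenServer.on_start()
  def start_link(n), do: GenServer.start_link(__MODULE__, n)

  # Keep the maximum depth as the state, as in the post.
  @impl true
  def init(n), do: {:ok, n}
end
```

Calling OnStartSketch.start_link(5) should return {:ok, pid}, just like the ExActor version.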

Push and pop

Next we need to implement push. First, a test - this test forces the existence of the function:

test "BlockingQueue can push" do
  {:ok, pid} = BlockingQueue.start_link(5)
  BlockingQueue.push(pid, "Hi")
end

The test fails so we add the simplest possible version of the function:

defcall push(item), state: state, do: reply(nil)

Hmm, writing the type is a little non-obvious here because ExActor doesn’t expose to me the functions that are generated. At this point I decided to skip over the typing, but I eventually came back to it.

Next we need a pop operation.

test "BlockingQueue can pop" do
  {:ok, pid} = BlockingQueue.start_link(5)
  BlockingQueue.pop(pid)
end

And the error informs us that we need to add the pop call

 ** (UndefinedFunctionError) undefined function: BlockingQueue.pop/1

so we write the simplest version we can:

defcall pop, state: state, do: reply(nil)

Next, we test that push and pop have the right behavior:

  test "BlockingQueue pop should return the last push" do
    item = "Hi"
    {:ok, pid} = BlockingQueue.start_link(5)
    BlockingQueue.push(pid, item)
    assert item == BlockingQueue.pop(pid)
  end

1) test BlockingQueue pop should return the last push (BlockingQueueTest)
   test/blocking_queue_test.exs:18
   Assertion with == failed
   code: item == BlockingQueue.pop(pid)
   lhs:  "Hi"
   rhs:  nil
   stacktrace:
     test/blocking_queue_test.exs:22

Passing this test requires some extensive changes. The state for our actor will be a tuple containing

  • The maximum size of the queue
  • The list of queue items

defcall push(item), state: {max, list} do
  set_and_reply({ max, list ++ [item] }, nil)
end

defcall pop, state: {_max, []},      do: reply(nil)
defcall pop, state: {_max, [x | _]}, do: reply(x)

Test smells

Creating two call heads for pop handles both the pop-only test and the push/pop test. Still, the pop clause for the empty list seems wrong to me. It is an indication that the original test isn’t correct, because the queue should block when empty. We will get to that eventually. First, we need to keep working on getting the right behavior for push/pop, because the non-empty case for pop isn’t right either:

  test "BlockingQueue push/pop should be first in / first out" do
    {:ok, pid} = BlockingQueue.start_link(5)
    BlockingQueue.push(pid, "Hello")
    BlockingQueue.push(pid, "World")
    assert "Hello" == BlockingQueue.pop(pid)
    assert "World" == BlockingQueue.pop(pid)
  end

 1) test BlockingQueue push/pop should be first in / first out (BlockingQueueTest)
    test/blocking_queue_test.exs:25
    Assertion with == failed
    code: "World" == BlockingQueue.pop(pid)
    lhs:  "World"
    rhs:  "Hello"
    stacktrace:
      test/blocking_queue_test.exs:30

We just need to fix up pop:

defcall pop, state: {max, [x | xs]} do
  set_and_reply({ max, xs }, x)
end

Blocking

OK, now I need to implement the blocking behavior. I’m not sure how to test this properly so I will have to proceed to code.
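
One possible way to test blocking, which is not what I did here, is to run the call in a Task and check that it has not returned after a short wait. The sketch below shows that idea in isolation. The module BlockingCheck and its blocked?/2 function are hypothetical helpers, not part of BlockingQueue.

```elixir
defmodule BlockingCheck do
  # Runs `fun` in a Task and reports whether it is still running after
  # `timeout_ms`. A task that is still running is killed before returning.
  def blocked?(fun, timeout_ms \\ 100) do
    task = Task.async(fun)

    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      nil -> true
      {:ok, _result} -> false
      {:exit, _reason} -> false
    end
  end
end

# Sleeping forever counts as blocked; returning a value right away does not.
IO.inspect(BlockingCheck.blocked?(fn -> Process.sleep(:infinity) end))
IO.inspect(BlockingCheck.blocked?(fn -> :done end))
```

In a queue test this could be used as BlockingCheck.blocked?(fn -> BlockingQueue.pop(pid) end) on an empty queue. A timing-based check like this only shows that the call did not return within the window, and killing the blocked caller leaves the queue holding a stale waiter, so each such test should start a fresh queue.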

In the case of an empty list I will add the caller as a waiting pid to the state by returning a state as a 4-tuple of

  • The maximum size of the queue
  • The list of queue items
  • The atom :pop
  • The pid of the waiting process

So the new call looks like this:

defcall pop, state: {max, []}, from: from do
  new_state {max, [], :pop, from}
end

I must remove the test “BlockingQueue can pop”, since pop on an empty queue now blocks, but this is OK because I test pop in other tests. To actually satisfy a waiting pop, a push must forward the new item directly to the waiter.

defcall push(item), state: {max, [], :pop, from} do
  GenServer.reply(from, item)
  set_and_reply({max, []}, nil)
end

And pop is similar.
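
Blocking in both directions relies on the same GenServer mechanism: handle_call returns :noreply while remembering the caller, and a later call answers that caller with GenServer.reply/2. The following is a minimal sketch of just that mechanism in plain GenServer. The module DeferredReply and its wait/1 and release/2 functions are hypothetical and support a single waiter only, like the queue's single consumer.

```elixir
defmodule DeferredReply do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, nil)

  # Blocks the caller until some other process calls release/2.
  def wait(pid), do: GenServer.call(pid, :wait)

  # Wakes the waiting caller, if any, handing it `value`.
  def release(pid, value), do: GenServer.call(pid, {:release, value})

  @impl true
  def init(nil), do: {:ok, nil}

  # No reply yet: remember who asked and leave them blocked in their call.
  @impl true
  def handle_call(:wait, from, nil), do: {:noreply, from}

  # Nobody is waiting, so there is nothing to wake.
  def handle_call({:release, _value}, _from, nil), do: {:reply, :no_waiter, nil}

  # Answer the stored caller now, then reply to the releaser as usual.
  def handle_call({:release, value}, _from, waiter) do
    GenServer.reply(waiter, value)
    {:reply, :ok, nil}
  end
end
```

Note that wait/1 goes through GenServer.call/2, which has a default timeout of 5 seconds, so a caller that is never released exits after that time rather than blocking forever.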

Property Testing

I was not able to TDD this code, and looking at it I am pretty sure it isn’t completely correct. However, I should be able to verify this code with property-based testing using ExCheck. Here’s a test:

  property "BlockingQueue supports async pushes and pops" do
    for_all xs in list(int) do
      implies length(xs) > 0 do
        {:ok, pid} = BlockingQueue.start_link(5)
        pusher = Task.async(fn ->
          Enum.map(xs, fn x -> BlockingQueue.push(pid, x) end)
        end)

        puller = Task.async(fn ->
          Enum.map(xs, fn _ -> BlockingQueue.pop(pid) end)
        end)

        Task.await(puller) == xs
      end
    end
  end

This is a bit of a complex test but the idea is this:

  • Use random lists of ints of at least 1 item
  • Create a BlockingQueue
  • Create an async Task to push the entire list into the BlockingQueue
  • Create an async Task to pop all items from the BlockingQueue
  • Assert that the list of pop results matches the original list

This fails like this:

  1) test BlockingQueue supports async and blocking pushs and pops_property (BlockingQueueTest)
     test/blocking_queue_test.exs:29
     ** (EXIT from #PID<0.130.0>) :timeout_value

..
08:52:25.232 [error] Task #PID<0.361.0> started from #PID<0.130.0> terminating
Function: #Function<2.133850526/0 in BlockingQueueTest.prop_BlockingQueue supports async and blocking pushs and pops/0>
    Args: []
** (exit) exited in: GenServer.call(#PID<0.360.0>, {:push, -59}, 5000)
    ** (EXIT) :timeout_value

Finished in 0.1 seconds (0.07s on load, 0.03s on tests)
6 tests, 1 failures

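It is a little odd that we see what looks like a 5-second timeout when the whole test suite ran in 0.1 seconds. The 5000 in the message is just the default GenServer.call timeout, and :timeout_value is the Erlang error for an invalid timeout value, so the call probably failed without ever waiting five seconds. There’s no example failure printed by ExCheck here, but if I run IO.inspect during the test the last example is a rather long list (it also changes from run to run).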

Pushing through

At this point things went in several directions:

  • I took the dog out for a walk and figured out how to put the type specs together
  • I was seeing a match error so I
    • rewrote all of the defcall commands as handle_call functions so I could add the types
    • had trouble getting some of it to compile (mostly had the reply options backwards)
    • got partway through adding type specs and realized I hadn’t run the tests at the last step
    • commented out the type specs and ran the tests - PASS
  • Next steps
    • Complete the type specs
    • Decide if I want to go back to ExActor defcalls or convert everything to GenServer
    • Explain all this stuff

First, this is the expansion to handle_call functions:

def handle_call({:push, item}, waiter, {max, list}) when length(list) >= max do
  {:noreply, {max, list, :push, waiter, item}}
end

def handle_call({:push, item}, _, {max, list}) do
  {:reply, nil, { max, list ++ [item] }}
end

def handle_call({:push, item}, _, {max, [], :pop, from}) do
  GenServer.reply(from, item)
  {:reply, nil, {max, []}}
end

def push(pid, item) do
  GenServer.call(pid, {:push, item})
end
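
The guard on the first clause decides when a push has to wait, and it is where an off-by-one error is easy to make. As an isolated illustration, the same length check can be exercised as a pure function. The module and function names here are hypothetical and not part of blocking_queue.

```elixir
defmodule GuardSketch do
  # Mirrors the guard used in handle_call: a push must wait once the list
  # already holds `max` items.
  def push_action(list, max) when length(list) >= max, do: :block
  def push_action(_list, _max), do: :enqueue
end

IO.inspect(GuardSketch.push_action([], 2))      # :enqueue
IO.inspect(GuardSketch.push_action([1], 2))     # :enqueue
IO.inspect(GuardSketch.push_action([1, 2], 2))  # :block
```

A strict > comparison, for instance, would accept a third item into a queue with a maximum depth of 2.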

And these are the specs I ended up writing for handle_call. The part I figured out while walking the dog was to write out all the different possible state types in the compound type state_t. Then I did the same with call_t and result_t:

@typep from_t  :: {pid, any}
@typep state_t :: {integer(), [any]}
                | {integer(), [any], :pop, from_t}
                | {integer(), [any], :push, from_t, any}
@typep call_t  :: {:push, any}
                | :pop
@type result_t :: {:reply, any, state_t}
                | {:noreply, state_t}

@spec handle_call(call_t, from_t, state_t) :: result_t

After working with things for a while I ended up converting everything to GenServer rather than using ExActor. The most compelling reason was that I don’t know how to write the guard for

def handle_call({:push, item}, waiter, {max, list}) when length(list) >= max do
  {:noreply, {max, list, :push, waiter, item}}
end

using ExActor. After cleaning up the code a bit I ended up with the …

Final version

defmodule BlockingQueue do
  use GenServer

  # Can I get this from somewhere?
  @type on_start :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term}

  @spec start_link(integer) :: on_start
  def start_link(n), do: GenServer.start_link(__MODULE__, n)
  def init(n), do: {:ok, {n, []}}

  @typep from_t   :: {pid, any}
  @typep state_t  :: {integer(), [any]}
                   | {integer(), [any], :pop, from_t}
                   | {integer(), [any], :push, from_t, any}
  @typep call_t   :: {:push, any}
                   | :pop
  @typep result_t :: {:reply, any, state_t}
                   | {:noreply, state_t}

  @spec handle_call(call_t, from_t, state_t) :: result_t

  def handle_call({:push, item}, waiter, {max, list}) when length(list) >= max do
    {:noreply, {max, list, :push, waiter, item}}
  end

  def handle_call({:push, item}, _, {max, list}) do
    {:reply, nil, { max, list ++ [item] }}
  end

  def handle_call({:push, item}, _, {max, [], :pop, from}) do
    GenServer.reply(from, item)
    {:reply, nil, {max, []}}
  end

  def handle_call(:pop, from, {max, []}), do: {:noreply, {max, [], :pop, from}}
  def handle_call(:pop, _, {max, [x | xs]}), do: {:reply, x, {max, xs}}
  def handle_call(:pop, _, {max, [x | xs], :push, waiter, item}) do
    GenServer.reply(waiter, nil)
    {:reply, x, {max, xs ++ [item]}}
  end

  @spec push(pid, any) :: nil
  def push(pid, item), do: GenServer.call(pid, {:push, item})

  @spec pop(pid) :: any
  def pop(pid), do: GenServer.call(pid, :pop)
end

And then finally I changed the types, because the maximum queue size should be pos_integer().
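
With that change, the state type and the start_link spec might look like the sketch below. The module name is hypothetical, only the startup half of the queue is shown, and the state type is declared with @type rather than @typep. It also borrows GenServer.from() and GenServer.on_start() from GenServer instead of defining local copies, which is an assumption about style rather than what the package does.

```elixir
defmodule PosIntegerSketch do
  use GenServer

  # The first element of every state shape is the maximum depth.
  @type state_t ::
          {pos_integer(), [any]}
          | {pos_integer(), [any], :pop, GenServer.from()}
          | {pos_integer(), [any], :push, GenServer.from(), any}

  @spec start_link(pos_integer()) :: GenServer.on_start()
  def start_link(n), do: GenServer.start_link(__MODULE__, n)

  @impl true
  @spec init(pos_integer()) :: {:ok, state_t}
  def init(n), do: {:ok, {n, []}}
end
```

Type specs are not enforced at runtime, so start_link(0) would still start a process. The tighter type only helps Dialyzer and readers.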

Wrapping up

At this point I believe BlockingQueue has the features I need. I’ll discuss putting the queue to use in two weeks. Next week there will be a special post.

I’ve also packaged up BlockingQueue as the blocking_queue hex package so you can use it in your projects if you need it.