I’ve been exploring application design in Elixir and want to continue that course of learning by actually building an application. I’ve been putting together an idea for an application, which I will describe in more detail in a future post. One part of its architecture requires a queue of work. In this post I will describe how I built this queue.
This queue is different from many other work queues. Most queues take in work from one or more sources and have jobs consumed by one or more worker processes (usually in a pool). My queue, BlockingQueue, is different. It has one producer and one consumer. It also has a limited size. The purpose of the queue is to cover the latency of operations between the consumer and producer processes.
[Edit 6/24/2015 - As @pel_daniel points out in the comments I had an off by one error in my guard. I’ve fixed this in the code samples below as well as in the blocking_queue GitHub repo.]
Using ExActor
BlockingQueue will be implemented as a GenServer and I wanted to use ExActor to simplify the GenServer implementation.
I also plan to use TDD to develop BlockingQueue.
BlockingQueue has two operations:
1. push - adds an item to the end of a list, blocks if the queue is full
2. pop - removes and returns the item from the head of the list, blocks if the queue is empty
Creating a new BlockingQueue should require a maximum depth. Let’s write a test for this.
test "BlockingQueue is started with a maximum depth" do
  {:ok, _pid} = BlockingQueue.start_link(5)
end
1) test BlockingQueue is started with a maximum depth (BlockingQueueTest)
test/blocking_queue_test.exs:4
** (UndefinedFunctionError) undefined function: BlockingQueue.start_link/1
stacktrace:
(p2) BlockingQueue.start_link(5)
test/blocking_queue_test.exs:5
.
Finished in 0.02 seconds (0.02s on load, 0.00s on tests)
2 tests, 1 failures
Randomized with seed 382862
Next, we create a simple implementation:
defmodule BlockingQueue do
  use ExActor.GenServer

  defstart start_link(n), do: initial_state(n)
end
Since we required a maximum depth we hang on to it as the state even though at the moment we don’t have a use for it.
At this point the test passes.
Type Specs
I want to add typespecs since I’ve been writing about Elixir type specs recently.
@spec start_link(integer) :: on_start
defstart start_link(n), do: initial_state(n)
where on_start is
@type on_start :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term}
At this point Dialyzer runs clean.
Push and pop
Next we need to implement push. First, a test; this test forces the existence of the function:
test "BlockingQueue can push" do
  {:ok, pid} = BlockingQueue.start_link(5)
  BlockingQueue.push(pid, "Hi")
end
The test fails so we add the simplest possible version of the function:
defcall push(item), state: state, do: reply(nil)
Hmm, writing the type is a little non-obvious here because ExActor doesn’t expose to me the functions that are generated. At this point I decided to skip over the typing, but I eventually came back to it.
Next we need a pop operation.
test "BlockingQueue can pop" do
  {:ok, pid} = BlockingQueue.start_link(5)
  BlockingQueue.pop(pid)
end
And the error informs us that we need to add the pop call
** (UndefinedFunctionError) undefined function: BlockingQueue.pop/1
so we write the simplest version we can:
defcall pop, state: state, do: reply(nil)
Next, a test that push and pop have the right behavior:
test "BlockingQueue pop should return the last push" do
  item = "Hi"
  {:ok, pid} = BlockingQueue.start_link(5)
  BlockingQueue.push(pid, item)
  assert item == BlockingQueue.pop(pid)
end
1) test BlockingQueue pop should return the last push (BlockingQueueTest)
test/blocking_queue_test.exs:18
Assertion with == failed
code: item == BlockingQueue.pop(pid)
lhs: "Hi"
rhs: nil
stacktrace:
test/blocking_queue_test.exs:22
Passing this test requires some extensive changes. The state for our actor will be a tuple containing
- The maximum size of the queue
- The list of queue items
defcall push(item), state: {max, list} do
  set_and_reply({max, list ++ [item]}, nil)
end
defcall pop, state: {_max, []}, do: reply(nil)
defcall pop, state: {_max, [x | _]}, do: reply(x)
Test smells
Creating two call heads for pop handles both the pop-only test and the push/pop test, though the pop for an empty list seems wrong to me. It is an indication that the original test isn’t correct, because the queue should block when empty. We will get to that eventually. First, we need to keep working on getting the right behavior for push/pop, because the non-empty case for pop isn’t right either: it returns the head of the list without removing it.
test "BlockingQueue push/pop should be first in / first out" do
  {:ok, pid} = BlockingQueue.start_link(5)
  BlockingQueue.push(pid, "Hello")
  BlockingQueue.push(pid, "World")
  assert "Hello" == BlockingQueue.pop(pid)
  assert "World" == BlockingQueue.pop(pid)
end
1) test BlockingQueue push/pop should be first in / first out (BlockingQueueTest)
test/blocking_queue_test.exs:25
Assertion with == failed
code: "World" == BlockingQueue.pop(pid)
lhs: "World"
rhs: "Hello"
stacktrace:
test/blocking_queue_test.exs:30
We just need to fix up pop so that it removes the item it returns:
defcall pop, state: {max, [x | xs]} do
  set_and_reply({max, xs}, x)
end
Blocking
OK, now I need to implement the blocking behavior. I’m not sure how to test this properly so I will have to proceed to code.
In the case of an empty list I will add the caller as a waiting pid to the state by returning a state as a 4-tuple of
- The maximum size of the queue
- The list of queue items
- The atom :pop
- The pid of the waiting process
So the new call looks like this:
defcall pop, state: {max, []}, from: from do
  new_state {max, [], :pop, from}
end
With this change I must remove the test “BlockingQueue can pop”, since a pop on an empty queue now blocks, but this is OK because I test pop in other tests. To actually satisfy a waiting pop, a push must forward the new item directly to the waiter:
defcall push(item), state: {max, [], :pop, from} do
  GenServer.reply(from, item)
  set_and_reply({max, []}, nil)
end
And pop is similar.
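The mechanism behind the blocking is worth seeing in isolation: a handle_call clause that returns :noreply leaves the caller waiting inside GenServer.call, and a later GenServer.reply/2 with the saved from wakes it up. Here is a minimal sketch of that idea. Handoff, give and take are hypothetical names I made up for the illustration and are not part of BlockingQueue; it holds a single item and assumes one giver and one taker, so a give into an already occupied slot is not handled.

```elixir
defmodule Handoff do
  # Hypothetical one-slot example, not part of BlockingQueue.
  use GenServer

  def start_link(), do: GenServer.start_link(__MODULE__, :empty)
  def give(pid, item), do: GenServer.call(pid, {:give, item})
  def take(pid), do: GenServer.call(pid, :take)

  @impl true
  def init(:empty), do: {:ok, :empty}

  # Nothing to hand over yet: keep `from` in the state and send no reply,
  # which leaves the caller blocked inside GenServer.call/2.
  @impl true
  def handle_call(:take, from, :empty), do: {:noreply, {:waiting, from}}

  # An item is already stored: reply immediately.
  def handle_call(:take, _from, {:item, item}), do: {:reply, item, :empty}

  # A taker is parked: answer its earlier call now, then reply to the giver.
  def handle_call({:give, item}, _from, {:waiting, taker}) do
    GenServer.reply(taker, item)
    {:reply, :ok, :empty}
  end

  # Nobody is waiting: store the item for the next take.
  def handle_call({:give, item}, _from, :empty), do: {:reply, :ok, {:item, item}}
end

{:ok, pid} = Handoff.start_link()
taker = Task.async(fn -> Handoff.take(pid) end)

# Task.yield/2 returns nil when the task has not finished within the timeout,
# which is one way to observe that the take is still blocked.
nil = Task.yield(taker, 100)

:ok = Handoff.give(pid, "Hi")
"Hi" = Task.await(taker)
```

The Task.yield/2 check is also one possible way to test blocking behavior: start the blocking call in a Task, confirm it has not returned after a short timeout, then unblock it and await the result.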
Property Testing
I was not able to TDD this code and, looking at it, I am pretty sure it isn’t completely correct. However, I should be able to verify this code with property-based testing using ExCheck. Here’s a test:
property "BlockingQueue supports async pushes and pops" do
  for_all xs in list(int) do
    implies length(xs) > 0 do
      {:ok, pid} = BlockingQueue.start_link(5)
      pusher = Task.async(fn ->
        Enum.map(xs, fn x -> BlockingQueue.push(pid, x) end)
      end)
      puller = Task.async(fn ->
        Enum.map(xs, fn _ -> BlockingQueue.pop(pid) end)
      end)
      Task.await(puller) == xs
    end
  end
end
This is a bit of a complex test but the idea is this:
- Use random lists of ints of at least 1 item
- Create a BlockingQueue
- Create an async Task to push the entire list into the BlockingQueue
- Create an async Task to pop all items from the BlockingQueue
- Assert that the list of pop results matches the original list
This fails like this:
1) test BlockingQueue supports async and blocking pushs and pops_property (BlockingQueueTest)
test/blocking_queue_test.exs:29
** (EXIT from #PID<0.130.0>) :timeout_value
..
08:52:25.232 [error] Task #PID<0.361.0> started from #PID<0.130.0> terminating
Function: #Function<2.133850526/0 in BlockingQueueTest.prop_BlockingQueue supports async and blocking pushs and pops/0>
Args: []
** (exit) exited in: GenServer.call(#PID<0.360.0>, {:push, -59}, 5000)
** (EXIT) :timeout_value
Finished in 0.1 seconds (0.07s on load, 0.03s on tests)
6 tests, 1 failures
It is a little odd that we got a timeout after 5 seconds but the whole test suite ran in 0.1 seconds. There’s no example failure printed by ExCheck here, but if I run IO.inspect during the test the last example is a rather long list (it also changes from run to run).
Pushing through
At this point things went in several directions:
- I took the dog out for a walk and figured out how to put the type specs together
- I was seeing a match error so I
  - rewrote all of the defcall commands with handle_call functions so I could add the types
  - had trouble getting some of it to compile (mostly had the reply options backwards)
  - got part way through adding type specs and realized I hadn’t run the tests at the last step
  - commented out the type specs and ran the test - PASS
- Next step
  - Complete the type specs
  - Decide if I want to go back to ExActor defcall’s or convert everything to GenServer
  - Explain all this stuff
First, this is the expansion to handle_call functions:
def handle_call({:push, item}, waiter, {max, list}) when length(list) >= max do
  {:noreply, {max, list, :push, waiter, item}}
end
def handle_call({:push, item}, _, {max, list}) do
  {:reply, nil, {max, list ++ [item]}}
end
def handle_call({:push, item}, _, {max, [], :pop, from}) do
  GenServer.reply(from, item)
  {:reply, nil, {max, []}}
end
def push(pid, item) do
  GenServer.call(pid, {:push, item})
end
These are the specs I ended up writing for handle_call. The part I figured out while walking the dog was to write out all the different possible state types in the compound type state_t. Then I did the same with call_t and result_t:
@typep from_t :: {pid, any}
@typep state_t :: {integer(), [any]}
                | {integer(), [any], :pop, from_t}
                | {integer(), [any], :push, from_t, any}
@typep call_t :: {:push, any}
               | :pop
@type result_t :: {:reply, any, state_t}
                | {:noreply, state_t}
@spec handle_call(call_t, from_t, state_t) :: result_t
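The state shapes in state_t and the boundary of the full-queue guard can be checked without starting a process at all. This is a rough sketch, assuming the same tuples as the push clauses above; QueueTransitions is a hypothetical helper module that is not part of BlockingQueue, and the atom :p simply stands in for a real GenServer from value.

```elixir
defmodule QueueTransitions do
  # Hypothetical helper, not part of BlockingQueue: the push transitions
  # written as pure functions so they can be checked without a process.
  # `waiter` stands in for the `from` argument of handle_call.

  # Full queue: park the pusher and its item in a 5-tuple state.
  def push({max, list}, item, waiter) when length(list) >= max do
    {:noreply, {max, list, :push, waiter, item}}
  end

  # Room available: append the item and reply right away.
  def push({max, list}, item, _waiter) do
    {:reply, nil, {max, list ++ [item]}}
  end
end

{:reply, nil, s1} = QueueTransitions.push({2, []}, :a, :p)
{:reply, nil, s2} = QueueTransitions.push(s1, :b, :p)

# The queue now holds `max` items, so the third push has to park.
{:noreply, s3} = QueueTransitions.push(s2, :c, :p)
IO.inspect(s3)
# prints {2, [:a, :b], :push, :p, :c}
```

With a maximum depth of 2, the first two pushes get a reply and the third one is parked, which is the boundary the guard is meant to enforce.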
After working with things for a while I ended up converting everything to GenServer rather than using ExActor. The most compelling reason was that I don’t know how to write the guard for
def handle_call({:push, item}, waiter, {max, list}) when length(list) >= max do
  {:noreply, {max, list, :push, waiter, item}}
end
using ExActor. After cleaning up the code a bit I ended up with the …
Final version
defmodule BlockingQueue do
  use GenServer

  # Can I get this from somewhere?
  @type on_start :: {:ok, pid} | :ignore | {:error, {:already_started, pid} | term}

  @spec start_link(integer) :: on_start
  def start_link(n), do: GenServer.start_link(__MODULE__, n)

  def init(n), do: {:ok, {n, []}}

  @typep from_t :: {pid, any}
  @typep state_t :: {integer(), [any]}
                  | {integer(), [any], :pop, from_t}
                  | {integer(), [any], :push, from_t, any}
  @typep call_t :: {:push, any}
                 | :pop
  @typep result_t :: {:reply, any, state_t}
                   | {:noreply, state_t}

  @spec handle_call(call_t, from_t, state_t) :: result_t
  def handle_call({:push, item}, waiter, {max, list}) when length(list) >= max do
    {:noreply, {max, list, :push, waiter, item}}
  end
  def handle_call({:push, item}, _, {max, list}) do
    {:reply, nil, {max, list ++ [item]}}
  end
  def handle_call({:push, item}, _, {max, [], :pop, from}) do
    GenServer.reply(from, item)
    {:reply, nil, {max, []}}
  end
  def handle_call(:pop, from, {max, []}), do: {:noreply, {max, [], :pop, from}}
  def handle_call(:pop, _, {max, [x | xs]}), do: {:reply, x, {max, xs}}
  def handle_call(:pop, _, {max, [x | xs], :push, waiter, item}) do
    GenServer.reply(waiter, nil)
    {:reply, x, {max, xs ++ [item]}}
  end

  @spec push(pid, any) :: nil
  def push(pid, item), do: GenServer.call(pid, {:push, item})

  @spec pop(pid) :: any
  def pop(pid), do: GenServer.call(pid, :pop)
end
And then finally I changed the types because the maximum queue size should be pos_integer()
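For illustration, here is roughly what the start_link spec could look like after that change. GenServer exports an on_start type with the same shape as the one I defined by hand, so the spec can refer to GenServer.on_start() instead (GenServer also exports a from type that could stand in for from_t). TypedQueueSketch is a hypothetical module name used only for this sketch, and the guard on start_link is an extra assumption of mine, not something the queue above has.

```elixir
defmodule TypedQueueSketch do
  # Hypothetical module for illustration only, not the published BlockingQueue.
  use GenServer

  # GenServer.on_start() is the type GenServer documents for the result of
  # start_link, so it does not need to be redefined locally.
  @spec start_link(pos_integer) :: GenServer.on_start()
  def start_link(n) when is_integer(n) and n > 0 do
    # The guard is an added assumption: it rejects a non-positive depth at
    # runtime, matching what pos_integer says in the spec.
    GenServer.start_link(__MODULE__, n)
  end

  @impl true
  def init(n), do: {:ok, {n, []}}
end

{:ok, pid} = TypedQueueSketch.start_link(5)
true = is_pid(pid)
```

The spec by itself is only checked by Dialyzer; the guard is what makes a call such as start_link(0) fail immediately with a FunctionClauseError.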
Wrapping up
At this point I believe BlockingQueue has the features I need. I’ll discuss putting the queue to use in two weeks. Next week there will be a special post.
I’ve also packaged up BlockingQueue as the blocking_queue hex package so you can use it in your projects if you need it.