Joseph Kain

Professional Software Engineer learning Elixir.

In this post I dive into Elixir’s Collectable protocol. After working through some theory we’ll dig into existing implementations and then implement Collectable for some of our own types.

Also, if you’ve been following along in my series on application architecture, we’ll be adding Collectable to the UnshorteningPool. This is something I’ve been talking about doing for some time. If you haven’t been following along in the series, don’t worry: you should still be able to follow this article.

The first thing we need to look at is …

Enum.into

The point of Collectable is to enable Enum.into/2 and Enum.into/3 for different collectable data structures. That is, Enum.into calls Collectable.into in order to implement the functionality.

Let’s look at an example of Enum.into/3. We can use it to transform a range into a list of key-value tuples, like this:

Enum.into(1..10, [], fn x -> {x, rem(x, 2) == 0} end)

The arguments are

  1. An Enumerable, in our case the range 1..10
  2. A Collectable, in our case a new empty List []
  3. A function to transform the data, in our case the anonymous function fn x -> {x, rem(x, 2) == 0} end

With this setup Enum.into works just like Enum.map.

[{1, false}, {2, true}, {3, false}, {4, true}, {5, false}, {6, true},
 {7, false}, {8, true}, {9, false}, {10, true}]
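As a quick check of that claim, here is a small sketch that runs the same function through both calls and compares the results. The name tag_even is just something I made up for this comparison.

```elixir
# Hypothetical helper used only for this comparison.
tag_even = fn x -> {x, rem(x, 2) == 0} end

via_into = Enum.into(1..10, [], tag_even)
via_map = Enum.map(1..10, tag_even)

# Both calls build the same list of 2-tuples.
IO.inspect(via_into == via_map)  # true
```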

Because we’ve written 2-tuples into the list, we now have a list of key-value pairs. (It isn’t strictly a Keyword list, since Keyword lists require atom keys.) In fact, we can enumerate the values into any Collectable, including a Map:

iex(3)> Enum.into(1..10, %{}, fn x -> {x, rem(x, 2) == 0} end)
%{1 => false, 2 => true, 3 => false, 4 => true, 5 => false, 6 => true,
  7 => false, 8 => true, 9 => false, 10 => true}

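The thing to note here is that all Dict conformant modules accept this tuple form. That is, you write 2-tuples into a Keyword list, HashDict or Map to build the key-value associations. (Dict and HashDict have since been deprecated; in current Elixir you would reach for Map or Keyword.)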

You can also use Enum.into/2 which works the same way except that it doesn’t need a function. It passes the values through directly. As an example, this variant can be used to convert a Keyword list to a Map or vice-versa:

# Convert Keyword into Map
iex(1)> Enum.into([a: "b", b: "c"], %{})
%{a: "b", b: "c"}

# Convert Map into Keyword list
iex(2)> Enum.into(%{a: "b", b: "c"}, [])
[a: "b", b: "c"]
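The collectable doesn’t have to start out empty, and it doesn’t have to be a list or a map. Here is a small sketch; the names defaults and config are made up for the example.

```elixir
# Start from a map that already has entries.
defaults = %{host: "localhost", port: 4000}

# Each 2-tuple is put into the map; an existing key is overwritten.
config = Enum.into([port: 8080, ssl: true], defaults)
IO.inspect(config == %{host: "localhost", port: 8080, ssl: true})  # true

# MapSet is collectable too, and duplicates collapse as it collects.
set = Enum.into([1, 2, 2, 3], MapSet.new())
IO.inspect(MapSet.size(set))  # 3
```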

Comprehensions

Similarly to Enum.into/3, you can use a for comprehension to store data into a Collectable. By default a comprehension writes values out to a list, but you can use the into: keyword to use a different type.

We can rewrite our Map example of Enum.into/3 like this:

for x <- 1..10, into: %{} do
  {x, rem(x, 2) == 0}
end

which yields

%{1 => false, 2 => true, 3 => false, 4 => true, 5 => false, 6 => true,
  7 => false, 8 => true, 9 => false, 10 => true}

Again, we generate tuples which fill in the Map.
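The into: option isn’t limited to maps. As a rough illustration, this sketch collects into a binary and into a MapSet; the variable names are only for the example.

```elixir
# Collect into a binary: each iteration contributes one byte.
# Subtracting 32 turns a lowercase ASCII letter into its uppercase form.
shouted = for <<c <- "hello">>, into: "", do: <<c - 32>>
IO.inspect(shouted)  # "HELLO"

# Collect into a MapSet: repeated values are stored only once.
lengths =
  for word <- ["a", "bb", "cc", "ddd"], into: MapSet.new() do
    String.length(word)
  end

IO.inspect(MapSet.size(lengths))  # 3
```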

Protocols

Protocols are the Elixir feature that allows Enum.into and comprehensions to enumerate into different types. A Protocol defines an abstract interface that can be implemented for different types.

Let’s take a look at the Collectable Protocol:

defprotocol Collectable do
  @type command :: {:cont, term} | :done | :halt

  @spec into(t) :: {term, (term, command -> t | term)}
  def into(collectable)
end

I’ve stripped out the documentation, but you can read the full version here.

This looks similar to a Module with the biggest difference being that we define a function into/1 but it has no body. The fact that there is no body is the point of the Protocol.

The Protocol simply defines an interface. It is up to an implementation of this protocol to provide a body for the function or functions defined by the Protocol.

With that, we should look at implementations of the Collectable Protocol.
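Before we do, here is a minimal sketch of the mechanism using a protocol of my own invention. Describable and its describe/1 function are hypothetical and exist only to show a body-less definition paired with per-type implementations.

```elixir
# Hypothetical protocol: one function, no body.
defprotocol Describable do
  def describe(value)
end

# Each defimpl supplies the body for one type.
defimpl Describable, for: Integer do
  def describe(n), do: "the integer #{n}"
end

defimpl Describable, for: List do
  def describe(list), do: "a list of #{length(list)} items"
end

# The call dispatches on the type of the argument.
IO.puts(Describable.describe(42))         # the integer 42
IO.puts(Describable.describe([1, 2, 3]))  # a list of 3 items
```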

Collectable implementations

The Collectable documentation gives a description of how to implement the Protocol:

into(collectable)

Returns a function that collects values alongside the initial accumulation value.

The returned function receives a collectable and injects a given value into it for every {:cont, term} instruction.

:done is passed when no further values will be injected, useful for closing resources and normalizing values. A collectable must be returned on :done.

If injection is suddenly interrupted, :halt is passed and it can return any value, as it won’t be used.

I have to admit, I found this hard to understand the first few times I read it over. And that’s why I haven’t implemented Collectable for my UnshorteningPool (until now). So, let’s look at a few examples to see if that helps us understand what we need to do.

The implementations for Collectable for the standard library are for List, BitString and Map. They live in the same file referenced above (though there is no requirement that they be in the same file).

Let’s start by taking a look at the implementation for Map

defimpl Collectable, for: Map do
  def into(original) do
    {original, fn
      map, {:cont, {k, v}} -> :maps.put(k, v, map)
      map, :done -> map
      _, :halt -> :ok
    end}
  end
end

This says that we want to define the implementation of the Collectable Protocol for the Map type. Within the implementation we get to define Map’s version of the into/1 function.

The function into/1 returns a 2-tuple. This wasn’t clear to me from the documentation. The two elements of the tuple are

  1. A value. This value will be passed back to the function in element 2. You can think of this as an accumulator that is threaded through the successive invocations of that function.
  2. A function that accepts the 3 command terms described in the documentation.
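To make the tuple concrete, here is a sketch that calls Collectable.into/1 by hand and drives the returned function ourselves. This is roughly the work Enum.into does for us; the variable names are mine.

```elixir
# Element 1 is the starting value, element 2 is the collector function.
{acc0, collector} = Collectable.into(%{})

# Feed values in one at a time, threading the accumulator through.
acc1 = collector.(acc0, {:cont, {:a, 1}})
acc2 = collector.(acc1, {:cont, {:b, 2}})

# Signal that we are finished and get the final collectable back.
result = collector.(acc2, :done)
IO.inspect(result)  # %{a: 1, b: 2}
```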

So now that we have some sense for the idea of what we are trying to do, let’s look at the specifics for Map.

The return tuple uses original as the value. That is, if we had called

eleven = %{11 => false}
Enum.into(1..10, eleven, fn x -> {x, rem(x, 2) == 0} end)

Then original would be the starting Map, eleven. This allows us to start with the passed-in map rather than an empty one.

The function returned is this 3-headed monster:

fn
  map, {:cont, {k, v}} -> :maps.put(k, v, map)
  map, :done -> map
  _, :halt -> :ok
end

There are two arguments in each clause, first the accumulated value. This will be original the first time the anonymous function is called. The second time, it will be the result of the first call. The value accumulates as we enumerate the source.

The second argument is called the command and it takes one of three forms. We have one function clause per command.

The first clause handles the enumeration case. In this case a single value from the enumeration is passed in and the anonymous function needs to put it into the map. It does this simply by calling :maps.put/3.

The second clause handles the completion case. For Map there is nothing to do. In response to the :done command the function must return a Collectable, so it returns the accumulated, and now complete, Map.

The third clause handles the case where collection is interrupted, for example by an error, and as such can ignore the first parameter (the map). It can also return anything, as the value will be ignored. Map doesn’t need to do any special clean up here but you could imagine that there might be cases that do.

And that’s it for Map.
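To see where the three commands come from, here is a simplified sketch of a driver in the spirit of Enum.into/2. IntoSketch is a made-up module, not the standard library’s code, and it assumes Elixir 1.7 or later for __STACKTRACE__.

```elixir
defmodule IntoSketch do
  # Hypothetical, simplified stand-in for Enum.into/2.
  def into(enumerable, collectable) do
    {initial, collector} = Collectable.into(collectable)

    try do
      # One {:cont, item} command per element of the source.
      Enum.reduce(enumerable, initial, fn item, acc ->
        collector.(acc, {:cont, item})
      end)
    catch
      kind, reason ->
        # Something went wrong mid-way: tell the collector, then re-raise.
        collector.(initial, :halt)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      # Enumeration finished normally: ask for the final collectable.
      acc -> collector.(acc, :done)
    end
  end
end

IO.inspect(IntoSketch.into([a: 1, b: 2], %{c: 3}))  # %{a: 1, b: 2, c: 3}
```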

Next, let’s look at List.

defimpl Collectable, for: List do
  def into(original) do
    {[], fn
      list, {:cont, x} -> [x|list]
      list, :done -> original ++ :lists.reverse(list)
      _, :halt -> :ok
    end}
  end
end

This of course takes on the same form as the implementation for Map. We need another defimpl block to hold the implementation for List. Again into/1 returns a tuple which we must look at in more detail.

The return tuple contains [] as the value. This will be used as an intermediate accumulator.

The {:cont, x} clause accumulates a list. The first time it is called it will construct [x]. The second time it is called it will prepend the new value to its growing list. By the time the enumeration is done it will have built a list of all items with the last item at the head of this list and the first item at the tail.

At that time the :done clause will be invoked and the accumulated list will be reversed and appended to the original. This gives the final result for Enum.into in the expected order.

Again, the :halt clause doesn’t have much to do in this example.
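The prepend-then-reverse pattern is easier to see with the intermediate value printed. This sketch copies the collector function from the implementation above into a local variable (so it doesn’t depend on the standard library’s internals) and uses a made-up original list.

```elixir
original = [:a, :b]

# Same three clauses as the List implementation shown above.
collector = fn
  list, {:cont, x} -> [x | list]
  list, :done -> original ++ :lists.reverse(list)
  _, :halt -> :ok
end

# Prepending is cheap, but it leaves the items in reverse order.
acc = Enum.reduce([1, 2, 3], [], fn x, acc -> collector.(acc, {:cont, x}) end)
IO.inspect(acc)  # [3, 2, 1]

# :done reverses once and appends to the original.
result = collector.(acc, :done)
IO.inspect(result)  # [:a, :b, 1, 2, 3]
```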

Implementing Collectable for your own types

The nice thing about Protocols is that you can implement them for your own types. If we implement Collectable for our own types then we can use Enum.into to push data into them.

I have two types where I think this might be useful. The first is the UnshorteningPool from my Domain Scraper application. The second is BlockingQueue.

Implementing Collectable for UnshorteningPool

UnshorteningPool supports streams of data like this:

Twitter.get |> UnshorteningPool.collect

By implementing Collectable I would be able to write:

Twitter.get |> Enum.into(UnshorteningPool)

Now, this isn’t any shorter but maybe it’s clearer. I mean, what does UnshorteningPool.collect/1 do? Unless you are familiar with UnshorteningPool, you may have no idea. The function name collect is suggestive and is maybe good enough. But, you know what Enum.into/2 does (at least I hope you do now after reading this post). It’s a standard function and communicates the intent without any extra context.

The one roadblock I have is that I actually don’t have a type for UnshorteningPool. It’s a GenServer that is identified by its name. But we can make up a type like this:

defmodule UnshorteningPool do
  # Used so we can implement protocols
  defstruct name: UnshorteningPool

  # ...
end

This type isn’t really useful for anything. But we can use it as a way to implement protocols. Furthermore, I decided to hide it from the callers so instead they can call UnshorteningPool.pool to return one of these structs:

def pool, do: %UnshorteningPool{}

The next step is to implement the protocol. The UnshorteningPool already has a Mapper module that handles all the mapping and streaming of data in and out of the pool. This is where I want the real code for implementing into to live. I can do this by writing the defimpl in its own file and then calling into the Mapper module like this:

defimpl Collectable, for: UnshorteningPool do
  def into(%UnshorteningPool{}), do:
    UnshorteningPool.Mapper.into(UnshorteningPool.pool_name)
end

So, the into/1 function matches on our type and then delegates to UnshorteningPool.Mapper. Let’s take a look at the new code in the Mapper module:

def into(pool) do
  { nil, &into(pool, &1, &2) }
end

defp into(pool,  _, {:cont, item}), do: push_item_through_pool(item, pool)
defp into(_pool, _, :done), do: %UnshorteningPool{}
defp into(_pool, _, :halt), do: :ok

defp push_item_through_pool(x, pool) do
  :poolboy.checkout(pool)
  |> UnshorteningPool.Worker.work(x)
end

I added these new functions to implement the into functionality. First, into/1 builds the return tuple. I don’t need an accumulator since the UnshorteningPool is a server process that maintains its own state so the value in the tuple is nil. Then, for clarity, I’ve broken out the function in the tuple into a named function in the module, into/3. I have 3 heads for this function, one for each command.

For {:cont, item} we call Mapper.push_item_through_pool to do the work of adding new work to the pool. This function checks out a poolboy worker and calls its work function.

For :done we just return our dummy type. This satisfies the requirement that the collector function must return a Collectable on :done.

For :halt we don’t do anything special.

Note that all of the clauses ignore the accumulator argument. It would be nice not to pass it in at all, but I can’t use &2 in a capture without also using &1.
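One alternative is an explicit fn, which can simply ignore the accumulator. The sketch below shows that shape on a made-up type: AgentSink is hypothetical and uses an Agent as a stand-in for a server process that keeps its own state, much as UnshorteningPool does.

```elixir
defmodule AgentSink do
  # Hypothetical wrapper type: the struct only carries the Agent's pid.
  defstruct [:pid]

  def start do
    {:ok, pid} = Agent.start_link(fn -> [] end)
    %__MODULE__{pid: pid}
  end

  def push(%__MODULE__{pid: pid}, item), do: Agent.update(pid, &[item | &1])
  def items(%__MODULE__{pid: pid}), do: Agent.get(pid, &Enum.reverse/1)
end

defimpl Collectable, for: AgentSink do
  def into(sink) do
    # The process holds the state, so the accumulator is nil and ignored.
    collector = fn
      _acc, {:cont, item} -> AgentSink.push(sink, item)
      _acc, :done -> sink
      _acc, :halt -> :ok
    end

    {nil, collector}
  end
end

sink = AgentSink.start()
Enum.into(["a", "b", "c"], sink)
IO.inspect(AgentSink.items(sink))  # ["a", "b", "c"]
```

Whether this reads better than the capture with a named into/3 is a matter of taste; the behavior is the same.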

The functionality in push_item_through_pool/2 came from the function stream_through_pool which used to look like this:

defp stream_through_pool(enum, pool) do
  enum
  |> Stream.map(fn x -> {x, :poolboy.checkout(pool)} end)
  |> Stream.map(fn {x, worker} -> UnshorteningPool.Worker.work(worker, x) end)
  |> Stream.run
end

After extracting the functionality from the first two lines I refactored stream_through_pool into:

defp stream_through_pool(enum, pool) do
  Enum.map(enum, &push_item_through_pool(&1, pool))
end

With these changes I have a working into implementation. This test will pass:

test "it should implement Collectable" do
  input = [ ... ] # omitted
  expected = [ ...] |> Enum.sort # omitted
  Enum.into(input, UnshorteningPool.pool)

  assert expected ==
    UnshorteningPool.output_stream
    |> Enum.take(4)
    |> Enum.sort
end

Implementing Collectable for BlockingQueue

Again, we need to add a new type to use with Collectable as BlockingQueue is simply identified by a pid or name. Though BlockingQueue is different from UnshorteningPool in that there can be more than one. So we do need some data in our new type:

defmodule BlockingQueue do
  defstruct pid: nil
  # ...
end

I’ve added a struct for BlockingQueue that includes a field for the pid. With this we can implement Collectable:

defimpl Collectable, for: BlockingQueue do
  def into(%BlockingQueue{pid: pid}), do: {nil, &into(pid, &1, &2) }

  defp into(pid, _, {:cont, item}), do: BlockingQueue.push(pid, item)
  defp into(pid, _, :done), do: %BlockingQueue{pid: pid}
  defp into(_, _, :halt), do: :ok
end

I’ve taken a very similar approach to the one I used for UnshorteningPool in that I’ve written out the private function into/3 rather than using a large anonymous function. Again, three clauses:

In the {:cont, item} clause I use the existing BlockingQueue.push/2 function to push item into the queue.

In the :done clause I don’t need to do anything special. I just return our wrapper type.

In the :halt clause I don’t need to do anything at all and just return :ok.

Given that I can use the public push function from BlockingQueue I don’t need to write a function inside the BlockingQueue module. So, the whole implementation exists here in collectable.ex.

And with these changes BlockingQueue can pass a test like this:

test "BlockingQueue should implement Collectable" do
  input = ["Hello", "World"]

  {:ok, pid} = BlockingQueue.start_link(5)
  Enum.into(input, %BlockingQueue{pid: pid})

  assert input == BlockingQueue.pop_stream(pid) |> Enum.take(2)
end

Conclusion

In this post we dove deep into Collectable, first looking at how it is used, then how the Elixir standard library implements Collectable for two of its types and then finally by implementing Collectable for two types of our own.

I hope you’ve learned a lot from this post; I know I learned a lot in writing it.