
Joseph Kain

Professional Software Engineer learning Elixir.


Last week I wrote about the Elixir Stream representation. In this post I’ll dive into the implementation of Stream.map/2.

First, I looked at the function itself:

@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) -> R.map(fun, f1) end
end

To understand map, we’ll first need to understand this call to lazy.

lazy

The private function lazy builds up a Stream, represented as a struct, as we see here in one of its clauses:

defp lazy(enum, fun),
  do: %Stream{enum: enum, funs: [fun]}

So, lazy builds a stream from an enumerable and a function. I want to dive deeper into this struct representation of a stream next week, but we can still make sense of what is going on here in map. The call to lazy doesn’t evaluate fun or manipulate enum in any way; it just packs them into a Stream. So the real work of map is done by the anonymous function fn(f1) -> R.map(fun, f1) end, and the call to lazy just packs that function up into a lazily evaluated stream.
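To convince myself that simply storing a function is enough to make it lazy, here is a minimal sketch of the packing step. ToyStream is a hypothetical module of my own, not part of Elixir. It only mirrors the enum and funs fields visible in the lazy clause above, and the clause that prepends to funs when the input is already a stream is my assumption about how chained calls could be represented.

```elixir
defmodule ToyStream do
  # Hypothetical stand-in for %Stream{}: the source enumerable plus the
  # transformation functions that have not been applied yet.
  defstruct enum: nil, funs: []

  # Input is already a ToyStream: prepend the new function (assumed chaining rule).
  def lazy(%__MODULE__{} = stream, fun), do: %__MODULE__{stream | funs: [fun | stream.funs]}

  # Any other enumerable: wrap it. As in lazy/2 above, fun is stored, never called.
  def lazy(enum, fun), do: %__MODULE__{enum: enum, funs: [fun]}
end

# Neither function raises, because neither is ever called.
s1 = ToyStream.lazy([1, 2, 3], fn _ -> raise "not called" end)
s2 = ToyStream.lazy(s1, fn _ -> raise "not called either" end)
length(s2.funs)
# → 2
```

Passing functions that raise makes the point: lazy returns a struct instead of an error, because it never runs anything.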

map

Let’s return to the function map

@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) -> R.map(fun, f1) end
end

and now we can see that Stream.map/2 builds a lazily evaluated call to Stream.Reducers.map/2 (R is an alias for Stream.Reducers). But what are these reducers?

Stream.Reducers

Here is the map macro from Stream.Reducers:

defmacro map(callback, f \\ nil) do
  quote do
    fn(entry, acc) ->
      cont(unquote(f), unquote(callback).(entry), acc)
    end
  end
end

The macro expands to a function that looks something like

fn(entry, acc) -> cont(f, callback.(entry), acc) end

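where f and callback stand for whatever expressions were passed to the macro: unquote splices them into the generated code instead of evaluating them when the macro expands.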

cont is a private macro defined in the Stream module

defmacrop cont(f, entry, acc) do
  quote do: unquote(f).(unquote(entry), unquote(acc))
end

which simply calls f with the (already mapped) entry and the accumulator.
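Since both macros only splice code together, we can approximate them with ordinary functions and experiment in iex. This is a sketch under my own assumptions: ReducerSketch is a hypothetical module, its cont/3 and map/2 are plain functions rather than the real macros, and collect is a made-up innermost reducer that gathers entries into a list.

```elixir
defmodule ReducerSketch do
  # Function version of the private cont macro: call f with entry and acc.
  def cont(f, entry, acc), do: f.(entry, acc)

  # Function version of the Stream.Reducers map macro: transform the entry
  # with callback, then hand the result to f through cont/3.
  def map(callback, f) do
    fn entry, acc -> cont(f, callback.(entry), acc) end
  end
end

# A made-up innermost reducer that prepends each entry to a list.
collect = fn entry, acc -> {:cont, [entry | acc]} end

step = ReducerSketch.map(fn x -> x * 10 end, collect)
step.(1, [])    # → {:cont, [10]}
step.(2, [10])  # → {:cont, [20, 10]}
```

Note that callback only ever sees the entry; the accumulator passes straight through to f.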

map

Putting everything together, we could imagine we have

@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) ->
    fn(entry, acc) -> f1.(fun.(entry), acc) end
  end
end

Of course, this isn’t exactly what the compiler generates: the macros splice their arguments into the code rather than receiving evaluated values, and the plain code I’ve written above cannot do that. However, I find this expansion useful for conceptualizing what’s happening in a function. That said, this still hasn’t really enlightened me on how map works.

What I do understand is that Stream.map/2 builds up a function and that function takes another function, f1, as an argument. I’d really like to be able to step through the execution to see how this works in detail.

To refocus my investigation, I asked myself …

Can I build a stream to print out each step of the evaluation?

Given what I learned in last week’s post about the representation of a stream, I thought this might be possible. I started out trying to build a stream based on Stream.repeatedly/1. First, I copied the code for repeatedly into my own file and a module named JKStream, and tested that it worked.

defmodule JKStream do
  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
    &do_repeatedly(generator_fun, &1, &2)
  end

  defp do_repeatedly(_generator_fun, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  defp do_repeatedly(generator_fun, {:cont, acc}, fun) do
    do_repeatedly(generator_fun, fun.(generator_fun.(), acc), fun)
  end
end

iex(1)> c "jkstream.ex"
[JKStream]
iex(2)> f = JKStream.repeatedly(fn () -> 5 end)
#Function<0.89232002/2 in JKStream.repeatedly/1>
iex(3)> f |> Stream.take(5) |> Enum.to_list
[5, 5, 5, 5, 5]
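Because JKStream.repeatedly/1 returns a plain two-argument function, we can also drive it by hand, without Stream.take, by passing an initial {:cont, acc} tuple and a reducer. In this sketch RepeatSketch and take_three are hypothetical names. The module copies the same :cont/:halt clauses as above; the full Enumerable protocol also has a :suspend case, which neither copy handles.

```elixir
defmodule RepeatSketch do
  # Same shape as the JKStream.repeatedly/1 copy above.
  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
    &do_repeatedly(generator_fun, &1, &2)
  end

  defp do_repeatedly(_generator_fun, {:halt, acc}, _fun), do: {:halted, acc}

  defp do_repeatedly(generator_fun, {:cont, acc}, fun) do
    do_repeatedly(generator_fun, fun.(generator_fun.(), acc), fun)
  end
end

# A hand-written reducer that keeps three elements and then asks to stop.
take_three = fn x, acc ->
  acc = [x | acc]
  if length(acc) >= 3, do: {:halt, acc}, else: {:cont, acc}
end

RepeatSketch.repeatedly(fn -> 5 end).({:cont, []}, take_three)
# → {:halted, [5, 5, 5]}
```

The reducer alone decides when to stop by returning {:halt, acc}; repeatedly just keeps generating until it sees that tuple.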

At this point I had a working copy of repeatedly that I could play around with, so I started adding some IO.puts calls.

defmodule JKStream do
  # @spec repeatedly((() -> element)) :: Enumerable.t
  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
    &do_repeatedly(generator_fun, &1, &2)
  end

  defp do_repeatedly(_generator_fun, {:halt, acc}, _fun) do
    IO.puts ":halt #{inspect acc}"
    {:halted, acc}
  end

  defp do_repeatedly(generator_fun, {:cont, acc}, fun) do
    IO.puts ":cont #{inspect acc}"
    do_repeatedly(generator_fun, fun.(generator_fun.(), acc), fun)
  end
end

iex(10)> JKStream.repeatedly(fn () -> 5 end) |> Stream.take(5) |> Enum.to_list
:cont [[], 5]
:cont [[5], 4]
:cont [[5, 5], 3]
:cont [[5, 5, 5], 2]
:cont [[5, 5, 5, 5], 1]
:cont [[5, 5, 5, 5, 5], 0]
:halt [[5, 5, 5, 5, 5], 0]

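Now, this was interesting. It shows that Stream.take/2 keeps a counter in the accumulator: the second element starts at 5 and drops by one for each element until it reaches 0, at which point the stream halts. But what will this tell us about map?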

iex(11)> JKStream.repeatedly(fn () -> 5 end) |> Stream.map(fn(x) -> x / 2 end) |> Stream.take(5) |> Enum.to_list
:cont [[], 5]
:cont [[2.5], 4]
:cont [[2.5, 2.5], 3]
:cont [[2.5, 2.5, 2.5], 2]
:cont [[2.5, 2.5, 2.5, 2.5], 1]
:cont [[2.5, 2.5, 2.5, 2.5, 2.5], 0]
:halt [[2.5, 2.5, 2.5, 2.5, 2.5], 0]

Unfortunately, this doesn’t really provide any useful information about the inner workings of map.

After a few tries, I ended up with this version of JKStream.repeatedly/1:

defmodule JKStream do
  # @spec repeatedly((() -> element)) :: Enumerable.t
  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
    &do_repeatedly(generator_fun, &1, &2)
  end

  defp do_repeatedly(_generator_fun, {:halt, acc}, _state_fun) do
    IO.puts "do_repeatedly({:halt #{inspect acc}}, _state_fun)"
    {:halted, acc}
  end

  defp do_repeatedly(generator_fun, {:cont, acc}, state_fun) do
    IO.puts "do_repeatedly({:cont #{inspect acc}}, #{inspect state_fun})"
    do_repeatedly(generator_fun, state_fun.(generator_fun.(), acc), state_fun)
  end
end

I chose not to print the value of generator_fun because it is just the function I pass to repeatedly and would only add noise to the output. I do want to understand what state_fun is, as it is the function used to advance the state during evaluation of the stream. With this version I get the following trace:

iex(9)> JKStream.repeatedly(fn () -> 5 end) |> Stream.map(fn(x) -> x / 2 end) |> Stream.take(5) |> Enum.to_list
do_repeatedly({:cont [[], 5]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5], 4]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5], 3]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5, 2.5], 2]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5, 2.5, 2.5], 1]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5, 2.5, 2.5, 2.5], 0]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:halt [[2.5, 2.5, 2.5, 2.5, 2.5], 0]}, _state_fun)

which shows that state_fun comes from Stream.map and has arity 2. Looking back at our expanded source for map

@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) ->
    fn(entry, acc) -> f1.(fun.(entry), acc) end
  end
end

we must conclude that state_fun is fn(entry, acc) -> f1.(fun.(entry), acc) end, as it is the only function here with arity 2. But what does this mean?

We know that fun is just the user-defined function passed to map; in our example it is fn(x) -> x / 2 end. But what is f1? Looking at the code, we see that f1 is an argument to the function embedded in the stream created by lazy. This tells us a few things:

  • f1 is somewhat arbitrary because it is passed in from outside, though it may have to follow some conventions.
  • because f1 comes from outside, it can’t be part of the map computation itself.
  • f1 effectively wraps (via function composition) each element of our map result.
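A small experiment supports these points: the same map layer, combined with two different f1 functions, gives two different results. This is a sketch under my own assumptions. map_layer, collect, and sum are hypothetical names, and map_layer is written as fn(entry, acc) -> f1.(fun.(entry), acc) end, which is what the Stream.Reducers map macro expands to when callback is fun and f is f1. Enumerable.reduce/3 drives the list with the same {:cont, acc} protocol seen in the traces.

```elixir
# Hypothetical hand expansion of R.map(fun, f1).
map_layer = fn fun, f1 -> fn entry, acc -> f1.(fun.(entry), acc) end end
half = fn x -> x / 2 end

# Two different f1 functions, both supplied from outside the map computation.
collect = fn entry, acc -> {:cont, [entry | acc]} end
sum = fn entry, acc -> {:cont, entry + acc} end

Enumerable.reduce([2, 4, 6], {:cont, []}, map_layer.(half, collect))
# → {:done, [3.0, 2.0, 1.0]}
Enumerable.reduce([2, 4, 6], {:cont, 0}, map_layer.(half, sum))
# → {:done, 6.0}
```

Only f1 changed between the two calls; map_layer and half are identical, which is what lets f1 act as a pluggable continuation.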

So, I see this as function chaining. I won’t go into much more detail, but this is the mechanism that allows streams to be composed in Elixir. For example, Stream.take/2 calls Stream.Reducers.take/1, which has its own function to control :cont and :halt behavior based on the count. So in the case of JKStream.repeatedly(fn () -> 5 end) |> Stream.map(fn(x) -> x / 2 end) |> Stream.take(5), take’s reducer would be passed to map as f1:

fn(entry, acc(h, n, t) = orig) ->
  if n >= 1 do
    cont_with_acc(unquote(f), entry, h, n-1, t)
  else
    {:halt, orig}
  end
end

Because f1 receives every mapped element and decides whether to :cont or :halt, take ends up wrapping the whole evaluation of the stream. Note, though, that the stream is not actually evaluated until Enum.to_list/1 runs it.
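To tie the pieces together, here is a hand-built version of the whole repeatedly, map, take chain. It is a sketch with simplifications of my own: ChainSketch and its functions are hypothetical, take_layer keeps its accumulator as [inner_acc, remaining] only because that is the shape visible in the traces above, and collect stands in for whatever reducer Enum.to_list/1 really supplies.

```elixir
defmodule ChainSketch do
  # Same :cont/:halt protocol as JKStream.repeatedly/1, minus the IO.puts calls.
  def repeatedly(gen) when is_function(gen, 0), do: &do_repeatedly(gen, &1, &2)

  defp do_repeatedly(_gen, {:halt, acc}, _fun), do: {:halted, acc}
  defp do_repeatedly(gen, {:cont, acc}, fun), do: do_repeatedly(gen, fun.(gen.(), acc), fun)

  # Map layer: transform the entry, then pass it on to the next function, f1.
  def map_layer(fun, f1), do: fn entry, acc -> f1.(fun.(entry), acc) end

  # Take layer: acc is [inner_acc, remaining]. While remaining >= 1 it
  # forwards the entry to f and counts down; at 0 it asks to halt.
  def take_layer(f) do
    fn entry, [inner, n] = orig ->
      if n >= 1 do
        {reason, inner} = f.(entry, inner)
        {reason, [inner, n - 1]}
      else
        {:halt, orig}
      end
    end
  end

  # Innermost reducer: prepend each entry to a list.
  def collect(entry, acc), do: {:cont, [entry | acc]}
end

# map's f1 is take's reducer, whose f is collect.
state_fun = ChainSketch.map_layer(fn x -> x / 2 end, ChainSketch.take_layer(&ChainSketch.collect/2))
{:halted, [list, 0]} = ChainSketch.repeatedly(fn -> 5 end).({:cont, [[], 5]}, state_fun)
Enum.reverse(list)
# → [2.5, 2.5, 2.5, 2.5, 2.5]
```

Here the accumulators passed to do_repeatedly step through the same [[...], n] progression as the traced run, from [[], 5] down to a halt at 0, and nothing is computed until the function returned by repeatedly is finally called.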