Last week I wrote about the Elixir Stream representation. In this post I’ll dive into the implementation of Stream.map/2.
First, I looked at the function itself:
@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) -> R.map(fun, f1) end
end
To understand map, we’ll first need to understand this call to lazy.
lazy
The private function lazy builds up a Stream, represented as a struct, as we see here in one of its implementations:
defp lazy(enum, fun),
  do: %Stream{enum: enum, funs: [fun]}
So, lazy builds a stream from an enumerable and a function. I want to dive deeper into this struct representation of a stream next week, but we can still make sense of what is going on here in map. The call to lazy doesn’t evaluate fun or manipulate enum in any way; it just packs them into a Stream. So, the real work of map is the function we see being built in map, and the call to lazy just packs it up into a lazily evaluated stream.
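The claim that lazy only packs things up can be checked from the outside. The sketch below is not taken from the Elixir source; it assumes a reasonably current Elixir release, and the names parent and called_before? are my own. The mapping function sends a message every time it runs, so an empty mailbox before enumeration suggests that building the stream did not call it.

```elixir
parent = self()

# Build a stream whose mapping function reports every call.
stream =
  Stream.map([1, 2, 3], fn x ->
    send(parent, {:mapped, x})
    x * 2
  end)

# Nothing has been enumerated yet, so no {:mapped, _} message should be waiting.
called_before? =
  receive do
    {:mapped, _} -> true
  after
    0 -> false
  end

# Enumerating the stream is what finally runs the mapping function.
result = Enum.to_list(stream)

IO.inspect(called_before?)
IO.inspect(result)
```

Here called_before? should come out false, while result holds the doubled values once Enum.to_list/1 has driven the stream.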
map
We return to the function map
@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) -> R.map(fun, f1) end
end
and now we can see that Stream.map/2 builds a lazily evaluated call to Stream.Reducers.map/2. But what are these reducers?
Stream.Reducers
This is the reducers map macro:
defmacro map(callback, f \\ nil) do
  quote do
    fn(entry, acc) ->
      cont(unquote(f), unquote(callback).(entry), acc)
    end
  end
end
The macro evaluates to a function that looks something like
fn(entry, acc) -> cont(f, callback.(entry), acc) end
where f and callback stand for the expressions passed to the macro, which are spliced in when the macro is expanded rather than evaluated up front.
cont is a macro defined in the Stream module
defmacrop cont(f, entry, acc) do
  quote do: unquote(f).(unquote(entry), unquote(acc))
end
which is used to call the next function, f, with the mapped entry and the accumulator.
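Because macros are expanded at compile time, it can help to see the same shape written with ordinary functions. The following is a minimal sketch, not the library code: ReducerSketch, collect, and step are hypothetical names, and plain function calls stand in for the map and cont macros.

```elixir
defmodule ReducerSketch do
  # Function-based stand-in for the Stream.Reducers map macro:
  # returns a step that maps the entry, then hands it to the next step f.
  def map(callback, f) do
    fn entry, acc -> f.(callback.(entry), acc) end
  end
end

# The "next step": prepend the entry to a list and keep going.
collect = fn entry, acc -> {:cont, [entry | acc]} end

step = ReducerSketch.map(fn x -> x / 2 end, collect)

# One step by hand: 5 is halved, then collected.
IO.inspect(step.(5, []))
```

Calling step.(5, []) gives {:cont, [2.5]}: the callback runs first, and its result is passed on together with the accumulator.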
map
Putting everything together we could imagine we have
@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) ->
    fn(entry, acc) -> f1.(fun.(entry), acc) end
  end
end
Of course, this is only an approximation: the macros are expanded at compile time, and the code I’ve written above is my hand expansion rather than the compiler’s actual output. However, I find this expansion useful for conceptualizing what’s happening in a function. That said, this still hasn’t really enlightened me on how map works.
What I do understand is that Stream.map/2 builds up a function and that function takes another function, f1, as an argument. I’d really like to be able to step through the execution to see how this works in detail.
To refocus my investigation I asked myself …
Can I build a stream to print out each step of the evaluation?
Given what I learned in last week’s post about the representation of a stream I thought this might be possible. I started out trying to build a stream based on Stream.repeatedly/1. First, I copied the code for repeatedly into my own file and module named JKStream and tested that it worked.
defmodule JKStream do
  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
    &do_repeatedly(generator_fun, &1, &2)
  end
  defp do_repeatedly(_generator_fun, {:halt, acc}, _fun) do
    {:halted, acc}
  end
  defp do_repeatedly(generator_fun, {:cont, acc}, fun) do
    do_repeatedly(generator_fun, fun.(generator_fun.(), acc), fun)
  end
end
iex(1)> c "jkstream.ex"
[JKStream]
iex(2)> f = JKStream.repeatedly(fn () -> 5 end)
#Function<0.89232002/2 in JKStream.repeatedly/1>
iex(3)> f |> Stream.take(5) |> Enum.to_list
[5, 5, 5, 5, 5]
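The copy works with Stream.take/2 and Enum.to_list/1 because Elixir treats a function of arity 2 as an enumerable: reducing it simply calls the function with the accumulator and the reducer. As a hedged illustration of the same pattern, here is a hypothetical CounterStream (my own module, not part of Elixir) that counts upward instead of repeating a value. Like the copy above, it leaves out the :suspend case, so it only supports consumers that never suspend.

```elixir
defmodule CounterStream do
  # Returns a 2-arity function; that function is the stream.
  def from(start) when is_integer(start) do
    &do_from(start, &1, &2)
  end

  # The consumer asked to stop.
  defp do_from(_n, {:halt, acc}, _fun), do: {:halted, acc}

  # Emit n by calling the reducer, then continue with n + 1.
  defp do_from(n, {:cont, acc}, fun) do
    do_from(n + 1, fun.(n, acc), fun)
  end
end

first_three = CounterStream.from(1) |> Stream.take(3) |> Enum.to_list()
IO.inspect(first_three)
```

The source never decides when to stop; it only reacts to the {:cont, acc} or {:halt, acc} tuple that the reducer hands back.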
At this point I had a working copy of repeatedly that I could play around with. I started adding some puts calls.
defmodule JKStream do
  # @spec repeatedly((() -> element)) :: Enumerable.t
  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
    &do_repeatedly(generator_fun, &1, &2)
  end
  defp do_repeatedly(_generator_fun, {:halt, acc}, _fun) do
    IO.puts ":halt #{inspect acc}"
    {:halted, acc}
  end
  defp do_repeatedly(generator_fun, {:cont, acc}, fun) do
    IO.puts ":cont #{inspect acc}"
    do_repeatedly(generator_fun, fun.(generator_fun.(), acc), fun)
  end
end
iex(10)> JKStream.repeatedly(fn () -> 5 end) |> Stream.take(5) |> Enum.to_list
:cont [[], 5]
:cont [[5], 4]
:cont [[5, 5], 3]
:cont [[5, 5, 5], 2]
:cont [[5, 5, 5, 5], 1]
:cont [[5, 5, 5, 5, 5], 0]
:halt [[5, 5, 5, 5, 5], 0]
Now, this was interesting. This shows that Stream.take/2 maintains a counter in the stream’s accumulator. But what will this tell us about map?
iex(11)> JKStream.repeatedly(fn () -> 5 end) |> Stream.map(fn(x) -> x / 2 end) |> Stream.take(5) |> Enum.to_list
:cont [[], 5]
:cont [[2.5], 4]
:cont [[2.5, 2.5], 3]
:cont [[2.5, 2.5, 2.5], 2]
:cont [[2.5, 2.5, 2.5, 2.5], 1]
:cont [[2.5, 2.5, 2.5, 2.5, 2.5], 0]
:halt [[2.5, 2.5, 2.5, 2.5, 2.5], 0]
Unfortunately, this doesn’t really provide any useful information about the inner workings of map.
After a few tries I ended up with this version of JKStream.repeatedly/1
defmodule JKStream do
  # @spec repeatedly((() -> element)) :: Enumerable.t
  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
    &do_repeatedly(generator_fun, &1, &2)
  end
  defp do_repeatedly(_generator_fun, {:halt, acc}, _state_fun) do
    IO.puts "do_repeatedly({:halt #{inspect acc}}, _state_fun)"
    {:halted, acc}
  end
  defp do_repeatedly(generator_fun, {:cont, acc}, state_fun) do
    IO.puts "do_repeatedly({:cont #{inspect acc}}, #{inspect state_fun})"
    do_repeatedly(generator_fun, state_fun.(generator_fun.(), acc), state_fun)
  end
end
I chose not to print the value of generator_fun because it is just the function I pass to repeatedly and would just be noise in the output. I do want to understand what state_fun is, as it is the function used to advance the state during evaluation of the stream. With this version I get the following trace:
iex(9)> JKStream.repeatedly(fn () -> 5 end) |> Stream.map(fn(x) -> x / 2 end) |> Stream.take(5) |> Enum.to_list
do_repeatedly({:cont [[], 5]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5], 4]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5], 3]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5, 2.5], 2]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5, 2.5, 2.5], 1]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5, 2.5, 2.5, 2.5], 0]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:halt [[2.5, 2.5, 2.5, 2.5, 2.5], 0]}, _state_fun)
which shows that state_fun comes from Stream.map and has arity 2. Looking back to our expanded source for map
@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) ->
    fn(entry, acc) -> f1.(fun.(entry), acc) end
  end
end
we must conclude that state_fun is fn(entry, acc) -> f1.(fun.(entry), acc) end, as it is the only function here with arity 2. But what does this mean?
We know that fun is just the user-defined function passed to map. In our example it is fn(x) -> x / 2 end. But what is f1? Looking at the code we see that f1 is an argument to the function embedded in the stream created by lazy. This tells us a few things:
- f1 is somewhat arbitrary because it is passed in from outside, though it may have to follow some conventions.
- Because f1 comes from outside, it can’t be part of the map computation.
- f1 effectively wraps (via function composition) each element in our map result.
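The points above can be sketched with plain functions. This is an illustration under my own assumptions rather than the library code: build plays the role of the function that lazy stores, and collect and sum are two hypothetical choices of f1 supplied from outside.

```elixir
halve = fn x -> x / 2 end

# Plays the role of the function stored by lazy: given f1, return the step.
build = fn f1 ->
  fn entry, acc -> f1.(halve.(entry), acc) end
end

# Two different f1 functions; both follow the {:cont, acc} convention.
collect = fn entry, acc -> {:cont, [entry | acc]} end
sum = fn entry, acc -> {:cont, acc + entry} end

# The same mapping step, driven by two different consumers.
collected = Enumerable.reduce([2, 4, 6], {:cont, []}, build.(collect))
summed = Enumerable.reduce([2, 4, 6], {:cont, 0}, build.(sum))

IO.inspect(collected)
IO.inspect(summed)
```

The mapping step is identical in both runs; only f1 changes, giving {:done, [3.0, 2.0, 1.0]} with collect and {:done, 6.0} with sum.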
So, I see this as function chaining. I won’t go into a lot more detail but this is the mechanism that allows the composition of streams in Elixir. For example, Stream.take/2 will call Stream.Reducers.take/1 which has its own function to control :cont and :halt behavior based on the count. So in the case of JKStream.repeatedly(fn () -> 5 end) |> Stream.take(5) take would pass the following function as f1
fn(entry, acc(h, n, t) = orig) ->
  if n >= 1 do
    cont_with_acc(unquote(f), entry, h, n-1, t)
  else
    {:halt, orig}
  end
end
As f1 wraps the map computation it would fully wrap evaluation of the stream. Note though that the actual evaluation of the stream would not be triggered until Enum.to_list is called.
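To make the chaining concrete, here is a minimal sketch with ordinary functions in place of the macros. It is not the library implementation: ChainSketch is a hypothetical module, the countdown lives in a {inner_acc, n} tuple rather than the list-shaped accumulator seen in the traces, and the pipeline I assume is a source piped through map and then take.

```elixir
defmodule ChainSketch do
  # map: transform the entry, then call the next step f1.
  def map(fun, f1), do: fn entry, acc -> f1.(fun.(entry), acc) end

  # take: keep a countdown n next to the next step's accumulator.
  def take(f1) do
    fn entry, {inner, n} = orig ->
      if n >= 1 do
        {tag, inner} = f1.(entry, inner)
        {tag, {inner, n - 1}}
      else
        {:halt, orig}
      end
    end
  end
end

collect = fn entry, acc -> {:cont, [entry | acc]} end

# For source |> map |> take, map's step receives take's step as its f1.
step = ChainSketch.map(fn x -> x / 2 end, ChainSketch.take(collect))

# Nothing runs until a reduce drives the composed step.
outcome = Enumerable.reduce([10, 20, 30, 40, 50], {:cont, {[], 3}}, step)
IO.inspect(outcome)
```

The reduce ends with {:halted, {[15.0, 10.0, 5.0], 0}}. As in the traces above, one extra element is pulled from the source before the countdown reaches the halt branch.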