Last week I wrote about the Elixir Stream representation. In this post I’ll dive into the implementation of Stream.map/2.
First, I looked at the function itself:
@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) -> R.map(fun, f1) end
end
To understand map, we’ll first need to understand this call to lazy.
lazy
The private function lazy builds up a Stream represented as a struct, as we see here in one of its clauses:
defp lazy(enum, fun),
  do: %Stream{enum: enum, funs: [fun]}
So, lazy builds a stream from an enumerable and a function. I want to dive deeper into this struct representation of a stream next week, but we can still make sense of what is going on here in map. The call to lazy doesn’t evaluate fun or manipulate enum in any way; it just packs them into a Stream. So, the real work of map is what we see in the map function, and the call to lazy just packs up the result into a lazily evaluated stream.
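To convince myself that building the stream really does not call fun, here is a small sketch. The module name LazyDemo and its helpers are my own, hypothetical names; the mapping function sends a message each time it runs, so we can check the mailbox before and after enumeration.

```elixir
defmodule LazyDemo do
  # Hypothetical demo module, not part of Elixir.
  # Returns {called_before_enumeration?, result, calls_seen_after_enumeration}.
  def run do
    parent = self()

    # Building the stream should not invoke the mapping function.
    stream =
      Stream.map([1, 2, 3], fn x ->
        send(parent, {:called, x})
        x * 2
      end)

    # Check the mailbox without blocking: has fun been called yet?
    called_early? =
      receive do
        {:called, _} -> true
      after
        0 -> false
      end

    # Enumerating the stream is what finally runs the function.
    result = Enum.to_list(stream)
    {called_early?, result, drain([])}
  end

  # Collect any {:called, x} messages already in the mailbox, in arrival order.
  defp drain(seen) do
    receive do
      {:called, x} -> drain([x | seen])
    after
      0 -> Enum.reverse(seen)
    end
  end
end

IO.inspect(LazyDemo.run())
# prints {false, [2, 4, 6], [1, 2, 3]}
```

The first element of the tuple is false because nothing ran when Stream.map/2 returned; the calls only show up once Enum.to_list/1 walks the stream.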
map
We return to the function map
@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) -> R.map(fun, f1) end
end
and now we can see that Stream.map/2 builds a lazily evaluated call to Stream.Reducers.map/2. But what are these reducers?
Stream.Reducers
This is the reducers map macro:
defmacro map(callback, f \\ nil) do
  quote do
    fn(entry, acc) ->
      cont(unquote(f), unquote(callback).(entry), acc)
    end
  end
end
The macro evaluates to a function that looks something like
fn(entry, acc) -> cont(f, callback.(entry), acc) end
where f and callback are evaluated lazily.
cont is a macro defined in the Stream module
defmacrop cont(f, entry, acc) do
  quote do: unquote(f).(unquote(entry), unquote(acc))
end
which is used to evaluate the map functions.
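To make the shape of this reducer concrete, here is a plain-function sketch of what the two macros expand to together. ReducerSketch and collect are hypothetical names of my own, and this is only an approximation of the macro, not the library's code: transform the entry with callback, then hand the result and the accumulator to the next reducer f.

```elixir
defmodule ReducerSketch do
  # Hypothetical plain-function analogue of the map reducer macro:
  # apply callback to the entry, then pass the result to the next reducer f.
  def map(callback, f) do
    fn entry, acc -> f.(callback.(entry), acc) end
  end
end

# A downstream reducer that just collects entries (in reverse order).
collect = fn entry, acc -> [entry | acc] end

# Wrap the collector with a doubling step.
step = ReducerSketch.map(fn x -> x * 2 end, collect)

[1, 2, 3]
|> Enum.reduce([], step)
|> Enum.reverse()
|> IO.inspect()
# prints [2, 4, 6]
```

Note that callback only ever sees the entry; the accumulator is passed through untouched to f.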
map
Putting everything together, we could imagine we have
@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) ->
    fn(entry, acc) -> f1.(fun.(entry), acc) end
  end
end
Of course, this isn’t accurate because the macros would evaluate their arguments lazily and the code I’ve written above cannot. However, I find this expansion useful for conceptualizing what’s happening in a function. That said, this still hasn’t really enlightened me on how map works.
What I do understand is that Stream.map/2 builds up a function and that function takes another function, f1, as an argument. I’d really like to be able to step through the execution to see how this works in detail.
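One way to poke at this function directly is to pull it back out of the struct. The sketch below relies on the internal %Stream{enum: ..., funs: [...]} layout shown earlier, which is private to Elixir and may differ between versions, so treat it as an experiment rather than a supported API. PeekSketch, step_for, and inner are hypothetical names of my own.

```elixir
defmodule PeekSketch do
  # Assumption: the stream was built by a single Stream.map/2 over a plain
  # enumerable, so funs holds exactly one wrapping function. This depends on
  # the private struct layout and may break on other Elixir versions.
  def step_for(stream, inner) do
    %Stream{funs: [wrap]} = stream
    # wrap is the fn(f1) -> ... end from map; inner plays the role of f1.
    wrap.(inner)
  end
end

stream = Stream.map([1, 2, 3], fn x -> x * 2 end)

# A stand-in for f1: collect the entry and ask to continue.
inner = fn entry, acc -> {:cont, [entry | acc]} end

step = PeekSketch.step_for(stream, inner)

# Feed one entry through by hand: the entry is mapped first, then given to inner.
IO.inspect(step.(5, []))
```

If the layout matches, the entry 5 is doubled by the mapping function before inner ever sees it.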
To refocus my investigation I asked myself …
Can I build a stream to print out each step of the evaluation?
Given what I learned in last week’s post about the representation of a stream I thought this might be possible. I started out trying to build a stream based on Stream.repeatedly/1. First, I copied the code for repeatedly into my own file and module named JKStream and tested that it worked.
defmodule JKStream do
  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
    &do_repeatedly(generator_fun, &1, &2)
  end

  defp do_repeatedly(_generator_fun, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  defp do_repeatedly(generator_fun, {:cont, acc}, fun) do
    do_repeatedly(generator_fun, fun.(generator_fun.(), acc), fun)
  end
end
iex(1)> c "jkstream.ex"
[JKStream]
iex(2)> f = JKStream.repeatedly(fn () -> 5 end)
#Function<0.89232002/2 in JKStream.repeatedly/1>
iex(3)> f |> Stream.take(5) |> Enum.to_list
[5, 5, 5, 5, 5]
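Since the stream here is just a two-argument function, it can also be driven by hand with Enumerable.reduce/3 and a reducer of our own. The sketch below uses the built-in Stream.repeatedly/1 so that it is self-contained; DriveSketch and its functions are hypothetical names. The reducer collects entries and returns {:halt, acc} once it has enough, which is how a consumer tells an infinite stream to stop.

```elixir
defmodule DriveSketch do
  # Hypothetical reducer: collect entries until we have `limit` of them,
  # then ask the stream to halt. Assumes limit >= 1.
  def reducer(limit) do
    fn entry, acc ->
      acc = [entry | acc]
      if length(acc) < limit, do: {:cont, acc}, else: {:halt, acc}
    end
  end

  # Drive an infinite stream directly with our reducer.
  def run(limit) do
    stream = Stream.repeatedly(fn -> 5 end)
    Enumerable.reduce(stream, {:cont, []}, reducer(limit))
  end
end

IO.inspect(DriveSketch.run(3))
# prints {:halted, [5, 5, 5]}
```

The {:halted, acc} result corresponds to the {:halt, acc} clause of do_repeatedly above.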
At this point I had a working copy of repeatedly that I could play around with. I started adding some puts calls.
defmodule JKStream do
  # @spec repeatedly((() -> element)) :: Enumerable.t
  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
    &do_repeatedly(generator_fun, &1, &2)
  end

  defp do_repeatedly(_generator_fun, {:halt, acc}, _fun) do
    IO.puts ":halt #{inspect acc}"
    {:halted, acc}
  end

  defp do_repeatedly(generator_fun, {:cont, acc}, fun) do
    IO.puts ":cont #{inspect acc}"
    do_repeatedly(generator_fun, fun.(generator_fun.(), acc), fun)
  end
end
iex(10)> JKStream.repeatedly(fn () -> 5 end) |> Stream.take(5) |> Enum.to_list
:cont [[], 5]
:cont [[5], 4]
:cont [[5, 5], 3]
:cont [[5, 5, 5], 2]
:cont [[5, 5, 5, 5], 1]
:cont [[5, 5, 5, 5, 5], 0]
:halt [[5, 5, 5, 5, 5], 0]
Now, this was interesting. This shows that Stream.take/2 maintains a counter in the stream. But what will this tell us about map?
iex(11)> JKStream.repeatedly(fn () -> 5 end) |> Stream.map(fn(x) -> x / 2 end) |> Stream.take(5) |> Enum.to_list
:cont [[], 5]
:cont [[2.5], 4]
:cont [[2.5, 2.5], 3]
:cont [[2.5, 2.5, 2.5], 2]
:cont [[2.5, 2.5, 2.5, 2.5], 1]
:cont [[2.5, 2.5, 2.5, 2.5, 2.5], 0]
:halt [[2.5, 2.5, 2.5, 2.5, 2.5], 0]
Unfortunately, this doesn’t really provide any useful information about the inner workings of map.
After a few tries I ended up with this version of JKStream.repeatedly/1
defmodule JKStream do
  # @spec repeatedly((() -> element)) :: Enumerable.t
  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
    &do_repeatedly(generator_fun, &1, &2)
  end

  defp do_repeatedly(_generator_fun, {:halt, acc}, _state_fun) do
    IO.puts "do_repeatedly({:halt #{inspect acc}}, _state_fun)"
    {:halted, acc}
  end

  defp do_repeatedly(generator_fun, {:cont, acc}, state_fun) do
    IO.puts "do_repeatedly({:cont #{inspect acc}}, #{inspect state_fun})"
    do_repeatedly(generator_fun, state_fun.(generator_fun.(), acc), state_fun)
  end
end
I chose not to print the value of generator_fun because it is just the function I pass to repeatedly and would just be noise in the output. I do want to understand what state_fun is, as it is the function used to advance the state during evaluation of the stream. With this version I get the following trace
iex(9)> JKStream.repeatedly(fn () -> 5 end) |> Stream.map(fn(x) -> x / 2 end) |> Stream.take(5) |> Enum.to_list
do_repeatedly({:cont [[], 5]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5], 4]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5], 3]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5, 2.5], 2]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5, 2.5, 2.5], 1]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:cont [[2.5, 2.5, 2.5, 2.5, 2.5], 0]}, #Function<62.75994740/2 in Stream.map/2>)
do_repeatedly({:halt [[2.5, 2.5, 2.5, 2.5, 2.5], 0]}, _state_fun)
which shows that state_fun comes from Stream.map and has arity 2. Looking back to our expanded source for map
@spec map(Enumerable.t, (element -> any)) :: Enumerable.t
def map(enum, fun) do
  lazy enum, fn(f1) ->
    fn(entry, acc) -> f1.(fun.(entry), acc) end
  end
end
we must conclude that state_fun is fn(entry, acc) -> f1.(fun.(entry), acc) end, as it is the only function here with arity 2. But what does this mean?
We know that fun is just the user-defined function passed to map. In our example it is fn(x) -> x / 2 end. But what is f1? Looking at the code we see that f1 is an argument to the function embedded in the stream created by lazy. This tells us a few things:
- f1 is somewhat arbitrary because it is passed in from outside, though it may have to follow some conventions.
- Because f1 comes from outside, it can’t be part of the map computation.
- f1 effectively wraps (via function composition) each element in our map result.
So, I see this as function chaining. I won’t go into a lot more detail, but this is the mechanism that allows the composition of streams in Elixir. For example, Stream.take/2 will call Stream.Reducers.take/1, which has its own function to control :cont and :halt behavior based on the count. So in the case of JKStream.repeatedly(fn () -> 5 end) |> Stream.take(5), take would pass the following function as f1
fn(entry, acc(h, n, t) = orig) ->
  if n >= 1 do
    cont_with_acc(unquote(f), entry, h, n-1, t)
  else
    {:halt, orig}
  end
end
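To see the chaining without the macros, here is a simplified sketch in which a take-like reducer plays the role of f1 for a map reducer. Everything in ChainSketch is hypothetical and of my own naming. Unlike the real take, it keeps the counter in a plain {items, remaining} tuple and halts as soon as the last wanted entry arrives, so its step-by-step trace would not match the output shown earlier.

```elixir
defmodule ChainSketch do
  # Hypothetical plain-function stand-in for the map reducer.
  def map(fun, f1), do: fn entry, acc -> f1.(fun.(entry), acc) end

  # Hypothetical take-like reducer. The accumulator is {items, remaining}.
  # Assumes the initial remaining count is at least 1.
  def take_reducer do
    fn entry, {items, n} ->
      if n > 1 do
        {:cont, {[entry | items], n - 1}}
      else
        {:halt, {[entry | items], 0}}
      end
    end
  end

  # Chain map in front of take and drive an infinite stream with the result.
  def run(count) do
    step = map(fn x -> x / 2 end, take_reducer())
    stream = Stream.repeatedly(fn -> 5 end)
    {:halted, {items, 0}} = Enumerable.reduce(stream, {:cont, {[], count}}, step)
    Enum.reverse(items)
  end
end

IO.inspect(ChainSketch.run(3))
# prints [2.5, 2.5, 2.5]
```

The map step never inspects the accumulator; only the take-like reducer decides between :cont and :halt.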
As f1 wraps the map computation it would fully wrap evaluation of the stream. Note though that the actual evaluation of the stream would not be triggered until to_list