A couple of weeks ago I wrote about BlockingQueue and promised to show how I would put it to use in another project. Instead, this week I will take a small diversion and explore how the Elixir Task module works. I started using Task in a project of mine in conjunction with BlockingQueue and found that it doesn’t work the way I expected, so I want to read through the Task source to learn exactly what functionality it provides.
Questions
Whenever I read code I like to have a few questions in mind. That helps me focus my reading. Here are a few general questions I will consider as I read to improve my (and hopefully your) understanding of Task:
- What is the underlying implementation of Task? Is it a GenServer?
- How is the result of the task communicated in await back to the caller?
I will add to this list as I go if I have questions that come up while reading. For example, if something in async doesn’t make sense right away I might take a note to wait and see if await makes it clearer.
Read the documentation
I’ll start with the documentation located here.
The documentation starts off with a description of Task: “Conveniences for spawning and awaiting for tasks.” It then goes on to give an example of the most common usage.
After the example, the documentation specifically calls out:
Tasks spawned with async can be awaited on by its caller process (and only its caller) as shown in the example above. They are implemented by spawning a process that sends a message to the caller once the given computation is performed.
This explains the problem I ran into. I was trying to start a Task in one process and then send the task to another process, via BlockingQueue, to call await. This would time out because it isn’t a supported usage pattern. The explanation also gives us some insight into our second question: the result is sent by message to the “caller” (the caller of Task.async/1?).
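To make the failure concrete, here is a minimal sketch of the pattern I was attempting, with a plain spawn standing in for BlockingQueue. The module name AwaitElsewhere is made up for illustration. I am assuming the outcome depends on the Elixir version: on the version read in this post I would expect the timeout exit, while newer releases, as far as I know, raise an ArgumentError when a process other than the owner calls await. The sketch accepts either.

```elixir
defmodule AwaitElsewhere do
  # Hypothetical demo module, not part of Elixir.
  # Starts a task in this process, then tries to await it from another one.
  def run do
    task = Task.async(fn -> 1 + 1 end)
    parent = self()

    spawn(fn ->
      outcome =
        try do
          {:ok, Task.await(task, 100)}
        rescue
          # Assumption: newer Elixir versions reject a non-owner outright.
          ArgumentError -> {:error, ArgumentError}
        catch
          # The reply went to the owner's mailbox, so this process times out.
          :exit, {:timeout, _} -> {:exit, :timeout}
        end

      send(parent, {:outcome, outcome})
    end)

    other_process_outcome =
      receive do
        {:outcome, outcome} -> outcome
      after
        1_000 -> :no_answer
      end

    # The owner can still collect the reply as usual.
    {other_process_outcome, Task.await(task)}
  end
end
```

Calling AwaitElsewhere.run() returns a pair: what happened in the other process, and the value the owning process received.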
The documentation goes on to describe the async and await functions at a high level. We’ll dig into them further when we get to the API. The most important bit here is that “Once the task action finishes, a message will be sent to the caller with the result.”
The document continues with a discussion of “Supervised tasks” and “Supervision trees”, which I am going to skip over because I am more interested in the simple linked behavior.
Next we have a summary of the API. I will focus on the async and await functions.
async
The async function has two variants.
async(fun)
async(mod, fun, args)
The two variants allow different ways to specify a function that will run inside a new Task. Both functions return the Task. The documentation describes the format of a reply; this reply corresponds to the “message” described in the high level description of async and await.
await
The await function has only one variant: await(task, timeout \\ 5000), which waits for a reply from the Task. The documentation describes a reply that is sent by the Task.
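As a quick illustration of the API summarized above, here is a small sketch that runs the same computation through both variants of async and collects the results with await. The module name AsyncVariants is made up for this example.

```elixir
defmodule AsyncVariants do
  # Hypothetical demo module, not part of Elixir.
  def run do
    # async/1 takes a zero-arity anonymous function.
    task1 = Task.async(fn -> Enum.sum(1..10) end)

    # async/3 takes a module, a function name and an argument list.
    task2 = Task.async(Enum, :sum, [1..10])

    # await/2 blocks until the reply arrives or the default 5000 ms pass.
    {Task.await(task1), Task.await(task2)}
  end
end
```

AsyncVariants.run() returns {55, 55}, since both tasks sum the integers from 1 to 10.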
Read the source
Next we dive into the source, starting with task.ex.
Of course, the documentation is repeated here (or rather, originated from here) but I think reading the documentation in isolation the first time was worthwhile. It gives us an outsider’s or user’s perspective rather than the implementer’s perspective I expect to get from the source.
The first bit of code is the type definition.
defstruct pid: nil, ref: nil
The comment says that pid is the process ID of the Task and ref is “the task monitor reference”. I don’t really understand what “task monitor” means so that’s a question to keep in mind.
- What is the underlying implementation of Task? Is it a GenServer?
- How is the result of the task communicated in await back to the caller?
- What is the task monitor?
Skipping a few functions we come to Task.async/1:
@spec async(fun) :: t
def async(fun) do
  async(:erlang, :apply, [fun, []])
end
This just delegates to Task.async/3, which is the next function:
@spec async(module, atom, [term]) :: t
def async(mod, fun, args) do
  mfa = {mod, fun, args}
  pid = :proc_lib.spawn_link(Task.Supervised, :async, [self, get_info(self), mfa])
  ref = Process.monitor(pid)
  send(pid, {self(), ref})
  %Task{pid: pid, ref: ref}
end
A new process is created using the Erlang function :proc_lib.spawn_link/3. This call spawns a process, but the process doesn’t directly call our function. It calls the function Task.Supervised.async/3 and passes it the arguments [self, get_info(self), mfa]. Task.Supervised is something we will have to look into later, so our list of questions grows:
- What is Task.Supervised? What does its async/3 function do?
But, continuing on in Task.async/3, we see that it creates ref using Process.monitor/1, which causes the current process (i.e. the process that called async) to monitor pid.
Next, Task.async/3 sends a message to the Task process consisting of {self(), ref}. This is another thing to look into when we get to the body of the task:
- How does Task.Supervised respond to the message {self(), ref}?
Finally, Task.async/3 returns the new Task struct.
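Since Process.monitor/1 was new to me in this context, here is a small sketch of what a monitor does on its own, outside of Task. The module name MonitorDemo and the :boom exit reason are invented for the example. The child waits for a :stop message before exiting so that the monitor is guaranteed to be in place first.

```elixir
defmodule MonitorDemo do
  # Hypothetical demo module, not part of Elixir.
  def run do
    # The child blocks until told to stop, so it cannot exit
    # before the monitor below has been set up.
    pid =
      spawn(fn ->
        receive do
          :stop -> exit(:boom)
        end
      end)

    # The returned reference identifies this particular monitor.
    ref = Process.monitor(pid)
    send(pid, :stop)

    # When the monitored process terminates, the monitoring process
    # receives a :DOWN message tagged with that reference.
    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    after
      1_000 -> :no_down_message
    end
  end
end
```

MonitorDemo.run() returns :boom, the exit reason carried by the :DOWN message.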
Task.await/2
Next we come to Task.await/2:
@spec await(t, timeout) :: term | no_return
def await(%Task{ref: ref}=task, timeout \\ 5000) do
  receive do
    {^ref, reply} ->
      Process.demonitor(ref, [:flush])
      reply
    {:DOWN, ^ref, _, _, :noconnection} ->
      mfa = {__MODULE__, :await, [task, timeout]}
      exit({{:nodedown, node(task.pid)}, mfa})
    {:DOWN, ^ref, _, _, reason} ->
      exit({reason, {__MODULE__, :await, [task, timeout]}})
  after
    timeout ->
      Process.demonitor(ref, [:flush])
      exit({:timeout, {__MODULE__, :await, [task, timeout]}})
  end
end
This just receives a message and accepts three different message patterns or a timeout. The three message patterns are:
- {^ref, reply} - This receives the normal reply from the Task. This pattern matches the format described by Task.async/1. When this message arrives Task.await/2
  - Stops monitoring the Task process
  - Returns the reply to the caller
- {:DOWN, ^ref, _, _, :noconnection} - :DOWN messages are part of being a monitor and are sent to the monitoring process when the monitored process terminates. This message means there is no connection to the Task’s node. When this message arrives Task.await/2 calls exit to terminate the calling process.
- {:DOWN, ^ref, _, _, reason} - This is a general :DOWN message where reason describes the reason.
  - Again, Task.await/2 exits in this case.
- In the timeout case, Task.await/2
  - Stops monitoring the Task process.
  - Calls exit to terminate the calling process.
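The timeout branch is the one I hit in my project, so here is a minimal sketch of what it looks like from the caller’s side. The module name AwaitTimeout and the specific durations are made up. The pattern in the catch clause mirrors the tuple that await passes to exit in the source above.

```elixir
defmodule AwaitTimeout do
  # Hypothetical demo module, not part of Elixir.
  def run do
    # The task takes longer than the 50 ms we are willing to wait.
    task = Task.async(fn -> :timer.sleep(200) end)

    try do
      Task.await(task, 50)
    catch
      # Same shape as the tuple passed to exit/1 in the timeout clause.
      :exit, {:timeout, {Task, :await, [_task, 50]}} -> :timed_out
    end
  end
end
```

Note that catching the exit does not stop the task. It is still linked to the caller and keeps running until its function returns.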
The rest of task.ex goes into the find functions, which I’m not as interested in.
Revisit the questions
We’ve read through the top level implementation of the async and await API so now is a good time to review our questions.
What is the underlying implementation of Task? Is it a GenServer?
So far, we know that the underlying implementation is in Task.Supervised. We will know more once we read through the code for that module. Since we have specific questions about Task.Supervised we can cross this question off our list.
How is the result of the task communicated in await back to the caller?
This we can answer – the Task process sends a message back with the result.
What is the task monitor?
This we can answer – Task.async/3 uses Process.monitor to create a monitor for the Task process so that it can be notified if the Task process crashes or if its node goes offline.
What is Task.Supervised? What does its async/3 function do?
We can’t answer this yet so it stays on our list.
How does Task.Supervised respond to the message {self(), ref}?
This stays on the list.
Follow the questions
The remaining list is:
- What is Task.Supervised? What does its async/3 function do?
- How does Task.Supervised respond to the message {self(), ref}?
The list points us to…
Task.Supervised
We’ll read the source to Task.Supervised from task/supervised.ex.
First, notice that the module doesn’t use any other modules – and, related to our questions, it specifically does not use GenServer. I skimmed through the entire module and confirmed it doesn’t implement any GenServer callbacks like handle_call. So, Task.Supervised is not a GenServer. It is simply a monitored process.
start_link
The module starts off with some start functions:
def start(info, fun) do
  {:ok, :proc_lib.spawn(__MODULE__, :noreply, [info, fun])}
end
def start_link(info, fun) do
  {:ok, :proc_lib.spawn_link(__MODULE__, :noreply, [info, fun])}
end
def start_link(caller, info, fun) do
  :proc_lib.start_link(__MODULE__, :reply, [caller, info, fun])
end
Looking back to task.ex, I see that these are called from Task.start/2, Task.start_link/2 and Task.start_link/3. So I’ll skip over these as I’m more interested in the async and await API. Fortunately, the next function is:
async
def async(caller, info, mfa) do
  initial_call(mfa)
  ref = receive do: ({^caller, ref} -> ref)
  send caller, {ref, do_apply(info, mfa)}
end
The last two lines are straightforward enough:
- First receive the message sent from Task.async/3 which gives us the ref.
- Then send caller the result. This assumes do_apply computes the result, which we will confirm shortly.
For the first line we need to go find the implementation of initial_call:
defp initial_call(mfa) do
  Process.put(:"$initial_call", get_initial_call(mfa))
end
defp get_initial_call({:erlang, :apply, [fun, []]}) when is_function(fun, 0) do
  {:module, module} = :erlang.fun_info(fun, :module)
  {:name, name} = :erlang.fun_info(fun, :name)
  {module, name, 0}
end
defp get_initial_call({mod, fun, args}) do
  {mod, fun, length(args)}
end
I wasn’t familiar with Process.put but the documentation says that it “[s]tores the given key-value in the process dictionary.” The key is the atom :"$initial_call" and the value is whatever is returned by get_initial_call(mfa). There are two cases here but it looks like ultimately they both resolve to a {module, function, arity} tuple. This gets stored in the process dictionary.
I was curious about this atom $initial_call so I started up the Erlang observer from a default iex shell like this:
iex(2)> :observer.start
I navigated to the “Processes” tab and double clicked a process at random. The first row in the “Overview:” pane is:
Initial call: | erlang:apply/2 |
I’m guessing that setting a value in the process dictionary for $initial_call sets this property for the process. This way the function passed to Task.async will show up in the “Initial call” row. This would make it easier to debug a Task or to find it in the observer.
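I can’t confirm from this reading how the observer uses the value, but the storage half is easy to check. The sketch below only shows that a process can put a tuple under :"$initial_call" and that another process can read it back with Process.info/2. The module name InitialCallDemo and the MyApp.Worker tuple are invented for illustration and don’t need to exist.

```elixir
defmodule InitialCallDemo do
  # Hypothetical demo module, not part of Elixir.
  def run do
    parent = self()

    pid =
      spawn(fn ->
        # MyApp.Worker is a made-up name; only the tuple shape matters.
        Process.put(:"$initial_call", {MyApp.Worker, :crunch, 2})
        send(parent, {:ready, self()})

        # Stay alive until the parent has inspected the dictionary.
        receive do
          :stop -> :ok
        end
      end)

    receive do
      {:ready, ^pid} -> :ok
    end

    # Another process can inspect the dictionary from the outside.
    {:dictionary, dict} = Process.info(pid, :dictionary)
    send(pid, :stop)

    {_key, initial_call} = List.keyfind(dict, :"$initial_call", 0)
    initial_call
  end
end
```

InitialCallDemo.run() returns {MyApp.Worker, :crunch, 2}, which is the kind of information a tool would need in order to display a friendlier initial call.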
The next function to look at is do_apply:
defp do_apply(info, {module, fun, args} = mfa) do
  try do
    apply(module, fun, args)
  catch
    :error, value ->
      exit(info, mfa, {value, System.stacktrace()})
    :throw, value ->
      exit(info, mfa, {{:nocatch, value}, System.stacktrace()})
    :exit, value ->
      exit(info, mfa, value)
  end
end
So, this function just uses Kernel.apply/3 to call the Task’s function and then handles any errors.
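The three catch clauses correspond to the three kinds of non-local exit in Elixir. As a small sketch of how they differ, the made-up module CatchKinds below runs a function and reports which kind it ran into, instead of exiting the way do_apply does.

```elixir
defmodule CatchKinds do
  # Hypothetical demo module, not part of Elixir.
  # Runs fun and tags the outcome with the kind of failure, if any.
  def classify(fun) when is_function(fun, 0) do
    try do
      {:ok, fun.()}
    catch
      # Raised exceptions and runtime errors arrive with kind :error.
      :error, value -> {:error, value}
      # Values passed to throw/1 arrive with kind :throw.
      :throw, value -> {:throw, value}
      # Calls to exit/1 arrive with kind :exit.
      :exit, value -> {:exit, value}
    end
  end
end
```

For example, CatchKinds.classify(fn -> throw(:oops) end) returns {:throw, :oops}, and CatchKinds.classify(fn -> exit(:bye) end) returns {:exit, :bye}.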
Review
Now that we’ve read through Task.Supervised I think we can answer our remaining questions:
What is Task.Supervised? What does its async/3 function do?
Task.Supervised is the module that manages the process used to implement the Task. Its async/3 function:
- Sets the initial call in the process dictionary to the MFA passed in to Task.async
- Waits for the message sent from Task.async/3 which gives it the ref.
- Applies the MFA
- Sends caller the result.
How does Task.Supervised respond to the message {self(), ref}?
Task.Supervised.async/3 receives this message and extracts ref from it. ref is used in the reply back to the caller to identify which reply is coming back.
At first I wondered why ref couldn’t be an argument to Task.Supervised.async/3 and I wondered if this was some sort of synchronization. But thinking about it more I realize that ref is returned by Process.monitor(pid) so it can’t be computed until after the Task process starts. Hence, ref must be sent as a message afterwards.
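To check my understanding of the whole handshake, here is a stripped-down sketch of it. MiniTask is a made-up module, not the real implementation: it leaves out proc_lib, the initial call bookkeeping, the error handling in do_apply and the :noconnection case, and keeps only the spawn, monitor, send-the-ref and reply steps.

```elixir
defmodule MiniTask do
  # Hypothetical, simplified re-creation of the async/await handshake.
  defstruct pid: nil, ref: nil

  def async(fun) when is_function(fun, 0) do
    caller = self()
    pid = spawn_link(fn -> run(caller, fun) end)

    # The ref only exists once the process does, so it has to be
    # delivered by message rather than passed as an argument.
    ref = Process.monitor(pid)
    send(pid, {caller, ref})
    %__MODULE__{pid: pid, ref: ref}
  end

  def await(%__MODULE__{ref: ref}, timeout \\ 5000) do
    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        reply

      {:DOWN, ^ref, _, _, reason} ->
        exit(reason)
    after
      timeout ->
        Process.demonitor(ref, [:flush])
        exit(:timeout)
    end
  end

  # Runs in the spawned process: wait for the ref, then reply with it.
  defp run(caller, fun) do
    ref =
      receive do
        {^caller, ref} -> ref
      end

    send(caller, {ref, fun.()})
  end
end
```

With this in place, MiniTask.await(MiniTask.async(fn -> 6 * 7 end)) returns 42. Because the reply is matched in the mailbox of the process that called async, handing the struct to a different process and awaiting there cannot work, which is the behavior I ran into.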
Wrapping up
At this point I have a much better understanding of how Task’s async and await calls work. And I understand why I can’t initiate a Task in one process and await it in another. There’s still a lot more to Task that I’ve skipped over, including its interaction with supervision trees.
I still need a means for creating something like a Task in one process and collecting the result in another process. I’ve experimented with ExFuture, a Futures implementation for Elixir. It seems to do what I need. I’m going to try integrating it with my project and will let you know how it goes.