
Joseph Kain

Professional Software Engineer learning Elixir.


A couple of weeks ago I wrote about BlockingQueue and promised to show how I would put it to use in another project. Instead, this week I am taking a small diversion to explore how the Elixir Task module works. I started using Task in one of my projects together with BlockingQueue and found that it doesn’t work the way I expected, so I want to read through the Task source to learn exactly what functionality it provides.

Questions

Whenever I read code I like to have a few questions in mind, which helps me focus my reading. Here are a few general questions I will consider as I read to improve my (and hopefully your) understanding of Task:

  • What is the underlying implementation of Task? Is it a GenServer?
  • How is the result of the task communicated back to the caller in await?

I will add to this list as I go if I have questions that come up while reading. For example, if something in async doesn’t make sense right away I might take a note to wait and see if await makes it clearer.

Read the documentation

I’ll start with the documentation located here.

The documentation starts off with a description of Task: “Conveniences for spawning and awaiting for tasks.” and then goes on to give an example of the most common usage.
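The docs’ example isn’t reproduced here, so below is a minimal sketch of the same async-then-await shape. The module name CommonUsage and the two computations are placeholders of my own, not taken from the docs.

```elixir
defmodule CommonUsage do
  # Hypothetical module illustrating the common Task.async / Task.await pattern.
  def run do
    # Start a computation in a separate process...
    task = Task.async(fn -> Enum.sum(1..100) end)

    # ...do other work in the caller while the task runs...
    other = 10 * 10

    # ...then block until the task's reply arrives (default timeout: 5000 ms).
    other + Task.await(task)
  end
end

# CommonUsage.run() returns 5150 (5050 from the task plus 100).
```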

After the example, the documentation specifically calls out:

Tasks spawned with async can be awaited on by its caller process (and only its caller) as shown in the example above. They are implemented by spawning a process that sends a message to the caller once the given computation is performed.

This explains the problem I ran into. I was trying to start a Task in one process and then send the task, via BlockingQueue, to another process that would call await. That await would time out because it isn’t a supported usage pattern. The explanation also gives us some insight into our second question: the result is sent by message to the “caller” (the caller of Task.async/1?).
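One way to stay inside that rule, sketched here as my own illustration rather than anything the Task docs prescribe, is to have the process that calls Task.async also call Task.await, and then forward the result to the process that actually needs it. ForwardDemo and the {:result, value} message are hypothetical names.

```elixir
defmodule ForwardDemo do
  # Hypothetical workaround: the owner of the task awaits it, then forwards the result.
  def run do
    consumer = self()

    spawn(fn ->
      # This process is the task's caller, so it is allowed to await it.
      task = Task.async(fn -> :computed end)
      send(consumer, {:result, Task.await(task)})
    end)

    # The consumer only ever sees a plain message, never the Task struct.
    receive do
      {:result, value} -> value
    after
      1_000 -> :no_result
    end
  end
end
```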

The documentation goes on to describe the async and await functions at a high level. We’ll dig into them further when we get to the API. The most important bit here is that “Once the task action finishes, a message will be sent to the caller with the result.”

The document continues with a discussion of “Supervised tasks” and “Supervision trees” which I am going to skip over because I am more interested in the simple linked behavior.

Next we have a summary of the API. I will focus on the async and await functions.

async

The async function has two variants.

  • async(fun)
  • async(mod, fun, args)

The two variations allow different ways to specify a function that will run inside a new Task. Both functions return the Task. The documentation describes the format of a reply; this reply corresponds to the “message” described in the high-level description of async and await.
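Because the reply is an ordinary message of the form {ref, result}, where ref is the reference stored in the Task struct, the caller could in principle receive it by hand. A small sketch, assuming that documented reply format; in real code Task.await/2 is the better choice since it also handles :DOWN messages and timeouts. ManualReceive is a hypothetical module name.

```elixir
defmodule ManualReceive do
  def run do
    task = Task.async(fn -> 1 + 1 end)
    ref = task.ref

    result =
      receive do
        # The task's reply is tagged with the same ref held in the struct.
        {^ref, value} -> value
      after
        1_000 -> :no_reply
      end

    # Stop monitoring and drop any :DOWN message already in the mailbox.
    Process.demonitor(ref, [:flush])
    result
  end
end
```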

await

The await function has only one variant: await(task, timeout \\ 5000), which waits for a reply from the Task. The documentation describes the reply that is sent by the Task.

Read the source

Next we dive into the source, starting with task.ex.

Of course, the documentation is repeated here (or rather, originated from here) but I think reading the documentation in isolation the first time was worthwhile. It gives us an outsider’s or user’s perspective rather than the implementer’s perspective I expect to get from the source.

The first bit of code is the type definition.

defstruct pid: nil, ref: nil

The comment says that pid is the process ID of the Task and ref is “the task monitor reference”. I don’t really understand what “task monitor” means so that’s a question to keep in mind.
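A quick way to see those two fields from the caller’s side, as a sketch: newer Elixir versions add more fields to the struct (such as owner), so the match below looks only at pid and ref. StructPeek is a hypothetical module name.

```elixir
defmodule StructPeek do
  def run do
    task = Task.async(fn -> :ok end)

    # Match only the two fields discussed here; any other fields are ignored.
    %Task{pid: pid, ref: ref} = task

    fields_ok = is_pid(pid) and is_reference(ref)
    {fields_ok, Task.await(task)}
  end
end
```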

  • What is the underlying implementation of Task? Is it a GenServer?
  • How is the result of the task communicated back to the caller in await?
  • What is the task monitor?

Skipping a few functions we come to …

Task.async/1:

@spec async(fun) :: t
def async(fun) do
  async(:erlang, :apply, [fun, []])
end

This just delegates to Task.async/3 which is the next function:

@spec async(module, atom, [term]) :: t
def async(mod, fun, args) do
  mfa = {mod, fun, args}
  pid = :proc_lib.spawn_link(Task.Supervised, :async, [self, get_info(self), mfa])
  ref = Process.monitor(pid)
  send(pid, {self(), ref})
  %Task{pid: pid, ref: ref}
end

A new process is created using the Erlang function proc_lib.spawn_link/3. This call spawns a process but the process doesn’t directly call our function. It calls the function Task.Supervised.async/3 and passes it the arguments [self, get_info(self), mfa]. Task.Supervised is something we will have to look into later so our list of questions grows:

  • What is Task.Supervised? What does its async/3 function do?

But, continuing on in Task.async/3 we see that it creates ref using Process.monitor/1 which causes the current process (i.e. the process that called async) to monitor pid.

Next Task.async/3 sends a message to the Task process consisting of {self(), ref}. This is another thing to look into when we get to the body of the task:

  • How does Task.Supervised respond to the message {self(), ref}?

Finally, Task.async/3 returns the new Task struct.
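To check my reading of this handshake, here is a stripped-down sketch that reproduces it with plain spawn_link/1 instead of :proc_lib and Task.Supervised. MiniTask is a hypothetical module, not part of Elixir, and its await deliberately skips the :DOWN and timeout handling the real one has.

```elixir
defmodule MiniTask do
  # Hypothetical re-creation of the async handshake; not the real Task.
  defstruct [:pid, :ref]

  def async(fun) do
    caller = self()
    pid = spawn_link(fn -> run(caller, fun) end)
    ref = Process.monitor(pid)
    # The ref only exists once the monitor is set up, so it is sent afterwards.
    send(pid, {caller, ref})
    %__MODULE__{pid: pid, ref: ref}
  end

  # Body of the spawned process: wait for the ref, then reply tagged with it.
  defp run(caller, fun) do
    ref =
      receive do
        {^caller, ref} -> ref
      end

    send(caller, {ref, fun.()})
  end

  # Simplified await: no timeout and no :DOWN handling.
  def await(%__MODULE__{ref: ref}) do
    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        reply
    end
  end
end

# MiniTask.async(fn -> 6 * 7 end) |> MiniTask.await() returns 42.
```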

Task.await/2

Next we come to

@spec await(t, timeout) :: term | no_return
def await(%Task{ref: ref}=task, timeout \\ 5000) do
  receive do
    {^ref, reply} ->
      Process.demonitor(ref, [:flush])
      reply
    {:DOWN, ^ref, _, _, :noconnection} ->
      mfa = {__MODULE__, :await, [task, timeout]}
      exit({ {:nodedown, node(task.pid)}, mfa})
    {:DOWN, ^ref, _, _, reason} ->
      exit({reason, {__MODULE__, :await, [task, timeout]}})
  after
    timeout ->
      Process.demonitor(ref, [:flush])
      exit({:timeout, {__MODULE__, :await, [task, timeout]}})
  end
end

This just receives a message, matching one of three message patterns, or gives up after a timeout. The cases are:

  1. {^ref, reply} - This receives the normal reply from the Task. This pattern matches the format described by Task.async/1. When this message arrives, Task.await/2:
    • Stops monitoring the Task process
    • Returns the reply to the caller
  2. {:DOWN, ^ref, _, _, :noconnection} - :DOWN messages are part of being a monitor and are sent to the monitoring process when the monitored process terminates. This particular message means there is no connection to the Task’s node. When this message arrives:
    • Task.await/2 calls exit to terminate the calling process.
  3. {:DOWN, ^ref, _, _, reason} - This is the general :DOWN message, where reason describes why the Task process terminated.
    • Again, Task.await/2 exits in this case.
  4. In the timeout case, Task.await/2
    • Stops monitoring the Task process.
    • Calls exit to terminate the calling process.
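The timeout branch is easy to trigger on purpose. In this sketch (AwaitTimeoutDemo is a hypothetical name) the caller catches the exit so it can report what happened; normally you would let the exit propagate. The exit reason shape matches the await source quoted above.

```elixir
defmodule AwaitTimeoutDemo do
  def run do
    # A task that takes longer than we are willing to wait.
    task =
      Task.async(fn ->
        Process.sleep(500)
        :late
      end)

    try do
      Task.await(task, 50)
    catch
      # await exits with {:timeout, {Task, :await, [task, timeout]}}.
      :exit, {:timeout, {Task, :await, _args}} -> :timed_out
    end
  end
end
```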

The rest of task.ex covers the find functions, which I’m not as interested in.

Revisit the questions

We’ve read through the top level implementation of the async and await API so now is a good time to review our questions.

What is the underlying implementation of Task? Is it a GenServer?

So far, we know that the underlying implementation is in Task.Supervised. We will know more once we read through the code for that module. Since we have more specific questions about Task.Supervised, we can cross this question off our list.

How is the result of the task communicated back to the caller in await?

This we can answer – the Task process sends a message back with the result.

What is the task monitor?

This we can answer – Task.async/3 uses Process.monitor to create a monitor for the Task process so that it can be notified if the Task process crashes or if its node goes offline.
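Monitors are plain Erlang process monitors, so the :DOWN message can be observed without Task at all. A minimal sketch (MonitorDemo is hypothetical): the monitored process waits for a message before exiting, so the monitor is already in place when it dies.

```elixir
defmodule MonitorDemo do
  def run do
    # spawn/1, not spawn_link/1, so the crash does not take the caller down.
    pid =
      spawn(fn ->
        receive do
          :crash -> exit(:boom)
        end
      end)

    ref = Process.monitor(pid)
    send(pid, :crash)

    # The monitoring process receives {:DOWN, ref, :process, pid, reason}.
    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    after
      1_000 -> :no_down_message
    end
  end
end
```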

What is Task.Supervised? What does its async/3 function do?

We can’t answer this yet so it stays on our list.

How does Task.Supervised respond to the message {self(), ref}?

This stays on the list.

Follow the questions

The remaining list is:

  • What is Task.Supervised? What does its async/3 function do?
  • How does Task.Supervised respond to the message {self(), ref}?

The list points us to…

Task.Supervised

We’ll read the source to Task.Supervised from task/supervised.ex.

First, notice that the module doesn’t use any other modules (there are no use directives) and, related to our questions, it specifically does not use GenServer. I skimmed through the entire module and confirmed it doesn’t implement any GenServer callbacks like handle_call. So, Task.Supervised is not a GenServer. It is simply a monitored process.

The module starts off with some start functions:

def start(info, fun) do
  {:ok, :proc_lib.spawn(__MODULE__, :noreply, [info, fun])}
end

def start_link(info, fun) do
  {:ok, :proc_lib.spawn_link(__MODULE__, :noreply, [info, fun])}
end

def start_link(caller, info, fun) do
  :proc_lib.start_link(__MODULE__, :reply, [caller, info, fun])
end

Looking back to task.ex, I see that these are called from Task.start/2, Task.start_link/2 and Task.start_link/3. So I’ll skip over these as I’m more interested in the async and await API. Fortunately, the next function is:

async

def async(caller, info, mfa) do
  initial_call(mfa)
  ref = receive do: ({^caller, ref} -> ref)
  send caller, {ref, do_apply(info, mfa)}
end

The last two lines are straightforward enough:

  • First receive the message sent from Task.async/3 which gives us the ref.
  • Then send caller the result. This assumes do_apply computes the result, which we will confirm shortly.

For the first line we need to go find the implementation of initial_call:

defp initial_call(mfa) do
  Process.put(:"$initial_call", get_initial_call(mfa))
end

defp get_initial_call({:erlang, :apply, [fun, []]}) when is_function(fun, 0) do
  {:module, module} = :erlang.fun_info(fun, :module)
  {:name, name} = :erlang.fun_info(fun, :name)
  {module, name, 0}
end

defp get_initial_call({mod, fun, args}) do
  {mod, fun, length(args)}
end

I wasn’t familiar with Process.put, but the documentation says that it “[s]tores the given key-value in the process dictionary.” The key is the atom :"$initial_call" and the value is whatever is returned by get_initial_call(mfa). There are two cases here, but both resolve to a {module, function, arity} tuple: for a zero-arity anonymous function the module and name come from :erlang.fun_info/2, and otherwise the arity is the length of the argument list. This tuple gets stored in the process dictionary.
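To see what the two clauses produce, here is a copy of get_initial_call/1’s logic as a public function, along with Process.put/2 and Process.get/1 on the same key. InitialCallDemo is a hypothetical module; the real functions are private to Task.Supervised.

```elixir
defmodule InitialCallDemo do
  # Same logic as the private get_initial_call/1 clauses quoted above.
  def initial_call({:erlang, :apply, [fun, []]}) when is_function(fun, 0) do
    {:module, module} = :erlang.fun_info(fun, :module)
    {:name, name} = :erlang.fun_info(fun, :name)
    {module, name, 0}
  end

  def initial_call({mod, fun, args}) do
    # The third element is the arity, not the argument list itself.
    {mod, fun, length(args)}
  end

  def store(mfa) do
    # Process.put/2 returns the previous value for the key (nil if unset).
    Process.put(:"$initial_call", initial_call(mfa))
    Process.get(:"$initial_call")
  end
end

# InitialCallDemo.store({String, :upcase, ["hi"]}) returns {String, :upcase, 1}.
```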

I was curious about this atom $initial_call so I started up the Erlang observer from a default iex shell like this:

iex(2)> :observer.start

I navigated to the “Processes” tab and double clicked a process at random. The first row in the “Overview:” pane is:

Initial call: erlang:apply/2

I’m guessing that setting a value in the process dictionary for $initial_call sets this property for the process. This way the function passed to Task.async will show up in the “Initial call” row. This would make it easier to debug a Task or to find it in the observer.
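That guess can be checked from code by reading a live task’s process dictionary. This is a sketch, assuming Task.Supervised still stores :"$initial_call" the way the quoted source does (an implementation detail that may change between versions); DictionaryPeek is a hypothetical module. The task signals when it is running, so the dictionary is read only after initial_call/1 has already executed. For a function defined inside DictionaryPeek.run/0, I’d expect the stored value to be {DictionaryPeek, <generated fun name>, 0}.

```elixir
defmodule DictionaryPeek do
  def run do
    parent = self()

    task =
      Task.async(fn ->
        # When this runs, the task process has already set :"$initial_call".
        send(parent, :started)

        receive do
          :stop -> :stopped
        end
      end)

    receive do
      :started -> :ok
    end

    {:dictionary, dict} = Process.info(task.pid, :dictionary)
    entry = List.keyfind(dict, :"$initial_call", 0)

    # Let the task finish and collect its reply.
    send(task.pid, :stop)
    {entry, Task.await(task)}
  end
end
```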

The next function to look at is do_apply:

defp do_apply(info, {module, fun, args} = mfa) do
  try do
    apply(module, fun, args)
  catch
    :error, value ->
      exit(info, mfa, {value, System.stacktrace()})
    :throw, value ->
      exit(info, mfa, { {:nocatch, value}, System.stacktrace()})
    :exit, value ->
      exit(info, mfa, value)
  end
end

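So, this function just uses Kernel.apply/3 to call the Task’s function. If that function raises an error, throws a value nobody catches, or exits, do_apply catches it and passes the reason (with a stacktrace for errors and throws) to the module’s own exit/3.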
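The three catch clauses correspond to the three exception classes (error, throw, exit). The sketch below (SafeApply is hypothetical) keeps the same try/catch shape but returns tagged tuples instead of calling exit/3, so each branch is easy to observe. Newer Elixir versions deprecate System.stacktrace() in favour of __STACKTRACE__; the stacktrace is left out here for brevity.

```elixir
defmodule SafeApply do
  def run(module, fun, args) do
    try do
      {:ok, apply(module, fun, args)}
    catch
      # Errors: value is the error term (for an Elixir raise, the exception struct).
      :error, value -> {:error, value}
      # An uncaught throw/1; Task.Supervised wraps this as {:nocatch, value}.
      :throw, value -> {:nocatch, value}
      # An explicit exit/1 from inside the function.
      :exit, value -> {:exit, value}
    end
  end
end

# SafeApply.run(Enum, :sum, [[1, 2, 3]]) returns {:ok, 6}.
```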

Review

Now that we’ve read through Task.Supervised I think we can answer our remaining questions:

What is Task.Supervised? What does its async/3 function do?

Task.Supervised is the module that manages the process used to implement the Task. Its async/3 function:

  • Sets the initial call in the process dictionary to a {module, function, arity} tuple derived from the MFA passed in to Task.async
  • Waits for the message sent from Task.async/3 which gives it the ref.
  • Applies the MFA
  • Sends caller the result.

How does Task.Supervised respond to the message {self(), ref}?

Task.Supervised.async/3 receives this message and extracts ref from it. ref is used in the reply back to the caller to identify which reply is coming back.

At first I wondered why ref couldn’t be an argument to Task.Supervised.async/3 and whether this was some sort of synchronization. But thinking about it more, I realized that ref is returned by Process.monitor(pid), so it can’t be computed until after the Task process starts. Hence, ref must be sent as a message afterwards.

Wrapping up

At this point I have a much better understanding of how Task’s async and await calls work, and I understand why I can’t initiate a Task in one process and await it in another. There’s still a lot more to Task that I’ve skipped over, including its interaction with supervision trees.

I still need a means for creating something like a Task in one process and collecting the result in another process. I’ve experimented with ExFuture, a Futures implementation for Elixir, and it seems to do what I need. I’m going to try integrating it with my project and will let you know how it goes.