Joseph Kain

Professional Software Engineer learning Elixir.

Hello Learning Elixir readers! It’s been a very long time since my last post. I’ve been focused on other technologies and have taken a new job. I want to thank my friend Doug Goldie for keeping me involved with Elixir during that time. And I want to thank my wife for encouraging me to get back on track with this blog.

A lot has changed in Elixir since I last did any serious work with it. Elixir 1.3 was released, Elixir World 2016 took place, and more. One of the things that caught my eye is GenStage. Last year, the plan was to build GenRouter and it looks like it has now evolved into GenStage.

There’s a lot to learn with GenStage. To approach this subject, let’s start with some questions:

  1. What is GenStage (and Flow for that matter)?
  2. Where can I find out more information?
  3. What would be a good project to try them out?

For question #1 I started with a Google search and it turned up the official GenStage Announcement. Wow, it was posted back in July. I have been gone a long time. Anyway, it gives the following description:

GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes.

But what does this mean? Let’s go over this definition piece by piece. First of all, GenStage is for Elixir. That’s simple enough.

GenStage is a “behaviour” which is the OTP term for an interface. Specifically, GenStage defines a set of functions or callbacks that must be implemented by a process adopting the behaviour. GenStage may also provide default implementations of those functions.
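To make the idea of a behaviour concrete, here is a minimal sketch in plain Elixir. The Greeter behaviour and both adopting modules are hypothetical names I made up for illustration; they are not part of GenStage. It shows a required callback plus a default implementation that an adopter can override.

```elixir
defmodule Greeter do
  # Hypothetical behaviour: adopting modules must provide greet/1.
  @callback greet(name :: String.t()) :: String.t()

  # `use Greeter` marks the module as adopting the behaviour and
  # injects a default greet/1 that the adopter may override.
  defmacro __using__(_opts) do
    quote do
      @behaviour Greeter

      def greet(name), do: "Hello, " <> name

      defoverridable greet: 1
    end
  end
end

defmodule DefaultGreeter do
  # Relies entirely on the default implementation.
  use Greeter
end

defmodule LoudGreeter do
  use Greeter

  # Overrides the default implementation.
  def greet(name), do: String.upcase(name) <> "!"
end

IO.puts(DefaultGreeter.greet("Joe"))
IO.puts(LoudGreeter.greet("Joe"))
```

As far as I can tell, `use GenStage` plays the same role for stages: it declares the behaviour and supplies defaults for the callbacks you don't write yourself.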

GenStage is “for exchanging events … between Elixir processes”. Events? That’s a change from what I remember GenRouter was intended to be. If I remember correctly GenRouter was for streaming data through multiprocess pipelines. I think this shift to events may have been a generalization from GenRouter.

GenStage does all of this with “back-pressure”. Back-pressure is a mechanism for controlling the rate in a producer-consumer setup. If the producer and consumer run asynchronously then it is likely that one can get ahead of the other. If the consumer falls behind then back-pressure can be used to prevent the producer from over-producing.
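Here is a rough sketch of demand-driven back-pressure using nothing but plain processes. This is not how GenStage is implemented; the module name and the {:ask, ...} and {:events, ...} messages are my own inventions for illustration. The point is that the producer never emits anything until the consumer asks, so it cannot run ahead.

```elixir
defmodule BackPressureSketch do
  # Hypothetical producer: emits consecutive integers, but only
  # as many as a consumer has asked for.
  def start_producer(counter) do
    spawn_link(fn -> producer_loop(counter) end)
  end

  defp producer_loop(counter) do
    receive do
      {:ask, consumer, demand} when demand > 0 ->
        events = Enum.to_list(counter..(counter + demand - 1))
        send(consumer, {:events, events})
        producer_loop(counter + demand)

      :stop ->
        :ok
    end
  end

  # The consumer asks for `batch` events, waits for them, and only
  # then asks again. Returns everything it received.
  def consume(producer, batch, rounds) do
    Enum.flat_map(1..rounds, fn _round ->
      send(producer, {:ask, self(), batch})

      receive do
        {:events, events} -> events
      end
    end)
  end
end

producer = BackPressureSketch.start_producer(0)
IO.inspect(BackPressureSketch.consume(producer, 3, 2))
send(producer, :stop)
```

A slow consumer simply asks less often, and the producer sits idle in the meantime.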


The announcement gives this example

File.stream!("path/to/some/file")
|> Stream.flat_map(fn line ->
    String.split(line, " ")
   end)
|> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, & &1 + 1)
   end)
|> Enum.to_list()

to describe the motivation for GenStage. This is an example of a lazy data transformation pipeline which is common in Elixir. But, this solution does not leverage the concurrency afforded by the BEAM and modern CPUs.
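If you want to play with this shape of pipeline without a file on disk, here is a small sketch that runs the same flat_map and reduce steps over an in-memory list of lines. The WordCount module and the sample lines are my own stand-ins for the file, and I added trim: true so empty strings don't get counted.

```elixir
defmodule WordCount do
  # Counts words across any enumerable of lines, using the same
  # flat_map + reduce shape as the pipeline from the announcement.
  def count(lines) do
    lines
    |> Stream.flat_map(fn line -> String.split(line, " ", trim: true) end)
    |> Enum.reduce(%{}, fn word, acc ->
      Map.update(acc, word, 1, &(&1 + 1))
    end)
  end
end

IO.inspect(WordCount.count(["the quick fox", "the lazy dog"]))
```

Every step here runs in the calling process, one line at a time, which is exactly the limitation described above.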

The goal for GenStage is to enable concurrent processing of large datasets while still retaining Elixir’s easy to understand style of data transformation pipelines.


The announcement goes on to show an example of GenStage stages. First there is a counter called A

alias Experimental.GenStage

defmodule A do
  use GenStage

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    events = Enum.to_list(counter..counter+demand-1)

    # The events to emit is the second element of the tuple,
    # the third being the state.
    {:noreply, events, counter + demand}
  end
end

which is a producer. One thing I was left wondering after reading the announcement is the meaning of :producer in the init/1 function. Is :producer a special value recognized by GenStage or just the name for A? Looking at the docs I found:

In case of successful start, this callback must return a tuple where the first element is the stage type, which is either a :producer, :consumer or :producer_consumer if it is taking both roles.

So in fact :producer is a special value recognized by GenStage.

The rest of A is the handle_demand/2 function. This is a callback which GenStage will use to request more items from a producer type stage. The demand argument is the requested, or demanded, number of events. The counter argument is the current state for process A. Since stage A is a counter it maintains the current count as its state. For each handle_demand/2 call, enough values to satisfy demand are returned and counter is incremented by demand. In this way, A can return the subsequent set of values on the next call.
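To check my understanding of the arithmetic, here is the same handle_demand/2 body lifted into a plain module so it can be called directly without starting a stage. CounterDemand is a hypothetical name, and calling the callback by hand like this is just for exploration; in a real stage GenStage makes these calls.

```elixir
defmodule CounterDemand do
  # Same logic as A.handle_demand/2, as an ordinary function.
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

# Start at 3 and ask for 2 events, then feed the new state back in
# and ask for 3 more.
{:noreply, first, state} = CounterDemand.handle_demand(2, 3)
IO.inspect({first, state})

{:noreply, second, state} = CounterDemand.handle_demand(3, state)
IO.inspect({second, state})
```

The first call emits 3 and 4 and leaves the state at 5, matching the comment in the announcement's code. The second call picks up at 5.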

The announcement goes on to build a :consumer type stage which has a handle_events/3 callback. This function should process or store the passed events and update the GenStage’s state.

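The announcement also builds a :producer_consumer type stage. In the announcement’s example this stage implements only handle_events/3, which returns the transformed events to emit downstream; handle_demand/2 is a callback for :producer stages.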
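Here is a sketch of what handle_events/3 can look like for each of these stage types, written as plain functions so they can be called by hand. The module and function names are hypothetical and there is no `use GenStage` here; only the argument order (events, from, state) and the {:noreply, events, state} return shape are taken from GenStage.

```elixir
defmodule StageCallbacks do
  # Shaped like a :producer_consumer handle_events/3: transform the
  # incoming events and emit the results downstream.
  def multiply_events(events, _from, multiplier) do
    {:noreply, Enum.map(events, &(&1 * multiplier)), multiplier}
  end

  # Shaped like a :consumer handle_events/3: store the events in the
  # state and emit nothing, since a consumer has no downstream.
  def collect_events(events, _from, seen) do
    {:noreply, [], seen ++ events}
  end
end

{:noreply, doubled, 2} = StageCallbacks.multiply_events([1, 2, 3], :upstream, 2)
{:noreply, [], seen} = StageCallbacks.collect_events(doubled, :upstream, [])
IO.inspect(seen)
```

The :upstream atom is just a placeholder for the `from` argument, which these sketches ignore.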

Connecting the Stages

The next step is to start the stages and then connect them using sync_subscribe/3. This step seems a bit manual but it sounds like Flow will provide an easier way to assemble these stages for simple cases.

The part I found most interesting was that multiple consumers can be connected in order to create more concurrency. When I initially started reading the announcement I was worried that GenStage would only allow for creating pipelined concurrency, which is not the most effective form of concurrency because a 5 stage pipeline only allows for 5 concurrent activities. But GenStage seems to be much more flexible.

GenStage Use Cases

The announcement describes a few use cases for GenStage.

GenStage for Data-Ingestion

One of the use cases for GenStage is to consume data from third-party systems.

This sounds very similar to my Domain Scraper experiment. It would be an interesting exercise to go back and rewrite Domain Scraper using GenStage.

GenStage for Event Dispatching

Another use case:

Another scenario where GenStage can be useful today is to replace cases where developers would have used GenEvent in the past.

The announcement goes on to describe an advantage of GenStage over GenEvent:

GenEvent, however, has one big flaw: the event manager and all event handlers run in the same process.

That’s interesting. I hadn’t realized this limitation of GenEvent before. GenStage seems like a big improvement in that case. When using GenEvent as a notification system for the observer pattern there can be many observers. With all handlers being run in the same process, all the handlers would have to be run serially, sacrificing the concurrency that BEAM provides us.
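To see the difference for myself, here is a small sketch that uses neither GenEvent nor GenStage. It uses plain anonymous functions as stand-in handlers and Task for the concurrent version; HandlerSketch and both function names are hypothetical. Each handler simply reports which process ran it.

```elixir
defmodule HandlerSketch do
  # Runs every handler in the calling process, one after another.
  # This is the shape the announcement attributes to GenEvent.
  def notify_serial(handlers, event) do
    Enum.map(handlers, fn handler -> handler.(event) end)
  end

  # Runs each handler in its own process and waits for all of them.
  def notify_concurrent(handlers, event) do
    handlers
    |> Enum.map(fn handler -> Task.async(fn -> handler.(event) end) end)
    |> Enum.map(&Task.await/1)
  end
end

# Each stand-in handler returns the pid of the process running it.
handlers = [fn _event -> self() end, fn _event -> self() end]

IO.inspect(HandlerSketch.notify_serial(handlers, :updated))
IO.inspect(HandlerSketch.notify_concurrent(handlers, :updated))
```

The serial version prints the same pid twice, while the concurrent version prints two different pids. With one process per handler, a slow observer no longer holds up the others.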

Later on the announcement has a call to action for GenEvent users:

First of all, now is the moment for the community to step in and try GenStage out. If you have used GenEvent in the past, can it be replaced by a GenStage? Similarly, if you were planning to implement an event handling system, give GenStage a try.

Well I do have a project that uses GenEvent. Back in February I wrote about Using GenEvent to Notify a Channel of Updates in Elixir. This might be a good place for me to start experimenting with GenStage.

A look at Flow

The end of the announcement gives a glimpse of Flow which allows the example at the beginning of the announcement to be rewritten like this:

alias Experimental.GenStage.Flow

# Let's compile common patterns for performance
empty_space = :binary.compile_pattern(" ") # NEW!

File.stream!("path/to/some/file", read_ahead: 100_000) # NEW!
|> Flow.from_enumerable()
|> Flow.flat_map(fn line ->
    for word <- String.split(line, empty_space), do: {word, 1}
   end)
|> Flow.partition_with(storage: :ets) # NEW!
|> Flow.reduce_by_key(& &1 + &2)
|> Enum.to_list()

This allows for asynchronous processing in the data transformation pipeline. Looking at the docs, it seems Flow is available and I’d like to try it out in a future post.

Next Steps

In this post we looked at the GenStage announcement and dug into certain parts more deeply. We started out with a list of questions:

  1. What is GenStage (and Flow for that matter)?
  2. Where can I find out more information?
  3. What would be a good project to try them out?

How did we do here?

  1. We answered this for GenStage. But, we’ve only touched on Flow.
  2. We found out more information by reading the announcement and the docs. I also plan to watch the ElixirConf 2016 Keynote which I hear covers GenStage and Flow.
  3. We identified two projects: GenEvent to notify a Phoenix Channel and Domain Scraper.

In answering these questions we’ve left ourselves with some tasks to follow up on. Here’s a list:

  1. Watch the ElixirConf 2016 Keynote
  2. Use GenStage to notify a Phoenix Channel of new events.
  3. Experiment with Flow
  4. Rewrite Domain Scraper to use GenStage.

I hope to work through items 2-4 in future posts and I encourage you to watch the Keynote, if you haven’t done so already, in the meantime.