Last week I learned about GenStage by reading the announcement. It was a good introduction, but now I’m ready to get my hands dirty and write some
GenStage code myself. From the previous post I built up a TODO list of things to try:
- Watch the ElixirConf 2016 Keynote
- Use GenStage to notify a Phoenix Channel of new events.
- Experiment with
- Rewrite Domain Scrapper to use
I’ve watched the Keynote, so let’s dive into the second item.
Use GenStage to Notify a Phoenix Channel of new events.
This post will follow in the footsteps of Using GenEvent to Notify a Channel of Updates in Elixir.
In the post using
GenEvent, we had a channel that we were using to broadcast
changes to a model to all web clients. But, we wanted to decouple the controller
that was performing updates to the model and the channel. The thought being that
we would use an observer pattern to communicate changes. There could be many
observers to the changes in addition to the channel and the controller shouldn’t
have to be aware of any of them.
I have a play channel project on GitHub. I went back and created a branch called pre-gen-event and will begin work from there.
First, I need to ensure that I can still build everything. There are some warnings in the deps. These appear because the deps are old but I’ve upgraded to Elixir 1.3.2. I don’t want to update Phoenix yet, so I’m going to live with the warnings for now.
There is one warning in my code:
warning: variable payload is unused web/channels/toy_channel.ex:4
This is easy enough to fix by renaming payload to _payload in the argument list.
At this point the project compiles and all the tests pass.
To get started with our GenStage changes, we’ll have to add
Experimental.GenStage to our dependencies in mix.exs by adding a line like this to deps:
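As a rough sketch, the dependency entry might look like the fragment below. The version requirement is an assumption for illustration only; use whichever gen_stage release you are targeting.

```elixir
# mix.exs (fragment)
defp deps do
  [
    # ...the existing Phoenix dependencies stay as they are...
    # Assumed version requirement; adjust to the release you are using.
    {:gen_stage, "~> 0.4"}
  ]
end
```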
Then, I’ll add
alias Experimental.GenStage to the top of toy_channel.ex and
use GenStage just after
use PlayChannel.Web, :channel at the
beginning of the module.
These changes trigger a few warnings:
warning: undefined behaviour function init/1 (for behaviour Experimental.GenStage)
  web/channels/toy_channel.ex:3
warning: conflicting behaviours - callback code_change/3 required by both 'Elixir.Phoenix.Channel' and 'Elixir.Experimental.GenStage' (line 3)
  web/channels/toy_channel.ex:3
warning: conflicting behaviours - callback handle_info/2 required by both 'Elixir.Phoenix.Channel' and 'Elixir.Experimental.GenStage' (line 3)
  web/channels/toy_channel.ex:3
warning: conflicting behaviours - callback terminate/2 required by both 'Elixir.Phoenix.Channel' and 'Elixir.Experimental.GenStage' (line 3)
  web/channels/toy_channel.ex:3
The tests still pass but I’m worried about these warnings. There are two types. Here’s the first:
warning: undefined behaviour function init/1 (for behaviour Experimental.GenStage)
This is a good warning and we can deal with it. I just haven’t implemented GenStage’s required
init/1 function. Once we add the function it should address the warning. We’re left with:
warning: conflicting behaviours - callback code_change/3 required by both 'Elixir.Phoenix.Channel' and 'Elixir.Experimental.GenStage' (line 3)
warning: conflicting behaviours - callback handle_info/2 required by both 'Elixir.Phoenix.Channel' and 'Elixir.Experimental.GenStage' (line 3)
  web/channels/toy_channel.ex:3
warning: conflicting behaviours - callback terminate/2 required by both 'Elixir.Phoenix.Channel' and 'Elixir.Experimental.GenStage' (line 3)
  web/channels/toy_channel.ex:3
For these three, we had similar warnings when adding GenEvent to the channel. And again, these are the OTP compliant callbacks and will have the same purpose for both GenStage and
Phoenix.Channel. So I think we’re ok to proceed here.
Well, with that out of the way let’s go ahead and implement the
init/1 function. To do this, we need to decide if
PlayChannel.ToyChannel is a
GenStage producer or consumer.
Our goal here is to use GenStage “to Notify a Channel of Updates”. That means that
- a change will originate in the controller
- there is a notification center to broadcast an event about the change
- PlayChannel.ToyChannel receives the event and sends the payload over the channel.
This means the notification center will be the event producer and
PlayChannel.ToyChannel will be the consumer.
Now we can write the
init/1 function and initialize as a consumer. Here’s the function we need:
init/1 needs no arguments and no state. It only needs to return that it will behave as a consumer.
At this point the tests still pass. However, we need to add a
handle_events/3 function in order to receive events from our notification center. I think this should do it:
This is similar to what we wrote for GenEvent.
GenStage.handle_events/3 accepts a list of events. To process the list we use
Enum.each and broadcast each update on the channel.
handle_events returns no events (it’s a consumer) and needs no state.
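Put together, the two consumer callbacks might be sketched as follows. This is written as a plain module so it runs without the gen_stage dependency; in the channel these functions would sit alongside use GenStage. The module name and broadcast_update/1 are hypothetical: the latter is a stand-in for the real channel broadcast and only sends a message to the current process.

```elixir
defmodule ConsumerCallbacksSketch do
  # A consumer that needs no arguments and keeps no meaningful state.
  def init(_args), do: {:consumer, :ok}

  # GenStage hands a consumer a list of events, so we walk the list.
  def handle_events(events, _from, state) do
    Enum.each(events, fn event -> broadcast_update(event) end)
    # A consumer emits no events of its own, hence the empty list.
    {:noreply, [], state}
  end

  # Hypothetical stand-in for the channel broadcast.
  defp broadcast_update(event), do: send(self(), {:broadcast, event})
end
```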
Build the GenStage Producer
Next, we need to create the producer. We’ll call it
PlayChannel.Toy.UpdateEventHandler (this is the same name we used in the
GenEvent version). We’ll start with
There’s actually a great example of what we need in the GenStage announcement:
Let’s go over this.
First there is the
start_link function. This will be used to start and name the GenStage.
Next is sync_notify. This is the function that our controller will call to notify observers that an update has been made. It uses
GenStage.call to send the update event to the GenStage server.
Next come the callbacks, the first is
init. This initializes the GenStage as a producer and sets a custom dispatch.
GenStage.BroadcastDispatcher is described in the documentation as:
A dispatcher that accumulates demand from all consumers before broadcasting events to all of them
This is the behavior we need for a notification center. Finally, we set up a state for the stage. Our state is a queue and a demand. The initial queue is created with
:queue.new. This queue will hold events waiting to be broadcast. The initial demand is 0.
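To make the state concrete, here is a small sketch of a {queue, demand} pair built on Erlang's :queue module. The module and function names are made up for illustration and are not part of the announcement's code.

```elixir
defmodule QueueStateSketch do
  # Initial producer state: an empty queue and zero pending demand.
  def initial_state, do: {:queue.new(), 0}

  # Add an event at the rear of the queue.
  def enqueue({queue, demand}, event), do: {:queue.in(event, queue), demand}

  # Remove the oldest event from the front of the queue, if there is one.
  def dequeue({queue, demand}) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> {{:ok, event}, {rest, demand}}
      {:empty, rest} -> {:empty, {rest, demand}}
    end
  end
end
```

Events come back out in the order they went in, which is what we want for events waiting to be broadcast.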
At the end of the module is a function,
dispatch_events which is used to implement both handle_call and handle_demand.
Let’s think first about
handle_demand. There are two cases to consider.
First, there are events in the queue ready to be returned. In this case we can just return them in
handle_demand. Second, the queue is empty. In this case we can’t return a result to the consumer; we have to defer its return until there is an event.
The next callback is
handle_call/3. This is just like GenServer’s
handle_call and will receive the messages sent by our sync_notify function. Next is
handle_demand/2, which is the GenStage callback for a producer to provide more events. This calls
dispatch_events, which is an interesting function. It recursively calls itself for each element in the queue, up to the demand. For each recursive call it calls
GenStage.reply/2. This is the reply back to the caller of sync_notify.
I’m actually having a hard time understanding and explaining this code. I think I’ll try writing my own version.
Building a GenStage producer for Event Notification
I want to TDD a solution for
UpdateEventHandler. I had one false start trying to write this up and ended up with tests like this:
As I wrote more and more tests, I started to think that this is too tied to my implementation. Yes, I want to use
GenStage. And in fact I’m writing a blog post about
GenStage. But should my tests really require that? The fact that I’ll use
GenStage is an implementation detail and the tests should really only test the externally visible behavior, right? I’m throwing these tests away and starting over. I’ll focus on the behaviors.
That said, I do have a
PlayChannel.ToyChannel (the observer) and it will need to subscribe to the
UpdateEventHandler. But that’s the only
GenStage detail that I should depend on.
I’ll start with this test:
This is adapted from the tests I had before, but I’ve created a new
UpdateEventHandler.stop/1 interface. This is nice; it abstracts me away from GenStage.
Here’s an implementation that passes the tests:
Now, I could have written a simpler implementation that passed the tests. But I’m going to go ahead and write this as a
GenStage because I know I’ll need to.
The next test I’ll add is:
which requires that I have a
notify function. This is part of the behavior I want in my module. I want callers to be able to notify all observers of an event.
Here’s a very simple implementation that passes:
I should also note that I get a few warnings when compiling:
warning: variable event is unused
  lib/play_channel/toy/update_event_handler.ex:9
warning: variable handler is unused
  lib/play_channel/toy/update_event_handler.ex:9
I could easily fix these, but I keep them as a reminder that
notify is incomplete; I’m not actually notifying anything yet.
The next behavior that I want to test for is that I can subscribe a
GenStage consumer to the UpdateEventHandler. This test requires that I set up such a consumer. Here’s my shot at it:
TestObserver is kind of a lot of code to write for a test. But I’ve tried to keep it as simple as possible. When I run this test I get:
1) test an observer can subscribe to it (PlayChannel.Toy.UpdateEventHandlerTest)
   test/update_event_handler_test.exs:32
   ** (EXIT from #PID<0.256.0>) an exception was raised:
       ** (UndefinedFunctionError) function PlayChannel.Toy.UpdateEventHandler.handle_demand/2 is undefined or private.
This means that I need to add the
handle_demand/2 callback. Let’s do it:
This doesn’t do anything yet. When I run the test it still fails, now with this error:
1) test an observer can subscribe to it (PlayChannel.Toy.UpdateEventHandlerTest)
   test/update_event_handler_test.exs:32
   ** (exit) no process
   stacktrace:
     (stdlib) proc_lib.erl:794: :proc_lib.stop/3
     test/update_event_handler_test.exs:38: (test)
Line 38, in the test, corresponds to the call to
GenStage.stop(observer). I guess that when I stop
updater the subscribed consumers must also shut down. So I can just remove the call.
With this change the test passes.
Next, I want to write a test that calls notify on
updater then verifies that
observer sees the event. To do this I’m going to need to add something to
TestObserver to allow me to verify what it observes. I’ll do this before writing any new tests. I think this should do it:
Here I’ve added an interface
get/1 which is processed by
handle_call/3. The get functionality returns the state of the
TestObserver. I’ve also updated
handle_events to just accumulate events into the state. That is,
TestObserver collects its events and returns them when you call get.
After making this change I reran the existing tests to make sure I didn’t break
TestObserver. All was good.
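For reference, the callbacks of such an accumulating observer could be sketched like this. It is a plain module so it runs without gen_stage; the real TestObserver would use GenStage, and its get/1 would presumably wrap GenStage.call(observer, :get). The :get message name is an assumption.

```elixir
defmodule TestObserverSketch do
  # Start as a consumer with an empty list of observed events.
  def init(:ok), do: {:consumer, []}

  # Accumulate incoming events in arrival order; emit none of our own.
  def handle_events(events, _from, observed) do
    {:noreply, [], observed ++ events}
  end

  # Reply with everything observed so far; the state is left unchanged.
  def handle_call(:get, _from, observed) do
    {:reply, observed, [], observed}
  end
end
```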
Next, I wrote this test which uses the new get function:
This is very similar to the previous test except that we’ve added the assert:
This uses the new
get function to retrieve any observed events. And we expect to get the single event “Hi” because that’s the event we notified to the updater.
We can make this new test pass by updating
This version declares
[event] as available events, ready to be dispatched. The
GenStage logic will see to it that these events are stored in an internal buffer and dispatched when there is demand.
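A minimal sketch of that kind of callback, assuming the notification arrives as a {:notify, event} call (the message shape is an assumption). It is shown as a plain module so it runs without gen_stage.

```elixir
defmodule SimpleNotifySketch do
  def handle_call({:notify, event}, _from, state) do
    # Reply :ok to the caller right away and hand [event] to GenStage,
    # which buffers it internally until a consumer asks for events.
    {:reply, :ok, [event], state}
  end
end
```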
After writing this I found that the documentation has an example that uses almost the same exact code as I have just written! And it goes on to describe the reason for the more complex version.
The motivation for a more complex implementation is that undelivered events are held in
GenStage’s internal buffer. These are events that have arrived through the
notify function but that no subscriber has asked for yet. If this internal buffer becomes too full a log message will be emitted. A better solution is to manage a queue within our code to hold all the pending events.
Let’s try adding a queue to our version. There’s no test that should motivate this change as the externally visible behavior stays the same.
UpdateEventHandler will need state. Let’s think for a minute what this state is:
- A queue to hold incoming events. This queue fills up when the rate of incoming events exceeds the demand.
- Unfilled demand. This can be a simple counter that records demand that we haven’t been able to fill because the demand exceeds the supply of events.
Let’s put this together. First, in
init we have to initialize our state. We need an empty queue and 0 for outstanding demand.
With this change the tests still pass.
In handle_call we accept new events. There are two cases to deal with:
- There is no outstanding demand, so we put the new event in our queue.
- There is outstanding demand, so we forward the new event on immediately.
Here, we handle the two cases using two separate function clauses. The first handles the case where there is no outstanding demand (demand is 0): it puts the event in the queue and returns no events.
The second function clause handles the case where there is demand. It inserts the event at the rear of the queue and then pops an event off the head of the queue and returns it.
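The two clauses described above could be sketched as follows. The state is assumed to be a {queue, demand} tuple and the call message {:notify, event}; both are assumptions. It is a plain module so it runs without gen_stage.

```elixir
defmodule TwoClauseNotifySketch do
  # No outstanding demand: hold the event in the queue and emit nothing.
  def handle_call({:notify, event}, _from, {queue, 0}) do
    {:reply, :ok, [], {:queue.in(event, queue), 0}}
  end

  # Outstanding demand: enqueue at the rear, then emit the oldest event.
  def handle_call({:notify, event}, _from, {queue, demand}) do
    {{:value, oldest}, rest} = :queue.out(:queue.in(event, queue))
    {:reply, :ok, [oldest], {rest, demand - 1}}
  end
end
```

The second clause emits at most one event per call, however large the demand is.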
Do I need to handle the case that
demand > 1 and the queue is not empty in
handle_call? If so, it means that I should return more than one event. But, if there is demand and there were events waiting why were they not already dispatched? This is something to look into a little later.
Also, our tests fail after this change:
1) test a subscribed observer is notified of all events (PlayChannel.Toy.UpdateEventHandlerTest)
   test/update_event_handler_test.exs:39
   Assertion with == failed
   code: TestObserver.get(observer) == ["Hi"]
   lhs:
   rhs: ["Hi"]
   stacktrace:
     test/update_event_handler_test.exs:46: (test)
Ok, this failure makes sense because we’re never increasing the demand stored in our state. So we always use the first case (demand is 0) and never return the events.
To fix this, we need to update
handle_demand. Again, there are two cases.
- There are waiting events in the queue. In this case satisfy the demand from the queue.
- There are no waiting events in the queue. In this case save the demand in the state.
Here’s an implementation
I’ve introduced the function
take to take multiple items out of the queue. It’s a simple recursive function though it would be nice to have tests for it. It returns a tuple of the taken events, a new queue with the remaining events, and the remaining demand.
With this change the tests (the ones we already had) pass.
An interesting thing to note is that we could apply take to
handle_call to deal with the case of
demand > 1 and queue not empty that I asked about earlier. It would look something like this:
I guess there is no reason not to use this version.
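Here is one way take and the two callbacks that use it might fit together. This is a sketch based on the description above, not the exact code from the project; it assumes the {queue, demand} state and the {:notify, event} message, and it is a plain module so it runs without gen_stage.

```elixir
defmodule TakeSketch do
  # Removes up to `demand` events from the front of the queue.
  # Returns {taken_events, remaining_queue, remaining_demand}.
  def take(queue, demand, events \\ [])
  def take(queue, 0, events), do: {events, queue, 0}

  def take(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> take(rest, demand - 1, events ++ [event])
      {:empty, rest} -> {events, rest, demand}
    end
  end

  # New demand is added to whatever demand was still unfilled.
  def handle_demand(incoming, {queue, pending}) do
    {events, queue, demand} = take(queue, incoming + pending)
    {:noreply, events, {queue, demand}}
  end

  # Enqueue the new event, then emit as many events as demand allows.
  def handle_call({:notify, event}, _from, {queue, demand}) do
    {events, queue, demand} = take(:queue.in(event, queue), demand)
    {:reply, :ok, events, {queue, demand}}
  end
end
```

With take shared by both callbacks, the case of demand > 1 with a non-empty queue needs no special handling.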
Here’s the whole module:
This is now pretty close to
QueueBroadcaster in the
GenStage docs. The difference is that I opted for the abstraction of
take which is inspired by
QueueBroadcaster does the same thing but combines extracting items from the queue with building up the return values for handle_demand and
handle_call in the dispatch_events function.
Conceptually, I find my version a little easier to read because handle_demand and
handle_call build their own return values. This way
take just focuses on the single task of extracting items from the queue.
That said, there are a couple of things that
QueueBroadcaster does better than my version:
- dispatch_events has an optimization in building up the lists. It uses [event | events] where I used items ++ [item]. Inserting at the head of the list is more efficient. dispatch_events then reverses the list before returning the final result.
- It does not reply from handle_call if the value isn’t being sent to the observers immediately. Then, when the event is finally consumed it uses GenStage.reply/2 to send the reply. This keeps the caller from going on until the notification is delivered.
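The list-building difference in the first point can be seen in isolation with a small sketch. The module and function names are made up for illustration.

```elixir
defmodule ListBuildSketch do
  # Appending with ++ copies the accumulated list on every step.
  def append_all(items) do
    Enum.reduce(items, [], fn item, acc -> acc ++ [item] end)
  end

  # Prepending is constant time per step; a single reverse at the end
  # restores the original order.
  def prepend_then_reverse(items) do
    items
    |> Enum.reduce([], fn item, acc -> [item | acc] end)
    |> Enum.reverse()
  end
end
```

Both functions return the items in their original order; only the cost of building the list differs.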
I think point number 2 is really important and I need that functionality as well. And I think this is what necessitates
dispatch_events being as complex as it is.
Given this, I think it makes sense to use the
QueueBroadcaster implementation. But working through this implementation has given me a much better understanding of what’s going on.
Also, there is one more thing we should have done with
UpdateEventHandler. We needed to write a test for multiple subscribers. And this would have revealed that we needed to change our init function to use the BroadcastDispatcher, as
QueueBroadcaster does. This would look like this:
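A sketch of such an init, assuming the {queue, demand} state from earlier and the Experimental.GenStage alias used in this project. It is written as a plain module so it runs without gen_stage; the dispatcher module is only referenced by name here.

```elixir
defmodule BroadcastInitSketch do
  alias Experimental.GenStage

  def init(:ok) do
    # The third element carries producer options; here it selects the
    # dispatcher that broadcasts every event to all subscribed consumers.
    {:producer, {:queue.new(), 0}, dispatcher: GenStage.BroadcastDispatcher}
  end
end
```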
I’ll switch over to the implementation from
QueueBroadcaster but in doing so I’ll need to remove this test:
notify waits for the event to be accepted by the subscribers. As this test sets up no subscribers it ends up timing out. This is no longer a supported behavior and it wasn’t a behavior that was important to me anyway. So I can remove the test. The behavior of accepting events and delivering them to subscribers is already tested in this test:
My final version looks like this:
I’ve maintained my
notify functions. Also, I haven’t named my server after the module. I expect to be able to have many instances of it.
Now, the last step is to actually call our
UpdateEventHandler to make an update. We do this from the controller like this:
Well, wait, that isn’t quite right. I’ve adapted this from the GenEvent version but in this case what’s
I guess I do need to name my
GenStage. So we need to:
- Start an UpdateEventHandler in our supervision tree.
- Add support for naming
- Name our instance
- Register the channel with the
Let’s start with a test for naming first.
The implementation is easy enough. We’ll just allow any options accepted by
and the tests pass. Now we just need to start an
UpdateEventHandler in our supervision tree like this:
What am I doing?
Ugh, I think I’m totally lost here. In the GenEvent case,
UpdateEventHandler was actually a separate GenServer, not part of the channel. And
GenEvent played the role of the notification server. I need something like this again. I really should have reviewed the old post better. (And honestly, I think I made this same mistake with
Ok, we can fix this. I rename UpdateEventHandler to
NotificationCenter, which feels good. I like that name better.
Then, I extract what I’d written in the channel into a separate
GenStage. And it is this new actor that should be named
UpdateEventHandler. It looks like this:
There’s also one change that I’ve added here which I’ve taken from the
GenStage announcement. The
init function now uses
subscribe_to: [:toy_notifcation_center] to automatically subscribe to the notification center.
:toy_notifcation_center is the name I set up for that notification center when setting up my supervisor, like this:
The subscribe_to functionality in
GenStage is nice in that I don’t have to write extra code after the supervisor is started to build up the connections.
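As a sketch, a consumer init that subscribes by registered name might look like this. It is a plain module so it runs without gen_stage, and it assumes the notification center has been started under the name :toy_notifcation_center, as described above.

```elixir
defmodule SubscribingConsumerSketch do
  def init(:ok) do
    # subscribe_to lists the producers this consumer subscribes to
    # once its own init has returned.
    {:consumer, :ok, subscribe_to: [:toy_notifcation_center]}
  end
end
```

Because the subscription is declared in init, the order in which the supervisor starts its children matters: the notification center should already be running when this consumer starts.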
And now it works!
Well, that was quite a long post. Part of the length came from a few mistakes I made along the way but I know I learned something from them and I hope you can too.
It was good to explore using
GenStage in an observer pattern and as a replacement for
GenEvent. It felt like a lot of code to write, though I guess I went through several iterations with
NotificationCenter. If I needed to do this again elsewhere the understanding I came to here would allow me to move a lot faster.
One thing I found really interesting is that some of the work involved in managing the queue in
NotificationCenter reminded me of the
BlockingQueue that I wrote a long time back in Blocking Queue. And the
BlockingQueue is something I’ve used in many posts to solve similar problems to those solved by
GenStage. Though I think in the
GenStage model, the queue is primarily used up front at the producer stage and supply/demand between consumer stages is managed by the demand system built into GenStage.
Coming up in future posts, we still have a few items left on our
GenStage TODO list:
- Experiment with
- Rewrite Domain Scrapper to use
Next week I hope to learn more about
Flow. I hope you’ll join me.