Joseph Kain bio photo

Joseph Kain

Professional Software Engineer learning Elixir.

Twitter LinkedIn Github

Last week I wrote about publishing model data over a Phoenix Channel. My partner on the project I’m working suggested that we could use GenEvent to manage the model change notifications. The rest of this post will develop that idea.

GenEvent is a mechanism for managing event notifications. It consists of an event manager and one or more event handlers. We can try to use GenEvent by sending notification of model changes to the event manager. Then, we need an event handler which will be called for each event. In our handler we can respond to the event by publishing the data on the channel.

Event Architecture

This has the nice property that the source of events (the controller in our case) and the receiver(s) of the events (the channel in our case) don’t need to know about each other. Each module only needs to know about event manager. For this small example it isn’t a big deal but imagine that we had a larger application with multiple sources and receivers. We certainly don’t want our controller(s) to grow in complexity as the number of event receivers increases.

That’s the plan anyway, let’s start trying to put it together.

GenEvent and Phoenix.Channel

Here’s my first question: can I make my channel both a channel and a GenEvent? I don’t know the answer to this so let’s try it out.

First we add

use GenEvent

to PlayChannel.ToyChannel to implement the behaviour. A quick check of the show page makes it look like it’s still working. At this point I sort of wish I had written up some tests.

I’ll go back and fix any broken generated tests and write a few more.

Channel tests

I’ll back out the change to use GenEvent get the tests passing and then add it back in. Looks like only (and all) the toy_channel_tests are failing:

 1) test ping replies with status ok (PlayChannel.ToyChannelTest)
    test/channels/toy_channel_test.exs:14
    ** (EXIT from #PID<0.2791.0>) an exception was raised:
        ** (Ecto.CastError) deps/ecto/lib/ecto/repo/queryable.ex:188: value `"lobby"` in `where` cannot be cast to type :id in query:

    from t in PlayChannel.Toy,
      where: t.id == ^"lobby"

    Error when casting value to `PlayChannel.Toy.id`
            (elixir) lib/enum.ex:1473: Enum."-reduce/3-lists^foldl/2-0-"/3
            (elixir) lib/enum.ex:1151: Enum."-map_reduce/3-lists^mapfoldl/2-0-"/3
            (ecto) lib/ecto/repo/queryable.ex:91: Ecto.Repo.Queryable.execute/5
            (ecto) lib/ecto/repo/queryable.ex:15: Ecto.Repo.Queryable.all/4
            (ecto) lib/ecto/repo/queryable.ex:44: Ecto.Repo.Queryable.one/4
            (play_channel) web/channels/toy_channel.ex:5: PlayChannel.ToyChannel.join/3
            (phoenix) lib/phoenix/channel/server.ex:168: Phoenix.Channel.Server.init/1
            (stdlib) gen_server.erl:328: :gen_server.init_it/6
            (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3



 2) test broadcasts are pushed to the client (PlayChannel.ToyChannelTest)
    test/channels/toy_channel_test.exs:24
    ** (EXIT from #PID<0.2793.0>) an exception was raised:
        ** (Ecto.CastError) deps/ecto/lib/ecto/repo/queryable.ex:188: value `"lobby"` in `where` cannot be cast to type :id in query:

    from t in PlayChannel.Toy,
      where: t.id == ^"lobby"

    Error when casting value to `PlayChannel.Toy.id`
            (elixir) lib/enum.ex:1473: Enum."-reduce/3-lists^foldl/2-0-"/3
            (elixir) lib/enum.ex:1151: Enum."-map_reduce/3-lists^mapfoldl/2-0-"/3
            (ecto) lib/ecto/repo/queryable.ex:91: Ecto.Repo.Queryable.execute/5
            (ecto) lib/ecto/repo/queryable.ex:15: Ecto.Repo.Queryable.all/4
            (ecto) lib/ecto/repo/queryable.ex:44: Ecto.Repo.Queryable.one/4
            (play_channel) web/channels/toy_channel.ex:5: PlayChannel.ToyChannel.join/3
            (phoenix) lib/phoenix/channel/server.ex:168: Phoenix.Channel.Server.init/1
            (stdlib) gen_server.erl:328: :gen_server.init_it/6
            (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3



 3) test shout broadcasts to toys:lobby (PlayChannel.ToyChannelTest)
    test/channels/toy_channel_test.exs:19
    ** (EXIT from #PID<0.2795.0>) an exception was raised:
        ** (Ecto.CastError) deps/ecto/lib/ecto/repo/queryable.ex:188: value `"lobby"` in `where` cannot be cast to type :id in query:

    from t in PlayChannel.Toy,
      where: t.id == ^"lobby"

    Error when casting value to `PlayChannel.Toy.id`
            (elixir) lib/enum.ex:1473: Enum."-reduce/3-lists^foldl/2-0-"/3
            (elixir) lib/enum.ex:1151: Enum."-map_reduce/3-lists^mapfoldl/2-0-"/3
            (ecto) lib/ecto/repo/queryable.ex:91: Ecto.Repo.Queryable.execute/5
            (ecto) lib/ecto/repo/queryable.ex:15: Ecto.Repo.Queryable.all/4
            (ecto) lib/ecto/repo/queryable.ex:44: Ecto.Repo.Queryable.one/4
            (play_channel) web/channels/toy_channel.ex:5: PlayChannel.ToyChannel.join/3
            (phoenix) lib/phoenix/channel/server.ex:168: Phoenix.Channel.Server.init/1
            (stdlib) gen_server.erl:328: :gen_server.init_it/6
            (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3

The “shout” and “ping” tests need to be removed because I removed support for these messages from the channel. That leaves us with the broadcast test:

test "broadcasts are pushed to the client", %{socket: socket} do
  broadcast_from! socket, "broadcast", %{"some" => "data"}
  assert_push "broadcast", %{"some" => "data"}
end

And this fails because the generated test tries to join “toys:lobby” which we don’t recognize as a room. Fixing this a little difficult, I need a toy to be present in the database. I’ll have to update the setup funciton:

@valid_toy %{age: 42, color: "some content", name: "some content"}
setup do
  toy = Toy.changeset(%Toy{}, @valid_toy)
  |> Repo.insert!

  {:ok, payload, socket} =
    socket("user_id", %{some: :assign})
    |> subscribe_and_join(ToyChannel, "toys:#{toy.id}")

  {:ok, socket: socket, payload: payload, toy: toy}
end

Here we create a new toy and insert it into the database. Then we join the associated channel. I’ve also captured the payload from the reply and made it available for the tests. Futhermore, I’ve made the toy available to the test. I didn’t need these in the test right now but I think I will soon. With this change the test passes.

Now I’ll validate that we get the messages we expect and we’ll use the payload and toy to do it.

test "join delivers the original model", %{payload: payload, toy: toy} do
  assert payload == %{"age" => 42, "color" => "some content",
                      "name" => "some content", "id" => toy.id}
end

This test passes as well.

Finally, we need to perform an update to the toy and verify that there is a broadcast. This test passes:

test "broadcast_change tiggers a broadcast", %{toy: toy} do
  Toy.changeset(toy, %{age: 3})
  |> Repo.update!
  |> ToyChannel.broadcast_change

  assert_broadcast "change", _
end

Building a GenEvent

Now that I have some tests I’ll add use GenEvent line to PlayChannel.ToyChannel. The tests still pass but, I get some warnings:

web/channels/toy_channel.ex:1: warning: conflicting behaviours - callback code_change/3 required by both 'gen_event' and ''Elixir.Phoenix.Channel'' (line 1)
web/channels/toy_channel.ex:1: warning: conflicting behaviours - callback handle_info/2 required by both 'gen_event' and ''Elixir.Phoenix.Channel'' (line 1)
web/channels/toy_channel.ex:1: warning: conflicting behaviours - callback terminate/2 required by both 'gen_event' and ''Elixir.Phoenix.Channel'' (line 1)

This makes sense, both Channel and GenEvent have callbacks code_change, handle_info, and terminate. As both are OTP compliant I suspect the callbacks have the same purpose in both behaviours and could be compatible. But, this seems like an untravelled path and may not be a great idea.

Instead of having my channel act as both a channel and a GenEvent I’ll create a seperate GenEvent module to forward the messages on to the channel. I’ll start with this:

defmodule PlayChannel.Toy.UpdateEventHandler do
  use GenEvent
end

Now, what do I need to fill in? Reading the documentation for GenEvent I see that I need a handle_event function. It takes an event and current state as arguments and needs to return {:ok, newstate}. newstate? Do I need state in this handler? I don’t think so. So let’s do this:

def handle_event({:update, toy}, _) do
  PlayChannel.ToyChannel.broadcast_change(toy)
  {:ok, nil}
end

Our handle_event handles :update messages that include the toy that was updated. Then they forward toy on to PlayChannel.ToyChannel.broadcast_change so that the channel can respond. We return :ok with nil as our new state.

Now I think we have a functioning GenEvent handler. I have no tests to prove it but it looks good. The question now is: how do we get events into it?

We need to start a GenEvent event manager using GenEvent.start_link/1 and have the controller send events to it instead of sending them directly to the channel. We also need to register PlayChannel.Toy.UpdateEventHandler as a handler.

Also, I don’t just want to “start a GenEvent event manager” I want to add it to my supervision tree and name it. I do that by adding a worker entry in my child spec for the PlayChannel application:

@@ -13,6 +13,7 @@ defmodule PlayChannel do
       supervisor(PlayChannel.Repo, []),
       # Here you could define other workers and supervisors as children
       # worker(PlayChannel.Worker, [arg1, arg2, arg3]),
+      worker(GenEvent, [[name: :toy_event_manager]])
     ]

     # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html

Next we need to register PlayChannel.Toy.UpdateEventHandler as a handler. This is easy to do by calling GenEvent.add_handler/2 but the question is where to make the call?

I guess I should make the call after starting the supervision tree. So I’ll rewrite the end of the start function like this:

with {:ok, pid} <- Supervisor.start_link(children, opts),
     :ok <- GenEvent.add_handler(:toy_event_manager, PlayChannel.Toy.UpdateEventHandler, nil),
  do: {:ok, pid}

If we start the tree successfully then add the handler. This seems to work or at least all the exisitng tests pass which means the application came up. But, this code is a little ugly. Why does the application module need to know PlayChannel.Toy.UpdateEventHandler’s’ initial state? Let’s encapsulate this a bit better. I’m adding the function below to PlayChannel.Toy.UpdateEventHandler:

def register_with_manager(pid) do
  GenEvent.add_handler(pid, __MODULE__, nil)
end

Then I can call this from the application module instead:

with {:ok, pid} <- Supervisor.start_link(children, opts),
     :ok <- PlayChannel.Toy.UpdateEventHandler.register_with_manager(:toy_event_manager),
  do: {:ok, pid}

That’s a little better. Though the line is still a bit too long. But this has better separation of concerns. UpdateEventHandler knows its initial state. And PlayChannel knows the name of the GenServer process. Nice and clean.

Nice and clean is well enough, but does this actually work?

Sending the Event

The last thing we need to do is to modify the controller to send an event instead of calling the channel directly like this:

@@ -45,7 +45,7 @@ defmodule PlayChannel.ToyController do

     case Repo.update(changeset) do
       {:ok, toy} ->
-        PlayChannel.ToyChannel.broadcast_change(toy)
+        GenEvent.notify(:toy_event_manager, {:update, toy})

         conn
         |> put_flash(:info, "Toy updated successfully.")

And this works. The show page still updates in realtime in response to edits.

Refactor

Now that things are working I want to refactor a little. We made some nice cleanups in the last section between the PlayChannel application module and the PlayChannel.Toy.UpdateEventHandler. But we’ve taken a step backwards in this regard with these changes to the controller.

The issue comes down to where to put the knowledge about the name of the GenEvent process and other details. I think I want them in a new module:

defmodule PlayChannel.Toy.EventManager do

end

But what goes in here? There are few things we’ve written, so far, related to the event manager:

  • worker specification
  • notify
  • registration - generally, not specifically to any particular handler.

I think this should do it:

defmodule PlayChannel.Toy.EventManager do
  @name :toy_event_manager

  def child_spec, do: Supervisor.Spec.worker(GenEvent, [[name: @name]])

  @doc """
  Notifies the event manager with an event of the form {:update, toy}
  """
  def update(toy), do: GenEvent.notify(@name, {:update, toy})

  def register(handler, args), do: GenEvent.add_handler(@name, handler, args)
end

Here I store the name for the process in the module attribute @name. An alternative might just be to use __MODULE__ as the process name.

Next, we have the child_spec function. This can be used when building the supervision tree like this:

children = [
  # Start the endpoint when the application starts
  supervisor(PlayChannel.Endpoint, []),
  # Start the Ecto repository
  supervisor(PlayChannel.Repo, []),
  # Here you could define other workers and supervisors as children
  # worker(PlayChannel.Worker, [arg1, arg2, arg3]),
  PlayChannel.Toy.EventManager.child_spec
]

Next, in PlayChannel.Toy.EventManager we have the update/1 function which notifies with the :update event. I’ve choosen to document this function because the format of the event needs to be used outside this module. That is, the handlers need to be able to match on this event format. We can call update/1 from the controller like this:

@@ -45,7 +45,7 @@ defmodule PlayChannel.ToyController do

     case Repo.update(changeset) do
       {:ok, toy} ->
-        GenEvent.notify(:toy_event_manager, {:update, toy})
+        PlayChannel.Toy.EventManager.update(toy)

         conn
         |> put_flash(:info, "Toy updated successfully.")

Finally, in PlayChannel.Toy.EventManager we have the register/2 method. We’ll call this from PlayChannel.Toy.UpdateEventHandler.register_with_manager. With this change register_with_manager no longer needs to take a pid or name as an argument. So the function looks like this now:

def register_with_manager do
  PlayChannel.Toy.EventManager.register(__MODULE__, nil)
end

You can probably guess what the change to the application to call register_with_manager now.

And with these changes I can still edit a toy in one window and see it update live in another!

Conclusion

I’m pretty happy with this now. We have some nice clean interfaces for our event manager in the PlayChannel.Toy.EventManager module. We have a clean handler. There’s certinaly more that could be cleaned up here. For example, it would nice to add some aliases in the various modules so I could shorten up these module names. But I’ll save that for later.

We’ve also spent a lot of work to insert this abstract event manager and event handler into something that was already working. One might ask why bother? Well, for the simple example we have this may not be a big deal. But suppose our application grows in complexity, then we could end up filling our controller with a long list of modules that need to be updated every time a module changes. Is this really something we want our controller doing? I think not, we want to keep our controller small. This mechanism also reduces coupling. The controller only needs to know about the event manager. It doesn’t need to know anything about the handlers or the channels they represent.