Three alternatives to using GenEvent in Elixir


Wojtek Gawroński's post here neatly summarises the issues with GenEvent. Fortunately, there are alternatives, including gproc, phoenix_pubsub, and Elixir's built-in Registry.

The code used in these examples is available from this repository.

gproc

While gproc's main purpose is as a process registry, it can be used as a pub/sub framework using a lovely trick.

defmodule PubsubSpike.Gproc do
  use GenServer

  def start_link(topic, otp_opts \\ []) do
    GenServer.start_link(__MODULE__, topic, otp_opts)
  end

  def broadcast(topic, message) do
    GenServer.cast({:via, :gproc, gproc_key(topic)},
                   {:broadcast, message})
  end

  def messages_received(pid) do
    GenServer.call(pid, :messages_received)
  end

  def init(topic) do
    :gproc.reg(gproc_key(topic))
    {:ok, []}
  end

  def handle_cast({:broadcast, message}, messages_received) do
    {:noreply, [message | messages_received]}
  end

  def handle_call(:messages_received, _from, messages_received) do
    {:reply, Enum.reverse(messages_received), messages_received}
  end

  defp gproc_key(topic) do
    {:p, :l, topic}
  end
end

The code above (also here) illustrates using gproc in this way. In init, the GenServer process registers itself with a particular key, {:p, :l, topic}:

  • The :p atom in the tuple indicates to gproc that multiple processes may be registered using the same key. (If it were :n, then the uniqueness of the key would be enforced.)
  • The :l says that the process is registered locally, on this node only. :g would register globally, across all connected nodes, but that involves a certain amount of faff.
  • And topic is our topic.

broadcast sends a message to all processes listening on a topic; this is implemented with GenServer.cast. Rather than identifying the cast's target process with a pid or a name in the Erlang process registry, it is identified using a via tuple, {:via, :gproc, gproc_key(topic)}. For a cast, GenServer hands delivery over to the via module's send function (here :gproc.send/2), and for a :p key gproc sends the message to every process registered with that key. Thus, all processes registered with that key (effectively listening to that topic) will receive the cast.
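To make the via-tuple trick a little less magical, here is a minimal sketch that runs without gproc. FanOutVia is a hypothetical stand-in that I have made up for illustration; its "name" is simply a list of pids. It relies on my reading of GenServer.cast/2, which for a {:via, module, name} target calls module.send(name, {:"$gen_cast", request}).

```elixir
defmodule FanOutVia do
  # Hypothetical stand-in for :gproc. Here the "name" is just a list of pids.
  # We define our own send/2, so exclude Kernel.send/2 from the imports.
  import Kernel, except: [send: 2]

  def send(pids, message) when is_list(pids) do
    Enum.each(pids, fn pid -> Kernel.send(pid, message) end)
    :ok
  end
end

# GenServer.cast/2 wraps the request as {:"$gen_cast", request} and hands it
# to FanOutVia.send/2, which forwards it to every pid in the list.
:ok = GenServer.cast({:via, FanOutVia, [self(), self()]}, {:broadcast, "Hi Sue!"})

# The current process is listed twice, so it should receive two copies.
received =
  for _ <- 1..2 do
    receive do
      {:"$gen_cast", {:broadcast, message}} -> message
    after
      100 -> :timeout
    end
  end

IO.inspect(received)
```

In a real GenServer the {:"$gen_cast", request} wrapper is unpacked for you and the request arrives in handle_cast, which is why the gproc example above needs no special handling.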

This test shows it all working (code also here):

  alias PubsubSpike.Gproc
  test "broadcast messages" do
    {:ok, pid1} = Gproc.start_link("sue")
    {:ok, pid2} = Gproc.start_link("sue")
    {:ok, pid3} = Gproc.start_link(:miranda)

    Gproc.broadcast("sue", "Hi Sue!")
    Gproc.broadcast(:miranda, "Hi Miranda!")

    assert Gproc.messages_received(pid1) == ["Hi Sue!"]
    assert Gproc.messages_received(pid2) == ["Hi Sue!"]
    assert Gproc.messages_received(pid3) == ["Hi Miranda!"]
  end

Elixir Registry

Registry is a built-in, more Elixir-like counterpart to gproc, and it can also be used as a pub/sub framework. A registry is a supervisor and must be started, for example in the application supervisor:

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    supervisor(Registry, [:duplicate,  :pubsub_elixir_registry]),
    # ...
  ]
  opts = [strategy: :one_for_one, name: PubsubSpike.Supervisor]
  Supervisor.start_link(children, opts)
end

Note that we have called this registry :pubsub_elixir_registry and have marked it as allowing duplicate keys.
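On Elixir 1.5 and later, children can also be given as {module, options} tuples, without importing Supervisor.Spec. The following is a sketch of what I believe is the equivalent setup in that style; it reuses the registry name from above but starts a bare supervisor rather than the application's one.

```elixir
children = [
  # Same registry as above: duplicate keys, named :pubsub_elixir_registry.
  {Registry, keys: :duplicate, name: :pubsub_elixir_registry}
]

{:ok, supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

# The registry is registered under its name, so it can be looked up.
IO.inspect(is_pid(Process.whereis(:pubsub_elixir_registry)))
```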

Here is a pub/sub implementation similar to that for gproc:

defmodule PubsubSpike.ElixirRegistry do
  use GenServer

  def start_link(topic, otp_opts \\ []) do
    GenServer.start_link(__MODULE__, topic, otp_opts)
  end

  def broadcast(topic, message) do
    Registry.dispatch(:pubsub_elixir_registry, topic, fn entries ->
      for {pid, _} <- entries, do: send(pid, {:broadcast, message})
    end)
  end

  def messages_received(pid) do
    GenServer.call(pid, :messages_received)
  end

  def init(topic) do
    Registry.register(:pubsub_elixir_registry, topic, [])
    {:ok, []}
  end

  def handle_info({:broadcast, message}, messages_received) do
    {:noreply, [message | messages_received]}
  end

  def handle_call(:messages_received, _from, messages_received) do
    {:reply, Enum.reverse(messages_received), messages_received}
  end
end

A registry that allows duplicate keys cannot be used in via tuples, so we cannot perform the same trick as we did with gproc. Instead, broadcast uses Registry.dispatch to send a message to each process listening on a topic. Because these are plain messages rather than casts, we receive them in handle_info.
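Registry.dispatch can be tried out without any GenServer at all. In this sketch the registry name :demo_registry and the tags :inbox_a and :inbox_b are made up for illustration. The current process subscribes to the same topic twice, which a duplicate-key registry permits, and the value given at registration comes back as the second element of each entry.

```elixir
{:ok, _} = Registry.start_link(keys: :duplicate, name: :demo_registry)

# Subscribe the current process twice, tagging each registration with a value.
{:ok, _} = Registry.register(:demo_registry, "sue", :inbox_a)
{:ok, _} = Registry.register(:demo_registry, "sue", :inbox_b)

# Each entry is a {pid, value} tuple for one registration under the key.
Registry.dispatch(:demo_registry, "sue", fn entries ->
  for {pid, tag} <- entries, do: send(pid, {:broadcast, tag, "Hi Sue!"})
end)

# Collect both messages from our own mailbox.
tags =
  for _ <- 1..2 do
    receive do
      {:broadcast, tag, "Hi Sue!"} -> tag
    after
      100 -> :timeout
    end
  end

IO.inspect(Enum.sort(tags))
```

The pub/sub module above registers with an empty list as the value because it has no use for it; the value is handy when subscribers need to carry some per-subscription metadata.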

The illustrative test (here) is so similar to the gproc version that I will not include it in this post.

Phoenix PubSub

Unlike gproc and Registry, Phoenix PubSub's primary purpose is as a pub/sub framework. There is a clue in the name.

Phoenix PubSub supports multiple implementations, such as Phoenix PubSub Redis, but we will just use the built-in one based on Erlang's PG2. We need to start the Phoenix.PubSub.PG2 supervisor in the application supervisor:

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    supervisor(Phoenix.PubSub.PG2, [:pubsub_spike, []]),
    # ..
  ]

  opts = [strategy: :one_for_one, name: PubsubSpike.Supervisor]
  Supervisor.start_link(children, opts)
end

Note that we have named the Phoenix PubSub supervisor :pubsub_spike.

Implementing the same interface as the other examples, we get the following:

defmodule PubsubSpike.PhoenixPubsub do
  use GenServer

  alias Phoenix.PubSub

  def start_link(topic, otp_opts \\ []) when is_binary(topic) do
    GenServer.start_link(__MODULE__, topic, otp_opts)
  end

  def broadcast(topic, message) do
    PubSub.broadcast(:pubsub_spike, topic, {:pubsub_spike, message})
  end

  def messages_received(pid) do
    GenServer.call(pid, :messages_received)
  end

  def init(topic) do
    PubSub.subscribe(:pubsub_spike, topic)
    {:ok, []}
  end

  def handle_call(:messages_received, _from, messages_received) do
    {:reply, Enum.reverse(messages_received), messages_received}
  end

  def handle_info({:pubsub_spike, msg}, messages_received) do
    {:noreply, [msg | messages_received]}
  end
end

The interface to Phoenix PubSub is more straightforward than the other two, as it was designed specifically for pub/sub. We explicitly subscribe to a topic (in init), broadcast to a topic (in broadcast), and receive a message (in handle_info). It is worth noting that Phoenix PubSub only supports binary (String) topics.
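The gproc test used an atom topic (:miranda), which would not get past the is_binary guard here. If you wanted to keep accepting atoms, one option is to normalise topics before subscribing or broadcasting. TopicName below is a hypothetical helper of my own, not part of Phoenix PubSub or the companion code.

```elixir
defmodule TopicName do
  # Hypothetical helper: turns a topic into the binary form that
  # Phoenix PubSub expects. Binaries pass through unchanged.
  def normalise(topic) when is_binary(topic), do: topic
  def normalise(topic) when is_atom(topic), do: Atom.to_string(topic)
end

IO.inspect(TopicName.normalise(:miranda))
IO.inspect(TopicName.normalise("sue"))
```

Bear in mind that after normalising, :miranda and "miranda" become the same topic, which may or may not be what you want.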

There is test code here, but it is also so similar to the gproc test that it is not worth embedding in the blog.

Bonus: distributed events with Phoenix PubSub

A bonus of using Phoenix PubSub is that it can publish events across connected nodes. This post's companion code is set up so that you can easily play with this.

A PubsubSpike.PhoenixPubsub worker (as above), named :phoenix_pubsub and listening on the topic "topic:phoenix_pubsub", is created by its application supervisor:

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    supervisor(Phoenix.PubSub.PG2, [:pubsub_spike, []]),
    worker(PubsubSpike.PhoenixPubsub, ["topic:phoenix_pubsub",
                                      [name: :phoenix_pubsub]]),
    # ..
  ]

  opts = [strategy: :one_for_one, name: PubsubSpike.Supervisor]
  Supervisor.start_link(children, opts)
end

To try this out (assuming you're all set up for Elixir), open a terminal and type the following:

git clone git@github.com:CultivateHQ/pubsub_spike.git
cd pubsub_spike
mix deps.get

iex --sname mel@localhost -S mix

In another terminal, but the same directory:

iex --sname sue@localhost -S mix
Node.connect(:"mel@localhost") # should return true

PubsubSpike.PhoenixPubsub.broadcast("topic:phoenix_pubsub", "Hello all!")

PubsubSpike.PhoenixPubsub.messages_received(:phoenix_pubsub)

messages_received should return ["Hello all!"]. Now, in the other terminal (node "mel@localhost"), try:

PubsubSpike.PhoenixPubsub.messages_received(:phoenix_pubsub)

That too should return ["Hello all!"]. The message has been broadcast from the other node.

Summary

You may already be using gproc as a registry for dynamic processes; if so, it may be convenient to also use its pub/sub capabilities. Otherwise, in most cases it is better to use the built-in Registry if you only need events to be propagated locally.

Use Phoenix.PubSub if you want events to be propagated to all connected nodes.

Updates

  • 2018-02-27 Updated to Elixir 1.6; changed references to Registry being an upcoming feature of Elixir 1.4 to it being built into Elixir; altered the summary to be clearer that Phoenix.PubSub events propagate across all connected nodes, while Registry is for local events.