How to issue notifications using PubSub in Elixir Phoenix

As we work more and more with Elixir and Phoenix, we keep building features in this framework to suit specific needs we have come upon.

Recently, we had a problem with flash notification messages in LiveView. We decided to implement our own way of issuing notifications throughout an application using PubSub instead of live flash, not just to solve the bug but also to gain greater flexibility and control over the notifications system.


As we implemented this feature, we learned a good deal about Phoenix, LiveView and PubSub, and the purpose of this blog post is to record some of the things we learned, and to share the way we ultimately implemented the feature.

Set up

Our setup consisted of an alerts component with the following (simplified) HTML template (we use Slim):

.alerts phx-hook="Alerts"
  = if @info do
    = @info

  = if @error do
    = @error

with the following live component module:

defmodule MyAppWeb.AlertsComponent do
  @moduledoc false

  use MyAppWeb, :live_component

  @impl true
  def update(assigns, socket) do
    {
      :ok,
      socket
      |> assign(assigns)
      |> assign_new(:info, fn -> nil end)
      |> assign_new(:error, fn -> nil end)
    }
  end
end

In our initial setup, whenever a notification was issued in the application, `put_flash` was used to issue an alert.

def handle_event("event_name", %{"object_name" => object_params}, socket) do
  case Object.update(object_params) do
    {:ok, object} ->
      {
        :noreply,
        socket
        |> put_flash(:info, "Object updated")
        |> assign(:object, object)
      }
  end
end

The problem we were having was that the flash message for the above event was displayed only once after the first update, but not upon later updates. We found that the problem could in part be solved by clearing the flash after a short delay using an internal message.

# Add a send function to the handle_event function above

def handle_event("event_name", %{"object_name" => object_params}, socket) do
  case Object.update(object_params) do
    {:ok, object} ->
      send(self(), :schedule_clear_flash)
      {
        :noreply,
        socket
        |> put_flash(:info, "Object updated")
        |> assign(:object, object)
      }
  end
end
 
# Add a handle_info function to receive the message and clear flash after a 5 second delay

def handle_info(:schedule_clear_flash, socket) do
  :timer.sleep(5000)
  {:noreply, clear_flash(socket)}
end

Now notification messages were displayed every time the object was updated, not just the first time.
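
One caveat with this approach: `:timer.sleep/1` inside `handle_info/2` blocks the LiveView process for the whole five seconds, so any event that arrives in the meantime waits in the mailbox. A common alternative is `Process.send_after/3`, which schedules the message and returns immediately. The sketch below is not the code we shipped. It uses a plain GenServer (the hypothetical `FlashDemo`) as a stand-in for a LiveView process, with the state map playing the role of the assigns and a 50 ms delay instead of 5 seconds.

```elixir
defmodule FlashDemo do
  # Hypothetical stand-in for a LiveView process: the state map plays the
  # role of the socket assigns.
  use GenServer

  # Delay in milliseconds (shortened for the demo).
  @clear_after 50

  def start_link, do: GenServer.start_link(__MODULE__, %{flash: nil})
  def put_flash(pid, message), do: GenServer.call(pid, {:put_flash, message})
  def flash(pid), do: GenServer.call(pid, :flash)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:put_flash, message}, _from, state) do
    # Schedule the clear message; this call returns immediately.
    Process.send_after(self(), :clear_flash, @clear_after)
    {:reply, :ok, %{state | flash: message}}
  end

  def handle_call(:flash, _from, state), do: {:reply, state.flash, state}

  @impl true
  def handle_info(:clear_flash, state) do
    # No sleep needed: the delay elapsed before this message was delivered.
    {:noreply, %{state | flash: nil}}
  end
end

{:ok, pid} = FlashDemo.start_link()
:ok = FlashDemo.put_flash(pid, "Object updated")
shown = FlashDemo.flash(pid)
Process.sleep(150)
cleared = FlashDemo.flash(pid)
IO.inspect({shown, cleared})
```

Because the process is never put to sleep, the `flash/1` call right after `put_flash/2` is answered without waiting for the delay.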

However, we still had a bug. After the first few clicks with everything working correctly, notification messages would start closing at different time intervals after their initial display.

We found that this new bug could be solved by removing the JS Hook we had on our Alerts component, whose purpose was to let the user close the notification.

This was our hook:

export default {
  elements() {
    return this.el.querySelectorAll('.alert__item')
  },

  setCloseTimeout() {
    setInterval(() => {
      this.elements().forEach(element => element.classList.add('close'))
    }, 5000)
  },

  mounted() {
    this.setCloseTimeout()
  },

  updated() {
    this.setCloseTimeout()
  }
}

Removing the JS hook, everything worked great for this particular notification. Wonderful!

Now all we needed to do was to use internal messages to clear the flash, and remove the JS hook, right? Following this idea, we thought we would add a `handle_info` function to the alerts component directly, so when we need to display a notification using `put_flash` from anywhere in the app, we could send an internal message to the alerts component to clear the flash after a brief delay.

Not so fast… In this scenario, we need to communicate with the alerts component from anywhere in the application (namely, from wherever we are issuing the notification message). But this requires that we know the pid of the process the alerts component is running in when we send the message, and we don't know this in advance.

It was at this point that we decided to use PubSub, since it enables us to establish communication between any two points in the application. It also lets us drop the use of flash completely, at which point there are no longer any problems with using our JS hook.

Great!

Using PubSub

To implement a notifications system using PubSub:

  1. The Alerts component needs to subscribe to a PubSub topic.
  2. Issuing a notification will then consist in broadcasting a message to the PubSub topic.
  3. The Alerts component will be in charge of receiving the message and implementing a `handle_info` function with which it sets an assign with the content to be displayed in the alerts component.
  4. The Alerts component resets the assign to nil after a delay (the equivalent of clearing the flash).

A crucial thing to note about this setup is that the alerts element can no longer be a live component: a live component runs inside its parent LiveView's process, so the `handle_info` callback cannot be implemented from within it.
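
To make the subscribe, broadcast and receive flow concrete without a running Phoenix app, here is a minimal sketch that uses Elixir's built-in `Registry` as a stand-in for `Phoenix.PubSub`. The registry name `DemoPubSub` and the message are made up for illustration; in the real app the corresponding calls are `Phoenix.PubSub.subscribe/2` and `Phoenix.PubSub.broadcast/3`.

```elixir
# A registry with duplicate keys lets many processes register under the
# same topic, which is what makes it usable as a local pub/sub.
{:ok, _} = Registry.start_link(keys: :duplicate, name: DemoPubSub)

# Step 1: the "alerts" process (here, the current process) subscribes.
{:ok, _} = Registry.register(DemoPubSub, "alerts", [])

# Step 2: another process publishes without knowing the subscriber's pid.
publisher =
  Task.async(fn ->
    Registry.dispatch(DemoPubSub, "alerts", fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:info, "Object updated"})
    end)
  end)

Task.await(publisher)

# Step 3: the subscriber receives the message. A LiveView would handle it
# in handle_info/2 instead of an explicit receive.
received =
  receive do
    {:info, message} -> message
  after
    1_000 -> :timeout
  end

IO.inspect(received)
```

The publisher only needs the topic name, which is exactly the property we were missing when trying to `send/2` directly to the alerts component.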

1. Alerts Live View

The first thing we did was to turn our alerts component into a LiveView in its own right and add it to our app's root layout, so that it could be shown on any page, live or not, throughout our app.

# root template

body
  = @inner_content
  = live_render @conn, MyAppWeb.Alerts

The only problem with this is that, if the live layout template uses any assigns, we can't add our LiveView to the root template unless it provides those assigns. To avoid this issue, we made sure our alerts LiveView did not use the live layout, but rather had no layout at all.

In our `app_web.ex` file, we defined the following helper function:

def live_view_without_layout do
  quote do
    use Phoenix.LiveView,
      layout: nil

    unquote(view_helpers())
  end
end

And then we defined our alerts LiveView in such a way that it does not use a layout at all:

# alerts.ex

use MyAppWeb, :live_view_without_layout

2. Subscribe to PubSub topic

In the alerts LiveView, we subscribed to a topic that we initially called "alerts".

# alerts.ex

def mount(_params, _session, socket) do
  # Assumes `alias Phoenix.PubSub` at the top of the module
  PubSub.subscribe(MyApp.PubSub, "alerts")

  socket =
    socket
    |> assign(:info, nil)
    |> assign(:error, nil)

  {:ok, socket}
end

3. Broadcasting a message to send a notification

To issue a notification, the module that defines the function in charge of issuing the message needs to alias `Phoenix.PubSub` and then broadcast a notification message to the "alerts" topic at the right point.

alias Phoenix.PubSub

# more code

def handle_event("event_name", %{"object_name" => object_params}, socket) do
  case Object.update(object_params) do
    {:ok, object} ->
      PubSub.broadcast(MyApp.PubSub, "alerts", {:info, "Object saved successfully"})

      {:noreply, assign(socket, :object, object)}
  end
end

4. Update assigns upon receiving a message

The last part of the puzzle is that the alerts LiveView needs to set the right assign with the content of the message that was broadcast, so that it is displayed.

# alerts.ex

@impl true
def handle_info({:info, message}, socket) do
  # The clear message carries the type so that it matches the clauses below
  send(self(), {:schedule_clear_message, :info})

  {:noreply, assign(socket, info: message)}
end

@impl true
def handle_info({:error, message}, socket) do
  send(self(), {:schedule_clear_message, :error})

  {:noreply, assign(socket, error: message)}
end

@impl true
def handle_info({:schedule_clear_message, :info}, socket) do
  :timer.sleep(5000)
  {:noreply, assign(socket, :info, nil)}
end

@impl true
def handle_info({:schedule_clear_message, :error}, socket) do
  :timer.sleep(5000)
  {:noreply, assign(socket, :error, nil)}
end
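
The four clauses follow a single pattern, so they could be collapsed into two with a guard on the message type. The sketch below shows that idea under a few assumptions: `AlertsDemo` is a hypothetical plain GenServer standing in for the alerts LiveView (its state mimics the assigns), the delay is shortened to 50 ms, and the clear message is scheduled with `Process.send_after/3` rather than sent immediately and followed by a sleep.

```elixir
defmodule AlertsDemo do
  # Hypothetical stand-in for the alerts LiveView; the state mimics its assigns.
  use GenServer

  # Delay in milliseconds (shortened for the demo).
  @clear_after 50

  def start_link, do: GenServer.start_link(__MODULE__, %{info: nil, error: nil})
  def assigns(pid), do: GenServer.call(pid, :assigns)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:assigns, _from, state), do: {:reply, state, state}

  @impl true
  def handle_info({type, message}, state) when type in [:info, :error] do
    # Tag the clear message with the type so the matching assign is reset.
    Process.send_after(self(), {:schedule_clear_message, type}, @clear_after)
    {:noreply, Map.put(state, type, message)}
  end

  def handle_info({:schedule_clear_message, type}, state) when type in [:info, :error] do
    {:noreply, Map.put(state, type, nil)}
  end
end

{:ok, pid} = AlertsDemo.start_link()
send(pid, {:error, "Something went wrong"})
during = AlertsDemo.assigns(pid)
Process.sleep(150)
later = AlertsDemo.assigns(pid)
IO.inspect({during, later})
```

The guard keeps the two message shapes apart: `{:schedule_clear_message, :info}` does not match the first clause because `:schedule_clear_message` is neither `:info` nor `:error`.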

Issue with redirect

At this point, the alerts LiveView was working great; when alerts needed to be displayed, a message would be broadcast, and the alerts LiveView would display the message and then remove it after 5 seconds by resetting the `info` and `error` assigns.

There remained an issue with redirects, however. When the message was sent from a modal component that redirects after issuing the message, no notification would be displayed.

Example:

def handle_event("event_name", %{"object_name" => %{"object_id" => id}}, socket) do
  object = Object.get_object!(id)

  case Cycles.create_cycle_from(object) do
    {:ok, _cycle} ->
      PubSub.broadcast(MyApp.PubSub, "alerts", {:info, "Cycle created successfully"})

      {
        :noreply,
        socket
        |> push_redirect(to: socket.assigns.return_to)
      }
  end
end

This code did not result in any alert being displayed, even though the message was correctly sent and received by the alerts LiveView (we double-checked using logging to debug). We also checked and confirmed that the alerts assign was correctly updated, but still, nothing would be displayed.

We first thought the problem could be due to the redirect, but then realized that this hypothesis could not be right since a redirect does not cause the page to be reloaded. Also, we could still see that our DOM node with the alert remained after the redirect, and the socket for the alerts LiveView was correctly updated and remained live after redirect.

Strangely, we found by trial and error that if we added some sleep time after the message was broadcast and before the redirect, the message would get correctly displayed.

# modal_component

def handle_event("event_name", %{"object_name" => %{"object_id" => id}}, socket) do
  object = Object.get_object!(id)

  case Cycles.create_cycle_from(object) do
    {:ok, _cycle} ->
      PubSub.broadcast(MyApp.PubSub, "alerts", {:info, "Cycle created successfully"})
      :timer.sleep(10)
      {
        :noreply,
        socket
        |> push_redirect(to: socket.assigns.return_to)
      }
  end
end

We do not know why this is so, but at least at this point we had a working notifications system using PubSub. Interestingly, this system works well whether or not we keep the old JS Hook (which did not work with flash messaging), but we didn't need it anymore in any case, so we removed it.

Adding a unique PubSub topic

Though things worked well at this point, we realized there would be an important problem if we left things unchanged; namely, the alerts would be shown for all users, even when fired by the action of a single user.

To make sure this does not happen, we must ensure that the PubSub topic to which the alert message is broadcast is unique for each user. But how should we go about creating a topic that would be unique for each user? Our first thought was to build the topic name from the user id.

We thus defined the following LiveView helper function to create a unique topic name based on the user id:

@doc """
Returns a unique string per user to be used as the topic for alerts messages
"""
def alerts_topic_id(%MyApp.Accounts.User{} = user), do: "alerts-#{user.id}"

@doc """
Send a notification message to the unique pubsub topic for the current user
"""
def send_notification(socket, type, message) do
  topic =
    socket.assigns.current_user
    |> alerts_topic_id()

  PubSub.broadcast(MyApp.PubSub, topic, {type, message})
end

And yet, it quickly became evident that this strategy would get us into a world of trouble with notifications when users are not logged in, since there is then no current user from which to build the topic.

We thus decided to build the unique topic from a unique session id rather than a user id. Taking inspiration from this Stack Overflow post, we added the following module to our app:

# myapp_web/plugs/session_id.ex

defmodule MyAppWeb.Plugs.SessionId do
  @moduledoc false

  @behaviour Plug

  import Plug.Conn

  @impl true
  def init(default), do: default

  @impl true
  def call(conn, _config) do
    case get_session(conn, :session_id) do
      nil ->
        session_id = unique_session_id()
        put_session(conn, :session_id, session_id)

      _ ->
        conn
    end
  end

  defp unique_session_id do
    :crypto.strong_rand_bytes(16) |> Base.encode16()
  end
end
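
As a quick check of what the plug stores in the session, the snippet below runs the same expression as `unique_session_id/0` outside of Phoenix. The anonymous function `generate` and the variable names are only for illustration.

```elixir
# Same expression as unique_session_id/0 in the plug above.
generate = fn -> :crypto.strong_rand_bytes(16) |> Base.encode16() end

id_a = generate.()
id_b = generate.()

# 16 random bytes become 32 characters: two hex digits per byte.
IO.inspect(byte_size(id_a))

# Base.encode16/1 uses uppercase hex digits by default.
IO.inspect(String.match?(id_a, ~r/\A[0-9A-F]{32}\z/))

# Two calls are expected to yield different ids.
IO.inspect(id_a != id_b)
```

Since the id is written to the session only when none is present, every request and LiveView belonging to the same browser session sees the same value.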

And we then added the plug to our app's router:

# router.ex

pipeline :browser do
  plug :accepts, ["html"]
  plug :fetch_session
  plug MyAppWeb.Plugs.SessionId
  plug :fetch_live_flash
  plug :put_root_layout, {MyAppWeb.LayoutView, :root}
  plug :protect_from_forgery
  plug :put_secure_browser_headers
  plug :fetch_current_user
end

At this point, we were ready to craft some handy helper functions:

# live/live_helpers.ex

@doc """
Assign unique session_id to socket
"""
def assign_session_id(socket, %{"session_id" => session_id}) do
  assign_new(socket, :session_id, fn -> session_id end)
end

@doc """
Return a unique string per session to be used as the topic name for alerts messages
"""
def alerts_topic_id(%{"session_id" => session_id}), do: "alerts-#{session_id}"
def alerts_topic_id(socket), do: "alerts-#{socket.assigns.session_id}"

@doc """
Send a notification message to the unique pubsub topic for the current session
"""
def send_notification(socket, type, message) do
  topic = alerts_topic_id(socket)
  PubSub.broadcast(MyApp.PubSub, topic, {type, message})
end

We need to use the first clause of `alerts_topic_id/1` above (which takes the `session_id` directly from the session) in the `mount` function of our alerts LiveView. The alerts LiveView subscribes to the PubSub topic right away in `mount`, the same function in which the assigns are first set, so at that point the socket does not yet have a `session_id` assign.

# alerts.ex 

@impl true
def mount(_params, session, socket) do
  subscribe_to_alarms_topic(session)

  socket =
    socket
    |> assign(:info, nil)
    |> assign(:error, nil)

  {:ok, socket}
end

defp subscribe_to_alarms_topic(session) do
  topic = alerts_topic_id(session)
  PubSub.subscribe(MyApp.PubSub, topic)
end

In all other LiveViews from which a notification message is issued, we use the `assign_session_id` helper function defined above in the LiveHelpers module in order to set an assign with the unique session id from which the PubSub topic can be constructed.
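
The point of the two clauses is that the subscriber (which only has the session) and the publishers (which only have a socket) must arrive at the same topic string. A minimal sketch of that, with plain maps standing in for the session and the socket and made-up session ids, could look like this; `TopicDemo` is a hypothetical module name.

```elixir
defmodule TopicDemo do
  # Clause 1: used in mount/3 of the alerts LiveView, with the session map.
  def alerts_topic_id(%{"session_id" => session_id}), do: "alerts-#{session_id}"
  # Clause 2: used everywhere else, with a socket that already has the assign.
  def alerts_topic_id(socket), do: "alerts-#{socket.assigns.session_id}"
end

# Plain maps stand in for the Plug session and the LiveView socket.
session = %{"session_id" => "ABC123"}
socket = %{assigns: %{session_id: "ABC123"}}
other_socket = %{assigns: %{session_id: "XYZ789"}}

subscriber_topic = TopicDemo.alerts_topic_id(session)
publisher_topic = TopicDemo.alerts_topic_id(socket)
other_topic = TopicDemo.alerts_topic_id(other_socket)

# Same session: subscriber and publisher meet on one topic.
IO.inspect(subscriber_topic == publisher_topic)

# Different session: the topic differs, so that browser's alerts stay separate.
IO.inspect(subscriber_topic == other_topic)
```

The socket map has no `"session_id"` string key, so it falls through the first clause and is handled by the second.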

Voilà!

We now have our final configuration.

Our alerts LiveView module looks like this:

# alerts.ex

defmodule MyAppWeb.Alerts do
  @moduledoc false

  use MyAppWeb, :live_view_without_layout
  alias Phoenix.PubSub

  @impl true
  def mount(_params, session, socket) do
    subscribe_to_alarms_topic(session)

    socket =
      socket
      |> assign(:info, nil)
      |> assign(:error, nil)

    {:ok, socket}
  end

  @impl true
  def handle_info({:info, message}, socket) do
    # The clear message carries the type so that it matches the clauses below
    send(self(), {:schedule_clear_message, :info})

    {:noreply, assign(socket, info: message)}
  end

  @impl true
  def handle_info({:error, message}, socket) do
    send(self(), {:schedule_clear_message, :error})

    {:noreply, assign(socket, error: message)}
  end

  @impl true
  def handle_info({:schedule_clear_message, :info}, socket) do
    :timer.sleep(5000)

    {:noreply, assign(socket, :info, nil)}
  end

  @impl true
  def handle_info({:schedule_clear_message, :error}, socket) do
    :timer.sleep(5000)

    {:noreply, assign(socket, :error, nil)}
  end

  defp subscribe_to_alarms_topic(session) do
    topic = alerts_topic_id(session)
    PubSub.subscribe(MyApp.PubSub, topic)
  end
end

Our alerts template is unchanged (except that we removed the JS hook):

# alerts.html.slive

.alerts
  = if @info do
    = @info

  = if @error do
    = @error

And this is what a LiveView from which a notification is sent looks like:

@impl true
def mount(_params, session, socket) do
  {:ok, assign_session_id(socket, session)}
end

@impl true
def handle_event("event_name", %{"object_name" => object_params}, socket) do
  case Object.update(object_params) do
    {:ok, object} ->
      # The notification now goes through PubSub, so put_flash is no longer needed
      send_notification(socket, :info, gettext("Object updated"))

      {:noreply, assign(socket, :object, object)}
  end
end

Note that we don't need to alias `Phoenix.PubSub` in each module from which a notification is broadcast, because we do it directly in the LiveHelpers module where our `send_notification` helper function is defined. On the other hand, we do need to make sure that a unique `session_id` assign is set in the `mount` function using the `assign_session_id(socket, session)` helper function.

Conclusion

After these modifications, we had a notifications system working throughout our app using only PubSub, which not only solved our initial bug but also gave us greater flexibility and control than live flash, and we now plan to standardize on this solution in all of our projects.

One of the many reasons we love Elixir and Phoenix is PubSub, which offers some remarkable advantages, such as allowing us to publish to a topic rather than send a message to a recipient we need to know how to reach in advance. Still, we have only scratched the surface of what can be done with PubSub in this feature, and we're looking forward to exploring the possibilities it opens up in upcoming Elixir Phoenix projects.
