Handling messages in batch using GenServer

December 14, 2024

Scenario

Recently, I’ve been working on a project where one of the improvements is taking snapshots of resources. The snapshots speed up state restoration, but losing one is acceptable: we can always reconstruct the state from scratch by replaying our logs.

Since there are many resources to snapshot concurrently, we don’t want to hit the database too often. Instead, we want to batch the snapshot messages and flush the batch either when it reaches a given size or when a timeout occurs. The same approach applies to other tasks that can be batched, such as sending emails.

Handling Messages in Batch or When Timeout Is Reached

We will use the timeout that GenServer callbacks can return to implement this feature. According to the documentation, a callback such as init/1 or handle_cast/2 may return a timeout in milliseconds as the last element of its result tuple, for example {:noreply, state, timeout}. When that many milliseconds elapse with no message arriving, the handle_info/2 callback is invoked with :timeout as the first argument. If any message arrives before then, the timeout is cleared and the message is handled as usual.
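To see these semantics in isolation, here is a minimal sketch using a hypothetical TimeoutDemo module that is not part of the project. Each :ping cast returns a 200 ms timeout, and handle_info/2 reports to the caller whenever :timeout actually fires.

```elixir
defmodule TimeoutDemo do
  # Hypothetical demo server: reports each :timeout to `parent` so we can
  # observe when handle_info(:timeout, state) is invoked.
  use GenServer

  @timeout 200

  def start_link(parent), do: GenServer.start_link(__MODULE__, parent)

  @impl GenServer
  def init(parent) do
    # No timeout here: the server waits indefinitely for the first ping.
    {:ok, parent}
  end

  @impl GenServer
  def handle_cast(:ping, parent) do
    # The arriving message cleared any pending timeout; returning @timeout
    # re-arms it, counted from now rather than from the first ping.
    {:noreply, parent, @timeout}
  end

  @impl GenServer
  def handle_info(:timeout, parent) do
    send(parent, {:timed_out, System.monotonic_time(:millisecond)})
    # No timeout returned, so nothing fires again until the next ping.
    {:noreply, parent}
  end
end
```

If you ping this server every 50 ms, :timeout is not delivered while the pings keep coming; it arrives roughly 200 ms after the last one. A batching server therefore has to account for the time it has already spent waiting.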

Here is the implementation:

defmodule Snapshoting do
  use GenServer

  @default_timeout :timer.seconds(5)
  @default_batch_size 100

  def start_link(init_arg) do
    GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def take(key, snapshot) do
    GenServer.cast(__MODULE__, {:take, key, snapshot})
  end

  @impl GenServer
  def init(_init_arg) do
    {:ok, new_state()}
  end

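  @impl GenServer
  def handle_cast({:take, key, snapshot}, state)
      when not is_map_key(state.snapshots, key) and
             map_size(state.snapshots) + 1 === @default_batch_size do
    # A new key that fills up the batch: flush right away. Returning no
    # timeout is fine because the fresh state holds nothing to flush.
    state = stash_snapshot(state, key, snapshot)
    handle_in_batch(state.snapshots)

    {:noreply, new_state()}
  end

  def handle_cast({:take, key, snapshot}, state) do
    # Otherwise stash the snapshot (overwriting any earlier one for the same
    # key) and re-arm the timeout with the time left until the batch deadline.
    state = stash_snapshot(state, key, snapshot)
    {timeout, state} = timeout(state)

    {:noreply, state, timeout}
  end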

  @impl GenServer
  def handle_info(:timeout, state) do
    handle_in_batch(state.snapshots)

    {:noreply, new_state()}
  end

  defp handle_in_batch(snapshots) do
    require Logger
    Logger.debug("Taking #{map_size(snapshots)} snapshots")

    # take snapshots
  end

  defp new_state, do: %{snapshots: %{}, start_mono_time: nil}

  defp stash_snapshot(state, key, snapshot) do
    put_in(state, [:snapshots, key], snapshot)
  end

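  # First snapshot of a new batch: record when the batch started and wait
  # the full timeout.
  defp timeout(%{start_mono_time: nil} = state) do
    {@default_timeout, %{state | start_mono_time: now()}}
  end

  # Every incoming message clears the pending GenServer timeout, so re-arm it
  # with only the time remaining until the batch deadline (never negative).
  defp timeout(%{start_mono_time: start_time} = state) do
    {max(@default_timeout - (now() - start_time), 0), state}
  end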

  defp now, do: System.monotonic_time(:millisecond)
end

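The core of the implementation is the handle_cast/2 callback. When a :take message arrives with a key that is not stashed yet and would make the batch full, we stash the snapshot and handle the whole batch immediately. Otherwise, we stash the snapshot and return a timeout. Because any incoming message clears a pending GenServer timeout, we cannot simply return @default_timeout every time: a steady trickle of snapshots would keep postponing the flush. Instead, timeout/1 records start_mono_time when the first snapshot of a batch arrives and, for later snapshots, returns only the time remaining until that deadline. When the timeout is reached, the stashed snapshots are handled in batch through the handle_info/2 callback.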

@impl GenServer
def handle_cast({:take, key, snapshot}, state)
    when not is_map_key(state.snapshots, key) and
            map_size(state.snapshots) + 1 === @default_batch_size do
  state = stash_snapshot(state, key, snapshot)
  handle_in_batch(state.snapshots)

  {:noreply, new_state()}
end

def handle_cast({:take, key, snapshot}, state) do
  state = stash_snapshot(state, key, snapshot)
  {timeout, state} = timeout(state)

  {:noreply, state, timeout}
end

@impl GenServer
def handle_info(:timeout, state) do
  handle_in_batch(state.snapshots)

  {:noreply, new_state()}
end
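To make the role of start_mono_time concrete, the hypothetical FlushSim module below (an illustration, not part of the project) replays a list of message arrival times, in milliseconds, and returns the times at which the :timeout flush would fire. It ignores the batch-size trigger and compares two strategies: re-arming the full timeout on every message, and re-arming only the time left until a deadline fixed by the batch’s first message, which is what timeout/1 does.

```elixir
defmodule FlushSim do
  # Hypothetical simulation: arrival times must be sorted, in milliseconds.
  # Returns the list of times at which a timeout-triggered flush fires.

  # Naive strategy: every message re-arms the full timeout.
  def naive(arrivals, timeout) do
    simulate(arrivals, fn at, _start -> at + timeout end)
  end

  # Deadline strategy, mirroring timeout/1: re-arm with
  # max(timeout - (now - start), 0), where start is the batch's first message.
  def deadline(arrivals, timeout) do
    simulate(arrivals, fn at, start -> at + max(timeout - (at - start), 0) end)
  end

  defp simulate(arrivals, rearm) do
    {flushes, _start, fire_at} =
      Enum.reduce(arrivals, {[], nil, nil}, fn at, {flushes, start, fire_at} ->
        # If the pending timeout expired before this message arrived, the
        # batch was flushed at fire_at and this message starts a new batch.
        {flushes, start} =
          if fire_at != nil and at >= fire_at,
            do: {[fire_at | flushes], at},
            else: {flushes, start || at}

        {flushes, start, rearm.(at, start)}
      end)

    # Once the messages stop, the last pending timeout fires too.
    final = if fire_at, do: [fire_at | flushes], else: flushes
    Enum.reverse(final)
  end
end
```

With a 5-second timeout and a snapshot every 4 seconds, FlushSim.naive([0, 4000, 8000, 12000], 5000) returns [17000], a single flush 17 seconds after the first snapshot, while FlushSim.deadline([0, 4000, 8000, 12000], 5000) returns [5000, 13000], so no snapshot waits longer than the timeout.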

Conclusion

In this article, we’ve used Elixir’s built-in GenServer to implement message batching without introducing any new dependencies: the timeout returned from a callback drives the time-based flush, and a guard on the batch size drives the size-based one. This shows how far Elixir’s built-in features go toward solving this kind of problem with a small amount of code.
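Nothing in the server is specific to snapshots, so it can be generalised for the other use cases mentioned earlier, such as sending emails. Here is a sketch of a hypothetical Batcher module in which the batch size, the timeout and the flush function are options rather than module attributes. It keeps the same deadline logic but checks the batch size with an if after stashing instead of a guard; since Map.put/3 on an existing key does not grow the map, duplicate keys still do not count toward the batch size.

```elixir
defmodule Batcher do
  # Hypothetical generalisation of Snapshoting: batch size, timeout and the
  # flush function are passed in as options instead of being hard-coded.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def push(server, key, value), do: GenServer.cast(server, {:push, key, value})

  @impl GenServer
  def init(opts) do
    config = %{
      flush: Keyword.fetch!(opts, :flush),
      batch_size: Keyword.get(opts, :batch_size, 100),
      timeout: Keyword.get(opts, :timeout, 5_000)
    }

    {:ok, %{config: config, items: %{}, started_at: nil}}
  end

  @impl GenServer
  def handle_cast({:push, key, value}, state) do
    items = Map.put(state.items, key, value)
    # started_at is set by the first item of a batch and kept afterwards.
    state = %{state | items: items, started_at: state.started_at || now()}

    if map_size(items) >= state.config.batch_size do
      # Size limit reached: flush now and return without a timeout.
      {:noreply, flush(state)}
    else
      # Re-arm only the time left until the batch deadline.
      elapsed = now() - state.started_at
      {:noreply, state, max(state.config.timeout - elapsed, 0)}
    end
  end

  @impl GenServer
  def handle_info(:timeout, state), do: {:noreply, flush(state)}

  defp flush(state) do
    state.config.flush.(state.items)
    %{state | items: %{}, started_at: nil}
  end

  defp now, do: System.monotonic_time(:millisecond)
end
```

For example, Batcher.start_link(flush: &IO.inspect/1, batch_size: 100, timeout: 5_000) would print each batch as a map. Unlike Snapshoting, it is not registered under a name, so several batchers can run side by side.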

There are other solutions for handling messages in batches, such as gen_batch_server, a library that provides a gen_server-like behaviour for processing messages in batches.