Scenario
Recently, I’ve been working on a project, and one of the improvements is to take snapshots of resources. These snapshots speed up state restoration, but losing them is acceptable, because we can always rebuild the state from scratch by replaying our logs.
Since many resources are snapshotted concurrently, we don’t want to hit the database too often. Instead, we want to batch the snapshot messages and flush the batch either when it reaches a given size or when a timeout occurs. The same approach applies to other work that can be batched, such as sending emails.
Handling Messages in Batch or When Timeout Is Reached
We will use the GenServer timeout option to implement this feature. According to the documentation, a callback can return a timeout in milliseconds alongside the new state. When that many milliseconds have elapsed with no message arriving, the handle_info/2 callback is invoked with :timeout as the first argument. If any message arrives before the timeout, the timeout is cleared and the message is handled as usual.
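To make the timeout behaviour concrete before the full implementation, here is a minimal sketch. The module name IdleDemo, the 50 ms window, and the :ping and :idle messages are assumptions made for this illustration only; they are not part of the snapshot code.

```elixir
defmodule IdleDemo do
  use GenServer

  # Hypothetical idle window, chosen only for this demo.
  @idle_ms 50

  def start_link(notify_pid), do: GenServer.start_link(__MODULE__, notify_pid)

  def ping(server), do: GenServer.cast(server, :ping)

  @impl GenServer
  def init(notify_pid) do
    # The third element arms the idle timeout right after init.
    {:ok, notify_pid, @idle_ms}
  end

  @impl GenServer
  def handle_cast(:ping, notify_pid) do
    # The incoming message cleared the previous timeout;
    # returning one again re-arms it with the full window.
    {:noreply, notify_pid, @idle_ms}
  end

  @impl GenServer
  def handle_info(:timeout, notify_pid) do
    # No message arrived within @idle_ms milliseconds.
    send(notify_pid, :idle)
    # No timeout is returned here, so the timer is not re-armed.
    {:noreply, notify_pid}
  end
end
```

Starting the server with IdleDemo.start_link(self()) and then staying quiet should deliver :idle to the caller once the window elapses. Note that each :ping restarts the full window, which is why the batching code has to track how much time is left instead of returning the same timeout every time.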
Here is the implementation:
defmodule Snapshoting do
  use GenServer

  require Logger

  @default_timeout :timer.seconds(5)
  @default_batch_size 100

  def start_link(init_arg) do
    GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def take(key, snapshot) do
    GenServer.cast(__MODULE__, {:take, key, snapshot})
  end

  @impl GenServer
  def init(_init_arg) do
    {:ok, new_state()}
  end

  # A new key that fills the batch: flush immediately and start over.
  @impl GenServer
  def handle_cast({:take, key, snapshot}, state)
      when not is_map_key(state.snapshots, key) and
             map_size(state.snapshots) + 1 === @default_batch_size do
    state = stash_snapshot(state, key, snapshot)
    handle_in_batch(state.snapshots)
    {:noreply, new_state()}
  end

  # Otherwise stash the snapshot and re-arm the timeout with the time left.
  def handle_cast({:take, key, snapshot}, state) do
    state = stash_snapshot(state, key, snapshot)
    {timeout, state} = timeout(state)
    {:noreply, state, timeout}
  end

  # No message arrived before the deadline: flush whatever has been stashed.
  @impl GenServer
  def handle_info(:timeout, state) do
    handle_in_batch(state.snapshots)
    {:noreply, new_state()}
  end

  defp handle_in_batch(snapshots) do
    Logger.debug("Taking #{map_size(snapshots)} snapshots")
    # take snapshots
  end

  defp new_state, do: %{snapshots: %{}, start_mono_time: nil}

  defp stash_snapshot(state, key, snapshot) do
    put_in(state, [:snapshots, key], snapshot)
  end

  # First snapshot of a batch: record the start time and use the full timeout.
  defp timeout(%{start_mono_time: nil} = state) do
    {@default_timeout, %{state | start_mono_time: now()}}
  end

  # Later snapshots: use only the time remaining since the batch started.
  defp timeout(%{start_mono_time: start_time} = state) do
    {max(@default_timeout - (now() - start_time), 0), state}
  end

  defp now, do: System.monotonic_time(:millisecond)
end
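The core of the implementation is the handle_cast/2 callback. When a :take snapshot message arrives, we check whether it carries a new key that would bring the number of stashed snapshots up to the batch size. If so, we handle the snapshots in batch immediately; otherwise, we stash the snapshot and set the timeout. Because every incoming message clears the timeout, timeout/1 returns the time remaining since the first snapshot of the batch rather than the full interval, so a steady trickle of messages cannot postpone the flush indefinitely. When the timeout is reached, the snapshots are handled in batch through the handle_info/2 callback.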
@impl GenServer
def handle_cast({:take, key, snapshot}, state)
    when not is_map_key(state.snapshots, key) and
           map_size(state.snapshots) + 1 === @default_batch_size do
  state = stash_snapshot(state, key, snapshot)
  handle_in_batch(state.snapshots)
  {:noreply, new_state()}
end

def handle_cast({:take, key, snapshot}, state) do
  state = stash_snapshot(state, key, snapshot)
  {timeout, state} = timeout(state)
  {:noreply, state, timeout}
end

@impl GenServer
def handle_info(:timeout, state) do
  handle_in_batch(state.snapshots)
  {:noreply, new_state()}
end
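The callbacks above rely on the timeout/1 helper to decide how long to wait. Its arithmetic can be checked in isolation with a small pure function. The sketch below is an illustration, not part of the original module: DeadlineSketch and remaining/2 are hypothetical names, and the current time is passed in as an argument instead of being read from System.monotonic_time/1 so that the results are deterministic.

```elixir
defmodule DeadlineSketch do
  # Same 5-second window as @default_timeout in the article's module.
  @window_ms 5_000

  # No batch in progress yet: the full window applies.
  def remaining(nil, _now_ms), do: @window_ms

  # Batch in progress: subtract the time already elapsed since it started,
  # clamping at zero so the GenServer never receives a negative timeout.
  def remaining(started_at_ms, now_ms) do
    max(@window_ms - (now_ms - started_at_ms), 0)
  end
end
```

For a batch started at 1_000 ms, DeadlineSketch.remaining(1_000, 3_000) returns 3_000, and DeadlineSketch.remaining(1_000, 9_000) returns 0. A timeout of 0 makes the GenServer fire :timeout as soon as its mailbox is empty, so an overdue batch is flushed right away.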
Conclusion
In this article, we’ve used Elixir’s built-in GenServer to implement the message batching feature elegantly, without introducing any new dependencies. This highlights the power of Elixir, where we can leverage its built-in features to solve complex problems with simple and elegant solutions.
There are other solutions for handling messages in batches, such as gen_batch_server, which is a library that provides a GenServer behavior to handle messages in batches.