Streaming data in Elixir

July 2, 2024

Introduction

In this article, we explore how to stream data between systems. This is a common requirement in many applications, especially when dealing with large datasets.

Ecto Streaming

Firstly, streaming data between databases is often required for operations such as data migrations, system setups, backups, and ETL workflows. In our recent project, we encountered a scenario where we needed to copy data from one PostgreSQL database to another. We found that streaming data using Ecto.Repo.stream/2 and the COPY command was the most efficient and effective approach.

Ecto provides a function, Ecto.Repo.stream/2, that allows you to stream data from the database. It returns a lazy enumerable that yields all entries from the database matching the given query, and it must be enumerated within a transaction.

Consider an example from the documentation:

# Fetch all post titles
query = from p in Post, select: p.title
stream = MyRepo.stream(query)

MyRepo.transaction(fn ->
  Enum.to_list(stream)
end)

In this example, we fetch all post titles from the Post table using Ecto.Repo.stream/2. The stream variable is a lazy enumerable that yields all post titles; since Ecto only allows such streams to be enumerated inside a transaction, we wrap the Enum.to_list/1 call in MyRepo.transaction/2. This is the conventional way of using Ecto.Repo.stream/2. On deeper inspection, it becomes evident that Ecto.Repo.stream/2 is essentially a wrapper around Ecto.Adapters.SQL.stream/3.
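
Because of that, we can also call Ecto.Adapters.SQL.stream/3 directly with raw SQL. A minimal sketch (the repo and table names are placeholders); note that the stream emits %Postgrex.Result{} structs in chunks rather than individual rows:

MyRepo.transaction(fn ->
  MyRepo
  |> Ecto.Adapters.SQL.stream("SELECT title FROM posts", [])
  # Each emitted chunk is a %Postgrex.Result{}; flatten it into rows.
  |> Enum.flat_map(fn %Postgrex.Result{rows: rows} -> rows end)
end)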

The documentation notes:

If the adapter supports a collectable stream, the stream may also be used as the collectable in Enum.into/3.

This feature allows us to insert the given enumerable (source) into the collectable (sink) data structure.

The most effective way to copy data between databases is to use the COPY command, a PostgreSQL-specific feature for efficient bulk data loading. COPY FROM reads data from a file or from standard input and writes it to a table, while COPY TO does the reverse. However, reading from a file requires the file to be accessible by the PostgreSQL server, which may not always be feasible. In such cases, we can use the STDIN and STDOUT variants to stream the data through the client connection instead.

Let's see how we use this feature to copy data from a CSV file to a PostgreSQL database.

defmodule CopyFromCSV do
  def from_file(file) do
    # Each line must match the format expected by the COPY statement below.
    # Plain COPY uses PostgreSQL's text format (tab-separated values); for
    # comma-separated files, use "COPY users FROM STDIN WITH (FORMAT csv)".
    users_stream = File.stream!(file, [:trim_bom, encoding: :utf8])

    MyRepo.transaction(
      fn ->
        sink = Ecto.Adapters.SQL.stream(MyRepo, "COPY users FROM STDIN", [])
        Enum.into(users_stream, sink)
      end,
      timeout: :infinity
    )
  end

  def fake(count \\ 1_000_000) do
    # Lazily generate tab-separated rows (an id and a random birthday)
    # matching the columns of the users table.
    users_stream =
      Stream.map(1..count, fn id ->
        birthday = Date.add(~D[1950-01-01], :rand.uniform(365 * 50))
        Enum.join([id, Date.to_iso8601(birthday)], "\t") <> "\n"
      end)

    MyRepo.transaction(
      fn ->
        sink = Ecto.Adapters.SQL.stream(MyRepo, "COPY users FROM STDIN", [])
        Enum.into(users_stream, sink)
      end,
      timeout: :infinity
    )
  end
end

The use of STDIN in the COPY command is crucial: it is what makes the sink usable as a collectable, allowing efficient insertion of the CSV data using Enum.into/2. In the CopyFromCSV.from_file/1 function, data is streamed from a CSV file accessible to Elixir but not to the PostgreSQL server. Furthermore, the CopyFromCSV.fake/1 function generates arbitrary data matching the schema of the users table.
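
With this module in place, seeding or importing is a single call; the file path below is just an example:

# Generate and COPY one million fake users.
CopyFromCSV.fake()

# Load rows from a file that the Elixir node can read.
CopyFromCSV.from_file("/tmp/users.tsv")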

We just demonstrated how to copy data from an Elixir enumerable to the database using the COPY command. Let's now explore how to copy data between two databases.

defmodule Copy do
  def run do
    SourceRepo.transaction(
      fn ->
        # COPY ... TO STDOUT emits %Postgrex.Result{} chunks; flatten them
        # into individual COPY data lines.
        source =
          SourceRepo
          |> Ecto.Adapters.SQL.stream("COPY users TO STDOUT", [])
          |> Stream.flat_map(fn %Postgrex.Result{rows: rows} -> rows end)

        SinkRepo.transaction(
          fn ->
            sink = Ecto.Adapters.SQL.stream(SinkRepo, "COPY users FROM STDIN", [])
            Enum.into(source, sink)
          end,
          timeout: :infinity
        )
      end,
      timeout: :infinity
    )
  end
end

In this example, we stream data from the source database using the COPY users TO STDOUT command and then insert it into the sink database using the COPY users FROM STDIN command. Here, the stream bound to the source variable implements the Enumerable protocol, while the one bound to sink implements the Collectable protocol. This allows us to use Enum.into/2 to move the data from the source database into the sink database.

Another approach: offset-based streaming

A disadvantage of the above approach is the need to hold a transaction open for an extended period, especially in serverless environments where transaction timeouts are limited. To address this issue, we can use an offset-based stream:

defmodule ResourceStreaming do
  import Ecto.Query

  @chunk_size 100

  def to_stream(opts \\ []) do
    opts = Keyword.validate!(opts, chunk_size: @chunk_size)

    chunk_size = Keyword.fetch!(opts, :chunk_size)

    Stream.resource(
      # Start from offset 0.
      fn -> 0 end,
      fn offset ->
        User
        |> order_by(asc: :id)
        |> limit(^chunk_size)
        |> offset(^offset)
        |> SourceRepo.all()
        |> case do
          [] ->
            {:halt, :ok}

          users ->
            {users, offset + length(users)}
        end
      end,
      fn _ -> :ok end
    )
  end

  def to_remote_stream(opts \\ []) do
    opts = Keyword.validate!(opts, chunk_size: @chunk_size)

    chunk_size = Keyword.fetch!(opts, :chunk_size)

    Stream.resource(
      # Start from the first page.
      fn -> "http://example.com/posts?limit=#{chunk_size}" end,
      fn
        # The server returned no next page link: we are done.
        nil ->
          {:halt, :ok}

        url ->
          resp = Req.get!(url)
          # Req returns header values as lists; take the first one, if any.
          next = resp.headers |> Map.get("x-next", []) |> List.first()

          case resp.body do
            [] -> {:halt, :ok}
            data -> {data, next}
          end
      end,
      fn _ -> :ok end
    )
  end
end

The ResourceStreaming.to_stream/1 function fetches data from the User table in the source database chunk by chunk and returns a stream of users. The ResourceStreaming.to_remote_stream/1 function does the same for a remote source, following a pagination header between requests. This approach is particularly useful when the data source is not a database but an external API or service.

By using this pattern, we can turn any list-based data source into a stream, and then use the Stream functions to process the data more efficiently.
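
For example, nothing is fetched until the stream is consumed, so we can compose it with further Stream functions before running it:

ResourceStreaming.to_stream(chunk_size: 500)
|> Stream.each(&IO.inspect/1)
|> Stream.run()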

Collectable protocol

In the previous examples, we used the Enumerable protocol to take values out of a collection. In the opposite direction, the Collectable protocol is used to insert values into a collection: Enum.into/2 takes an enumerable and pushes its values into any data structure that implements Collectable.
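
For example, maps implement Collectable, so Enum.into/2 can pour a keyword list into one:

Enum.into([a: 1, b: 2], %{})
#=> %{a: 1, b: 2}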

Building on this, we can implement the Collectable protocol ourselves to write data to a file, a database, or even an external API.

defmodule CollectableResource do
  defstruct [:remote]

  defimpl Collectable do
    def into(resource) do
      # The collector function receives the accumulator and a command:
      # {:cont, elem} for each element, :done once the enumerable is
      # exhausted, and :halt if collection is aborted.
      collector_fun = fn
        resource, {:cont, elem} ->
          IO.puts("#{inspect(elem)} is posted to the remote(#{resource.remote})")
          resource

        resource, :done ->
          resource

        _resource, :halt ->
          :ok
      end

      initial_acc = resource

      {initial_acc, collector_fun}
    end
  end
end
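
Combining the pieces above, a hypothetical usage pours the offset-based stream into the resource, which "posts" each user to the remote endpoint (here only simulated with IO.puts):

resource = %CollectableResource{remote: "http://example.com/users"}

ResourceStreaming.to_stream()
|> Enum.into(resource)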

Final thoughts

Elixir and Ecto provide a powerful set of tools for handling streaming data efficiently. By combining the Enumerable and Collectable protocols with the Stream functions, we can stream data between databases, files, and external APIs elegantly and effectively.

There is a livebook that demonstrates using Ecto.Repo.stream/2 and the COPY command to copy data between databases. You can run it by clicking the link below.

Run in Livebook