Introduction
In this article, we explore how to stream data between systems. This is a common requirement in many applications, especially when dealing with large datasets.
Ecto Streaming
Streaming data between databases is often required for operations such as data migrations, system setups, backups, and ETL workflows. In a recent project, we encountered a scenario where we needed to copy data from one PostgreSQL database to another. We found that streaming the data using Ecto.Repo.stream/2 and the COPY command was the most efficient and effective approach.
There is a function in Ecto called Ecto.Repo.stream/2 that allows you to stream data from the database. This function returns a lazy enumerable that yields all entries matching the specified query; the stream must be consumed within a transaction.
Consider an example from the documentation:
# Fetch all post titles
query =
  from p in Post,
    select: p.title

stream = MyRepo.stream(query)

MyRepo.transaction(fn ->
  Enum.to_list(stream)
end)
In this example, we fetch all post titles from the Post table using Ecto.Repo.stream/2. The stream variable represents a lazy enumerable that yields all post titles, and we wrap the stream in a transaction to execute the query. This is the conventional way of using Ecto.Repo.stream/2. On deeper inspection, it becomes evident that Ecto.Repo.stream/2 is essentially a wrapper around Ecto.Adapters.SQL.stream/3.
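Because the stream is lazy, nothing runs until it is consumed inside the transaction, so we can compose further Stream transformations first. A minimal sketch reusing the query above (the :max_rows option controls how many rows are fetched per database round trip):

MyRepo.transaction(fn ->
  query
  |> MyRepo.stream(max_rows: 500)
  |> Stream.map(&String.upcase/1)
  |> Stream.take(10)
  |> Enum.to_list()
end)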
The documentation notes:

"If the adapter supports a collectable stream, the stream may also be used as the collectable in Enum.into/3."

This feature allows us to insert a given enumerable (the source) into a collectable (the sink) data structure.
The most effective way to copy data between databases is to use the COPY command, a PostgreSQL-specific feature that allows for efficient bulk data loading. The COPY command reads data from a file or from a program's standard input and writes it to the database. However, COPY requires the file to be accessible by the PostgreSQL server, which may not always be feasible. In such cases, we can use the STDIN and STDOUT options to stream data between databases instead.
Let's see how we use this feature to copy data from a CSV file to a PostgreSQL database.
defmodule CopyFromCSV do
  # Stream a delimited file from disk into the users table.
  def from_file(file) do
    users_stream = File.stream!(file, [:trim_bom, encoding: :utf8])

    MyRepo.transaction(
      fn ->
        # With FROM STDIN, the stream acts as a Collectable sink.
        sink = Ecto.Adapters.SQL.stream(MyRepo, "COPY users FROM STDIN", [])
        Enum.into(users_stream, sink)
      end,
      timeout: :infinity
    )
  end

  # Generate fake rows on the fly instead of reading a file.
  def fake(count \\ 1_000_000) do
    users_stream =
      Stream.map(1..count, fn id ->
        birthday = Date.add(~D[1950-01-01], :rand.uniform(365 * 50))
        # COPY's default text format expects tab-separated columns, one row per line.
        Enum.join([id, Date.to_iso8601(birthday)], "\t") <> "\n"
      end)

    MyRepo.transaction(
      fn ->
        sink = Ecto.Adapters.SQL.stream(MyRepo, "COPY users FROM STDIN", [])
        Enum.into(users_stream, sink)
      end,
      timeout: :infinity
    )
  end
end
The use of STDIN in the COPY command is crucial, as it transforms the sink into a collectable data structure, allowing efficient insertion of CSV data using Enum.into/2. In the CopyFromCSV.from_file/1 function, data is streamed from a CSV file accessible to Elixir but not to the PostgreSQL server. Furthermore, the CopyFromCSV.fake/1 function enables the generation of any data matching the schema of the users table.
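Hypothetical usage, assuming the file uses COPY's default tab-separated text format and is readable by the Elixir node (not necessarily by the database server):

# Copy a file from disk into the users table
{:ok, _} = CopyFromCSV.from_file("/tmp/users.tsv")

# Generate and load one hundred thousand fake rows
{:ok, _} = CopyFromCSV.fake(100_000)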
We just demonstrated how to copy data from an Elixir enumerable to the database using the COPY command. Let's now explore how to copy data between two databases.
defmodule Copy do
  def run do
    SourceRepo.transaction(
      fn ->
        # COPY ... TO STDOUT yields Postgrex.Result chunks; flatten them
        # into a stream of raw copy-format rows.
        source =
          SourceRepo
          |> Ecto.Adapters.SQL.stream("COPY users TO STDOUT", [])
          |> Stream.flat_map(fn %Postgrex.Result{rows: rows} -> rows end)

        SinkRepo.transaction(
          fn ->
            sink = Ecto.Adapters.SQL.stream(SinkRepo, "COPY users FROM STDIN", [])
            Enum.into(source, sink)
          end,
          timeout: :infinity
        )
      end,
      timeout: :infinity
    )
  end
end
In this example, we stream data out of the source database with the COPY users TO STDOUT command and insert it into the sink database with the COPY users FROM STDIN command. The source stream implements the Enumerable protocol, while the sink implements the Collectable protocol. This allows us to use Enum.into/2 to insert the data from the source database into the sink database.
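The same pair of protocols also lets us stream a table into a local file rather than another database. A minimal sketch, assuming the same users table: File.stream!/1 returns a file stream that implements Collectable, so it can serve as the sink, and the FORMAT csv option makes PostgreSQL emit ordinary CSV lines.

defmodule ExportToCSV do
  def run(path) do
    SourceRepo.transaction(
      fn ->
        SourceRepo
        |> Ecto.Adapters.SQL.stream("COPY users TO STDOUT WITH (FORMAT csv, HEADER)", [])
        |> Stream.flat_map(fn %Postgrex.Result{rows: rows} -> rows end)
        # The file stream is a Collectable, so it works as the sink.
        |> Enum.into(File.stream!(path))
      end,
      timeout: :infinity
    )
  end
end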
Another approach: offset-based streaming
A disadvantage of the approach above is the need to hold a transaction open for an extended period, especially in serverless environments where transaction timeouts are limited. To address this issue, we can use an offset-based stream:
defmodule ResourceStreaming do
  import Ecto.Query

  @chunk_size 100

  # Stream the User table from the source database chunk by chunk,
  # using the row offset as the accumulator.
  def to_stream(opts \\ []) do
    opts = Keyword.validate!(opts, chunk_size: @chunk_size)
    chunk_size = Keyword.fetch!(opts, :chunk_size)

    Stream.resource(
      fn -> 0 end,
      fn offset ->
        User
        |> order_by(asc: :id)
        |> limit(^chunk_size)
        |> offset(^offset)
        |> SourceRepo.all()
        |> case do
          [] -> {:halt, :ok}
          users -> {users, offset + length(users)}
        end
      end,
      fn _ -> :ok end
    )
  end

  # Stream a paginated remote API, following the x-next header
  # until it disappears or an empty page is returned.
  def to_remote_stream(opts \\ []) do
    opts = Keyword.validate!(opts, chunk_size: @chunk_size)
    chunk_size = Keyword.fetch!(opts, :chunk_size)

    Stream.resource(
      fn -> nil end,
      fn
        :done ->
          {:halt, :ok}

        next ->
          resp = Req.get!(next || "http://example.com/posts?limit=#{chunk_size}")
          next = resp |> Req.Response.get_header("x-next") |> List.first()

          case resp.body do
            [] -> {:halt, :ok}
            data -> {data, next || :done}
          end
      end,
      fn _ -> :ok end
    )
  end
end
The ResourceStreaming.to_stream/1 function fetches data from the User table in the source database chunk by chunk and returns a stream of users. The ResourceStreaming.to_remote_stream/1 function fetches data from a remote source in chunks and returns a stream of data. This approach is particularly useful when the data source is not a database but an external API or service.
By using this pattern, we can turn any list-based data source into a stream and then use the Stream functions to process the data more efficiently.
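As a hypothetical consumer of such a stream (assuming the User schema has id and birthday fields), we can batch the rows into the sink database with insert_all/2, so each chunk runs as its own short-lived statement instead of one long transaction:

ResourceStreaming.to_stream(chunk_size: 500)
# Keep only plain column values so the rows suit insert_all/2
|> Stream.map(&Map.take(&1, [:id, :birthday]))
|> Stream.chunk_every(500)
|> Enum.each(fn chunk -> SinkRepo.insert_all("users", chunk) end)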
Collectable protocol
In the previous examples, we used the Enumerable protocol to take values out of a collection. The Collectable protocol works in the opposite direction: it is used to insert values into a collection. The Enum.into/2 function inserts values into any collection that implements the Collectable protocol.
By implementing the Collectable protocol, we can write data to a file, a database, or even an external API.
defmodule CollectableResource do
  defstruct [:remote]

  defimpl Collectable do
    def into(resource) do
      # The collector function receives the accumulator plus a command:
      # {:cont, elem} for each element, then :done or :halt.
      collector_fun = fn
        resource, {:cont, elem} ->
          IO.puts("#{inspect(elem)} is posted to the remote(#{resource.remote})")
          resource

        resource, :done ->
          resource

        _resource, :halt ->
          :ok
      end

      initial_acc = resource
      {initial_acc, collector_fun}
    end
  end
end
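Because the struct implements Collectable, any enumerable can be poured into it with Enum.into/2. A toy session (the remote URL is just a placeholder):

iex> Enum.into(1..3, %CollectableResource{remote: "https://example.com"})
1 is posted to the remote(https://example.com)
2 is posted to the remote(https://example.com)
3 is posted to the remote(https://example.com)
%CollectableResource{remote: "https://example.com"}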
Final thoughts
Elixir and Ecto provide a powerful set of tools for handling streaming data efficiently. By using the Enumerable and Collectable protocols together with the Stream functions, we can stream data between databases, files, and external APIs elegantly and effectively.
There is a livebook that demonstrates using Ecto.Repo.stream/2 and the COPY command to copy data between databases. You can run the livebook by clicking the link below.