Say we have a search feature and an index_article/1 function that knows what to do with a given article – it can fetch all related data, concatenate it into a tsvector field, and save the result in the search cache table. With it we can index any single article – so far so good.
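For reference, index_article/1 might look something like the minimal sketch below. The search_cache columns (searchable_id, searchable_type, term), the unique index they imply, and the way the text is assembled are all assumptions for illustration – the real function would pull in more related data than this.

def index_article(article) do
  # Hypothetical sketch – column names and text assembly are assumptions.
  # Everything we want to search on, concatenated into one text blob;
  # a real implementation would also fetch associated records here.
  text = Enum.join([article.title, article.body], " ")

  # Convert the text into a tsvector and upsert it into the cache table
  # (assumes a unique index on (searchable_id, searchable_type)).
  Repo.query!(
    """
    INSERT INTO search_cache (searchable_id, searchable_type, term)
    VALUES ($1, $2, to_tsvector($3))
    ON CONFLICT (searchable_id, searchable_type)
    DO UPDATE SET term = EXCLUDED.term
    """,
    [article.id, "article", text]
  )

  :ok
end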
Now what if we want to rebuild the whole search index? Looping over every entry in the database and indexing it with the existing function would work, but it means loading everything into RAM at once – which is guaranteed to break once the table grows large enough.
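The tempting version is only a couple of lines, which is exactly why it gets written (Article here is a stand-in for whichever schema we are indexing):

# Loads every article into memory at once – fine for a hundred rows,
# fatal for a few million.
Article
|> Repo.all()
|> Enum.each(&index_article/1)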
Usually we would create some kind of Task that loops as long as there are unprocessed entries in the database: take a batch of entries, process each of them, adjust the offset, repeat – something like the sketch below.
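A hand-rolled version of that loop could look like this (index_in_batches/3 is a hypothetical name; it assumes import Ecto.Query for limit/offset):

defp index_in_batches(queryable, offset \\ 0, batch_size \\ 500) do
  # Fetch one page of rows at the current offset.
  batch =
    queryable
    |> limit(^batch_size)
    |> offset(^offset)
    |> Repo.all()

  case batch do
    [] ->
      # No more unprocessed entries – we are done.
      :ok

    articles ->
      Enum.each(articles, &index_article/1)
      index_in_batches(queryable, offset + batch_size, batch_size)
  end
end

It works, but it is boilerplate we have to write (and get right) for every such job.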
Well, Elixir can do better: using Repo.stream/2 we can create a lazy stream that composes with the Stream module, which greatly simplifies the logic above.
Here is the whole function that reindexes all articles of the given type:
@spec do_index_all(String.t, module()) :: {:ok, :ok} | {:error, any()}
defp do_index_all(type, queryable) do
  Repo.transaction(fn ->
    # Drop the stale cache entries for this type first.
    Repo.delete_all(
      from(s in "search_cache", where: s.searchable_type == ^type)
    )

    # Stream the table in chunks of 500 rows and index each article.
    (from q in queryable)
    |> Repo.stream(max_rows: 500)
    |> Stream.map(&index_article/1)
    |> Stream.run()
  end, timeout: 360_000, pool_timeout: 180_000)
end
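Note that on the SQL adapters Repo.stream/2 must be run inside a transaction, which is why everything is wrapped in Repo.transaction – and a full reindex can easily outlive the default 15-second transaction timeout, hence the generous :timeout and :pool_timeout options. A public entry point could then look like this (hypothetical names; MyApp.Article is an assumed schema):

# Hypothetical wrapper exposing the reindex per type.
def index_all_articles, do: do_index_all("article", MyApp.Article)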
This part is the most interesting:
(from q in queryable)
|> Repo.stream(max_rows: 500)
|> Stream.map(&index_article/1)
|> Stream.run()
Only the last line actually does something; the rest merely describes what will be done when the stream is eventually “triggered”. We can rewrite it like this:
stream =
  (from q in queryable)
  |> Repo.stream(max_rows: 500)
  |> Stream.map(&index_article/1)

# nothing happened yet, but we have the stream variable
# that describes the whole process

Stream.run(stream) # this triggers the actual DB requests
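And because the stream is just a value, we can keep composing it before running it. For example (a hypothetical tweak, not part of the original function), progress logging slots in as one more lazy step:

(from q in queryable)
|> Repo.stream(max_rows: 500)
|> Stream.with_index(1)
|> Stream.each(fn {article, n} ->
  index_article(article)
  # Hypothetical progress report every 1000 rows.
  if rem(n, 1000) == 0, do: IO.puts("indexed #{n} articles")
end)
|> Stream.run()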
Even after using Elixir for some time now, the fact that we can do things in such a simple and concise way still makes me happy 🙂