As a Staff Engineer on Lob’s Address Verification team, I was recently tasked with standing up an endpoint that supplies customers with all the data we have stored for a particular zip code. We use Elasticsearch pretty heavily to store our data, so this boiled down to querying ES by zip code and writing the data to Amazon S3. But both services have subtle limitations on how you read from and write to them: ES caps your result set at 10,000 records, and Amazon S3 insists that you write to your buckets in chunks of at least 4 megabytes. This turned out to be a perfect use case for Elixir streams!

Elixir Streams

Before we dive into the nitty-gritty of Elasticsearch and AWS S3, let’s first do a brief refresher on Elixir Streams.

What is a Stream?

A Stream is a composable enumerable that is computed lazily. This is typically accomplished by maintaining a little bit of state that describes where you currently are in your enumerable and a method of computing the next item. A trivial example would be a Stream of natural numbers:

```elixir
# An infinite Stream: 0, 1, 2, ... (nothing is computed until it is consumed)
Stream.iterate(0, fn x -> x + 1 end)
```

The gory details of how this is implemented under the hood are a topic for another day. You rarely need to get that low level: the Stream library has a wealth of functions, similar to those in Enum, that you can use as building blocks to construct your own Streams. And several Elixir library functions natively return streams: IO.stream will turn a file (or other source) into a Stream, allowing you to process very large files without having to read them into memory in their entirety first. The lazy aspect of Streams means that only the minimum portion of the Stream needed to generate your answer is ever computed.
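To make the laziness concrete, here is a minimal sketch that counts how many elements actually get computed. It is not part of the pipeline described in this post; the module name `LazyDemo` and the use of the process dictionary as a call counter are illustrative assumptions.

```elixir
defmodule LazyDemo do
  # Hypothetical helper: squares the first `n` natural numbers and reports
  # how many times the mapping function actually ran.
  def squares(n) do
    Process.put(:calls, 0)

    result =
      Stream.iterate(0, fn x -> x + 1 end)
      |> Stream.map(fn x ->
        # Side effect used only to count invocations.
        Process.put(:calls, Process.get(:calls) + 1)
        x * x
      end)
      |> Enum.take(n)

    {result, Process.get(:calls)}
  end
end

{squares, calls} = LazyDemo.squares(5)
IO.inspect(squares)
IO.inspect(calls)
```

Even though the source Stream is infinite, the mapping function runs only as many times as there are elements taken.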
Imagine we want to find the 100th natural number that is palindromic and also divisible by 3:

```elixir
1
|> Stream.iterate(fn n -> n + 1 end)
# keep only palindromes
|> Stream.filter(fn n ->
  s = Integer.to_string(n)
  s == String.reverse(s)
end)
# keep only multiples of 3
|> Stream.filter(fn n -> rem(n, 3) == 0 end)
# Enum.at/2 is zero-based, so index 99 is the 100th match
|> Enum.at(99)
```

What I find elegant (and efficient) about this solution is that no number beyond the answer, 20202, will be computed. Neat!

If you’d like to dive in deeper, there’s a great Getting Started section on Enumerables and Streams on the Elixir language website. The Elixir API documentation is also chock full of instructive examples.

Elasticsearch

Now onto Elasticsearch. The first thing to know is that ES caps the size of the result set at 10k items. After scrutinizing the documentation, it became clear that the original method that popped up in my Stack Overflow searches (the Scroll API) had fallen out of favor: the Elasticsearch documentation no longer recommends it for deep pagination, and the approved method is to use PointInTime (PIT) IDs. PIT IDs essentially give you a snapshot of your index at one point in time so that you get a consistent view of your search results across multiple queries. The process is:

1. acquire a PIT for your index
2. perform your queries using the PIT (without the index, which is now implicit in the PIT)
3. delete the PIT

That last step is crucial. PITs are extremely resource hungry, so it is vital that you delete them as soon as you’re done. I borked our staging ES in my early experiments with PITs because I wasn’t deleting them and the expiration parameter seemed to have no effect.

The other thing we’d like to do is abstract away the underlying PIT and paging mechanism for the end user, so we provide an interface that just takes an ES query and generates a Stream of hits.

Let’s start by creating and deleting the PIT. Creating one is just a matter of requesting a PIT associated with the index that we’re interested in running queries against.
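For orientation, here is a sketch of the three requests in that lifecycle written as plain data, based on the Elasticsearch point-in-time API (`POST /<index>/_pit`, a search carrying a `pit` clause, and `DELETE /_pit`). The module name `PitRequests`, the `{method, path, body}` tuple layout, the index name, and the `"1m"` keep-alive are illustrative assumptions, and no HTTP call is made.

```elixir
defmodule PitRequests do
  # Hypothetical helpers that only describe requests as {method, path, body} tuples.

  # Step 1: open a PIT against an index; the response body carries the PIT id.
  def open(index, keep_alive \\ "1m") do
    {:post, "/#{index}/_pit?keep_alive=#{keep_alive}", nil}
  end

  # Step 2: search with the PIT; note that the path names no index.
  def search(pit_id, query, keep_alive \\ "1m") do
    {:post, "/_search", %{query: query, pit: %{id: pit_id, keep_alive: keep_alive}}}
  end

  # Step 3: close the PIT; the id travels in the request body, not the URL.
  def close(pit_id) do
    {:delete, "/_pit", %{id: pit_id}}
  end
end

IO.inspect(PitRequests.open("addresses"))
IO.inspect(PitRequests.close("some-long-pit-id"))
```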
```elixir
@spec create(ES.index_t()) :: {:ok, pit_t()} | {:error, ES.response_t()}
def create(index) do
  :post
  |> HTTP.request(url(index), HTTP.no_payload(), HTTP.json_encoding())
  # on a 200 response, extract the PIT id from the body
  |> HTTP.on_200(& &1["id"])
end
```

Deleting a PIT is a simple HTTP DELETE, except that PIT IDs are huge, so you need to supply them in the body and not as URL params. Some HTTP libraries won’t let you do this. The HTTP spec is a little muddy on the matter, and last I checked on Stack Overflow there was a healthy debate on the subject. This is why we use HTTPoison: it allows payloads on DELETE requests.

```elixir
@spec delete(pit_t()) ::
        :ok | {:error, HTTPoison.AsyncResponse | HTTPoison.MaybeRedirect | HTTPoison.Response}
def delete(pit) do
  url = HTTP.generate_url("_pit", ES.no_index(), HTTP.no_options())

  # The PIT id goes in the JSON body of the DELETE request.
  with {:ok, payload} <- Poison.encode(%{"id" => pit}),
       %HTTPoison.Response{status_code: 200} <-
         HTTPoison.request!(:delete, url, payload, HTTP.headers(HTTP.json_encoding())) do
    :ok
  else
    error -> {:error, error}
  end
end
```

Now that the PITs are sorted out, our next order of business is figuring out how to leverage them in our queries. Our basic query is enhanced with three extra parameters:

- size of 10,000 (the maximum allowed by Elasticsearch)
- pit, a map that contains %{id: <id>, keep_alive: "1m"}
- sort, a map that contains %{_shard_doc: "desc"}

(For sort, you need to provide something, and _shard_doc is baked into every Elasticsearch index, so that’s nice and portable.)

```elixir
@spec initial(PIT.pit_t(), ES.query_t()) :: ES.query_t()
def initial(pit, query) do
  %{
    query: query,
    pit: %{id: pit, keep_alive: PIT.expiration()},
    size: ES.max_results(),
    sort: %{"_shard_doc" => "desc"}
  }
end
```

With our basic query down, we can focus on how to generate the sequence of queries that will extract all the items that satisfy our search criteria. The trick here is that we feed into the next query the value of our sort field from the last hit of the previous result, like so:
```elixir
@spec update(ES.query_t(), ES.response_t()) :: ES.query_t()
def update(query, %Req.Response{body: %{"hits" => %{"hits" => hits}}, status: 200}) do
  # Resume the search just after the sort value of the last hit we received.
  Map.put(query, :search_after, List.last(hits)["sort"])
end
```

Armed with these two query-generating functions, we can code up an iterator, built on Stream.iterate, that will pull all the results out of our desired index:

```elixir
@spec streamer(ES.query_t()) :: {:ok, Enumerable.t()} | {:error, any}
def streamer(initial) do
  # Stream.iterate never terminates on its own; the caller bounds it.
  {:ok,
   initial
   |> search()
   |> Stream.iterate(fn previous -> search(Query.update(initial, previous)) end)}
rescue
  error -> {:error, error}
end
```

We need a high-level function that takes the results from this function and produces a Stream of hits. But there’s a wrinkle: we have to delete the PIT when we are done with the Stream. Because the Stream is lazy, nothing is fetched until somebody consumes it, so how will we know when that is? The solution is to pass in a consumer that consumes the Stream, and to delete the PIT once the consumer returns, like so:

```elixir
@spec stream_many(ES.query_t(), ES.index_t(), ES.consumer_t(), non_neg_integer()) ::
        {:error, any()} | {:ok, Enumerable.t()}
def stream_many(query, index, consumer, count) do
  case PIT.create(index) do
    {:ok, pit_id} ->
      try do
        case pit_id |> Query.initial(query) |> streamer() do
          {:ok, stream} ->
            stream
            # flatten each page of results into individual hits
            |> Stream.flat_map(& &1.body["hits"]["hits"])
            # stop once we have seen every matching hit
            |> Stream.take(count)
            |> then(consumer)

          error ->
            error
        end
      after
        # always release the PIT, even if the consumer raises
        PIT.delete(pit_id)
      end

    error ->
      error
  end
rescue
  error -> {:error, error}
end
```

But if our query returns no more than 10k results, we don’t need the whole PIT/sort/search_after machinery. One query will do the trick:
```elixir
@spec stream_one(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()}
def stream_one(query, index, consumer) do
  query
  |> search(index)
  # a single page of hits is handed straight to the consumer
  |> HTTP.on_200(fn body -> consumer.(body["hits"]["hits"]) end)
end
```

Now we need a top-level function that queries the size of the result and chooses between stream_one and stream_many depending on the value, like so:

```elixir
@spec stream(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()}
def stream(query, index, consumer) do
  case count(query, index) do
    {:ok, count} ->
      if count > ES.max_results() do
        stream_many(query, index, consumer, count)
      else
        stream_one(query, index, consumer)
      end

    error ->
      error
  end
end
```

AWS S3

The library we use to access Amazon S3 is ex_aws_s3. It handles all the low-level details for us, but it does have one requirement: the input data stream must be chunks of at least 4 megabytes. To accomplish this, we use the Stream function chunk_while. This takes four inputs:

- the enumerable to be chunked
- the initial value of the accumulator
- the step function
- a finalizer function

Our first decision is, what should the accumulator that gets passed forward by the step function look like? It obviously should contain the list of the items in the current chunk. But more subtly, it should also contain the size of the current chunk so we don’t waste resources recomputing it each time. So that gives us a two-element tuple containing an integer and a list.

Next, we turn our attention to the step function. It should check if the current size is greater than or equal to the desired chunk size. If it is, we should take the list of items from the accumulator and convert them into a chunk using convert; if not, we should add the item to the current chunk (and update the size) using add_chunk.

What should add_chunk do? Just push the item onto the front of the list and increase the value of size by the size of the item.
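The accumulator and step function described above can be tried out on a toy input first. The following is a minimal sketch, assuming a hypothetical module `ChunkDemo` and a tiny threshold of 4 bytes; unlike the step described above, it checks the threshold after adding the item, which is a slight variation chosen to keep the example short.

```elixir
defmodule ChunkDemo do
  # Hypothetical helper: groups strings into chunks of at least `min_bytes` bytes.
  def at_least(strings, min_bytes) do
    # Accumulator: {bytes collected so far, items collected so far (newest first)}
    initial = {0, []}

    step = fn item, {size, items} ->
      new_size = size + byte_size(item)
      new_items = [item | items]

      if new_size >= min_bytes do
        # Emit the chunk (restoring insertion order) and start afresh.
        {:cont, new_items |> Enum.reverse() |> Enum.join(), initial}
      else
        {:cont, {new_size, new_items}}
      end
    end

    # The finalizer flushes whatever is left over; nothing is emitted if empty.
    finalizer = fn
      {_size, []} -> {:cont, initial}
      {_size, items} -> {:cont, items |> Enum.reverse() |> Enum.join(), initial}
    end

    Stream.chunk_while(strings, initial, step, finalizer)
  end
end

chunks = ~w(aa bbb c dddd ee f) |> ChunkDemo.at_least(4) |> Enum.to_list()
IO.inspect(chunks)
```

Every chunk except possibly the last reaches the minimum size; the final chunk simply carries the leftovers.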
The behaviour of convert depends on whether we care about the order of the items in the chunk being preserved in the output. The items in the list will be in reverse order, so they need to be reversed. But if we don’t care, we can skip that transformation. Putting this all together gives us:

```elixir
@spec chunker(
        Enumerable.t(),
        non_neg_integer(),
        boolean(),
        (any() -> non_neg_integer()),
        (Enumerable.t() -> any())
      ) :: Enumerable.t()
def chunker(
      chunks,
      chunk_size,
      ordered \\ true,
      # byte_size/1 rather than String.length/1: the limit is in bytes,
      # and String.length/1 counts graphemes
      sizer \\ &byte_size/1,
      joiner \\ &Enum.join/1
    ) do
  # accumulator: {size of the current chunk, items in reverse order}
  zero = {0, []}

  convert =
    if ordered do
      fn chunks -> chunks |> Enum.reverse() |> then(joiner) end
    else
      joiner
    end

  # flush whatever is left when the input is exhausted
  final = fn {_, chunks} -> {:cont, convert.(chunks), zero} end

  add_chunk = fn {size, chunks}, chunk ->
    {size + sizer.(chunk), [chunk | chunks]}
  end

  step = fn chunk, accum = {size, chunks} ->
    if size >= chunk_size do
      # emit the full chunk and start a new one with the current item
      {:cont, convert.(chunks), add_chunk.(zero, chunk)}
    else
      {:cont, add_chunk.(accum, chunk)}
    end
  end

  Stream.chunk_while(chunks, zero, step, final)
end
```

We can now combine all the components we have written to produce a simple controller action that takes the stream of hits from our Elasticsearch section and packs it into 4 MB chunks that satisfy the requirement of the ExAws.S3 module:

```elixir
defmodule Controller do
  @four_megabytes 4 * 1024 * 1024

  def find_all(params) do
    query(params)
    |> ElasticSearch.stream(params.index, &consumer(params, &1))
  end

  def query(params) do
    %{term: %{params.query_term => params.query_value}}
  end

  def consumer(params, stream) do
    stream
    # serialize each document, then batch the JSON into S3-sized chunks
    |> Stream.map(&Poison.encode!(&1["_source"]))
    |> chunker(@four_megabytes, false)
    |> S3.upload(params.bucket, params.filename)
    |> ExAws.request()
  end
end
```

And that wraps up our exploration of Elixir streams, Elasticsearch, and AWS S3. I hope you enjoyed it!

By Lob Staff Engineer, Guy Argo.