Edit: I was told by /u/warkolm that Elasticsearch provides a native solution to my problem called sliced scroll. This feature was introduced in Elasticsearch 5.0. If you are on ES < 5.0 you can still use the routing solution that I present here, and I hope you find the rest of the information useful as well.

Elasticsearch provides a native API to scan and scroll over indexes. It means that you get a 'cursor' and you can scroll over it. You can use the scan helper method for an easier use of the scroll API (see the first sketch at the end of this section).

The drawback of this approach is that it limits you to one scroller. To get the next batch of documents you need the next scroll_id, which you only get on the next scroll command. Scanning a large index (millions of documents and more) can take a while.

How can you scroll over a large index with millions of documents (or more) in a reasonable time? The solution is to parallelize the scan.

Parallelism in Elasticsearch

In Elasticsearch, each index is split into smaller elements known as shards, and these shards are distributed across multiple nodes. You can define the number of primary shards and the number of replicas; replicas ensure data integrity if a primary shard fails, and they also increase performance, since replica shards can handle search requests.

Elasticsearch-hadoop uses shards for parallelism. To use Elasticsearch-hadoop you need a Spark or Hadoop cluster, which can be a lot of overhead, especially if your task is very simple. We will try to implement a way to parallelize the scan ourselves.

The default number of shards in an Elasticsearch index is 5, which means that Elasticsearch-hadoop can parallelize the scan into at most 5 parallel tasks. Determine the number of shards when you create the index to increase parallelism on large indexes.

The Shards API

How can we implement the parallel scan ourselves? Instead of scanning the entire index, we would like to scroll over each shard independently. The search shards API returns the shards that a search request would be executed against.

To get the shards and nodes information use:

$ curl -XGET http://elasticsearch:9200/index/_search_shards

The routing parameter determines which shard the request will be executed against. When we query the search shards API with the routing parameter, the result is the single shard that the request will be executed against. For example, search requests to the index my-index with routing hello will be executed against shard number 1:

$ curl http://elasticsearch:9200/my-index/_search_shards?routing=hello

{"nodes": {...}, "shards": [[{"state": "STARTED", "primary": true, "node": "nu81I57KRCWw8zq27fKd1A", "shard": 1, "index": "index"}]]}

To find the complete mapping of routing values to shards we can use a small script (see the second sketch below). The result will be something like this:

{0: 491, 83: 493, 27: 465, 74: 403, 111: 508, 57: 404, 78: 495, 106: 463, ...}

This means that if you use routing 491, the request will be executed against shard number zero.

Each shard holds about total documents / number of shards documents. We can verify this by sending a search request with routing 491. In my case I got ~3 million hits for shard zero, out of a total of 356 million documents in the entire index and ~120 shards (the actual shard count), which matches.

Implementing the parallel scan

We now have the shard-to-routing mapping. The next step is to scan and scroll each shard (see the third sketch below).

We managed to split the scan/scroll per shard, but this method is still synchronous: each shard scan has to finish before the next shard scan can start.
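For reference, here is a minimal sketch of the scan helper mentioned at the start of this section, using the official elasticsearch-py client. The host, index and doc_type values are the placeholders used throughout this post, not real names:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])  # placeholder host

# helpers.scan wraps the scan/scroll API: it fetches the next scroll_id for
# you behind the scenes and yields one document at a time.
docs = helpers.scan(
    es,
    index='index',
    doc_type='doc_type',
    query={'query': {'match_all': {}}},
    scroll='5m',  # how long to keep the scroll context alive between batches
    size=1000,    # batch size per shard
)

for doc in docs:
    pass  # process each hit here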
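The mapping script itself is not reproduced in this post, but a sketch along the same lines could look like the following: it reads the number of primary shards from the index settings and probes the search shards API with increasing routing values until every shard has a routing value that targets it. The shard_to_routing name is mine; the rest assumes elasticsearch-py and the placeholder names from above:

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])
INDEX = 'index'

# Number of primary shards, taken from the index settings.
settings = es.indices.get_settings(index=INDEX)
num_shards = int(settings[INDEX]['settings']['index']['number_of_shards'])

# Probe sequential routing values until every shard has one routing value.
shard_to_routing = {}
routing = 0
while len(shard_to_routing) < num_shards:
    resp = es.search_shards(index=INDEX, routing=str(routing))
    shard = resp['shards'][0][0]['shard']  # the single shard this routing hits
    shard_to_routing.setdefault(shard, routing)
    routing += 1

print(shard_to_routing)  # mapping of shard number -> routing value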
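And a sketch of the synchronous per-shard scan: scan_shard (the name used later in this post) scrolls over a single shard by passing that shard's routing value to helpers.scan. Here it only counts documents; in a real worker this is where you would process them:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])
INDEX = 'index'
DOC_TYPE = 'doc_type'

def scan_shard(shard, routing):
    """Scroll over a single shard by routing every request to it."""
    docs = helpers.scan(
        es,
        index=INDEX,
        doc_type=DOC_TYPE,
        query={'query': {'match_all': {}}},
        routing=routing,  # forwarded to the search request, so only this shard is scanned
    )
    count = 0
    for doc in docs:
        count += 1  # process the document here
    return shard, count

# The mapping produced by the previous sketch (truncated here).
shard_to_routing = {0: 491, 83: 493, 27: 465}

for shard, routing in shard_to_routing.items():
    print(scan_shard(shard, routing))  # one shard after another, still synchronous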
Let’s use multiprocessing.Pool to run a worker (shard scan) for each CPU on our local machine.
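The pool code from the original post is not reproduced here; a minimal sketch could look like the following, with a Pool-friendly variant of scan_shard. The Elasticsearch client is created inside each worker so forked processes do not share connections; the index, doc_type and mapping values are placeholders:

from multiprocessing import Pool

from elasticsearch import Elasticsearch, helpers

INDEX = 'index'
DOC_TYPE = 'doc_type'

def scan_shard(args):
    """Worker: scroll over a single shard and return (shard, doc_count)."""
    shard, routing = args
    es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])
    docs = helpers.scan(
        es,
        index=INDEX,
        doc_type=DOC_TYPE,
        query={'query': {'match_all': {}}},
        routing=routing,
    )
    return shard, sum(1 for _ in docs)

if __name__ == '__main__':
    # Hypothetical shard -> routing mapping for a 5-shard index.
    shard_to_routing = {0: 7, 1: 0, 2: 1, 3: 3, 4: 2}

    pool = Pool()  # defaults to the number of CPUs
    for shard, count in pool.map(scan_shard, shard_to_routing.items()):
        print('shard %d: %d documents' % (shard, count))
    pool.close()
    pool.join()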
I used the pool to count the number of documents on each shard in a 5-shard index on a 4-CPU machine (and added logs). The pool limits the number of workers to the number of CPUs, so there are 4 parallel tasks. You can see that the last worker started (**) only after the first worker finished (*).

[16:42:26.485186] Starting: ForkPoolWorker-1 Pid: 11447
[16:42:26.485286] Starting: ForkPoolWorker-2 Pid: 11448
[16:42:26.485437] Starting: ForkPoolWorker-3 Pid: 11449
[16:42:26.485457] Starting: ForkPoolWorker-4 Pid: 11450

[16:42:28.235941] Name: ForkPoolWorker-4 Pid: 11450 Result: 3 152
[16:42:28.264004] Name: ForkPoolWorker-3 Pid: 11449 Result: 2 149
[16:42:28.581638] Name: ForkPoolWorker-1 Pid: 11447 Result: 0 133
[16:42:29.845960] Name: ForkPoolWorker-2 Pid: 11448 Result: 1 170

*  [16:42:33.240230] Exiting: ForkPoolWorker-4 Pid: 11450
   [16:42:33.264458] Exiting: ForkPoolWorker-3 Pid: 11449
   [16:42:33.586739] Exiting: ForkPoolWorker-1 Pid: 11447
   [16:42:34.850162] Exiting: ForkPoolWorker-2 Pid: 11448

** [16:42:33.240581] Starting: ForkPoolWorker-4 Pid: 11450
   [16:42:34.851107] Name: ForkPoolWorker-4 Pid: 11450 Result: 4 148
   [16:42:39.854437] Exiting: ForkPoolWorker-4 Pid: 11450

Docker worker

The next step is to distribute the workers over a group of machines. I'll pack each worker as a Docker image and run it on a fleet cluster.

To run a single Docker worker you can use:

$ docker run -ti -e ES_HOST=*** -e ES_PORT=*** -e INDEX=index -e DOC_TYPE=doc_type -e SHARD=1 es-parallel

Each worker gets a shard number, determines the relevant routing value, runs the scan_shard command and quits. The worker code is similar to the previous sync and pool code, but it only runs one shard scan. You can see the final result of the worker here.

Parallel on fleet cluster

es-parallel@.service is an example unit file to start on a fleet cluster. Set the correct configuration and let's start the units with:

$ fleetctl start es-parallel@{0..4}

Where 4 is the number of shards minus 1. Notice that the ExecStart command uses the unit number (%i) as the shard number:

ExecStart=/bin/sh -c '\
  /usr/bin/docker run \
  --name %p-%i \
  -e ES_HOST="" \
  -e ES_PORT="" \
  -e INDEX="" \
  -e DOC_TYPE="" \
  -e ROUTING="%i" \
  es-parallel'

And the result:

$ fleetctl journal es-parallel@1

g-t-5 docker[43117]: Using default tag: latest
g-t-5 docker[43117]: latest: Pulling from es-parallel
g-t-5 systemd[1]: Started Es Parallel.
g-t-5 sh[43160]: Namespace(doc_type='doc_type', es_auth=None, es_host='elasticsearch', es_port=9200, es_use_ssl=False, index='index', shard='1')
g-t-5 docker[43312]: es-parallel-1 Result: 148

Notes and Ideas:

- Currently the results are printed to the console. You can do whatever you want with them.
- All workers will start at the same time. Make sure that your Elasticsearch cluster can handle it. If not, you can write a script that starts workers as needed.

Sliced Scroll

Elasticsearch 5.0 introduced the sliced scroll feature, a native way to split the scroll into multiple slices:

GET /index/doc_type/_search?scroll=1m
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "query": { ... }
}

GET /index/doc_type/_search?scroll=1m
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "query": { ... }
}

max is the number of slices and id is the slice number. max can be equal to the number of shards, lower or higher. Splitting is done first across the shards and then locally on each shard. This means that if max == num_of_shards, each slice will be a scroll on a single shard.

Be aware that when the number of slices is bigger than the number of shards, the slice filter has a memory cost and is slow on the first calls. You can use sliced scrolling for parallel reindex, update by query and delete by query. Read more here.

Summary

es-parallel can be a nice alternative to the overhead of using a Spark/Hadoop cluster on top of Elasticsearch. It is great for mapping, filtering and selecting documents, but you (still) don't have a way to reduce the results.

Find the source code here and the Docker worker here.
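As a closing sketch: on Elasticsearch 5.0+ the routing trick above can be replaced by sliced scroll, and the helpers.scan call simply gets a slice in its query body. The scan_slice name and MAX_SLICES value are mine, and the host/index/doc_type values are the placeholders from above; you would run one such worker per slice id:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])
MAX_SLICES = 2  # total number of slices

def scan_slice(slice_id):
    """Scroll over a single slice of the index (requires Elasticsearch >= 5.0)."""
    query = {
        'slice': {'id': slice_id, 'max': MAX_SLICES},
        'query': {'match_all': {}},
    }
    docs = helpers.scan(es, index='index', doc_type='doc_type', query=query, scroll='1m')
    return sum(1 for _ in docs)

print(scan_slice(0))  # another process or machine would run scan_slice(1)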