Background I needed to extract a significant number of access logs from Elastic Cloud for analytical purposes over several months. During our research, I discovered that the number of logs generated per month was in the hundreds of millions, which far exceeded the limit of Kibana's built-in tools. To overcome this issue, we implemented a Python script that leverages the Elasticsearch API and the search_after method to iteratively retrieve logs. search_after Challenges High volume of logs: Querying logs for multiple months meant handling hundreds of millions of records.Kibana limitations: The default Kibana UI and API have limitations that prevent fetching such large datasets in a single query.Elasticsearch query limits: A single query can return a maximum of 10,000 records at a time.Performance considerations: Avoiding high load on the Elasticsearch cluster while ensuring smooth data retrieval. High volume of logs: Querying logs for multiple months meant handling hundreds of millions of records. Kibana limitations: The default Kibana UI and API have limitations that prevent fetching such large datasets in a single query. Elasticsearch query limits: A single query can return a maximum of 10,000 records at a time. Performance considerations: Avoiding high load on the Elasticsearch cluster while ensuring smooth data retrieval. API token creation To authenticate API requests, I created an API key with the following permissions: { "superuser": { "cluster": ["all"], "indices": [ { "names": ["*"], "privileges": ["all"], "allow_restricted_indices": false }, { "names": ["*"], "privileges": ["monitor", "read", "view_index_metadata", "read_cross_cluster", "manage"], "allow_restricted_indices": true } ] } } { "superuser": { "cluster": ["all"], "indices": [ { "names": ["*"], "privileges": ["all"], "allow_restricted_indices": false }, { "names": ["*"], "privileges": ["monitor", "read", "view_index_metadata", "read_cross_cluster", "manage"], "allow_restricted_indices": true } ] } } Notes: Notes: The API key must be in Base64 format before using it in requests.While not the safest approach, in this example, I give access to all indexes, all privileges, and all clusters. This is the easiest way to do it, and users can limit the indices and privileges necessary for the task. The API key must be in Base64 format before using it in requests. While not the safest approach, in this example, I give access to all indexes, all privileges, and all clusters. This is the easiest way to do it, and users can limit the indices and privileges necessary for the task. Elasticsearch query setup I used Kibana's Dev Tools to construct a query before implementing it in Python. The query included: A wildcard filter to fetch logs matching specific user agents.A time range filter to limit logs within the required period.Sorting by _doc to improve performance with search_after. A wildcard filter to fetch logs matching specific user agents. A time range filter to limit logs within the required period. Sorting by _doc to improve performance with search_after. _doc search_after Example query: { "query": { "bool": { "filter": [ {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}}, {"range": {"@timestamp": {"gte": "now-2h", "lte": "now"}}} ] } }, "size": 10000, "sort": [{"_doc": "desc"}], "pit": { "id": "", "keep_alive": "60m" }, "fields": [ "json.EdgeRequestHost","json.EdgeRequestPath", "json.ClientRequestUserAgent", "json.ClientRequestStatusCode", "json.ClientRequestReferer", "json.EdgeStartTimestamp" ] } { "query": { "bool": { "filter": [ {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}}, {"range": {"@timestamp": {"gte": "now-2h", "lte": "now"}}} ] } }, "size": 10000, "sort": [{"_doc": "desc"}], "pit": { "id": "", "keep_alive": "60m" }, "fields": [ "json.EdgeRequestHost","json.EdgeRequestPath", "json.ClientRequestUserAgent", "json.ClientRequestStatusCode", "json.ClientRequestReferer", "json.EdgeStartTimestamp" ] } Implementation using Python Before starting, install the elasticsearch package in any convenient way, e.g., pip install elasticsearch elasticsearch pip install elasticsearch I implemented a Python script using the elasticsearch library: elasticsearch import json import time from datetime import datetime, timezone, timedelta from elasticsearch import Elasticsearch # Elasticsearch connection settings ES_URL = "" API_KEY = "" # Query parameters BATCH_SIZE = 10000 # Max 10000 OUTPUT_FILE = "logs.json" INDEX = "EXAMPLE_INDEX" KEEP_ALIVE = "60m" TIME_WINDOW = 60 # Minutes # Initialize Elasticsearch client es = Elasticsearch(ES_URL, api_key=API_KEY, request_timeout=60, verify_certs=True) def create_pit(): """Create Point in Time""" return es.open_point_in_time(index=INDEX, keep_alive=KEEP_ALIVE)["id"] def close_pit(pit_id): """Close Point in Time""" es.close_point_in_time(body={"id": pit_id}) def get_query(pit_id, search_after=None): """Generate search query for last TIME_WINDOW minutes""" now = datetime.now(timezone.utc) start_time = now - timedelta(minutes=TIME_WINDOW) query = { "pit": {"id": pit_id, "keep_alive": KEEP_ALIVE}, "size": BATCH_SIZE, "sort": [{"_doc": "desc"}], "query": { "bool": { "filter": [ {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}}, {"range": {"@timestamp": {"gte": start_time.isoformat(), "lte": now.isoformat(), "format": "strict_date_optional_time"}}} ] } }, "fields": [ "json.EdgeRequestHost", "json.EdgeRequestPath", "json.ClientRequestUserAgent", "json.ClientRequestStatusCode", "json.ClientRequestReferer", "json.EdgeStartTimestamp" ] } if search_after: query["search_after"] = search_after return query def transform_hit(hit): """Transform record into required format""" fields = hit.get("fields", {}) return { "remote_ip": fields.get("source.ip", ["-"])[0] if "source.ip" in fields else "-", "remote_log": "-", "user": "-", "timestamp": fields.get("json.EdgeStartTimestamp", ["-"])[0] if "json.EdgeStartTimestamp" in fields else "-", "request-path": fields.get('url.path', ['-'])[0] if "url.path" in fields else "-", "request-host": fields.get('json.EdgeRequestHost', ['-'])[0] if "json.EdgeRequestHost" in fields else "-", "status": "-", "response-bytes": "-", "time-take": "-", "referer": fields.get("json.ClientRequestReferer", ["-"])[0] if "json.ClientRequestReferer" in fields else "-", "ua": fields.get("json.ClientRequestUserAgent", ["-"])[0] if "json.ClientRequestUserAgent" in fields else "-" } def fetch_logs(): """Fetch logs from Elasticsearch""" pit_id = None start_time = time.time() try: pit_id = create_pit() print("PIT opened") total_records = 0 search_after = None print("Starting logs extraction from Elasticsearch...") with open(OUTPUT_FILE, 'w', encoding='utf-8') as outfile: while True: response = es.search(body=get_query(pit_id, search_after)) hits = response.get("hits", {}).get("hits", []) if not hits: break for hit in hits: outfile.write(json.dumps(transform_hit(hit)) + "\n") total_records += 1 if total_records % BATCH_SIZE == 0: elapsed_time = time.time() - start_time elapsed_str = str(timedelta(seconds=elapsed_time)) print(f"Processed {total_records} records. Time elapsed: {elapsed_str}...") search_after = hits[-1].get("sort") time.sleep(0.1) elapsed_time = time.time() - start_time elapsed_str = str(timedelta(seconds=elapsed_time)) print(f"\nTotal processed records: {total_records}. Saved to {OUTPUT_FILE}. Time taken: {elapsed_str}") except Exception as e: print(f"Error occurred: {e}") finally: if pit_id: try: close_pit(pit_id) print("PIT closed") except Exception as e: print(f"Error closing PIT: {e}") if __name__ == "__main__": fetch_logs() import json import time from datetime import datetime, timezone, timedelta from elasticsearch import Elasticsearch # Elasticsearch connection settings ES_URL = "" API_KEY = "" # Query parameters BATCH_SIZE = 10000 # Max 10000 OUTPUT_FILE = "logs.json" INDEX = "EXAMPLE_INDEX" KEEP_ALIVE = "60m" TIME_WINDOW = 60 # Minutes # Initialize Elasticsearch client es = Elasticsearch(ES_URL, api_key=API_KEY, request_timeout=60, verify_certs=True) def create_pit(): """Create Point in Time""" return es.open_point_in_time(index=INDEX, keep_alive=KEEP_ALIVE)["id"] def close_pit(pit_id): """Close Point in Time""" es.close_point_in_time(body={"id": pit_id}) def get_query(pit_id, search_after=None): """Generate search query for last TIME_WINDOW minutes""" now = datetime.now(timezone.utc) start_time = now - timedelta(minutes=TIME_WINDOW) query = { "pit": {"id": pit_id, "keep_alive": KEEP_ALIVE}, "size": BATCH_SIZE, "sort": [{"_doc": "desc"}], "query": { "bool": { "filter": [ {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}}, {"range": {"@timestamp": {"gte": start_time.isoformat(), "lte": now.isoformat(), "format": "strict_date_optional_time"}}} ] } }, "fields": [ "json.EdgeRequestHost", "json.EdgeRequestPath", "json.ClientRequestUserAgent", "json.ClientRequestStatusCode", "json.ClientRequestReferer", "json.EdgeStartTimestamp" ] } if search_after: query["search_after"] = search_after return query def transform_hit(hit): """Transform record into required format""" fields = hit.get("fields", {}) return { "remote_ip": fields.get("source.ip", ["-"])[0] if "source.ip" in fields else "-", "remote_log": "-", "user": "-", "timestamp": fields.get("json.EdgeStartTimestamp", ["-"])[0] if "json.EdgeStartTimestamp" in fields else "-", "request-path": fields.get('url.path', ['-'])[0] if "url.path" in fields else "-", "request-host": fields.get('json.EdgeRequestHost', ['-'])[0] if "json.EdgeRequestHost" in fields else "-", "status": "-", "response-bytes": "-", "time-take": "-", "referer": fields.get("json.ClientRequestReferer", ["-"])[0] if "json.ClientRequestReferer" in fields else "-", "ua": fields.get("json.ClientRequestUserAgent", ["-"])[0] if "json.ClientRequestUserAgent" in fields else "-" } def fetch_logs(): """Fetch logs from Elasticsearch""" pit_id = None start_time = time.time() try: pit_id = create_pit() print("PIT opened") total_records = 0 search_after = None print("Starting logs extraction from Elasticsearch...") with open(OUTPUT_FILE, 'w', encoding='utf-8') as outfile: while True: response = es.search(body=get_query(pit_id, search_after)) hits = response.get("hits", {}).get("hits", []) if not hits: break for hit in hits: outfile.write(json.dumps(transform_hit(hit)) + "\n") total_records += 1 if total_records % BATCH_SIZE == 0: elapsed_time = time.time() - start_time elapsed_str = str(timedelta(seconds=elapsed_time)) print(f"Processed {total_records} records. Time elapsed: {elapsed_str}...") search_after = hits[-1].get("sort") time.sleep(0.1) elapsed_time = time.time() - start_time elapsed_str = str(timedelta(seconds=elapsed_time)) print(f"\nTotal processed records: {total_records}. Saved to {OUTPUT_FILE}. Time taken: {elapsed_str}") except Exception as e: print(f"Error occurred: {e}") finally: if pit_id: try: close_pit(pit_id) print("PIT closed") except Exception as e: print(f"Error closing PIT: {e}") if __name__ == "__main__": fetch_logs() Details Traffic filters in Elastic Cloud To allow access from any machine, users may need to configure Traffic filters in Elastic Cloud. Filters can be created here.Official documentation: Elastic Cloud Traffic Filtering.After creating a filter, don't forget to apply it:Go to cloud.elastic.co/deployments → select your deployment → Security → Traffic Filters → Apply Traffic Filter. Filters can be created here. here Official documentation: Elastic Cloud Traffic Filtering. Elastic Cloud Traffic Filtering After creating a filter, don't forget to apply it: Go to cloud.elastic.co/deployments → select your deployment → Security → Traffic Filters → Apply Traffic Filter. Query development in Kibana dev tools Before implementing queries in the script, I first built and tested them in Kibana dev tools. Documentation: Kibana Console. Kibana Console Creating an API key in Elastic Cloud To authenticate requests, create an API key in Elastic Cloud. The API key must be Base64-encoded for use in the script.Generate it in the Kibana Security section or in the Elastic Cloud console.Documentation: Create API Key. The API key must be Base64-encoded for use in the script. Generate it in the Kibana Security section or in the Elastic Cloud console. Documentation: Create API Key. Create API Key Conclusion Using search_after PIT allowed us to efficiently fetch large log datasets from Elastic Cloud. search_after