Hi, I’m Vilian Iaumbaev! I recently built a system that automatically handles new crash reports for both iOS and Android — it makes tracking and fixing issues a whole lot easier. Why we did this. Manually assigning crashes to the right developers quickly became a tedious and unreliable task. It was easy to forget something, overlook edge cases, or skip over complex issues. We wanted to make the process more predictable and systematic — so that no one on the team had to waste time triaging crashes, and no critical issue would fall through the cracks. Summary To get started, you’ll need Google tools like Crashlytics, Google Cloud Platform, and Jira. Crashlytics Crashlytics Google Cloud Platform Google Cloud Platform Once your projects are set up in Google services, configure data transfer from Crashlytics to GCP using the integration page. After that, all crash reports will appear in BigQuery tables. integration page integration page The crash data structures for iOS and Android are nearly identical, with just a few small differences — which means we can use a single script to process both. So now you have your crashes in BigQuery which means you can execute some work on this data. You can request all crash data and analyze it as you want on your side. I have choosen Python language and will explain you on this example. Firstly we need to get all crashes data to be analyzied but if you have large amount of data on over of million users you better to preprocess all data on Google side, make some aggregations. Plan Learn some basic SQL to get crash data from BigQuery Query crash data using Python Get all committers from the repository and merge duplicates Map each issue to a repo file and its owner Create a Jira task for the file owner, if a task doesn’t already exist Learn some basic SQL to get crash data from BigQuery Query crash data using Python Get all committers from the repository and merge duplicates Map each issue to a repo file and its owner Create a Jira task for the file owner, if a task doesn’t already exist Learn some SQL basics to get data from BigQuery BigQuery uses its own SQL dialect, which is similar to standard SQL but offers additional convenience for data analysis. For our integration, we needed to work with the complete crash dataset, but in an aggregated form. Specifically, we grouped individual crash reports into unique crash signatures and then aggregated relevant data within each group — such as the number of occurrences, affected user count, version breakdown, and more. You can find the SQL script below and test it in your own environment via the following link: https://console.cloud.google.com/bigquery https://console.cloud.google.com/bigquery https://console.cloud.google.com/bigquery WITH pre as( SELECT issue_id, ARRAY_AGG(DISTINCT issue_title IGNORE NULLS) as issue_titles, ARRAY_AGG(DISTINCT blame_frame.file IGNORE NULLS) as blame_files, ARRAY_AGG(DISTINCT blame_frame.library IGNORE NULLS) as blame_libraries, ARRAY_AGG(DISTINCT blame_frame.symbol IGNORE NULLS) as blame_symbols, COUNT(DISTINCT event_id) as total_events, COUNT(DISTINCT installation_uuid) as total_users, '{"version":"' || application.display_version || '","events":' || COUNT(DISTINCT event_id) || ',"users":' || COUNT(DISTINCT installation_uuid) || '}' AS events_info FROM `YOUR TABLE NAME` WHERE 1=1 AND event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR) AND event_timestamp < CURRENT_TIMESTAMP() AND error_type = "FATAL" GROUP BY issue_id, application.display_version ) SELECT issue_id, ARRAY_CONCAT_AGG(issue_titles) as issue_titles, ARRAY_CONCAT_AGG(blame_files) as blame_files, ARRAY_CONCAT_AGG(blame_libraries) as blame_libraries, ARRAY_CONCAT_AGG(blame_symbols) as blame_symbols, SUM(total_events) as total_events, SUM(total_users) as total_users, '[' || STRING_AGG(events_info, ",") || ']' as events_info FROM pre WHERE 1=1 AND issue_id IS NOT NULL AND events_info IS NOT NULL GROUP BY issue_id ORDER BY total_users DESC WITH pre as( SELECT issue_id, ARRAY_AGG(DISTINCT issue_title IGNORE NULLS) as issue_titles, ARRAY_AGG(DISTINCT blame_frame.file IGNORE NULLS) as blame_files, ARRAY_AGG(DISTINCT blame_frame.library IGNORE NULLS) as blame_libraries, ARRAY_AGG(DISTINCT blame_frame.symbol IGNORE NULLS) as blame_symbols, COUNT(DISTINCT event_id) as total_events, COUNT(DISTINCT installation_uuid) as total_users, '{"version":"' || application.display_version || '","events":' || COUNT(DISTINCT event_id) || ',"users":' || COUNT(DISTINCT installation_uuid) || '}' AS events_info FROM `YOUR TABLE NAME` WHERE 1=1 AND event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR) AND event_timestamp < CURRENT_TIMESTAMP() AND error_type = "FATAL" GROUP BY issue_id, application.display_version ) SELECT issue_id, ARRAY_CONCAT_AGG(issue_titles) as issue_titles, ARRAY_CONCAT_AGG(blame_files) as blame_files, ARRAY_CONCAT_AGG(blame_libraries) as blame_libraries, ARRAY_CONCAT_AGG(blame_symbols) as blame_symbols, SUM(total_events) as total_events, SUM(total_users) as total_users, '[' || STRING_AGG(events_info, ",") || ']' as events_info FROM pre WHERE 1=1 AND issue_id IS NOT NULL AND events_info IS NOT NULL GROUP BY issue_id ORDER BY total_users DESC As a result, you will get one row per unique issue_id, along with the following aggregated fields: issue_titles — a list of all crash titles. This is an array to account for cases where multiple unique titles exist for the same issue. In the scripting part, we’ll select the most frequent one. blame_files — a list of top stacktrace files blamed for the crash. This will be non-empty if the crash occurred in your codebase (rather than in system libraries). blame_libraries — a list of libraries associated with the crash. This is also an array, constructed for reasons similar to issue_titles. blame_symbols — a list of code symbols (functions/methods) where the crash occurred. Like the other fields above, it’s an array. total_events — the total number of crash occurrences during the selected time period. total_users — the number of unique users affected. Sometimes a crash may occur only for a specific group of users. events_info — a JSON array (as a string) containing total_events and total_users broken down by app version. See the example below. issue_titles — a list of all crash titles. This is an array to account for cases where multiple unique titles exist for the same issue. In the scripting part, we’ll select the most frequent one. issue_titles blame_files — a list of top stacktrace files blamed for the crash. This will be non-empty if the crash occurred in your codebase (rather than in system libraries). blame_files blame_libraries — a list of libraries associated with the crash. This is also an array, constructed for reasons similar to issue_titles. blame_libraries blame_symbols — a list of code symbols (functions/methods) where the crash occurred. Like the other fields above, it’s an array. blame_symbols total_events — the total number of crash occurrences during the selected time period. total_events total_users — the number of unique users affected. Sometimes a crash may occur only for a specific group of users. total_users events_info — a JSON array (as a string) containing total_events and total_users broken down by app version. See the example below. events_info [ { "version": "1.0.1", "events": 131, "users": 110 }, { "version": "1.2.1", "events": 489, "users": 426 } ] [ { "version": "1.0.1", "events": 131, "users": 110 }, { "version": "1.2.1", "events": 489, "users": 426 } ] Request crashes data from BigQuery using Python To get started, install the BigQuery Python client library from PyPI. After the installation, create a BigQueryExecutor.py file — this module will handle all communication with Google Cloud BigQuery. PyPI PyPI import os import json import tempfile from google.oauth2 import service_account from google.cloud import bigquery from collections import Counter class BigQueryExecutor: def __init__(self, credentialsJson: str, bqProjectId: str = ''): temp_file_path='' with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file: json.dump(json.loads(credentialsJson), temp_file, indent=4) temp_file_path = temp_file.name credentials = service_account.Credentials.from_service_account_file(temp_file_path) os.remove(temp_file_path) self.client = bigquery.Client(project=bqProjectId, credentials=credentials) import os import json import tempfile from google.oauth2 import service_account from google.cloud import bigquery from collections import Counter class BigQueryExecutor: def __init__(self, credentialsJson: str, bqProjectId: str = ''): temp_file_path='' with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file: json.dump(json.loads(credentialsJson), temp_file, indent=4) temp_file_path = temp_file.name credentials = service_account.Credentials.from_service_account_file(temp_file_path) os.remove(temp_file_path) self.client = bigquery.Client(project=bqProjectId, credentials=credentials) To start using the script, you’ll need just two things: A Google service account JSON credentials file The name (or id) of your BigQuery project A Google service account JSON credentials file The name (or id) of your BigQuery project Once you have these, you can authenticate and start executing queries through the script. Google service account json credential To create a service account, go to Google Cloud Console and assign it the BigQuery Data Editor role. Google Cloud Console Google Cloud Console BigQuery Data Editor Once the account is created, open it, navigate to the “Keys” tab, click “Add key”, and choose “JSON”. This will generate and download a JSON credentials file for the service account. “Keys” “Add key” “JSON” A service account JSON typically looks like this: { "type": "service_account", "project_id": YOUR_PROJECT, "private_key_id": private_key_id, "private_key": GCP_PRIVATE_KEY, "client_email": "email", "client_id": "id", "auth_uri": "auth_uri", "token_uri": "token_uri", "auth_provider_x509_cert_url": "auth_provider_x509_cert_url", "client_x509_cert_url": "url", "universe_domain": "universe_domain" } { "type": "service_account", "project_id": YOUR_PROJECT, "private_key_id": private_key_id, "private_key": GCP_PRIVATE_KEY, "client_email": "email", "client_id": "id", "auth_uri": "auth_uri", "token_uri": "token_uri", "auth_provider_x509_cert_url": "auth_provider_x509_cert_url", "client_x509_cert_url": "url", "universe_domain": "universe_domain" } For testing purposes, you can convert the JSON credentials into a single-line string and embed it directly into your script. However, this approach is not recommended for production — use a Secrets Manager to securely store and manage your credentials instead. not recommended for production You can also extract your bqProjectId from the project_id field inside the credentials JSON. Models To work with BigQuery data in a type-safe manner, it’s useful to define data models that reflect the structure of the query results. This allows you to write cleaner, safer, and more maintainable code. Below is an example of such model classes: class BQCrashlyticsVersionsModel: def __init__(self, version: str, events: int, users: int ): self.version = version self.events = events self.users = users class BQCrashlyticsIssueModel: def __init__(self, issue_id: str, issue_title: str, blame_file: str, blame_library: str, blame_symbol: str, total_events: int, total_users: int, versions: list[BQCrashlyticsVersionsModel] ): self.issue_id = issue_id self.issue_title = issue_title self.blame_file = blame_file self.blame_library = blame_library self.blame_symbol = blame_symbol self.total_events = total_events self.total_users = total_users self.versions = versions class BQCrashlyticsVersionsModel: def __init__(self, version: str, events: int, users: int ): self.version = version self.events = events self.users = users class BQCrashlyticsIssueModel: def __init__(self, issue_id: str, issue_title: str, blame_file: str, blame_library: str, blame_symbol: str, total_events: int, total_users: int, versions: list[BQCrashlyticsVersionsModel] ): self.issue_id = issue_id self.issue_title = issue_title self.blame_file = blame_file self.blame_library = blame_library self.blame_symbol = blame_symbol self.total_events = total_events self.total_users = total_users self.versions = versions getCrashlyticsIssues Function And finally, we can fetch data from BigQuery. Add the following method to your existing BigQueryExecutor class — it will execute the SQL query described earlier in the BigQuery SQL section and return the results parsed into model instances. BigQuery SQL def getCrashlyticsIssues(self, lastHoursCount: int, tableName: str) -> list[BQCrashlyticsIssueModel]: firstEventsInfo = """'[' || STRING_AGG(events_info, ",") || ']' as events_info""" asVersions = """ '{"version":"' || application.display_version || '","events":' || COUNT(DISTINCT event_id) || ',"users":' || COUNT(DISTINCT installation_uuid) || '}' AS events_info """ query = f""" WITH pre as( SELECT issue_id, ARRAY_AGG(DISTINCT issue_title IGNORE NULLS) as issue_titles, ARRAY_AGG(DISTINCT blame_frame.file IGNORE NULLS) as blame_files, ARRAY_AGG(DISTINCT blame_frame.library IGNORE NULLS) as blame_libraries, ARRAY_AGG(DISTINCT blame_frame.symbol IGNORE NULLS) as blame_symbols, COUNT(DISTINCT event_id) as total_events, COUNT(DISTINCT installation_uuid) as total_users, {asVersions} FROM `{tableName}` WHERE 1=1 AND event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {lastHoursCount} HOUR) AND event_timestamp < CURRENT_TIMESTAMP() AND error_type = "FATAL" GROUP BY issue_id, application.display_version ) SELECT issue_id, ARRAY_CONCAT_AGG(issue_titles) as issue_titles, ARRAY_CONCAT_AGG(blame_files) as blame_files, ARRAY_CONCAT_AGG(blame_libraries) as blame_libraries, ARRAY_CONCAT_AGG(blame_symbols) as blame_symbols, SUM(total_events) as total_events, SUM(total_users) as total_users, {firstEventsInfo} FROM pre WHERE 1=1 AND issue_id IS NOT NULL AND events_info IS NOT NULL GROUP BY issue_id ORDER BY total_users DESC """ bqRows = self.client.query(query).result() rows: list[BQCrashlyticsIssueModel] = [] def mergeArray(array: list[str]) -> str: if not array: return '' counter = Counter(array) most_common = counter.most_common(1) return most_common[0][0] if most_common else '' for row in bqRows: issueModel = BQCrashlyticsIssueModel( issue_id=row.issue_id, issue_title=mergeArray(row.issue_titles), blame_file=mergeArray(row.blame_files), blame_library=mergeArray(row.blame_libraries), blame_symbol=mergeArray(row.blame_symbols), total_events=row.total_events, total_users=row.total_users, versions=[BQCrashlyticsVersionsModel(version=jj['version'], events=jj['events'], users=jj['users']) for jj in json.loads(row.events_info)] ) rows.append(issueModel) return rows def getCrashlyticsIssues(self, lastHoursCount: int, tableName: str) -> list[BQCrashlyticsIssueModel]: firstEventsInfo = """'[' || STRING_AGG(events_info, ",") || ']' as events_info""" asVersions = """ '{"version":"' || application.display_version || '","events":' || COUNT(DISTINCT event_id) || ',"users":' || COUNT(DISTINCT installation_uuid) || '}' AS events_info """ query = f""" WITH pre as( SELECT issue_id, ARRAY_AGG(DISTINCT issue_title IGNORE NULLS) as issue_titles, ARRAY_AGG(DISTINCT blame_frame.file IGNORE NULLS) as blame_files, ARRAY_AGG(DISTINCT blame_frame.library IGNORE NULLS) as blame_libraries, ARRAY_AGG(DISTINCT blame_frame.symbol IGNORE NULLS) as blame_symbols, COUNT(DISTINCT event_id) as total_events, COUNT(DISTINCT installation_uuid) as total_users, {asVersions} FROM `{tableName}` WHERE 1=1 AND event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {lastHoursCount} HOUR) AND event_timestamp < CURRENT_TIMESTAMP() AND error_type = "FATAL" GROUP BY issue_id, application.display_version ) SELECT issue_id, ARRAY_CONCAT_AGG(issue_titles) as issue_titles, ARRAY_CONCAT_AGG(blame_files) as blame_files, ARRAY_CONCAT_AGG(blame_libraries) as blame_libraries, ARRAY_CONCAT_AGG(blame_symbols) as blame_symbols, SUM(total_events) as total_events, SUM(total_users) as total_users, {firstEventsInfo} FROM pre WHERE 1=1 AND issue_id IS NOT NULL AND events_info IS NOT NULL GROUP BY issue_id ORDER BY total_users DESC """ bqRows = self.client.query(query).result() rows: list[BQCrashlyticsIssueModel] = [] def mergeArray(array: list[str]) -> str: if not array: return '' counter = Counter(array) most_common = counter.most_common(1) return most_common[0][0] if most_common else '' for row in bqRows: issueModel = BQCrashlyticsIssueModel( issue_id=row.issue_id, issue_title=mergeArray(row.issue_titles), blame_file=mergeArray(row.blame_files), blame_library=mergeArray(row.blame_libraries), blame_symbol=mergeArray(row.blame_symbols), total_events=row.total_events, total_users=row.total_users, versions=[BQCrashlyticsVersionsModel(version=jj['version'], events=jj['events'], users=jj['users']) for jj in json.loads(row.events_info)] ) rows.append(issueModel) return rows Now we can execute our SQL request to BigQuery directly from Python. Here’s a full example of how to run the query and work with the results: executor = BigQueryExecutor(credentialsJson=_jsonToken, bqProjectId=_bqProjectId) allCrashes = executor.getCrashlyticsIssues(lastHoursCount=24, tableName="tableNAME_IOS_REALTIME") for i in allCrashes: print(i.issue_title) executor = BigQueryExecutor(credentialsJson=_jsonToken, bqProjectId=_bqProjectId) allCrashes = executor.getCrashlyticsIssues(lastHoursCount=24, tableName="tableNAME_IOS_REALTIME") for i in allCrashes: print(i.issue_title) [libswift_Concurrency.dylib] swift::swift_Concurrency_fatalError(unsigned int, char const*, ...) [SwiftUI] static DisplayList.ViewUpdater.Platform.updateClipShapesAsync(layer:oldState:newState:) Foundation [SwiftUI] __swift_memcpy112_8 [libswift_Concurrency.dylib] swift::AsyncTask::waitFuture(swift::AsyncTask*, swift::AsyncContext*, void (swift::AsyncContext* swift_async_context) swiftasynccall*, swift::AsyncContext*, swift::OpaqueValue*) [libobjc.A.dylib] objc_opt_respondsToSelector [VectorKit] void md::COverlayRenderLayer::layoutRibbon<md::Ribbons::PolylineOverlayRibbonDescriptor>(std::__1::unique_ptr<md::PolylineOverlayLayer<md::Ribbons::PolylineOverlayRibbonDescriptor>, std::__1::default_delete<md::PolylineOverlayLayer<md::Ribbons::PolylineOverlayRibbonDescriptor> > > const&, ggl::CommandBuffer*, md::PolylineOverlayLayoutContext&, unsigned int, unsigned long long, bool, bool, float) [libswiftCore.dylib] _swift_release_dealloc [libobjc.A.dylib] objc_msgSend [libswift_Concurrency.dylib] swift::swift_Concurrency_fatalError(unsigned int, char const*, ...) [SwiftUI] static DisplayList.ViewUpdater.Platform.updateClipShapesAsync(layer:oldState:newState:) Foundation [SwiftUI] __swift_memcpy112_8 [libswift_Concurrency.dylib] swift::AsyncTask::waitFuture(swift::AsyncTask*, swift::AsyncContext*, void (swift::AsyncContext* swift_async_context) swiftasynccall*, swift::AsyncContext*, swift::OpaqueValue*) [libobjc.A.dylib] objc_opt_respondsToSelector [VectorKit] void md::COverlayRenderLayer::layoutRibbon<md::Ribbons::PolylineOverlayRibbonDescriptor>(std::__1::unique_ptr<md::PolylineOverlayLayer<md::Ribbons::PolylineOverlayRibbonDescriptor>, std::__1::default_delete<md::PolylineOverlayLayer<md::Ribbons::PolylineOverlayRibbonDescriptor> > > const&, ggl::CommandBuffer*, md::PolylineOverlayLayoutContext&, unsigned int, unsigned long long, bool, bool, float) [libswiftCore.dylib] _swift_release_dealloc [libobjc.A.dylib] objc_msgSend Hooray! 🎉 Now that we’re able to fetch crash data from BigQuery, we can move on to the next step — taking the top 5 most frequent crashes and automatically creating Jira tasks for them. Get all commiters of repository and merge them Before assigning crash issues to developers, we first need to identify potential owners for each crash. To do that, we’ll start by gathering all commit authors from the repository. Since we’re using GitHub, we should be aware of a few specific details: Some developers may use multiple email addresses across commits, so we’ll need to merge identities where applicable. GitHub often uses noreply emails (e.g. username@users.noreply.github.com), so we’ll handle those cases accordingly. Some developers may use multiple email addresses across commits, so we’ll need to merge identities where applicable. GitHub often uses noreply emails (e.g. username@users.noreply.github.com), so we’ll handle those cases accordingly. The main goal at this step is to extract and normalize the list of Git authors with their names and emails, using the following command: git log | grep ‘^Author’ | sort | uniq -c git log | grep ‘^Author’ | sort | uniq -c import re class GitUserModel: def __init__(self, nicknames: set[str], emails: set[str], gitLogins: set[str] ): self.nicknames = nicknames self.emails = emails self.gitLogins = gitLogins def returnPossibleNicknames(text: str) -> set[str]: res = [findEmail(text), loginFromEmail(text), findGitNoreplyLogin(text)] return set(list(filter(None, res))) def findEmail(text: str) -> str: e = re.match(r"(([A-Za-z0-9+\.\_\-]*@[A-Za-z0-9+]*\.[A-Za-z0-9+]*))", text) if e: return e.group(1) def loginFromEmail(text: str) -> str: e = re.match(r"(([A-Za-z0-9+\.\_\-]*))@[A-Za-z0-9+]*\.[A-Za-z0-9+]*", text) if e: return e.group(1) def findGitNoreplyLogin(text: str) -> str: gu = re.match(r"\d+\+(([A-Za-z0-9+\.\_\-]*))@users\.noreply\.github\.com", text) if gu: return gu.group(1) else: gu = re.match(r"(([A-Za-z0-9+\.\_\-]*))@users\.noreply\.github\.com", text) if gu: return gu.group(1) class GitBlamer: def getAllRepoUsersMap(self, projectRootPath: str) -> list[GitUserModel]: users: list[GitUserModel] = [] allGitLog = os.popen("cd {}; git log | grep '^Author' | sort | uniq -c".format(projectRootPath)).read() for line in allGitLog.split('\n'): user = self._createUserFromBlameLine(line) if user: users.append(user) self._enrichUsersNicknames(users=users) users = self._mergeSameUsers(users) users = sorted(users, key=lambda x: list(x.emails)[0] if x.emails else list(x.gitLogins)[0] if x.gitLogins else "") return users def _createUserFromBlameLine(self, line): m = re.match(r".* Author: (.*) <(.*)>", line) user = GitUserModel(nicknames=set(), emails=set(), gitLogins=set()) if m: val=set() if m.group(1): val.add(m.group(1)) if m.group(2): val.add(m.group(2)) user.nicknames = val else: return return user def _enrichUsersNicknames(self, users: list[GitUserModel]): for user in users: possibleNicknames = set() for nick in user.nicknames: possibleNicknames = possibleNicknames.union(returnPossibleNicknames(text=nick)) e = findEmail(text=nick) if e: user.emails.add(e) gu = findGitNoreplyLogin(text=nick) if gu: user.gitLogins.add(gu) user.nicknames = user.nicknames.union(possibleNicknames) def _mergeSameUsers(self, users: list[GitUserModel]): for i in range(0, len(users)): if i >= len(users): break for j in range(i+1, len(users)): if j >= len(users): break setLoweredJNicknames=set([u.lower() for u in users[j].nicknames]) for k in range(0, j): if k >= j: break setLoweredKNicknames=set([u.lower() for u in users[k].nicknames]) isSameNickname=len(setLoweredKNicknames.intersection(setLoweredJNicknames)) > 0 if isSameNickname: users[j].gitLogins = users[j].gitLogins.union(users[k].gitLogins) users[j].emails = users[j].emails.union(users[k].emails) users.pop(k) break return users import re class GitUserModel: def __init__(self, nicknames: set[str], emails: set[str], gitLogins: set[str] ): self.nicknames = nicknames self.emails = emails self.gitLogins = gitLogins def returnPossibleNicknames(text: str) -> set[str]: res = [findEmail(text), loginFromEmail(text), findGitNoreplyLogin(text)] return set(list(filter(None, res))) def findEmail(text: str) -> str: e = re.match(r"(([A-Za-z0-9+\.\_\-]*@[A-Za-z0-9+]*\.[A-Za-z0-9+]*))", text) if e: return e.group(1) def loginFromEmail(text: str) -> str: e = re.match(r"(([A-Za-z0-9+\.\_\-]*))@[A-Za-z0-9+]*\.[A-Za-z0-9+]*", text) if e: return e.group(1) def findGitNoreplyLogin(text: str) -> str: gu = re.match(r"\d+\+(([A-Za-z0-9+\.\_\-]*))@users\.noreply\.github\.com", text) if gu: return gu.group(1) else: gu = re.match(r"(([A-Za-z0-9+\.\_\-]*))@users\.noreply\.github\.com", text) if gu: return gu.group(1) class GitBlamer: def getAllRepoUsersMap(self, projectRootPath: str) -> list[GitUserModel]: users: list[GitUserModel] = [] allGitLog = os.popen("cd {}; git log | grep '^Author' | sort | uniq -c".format(projectRootPath)).read() for line in allGitLog.split('\n'): user = self._createUserFromBlameLine(line) if user: users.append(user) self._enrichUsersNicknames(users=users) users = self._mergeSameUsers(users) users = sorted(users, key=lambda x: list(x.emails)[0] if x.emails else list(x.gitLogins)[0] if x.gitLogins else "") return users def _createUserFromBlameLine(self, line): m = re.match(r".* Author: (.*) <(.*)>", line) user = GitUserModel(nicknames=set(), emails=set(), gitLogins=set()) if m: val=set() if m.group(1): val.add(m.group(1)) if m.group(2): val.add(m.group(2)) user.nicknames = val else: return return user def _enrichUsersNicknames(self, users: list[GitUserModel]): for user in users: possibleNicknames = set() for nick in user.nicknames: possibleNicknames = possibleNicknames.union(returnPossibleNicknames(text=nick)) e = findEmail(text=nick) if e: user.emails.add(e) gu = findGitNoreplyLogin(text=nick) if gu: user.gitLogins.add(gu) user.nicknames = user.nicknames.union(possibleNicknames) def _mergeSameUsers(self, users: list[GitUserModel]): for i in range(0, len(users)): if i >= len(users): break for j in range(i+1, len(users)): if j >= len(users): break setLoweredJNicknames=set([u.lower() for u in users[j].nicknames]) for k in range(0, j): if k >= j: break setLoweredKNicknames=set([u.lower() for u in users[k].nicknames]) isSameNickname=len(setLoweredKNicknames.intersection(setLoweredJNicknames)) > 0 if isSameNickname: users[j].gitLogins = users[j].gitLogins.union(users[k].gitLogins) users[j].emails = users[j].emails.union(users[k].emails) users.pop(k) break return users In the code below, we attempt to match different commit identities that likely belong to the same person — for example, user@gmail.com and user@users.noreply.github.com. We also extract and group their names and GitHub usernames (where available) for convenience. With the script below, you can launch this process and get a cleaned, deduplicated list of all committers in the repository: projectRootPath="/IOS_project_path" blamer = GitBlamer() allUsers = blamer.getAllRepoUsersMap(projectRootPath=projectRootPath) for user in allUsers: print(", ".join(user.nicknames)) projectRootPath="/IOS_project_path" blamer = GitBlamer() allUsers = blamer.getAllRepoUsersMap(projectRootPath=projectRootPath) for user in allUsers: print(", ".join(user.nicknames)) Map each issue to file of repository and file owner At this point, we have detailed information about our crashes and the users affected by them. This allows us to associate a specific crash with a specific user and automatically create a corresponding Jira task. Before implementing the crash-to-user mapping logic, we separated the workflows for iOS and Android. These platforms use different symbol formats, and the criteria for linking crash files to issues also differ. To handle this cleanly, we introduced an abstract class with platform-specific implementations, enabling us to encapsulate the differences and solve the problem in a structured way. class AbstractFileToIssueMapper: def isPathBelongsToIssue(self, file: str, filePath: str, issue: BQCrashlyticsIssueModel) -> bool: raise Exception('Not implemented method AbstractFileToIssueMapper') class AndroidFileToIssueMapper(AbstractFileToIssueMapper): def __init__(self): self.libraryName = 'inDrive' def isPathBelongsToIssue(self, file: str, filePath: str, issue: BQCrashlyticsIssueModel) -> bool: if file != issue.blame_file or not issue.blame_symbol.startswith(self.libraryName): return False fileNameNoExtension = file.split('.')[0] fileNameIndexInSymbol = issue.blame_symbol.find(fileNameNoExtension) if fileNameIndexInSymbol < 0: return False relativeFilePathFromSymbol = issue.blame_symbol[0:fileNameIndexInSymbol].replace('.', '/') relativeFilePathFromSymbol = relativeFilePathFromSymbol + file return filePath.endswith(relativeFilePathFromSymbol) class IosFileToIssueMapper(AbstractFileToIssueMapper): def __init__(self): self.indriveLibraryName = 'inDrive' self.indriveFolderName = 'inDrive' self.modulesFolderName = 'Modules' def isPathBelongsToIssue(self, file: str, filePath: str, issue: BQCrashlyticsIssueModel) -> bool: if file != issue.blame_file: return False isMatchFolder = False if issue.blame_library == self.indriveLibraryName: isMatchFolder = filePath.startswith('{}/'.format(self.indriveFolderName)) else: isMatchFolder = filePath.startswith('{}/{}/'.format(self.modulesFolderName, issue.blame_library)) return isMatchFolder class AbstractFileToIssueMapper: def isPathBelongsToIssue(self, file: str, filePath: str, issue: BQCrashlyticsIssueModel) -> bool: raise Exception('Not implemented method AbstractFileToIssueMapper') class AndroidFileToIssueMapper(AbstractFileToIssueMapper): def __init__(self): self.libraryName = 'inDrive' def isPathBelongsToIssue(self, file: str, filePath: str, issue: BQCrashlyticsIssueModel) -> bool: if file != issue.blame_file or not issue.blame_symbol.startswith(self.libraryName): return False fileNameNoExtension = file.split('.')[0] fileNameIndexInSymbol = issue.blame_symbol.find(fileNameNoExtension) if fileNameIndexInSymbol < 0: return False relativeFilePathFromSymbol = issue.blame_symbol[0:fileNameIndexInSymbol].replace('.', '/') relativeFilePathFromSymbol = relativeFilePathFromSymbol + file return filePath.endswith(relativeFilePathFromSymbol) class IosFileToIssueMapper(AbstractFileToIssueMapper): def __init__(self): self.indriveLibraryName = 'inDrive' self.indriveFolderName = 'inDrive' self.modulesFolderName = 'Modules' def isPathBelongsToIssue(self, file: str, filePath: str, issue: BQCrashlyticsIssueModel) -> bool: if file != issue.blame_file: return False isMatchFolder = False if issue.blame_library == self.indriveLibraryName: isMatchFolder = filePath.startswith('{}/'.format(self.indriveFolderName)) else: isMatchFolder = filePath.startswith('{}/{}/'.format(self.modulesFolderName, issue.blame_library)) return isMatchFolder The specific implementation may vary depending on your project, but the main responsibility of this class is to determine whether a given crash occurred in a particular file. Once this logic is in place, we can proceed to map files to issues and assign them to the corresponding file owners. import subprocess class MappedIssueFileModel: def __init__(self, fileGitLink: str, filePath: str, issue: BQCrashlyticsIssueModel, fileOwner: GitUserModel ): self.fileGitLink = fileGitLink self.filePath = filePath self.issue = issue self.fileOwner = fileOwner class BigQueryCrashesFilesMapper: def getBlameOut(self, filePath: str, projectRootPath: str) -> list[str]: dangerousChars = re.compile(r'[;|&\r\n]|\.\.') if dangerousChars.search(filePath) or dangerousChars.search(projectRootPath): return None if not subprocess.check_output(['git', 'ls-files', filePath], cwd=projectRootPath, text=True): return None blameProc = subprocess.Popen(['git', 'blame', filePath, '-cwe'], cwd=projectRootPath, stdout=subprocess.PIPE, text=True) blameRegex=r'<[a-zA-Z0-9\+\.\_\-]*@[a-zA-Z0-9\+\.\_\-]*>' grepProc = subprocess.Popen(['grep', '-o', blameRegex], stdin=blameProc.stdout, stdout=subprocess.PIPE, text=True) blameProc.stdout.close() sortProc = subprocess.Popen(['sort'], stdin=grepProc.stdout, stdout=subprocess.PIPE, text=True) grepProc.stdout.close() uniqProc = subprocess.Popen(['uniq', '-c'], stdin=sortProc.stdout, stdout=subprocess.PIPE, text=True) sortProc.stdout.close() finalProc = subprocess.Popen(['sort', '-bgr'], stdin=uniqProc.stdout, stdout=subprocess.PIPE, text=True) uniqProc.stdout.close() blameOut, _ = finalProc.communicate() blameArray=list(filter(len, blameOut.split('\n'))) return blameArray def findFileOwner(self, filePath: str, gitFileOwners: list[GitUserModel], projectRootPath: str) -> GitUserModel: blameArray = self.getBlameOut(filePath=filePath, projectRootPath=projectRootPath) if not blameArray: return foundAuthor = None for blameI in range(0, len(blameArray)): author = re.match(r".*\d+ <(.*)>", blameArray[blameI]) if author: possibleNicknames = returnPossibleNicknames(text=author.group(1)) for gitFileOwner in gitFileOwners: if len(gitFileOwner.nicknames.intersection(possibleNicknames)) > 0: foundAuthor = gitFileOwner break if foundAuthor: break return foundAuthor def mapBQResultsWithFiles(self, fileToIssueMapper: AbstractFileToIssueMapper, issues: list[BQCrashlyticsIssueModel], gitFileOwners: list[GitUserModel], projectRootPath: str ) -> list[MappedIssueFileModel]: mappedArray: list[MappedIssueFileModel] = [] githubMainBranch = "https://github.com/inDriver/UDF/blob/master" for root, dirs, files in os.walk(projectRootPath): for file in files: filePath = os.path.join(root, file).removeprefix(projectRootPath).strip('/') gitFileOwner = None for issue in issues: if fileToIssueMapper.isPathBelongsToIssue(file, filePath, issue): if not gitFileOwner: gitFileOwner = self.findFileOwner(filePath=filePath, gitFileOwners=gitFileOwners, projectRootPath=projectRootPath) mappedIssue = MappedIssueFileModel( fileGitLink='{}/{}'.format(githubMainBranch, filePath.strip('/')), filePath=filePath, issue=issue, fileOwner=gitFileOwner ) mappedArray.append(mappedIssue) mappedArray.sort(key=lambda x: x.issue.total_users, reverse=True) return mappedArray import subprocess class MappedIssueFileModel: def __init__(self, fileGitLink: str, filePath: str, issue: BQCrashlyticsIssueModel, fileOwner: GitUserModel ): self.fileGitLink = fileGitLink self.filePath = filePath self.issue = issue self.fileOwner = fileOwner class BigQueryCrashesFilesMapper: def getBlameOut(self, filePath: str, projectRootPath: str) -> list[str]: dangerousChars = re.compile(r'[;|&\r\n]|\.\.') if dangerousChars.search(filePath) or dangerousChars.search(projectRootPath): return None if not subprocess.check_output(['git', 'ls-files', filePath], cwd=projectRootPath, text=True): return None blameProc = subprocess.Popen(['git', 'blame', filePath, '-cwe'], cwd=projectRootPath, stdout=subprocess.PIPE, text=True) blameRegex=r'<[a-zA-Z0-9\+\.\_\-]*@[a-zA-Z0-9\+\.\_\-]*>' grepProc = subprocess.Popen(['grep', '-o', blameRegex], stdin=blameProc.stdout, stdout=subprocess.PIPE, text=True) blameProc.stdout.close() sortProc = subprocess.Popen(['sort'], stdin=grepProc.stdout, stdout=subprocess.PIPE, text=True) grepProc.stdout.close() uniqProc = subprocess.Popen(['uniq', '-c'], stdin=sortProc.stdout, stdout=subprocess.PIPE, text=True) sortProc.stdout.close() finalProc = subprocess.Popen(['sort', '-bgr'], stdin=uniqProc.stdout, stdout=subprocess.PIPE, text=True) uniqProc.stdout.close() blameOut, _ = finalProc.communicate() blameArray=list(filter(len, blameOut.split('\n'))) return blameArray def findFileOwner(self, filePath: str, gitFileOwners: list[GitUserModel], projectRootPath: str) -> GitUserModel: blameArray = self.getBlameOut(filePath=filePath, projectRootPath=projectRootPath) if not blameArray: return foundAuthor = None for blameI in range(0, len(blameArray)): author = re.match(r".*\d+ <(.*)>", blameArray[blameI]) if author: possibleNicknames = returnPossibleNicknames(text=author.group(1)) for gitFileOwner in gitFileOwners: if len(gitFileOwner.nicknames.intersection(possibleNicknames)) > 0: foundAuthor = gitFileOwner break if foundAuthor: break return foundAuthor def mapBQResultsWithFiles(self, fileToIssueMapper: AbstractFileToIssueMapper, issues: list[BQCrashlyticsIssueModel], gitFileOwners: list[GitUserModel], projectRootPath: str ) -> list[MappedIssueFileModel]: mappedArray: list[MappedIssueFileModel] = [] githubMainBranch = "https://github.com/inDriver/UDF/blob/master" for root, dirs, files in os.walk(projectRootPath): for file in files: filePath = os.path.join(root, file).removeprefix(projectRootPath).strip('/') gitFileOwner = None for issue in issues: if fileToIssueMapper.isPathBelongsToIssue(file, filePath, issue): if not gitFileOwner: gitFileOwner = self.findFileOwner(filePath=filePath, gitFileOwners=gitFileOwners, projectRootPath=projectRootPath) mappedIssue = MappedIssueFileModel( fileGitLink='{}/{}'.format(githubMainBranch, filePath.strip('/')), filePath=filePath, issue=issue, fileOwner=gitFileOwner ) mappedArray.append(mappedIssue) mappedArray.sort(key=lambda x: x.issue.total_users, reverse=True) return mappedArray All you need to do at this point is update the githubMainBranch property with the link to your own repository. Next, we gather issues and file owners, map files accordingly using the code below, and get a final result — a list of issues sorted by total_users in descending order. mapper = BigQueryCrashesFilesMapper() mappedIssues = mapper.mapBQResultsWithFiles( fileToIssueMapper=IosFileToIssueMapper(), issues=allCrashes, gitFileOwners=allUsers, projectRootPath=projectRootPath ) for issue in mappedIssues: if issue.fileOwner: print(list(issue.fileOwner.nicknames)[0], issue.issue.total_users, issue.issue.issue_title) else: print('no owner', issue.issue.total_users, issue.issue.issue_title) mapper = BigQueryCrashesFilesMapper() mappedIssues = mapper.mapBQResultsWithFiles( fileToIssueMapper=IosFileToIssueMapper(), issues=allCrashes, gitFileOwners=allUsers, projectRootPath=projectRootPath ) for issue in mappedIssues: if issue.fileOwner: print(list(issue.fileOwner.nicknames)[0], issue.issue.total_users, issue.issue.issue_title) else: print('no owner', issue.issue.total_users, issue.issue.issue_title) Create Jira task for a file owner of the crash At this point, we have everything we need to start creating Jira tasks for crash owners. However, keep in mind that Jira configurations often vary between companies — custom fields, workflows, and permissions may differ. I recommend referring to the official Jira API documentation and using their official Python client to ensure compatibility with your setup. Here are some practical tips based on our experience: Don’t create tasks for every issue. Focus on the top 5–10 issues based on the number of affected users or a certain impact threshold. Persist task metadata. Store information about created tasks in a persistent storage. I use BigQuery, saving data in a separate table and updating it on each script run. Recreate closed tasks if the issue reappears in newer versions of the app — this ensures that regressions aren’t ignored. Link tasks for the same issue to simplify future investigation and avoid duplication. Include as much detail as possible in the task description. Add crash aggregations, affected user counts, versions, etc. Link related crashes if they originate from the same file — this provides additional context. Notify your team in Slack (or another messaging system) when new tasks are created or existing ones need attention. Include helpful links to the crash report, task, relevant GitHub files, etc. Add error handling to your script. Use try/except blocks and send Slack alerts when something fails. Cache slow operations during development. For example, cache BigQuery crash retrievals locally to speed up iteration. Some crashes may involve shared or core libraries. In those cases, you’ll likely need to manually assign the task, but it’s still helpful to create the Jira issue automatically with full crash context. Don’t create tasks for every issue. Focus on the top 5–10 issues based on the number of affected users or a certain impact threshold. Don’t create tasks for every issue. Persist task metadata. Store information about created tasks in a persistent storage. I use BigQuery, saving data in a separate table and updating it on each script run. Persist task metadata. Recreate closed tasks if the issue reappears in newer versions of the app — this ensures that regressions aren’t ignored. Recreate closed tasks if the issue reappears Link tasks for the same issue to simplify future investigation and avoid duplication. Link tasks for the same issue Include as much detail as possible in the task description. Add crash aggregations, affected user counts, versions, etc. Include as much detail as possible in the task description. Link related crashes if they originate from the same file — this provides additional context. Link related crashes Notify your team in Slack (or another messaging system) when new tasks are created or existing ones need attention. Include helpful links to the crash report, task, relevant GitHub files, etc. Notify your team Add error handling to your script. Use try/except blocks and send Slack alerts when something fails. Add error handling to your script. Cache slow operations during development. For example, cache BigQuery crash retrievals locally to speed up iteration. Cache slow operations during development. Some crashes may involve shared or core libraries. In those cases, you’ll likely need to manually assign the task, but it’s still helpful to create the Jira issue automatically with full crash context. Some crashes may involve shared or core libraries. Conclusion This system allows us to process thousands of crash reports daily, and route them to the right developer in just a few minutes — without any manual work. If your team is drowning in uncategorized crash issues — automate it. 🙌 Written by Vilian Iaumbaev. Written by Vilian Iaumbaev. Vilian Iaumbaev