From 48b47c4808083f171b39ec07842859d08e5b454c Mon Sep 17 00:00:00 2001 From: Jose Javier Merchante Date: Mon, 3 Nov 2025 16:33:46 +0100 Subject: [PATCH] Add parallel processing for importing accounts This change implements parallel processing using ThreadPoolExecutor to improve the efficiency of account imports. By processing multiple accounts simultaneously, we reduce the overall time taken to fetch and process account data. Signed-off-by: Jose Javier Merchante --- README.md | 2 +- ...llel-processing-for-importing-accounts.yml | 10 ++ sortinghat/core/importer/backends/eclipse.py | 127 +++++++++++------- 3 files changed, 91 insertions(+), 48 deletions(-) create mode 100644 releases/unreleased/parallel-processing-for-importing-accounts.yml diff --git a/README.md b/README.md index 8ed13cc..e960819 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ SortingHat backend to import identities from Eclipse Foundation ## Requirements - - Python >= 3.9 + - Python >= 3.10 You will also need some other libraries for running the tool, you can find the whole list of dependencies in [pyproject.toml](pyproject.toml) file. diff --git a/releases/unreleased/parallel-processing-for-importing-accounts.yml b/releases/unreleased/parallel-processing-for-importing-accounts.yml new file mode 100644 index 0000000..27063bc --- /dev/null +++ b/releases/unreleased/parallel-processing-for-importing-accounts.yml @@ -0,0 +1,10 @@ +--- +title: Parallel processing for importing accounts +category: performance +author: Jose Javier Merchante +issue: null +notes: > + Implement parallel processing to improve the efficiency + of account imports. By processing multiple accounts + simultaneously, we reduce the overall time taken + to fetch and process account data. diff --git a/sortinghat/core/importer/backends/eclipse.py b/sortinghat/core/importer/backends/eclipse.py index 6f6c9c4..e756c4a 100644 --- a/sortinghat/core/importer/backends/eclipse.py +++ b/sortinghat/core/importer/backends/eclipse.py @@ -20,6 +20,8 @@ import dateutil.relativedelta import requests +from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED + from django.conf import settings from django.db.models import (Q, Subquery) @@ -46,6 +48,9 @@ ECLIPSE_SOURCE = "eclipsefdn" GITHUB_SOURCE = "github" +# Parallel processing +MAX_WORKERS = 8 +MAX_QUEUE_SIZE = 100 logger = logging.getLogger(__name__) @@ -104,68 +109,96 @@ def get_individuals(self): epoch = int(self.from_date.timestamp()) - # Fetch accounts pages - for account in client.fetch_accounts(epoch=epoch): - ef_profile = client.fetch_account_profile(account['name']) + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + pending = set() + + for account in client.fetch_accounts(epoch=epoch): + future = executor.submit(self.process_account, client, account) + pending.add(future) + + # Wait to complete when reaching max queue size + if len(pending) >= MAX_QUEUE_SIZE: + done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=60) + if not done: + raise TimeoutError("Timeout waiting for Eclipse account processing") + for future in done: + individual = future.result() + if individual: + yield individual + + # Process remaining futures + for future in as_completed(pending, timeout=600): + try: + individual = future.result() + if individual: + yield individual + except Exception as exc: + logger.error(f"Error processing Eclipse account; error={exc}") + + @staticmethod + def process_account(client, account): + """Process a single Eclipse account to create an Individual.""" - if not ef_profile: - continue + ef_profile = client.fetch_account_profile(account['name']) - individual = Individual(uuid=ef_profile['uid']) + if not ef_profile: + return None - name = ef_profile['first_name'] + ' ' + ef_profile['last_name'] - email = ef_profile['mail'] + individual = Individual(uuid=ef_profile['uid']) - prf = Profile() - prf.name = name - prf.email = email + name = ef_profile['first_name'] + ' ' + ef_profile['last_name'] + email = ef_profile['mail'] - individual.profile = prf + prf = Profile() + prf.name = name + prf.email = email - eclipse_id = Identity( - source=ECLIPSE_SOURCE, + individual.profile = prf + + eclipse_id = Identity( + source=ECLIPSE_SOURCE, + name=name, + email=email, + username=ef_profile['name'], + ) + individual.identities.append(eclipse_id) + + if ef_profile['github_handle']: + idt = Identity( + source=GITHUB_SOURCE, name=name, + username=ef_profile['github_handle'], email=email, - username=ef_profile['name'], ) - individual.identities.append(eclipse_id) - - if ef_profile['github_handle']: - idt = Identity( - source=GITHUB_SOURCE, - name=name, - username=ef_profile['github_handle'], - email=email, - ) - individual.identities.append(idt) + individual.identities.append(idt) - # Fetch enrollments for the identity. If no enrollment is set - # use the organization field from the profile, if set. - employment_history = client.fetch_employment_history(account['name']) + # Fetch enrollments for the identity. If no enrollment is set + # use the organization field from the profile, if set. + employment_history = client.fetch_employment_history(account['name']) - if employment_history: - for employment in employment_history: - org = Organization(name=employment['organization_name']) - start, end = None, None + if employment_history: + for employment in employment_history: + org = Organization(name=employment['organization_name']) + start, end = None, None - if employment['start']: - start = str_to_datetime(employment['start']) - if employment['end']: - end = str_to_datetime(employment['end']) + if employment['start']: + start = str_to_datetime(employment['start']) + if employment['end']: + end = str_to_datetime(employment['end']) - enr = Enrollment(org, start=start, end=end) - individual.enrollments.append(enr) + enr = Enrollment(org, start=start, end=end) + individual.enrollments.append(enr) - if not individual.enrollments: - company = ef_profile.get('org', None) - if company: - org = Organization(name=company) - enr = Enrollment(org) - individual.enrollments.append(enr) + if not individual.enrollments: + company = ef_profile.get('org', None) + if company: + org = Organization(name=company) + enr = Enrollment(org) + individual.enrollments.append(enr) - logger.info(f"Eclipse account processed; account={account['name']}; changed={account['changed']}") + logger.info(f"Eclipse account processed; account={account['name']}; changed={account['changed']}") - yield individual + return individual def post_process_individual(self, individual, uuid): """Post processing for Eclipse identities. @@ -338,7 +371,7 @@ def _fetch_retry(self, url, params=None): response = requests.get(url, params=params, auth=self.token) response.raise_for_status() - return response + return response.json() def _authenticate(self, client_id, client_secret, scope): """Authenticate using OAuth2.