diff --git a/README.md b/README.md index 8ed13cc..e960819 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ SortingHat backend to import identities from Eclipse Foundation ## Requirements - - Python >= 3.9 + - Python >= 3.10 You will also need some other libraries for running the tool, you can find the whole list of dependencies in [pyproject.toml](pyproject.toml) file. diff --git a/releases/unreleased/parallel-processing-for-importing-accounts.yml b/releases/unreleased/parallel-processing-for-importing-accounts.yml new file mode 100644 index 0000000..27063bc --- /dev/null +++ b/releases/unreleased/parallel-processing-for-importing-accounts.yml @@ -0,0 +1,10 @@ +--- +title: Parallel processing for importing accounts +category: performance +author: Jose Javier Merchante +issue: null +notes: > + Implement parallel processing to improve the efficiency + of account imports. By processing multiple accounts + simultaneously, we reduce the overall time taken + to fetch and process account data. diff --git a/sortinghat/core/importer/backends/eclipse.py b/sortinghat/core/importer/backends/eclipse.py index 6f6c9c4..e756c4a 100644 --- a/sortinghat/core/importer/backends/eclipse.py +++ b/sortinghat/core/importer/backends/eclipse.py @@ -20,6 +20,8 @@ import dateutil.relativedelta import requests +from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED + from django.conf import settings from django.db.models import (Q, Subquery) @@ -46,6 +48,9 @@ ECLIPSE_SOURCE = "eclipsefdn" GITHUB_SOURCE = "github" +# Parallel processing +MAX_WORKERS = 8 +MAX_QUEUE_SIZE = 100 logger = logging.getLogger(__name__) @@ -104,68 +109,96 @@ def get_individuals(self): epoch = int(self.from_date.timestamp()) - # Fetch accounts pages - for account in client.fetch_accounts(epoch=epoch): - ef_profile = client.fetch_account_profile(account['name']) + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + pending = set() + + for account in client.fetch_accounts(epoch=epoch): + future = executor.submit(self.process_account, client, account) + pending.add(future) + + # Wait to complete when reaching max queue size + if len(pending) >= MAX_QUEUE_SIZE: + done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=60) + if not done: + raise TimeoutError("Timeout waiting for Eclipse account processing") + for future in done: + individual = future.result() + if individual: + yield individual + + # Process remaining futures + for future in as_completed(pending, timeout=600): + try: + individual = future.result() + if individual: + yield individual + except Exception as exc: + logger.error(f"Error processing Eclipse account; error={exc}") + + @staticmethod + def process_account(client, account): + """Process a single Eclipse account to create an Individual.""" - if not ef_profile: - continue + ef_profile = client.fetch_account_profile(account['name']) - individual = Individual(uuid=ef_profile['uid']) + if not ef_profile: + return None - name = ef_profile['first_name'] + ' ' + ef_profile['last_name'] - email = ef_profile['mail'] + individual = Individual(uuid=ef_profile['uid']) - prf = Profile() - prf.name = name - prf.email = email + name = ef_profile['first_name'] + ' ' + ef_profile['last_name'] + email = ef_profile['mail'] - individual.profile = prf + prf = Profile() + prf.name = name + prf.email = email - eclipse_id = Identity( - source=ECLIPSE_SOURCE, + individual.profile = prf + + eclipse_id = Identity( + source=ECLIPSE_SOURCE, + name=name, + email=email, + username=ef_profile['name'], + ) + individual.identities.append(eclipse_id) + + if ef_profile['github_handle']: + idt = Identity( + source=GITHUB_SOURCE, name=name, + username=ef_profile['github_handle'], email=email, - username=ef_profile['name'], ) - individual.identities.append(eclipse_id) - - if ef_profile['github_handle']: - idt = Identity( - source=GITHUB_SOURCE, - name=name, - username=ef_profile['github_handle'], - email=email, - ) - individual.identities.append(idt) + individual.identities.append(idt) - # Fetch enrollments for the identity. If no enrollment is set - # use the organization field from the profile, if set. - employment_history = client.fetch_employment_history(account['name']) + # Fetch enrollments for the identity. If no enrollment is set + # use the organization field from the profile, if set. + employment_history = client.fetch_employment_history(account['name']) - if employment_history: - for employment in employment_history: - org = Organization(name=employment['organization_name']) - start, end = None, None + if employment_history: + for employment in employment_history: + org = Organization(name=employment['organization_name']) + start, end = None, None - if employment['start']: - start = str_to_datetime(employment['start']) - if employment['end']: - end = str_to_datetime(employment['end']) + if employment['start']: + start = str_to_datetime(employment['start']) + if employment['end']: + end = str_to_datetime(employment['end']) - enr = Enrollment(org, start=start, end=end) - individual.enrollments.append(enr) + enr = Enrollment(org, start=start, end=end) + individual.enrollments.append(enr) - if not individual.enrollments: - company = ef_profile.get('org', None) - if company: - org = Organization(name=company) - enr = Enrollment(org) - individual.enrollments.append(enr) + if not individual.enrollments: + company = ef_profile.get('org', None) + if company: + org = Organization(name=company) + enr = Enrollment(org) + individual.enrollments.append(enr) - logger.info(f"Eclipse account processed; account={account['name']}; changed={account['changed']}") + logger.info(f"Eclipse account processed; account={account['name']}; changed={account['changed']}") - yield individual + return individual def post_process_individual(self, individual, uuid): """Post processing for Eclipse identities. @@ -338,7 +371,7 @@ def _fetch_retry(self, url, params=None): response = requests.get(url, params=params, auth=self.token) response.raise_for_status() - return response + return response.json() def _authenticate(self, client_id, client_secret, scope): """Authenticate using OAuth2.