Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ SortingHat backend to import identities from Eclipse Foundation

## Requirements

- Python >= 3.9
- Python >= 3.10

You will also need some other libraries for running the tool; you can find the
whole list of dependencies in the [pyproject.toml](pyproject.toml) file.
Expand Down
10 changes: 10 additions & 0 deletions releases/unreleased/parallel-processing-for-importing-accounts.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
title: Parallel processing for importing accounts
category: performance
author: Jose Javier Merchante <jjmerchante@bitergia.com>
issue: null
notes: >
Implement parallel processing to improve the efficiency
of account imports. By processing multiple accounts
simultaneously, we reduce the overall time taken
to fetch and process account data.
127 changes: 80 additions & 47 deletions sortinghat/core/importer/backends/eclipse.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import dateutil.relativedelta
import requests

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED

from django.conf import settings
from django.db.models import (Q, Subquery)

Expand All @@ -46,6 +48,9 @@
ECLIPSE_SOURCE = "eclipsefdn"
GITHUB_SOURCE = "github"

# Parallel processing
# MAX_WORKERS is passed as `max_workers` to the ThreadPoolExecutor
# that runs `process_account` for each fetched account.
MAX_WORKERS = 8
# MAX_QUEUE_SIZE is the number of pending futures at which the
# producer stops submitting and waits for at least one to complete.
MAX_QUEUE_SIZE = 100

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -104,68 +109,96 @@ def get_individuals(self):

epoch = int(self.from_date.timestamp())

# Fetch accounts pages
for account in client.fetch_accounts(epoch=epoch):
ef_profile = client.fetch_account_profile(account['name'])
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
pending = set()

for account in client.fetch_accounts(epoch=epoch):
future = executor.submit(self.process_account, client, account)
pending.add(future)

# Wait to complete when reaching max queue size
if len(pending) >= MAX_QUEUE_SIZE:
done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=60)
if not done:
raise TimeoutError("Timeout waiting for Eclipse account processing")
for future in done:
individual = future.result()
if individual:
yield individual

# Process remaining futures
for future in as_completed(pending, timeout=600):
try:
individual = future.result()
if individual:
yield individual
except Exception as exc:
logger.error(f"Error processing Eclipse account; error={exc}")

@staticmethod
def process_account(client, account):
"""Process a single Eclipse account to create an Individual.

The account profile and its employment history are fetched
through `client` using `account['name']`. The resulting
individual carries a profile (name and email), an Eclipse
identity, a GitHub identity when the profile has a GitHub
handle, and one enrollment per employment entry. When there
are no enrollments, the `org` field of the profile is used
as a single enrollment, if set.

:param client: object providing `fetch_account_profile` and
`fetch_employment_history`
:param account: mapping with, at least, the `name` and
`changed` keys

:returns: the `Individual` built from the account, or `None`
when the fetched profile is empty
"""

if not ef_profile:
continue
ef_profile = client.fetch_account_profile(account['name'])

individual = Individual(uuid=ef_profile['uid'])
if not ef_profile:
return None

name = ef_profile['first_name'] + ' ' + ef_profile['last_name']
email = ef_profile['mail']
individual = Individual(uuid=ef_profile['uid'])

prf = Profile()
prf.name = name
prf.email = email
name = ef_profile['first_name'] + ' ' + ef_profile['last_name']
email = ef_profile['mail']

individual.profile = prf
prf = Profile()
prf.name = name
prf.email = email

eclipse_id = Identity(
source=ECLIPSE_SOURCE,
individual.profile = prf

eclipse_id = Identity(
source=ECLIPSE_SOURCE,
name=name,
email=email,
username=ef_profile['name'],
)
individual.identities.append(eclipse_id)

if ef_profile['github_handle']:
idt = Identity(
source=GITHUB_SOURCE,
name=name,
username=ef_profile['github_handle'],
email=email,
username=ef_profile['name'],
)
individual.identities.append(eclipse_id)

if ef_profile['github_handle']:
idt = Identity(
source=GITHUB_SOURCE,
name=name,
username=ef_profile['github_handle'],
email=email,
)
individual.identities.append(idt)
individual.identities.append(idt)

# Fetch enrollments for the identity. If no enrollment is set
# use the organization field from the profile, if set.
employment_history = client.fetch_employment_history(account['name'])
# Fetch enrollments for the identity. If no enrollment is set
# use the organization field from the profile, if set.
employment_history = client.fetch_employment_history(account['name'])

if employment_history:
for employment in employment_history:
org = Organization(name=employment['organization_name'])
start, end = None, None
if employment_history:
for employment in employment_history:
org = Organization(name=employment['organization_name'])
start, end = None, None

if employment['start']:
start = str_to_datetime(employment['start'])
if employment['end']:
end = str_to_datetime(employment['end'])
if employment['start']:
start = str_to_datetime(employment['start'])
if employment['end']:
end = str_to_datetime(employment['end'])

enr = Enrollment(org, start=start, end=end)
individual.enrollments.append(enr)
enr = Enrollment(org, start=start, end=end)
individual.enrollments.append(enr)

if not individual.enrollments:
company = ef_profile.get('org', None)
if company:
org = Organization(name=company)
enr = Enrollment(org)
individual.enrollments.append(enr)
if not individual.enrollments:
company = ef_profile.get('org', None)
if company:
org = Organization(name=company)
enr = Enrollment(org)
individual.enrollments.append(enr)

logger.info(f"Eclipse account processed; account={account['name']}; changed={account['changed']}")
logger.info(f"Eclipse account processed; account={account['name']}; changed={account['changed']}")

yield individual
return individual

def post_process_individual(self, individual, uuid):
"""Post processing for Eclipse identities.
Expand Down Expand Up @@ -338,7 +371,7 @@ def _fetch_retry(self, url, params=None):
response = requests.get(url, params=params, auth=self.token)
response.raise_for_status()

return response
return response.json()

def _authenticate(self, client_id, client_secret, scope):
"""Authenticate using OAuth2.
Expand Down