Skip to content

Commit 73cc640

Browse files
authored
Merge branch 'parallel-processing' of 'https://github.com/jjmerchante/sortinghat-eclipse-foundation'
Merges #9 Closes #9
2 parents 2ff4be2 + 48b47c4 commit 73cc640

3 files changed

Lines changed: 91 additions & 48 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ SortingHat backend to import identities from Eclipse Foundation
44

55
## Requirements
66

7-
- Python >= 3.9
7+
- Python >= 3.10
88

99
You will also need some other libraries for running the tool, you can find the
1010
whole list of dependencies in [pyproject.toml](pyproject.toml) file.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
title: Parallel processing for importing accounts
3+
category: performance
4+
author: Jose Javier Merchante <jjmerchante@bitergia.com>
5+
issue: null
6+
notes: >
7+
Implement parallel processing to improve the efficiency
8+
of account imports. By processing multiple accounts
9+
simultaneously, we reduce the overall time taken
10+
to fetch and process account data.

sortinghat/core/importer/backends/eclipse.py

Lines changed: 80 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import dateutil.relativedelta
2121
import requests
2222

23+
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
24+
2325
from django.conf import settings
2426
from django.db.models import (Q, Subquery)
2527

@@ -46,6 +48,9 @@
4648
ECLIPSE_SOURCE = "eclipsefdn"
4749
GITHUB_SOURCE = "github"
4850

51+
# Parallel processing
52+
MAX_WORKERS = 8
53+
MAX_QUEUE_SIZE = 100
4954

5055
logger = logging.getLogger(__name__)
5156

@@ -104,68 +109,96 @@ def get_individuals(self):
104109

105110
epoch = int(self.from_date.timestamp())
106111

107-
# Fetch accounts pages
108-
for account in client.fetch_accounts(epoch=epoch):
109-
ef_profile = client.fetch_account_profile(account['name'])
112+
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
113+
pending = set()
114+
115+
for account in client.fetch_accounts(epoch=epoch):
116+
future = executor.submit(self.process_account, client, account)
117+
pending.add(future)
118+
119+
# Wait to complete when reaching max queue size
120+
if len(pending) >= MAX_QUEUE_SIZE:
121+
done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=60)
122+
if not done:
123+
raise TimeoutError("Timeout waiting for Eclipse account processing")
124+
for future in done:
125+
individual = future.result()
126+
if individual:
127+
yield individual
128+
129+
# Process remaining futures
130+
for future in as_completed(pending, timeout=600):
131+
try:
132+
individual = future.result()
133+
if individual:
134+
yield individual
135+
except Exception as exc:
136+
logger.error(f"Error processing Eclipse account; error={exc}")
137+
138+
@staticmethod
139+
def process_account(client, account):
140+
"""Process a single Eclipse account to create an Individual."""
110141

111-
if not ef_profile:
112-
continue
142+
ef_profile = client.fetch_account_profile(account['name'])
113143

114-
individual = Individual(uuid=ef_profile['uid'])
144+
if not ef_profile:
145+
return None
115146

116-
name = ef_profile['first_name'] + ' ' + ef_profile['last_name']
117-
email = ef_profile['mail']
147+
individual = Individual(uuid=ef_profile['uid'])
118148

119-
prf = Profile()
120-
prf.name = name
121-
prf.email = email
149+
name = ef_profile['first_name'] + ' ' + ef_profile['last_name']
150+
email = ef_profile['mail']
122151

123-
individual.profile = prf
152+
prf = Profile()
153+
prf.name = name
154+
prf.email = email
124155

125-
eclipse_id = Identity(
126-
source=ECLIPSE_SOURCE,
156+
individual.profile = prf
157+
158+
eclipse_id = Identity(
159+
source=ECLIPSE_SOURCE,
160+
name=name,
161+
email=email,
162+
username=ef_profile['name'],
163+
)
164+
individual.identities.append(eclipse_id)
165+
166+
if ef_profile['github_handle']:
167+
idt = Identity(
168+
source=GITHUB_SOURCE,
127169
name=name,
170+
username=ef_profile['github_handle'],
128171
email=email,
129-
username=ef_profile['name'],
130172
)
131-
individual.identities.append(eclipse_id)
132-
133-
if ef_profile['github_handle']:
134-
idt = Identity(
135-
source=GITHUB_SOURCE,
136-
name=name,
137-
username=ef_profile['github_handle'],
138-
email=email,
139-
)
140-
individual.identities.append(idt)
173+
individual.identities.append(idt)
141174

142-
# Fetch enrollments for the identity. If no enrollment is set
143-
# use the organization field from the profile, if set.
144-
employment_history = client.fetch_employment_history(account['name'])
175+
# Fetch enrollments for the identity. If no enrollment is set
176+
# use the organization field from the profile, if set.
177+
employment_history = client.fetch_employment_history(account['name'])
145178

146-
if employment_history:
147-
for employment in employment_history:
148-
org = Organization(name=employment['organization_name'])
149-
start, end = None, None
179+
if employment_history:
180+
for employment in employment_history:
181+
org = Organization(name=employment['organization_name'])
182+
start, end = None, None
150183

151-
if employment['start']:
152-
start = str_to_datetime(employment['start'])
153-
if employment['end']:
154-
end = str_to_datetime(employment['end'])
184+
if employment['start']:
185+
start = str_to_datetime(employment['start'])
186+
if employment['end']:
187+
end = str_to_datetime(employment['end'])
155188

156-
enr = Enrollment(org, start=start, end=end)
157-
individual.enrollments.append(enr)
189+
enr = Enrollment(org, start=start, end=end)
190+
individual.enrollments.append(enr)
158191

159-
if not individual.enrollments:
160-
company = ef_profile.get('org', None)
161-
if company:
162-
org = Organization(name=company)
163-
enr = Enrollment(org)
164-
individual.enrollments.append(enr)
192+
if not individual.enrollments:
193+
company = ef_profile.get('org', None)
194+
if company:
195+
org = Organization(name=company)
196+
enr = Enrollment(org)
197+
individual.enrollments.append(enr)
165198

166-
logger.info(f"Eclipse account processed; account={account['name']}; changed={account['changed']}")
199+
logger.info(f"Eclipse account processed; account={account['name']}; changed={account['changed']}")
167200

168-
yield individual
201+
return individual
169202

170203
def post_process_individual(self, individual, uuid):
171204
"""Post processing for Eclipse identities.
@@ -338,7 +371,7 @@ def _fetch_retry(self, url, params=None):
338371
response = requests.get(url, params=params, auth=self.token)
339372
response.raise_for_status()
340373

341-
return response
374+
return response.json()
342375

343376
def _authenticate(self, client_id, client_secret, scope):
344377
"""Authenticate using OAuth2.

0 commit comments

Comments
 (0)