Skip to content

Commit 48b47c4

Browse files
committed
Add parallel processing for importing accounts
This change implements parallel processing using ThreadPoolExecutor to improve the efficiency of account imports. By processing multiple accounts simultaneously, we reduce the overall time taken to fetch and process account data. Signed-off-by: Jose Javier Merchante <jjmerchante@bitergia.com>
1 parent 2ff4be2 commit 48b47c4

3 files changed

Lines changed: 91 additions & 48 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ SortingHat backend to import identities from Eclipse Foundation
44

55
## Requirements
66

7-
- Python >= 3.9
7+
- Python >= 3.10
88

99
You will also need some other libraries for running the tool, you can find the
1010
whole list of dependencies in [pyproject.toml](pyproject.toml) file.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
title: Parallel processing for importing accounts
3+
category: performance
4+
author: Jose Javier Merchante <jjmerchante@bitergia.com>
5+
issue: null
6+
notes: >
7+
Implement parallel processing to improve the efficiency
8+
of account imports. By processing multiple accounts
9+
simultaneously, we reduce the overall time taken
10+
to fetch and process account data.

sortinghat/core/importer/backends/eclipse.py

Lines changed: 80 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import dateutil.relativedelta
2121
import requests
2222

23+
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
24+
2325
from django.conf import settings
2426
from django.db.models import (Q, Subquery)
2527

@@ -46,6 +48,9 @@
4648
ECLIPSE_SOURCE = "eclipsefdn"
4749
GITHUB_SOURCE = "github"
4850

51+
# Parallel processing
52+
MAX_WORKERS = 8
53+
MAX_QUEUE_SIZE = 100
4954

5055
logger = logging.getLogger(__name__)
5156

@@ -104,68 +109,96 @@ def get_individuals(self):
104109

105110
epoch = int(self.from_date.timestamp())
106111

107-
# Fetch accounts pages
108-
for account in client.fetch_accounts(epoch=epoch):
109-
ef_profile = client.fetch_account_profile(account['name'])
112+
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
113+
pending = set()
114+
115+
for account in client.fetch_accounts(epoch=epoch):
116+
future = executor.submit(self.process_account, client, account)
117+
pending.add(future)
118+
119+
# Wait to complete when reaching max queue size
120+
if len(pending) >= MAX_QUEUE_SIZE:
121+
done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=60)
122+
if not done:
123+
raise TimeoutError("Timeout waiting for Eclipse account processing")
124+
for future in done:
125+
individual = future.result()
126+
if individual:
127+
yield individual
128+
129+
# Process remaining futures
130+
for future in as_completed(pending, timeout=600):
131+
try:
132+
individual = future.result()
133+
if individual:
134+
yield individual
135+
except Exception as exc:
136+
logger.error(f"Error processing Eclipse account; error={exc}")
137+
138+
@staticmethod
139+
def process_account(client, account):
140+
"""Process a single Eclipse account to create an Individual."""
110141

111-
if not ef_profile:
112-
continue
142+
ef_profile = client.fetch_account_profile(account['name'])
113143

114-
individual = Individual(uuid=ef_profile['uid'])
144+
if not ef_profile:
145+
return None
115146

116-
name = ef_profile['first_name'] + ' ' + ef_profile['last_name']
117-
email = ef_profile['mail']
147+
individual = Individual(uuid=ef_profile['uid'])
118148

119-
prf = Profile()
120-
prf.name = name
121-
prf.email = email
149+
name = ef_profile['first_name'] + ' ' + ef_profile['last_name']
150+
email = ef_profile['mail']
122151

123-
individual.profile = prf
152+
prf = Profile()
153+
prf.name = name
154+
prf.email = email
124155

125-
eclipse_id = Identity(
126-
source=ECLIPSE_SOURCE,
156+
individual.profile = prf
157+
158+
eclipse_id = Identity(
159+
source=ECLIPSE_SOURCE,
160+
name=name,
161+
email=email,
162+
username=ef_profile['name'],
163+
)
164+
individual.identities.append(eclipse_id)
165+
166+
if ef_profile['github_handle']:
167+
idt = Identity(
168+
source=GITHUB_SOURCE,
127169
name=name,
170+
username=ef_profile['github_handle'],
128171
email=email,
129-
username=ef_profile['name'],
130172
)
131-
individual.identities.append(eclipse_id)
132-
133-
if ef_profile['github_handle']:
134-
idt = Identity(
135-
source=GITHUB_SOURCE,
136-
name=name,
137-
username=ef_profile['github_handle'],
138-
email=email,
139-
)
140-
individual.identities.append(idt)
173+
individual.identities.append(idt)
141174

142-
# Fetch enrollments for the identity. If no enrollment is set
143-
# use the organization field from the profile, if set.
144-
employment_history = client.fetch_employment_history(account['name'])
175+
# Fetch enrollments for the identity. If no enrollment is set
176+
# use the organization field from the profile, if set.
177+
employment_history = client.fetch_employment_history(account['name'])
145178

146-
if employment_history:
147-
for employment in employment_history:
148-
org = Organization(name=employment['organization_name'])
149-
start, end = None, None
179+
if employment_history:
180+
for employment in employment_history:
181+
org = Organization(name=employment['organization_name'])
182+
start, end = None, None
150183

151-
if employment['start']:
152-
start = str_to_datetime(employment['start'])
153-
if employment['end']:
154-
end = str_to_datetime(employment['end'])
184+
if employment['start']:
185+
start = str_to_datetime(employment['start'])
186+
if employment['end']:
187+
end = str_to_datetime(employment['end'])
155188

156-
enr = Enrollment(org, start=start, end=end)
157-
individual.enrollments.append(enr)
189+
enr = Enrollment(org, start=start, end=end)
190+
individual.enrollments.append(enr)
158191

159-
if not individual.enrollments:
160-
company = ef_profile.get('org', None)
161-
if company:
162-
org = Organization(name=company)
163-
enr = Enrollment(org)
164-
individual.enrollments.append(enr)
192+
if not individual.enrollments:
193+
company = ef_profile.get('org', None)
194+
if company:
195+
org = Organization(name=company)
196+
enr = Enrollment(org)
197+
individual.enrollments.append(enr)
165198

166-
logger.info(f"Eclipse account processed; account={account['name']}; changed={account['changed']}")
199+
logger.info(f"Eclipse account processed; account={account['name']}; changed={account['changed']}")
167200

168-
yield individual
201+
return individual
169202

170203
def post_process_individual(self, individual, uuid):
171204
"""Post processing for Eclipse identities.
@@ -338,7 +371,7 @@ def _fetch_retry(self, url, params=None):
338371
response = requests.get(url, params=params, auth=self.token)
339372
response.raise_for_status()
340373

341-
return response
374+
return response.json()
342375

343376
def _authenticate(self, client_id, client_secret, scope):
344377
"""Authenticate using OAuth2.

0 commit comments

Comments
 (0)