Skip to content

Commit 6170486

Browse files
committed
Fetch accounts ordered by UID
This commit ensures that accounts are fetched in order by their UID. Ordering by UID is important when thousands of accounts share the same timestamp. Additionally, the from_page argument has been introduced to allow resuming the fetching process from a specific page instead of always starting from the beginning. Signed-off-by: Jose Javier Merchante <jjmerchante@bitergia.com>
1 parent 73cc640 commit 6170486

2 files changed

Lines changed: 33 additions & 10 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
title: Account fetching ordered by UID
3+
category: fixed
4+
author: Jose Javier Merchante <jjmerchante@bitergia.com>
5+
issue: null
6+
notes: >
7+
Sorts accounts by UID and adds support for resuming from a specific page.

sortinghat/core/importer/backends/eclipse.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#
1717

1818
import logging
19+
import time
1920

2021
import dateutil.relativedelta
2122
import requests
@@ -52,6 +53,8 @@
5253
MAX_WORKERS = 8
5354
MAX_QUEUE_SIZE = 100
5455

56+
REQUEST_TIMEOUT = 30
57+
5558
logger = logging.getLogger(__name__)
5659

5760

@@ -78,7 +81,7 @@ class EclipseFoundationAccountsImporter(IdentitiesImporter):
7881
"""
7982
NAME = "EclipseFoundation"
8083

81-
def __init__(self, ctx, url, from_date=None):
84+
def __init__(self, ctx, url, from_date=None, from_page=1):
8285
super().__init__(ctx, url)
8386

8487
min_date = datetime_utcnow() - dateutil.relativedelta.relativedelta(years=1)
@@ -90,6 +93,11 @@ def __init__(self, ctx, url, from_date=None):
9093
else:
9194
self.from_date = from_date
9295

96+
if not from_page:
97+
self.from_page = 1
98+
else:
99+
self.from_page = int(from_page)
100+
93101
if self.from_date < min_date:
94102
msg = (
95103
"Invalid 'from_date' value. It can only import identities updated since a year ago."
@@ -112,19 +120,22 @@ def get_individuals(self):
112120
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
113121
pending = set()
114122

115-
for account in client.fetch_accounts(epoch=epoch):
123+
for account in client.fetch_accounts(epoch=epoch, from_page=self.from_page):
116124
future = executor.submit(self.process_account, client, account)
117125
pending.add(future)
118126

119127
# Wait to complete when reaching max queue size
120128
if len(pending) >= MAX_QUEUE_SIZE:
121-
done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=60)
129+
done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=120)
122130
if not done:
123131
raise TimeoutError("Timeout waiting for Eclipse account processing")
124132
for future in done:
125-
individual = future.result()
126-
if individual:
127-
yield individual
133+
try:
134+
individual = future.result()
135+
if individual:
136+
yield individual
137+
except Exception as exc:
138+
logger.error(f"Error processing Eclipse account {account}; error={exc}")
128139

129140
# Process remaining futures
130141
for future in as_completed(pending, timeout=600):
@@ -268,10 +279,10 @@ def logout(self):
268279

269280
self.token = None
270281

271-
def fetch_accounts(self, epoch):
282+
def fetch_accounts(self, epoch, from_page=1):
272283
"""Fetch accounts updated from a given UNIX time."""
273284

274-
page = 1
285+
page = from_page
275286
total_accounts = 0
276287

277288
logger.info(f"Fetching accounts from API; url={self.ECLIPSE_ACCOUNTS_URL}, epoch={epoch}")
@@ -282,10 +293,14 @@ def fetch_accounts(self, epoch):
282293
'since': epoch,
283294
'page': page,
284295
'pagesize': 100,
296+
'sortby': 'uid',
285297
}
286298

287299
logger.debug(f"Fetching accounts from API; url={url}, params={params}")
288300
data = self._fetch(url, params=params)
301+
if not data:
302+
logger.error(f"No data returned from API; url={url}, params={params}")
303+
continue
289304

290305
for account in data['result']:
291306
yield account
@@ -349,7 +364,7 @@ def _fetch_retry(self, url, params=None):
349364

350365
while retries < max_retries:
351366
try:
352-
response = requests.get(url, params=params, auth=self.token)
367+
response = requests.get(url, params=params, auth=self.token, timeout=REQUEST_TIMEOUT)
353368
except ExpiredAccessToken:
354369
# Refresh token and try again
355370
self.login(self.user_id, self.password)
@@ -365,10 +380,11 @@ def _fetch_retry(self, url, params=None):
365380
elif 500 <= response.status_code < 600:
366381
# Errors could have been related to server overloading
367382
retries += 1
383+
time.sleep(2 ** retries)
368384
else:
369385
response.raise_for_status()
370386

371-
response = requests.get(url, params=params, auth=self.token)
387+
response = requests.get(url, params=params, auth=self.token, timeout=REQUEST_TIMEOUT)
372388
response.raise_for_status()
373389

374390
return response.json()

0 commit comments

Comments
 (0)