From 6170486b28ea2102f0a160e9ca0debef9e00a446 Mon Sep 17 00:00:00 2001 From: Jose Javier Merchante Date: Tue, 11 Nov 2025 10:39:02 +0100 Subject: [PATCH] Fetch accounts ordered by UID This commit ensures that accounts are fetched in order by their UID. Ordering by UID is important when thousands of accounts share the same timestamp. Additionally, the from_page argument has been introduced to allow resuming the fetching process from a specific page instead of always starting from the beginning. Signed-off-by: Jose Javier Merchante --- .../account-fetching-ordered-by-uid.yml | 7 ++++ sortinghat/core/importer/backends/eclipse.py | 36 +++++++++++++------ 2 files changed, 33 insertions(+), 10 deletions(-) create mode 100644 releases/unreleased/account-fetching-ordered-by-uid.yml diff --git a/releases/unreleased/account-fetching-ordered-by-uid.yml b/releases/unreleased/account-fetching-ordered-by-uid.yml new file mode 100644 index 0000000..6a6b023 --- /dev/null +++ b/releases/unreleased/account-fetching-ordered-by-uid.yml @@ -0,0 +1,7 @@ +--- +title: Account fetching ordered by UID +category: fixed +author: Jose Javier Merchante +issue: null +notes: > + Sorts accounts by UID and adds support for resuming from a specific page. 
diff --git a/sortinghat/core/importer/backends/eclipse.py b/sortinghat/core/importer/backends/eclipse.py index e756c4a..7ec2bbe 100644 --- a/sortinghat/core/importer/backends/eclipse.py +++ b/sortinghat/core/importer/backends/eclipse.py @@ -16,6 +16,7 @@ # import logging +import time import dateutil.relativedelta import requests @@ -52,6 +53,8 @@ MAX_WORKERS = 8 MAX_QUEUE_SIZE = 100 +REQUEST_TIMEOUT = 30 + logger = logging.getLogger(__name__) @@ -78,7 +81,7 @@ class EclipseFoundationAccountsImporter(IdentitiesImporter): """ NAME = "EclipseFoundation" - def __init__(self, ctx, url, from_date=None): + def __init__(self, ctx, url, from_date=None, from_page=1): super().__init__(ctx, url) min_date = datetime_utcnow() - dateutil.relativedelta.relativedelta(years=1) @@ -90,6 +93,11 @@ def __init__(self, ctx, url, from_date=None): else: self.from_date = from_date + if not from_page: + self.from_page = 1 + else: + self.from_page = int(from_page) + if self.from_date < min_date: msg = ( "Invalid 'from_date' value. It can only import identities updated since a year ago." 
@@ -112,19 +120,22 @@ def get_individuals(self): with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: pending = set() - for account in client.fetch_accounts(epoch=epoch): + for account in client.fetch_accounts(epoch=epoch, from_page=self.from_page): future = executor.submit(self.process_account, client, account) pending.add(future) # Wait to complete when reaching max queue size if len(pending) >= MAX_QUEUE_SIZE: - done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=60) + done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=120) if not done: raise TimeoutError("Timeout waiting for Eclipse account processing") for future in done: - individual = future.result() - if individual: - yield individual + try: + individual = future.result() + if individual: + yield individual + except Exception as exc: + logger.error(f"Error processing Eclipse account {account}; error={exc}") # Process remaining futures for future in as_completed(pending, timeout=600): @@ -268,10 +279,10 @@ def logout(self): self.token = None - def fetch_accounts(self, epoch): + def fetch_accounts(self, epoch, from_page=1): """Fetch accounts updated from a given UNIX time.""" - page = 1 + page = from_page total_accounts = 0 logger.info(f"Fetching accounts from API; url={self.ECLIPSE_ACCOUNTS_URL}, epoch={epoch}") @@ -282,10 +293,14 @@ def fetch_accounts(self, epoch): 'since': epoch, 'page': page, 'pagesize': 100, + 'sortby': 'uid', } logger.debug(f"Fetching accounts from API; url={url}, params={params}") data = self._fetch(url, params=params) + if not data: + logger.error(f"No data returned from API; url={url}, params={params}") + continue for account in data['result']: yield account @@ -349,7 +364,7 @@ def _fetch_retry(self, url, params=None): while retries < max_retries: try: - response = requests.get(url, params=params, auth=self.token) + response = requests.get(url, params=params, auth=self.token, timeout=REQUEST_TIMEOUT) except ExpiredAccessToken: # Refresh token 
and try again self.login(self.user_id, self.password) @@ -365,10 +380,11 @@ def _fetch_retry(self, url, params=None): elif 500 <= response.status_code < 600: # Errors could have been related to server overloading retries += 1 + time.sleep(2 ** retries) else: response.raise_for_status() - response = requests.get(url, params=params, auth=self.token) + response = requests.get(url, params=params, auth=self.token, timeout=REQUEST_TIMEOUT) response.raise_for_status() return response.json()