diff --git a/releases/unreleased/account-fetching-ordered-by-uid.yml b/releases/unreleased/account-fetching-ordered-by-uid.yml new file mode 100644 index 0000000..6a6b023 --- /dev/null +++ b/releases/unreleased/account-fetching-ordered-by-uid.yml @@ -0,0 +1,7 @@ +--- +title: Account fetching ordered by UID +category: fixed +author: Jose Javier Merchante +issue: null +notes: > + Sort accounts by UID and adds support for resuming from a specific page. diff --git a/sortinghat/core/importer/backends/eclipse.py b/sortinghat/core/importer/backends/eclipse.py index e756c4a..7ec2bbe 100644 --- a/sortinghat/core/importer/backends/eclipse.py +++ b/sortinghat/core/importer/backends/eclipse.py @@ -16,6 +16,7 @@ # import logging +import time import dateutil.relativedelta import requests @@ -52,6 +53,8 @@ MAX_WORKERS = 8 MAX_QUEUE_SIZE = 100 +REQUEST_TIMEOUT = 30 + logger = logging.getLogger(__name__) @@ -78,7 +81,7 @@ class EclipseFoundationAccountsImporter(IdentitiesImporter): """ NAME = "EclipseFoundation" - def __init__(self, ctx, url, from_date=None): + def __init__(self, ctx, url, from_date=None, from_page=1): super().__init__(ctx, url) min_date = datetime_utcnow() - dateutil.relativedelta.relativedelta(years=1) @@ -90,6 +93,11 @@ def __init__(self, ctx, url, from_date=None): else: self.from_date = from_date + if not from_page: + self.from_page = 1 + else: + self.from_page = int(from_page) + if self.from_date < min_date: msg = ( "Invalid 'from_date' value. It can only import identities updated since a year ago." @@ -112,19 +120,22 @@ def get_individuals(self): with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: pending = set() - for account in client.fetch_accounts(epoch=epoch): + for account in client.fetch_accounts(epoch=epoch, from_page=self.from_page): future = executor.submit(self.process_account, client, account) pending.add(future) # Wait to complete when reaching max queue size if len(pending) >= MAX_QUEUE_SIZE: - done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=60) + done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=120) if not done: raise TimeoutError("Timeout waiting for Eclipse account processing") for future in done: - individual = future.result() - if individual: - yield individual + try: + individual = future.result() + if individual: + yield individual + except Exception as exc: + logger.error(f"Error processing Eclipse account {account}; error={exc}") # Process remaining futures for future in as_completed(pending, timeout=600): @@ -268,10 +279,10 @@ def logout(self): self.token = None - def fetch_accounts(self, epoch): + def fetch_accounts(self, epoch, from_page=1): """Fetch accounts updated from a given UNIX time.""" - page = 1 + page = from_page total_accounts = 0 logger.info(f"Fetching accounts from API; url={self.ECLIPSE_ACCOUNTS_URL}, epoch={epoch}") @@ -282,10 +293,14 @@ def fetch_accounts(self, epoch): 'since': epoch, 'page': page, 'pagesize': 100, + 'sortby': 'uid', } logger.debug(f"Fetching accounts from API; url={url}, params={params}") data = self._fetch(url, params=params) + if not data: + logger.error(f"No data returned from API; url={url}, params={params}") + continue for account in data['result']: yield account @@ -349,7 +364,7 @@ def _fetch_retry(self, url, params=None): while retries < max_retries: try: - response = requests.get(url, params=params, auth=self.token) + response = requests.get(url, params=params, auth=self.token, timeout=REQUEST_TIMEOUT) except ExpiredAccessToken: # Refresh token and try again self.login(self.user_id, self.password) @@ -365,10 +380,11 @@ def _fetch_retry(self, url, params=None): elif 500 <= response.status_code < 600: # Errors could have been related to server overloading retries += 1 + time.sleep(2 ** retries) else: response.raise_for_status() - response = requests.get(url, params=params, auth=self.token) + response = requests.get(url, params=params, auth=self.token, timeout=REQUEST_TIMEOUT) response.raise_for_status() return response.json()