Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions releases/unreleased/account-fetching-ordered-by-uid.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
title: Account fetching ordered by UID
category: fixed
author: Jose Javier Merchante <jjmerchante@bitergia.com>
issue: null
notes: >
Sorts accounts by UID and adds support for resuming from a specific page.
36 changes: 26 additions & 10 deletions sortinghat/core/importer/backends/eclipse.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

import logging
import time

import dateutil.relativedelta
import requests
Expand Down Expand Up @@ -52,6 +53,8 @@
MAX_WORKERS = 8
MAX_QUEUE_SIZE = 100

REQUEST_TIMEOUT = 30

logger = logging.getLogger(__name__)


Expand All @@ -78,7 +81,7 @@ class EclipseFoundationAccountsImporter(IdentitiesImporter):
"""
NAME = "EclipseFoundation"

def __init__(self, ctx, url, from_date=None):
def __init__(self, ctx, url, from_date=None, from_page=1):
super().__init__(ctx, url)

min_date = datetime_utcnow() - dateutil.relativedelta.relativedelta(years=1)
Expand All @@ -90,6 +93,11 @@ def __init__(self, ctx, url, from_date=None):
else:
self.from_date = from_date

if not from_page:
self.from_page = 1
else:
self.from_page = int(from_page)

if self.from_date < min_date:
msg = (
"Invalid 'from_date' value. It can only import identities updated since a year ago."
Expand All @@ -112,19 +120,22 @@ def get_individuals(self):
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
pending = set()

for account in client.fetch_accounts(epoch=epoch):
for account in client.fetch_accounts(epoch=epoch, from_page=self.from_page):
future = executor.submit(self.process_account, client, account)
pending.add(future)

# Wait to complete when reaching max queue size
if len(pending) >= MAX_QUEUE_SIZE:
done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=60)
done, pending = wait(pending, return_when=FIRST_COMPLETED, timeout=120)
if not done:
raise TimeoutError("Timeout waiting for Eclipse account processing")
for future in done:
individual = future.result()
if individual:
yield individual
try:
individual = future.result()
if individual:
yield individual
except Exception as exc:
logger.error(f"Error processing Eclipse account {account}; error={exc}")

# Process remaining futures
for future in as_completed(pending, timeout=600):
Expand Down Expand Up @@ -268,10 +279,10 @@ def logout(self):

self.token = None

def fetch_accounts(self, epoch):
def fetch_accounts(self, epoch, from_page=1):
"""Fetch accounts updated from a given UNIX time."""

page = 1
page = from_page
total_accounts = 0

logger.info(f"Fetching accounts from API; url={self.ECLIPSE_ACCOUNTS_URL}, epoch={epoch}")
Expand All @@ -282,10 +293,14 @@ def fetch_accounts(self, epoch):
'since': epoch,
'page': page,
'pagesize': 100,
'sortby': 'uid',
}

logger.debug(f"Fetching accounts from API; url={url}, params={params}")
data = self._fetch(url, params=params)
if not data:
logger.error(f"No data returned from API; url={url}, params={params}")
continue

for account in data['result']:
yield account
Expand Down Expand Up @@ -349,7 +364,7 @@ def _fetch_retry(self, url, params=None):

while retries < max_retries:
try:
response = requests.get(url, params=params, auth=self.token)
response = requests.get(url, params=params, auth=self.token, timeout=REQUEST_TIMEOUT)
except ExpiredAccessToken:
# Refresh token and try again
self.login(self.user_id, self.password)
Expand All @@ -365,10 +380,11 @@ def _fetch_retry(self, url, params=None):
elif 500 <= response.status_code < 600:
# Errors could have been related to server overloading
retries += 1
time.sleep(2 ** retries)
else:
response.raise_for_status()

response = requests.get(url, params=params, auth=self.token)
response = requests.get(url, params=params, auth=self.token, timeout=REQUEST_TIMEOUT)
response.raise_for_status()

return response.json()
Expand Down