Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions osf_tests/test_archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,65 @@ def test_compact_traceback_uses_last_lines(self):
def test_compact_traceback_handles_empty(self):
assert archiver_utils.compact_traceback(None) is None

def test_compact_traceback_uses_last_chars_then_last_lines(self):
    """max_chars trims to the tail of the text first; max_lines then keeps only the last lines."""
    full_traceback = '\n'.join(f'line {index}' for index in range(20))

    result = archiver_utils.compact_traceback(full_traceback, max_lines=3, max_chars=45)

    # The 45-char tail reaches back into 'line 14', but only the last 3 whole lines survive.
    expected = 'line 17\nline 18\nline 19'
    assert result == expected

@mock.patch('website.archiver.tasks.sentry.log_message')
def test_archiver_task_load_archive_job_retries_with_context(self, mock_log_message):
    """While retries remain, a missing ArchiveJob triggers a retry carrying full context."""
    task = ArchiverTask()
    task.name = 'website.archiver.tasks.stat_addon'
    task.max_retries = 3
    task.retry = mock.Mock(side_effect=RuntimeError('retry requested'))
    fake_request = mock.Mock(retries=1, id='task-123', kwargs={'dst_pk': 'reg123'})

    with mock.patch.object(ArchiverTask, 'request', new_callable=mock.PropertyMock, return_value=fake_request), \
            mock.patch('website.archiver.tasks.ArchiveJob.load', return_value=None), \
            pytest.raises(RuntimeError, match='retry requested'):
        task.load_archive_job('abc123')

    # The exception handed to retry() must carry the diagnostic context.
    retry_exc = task.retry.call_args.kwargs['exc']
    assert isinstance(retry_exc, ArchiverStateError)
    assert retry_exc.info['job_pk'] == 'abc123'
    assert retry_exc.info['registration_id'] == 'reg123'
    assert retry_exc.info['should_retry'] is True
    # Sentry is only notified on final failure, never on a retry.
    mock_log_message.assert_not_called()

@mock.patch('website.archiver.tasks.sentry.log_exception')
@mock.patch('website.archiver.tasks.sentry.log_message')
def test_archiver_task_load_archive_job_final_failure_logs_context(self, mock_log_message, mock_log_exception):
    """When retries are exhausted, a missing ArchiveJob raises and logs full context.

    Fix: the expected log message was built with an f-string that nested
    single-quoted subscripts inside single-quote delimiters
    (``f'ArchiveJob {exc.value.info['job_pk']} ...'``), which is a SyntaxError
    on Python < 3.12 (PEP 701 nested quotes landed in 3.12). The job pk is
    already pinned to 'def456' by the assertion above, so the plain literal
    is used instead.
    """
    task = ArchiverTask()
    task.name = 'website.archiver.tasks.stat_addon'
    task.max_retries = 3
    task.retry = mock.Mock()
    # retries == max_retries: the task must NOT attempt another retry.
    request = mock.Mock(retries=3, id='task-456', kwargs={'dst_pk': 'reg456'})

    with mock.patch.object(ArchiverTask, 'request', new_callable=mock.PropertyMock, return_value=request):
        with mock.patch('website.archiver.tasks.ArchiveJob.load', return_value=None):
            with pytest.raises(ArchiverStateError) as exc:
                task.load_archive_job('def456')

    assert exc.value.info['job_pk'] == 'def456'
    assert exc.value.info['registration_id'] == 'reg456'
    assert exc.value.info['should_retry'] is False
    assert not task.retry.called
    mock_log_message.assert_called_once_with(
        'ArchiveJob def456 not found during archiver task execution',
        extra_data={
            'job_pk': 'def456',
            'registration_id': 'reg456',
            'task_id': 'task-456',
            'task_name': task.name,
            'retries': 3,
            'max_retries': 3,
            'should_retry': False,
        },
    )
    mock_log_exception.assert_called_once()

@mock.patch('website.archiver.tasks.archive_addon.delay')
def test_archive_node_pass(self, mock_archive_addon):
settings.MAX_ARCHIVE_SIZE = 1024 ** 3
Expand Down
106 changes: 82 additions & 24 deletions website/archiver/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,52 @@ class ArchiverTask(celery.Task):
max_retries = 0
ignore_result = False

def load_archive_job(self, job_pk, retry_if_missing=True, task_id=None, kwargs=None):
    """Load an ArchiveJob by pk, retrying the bound task if the row is missing.

    :param job_pk: primary key of the ArchiveJob to load
    :param retry_if_missing: when True and retries remain, re-raise via self.retry
    :param task_id: explicit task id; falls back to the bound request's id
    :param kwargs: explicit task kwargs; falls back to the bound request's kwargs
    :return: the loaded ArchiveJob
    :raises ArchiverStateError: when the job cannot be found and no retry is made
    """
    archive_job = ArchiveJob.load(job_pk)
    if archive_job is not None:
        return archive_job

    bound_request = getattr(self, 'request', None)
    effective_kwargs = kwargs or getattr(bound_request, 'kwargs', None) or {}
    retries_so_far = getattr(bound_request, 'retries', None)

    # Diagnostic context attached to both the exception and the Sentry report.
    context = {
        'job_pk': job_pk,
        'registration_id': effective_kwargs.get('dst_pk'),
        'task_id': task_id or getattr(bound_request, 'id', None),
        'task_name': self.name,
        'retries': retries_so_far,
        'max_retries': self.max_retries,
    }
    can_retry = (
        retry_if_missing
        and retries_so_far is not None
        and self.max_retries is not None
        and retries_so_far < self.max_retries
    )
    context['should_retry'] = can_retry

    missing_job_error = ArchiverStateError(dict(error='ArchiveJob not found', **context))
    if can_retry:
        # self.retry raises celery.exceptions.Retry; raising its return value
        # is a safeguard in case retry() is overridden not to raise.
        raise self.retry(exc=missing_job_error)

    # Final failure: report to Sentry with full context, then surface the error.
    sentry.log_message(
        f'ArchiveJob {job_pk} not found during archiver task execution',
        extra_data=context,
    )
    sentry.log_exception(missing_job_error)
    raise missing_job_error

def on_failure(self, exc, task_id, args, kwargs, einfo):
job = ArchiveJob.load(kwargs.get('job_pk'))
compact_traceback = utils.compact_traceback(einfo)
if not job:
archiver_state_exc = ArchiverStateError({
'exception': exc,
'args': args,
'kwargs': kwargs,
'einfo': compact_traceback,
})
sentry.log_exception(archiver_state_exc)
raise archiver_state_exc
job_pk = kwargs.get('job_pk')
job = self.load_archive_job(job_pk, retry_if_missing=False, task_id=task_id, kwargs=kwargs)
compact_traceback = utils.compact_traceback(
einfo,
max_lines=20,
max_chars=3000,
)

if job.status == ARCHIVER_FAILURE:
# already captured
Expand Down Expand Up @@ -161,9 +195,15 @@ def get_addon_from_gv(src_node, addon_name, requesting_user):
)


@celery_app.task(base=ArchiverTask, ignore_result=False)
@celery_app.task(
bind=True,
base=ArchiverTask,
ignore_result=False,
max_retries=3,
default_retry_delay=60,
)
@logged('stat_addon')
def stat_addon(addon_short_name, job_pk):
def stat_addon(self, addon_short_name, job_pk):
"""Collect metadata about the file tree of a given addon

:param addon_short_name: AddonConfig.short_name of the addon to be examined
Expand All @@ -178,7 +218,7 @@ def stat_addon(addon_short_name, job_pk):
addon_name = 'dataverse'
version = 'latest' if addon_short_name.split('-')[-1] == 'draft' else 'latest-published'
create_app_context()
job = ArchiveJob.load(job_pk)
job = self.load_archive_job(job_pk)
src, dst, user = job.info()

src_addon = None
Expand Down Expand Up @@ -206,9 +246,15 @@ def stat_addon(addon_short_name, job_pk):
return result


@celery_app.task(base=ArchiverTask, ignore_result=False)
@celery_app.task(
bind=True,
base=ArchiverTask,
ignore_result=False,
max_retries=3,
default_retry_delay=60,
)
@logged('make_copy_request')
def make_copy_request(job_pk, url, data):
def make_copy_request(self, job_pk, url, data):
"""Make the copy request to the WaterButler API and handle
successful and failed responses

Expand All @@ -218,7 +264,7 @@ def make_copy_request(job_pk, url, data):
:return: None
"""
create_app_context()
job = ArchiveJob.load(job_pk)
job = self.load_archive_job(job_pk)
src, dst, user = job.info()
logger.info(f"Sending copy request for addon: {data['provider']} on node: {dst._id}")
cookie = furl(url).query.params.get('cookie')
Expand All @@ -235,9 +281,15 @@ def make_waterbutler_payload(dst_id, rename):
'provider': settings.ARCHIVE_PROVIDER,
}

@celery_app.task(base=ArchiverTask, ignore_result=False)
@celery_app.task(
bind=True,
base=ArchiverTask,
ignore_result=False,
max_retries=3,
default_retry_delay=60,
)
@logged('archive_addon')
def archive_addon(addon_short_name, job_pk):
def archive_addon(self, addon_short_name, job_pk):
"""Archive the contents of an addon by making a copy request to the
WaterButler API

Expand All @@ -246,7 +298,7 @@ def archive_addon(addon_short_name, job_pk):
:return: None
"""
create_app_context()
job = ArchiveJob.load(job_pk)
job = self.load_archive_job(job_pk)
src, dst, user = job.info()
logger.info(f'Archiving addon: {addon_short_name} on node: {src._id}')

Expand Down Expand Up @@ -274,9 +326,15 @@ def archive_addon(addon_short_name, job_pk):
data = make_waterbutler_payload(dst._id, rename)
make_copy_request.delay(job_pk=job_pk, url=url, data=data)

@celery_app.task(base=ArchiverTask, ignore_result=False)
@celery_app.task(
bind=True,
base=ArchiverTask,
ignore_result=False,
max_retries=3,
default_retry_delay=60,
)
@logged('archive_node')
def archive_node(stat_results, job_pk):
def archive_node(self, stat_results, job_pk):
"""First use the results of #stat_node to check disk usage of the
initiated registration, then either fail the registration or
create a celery.group group of subtasks to archive addons
Expand All @@ -286,7 +344,7 @@ def archive_node(stat_results, job_pk):
:return: None
"""
create_app_context()
job = ArchiveJob.load(job_pk)
job = self.load_archive_job(job_pk)
src, dst, user = job.info()
logger.info(f'Archiving node: {src._id}')

Expand Down Expand Up @@ -381,7 +439,7 @@ def archive_success(self, dst_pk, job_pk):
)
self.retry(exc=err)

job = ArchiveJob.load(job_pk)
job = self.load_archive_job(job_pk)
if not job.sent:
job.sent = True
job.save()
Expand Down
12 changes: 7 additions & 5 deletions website/archiver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,10 @@ def compact_traceback(einfo, max_lines=25, max_chars=4000):
if not traceback_text:
return None

lines = traceback_text.splitlines()
compact = '\n'.join(lines[-max_lines:])
if len(compact) > max_chars:
compact = compact[-max_chars:]
return compact
max_lines = max(1, int(max_lines))
max_chars = max(1, int(max_chars))

# Always compact from the tail to preserve the latest failure context.
tail_text = traceback_text[-max_chars:]
lines = tail_text.splitlines()
return '\n'.join(lines[-max_lines:])