diff --git a/osf_tests/test_archiver.py b/osf_tests/test_archiver.py index 5b67375a5a2..94c8b4f2827 100644 --- a/osf_tests/test_archiver.py +++ b/osf_tests/test_archiver.py @@ -467,6 +467,65 @@ def test_compact_traceback_uses_last_lines(self): def test_compact_traceback_handles_empty(self): assert archiver_utils.compact_traceback(None) is None + def test_compact_traceback_uses_last_chars_then_last_lines(self): + traceback_text = '\n'.join(f'line {line_num}' for line_num in range(20)) + compact = archiver_utils.compact_traceback(traceback_text, max_lines=3, max_chars=45) + + # max_chars keeps only the tail content, then max_lines keeps the tail lines. + assert compact == '\n'.join(['line 17', 'line 18', 'line 19']) + + @mock.patch('website.archiver.tasks.sentry.log_message') + def test_archiver_task_load_archive_job_retries_with_context(self, mock_log_message): + task = ArchiverTask() + task.name = 'website.archiver.tasks.stat_addon' + task.max_retries = 3 + task.retry = mock.Mock(side_effect=RuntimeError('retry requested')) + request = mock.Mock(retries=1, id='task-123', kwargs={'dst_pk': 'reg123'}) + + with mock.patch.object(ArchiverTask, 'request', new_callable=mock.PropertyMock, return_value=request): + with mock.patch('website.archiver.tasks.ArchiveJob.load', return_value=None): + with pytest.raises(RuntimeError, match='retry requested'): + task.load_archive_job('abc123') + + retry_exception = task.retry.call_args.kwargs['exc'] + assert isinstance(retry_exception, ArchiverStateError) + assert retry_exception.info['job_pk'] == 'abc123' + assert retry_exception.info['registration_id'] == 'reg123' + assert retry_exception.info['should_retry'] is True + assert not mock_log_message.called + + @mock.patch('website.archiver.tasks.sentry.log_exception') + @mock.patch('website.archiver.tasks.sentry.log_message') + def test_archiver_task_load_archive_job_final_failure_logs_context(self, mock_log_message, mock_log_exception): + task = ArchiverTask() + task.name = 
'website.archiver.tasks.stat_addon' + task.max_retries = 3 + task.retry = mock.Mock() + request = mock.Mock(retries=3, id='task-456', kwargs={'dst_pk': 'reg456'}) + + with mock.patch.object(ArchiverTask, 'request', new_callable=mock.PropertyMock, return_value=request): + with mock.patch('website.archiver.tasks.ArchiveJob.load', return_value=None): + with pytest.raises(ArchiverStateError) as exc: + task.load_archive_job('def456') + + assert exc.value.info['job_pk'] == 'def456' + assert exc.value.info['registration_id'] == 'reg456' + assert exc.value.info['should_retry'] is False + assert not task.retry.called + mock_log_message.assert_called_once_with( + f'ArchiveJob {exc.value.info["job_pk"]} not found during archiver task execution', + extra_data={ + 'job_pk': 'def456', + 'registration_id': 'reg456', + 'task_id': 'task-456', + 'task_name': task.name, + 'retries': 3, + 'max_retries': 3, + 'should_retry': False, + }, + ) + mock_log_exception.assert_called_once() + @mock.patch('website.archiver.tasks.archive_addon.delay') def test_archive_node_pass(self, mock_archive_addon): settings.MAX_ARCHIVE_SIZE = 1024 ** 3 diff --git a/website/archiver/tasks.py b/website/archiver/tasks.py index 407585527e2..7608b5c02c6 100644 --- a/website/archiver/tasks.py +++ b/website/archiver/tasks.py @@ -87,18 +87,52 @@ class ArchiverTask(celery.Task): max_retries = 0 ignore_result = False + def load_archive_job(self, job_pk, retry_if_missing=True, task_id=None, kwargs=None): + """Load an ArchiveJob and optionally retry bound tasks if row is missing.""" + job = ArchiveJob.load(job_pk) + if job: + return job + + request = getattr(self, 'request', None) + request_kwargs = kwargs or getattr(request, 'kwargs', None) or {} + context = { + 'job_pk': job_pk, + 'registration_id': request_kwargs.get('dst_pk'), + 'task_id': task_id or getattr(request, 'id', None), + 'task_name': self.name, + 'retries': getattr(request, 'retries', None), + 'max_retries': self.max_retries, + } + should_retry = ( + 
retry_if_missing + and context['retries'] is not None + and context['max_retries'] is not None + and context['retries'] < context['max_retries'] + ) + context['should_retry'] = should_retry + + error = ArchiverStateError({ + 'error': 'ArchiveJob not found', + **context, + }) + if should_retry: + raise self.retry(exc=error) + + sentry.log_message( + f'ArchiveJob {job_pk} not found during archiver task execution', + extra_data=context, + ) + sentry.log_exception(error) + raise error + def on_failure(self, exc, task_id, args, kwargs, einfo): - job = ArchiveJob.load(kwargs.get('job_pk')) - compact_traceback = utils.compact_traceback(einfo) - if not job: - archiver_state_exc = ArchiverStateError({ - 'exception': exc, - 'args': args, - 'kwargs': kwargs, - 'einfo': compact_traceback, - }) - sentry.log_exception(archiver_state_exc) - raise archiver_state_exc + job_pk = kwargs.get('job_pk') + job = self.load_archive_job(job_pk, retry_if_missing=False, task_id=task_id, kwargs=kwargs) + compact_traceback = utils.compact_traceback( + einfo, + max_lines=20, + max_chars=3000, + ) if job.status == ARCHIVER_FAILURE: # already captured @@ -161,9 +195,15 @@ def get_addon_from_gv(src_node, addon_name, requesting_user): ) -@celery_app.task(base=ArchiverTask, ignore_result=False) +@celery_app.task( + bind=True, + base=ArchiverTask, + ignore_result=False, + max_retries=3, + default_retry_delay=60, +) @logged('stat_addon') -def stat_addon(addon_short_name, job_pk): +def stat_addon(self, addon_short_name, job_pk): """Collect metadata about the file tree of a given addon :param addon_short_name: AddonConfig.short_name of the addon to be examined @@ -178,7 +218,7 @@ def stat_addon(addon_short_name, job_pk): addon_name = 'dataverse' version = 'latest' if addon_short_name.split('-')[-1] == 'draft' else 'latest-published' create_app_context() - job = ArchiveJob.load(job_pk) + job = self.load_archive_job(job_pk) src, dst, user = job.info() src_addon = None @@ -206,9 +246,15 @@ def 
stat_addon(addon_short_name, job_pk): return result -@celery_app.task(base=ArchiverTask, ignore_result=False) +@celery_app.task( + bind=True, + base=ArchiverTask, + ignore_result=False, + max_retries=3, + default_retry_delay=60, +) @logged('make_copy_request') -def make_copy_request(job_pk, url, data): +def make_copy_request(self, job_pk, url, data): """Make the copy request to the WaterButler API and handle successful and failed responses @@ -218,7 +264,7 @@ def make_copy_request(job_pk, url, data): :return: None """ create_app_context() - job = ArchiveJob.load(job_pk) + job = self.load_archive_job(job_pk) src, dst, user = job.info() logger.info(f"Sending copy request for addon: {data['provider']} on node: {dst._id}") cookie = furl(url).query.params.get('cookie') @@ -235,9 +281,15 @@ def make_waterbutler_payload(dst_id, rename): 'provider': settings.ARCHIVE_PROVIDER, } -@celery_app.task(base=ArchiverTask, ignore_result=False) +@celery_app.task( + bind=True, + base=ArchiverTask, + ignore_result=False, + max_retries=3, + default_retry_delay=60, +) @logged('archive_addon') -def archive_addon(addon_short_name, job_pk): +def archive_addon(self, addon_short_name, job_pk): """Archive the contents of an addon by making a copy request to the WaterButler API @@ -246,7 +298,7 @@ def archive_addon(addon_short_name, job_pk): :return: None """ create_app_context() - job = ArchiveJob.load(job_pk) + job = self.load_archive_job(job_pk) src, dst, user = job.info() logger.info(f'Archiving addon: {addon_short_name} on node: {src._id}') @@ -274,9 +326,15 @@ def archive_addon(addon_short_name, job_pk): data = make_waterbutler_payload(dst._id, rename) make_copy_request.delay(job_pk=job_pk, url=url, data=data) -@celery_app.task(base=ArchiverTask, ignore_result=False) +@celery_app.task( + bind=True, + base=ArchiverTask, + ignore_result=False, + max_retries=3, + default_retry_delay=60, +) @logged('archive_node') -def archive_node(stat_results, job_pk): +def archive_node(self, stat_results, 
job_pk): """First use the results of #stat_node to check disk usage of the initiated registration, then either fail the registration or create a celery.group group of subtasks to archive addons @@ -286,7 +344,7 @@ def archive_node(stat_results, job_pk): :return: None """ create_app_context() - job = ArchiveJob.load(job_pk) + job = self.load_archive_job(job_pk) src, dst, user = job.info() logger.info(f'Archiving node: {src._id}') @@ -381,7 +439,7 @@ def archive_success(self, dst_pk, job_pk): ) self.retry(exc=err) - job = ArchiveJob.load(job_pk) + job = self.load_archive_job(job_pk) if not job.sent: job.sent = True job.save() diff --git a/website/archiver/utils.py b/website/archiver/utils.py index a2b0a9d5778..8c2a19b9ec3 100644 --- a/website/archiver/utils.py +++ b/website/archiver/utils.py @@ -394,8 +394,10 @@ def compact_traceback(einfo, max_lines=25, max_chars=4000): if not traceback_text: return None - lines = traceback_text.splitlines() - compact = '\n'.join(lines[-max_lines:]) - if len(compact) > max_chars: - compact = compact[-max_chars:] - return compact + max_lines = max(1, int(max_lines)) + max_chars = max(1, int(max_chars)) + + # Always compact from the tail to preserve the latest failure context. + tail_text = traceback_text[-max_chars:] + lines = tail_text.splitlines() + return '\n'.join(lines[-max_lines:])