diff --git a/irods/data_object.py b/irods/data_object.py index 409f7f33..ddfa1e2c 100644 --- a/irods/data_object.py +++ b/irods/data_object.py @@ -1,14 +1,24 @@ +""" +Interface for iRODS data objects. + +Provides high level abstraction and POSIX-like facilities (create, open, +read/write) allowing clients to manipulate data objects very much as if they +were local files. +""" + +import ast +import enum import io -import sys import logging import os -import ast +import sys +from datetime import datetime, timezone -from irods.models import DataObject -from irods.meta import iRODSMetaCollection import irods.keywords as kw from irods.api_number import api_number from irods.message import JSON_Message, iRODSMessage +from irods.meta import iRODSMetaCollection +from irods.models import DataObject logger = logging.getLogger(__name__) @@ -41,11 +51,57 @@ def __repr__(self): return "<{}.{} {}>".format(self.__class__.__module__, self.__class__.__name__, self.resource_name) +class _repl_status(enum.Enum): # noqa: N801 + STALE_REPLICA, GOOD_REPLICA, INTERMEDIATE_REPLICA, READ_LOCKED, WRITE_LOCKED = range(5) + + +# An ordering of the various replica status values, by descending fitness for use/interface +_REPL_STATUSES = tuple( + getattr(_repl_status, ident).value + for ident in ( + "GOOD_REPLICA", + "STALE_REPLICA", + "INTERMEDIATE_REPLICA", + "READ_LOCKED", + "WRITE_LOCKED", + ) +) + +# An appropriate reference datetime value for gauging replica age as part of +# the default sort key in PRC4 and onward. +_REFERENCE_DATETIME = datetime.fromtimestamp(0, timezone.utc) + +# ruff: noqa: D103 off + +# Key functions to dictate how replica row results will be sorted within an iRODSDataObject. 
+ + +def REPLICA_NUMBER_SORT_KEY_FN(row): # noqa: N802 + return row[DataObject.replica_number] + + +def REPLICA_FITNESS_SORT_KEY_FN(row): # noqa: N802 + repl_status = int(row[DataObject.replica_status]) + + repl_status_rank = _REPL_STATUSES.index(repl_status) if _REPL_STATUSES.count(repl_status) else sys.maxsize + + return (repl_status_rank, _REFERENCE_DATETIME - row[DataObject.modify_time]) + + +# ruff: noqa: D103 on + +_DEFAULT_SORT_KEY_FN = REPLICA_NUMBER_SORT_KEY_FN + + class iRODSDataObject: - def __init__(self, manager, parent=None, results=None): + # iRODSDataObject's constructor is not usually directly accessed by iRODS client applications. See the main README. + # ruff: noqa: D107 off + + def __init__(self, manager, parent=None, results=None, replica_sort_function=None): self.manager = manager if parent and results: self.collection = parent + results = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN)) for attr, value in DataObject.__dict__.items(): if not attr.startswith("_"): try: @@ -54,9 +110,8 @@ def __init__(self, manager, parent=None, results=None): # backward compatibility with older schema versions pass self.path = self.collection.path + "/" + self.name - replicas = sorted(results, key=lambda r: r[DataObject.replica_number]) - # The status quo before iRODS 5 + # Copy pre-iRODS 5 fields replica_args = [ ( @@ -75,18 +130,20 @@ def __init__(self, manager, parent=None, results=None): modify_time=r[DataObject.modify_time], ), ) - for r in replicas + for r in results ] # Adjust for adding access_time in the iRODS 5 case. 
if self.manager.sess.server_version >= (5,): - for n, r in enumerate(replicas): + for n, r in enumerate(results): replica_args[n][1]['access_time'] = r[DataObject.access_time] self.replicas = [iRODSReplica(*a, **k) for a, k in replica_args] self._meta = None + # ruff: noqa: D107 on + def __repr__(self): return f"" diff --git a/irods/manager/data_object_manager.py b/irods/manager/data_object_manager.py index 6c2a6fe2..17361c7e 100644 --- a/irods/manager/data_object_manager.py +++ b/irods/manager/data_object_manager.py @@ -218,27 +218,29 @@ def should_parallelize_transfer( if size is not None and isinstance(open_options, dict): open_options[kw.DATA_SIZE_KW] = size - def _download(self, obj, local_path, num_threads, updatables=(), **options): + def _download(self, obj_path, local_path, num_threads, updatables=(), **options): """Transfer the contents of a data object to a local file. Called from get() when a local path is named. """ - if os.path.isdir(local_path): - local_file = os.path.join(local_path, irods_basename(obj)) - else: - local_file = local_path + + local_file = ( + os.path.join(local_path, irods_basename(obj_path)) # noqa: PTH118 + if os.path.isdir(local_path) # noqa: PTH112 + else local_path + ) # Check for force flag if local_file exists if os.path.exists(local_file) and kw.FORCE_FLAG_KW not in options: raise ex.OVERWRITE_WITHOUT_FORCE_FLAG data_open_returned_values_ = {} - with self.open(obj, "r", returned_values=data_open_returned_values_, **options) as o: + with self.open(obj_path, "r", returned_values=data_open_returned_values_, **options) as o: if self.should_parallelize_transfer(num_threads, o, open_options=options.items()): error = RuntimeError("parallel get failed") try: if not self.parallel_get( - (obj, o), + (obj_path, o), local_file, num_threads=num_threads, target_resource_name=options.get(kw.RESC_NAME_KW, ""), @@ -265,6 +267,8 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda """ parent = 
self.sess.collections.get(irods_dirname(path)) + replica_sort_function = options.pop('replica_sort_function', None) + # TODO: optimize if local_path: self._download(path, local_path, num_threads=num_threads, updatables=updatables, **options) @@ -284,7 +288,7 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda results = query.all() # get up to max_rows replicas if len(results) <= 0: raise ex.DataObjectDoesNotExist() - return iRODSDataObject(self, parent, results) + return iRODSDataObject(self, parent, results, replica_sort_function=replica_sort_function) @staticmethod def _resolve_force_put_option(options, default_setting=None, true_value=""): @@ -317,23 +321,25 @@ def put( self._resolve_force_put_option(options, default_setting=client_config.data_objects.force_put_by_default) if self.sess.collections.exists(irods_path): - obj = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path)) + obj_path = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path)) # noqa: PTH119 else: - obj = irods_path - if kw.FORCE_FLAG_KW not in options and self.exists(obj): + obj_path = irods_path + if kw.FORCE_FLAG_KW not in options and self.exists(obj_path): raise ex.OVERWRITE_WITHOUT_FORCE_FLAG options.pop(kw.FORCE_FLAG_KW, None) + replica_sort_function = options.pop('replica_sort_function', None) + with open(local_path, "rb") as f: sizelist = [] if self.should_parallelize_transfer(num_threads, f, measured_obj_size=sizelist, open_options=options): - o = deferred_call(self.open, (obj, "w"), options) + o = deferred_call(self.open, (obj_path, "w"), options) f.close() error = RuntimeError("parallel put failed") try: if not self.parallel_put( local_path, - (obj, o), + (obj_path, o), total_bytes=sizelist[0], num_threads=num_threads, target_resource_name=options.get(kw.RESC_NAME_KW, "") or options.get(kw.DEST_RESC_NAME_KW, ""), @@ -346,7 +352,7 @@ def put( except BaseException as e: raise error from e else: - with self.open(obj, 
"w", **options) as o: + with self.open(obj_path, "w", **options) as o: # Set operation type to trigger acPostProcForPut if kw.OPR_TYPE_KW not in options: options[kw.OPR_TYPE_KW] = 1 # PUT_OPR @@ -360,10 +366,11 @@ def put( # Requested to register checksum without verifying, but source replica has a checksum. This can result # in multiple replicas being marked good with different checksums, which is an inconsistency. del repl_options[kw.REG_CHKSUM_KW] - self.replicate(obj, **repl_options) + self.replicate(obj_path, **repl_options) if return_data_object: - return self.get(obj) + return self.get(obj_path, replica_sort_function=replica_sort_function) + return None def chksum(self, path, **options): """ @@ -480,6 +487,7 @@ def create( raise ex.DataObjectExistsAtLogicalPath options = {**options, kw.DATA_TYPE_KW: "generic"} + replica_sort_function = options.pop('replica_sort_function', None) if resource: options[kw.DEST_RESC_NAME_KW] = resource @@ -508,7 +516,7 @@ def create( desc = response.int_info conn.close_file(desc) - return self.get(path) + return self.get(path, replica_sort_function=replica_sort_function) def open_with_FileRaw(self, *arg, **kw_options): holder = [] diff --git a/irods/test/data_obj_test.py b/irods/test/data_obj_test.py index f214950a..6b61007e 100644 --- a/irods/test/data_obj_test.py +++ b/irods/test/data_obj_test.py @@ -1,6 +1,5 @@ #! 
/usr/bin/env python -from datetime import datetime, timezone import base64 import collections import concurrent.futures @@ -17,13 +16,15 @@ import socket import stat import string -import sys import subprocess +import sys import threading import time import unittest import xml.etree.ElementTree -import irods.test.helpers as helpers +from datetime import datetime, timedelta, timezone + +from irods.test import helpers try: import tqdm @@ -49,26 +50,24 @@ def is_localhost_synonym(name): return localhost_with_optional_domain_pattern.match(name.lower()) or is_localhost_ip(name) -from irods.access import iRODSAccess -from irods.models import Collection, DataObject -from irods.path import iRODSPath -from irods.test.helpers import iRODSUserLogins +from tempfile import NamedTemporaryFile, gettempdir, mktemp + +import irods.client_configuration as config import irods.exception as ex -from irods.column import Criterion -from irods.data_object import chunks, irods_dirname +import irods.keywords as kw +import irods.parallel import irods.test.helpers as helpers import irods.test.modules as test_modules -import irods.keywords as kw -import irods.client_configuration as config +from irods.access import iRODSAccess +from irods.column import Criterion +from irods.data_object import REPLICA_FITNESS_SORT_KEY_FN, chunks, irods_dirname from irods.manager import data_object_manager -from irods.message import RErrorStack -from irods.message import ET, XML_Parser_Type, default_XML_parser, current_XML_parser -from datetime import datetime, timezone, timedelta -from tempfile import NamedTemporaryFile, gettempdir, mktemp -from irods.test.helpers import unique_name, my_function_name -from irods.ticket import Ticket -import irods.parallel from irods.manager.data_object_manager import Server_Checksum_Warning +from irods.message import ET, RErrorStack, XML_Parser_Type, current_XML_parser, default_XML_parser +from irods.models import Collection, DataObject +from irods.path import iRODSPath +from 
irods.test.helpers import iRODSUserLogins, my_function_name, unique_name +from irods.ticket import Ticket RODSUSER = "nonadmin" @@ -1253,8 +1252,7 @@ def test_replica_number(self): # assertions on replicas self.assertEqual(len(obj.replicas), number_of_replicas) - for i, replica in enumerate(obj.replicas): - self.assertEqual(replica.number, i) + self.assertEqual({repl.number for repl in obj.replicas}, {*range(len(obj.replicas))}) # now trim odd-numbered replicas # note (see irods/irods#4861): COPIES_KW might disappear in the future @@ -1267,10 +1265,7 @@ def test_replica_number(self): obj = session.data_objects.get(obj_path) # check remaining replica numbers - replica_numbers = [] - for replica in obj.replicas: - replica_numbers.append(replica.number) - self.assertEqual(replica_numbers, [0, 2, 4, 6]) + self.assertEqual({r.number for r in obj.replicas}, {0, 2, 4, 6}) # remove object obj.unlink(force=True) @@ -1728,11 +1723,12 @@ def test_get_data_objects(self): self.assertIsNotNone(obj.replicas[1].__getattribute__(i)) # ensure replica info is sensible + replicas = sorted(obj.replicas, key=lambda repl: repl.number) for i in range(2): - self.assertEqual(obj.replicas[i].number, i) - self.assertEqual(obj.replicas[i].status, "1") - self.assertEqual(obj.replicas[i].path.split("/")[-1], filename) - self.assertEqual(obj.replicas[i].resc_hier.split(";")[-1], ufs_resources[i].name) + self.assertEqual(replicas[i].number, i) + self.assertEqual(replicas[i].status, "1") + self.assertEqual(replicas[i].path.split("/")[-1], filename) + self.assertEqual(replicas[i].resc_hier.split(";")[-1], ufs_resources[i].name) self.assertEqual(obj.replicas[0].resource_name, ufs_resources[0].name) if self.sess.server_version < (4, 2, 0): @@ -2992,6 +2988,55 @@ def test_handling_of_termination_signals_during_multithread_put__issue_722(self) test_put__issue_722(self) + def test_modified_sorting_of_replicas__issue_746(self): + basename = unique_name(my_function_name(), datetime.now()) + '_dataobj_647' 
# noqa: DTZ005 + with self.create_simple_resc() as new_resc1, self.create_simple_resc() as new_resc2: + data = helpers.make_object(self.sess, f'{helpers.home_collection(self.sess)}/{basename}') + + # Precondition for an eventual total of 3 replicas: initial data replica is not + # on either of the new resources. + self.assertFalse({repl.resource_name for repl in data.replicas} & {new_resc1, new_resc2}) + try: + data.replicate(resource=new_resc1) + + # Ensure that one of the replicas is stale, to test proper sorting. + with data.open('a', **{kw.RESC_NAME_KW: new_resc1}) as f: + f.write(b'.') + + # Sleep to ensure different replica modify timestamps. + time.sleep(2) + + data.replicate(resource=new_resc2) + + # At this point, there should be exactly two good replicas of the three. + # Assert exactly one replica is stale, to corroborate + data = self.sess.data_objects.get( + data.path, replica_sort_function=lambda row: int(row[DataObject.replica_status]) + ) + self.assertEqual([repl.status for repl in data.replicas], ['0', '1', '1']) + + # Get a data object with the PRC3-default sort order. Ordering is expected to + # be ascending by replica number. + if irods.version.version_as_tuple() < (4,): + data = self.sess.data_objects.get(data.path) + for i, repl in enumerate(data.replicas): + self.assertEqual(repl.number, i) + + options = {} + if irods.version.version_as_tuple() < (4,): + options['replica_sort_function'] = REPLICA_FITNESS_SORT_KEY_FN + + # Get a data object with the PRC3-alternative/PRC4-default sort order. + data = self.sess.data_objects.get(data.path, **options) + + # Test default replica sorting. + self.assertEqual(data.replicas[0].status, '1') + self.assertEqual(data.replicas[0].modify_time, data.modify_time) + self.assertGreater(data.replicas[0].modify_time, data.replicas[1].modify_time) + finally: + if data: + data.unlink(force=True) + if __name__ == "__main__": # let the tests find the parent irods lib