Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,52 @@ will spawn a number of threads in order to optimize performance for
iRODS server versions 4.2.9+ and file sizes larger than a default
threshold value of 32 Megabytes.

Because multithreaded processes under Unix-type operating systems sometimes
need special handling, it is recommended that any put or get of a large file
be appropriately handled in the case that a terminating signal aborts the
transfer:

```python
from irods.parallel import abort_parallel_transfers

def handler(signal, _)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def handler(signal, _)
def handler(signal, _):

abort_parallel_transfers()
os._exit(128 + signal)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From discussion - remove os._exit.


signal(SIGTERM,handler)

try:
# a multi-1247 put or get can leave non-daemon threads running if not treated with care.
session.data_objects.put( ...)
except KeyboardInterrupt:
abort_parallel_transfers()
printf('Due to a SIGINT or Control-C, the put failed.')
# Raise again if we wish to exit the application.
raise
except RuntimeError:
printf('The put failed.')
# ...
```

Note that had we also intercepted SIGINT by assigning it a non-default
handler, we would have avoided the need to handle the `KeyboardInterrupt`
inline.

(Internally, of course, the PRC must handle all eventualities, including
`KeyboardInterrupt`, by closing down the current foreground transfer.
Otherwise we would risk some non-daemon threads not finishing, which could
prevent the main program from eventually performing an orderly exit.)

If a signal handler calls `abort_parallel_transfers()` but does not call
`os._exit` after doing so, a parallel `put` or `get` can raise a `RuntimeError`
to indicate the failed transfer.

In general it is better (for simpler applications wanting to gracefully handle
interrupted lengthy data transfers to/from iRODS data objects) to anticipate
control-C by handling both `KeyboardInterrupt` and `RuntimeError`, as shown
above.

Progress bars
-------------

Expand Down
55 changes: 34 additions & 21 deletions irods/manager/data_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,13 @@ def __init__(self, *a, **kwd):
self._iRODS_session = kwd.pop("_session", None)
super(ManagedBufferedRandom, self).__init__(*a, **kwd)
import irods.session
self.do_close = True

with irods.session._fds_lock:
irods.session._fds[self] = None

def __del__(self):
if not self.closed:
if self.do_close and not self.closed:
self.close()
call___del__if_exists(super(ManagedBufferedRandom, self))

Expand Down Expand Up @@ -245,15 +246,21 @@ def _download(self, obj, local_path, num_threads, updatables=(), **options):
if self.should_parallelize_transfer(
num_threads, o, open_options=options.items()
):
if not self.parallel_get(
(obj, o),
local_file,
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
data_open_returned_values=data_open_returned_values_,
updatables=updatables,
):
raise RuntimeError("parallel get failed")
error = RuntimeError("parallel get failed")
try:
if not self.parallel_get(
(obj, o),
local_file,
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
data_open_returned_values=data_open_returned_values_,
updatables=updatables,
):
raise error
except ex.iRODSException as e:
raise e
except BaseException as e:
raise error from e
else:
with open(local_file, "wb") as f:
for chunk in chunks(o, self.READ_BUFFER_SIZE):
Expand Down Expand Up @@ -353,17 +360,23 @@ def put(
):
o = deferred_call(self.open, (obj, "w"), options)
f.close()
if not self.parallel_put(
local_path,
(obj, o),
total_bytes=sizelist[0],
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, "")
or options.get(kw.DEST_RESC_NAME_KW, ""),
open_options=options,
updatables=updatables,
):
raise RuntimeError("parallel put failed")
error = RuntimeError("parallel put failed")
try:
if not self.parallel_put(
local_path,
(obj, o),
total_bytes=sizelist[0],
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, "")
or options.get(kw.DEST_RESC_NAME_KW, ""),
open_options=options,
updatables=updatables,
):
raise error
except ex.iRODSException as e:
raise e
except BaseException as e:
raise error from e
else:
with self.open(obj, "w", **options) as o:
# Set operation type to trigger acPostProcForPut
Expand Down
Loading
Loading