diff --git a/modelarrayio/cifti.py b/modelarrayio/cifti.py
index 16080a5..e0a18cb 100644
--- a/modelarrayio/cifti.py
+++ b/modelarrayio/cifti.py
@@ -15,7 +15,9 @@
     write_rows_in_column_stripes as tdb_write_stripes,
     write_column_names as tdb_write_column_names,
 )
-from .parser import add_relative_root_arg, add_output_hdf5_arg, add_cohort_arg, add_storage_args, add_backend_arg, add_output_tiledb_arg, add_tiledb_storage_args, add_scalar_columns_arg
+from .parser import add_relative_root_arg, add_output_hdf5_arg, add_cohort_arg, add_storage_args, add_backend_arg, add_output_tiledb_arg, add_tiledb_storage_args, add_scalar_columns_arg, add_s3_workers_arg
+from .s3_utils import is_s3_path, load_nibabel
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 
 def _cohort_to_long_dataframe(cohort_df, scalar_columns=None):
@@ -83,7 +85,7 @@ def extract_cifti_scalar_data(cifti_file, reference_brain_names=None):
 
     """
-    cifti = nb.load(cifti_file)
+    cifti = cifti_file if hasattr(cifti_file, 'get_fdata') else nb.load(cifti_file)
     cifti_hdr = cifti.header
     axes = [cifti_hdr.get_axis(i) for i in range(cifti.ndim)]
     if len(axes) > 2:
@@ -134,6 +136,75 @@ def brain_names_to_dataframe(brain_names):
     return greyordinate_df, structure_name_strings
 
 
+def _load_cohort_cifti(cohort_long, relative_root, s3_workers):
+    """Load all CIFTI scalar rows from the cohort, optionally in parallel.
+
+    The first file is always loaded serially to obtain the reference brain
+    structure axis used for validation. When s3_workers > 1, remaining rows
+    are submitted to a ThreadPoolExecutor and collected via as_completed.
+    Threads share memory so reference_brain_names is accessed directly with
+    no copying overhead.
+
+    Returns
+    -------
+    scalars : dict[str, list[np.ndarray]]
+        Per-scalar ordered list of 1-D subject arrays, ready for stripe-write.
+    reference_brain_names : np.ndarray
+        Brain structure names from the first file, for building greyordinate table.
+    """
+    # Assign stable per-scalar subject indices in cohort order
+    scalar_subj_counter = defaultdict(int)
+    rows_with_idx = []
+    for row in cohort_long.itertuples(index=False):
+        subj_idx = scalar_subj_counter[row.scalar_name]
+        scalar_subj_counter[row.scalar_name] += 1
+        rows_with_idx.append((row.scalar_name, subj_idx, row.source_file))
+
+    # Load the first file serially to get the reference brain axis
+    first_sn, _, first_src = rows_with_idx[0]
+    first_path = first_src if is_s3_path(first_src) else op.join(relative_root, first_src)
+    first_data, reference_brain_names = extract_cifti_scalar_data(load_nibabel(first_path, cifti=True))
+
+    def _worker(job):
+        sn, subj_idx, src = job
+        arr, _ = extract_cifti_scalar_data(
+            load_nibabel(src, cifti=True), reference_brain_names=reference_brain_names
+        )
+        return sn, subj_idx, arr
+
+    if s3_workers > 1 and len(rows_with_idx) > 1:
+        results = {first_sn: {0: first_data}}
+        jobs = [
+            (sn, subj_idx, src if is_s3_path(src) else op.join(relative_root, src))
+            for sn, subj_idx, src in rows_with_idx[1:]
+        ]
+        with ThreadPoolExecutor(max_workers=s3_workers) as pool:
+            futures = {pool.submit(_worker, job): job for job in jobs}
+            for future in tqdm(
+                as_completed(futures),
+                total=len(futures),
+                desc="Loading CIFTI data",
+            ):
+                sn, subj_idx, arr = future.result()
+                results.setdefault(sn, {})[subj_idx] = arr
+        scalars = {
+            sn: [results[sn][i] for i in range(cnt)]
+            for sn, cnt in scalar_subj_counter.items()
+        }
+    else:
+        scalars = defaultdict(list)
+        scalars[first_sn].append(first_data)
+        remaining = [
+            (sn, subj_idx, src if is_s3_path(src) else op.join(relative_root, src))
+            for sn, subj_idx, src in rows_with_idx[1:]
+        ]
+        for job in tqdm(remaining, desc="Loading CIFTI data"):
+            sn, subj_idx, arr = _worker(job)
+            scalars[sn].append(arr)
+
+    return scalars, reference_brain_names
+
+
 def write_storage(cohort_file, backend='hdf5', output_h5='fixeldb.h5', output_tdb='arraydb.tdb', relative_root='/', storage_dtype='float32',
                   compression='gzip',
@@ -147,7 +218,8 @@ def write_storage(cohort_file, backend='hdf5', output_h5='fixeldb.h5', output_td
                   tdb_tile_voxels=0,
                   tdb_target_tile_mb=2.0,
                   tdb_workers=None,
-                  scalar_columns=None):
+                  scalar_columns=None,
+                  s3_workers=1):
     """
     Load all fixeldb data.
     Parameters
@@ -174,19 +246,9 @@ def write_storage(cohort_file, backend='hdf5', output_h5='fixeldb.h5', output_td
         raise ValueError("Unable to derive scalar sources from cohort file.")
 
     if backend == 'hdf5':
-        scalars = defaultdict(list)
-        last_brain_names = None
-        for row in tqdm(
-            cohort_long.itertuples(index=False),
-            total=cohort_long.shape[0],
-            desc="Loading CIFTI scalars",
-        ):
-            scalar_file = op.join(relative_root, row.source_file)
-            cifti_data, brain_names = extract_cifti_scalar_data(
-                scalar_file, reference_brain_names=last_brain_names
-            )
-            last_brain_names = brain_names.copy()
-            scalars[row.scalar_name].append(cifti_data)
+        scalars, last_brain_names = _load_cohort_cifti(
+            cohort_long, relative_root, s3_workers
+        )
 
         output_file = op.join(relative_root, output_h5)
         f = h5py.File(output_file, "w")
@@ -293,6 +355,7 @@ def get_parser():
     add_backend_arg(parser)
     add_storage_args(parser)
     add_tiledb_storage_args(parser)
+    add_s3_workers_arg(parser)
 
     return parser
 
@@ -319,7 +382,8 @@ def main():
         tdb_tile_voxels=args.tdb_tile_voxels,
         tdb_target_tile_mb=args.tdb_target_tile_mb,
         tdb_workers=args.tdb_workers,
-        scalar_columns=args.scalar_columns)
+        scalar_columns=args.scalar_columns,
+        s3_workers=args.s3_workers)
 
     return status
 
@@ -395,7 +459,7 @@ def h5_to_ciftis():
     parser = get_h5_to_ciftis_parser()
     args = parser.parse_args()
 
-    out_cifti_dir = op.join(args.relative_root, args.output_dir)  # absolute path for output dir
+    out_cifti_dir = op.abspath(args.output_dir)  # absolute path for output dir
 
     if op.exists(out_cifti_dir):
         print("WARNING: Output directory exists")
@@ -422,7 +486,7 @@ def get_h5_to_ciftis_parser():
     parser.add_argument(
         "--cohort-file", "--cohort_file",
         help="Path to a csv with demographic info and paths to data.",
-        required=True)
+        )
     parser.add_argument(
         "--relative-root", "--relative_root",
         help="Root to which all paths are relative, i.e. defining the (absolute) path to root directory of index_file, directions_file, cohort_file, input_hdf5, and output_dir.",
diff --git a/modelarrayio/h5_storage.py b/modelarrayio/h5_storage.py
index ec1b46a..38129db 100644
--- a/modelarrayio/h5_storage.py
+++ b/modelarrayio/h5_storage.py
@@ -158,7 +158,9 @@ def create_empty_scalar_matrix_dataset(
         shuffle=use_shuffle,
     )
     if sources_list is not None:
-        write_column_names(h5file, dataset_path, sources_list)
+        # dataset_path is e.g. 'scalars/FA/values'; extract the scalar name segment
+        scalar_name = dataset_path.split('/')[1] if dataset_path.count('/') >= 2 else dataset_path
+        write_column_names(h5file, scalar_name, sources_list)
 
     return dset
 
diff --git a/modelarrayio/parser.py b/modelarrayio/parser.py
index f1bdd82..696c734 100644
--- a/modelarrayio/parser.py
+++ b/modelarrayio/parser.py
@@ -134,6 +134,20 @@ def add_tiledb_storage_args(parser):
     return parser
 
 
+def add_s3_workers_arg(parser):
+    parser.add_argument(
+        "--s3-workers", "--s3_workers",
+        type=int,
+        default=1,
+        help=(
+            "Number of parallel worker threads for loading image files. "
+            "Set > 1 to enable parallel downloads when cohort paths begin with s3://. "
+            "Default 1 (serial)."
+        ),
+    )
+    return parser
+
+
 def add_scalar_columns_arg(parser):
     parser.add_argument(
         "--scalar-columns", "--scalar_columns",
diff --git a/modelarrayio/s3_utils.py b/modelarrayio/s3_utils.py
new file mode 100644
index 0000000..9a1f0d5
--- /dev/null
+++ b/modelarrayio/s3_utils.py
@@ -0,0 +1,82 @@
+import gzip
+import logging
+import os
+from io import BytesIO
+from urllib.parse import urlparse
+
+logger = logging.getLogger(__name__)
+
+
+def is_s3_path(path: str) -> bool:
+    """Return True if path is an S3 URI (s3://)."""
+    return str(path).startswith("s3://")
+
+
+def _make_s3_client():
+    """Create a boto3 S3 client.
+
+    Uses anonymous (unsigned) access when the environment variable
+    MODELARRAYIO_S3_ANON=1 is set (useful for public buckets such as
+    fcp-indi). Otherwise the standard boto3 credential chain is used
+    (env vars, ~/.aws/credentials, IAM instance profile, etc.).
+
+    Raises ImportError if boto3 is not installed.
+    """
+    try:
+        import boto3
+    except ImportError:
+        raise ImportError(
+            "boto3 is required for s3:// paths. "
+            "Install with: pip install modelarrayio[s3]"
+        )
+    anon = os.environ.get("MODELARRAYIO_S3_ANON", "").lower() in ("1", "true", "yes")
+    if anon:
+        from botocore import UNSIGNED
+        from botocore.config import Config
+        return boto3.client("s3", config=Config(signature_version=UNSIGNED))
+    return boto3.client("s3")
+
+
+def load_nibabel(path: str, *, cifti: bool = False):
+    """Load a nibabel image from a local path or an s3:// URI.
+
+    For s3:// paths the object is downloaded directly into memory via
+    ``get_object``; no temporary file is written to disk. The bytes are
+    decompressed in-memory if the key ends with ``.gz``, then handed to
+    nibabel through its ``FileHolder`` / ``from_file_map`` API.
+
+    Parameters
+    ----------
+    path : str
+        Local file path or s3:// URI.
+    cifti : bool
+        Pass ``True`` for CIFTI-2 files (``.dscalar.nii`` etc.) so that
+        nibabel returns a ``Cifti2Image`` with proper axes. ``False``
+        (default) returns a ``Nifti1Image``.
+
+    Returns
+    -------
+    nibabel.FileBasedImage
+    """
+    import nibabel as nb
+
+    if not is_s3_path(path):
+        return nb.load(path)
+
+    parsed = urlparse(path)
+    bucket = parsed.netloc
+    key = parsed.path.lstrip("/")
+
+    logger.debug("Loading s3://%s/%s into memory", bucket, key)
+    data = _make_s3_client().get_object(Bucket=bucket, Key=key)["Body"].read()
+
+    if os.path.basename(key).endswith(".gz"):
+        data = gzip.decompress(data)
+
+    from nibabel.filebasedimages import FileHolder
+    fh = FileHolder(fileobj=BytesIO(data))
+    file_map = {"header": fh, "image": fh}
+
+    if cifti:
+        return nb.Cifti2Image.from_file_map(file_map)
+    return nb.Nifti1Image.from_file_map(file_map)
diff --git a/modelarrayio/voxels.py b/modelarrayio/voxels.py
index e5f94de..72103c8 100644
--- a/modelarrayio/voxels.py
+++ b/modelarrayio/voxels.py
@@ -2,6 +2,7 @@
 import os
 import os.path as op
 from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor, as_completed
 import nibabel as nb
 import pandas as pd
 import numpy as np
@@ -20,15 +21,78 @@
     add_backend_arg,
     add_output_tiledb_arg,
     add_tiledb_storage_args,
+    add_s3_workers_arg,
 )
+from .s3_utils import is_s3_path, load_nibabel
+
+
+def _load_cohort_voxels(cohort_df, group_mask_matrix, relative_root, s3_workers):
+    """Load all voxel rows from the cohort, optionally in parallel.
+
+    When s3_workers > 1, a ThreadPoolExecutor is used. Threads share memory so
+    group_mask_matrix is accessed directly with no copying overhead. Results
+    arrive via as_completed and are indexed by (scalar_name, subj_idx) so the
+    final ordered lists are reconstructed correctly regardless of completion order.
+
+    Returns
+    -------
+    scalars : dict[str, list[np.ndarray]]
+        Per-scalar ordered list of 1-D subject arrays, ready for stripe-write.
+    sources_lists : dict[str, list[str]]
+        Per-scalar ordered list of source file paths (for HDF5 metadata).
+    """
+    scalar_subj_counter = defaultdict(int)
+    jobs = []
+    sources_lists = defaultdict(list)
+
+    for _, row in cohort_df.iterrows():
+        sn = row['scalar_name']
+        subj_idx = scalar_subj_counter[sn]
+        scalar_subj_counter[sn] += 1
+        src = row['source_file']
+        msk = row['source_mask_file']
+        scalar_path = src if is_s3_path(src) else op.join(relative_root, src)
+        mask_path = msk if is_s3_path(msk) else op.join(relative_root, msk)
+        jobs.append((sn, subj_idx, scalar_path, mask_path))
+        sources_lists[sn].append(src)
+
+    def _worker(job):
+        sn, subj_idx, scalar_path, mask_path = job
+        scalar_img = load_nibabel(scalar_path)
+        mask_img = load_nibabel(mask_path)
+        arr = flattened_image(scalar_img, mask_img, group_mask_matrix)
+        return sn, subj_idx, arr
+
+    if s3_workers > 1:
+        results = defaultdict(dict)
+        with ThreadPoolExecutor(max_workers=s3_workers) as pool:
+            futures = {pool.submit(_worker, job): job for job in jobs}
+            for future in tqdm(
+                as_completed(futures),
+                total=len(futures),
+                desc="Loading voxel data",
+            ):
+                sn, subj_idx, arr = future.result()
+                results[sn][subj_idx] = arr
+        scalars = {
+            sn: [results[sn][i] for i in range(cnt)]
+            for sn, cnt in scalar_subj_counter.items()
+        }
+    else:
+        scalars = defaultdict(list)
+        for job in tqdm(jobs, desc="Loading voxel data"):
+            sn, subj_idx, arr = _worker(job)
+            scalars[sn].append(arr)
+
+    return scalars, sources_lists
 
 
 def flattened_image(scalar_image, scalar_mask, group_mask_matrix):
-    scalar_mask_img = nb.load(scalar_mask)
+    scalar_mask_img = scalar_mask if hasattr(scalar_mask, 'get_fdata') else nb.load(scalar_mask)
     scalar_mask_matrix = scalar_mask_img.get_fdata() > 0
-
-    scalar_img = nb.load(scalar_image)
+
+    scalar_img = scalar_image if hasattr(scalar_image, 'get_fdata') else nb.load(scalar_image)
     scalar_matrix = scalar_img.get_fdata()
 
     scalar_matrix[np.logical_not(scalar_mask_matrix)] = np.nan
@@ -173,7 +237,8 @@ def write_storage(group_mask_file, cohort_file,
                   tdb_compression_level=5,
                   tdb_shuffle=True,
                   tdb_tile_voxels=0,
-                  tdb_target_tile_mb=2.0):
+                  tdb_target_tile_mb=2.0,
+                  s3_workers=1):
     """
     Load all volume data and write to an HDF5 file with configurable storage.
     Parameters
@@ -217,15 +282,10 @@ def write_storage(group_mask_file, cohort_file,
 
     # upload each cohort's data
-    scalars = defaultdict(list)
-    sources_lists = defaultdict(list)
     print("Extracting NIfTI data...")
-    for ix, row in tqdm(cohort_df.iterrows(), total=cohort_df.shape[0]):  # ix: index of row (start from 0); row: one row of data
-        scalar_file = op.join(relative_root, row['source_file'])
-        scalar_mask_file = op.join(relative_root, row['source_mask_file'])
-        scalar_data = flattened_image(scalar_file, scalar_mask_file, group_mask_matrix)
-        scalars[row['scalar_name']].append(scalar_data)  # append to specific scalar_name
-        sources_lists[row['scalar_name']].append(row['source_file'])  # append source mif filename to specific scalar_name
+    scalars, sources_lists = _load_cohort_voxels(
+        cohort_df, group_mask_matrix, relative_root, s3_workers
+    )
 
     # Write the output:
     if backend == 'hdf5':
@@ -330,6 +390,7 @@ def get_parser():
     add_backend_arg(parser)
     add_storage_args(parser)
     add_tiledb_storage_args(parser)
+    add_s3_workers_arg(parser)
     return parser
 
 def main():
@@ -354,9 +415,8 @@ def main():
         tdb_compression_level=args.tdb_compression_level,
         tdb_shuffle=args.tdb_shuffle,
         tdb_tile_voxels=args.tdb_tile_voxels,
-        tdb_target_tile_mb=args.tdb_target_tile_mb)
-
-
+        tdb_target_tile_mb=args.tdb_target_tile_mb,
+        s3_workers=args.s3_workers)
     return status
 
 if __name__ == "__main__":
diff --git a/pyproject.toml b/pyproject.toml
index 8387112..24a7472 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -40,6 +40,9 @@ dependencies = [
 ]
 dynamic = ["version"]
 
+[project.optional-dependencies]
+s3 = ["boto3"]
+
 [project.urls]
 Homepage = "https://github.com/PennLINC/ModelArrayIO"
 Repository = "https://github.com/PennLINC/ModelArrayIO"
@@ -52,6 +55,11 @@ fixelstats_write = "modelarrayio.fixels:h5_to_fixels"
 volumestats_write = "modelarrayio.voxels:h5_to_volumes_wrapper"
 ciftistats_write = "modelarrayio.cifti:h5_to_ciftis"
 
+[tool.pytest.ini_options]
+markers = [
+    "s3: tests that require network access to public S3 (deselect with '-m not s3')",
+]
+
 [tool.hatch.version]
 source = "vcs"
 
diff --git a/tests/test_cifti_cli.py b/tests/test_cifti_cli.py
index 7c29b57..e965f0a 100644
--- a/tests/test_cifti_cli.py
+++ b/tests/test_cifti_cli.py
@@ -61,7 +61,6 @@ def test_concifti_cli_creates_expected_hdf5(tmp_path):
         "--dtype", "float32",
         "--compression", "gzip",
         "--compression-level", "1",
-        "--shuffle", "True",
         "--chunk-voxels", "0",
         "--target-chunk-mb", "1.0",
     ]
diff --git a/tests/test_voxels_cli.py b/tests/test_voxels_cli.py
index 35ebf43..af6b7bd 100644
--- a/tests/test_voxels_cli.py
+++ b/tests/test_voxels_cli.py
@@ -82,7 +82,6 @@ def test_convoxel_cli_creates_expected_hdf5(tmp_path):
         "--dtype", "float32",
         "--compression", "gzip",
         "--compression-level", "1",
-        "--shuffle", "True",
         "--chunk-voxels", "0",
         "--target-chunk-mb", "1.0",
     ]
@@ -94,13 +93,13 @@ def test_convoxel_cli_creates_expected_hdf5(tmp_path):
     # Validate HDF5 contents
     with h5py.File(out_h5, "r") as h5:
         assert "voxels" in h5
-        vox = np.array(h5["voxels"])  # stored as transposed table (3, N)
-        assert vox.shape[0] == 3
+        vox = np.array(h5["voxels"])  # stored as transposed table (4, N): voxel_id, i, j, k
+        assert vox.shape[0] == 4
         ijk = np.vstack(np.nonzero(group_mask))  # (3, N) ordered by i, then j, then k
         assert vox.shape[1] == ijk.shape[1]
-        # Check ordering matches nonzero order (allow exact match)
-        assert np.array_equal(vox, ijk)
+        # Rows 1-3 are i, j, k (row 0 is voxel_id)
+        assert np.array_equal(vox[1:], ijk)
 
         # Scalars dataset
         dset = h5["scalars/FA/values"]
diff --git a/tests/test_voxels_s3.py b/tests/test_voxels_s3.py
new file mode 100644
index 0000000..034e4b4
--- /dev/null
+++ b/tests/test_voxels_s3.py
@@ -0,0 +1,167 @@
+"""Integration test for s3:// path support in convoxel.
+
+Requires network access and boto3.
+Run with:
+    pytest tests/test_voxels_s3.py -v
+
+Skip in offline CI by excluding the 's3' mark:
+    pytest -m "not s3"
+"""
+import csv
+import os
+import os.path as op
+import shutil
+import subprocess
+import sys
+
+import h5py
+import numpy as np
+import pytest
+
+# Four confirmed ABIDE OHSU subjects used as test data
+OHSU_SUBJECTS = [
+    "OHSU_0050142",
+    "OHSU_0050143",
+    "OHSU_0050144",
+    "OHSU_0050145",
+]
+
+_BUCKET = "fcp-indi"
+_PREFIX = "data/Projects/ABIDE_Initiative/Outputs/cpac/filt_global"
+
+
+def _s3_alff(subject_id):
+    return f"s3://{_BUCKET}/{_PREFIX}/alff/{subject_id}_alff.nii.gz"
+
+
+def _s3_mask(subject_id):
+    return f"s3://{_BUCKET}/{_PREFIX}/func_mask/{subject_id}_func_mask.nii.gz"
+
+
+@pytest.fixture(scope="module")
+def group_mask_path(tmp_path_factory):
+    """Download one func_mask from S3 to use as the group mask for all tests."""
+    boto3 = pytest.importorskip("boto3")
+    from botocore import UNSIGNED
+    from botocore.config import Config
+
+    tmp = tmp_path_factory.mktemp("s3_group_mask")
+    dest = tmp / "group_mask.nii.gz"
+    s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
+    key = f"{_PREFIX}/func_mask/{OHSU_SUBJECTS[0]}_func_mask.nii.gz"
+    try:
+        s3.download_file(_BUCKET, key, str(dest))
+    except Exception as exc:
+        pytest.skip(f"S3 download unavailable: {exc}")
+    return dest
+
+
+@pytest.mark.s3
+def test_convoxel_s3_parallel(tmp_path, group_mask_path):
+    """convoxel downloads s3:// paths in parallel and produces a valid HDF5."""
+    pytest.importorskip("boto3")
+
+    # Copy the group mask into tmp_path so --relative-root resolves it
+    shutil.copy(group_mask_path, tmp_path / "group_mask.nii.gz")
+
+    # Cohort CSV with s3:// paths — relative_root is not prepended to s3:// URIs
+    cohort_csv = tmp_path / "cohort.csv"
+    with cohort_csv.open("w", newline="") as f:
+        writer = csv.DictWriter(
+            f, fieldnames=["scalar_name", "source_file", "source_mask_file"]
+        )
+        writer.writeheader()
+        for subj in OHSU_SUBJECTS:
+            writer.writerow({
+                "scalar_name": "alff",
+                "source_file": _s3_alff(subj),
+                "source_mask_file": _s3_mask(subj),
+            })
+
+    out_h5 = tmp_path / "out.h5"
+    cmd = [
+        sys.executable, "-m", "modelarrayio.voxels",
+        "--group-mask-file", "group_mask.nii.gz",
+        "--cohort-file", str(cohort_csv),
+        "--relative-root", str(tmp_path),
+        "--output-hdf5", str(out_h5.name),
+        "--backend", "hdf5",
+        "--dtype", "float32",
+        "--compression", "gzip",
+        "--compression-level", "1",
+        "--s3-workers", "4",
+    ]
+    env = {**os.environ, "MODELARRAYIO_S3_ANON": "1"}
+    proc = subprocess.run(cmd, cwd=str(tmp_path), capture_output=True, text=True, env=env)
+    assert proc.returncode == 0, f"convoxel failed:\n{proc.stdout}\n{proc.stderr}"
+    assert out_h5.exists()
+
+    with h5py.File(out_h5, "r") as h5:
+        dset = h5["scalars/alff/values"]
+        num_subjects, num_voxels = dset.shape
+
+        assert num_subjects == len(OHSU_SUBJECTS)
+        assert num_voxels > 0
+
+        # Each subject should have at least some non-NaN values
+        for i in range(num_subjects):
+            assert not np.all(np.isnan(dset[i, :]))
+
+        # Column names recorded in the file
+        assert "column_names" in h5["scalars/alff"]
+        colnames = h5["scalars/alff/column_names"][...]
+        assert len(colnames) == len(OHSU_SUBJECTS)
+
+
+@pytest.mark.s3
+def test_convoxel_s3_serial_matches_parallel(tmp_path, group_mask_path):
+    """Serial (s3-workers=1) and parallel (s3-workers=4) produce identical data."""
+    pytest.importorskip("boto3")
+
+    shutil.copy(group_mask_path, tmp_path / "group_mask.nii.gz")
+
+    cohort_csv = tmp_path / "cohort.csv"
+    with cohort_csv.open("w", newline="") as f:
+        writer = csv.DictWriter(
+            f, fieldnames=["scalar_name", "source_file", "source_mask_file"]
+        )
+        writer.writeheader()
+        for subj in OHSU_SUBJECTS:
+            writer.writerow({
+                "scalar_name": "alff",
+                "source_file": _s3_alff(subj),
+                "source_mask_file": _s3_mask(subj),
+            })
+
+    base_cmd = [
+        sys.executable, "-m", "modelarrayio.voxels",
+        "--group-mask-file", "group_mask.nii.gz",
+        "--cohort-file", str(cohort_csv),
+        "--relative-root", str(tmp_path),
+        "--backend", "hdf5",
+        "--dtype", "float32",
+        "--compression", "none",
+    ]
+
+    env = {**os.environ, "MODELARRAYIO_S3_ANON": "1"}
+    for workers, name in [("1", "serial.h5"), ("4", "parallel.h5")]:
+        out_h5 = tmp_path / name
+        proc = subprocess.run(
+            base_cmd + ["--output-hdf5", name, "--s3-workers", workers],
+            cwd=str(tmp_path),
+            capture_output=True,
+            text=True,
+            env=env,
+        )
+        assert proc.returncode == 0, f"convoxel failed (workers={workers}):\n{proc.stderr}"
+
+    with h5py.File(tmp_path / "serial.h5", "r") as s, \
+         h5py.File(tmp_path / "parallel.h5", "r") as p:
+        serial_data = s["scalars/alff/values"][...]
+        parallel_data = p["scalars/alff/values"][...]
+
+    # _load_cohort_voxels reindexes parallel results back into cohort order, so
+    # rows should match exactly; sorting keeps the comparison order-agnostic anyway
+    assert serial_data.shape == parallel_data.shape
+    serial_sorted = serial_data[np.lexsort(serial_data.T)]
+    parallel_sorted = parallel_data[np.lexsort(parallel_data.T)]
+    np.testing.assert_array_equal(serial_sorted, parallel_sorted)