Source code for darfix.tests.test_dimension

import pytest
import numpy
import os

from darfix.tests import utils
from darfix.core.dataset import ImageDataset, Data
from darfix.core.dimension import Dimension, POSITIONER_METADATA
import h5py
from silx.io.url import DataUrl

try:
    from importlib.resources import files as resource_files
except ImportError:
    from importlib_resources import files as resource_files
import darfix.resources.tests
from darfix.dtypes import Dataset


[docs]@pytest.fixture def dataset_args(): """ " Creating random dataset with specific headers. """ counter_mne = "a b c d e f g h" motor_mne = "obpitch y z mainx ffz m obx" dims = (20, 100, 100) # Create headers header = [] # Dimensions for reshaping a = numpy.random.rand(2) b = numpy.random.rand(5) c = numpy.random.rand(2) motors = numpy.random.rand(7) for i in numpy.arange(20): header.append({}) header[i]["HeaderID"] = i header[i]["counter_mne"] = counter_mne header[i]["motor_mne"] = motor_mne header[i]["counter_pos"] = "" header[i]["motor_pos"] = "" for count in counter_mne: header[i]["counter_pos"] += str(numpy.random.rand(1)[0]) + " " for j, m in enumerate(motor_mne.split()): if m == "z": header[i]["motor_pos"] += ( str(a[int((i > 4 and i < 10) or i > 14)]) + " " ) elif m == "m": header[i]["motor_pos"] += str(b[i % 5]) + " " elif m == "obpitch": header[i]["motor_pos"] += str(c[int(i > 9)]) + " " elif m == "mainx": header[i]["motor_pos"] += "50 " else: header[i]["motor_pos"] += str(motors[j]) + " " data = numpy.zeros(dims) background = numpy.random.random(dims) idxs = [0, 2, 4] data[idxs] += background[idxs] return utils.DatasetArgs(data=data, header=header)
[docs]@pytest.fixture def in_memory_dataset(tmpdir, dataset_args): return utils.createDataset( data=dataset_args.data, header=dataset_args.header, _dir=str(tmpdir), backend="edf", )
[docs]@pytest.fixture def on_disk_dataset(tmpdir, dataset_args): return utils.createDataset( data=dataset_args.data, header=dataset_args.header, _dir=str(tmpdir), in_memory=False, backend="edf", )
[docs]def test_add_one_dimension(in_memory_dataset, on_disk_dataset): """Tests the correct add of a dimension""" dimension = Dimension(POSITIONER_METADATA, "test", 20) # In memory in_memory_dataset.add_dim(0, dimension) saved_dimension = in_memory_dataset.dims.get(0) assert saved_dimension.name == "test" assert saved_dimension.kind == POSITIONER_METADATA assert saved_dimension.size == 20 # On disk on_disk_dataset.add_dim(0, dimension) saved_dimension = on_disk_dataset.dims.get(0) assert saved_dimension.name == "test" assert saved_dimension.kind == POSITIONER_METADATA assert saved_dimension.size == 20
[docs]def test_add_several_dimensions(in_memory_dataset, on_disk_dataset): """Tests the correct add of several dimensions""" dimension1 = Dimension(POSITIONER_METADATA, "test1", 20) dimension2 = Dimension(POSITIONER_METADATA, "test2", 30) dimension3 = Dimension(POSITIONER_METADATA, "test3", 40) # In memory in_memory_dataset.add_dim(0, dimension1) in_memory_dataset.add_dim(1, dimension2) in_memory_dataset.add_dim(2, dimension3) assert in_memory_dataset.dims.ndim == 3 # On disk on_disk_dataset.add_dim(0, dimension1) on_disk_dataset.add_dim(1, dimension2) on_disk_dataset.add_dim(2, dimension3) assert on_disk_dataset.dims.ndim == 3
[docs]def test_remove_dimension(in_memory_dataset, on_disk_dataset): """Tests the correct removal of a dimension""" dimension = Dimension(POSITIONER_METADATA, "test", 20) # In memory in_memory_dataset.add_dim(0, dimension) in_memory_dataset.remove_dim(0) assert in_memory_dataset.dims.ndim == 0 # On disk on_disk_dataset.add_dim(0, dimension) on_disk_dataset.remove_dim(0) assert on_disk_dataset.dims.ndim == 0
[docs]def test_remove_dimensions(in_memory_dataset, on_disk_dataset): """Tests the correct removal of several dimensions""" dimension1 = Dimension(POSITIONER_METADATA, "test1", 20) dimension2 = Dimension(POSITIONER_METADATA, "test2", 30) dimension3 = Dimension(POSITIONER_METADATA, "test3", 40) # In memory in_memory_dataset.add_dim(0, dimension1) in_memory_dataset.add_dim(1, dimension2) in_memory_dataset.add_dim(2, dimension3) in_memory_dataset.remove_dim(0) in_memory_dataset.remove_dim(2) assert in_memory_dataset.dims.ndim == 1 assert in_memory_dataset.dims.get(1).name == "test2" # On disk on_disk_dataset.add_dim(0, dimension1) on_disk_dataset.add_dim(1, dimension2) on_disk_dataset.add_dim(2, dimension3) on_disk_dataset.remove_dim(0) on_disk_dataset.remove_dim(2) assert on_disk_dataset.dims.ndim == 1 assert on_disk_dataset.dims.get(1).name == "test2"
[docs]def test_find_dimensions(in_memory_dataset, on_disk_dataset): """Tests the correct finding of the dimensions""" # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) assert in_memory_dataset.dims.ndim == 3 assert in_memory_dataset.dims.get(0).name == "m" assert in_memory_dataset.dims.get(1).name == "z" assert in_memory_dataset.dims.get(2).name == "obpitch" # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) assert on_disk_dataset.dims.ndim == 3 assert on_disk_dataset.dims.get(0).name == "m" assert on_disk_dataset.dims.get(1).name == "z" assert on_disk_dataset.dims.get(2).name == "obpitch"
[docs]def test_reshaped_data(in_memory_dataset, on_disk_dataset): """Tests the correct reshaping of the data""" # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) dataset = in_memory_dataset.reshape_data() assert dataset.data.shape == (2, 2, 5, 100, 100) # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) dataset = on_disk_dataset.reshape_data() assert dataset.data.shape == (2, 2, 5, 100, 100)
[docs]def test_find_shift(in_memory_dataset, on_disk_dataset): """Tests the shift detection with dimensions and indices""" # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) dataset = in_memory_dataset.reshape_data() indices = [1, 2, 3, 4] shift = dataset.find_shift(dimension=[1, 1], indices=indices) assert len(shift) == 0 shift = dataset.find_shift(dimension=[0, 1], indices=indices) assert shift.shape == (2, 1) # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) dataset = on_disk_dataset.reshape_data() indices = [1, 2, 3, 4] shift = dataset.find_shift(dimension=[1, 1], indices=indices) assert len(shift) == 0 shift = dataset.find_shift(dimension=[0, 1], indices=indices) assert shift.shape == (2, 1)
[docs]def test_apply_shift(in_memory_dataset, on_disk_dataset): """Tests the shift correction with dimensions and indices""" # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) dataset = in_memory_dataset.reshape_data() new_dataset = dataset.apply_shift( shift=numpy.array([[0, 0.5], [0, 0.5]]), dimension=[0, 1], indices=[1, 2, 3, 4], ) assert new_dataset.data.urls[0, 0, 0] == dataset.data.urls[0, 0, 0] assert new_dataset.data.urls[0, 0, 1] != dataset.data.urls[0, 0, 1] # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) dataset = on_disk_dataset.reshape_data() new_dataset = dataset.apply_shift( shift=numpy.array([[0, 0.5], [0, 0.5]]), dimension=[0, 1], indices=[1, 2, 3, 4], ) assert new_dataset.data.urls[0, 0, 0] == dataset.data.urls[0, 0, 0] assert new_dataset.data.urls[0, 0, 1] != dataset.data.urls[0, 0, 1]
[docs]def test_find_shift_along_dimension(in_memory_dataset, on_disk_dataset): """Tests the shift detection along a dimension""" # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) dataset = in_memory_dataset.reshape_data() indices = numpy.arange(10) shift = dataset.find_shift_along_dimension(dimension=[1], indices=indices) assert shift.shape == (2, 2, 5) shift = dataset.find_shift_along_dimension(dimension=[0], indices=indices) assert shift.shape == (5, 2, 2) # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) dataset = on_disk_dataset.reshape_data() indices = numpy.arange(10) shift = dataset.find_shift_along_dimension(dimension=[1], indices=indices) assert shift.shape == (2, 2, 5) shift = dataset.find_shift_along_dimension(dimension=[0], indices=indices) assert shift.shape == (5, 2, 2)
[docs]def test_apply_shift_along_dimension(in_memory_dataset, on_disk_dataset): """Tests the shift correction with dimensions and indices""" # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) dataset = in_memory_dataset.reshape_data() shift = numpy.random.random((4, 2, 2)) new_dataset = dataset.apply_shift_along_dimension( shift=shift, dimension=[1], indices=[1, 2, 3, 4] ) assert new_dataset.data.urls[0, 0, 0] == dataset.data.urls[0, 0, 0] assert new_dataset.data.urls[0, 0, 1] != dataset.data.urls[0, 0, 1] # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) dataset = on_disk_dataset.reshape_data() shift = numpy.random.random((4, 2, 2)) new_dataset = dataset.apply_shift_along_dimension( shift=shift, dimension=[1], indices=[1, 2, 3, 4] ) assert new_dataset.data.urls[0, 0, 0] == dataset.data.urls[0, 0, 0] assert new_dataset.data.urls[0, 0, 1] != dataset.data.urls[0, 0, 1]
[docs]def test_zsum(in_memory_dataset, on_disk_dataset): """Tests the shift detection with dimensions and indices""" indices = [1, 2, 3, 6] # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) dataset = in_memory_dataset.reshape_data() result = numpy.sum(dataset.get_data(dimension=[0, 1], indices=indices), axis=0) zsum = dataset.zsum(dimension=[0, 1], indices=indices) numpy.testing.assert_array_equal(zsum, result) # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) dataset = on_disk_dataset.reshape_data() zsum = dataset.zsum(dimension=[0, 1], indices=indices) numpy.testing.assert_array_equal(zsum, result)
[docs]def test_apply_2d_fit(in_memory_dataset, on_disk_dataset): """Tests the fit with dimensions and indices""" # In memory data = Data( urls=in_memory_dataset.get_data().urls[:10], metadata=in_memory_dataset.get_data().metadata[:10], in_memory=True, ) dataset = ImageDataset(_dir=in_memory_dataset.dir, data=data) dataset.find_dimensions(POSITIONER_METADATA) dataset = dataset.reshape_data() new_dataset, maps = dataset.apply_fit(indices=[1, 2, 3, 4]) assert new_dataset.data.urls[0, 0] == dataset.data.urls[0, 0] assert new_dataset.data.urls[0, 1] != dataset.data.urls[0, 1] assert len(maps) == 7 assert maps[0].shape == in_memory_dataset.get_data(0).shape # On disk data = Data( urls=on_disk_dataset.get_data().urls[:10], metadata=on_disk_dataset.get_data().metadata[:10], in_memory=False, ) dataset = ImageDataset(_dir=on_disk_dataset.dir, data=data) dataset.find_dimensions(POSITIONER_METADATA) dataset = dataset.reshape_data() new_dataset, maps = dataset.apply_fit(indices=[1, 2, 3, 4]) assert new_dataset.data.urls[0, 0] == dataset.data.urls[0, 0] assert new_dataset.data.urls[0, 1] != dataset.data.urls[0, 1] assert len(maps) == 7 assert maps[0].shape == in_memory_dataset.get_data(0).shape
[docs]def test_data_reshaped_data(in_memory_dataset, on_disk_dataset): """Tests that data and reshaped data have same values""" # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) dataset = in_memory_dataset.reshape_data() numpy.testing.assert_array_equal(dataset.get_data(0), in_memory_dataset.get_data(0)) # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) dataset = on_disk_dataset.reshape_data() numpy.testing.assert_array_equal(dataset.get_data(0), on_disk_dataset.get_data(0))
[docs]def test_clear_dimensions(in_memory_dataset, on_disk_dataset): """Tests the clear dimensions function""" # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) in_memory_dataset.clear_dims() assert in_memory_dataset.dims.ndim == 0 # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) on_disk_dataset.clear_dims() assert on_disk_dataset.dims.ndim == 0
[docs]def test_apply_moments_in_memory(in_memory_dataset, on_disk_dataset): """Tests finding moments""" # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) dataset = in_memory_dataset.reshape_data() moments = dataset.apply_moments(indices=[1, 2, 3, 4]) assert moments[0][0].shape == dataset.get_data(0).shape assert moments[1][3].shape == dataset.get_data(0).shape # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) dataset = on_disk_dataset.reshape_data() moments = dataset.apply_moments(indices=[1, 2, 3, 4]) assert moments[0][0].shape == dataset.get_data(0).shape assert moments[1][3].shape == dataset.get_data(0).shape
[docs]def test_compute_magnification(in_memory_dataset, on_disk_dataset): """Tests fitting data in memory""" # In memory in_memory_dataset.find_dimensions(POSITIONER_METADATA) dataset = in_memory_dataset.reshape_data() dataset.compute_transformation(d=0.1) assert dataset.transformation.shape == dataset.get_data(0).shape # On disk on_disk_dataset.find_dimensions(POSITIONER_METADATA) dataset = on_disk_dataset.reshape_data() dataset.compute_transformation(d=0.1) assert dataset.transformation.shape == dataset.get_data(0).shape
[docs]def test_find_dimension_silicon_111_reflection(tmp_path): """ Test 'find_dimension' with a bunch of motor position over a real use cases that used to bring troubles. """ silicon_111_reflection_file = resource_files(darfix.resources.tests).joinpath( os.path.join("dimensions_definition", "silicon_111_reflection.h5") ) raw_motor_values = {} with h5py.File(silicon_111_reflection_file, mode="r") as h5f: raw_motor_values["chi"] = h5f["positioners/chi"][()] raw_motor_values["mu"] = h5f["positioners/mu"][()] data_folder = tmp_path / "test_fitting" data_folder.mkdir() data_file_url = DataUrl( file_path=os.path.join(str(data_folder), "data.h5"), data_path="data", scheme="silx", ) number_of_points = 1891 with h5py.File(data_file_url.file_path(), mode="w") as h5f: h5f["data"] = numpy.random.random(number_of_points) dataset = Dataset( dataset=ImageDataset( first_filename=data_file_url.path(), metadata_url=DataUrl( file_path=str(silicon_111_reflection_file), data_path="positioners", scheme="silx", ).path(), isH5=True, _dir=None, in_memory=False, ) ) image_dataset = dataset.dataset # with a tolerance of 10e-9 we won't find 1081 steps over 2 dimensions assert len(image_dataset.dims) == 0 image_dataset.find_dimensions(kind=None, tolerance=1e-9) assert len(image_dataset.dims) == 2 assert ( numpy.prod([val.size for val in image_dataset.dims.values()]) > number_of_points ) image_dataset.clear_dims() image_dataset.find_dimensions(kind=None, tolerance=1e-5) assert ( numpy.prod([val.size for val in image_dataset.dims.values()]) == number_of_points ) for dim in image_dataset.dims.values(): numpy.testing.assert_almost_equal( dim.range[0], min(raw_motor_values[dim.name]), decimal=3 ) numpy.testing.assert_almost_equal( dim.range[1], max(raw_motor_values[dim.name]), decimal=3 )
[docs]def test_find_dimension_NiTi_1PD_002_g411_420MPa_mosalayers_2x(tmp_path): """ Test 'find_dimension' with a bunch of motor position over a real use cases that used to bring troubles. """ dataset_file = resource_files(darfix.resources.tests).joinpath( os.path.join( "dimensions_definition", "NiTi_1PD_002_g411_420MPa_mosalayers_2x.h5" ) ) raw_motor_values = {} with h5py.File(dataset_file, mode="r") as h5f: raw_motor_values["chi"] = h5f["positioners/chi"][()] raw_motor_values["diffry"] = h5f["positioners/diffry"][()] raw_motor_values["difftz"] = h5f["positioners/difftz"][()] data_folder = tmp_path / "test_fitting" data_folder.mkdir() data_file_url = DataUrl( file_path=os.path.join(str(data_folder), "data.h5"), data_path="data", scheme="silx", ) number_of_points = 31500 with h5py.File(data_file_url.file_path(), mode="w") as h5f: h5f["data"] = numpy.random.random(number_of_points) dataset = Dataset( dataset=ImageDataset( first_filename=data_file_url.path(), metadata_url=DataUrl( file_path=str(dataset_file), data_path="positioners", scheme="silx", ).path(), isH5=True, _dir=None, in_memory=False, ) ) image_dataset = dataset.dataset def check_dimensions_bounds(dims: dict): """Make sure find_dimension is correctly fitting motor bounds""" for dim in dims.values(): numpy.testing.assert_almost_equal( dim.range[0], min(raw_motor_values[dim.name]), decimal=3 ) numpy.testing.assert_almost_equal( dim.range[1], max(raw_motor_values[dim.name]), decimal=3 ) # with a tolerance of 10e-9 we won't find 1081 steps over 2 dimensions assert len(image_dataset.dims) == 0 image_dataset.find_dimensions(kind=None, tolerance=1e-5) assert len(image_dataset.dims) == 3 check_dimensions_bounds(dims=image_dataset.dims) assert ( numpy.prod([val.size for val in image_dataset.dims.values()]) > number_of_points ) image_dataset.clear_dims() image_dataset.find_dimensions(kind=None, tolerance=1e-4) assert len(image_dataset.dims) == 3 check_dimensions_bounds(dims=image_dataset.dims) assert ( numpy.prod([val.size for val in image_dataset.dims.values()]) == number_of_points )