Source code for darfix.io.utils

__authors__ = ["J. Garriga"]
__license__ = "MIT"
__date__ = "03/07/2021"

import logging
import h5py
from datetime import datetime
import numpy
import sys

from silx.io.dictdump import dicttoh5, h5todict

_logger = logging.getLogger(__file__)


[docs] def assert_string(s, enums): s += " has to be " for app in enums: if app == enums[0]: s += "`" + app + "`" elif app == enums[-1]: s += "or `" + app + "`" else: s += ", `" + app + "`" return s
# Print iterations progress
[docs] def advancement_display( ncurrent, ntotal, prefix="", suffix="", decimals=1, length=100, fill="\u2588", left="-", ): """ Call in a loop to create terminal progress bar @params: ncurrent - Required : current N (Int) ntotal - Required : total expected N (Int) prefix - Optional : prefix string (Str) suffix - Optional : suffix string (Str) decimals - Optional : positive number of decimals in percent complete (Int) length - Optional : character length of bar (Int) fill - Optional : bar fill character (Str) left - Optional : bar fill character (Str) """ if ntotal: ncurrent = min(ncurrent, ntotal) progress = ncurrent / float(ntotal) else: progress = ncurrent + 1 percent_fmt = "{0:." + str(decimals) + "f}" percent = percent_fmt.format(100 * progress) filledLength = int(length * min(progress, 1)) leftLength = length - filledLength bar = fill * filledLength + left * leftLength sys.stdout.write("\r%s |%s| %s%% %s\r" % (prefix, bar, percent, suffix)) # Print New Line on Complete if ncurrent >= ntotal: sys.stdout.write("\n")
[docs] def write_maps( h5_file, list_of_maps, default_map, entry, processing_order, data_path="/", overwrite=True, ): """ Write a stack of components and its parameters into .h5 :param str h5_file: path to the hdf5 file :param str entry: entry name :param dict dimensions: Dictionary with the dimensions names and values :param numpy.ndarray W: Matrix with the rocking curves values :param numpy.ndarray data: Stack with the components :param int processing_order: processing order of treatment :param str data_path: path to store the data """ process_name = "process_" + str(processing_order) def get_interpretation(my_data): """Return hdf5 attribute for this type of data""" if isinstance(my_data, numpy.ndarray): if my_data.ndim == 1: return "spectrum" elif my_data.ndim in (2, 3): return "image" return None def save_key(path_name, key_path, value, overwrite=True): """Save the given value to the associated path. Manage numpy arrays and dictionaries""" key_path = key_path.replace(".", "/") # save if is dict if isinstance(value, dict): h5_path = "/".join((path_name, key_path)) dicttoh5( value, h5file=h5_file, h5path=h5_path, overwrite_data=True, mode="a" ) else: with h5py.File(h5_file, "a") as h5f: nx = h5f.require_group(path_name) if overwrite and key_path in nx: del nx[key_path] try: nx[key_path] = value except TypeError as e: _logger.error("Unable to write", str(key_path), "reason is", str(e)) else: interpretation = get_interpretation(value) if interpretation: nx[key_path].attrs["interpretation"] = interpretation with h5py.File(h5_file, "a") as h5f: h5f.attrs["default"] = entry nx_entry = h5f.require_group("/".join((data_path, entry))) nx_entry.attrs["NX_class"] = "NXentry" nx_entry.attrs["default"] = "data" nx_process = nx_entry.require_group(process_name) nx_process.attrs["NX_class"] = "NXprocess" if overwrite: for key in ("program", "version", "date", "processing_order"): if key in nx_process: del nx_process[key] nx_process["program"] = "darfix" nx_process["version"] = "0.2" nx_process["date"] = datetime.now().replace(microsecond=0).isoformat() nx_process["processing_order"] = numpy.int32(processing_order) results = nx_process.require_group("results") results.attrs["NX_class"] = "NXcollection" nx_data = nx_entry.require_group("data") nx_data.attrs["NX_class"] = "NXdata" default = list_of_maps[default_map] source_addr = entry + "/" + process_name + "/results/" + default_map results.attrs["target"] = default_map save_key(results.name, default_map, default) save_key(nx_data.name, default_map, h5f[source_addr]) for _map in list_of_maps: if _map == default_map: continue if isinstance(list_of_maps[_map], dict): maps = results.require_group(_map) maps.attrs["NX_class"] = "NXcollection" for method in list_of_maps[_map]: save_key(maps.name, method, list_of_maps[_map][method]) else: save_key(results.name, _map, list_of_maps[_map])
[docs] def read_components(h5_file): """ Read a stack of components and its parameters from a Nexus file. :param str h5_file: path to the hdf5 file """ with h5py.File(h5_file, "r") as nx: # find the default NXentry group nx_entry = nx[nx.attrs["default"]] # find the default NXdata group nx_process = nx_entry["process_1"] input_data = nx_process["dimensions"] dimensions = h5todict( h5_file, nx.attrs["default"] + "/process_1/dimensions", asarray=False ) input_data = nx_process["values"] values = {} for key in input_data.keys(): values[key] = numpy.array(list(input_data[key])) results = nx_process["results"] components = numpy.array(list(results["components"])) W = numpy.array(list(results["W"])) return dimensions, components, W, values
[docs] def write_components( h5_file, entry, dimensions, W, data, values, processing_order, data_path="/", overwrite=True, ): """ Write a stack of components and its parameters into .h5 :param str h5_file: path to the hdf5 file :param str entry: entry name :param dict dimensions: Dictionary with the dimensions names and values :param numpy.ndarray W: Matrix with the rocking curves values :param numpy.ndarray data: Stack with the components :param int processing_order: processing order of treatment :param str data_path: path to store the data """ process_name = "process_" + str(processing_order) def get_interpretation(my_data): """Return hdf5 attribute for this type of data""" if isinstance(my_data, numpy.ndarray): if my_data.ndim == 1: return "spectrum" elif my_data.ndim in (2, 3): return "image" return None def save_key(path_name, key_path, value, overwrite=True): """Save the given value to the associated path. Manage numpy arrays and dictionaries""" key_path = key_path.replace(".", "/") # save if is dict if isinstance(value, dict): h5_path = "/".join((path_name, key_path)) dicttoh5( value, h5file=h5_file, h5path=h5_path, overwrite_data=True, mode="a" ) else: with h5py.File(h5_file, "a") as h5f: nx = h5f.require_group(path_name) if overwrite and key_path in nx: del nx[key_path] try: nx[key_path] = value except TypeError as e: _logger.error("Unable to write", str(key_path), "reason is", str(e)) else: interpretation = get_interpretation(value) if interpretation: nx[key_path].attrs["interpretation"] = interpretation with h5py.File(h5_file, "a") as h5f: h5f.attrs["default"] = entry nx_entry = h5f.require_group("/".join((data_path, entry))) nx_entry.attrs["NX_class"] = "NXentry" nx_entry.attrs["default"] = "data" nx_process = nx_entry.require_group(process_name) nx_process.attrs["NX_class"] = "NXprocess" if overwrite: for key in ("program", "version", "date", "processing_order"): if key in nx_process: del nx_process[key] nx_process["program"] = "darfix" nx_process["version"] = "0.2" nx_process["date"] = datetime.now().replace(microsecond=0).isoformat() nx_process["processing_order"] = numpy.int32(processing_order) nx_parameters = nx_process.require_group("dimensions") nx_parameters.attrs["NX_class"] = "NXparameters" dicttoh5(dimensions, h5f, entry + "/" + process_name + "/dimensions") nx_values = nx_process.require_group("values") nx_values.attrs["NX_class"] = "NXparameters" for key, value in values.items(): save_key(nx_values.name, key_path=key, value=value) results = nx_process.require_group("results") results.attrs["NX_class"] = "NXcollection" nx_data = nx_entry.require_group("data") nx_data.attrs["NX_class"] = "NXdata" nx_data.attrs["signal"] = "components" source_addr = entry + "/" + process_name + "/results/components" results.attrs["target"] = "components" save_key(results.name, "W", W) save_key(results.name, "components", data) save_key(nx_data.name, "components", h5f[source_addr])
[docs] def create_nxdata_dict( signal, signal_name, axes=None, axes_names=None, axes_long_names=None, rgba=False ): nxdata = {signal_name: signal, "@signal": signal_name, "@NX_class": "NXdata"} if axes is not None: nxdata.update( {"@axes": axes_names, axes_names[0]: axes[0], axes_names[1]: axes[1]} ) if axes_long_names is not None: nxdata.update( { axes_names[0] + "@long_name": axes_long_names[0], axes_names[1] + "@long_name": axes_long_names[1], } ) if rgba: nxdata[signal_name + "@interpretation"] = "rgba-image" return nxdata