Source code for silx.io.spech5

# coding: utf-8
# /*##########################################################################
# Copyright (C) 2016 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ############################################################################*/
"""This module provides a h5py-like API to access SpecFile data.

API description
===============
Specfile data structure exposed by this API:

::

  /
      1.1/
          title = "…"
          start_time = "…"
          instrument/
              specfile/
                  file_header = "…"
                  scan_header = "…"
              positioners/
                  motor_name = value

              mca_0/
                  data = …
                  calibration = …
                  channels = …
                  preset_time = …
                  elapsed_time = …
                  live_time = …

              mca_1/


          measurement/
              colname0 = …
              colname1 = …

              mca_0/
                   data -> /1.1/instrument/mca_0/data
                   info -> /1.1/instrument/mca_0/

      2.1/


``file_header`` and ``scan_header`` are the raw headers as they
appear in the original file, as a string of lines separated by newline (``\\n``) characters.
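
Both are single strings; assuming ``sfh5`` is an open :class:`SpecH5`
(as in the examples further below), they can be split back into individual
header lines, e.g.::

    raw_header = sfh5["/1.1/instrument/specfile/scan_header"]
    # stored as a byte-string; decode it before splitting under Python 3
    header_lines = raw_header.decode().splitlines()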

The title is the content of the ``#S`` scan header line without the leading
``#S`` (e.g ``"1  ascan  ss1vo -4.55687 -0.556875  40 0.2"``).

The start time is converted to ISO8601 format (``"2016-02-23T22:49:05Z"``),
if the original date format is standard.

Numeric datasets are stored in *float32* format, except for scalar integers
which are stored as *int64*.

Motor positions (e.g. ``/1.1/instrument/positioners/motor_name``) can be
1D numpy arrays if they are measured as scan data, or else scalars as defined
on ``#P`` scan header lines. A simple test is done to check if the motor name
is also a data column header defined in the ``#L`` scan header line.
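
For example, with ``sfh5`` an open :class:`SpecH5` and ``motor_name`` a
placeholder for an actual motor mnemonic::

    position = sfh5["/1.1/instrument/positioners/motor_name"]
    # scalar taken from the #P header lines, or a 1D array if the motor
    # is also recorded as a data column
    print(position.shape)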

Scan data (e.g. ``/1.1/measurement/colname0``) is accessed by column,
the dataset name ``colname0`` being the column label as defined in the ``#L``
scan header line.

MCA data is exposed as a 2D numpy array containing all spectra for a given
analyser. The number of analysers is calculated as the number of MCA spectra
per scan data line. Demultiplexing is then performed to assign the correct
spectra to a given analyser.
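
The demultiplexing amounts to a simple slicing of the flat list of spectra
(a sketch of the logic, not the exact implementation; the names below are
placeholders)::

    number_of_analysers = number_of_mca_spectra // number_of_scan_lines
    spectra_of_analyser_i = all_spectra[i::number_of_analysers]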

MCA calibration is an array of 3 scalars, from the ``#@CALIB`` header line.
It is identical for all MCA analysers, as there can be only one
``#@CALIB`` line per scan.
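
The three values are commonly interpreted as polynomial coefficients
``(a, b, c)`` converting a channel number to energy. An illustrative use
(not part of this API; ``sfh5`` is an open :class:`SpecH5`)::

    a, b, c = sfh5["/1.1/instrument/mca_0/calibration"]
    channels = sfh5["/1.1/instrument/mca_0/channels"]
    energy = a + b * channels + c * channels ** 2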

MCA channels is an array containing all channel numbers. This information is
taken from the ``#@CHANN`` scan header line (if present), or else computed
from the shape of the first spectrum in the scan
(``[0, …, len(first_spectrum) - 1]``).
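
In the latter case this is equivalent to (assuming ``numpy`` is imported and
``first_spectrum`` is the first spectrum of the scan)::

    channels = numpy.arange(len(first_spectrum))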

Accessing data
==============

Data and groups are accessed in :mod:`h5py` fashion::

    from silx.io.spech5 import SpecH5

    # Open a SpecFile
    sfh5 = SpecH5("test.dat")

    # using SpecH5 as a regular group to access scans
    scan1group = sfh5["1.1"]
    instrument_group = scan1group["instrument"]

    # alternative: full path access
    measurement_group = sfh5["/1.1/measurement"]

    # accessing a scan data column by name as a 1D numpy array
    data_array = measurement_group["Pslit HGap"]

    # accessing all mca-spectra for one MCA device
    mca_0_spectra = measurement_group["mca_0/data"]

:class:`SpecH5` and :class:`SpecH5Group` provide a :meth:`SpecH5Group.keys` method::

    >>> sfh5.keys()
    ['96.1', '97.1', '98.1']
    >>> sfh5['96.1'].keys()
    ['title', 'start_time', 'instrument', 'measurement']

They can also be iterated over; iterating yields member names (scan keys at
the root level), which can then be used to index the group:

.. code-block:: python

    sfh5 = SpecH5("test.dat")
    for scan_key in sfh5:
        scan_group = sfh5[scan_key]
        dataset_names = [item.name
                         for _, item in scan_group["measurement"].items()
                         if isinstance(item, SpecH5Dataset)]
        print("Found data columns in scan " + scan_group.name)
        print(", ".join(dataset_names))

You can test for existence of data or groups::

    >>> "/1.1/measurement/Pslit HGap" in sfh5
    True
    >>> "positioners" in sfh5["/2.1/instrument"]
    True
    >>> "spam" in sfh5["1.1"]
    False

Strings are stored encoded as ``numpy.string_``, as recommended by
`the h5py documentation <http://docs.h5py.org/en/latest/strings.html>`_.
This ensures maximum compatibility with third party software libraries,
when saving a :class:`SpecH5` to an HDF5 file using :mod:`silx.io.spectoh5`.

The type ``numpy.string_`` is a byte-string format. The consequence of this
is that you should decode strings before using them in **Python 3**::

    >>> from silx.io.spech5 import SpecH5
    >>> sfh5 = SpecH5("31oct98.dat")
    >>> sfh5["/68.1/title"]
    b'68  ascan  tx3 -28.5 -24.5  20 0.5'
    >>> sfh5["/68.1/title"].decode()
    '68  ascan  tx3 -28.5 -24.5  20 0.5'


Classes
=======

- :class:`SpecH5`
- :class:`SpecH5Group`
- :class:`SpecH5Dataset`
- :class:`SpecH5LinkToGroup`
- :class:`SpecH5LinkToDataset`
"""

import logging
import numpy
import posixpath
import re
import sys

from .specfile import SpecFile

__authors__ = ["P. Knobel", "D. Naudet"]
__license__ = "MIT"
__date__ = "03/10/2016"

logging.basicConfig()
logger1 = logging.getLogger(__name__)

try:
    import h5py
except ImportError:
    h5py = None
    logger1.debug("Module h5py optional.", exc_info=True)


string_types = (basestring,) if sys.version_info[0] == 2 else (str,)  # noqa

# Static subitems: all groups and datasets that are present in any
# scan (excludes list of scans, data columns, list of mca devices,
# optional mca headers)
static_items = {
    "scan": [u"title", u"start_time", u"instrument",
             u"measurement"],
    "scan/instrument": [u"specfile", u"positioners"],
    "scan/instrument/specfile": [u"file_header", u"scan_header"],
    "scan/measurement/mca": [u"data", u"info"],
    "scan/instrument/mca": [u"data", u"calibration", u"channels"],
}

# Patterns for group keys
root_pattern = re.compile(r"/$")
scan_pattern = re.compile(r"/[0-9]+\.[0-9]+/?$")
instrument_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/?$")
specfile_group_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/specfile/?$")
positioners_group_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/positioners/?$")
measurement_group_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/?$")
measurement_mca_group_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/mca_[0-9]+/?$")
instrument_mca_group_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_[0-9]+/?$")

# Link to group
measurement_mca_info_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/mca_([0-9]+)/info/?$")

# Patterns for dataset keys
header_pattern = re.compile(r"/[0-9]+\.[0-9]+/header$")
title_pattern = re.compile(r"/[0-9]+\.[0-9]+/title$")
start_time_pattern = re.compile(r"/[0-9]+\.[0-9]+/start_time$")
file_header_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/specfile/file_header$")
scan_header_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/specfile/scan_header$")
positioners_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/positioners/([^/]+)$")
measurement_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/([^/]+)$")
instrument_mca_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_([0-9]+)/data$")
instrument_mca_calib_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_([0-9]+)/calibration$")
instrument_mca_chann_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_([0-9]+)/channels$")
instrument_mca_preset_t_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_[0-9]+/preset_time$")
instrument_mca_elapsed_t_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_[0-9]+/elapsed_time$")
instrument_mca_live_t_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_[0-9]+/live_time$")

# Links to dataset
measurement_mca_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/mca_([0-9]+)/data$")
# info/ + calibration, channel, preset_time, live_time, elapsed_time (not data)
measurement_mca_info_dataset_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/mca_([0-9]+)/info/([^d/][^/]+)$")
# info/data
measurement_mca_info_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/mca_([0-9]+)/info/data$")


def _bulk_match(string_, list_of_patterns):
    """Check whether a string matches any regular expression pattern in a list
    """
    for pattern in list_of_patterns:
        if pattern.match(string_):
            return True
    return False


def is_group(name):
    """Check if ``name`` matches a valid group name pattern in a
    :class:`SpecH5`.

    :param name: Full name of member
    :type name: str

    For example:

    - ``is_group("/123.456/instrument/")`` returns ``True``.
    - ``is_group("spam")`` returns ``False`` because :literal:`\"spam\"`
      is not at all a valid group name.
    - ``is_group("/1.2/instrument/positioners/xyz")`` returns ``False``
      because this key would point to a motor position, which is a
      dataset and not a group.
    """
    group_patterns = (
        root_pattern, scan_pattern, instrument_pattern,
        specfile_group_pattern, positioners_group_pattern,
        measurement_group_pattern, measurement_mca_group_pattern,
        instrument_mca_group_pattern
    )
    return _bulk_match(name, group_patterns)


def is_dataset(name):
    """Check if ``name`` matches a valid dataset name pattern in a
    :class:`SpecH5`.

    :param name: Full name of member
    :type name: str

    For example:

    - ``is_dataset("/1.2/instrument/positioners/xyz")`` returns ``True``
      because this name could be the key to the dataset recording motor
      positions for motor ``xyz`` in scan ``1.2``.
    - ``is_dataset("/123.456/instrument/")`` returns ``False`` because
      this name points to a group.
    - ``is_dataset("spam")`` returns ``False`` because :literal:`\"spam\"`
      is not at all a valid dataset name.
    """
    # /1.1/measurement/mca_0 could be interpreted as a data column
    # with label "mca_0"
    if measurement_mca_group_pattern.match(name):
        return False

    data_patterns = (
        header_pattern, title_pattern, start_time_pattern,
        file_header_data_pattern, scan_header_data_pattern,
        positioners_data_pattern, measurement_data_pattern,
        instrument_mca_data_pattern, instrument_mca_calib_pattern,
        instrument_mca_chann_pattern,
        instrument_mca_preset_t_pattern, instrument_mca_elapsed_t_pattern,
        instrument_mca_live_t_pattern
    )
    return _bulk_match(name, data_patterns)
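

# NOTE: the two helpers below are referenced throughout this module
# (e.g. in SpecH5Group.__contains__ and __getitem__) but were missing from
# this listing; this is a minimal reconstruction based on the link patterns
# defined above, not necessarily the original implementation.
def is_link_to_group(name):
    """Check if ``name`` matches a valid link-to-group name pattern in a
    :class:`SpecH5`.

    :param name: Full name of member
    :type name: str
    """
    # single known type of link to a group:
    # /1.1/measurement/mca_0/info/ -> /1.1/instrument/mca_0/
    return bool(measurement_mca_info_pattern.match(name))


def is_link_to_dataset(name):
    """Check if ``name`` matches a valid link-to-dataset name pattern in a
    :class:`SpecH5`.

    :param name: Full name of member
    :type name: str
    """
    list_of_link_patterns = (
        measurement_mca_data_pattern,
        measurement_mca_info_dataset_pattern,
        measurement_mca_info_data_pattern
    )
    return _bulk_match(name, list_of_link_patterns)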


def _get_attrs_dict(name):
    """Return attributes dictionary corresponding to the group or dataset
    pointed to by name.

    :param name: Full name/path to data or group
    :return: attributes dictionary
    """
    # Associate group and dataset patterns to their attributes
    pattern_attrs = {
        root_pattern: {"NX_class": "NXroot", },
        scan_pattern: {"NX_class": "NXentry", },
        title_pattern: {},
        start_time_pattern: {},
        instrument_pattern: {"NX_class": "NXinstrument", },
        specfile_group_pattern: {"NX_class": "NXcollection", },
        file_header_data_pattern: {},
        scan_header_data_pattern: {},
        positioners_group_pattern: {"NX_class": "NXcollection", },
        positioners_data_pattern: {},
        instrument_mca_group_pattern: {"NX_class": "NXdetector", },
        instrument_mca_data_pattern: {"interpretation": "spectrum", },
        instrument_mca_calib_pattern: {},
        instrument_mca_chann_pattern: {},
        instrument_mca_preset_t_pattern: {},
        instrument_mca_elapsed_t_pattern: {},
        instrument_mca_live_t_pattern: {},
        measurement_group_pattern: {"NX_class": "NXcollection", },
        measurement_data_pattern: {},
        measurement_mca_group_pattern: {},
        measurement_mca_data_pattern: {"interpretation": "spectrum", },
        measurement_mca_info_pattern: {"NX_class": "NXdetector", },
        measurement_mca_info_dataset_pattern: {},
        measurement_mca_info_data_pattern: {"interpretation": "spectrum"},
    }

    for pattern in pattern_attrs:
        if pattern.match(name):
            return pattern_attrs[pattern]

    logger1.warning("%s not a known pattern, assigning empty dict to attrs",
                    name)
    return {}


def _get_scan_key_in_name(item_name):
    """
    :param item_name: Name of a group or dataset
    :return: Scan identification key (e.g. ``"1.1"``)
    :rtype: str or None
    """
    scan_match = re.match(r"/([0-9]+\.[0-9]+)", item_name)
    if not scan_match:
        return None
    return scan_match.group(1)


def _get_mca_index_in_name(item_name):
    """
    :param item_name: Name of a group or dataset
    :return: MCA analyser index, ``None`` if item name does not reference
        a mca dataset
    :rtype: int or None
    """
    mca_match = re.match(r"/.*/mca_([0-9]+)[^0-9]*", item_name)
    if not mca_match:
        return None
    return int(mca_match.group(1))


def _get_motor_in_name(item_name):
    """
    :param item_name: Name of a group or dataset
    :return: Motor name or ``None``
    :rtype: str or None
    """
    motor_match = positioners_data_pattern.match(item_name)
    if not motor_match:
        return None
    return motor_match.group(1)


def _get_data_column_label_in_name(item_name):
    """
    :param item_name: Name of a group or dataset
    :return: Data column label or ``None``
    :rtype: str or None
    """
    # /1.1/measurement/mca_0 should not be interpreted as the label of a
    # data column (let's hope no-one ever uses mca_0 as a label)
    if measurement_mca_group_pattern.match(item_name):
        return None
    data_column_match = measurement_data_pattern.match(item_name)
    if not data_column_match:
        return None
    return data_column_match.group(1)


def _mca_analyser_in_scan(sf, scan_key, mca_analyser_index):
    """
    :param sf: :class:`SpecFile` instance
    :param scan_key: Scan identification key (e.g. ``1.1``)
    :param mca_analyser_index: 0-based index of MCA analyser
    :return: ``True`` if MCA analyser exists in Scan, else ``False``
    :raise: ``KeyError`` if scan_key not found in SpecFile
    :raise: ``AssertionError`` if number of MCA spectra is not a multiple
        of the number of data lines
    """
    if scan_key not in sf:
        raise KeyError("Scan key %s " % scan_key +
                       "does not exist in SpecFile %s" % sf.filename)

    number_of_MCA_spectra = len(sf[scan_key].mca)
    # Scan.data is transposed
    number_of_data_lines = sf[scan_key].data.shape[1]

    # Number of MCA spectra must be a multiple of number of data lines
    assert number_of_MCA_spectra % number_of_data_lines == 0
    number_of_MCA_analysers = number_of_MCA_spectra // number_of_data_lines

    return 0 <= mca_analyser_index < number_of_MCA_analysers


def _motor_in_scan(sf, scan_key, motor_name):
    """
    :param sf: :class:`SpecFile` instance
    :param scan_key: Scan identification key (e.g. ``1.1``)
    :param motor_name: Name of motor as defined in file header lines
    :return: ``True`` if motor exists in scan, else ``False``
    :raise: ``KeyError`` if scan_key not found in SpecFile
    """
    if scan_key not in sf:
        raise KeyError("Scan key %s " % scan_key +
                       "does not exist in SpecFile %s" % sf.filename)
    return motor_name in sf[scan_key].motor_names


def _column_label_in_scan(sf, scan_key, column_label):
    """
    :param sf: :class:`SpecFile` instance
    :param scan_key: Scan identification key (e.g. ``1.1``)
    :param column_label: Column label as defined in scan header
    :return: ``True`` if data column label exists in scan, else ``False``
    :raise: ``KeyError`` if scan_key not found in SpecFile
    """
    if scan_key not in sf:
        raise KeyError("Scan key %s " % scan_key +
                       "does not exist in SpecFile %s" % sf.filename)
    return column_label in sf[scan_key].labels


def _parse_ctime(ctime_lines, analyser_index=0):
    """
    :param ctime_lines: e.g. ``@CTIME %f %f %f``, first word ``@CTIME``
        optional. When multiple CTIME lines are present in a scan header,
        this argument is a concatenation of them separated by a ``\\n``
        character.
    :param analyser_index: MCA device/analyser index, when multiple devices
        are in a scan.
    :return: (preset_time, live_time, elapsed_time)
    """
    ctime_lines = ctime_lines.lstrip("@CTIME ")
    ctimes_lines_list = ctime_lines.split("\n")
    if len(ctimes_lines_list) == 1:
        # single @CTIME line for all devices
        ctime_line = ctimes_lines_list[0]
    else:
        ctime_line = ctimes_lines_list[analyser_index]
    if not len(ctime_line.split()) == 3:
        raise ValueError("Incorrect format for @CTIME header line " +
                         '(expected "@CTIME %f %f %f").')
    return list(map(float, ctime_line.split()))


def spec_date_to_iso8601(date, zone=None):
    """Convert SpecFile date to Iso8601.

    :param date: Date (see supported formats below)
    :type date: str
    :param zone: Time zone as it appears in a ISO8601 date

    Supported formats:

    * ``DDD MMM dd hh:mm:ss YYYY``
    * ``DDD YYYY/MM/dd hh:mm:ss``

    where `DDD` is the abbreviated weekday, `MMM` is the month abbreviated
    name, `MM` is the month number (zero padded), `dd` is the day of the
    month (zero padded), `YYYY` is the year, `hh` the hour (zero padded),
    `mm` the minute (zero padded) and `ss` the second (zero padded).
    All names are expected to be in English.

    Examples::

        >>> spec_date_to_iso8601("Thu Feb 11 09:54:35 2016")
        '2016-02-11T09:54:35'

        >>> spec_date_to_iso8601("Sat 2015/03/14 03:53:50")
        '2015-03-14T03:53:50'
    """
    months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
              'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']

    days_rx = '(?P<day>' + '|'.join(days) + ')'
    months_rx = '(?P<month>' + '|'.join(months) + ')'
    year_rx = r'(?P<year>\d{4})'
    day_nb_rx = r'(?P<day_nb>[0-3]\d)'
    month_nb_rx = r'(?P<month_nb>[0-1]\d)'
    hh_rx = r'(?P<hh>[0-2]\d)'
    mm_rx = r'(?P<mm>[0-5]\d)'
    ss_rx = r'(?P<ss>[0-5]\d)'
    tz_rx = r'(?P<tz>[+-]\d\d:\d\d){0,1}'

    # date formats must have either month_nb (1..12) or month (Jan, Feb, ...)
    re_tpls = ['{days} {months} {day_nb} {hh}:{mm}:{ss}{tz} {year}',
               '{days} {year}/{month_nb}/{day_nb} {hh}:{mm}:{ss}{tz}']

    grp_d = None

    for rx in re_tpls:
        full_rx = rx.format(days=days_rx,
                            months=months_rx,
                            year=year_rx,
                            day_nb=day_nb_rx,
                            month_nb=month_nb_rx,
                            hh=hh_rx,
                            mm=mm_rx,
                            ss=ss_rx,
                            tz=tz_rx)
        m = re.match(full_rx, date)
        if m:
            grp_d = m.groupdict()
            break

    if not grp_d:
        raise ValueError('Date format not recognized : {0}'.format(date))

    year = grp_d['year']

    month = grp_d.get('month_nb')
    if not month:
        month = '{0:02d}'.format(months.index(grp_d.get('month')) + 1)

    day = grp_d['day_nb']

    tz = grp_d['tz']
    if not tz:
        tz = zone

    time = '{0}:{1}:{2}'.format(grp_d['hh'],
                                grp_d['mm'],
                                grp_d['ss'])

    full_date = '{0}-{1}-{2}T{3}{4}'.format(year,
                                            month,
                                            day,
                                            time,
                                            tz if tz else '')
    return full_date


def _fixed_length_strings(strings, length=0):
    """Return list of fixed length strings, left-justified and right-padded
    with spaces.

    :param strings: List of variable length strings
    :param length: Length of strings in returned list, defaults to the
        maximum length in the original list if set to 0.
    :type length: int or None
    """
    if length == 0 and strings:
        length = max(len(s) for s in strings)
    return [s.ljust(length) for s in strings]


class SpecH5Dataset(object):
    """Emulate :class:`h5py.Dataset` for a SpecFile object.

    A :class:`SpecH5Dataset` instance is basically a proxy for the
    :attr:`value` attribute, with additional attributes for compatibility
    with *h5py* datasets.

    :param value: Actual dataset value
    :param name: Dataset full name (posix path format, starting with ``/``)
    :type name: str
    :param file_: Parent :class:`SpecH5`
    :param parent: Parent :class:`SpecH5Group` which contains this dataset
    """
    def __init__(self, value, name, file_, parent):
        object.__init__(self)

        self.value = None
        """Actual dataset, can be a *numpy array*, a *numpy.string_*,
        a *numpy.int_* or a *numpy.float32*

        All operations applied to an instance of the class use this."""

        # get proper value types, to inherit from numpy
        # attributes (dtype, shape, size)
        if isinstance(value, string_types):
            # use bytes for maximum compatibility
            # (see http://docs.h5py.org/en/latest/strings.html)
            self.value = numpy.string_(value)
        elif isinstance(value, float):
            # use 32 bits for float scalars
            self.value = numpy.float32(value)
        elif isinstance(value, int):
            self.value = numpy.int_(value)
        else:
            # Enforce numpy array
            array = numpy.array(value)
            data_kind = array.dtype.kind

            if data_kind in ["S", "U"]:
                self.value = numpy.asarray(array, dtype=numpy.string_)
            elif data_kind in ["f"]:
                self.value = numpy.asarray(array, dtype=numpy.float32)
            else:
                self.value = array

        self.name = name
        """Dataset name (posix path format, starting with ``/``)"""

        self.parent = parent
        """Parent :class:`SpecH5Group` object which contains this dataset"""

        self.file = file_
        """Parent :class:`SpecH5` object"""

        self.attrs = _get_attrs_dict(name)
        """Attributes dictionary"""

        self.shape = self.value.shape
        """Dataset shape, as a tuple with the length of each dimension
        of the dataset."""

        self.dtype = self.value.dtype
        """Dataset dtype"""

        self.size = self.value.size
        """Dataset size (number of elements)"""

    @property
    def h5py_class(self):
        """Return h5py class which is mimicked by this class:
        :class:`h5py.Dataset`.

        Accessing this attribute if :mod:`h5py` is not installed causes
        an ``ImportError`` to be raised
        """
        if h5py is None:
            raise ImportError("Cannot return h5py.Dataset class, " +
                              "unable to import h5py module")
        return h5py.Dataset

    def __getattribute__(self, item):
        if item in ["value", "name", "parent", "file",
                    "attrs", "shape", "dtype", "size",
                    "h5py_class"]:
            return object.__getattribute__(self, item)
        return getattr(self.value, item)

    def __len__(self):
        return len(self.value)

    def __getitem__(self, item):
        return self.value.__getitem__(item)

    def __getslice__(self, i, j):
        # deprecated but still in use for python 2.7
        return self.__getitem__(slice(i, j, None))

    def __iter__(self):
        return self.value.__iter__()

    def __dir__(self):
        attrs = set(dir(self.value) +
                    ["value", "name", "parent", "file",
                     "attrs", "shape", "dtype", "size",
                     "h5py_class"])
        return sorted(attrs)

    # casting
    def __repr__(self):
        return self.value.__repr__()

    def __float__(self):
        return float(self.value)

    def __int__(self):
        return int(self.value)

    def __str__(self):
        return str(self.value)

    def __bool__(self):
        if self.value:
            return True
        return False

    def __nonzero__(self):
        # python 2
        return self.__bool__()

    def __array__(self, dtype=None):
        if dtype is None:
            return numpy.array(self.value)
        else:
            return numpy.array(self.value, dtype=dtype)

    # comparisons
    def __eq__(self, other):
        if hasattr(other, "value"):
            return self.value == other.value
        else:
            return self.value == other

    def __ne__(self, other):
        if hasattr(other, "value"):
            return self.value != other.value
        else:
            return self.value != other

    def __lt__(self, other):
        if hasattr(other, "value"):
            return self.value < other.value
        else:
            return self.value < other

    def __le__(self, other):
        if hasattr(other, "value"):
            return self.value <= other.value
        else:
            return self.value <= other

    def __gt__(self, other):
        if hasattr(other, "value"):
            return self.value > other.value
        else:
            return self.value > other

    def __ge__(self, other):
        if hasattr(other, "value"):
            return self.value >= other.value
        else:
            return self.value >= other

    # operations
    def __add__(self, other):
        return self.value + other

    def __radd__(self, other):
        return other + self.value

    def __sub__(self, other):
        return self.value - other

    def __rsub__(self, other):
        return other - self.value

    def __mul__(self, other):
        return self.value * other

    def __rmul__(self, other):
        return other * self.value

    def __truediv__(self, other):
        return self.value / other

    def __rtruediv__(self, other):
        return other / self.value

    def __floordiv__(self, other):
        return self.value // other

    def __rfloordiv__(self, other):
        return other // self.value

    # unary operations
    def __neg__(self):
        return -self.value

    def __abs__(self):
        return abs(self.value)


class SpecH5LinkToDataset(SpecH5Dataset):
    """Special :class:`SpecH5Dataset` representing a link to a dataset. It
    works exactly like a regular dataset, but :meth:`SpecH5Group.visit`
    and :meth:`SpecH5Group.visititems` methods will recognize that it is
    a link and will ignore it.
    """
    pass


def _dataset_builder(name, specfileh5, parent_group):
    """Retrieve dataset from :class:`SpecFile`, based on dataset name, as a
    subclass of :class:`numpy.ndarray`.

    :param name: Dataset full name (posix path format, starting with ``/``)
    :type name: str
    :param specfileh5: parent :class:`SpecH5` object
    :type specfileh5: :class:`SpecH5`
    :param parent_group: Parent :class:`SpecH5Group`

    :return: Array with the requested data
    :rtype: :class:`SpecH5Dataset`.
    """
    scan_key = _get_scan_key_in_name(name)
    scan = specfileh5._sf[scan_key]

    # get dataset in an array-like format (ndarray, str, list…)
    array_like = None

    if title_pattern.match(name):
        array_like = scan.scan_header_dict["S"]

    elif start_time_pattern.match(name):
        if "D" in scan.scan_header_dict:
            try:
                array_like = spec_date_to_iso8601(scan.scan_header_dict["D"])
            except (IndexError, ValueError):
                logger1.warning("Could not parse date format in scan header. " +
                                "Using original date not converted to ISO-8601")
                array_like = scan.scan_header_dict["D"]
        elif "D" in scan.file_header_dict:
            logger1.warning("No #D line in scan header. " +
                            "Using file header for start_time.")
            try:
                array_like = spec_date_to_iso8601(scan.file_header_dict["D"])
            except (IndexError, ValueError):
                logger1.warning("Could not parse date format in file header. " +
                                "Using original date not converted to ISO-8601")
                array_like = scan.file_header_dict["D"]
        else:
            logger1.warning("No #D line in header. "
                            "Setting date to empty string.")
            array_like = ""

    elif file_header_data_pattern.match(name):
        # array_like = _fixed_length_strings(scan.file_header)
        array_like = "\n".join(scan.file_header)

    elif scan_header_data_pattern.match(name):
        # array_like = _fixed_length_strings(scan.scan_header)
        array_like = "\n".join(scan.scan_header)

    elif positioners_data_pattern.match(name):
        m = positioners_data_pattern.match(name)
        motor_name = m.group(1)
        # if a motor is recorded as a data column, ignore its position in
        # header and return the data column instead
        if motor_name in scan.labels and scan.data.shape[0] > 0:
            array_like = scan.data_column_by_name(motor_name)
        else:
            # may return float("inf") if #P line is missing from scan hdr
            array_like = scan.motor_position_by_name(motor_name)

    elif measurement_data_pattern.match(name):
        m = measurement_data_pattern.match(name)
        column_name = m.group(1)
        array_like = scan.data_column_by_name(column_name)

    elif instrument_mca_data_pattern.match(name):
        m = instrument_mca_data_pattern.match(name)
        analyser_index = int(m.group(1))
        # retrieve 2D array of all MCA spectra from one analyser
        array_like = _demultiplex_mca(scan, analyser_index)

    elif instrument_mca_calib_pattern.match(name):
        m = instrument_mca_calib_pattern.match(name)
        analyser_index = int(m.group(1))
        if len(scan.mca.channels) == 1:
            # single @CALIB line applying to multiple devices
            analyser_index = 0
        array_like = scan.mca.calibration[analyser_index]

    elif instrument_mca_chann_pattern.match(name):
        m = instrument_mca_chann_pattern.match(name)
        analyser_index = int(m.group(1))
        if len(scan.mca.channels) == 1:
            # single @CHANN line applying to multiple devices
            analyser_index = 0
        array_like = scan.mca.channels[analyser_index]

    elif "CTIME" in scan.mca_header_dict and "mca_" in name:
        m = re.compile(r"/.*/mca_([0-9]+)/.*").match(name)
        analyser_index = int(m.group(1))
        ctime_line = scan.mca_header_dict['CTIME']
        (preset_time, live_time, elapsed_time) = _parse_ctime(ctime_line,
                                                              analyser_index)
        if instrument_mca_preset_t_pattern.match(name):
            array_like = preset_time
        elif instrument_mca_live_t_pattern.match(name):
            array_like = live_time
        elif instrument_mca_elapsed_t_pattern.match(name):
            array_like = elapsed_time

    if array_like is None:
        raise KeyError("Name " + name + " does not match any known dataset.")

    return SpecH5Dataset(array_like, name,
                         file_=specfileh5, parent=parent_group)


def _link_to_dataset_builder(name, specfileh5, parent_group):
    """Same as :func:`_dataset_builder`, but returns a
    :class:`SpecH5LinkToDataset`

    :param name: Dataset full name (posix path format, starting with ``/``)
    :type name: str
    :param specfileh5: parent :class:`SpecH5` object
    :type specfileh5: :class:`SpecH5`
    :param parent_group: Parent :class:`SpecH5Group`

    :return: Array with the requested data
    :rtype: :class:`SpecH5LinkToDataset`.
    """
    scan_key = _get_scan_key_in_name(name)
    scan = specfileh5._sf[scan_key]

    # get dataset in an array-like format (ndarray, str, list…)
    array_like = None

    # /1.1/measurement/mca_0/data -> /1.1/instrument/mca_0/data
    if measurement_mca_data_pattern.match(name):
        m = measurement_mca_data_pattern.match(name)
        analyser_index = int(m.group(1))
        array_like = _demultiplex_mca(scan, analyser_index)

    # /1.1/measurement/mca_0/info/X -> /1.1/instrument/mca_0/X
    # X: calibration, channels, preset_time, live_time, elapsed_time
    elif measurement_mca_info_dataset_pattern.match(name):
        m = measurement_mca_info_dataset_pattern.match(name)
        analyser_index = int(m.group(1))
        mca_hdr_type = m.group(2)

        if mca_hdr_type == "calibration":
            if len(scan.mca.calibration) == 1:
                # single @CALIB line for multiple devices
                analyser_index = 0
            array_like = scan.mca.calibration[analyser_index]

        elif mca_hdr_type == "channels":
            if len(scan.mca.channels) == 1:
                # single @CHANN line for multiple devices
                analyser_index = 0
            array_like = scan.mca.channels[analyser_index]

        elif "CTIME" in scan.mca_header_dict:
            ctime_line = scan.mca_header_dict['CTIME']
            (preset_time, live_time, elapsed_time) = _parse_ctime(ctime_line,
                                                                  analyser_index)
            if mca_hdr_type == "preset_time":
                array_like = preset_time
            elif mca_hdr_type == "live_time":
                array_like = live_time
            elif mca_hdr_type == "elapsed_time":
                array_like = elapsed_time

    # /1.1/measurement/mca_0/info/data -> /1.1/instrument/mca_0/data
    elif measurement_mca_info_data_pattern.match(name):
        m = measurement_mca_info_data_pattern.match(name)
        analyser_index = int(m.group(1))
        array_like = _demultiplex_mca(scan, analyser_index)

    if array_like is None:
        raise KeyError("Name " + name + " does not match any known dataset.")

    return SpecH5LinkToDataset(array_like, name,
                               file_=specfileh5, parent=parent_group)


def _demultiplex_mca(scan, analyser_index):
    """Return MCA data for a single analyser.

    Each MCA spectrum is a 1D array. For each analyser, there is one
    spectrum recorded per scan data line. When there are more than a single
    MCA analyser in a scan, the data will be multiplexed. For instance if
    there are 3 analysers, the consecutive spectra for the first analyser
    must be accessed as ``mca[0], mca[3], mca[6]…``.

    :param scan: :class:`Scan` instance containing the MCA data
    :param analyser_index: 0-based index referencing the analyser
    :type analyser_index: int
    :return: 2D numpy array containing all spectra for one analyser
    """
    mca_data = scan.mca

    number_of_MCA_spectra = len(mca_data)
    number_of_scan_data_lines = scan.data.shape[1]

    # Number of MCA spectra must be a multiple of number of scan data lines
    assert number_of_MCA_spectra % number_of_scan_data_lines == 0
    number_of_analysers = number_of_MCA_spectra // number_of_scan_data_lines

    list_of_1D_arrays = []
    for i in range(analyser_index,
                   number_of_MCA_spectra,
                   number_of_analysers):
        list_of_1D_arrays.append(mca_data[i])
    # convert list to 2D array
    return numpy.array(list_of_1D_arrays)


class SpecH5Group(object):
    """Emulate :class:`h5py.Group` for a SpecFile object

    :param name: Group full name (posix path format, starting with ``/``)
    :type name: str
    :param specfileh5: parent :class:`SpecH5` instance
    """

    def __init__(self, name, specfileh5):
        self.name = name
        """Full name/path of group"""

        self.file = specfileh5
        """Parent SpecH5 object"""

        self.attrs = _get_attrs_dict(name)
        """Attributes dictionary"""

        if name != "/":
            if name not in specfileh5:
                raise KeyError("File %s does not contain group %s" %
                               (specfileh5, name))
            scan_key = _get_scan_key_in_name(name)
            self._scan = self.file._sf[scan_key]

    @property
    def h5py_class(self):
        """Return h5py class which is mimicked by this class:
        :class:`h5py.Group`.

        Accessing this attribute if :mod:`h5py` is not installed causes
        an ``ImportError`` to be raised
        """
        if h5py is None:
            raise ImportError("Cannot return h5py.Group class, " +
                              "unable to import h5py module")
        return h5py.Group

    @property
    def parent(self):
        """Parent group (group that contains this group)"""
        if not self.name.strip("/"):
            return None

        parent_name = posixpath.dirname(self.name.rstrip("/"))
        return SpecH5Group(parent_name, self.file)

    def __contains__(self, key):
        """
        :param key: Path to child element (e.g. ``"mca_0/info"``) or full
            name of group or dataset (e.g. ``"/2.1/instrument/positioners"``)
        :return: True if key refers to a valid member of this group,
            else False
        """
        # Absolute path to an item outside this group
        if key.startswith("/"):
            if not key.startswith(self.name):
                return False
        # Make sure key is an absolute path by prepending this group's name
        else:
            key = self.name.rstrip("/") + "/" + key

        # key not matching any known pattern
        if not is_group(key) and not is_dataset(key) and\
                not is_link_to_group(key) and not is_link_to_dataset(key):
            return False

        # nonexistent scan in specfile
        scan_key = _get_scan_key_in_name(key)
        if scan_key not in self.file._sf:
            return False

        # nonexistent MCA analyser in scan
        mca_analyser_index = _get_mca_index_in_name(key)
        if mca_analyser_index is not None:
            if not _mca_analyser_in_scan(self.file._sf,
                                         scan_key,
                                         mca_analyser_index):
                return False

        # nonexistent motor name
        motor_name = _get_motor_in_name(key)
        if motor_name is not None:
            if not _motor_in_scan(self.file._sf,
                                  scan_key,
                                  motor_name):
                return False

        # nonexistent data column
        column_label = _get_data_column_label_in_name(key)
        if column_label is not None:
            if not _column_label_in_scan(self.file._sf,
                                         scan_key,
                                         column_label):
                return False

        if key.endswith("preset_time") or\
                key.endswith("elapsed_time") or\
                key.endswith("live_time"):
            return "CTIME" in self.file._sf[scan_key].mca_header_dict

        # header, title, start_time, existing scan/mca/motor/measurement
        return True

    def __eq__(self, other):
        return (isinstance(other, SpecH5Group) and
                self.name == other.name and
                self.file.filename == other.file.filename and
                self.keys() == other.keys())

    def get(self, name, default=None, getclass=False, getlink=False):
        """Retrieve an item by name, or a default value if name does not
        point to an existing item.

        :param str name: name of the item
        :param default: Default value returned if the name is not found
        :param bool getclass: if *True*, the returned object is the class of
            the item, instead of the item instance.
        :param bool getlink: Not implemented. This method always returns
            an instance of the original class of the requested item (or
            just the class, if *getclass* is *True*)
        :return: The requested item, or its class if *getclass* is *True*,
            or the specified *default* value if the group does not contain
            an item with the requested name.
        """
        if name not in self:
            return default

        if getlink and getclass:
            pass

        if getclass:
            return self[name].h5py_class

        return self[name]

    def __getitem__(self, key):
        """Return a :class:`SpecH5Group` or a :class:`SpecH5Dataset` if
        ``key`` is a valid name of a group or dataset.

        ``key`` can be a member of ``self.keys()``, i.e. an immediate child
        of the group, or a path reaching into subgroups (e.g.
        ``"instrument/positioners"``)

        In the special case where this group is the root group, ``key`` can
        start with a ``/`` character.

        :param key: Name of member
        :type key: str
        :raise: KeyError if ``key`` is not a known member of this group.
        """
        # accept numbers for scan indices
        if isinstance(key, int):
            number = self.file._sf.number(key)
            order = self.file._sf.order(key)
            full_key = "/%d.%d" % (number, order)
        # Relative path starting from this group (e.g "mca_0/info")
        elif not key.startswith("/"):
            full_key = self.name.rstrip("/") + "/" + key
        # Absolute path called from the root group or from a parent group
        elif key.startswith(self.name):
            full_key = key
        # Absolute path to an element called from a non-parent group
        else:
            raise KeyError(key + " is not a child of " + self.__repr__())

        if is_group(full_key):
            return SpecH5Group(full_key, self.file)
        elif is_dataset(full_key):
            return _dataset_builder(full_key, self.file, self)
        elif is_link_to_group(full_key):
            return SpecH5LinkToGroup(full_key, self.file)
        elif is_link_to_dataset(full_key):
            return _link_to_dataset_builder(full_key, self.file, self)
        else:
            raise KeyError("unrecognized group or dataset: " + full_key)

    def __iter__(self):
        for key in self.keys():
            yield key

    def items(self):
        for key in self.keys():
            yield key, self[key]

    def __len__(self):
        """Return the number of members (subgroups and datasets) attached to
        this group.
        """
        return len(self.keys())

    def __repr__(self):
        return '<SpecH5Group "%s" (%d members)>' % (self.name, len(self))

    def keys(self):
        """:return: List of all names of members attached to this group
        """
        # keys in hdf5 are unicode
        if self.name == "/":
            return self.file.keys()

        if scan_pattern.match(self.name):
            return static_items["scan"]

        if positioners_group_pattern.match(self.name):
            return self._scan.motor_names

        if specfile_group_pattern.match(self.name):
            return static_items["scan/instrument/specfile"]

        if measurement_mca_group_pattern.match(self.name):
            return static_items["scan/measurement/mca"]

        if instrument_mca_group_pattern.match(self.name):
            ret = static_items["scan/instrument/mca"]
            if "CTIME" in self._scan.mca_header_dict:
                return ret + [u"preset_time", u"elapsed_time", u"live_time"]
            return ret

        number_of_MCA_spectra = len(self._scan.mca)
        number_of_data_lines = self._scan.data.shape[1]

        if not number_of_data_lines == 0:
            # Number of MCA spectra must be a multiple of number of data lines
            assert number_of_MCA_spectra % number_of_data_lines == 0
            number_of_MCA_analysers = number_of_MCA_spectra // number_of_data_lines
            mca_list = ["mca_%d" % i for i in range(number_of_MCA_analysers)]

            if measurement_group_pattern.match(self.name):
                return self._scan.labels + mca_list
        else:
            mca_list = []
            # no data: no measurement
            if measurement_group_pattern.match(self.name):
                return []

        if instrument_pattern.match(self.name):
            return static_items["scan/instrument"] + mca_list

    def visit(self, func):
        """Recursively visit all names in this group and subgroups.

        :param func: Callable (function, method or callable object)
        :type func: function

        You supply a callable (function, method or callable object); it
        will be called exactly once for each link in this group and every
        group below it. Your callable must conform to the signature:

            ``func(<member name>) => <None or return value>``

        Returning ``None`` continues iteration, returning anything else stops
        and immediately returns that value from the visit method.  No
        particular order of iteration within groups is guaranteed.

        Example:

        .. code-block:: python

            # Get a list of all contents (groups and datasets) in a SpecFile
            mylist = []
            f = SpecH5('foo.dat')
            f.visit(mylist.append)
        """
        for member_name in self.keys():
            member = self[member_name]
            ret = None
            if not is_link_to_dataset(member.name) and\
                    not is_link_to_group(member.name):
                ret = func(member.name)
            if ret is not None:
                return ret
            # recurse into subgroups
            if isinstance(self[member_name], SpecH5Group) and\
                    not isinstance(self[member_name], SpecH5LinkToGroup):
                self[member_name].visit(func)

    def visititems(self, func):
        """Recursively visit names and objects in this group.

        :param func: Callable (function, method or callable object)
        :type func: function

        You supply a callable (function, method or callable object); it
        will be called exactly once for each link in this group and every
        group below it. Your callable must conform to the signature:

            ``func(<member name>, <object>) => <None or return value>``

        Returning ``None`` continues iteration, returning anything else stops
        and immediately returns that value from the visit method.  No
        particular order of iteration within groups is guaranteed.

        Example:

        .. code-block:: python

            # Get a list of all datasets in a specific scan
            mylist = []
            def func(name, obj):
                if isinstance(obj, SpecH5Dataset):
                    mylist.append(name)

            f = SpecH5('foo.dat')
            f["1.1"].visititems(func)
        """
        for member_name in self.keys():
            member = self[member_name]
            ret = None
            if not is_link_to_dataset(member.name):
                ret = func(member.name, member)
            if ret is not None:
                return ret
            # recurse into subgroups
            if isinstance(self[member_name], SpecH5Group) and\
                    not isinstance(self[member_name], SpecH5LinkToGroup):
                self[member_name].visititems(func)


class SpecH5LinkToGroup(SpecH5Group):
    """Special :class:`SpecH5Group` representing a link to a group.

    It works exactly like a regular group but :meth:`SpecH5Group.visit`
    and :meth:`SpecH5Group.visititems` methods will recognize it as a
    link and will ignore it.
    """
    def keys(self):
        """:return: List of all names of members attached to the target group
        """
        # we only have a single type of link to a group:
        # /1.1/measurement/mca_0/info/ -> /1.1/instrument/mca_0/
        if measurement_mca_info_pattern.match(self.name):
            link_target = self.name.replace("measurement",
                                            "instrument").rstrip("/")[:-4]
            return SpecH5Group(link_target, self.file).keys()


class SpecH5(SpecH5Group):
    """Special :class:`SpecH5Group` representing the root of a SpecFile.

    :param filename: Path to SpecFile in filesystem
    :type filename: str

    In addition to all generic :class:`SpecH5Group` attributes, this class
    also keeps a reference to the original :class:`SpecFile` object and
    has a :attr:`filename` attribute.

    Its immediate children are scans, but it also gives access to any group
    or dataset in the entire SpecFile tree by specifying the full path.
    """
    def __init__(self, filename):
        self.filename = filename
        self.attrs = _get_attrs_dict("/")
        self._sf = SpecFile(filename)

        SpecH5Group.__init__(self, name="/", specfileh5=self)
        if len(self) == 0:
            # the SpecFile library does not raise an exception for
            # non-spec files
            raise IOError("Empty specfile. Not a valid spec format.")

    def keys(self):
        """
        :return: List of all scan keys in this SpecFile
            (e.g. ``["1.1", "2.1"…]``)
        """
        return self._sf.keys()

    def close(self):
        """Close the object, and free up associated resources.

        After calling this method, attempts to use the object may fail.
        """
        self._sf = None

    def __repr__(self):
        return '<SpecH5 "%s" (%d members)>' % (self.filename, len(self))

    def __eq__(self, other):
        return (isinstance(other, SpecH5) and
                self.filename == other.filename and
                self.keys() == other.keys())

    @property
    def h5py_class(self):
        """h5py class which is mimicked by this class"""
        if h5py is None:
            raise ImportError("Cannot return h5py.File class, " +
                              "unable to import h5py module")
        return h5py.File