# -*- coding: utf-8 -*- """ Classes for the handling the fields. """ from __future__ import absolute_import from __future__ import division from __future__ import print_function import warnings import logging import copy from collections import OrderedDict from typing import (Any, Dict, Hashable, Iterable, Iterator, List, Mapping, Optional, Sequence, Set, Tuple, Union, cast) import numpy import xarray as xr from cerbere.datamodel.variable import Variable import cerbere.cfconvention as cf __all__ = ['Field'] FIELD_ATTRS = [ 'standard_name', 'authority', 'units', 'valid_min', 'valid_max', '_FillValue' ] FIELD_EXCL_ATTRS = [ 'long_name' ] class Field(object): """A Field describes a scientific data array. It contains data and metadata attributes. This is an extension of xarray's DataArray with stricter requirements on attributes. A :class:`Field` object can be constructed with: * a xarray :class:`~xarray.DataArray` object, provided in ``array`` A :class:`Field` object can be attached to a :class:`~cerbere.dataset.dataset.Dataset` object (of any class inherited from a :class:`~cerbere.dataset.dataset.Dataset`, provided with the ``dataset`` argument. Args: data: the scientific data. Can be of the following types: * :class:`numpy.ma.MaskedArray` * :class:`xarray.DataArray` Optional (the field can be created with default values) name: the label of the field (don't use any white space). This corresponds to the variable name in a netcdf file dims (list): the scientific array dimension names datatype: the type of the data. Infer the type from the provided data by default. precision (int, optional): number of significant digits (when writing the data on file) fields : the subfields composing the main field. This is intended to group for instance components of the same variable (such as a vector's northward and eastward components for wind, currents,...). This allows to relate these components to the same physical variable (e.g wind) and to a single qc_levels and qc_details information. fillvalue : the default value to associate with missing data in the field's values. The fillvalue must be of the same type as `datatype` and `values` attrs (dict) : a dictionary of the metadata associated with the field's values. units : the units in which the data are given (if applicable) description (str) : full name of the phenomenon. This corresponds to a long_name in attribute in a netCDF file. authority (str): naming authority referencing the provided standard name standard_name (optional, str): standard label for a phenomenon, with respect to the convention stated in `authority` argument. This corresponds to a standard_name attribute in a CF compliant NetCDF file. quality_vars: list of related quality fields """ def __init__(self, data, name: Optional[str] = None, dims: Optional[Tuple] = None, datatype: Optional[numpy.dtype] = None, fields: Optional[Tuple['Field']] = None, dataset: Optional['Dataset'] = None, fillvalue: Optional[Any] = None, precision: Optional[int] = None, description: Optional[str] = None, standard_name: Optional[Union[str, Tuple[str, str]]] = None, units: Optional[str] = None, quality_vars: Optional[List[str]] = None, attrs: Optional[Mapping[str, Any]] = None, **kwargs) -> None: """ """ if name is not None and not isinstance(name, str): raise TypeError('name must be a string') if isinstance(data, xr.DataArray): self.array = data else: # create the DataArray from the provided information # dimensions if isinstance(dims, (list, tuple)): dims = tuple(dims) elif dims is None: dims = () else: raise TypeError("Wrong type for dimensions") if data is None: # create default array if datatype is None: raise ValueError( "If you don't provide any data, you must at least " "provide a datatype" ) if not isinstance(dims, OrderedDict): raise TypeError( "dimensions should be provided with their size in a " "OrderedDict" ) data = numpy.ma.masked_all( tuple(dims.values()), datatype) else: data = data # instantiate the xarray representation kwargs['dims'] = list(dims) kwargs['attrs'] = attrs self.array = xr.DataArray( data, # dims=list(dims), # attrs=attrs, name=name, **kwargs ) # Overrides DataArray object when conflicts with the superceding # arguments self.name = name self.standard_name = standard_name self.description = description self.fill_value = fillvalue self.units = units self.array.attrs['quality_vars'] = quality_vars # components for complex fields if fields is not None: # Add components in case of a composite field : # each of these components must be itself a field for fld in fields: if not isinstance(fld, Field): raise TypeError("Components must by Field class object") self.components = fields # attachment to a parent Dataset object self.dataset = dataset if (self.dataset is not None and self.array.name not in dataset._varnames): raise ValueError( "Field {} not found in this mapper".format(self.array.name) ) # @TODO self.handler ??? self.array.encoding['cerbere_status'] = "changed" @classmethod def to_field(cls, data: xr.DataArray) -> 'Field': """Cast a xarray DataArray to a :class:`cerbere.datamodel.field.Field` object """ return Field(data=data) @property def to_dataarray(self): """Return the field values a xarray DataArray""" if self.dataset is None: return self.array else: return self.dataset.get_values( self.array.name, as_masked_array=False ) def __str__(self): result = "Field : '%s'\n" % self.array.name if ('long_name' in self.array.attrs and self.array.attrs['long_name'] is not None): result += ' {}\n'.format(self.array.attrs['long_name']) result += '\n' # dimensions result = result + ' dimensions :\n' if self.dataset is None: dims = OrderedDict(self.array.sizes.items()) else: dims = self.dataset.get_field_dims(self.name) for dim, size in dims.items(): result += ' # {} : {}\n'.format(dim, size) attrs = self.array.attrs.items() # standard attributes result = result + ' standard CF attributes :\n' for att, val in attrs: if att in FIELD_ATTRS: result += ' # {} : {}\n'.format(att, val) # free form attributes result = result + ' other attributes :\n' for att, val in attrs: if att not in FIELD_ATTRS and att not in FIELD_EXCL_ATTRS: result = result + ' # {} : {}\n'.format(att, val) return result @property def name(self): return self.array.name @name.setter def name(self, value): self.array.name = value @property def attrs(self): return self.array.attrs @attrs.setter def attrs(self, attrs): self.array.attrs = attrs @property def dims(self): if self.dataset is None: return tuple(self.array.dims) else: return self.dataset.get_field_dims(self.name) @dims.setter def dims(self, dims): if self.dataset is None: self.array.dims = dims else: self.dataset.set_dimensions(dims) @property def dimnames(self): return tuple(self.dims.keys()) def get_dimsize(self, dimname) -> int: """Return the size of a field dimension""" if self.dataset is None: return self.array.sizes[dimname] else: return self.dataset.get_dimsize(dimname) @property def fill_value(self): """return the value for missing data""" try: return self.array.encoding['_FillValue'] except KeyError: return None @fill_value.setter def fill_value(self, fill_value): """set the value for missing data""" self.array.encoding['_FillValue'] = fill_value @property def valid_min(self): """return the minimum valid value""" try: return self.array.attrs['valid_min'] except KeyError: return None @valid_min.setter def valid_min(self, value): """set the minimum valid value""" self.array.attrs['valid_min'] = value @property def valid_max(self): """return the maximum valid value""" try: return self.array.attrs['valid_max'] except KeyError: return None @valid_max.setter def valid_max(self, value): """set the maximum valid value""" self.array.attrs['valid_max'] = value @property def units(self) -> str: """return the field units (``units`` CF attribute)""" try: return self.array.attrs['units'] except KeyError: return @units.setter def units(self, units: str): """set the variable units (``units`` CF attribute)""" self.array.attrs['units'] = units @property def description(self): """return the field description (``long_name`` CF attribute)""" try: return self.array.attrs['long_name'] except KeyError: return None @units.setter def description(self, description: str) -> None: """set the field description (``long_name`` CF attribute)""" self.array.attrs['long_name'] = description @property def standard_name(self) -> str: """return the field standard name (``standard_name`` CF attribute)""" try: return ( self.array.attrs['standard_name'], self.array.attrs['cf_authority'] ) except KeyError: return None @units.setter def standard_name(self, standard_name: str) -> None: """set the standard_name (``standard_name`` CF attribute)""" if isinstance(standard_name, tuple): self.array.attrs['standard_name'] = standard_name[0] self.array.attrs['authority'] = standard_name[1] elif standard_name is not None: self.array.attrs['standard_name'] = standard_name[0] self.array.attrs['authority'] = cf.CF_AUTHORITY else: self.array.attrs['standard_name'] = None self.array.attrs['authority'] = None @property def datatype(self): if self.dataset is None: return self.array.dtype else: return self.dataset.dataset[self.name].dtype @property def variable(self): """return the field variable definition""" var = Variable(self.array.name) if 'long_name' in self.array.attrs: var.description = self.array.attrs['long_name'] if 'standard_name' in self.array.attrs: var.standardname = self.array.attrs['standard_name'] try: var.authority = self.array.attrs['authority'] except KeyError: logging.error( "No authority attribute defined for standard name: {}" .format(var.standardname) ) return var @variable.setter def variable(self, variable): """set the field variable definition""" self.array.name = variable.shortname self.array.attrs['long_name'] = variable.description self.array.attrs['authority'] = variable.authority self.array.attrs['standard_name'] = variable.standardname def is_composite(self) -> bool: """ True if the field is a composite field. A composite field is a composition of sub fields (vector components, real and imaginary part of a complex, ...) """ return self.components is not None @property def components(self): """Return the list of components of the field. Components (or sub fields) are intended for non scalar fields (ex: vector like current or wind, real and imaginary part of a complex,...) """ res = [] if self.is_composite(): for rec in self.components: res.extend(rec.get_components()) return res else: return [self] def _attach_dataset( self, dataset: 'Dataset' ) -> None: """Attach a parent dataset to the field. cerbere uses lazy loading. Data are not actually loaded until they are explicitly requested (calling :func:`get_values`). This pointer to the parent dataset, set by this function, is used to locate where the data have to be read from (or saved later for a new or updated file). Args: handler(:class:`Dataset`): the parent dataset attached to the field. """ self.dataset = dataset def get_values( self, index: Optional[Mapping[str, slice]] = None, padding: Optional[bool] = False, **kwargs) -> 'np.ma.MaskedArray': """ Return the field values as a :class:`numpy.ma.MaskedArray` object. Args: index: any kind of xarray indexing compatible with xarray :func:`~xarray.DataArray.isel` selection method. padding: pad the result with fill values where slices are out of the field dimension limits. Default is False. Returns: The field values as a numpy MaskedArray """ return self._get_values( index=index, padding=padding, as_masked_array=True, **kwargs ) def get_xvalues( self, index: Optional[Mapping[str, slice]] = None, padding: Optional[bool] = False, **kwargs) -> 'xr.DataArray': """ Return the field values as a :class:`xarray.DataArray` object. Args: index: any kind of xarray indexing compatible with :func:`xarray.DataArray.isel` selection method. padding: pad the result with fill values where slices are out of the field dimension limits. Default is False. Returns: The field values as a xarray DataArray """ return self._get_values( index=index, padding=padding, as_masked_array=False, **kwargs ) def _get_values( self, index=None, padding=False, as_masked_array=True, **kwargs): allkwargs = { 'index': index, 'padding': padding, 'as_masked_array': as_masked_array, **kwargs } if self.dataset is None: return numpy.ma.array(self._read_dataarray(self.array, **allkwargs)) else: return self.dataset.get_values(self.name, **allkwargs) @classmethod def _read_dataarray( cls, xrdata, index: Mapping[Hashable, Any]=None, padding: bool=False, as_masked_array: bool=True, **kwargs ): """ Extract values as from a xarray DataArray object. Values are returned as a numpy ``MaskedArray`` or xarray ``DataArray`` object. Args: xrdata (:class:`xarray.DataArray`): the xarray ``DataArray`` object from which to extract the values. index (dict, optional): any kind of xarray indexing compatible with ``isel`` selection method. as_masked_array (bool): return the result as a numpy masked array (by default), or as a xarray if set to False. padding (bool): if True, pad the result with fill values where slices are out of the field size. Returns: :class:`numpy.ma.MaskedArray` or :class:`xarray.DataArray`: the data as a numpy ``MaskedArray``, if ``as_masked_array`` is set, or as a xarray ``DataArray`` object """ if as_masked_array: data = xrdata.isel(index).to_masked_array(copy=False) else: data = xrdata.isel(index).values() if padding: data = cls._pad_data(xrdata, data, index) return data @classmethod def _pad_data( cls, array: 'to_dataarray.core.dataset.Dataset', subset: 'to_dataarray.core.dataset.Dataset', index: Optional[Mapping[str, slice]] ) -> 'numpy.ndarray': """ pad with fill values the ``subset`` array extracted from ``array`` where ``index`` is beyond the limits of ``array``. """ pad_edges = [] for dim in list(array.dims): if dim in index: dslice = index[dim] pad_edges.append([ abs(min(0, dslice.start)), abs(min(0, array.sizes[dim] - dslice.stop)) ]) else: pad_edges.append([0, 0]) res = numpy.pad( subset, pad_edges, 'constant', constant_values=numpy.nan ) if isinstance(subset, numpy.ma.MaskedArray): res = numpy.ma.fix_invalid(res, copy=False) return res def set_values( self, values: numpy.ndarray, index: Optional[Mapping[str, slice]] = None, **kwargs) -> None: """set the values of a field. It is possible to set only a subset of the field data array, using ``index``: >>> import numpy as np >>> data = np.ma.zeros((100, 200)) >>> field = Field(data, name='test', dims=('x', 'y')) >>> field.set_values( >>> np.full((10, 5,), 2), >>> {'x': slice(10, 20), 'y': slice(0, 5)} >>> ) Args: values: the values to replace the ones in the field index: a dict of slices or indices of the subset to replace in the current field data array """ if self.dataset is None: self._set_xrvalues(self.array, values, index=index) else: self.dataset.set_values(self.name, values, index=index) @classmethod def _set_xrvalues( cls, xrdata, values, index=None ): if index is None: xrdata.values[:] = values else: tmp = [] for dim in xrdata.dims: if dim not in index: tmp.append(slice(None)) else: tmp.append(index[dim]) subset = tmp xrdata[tuple(subset)] = values def is_saved(self): """ Return True is the content of the field is saved on file and was not updated since """ if self.handler is None: return False return self.handler.is_saved() def bitmask_or( self, meanings, index: Mapping[Hashable, Any]=None, **kwargs): """helper function to get a boolean mask from a bit field. Bit (or flag) fields are arrays of integers where each bit has a specific meaning, described in a ``flag_meaning`` field attribute. Providing a list of the meanings to be tested in ``meaning``, a boolean mask if built, using the `or` logical operator, with a True value everywhere at least one of the provided meanings is set. The field must defined as in CF convention, with ``flags_masks`` and ``flag_meanings`` field attributes. Args: meanings(list or str): a list of the meanings that have to be set or a str if only one bit is tested. Available meanings are listed in the ``flag_meanings`` attribute of the field. Returns: A boolean array. """ if ('flag_meanings' not in self.array.attrs or 'flag_masks' not in self.array.attrs): raise ValueError( "This is not mask field. Either flag_meanings or flag_masks " "is missing." ) # transform mask attributes to list if it is not the case allmeanings = self.array.attrs['flag_meanings'] if isinstance(allmeanings, str): allmeanings = [_ for _ in allmeanings.split(' ') if _ != ''] allmasks = self.array.attrs['flag_masks'] if isinstance(allmasks, str): allmasks = [_ for _ in allmasks.split(' ') if _ != ''] criteria = meanings if isinstance(meanings, str): criteria = [meanings] # calculate sum (and) of all mask bits requested to be set masksum = 0 for criterium in criteria: try: bit = allmeanings.index(criterium) except: raise ValueError("Unknown flag meaning {}".format(criterium)) masksum += allmasks[bit] # calculate mask return self.get_values(slices=index) & int(masksum) != 0 @classmethod def compute( cls, func, field1: 'Field', field2: 'Field'=None, **kwargs) -> 'Field': """Apply a function to a field (possibly combining with a second one) and returns the result as a new field. The function may be for instance a numpy MaskedArray operator such as numpy.ma.anom, numpy.ma.corr,... To be used with caution. Args: func (function) : the function to be called (ex: numpy.ma.anom) field1 (Field) : the field argument to the function field2 (Field, optional) : an optional 2nd field argument to the function kwargs : any argument to Field creation further describing the returned Field (units, name, ...). Returns: Field: the result field """ if 'name' not in kwargs: varname = 'result' if field2 is None: values = func(field1.get_values()) else: values = func(field1.get_values(), field2.get_values()) field = Field(data=values, name=varname, dims=copy.copy(field1.dims), datatype=field1.datatype, fillvalue=field1.fill_value, **kwargs) return field def clone( self, index: Mapping[Hashable, Any] = None, padding: bool = False, prefix: str = None, **kwargs) -> 'Field': """Create a copy of a field, or a subset defined by index, and padding out as required. The returned field does not contain any attachment to the source file attached to the original field, if any. Args: index (dict, optional):any kind of xarray indexing compatible with xarray :func:`~xarray.DataArray.isel` selection method. padding (bool, optional): True to pad out feature with fill values to the extent of the dimensions. prefix (str, optional): add a prefix string to the field names of the extracted subset. """ if index is None: new_field = Field(data=self.array.copy(deep=True)) else: new_index = { dim: val for dim, val in index.items() if dim in self.array.dims } subarray = self.array[new_index] if padding: data = self.__pad_data(subarray.values, new_index) subarray.set_values(data) new_field = Field(data=subarray.copy(deep=True)) # detach from any dataset new_field.dataset = None if prefix is not None: new_field.set_name(prefix + new_field.name) return new_field def rename(self, newname: str) -> None: """Rename the field inplace. Args: newname (str): new name of the field """ if self._mapper is not None: if self.name not in self._mapper.get_fieldnames(): raise ValueError("Field {} not existing".format(self.name)) self.dataset = self.dataset.rename({self.name: newname}) self.name = newname def __add__(self, other: 'Field') -> 'Field': """Return a new field with the sum of current and an other field.""" res = Field.convert_from_xarray(self.xrdata + other.xrdata) res.xrdata.name = "{}_{}_sum".format(self.name, other.name) return res def __sub__(self, other: 'Field') -> 'Field': """Return a new field with the difference of current and an other field. """ res = Field.convert_from_xarray(self.xrdata - other.xrdata) res.xrdata.name = "{}_{}_difference".format(self.name, other.name) return res def module(u, v, variable=None): """Return the module field from its two components The module is sqrt(u² + v²) Args: u (Field) : the eastward component v (Field) : the northward component variable (Variable) : variable of the returned module field. If not provided, the returned field is created with a basic variable definition. Returns: Field: the module field """ values = numpy.ma.sqrt(numpy.ma.power(u.get_values(), 2) + numpy.ma.power(v.get_values(), 2) ) if variable is None: if 'eastward_' in u.variable.shortname: varname = u.variable.shortname.replace('eastward_', '') else: varname = 'module' variable = Variable(varname) field = Field(variable, dims=copy.copy(u.dimensions), datatype=u.datatype, fillvalue=u.fill_value, values=values, units=u.units) return field