Pipeline API Docs

pipeline#

class pyearthtools.pipeline.Sampler#

Base level Sampler

All sampler classes must subclass this class and provide generator, which should act as a generator.

See DefaultSampler for an example, and the process to make a sampler.

abstractmethod generator()#

Generator to control the sampling of data.

When None is sent and no samples remain stored within the sampler, the generator should exit.

How to:

  1. Yield an EmptyObject to begin with, and capture what is sent.

  2. Run the sampling routine: yield an EmptyObject if the sampler cannot return an object, otherwise yield the object.

  3. Exit when None is encountered. If any samples are stored within the sampler, yield them all before exiting.

Yields:

Generator[Any, Any, Any] – Sampling of data

Return type:

Generator[Any, Any, Any]
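The generator protocol described for Sampler.generator can be sketched without the library. In this minimal sketch, EmptyObject is a locally defined stand-in for the library's sentinel (its real import path is not shown in this section), every_other is a hypothetical sampler that keeps every second sample, and the driver loop only approximates how a pipeline might feed a sampler.

```python
from typing import Any, Generator


class EmptyObject:
    """Stand-in sentinel meaning 'nothing to return' (assumption, not the library class)."""


def every_other() -> Generator[Any, Any, None]:
    """Hypothetical sampler generator that keeps every second sample it is sent."""
    keep = True
    # Initial yield: nothing to return yet, capture the first sample that is sent.
    obj = yield EmptyObject()
    while obj is not None:  # None signals that no more samples will be sent
        result = obj if keep else EmptyObject()
        keep = not keep
        # Hand back the sample (or the sentinel) and capture the next one sent.
        obj = yield result
    # This sampler stores nothing, so there is nothing left to yield before exiting.


gen = every_other()
next(gen)  # prime the generator up to its first yield

kept = []
for sample in [10, 11, 12, 13]:
    result = gen.send(sample)
    if not isinstance(result, EmptyObject):
        kept.append(result)

try:
    gen.send(None)  # tell the sampler that no samples remain
except StopIteration:
    pass
```

A sampler that buffers samples (for example, to shuffle them) would yield its remaining buffer after the loop instead of returning immediately.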

class pyearthtools.pipeline.Pipeline(*steps, iterator=None, sampler=None, exceptions_to_ignore=None, max_exception_count=-1, name=None, **kwargs)#

Core of pyearthtools.pipeline.

Provides a way to set a sequence of operations to be applied to samples / data retrieved from pyearthtools.data.

Examples

>>> import pyearthtools.data
>>> import pyearthtools.pipeline
>>> pipeline = pyearthtools.pipeline.Pipeline(
>>>     pyearthtools.data.download.cds.ERA5('tcwv'),
>>>     pyearthtools.pipeline.operations.xarray.conversion.ToNumpy()
>>> )
>>> pipeline['2000-01-01T00']

Usage:

A Pipeline can be used in three primary ways.

  1. Direct Indexing with pipeline[idx]

  2. Iteration with for i in pipeline

  3. Applying to individual data objects with pipeline.apply

Create Pipeline of operations to run on samples of data.

The steps will be run in order of inclusion.

Branches

If a tuple is encountered within the steps, it will be interpreted as a BranchingPoint, with each element in the tuple a separate Pipeline in its own right. Therefore, to have a BranchingPoint with each branch containing multiple steps, a nested tuple is needed.

E.g. (pseudocode)

>>> Pipeline(
>>>     Index,
>>>     (Operation_1, Operation_2)
>>> )

This will cause samples to be retrieved from Index and each of the operations run on the sample.

The result will follow the form of: (Operation_1 on Index, Operation_2 on Index)

If a branch consists of multiple operations, nested tuples must be used.

E.g. (pseudocode)

>>> Pipeline(
>>>     Index,
>>>     ((Operation_1, Operation_1pt2), Operation_2)
>>> )

This will cause samples to be retrieved from Index and each of the operations run on the sample.

The result will follow the form of: (Operation_1 + Operation_1pt2 on Index, Operation_2 on Index)

A BranchingPoint by default will cause each branch to be run separately, and a tuple returned with the results of each branch. However, if ‘map’ is included in the BranchingPoint tuple, the branches will be mapped across the elements of the incoming sample.

Mapping

E.g. (pseudocode)

>>> Pipeline(
>>>     Index,
>>>     ((Operation_1, Operation_1pt2), Operation_2, 'map')
>>> )

This will cause samples to be retrieved from Index and the operations to be mapped to the sample.

The result will follow the form of (Operation_1 + Operation_1pt2 on Index[0], Operation_2 on Index[1])
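The difference between branching and mapping can be illustrated with plain functions. This is a minimal sketch of the semantics described above, not the library's implementation: run_branch, branch_point and the three toy operations are hypothetical names, and each branch is modelled as a sequence of callables.

```python
from typing import Any, Callable, Sequence

Branch = Sequence[Callable[[Any], Any]]


def run_branch(branch: Branch, sample: Any) -> Any:
    """Run the operations of one branch in order."""
    for operation in branch:
        sample = operation(sample)
    return sample


def branch_point(branches: Sequence[Branch], sample: Any, mapped: bool = False) -> tuple:
    """Branch: every branch sees the whole sample. Map: branch i sees sample[i]."""
    if mapped:
        if len(branches) != len(sample):
            raise ValueError("When mapping, sample and branches must be the same length.")
        return tuple(run_branch(b, s) for b, s in zip(branches, sample))
    return tuple(run_branch(b, sample) for b in branches)


def add_one(x):
    return x + 1


def double(x):
    return x * 2


def negate(x):
    return -x


# Equivalent in spirit to ((Operation_1, Operation_1pt2), Operation_2)
branches = [(add_one, double), (negate,)]

branched = branch_point(branches, 3)                  # both branches receive 3
mapped = branch_point(branches, (3, 5), mapped=True)  # branch 0 gets 3, branch 1 gets 5
```

Here branched is (8, -3), since both branches operate on the same sample, while mapped is (8, -5), since each branch operates on its own element.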

‘map_copy’ can be used to copy the branch to the number of elements in the sample without having to manually specify each branch.

Indexes in Branches

Indexes can also be included in branches, and behave as expected: the sample is retrieved from the Index rather than operations being applied.

E.g. (pseudocode)

>>> Pipeline(
>>>     Index,
>>>     (Operation_1, Operation_2, Index)
>>> )

Transforms

Transforms from pyearthtools.data can be added directly inline in a pipeline, and will be applied on the forward pass. If they need to be applied on undo, or on both, see pyearthtools.pipeline.operations.Transforms.

Parameters:
  • *steps (Union[VALID_PIPELINE_TYPES, _Pipeline, PipelineIndex, tuple[Union[VALID_PIPELINE_TYPES, Literal['map', 'map_copy']], ...]]) – Steps of the pipeline. Can include tuples to refer to branches.

  • iterator (Optional[Union[iterators.Iterator, tuple[iterators.Iterator, ...]]]) – Iterator to use to retrieve samples when the Pipeline is being iterated over.

  • sampler (Optional[Union[samplers.Sampler, tuple[samplers.Sampler, ...]]]) – Sampler to use to sample the samples when iterating. If not given will yield all samples. Can be used to randomly sample, drop out and more

  • exceptions_to_ignore (Optional[tuple[Union[str, Type[Exception]], ...]]) – Which exceptions to ignore when iterating. Defaults to None.

  • max_exception_count (int) – When “ignoring” exceptions, only ignore the first n such exceptions. Set to -1 for unlimited.

  • name (str | None) – Name of the pipeline, used in nested pipelines
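The interaction between exceptions_to_ignore and max_exception_count can be sketched as follows. This is an illustration under stated assumptions rather than the library's code: iterate_ignoring is a hypothetical helper, only exception classes (not string names) are handled, and re-raising once the limit is exceeded is an assumption about what happens after the first n exceptions.

```python
from typing import Any, Callable, Iterable, Iterator, Tuple, Type


def iterate_ignoring(
    indexes: Iterable[Any],
    get: Callable[[Any], Any],
    exceptions_to_ignore: Tuple[Type[Exception], ...],
    max_exception_count: int = -1,
) -> Iterator[Any]:
    """Yield get(idx) for each idx, skipping indexes that raise an ignored exception."""
    ignored = 0
    for idx in indexes:
        try:
            sample = get(idx)
        except exceptions_to_ignore:
            ignored += 1
            # -1 means unlimited; otherwise only the first n exceptions are ignored.
            if max_exception_count != -1 and ignored > max_exception_count:
                raise
            continue
        yield sample


store = {0: "a", 2: "c"}  # index 1 is missing and raises KeyError
samples = list(iterate_ignoring(range(3), store.__getitem__, (KeyError,)))
```

With the default of -1 the missing index is skipped silently; with max_exception_count=0 the same KeyError would propagate.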

apply(sample)#

Apply pipeline to sample

The Pipeline should only consist of PipelineSteps and Transforms, as Indexes cannot be applied.

as_steps()#

Get an indexable object to recreate pipeline with a subset of steps.

>>> pipeline.as_steps[:5]

property complete_steps: tuple#

Get all steps

property exceptions_to_ignore#

Exceptions to ignore when iterating over the Pipeline

property flattened_steps: tuple#

Flat tuple of steps contained within this PipelineIndex

get(idx)#

Get idx from Pipeline.

property get_and_catch#

Get an indexable object, like the pipeline itself, which will ignore any exceptions listed in exceptions_to_ignore.

has_source()#

Determine if this Pipeline contains a source of data, or is just a sequence of operations.

Return type:

bool

index(id)#

Get index of id in Pipeline.

Parameters:

id (str | Type)

Return type:

int

property iteration_order: tuple[Any, ...]#

Get ordering from iterator

property iterator#

Iterator of Pipeline

property named: dict[str, Pipeline]#

Named sub-pipelines

classmethod sample(variables=None, iterator=None, sampler=None)#

Simple sample Pipeline for testing.

property sampler#

Sampler of Pipeline

save(path=None, only_steps=False)#

Save Pipeline

Parameters:
  • path (Optional[Union[str, Path]], optional) – File to save to. If not given return save str. Defaults to None.

  • only_steps (bool, optional) – Save only steps of the pipeline, dropping iterator, sampler, and exceptions_to_ignore.

Returns:

The pipeline in save form if path is None, otherwise None.

Return type:

(Union[str, None])

step(id: str | int | Type[Any] | Any, limit: None) Index | Pipeline | Operation#
step(id: str | int | Type[Any] | Any, limit: int) tuple[Index | Pipeline | Operation, ...]

Get step corresponding to id.

If id is a str, flattens the steps and retrieves the first limit steps found; otherwise, if id is an int, gets the step at that index.

If limit is None, gives back the first step found, not in a tuple; if limit is -1, returns all.

Raises:

ValueError – If cannot find id in self.

Parameters:
  • id (str | int | Type[Any] | Any)

  • limit (int | None)

Return type:

Index | Pipeline | Operation | tuple[Index | Pipeline | Operation, …]

property steps: tuple[Index | PipelineStep | Transform | TransformCollection | tuple[Index | PipelineStep | Transform | TransformCollection, ...] | tuple[tuple, ...] | _Pipeline | tuple[Index | PipelineStep | Transform | TransformCollection | tuple[Index | PipelineStep | Transform | TransformCollection, ...] | tuple[tuple, ...], ...], ...]#

Steps of pipeline

undo(sample)#

Undo Pipeline on sample.

Reverses the steps and operations applied to the sample. Ideally this should result in the sample looking identical to the initial data.

Examples

>>> pipeline = Pipeline(
>>>     Index,
>>>     Operation1,
>>> )
>>> pipeline[1]
>>> pipeline.undo(pipeline[1])

class pyearthtools.pipeline.Operation(*, split_tuples=False, recursively_split_tuples=False, operation='both', recognised_types=None, response_on_type='exception', **kwargs)#

Pipeline Operation

Base Pipeline Operation.

Allows for tuple splitting and type checking.

Parameters:
  • split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for undo. Defaults to “both”.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

property T#

Transposed Operation.

Swaps apply with undo so that this operation behaves inversely to normal.

apply(sample)#

Run the apply_func on sample, splitting tuples if needed

undo(sample)#

Run the undo_func on sample, splitting tuples if needed
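The relationship between apply, undo and the transposed operation T can be shown with a small stand-in class. SketchOperation is hypothetical and does not subclass the real Operation; it omits tuple splitting and type checking and only mirrors the apply_func / undo_func pairing described above.

```python
from typing import Any, Callable


class SketchOperation:
    """Hypothetical stand-in pairing an apply function with its undo function."""

    def __init__(self, apply_func: Callable[[Any], Any], undo_func: Callable[[Any], Any]):
        self.apply_func = apply_func
        self.undo_func = undo_func

    def apply(self, sample: Any) -> Any:
        return self.apply_func(sample)

    def undo(self, sample: Any) -> Any:
        return self.undo_func(sample)

    @property
    def T(self) -> "SketchOperation":
        """Transposed operation: apply and undo are swapped."""
        return SketchOperation(self.undo_func, self.apply_func)


scale = SketchOperation(lambda x: x * 10, lambda x: x / 10)

applied = scale.apply(4)          # forward pass
restored = scale.undo(applied)    # reverses the forward pass
transposed = scale.T.apply(40)    # behaves like scale.undo
```

Using T is convenient when an existing operation already implements the inverse of what the forward pass needs.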

class pyearthtools.pipeline.ReversedPipeline(forward_pipeline)#

Operation reversing the effect of pipeline

Applying this operation will undo the provided pipeline, while undoing this operation will apply the pipeline.

Base Pipeline Operation.

Allows for tuple splitting and type checking.

Parameters:
  • split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for undo. Defaults to “both”.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

  • forward_pipeline (Pipeline)

class pyearthtools.pipeline.PipelineException#

General Pipeline Exception

class pyearthtools.pipeline.PipelineFilterException(sample, message='', *args)#

Pipeline Filter Exception

Indicates that the filter has detected an invalid sample.

FilterException

Parameters:
  • sample (Any) – Sample found to be invalid

  • message (str, optional) – Message for the user. Defaults to “”.

class pyearthtools.pipeline.PipelineRuntimeError#

Pipeline Runtime Error

class pyearthtools.pipeline.PipelineTypeError#

Pipeline Type error

pipeline.branching#

class pyearthtools.pipeline.branching.PipelineBranchPoint(*steps)#

Branch Point in a Pipeline.

Can be anywhere in the pipeline, including at the start.

Special keys can be provided as part of the tuple to customise behaviour:

map:

Will not branch, but map elements of the sample, which should be a tuple, to the associated ordered branch. Requires samples and branches to be of the same length.

map_copy:

Acts like map but will make copies of the pipelines to match the incoming sample. Once one sample has been seen, and all copies made, it becomes a map. If multiple branches are given, % (modulo) is used to create more, so with two pipelines and a sample of length 4, the pipeline indexes used would be [0, 1, 0, 1].
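The modulo assignment used by map_copy can be written out directly. This is only a sketch of the index arithmetic; branch_indexes_for is a hypothetical helper, not a library function.

```python
def branch_indexes_for(sample_length: int, n_branches: int) -> list:
    """Index of the branch copied for each position of the incoming sample."""
    return [position % n_branches for position in range(sample_length)]


# Two pipelines, incoming sample of length 4
assignment = branch_indexes_for(4, 2)
```

Here assignment is [0, 1, 0, 1], so the first and third elements are handled by copies of the first pipeline.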

Base Pipeline Operation.

Allows for tuple splitting and type checking.

Parameters:
  • split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for undo. Defaults to “both”.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

  • steps (Union[tuple[Union[Index, Pipeline, PipelineStep, Transform, TransformCollection, Literal['map', 'map_copy']], ...], Index, PipelineStep, Pipeline, Transform, TransformCollection])

apply(sample)#

Apply each branch on the sample

property complete_steps#

Get steps in pipeline

undo(sample)#

Undo the effects of the branches.

This will still result in a tuple, so a unify may be needed after.

If each branch executed a fully reversible operation, and originated from the same data source, each sample ‘should’ be identical.

class pyearthtools.pipeline.branching.Unifier#

Unify samples after a branching point on the undo operation.

Child class must supply check_validity, to determine if the samples can be unified,

and return an int which is used to select a sub_sample to be returned by undo.

If samples cannot be unified, check_validity should return None.

Differs from Spliter as this is built only to eliminate the tuple created on the undo with a BranchingPoint.

Base Pipeline Operation.

Allows for tuple splitting and type checking.

Parameters:
  • split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for undo. Defaults to “both”.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

abstractmethod check_validity(sample)#

Check if samples can be unified.

Raise a PipelineUnificationException if they cannot be unified.

Parameters:

sample (tuple) – Samples

Returns:

Which sub_sample to be returned. Return None if invalid.

Return type:

(Union[None, int])
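A check_validity implementation might look like the following. This is a standalone sketch, not a subclass of the real Unifier: it assumes sub-samples are plain Python values that can be compared with ==, and it follows the documented return contract (an int selecting the sub-sample, or None if the samples cannot be unified) rather than raising.

```python
from typing import Optional


def check_validity(sample: tuple) -> Optional[int]:
    """Return the position of the sub-sample to keep, or None if they differ."""
    if len(sample) == 0:
        return None
    first = sample[0]
    # All branches should have undone to the same data for unification to be valid.
    if all(sub_sample == first for sub_sample in sample[1:]):
        return 0
    return None


same = check_validity((7, 7, 7))
different = check_validity((7, 8))
```

For array-like samples an element-wise comparison would be needed instead of ==.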

class pyearthtools.pipeline.branching.Joiner(*, split_tuples=True, recursively_split_tuples=False, recognised_types=None, response_on_type='exception')#

Join samples after a branching point.

Child class must implement join, and unjoin.

Join samples from tuple

Parameters:
  • split_tuples (bool, optional) – Split tuples on unjoin operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on unjoin, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

abstractmethod join(sample)#

Join method called on apply.

Parameters:

sample (tuple) – Sample to be joined

Returns:

Joined sample

Return type:

(Any)

abstractmethod unjoin(sample)#

Unjoin method called on undo.

If the pipeline is to be fully reversible,

this should return exactly what was received in join.

If it does not, the pipeline will not be fully reversible.

Parameters:

sample (Any) – Sample to be split / unjoined.

Returns:

Split / unjoined sample

Return type:

(tuple)
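A join / unjoin pair that round-trips exactly could be sketched like this. ListJoiner is a hypothetical standalone class for tuples of lists; it does not subclass the real Joiner, and remembering the part lengths from the last join is just one way to make unjoin reversible.

```python
class ListJoiner:
    """Hypothetical join/unjoin pair for tuples of lists."""

    def __init__(self) -> None:
        self._lengths: list = []

    def join(self, sample: tuple) -> list:
        # Remember how long each part was so that unjoin can restore the split.
        self._lengths = [len(part) for part in sample]
        return [item for part in sample for item in part]

    def unjoin(self, sample: list) -> tuple:
        parts, start = [], 0
        for length in self._lengths:
            parts.append(sample[start:start + length])
            start += length
        return tuple(parts)


joiner = ListJoiner()
original = ([1, 2], [3], [4, 5, 6])
joined = joiner.join(original)
roundtrip = joiner.unjoin(joined)
```

Because roundtrip equals original, a pipeline using this pair would remain fully reversible at this step.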

class pyearthtools.pipeline.branching.Spliter(*, split_tuples=True, recursively_split_tuples=False, recognised_types=None, response_on_type='exception')#

Split samples.

Useful prior to a PipelineBranchPoint set up to map, as subsets of the sample can then be assigned to the various branches.

Child class must implement split, and join.

Split samples into tuples

Parameters:
  • split_tuples (bool, optional) – Split tuples on split operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on split, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

abstractmethod join(sample)#

Join method called on undo.

If the pipeline is to be fully reversible,

this should return exactly what was received in split.

If it does not, the pipeline will not be fully reversible.

Parameters:

sample (tuple[Any, ...]) – Sample to be joined.

Returns:

Joined sample

Return type:

(Any)

abstractmethod split(sample)#

Split method called on apply.

Parameters:

sample (Any) – Sample to be split into tuple

Returns:

Split sample

Return type:

(tuple[Any, …])

pipeline.filters#

class pyearthtools.pipeline.filters.Filter(split_tuples=True, recursively_split_tuples=True, recognised_types=None)#

Base Filter

Allows for samples to be skipped if found to be invalid.

filter should raise a PipelineFilterException if invalid.

Base PipelineStep - all steps should subclass from this

Parameters:
  • split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples when using _split_tuples_call. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function. Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

abstractmethod filter(sample)#

To be implemented by child class, should raise a PipelineFilterException if sample is invalid.

Return type:

None

run(sample)#

Run filtering
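The filter contract (raise when a sample is invalid, otherwise return nothing) can be sketched with stand-ins. SketchFilterException imitates the documented PipelineFilterException(sample, message) signature but is defined locally, filter_nan is a hypothetical filter, and the skipping loop is only an approximation of how invalid samples get dropped during iteration.

```python
import math


class SketchFilterException(Exception):
    """Local stand-in for PipelineFilterException (assumption, not the library class)."""

    def __init__(self, sample, message: str = ""):
        super().__init__(message)
        self.sample = sample


def filter_nan(sample) -> None:
    """Raise if any value in the sample is NaN; return None for valid samples."""
    if any(math.isnan(value) for value in sample):
        raise SketchFilterException(sample, "Sample contains NaN.")


samples = [[1.0, 2.0], [float("nan"), 3.0], [4.0, 5.0]]

valid = []
for sample in samples:
    try:
        filter_nan(sample)
    except SketchFilterException:
        continue  # invalid samples are skipped
    valid.append(sample)
```

Only the first and third samples end up in valid.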

class pyearthtools.pipeline.filters.FilterCheck(split_tuples=True, recursively_split_tuples=True, recognised_types=None)#

Subclass of Filter to automate exception raising,

Just needs check to return a bool.

Base PipelineStep - all steps should subclass from this

Parameters:
  • split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples when using _split_tuples_call. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function. Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

filter(sample)#

To be implemented by child class, should raise a PipelineFilterException if sample is invalid.

Return type:

None

class pyearthtools.pipeline.filters.FilterWarningContext(max_exceptions=None)#

Filter Warning context

Will count how many PipelineFilterExceptions have been thrown, and warn if the count exceeds max_exceptions.

Parameters:

max_exceptions (int | None)

class pyearthtools.pipeline.filters.TypeFilter(valid_types, *, split_tuples=False)#

Filter if type is wrong

Base PipelineStep - all steps should subclass from this

Parameters:
  • split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples when using _split_tuples_call. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

  • valid_types (tuple[Type] | Type)

filter(sample)#

To be implemented by child class, should raise a PipelineFilterException if sample is invalid.

Return type:

None

pipeline.iterators#

class pyearthtools.pipeline.iterators.Iterator#

Base level Iterator.

Provides the indexes from which to query the pipeline.

All Iterator classes must subclass this class and provide __iter__, which should act as a generator.

randomise(seed=42)#

Randomise this iterator

Parameters:

seed (int | None)

property samples: tuple[Any, ...]#

Get tuple of samples returned by this Iterator.

class pyearthtools.pipeline.iterators.Range(min, max, step=1)#

Range based Iterator

Constructs a range object and yields all elements within.

Construct Range Iterator

Parameters:
  • min (int) – Minimum value of range

  • max (int) – Maximum value of range

  • step (int, optional) – Step of range. Defaults to 1.

class pyearthtools.pipeline.iterators.Predefined(indexes)#

Predefined Iterator

Takes any iterable as provided, and yields all elements within.

Construct Predefined iterator

Parameters:

indexes (Iterable[Any]) – Iterable to get elements from

class pyearthtools.pipeline.iterators.File(file, type_conversion=None)#

Iterate over elements in file

Each line will be treated as a separate index.

Iterate over file

Parameters:
  • file (Union[str, Path]) – File to load.

  • type_conversion (Optional[Callable], optional) – Function to convert lines in file with. Defaults to None.

class pyearthtools.pipeline.iterators.DateRange(start, end, interval, *, allowlist=None, blocklist=None)#

DateRange Iterator

Uses pyearthtools.data.TimeRange to create a range of times.

Construct DateRange Iterator

Parameters:
  • start (str) – Start time. Must be understandable by pyearthtools.data.Petdt.

  • end (str) – End time. Must be understandable by pyearthtools.data.Petdt.

  • interval (Any) – Interval between times. Must be understandable by pyearthtools.data.TimeDelta.

  • allowlist (Optional[Iterable[str]]) – A list of pyearthtools.data.Petdt values to which the iteration should be restricted

  • blocklist (Optional[Iterable[str]]) – A list of pyearthtools.data.Petdt which should be skipped

Note

You cannot specify both a blocklist and an allowlist.

The entries in the blocklist and allowlist must be a complete list of exact dates. It is not possible to do fuzzy matching or range-based constraints at this stage. For example, if the underlying data is hourly, and a particular day needs to be skipped entirely, each hour of that day will need to be in the blocklist.
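Since every timestamp must be listed exactly, a blocklist covering a whole day of hourly data can be generated rather than typed out. This sketch uses only the standard library and assumes the 'YYYY-MM-DDTHH' string form used elsewhere in these docs is what the iterator expects.

```python
from datetime import datetime, timedelta

day_to_skip = datetime(2015, 1, 1)

# One entry per hour of the day, e.g. '2015-01-01T00' ... '2015-01-01T23'
blocklist = [
    (day_to_skip + timedelta(hours=hour)).strftime("%Y-%m-%dT%H")
    for hour in range(24)
]
```

The resulting list could then be passed as the blocklist argument of DateRange.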

Examples

>>> known_bad = ['2015-01-01T06']
>>> iterator = DateRange(2015, 2016, interval='6 hours', blocklist=known_bad)

randomise(seed=42)#

Randomise this iterator

Parameters:

seed (int | None)

class pyearthtools.pipeline.iterators.DateRangeLimit(start, interval, num)#

DateRange configured with the number of samples from start

Uses pyearthtools.data.TimeRange to create a range of times.

Construct DateRange with limit

Parameters:
  • start (str) – Start time

  • interval (Any) – Interval between times. Must be understandable by pyearthtools.data.TimeDelta.

  • num (int) – Number of total samples to iterate over.

class pyearthtools.pipeline.iterators.Randomise(iterator, seed=42)#

Wrap around another Iterator and randomly sample

Randomise iterator

Parameters:
  • iterator (Iterator) – Underlying Iterator to randomise.

  • seed (Union[int, None], optional) – Random selection seed. If None, will be random. Defaults to 42.
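The effect of a fixed seed can be illustrated with the standard library. This is a sketch of seeded shuffling in general, assuming that randomising means reordering the full set of indexes; it does not claim to reproduce the order the library would generate, and randomised is a hypothetical helper.

```python
import random
from typing import Any, Iterable, List, Optional


def randomised(indexes: Iterable[Any], seed: Optional[int] = 42) -> List[Any]:
    """Return the indexes in a shuffled order; a fixed seed gives a repeatable order."""
    shuffled = list(indexes)
    # random.Random(None) seeds from system entropy, giving a different order each run.
    random.Random(seed).shuffle(shuffled)
    return shuffled


first = randomised(range(10))
second = randomised(range(10))
```

With the same seed, first and second are identical, which keeps training runs reproducible; passing seed=None gives up that guarantee.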

class pyearthtools.pipeline.iterators.SuperIterator(*iterators)#

Iterate over a sequence of iterators

Create SuperIterator

Parameters:

*iterators (Iterator) – Iterating is run sequentially, so order may be important.
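Sequential iteration over several sources behaves like itertools.chain from the standard library. The sketch below uses plain iterables in place of Iterator instances to show why order matters.

```python
from itertools import chain

first_source = range(0, 3)
second_source = ["a", "b"]

# The first source is exhausted before the second one starts.
combined = list(chain(first_source, second_source))
```

Swapping the two sources would yield "a" and "b" before the integers.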

class pyearthtools.pipeline.iterators.IterateResults(obj)#

A wrapper which informs the pipeline that the object in question should be iterated over, instead of yielded directly.

Allows all attrs and items to be retrieved from underlying object.

Construct an IterateResults

Parameters:

obj (Any) – Object that should be iterated over.

iterate_over_object()#

Iterate over underlying object

Return type:

Generator[Any, None, None]

pipeline.modifications#

class pyearthtools.pipeline.modifications.Cache(cache=None, pattern=None, *, pattern_kwargs={}, cache_validity='warn', save_kwargs=None, **kwargs)#

A pyearthtools.pipeline implementation of the CachingIndex from pyearthtools.data.

Allows for samples to be cached to disk when using the pipeline.

Will save according to the pattern and idx used to retrieve data.

Examples

>>> era_index = pyearthtools.data.archive.ERA5.sample()
>>> pipeline = pyearthtools.pipeline.Pipeline(
>>>     era_index,
>>>     pyearthtools.pipeline.modifications.Cache('temp')
>>> )
>>> pipeline['2000-01-01T00'] # Data will be cached

Pipeline step to cache samples

Parameters:
  • cache (str | Path, optional) – Path to cache data to. Defaults to None.

  • pattern (str | PatternIndex, optional) – Pattern to use to cache data, if str use pattern_kwargs to initialise. Defaults to None.

  • pattern_kwargs (dict[str, Any], optional) – Kwargs to initialise the pattern with. Defaults to {}.

  • cache_validity (Literal['trust','delete','warn','keep','override'], optional) – Behaviour of cache validity checking. Defaults to ‘warn’.

    | Value | Behaviour |
    | ----- | --------- |
    | ‘trust’ | Trust the cache even if the hash is different |
    | ‘warn’ | Warn if the hash is different |
    | ‘keep’ | Keep the cache, and raise an exception if the hash is different |
    | ‘override’ | Override the cache data when generating data, removes the caching benefit. |
    | ‘delete’ | Delete the cache if the hash is different. Will ask for input, include ‘F’ to force. |

  • save_kwargs (dict[str, Any], optional) – Keyword arguments to pass to the saving function. Defaults to None.

  • kwargs (Any, optional) – All other kwargs passed to pyearthtools.data.indexes.FunctionalCacheIndex.

property cache_hash_file: Path#

Get the hash file name

cache_validity()#

Check the cache validity according to cache_validity passed in __init__.

Return type:

bool

property global_override#

Get a context window in which data will be overwritten in all caches

property hash: str#

Get sha512 hash of underlying index

property override#

Get a context window in which data will be overwritten in the cache

property pipeline_save_file: Path#

Get the pipeline save file name

save_cache_hash()#

Attempt to write the cache hash; if this fails, do nothing and try again later.

Returns a bool indicating whether saving the hash was successful:

True -> Valid hash

False -> Invalid hash, either unable to write or different

Return type:

bool

save_pipeline()#

Attempt to write the pipeline file; if this fails, do nothing and try again later.

class pyearthtools.pipeline.modifications.StaticCache(idx, cache, pattern=None, *, pattern_kwargs={}, cache_validity='warn', load_into_memory=False, **kwargs)#

Static Cache.

Mainly a convenience wrapper instead of using IdxOverride and Cache together.

Will override the index, and cache the result.

Static Cache

Parameters:
  • idx (Any) – Index to override with

  • cache (str | Path, optional) – Path to cache data to. Defaults to None.

  • pattern (str | PatternIndex, optional) – Pattern to use to cache data, if str use pattern_kwargs to initialise. Defaults to None.

  • pattern_kwargs (dict[str, Any], optional) – Kwargs to initialise the pattern with. Defaults to {}.

  • cache_validity (Literal['trust','delete','warn','keep','override'], optional) – Behaviour of cache validity checking. Defaults to ‘warn’.

    | Value | Behaviour |
    | ----- | --------- |
    | ‘trust’ | Trust the cache even if the hash is different |
    | ‘warn’ | Warn if the hash is different |
    | ‘keep’ | Keep the cache, and raise an exception if the hash is different |
    | ‘override’ | Override the cache data when generating data, removes the caching benefit. |
    | ‘delete’ | Delete the cache if the hash is different. Will ask for input, include ‘F’ to force. |

  • load_into_memory (bool) – Load sample into memory if it is a dask or xarray object. Defaults to False.

class pyearthtools.pipeline.modifications.MemCache(pattern=None, max_size=None, *, pattern_kwargs={}, **kwargs)#

A pyearthtools.pipeline implementation of the MemCache from pyearthtools.data.

Allows for samples to be cached to memory when using the pipeline.

Examples

>>> era_index = pyearthtools.data.archive.ERA5.sample()
>>> pipeline = pyearthtools.pipeline.Pipeline(
>>>     era_index,
>>>     pyearthtools.pipeline.modifications.MemCache()
>>> )
>>> pipeline['2000-01-01T00'] # Data will be cached to memory

Pipeline step to cache samples

Parameters:
  • pattern (str | PatternIndex, optional) – Pattern to use to cache data, if str use pattern_kwargs to initialise. Defaults to None.

  • pattern_kwargs (dict[str, Any], optional) – Kwargs to initialise the pattern with. Defaults to {}.

  • kwargs (Any, optional) – All other kwargs passed to pyearthtools.data.indexes.FunctionalMemCacheIndex.

  • max_size (Optional[str])

property global_override#

Get a context window in which data will be overwritten in all caches

property override#

Get a context window in which data will be overwritten in the cache

property size#

Size of in memory cache

class pyearthtools.pipeline.modifications.IdxModifier(modification, *extra_mods, merge=False, concat=False, merge_function=None, merge_kwargs=None)#

Modify index used in __getitem__, allows for multiple samples.

Examples

>>> pipeline = Pipeline(IdxModifier((0, 1)))
>>> pipeline[1] # Will get sample with (1, 2)

Index modification

Parameters:
  • modification (Union[Any, tuple[Union[Any, tuple[Any, ...]], ...]]) – Can be Any type, if tuple will map across elements.

  • merge (Union[bool, int]) – Merge retrieved tuple, must all be the same type. If int corresponds to how many layers to merge from the bottom up. If True, merge one layer.

  • concat (bool) – Whether to concat arrays instead of stack.

  • merge_function (Optional[Callable[[Any, ...], Any]]) – Override for function to use when merging.

  • merge_kwargs (Optional[dict[str, Any]]) – Optional extra kwargs for the merge function if merge.

  • extra_mods (Any)

Examples

>>> IdxModifier((0, 1))
... # Will get samples with (idx+0, idx+1)
>>> IdxModifier((0, (1, 2)))
... # Will get samples with (idx+0, (idx+1, idx+2))
>>> IdxModifier((0, (1, 2)), merge = 1)
... # Will get samples with (idx+0, merged(idx+1, idx+2))
>>> IdxModifier((0, (1, 2)), merge = True)
... # Will get samples with (idx+0, merged(idx+1, idx+2))
>>> IdxModifier((0, (1, 2)), merge = 2)
... # Will get samples with merged(idx+0, merged(idx+1, idx+2))
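How a nested modification maps onto the requested index can be sketched with a small recursive function. modify_index is a hypothetical helper covering only the index arithmetic shown in the examples above; retrieval and merging of the samples are left out.

```python
from typing import Any


def modify_index(idx: Any, modification: Any) -> Any:
    """Add the modification to idx, mapping across (possibly nested) tuples."""
    if isinstance(modification, tuple):
        return tuple(modify_index(idx, element) for element in modification)
    return idx + modification


flat = modify_index(1, (0, 1))            # indexes requested for pipeline[1]
nested = modify_index(0, (0, (1, 2)))     # nesting is preserved
```

Here flat is (1, 2), matching the pipeline[1] example, and nested is (0, (1, 2)).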

class pyearthtools.pipeline.modifications.IdxOverride(index)#

Override idx on any __getitem__ call

Parameters:

index (Any)

class pyearthtools.pipeline.modifications.TimeIdxModifier(modification, *extra_mods, **kwargs)#

IdxModifier which converts all modifications to pyearthtools.data.TimeDelta

Modify idx but convert all modifications to pyearthtools.data.TimeDelta

Parameters:
  • modification (Union[Any, tuple[Union[Any, tuple[Any, ...]], ...]]) – Expected to be TimeDelta compatible, or tuples of TimeDeltas.

  • merge (Union[bool, int], optional) – Merge retrieved tuple, must all be the same type. If int corresponds to how many layers to merge from the bottom up. If True, merge one layer. Defaults to False.

  • merge_function (Optional[Callable], optional) – Override for function to use when merging. Defaults to None.

  • merge_kwargs (Optional[dict[str, Any]], optional) – Optional extra kwargs for the merge function if merge. Defaults to None.

  • extra_mods (Union[Any, tuple[Any, ...]])

class pyearthtools.pipeline.modifications.SequenceRetrieval(samples, *, merge_function=None, concat=False, merge_kwargs=None)#

Subclassing from IdxModifier, retrieve a sequence of samples based on rules.

Notes

Will attempt to stack samples, and may create a new 0 axis.

If samples is an int, retrieve the idx originally asked for and the sample offset from it by samples.

The result is returned in sorted order.

>>> SequenceRetrieval(1)[0]
... # Will get (0, 1)
>>> SequenceRetrieval(-1)[0]
... # Will get (-1, 0)
>>> SequenceRetrieval(-6)[0]
... # Will get (-6, 0)

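If samples is a single tuple, it must be of length 2 or 3: (offset, num_of_samples), with an optional third element. As the examples below show, the third element sets the spacing between retrieved samples. The idx being requested is first offset, then num_of_samples samples are retrieved and merged where applicable. If only a single sample is retrieved and it cannot be merged, it is returned on its own rather than in a tuple.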

>>> SequenceRetrieval((0, 3))[0]
... # Will get (0, 1, 2)
>>> SequenceRetrieval((-1, 2))[0]
... # Will get (-1, 0)
>>> SequenceRetrieval((2, 3))[0]
... # Will get (2,3,4)
>>> SequenceRetrieval((2, 3, 2))[0]
... # Will get (2,4,6)
>>> SequenceRetrieval((2, 1))[0]
... # Will get 2

If samples contains multiple elements, each can be either a tuple or an int. A tuple in this sequence is interpreted as in the single-tuple case, and an int is the next offset at which to retrieve a sample.

These index adjustments are accumulated, so if a retrieval moves the marker 2 forwards, the next sampling config will operate from there.

Each config in the samples will be returned within its own tuple, merged where applicable.

>>> SequenceRetrieval(((0, 3), (1, 2)))[0]
... # Will get ((0, 1, 2), (3, 4))
>>> SequenceRetrieval(((0, 3), 1))[0]
... # Will get ((0, 1, 2), 3)
>>> SequenceRetrieval(((0, 3), 2))[0]
... # Will get ((0, 1, 2), 4)
>>> SequenceRetrieval(((0, 3), (-1, 2)))[0]
... # Will get ((0, 1, 2), (1, 2))
>>> SequenceRetrieval(((0, 3), (-1, 1)))[0]
... # Will get ((0, 1, 2), 1)
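
The accumulation rule is easier to follow in code. The sketch below reproduces only the index arithmetic shown in the examples; it is not the library's implementation. The names sequence_indexes and _run_config are hypothetical, integer indexes are assumed, and a tuple made up only of ints is treated as a single (offset, count[, step]) config.

```python
from typing import Union

Config = Union[int, tuple]


def _run_config(marker: int, config: Config):
    """Return (result, new_marker) for one sampling config."""
    if isinstance(config, int):
        # An int is simply the next offset to sample at.
        marker += config
        return marker, marker
    offset, count, *rest = config
    step = rest[0] if rest else 1
    marker += offset
    indexes = tuple(marker + i * step for i in range(count))
    # A single sample is returned bare rather than in a tuple.
    result = indexes[0] if count == 1 else indexes
    return result, indexes[-1]


def sequence_indexes(idx: int, samples: Config):
    """Indexes visited for `idx`, following the documented examples."""
    if isinstance(samples, int):
        return tuple(sorted((idx, idx + samples)))
    if all(isinstance(s, int) for s in samples):
        # One (offset, count[, step]) config.
        return _run_config(idx, samples)[0]
    results, marker = [], idx
    for config in samples:
        # The marker carries over, so each config starts where the last ended.
        result, marker = _run_config(marker, config)
        results.append(result)
    return tuple(results)
```

Each config leaves the marker on the last index it retrieved, which is why an offset of 1 after (0, 3) lands on index 3.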

Sequence retrieval

Parameters:
  • samples (Union[int, tuple[Union[tuple[int, int], tuple[int, int, int], int], ...]]) – Configuration of samples to retrieve.

  • merge_function (Optional[Callable], optional) – Override for function to use when merging. Defaults to None.

  • merge_kwargs (Optional[dict[str, Any]], optional) – Optional extra kwargs for the merge function. Defaults to None.

  • concat (bool)

class pyearthtools.pipeline.modifications.TemporalRetrieval(samples, *, merge_function=None, concat=False, merge_kwargs=None, delta_unit=None)#

Retrieve a sequence of samples from SequenceRetrieval, but force all indexes to be a pyearthtools.data.Petdt.

Examples

>>> TemporalRetrieval(-6)['2000-01-01T12']
... # Will get samples for ('2000-01-01T06' & '2000-01-01T12')
Parameters:
  • samples (Union[int, tuple[Union[tuple[int, ...], int], ...]]) – number of samples to fetch (negative for n-back, positive for n-forward)

  • delta_unit (Optional[str]) – e.g. “month” or “hour”

  • concat (bool) – whether to concat or merge

  • merge_function (Optional[Callable])

  • merge_kwargs (Optional[dict[str, Any]])
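
For the integer case, the time arithmetic can be sketched with the standard library. This is an illustration, not the library's code: temporal_pair is a hypothetical helper, plain datetime objects stand in for Petdt, and an hourly step is assumed to match the example above.

```python
from datetime import datetime, timedelta


def temporal_pair(reference: datetime, samples: int, step: timedelta):
    """Return the reference time and the time `samples` steps away, sorted."""
    other = reference + samples * step
    return tuple(sorted((reference, other)))


pair = temporal_pair(datetime(2000, 1, 1, 12), -6, timedelta(hours=1))
labels = [t.strftime("%Y-%m-%dT%H") for t in pair]
# labels == ['2000-01-01T06', '2000-01-01T12']
```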

class pyearthtools.pipeline.modifications.TemporalWindow(*, prior_indexes, posterior_indexes, timedelta, merge_method=None)#

The purpose of this class is to provide the ability to perform sequence-to-sequence modelling from a data accessor or pipeline that was designed to produce single time steps (i.e. single samples).

The temporal window allows the specification of the ‘back window’ and the ‘forward window’, and will produce a binary branch.

For example, if the time steps are hourly, and the base pipeline can produce hours 1, 2, 3 … 10; then this Temporal Window can be used to produce sequence pairs like:

[1,2,3], [4],
[2,3,4], [5],
...
[7,8,9], [10]

or like:

[1,2], [3,4,5],
[2,3], [4,5,6],
...
[6,7], [8,9,10]

This provides a simpler interface than the TemporalRetrieval which is a more general alternative.

The window offsets are calculated not using positional indexing, but using calculated date-times based on the reference time and the specified timedelta to calculate each required index exactly. The handling of missing data is left to the underlying pipeline response to the retrieval of the calculated datetime.

The resultant sequences may be left unmerged (i.e. a list of retrieved results for each time step) or merged (e.g. into an xarray along the time dimension). The default behaviour is to merge along the time dimension.

A custom merge method may be specified.

Parameters:
  • prior_indexes – Multiplied by the timedelta then applied to the reference date

  • posterior_indexes – Multiplied by the timedelta then applied to the reference date

  • timedelta – Typically the time step of the underlying data

  • merge_method – How to merge samples into a combined object

Examples:

>>> TemporalWindow(prior_indexes=[-3,-2,-1], posterior_indexes=[0], timedelta=timedelta, merge_method=merge_method)

(assuming xarray data) will result in a tuple of two datasets, the first with a time coordinate dimension of 3 time steps and the second with a time coordinate dimension of 1 time step.
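
The date-time calculation described above can be sketched as follows. This shows only how each index is multiplied by the timedelta and applied to the reference time; window_times is a hypothetical helper, and retrieval and merging are left out.

```python
from datetime import datetime, timedelta


def window_times(reference, prior_indexes, posterior_indexes, step):
    """Compute the exact date-times for the back and forward windows."""
    prior = [reference + i * step for i in prior_indexes]
    posterior = [reference + i * step for i in posterior_indexes]
    return prior, posterior


# Hourly data with a reference time of 04:00 gives the pair [1, 2, 3], [4].
prior, posterior = window_times(
    datetime(2000, 1, 1, 4), [-3, -2, -1], [0], timedelta(hours=1)
)
```

Because every index is computed from the reference time rather than from a position, a gap in the underlying data shows up as a failed retrieval for that date-time instead of silently shifting the window.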

pyearthtools.pipeline.modifications.idx_modification#

alias of the module pyearthtools.pipeline.modifications.idx_modification

pipeline.operations#

class pyearthtools.pipeline.operations.Transforms(transforms=None, apply=None, undo=None)#

Run pyearthtools.data.Transforms within a Pipeline.

Run Transforms

If transforms is given, it is run first on both apply and undo. If apply and undo are also given, they are then run on their respective functions.

Parameters:
  • transforms (Optional[TRANSFORM_TYPE], optional) – Transforms to run on both apply and undo. Defaults to None.

  • apply (Optional[TRANSFORM_TYPE], optional) – Transforms to run on apply. Defaults to None.

  • undo (Optional[TRANSFORM_TYPE], optional) – Transforms to run on undo. Defaults to None.

pipeline.operations.xarray#

class pyearthtools.pipeline.operations.xarray.Compute#

Compute xarray object

Base Pipeline Operation,

Allows for tuple splitting, and type checking

Parameters:
  • split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for undo. Defaults to “both”.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]]], optional) – Types recognised; can be a dictionary to reference different types per function. Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

class pyearthtools.pipeline.operations.xarray.Merge(merge_kwargs=None)#

Merge a tuple of xarray objects.

Currently can only undo this operation with xr.Dataset and xr.DataArray inputs.

Join samples from tuple

Parameters:
  • split_tuples (bool, optional) – Split tuples on unjoin operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on unjoin, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

  • merge_kwargs (dict[str, Any] | None)

join(sample)#

Join sample

Parameters:

sample (tuple[Dataset | DataArray, ...])

Return type:

Dataset

unjoin(sample)#

Unjoin method called on undo.

If the pipeline is to be fully reversible, this should return exactly what was received in join.

If it does not, the pipeline will not be fully reversible.

Parameters:

sample (Any) – Sample to be split / unjoined.

Returns:

Split / unjoined sample

Return type:

(tuple)

class pyearthtools.pipeline.operations.xarray.Concatenate(concat_dim, concat_kwargs=None)#

Concatenate a tuple of xarray objects

Currently cannot undo this operation. Unjoining a sample returns the same sample.

Join samples from tuple

Parameters:
  • split_tuples (bool, optional) – Split tuples on unjoin operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on unjoin, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

  • concat_dim (str)

  • concat_kwargs (dict[str, Any] | None)

join(sample)#

Concat sample

Parameters:

sample (tuple[T, ...])

Return type:

T

unjoin(sample)#

Unjoin method called on undo.

If the pipeline is to be fully reversible, this should return exactly what was received in join.

If it does not, the pipeline will not be fully reversible.

Parameters:

sample (Any) – Sample to be split / unjoined.

Returns:

Split / unjoined sample

Return type:

(tuple)

class pyearthtools.pipeline.operations.xarray.Sort(order=None, strict=False)#

Sort Variables of an xarray object

Examples

>>> Sort(order = ['a', 'b'])

Sort xarray variables

Parameters:
  • order (list[str] | None) – Order to set vars to. If not given, sort alphabetically; variables not listed are added to the end. Cannot be None if strict is True. Defaults to None.

  • strict (bool) – Forces all variables to be listed in order, and no extras given. Defaults to False.
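
One reading of how order and strict interact is sketched below on plain lists of names. It is an assumption-laden illustration, not the library's code: sorted_variable_names is a hypothetical helper, and the choices to sort unlisted variables alphabetically and to raise ValueError under strict are assumptions.

```python
from typing import Optional


def sorted_variable_names(names, order: Optional[list] = None, strict: bool = False):
    """Return variable names in the requested order."""
    if order is None:
        if strict:
            raise ValueError("order cannot be None when strict is True")
        return sorted(names)
    if strict and set(order) != set(names):
        raise ValueError("strict requires order to list exactly the variables present")
    # Listed variables come first, in the order given.
    listed = [name for name in order if name in names]
    # Assumption: anything not listed is appended alphabetically.
    extras = sorted(name for name in names if name not in order)
    return listed + extras
```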

apply_func(data)#

Sort an xarray object data variables into the given order

Parameters:

data (T) – xarray object to sort.

Returns:

Sorted dataset

Return type:

class pyearthtools.pipeline.operations.xarray.Chunk(chunk=None, operation='apply', **extra_chunk_kwargs)#

ReChunk xarray object

ReChunk xarray object

Parameters:
  • chunk (Optional[dict[str, int]], optional) – Chunk dictionary. coord: size. Defaults to None.

  • operation (Literal['apply', 'undo', 'both']) – When to apply rechunking. Defaults to ‘apply’.

  • **extra_chunk_kwargs (int) – Kwarg form of chunk.

class pyearthtools.pipeline.operations.xarray.RecodeCalendar#

Climate datasets often use the cftime module to index into data using non-standard calendars. This operation will recode the time coordinate of a dataset or data array to a standard timestamp. For now, support only exists for recoding from Noleap to Timestamp.

Record initialisation and store flags for processing

apply_func(data)#

Recode the time coordinate of an xarray object to a standard timestamp

Parameters:

data (T) – xarray object to recode.

Returns:

Recoded xarray object

Return type:

class pyearthtools.pipeline.operations.xarray.AlignDates(to='start_of_month')#

Climate datasets often use the cftime module to index into data using non-standard calendars. This operation will recode the time coordinate of a dataset or data array to a standard timestamp. For now, support only exists for recoding from Noleap to Timestamp.

Record initialisation and store flags for processing

Parameters:

to – either “start_of_month” or a zero-padded string day-of-month

apply_func(data)#

Align the dates of an xarray object

Parameters:

data (T) – xarray object to align.

Returns:

xarray object with aligned dates

Return type:

class pyearthtools.pipeline.operations.xarray.conversion.ToNumpy(reference_dataset=None, saved_records=None, run_parallel=False, warn=True)#

Convert xarray objects to np.ndarray objects

DataOperation to convert data to np.ndarray

If speed is needed without an undo, set run_parallel to True, and split the data into separate datasets as much as possible.

pyearthtools.pipeline.operations.xarray.split.OnVariables() can be useful here

Parameters:
  • reference_dataset (str | Path | None) – Reference dataset to run through numpy converter to initialise converter. Will be overwritten when this is given a dataset.

  • saved_records (str | Path | None) – Saved records to set numpy converter with. Will be overwritten when this is given a dataset.

  • run_parallel (bool) – Whether to run in parallel, will cause undo to fail without saved_records. If an undo pipeline is needed, set this to False.

  • warn (bool) – Whether to warn on invalid shape.

class pyearthtools.pipeline.operations.xarray.conversion.ToDask(warn=True)#

Convert xarray objects to pure dask arrays

Convert xarray object to dask and back.

Parameters:

warn (bool, optional) – Whether to warn on invalid shape. Defaults to True.

class pyearthtools.pipeline.operations.xarray.filters.XarrayFilter#

Xarray Filters

Base PipelineStep - all steps should subclass from this

Parameters:
  • split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples when using _split_tuples_call. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]]], optional) – Types recognised; can be a dictionary to reference different types per function. Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

class pyearthtools.pipeline.operations.xarray.filters.DropAnyNan(variables=None)#

Filter to drop any data with nans when iterating.

Used to remove any bad data or data that is masked out.

Drop data with any nans

Parameters:

variables (list, optional) – Subset of variables to check. Defaults to None.

filter(sample)#

Check if any of the sample is nan

Parameters:

sample (xr.Dataset) – Sample to check

Raises:

(PipelineFilterException) – If sample contains one or more nan value

Return type:

None

class pyearthtools.pipeline.operations.xarray.filters.DropAllNan(variables=None)#

Filter to drop any data with all nans when iterating.

Used to remove any bad data or data that is masked out.

Drop data with all nans

Parameters:

variables (list, optional) – Subset of variables to check. Defaults to None.

filter(sample)#

Check if all of the sample is nan

Parameters:

sample (xr.Dataset) – Sample to check

Raises:
  • (PipelineFilterException) – If sample contains only nan values

  • (TypeError) – If sample is not an xr.DataArray or xr.Dataset

Return type:

None

class pyearthtools.pipeline.operations.xarray.filters.DropValue(value, percentage)#

Filter to drop data containing more than a given percentage of a value.

Can be used to trim out invalid data

Drop Data if number of elements equal to value are greater than percentage when iterating.

Parameters:
  • value (Union[float, Literal["nan"]]) – Value to search for. Can be nan or ‘nan’.

  • percentage (float) – Percentage of elements equal to value above which the data is dropped

filter(sample)#

Check if the number of elements equal to value is greater than percentage

Parameters:

sample (np.ndarray) – Sample to check

Raises:

(PipelineFilterException) – If number of elements equal to value are greater than percentage

Return type:

None
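
The check this filter performs can be sketched on a flat list of numbers. The code is illustrative and does not come from the library: FilterRejected stands in for PipelineFilterException, check_value_fraction is a hypothetical helper, and treating percentage as a value on a 0 to 100 scale is an assumption.

```python
import math


class FilterRejected(Exception):
    """Stand-in for PipelineFilterException in this sketch."""


def check_value_fraction(values, value, percentage: float) -> None:
    """Raise if more than `percentage` percent of `values` equal `value`."""
    nan_search = value == "nan" or (isinstance(value, float) and math.isnan(value))
    if nan_search:
        # NaN never compares equal to itself, so it needs an explicit test.
        matches = sum(1 for v in values if isinstance(v, float) and math.isnan(v))
    else:
        matches = sum(1 for v in values if v == value)
    found = 100.0 * matches / len(values)
    if found > percentage:
        raise FilterRejected(f"{found:.1f}% of elements match; limit is {percentage}%")
```

Raising an exception, rather than returning a flag, matches the filter contract described above, where a rejected sample is signalled to the pipeline and filter itself returns None.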

class pyearthtools.pipeline.operations.xarray.filters.Shape(shape, split_tuples=False)#

Filter to drop data of incorrect shape

Used to ensure that incoming data is of the correct shape for later steps

Drop Data if shape does not match expected

Parameters:
  • shape (tuple[Union[tuple[int, ...], int], ...]) – Shape to match, either tuple of shapes for tupled data or direct shape

  • split_tuples (bool, optional) – Whether to split tuples, if True, shape should not be a tuple of tuples

filter(sample)#

To be implemented by child class, should raise a PipelineFilterException if sample is invalid.

Parameters:

sample (tuple[T, ...] | T)

class pyearthtools.pipeline.operations.xarray.reshape.Dimensions(dimensions, append=True, preserve_order=False)#

Reorder dimensions

Operation to reorder Dimensions of an xarray object.

Not all dims have to be supplied. Remaining dims are added automatically: appended if append is True, prepended if append is False.

Parameters:
  • dimensions (Union[str, list[str]]) – Specified order of dimensions to transpose dataset to

  • append (bool, optional) – Append extra dims, if false, prepend dims. Defaults to True.

  • preserve_order (bool, optional) – Whether to restore the original order of dims on undo. If False, undo also sets the dims to the dimensions order. Defaults to False.
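
How a partial dimensions list could expand into a full ordering is sketched below. target_dim_order is a hypothetical helper working on plain lists of names, not part of the library; a result like this is the kind of list that could be passed to xarray's transpose.

```python
def target_dim_order(current, dimensions, append=True):
    """Combine the requested dims with any dims that were not listed."""
    if isinstance(dimensions, str):
        dimensions = [dimensions]
    remaining = [d for d in current if d not in dimensions]
    if append:
        # Requested dims first, unlisted dims after them.
        return list(dimensions) + remaining
    # Unlisted dims first, requested dims at the end.
    return remaining + list(dimensions)
```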

class pyearthtools.pipeline.operations.xarray.reshape.CoordinateFlatten(coordinate, skip_missing=False)#

Flatten a coordinate in a dataset into separate variables.

Flatten a coordinate in an xarray Dataset, putting the data at each value of the coordinate into a separate data variable.

The output data variables will be named “<old variable name><value of coordinate>”. For example, if the input Dataset has a variable “t” and it is flattened along the coordinate “pressure_level” which has values [100, 200, 500], then the output Dataset will have variables called t100, t200 and t500.

Parameters:
  • coordinate (Hashable) – Coordinate to flatten and expand on.

  • skip_missing (bool, optional) – Whether to skip data that does not have any of the listed coordinates. If True, will return such data unchanged. Defaults to False.

Raises:

ValueError – If coordinate not found in the dataset and skip_missing==False.
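
The naming rule can be sketched without xarray. flattened_names is a hypothetical helper that only builds the output variable names for each input variable; it does not touch any data.

```python
def flattened_names(variables, coordinate_values):
    """Map each variable to the names it expands to when flattened."""
    return {
        var: [f"{var}{value}" for value in coordinate_values]
        for var in variables
    }


names = flattened_names(["t"], [100, 200, 500])
# names == {'t': ['t100', 't200', 't500']}
```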

class pyearthtools.pipeline.operations.xarray.select.SelectDataset(variables, operation='apply')#

Operation to select a given set of variables from an xarray.Dataset

Select variables from dataset

Parameters:
  • variables – Variables to select

  • operation (Literal['apply', 'undo'], optional) – Operation to run on. Defaults to ‘apply’.

class pyearthtools.pipeline.operations.xarray.select.DropDataset(variables, operation='apply')#

DataOperation to drop a given set of variables from an xarray.Dataset

Can be used to remove variables when undoing, if one was added as a pipeline step.

Drop variables from dataset

Parameters:
  • variables – Variables to drop

  • operation (Literal['apply', 'undo'], optional) – Operation to run on. Defaults to ‘apply’.

class pyearthtools.pipeline.operations.xarray.select.SliceDataset(slices=None, **kwargs)#

Select a slice of an xarray object

Examples
>>> SliceDataset(slices = {'time': (0,10,2)}) # == .sel(time = slice(0,10,2))

Setup dataset slicer

Parameters:
  • slices (Optional[dict[str, tuple[Any, ...]]], optional) – Slice dictionary, must be key of dim in ds, and slice notation as value. Defaults to None.

  • kwargs (tuple, optional) – Keyword argument form of slices.
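
The translation from tuples to slice objects can be sketched as below. to_slices is a hypothetical helper, not the library's code; it shows how the dictionary form and the keyword form could be combined into arguments suitable for a call such as .sel.

```python
def to_slices(slices=None, **kwargs):
    """Turn {'dim': (start, stop, step)} entries into slice objects."""
    merged = dict(slices or {})
    merged.update(kwargs)  # keyword form of the same configuration
    return {dim: slice(*bounds) for dim, bounds in merged.items()}


selection = to_slices({"time": (0, 10, 2)})
# Demonstrated on a plain list; .sel would apply it by coordinate label instead.
picked = list(range(20))[selection["time"]]
```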

class pyearthtools.pipeline.operations.xarray.split.OnVariables(variables=None, merge_kwargs=None)#

Split xarray objects on variables

Split on variables

Parameters:
  • variables (Optional[Union[tuple[Union[str, tuple[str, ...], list[str]], ...], list[str]]], optional) – Variable split. If tuple or list, will split into those tuples, with the associated list referencing the variables to split out. If not given, will split all variables into separate items. Defaults to None.

  • merge_kwargs (Optional[dict[str, Any]], optional) – Kwargs needed for merge on the undo. Defaults to None.

join(sample)#

Join sample

Parameters:

sample (tuple[Dataset | DataArray, ...])

Return type:

Dataset

split(sample)#

Split sample

Parameters:

sample (Dataset)

Return type:

tuple[Dataset, …]

class pyearthtools.pipeline.operations.xarray.split.OnCoordinate(coordinate, merge_kwargs=None)#

Split xarray object on coordinate

Split samples into tuples

Parameters:
  • split_tuples (bool, optional) – Split tuples on split operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on split, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

  • coordinate (str)

  • merge_kwargs (dict[str, Any] | None)

join(sample)#

Join method called on undo.

If the pipeline is to be fully reversible, this should return exactly what was received in split.

If it does not, the pipeline will not be fully reversible.

Parameters:

sample (tuple[Any, ...]) – Sample to be joined.

Returns:

Joined sample

Return type:

(Any)

split(sample)#

Split method called on apply.

Parameters:

sample (Any) – Sample to be split into tuple

Returns:

Split sample

Return type:

(tuple[Any, …])

class pyearthtools.pipeline.operations.xarray.values.FillNan(nan=0, posinf=None, neginf=None)#

Fill any NaNs with a value

DataOperation to fill NaNs

Parameters:
  • nan (float, optional) – Value to fill NaNs with. If None is passed then NaN values will be replaced with 0. Defaults to 0.

  • posinf (float, optional) – Value to be used to fill positive infinity values, If None is passed then positive infinity values will be replaced with a very large number. Defaults to None.

  • neginf (float, optional) – Value to be used to fill negative infinity values, If None is passed then negative infinity values will be replaced with a very small (or negative) number. Defaults to None.

class pyearthtools.pipeline.operations.xarray.values.MaskValue(value, operation='==', replacement_value=nan)#

DataOperation to mask values with a given replacement

Operation to Mask Values

Parameters:
  • value (Union[float, dict[str, float]]) – Value to search for.

  • operation (Literal['==', '>', '<', '>=','<='], optional) – Operation to search with. Defaults to ‘==’.

  • replacement_value (Union[float, dict[str, float]], optional) – Replacement value. Defaults to np.nan.

Raises:

KeyError – If invalid operation passed.
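
A lookup table is one natural way to get the KeyError behaviour described above. The sketch below works on a plain list and is not the library's code: OPERATIONS and mask_values are hypothetical names, and only the scalar form of value is covered.

```python
import math
import operator

# Supported comparison strings mapped to their functions.
OPERATIONS = {
    "==": operator.eq,
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
}


def mask_values(values, value, operation="==", replacement_value=math.nan):
    """Replace every element that satisfies the comparison."""
    compare = OPERATIONS[operation]  # KeyError for an unsupported operation
    return [replacement_value if compare(v, value) else v for v in values]
```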

apply_func(sample)#

Mask Data from initialised configuration

Parameters:

sample (T) – Data to apply mask to

Returns:

Masked Data

Return type:

class pyearthtools.pipeline.operations.xarray.values.Clip(min_value=0, max_value=1)#

Operation to force data to be within a certain range, by default 0 & 1

Force data into a specified range

Parameters:
  • min_value (Optional[Union[float, dict[str, float]]], optional) – Minimum Value. If using a dict, acts per variable given. If None, this won’t apply a min masking. Defaults to 0.

  • max_value (Optional[Union[float, dict[str, float]]], optional) – Maximum Value. If using a dict, acts per variable given. If None, this won’t apply a max masking. Defaults to 1.
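
The per-variable behaviour can be sketched with a dictionary of lists standing in for a dataset. clip_variables is a hypothetical helper, and leaving variables that are absent from a dict limit unclipped is an assumption rather than documented behaviour.

```python
def clip_variables(data, min_value=0, max_value=1):
    """Clip each variable's values to its own limits."""

    def bound(limit, name):
        # A dict limit applies per variable; a scalar (or None) applies to all.
        return limit.get(name) if isinstance(limit, dict) else limit

    clipped = {}
    for name, values in data.items():
        low, high = bound(min_value, name), bound(max_value, name)
        out = []
        for v in values:
            if low is not None:
                v = max(v, low)
            if high is not None:
                v = min(v, high)
            out.append(v)
        clipped[name] = out
    return clipped
```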

class pyearthtools.pipeline.operations.xarray.values.Derive(derivation=None, *, drop=True, **derivations)#

Derive variables within the dataset

Uses pyearthtools.data.transforms.derive.

Derivation step

Parameters:
  • derivation (Optional[dict[str, Union[str, tuple[str, dict[str, Any]]]]], optional) – Equation configuration. If str, equation is evaluated. If tuple, first element is assumed to be equation, and the second a dictionary to update the new vars attributes with. Defaults to None.

  • drop (bool, optional) – Drop derived variables on undo. Defaults to True.

  • **derivations (Union[str, tuple[str, dict[str, Any]]]) – Kwarg form of derivation.

class pyearthtools.pipeline.operations.xarray.metadata.Rename(rename, **rename_kwargs)#

Rename variables in an xr.Dataset.

Rename variables in an xr.Dataset

Parameters:

rename (Optional[dict[str, str]]) – Name conversion dictionary

class pyearthtools.pipeline.operations.xarray.metadata.Encoding(encoding, operation='both')#

Set encoding on xarray objects

Set encoding on xarray objects

Parameters:
  • encoding (dict[str, dict[str, Any]]) – Variable value pairs assigning encoding to the given variable. Can set key to ‘all’ to apply to all variables. Defaults to None.

  • operation (Literal['apply', 'undo', 'both'], optional) – When to apply encoding setting. Defaults to “both”.

class pyearthtools.pipeline.operations.xarray.metadata.MaintainEncoding(reference=None, limit=None)#

Maintain encoding of samples from apply to undo.

If apply not called before undo, this will do nothing.

Parameters:
  • reference (Optional[str], optional) – Reference dataset to get encoding from. If not given will use first sample on apply.

  • limit (Optional[list[str]], optional) – When getting encoding from reference object, limit the retrieved encoding. If not given will get ['units', 'dtype', 'calendar', '_FillValue', 'scale_factor', 'add_offset', 'missing_value']. Defaults to None.

class pyearthtools.pipeline.operations.xarray.metadata.Attributes(attributes, apply_on='dataset', operation='both')#

Set attributes on xarray objects

Set attributes on xarray objects

Parameters:
  • attrs (dict[str, Any] | None) – Attributes to set, key: value pairs. Set apply_on to choose where attributes are applied. Defaults to None.

    | Key | Description |
    | --- | ----------- |
    | dataset | Attributes updated on dataset |
    | dataarray | If applied on a dataset, update each dataarray inside the dataset |
    | both | Do both above |
    | per_variable | Treat attrs as a dictionary of dictionaries, applying on dataarray if in dataset. |

  • apply_on (Literal['dataset', 'dataarray', 'both'], optional) – On what type to update attributes. Defaults to ‘dataset’.

  • operation (Literal['apply', 'undo', 'both'], optional) – When to apply attribute setting. Defaults to “both”.

  • attributes (dict[str, dict[str, Any]])

class pyearthtools.pipeline.operations.xarray.metadata.MaintainAttributes(reference=None)#

Maintain attributes

Maintain attributes of samples from apply to undo.

If apply not called before undo, this will do nothing.

Parameters:
  • reference (Optional[str], optional) – Reference dataset to get attributes from. If not given will use first sample on apply.

class pyearthtools.pipeline.operations.xarray.normalisation.xarrayNormalisation#

Parent xarray Normalisation

Base Pipeline Operation,

Allows for tuple splitting, and type checking

Parameters:
  • split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for undo. Defaults to “both”.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]]], optional) – Types recognised; can be a dictionary to reference different types per function. Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

classmethod open_file(file)#

Open xarray file

Parameters:

file (str | Path)

Return type:

Dataset

class pyearthtools.pipeline.operations.xarray.normalisation.Anomaly(mean)#

Anomaly Normalisation

Base Pipeline Operation,

Allows for tuple splitting, and type checking

Parameters:
  • split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for undo. Defaults to “both”.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]]], optional) – Types recognised; can be a dictionary to reference different types per function. Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

  • mean (str | Path)

class pyearthtools.pipeline.operations.xarray.normalisation.Deviation(mean, deviation, debug=False)#

Deviation Normalisation

Each argument can take a Dataset, DataArray, float or file object.

Parameters:
  • mean (str | Path | Dataset | DataArray | float) – mean values to subtract

  • deviation (str | Path | Dataset | DataArray | float) – deviation value to divide by
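
The arithmetic implied by the parameter descriptions is sketched below on plain floats. The function names are hypothetical, and the undo formula is an assumption, being simply the algebraic inverse of the forward step.

```python
def normalise(x, mean, deviation):
    """Subtract the mean, then divide by the deviation."""
    return (x - mean) / deviation


def unnormalise(z, mean, deviation):
    """Assumed inverse: multiply by the deviation, then add the mean back."""
    return z * deviation + mean


z = normalise(12.0, mean=10.0, deviation=4.0)
restored = unnormalise(z, mean=10.0, deviation=4.0)
```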

class pyearthtools.pipeline.operations.xarray.normalisation.Division(division_factor)#

Division based Normalisation

Base Pipeline Operation,

Allows for tuple splitting, and type checking

Parameters:
  • split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for undo. Defaults to “both”.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]]], optional) – Types recognised; can be a dictionary to reference different types per function. Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

  • division_factor (str | Path)

class pyearthtools.pipeline.operations.xarray.normalisation.Evaluated(normalisation_eval, unnormalisation_eval, **kwargs)#

eval based normalisation

Run a normalisation calculation using eval.

Will get all kwargs passed to this class, and sample as the data to be normalised.

All kwargs will be loaded from file if a str.

Parameters:
  • normalisation_eval (str) – Normalisation eval str

  • unnormalisation_eval (str) – Unnormalisation eval str
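
The idea of evaluating a string with sample and the extra kwargs in scope can be sketched as follows. This is not the library's implementation: evaluate is a hypothetical helper, the names mean and std are made up for the example, and loading kwargs from file is left out.

```python
def evaluate(expression: str, sample, **kwargs):
    """Evaluate `expression` with `sample` and the kwargs as named values."""
    namespace = {"sample": sample, **kwargs}
    # Builtins are emptied so the expression can only use the names provided.
    return eval(expression, {"__builtins__": {}}, namespace)


normalised = evaluate("(sample - mean) / std", 12.0, mean=10.0, std=4.0)
restored = evaluate("sample * std + mean", normalised, mean=10.0, std=4.0)
```

Since the expressions are executed as code, they should only come from trusted configuration.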

class pyearthtools.pipeline.operations.xarray.remapping.HEALPix(spatial_coords, nside, interpolation='bilinear', resolution_factor=1.0, include_coords=False, manual_rechunking=True, template_dataset=None, check_for_nans=False)#

HEALPix Remapper

HEALPix mesh Remapper as pipeline operations

Parameters:
  • spatial_coords (dict[str, int]) – Dictionary of spatial coords to remap over, with the associated size.

  • nside (int) – The number of pixels along each side of a HEALPix face. Must be a power of 2.

  • interpolation (str) – The interpolation scheme (“nearest-neighbor”, “bilinear”, “biquadratic”, “bicubic”).

  • resolution_factor (float) – In some cases, when choosing nside “too large” for the source data, the projection can contain NaN values. Choosing a resolution_factor > 1.0 can resolve this but requires careful inspection of the projected data.

  • include_coords (bool) – Include spatial_coords as variables for each face.

  • manual_rechunking (bool) – Manually rechunk to one chunk per spatial grid.

  • template_dataset (str | None) – Override for template dataset to get coords from.

  • check_for_nans (bool) – Check for nans after remapping.

Raises:

ValueError – If spatial_coords is wrong
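
For orientation, a HEALPix mesh has 12 base faces of nside by nside pixels each, so the total pixel count follows directly from nside. The sketch below checks the power-of-2 constraint and computes that count; healpix_pixel_count is a hypothetical helper and not part of this class.

```python
def healpix_pixel_count(nside: int) -> int:
    """Total number of HEALPix pixels for a given nside."""
    # A positive power of 2 has exactly one bit set.
    if nside < 1 or (nside & (nside - 1)) != 0:
        raise ValueError("nside must be a power of 2")
    return 12 * nside * nside


npix = healpix_pixel_count(128)
```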

Examples

Remap ERA5 resolution data to faces of size 128.

>>> import pyearthtools.pipeline
>>> import pyearthtools.data
>>>
>>> remapper = pyearthtools.pipeline.operations.xarray.remapping.HEALPix({'latitude':721, 'longitude':1440}, nside = 128)
>>> remapper.remap(pyearthtools.data.archive.ERA5.sample()['2000-01-01T00'])
... # Remapped data
>>>
>>> pyearthtools.pipeline.Pipeline(
>>>     pyearthtools.data.archive.ERA5.sample(),
>>>     remapper
>>> )
fyx2hpxidx(f, x, y)#

Computes the HEALPix index from a given face (f), row (y), and column (x) under consideration of the number of pixels along a HEALPix face (nside).

Parameters:
  • f (int) – The face index

  • y (int) – The local row index within the given face

  • x (int) – The local column index within the given face

Returns:

The HEALPix index

Return type:

int

hpx1d2hpx3d(hpx1d, dtype=<class 'numpy.float32'>)#

Converts a one-dimensional HEALPix array [NPix] into a three-dimensional HEALPix array of shape [F, H, W].

Parameters:
  • hpx1d (ndarray) – The one-dimensional array in HEALPix convention

  • dtype (dtype) – The data type (float precision) of the returned array

Returns:

The three-dimensional array in [F, H, W] convention

Return type:

ndarray
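The shape relationship between [NPix] and [F, H, W] can be sketched with plain NumPy. This is only an illustration of the shapes: it assumes pixels are stored face by face and row by row, whereas the method itself may reorder pixels according to the HEALPix index.

```python
import numpy as np

nside = 4
npix = 12 * nside * nside          # 12 faces of nside x nside pixels

hpx1d = np.arange(npix)
# Assumption: a plain reshape, with no pixel reordering.
hpx3d = hpx1d.reshape(12, nside, nside).astype(np.float32)
# The inverse direction flattens [F, H, W] back to [NPix].
back = hpx3d.reshape(-1)
```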

hpx2ll(data, **kwargs)#

Projects a given three dimensional HEALPix array to latitude longitude representation.

Parameters:

data (ndarray) – The data of shape [faces=12, height=nside, width=nside] in HEALPix format

Returns:

An array of shape [height=latitude, width=longitude] containing the latlon data

Return type:

ndarray

hpx3d2hpx1d(hpx3d, dtype=<class 'numpy.float32'>)#

Converts a three-dimensional HEALPix array of shape [F, H, W] into a one-dimensional HEALPix array [NPix].

Parameters:
  • hpx3d (ndarray) – The three dimensional array in HEALPix convention [F, H, W]

  • dtype (dtype) – The data type (float precision) of the returned array

Returns:

The one-dimensional array in [NPix] HEALPix convention

Return type:

ndarray

hpxidx2fyx(hpxidx, dtype=<class 'numpy.float32'>)#

Determines the face (f), column (x), and row (y) indices for a given HEALPix index under consideration of the base face index [0, 1, …, 11] and the number of pixels each HEALPix face side has (nside).

Parameters:
  • hpxidx (int) – The HEALPix index

  • dtype (dtype)

Returns:

A tuple containing the face, y, and x indices of the given HEALPix index

Return type:

tuple[int, int, int]

inverse_remap(sample)#

Remap sample from HEALPix mesh to Lat Lon Grid

Parameters:

sample (XR_TYPE)

Return type:

XR_TYPE

ll2hpx(data)#

Projects a given array from latitude longitude into the HEALPix representation.

Parameters:

data (ndarray) – The data of shape [height, width] in latlon format

Returns:

An array of shape [f=12, h=nside, w=nside] containing the HEALPix data

Return type:

ndarray

remap(sample)#

Remap sample from Lat Lon Grid to HEALPix mesh

Parameters:

sample (XR_TYPE)

Return type:

XR_TYPE

pipeline.operations.dask#

class pyearthtools.pipeline.operations.dask.Stack(axis=None)#

Stack a tuple of da.Array’s

Join samples from tuple

Parameters:
  • split_tuples (bool, optional) – Split tuples on unjoin operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on unjoin, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

  • axis (int | None)

join(sample)#

Join sample

Parameters:

sample (tuple[Any, ...])

Return type:

Array

unjoin(sample)#

Unstacks a stacked sample

Parameters:

sample (Any)

Return type:

tuple
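The join / unjoin pair can be sketched with NumPy standing in for dask arrays. This is a minimal illustration, assuming join behaves like a stack along axis and unjoin splits along that same axis; the variable names are illustrative only.

```python
import numpy as np

samples = (np.zeros((2, 3)), np.ones((2, 3)))
axis = 0

# join: stack the tuple along a new axis of length len(samples).
joined = np.stack(samples, axis=axis)

# unjoin: move the stacked axis to the front and split along it.
unjoined = tuple(np.moveaxis(joined, axis, 0))
```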

class pyearthtools.pipeline.operations.dask.Concatenate(axis=None)#

Concatenate a tuple of da.Array’s

Join samples from tuple

Parameters:
  • split_tuples (bool, optional) – Split tuples on unjoin operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on unjoin, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

  • axis (int | None)

join(sample)#

Join sample

Parameters:

sample (tuple[Any, ...])

Return type:

Array

unjoin(sample)#

Unjoin method called on undo.

If the pipeline is to be fully reversible, this should return exactly what was received in join.

If it does not, the pipeline will not be fully reversible.

Parameters:

sample (Any) – Sample to be split / unjoined.

Returns:

Split / unjoined sample

Return type:

(tuple)

class pyearthtools.pipeline.operations.dask.VStack#

Vertically Stack a tuple of da.Array’s

Join samples from tuple

Parameters:
  • split_tuples (bool, optional) – Split tuples on unjoin operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on unjoin, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

join(sample)#

Join sample

Parameters:

sample (tuple[Any, ...])

Return type:

Array

unjoin(sample)#

Unjoin method called on undo.

If the pipeline is to be fully reversible, this should return exactly what was received in join.

If it does not, the pipeline will not be fully reversible.

Parameters:

sample (Any) – Sample to be split / unjoined.

Returns:

Split / unjoined sample

Return type:

(tuple)

class pyearthtools.pipeline.operations.dask.HStack#

Horizontally Stack a tuple of da.Array’s

Join samples from tuple

Parameters:
  • split_tuples (bool, optional) – Split tuples on unjoin operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on unjoin, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

join(sample)#

Join sample

Parameters:

sample (tuple[Any, ...])

Return type:

Array

unjoin(sample)#

Unjoin method called on undo.

If the pipeline is to be fully reversible, this should return exactly what was received in join.

If it does not, the pipeline will not be fully reversible.

Parameters:

sample (Any) – Sample to be split / unjoined.

Returns:

Split / unjoined sample

Return type:

(tuple)

class pyearthtools.pipeline.operations.dask.Compute#

Compute dask array or delayed object

If dask array, will convert it to a full numpy array

Base Pipeline Operation,

Allows for tuple splitting, and type checking

Parameters:
  • split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for undo. Defaults to “both”.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

class pyearthtools.pipeline.operations.dask.augment.Rotate(seed=42, axis=(-2, -1))#

Rotation Augmentation by 90 degrees in the plane specified by axes.

Generates a random number between 0 & 3 inclusive, for number of times to rotate.

Parameters:
  • seed (int, optional) – Random Number seed. Defaults to 42.

  • axis (tuple[int, int], optional) – Rotation plane. Axes must be different. Defaults to (-2, -1).
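The rotation described above can be sketched with NumPy. This is a minimal sketch under assumptions: the random generator used here is not necessarily the one the class uses, so the number of turns drawn for a given seed may differ.

```python
import numpy as np

rng = np.random.default_rng(42)        # assumption: the class may use another RNG
k = int(rng.integers(0, 4))            # number of quarter turns, 0 to 3 inclusive

data = np.arange(24).reshape(2, 3, 4)
rotated = np.rot90(data, k=k, axes=(-2, -1))

# Rotating by -k in the same plane undoes the augmentation.
restored = np.rot90(rotated, k=-k, axes=(-2, -1))
```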

class pyearthtools.pipeline.operations.dask.augment.Flip(seed=42, axis=-1)#

Flip Augmentation on the specified axes.

Flips the data along the specified axis.

Generates a random boolean, if True, flip, otherwise not

Parameters:
  • seed (int, optional) – Random Number seed. Defaults to 42.

  • axis (tuple[int], optional) – Axis to flip data in. Defaults to -1.

class pyearthtools.pipeline.operations.dask.augment.FlipAndRotate(seed=42, axis=(-2, -1))#

Flip & Rotation Augmentation.

Apply both Flip & Rotation Augmentations, will rotate on given axis, and flip on both

Parameters:
  • seed (int, optional) – Random Number seed. Defaults to 42.

  • axis (tuple[int], optional) – Rotation plane primarily. Axes must be different. Will also flip on each given axis. Defaults to (-2, -1).

class pyearthtools.pipeline.operations.dask.filters.daskFilter#

dask Filters

Base PipelineStep - all steps should subclass from this

Parameters:
  • split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples when using _split_tuples_call. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

class pyearthtools.pipeline.operations.dask.filters.DropAnyNan#

Filter to drop any data with nans when iterating.

Used to remove any bad data or data that is masked out.

Drop data with any nans

filter(sample)#

Check if any of the sample is nan

Parameters:

sample (da.Array) – Sample to check

Raises:

(PipelineFilterException) – If sample contains one or more nan value

Return type:

None

class pyearthtools.pipeline.operations.dask.filters.DropAllNan#

Filter to drop any data if all nans.

Used to remove any bad data or data that is masked out.

Drop data if all values are nans

filter(sample)#

Check if all of the sample is nan

Parameters:

sample (da.Array) – Sample to check

Raises:

(PipelineFilterException) – If sample contains only nan values

Return type:

None
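The difference between the two nan filters can be sketched as follows. NumPy stands in for dask here, and FilterError is a hypothetical stand-in for PipelineFilterException so that the snippet is self-contained.

```python
import numpy as np


class FilterError(Exception):
    """Hypothetical stand-in for PipelineFilterException."""


def drop_any_nan(sample):
    # Reject as soon as a single value is nan.
    if np.isnan(sample).any():
        raise FilterError("sample contains nan")


def drop_all_nan(sample):
    # Reject only when every value is nan.
    if np.isnan(sample).all():
        raise FilterError("sample is entirely nan")


partial = np.array([1.0, np.nan])
drop_all_nan(partial)   # passes, since not every value is nan
```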

class pyearthtools.pipeline.operations.dask.filters.DropValue(value, percentage)#

Filter to drop data containing more than a given percentage of a value.

Can be used to trim out invalid data

Drop Data if number of elements equal to value are greater than percentage when iterating.

Parameters:
  • value (Union[float, Literal["nan"]]) – Value to search for. Can be nan or ‘nan’.

  • percentage (float) – Percentage threshold. Data is dropped when the share of elements equal to value exceeds it.

filter(sample)#

Check if the number of elements equal to value exceeds percentage

Parameters:

sample (da.Array) – Sample to check

Raises:

(PipelineFilterException) – If number of elements equal to value are greater than percentage

Return type:

None

class pyearthtools.pipeline.operations.dask.filters.Shape(shape, split_tuples=False)#

Filter to drop data of incorrect shape

Used to ensure that incoming data is of the correct shape for later steps

Drop Data if shape does not match expected

Parameters:
  • shape (tuple[tuple[int, ...] | int, ...]) – Shape to match, either tuple of shapes for tupled data or direct shape

  • split_tuples (bool, optional) – Whether to split tuples, if True, shape should not be a tuple of tuples

filter(sample)#

To be implemented by child class, should raise a PipelineFilterException if sample is invalid.

Parameters:

sample (tuple[Array, ...] | Array)

class pyearthtools.pipeline.operations.dask.reshape.Rearrange(rearrange, skip=False, reverse_rearrange=None, rearrange_kwargs=None, **kwargs)#

Operation to rearrange data using einops

Using Einops rearrange, rearrange data.

Warning

This will occur on each iteration, and on __getitem__, so it is best to leave the patch dimension out of the pattern if using PatchingDataIndex (pyearthtools.pipeline.operations.PatchingDataIndex), as this will attempt to add the patch dim if the first attempt fails. For example:

'p t c h w' == 't c h w'

Parameters:
  • rearrange (str) – String entry to einops.rearrange

  • skip (bool, optional) – Whether to skip data that cannot be rearranged. Defaults to False.

  • reverse_rearrange (Optional[str], optional) – Override for reverse operation, if not given flip rearrange. Defaults to None.

  • rearrange_kwargs (Optional[dict[str, Any]], optional) – Extra keyword arguments to be passed to the einops.rearrange call. Defaults to {}.

class pyearthtools.pipeline.operations.dask.reshape.Squeeze(axis)#

Operation to Squeeze one Dimensional axis at ‘axis’ location

Squeeze Dimension of Data

Parameters:

axis (Union[tuple[int, ...], int]) – Axis to squeeze at

class pyearthtools.pipeline.operations.dask.reshape.Flattener(flatten_dims=None, shape_attempt=None)#
Parameters:
  • flatten_dims (int | None)

  • shape_attempt (tuple[str | int, ...] | None)

class pyearthtools.pipeline.operations.dask.reshape.Flatten(flatten_dims=None, *, shape_attempt=None)#

Operation to Flatten parts of data samples into a one dimensional array

Operation to flatten incoming data

Parameters:
  • flatten_dims (Optional[int], optional) – Number of dimensions to flatten, counting from the end, so to flatten the last three dims use flatten_dims == 3. If None, flatten all, with the size being stored from first use. Defaults to None.

  • shape_attempt (Optional[tuple[int, ...]], optional) – Reshape value to try if the discovered shape fails. Used if data coming to be undone is different. Can have '...' as wildcards to be filled from the discovered shape. Defaults to None.

Examples

>>> import dask.array as da
>>> from pyearthtools.pipeline.operations.dask.reshape import Flatten
>>>
>>> incoming_data = da.zeros((5,4,3,2))
>>> flattener = Flatten(flatten_dims = 2)
>>> flattener.apply_func(incoming_data).shape
... (5, 4, 6)
>>> flattener = Flatten(flatten_dims = 3)
>>> flattener.apply_func(incoming_data).shape
... (5, 24)
>>> flattener = Flatten(flatten_dims = None)
>>> flattener.apply_func(incoming_data).shape
... (120,)
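The flattening rule itself can be sketched with a plain reshape. This is a minimal illustration using NumPy in place of dask; flatten_last is a hypothetical helper, not part of the library, and it ignores shape_attempt entirely.

```python
import numpy as np


def flatten_last(data, flatten_dims=None):
    """Flatten the last `flatten_dims` axes, or everything if None (hypothetical helper)."""
    if flatten_dims is None:
        return data.reshape(-1)
    keep = data.shape[:-flatten_dims]      # leading axes left untouched
    return data.reshape(*keep, -1)


data = np.zeros((5, 4, 3, 2))
flat = flatten_last(data, 2)
# Undoing requires the original shape, which is why it is stored on first use.
restored = flat.reshape(data.shape)
```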

Tip

`shape_attempt` Advanced Use

If using a model which does not return a full sample, say an XGBoost model only returning the centre value, set shape_attempt.

If incoming data is of shape (1, 1, 3, 3), and data for undoing is (1, 1, 1, 1) aka (1), set shape_attempt to ('...','...', 1, 1)

>>> # Spatial Size Change
>>> incoming_data = da.zeros((1,1,3,3))
>>> flattener = Flatten(shape_attempt = (1,1,1,1))
>>> flattener.apply_func(incoming_data).shape   #(9,)
>>>
>>> undo_data = da.zeros((1))
>>> flattener.undo_func(undo_data).shape        #(1,1,1,1)

If incoming data is of shape (8, 1, 3, 3), and data for undoing is (2, 1, 1, 1) aka (2), set shape_attempt to (2,'...',1,1)

>>> # Channel or Time Size Change also
>>> incoming_data = da.zeros((8,1,3,3))
>>> flattener = Flatten(shape_attempt = (2,1,1,1))
>>> flattener.apply_func(incoming_data).shape   #(72,)
>>>
>>> undo_data = da.zeros((2))
>>> flattener.undo_func(undo_data).shape        #(2,1,1,1)

class pyearthtools.pipeline.operations.dask.reshape.SwapAxis(axis_1, axis_2)#

Move axis

Parameters:
  • axis_1 (int) – Source axis

  • axis_2 (int) – Target axis

class pyearthtools.pipeline.operations.dask.select.Select(array_index, tuple_index=None)#

Operation to select an element from a given array

Select data from a given index

Parameters:
  • array_index (tuple[Optional[int],...]) – Tuple of indexes from which to select data. Can use None to specify not to select

  • tuple_index (Optional[int], optional) – Choice of which tuple element to apply selection to, if tuples passed. Defaults to None.
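The role of None in array_index can be sketched as follows. This is an illustration with NumPy rather than dask, and select is a hypothetical helper: it assumes each None leaves the corresponding axis whole, while each integer picks a single element along its axis.

```python
import numpy as np


def select(data, array_index):
    """Index `data`, treating None as 'keep this axis' (hypothetical helper)."""
    key = tuple(slice(None) if i is None else i for i in array_index)
    return data[key]


data = np.zeros((10, 5, 2))
first = select(data, [0])             # picks index 0 on the first axis
column = select(data, [0, None, 0])   # keeps only the middle axis
```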

Examples

>>> incoming_data = da.zeros((10,5,2))
>>> select = Select([0])
>>> select.apply_func(incoming_data).shape
(5, 2)
>>> select = Select([0, None, 0])
>>> select.apply_func(incoming_data).shape
(5,)

class pyearthtools.pipeline.operations.dask.select.Slice(*slices, reverse_slice=False)#

Slice a chunk of a dask array

Examples

>>> Slice((0,10,2)) # == slice(0,10,2)
>>> incoming_data = da.zeros((10,5,4))
>>> Slice((0,10,2), (1, 3)).apply_func(incoming_data).shape
(5, 2, 4)
>>> Slice((1, 3)).apply_func(incoming_data).shape
(2, 5, 4)
>>> Slice((1, 3), reverse_slice = True).apply_func(incoming_data).shape
(10, 5, 2)

Setup slicing operation

Parameters:
  • slices (tuple[Optional[int], ...]) – Each tuple is converted into a slice, so it must follow slice notation.

  • reverse_slice (bool, optional) – Whether to slice offset towards last axis. Defaults to False.
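How the tuples map onto axes can be sketched as below. This is an illustration with NumPy in place of dask; slice_array is a hypothetical helper and it assumes reverse_slice simply aligns the given slices with the trailing axes.

```python
import numpy as np


def slice_array(data, *slices, reverse_slice=False):
    """Apply one slice per tuple, from the first or the last axis (hypothetical helper)."""
    key = tuple(slice(*s) for s in slices)
    if reverse_slice:
        # Pad on the left so the slices line up with the trailing axes.
        key = (slice(None),) * (data.ndim - len(key)) + key
    return data[key]


data = np.zeros((10, 5, 4))
front = slice_array(data, (0, 10, 2), (1, 3))
back = slice_array(data, (1, 3), reverse_slice=True)
```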

class pyearthtools.pipeline.operations.dask.split.OnAxis(axis, axis_size=None)#

Split across an axis in a dask array

Split over a dask array axis

Parameters:
  • axis (int) – Axis number to iterate over

  • axis_size (int | None, optional) – Expected size of the axis, can be found automatically. Defaults to None.

join(sample)#

Join sample together, recovering initial shape

Parameters:

sample (tuple[Array])

Return type:

Array

split(sample)#

Combine all elements of axis on batch dimension

Parameters:

sample (Array)

Return type:

tuple[Array]
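A split over an axis and the matching join can be sketched with NumPy. This is a sketch under assumptions: it drops the split axis from each element and restores it on join, which may differ from how the class treats the axis.

```python
import numpy as np

data = np.arange(24).reshape(2, 3, 4)
axis = 1

# split: one array per position along `axis`.
parts = tuple(np.moveaxis(data, axis, 0))

# join: stacking along the same axis recovers the initial shape.
joined = np.stack(parts, axis=axis)
```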

class pyearthtools.pipeline.operations.dask.split.OnSlice(*slices, axis)#

Split across slices on axis

Setup slicing operation

Parameters:
  • slices (tuple[int, ...]) – Each tuple is converted into a slice, so it must follow slice notation.

  • axis (int) – Axis number to slice over

join(sample)#

Join sample together

Parameters:

sample (tuple[Array])

Return type:

Array

split(sample)#

Split method called on apply.

Parameters:

sample (Any) – Sample to be split into tuple

Returns:

Split sample

Return type:

(tuple[Any, …])

class pyearthtools.pipeline.operations.dask.values.FillNan(nan=0, posinf=None, neginf=None)#

Fill any Nan’s with a value

DataOperation to fill Nan’s

Parameters:
  • nan (float, optional) – Value to fill nan’s with. If None is passed then NaN values will be replaced with 0. Defaults to 0.

  • posinf (float, optional) – Value to be used to fill positive infinity values, If None is passed then positive infinity values will be replaced with a very large number. Defaults to None.

  • neginf (float, optional) – Value to be used to fill negative infinity values, If None is passed then negative infinity values will be replaced with a very small (or negative) number. Defaults to None.
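The parameters mirror those of numpy.nan_to_num, so the default behaviour can be illustrated with NumPy directly. Whether the class calls this exact function is an assumption.

```python
import numpy as np

data = np.array([1.0, np.nan, np.inf, -np.inf])

# nan -> 0; with posinf / neginf left as None, infinities become the
# largest and most negative finite values of the dtype.
filled = np.nan_to_num(data, nan=0, posinf=None, neginf=None)
```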

class pyearthtools.pipeline.operations.dask.values.MaskValue(value, operation='==', replacement_value=nan)#

DataOperation to mask values with a given replacement

Operation to Mask Values

Parameters:
  • value (Union[float, dict[str, float]]) – Value to search for.

  • operation (Literal['==', '>', '<', '>=','<='], optional) – Operation to search with. Defaults to ‘==’.

  • replacement_value (Union[float, dict[str, float]], optional) – Replacement value. Defaults to np.nan.

Raises:

KeyError – If invalid operation passed.

apply_func(sample)#

Mask Data from initialised configuration

Parameters:

sample (da.Array) – Data to apply mask to

Returns:

Masked Data

Return type:

(da.Array)
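The masking logic can be sketched as a lookup from the operation string to a comparison, followed by a conditional replacement. This is a minimal sketch with NumPy in place of dask; OPERATIONS and mask_value are illustrative names, not the library's.

```python
import operator

import numpy as np

# Supported comparison strings; anything else raises KeyError on lookup.
OPERATIONS = {
    "==": operator.eq,
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
}


def mask_value(data, value, operation="==", replacement_value=np.nan):
    compare = OPERATIONS[operation]
    # Replace wherever the comparison holds, keep the data elsewhere.
    return np.where(compare(data, value), replacement_value, data)


masked = mask_value(np.array([0.0, 1.0, 2.0]), 1.0, ">=")
```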

class pyearthtools.pipeline.operations.dask.values.Clip(min_value=0, max_value=1)#

Operation to force data to be within a certain range, by default 0 & 1

Force data into a specified range

Parameters:
  • min_value (Optional[Union[float, dict[str, float]]], optional) – Minimum Value. If using a dict, acts per variable given. If None, this won’t apply a min masking Defaults to 0.

  • max_value (Optional[Union[float, dict[str, float]]], optional) – Maximum Value. If using a dict, acts per variable given. If None, this won’t apply a max masking Defaults to 1.

class pyearthtools.pipeline.operations.dask.normalisation.daskNormalisation(expand=True)#

Parent dask normalisation class

Base Dask Normalisation

Parameters:

expand (bool, optional) – Expand normalisation arrays to shape of sample by appending axis of size 1. Defaults to True.

classmethod open_file(file)#

Open dask file

Parameters:

file (str | Path)

Return type:

Array

class pyearthtools.pipeline.operations.dask.normalisation.Anomaly(mean, expand=True)#

Anomaly Normalisation

Base Dask Normalisation

Parameters:
  • expand (bool, optional) – Expand normalisation arrays to shape of sample by appending axis of size 1. Defaults to True.

  • mean (str | Path)

class pyearthtools.pipeline.operations.dask.normalisation.Deviation(mean, deviation, expand=True)#

Deviation Normalisation

Base Dask Normalisation

Parameters:
  • expand (bool, optional) – Expand normalisation arrays to shape of sample by appending axis of size 1. Defaults to True.

  • mean (str | Path)

  • deviation (str | Path)
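The effect of expand can be sketched as below. This is an illustration under assumptions: it takes deviation normalisation to be (sample - mean) / deviation, passes the statistics as in-memory NumPy arrays rather than files, and uses a hypothetical expand_to helper.

```python
import numpy as np


def expand_to(stat, sample):
    """Append axes of size 1 until `stat` has as many dims as `sample`."""
    return stat.reshape(stat.shape + (1,) * (sample.ndim - stat.ndim))


sample = np.arange(24, dtype=float).reshape(2, 3, 4)   # e.g. (variable, lat, lon)
mean = np.array([10.0, 20.0])                          # one value per variable
deviation = np.array([2.0, 4.0])

# Assumed formula: subtract the mean, divide by the deviation.
normalised = (sample - expand_to(mean, sample)) / expand_to(deviation, sample)
# Undo: multiply by the deviation, add the mean back.
restored = normalised * expand_to(deviation, sample) + expand_to(mean, sample)
```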

class pyearthtools.pipeline.operations.dask.normalisation.Division(division_factor, expand=True)#

Division based Normalisation

Base Dask Normalisation

Parameters:
  • expand (bool, optional) – Expand normalisation arrays to shape of sample by appending axis of size 1. Defaults to True.

  • division_factor (str | Path)

class pyearthtools.pipeline.operations.dask.normalisation.Evaluated(normalisation_eval, unnormalisation_eval, **kwargs)#

eval based normalisation

Run a normalisation calculation using eval.

Will get all kwargs passed to this class, and sample as the data to be normalised.

All kwargs will be loaded from file if a str.

Parameters:
  • normalisation_eval (str) – Normalisation eval str

  • unnormalisation_eval (str) – Unnormalisation eval str
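An eval based normalisation can be sketched as follows. This is not the class's implementation: evaluate is a hypothetical helper, the kwargs are passed as plain numbers rather than loaded from file, and the expressions are examples only. Strings given to eval should come from a trusted source.

```python
import numpy as np


def evaluate(expression, sample, **kwargs):
    """Evaluate `expression` with `sample` and the kwargs in scope (hypothetical helper)."""
    # Expose only numpy, the sample and the kwargs to the expression.
    namespace = {"__builtins__": {}, "np": np, "sample": sample, **kwargs}
    return eval(expression, namespace)


sample = np.array([1.0, 2.0, 3.0])
normalised = evaluate("(sample - mean) / std", sample, mean=2.0, std=0.5)
restored = evaluate("sample * std + mean", normalised, mean=2.0, std=0.5)
```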

class pyearthtools.pipeline.operations.dask.conversion.ToXarray(array_shape, coords=None, encoding=None, attributes=None, **kwargs)#

Dask -> Xarray Converter

Convert array into xarray object.

Can use .like to record from a reference data object.

Parameters:
  • array_shape (tuple[str, ...] | str) – Order / naming of dimensions of incoming array. Can be str split by ‘ ‘. Special name is ‘variable’ corresponding to variables in a dataset. That dim will be split into variables

  • coords (dict[Hashable, Any] | None) – Coordinates to set xarray object with, not all have to be given. Cannot be a tuple. As ‘variable’ is special in array_shape, ‘variable’ in coords names the variables.

  • encoding (dict[str, Any] | None) – Encoding to set, can be variable, or dimension.

  • attributes (dict[str, Any] | None) – Attributes to set. Can use __dataset if dataset to update dataset attrs Defaults to None.

Examples

Using like convenience method

>>> import pyearthtools.data
>>> import pyearthtools.pipeline
>>> import dask.array as da
>>>
>>> sample = pyearthtools.data.archive.ERA5.sample()('2000-01-01T00')
>>> converter = pyearthtools.pipeline.operations.dask.conversion.ToXarray.like(sample)
>>> converter.apply(da.ones((1, 1, 721, 1440)))

Manual specification example

>>> import pyearthtools.pipeline
>>> import dask.array as da
>>> import numpy as np
>>>
>>> converter = pyearthtools.pipeline.operations.dask.conversion.ToXarray(
>>>     'time latitude longitude',
>>>     coords = {'latitude': np.linspace(-90, 90, 721), 'longitude': np.arange(-180, 180, 0.25)}
>>> )
>>> converter.apply(da.ones((1, 721, 1440)))

apply_func(sample)#

Convert dask array to xarray

Parameters:

sample (Array)

classmethod like(reference_dataset, drop_coords=None)#

Get ToXarray Operation setup from a reference dataset

Parameters:
  • reference_dataset (Union[xr.DataArray, xr.Dataset]) – Reference dataset to use to setup converter.

  • drop_coords (list[str] | None)

Returns:

Converter setup to convert like reference_dataset.

Return type:

(ToXarray)

undo_func(sample)#

Convert xarray to dask array

Parameters:

sample (DataArray | Dataset)

class pyearthtools.pipeline.operations.dask.conversion.ToNumpy(chunks='auto')#

Dask -> Numpy Converter

Base Pipeline Operation,

Allows for tuple splitting, and type checking

Parameters:
  • split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for undo. Defaults to “both”.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

  • chunks (int | str | tuple[int, str])

pipeline.operations.numpy#

class pyearthtools.pipeline.operations.numpy.Stack(axis=None)#

Stack a tuple of np.ndarray’s

Join samples from tuple

Parameters:
  • split_tuples (bool, optional) – Split tuples on unjoin operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on unjoin, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

  • axis (int | None)

join(sample)#

Join sample

Parameters:

sample (tuple[Any, ...])

Return type:

ndarray

unjoin(sample)#

Unstacks a stacked sample

Parameters:

sample (Any)

Return type:

tuple

class pyearthtools.pipeline.operations.numpy.Concatenate(axis=None)#

Concatenate a tuple of np.ndarray’s

Join samples from tuple

Parameters:
  • split_tuples (bool, optional) – Split tuples on unjoin operation. Defaults to True.

  • recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on unjoin, join automatically has tuples. Defaults to None.

  • response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.

  • axis (int | None)

join(sample)#

Join sample

Parameters:

sample (tuple[Any, ...])

Return type:

ndarray

unjoin(sample)#

Unjoin method called on undo.

If the pipeline is to be fully reversible, this should return exactly what was received in join.

If it does not, the pipeline will not be fully reversible.

Parameters:

sample (Any) – Sample to be split / unjoined.

Returns:

Split / unjoined sample

Return type:

(tuple)
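Reversing a concatenation needs the sizes of the original pieces, which can be sketched as below. This is a minimal illustration, assuming join behaves like numpy.concatenate; how the class records the split points is not shown in this reference.

```python
import numpy as np

samples = (np.zeros((2, 3)), np.ones((4, 3)))
axis = 0

joined = np.concatenate(samples, axis=axis)

# Record where each input ended so the join can be undone exactly.
boundaries = np.cumsum([s.shape[axis] for s in samples])[:-1]
unjoined = tuple(np.split(joined, boundaries, axis=axis))
```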

class pyearthtools.pipeline.operations.numpy.augment.Rotate(seed=42, axis=(-2, -1))#

Rotation Augmentation by 90 degrees in the plane specified by axes.

Generates a random number between 0 & 3 inclusive, for number of times to rotate.

Parameters:
  • seed (int, optional) – Random Number seed. Defaults to 42.

  • axis (tuple[int, int], optional) – Rotation plane. Axes must be different. Defaults to (-2, -1).

class pyearthtools.pipeline.operations.numpy.augment.Flip(seed=42, axis=-1)#

Flip Augmentation on the specified axes.

Flips the data along the specified axis.

Generates a random boolean, if True, flip, otherwise not

Parameters:
  • seed (int, optional) – Random Number seed. Defaults to 42.

  • axis (tuple[int], optional) – Axis to flip data in. Defaults to -1.
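The flip decision can be sketched as a seeded coin toss followed by numpy.flip. The generator shown here is an assumption, so the outcome for a given seed may not match the class.

```python
import numpy as np

rng = np.random.default_rng(42)        # assumption: the class may use another RNG
do_flip = bool(rng.integers(0, 2))     # random boolean

data = np.arange(6).reshape(2, 3)
flipped = np.flip(data, axis=-1) if do_flip else data

# Flipping again along the same axis undoes the augmentation.
restored = np.flip(flipped, axis=-1) if do_flip else flipped
```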

class pyearthtools.pipeline.operations.numpy.augment.FlipAndRotate(seed=42, axis=(-2, -1))#

Flip & Rotation Augmentation.

Apply both Flip & Rotation Augmentations, will rotate on given axis, and flip on both

Parameters:
  • seed (int, optional) – Random Number seed. Defaults to 42.

  • axis (tuple[int], optional) – Rotation plane primarily. Axes must be different. Will also flip on each given axis. Defaults to (-2, -1).

class pyearthtools.pipeline.operations.numpy.filters.NumpyFilter#

Numpy Filters

Base PipelineStep - all steps should subclass from this

Parameters:
  • split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.

  • recursively_split_tuples (bool, optional) – Recursively split tuples when using _split_tuples_call. Defaults to False.

  • recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.

  • response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.

class pyearthtools.pipeline.operations.numpy.filters.DropAnyNan#

Filter to drop any data with nans when iterating.

Used to remove any bad data or data that is masked out.

Drop data with any nans

filter(sample)#

Reject the sample if any value is nan

Parameters:

sample (np.ndarray) – Sample to check

Raises:

(PipelineFilterException) – If sample contains one or more nan value

Return type:

None

class pyearthtools.pipeline.operations.numpy.filters.DropAllNan#

Filter to drop any data if all nans.

Used to remove any bad data or data that is masked out.

Drop data if all values are nans

filter(sample)#

Reject the sample if all of its values are nan

Parameters:

sample (np.ndarray) – Sample to check

Raises:

(PipelineFilterException) – If sample contains only nan values

Return type:

None

class pyearthtools.pipeline.operations.numpy.filters.DropValue(value, percentage)#

Filter to drop data containing more than a given percentage of a value.

Can be used to trim out invalid data

Drop Data if number of elements equal to value are greater than percentage when iterating.

Parameters:
  • value (Union[float, Literal["nan"]]) – Value to search for. Can be nan or ‘nan’.

  • percentage (float) – Percentage threshold. Data is dropped when the share of elements equal to value exceeds it.

filter(sample)#

Reject the sample if the number of elements equal to value exceeds percentage

Parameters:

sample (np.ndarray) – Sample to check

Raises:

(PipelineFilterException) – If number of elements equal to value are greater than percentage

Return type:

None
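The threshold check can be sketched as follows. This is a sketch under assumptions: percentage is taken to be on a 0 to 100 scale, FilterError is a hypothetical stand-in for PipelineFilterException, and matches / drop_value are illustrative helpers.

```python
import numpy as np


class FilterError(Exception):
    """Hypothetical stand-in for PipelineFilterException."""


def matches(sample, value):
    # nan never compares equal to itself, so it needs isnan.
    if value == "nan" or (isinstance(value, float) and np.isnan(value)):
        return np.isnan(sample)
    return sample == value


def drop_value(sample, value, percentage):
    # Assumption: percentage is expressed on a 0-100 scale.
    share = 100.0 * matches(sample, value).mean()
    if share > percentage:
        raise FilterError(f"{share:.1f}% of elements equal {value}")


sample = np.array([0.0, 0.0, 0.0, 1.0])
drop_value(sample, 0.0, 80)    # passes: 75% does not exceed 80%
```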

class pyearthtools.pipeline.operations.numpy.filters.Shape(shape, split_tuples=False)#

Filter to drop data of incorrect shape

Used to ensure that incoming data is of the correct shape for later steps

Drop Data if shape does not match expected

Parameters:
  • shape (tuple[tuple[int, ...] | int, ...]) – Shape to match, either tuple of shapes for tupled data or direct shape

  • split_tuples (bool, optional) – Whether to split tuples, if True, shape should not be a tuple of tuples

filter(sample)#

To be implemented by child class, should raise a PipelineFilterException if sample is invalid.

Parameters:

sample (tuple[ndarray, ...] | ndarray)

class pyearthtools.pipeline.operations.numpy.reshape.Rearrange(rearrange, skip=False, reverse_rearrange=None, rearrange_kwargs=None, **kwargs)#

Operation to rearrange data using einops

Using Einops rearrange, rearrange data.

Parameters:
  • rearrange (str) – String entry to einops.rearrange

  • skip (bool, optional) – Whether to skip data that cannot be rearranged. Defaults to False.

  • reverse_rearrange (Optional[str], optional) – Override for reverse operation, if not given flip rearrange. Defaults to None.

  • rearrange_kwargs (Optional[dict[str, Any]], optional) – Extra keyword arguments to be passed to the einops.rearrange call. Defaults to {}.

class pyearthtools.pipeline.operations.numpy.reshape.Squeeze(axis)#

Operation to Squeeze one-Dimensional axes at ‘axis’ location

Squeeze Dimension of Data, removing dimensions of length 1.

Parameters:

axis (Union[tuple[int, ...], int]) – Axis to squeeze at

class pyearthtools.pipeline.operations.numpy.reshape.Expand(axis)#

Operation to Expand One Dimensional axis at ‘axis’ location

Expand Dimension of Data

Parameters:

axis (Union[tuple[int, ...], int]) – Axis to expand at
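Squeeze and Expand act as inverses of each other at a given axis, which can be illustrated with the NumPy functions of the same purpose. Whether the classes call these exact functions is an assumption.

```python
import numpy as np

data = np.zeros((3, 1, 4))

# Squeeze removes the length-1 axis at position 1.
squeezed = np.squeeze(data, axis=1)

# Expand inserts a length-1 axis back at the same position.
expanded = np.expand_dims(squeezed, axis=1)
```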

class pyearthtools.pipeline.operations.numpy.reshape.Flattener(flatten_dims=None, shape_attempt=None)#
Parameters:
  • flatten_dims (int | None)

  • shape_attempt (tuple[str | int, ...] | None)

class pyearthtools.pipeline.operations.numpy.reshape.Flatten(flatten_dims=None, *, shape_attempt=None)#

Operation to Flatten parts of data samples into a one dimensional array

Operation to flatten incoming data

Parameters:
  • flatten_dims (int | None) – Number of trailing dimensions to flatten. It is used as a negative index, so to flatten the last three dimensions set flatten_dims = 3. If None, flatten all dimensions, with the size being stored from first use. Defaults to None.

  • shape_attempt (tuple[str | int, ...] | None) – Shape to try when undoing if the discovered shape fails, for use when the data being undone differs from the incoming data. '...' entries act as wildcards filled from the discovered shape. Defaults to None.

Examples

>>> incoming_data = np.zeros((5,4,3,2))
>>> flattener = Flatten(flatten_dims = 2)
>>> flattener.apply_func(incoming_data).shape
(5, 4, 6)
>>> flattener = Flatten(flatten_dims = 3)
>>> flattener.apply_func(incoming_data).shape
(5, 24)
>>> flattener = Flatten(flatten_dims = None)
>>> flattener.apply_func(incoming_data).shape
(120,)
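
The shapes in the examples above can be reproduced with a plain NumPy reshape. The sketch below is only an illustration of flattening trailing dimensions and undoing it; `flatten_last` and `unflatten_last` are hypothetical helpers, and shape_attempt handling is not covered.

```python
import numpy as np


def flatten_last(data, flatten_dims=None):
    """Flatten the trailing `flatten_dims` axes and return (flat, collapsed_shape)."""
    if flatten_dims is None:
        flatten_dims = data.ndim  # flatten everything
    keep = data.shape[: data.ndim - flatten_dims]
    collapsed = data.shape[data.ndim - flatten_dims:]
    return data.reshape(*keep, -1), collapsed


def unflatten_last(flat, collapsed):
    """Undo flatten_last using the shape that was collapsed."""
    return flat.reshape(*flat.shape[:-1], *collapsed)


incoming_data = np.zeros((5, 4, 3, 2))

flat_two, collapsed_two = flatten_last(incoming_data, 2)
flat_three, _ = flatten_last(incoming_data, 3)
flat_all, collapsed_all = flatten_last(incoming_data)

round_trip = unflatten_last(flat_two, collapsed_two)
```

Remembering the collapsed shape at flatten time is what makes the undo step possible without being told the original shape again.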

Tip

`shape_attempt` Advanced Use

If using a model which does not return a full sample, say an XGBoost model only returning the centre value, set shape_attempt.

If incoming data is of shape (1, 1, 3, 3), and data for undoing is (1, 1, 1, 1) aka (1), set shape_attempt to ('...','...', 1, 1)

>>> # Spatial Size Change
>>> incoming_data = np.zeros((1,1,3,3))
>>> flattener = Flatten(shape_attempt = (1,1,1,1))
>>> flattener.apply_func(incoming_data).shape   #(9,)
>>>
>>> undo_data = np.zeros((1))
>>> flattener.undo_func(undo_data).shape        #(1,1,1,1)

If incoming data is of shape (8, 1, 3, 3), and data for undoing is (2, 1, 1, 1) aka (2), set shape_attempt to (2,'...',1,1)

>>> # Channel or Time Size Change also
>>> incoming_data = np.zeros((8,1,3,3))
>>> flattener = Flatten(shape_attempt = (2,1,1,1))
>>> flattener.apply_func(incoming_data).shape   #(72,)
>>>
>>> undo_data = np.zeros((2))
>>> flattener.undo_func(undo_data).shape        #(2,1,1,1)
class pyearthtools.pipeline.operations.numpy.reshape.SwapAxis(axis_1, axis_2)#

Move axis

Parameters:
  • axis_1 (int) – Source axis

  • axis_2 (int) – Target axis

class pyearthtools.pipeline.operations.numpy.select.Select(array_index, tuple_index=None)#

Operation to select an element from a given array

Select data from a given index

Parameters:
  • array_index (tuple[Optional[int],...]) – Tuple of indexes from which to select data. Can use None to specify not to select

  • tuple_index (Optional[int], optional) – Choice of which tuple element to apply selection to, if tuples passed. Defaults to None.

Examples
>>> incoming_data = np.zeros((10,5,2))
>>> select = Select([0])
>>> select.apply_func(incoming_data).shape
(5,2)
>>> select = Select([0, None, 0])
>>> select.apply_func(incoming_data).shape
(5,)
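
The role of None in array_index can be illustrated with ordinary NumPy indexing, where None is treated as "take the whole axis". This is a sketch of the idea under that assumption; `select` is a hypothetical helper, not the library's method.

```python
import numpy as np


def select(data, array_index):
    """Index `data`, replacing None entries with a full slice of that axis."""
    key = tuple(slice(None) if index is None else index for index in array_index)
    return data[key]


incoming_data = np.zeros((10, 5, 2))

first = select(incoming_data, [0])            # index axis 0 only
corner = select(incoming_data, [0, None, 0])  # index axes 0 and 2, keep axis 1
```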
class pyearthtools.pipeline.operations.numpy.select.Slice(*slices, reverse_slice=False)#

Slice a chunk of a numpy array

Examples

>>> Slice((0,10,2)) # == slice(0,10,2)
>>> incoming_data = np.zeros((10,5,4))
>>> Slice((0,10,2), (1, 3)).apply_func(incoming_data).shape
(5,2,4)
>>> Slice((1, 3)).apply_func(incoming_data).shape
(2,5,4)
>>> Slice((1, 3), reverse_slice = True).apply_func(incoming_data).shape
(10,5,2)

Setup slicing operation

Parameters:
  • slices (tuple[Optional[int], ...]) – Each tuple is converted into a slice. So must follow slice notation

  • reverse_slice (bool, optional) – Whether to apply the slices to the trailing axes instead of the leading ones. Defaults to False.
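
The shapes in the Slice examples can be reproduced with plain NumPy by turning each tuple into a slice object. In this sketch `apply_slices` is a hypothetical helper, and the way reverse_slice is modelled (prefixing an Ellipsis so the slices line up with the last axes) is an assumption that happens to match the documented output.

```python
import numpy as np


def apply_slices(data, *slices, reverse_slice=False):
    """Convert each tuple to slice(*tuple) and index `data` with the result."""
    key = tuple(slice(*entry) for entry in slices)
    if reverse_slice:
        # Assumption: align the given slices with the trailing axes.
        key = (Ellipsis, *key)
    return data[key]


incoming_data = np.zeros((10, 5, 4))

two_axes = apply_slices(incoming_data, (0, 10, 2), (1, 3))
leading = apply_slices(incoming_data, (1, 3))
trailing = apply_slices(incoming_data, (1, 3), reverse_slice=True)
```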

class pyearthtools.pipeline.operations.numpy.split.OnAxis(axis, axis_size=None)#

Split across an axis in a numpy array

Split over a numpy array axis

Parameters:
  • axis (int) – Axis number to iterate over

  • axis_size (int | None, optional) – Expected size of the axis, can be found automatically. Defaults to None.

join(sample)#

Join sample together, recovering initial shape

Parameters:

sample (tuple[ndarray])

Return type:

ndarray

split(sample)#

Combine all elements of axis on batch dimension

Parameters:

sample (ndarray)

Return type:

tuple[ndarray]
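
A split over one axis followed by a join that recovers the initial shape can be sketched with plain NumPy as below. `split_on_axis` and `join_on_axis` are hypothetical helpers written for illustration; the library's own split and join may differ in detail.

```python
import numpy as np


def split_on_axis(sample, axis):
    """Return one array per index along `axis`."""
    # Moving the axis to the front lets plain iteration walk over it.
    return tuple(np.moveaxis(sample, axis, 0))


def join_on_axis(parts, axis):
    """Stack the parts back along `axis`, recovering the original shape."""
    return np.stack(parts, axis=axis)


sample = np.arange(24).reshape(2, 3, 4)

parts = split_on_axis(sample, axis=1)
joined = join_on_axis(parts, axis=1)
```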

class pyearthtools.pipeline.operations.numpy.split.OnSlice(*slices, axis)#

Split across slices on axis

Setup slicing operation

Parameters:
  • slices (tuple[int, ...]) – Each tuple is converted into a slice. So must follow slice notation

  • axis (int) – Axis number to slice over

join(sample)#

Join sample together

Parameters:

sample (tuple[ndarray])

Return type:

ndarray

split(sample)#

Split method called on apply.

Parameters:

sample (Any) – Sample to be split into tuple

Returns:

Split sample

Return type:

(tuple[Any, …])

class pyearthtools.pipeline.operations.numpy.split.VSplit#

vsplit on numpy arrays

Setup slicing operation

join(sample)#

Join sample together

Parameters:

sample (tuple[ndarray])

Return type:

ndarray

split(sample)#

Split method called on apply.

Parameters:

sample (Any) – Sample to be split into tuple

Returns:

Split sample

Return type:

(tuple[Any, …])

class pyearthtools.pipeline.operations.numpy.split.HSplit#

hsplit on numpy arrays

Setup slicing operation

join(sample)#

Join sample together

Parameters:

sample (tuple[ndarray])

Return type:

ndarray

split(sample)#

Split method called on apply.

Parameters:

sample (Any) – Sample to be split into tuple

Returns:

Split sample

Return type:

(tuple[Any, …])

class pyearthtools.pipeline.operations.numpy.values.FillNan(nan=0, posinf=None, neginf=None)#

Fill any NaNs with a value

DataOperation to fill NaNs

Parameters:
  • nan (float, optional) – Value to fill NaNs with. If None is passed, NaN values are replaced with 0. Defaults to 0.

  • posinf (float, optional) – Value used to fill positive infinity values. If None is passed, positive infinity values are replaced with a very large number. Defaults to None.

  • neginf (float, optional) – Value used to fill negative infinity values. If None is passed, negative infinity values are replaced with a very large negative number. Defaults to None.
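
The three parameters above have the same names and defaults as those of numpy.nan_to_num, so the following sketch uses that function to show the described behaviour. Whether FillNan calls numpy.nan_to_num internally is an assumption.

```python
import numpy as np

data = np.array([1.0, np.nan, np.inf, -np.inf])

# Defaults: NaN becomes 0, infinities become the largest finite floats.
default_fill = np.nan_to_num(data, nan=0.0, posinf=None, neginf=None)

# Explicit replacements for all three cases.
custom_fill = np.nan_to_num(data, nan=-1.0, posinf=1e6, neginf=-1e6)
```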

class pyearthtools.pipeline.operations.numpy.values.MaskValue(value, operation='==', replacement_value=nan)#

DataOperation to mask values with a given replacement

Operation to Mask Values

Parameters:
  • value (Union[float, dict[str, float]]) – Value to search for.

  • operation (Literal['==', '>', '<', '>=','<='], optional) – Operation to search with. Defaults to ‘==’.

  • replacement_value (Union[float, dict[str, float]], optional) – Replacement value. Defaults to np.nan.

Raises:

KeyError – If invalid operation passed.

apply_func(sample)#

Mask Data from initialised configuration

Parameters:

sample (np.ndarray) – Data to apply mask to

Returns:

Masked Data

Return type:

(np.ndarray)
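
The masking described above can be sketched with a lookup from the operation string to a comparison function, followed by numpy.where. `OPERATIONS` and `mask_value` are hypothetical names for this illustration; the dict form of value and replacement_value is not covered.

```python
import operator

import numpy as np

# Map each supported operation string to its comparison function.
OPERATIONS = {
    "==": operator.eq,
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
}


def mask_value(sample, value, operation="==", replacement_value=np.nan):
    """Replace every element matching the comparison with `replacement_value`."""
    compare = OPERATIONS[operation]  # raises KeyError for an unsupported operation
    return np.where(compare(sample, value), replacement_value, sample)


sample = np.array([-999.0, 1.0, 2.0])

masked = mask_value(sample, -999.0)              # sentinel replaced with NaN
floored = mask_value(sample, 1.0, ">=", 0.0)     # values >= 1 replaced with 0
```

Using a dictionary lookup means an invalid operation surfaces as a KeyError, which matches the Raises entry above.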

class pyearthtools.pipeline.operations.numpy.values.Clip(min_value=0, max_value=1)#

Operation to force data to be within a certain range, by default 0 & 1

Force data into a specified range

Parameters:
  • min_value (Optional[Union[float, dict[str, float]]], optional) – Minimum value. If using a dict, acts per variable given. If None, no minimum is applied. Defaults to 0.

  • max_value (Optional[Union[float, dict[str, float]]], optional) – Maximum value. If using a dict, acts per variable given. If None, no maximum is applied. Defaults to 1.

class pyearthtools.pipeline.operations.numpy.normalisation.numpyNormalisation(expand=True)#

Parent numpy normalisation class

Base Numpy Normalisation

Parameters:

expand (bool, optional) – Expand normalisation arrays to shape of sample by appending axis of size 1. Defaults to True.

classmethod open_file(file)#

Open numpy file

Parameters:

file (str | Path)

Return type:

ndarray

class pyearthtools.pipeline.operations.numpy.normalisation.Anomaly(mean, expand=True)#

Anomaly Normalisation

Base Numpy Normalisation

Parameters:
  • expand (bool, optional) – Expand normalisation arrays to shape of sample by appending axis of size 1. Defaults to True.

  • mean (str | Path)

class pyearthtools.pipeline.operations.numpy.normalisation.Deviation(mean, deviation, expand=True)#

Deviation Normalisation

Base Numpy Normalisation

Parameters:
  • expand (bool, optional) – Expand normalisation arrays to shape of sample by appending axis of size 1. Defaults to True.

  • mean (str | Path)

  • deviation (str | Path)

class pyearthtools.pipeline.operations.numpy.normalisation.Division(division_factor, expand=True)#

Division based Normalisation

Base Numpy Normalisation

Parameters:
  • expand (bool, optional) – Expand normalisation arrays to shape of sample by appending axis of size 1. Defaults to True.

  • division_factor (str | Path)

class pyearthtools.pipeline.operations.numpy.normalisation.Evaluated(normalisation_eval, unnormalisation_eval, **kwargs)#

eval based normalisation

Run a normalisation calculation using eval.

The eval expression has access to all kwargs passed to this class, and to sample, the data to be normalised.

Any kwarg given as a str will be loaded from file.

Parameters:
  • normalisation_eval (str) – Normalisation eval str

  • unnormalisation_eval (str) – Unnormalisation eval str
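
The idea of an eval based normalisation can be sketched as follows: the expression string is evaluated in a namespace holding sample plus the supplied keyword values. `evaluate` is a hypothetical helper, and the statistics are passed as plain floats here rather than being loaded from file.

```python
import numpy as np


def evaluate(expression, sample, **kwargs):
    """Evaluate `expression` with `sample` and the keyword values available as names."""
    namespace = {"sample": sample, "np": np, **kwargs}
    # Builtins are emptied so the expression only sees the names provided above.
    return eval(expression, {"__builtins__": {}}, namespace)


sample = np.array([10.0, 20.0, 30.0])
stats = {"mean": 20.0, "std": 10.0}

normalised = evaluate("(sample - mean) / std", sample, **stats)
restored = evaluate("sample * std + mean", normalised, **stats)
```

The second expression plays the part of the unnormalisation string: it must be the inverse of the first for the round trip to recover the input.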

pipeline.operations.transform#

class pyearthtools.pipeline.operations.transform.TimeOfYear(method)#

Add time of year to dataset

Add time of year as variable to a dataset

Use DropDataset (pyearthtools.pipeline.operations.select.DropDataset) to remove it if an earlier step in the pipeline is sensitive to variable names.

Parameters:

method (str) – Method to use, either “dayofyear” or “monthofyear”. Both are modelled as a sinusoidal function.

Returns:

Transform to add time of year variable

Return type:

(Transform)
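
One way to model the day of year as a sinusoidal function is sketched below. The exact formula the transform uses is not stated here, so the period of 365.25 days and the use of a sine (rather than a cosine or a phase shift) are assumptions, and `day_of_year_signal` is a hypothetical helper.

```python
import math
from datetime import date


def day_of_year_signal(day, days_in_year=365.25):
    """Map a date to a value in [-1, 1] that repeats once per year."""
    return math.sin(2 * math.pi * day.timetuple().tm_yday / days_in_year)


start_of_year = day_of_year_signal(date(2001, 1, 1))
spring = day_of_year_signal(date(2001, 4, 1))
```

A periodic encoding like this keeps 31 December and 1 January close together, which a raw day number would not.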

apply(ds)#

Apply transformation to Dataset

Parameters:
  • dataset (XR_TYPES) – Dataset to apply transform to

  • ds (Dataset)

Raises:

NotImplementedError – Base Transform does not implement this function

Returns:

Transformed Dataset

Return type:

XR_TYPES

class pyearthtools.pipeline.operations.transform.AddCoordinates(coordinates, *extra_coords)#

Add coordinates as variable to a dataset

Use DropDataset (pyearthtools.pipeline.operations.select.DropDataset) to remove it if an earlier step in the pipeline is sensitive to variable names.

Add coordinates to dataset.

Parameters:
  • coordinates (str | list[str]) – Coordinate/s to add

  • *extra_coords (str) – Args form of coordinates

apply(data)#

Apply transformation to Dataset

Parameters:
  • dataset (XR_TYPES) – Dataset to apply transform to

  • data (Dataset)

Raises:

NotImplementedError – Base Transform does not implement this function

Returns:

Transformed Dataset

Return type:

XR_TYPES

pipeline.samplers#

class pyearthtools.pipeline.samplers.EmptyObject#

Empty object to be skipped by Iteration

Used to mark where the sampler cannot return data.

class pyearthtools.pipeline.samplers.Sampler#

Base level Sampler

All sampler classes must implement this class, and provide generator, which should act as a generator.

See DefaultSampler for an example, and the process to make a sampler.

abstractmethod generator()#

Generator to control the sampling of data.

When passed None, and no samples remain within, should exit.

How to:

Yield an EmptyObject to begin with, and capture what is sent. Run the sampling routine: yield an EmptyObject if the sampler cannot return an object, otherwise yield the object. Exit the loop when None is encountered. If any samples remain stored within the sampler, yield them all afterwards.

Yields:

Generator[Any, Any, Any] – Sampling of data

Return type:

Generator[Any, Any, Any]
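
The generator protocol described in the "How to" above can be sketched in plain Python. Everything here is illustrative: `EmptyObject` is a local stand-in for the class of the same name, `passthrough_sampler` is a hypothetical sampler that returns whatever it is sent, and `drive` is an assumed way for a caller to feed a stream through it.

```python
class EmptyObject:
    """Local stand-in for pyearthtools.pipeline.samplers.EmptyObject."""


def passthrough_sampler():
    # Yield an EmptyObject first and capture whatever the caller sends.
    obj = yield EmptyObject()
    while obj is not None:
        # Hand the object back and capture the next one.
        obj = yield obj
    # Nothing is buffered in this sampler, so it simply exits here.


def drive(generator, stream):
    """Feed `stream` through a sampler generator and collect the real objects."""
    next(generator)  # advance to the first yield
    collected = []
    for item in stream:
        result = generator.send(item)
        if not isinstance(result, EmptyObject):
            collected.append(result)
    try:
        # Sending None signals the end of the stream; drain anything left.
        result = generator.send(None)
        while True:
            if not isinstance(result, EmptyObject):
                collected.append(result)
            result = next(generator)
    except StopIteration:
        pass
    return collected


output = drive(passthrough_sampler(), ["a", "b", "c"])
```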

class pyearthtools.pipeline.samplers.Default#

Default Sampler

Simply passes back any object given to it.

generator()#

Generator to control the sampling of data.

When passed None, and no samples remain within, should exit.

How to:

Yield an EmptyObject to begin with, and capture what is sent. Run the sampling routine: yield an EmptyObject if the sampler cannot return an object, otherwise yield the object. Exit the loop when None is encountered. If any samples remain stored within the sampler, yield them all afterwards.

Yields:

Generator[Any, Any, Any] – Sampling of data

Return type:

Generator[Any, Any, Any]

class pyearthtools.pipeline.samplers.SuperSampler(*samplers)#

A collection of Samplers run one after another.

Construct a new SuperSampler from multiple Samplers.

Parameters:

*samplers (Sampler) – Sampling is run sequentially, so order may be important.

generator()#

SuperSampler version of generator,

Runs each Sampler one after another.

Return type:

Generator[Any, Any, Any]

class pyearthtools.pipeline.samplers.Random(buffer_len, seed=42)#

Randomly sample objects from stream.

Builds a buffer of configurable length and samples from it once that length is reached.

Construct a Random Sampler

Parameters:
  • buffer_len (int) – Length of buffer to build. No objects will be yielded until this length is reached or the stream is exhausted.

  • seed (Union[int, None], optional) – Seed to initialise rng module with. Defaults to 42.

generator()#

Generator to control the sampling of data.

When passed None, and no samples remain within, should exit.

How to:

Yield an EmptyObject to begin with, and capture what is sent. Run the sampling routine: yield an EmptyObject if the sampler cannot return an object, otherwise yield the object. Exit the loop when None is encountered. If any samples remain stored within the sampler, yield them all afterwards.

Yields:

Generator[Any, Any, Any] – Sampling of data

Return type:

Generator[Any, Any, Any]
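
A buffered random sampler following the same protocol might look like the sketch below. It is an illustration only: `EmptyObject` is a local stand-in, `random_sampler` and `run` are hypothetical, and the choice to pop a random element once the buffer is full and to shuffle the remainder at the end is an assumption about the behaviour.

```python
import random


class EmptyObject:
    """Local stand-in for pyearthtools.pipeline.samplers.EmptyObject."""


def random_sampler(buffer_len, seed=42):
    rng = random.Random(seed)
    buffer = []
    obj = yield EmptyObject()
    while obj is not None:
        buffer.append(obj)
        if len(buffer) < buffer_len:
            # Still filling the buffer, so nothing can be returned yet.
            obj = yield EmptyObject()
        else:
            # Buffer is full: return a randomly chosen element.
            obj = yield buffer.pop(rng.randrange(len(buffer)))
    # Stream has ended: yield whatever is still stored, in random order.
    rng.shuffle(buffer)
    yield from buffer


def run(generator, stream):
    """Drive the sampler over `stream` and keep only real objects."""
    next(generator)
    results = [generator.send(item) for item in stream]
    try:
        results.append(generator.send(None))
        results.extend(generator)
    except StopIteration:
        pass
    return [r for r in results if not isinstance(r, EmptyObject)]


samples = run(random_sampler(buffer_len=4), range(10))
```

Every input object comes out exactly once; only the order changes, and the seed makes that order reproducible.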

class pyearthtools.pipeline.samplers.DropOut(step, yield_on_step=False)#

DropOut samples from the stream at a given interval.

Construct a DropOut Sampler

Parameters:
  • step (int) – Interval at which objects are dropped or, if yield_on_step is True, at which they are yielded.

  • yield_on_step (bool, optional) – Reverse the behaviour of this Sampler, so that objects are yielded on the step instead of dropped. Defaults to False.

generator()#

Generator to control the sampling of data.

When passed None, and no samples remain within, should exit.

How to:

Yield an EmptyObject to begin with, and capture what is sent. Run the sampling routine: yield an EmptyObject if the sampler cannot return an object, otherwise yield the object. Exit the loop when None is encountered. If any samples remain stored within the sampler, yield them all afterwards.

Yields:

Generator[Any, Any, Any] – Sampling of data

Return type:

Generator[Any, Any, Any]
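
The effect of step and yield_on_step can be sketched over a plain iterable. This simplification leaves out the send-based generator protocol, `dropout` is a hypothetical helper, and counting objects from 1 (so that the step-th object is the first one affected) is an assumption.

```python
def dropout(stream, step, yield_on_step=False):
    """Drop every `step`-th object, or keep only those if `yield_on_step` is True."""
    for count, item in enumerate(stream, start=1):
        on_step = count % step == 0
        if on_step == yield_on_step:
            yield item


kept = list(dropout(range(1, 11), step=3))
only_steps = list(dropout(range(1, 11), step=3, yield_on_step=True))
```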

class pyearthtools.pipeline.samplers.RandomDropOut(chance, seed=42)#

Randomly DropOut samples from the stream.

Construct a RandomDropOut Sampler

Parameters:
  • chance (float) – Chance for samples to be dropped, between 0 and 100.

  • seed (int, optional) – Seed to initialise rng module with. Defaults to 42.

generator()#

Generator to control the sampling of data.

When passed None, and no samples remain within, should exit.

How to:

Yield an EmptyObject to begin with, and capture what is sent. Run the sampling routine: yield an EmptyObject if the sampler cannot return an object, otherwise yield the object. Exit the loop when None is encountered. If any samples remain stored within the sampler, yield them all afterwards.

Yields:

Generator[Any, Any, Any] – Sampling of data

Return type:

Generator[Any, Any, Any]
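
Random dropping with a seeded generator can be sketched over a plain iterable as below. As with the other sampler sketches, this leaves out the send-based protocol; `random_dropout` is a hypothetical helper, and reading chance as a percentage compared against a uniform draw is an assumption.

```python
import random


def random_dropout(stream, chance, seed=42):
    """Drop each object with probability `chance` percent, reproducibly."""
    rng = random.Random(seed)
    for item in stream:
        # rng.random() lies in [0, 1), so chance=0 keeps all and chance=100 drops all.
        if rng.random() < chance / 100:
            continue
        yield item


keep_all = list(random_dropout(range(5), chance=0))
drop_all = list(random_dropout(range(5), chance=100))
first_run = list(random_dropout(range(100), chance=50))
second_run = list(random_dropout(range(100), chance=50))
```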