Pipeline API Docs#
pipeline#
- class pyearthtools.pipeline.Sampler#
Base level Sampler
All sampler classes must implement this class, and provide
generator, which should act as a generator.See
DefaultSamplerfor an example, and the process to make a sampler.- abstractmethod generator()#
Generator to control the sampling of data.
When passed None, and no samples remain within, should exit.
- How to:
Yield an
EmptyObjectto begin with, and capture what is sent. Run sampling routine, yieldEmptyObjectif sampler cannot return obj, else return obj. Exit when None is encountered. If any stored within the sampler, yield them all afterwards.
- Yields:
Generator[Any, Any, Any] – Sampling of data
- Return type:
Generator[Any, Any, Any]
- class pyearthtools.pipeline.Pipeline(*steps, iterator=None, sampler=None, exceptions_to_ignore=None, max_exception_count=-1, name=None, **kwargs)#
Core of
pyearthtools.pipeline,Provides a way to set a sequence of operations to be applied to samples / data retrieved from
pyearthtools.data.Examples
>>> python >>> pipeline = pyearthtools.pipeline.Pipeline( >>> pyearthtools.data.download.cds.ERA5('tcwv'), >>> pyearthtools.pipeline.operations.xarray.conversion.ToNumpy() >>> ) >>> pipeline['2000-01-01T00]
Usage:
A
Pipelinecan be used in three primary ways.Direct Indexing with
pipeline[idx]Iteration with
for i in pipelineApplying to individual data objects with
pipeline.apply
Create
Pipelineof operations to run on samples of data.The
stepswill be run in order of inclusion.Branches
If a tuple within the
stepsis encountered, it will be interpreted as aBranchingPoint, with each element in thetuplea seperatePipelineof it’s own right. Therefore to have aBranchingPointwith each branch containing multiple steps, a nestedtupleis needed.E.g. (pseudocode)
>>> Pipeline( >>> Index, >>> (Operation_1, Operation_2) >>> )
This will cause samples to be retrieved from
Indexand each of the operations run on thesample.The result will follow the form of:
(Operation_1 on Index, Operation_2 on Index)If a branch consists of multiple operations, the nested tuples must be used.
E.g. (pseudocode)
>>> Pipeline( >>> Index, >>> ((Operation_1, Operation_1pt2), Operation_2) ) This will cause samples to be retrieved from `Index` and each of the operations run on the `sample`. The result will follow the form of: `(Operation_1 + Operation_1pt2 on Index, Operation_2 on Index)`
A
BranchingPointby default will cause each branch to be run seperately, and a tuple returned with the results of each branch. However, if ‘map’ is included in theBranchingPointtuple, it will be mapped across elements in the incoming sample.Mapping
E.g. (pseudocode)
>>> Pipeline( >>> Index, >>> ((Operation_1, Operation_1pt2), Operation_2, 'map') >>> )
This will cause samples to be retrieved from
Indexand the operations to be mapped to thesample.The result will follow the form of
(Operation_1 + Operation_1pt2 on Index[0], Operation_2 on Index[1])‘map_copy’ can be used to copy the branch to the number of elements in the sample without having to manually specify each branch.
Indexes in Branches
Indexes can also be included in branches, which behaviour as expected, where the sample is retrieved rather than operations applied.
E.g. (pseudocode)
>>> Pipeline( >>> Index, >>> (Operation_1, Operation_2, Index) >>> )
Transforms
Transforms from
pyearthtools.datacan be added directly inline in a pipeline, and will be applied on the forward pass. If they need to be applied onundo, or on both see,pyearthtools.pipeline.operations.Transforms- Parameters:
*steps (Union[VALID_PIPELINE_TYPES, _Pipeline, PipelineIndex, tuple[Union[VALID_PIPELINE_TYPES, Literal['map', 'map_copy']], ...]]) – Steps of the pipeline. Can include tuples to refer to branches.
iterator (Optional[Union[iterators.Iterator, tuple[iterators.Iterator, ...]]]) –
Iteratorto use to retrieve samples when thePipelineis being iterated over.sampler (Optional[Union[samplers.Sampler, tuple[samplers.Sampler, ...]]]) –
Samplerto use to sample the samples when iterating. If not given will yield all samples. Can be used to randomly sample, drop out and moreexceptions_to_ignore (Optional[tuple[Union[str, Type[Exception]], ...]]) – Which exceptions to ignore when iterating. Defaults to None.
max_exception_count (int) – When “ignoring” exceptions, only ignore the first n such exceptions. Set to -1 for unlimited.
name (str | None) – Name of the pipeline, used in nested pipelines
- apply(sample)#
Apply pipeline to
samplePipelineshould only consist ofPipelineStep’s andTransforms, asIndexescannot be applied,
- as_steps()#
Get an indexable object to recreate pipeline with a subset of steps.
>>> pipeline.as_steps[:5]
- property complete_steps: tuple#
Get all steps
- property exceptions_to_ignore#
Sampler of
Pipeline
- property flattened_steps: tuple#
Flat tuple of steps contained within this
PipelineIndex
- get(idx)#
Get
idxfromPipeline.
- property get_and_catch#
Get indexable object like pipeline which will ignore any expections known to be ignored.
- has_source()#
Determine if this
Pipelinecontains a source of data, or is just a sequence of operations.- Return type:
bool
- index(id)#
Get index of
idin Pipeline.- Parameters:
id (str | Type)
- Return type:
int
- property iteration_order: tuple[Any, ...]#
Get ordering from
iterator
- property iterator#
Iterator of
Pipeline
- classmethod sample(variables=None, iterator=None, sampler=None)#
Simple sample Pipeline for testing.
- property sampler#
Sampler of
Pipeline
- save(path=None, only_steps=False)#
Save
Pipeline- Parameters:
path (Optional[Union[str, Path]], optional) – File to save to. If not given return save str. Defaults to None.
only_steps (bool, optional) – Save only steps of the pipeline, dropping iterator, sampler, and exceptions_to_ignore.
- Returns:
If
pathis None,pipelinein save form else None.- Return type:
(Union[str, None])
- step(id: str | int | Type[Any] | Any, limit: None) Index | Pipeline | Operation#
- step(id: str | int | Type[Any] | Any, limit: int) tuple[Index | Pipeline | Operation, ...]
Get step correspondent to
idIf
strflattens steps and retrieves the firstlimitfound, otherwise ifint, gets step at theidxIf
limitis None, give back first found not in tuple, or if -1 return all.
- property steps: tuple[Index | PipelineStep | Transform | TransformCollection | tuple[Index | PipelineStep | Transform | TransformCollection, ...] | tuple[tuple, ...] | _Pipeline | tuple[Index | PipelineStep | Transform | TransformCollection | tuple[Index | PipelineStep | Transform | TransformCollection, ...] | tuple[tuple, ...], ...], ...]#
Steps of pipeline
- undo(sample)#
Undo
Pipelineonsample.Reverses the steps and operations applied to the
sample. Ideally this should result in thesamplelooking identical to the initial data.- ## Examples:
>>> python >>> pipeline = ( >>> Index, >>> Operation1, >>> ) >>> pipeline[1] >>> pipeline.undo(pipeline[1])
- class pyearthtools.pipeline.Operation(*, split_tuples=False, recursively_split_tuples=False, operation='both', recognised_types=None, response_on_type='exception', **kwargs)#
Pipeline Operation
Base
PipelineOperation,Allows for tuple spliting, and type checking
- Parameters:
split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.
recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for
undo. Defaults to “both”.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
- property T#
Transposed Operation.
Swaps
applywithundoso that this operation behaves inversely to normal.
- apply(sample)#
Run the
apply_funcon sample, splitting tuples if needed
- undo(sample)#
Run the
undo_funcon sample, splitting tuples if needed
- class pyearthtools.pipeline.ReversedPipeline(forward_pipeline)#
Operation reversing the effect of pipeline
Applying this operation will undo the provided pipeline, while undoing this operation will apply the pipeline.
Base
PipelineOperation,Allows for tuple spliting, and type checking
- Parameters:
split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.
recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for
undo. Defaults to “both”.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
forward_pipeline (Pipeline)
- class pyearthtools.pipeline.PipelineException#
General Pipeline Exception
- class pyearthtools.pipeline.PipelineFilterException(sample, message='', *args)#
Pipeline Filter Exception
Indicates that the filter has detected an invalid sample.
FilterException
- Parameters:
sample (Any) – Sample found to be invalid
message (str, optional) – Msg for the user. Defaults to “”.
- class pyearthtools.pipeline.PipelineRuntimeError#
Pipeline Runtime Error
- class pyearthtools.pipeline.PipelineTypeError#
Pipeline Type error
pipeline.branching#
- class pyearthtools.pipeline.branching.PipelineBranchPoint(*steps)#
Branch Point in a
Pipeline.Can be anywhere in the pipeline, including at the start.
- Special keys can be provided as part of the tuple to customise behaviour:
map:Will not branch, but map elements of the
samplewhich should be a tuple, to the associated ordered branch. Requires samples and branches to be of the same length.map_copy:Acts like
mapbut will make copies of the pipelines to match incoming sample. Once one sample has been seen, and all copies made, becomes amap. If multiple branches, will use % to create more, so if it has has two pipelines and a length of 4 needed, would be pipeline index correspondant to >>> [0,1,0,1]
Base
PipelineOperation,Allows for tuple spliting, and type checking
- Parameters:
split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.
recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for
undo. Defaults to “both”.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
steps (Union[tuple[Union[Index, Pipeline, PipelineStep, Transform, TransformCollection, Literal['map', 'map_copy']], ...], Index, PipelineStep, Pipeline, Transform, TransformCollection])
- apply(sample)#
Apply each branch on the sample
- property complete_steps#
Get steps in pipeline
- undo(sample)#
Undo the effects of the branches.
This will still result in a tuple, so a unify may be needed after.
If each branch executed a fully reversable operation, and originated from the same data source, each sample ‘should’ be identical.
- class pyearthtools.pipeline.branching.Unifier#
Unify samples after a branching point on the undo operation.
- Child class must supply
check_validity, to determine if the samples can be unified, and return an
intwhich is used to select a sub_sample to be returned byundo.
If samples are not be unified,
check_validityshould return None.Differs from
Spliteras this is built only to eliminate the tuple created on theundowith aBranchingPoint.Base
PipelineOperation,Allows for tuple spliting, and type checking
- Parameters:
split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.
recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for
undo. Defaults to “both”.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
- abstractmethod check_validity(sample)#
Check if samples can be unified.
Raise a
PipelineUnificationExceptionif not be unified.- Parameters:
sample (tuple) – Sample’s
- Returns:
Which sub_sample to be returned. Return
Noneif invalid.- Return type:
(Union[None, int])
- Child class must supply
- class pyearthtools.pipeline.branching.Joiner(*, split_tuples=True, recursively_split_tuples=False, recognised_types=None, response_on_type='exception')#
Join samples after a branching point.
Child class must implement
join, andunjoin.Join samples from tuple
- Parameters:
split_tuples (bool, optional) – Split tuples on
unjoinoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
unjoin,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
- abstractmethod join(sample)#
Join method called on
apply.- Parameters:
sample (tuple) – Sample to be joined
- Returns:
Joined
sample- Return type:
(Any)
- abstractmethod unjoin(sample)#
Unjoin method called on
undo.- If the pipeline is to be fully reversable,
this should return exactly what was received in
join.
If it does not, the pipeline will not be fully reversable.
- Parameters:
sample (Any) – Sample to be split / unjoined.
- Returns:
Split / unjoined sample
- Return type:
(tuple)
- class pyearthtools.pipeline.branching.Spliter(*, split_tuples=True, recursively_split_tuples=False, recognised_types=None, response_on_type='exception')#
Split samples.
Useful prior to
PipelineBranchingPointset up tomap. Can therefore assign subsets of the sample to various branches.Child class must implement
split, andjoin.Split samples into tuples
- Parameters:
split_tuples (bool, optional) – Split tuples on
splitoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
split,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
- abstractmethod join(sample)#
Join method called on
undo.- If the pipeline is to be fully reversable,
this should return exactly what was received in
split.
If it does not, the pipeline will not be fully reversable.
- Parameters:
sample (tuple[Any, ...]) – Sample to be joined.
- Returns:
Joined
sample- Return type:
(Any)
- abstractmethod split(sample)#
Split method called on
apply.- Parameters:
sample (Any) – Sample to be split into tuple
- Returns:
Split
sample- Return type:
(tuple[Any, …])
pipeline.filters#
- class pyearthtools.pipeline.filters.Filter(split_tuples=True, recursively_split_tuples=True, recognised_types=None)#
Base Filter
Allows for samples to be skipped if found to be invalid.
filtershould raise aPipelineFilterExceptionif invalid.Base
PipelineStep- all steps should subclass from this- Parameters:
split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.
_split_tuples_call. (recursively_split_tuples when using) – Recursively split tuples when using
_split_tuples_call. Defaults to False.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
recursively_split_tuples (bool)
- abstractmethod filter(sample)#
To be implemented by child class, should raise a
PipelineFilterExceptionifsampleis invalid.- Return type:
None
- run(sample)#
Run filtering
- class pyearthtools.pipeline.filters.FilterCheck(split_tuples=True, recursively_split_tuples=True, recognised_types=None)#
Subclass of
Filterto automate exception raising,Just needs
checkto return abool.Base
PipelineStep- all steps should subclass from this- Parameters:
split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.
_split_tuples_call. (recursively_split_tuples when using) – Recursively split tuples when using
_split_tuples_call. Defaults to False.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
recursively_split_tuples (bool)
- filter(sample)#
To be implemented by child class, should raise a
PipelineFilterExceptionifsampleis invalid.- Return type:
None
- class pyearthtools.pipeline.filters.FilterWarningContext(max_exceptions=None)#
Filter Warning context
Will count how many
PipelineFilterExceptionhave been thrown, and warn if overmax_exceptions.- Parameters:
max_exceptions (int | None)
- class pyearthtools.pipeline.filters.TypeFilter(valid_types, *, split_tuples=False)#
Filter if type is wrong
Base
PipelineStep- all steps should subclass from this- Parameters:
split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.
_split_tuples_call. (recursively_split_tuples when using) – Recursively split tuples when using
_split_tuples_call. Defaults to False.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
valid_types (tuple[Type] | Type)
- filter(sample)#
To be implemented by child class, should raise a
PipelineFilterExceptionifsampleis invalid.- Return type:
None
pipeline.iterators#
- class pyearthtools.pipeline.iterators.Iterator#
Base level Iterator.
Provides the indexes from which to query the pipeline.
All Iterator classes must implement this class, and provide
__iter__, which should act as a generator.- randomise(seed=42)#
Randomise this interator
- Parameters:
seed (int | None)
- property samples: tuple[Any, ...]#
Get tuple of samples returned by this
Iterator.
- class pyearthtools.pipeline.iterators.Range(min, max, step=1)#
Range based Iterator
Constructs a
rangeobject and yields all elements within.Construct Range Iterator
- Parameters:
min (int) – Minimum value of range
max (int) – Maximum value of range
step (int, optional) – Step of range. Defaults to 1.
- class pyearthtools.pipeline.iterators.Predefined(indexes)#
Predefined Iterator
Takes any iterable as provided, and yields all elements within.
Construct PreDefined iterator
- Parameters:
indexes (Iterable[Any]) – Iterable to get elements from
- class pyearthtools.pipeline.iterators.File(file, type_conversion=None)#
Iterate over elements in file
Each line will be treated as a seperate index.
Iterate over file
- Parameters:
file (Union[str, Path]) – File to load.
type_conversion (Optional[Callable], optional) – Function to convert lines in file with. Defaults to None.
- class pyearthtools.pipeline.iterators.DateRange(start, end, interval, *, allowlist=None, blocklist=None)#
DateRange Iterator
Uses
pyearthtools.data.TimeRangeto create a range of times.Construct DateRange Iterator
- Parameters:
start (str) – Start time. Must be understandable by
pyearthtools.data.Petdt.end (str) – End time. Must be understandable by
pyearthtools.data.Petdt.interval (Any) – Interval between times. Must be understandable by
pyearthtools.data.TimeDelta.allowlist (Optional[Iterable[str]]) – A list of pyearthtools.data.Petdt which should be filtered to
blocklist (Optional[Iterable[str]]) – A list of pyearthtools.data.Petdt which should be skipped
Note
You cannot specify both a blocklist and an allowlist.
The entries in the blocklist and allowlist must be a complete list of exact dates. It is not possible to do fuzzy matching or range-based constraints at this stage. For example, if the underlying data is hourly, and a particular day needs to be skipped entirely, each hour of that day will need to be in the blocklist.
Examples
>>> known_bad = ['2015-01-01T06'] >>> iterator=DateRange(2015, 2016, interval='6 hours', blocklist=known_bad)
- randomise(seed=42)#
Randomise this interator
- Parameters:
seed (int | None)
- class pyearthtools.pipeline.iterators.DateRangeLimit(start, interval, num)#
DataRange configured with the number of samples from start
Uses
pyearthtools.data.TimeRangeto create a range of times.Construct DateRange with limit
- Parameters:
start (str) – Start time
interval (Any) – Interval between times. Must be understandable by
pyearthtools.data.TimeDelta.num (int) – Number of total samples to iterate over.
- class pyearthtools.pipeline.iterators.Randomise(iterator, seed=42)#
Wrap around another
Iteratorand randomly sampleRandomise
iterator- Parameters:
iterator (Iterator) – Underlying
Iteratorto randomise.seed (Union[int, None], optional) – Random selection seed. If None, will be
random. Defaults to 42.
- class pyearthtools.pipeline.iterators.SuperIterator(*iterators)#
Iterate over a sequence of iterators
Create SuperIterator
- Parameters:
*iterators (Iterator) – Iterating is run sequentially, so order may be important.
- class pyearthtools.pipeline.iterators.IterateResults(obj)#
A wrapper which informs pipeline, that the object in question should be iterated over, instead of yielded directly.
Allows all attrs and items to be retrieved from underlying object.
Construct a IterateResults
- Parameters:
obj (Any) – Object that should be iterated over.
- iterate_over_object()#
Iterate over underlying object
- Return type:
Generator[Any, None, None]
pipeline.modifications#
- class pyearthtools.pipeline.modifications.Cache(cache=None, pattern=None, *, pattern_kwargs={}, cache_validity='warn', save_kwargs=None, **kwargs)#
An
pyearthtools.pipelineimplementation of theCachingIndexfrompyearthtools.data.Allows for samples to be cached to disk when using the pipeline.
Will save according to the
patternandidxused to retrieve data.Examples
>>> era_index = pyearthtools.data.archive.ERA5.sample() >>> pipeline = pyearthtools.pipeline.Pipeline( era_index, pyearthtools.pipeline.pipelines.Cache('temp') ) >>> pipeline['2000-01-01T00'] # Data will be cached
Pipeline step to cache samples
- Parameters:
cache (str | Path, optional) – Path to cache data to. Defaults to None.
pattern (str | PatternIndex, optional) – Pattern to use to cache data, if str use
pattern_kwargsto initialise. Defaults to None.pattern_kwargs (dict[str, Any], optional) – Kwargs to initalise the pattern with. Defaults to {}.
cache_validity (Literal['trust','delete','warn','keep','override'], optional) – Behaviour of cache validity checking. | Value | Behaviour | | —– | ——— | | ‘trust’ | Trust the cache even if the hash is different | | ‘warn’ | Warn if the hash is different | | ‘keep’ | Keep the cache, and raise an exception if the hash is different | | ‘override’ | Override the cache data when generating data, removes the caching benefit. | | ‘delete’ | Delete the cache if the hash is different. Will ask for input, include ‘F’ to force. | Defaults to ‘warn’.
save_kwargs (dict[str, Any], optional) – Keywords arguments to pass to saving function. Defaults to None.
kwargs (Any, optional) – All other kwargs passed to
pyearthtools.data.indexes.FunctionalCacheIndex.
- property cache_hash_file: Path#
Get the hash file name
- cache_validity()#
Check the cache validity according to
cache_validitypassed in__init__.- Return type:
bool
- property global_override#
Get a context window in which data will be overwritten in all caches
- property hash: str#
Get sha512 hash of underlying index
- property override#
Get a context window in which data will be overwritten in the cache
- property pipeline_save_file: Path#
Get the pipeline save file name
- save_cache_hash()#
Attempt to make cache hash, if fails do nothing and try again later.
- Will return
boolindicating if saving hash was successfull. True -> Valid hash False -> Invalid hash, either unable to write or different
- Return type:
bool
- Will return
- save_pipeline()#
Attempt to make pipeline file, if fails do nothing and try again later.
- class pyearthtools.pipeline.modifications.StaticCache(idx, cache, pattern=None, *, pattern_kwargs={}, cache_validity='warn', load_into_memory=False, **kwargs)#
Static Cache.
Mainly a convenience wrapper instead of using
IdxOverrideandCachetogether.Will override the index, and cache the result.
Static Cache
- Parameters:
idx (Any) – Index to override with
cache (str | Path, optional) – Path to cache data to. Defaults to None.
pattern (str | PatternIndex, optional) – Pattern to use to cache data, if str use
pattern_kwargsto initialise. Defaults to None.pattern_kwargs (dict[str, Any], optional) – Kwargs to initalise the pattern with. Defaults to {}.
cache_validity (Literal['trust','delete','warn','keep','override'], optional) – Behaviour of cache validity checking. | Value | Behaviour | | —– | ——— | | ‘trust’ | Trust the cache even if the hash is different | | ‘warn’ | Warn if the hash is different | | ‘keep’ | Keep the cache, and raise an exception if the hash is different | | ‘override’ | Override the cache data when generating data, removes the caching benefit. | | ‘delete’ | Delete the cache if the hash is different. Will ask for input, include ‘F’ to force. | Defaults to ‘warn’.
load_into_memory (bool) – Load sample into memory if it is a dask or xarray object. Defaults to False.
- class pyearthtools.pipeline.modifications.MemCache(pattern=None, max_size=None, *, pattern_kwargs={}, **kwargs)#
An
pyearthtools.pipelineimplementation of theMemCachefrompyearthtools.data.Allows for samples to be cached to memory when using the pipeline.
Examples
>>> era_index = pyearthtools.data.archive.ERA5.sample() >>> pipeline = pyearthtools.pipeline.Pipeline( era_index, pyearthtools.pipeline.pipelines.MemCache() ) >>> pipeline['2000-01-01T00'] # Data will be cached to memory
Pipeline step to cache samples
- Parameters:
pattern (str | PatternIndex, optional) – Pattern to use to cache data, if str use
pattern_kwargsto initialise. Defaults to None.pattern_kwargs (dict[str, Any], optional) – Kwargs to initalise the pattern with. Defaults to {}.
kwargs (Any, optional) – All other kwargs passed to
pyearthtools.data.indexes.FunctionalMemCacheIndex.max_size (Optional[str])
- property global_override#
Get a context window in which data will be overwritten in all caches
- property override#
Get a context window in which data will be overwritten in the cache
- property size#
Size of in memory cache
- class pyearthtools.pipeline.modifications.IdxModifier(modification, *extra_mods, merge=False, concat=False, merge_function=None, merge_kwargs=None)#
Modify index used in
__getitem__, allows for multiple samples.Examples
>>> pipeline = Pipeline(IdxModifier((0, 1))) >>> pipeline[1] # Will get sample with (1, 2)
Index modification
- Parameters:
modification (Union[Any, tuple[Union[Any, tuple[Any, ...]], ...]]) – Can be Any type, if tuple will map across elements.
merge (Union[bool, int]) – Merge retrieved tuple, must all be the same type. If
intcorresponds to how many layers to merge from the bottom up. IfTrue, merge one layer.concat (bool) – Whether to concat arrays instead of stack.
merge_function (Optional[Callable[[Any, ...], Any]]) – Override for function to use when merging.
merge_kwargs (Optional[dict[str, Any]]) – Optional extra kwargs for the merge function if
merge.extra_mods (Any)
Examples
>>> IdxModifier((0, 1)) ... # Will get samples with (idx+0, idx+1) >>> IdxModifier((0, (1, 2))) ... # Will get samples with (idx+0, (idx+1, idx+2)) >>> IdxModifier((0, (1, 2)), merge = 1) ... # Will get samples with (idx+0, merged(idx+1, idx+2)) >>> IdxModifier((0, (1, 2)), merge = True) ... # Will get samples with (idx+0, merged(idx+1, idx+2)) >>> IdxModifier((0, (1, 2)), merge = 2) ... # Will get samples with merged(idx+0, merged(idx+1, idx+2))
- class pyearthtools.pipeline.modifications.IdxOverride(index)#
Override
idxon any__getitem__call- Parameters:
index (Any)
- class pyearthtools.pipeline.modifications.TimeIdxModifier(modification, *extra_mods, **kwargs)#
IdxModifierwhich converts allmodification’s topyearthtools.data.TimeDeltaModify
idxbut convert allmodification’s topyearthtools.data.TimeDelta- Parameters:
modification (Union[Any, tuple[Union[Any, tuple[Any, ...]], ...]]) – Expected to be
TimeDeltacompatible, or tuples ofTimeDelta’s.merge (Union[bool, int], optional) – Merge retrieved tuple, must all be the same type. If
intcorresponds to how many layers to merge from the bottom up. IfTrue, merge one layer. Defaults to False.merge_function (Optional[Callable], optional) – Override for function to use when merging. Defaults to None.
merge_kwargs (Optional[dict[str, Any]], optional) – Optional extra kwargs for the merge function if
merge. Defaults to None.extra_mods (Union[Any, tuple[Any, ...]])
- class pyearthtools.pipeline.modifications.SequenceRetrieval(samples, *, merge_function=None, concat=False, merge_kwargs=None)#
Subclassing from
IdxModifier, retrieve a sequence of samples based on rules.Notes
Will attempt to stack samples, and may create a new 0 axis.
If
samplesis anint, then retrieve the idx originally asked for, and the sample offset bysamples.This will return ‘sorted’.
>>> SequenceRetrieval(1)[0] ... # Will get (0, 1) >>> SequenceRetrieval(-1)[0] ... # Will get (-1, 0) >>> SequenceRetrieval(-6)[0] ... # Will get (-6, 0)
If
samplesis a single-element iterable, it must be of length 2 or 3, with the third being optional. Theidxbeing requested is first offset, then num_of_samples retrieved, merged where applicable. If a single sample is retrieved, it will not be in a tuple if cannot be merged.>>> SequenceRetrieval((0, 3))[0] ... # Will get (0, 1, 2) >>> SequenceRetrieval((-1, 2))[0] ... # Will get (-1, 0) >>> SequenceRetrieval((2, 3))[0] ... # Will get (2,3,4) >>> SequenceRetrieval((2, 3, 2))[0] ... # Will get (2,4,6) >>> SequenceRetrieval((2, 1))[0] ... # Will get 2
If
samplesis of multiples element it can consist of either tuples or ints. A tuple in this sequence corresponds to the same assingle element, and an int the next offset to retrieve a sample at.These index adjustments are accumulated, so if a retrieval moves the marker 2 forwards, the next sampling config will operate from there.
Each config in the
sampleswill be returned within its own tuple, merged where applicable.>>> SequenceRetrieval((0, 3),(1, 2))[0] ... # Will get ((0, 1, 2), (3, 4)) >>> SequenceRetrieval((0, 3),1)[0] ... # Will get ((0, 1, 2), 3) >>> SequenceRetrieval((0, 3),2)[0] ... # Will get ((0, 1, 2), 4) >>> SequenceRetrieval((0, 3),(-1, 2))[0] ... # Will get ((0, 1, 2), (1, 2)) >>> SequenceRetrieval((0, 3),(-1, 1))[0] ... # Will get ((0, 1, 2), 1)
Sequence retrieval
- Parameters:
samples (Union[int, tuple[Union[tuple[int, int], tuple[int, int, int], int], ...]]) – Configuration of samples to retrieve.
merge_function (Optional[Callable], optional) – Override for function to use when merging. Defaults to None.
merge_kwargs (Optional[dict[str, Any]], optional) – Optional extra kwargs for the merge function. Defaults to None.
concat (bool)
- class pyearthtools.pipeline.modifications.TemporalRetrieval(samples, *, merge_function=None, concat=False, merge_kwargs=None, delta_unit=None)#
Retrieve a sequence of samples from
SequenceRetrieval, but force all indexes to be anpyearthtools.data.Petdt.Examples
>>> TemporalRetrieval(-6)['2000-01-01T12'] ## Will get samples for ('2000-01-01T06' & '2000-01-01T12')
- Parameters:
samples (Union[int, tuple[Union[tuple[int, ...], int], ...]]) – number of samples to fetch (negative for n-back, positive for n-forward)
delta_unit (Optional[str]) – e.g. “month” or “hour”
concat (bool) – whether to contact or merge
merge_function (Optional[Callable])
merge_kwargs (Optional[dict[str, Any]])
- class pyearthtools.pipeline.modifications.TemporalWindow(*, prior_indexes, posterior_indexes, timedelta, merge_method=None)#
The purpose of this class is to provide the ability to perform sequence-to-sequence modelling from a data accessor or pipeline that was designed to produce single time steps (i.e. single samples).
The temporal window allows the specification of the ‘back window’ and the ‘forward window’, and will produce a binary branch.
For example, if the time steps are hourly, and the base pipeline can produce hours 1, 2, 3 … 10; then this Temporal Window can be used to produce sequence pairs like:
[1,2,3], [4], [2,3,4], [5], ... [7,8,9], [10]
or like:
[1,2], [3,4,5], [2,3], [4,5,6], ... [6,7], [8,9,10]
This provides a simpler interface than the TemporalRetrieval which is a more general alternative.
The window offsets are calculated not using positional indexing, but using calculated date-times based on the reference time and the specified timedelta to calculate each required index exactly. The handling of missing data is left to the underlying pipeline response to the retrieval of the calculated datetime.
The resultant sequences may be left unmerged (i.e. a list of retrieved results for each timetime) or merged (e.g. into an xarray along the time dimension). The default behaviour is to merge along the time dimension.
A custom merge method may be specified.
- Parameters:
prior_indexes – Multiplied by the timedelta then applied to the reference date
posterior_indexes – Multiplied by the timedelta then applied to the reference date
timedelta – Typically the time step of the underlying data
merge_method – How to merge samples into a combined object
Examples:
>>> TemporalWindow(prior_indexes=[-3,-2,-1], posterior_indexes=[0], timedelta=timedelta, merge_method=merge_method)
(assuming xarray data) will result in a tuple of two datasets, the first with a time coordinate dimension of 3 time steps and the second with a time coordinate dimension of 1 time step.
- pyearthtools.pipeline.modifications.idx_modification#
alias of <module ‘pyearthtools.pipeline.modifications.idx_modification’ from ‘/home/docs/checkouts/readthedocs.org/user_builds/pyearthtools/checkouts/latest/packages/pipeline/src/pyearthtools/pipeline/modifications/idx_modification.py’>
pipeline.operations#
- class pyearthtools.pipeline.operations.Transforms(transforms=None, apply=None, undo=None)#
Run
pyearthtools.data.Transformswithin aPipeline.Run
TransformsIf
transformsgiven will run on both functions first, and then if also givenapplyandundorespectively.- Parameters:
transforms (Optional[TRANSFORM_TYPE], optional) – Transforms to run on both
applyandundo. Defaults to None.apply (Optional[TRANSFORM_TYPE], optional) – Transforms to run on
apply. Defaults to None.undo (Optional[TRANSFORM_TYPE], optional) – Transforms to run on
undo. Defaults to None.
pipeline.operations.xarray#
- class pyearthtools.pipeline.operations.xarray.Compute#
Compute xarray object
Base
PipelineOperation,Allows for tuple spliting, and type checking
- Parameters:
split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.
recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for
undo. Defaults to “both”.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
- class pyearthtools.pipeline.operations.xarray.Merge(merge_kwargs=None)#
Merge a tuple of xarray object’s.
Currently can only undo this operation with xr.Dataset and xr.DataArray inputs.
Join samples from tuple
- Parameters:
split_tuples (bool, optional) – Split tuples on
unjoinoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
unjoin,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
merge_kwargs (dict[str, Any] | None)
- join(sample)#
Join sample
- Parameters:
sample (tuple[Dataset | DataArray, ...])
- Return type:
Dataset
- unjoin(sample)#
Unjoin method called on
undo.- If the pipeline is to be fully reversable,
this should return exactly what was received in
join.
If it does not, the pipeline will not be fully reversable.
- Parameters:
sample (Any) – Sample to be split / unjoined.
- Returns:
Split / unjoined sample
- Return type:
(tuple)
- class pyearthtools.pipeline.operations.xarray.Concatenate(concat_dim, concat_kwargs=None)#
Concatenate a tuple of xarray object’s
Currently cannot undo this operation. Unjoining a sample returns the same sample.
Join samples from tuple
- Parameters:
split_tuples (bool, optional) – Split tuples on
unjoinoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
unjoin,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
concat_dim (str)
concat_kwargs (dict[str, Any] | None)
- join(sample)#
Concat sample
- Parameters:
sample (tuple[T, ...])
- Return type:
T
- unjoin(sample)#
Unjoin method called on
undo.- If the pipeline is to be fully reversable,
this should return exactly what was received in
join.
If it does not, the pipeline will not be fully reversable.
- Parameters:
sample (Any) – Sample to be split / unjoined.
- Returns:
Split / unjoined sample
- Return type:
(tuple)
- class pyearthtools.pipeline.operations.xarray.Sort(order=None, strict=False)#
Sort Variables of an
xarrayobjectExamples >>> Sort(order = [‘a’,’b’])
Sort
xarrayvariables- Parameters:
order (list[str] | None) – Order to set vars to, if not given sort alphabetically, or add others to the end. Cannot be None if
strictisTrue. Defaults to None.strict (bool) – Forces all variables to be listed in
order, and no extras given. Defaults to False.
- class pyearthtools.pipeline.operations.xarray.Chunk(chunk=None, operation='apply', **extra_chunk_kwargs)#
ReChunk xarray object
ReChunk xarray object
- Parameters:
chunk (Optional[dict[str, int]], optional) – Chunk dictionary.
coord: size. Defaults to None.operation (Literal['apply', 'undo', 'both']) – When to apply rechunking. Defaults to ‘apply’.
**extra_chunk_kwargs (int) – Kwarg form of
chunk.
- class pyearthtools.pipeline.operations.xarray.RecodeCalendar#
Climate datasets often use the cftime module to index into data using non-standard calendars. This operation will recode the time coordinate of a dataset or data array to a standard timestamp For now, support only exists for recoding from Noleap to Timestamp
Record initialisation and store flags for processing
- class pyearthtools.pipeline.operations.xarray.AlignDates(to='start_of_month')#
Climate datasets often use the cftime module to index into data using non-standard calendars. This operation will recode the time coordinate of a dataset or data array to a standard timestamp For now, support only exists for recoding from Noleap to Timestamp
Record initialisation and store flags for processing
- Parameters:
to – either “start_of_month” or a zero-padded string day-of-month
- class pyearthtools.pipeline.operations.xarray.conversion.ToNumpy(reference_dataset=None, saved_records=None, run_parallel=False, warn=True)#
Convert xarray objects to np.ndarray’s
DataOperation to convert data to [np.array][numpy.ndarray]
If speed is needed without an
undo, setrun_parallelto True, and split the data into separate datasets as much as possible.pyearthtools.pipeline.operations.xarray.split.OnVariables()can be useful here- Parameters:
reference_dataset (str | Path | None) – Reference dataset to run through numpy converter to initialise converter. Will be overwritten when this is given a dataset.
saved_records (str | Path | None) – Saved records to set numpy converter with. Will be overwritten when this is given a dataset.
run_parallel (bool) – Whether to run in parallel, will cause
undoto fail withoutsaved_records. If an undo pipeline is needed, set this to False.warn (bool) – Whether to warn on invalid shape.
- class pyearthtools.pipeline.operations.xarray.conversion.ToDask(warn=True)#
Convert xarray objects to pure dask arrays
Convert xarray object to dask and back.
- Parameters:
warn (bool, optional) – Whether to warn on invalid shape. Defaults to True.
- class pyearthtools.pipeline.operations.xarray.filters.XarrayFilter#
Xarray Filters
Base
PipelineStep- all steps should subclass from this- Parameters:
split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.
_split_tuples_call. (recursively_split_tuples when using) – Recursively split tuples when using
_split_tuples_call. Defaults to False.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
- class pyearthtools.pipeline.operations.xarray.filters.DropAnyNan(variables=None)#
Filter to drop any data with nans when iterating.
Used to remove any bad data or data that is masked out.
Drop data with any nans
- Parameters:
variables (list, optional) – Subset of variables to check. Defaults to None.
- filter(sample)#
Check if any of the sample is nan
- Parameters:
sample (xr.Dataset) – Sample to check
- Raises:
(PipelineFilterException) – If sample contains one or more nan value
- Return type:
None
- class pyearthtools.pipeline.operations.xarray.filters.DropAllNan(variables=None)#
Filter to drop any data with all nans when iterating.
Used to remove any bad data or data that is masked out.
Drop data with all nans
- Parameters:
variables (list, optional) – Subset of variables to check. Defaults to None.
- filter(sample)#
Check if all of the sample is nan
- Parameters:
sample (xr.Dataset) – Sample to check
- Raises:
(PipelineFilterException) – If sample contains only nan values
(TypeError) – If sample is not an xr.DataArray or xr.Dataset
- Return type:
None
- class pyearthtools.pipeline.operations.xarray.filters.DropValue(value, percentage)#
Filter to drop data containing more than a given percentage of a value.
Can be used to trim out invalid data
Drop Data if number of elements equal to value are greater than percentage when iterating.
- Parameters:
value (Union[float, Literal["nan"]]) – Value to search for. Can be nan or ‘nan’.
percentage (float) – Percentage of
valueof which an exceedance drops data
- filter(sample)#
Check if all of the sample is nan
- Parameters:
sample (np.ndarray) – Sample to check
- Raises:
(PipelineFilterException) – If number of elements equal to value are greater than percentage
- Return type:
None
- class pyearthtools.pipeline.operations.xarray.filters.Shape(shape, split_tuples=False)#
Filter to drop data of incorrect shape
Used to ensure that incoming data is of the correct shape for later steps
Drop Data if shape does not match expected
- Parameters:
tuple[Union[tuple[int (shape) – Shape to match, either tuple of shapes for tupled data or direct shape
...] – Shape to match, either tuple of shapes for tupled data or direct shape
int]) – Shape to match, either tuple of shapes for tupled data or direct shape
split_tuples (bool, optional) – Whether to split tuples, if
True,shapeshould not be a tuple of tuplesshape (tuple[tuple[int, ...] | int, ...])
- filter(sample)#
To be implemented by child class, should raise a
PipelineFilterExceptionifsampleis invalid.- Parameters:
sample (tuple[T, ...] | T)
- class pyearthtools.pipeline.operations.xarray.reshape.Dimensions(dimensions, append=True, preserve_order=False)#
Reorder dimensions
Operation to reorder Dimensions of an [xarray][xarray] object.
Not all dims have to be supplied, will automatically add remaining dims, or if append == False, prepend extra dims.
- Parameters:
dimensions (Union[str, list[str]]) – Specified order of dimensions to transpose dataset to
append (bool, optional) – Append extra dims, if false, prepend dims. Defaults to True.
preserve_order (bool, optional) – Whether to preserve the order of dims or on
undo, also set to dimensions order. Defaults to False.
- class pyearthtools.pipeline.operations.xarray.reshape.CoordinateFlatten(coordinate, skip_missing=False)#
Flatten a coordinate in a dataset into separate variables.
Flatten a coordinate in an xarray Dataset, putting the data at each value of the coordinate into a separate data variable.
The output data variables will be named “<old variable name><value of coordinate>”. For example, if the input Dataset has a variable “t” and it is flattened along the coordinate “pressure_level” which has values [100, 200, 500], then the output Dataset will have variables called t100, t200 and t500.
- Parameters:
coordinate (Hashable) – Coordinate to flatten and expand on.
skip_missing (bool, optional) – Whether to skip data that does not have any of the listed coordinates. If True, will return such data unchanged. Defaults to False.
- Raises:
ValueError – If coordinate not found in the dataset and skip_missing==False.
- class pyearthtools.pipeline.operations.xarray.select.SelectDataset(variables, operation='apply')#
Operation to select a given set of variables from a [Dataset][xarray.Dataset]
Select variables from dataset
- Parameters:
() (variables) – Variables to select
operation (Literal['apply', 'undo'], optional) – Operation to run on. Defaults to ‘apply’.
- class pyearthtools.pipeline.operations.xarray.select.DropDataset(variables, operation='apply')#
DataOperation to drop a given set of variables from a [Dataset][xarray.Dataset]
Can be used to remove variables when undoing, if one was added as a pipeline step.
Drop variables from dataset
- Parameters:
() (variables) – Variables to drop
operation (Literal['apply', 'undo'], optional) – Operation to run on. Defaults to ‘apply’.
- class pyearthtools.pipeline.operations.xarray.select.SliceDataset(slices=None, **kwargs)#
Select a slice of an xarray object
- Examples
>>> Slicer(slices = {'time': (0,10,2)}) # == .sel(time = slice(0,10,2))
Setup dataset slicer
- Parameters:
slices (Optional[dict[str, tuple[Any, ...]]], optional) – Slice dictionary, must be key of dim in ds, and slice notation as value. Defaults to None.
kwargs (tuple, optional) – Keyword argument form of
slices.
- class pyearthtools.pipeline.operations.xarray.split.OnVariables(variables=None, merge_kwargs=None)#
Split xarray object’s on variables
Split on variables
- Parameters:
variables (Optional[Union[tuple[Union[str, tuple[str, ...], list[str]], ...], list[str]]], optional) – Variable split. If tuple or list, will split into those tuples, with the associated list referencing the variables to split out. If not given, will split all variables into seperate items. Defaults to None.
merge_kwargs (Optional[dict[str, Any]], optional) – Kwargs needed for merge on the
undo. Defaults to None.
- join(sample)#
Join sample
- Parameters:
sample (tuple[Dataset | DataArray, ...])
- Return type:
Dataset
- split(sample)#
Split sample
- Parameters:
sample (Dataset)
- Return type:
tuple[Dataset, …]
- class pyearthtools.pipeline.operations.xarray.split.OnCoordinate(coordinate, merge_kwargs=None)#
Split xarray object on coordinate
Split samples into tuples
- Parameters:
split_tuples (bool, optional) – Split tuples on
splitoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
split,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
coordinate (str)
merge_kwargs (dict[str, Any] | None)
- join(sample)#
Join method called on
undo.- If the pipeline is to be fully reversable,
this should return exactly what was received in
split.
If it does not, the pipeline will not be fully reversable.
- Parameters:
sample (tuple[Any, ...]) – Sample to be joined.
- Returns:
Joined
sample- Return type:
(Any)
- split(sample)#
Split method called on
apply.- Parameters:
sample (Any) – Sample to be split into tuple
- Returns:
Split
sample- Return type:
(tuple[Any, …])
- class pyearthtools.pipeline.operations.xarray.values.FillNan(nan=0, posinf=None, neginf=None)#
Fill any Nan’s with a value
DataOperation to fill Nan’s
- Parameters:
nan (float, optional) – Value to fill nan’s with. If None is passed then NaN values will be replaced with 0. Defaults to 0.
posinf (float, optional) – Value to be used to fill positive infinity values, If None is passed then positive infinity values will be replaced with a very large number. Defaults to None.
neginf (float, optional) – Value to be used to fill negative infinity values, If None is passed then negative infinity values will be replaced with a very small (or negative) number. Defaults to None.
- class pyearthtools.pipeline.operations.xarray.values.MaskValue(value, operation='==', replacement_value=nan)#
DataOperation to mask values with a given replacement
Operation to Mask Values
- Parameters:
value (Union[float, dict[str, float]]) – Value to search for.
operation (Literal['==', '>', '<', '>=','<='], optional) – Operation to search with. Defaults to ‘==’.
replacement_value (Union[float, dict[str, float]], optional) – Replacement value. Defaults to np.nan.
- Raises:
KeyError – If invalid
operationpassed.
- class pyearthtools.pipeline.operations.xarray.values.Clip(min_value=0, max_value=1)#
Operation to force data to be within a certain range, by default 0 & 1
Force data into a specified range
- Parameters:
min_value (Optional[Union[float, dict[str, float]]], optional) – Minimum Value. If using a dict, acts per variable given. If None, this won’t apply a min masking Defaults to 0.
max_value (Optional[Union[float, dict[str, float]]], optional) – Maximum Value. If using a dict, acts per variable given. If None, this won’t apply a max masking Defaults to 1.
- class pyearthtools.pipeline.operations.xarray.values.Derive(derivation=None, *, drop=True, **derivations)#
Derive variables within the dataset
Uses
pyearthtools.data.transforms.derive.Derivation step
- Parameters:
derivation (Optional[dict[str, Union[str, tuple[str, dict[str, Any]]]]], optional) – Equation configuration. If str, equation is evaluated. If tuple, first element is assumed to be equation, and the second a dictionary to update the new vars attributes with. Defaults to None.
drop (bool, optional) – Drop derived variables on
undo. Defaults to True.**derivations (Union[str, tuple[str, dict[str, Any]]]) – Kwarg form of
derivation.
- class pyearthtools.pipeline.operations.xarray.metadata.Rename(rename, **rename_kwargs)#
Rename
variablesin anxr.Dataset.Rename variables in an
xr.Dataset- Parameters:
rename (Optional[dict[str, str]]) – Name conversion dictionary
- class pyearthtools.pipeline.operations.xarray.metadata.Encoding(encoding, operation='both')#
Set encoding on
xarrayobjectsSet encoding on
xarrayobjects- Parameters:
encoding (dict[str, dict[str, Any]]) – Variable value pairs assigning encoding to the given variable. Can set key to ‘all’ to apply to all variables. Defaults to None.
operation (Literal['apply', 'undo', 'both'], optional) – When to apply encoding setting. Defaults to “both”.
- class pyearthtools.pipeline.operations.xarray.metadata.MaintainEncoding(reference=None, limit=None)#
Maintain encoding of samples from
applytoundo.If
applynot called beforeundo, this will do nothing.- Parameters:
Optional[str] (reference) – Reference dataset to get encoding from. If not given will use first
sampleonapply.optional) – Reference dataset to get encoding from. If not given will use first
sampleonapply.limit (Optional[list[str]], optional) – When getting encoding from
referenceobject, limit the retrieved encoding. If not given will get['units', 'dtype', 'calendar', '_FillValue', 'scale_factor', 'add_offset', 'missing_value']. Defaults to None.reference (str | None)
- class pyearthtools.pipeline.operations.xarray.metadata.Attributes(attributes, apply_on='dataset', operation='both')#
Set attributes on
xarrayobjectsSet attributes on
xarrayobjects- Parameters:
attrs (dict[str, Any] | None) – Attributes to set, key: value pairs. Set
apply_onto choose where attributes are applied. | Key | Description | | — | ———– | | dataset | Attributes updated on dataset | | dataarray | If applied on a dataset, update each dataarray inside the dataset | | both | Do both above | | per_variable | Treatattrsas a dictionary of dictionaries, applying on dataarray if in dataset. | Defaults to None.apply_on (Literal['dataset', 'dataarray', 'both'], optional) – On what type to update attributes. Defaults to ‘dataset’.
operation (Literal['apply', 'undo', 'both'], optional) – When to apply encoding setting. Defaults to “both”.
attributes (dict[str, dict[str, Any]])
- class pyearthtools.pipeline.operations.xarray.metadata.MaintainAttributes(reference=None)#
Maintain attributes
Maintain attributes of samples from
applytoundo.If
applynot called beforeundo, this will do nothing.- Parameters:
Optional[str] (reference) – Reference dataset to get attributes from. If not given will use first
sampleonapply.optional) – Reference dataset to get attributes from. If not given will use first
sampleonapply.reference (str | None)
- class pyearthtools.pipeline.operations.xarray.normalisation.xarrayNormalisation#
Parent xarray Normalisation
Base
PipelineOperation,Allows for tuple spliting, and type checking
- Parameters:
split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.
recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for
undo. Defaults to “both”.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
- classmethod open_file(file)#
Open xarray file
- Parameters:
file (str | Path)
- Return type:
Dataset
- class pyearthtools.pipeline.operations.xarray.normalisation.Anomaly(mean)#
Anomaly Normalisation
Base
PipelineOperation,Allows for tuple spliting, and type checking
- Parameters:
split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.
recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for
undo. Defaults to “both”.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
mean (str | Path)
- class pyearthtools.pipeline.operations.xarray.normalisation.Deviation(mean, deviation, debug=False)#
Deviation Normalisation
Each argument take take a Dataset, DataArray, float or file object.
- Parameters:
mean (str | Path | Dataset | DataArray | float) – mean values to subtract
deviation (str | Path | Dataset | DataArray | float) – deviation value to divide by
- class pyearthtools.pipeline.operations.xarray.normalisation.Division(division_factor)#
Division based Normalisation
Base
PipelineOperation,Allows for tuple spliting, and type checking
- Parameters:
split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.
recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for
undo. Defaults to “both”.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
division_factor (str | Path)
- class pyearthtools.pipeline.operations.xarray.normalisation.Evaluated(normalisation_eval, unnormalisation_eval, **kwargs)#
evalbased normalisationRun a normalisation calculation using
eval.Will get all
kwargspassed to this class, andsampleas the data to be normalised.All kwargs will be loaded from file if a str.
- Parameters:
normalisation_eval (str) – Normalisation eval str
unnormalisation_eval (str) – Unnoralisation eval str
- class pyearthtools.pipeline.operations.xarray.remapping.HEALPix(spatial_coords, nside, interpolation='bilinear', resolution_factor=1.0, include_coords=False, manual_rechunking=True, template_dataset=None, check_for_nans=False)#
HEALPix Remapper
HEALPix mesh Remapper as
pipelineoperations- Parameters:
spatial_coords (dict[str, int]) – Dictionary of spatial coords to remap over, with the associated size.
nside (int) – The number of pixels each HEALPix face sides has. Must be power of 2.
interpolation (str) – The interpolation scheme (“nearest-neighbor”, “bilinear”, “biquadratic”, “bicubic”).
resolution_factor (float) – In some cases, when choosing nside “too large” for the source data, the projection can contain NaN values. Choosing a resolution_factor > 1.0 can resolve this but requires careful inspection of the projected data.
include_coords (bool) – Include spatial_coords as variables for each face.
manual_rechunking (bool) – Manually rechunk to one chunk per spatial grid.
template_dataset (str | None) – Override for template dataset to get coords from.
check_for_nans (bool) – Check for nans after remapping.
- Raises:
ValueError – If
spatial_coordsis wrong
Examples
Remap ERA5 resolution data to faces of size 128.
>>> import pyearthtools.pipeline >>> import pyearthtools.data >>> >>> remapper = pyearthtools.pipeline.operations.xarray.remapping.HEALPix({'latitude':721, 'longitude':1440}, nside = 128) >>> remapper.remap(pyearthtools.data.archive.ERA5.sample()['2000-01-01T00']) ... # Remapped data >>> >>> pyearthtools.pipeline.Pipeline( >>> pyearthtools.data.archive.ERA5.sample(), >>> remapper >>> )
- fyx2hpxidx(f, x, y)#
Computes the HEALPix index from a given face (f), row (y), and column (x) under consideration of the number of pixels along a HEALPix face (nside).
- Parameters:
f (int) – The face index
y (int) – The local row index within the given face
x (int) – The local column index within the given face
- Returns:
The HEALPix index
- Return type:
int
- hpx1d2hpx3d(hpx1d, dtype=<class 'numpy.float32'>)#
Converts a one-dimensional HEALPix array [NPix] into a three-dimensional HEALPix array of shape [F, H, W].
- Parameters:
hpx1d (ndarray) – The one-dimensional array in HEALPix convention
dtype (dtype) – The data type (float precision) of the returned array
- Returns:
The three-dimensional array in [F, H, W] convention
- Return type:
ndarray
- hpx2ll(data, **kwargs)#
Projects a given three dimensional HEALPix array to latitude longitude representation.
- Parameters:
data (ndarray) – The data of shape [faces=12, height=nside, width=nside] in HEALPix format
- Returns:
An array of shape [height=latitude, width=longitude] containing the latlon data
- Return type:
ndarray
- hpx3d2hpx1d(hpx3d, dtype=<class 'numpy.float32'>)#
Converts a three-dimensional HEALPix array of shape [F, H, W] into a one-dimensional HEALPix array [NPix].
- Parameters:
hpx3d (ndarray) – The three dimensional array in HEALPix convention [F, H, W]
dtype (dtype) – The data type (float precision) of the returned array
- Returns:
The one-dimensional array in [NPix] HEALPix convention
- Return type:
ndarray
- hpxidx2fyx(hpxidx, dtype=<class 'numpy.float32'>)#
Determines the face (f), column (x), and row (y) indices for a given HEALPix index under consideration of the base face index [0, 1, …, 11] and the number of pixels each HEALPix face side has (nside).
- Parameters:
hpxidx (int) – The HEALPix index
dtype (dtype)
- Returns:
A tuple containing the face, y, and x indices of the given HEALPix index
- Return type:
(<class ‘int’>, <class ‘int’>, <class ‘int’>)
- inverse_remap(sample)#
Remap
samplefrom HEALPix mesh to Lat Lon Grid- Parameters:
sample (XR_TYPE)
- Return type:
XR_TYPE
- ll2hpx(data)#
Projects a given array from latitude longitude into the HEALPix representation.
- Parameters:
data (ndarray) – The data of shape [height, width] in latlon format
- Returns:
An array of shape [f=12, h=nside, w=nside] containing the HEALPix data
- Return type:
ndarray
- remap(sample)#
Remap
samplefrom Lat Lon Grid to HEALPix mesh- Parameters:
sample (XR_TYPE)
- Return type:
XR_TYPE
pipeline.operations.dask#
- class pyearthtools.pipeline.operations.dask.Stack(axis=None)#
Stack a tuple of da.Array’s
Join samples from tuple
- Parameters:
split_tuples (bool, optional) – Split tuples on
unjoinoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
unjoin,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
axis (int | None)
- join(sample)#
Join sample
- Parameters:
sample (tuple[Any, ...])
- Return type:
Array
- unjoin(sample)#
Unstacks a stacked sample
- Parameters:
sample (Any)
- Return type:
tuple
- class pyearthtools.pipeline.operations.dask.Concatenate(axis=None)#
Concatenate a tuple of da.Array’s
Join samples from tuple
- Parameters:
split_tuples (bool, optional) – Split tuples on
unjoinoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
unjoin,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
axis (int | None)
- join(sample)#
Join sample
- Parameters:
sample (tuple[Any, ...])
- Return type:
Array
- unjoin(sample)#
Unjoin method called on
undo.- If the pipeline is to be fully reversable,
this should return exactly what was received in
join.
If it does not, the pipeline will not be fully reversable.
- Parameters:
sample (Any) – Sample to be split / unjoined.
- Returns:
Split / unjoined sample
- Return type:
(tuple)
- class pyearthtools.pipeline.operations.dask.VStack#
Vertically Stack a tuple of da.Array’s
Join samples from tuple
- Parameters:
split_tuples (bool, optional) – Split tuples on
unjoinoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
unjoin,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
- join(sample)#
Join sample
- Parameters:
sample (tuple[Any, ...])
- Return type:
Array
- unjoin(sample)#
Unjoin method called on
undo.- If the pipeline is to be fully reversable,
this should return exactly what was received in
join.
If it does not, the pipeline will not be fully reversable.
- Parameters:
sample (Any) – Sample to be split / unjoined.
- Returns:
Split / unjoined sample
- Return type:
(tuple)
- class pyearthtools.pipeline.operations.dask.HStack#
Horizontally Stack a tuple of da.Array’s
Join samples from tuple
- Parameters:
split_tuples (bool, optional) – Split tuples on
unjoinoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
unjoin,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
- join(sample)#
Join sample
- Parameters:
sample (tuple[Any, ...])
- Return type:
Array
- unjoin(sample)#
Unjoin method called on
undo.- If the pipeline is to be fully reversable,
this should return exactly what was received in
join.
If it does not, the pipeline will not be fully reversable.
- Parameters:
sample (Any) – Sample to be split / unjoined.
- Returns:
Split / unjoined sample
- Return type:
(tuple)
- class pyearthtools.pipeline.operations.dask.Compute#
Compute dask array or delayed object
If dask array, will convert it to a full numpy array
Base
PipelineOperation,Allows for tuple spliting, and type checking
- Parameters:
split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.
recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for
undo. Defaults to “both”.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
- class pyearthtools.pipeline.operations.dask.augment.Rotate(seed=42, axis=(-2, -1))#
Rotation Augmentation by 90 degrees in the plane specified by axes.
Rotation Augmentation by 90 degrees in the plane specified by axes.
Generates a random number between 0 & 3 inclusive, for number of times to rotate.
- Parameters:
seed (int, optional) – Random Number seed. Defaults to 42.
axis (tuple[int, int], optional) – Rotation plane. Axes must be different. Defaults to (-2, -1).
- class pyearthtools.pipeline.operations.dask.augment.Flip(seed=42, axis=-1)#
Flip Augmentation on the specified axes.
Flip Augmentation by 90 degrees in the plane specified by axes.
Generates a random boolean, if True, flip, otherwise not
- Parameters:
seed (int, optional) – Random Number seed. Defaults to 42.
axis (tuple[int], optional) – Axis to flip data in. Defaults to -1.
- class pyearthtools.pipeline.operations.dask.augment.FlipAndRotate(seed=42, axis=(-2, -1))#
Flip & Rotation Augmentation.
Apply both Flip & Rotation Augmentations, will rotate on given axis, and flip on both
- Parameters:
seed (int, optional) – Random Number seed. Defaults to 42.
axis (tuple[int], optional) – Rotation plane primarily. Axes must be different. Will also flip on each given axis. Defaults to (-2, -1).
- class pyearthtools.pipeline.operations.dask.filters.daskFilter#
dask Filters
Base
PipelineStep- all steps should subclass from this- Parameters:
split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.
_split_tuples_call. (recursively_split_tuples when using) – Recursively split tuples when using
_split_tuples_call. Defaults to False.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
- class pyearthtools.pipeline.operations.dask.filters.DropAnyNan#
Filter to drop any data with nans when iterating.
Used to remove any bad data or data that is masked out.
Drop data with any nans
- filter(sample)#
Check if any of the sample is nan
- Parameters:
sample (da.Array) – Sample to check
- Raises:
(PipelineFilterException) – If sample contains one or more nan value
- Return type:
None
- class pyearthtools.pipeline.operations.dask.filters.DropAllNan#
Filter to drop any data if all nans.
Used to remove any bad data or data that is masked out.
Drop data with any nans
- filter(sample)#
Check if all of the sample is nan
- Parameters:
sample (da.Array) – Sample to check
- Raises:
(PipelineFilterException) – If sample contains only nan values
- Return type:
None
- class pyearthtools.pipeline.operations.dask.filters.DropValue(value, percentage)#
Filter to drop data containing more than a given percentage of a value.
Can be used to trim out invalid data
Drop Data if number of elements equal to value are greater than percentage when iterating.
- Parameters:
value (Union[float, Literal["nan"]]) – Value to search for. Can be nan or ‘nan’.
percentage (float) – Percentage of
valueof which an exceedance drops data
- filter(sample)#
Check if all of the sample is nan
- Parameters:
sample (da.Array) – Sample to check
- Raises:
(PipelineFilterException) – If number of elements equal to value are greater than percentage
- Return type:
None
- class pyearthtools.pipeline.operations.dask.filters.Shape(shape, split_tuples=False)#
Filter to drop data of incorrect shape
Used to ensure that incoming data is of the correct shape for later steps
Drop Data if shape does not match expected
- Parameters:
tuple[Union[tuple[int (shape) – Shape to match, either tuple of shapes for tupled data or direct shape
...] – Shape to match, either tuple of shapes for tupled data or direct shape
int]) – Shape to match, either tuple of shapes for tupled data or direct shape
split_tuples (bool, optional) – Whether to split tuples, if
True,shapeshould not be a tuple of tuplesshape (tuple[tuple[int, ...] | int, ...])
- filter(sample)#
To be implemented by child class, should raise a
PipelineFilterExceptionifsampleis invalid.- Parameters:
sample (tuple[Array, ...] | Array)
- class pyearthtools.pipeline.operations.dask.reshape.Rearrange(rearrange, skip=False, reverse_rearrange=None, rearrange_kwargs=None, **kwargs)#
Operation to rearrange data using einops
Using Einops rearrange, rearrange data.
- !!! Warning
This will occur on each iteration, and on
__getitem__, so it is best to leave patches code out if using [PatchingDataIndex][pyearthtools.pipeline.operations.PatchingDataIndex].` 'p t c h w' == 't c h w' `As this will attempt to add the patch dim if the first attempt fails
- Parameters:
rearrange (str) – String entry to einops.rearrange
skip (bool, optional) – Whether to skip data that cannot be rearranged. Defaults to False.
reverse_rearrange (Optional[str], optional) – Override for reverse operation, if not given flip rearrange. Defaults to None.
rearrange_kwargs (Optional[dict[str, Any]], optional) – Extra keyword arguments to be passed to the einops.rearrange call. Defaults to {}.
- class pyearthtools.pipeline.operations.dask.reshape.Squeeze(axis)#
Operation to Squeeze one Dimensional axis at ‘axis’ location
Squeeze Dimension of Data
- Parameters:
axis (Union[tuple[int, ...], int]) – Axis to squish at
- class pyearthtools.pipeline.operations.dask.reshape.Flattener(flatten_dims=None, shape_attempt=None)#
- Parameters:
flatten_dims (int | None)
shape_attempt (tuple[str | int, ...] | None)
- class pyearthtools.pipeline.operations.dask.reshape.Flatten(flatten_dims=None, *, shape_attempt=None)#
Operation to Flatten parts of data samples into a one dimensional array
Operation to flatten incoming data
- Parameters:
flatten_dims (Optional[int], optional) – Number of dimensions to flatten, counting from the end. If None, flatten all, with size being stored from first use. Is used for negative indexing, so for last three dims
flatten_dims== 3, Defaults to None.shape_attempt (Optional[tuple[int, ...]], optional) – Reshape value to try if discovered shape fails. Used if data coming to be undone is different. Can have
'...'as wildcards to get from discovered, Defaults to None.
Examples
>>> incoming_data = da.zeros((5,4,3,2)) >>> flattener = Flatten(flatten_dims = 2) >>> flattener.apply_func(incoming_data).shape ... (5, 4, 6) >>> flattener = Flatten(flatten_dims = 3) >>> flattener.apply_func(incoming_data).shape ... (5, 24) >>> flattener = Flatten(flatten_dims = None) >>> flattener.apply_func(incoming_data).shape ... (120)
Tip
`shape_attempt` Advanced Use
If using a model which does not return a full sample, say an XGBoost model only returning the centre value, set
shape_attempt.If incoming data is of shape
(1, 1, 3, 3), and data for undoing is(1, 1, 1, 1)aka(1), setshape_attemptto('...','...', 1, 1)>>> title="Spatial Size Change" >>> incoming_data = da.zeros((1,1,3,3)) >>> flattener = Flatten(shape_attempt = (1,1,1,1)) >>> flattener.apply_func(incoming_data).shape #(9,) >>> >>> undo_data = da.zeros((1)) >>> flattener.undo_func(undo_data).shape #(1,1,1,1)
If incoming data is of shape
(8, 1, 3, 3), and data for undoing is(2, 1, 1, 1)aka(2), setshape_attemptto(2,'...',1,1)>>> title=" Channel or Time Size Change also" >>> incoming_data = da.zeros((8,1,3,3)) >>> flattener = Flatten(shape_attempt = (2,1,1,1)) >>> flattener.apply_func(incoming_data).shape #(72,) >>> >>> undo_data = da.zeros((2)) >>> flattener.undo_func(undo_data).shape #(2,1,1,1)
- class pyearthtools.pipeline.operations.dask.reshape.SwapAxis(axis_1, axis_2)#
Move axis
- Parameters:
axis_1 (int) – Source axis
axis_2 (int) – Target axis
- class pyearthtools.pipeline.operations.dask.select.Select(array_index, tuple_index=None)#
Operation to select an element from a given array
Select data from a given index
- Parameters:
array_index (tuple[Optional[int],...]) – Tuple of indexes from which to select data. Can use None to specify not to select
tuple_index (Optional[int], optional) – Choice of which tuple element to apply selection to, if tuples passed. Defaults to None.
- Examples
>>> incoming_data = da.zeros((10,5,2)) >>> select = Select([0]) >>> select.apply_func(incoming_data).shape (5,2) >>> select = Select([0, None, 0]) >>> select.apply_func(incoming_data).shape (5)
- class pyearthtools.pipeline.operations.dask.select.Slice(*slices, reverse_slice=False)#
Slice a chunk of a dask array
Examples
>>> Slicer((0,10,2)) # == slice(0,10,2) >>> incoming_data = da.zeros((10,5,4)) >>> Slicer((0,10,2), (1, 3)).apply_func(incoming_data).shape (5,2,4) >>> Slicer((1, 3)).apply_func(incoming_data).shape (2,5,4) >>> Slicer((1, 3), reverse_slice = True).apply_func(incoming_data).shape (10,5,2)
Setup slicing operation
- Parameters:
slices (tuple[Optional[int], ...]) – Each tuple is converted into a slice. So must follow slice notation
reverse_slice (bool, optional) – Whether to slice offset towards last axis. Defaults to False.
- class pyearthtools.pipeline.operations.dask.split.OnAxis(axis, axis_size=None)#
Split across an axis in a dask array
Split over a dask array axis
- Parameters:
axis (int) – Axis number to iterate over
axis_size (int | None, optional) – Expected size of the axis, can be found automatically. Defaults to None.
- join(sample)#
Join
sampletogether, recovering initial shape- Parameters:
sample (tuple[Array])
- Return type:
Array
- split(sample)#
Combine all elements of axis on batch dimension
- Parameters:
sample (Array)
- Return type:
tuple[Array]
- class pyearthtools.pipeline.operations.dask.split.OnSlice(*slices, axis)#
Split across slices on axis
Setup slicing operation
- Parameters:
slices (tuple[int, ...]) – Each tuple is converted into a slice. So must follow slice notation
axis (int) – Axis number to slice over
- join(sample)#
Join
sampletogether- Parameters:
sample (tuple[Array])
- Return type:
Array
- split(sample)#
Split method called on
apply.- Parameters:
sample (Any) – Sample to be split into tuple
- Returns:
Split
sample- Return type:
(tuple[Any, …])
- class pyearthtools.pipeline.operations.dask.values.FillNan(nan=0, posinf=None, neginf=None)#
Fill any Nan’s with a value
DataOperation to fill Nan’s
- Parameters:
nan (float, optional) – Value to fill nan’s with. If None is passed then NaN values will be replaced with 0. Defaults to 0.
posinf (float, optional) – Value to be used to fill positive infinity values, If None is passed then positive infinity values will be replaced with a very large number. Defaults to None.
neginf (float, optional) – Value to be used to fill negative infinity values, If None is passed then negative infinity values will be replaced with a very small (or negative) number. Defaults to None.
- class pyearthtools.pipeline.operations.dask.values.MaskValue(value, operation='==', replacement_value=nan)#
DataOperation to mask values with a given replacement
Operation to Mask Values
- Parameters:
value (Union[float, dict[str, float]]) – Value to search for.
operation (Literal['==', '>', '<', '>=','<='], optional) – Operation to search with. Defaults to ‘==’.
replacement_value (Union[float, dict[str, float]], optional) – Replacement value. Defaults to np.nan.
- Raises:
KeyError – If invalid
operationpassed.
- apply_func(sample)#
Mask Data from initialised configuration
- Parameters:
sample (da.Array) – Data to apply mask to
- Returns:
Masked Data
- Return type:
(da.Array)
- class pyearthtools.pipeline.operations.dask.values.Clip(min_value=0, max_value=1)#
Operation to force data to be within a certain range, by default 0 & 1
Force data into a specified range
- Parameters:
min_value (Optional[Union[float, dict[str, float]]], optional) – Minimum Value. If using a dict, acts per variable given. If None, this won’t apply a min masking Defaults to 0.
max_value (Optional[Union[float, dict[str, float]]], optional) – Maximum Value. If using a dict, acts per variable given. If None, this won’t apply a max masking Defaults to 1.
- class pyearthtools.pipeline.operations.dask.normalisation.daskNormalisation(expand=True)#
Parent dask normalisation class
Base Dask Normalisation
- Parameters:
expand (bool, optional) – Expand normalisation arrays to shape of
sampleby appending axis of size 1. Defaults to True.
- classmethod open_file(file)#
Open dask file
- Parameters:
file (str | Path)
- Return type:
Array
- class pyearthtools.pipeline.operations.dask.normalisation.Anomaly(mean, expand=True)#
Anomaly Normalisation
Base Dask Normalisation
- Parameters:
expand (bool, optional) – Expand normalisation arrays to shape of
sampleby appending axis of size 1. Defaults to True.mean (str | Path)
- class pyearthtools.pipeline.operations.dask.normalisation.Deviation(mean, deviation, expand=True)#
Deviation Normalisation
Base Dask Normalisation
- Parameters:
expand (bool, optional) – Expand normalisation arrays to shape of
sampleby appending axis of size 1. Defaults to True.mean (str | Path)
deviation (str | Path)
- class pyearthtools.pipeline.operations.dask.normalisation.Division(division_factor, expand=True)#
Division based Normalisation
Base Dask Normalisation
- Parameters:
expand (bool, optional) – Expand normalisation arrays to shape of
sampleby appending axis of size 1. Defaults to True.division_factor (str | Path)
- class pyearthtools.pipeline.operations.dask.normalisation.Evaluated(normalisation_eval, unnormalisation_eval, **kwargs)#
evalbased normalisationRun a normalisation calculation using
eval.Will get all
kwargspassed to this class, andsampleas the data to be normalised.All kwargs will be loaded from file if a str.
- Parameters:
normalisation_eval (str) – Normalisation eval str
unnormalisation_eval (str) – Unnoralisation eval str
- class pyearthtools.pipeline.operations.dask.conversion.ToXarray(array_shape, coords=None, encoding=None, attributes=None, **kwargs)#
Dask -> Xarray Converter
Convert array into xarray object.
Can use
.liketo record from a reference data object.- Parameters:
array_shape (tuple[str, ...] | str) – Order / naming of dimensions of incoming array. Can be str split by ‘ ‘. Special name is ‘variable’ corresponding to variables in a dataset. That dim will be split into variables
coords (dict[Hashable, Any] | None) – Coordinates to set xarray object with, not all have to be given. Cannot be a tuple. As ‘variable’ is special in
array_shape, ‘variable’ in coords names the variables.encoding (dict[str, Any] | None) – Encoding to set, can be variable, or dimension.
attributes (dict[str, Any] | None) – Attributes to set. Can use
__datasetif dataset to update dataset attrs Defaults to None.
Examples
Using
likeconvenience method>>> import pyearthtools.data >>> import pyearthtools.pipeline >>> import dask.array as da >>> >>> sample = pyearthtools.data.archive.ERA5.sample()('2000-01-01T00') >>> converter = pyearthtools.pipeline.operations.numpy.conversion.ToXarray.like(sample) >>> converter.apply(da.ones((1, 1, 721, 1440)))
Manual specification example
>>> import pyearthtools.pipeline >>> import numpy as np >>> >>> converter = pyearthtools.pipeline.operations.numpy.conversion.ToXarray( >>> 'time latitude longitude', >>> coords = {'latitude': np.arange(-90, 90, 0.25), 'longitude': np.arange(-180, 180, 0.25)} >>> ) >>> converter.apply(np.ones((1, 721, 1440)))
- apply_func(sample)#
Convert dask array to xarray
- Parameters:
sample (Array)
- classmethod like(reference_dataset, drop_coords=None)#
Get
ToXarrayOperation setup from a reference dataset- Parameters:
reference_dataset (Union[xr.DataArray, xr.Dataset]) – Reference dataset to use to setup converter.
drop_coords (list[str] | None)
- Returns:
Converter setup to convert like
reference_dataset.- Return type:
(ToXarray)
- undo_func(sample)#
Convert xarray to dask array
- Parameters:
sample (DataArray | Dataset)
- class pyearthtools.pipeline.operations.dask.conversion.ToNumpy(chunks='auto')#
Dask -> Numpy Converter
Base
PipelineOperation,Allows for tuple spliting, and type checking
- Parameters:
split_tuples (Literal['apply', 'undo', True, False], optional) – Split tuples on associated actions, if bool, apply to all functions. Defaults to False.
recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
operation (Literal['apply', 'undo', 'both'], optional) – Which functions to apply operation to. If not ‘apply’ apply does nothing, same for
undo. Defaults to “both”.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
chunks (int | str | tuple[int, str])
pipeline.operations.numpy#
- class pyearthtools.pipeline.operations.numpy.Stack(axis=None)#
Stack a tuple of np.ndarray’s
Join samples from tuple
- Parameters:
split_tuples (bool, optional) – Split tuples on
unjoinoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
unjoin,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
axis (int | None)
- join(sample)#
Join sample
- Parameters:
sample (tuple[Any, ...])
- Return type:
ndarray
- unjoin(sample)#
Unstacks a stacked sample
- Parameters:
sample (Any)
- Return type:
tuple
- class pyearthtools.pipeline.operations.numpy.Concatenate(axis=None)#
Concatenate a tuple of np.ndarray’s
Join samples from tuple
- Parameters:
split_tuples (bool, optional) – Split tuples on
unjoinoperation. Defaults to True.recursively_split_tuples (bool, optional) – Recursively split tuples. Defaults to False.
recognised_types (Optional[Union[tuple[Type, ...],Type]], optional) – Types recognised on
unjoin,joinautomatically has tuples. Defaults to None.response_on_type (Literal["warn", "exception", "ignore"], optional) – Response when invalid type found. Defaults to “exception”.
axis (int | None)
- join(sample)#
Join sample
- Parameters:
sample (tuple[Any, ...])
- Return type:
ndarray
- unjoin(sample)#
Unjoin method called on
undo.- If the pipeline is to be fully reversable,
this should return exactly what was received in
join.
If it does not, the pipeline will not be fully reversable.
- Parameters:
sample (Any) – Sample to be split / unjoined.
- Returns:
Split / unjoined sample
- Return type:
(tuple)
- class pyearthtools.pipeline.operations.numpy.augment.Rotate(seed=42, axis=(-2, -1))#
Rotation Augmentation by 90 degrees in the plane specified by axes.
Rotation Augmentation by 90 degrees in the plane specified by axes.
Generates a random number between 0 & 3 inclusive, for number of times to rotate.
- Parameters:
seed (int, optional) – Random Number seed. Defaults to 42.
axis (tuple[int, int], optional) – Rotation plane. Axes must be different. Defaults to (-2, -1).
- class pyearthtools.pipeline.operations.numpy.augment.Flip(seed=42, axis=-1)#
Flip Augmentation on the specified axes.
Flip Augmentation by 90 degrees in the plane specified by axes.
Generates a random boolean, if True, flip, otherwise not
- Parameters:
seed (int, optional) – Random Number seed. Defaults to 42.
axis (tuple[int], optional) – Axis to flip data in. Defaults to -1.
- class pyearthtools.pipeline.operations.numpy.augment.FlipAndRotate(seed=42, axis=(-2, -1))#
Flip & Rotation Augmentation.
Apply both Flip & Rotation Augmentations, will rotate on given axis, and flip on both
- Parameters:
seed (int, optional) – Random Number seed. Defaults to 42.
axis (tuple[int], optional) – Rotation plane primarily. Axes must be different. Will also flip on each given axis. Defaults to (-2, -1).
- class pyearthtools.pipeline.operations.numpy.filters.NumpyFilter#
Numpy Filters
Base
PipelineStep- all steps should subclass from this- Parameters:
split_tuples (Union[dict[str, bool], bool], optional) – Split tuples. If dict, allows to distinguish which functions should split tuples. Defaults to False.
_split_tuples_call. (recursively_split_tuples when using) – Recursively split tuples when using
_split_tuples_call. Defaults to False.recognised_types (Optional[Union[tuple[Type, ...], Type, dict[str, Union[tuple[Type, ...], Type]]] ], optional) – Types recognised, can be dictionary to reference different types per function Defaults to None.
response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional) – Response when invalid type found. Defaults to “exception”.
- class pyearthtools.pipeline.operations.numpy.filters.DropAnyNan#
Filter to drop any data with nans when iterating.
Used to remove any bad data or data that is masked out.
Drop data with any nans
- filter(sample)#
Reject the sample if any value is nan
- Parameters:
sample (np.ndarray) – Sample to check
- Raises:
(PipelineFilterException) – If sample contains one or more nan value
- Return type:
None
- class pyearthtools.pipeline.operations.numpy.filters.DropAllNan#
Filter to drop any data if all nans.
Used to remove any bad data or data that is masked out.
Drop data with any nans
- filter(sample)#
Reject the sample if all of its values are nan
- Parameters:
sample (np.ndarray) – Sample to check
- Raises:
(PipelineFilterException) – If sample contains only nan values
- Return type:
None
- class pyearthtools.pipeline.operations.numpy.filters.DropValue(value, percentage)#
Filter to drop data containing more than a given percentage of a value.
Can be used to trim out invalid data
Drop Data if number of elements equal to value are greater than percentage when iterating.
- Parameters:
value (Union[float, Literal["nan"]]) – Value to search for. Can be nan or ‘nan’.
percentage (float) – Percentage of
valueof which an exceedance drops data
- filter(sample)#
Check if all of the sample is nan
- Parameters:
sample (np.ndarray) – Sample to check
- Raises:
(PipelineFilterException) – If number of elements equal to value are greater than percentage
- Return type:
None
- class pyearthtools.pipeline.operations.numpy.filters.Shape(shape, split_tuples=False)#
Filter to drop data of incorrect shape
Used to ensure that incoming data is of the correct shape for later steps
Drop Data if shape does not match expected
- Parameters:
tuple[Union[tuple[int (shape) – Shape to match, either tuple of shapes for tupled data or direct shape
...] – Shape to match, either tuple of shapes for tupled data or direct shape
int]) – Shape to match, either tuple of shapes for tupled data or direct shape
split_tuples (bool, optional) – Whether to split tuples, if
True,shapeshould not be a tuple of tuplesshape (tuple[tuple[int, ...] | int, ...])
- filter(sample)#
To be implemented by child class, should raise a
PipelineFilterExceptionifsampleis invalid.- Parameters:
sample (tuple[ndarray, ...] | ndarray)
- class pyearthtools.pipeline.operations.numpy.reshape.Rearrange(rearrange, skip=False, reverse_rearrange=None, rearrange_kwargs=None, **kwargs)#
Operation to rearrange data using einops
Using Einops rearrange, rearrange data.
- Parameters:
rearrange (str) – String entry to einops.rearrange
skip (bool, optional) – Whether to skip data that cannot be rearranged. Defaults to False.
reverse_rearrange (Optional[str], optional) – Override for reverse operation, if not given flip rearrange. Defaults to None.
rearrange_kwargs (Optional[dict[str, Any]], optional) – Extra keyword arguments to be passed to the einops.rearrange call. Defaults to {}.
- class pyearthtools.pipeline.operations.numpy.reshape.Squeeze(axis)#
Operation to Squeeze one-Dimensional axes at ‘axis’ location
Squeeze Dimension of Data, removing dimensions of length 1.
- Parameters:
axis (Union[tuple[int, ...], int]) – Axis to squeeze at
- class pyearthtools.pipeline.operations.numpy.reshape.Expand(axis)#
Operation to Expand One Dimensional axis at ‘axis’ location
Expand Dimension of Data
- Parameters:
axis (Union[tuple[int, ...], int]) – Axis to expand at
- class pyearthtools.pipeline.operations.numpy.reshape.Flattener(flatten_dims=None, shape_attempt=None)#
- Parameters:
flatten_dims (int | None)
shape_attempt (tuple[str | int, ...] | None)
- class pyearthtools.pipeline.operations.numpy.reshape.Flatten(flatten_dims=None, *, shape_attempt=None)#
Operation to Flatten parts of data samples into a one dimensional array
Operation to flatten incoming data
- Parameters:
flatten_dims (int | None) – Number of dimensions to flatten, counting from the end. If None, flatten all, with size being stored from first use. Is used for negative indexing, so for last three dims
flatten_dims== 3, Defaults to None.shape_attempt (tuple[int, ...] | None) – Reshape value to try if discovered shape fails. Used if data coming to be undone is different. Can have
'...'as wildcards to get from discovered, Defaults to None.
Examples
>>> incoming_data = np.zeros((5,4,3,2)) >>> flattener = Flatten(flatten_dims = 2) >>> flattener.apply_func(incoming_data).shape ... (5, 4, 6) >>> flattener = Flatten(flatten_dims = 3) >>> flattener.apply_func(incoming_data).shape ... (5, 24) >>> flattener = Flatten(flatten_dims = None) >>> flattener.apply_func(incoming_data).shape ... (120)
Tip
`shape_attempt` Advanced Use
If using a model which does not return a full sample, say an XGBoost model only returning the centre value, set
shape_attempt.If incoming data is of shape
(1, 1, 3, 3), and data for undoing is(1, 1, 1, 1)aka(1), setshape_attemptto('...','...', 1, 1)>>> title="Spatial Size Change" >>> incoming_data = np.zeros((1,1,3,3)) >>> flattener = Flatten(shape_attempt = (1,1,1,1)) >>> flattener.apply_func(incoming_data).shape #(9,) >>> >>> undo_data = np.zeros((1)) >>> flattener.undo_func(undo_data).shape #(1,1,1,1)
If incoming data is of shape
(8, 1, 3, 3), and data for undoing is(2, 1, 1, 1)aka(2), setshape_attemptto(2,'...',1,1)>>> title=" Channel or Time Size Change also" >>> incoming_data = np.zeros((8,1,3,3)) >>> flattener = Flatten(shape_attempt = (2,1,1,1)) >>> flattener.apply_func(incoming_data).shape #(72,) >>> >>> undo_data = np.zeros((2)) >>> flattener.undo_func(undo_data).shape #(2,1,1,1)
- class pyearthtools.pipeline.operations.numpy.reshape.SwapAxis(axis_1, axis_2)#
Move axis
- Parameters:
axis_1 (int) – Source axis
axis_2 (int) – Target axis
- class pyearthtools.pipeline.operations.numpy.select.Select(array_index, tuple_index=None)#
Operation to select an element from a given array
Select data from a given index
- Parameters:
array_index (tuple[Optional[int],...]) – Tuple of indexes from which to select data. Can use None to specify not to select
tuple_index (Optional[int], optional) – Choice of which tuple element to apply selection to, if tuples passed. Defaults to None.
- Examples
>>> incoming_data = np.zeros((10,5,2)) >>> select = Select([0]) >>> select.apply_func(incoming_data).shape (5,2) >>> select = Select([0, None, 0]) >>> select.apply_func(incoming_data).shape (5)
- class pyearthtools.pipeline.operations.numpy.select.Slice(*slices, reverse_slice=False)#
Slice a chunk of a numpy array
Examples
>>> Slice((0,10,2)) # == slice(0,10,2) >>> incoming_data = np.zeros((10,5,4)) >>> Slice((0,10,2), (1, 3)).apply_func(incoming_data).shape (5,2,4) >>> Slice((1, 3)).apply_func(incoming_data).shape (2,5,4) >>> Slice((1, 3), reverse_slice = True).apply_func(incoming_data).shape (10,5,2)
Setup slicing operation
- Parameters:
slices (tuple[Optional[int], ...]) – Each tuple is converted into a slice. So must follow slice notation
reverse_slice (bool, optional) – Whether to slice offset towards last axis. Defaults to False.
- class pyearthtools.pipeline.operations.numpy.split.OnAxis(axis, axis_size=None)#
Split across an axis in a numpy array
Split over a numpy array axis
- Parameters:
axis (int) – Axis number to iterate over
axis_size (int | None, optional) – Expected size of the axis, can be found automatically. Defaults to None.
- join(sample)#
Join
sampletogether, recovering initial shape- Parameters:
sample (tuple[ndarray])
- Return type:
ndarray
- split(sample)#
Combine all elements of axis on batch dimension
- Parameters:
sample (ndarray)
- Return type:
tuple[ndarray]
- class pyearthtools.pipeline.operations.numpy.split.OnSlice(*slices, axis)#
Split across slices on axis
Examples:
Setup slicing operation
- Parameters:
slices (tuple[int, ...]) – Each tuple is converted into a slice. So must follow slice notation
axis (int) – Axis number to slice over
- join(sample)#
Join
sampletogether- Parameters:
sample (tuple[ndarray])
- Return type:
ndarray
- split(sample)#
Split method called on
apply.- Parameters:
sample (Any) – Sample to be split into tuple
- Returns:
Split
sample- Return type:
(tuple[Any, …])
- class pyearthtools.pipeline.operations.numpy.split.VSplit#
vsplit on numpy arrays
Setup slicing operation
- join(sample)#
Join
sampletogether- Parameters:
sample (tuple[ndarray])
- Return type:
ndarray
- split(sample)#
Split method called on
apply.- Parameters:
sample (Any) – Sample to be split into tuple
- Returns:
Split
sample- Return type:
(tuple[Any, …])
- class pyearthtools.pipeline.operations.numpy.split.HSplit#
hsplit on numpy arrays
Setup slicing operation
- join(sample)#
Join
sampletogether- Parameters:
sample (tuple[ndarray])
- Return type:
ndarray
- split(sample)#
Split method called on
apply.- Parameters:
sample (Any) – Sample to be split into tuple
- Returns:
Split
sample- Return type:
(tuple[Any, …])
- class pyearthtools.pipeline.operations.numpy.values.FillNan(nan=0, posinf=None, neginf=None)#
Fill any Nan’s with a value
DataOperation to fill Nan’s
- Parameters:
nan (float, optional) – Value to fill nan’s with. If None is passed then NaN values will be replaced with 0. Defaults to 0.
posinf (float, optional) – Value to be used to fill positive infinity values, If None is passed then positive infinity values will be replaced with a very large number. Defaults to None.
neginf (float, optional) – Value to be used to fill negative infinity values, If None is passed then negative infinity values will be replaced with a very small (or negative) number. Defaults to None.
- class pyearthtools.pipeline.operations.numpy.values.MaskValue(value, operation='==', replacement_value=nan)#
DataOperation to mask values with a given replacement
Operation to Mask Values
- Parameters:
value (Union[float, dict[str, float]]) – Value to search for.
operation (Literal['==', '>', '<', '>=','<='], optional) – Operation to search with. Defaults to ‘==’.
replacement_value (Union[float, dict[str, float]], optional) – Replacement value. Defaults to np.nan.
- Raises:
KeyError – If invalid
operationpassed.
- apply_func(sample)#
Mask Data from initialised configuration
- Parameters:
sample (np.ndarray) – Data to apply mask to
- Returns:
Masked Data
- Return type:
(np.ndarray)
- class pyearthtools.pipeline.operations.numpy.values.Clip(min_value=0, max_value=1)#
Operation to force data to be within a certain range, by default 0 & 1
Force data into a specified range
- Parameters:
min_value (Optional[Union[float, dict[str, float]]], optional) – Minimum Value. If using a dict, acts per variable given. If None, this won’t apply a min masking Defaults to 0.
max_value (Optional[Union[float, dict[str, float]]], optional) – Maximum Value. If using a dict, acts per variable given. If None, this won’t apply a max masking Defaults to 1.
- class pyearthtools.pipeline.operations.numpy.normalisation.numpyNormalisation(expand=True)#
Parent numpy normalisation class
Base Numpy Normalisation
- Parameters:
expand (bool, optional) – Expand normalisation arrays to shape of
sampleby appending axis of size 1. Defaults to True.
- classmethod open_file(file)#
Open numpy file
- Parameters:
file (str | Path)
- Return type:
ndarray
- class pyearthtools.pipeline.operations.numpy.normalisation.Anomaly(mean, expand=True)#
Anomaly Normalisation
Base Numpy Normalisation
- Parameters:
expand (bool, optional) – Expand normalisation arrays to shape of
sampleby appending axis of size 1. Defaults to True.mean (str | Path)
- class pyearthtools.pipeline.operations.numpy.normalisation.Deviation(mean, deviation, expand=True)#
Deviation Normalisation
Base Numpy Normalisation
- Parameters:
expand (bool, optional) – Expand normalisation arrays to shape of
sampleby appending axis of size 1. Defaults to True.mean (str | Path)
deviation (str | Path)
- class pyearthtools.pipeline.operations.numpy.normalisation.Division(division_factor, expand=True)#
Division based Normalisation
Base Numpy Normalisation
- Parameters:
expand (bool, optional) – Expand normalisation arrays to shape of
sampleby appending axis of size 1. Defaults to True.division_factor (str | Path)
- class pyearthtools.pipeline.operations.numpy.normalisation.Evaluated(normalisation_eval, unnormalisation_eval, **kwargs)#
evalbased normalisationRun a normalisation calculation using
eval.Will get all
kwargspassed to this class, andsampleas the data to be normalised.All kwargs will be loaded from file if a str.
- Parameters:
normalisation_eval (str) – Normalisation eval str
unnormalisation_eval (str) – Unnoralisation eval str
pipeline.operations.transform#
- class pyearthtools.pipeline.operations.transform.TimeOfYear(method)#
Add time of year to dataset
Add time of year as variable to a dataset
Use [DropDataset][pyearthtools.pipeline.operations.select.DropDataset] to remove it if an earlier step in the pipeline is sensitive to variable names.
- Parameters:
method (str) – Method to use, either “dayofyear” or “monthofyear” Both modelled as a sinusodal function
- Returns:
Transform to add time of year variable
- Return type:
- apply(ds)#
Apply transformation to Dataset
- Parameters:
dataset (XR_TYPES) – Dataset to apply transform to
ds (Dataset)
- Raises:
NotImplementedError – Base Transform does not implement this function
- Returns:
Transformed Dataset
- Return type:
XR_TYPES
- class pyearthtools.pipeline.operations.transform.AddCoordinates(coordinates, *extra_coords)#
Add coordinates as variable to a dataset
Use [DropDataset][pyearthtools.pipeline.operations.select.DropDataset] to remove it if an earlier step in the pipeline is sensitive to variable names.
Add coordinates to dataset.
- Parameters:
coordinates (str | list[str]) – Coordinate/s to add
*extra_coords (str) – Args form of coordinates
- apply(data)#
Apply transformation to Dataset
- Parameters:
dataset (XR_TYPES) – Dataset to apply transform to
data (Dataset)
- Raises:
NotImplementedError – Base Transform does not implement this function
- Returns:
Transformed Dataset
- Return type:
XR_TYPES
pipeline.samplers#
- class pyearthtools.pipeline.samplers.EmptyObject#
Empty object to be skipped by Iteration
Used to mark where the sampler cannot return data.
- class pyearthtools.pipeline.samplers.Sampler#
Base level Sampler
All sampler classes must implement this class, and provide
generator, which should act as a generator.See
DefaultSamplerfor an example, and the process to make a sampler.- abstractmethod generator()#
Generator to control the sampling of data.
When passed None, and no samples remain within, should exit.
- How to:
Yield an
EmptyObjectto begin with, and capture what is sent. Run sampling routine, yieldEmptyObjectif sampler cannot return obj, else return obj. Exit when None is encountered. If any stored within the sampler, yield them all afterwards.
- Yields:
Generator[Any, Any, Any] – Sampling of data
- Return type:
Generator[Any, Any, Any]
- class pyearthtools.pipeline.samplers.Default#
Default Sampler
Simply passes back any object given to it.
- generator()#
Generator to control the sampling of data.
When passed None, and no samples remain within, should exit.
- How to:
Yield an
EmptyObjectto begin with, and capture what is sent. Run sampling routine, yieldEmptyObjectif sampler cannot return obj, else return obj. Exit when None is encountered. If any stored within the sampler, yield them all afterwards.
- Yields:
Generator[Any, Any, Any] – Sampling of data
- Return type:
Generator[Any, Any, Any]
- class pyearthtools.pipeline.samplers.SuperSampler(*samplers)#
A collection of
Sampler’s run one after another.Construct a new
SuperSamplerfrom multipleSampler’s.- Parameters:
*samplers (Sampler) – Sampling is run sequentially, so order may be important.
- generator()#
SuperSampler version of
generator,Runs each
Samplerone after another.- Return type:
Generator[Any, Any, Any]
- class pyearthtools.pipeline.samplers.Random(buffer_len, seed=42)#
Randomly sample objects from stream.
Builds a buffer of controllable length from which to sample from, once size is reached.
Construct a Random Sampler
- Parameters:
buffer_len (int) – Length of buffer to build. No objects will be yielded until this length is reached or exhausted.
seed (Union[int, None], optional) – Seed to initialise rng module with. Defaults to 42.
- generator()#
Generator to control the sampling of data.
When passed None, and no samples remain within, should exit.
- How to:
Yield an
EmptyObjectto begin with, and capture what is sent. Run sampling routine, yieldEmptyObjectif sampler cannot return obj, else return obj. Exit when None is encountered. If any stored within the sampler, yield them all afterwards.
- Yields:
Generator[Any, Any, Any] – Sampling of data
- Return type:
Generator[Any, Any, Any]
- class pyearthtools.pipeline.samplers.DropOut(step, yield_on_step=False)#
DropOut samples from the stream at a given interval.
Construct a DropOut Sampler
- Parameters:
step (int) – Step value in which to drop out objects. or if
yield_on_stepwhen to yield datayield_on_step (bool, optional) – Reverse behaviour of this Sampler, such that on
stepyield objects. Defaults to False.
- generator()#
Generator to control the sampling of data.
When passed None, and no samples remain within, should exit.
- How to:
Yield an
EmptyObjectto begin with, and capture what is sent. Run sampling routine, yieldEmptyObjectif sampler cannot return obj, else return obj. Exit when None is encountered. If any stored within the sampler, yield them all afterwards.
- Yields:
Generator[Any, Any, Any] – Sampling of data
- Return type:
Generator[Any, Any, Any]
- class pyearthtools.pipeline.samplers.RandomDropOut(chance, seed=42)#
Randomly DropOut samples from the stream.
Construct a RandomDropOut Sampler
- Parameters:
chance (float) – Chance for samples to dropped, between 0 & 100.
seed (int, optional) – Seed to initialise rng module with. Defaults to 42.
- generator()#
Generator to control the sampling of data.
When passed None, and no samples remain within, should exit.
- How to:
Yield an
EmptyObjectto begin with, and capture what is sent. Run sampling routine, yieldEmptyObjectif sampler cannot return obj, else return obj. Exit when None is encountered. If any stored within the sampler, yield them all afterwards.
- Yields:
Generator[Any, Any, Any] – Sampling of data
- Return type:
Generator[Any, Any, Any]