Introduction to Pipelines

[1]:
# Here we'll use the WeatherBench 2 ERA5 dataset as the data source
from pyearthtools.data.download.weatherbench import WB2ERA5
[2]:
# Now we import the pipeline module, which is new in this example
import pyearthtools.pipeline

# and the operations module used as steps of the pipeline
import pyearthtools.pipeline.operations as ops

import pyearthtools.data.transforms.coordinates as coords

Data source

For this example, we are using the ERA5 dataset from WeatherBench 2 at a low resolution (64x32). At this resolution the data is small, and it is fetched directly from a public Google Cloud bucket.

We can first have a look at the full dataset to see which variables and levels we want to assemble in the pipeline.

[3]:
WB2ERA5(resolution="64x32").dataset
[3]:
<xarray.Dataset> Size: 175GB
Dimensions:                                           (time: 93544,
                                                       longitude: 64,
                                                       latitude: 32, level: 13)
Coordinates:
  * latitude                                          (latitude) float64 256B ...
  * level                                             (level) int64 104B 50 ....
  * longitude                                         (longitude) float64 512B ...
  * time                                              (time) datetime64[ns] 748kB ...
Data variables: (12/62)
    10m_u_component_of_wind                           (time, longitude, latitude) float32 766MB dask.array<chunksize=(100, 64, 32), meta=np.ndarray>
    10m_v_component_of_wind                           (time, longitude, latitude) float32 766MB dask.array<chunksize=(100, 64, 32), meta=np.ndarray>
    10m_wind_speed                                    (time, longitude, latitude) float32 766MB dask.array<chunksize=(100, 64, 32), meta=np.ndarray>
    2m_dewpoint_temperature                           (time, longitude, latitude) float32 766MB dask.array<chunksize=(100, 64, 32), meta=np.ndarray>
    2m_temperature                                    (time, longitude, latitude) float32 766MB dask.array<chunksize=(100, 64, 32), meta=np.ndarray>
    above_ground                                      (time, level, longitude, latitude) float32 10GB dask.array<chunksize=(100, 13, 64, 32), meta=np.ndarray>
    ...                                                ...
    volumetric_soil_water_layer_1                     (time, longitude, latitude) float32 766MB dask.array<chunksize=(100, 64, 32), meta=np.ndarray>
    volumetric_soil_water_layer_2                     (time, longitude, latitude) float32 766MB dask.array<chunksize=(100, 64, 32), meta=np.ndarray>
    volumetric_soil_water_layer_3                     (time, longitude, latitude) float32 766MB dask.array<chunksize=(100, 64, 32), meta=np.ndarray>
    volumetric_soil_water_layer_4                     (time, longitude, latitude) float32 766MB dask.array<chunksize=(100, 64, 32), meta=np.ndarray>
    vorticity                                         (time, level, longitude, latitude) float32 10GB dask.array<chunksize=(100, 13, 64, 32), meta=np.ndarray>
    wind_speed                                        (time, level, longitude, latitude) float32 10GB dask.array<chunksize=(100, 13, 64, 32), meta=np.ndarray>
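
The sizes reported in this summary follow from the dimension lengths. As a rough check, the arithmetic can be sketched as below, assuming 4 bytes per float32 value and decimal units (1 MB = 10^6 bytes), which is what the reported figures appear to use. The helper name `nbytes` is ours and is not part of xarray or pyearthtools.

```python
from math import prod

FLOAT32_BYTES = 4  # each float32 value occupies 4 bytes


def nbytes(*dims: int, itemsize: int = FLOAT32_BYTES) -> int:
    """Bytes needed for a dense array with the given dimension lengths."""
    return prod(dims) * itemsize


# Dimension lengths copied from the dataset summary above.
time, longitude, latitude, level = 93544, 64, 32, 13

surface = nbytes(time, longitude, latitude)           # e.g. 2m_temperature
upper_air = nbytes(time, level, longitude, latitude)  # e.g. vorticity

print(f"surface variable:   {surface / 1e6:.0f} MB")
print(f"upper-air variable: {upper_air / 1e9:.0f} GB")
```

The variables are listed as dask arrays, so displaying the dataset does not load these volumes into memory; values are only read when they are requested.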

Create a data preparation pipeline

We can use pipelines to create a reproducible and explainable process for preparing data for specific tasks. Pipelines can also be shared as templates for similar work.
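
To make the idea concrete, here is a minimal sketch of a pipeline as an ordered list of steps, written in plain Python. It is a toy illustration only: `ToyPipeline`, `drop_missing` and `kelvin_to_celsius` are hypothetical names, and this is not how `pyearthtools.pipeline.Pipeline` is implemented.

```python
from functools import reduce
from typing import Any, Callable, Sequence

Step = Callable[[Any], Any]


class ToyPipeline:
    """Hypothetical stand-in: applies each step, in order, to the input."""

    def __init__(self, *steps: Step) -> None:
        self.steps: Sequence[Step] = steps

    def __call__(self, data: Any) -> Any:
        # Feed the output of each step into the next one.
        return reduce(lambda value, step: step(value), self.steps, data)

    def describe(self) -> list[str]:
        # The ordered step names record how the data was prepared.
        return [step.__name__ for step in self.steps]


def drop_missing(values: list) -> list:
    """Remove entries that are None."""
    return [v for v in values if v is not None]


def kelvin_to_celsius(values: list) -> list:
    """Convert temperatures from kelvin to degrees Celsius."""
    return [round(v - 273.15, 2) for v in values]


prepare = ToyPipeline(drop_missing, kelvin_to_celsius)
print(prepare.describe())
print(prepare([273.15, None, 300.0]))
```

Because the steps are held as data rather than scattered through a script, the same object can be inspected, re-run on new input, or handed to someone else as a template.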

The pipeline below:

  1. Selects the ‘u’ and ‘v’ wind variables at 500 and 850 hPa, as well as 2 meter temperature.

  2. Selects the ‘geopotential’ and ‘vorticity’ atmospheric variables at 850 hPa.

  3. Merges these into a single dataset.

  4. Sorts the variables into a specified order.

  5. Applies a coordinate transformation to ensure longitude is formatted as 0 to 360 degrees (not -180 to 180 degrees) using the StandardLongitude class.

  6. Reverses the order of the data along the ‘level’ coordinate using the ReIndex class.
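
The last two steps in the list are coordinate operations. The sketch below shows the underlying idea with plain Python lists. It does not use the pyearthtools classes, and all values are made up. Re-sorting after wrapping is our assumption, since the source does not say whether the library reorders the coordinate.

```python
# Longitudes in the -180..180 convention, with one value per longitude.
longitudes = [-180.0, -90.0, 0.0, 90.0]
values = ["a", "b", "c", "d"]

# Step 5: wrap into 0..360, then re-sort so the coordinate is increasing.
# Sorting the (longitude, value) pairs keeps each value with its longitude.
wrapped = [lon % 360 for lon in longitudes]
pairs = sorted(zip(wrapped, values))
longitudes_0_360 = [lon for lon, _ in pairs]
values_0_360 = [val for _, val in pairs]

# Step 6: reverse the level coordinate and the data along it together.
levels = [500, 850]
wind = [12.0, 7.5]  # made-up values, one per level
levels_reversed = levels[::-1]
wind_reversed = wind[::-1]

print(longitudes_0_360, values_0_360)
print(levels_reversed, wind_reversed)
```

In both cases the coordinate and the data are reordered together, so every value stays attached to the same location.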

[4]:
data_preparation = pyearthtools.pipeline.Pipeline(
    (
        WB2ERA5(resolution="64x32", variables=["u", "v", "t2m"], level=[500, 850]),
        WB2ERA5(resolution="64x32", variables=["geopotential", "vorticity"], level=850),
    ),
    ops.xarray.Merge(),
    ops.xarray.Sort(
        [
            "u_component_of_wind",
            "v_component_of_wind",
            "2m_temperature",
            "geopotential",
            "vorticity",
        ]
    ),
    ops.Transforms(
        apply=coords.StandardLongitude(type="0-360") + coords.ReIndex(level="reversed")
    ),
    # These methods will be explained when we create a pipeline for machine learning.
    # ops.xarray.reshape.CoordinateFlatten('level'),
    # ops.xarray.conversion.ToNumpy(),
    # ops.numpy.reshape.Squeeze(1),
)
[5]:
# Inspect the data_preparation pipeline; displaying it also visualises the steps as a graph.
data_preparation
Pipeline
    Description                    `pyearthtools.pipeline` Data Pipeline


    Initialisation
             exceptions_to_ignore           None
             iterator                       None
             sampler                        None
    Steps
             weatherbench.WB2ERA5           {'WB2ERA5': {'chunks': "'auto'", 'download_dir': 'None', 'level': '[500, 850]', 'url': "'gs://weatherbench2/datasets/era5/1959-2023_01_10-6h-64x32_equiangular_conservative.zarr'", 'variables': "['u', 'v', 't2m']"}}
             weatherbench.WB2ERA5[1]        {'WB2ERA5': {'chunks': "'auto'", 'download_dir': 'None', 'level': '850', 'url': "'gs://weatherbench2/datasets/era5/1959-2023_01_10-6h-64x32_equiangular_conservative.zarr'", 'variables': "['geopotential', 'vorticity']"}}
             join.Merge                     {'Merge': {'merge_kwargs': 'None'}}
             sort.Sort                      {'Sort': {'order': "['u_component_of_wind', 'v_component_of_wind', '2m_temperature', 'geopotential', 'vorticity']", 'strict': 'False'}}
             transforms.Transforms          {'Transforms': {'apply': {'TransformCollection': {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}, 'ReIndex': {'coordinates': 'None', 'level': "'reversed'"}}}, 'transforms': 'None', 'undo': 'None'}}

Graph

(Figure: graph of the data_preparation pipeline steps.)
[6]:
# Use the pipeline to create a sample for a specific date.
sample = data_preparation["20120102T00"]
sample
[6]:
<xarray.Dataset> Size: 58kB
Dimensions:              (latitude: 32, level: 2, longitude: 64, time: 1)
Coordinates:
  * latitude             (latitude) float64 256B -87.19 -81.56 ... 81.56 87.19
  * level                (level) int64 16B 850 500
  * longitude            (longitude) float64 512B 0.0 5.625 ... 348.8 354.4
  * time                 (time) datetime64[ns] 8B 2012-01-02
Data variables:
    u_component_of_wind  (time, level, longitude, latitude) float32 16kB dask.array<chunksize=(1, 2, 64, 32), meta=np.ndarray>
    v_component_of_wind  (time, level, longitude, latitude) float32 16kB dask.array<chunksize=(1, 2, 64, 32), meta=np.ndarray>
    2m_temperature       (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>
    geopotential         (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>
    vorticity            (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>
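
The key used to index the pipeline, "20120102T00", is a compact date-time string. How pyearthtools parses it internally is not shown here. As an illustration using only the standard library, the same string can be read as below. The six-hourly check is an assumption based on the `6h` in the dataset URL shown in the pipeline summary.

```python
from datetime import datetime

key = "20120102T00"

# Compact ISO 8601 style: year, month, day, the letter "T", then the hour.
timestamp = datetime.strptime(key, "%Y%m%dT%H")
print(timestamp.isoformat())

# Assumption: six-hourly steps, so valid hours would be 00, 06, 12 and 18.
on_six_hourly_grid = timestamp.hour % 6 == 0
print(on_six_hourly_grid)
```

The parsed date matches the single `time` value, 2012-01-02, in the sample above.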
[7]:
# Inspect the vorticity variable, which is returned as a DataArray.
sample.vorticity
[7]:
<xarray.DataArray 'vorticity' (time: 1, longitude: 64, latitude: 32)> Size: 8kB
dask.array<getitem, shape=(1, 64, 32), dtype=float32, chunksize=(1, 64, 32), chunktype=numpy.ndarray>
Coordinates:
  * latitude   (latitude) float64 256B -87.19 -81.56 -75.94 ... 81.56 87.19
  * longitude  (longitude) float64 512B 0.0 5.625 11.25 ... 343.1 348.8 354.4
  * time       (time) datetime64[ns] 8B 2012-01-02