HadISD Tutorial Three - Chunking the Data into Small Groups#
NOTE Before beginning this tutorial, you should first read HadISD Tutorial One - Introduction to Station Data and ensure you complete the tutorials in order
The ideal end goal would be to have one huge netcdf file with efficient random access to all the data within it. Unfortunately, practical limitations with merging large files make this somewhat difficult. In principle all the various merging algorithms should be happy working on disk, but in practise the merging component of the algorithm seems to happen in memory in the libraries we use. It’s more common to use smaller files on disk and then abstract that behind a multi-file lazy-load interface. This is acceptable for small and medium data sets, but eventually the sheer number of files starts to prove a problem, particularly in supercomputing environments, which are typically optimised for large file transfers.
As a result, we must jump through a few hoops to re-structure our data for efficient indexing by time rather than primarily by station number (or location).
Step one is to take our per-station files, and then in small groups re-arrange them and write out files by decade. This doesn’t immediately reduce the number of files much, but then we will proceed to join those decadal files into much larger files, and then delete the various intermediate files. Instead of over a thousand small files, we will instead have around-about 50 larger ones, organised in a way that we can work with more easily, particularly if we only want a decade or two.
This notebook does the small-group rechunking; the next one does the recombining.
[8]:
import tarfile
import gzip
import shutil
from pathlib import Path
import numpy as np
from datetime import datetime
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import os
from dask.distributed import Client
import xarray as xr
[10]:
UNPACKED_DIR = Path('/g/data/kd24/data/') / 'hadisd' / 'unpacked' # We need a place on disk to unpack the archives
PROCESSING_DIR = Path('/g/data/kd24/data/') / 'hadisd' / 'processing' # We need to cache some data on disk during reprocessing
PROCESSING_DIR.mkdir()
[11]:
files = list(UNPACKED_DIR.glob('*.nc'))
len(files)
[11]:
2089
[12]:
def simplify(ds):
'''
Here we move the latitude, longitude and elevation from a coordinate to a data variable
This fits the structure of the data more efficiently, allowing simpler concatenation.
'''
lats = xr.DataArray(data=[ds.latitude.values[0]] * len(ds.time), coords={'time': ds.time})
lons = xr.DataArray(data=[ds.longitude.values[0]] * len(ds.time), coords={'time': ds.time})
elev = xr.DataArray(data=[ds.elevation.values[0]] * len(ds.time), coords={'time': ds.time})
ds = ds.reset_coords(names=('latitude', 'longitude', 'elevation'), drop=True)
ds['lat'] = lats
ds['lon'] = lons
ds['elev'] = elev
ds = ds.drop_attrs()
return ds
[13]:
filegroups = [files[i:i + 10] for i in range(0, len(files), 10)]
print(len(filegroups)) # We come up with 134 such file groupings from the test data or 1040 for the full dataset
209
[14]:
decades = [('1800', '1930'), # Just in case there is undocumented early data
('1930', '1940'), ('1940', '1950'), # Dataset begins in 1930, start by decade here
('1950', '1960'), ('1960', '1970'), ('1970', '1980'),
('1980', '1990'), ('1990', '2000'), ('2000', '2010'), # 1980 is a common time to start from
('2010', '2020'), ('2020', '2030')
]
[ ]:
# This takes around 20-60 seconds per grouping. If you just want to get the hang of it, limit it to three groupings
# Otherwise, the test set have 67 groupings, so will take around half an hour to run
# The full set of stations will take several hours!
# For testing, just try three file groups
for i, fg in enumerate(filegroups): # Use me to process all downloaded data
print(f"Processing group {i} of {len(filegroups)}")
print(datetime.now().time())
loaded = [xr.open_dataset(f, engine='h5netcdf') for f in fg]
simplified = [simplify(_ds) for _ds in loaded]
merged = xr.concat(simplified, dim='report')
for d in decades:
decadal = merged.sel(time=slice(*d))
if len(decadal.time):
filename = PROCESSING_DIR / f'{d[0]}-{d[1]}-sg{i}.nc'
if not os.path.exists(filename):
decadal.to_netcdf(filename)
else:
print(f"{filename} exists, skipping")
Processing group 0 of 209
00:28:28.093766
Processing group 1 of 209
00:31:28.043623
[ ]:
print('done')