Flood Early Warning — Real Data Integration Guide
Companion to Notebook 13 (13_flood_early_warning.ipynb).This notebook is a drop-in replacement for Cells 3–5 of NB13.All downstream model cells (Sections 4–9) run unchanged.
Covered datasets
Dataset |
Coverage |
Spatial |
Temporal |
Access |
|---|---|---|---|---|
CAMELS-US |
671 US basins |
Basin-level |
Daily 1980–2014 |
Free |
USGS NWIS |
~10 000 US gauges |
Point gauge |
15-min / hourly |
Free API |
ERA5-Land |
Global |
0.1° (~9 km) |
Hourly 1950– |
Free (CDS) |
GloFAS |
Global |
0.1° |
Daily reanalysis |
Free (CDS) |
GRDC |
10 000+ global gauges |
Point gauge |
Daily |
Free (registration) |
OpenHydro (UK) |
~1 500 UK gauges |
Point gauge |
15-min |
Free API |
How to use
Run Section 1 to download and cache your chosen dataset
Run Section 2 to build
X_static,X_dyn_raw,X_future_raw,Y_rawin the same shape as NB13Copy those four arrays back into NB13 and re-run from Cell 9 onwards
No other change is needed — the BaseAttentive model architecture, FSI prior, and alarm system are fully dataset-agnostic.
[1]:
import os, warnings, time, glob, zipfile, requests
warnings.filterwarnings('ignore')
import numpy as np
import pandas as pd
from pathlib import Path
# Optional — install if needed:
# pip install dataretrieval cdsapi hydrofunctions netCDF4 xarray
NWIS_OK = False
try:
import dataretrieval.nwis as nwis
NWIS_OK = True
except ImportError:
print("dataretrieval not installed: pip install dataretrieval")
CDS_OK = False
try:
import cdsapi
CDS_OK = True
except ImportError:
print("cdsapi not installed: pip install cdsapi")
# NB13 constants — must match
LOOKBACK = 24
HORIZONS_H = [1, 3, 6, 12, 24]
N_H = len(HORIZONS_H)
MAX_H = max(HORIZONS_H)
PRIMARY_H = 2 # +6h
DATA_CACHE = Path('data/flood_ews')
DATA_CACHE.mkdir(parents=True, exist_ok=True)
print(f'Cache directory: {DATA_CACHE.resolve()}')
dataretrieval not installed: pip install dataretrieval
cdsapi not installed: pip install cdsapi
Cache directory: /home/daniel/projects/base-attentive/examples/data/flood_ews
1 — CAMELS-US: 671 Basins with Static Attributes
CAMELS (Catchment Attributes and MEteorology for Large-sample Studies) provides:
Daily streamflow + meteorological forcing for 671 CONUS basins (1980–2014)
33 basin attributes: topology, climate, soil, vegetation, geology, hydrology
Download (one-time, ~4 GB total)
https://gdex.ucar.edu/dataset/camels.html
Files needed:
basin_timeseries_v1p2_metForcing_obsFlow.tar.gz(~3.2 GB)camels_attributes_v2.0.xlsx(~1.5 MB)
Or use the Kaggle mirror (no login required for cached versions):
kaggle datasets download -d htagholdings/camels-us-streamflow-catchment-attributes
Column mapping to NB13 features
NB13 feature |
CAMELS column |
Notes |
|---|---|---|
|
|
Direct |
|
|
Direct |
|
|
Urbanisation proxy |
|
|
Direct |
|
|
Vegetation proxy |
|
derived from |
See code below |
|
|
Direct |
|
derived from annual max flow exceedances |
See code below |
|
|
Forcing file |
|
|
Scaled discharge |
|
|
Direct (convert units) |
|
|
Antecedent |
|
|
Direct |
[2]:
# ── CAMELS-US loader ──────────────────────────────────────────────────────────
CAMELS_DIR = DATA_CACHE / 'camels_us'
def load_camels_attributes(attr_file):
"""Load 33 CAMELS basin attributes from the Excel file."""
df = pd.read_excel(attr_file, sheet_name=None)
# merge all sheets on gauge_id
merged = None
for sheet, dfs in df.items():
if 'gauge_id' in dfs.columns:
merged = dfs if merged is None else merged.merge(dfs, on='gauge_id', how='outer')
return merged
def load_camels_forcing(basin_id, forcing_dir, source='daymet'):
"""Load daily meteorological forcing for one basin."""
pat = str(forcing_dir / '**' / f'{basin_id}_*{source}*.txt')
files = glob.glob(pat, recursive=True)
if not files:
return None
df = pd.read_csv(files[0], sep=r'\s+', skiprows=3,
names=['Year','Mnth','Day','Hr','dayl','prcp','srad',
'swe','tmax','tmin','vp'])
df['date'] = pd.to_datetime(df[['Year','Mnth','Day']].rename(
columns={'Year':'year','Mnth':'month','Day':'day'}))
df['tmean'] = (df['tmax'] + df['tmin']) / 2
return df.set_index('date')
def load_camels_streamflow(basin_id, flow_dir):
"""Load daily observed streamflow (mm/day)."""
pat = str(flow_dir / '**' / f'{basin_id}_*.txt')
files = glob.glob(pat, recursive=True)
if not files:
return None
df = pd.read_csv(files[0], sep=r'\s+', header=None,
names=['basin','Year','Mnth','Day','QObs','flag'])
df['date'] = pd.to_datetime(df[['Year','Mnth','Day']].rename(
columns={'Year':'year','Mnth':'month','Day':'day'}))
return df.set_index('date')[['QObs']]
def build_camels_dataset(camels_dir, n_basins=None, lookback_days=1,
label_threshold_quantile=0.90):
"""
Build X_static, X_dyn_raw, X_future_raw, Y_raw from CAMELS.
Parameters
----------
camels_dir : Path root of extracted CAMELS archive
n_basins : int limit to first n basins (None = all 671)
lookback_days: int days of history (converted to 24h window by taking
the last `lookback_days` daily values per window)
label_threshold_quantile: float percentile of annual max flow used as
flood threshold per basin
Returns
-------
X_static : (N, 8) static basin attributes
X_dyn_raw : (N, LOOKBACK, 6) daily forcing window
X_future_raw: (N, N_H, 2) NWP-style lag features
Y_raw : (N, N_H) binary flood labels
"""
attr_file = camels_dir / 'camels_attributes_v2.0.xlsx'
forcing_dir= camels_dir / 'basin_mean_forcing'
flow_dir = camels_dir / 'usgs_streamflow'
attrs = load_camels_attributes(attr_file)
if attrs is None:
raise FileNotFoundError(f'Attributes file not found: {attr_file}')
basin_ids = attrs['gauge_id'].astype(str).str.zfill(8).tolist()
if n_basins:
basin_ids = basin_ids[:n_basins]
X_static_list, X_dyn_list, X_fut_list, Y_list = [], [], [], []
skipped = 0
for bi, bid in enumerate(basin_ids):
if bi % 50 == 0: print(f' Loading basin {bi}/{len(basin_ids)}...')
forcing = load_camels_forcing(bid, forcing_dir)
flow = load_camels_streamflow(bid, flow_dir)
if forcing is None or flow is None:
skipped += 1; continue
merged = forcing.join(flow, how='inner').dropna()
if len(merged) < lookback_days + max(HORIZONS_H):
skipped += 1; continue
row = attrs[attrs['gauge_id'].astype(str).str.zfill(8) == bid]
if len(row) == 0:
skipped += 1; continue
row = row.iloc[0]
# ── Static features ────────────────────────────────────────────────
area = float(row.get('area_gages2', 500))
slope = float(row.get('slope_mean', 10)) / 1000 # m/km → m/m
imperv = float(row.get('frac_urban', 0.1))
soil_c = float(row.get('soil_conductivity', 15)) * 10 # cm/h→mm/h
ndvi_p = float(row.get('frac_forest', 0.4))
elev = float(row.get('elev_mean', 300))
q_mean = float(row.get('q_mean', 1.0)) + 1e-3
q_max = merged['QObs'].quantile(label_threshold_quantile)
dist_ch = np.sqrt(area) / (slope * 1000 + 1e-3) # rough proxy
# Flood history: fraction of years where annual max > flood threshold
ann_max = merged['QObs'].resample('YE').max()
flood_thr= np.percentile(merged['QObs'], 95)
flood_hist = float((ann_max > flood_thr).mean() * 10) # /decade
sta = np.array([area, slope, imperv, soil_c, ndvi_p,
dist_ch, elev, flood_hist], dtype='float32')
# ── Dynamic window (last `lookback_days` rows before flood event) ──
# For each year, extract window before annual max
windows_added = 0
for yr in merged.index.year.unique():
yr_data = merged[merged.index.year == yr]
if len(yr_data) < lookback_days + max(HORIZONS_H):
continue
peak_idx = yr_data['QObs'].idxmax()
peak_pos = yr_data.index.get_loc(peak_idx)
if peak_pos < lookback_days:
continue
obs_end = peak_pos
obs_start= peak_pos - lookback_days
win = yr_data.iloc[obs_start:obs_end]
# 6 dynamic features (daily, resampled to LOOKBACK=24 points)
# daily data → repeat each day's value 1x (or resample if hourly)
n_pts = min(len(win), LOOKBACK)
pad = LOOKBACK - n_pts
def pad_series(arr, n_pts=n_pts, pad=pad):
arr = np.array(arr[-n_pts:], dtype='float32')
if pad > 0:
arr = np.concatenate([np.full(pad, arr[0]), arr])
return arr
rain_up = pad_series(win['prcp'].values) # mm/day
rain_loc = rain_up * 0.8 + np.random.normal(0,0.5,LOOKBACK).clip(0)
wl = pad_series((win['QObs'] / (q_mean*24+1e-3)).values.clip(0,3))
disch = pad_series(win['QObs'].values)
sm = pad_series(win['swe'].values if 'swe' in win else
np.zeros(len(win)))
temp = pad_series(win['tmean'].values)
dyn = np.stack([rain_up, rain_loc, wl, disch, sm, temp],
axis=1).astype('float32')
# ── Future NWP features (lag from observation end) ─────────────
future_win = yr_data.iloc[obs_end:obs_end+max(HORIZONS_H)]
r3h = future_win['prcp'].values[:3].sum() if len(future_win)>=3 else 0.0
r6h = future_win['prcp'].values[:6].sum() if len(future_win)>=6 else 0.0
fut = np.array([r3h, r6h], dtype='float32')[None, :].repeat(N_H, axis=0)
# ── Labels ─────────────────────────────────────────────────────
labels = []
for h in HORIZONS_H:
fut_q = yr_data.iloc[obs_end:obs_end+h]['QObs'].max() if obs_end+h <= len(yr_data) else 0.0
labels.append(float(fut_q > q_max))
X_static_list.append(sta)
X_dyn_list.append(dyn)
X_fut_list.append(fut)
Y_list.append(labels)
windows_added += 1
if windows_added >= 3: # max 3 windows per basin
break
print(f'Loaded {len(X_dyn_list)} windows from {len(basin_ids)-skipped} basins '
f'(skipped {skipped})')
return (np.stack(X_static_list), np.stack(X_dyn_list),
np.stack(X_fut_list).astype('float32'),
np.array(Y_list, dtype='float32'))
# ── Run if CAMELS data is present ─────────────────────────────────────────────
if (CAMELS_DIR / 'camels_attributes_v2.0.xlsx').exists():
X_static_raw, X_dyn_raw, X_future_raw, Y_raw = build_camels_dataset(
CAMELS_DIR, n_basins=100)
print('\nShape check:')
for name, arr in [('X_static_raw', X_static_raw), ('X_dyn_raw', X_dyn_raw),
('X_future_raw', X_future_raw), ('Y_raw', Y_raw)]:
print(f' {name}: {arr.shape}')
else:
print('CAMELS data not found at', CAMELS_DIR)
print('Download from: https://gdex.ucar.edu/dataset/camels.html')
print('Running in demo mode — using NB13 synthetic arrays as placeholder.')
CAMELS data not found at data/flood_ews/camels_us
Download from: https://gdex.ucar.edu/dataset/camels.html
Running in demo mode — using NB13 synthetic arrays as placeholder.
2 — USGS NWIS: Hourly Gauge Data (US)
The USGS National Water Information System provides free real-time and historical streamflow data via the dataretrieval Python package.
Install
pip install dataretrieval
Key USGS parameter codes
Code |
Parameter |
Unit |
|---|---|---|
|
Discharge |
ft³/s |
|
Gauge height |
ft |
|
Water temperature |
°C |
|
Precipitation |
in |
Recommended gauges for testing
Site ID |
River |
Location |
|---|---|---|
|
St. John River |
Maine |
|
Ohio River |
Louisville, KY |
|
Sacramento River |
California |
|
Mississippi River |
Baton Rouge, LA |
|
Clark Fork |
Montana |
[3]:
# ── USGS NWIS hourly loader ───────────────────────────────────────────────────
def load_usgs_gauge(site_id, start_date, end_date,
param_discharge='00060', param_stage='00065'):
"""
Download hourly discharge and stage from USGS NWIS.
Returns a DataFrame with columns: discharge_m3s, stage_m
"""
if not NWIS_OK:
print('pip install dataretrieval'); return None
# Instantaneous values (15-min), resample to hourly
df_q, _ = nwis.get_iv(sites=site_id, parameterCd=param_discharge,
start=start_date, end=end_date)
df_h, _ = nwis.get_iv(sites=site_id, parameterCd=param_stage,
start=start_date, end=end_date)
# Rename columns
df_q.columns = ['discharge_cfs']
df_h.columns = ['stage_ft']
# Join and resample to hourly mean
df = df_q.join(df_h, how='outer').resample('h').mean()
# Unit conversions
df['discharge_m3s'] = df['discharge_cfs'] * 0.0283168
df['stage_m'] = df['stage_ft'] * 0.3048
return df[['discharge_m3s', 'stage_m']].dropna()
def build_nwis_windows(site_id, start_date, end_date,
bankfull_quantile=0.95, lookback=LOOKBACK):
"""
Extract LOOKBACK-hour observation windows from a single NWIS gauge.
Labels: does stage exceed bankfull within next 1/3/6/12/24 hours?
Returns arrays compatible with NB13 (X_static shape differs — see note).
"""
df = load_usgs_gauge(site_id, start_date, end_date)
if df is None or len(df) < lookback + max(HORIZONS_H):
return None
# Bankfull threshold
bankfull = df['discharge_m3s'].quantile(bankfull_quantile)
print(f'Site {site_id}: {len(df)} hourly rows, bankfull={bankfull:.2f} m³/s')
# Sliding window (stride = 6h to avoid excessive autocorrelation)
windows, labels = [], []
for t in range(lookback, len(df) - max(HORIZONS_H), 6):
win = df.iloc[t-lookback:t]
# 6-channel dynamic: [0,0,stage_norm,discharge,soil_moisture_proxy,0]
stage_norm = (win['stage_m'] /
(df['stage_m'].quantile(bankfull_quantile) + 1e-3)).values
disch = win['discharge_m3s'].values
api_vals = np.zeros(lookback, 'float32') # placeholder — add precip if available
dyn = np.stack([
np.zeros(lookback, 'f'), # rain_up (fill from ERA5 — see Sec 4)
np.zeros(lookback, 'f'), # rain_local
stage_norm.astype('f'),
disch.astype('f'),
api_vals,
np.zeros(lookback, 'f'), # temperature (fill from ERA5)
], axis=1)
# Future NWP placeholders (fill from ERA5 in Section 4)
fut = np.zeros((N_H, 2), 'float32')
# Labels
lbl = []
for h in HORIZONS_H:
fut_q = df['discharge_m3s'].iloc[t:t+h].max()
lbl.append(float(fut_q > bankfull))
windows.append(dyn); labels.append(lbl)
X_dyn_nwis = np.stack(windows)
Y_nwis = np.array(labels, dtype='float32')
print(f' Windows: {X_dyn_nwis.shape} | Flood +6h: {Y_nwis[:,PRIMARY_H].mean():.3f}')
return X_dyn_nwis, Y_nwis
# ── Example call (comment out if no network access) ───────────────────────────
# result = build_nwis_windows('03604400', '2010-01-01', '2023-12-31')
# if result:
# X_dyn_nwis, Y_nwis = result
print('USGS NWIS loader ready.')
print('Usage: build_nwis_windows(site_id, start_date, end_date)')
print('Recommended: combine with ERA5-Land for rainfall features (Section 4).')
USGS NWIS loader ready.
Usage: build_nwis_windows(site_id, start_date, end_date)
Recommended: combine with ERA5-Land for rainfall features (Section 4).
3 — ERA5-Land: Global Hourly Rainfall & Soil Moisture
ERA5-Land provides hourly global coverage at 0.1° (~9 km) since 1950. It fills the rainfall and soil-moisture channels missing from USGS NWIS.
One-time setup
Register at https://cds.climate.copernicus.eu (free)
Install client:
pip install cdsapiCreate
~/.cdsapirc:
url: https://cds.climate.copernicus.eu/api/v2
key: <your-uid>:<your-api-key>
Variables used
ERA5-Land variable |
NB13 channel |
Conversion |
|---|---|---|
|
rain_upstream / rain_local |
×1000 → mm/h |
|
soil_moisture |
direct |
|
temperature |
−273.15 → °C |
Spatial matching
For each gauged basin, extract the ERA5 grid cell containing the gauge coordinates. For upstream rainfall, average over the basin polygon (use HydroSHEDS or CAMELS catchment boundaries).
[4]:
# ── ERA5-Land download helper ─────────────────────────────────────────────────
ERA5_CACHE = DATA_CACHE / 'era5'
ERA5_CACHE.mkdir(exist_ok=True)
def download_era5_land(year, month, bbox, output_path=None):
"""
Download ERA5-Land hourly precipitation, soil moisture, temperature
for a bounding box [N, W, S, E] in degrees.
Parameters
----------
bbox : [float, float, float, float] [north, west, south, east]
output_path : Path where to save the .nc file (auto-named if None)
"""
if not CDS_OK:
print('pip install cdsapi'); return None
fname = output_path or ERA5_CACHE / f'era5_{year}_{month:02d}.nc'
if Path(fname).exists():
print(f'Cached: {fname}'); return fname
c = cdsapi.Client()
c.retrieve(
'reanalysis-era5-land',
{
'variable': [
'total_precipitation',
'volumetric_soil_water_layer_1',
'2m_temperature',
],
'year': str(year),
'month': f'{month:02d}',
'day': [f'{d:02d}' for d in range(1, 32)],
'time': [f'{h:02d}:00' for h in range(24)],
'area': bbox, # [N, W, S, E]
'format': 'netcdf',
},
str(fname)
)
print(f'Downloaded: {fname}')
return fname
def load_era5_at_point(nc_file, lat, lon):
"""
Extract hourly time series at nearest grid point.
Returns DataFrame with: precip_mm_h, soil_moisture, temp_c
"""
try:
import xarray as xr
except ImportError:
print('pip install xarray'); return None
ds = xr.open_dataset(nc_file)
# Nearest grid point
pt = ds.sel(latitude=lat, longitude=lon, method='nearest')
df = pd.DataFrame({
'precip_mm_h': pt['tp'].values * 1000, # m → mm/h
'soil_moisture':pt['swvl1'].values, # m³/m³
'temp_c': pt['t2m'].values - 273.15, # K → °C
}, index=pd.to_datetime(pt['time'].values))
return df
# ── Example: download ERA5 for Ohio River basin ───────────────────────────────
# ohio_bbox = [42, -90, 36, -80] # [N, W, S, E]
# fname = download_era5_land(2023, 6, ohio_bbox)
# if fname:
# era5_df = load_era5_at_point(fname, lat=38.25, lon=-85.75)
# print(era5_df.head())
print('ERA5-Land helpers ready.')
print('Usage: download_era5_land(year, month, bbox=[N,W,S,E])')
ERA5-Land helpers ready.
Usage: download_era5_land(year, month, bbox=[N,W,S,E])
4 — GloFAS: Global Flood Awareness System (Copernicus)
GloFAS provides:
Reanalysis (ERA5-driven): 40-year daily river discharge at 0.1° globally
Forecast: ensemble flood forecasts at 3-day / 7-day / 15-day / 30-day
Return period thresholds: 2-yr, 5-yr, 10-yr, 20-yr, 100-yr per grid cell
GloFAS thresholds are the closest real-world equivalent to NB13’s FSI bankfull threshold — directly usable as the flood label criterion.
Download via CDS
import cdsapi
c = cdsapi.Client()
c.retrieve('cems-glofas-historical',
{'system_version':'version_4_0',
'variable':'river_discharge_in_the_last_24_hours',
'hyear': [str(y) for y in range(1990,2023)],
'format':'netcdf'},
'glofas_reanalysis.nc')
Return-period thresholds (flood labels)
GloFAS publishes pre-computed return-period discharge thresholds:
https://confluence.ecmwf.int/display/CEMS/GloFAS+Thresholds
Use the 2-year return period as the NB13 flood threshold (FSI ≥ 1 equivalent).
[5]:
# ── GloFAS reader ─────────────────────────────────────────────────────────────
GLOFAS_CACHE = DATA_CACHE / 'glofas'
GLOFAS_CACHE.mkdir(exist_ok=True)
def load_glofas_at_point(nc_reanalysis, nc_thresholds, lat, lon):
"""
Extract GloFAS river discharge and flood labels at a grid point.
Parameters
----------
nc_reanalysis : Path GloFAS reanalysis .nc (dis24 variable)
nc_thresholds : Path GloFAS thresholds .nc (rl2, rl5, rl20 variables)
lat, lon : float target coordinates
Returns
-------
DataFrame with: discharge_m3s, fsi, label_2yr, label_5yr, label_20yr
"""
try:
import xarray as xr
except ImportError:
print('pip install xarray'); return None
ds_r = xr.open_dataset(nc_reanalysis)
ds_t = xr.open_dataset(nc_thresholds)
q = ds_r['dis24'].sel(lat=lat, lon=lon, method='nearest')
t2 = float(ds_t['rl2' ].sel(lat=lat, lon=lon, method='nearest').values)
t5 = float(ds_t['rl5' ].sel(lat=lat, lon=lon, method='nearest').values)
t20 = float(ds_t['rl20'].sel(lat=lat, lon=lon, method='nearest').values)
df = pd.DataFrame({
'discharge_m3s': q.values,
'fsi': q.values / (t2 + 1e-3), # FSI relative to 2-yr threshold
'label_2yr': (q.values >= t2 ).astype(float),
'label_5yr': (q.values >= t5 ).astype(float),
'label_20yr': (q.values >= t20).astype(float),
}, index=pd.to_datetime(q['time'].values))
return df
def build_glofas_windows(nc_reanalysis, nc_thresholds, lat, lon,
lookback=LOOKBACK, stride=6):
"""Build NB13-compatible windows from GloFAS reanalysis."""
df = load_glofas_at_point(nc_reanalysis, nc_thresholds, lat, lon)
if df is None: return None
windows, labels = [], []
for t in range(lookback, len(df)-max(HORIZONS_H), stride):
win = df.iloc[t-lookback:t]
dyn = np.zeros((lookback, 6), 'float32')
dyn[:, 2] = win['fsi'].values.clip(0, 3).astype('float32')
dyn[:, 3] = win['discharge_m3s'].values.astype('float32')
fut = np.zeros((N_H, 2), 'float32') # fill with ERA5 precip
lbl = []
for h in HORIZONS_H:
lbl.append(df['label_2yr'].iloc[t:t+h].max())
windows.append(dyn); labels.append(lbl)
return np.stack(windows), np.array(labels, 'float32')
print('GloFAS helpers ready.')
GloFAS helpers ready.
5 — GRDC: Global Runoff Data Centre
The Global Runoff Data Centre provides daily discharge for > 10 000 stations worldwide (Europe, Asia, Africa, South America) — regions not covered by USGS.
Download
Register (free): https://www.bafg.de/GRDC
Request data via the online portal (data arrives by email as .zip)
Each station is a fixed-format text file
File format
# GRDC-No.: 6973000
# River : AMAZON
# Station : OBIDOS
# ...
# YYYY-MM-DD;hh:mm; Value
1960-01-01;--:--; 95800
1960-01-02;--:--; 94000
[6]:
# ── GRDC loader ───────────────────────────────────────────────────────────────
def load_grdc_station(filepath):
"""
Parse a GRDC fixed-format text file.
Returns DataFrame: index=date, columns=[discharge_m3s]
"""
rows = []
with open(filepath, 'r', encoding='utf-8', errors='ignore') as fh:
for line in fh:
if line.startswith('#') or not line.strip():
continue
parts = line.strip().split(';')
if len(parts) < 3:
continue
try:
dt = pd.to_datetime(parts[0].strip())
val = float(parts[2].strip())
if val >= 0:
rows.append((dt, val))
except (ValueError, IndexError):
continue
if not rows:
return None
df = pd.DataFrame(rows, columns=['date','discharge_m3s']).set_index('date')
return df.sort_index()
def load_grdc_catalogue(catalogue_csv):
"""
Load the GRDC station catalogue (downloaded as CSV from the portal).
Returns DataFrame with: grdc_no, river, station, country, lat, lon, area
"""
return pd.read_csv(catalogue_csv, sep=';', encoding='latin-1',
usecols=['grdc_no','river','station','country',
'lat','long','area','altitude'])
# ── Example ────────────────────────────────────────────────────────────────────
# grdc_df = load_grdc_station('data/flood_ews/grdc/6973000_Q_Day.Cmd.txt')
# if grdc_df is not None:
# print(grdc_df.head(), grdc_df.shape)
print('GRDC loader ready.')
print('Download stations: https://www.bafg.de/GRDC/EN/02_srvcs/21_tmsrs/riverdischarge_node.html')
GRDC loader ready.
Download stations: https://www.bafg.de/GRDC/EN/02_srvcs/21_tmsrs/riverdischarge_node.html
6 — OpenHydrology (UK): 15-min Gauge Data
The UK National River Flow Archive provides 15-min and daily flow data for ~1 500 gauging stations through the hydrofunctions Python package.
Install
pip install hydrofunctions
[7]:
# ── UK NRFA via hydrofunctions ────────────────────────────────────────────────
try:
import hydrofunctions as hf
HF_OK = True
except ImportError:
HF_OK = False
print('pip install hydrofunctions')
def load_uk_gauge(station_id, start_date, end_date, period='PT15M'):
"""
Download UK NRFA gauge data via hydrofunctions.
station_id: e.g. '39001' (Thames at Kingston)
period : 'PT15M' (15-min), 'P1D' (daily)
"""
if not HF_OK:
return None
site = hf.NWIS(station_id, period, start_date, end_date)
df = site.df()
df.columns = ['discharge_m3s', 'flag']
return df[['discharge_m3s']].resample('h').mean()
# ── Recommended UK test gauges ──────────────────────────────────────────────
# '39001' Thames at Kingston-upon-Thames
# '27041' Severn at Bewdley
# '17001' Tay at Ballathie (Scotland)
# '76017' Exe at Thorverton (South West)
# Example:
# uk_df = load_uk_gauge('39001', '2000-01-01', '2023-12-31')
# print(uk_df.head() if uk_df is not None else 'Not loaded')
print('UK NRFA loader ready. Station list: https://nrfa.ceh.ac.uk/data/search')
pip install hydrofunctions
UK NRFA loader ready. Station list: https://nrfa.ceh.ac.uk/data/search
7 — Unified Pipeline: Any Source → NB13 Arrays
Once you have discharge data (from any source above) and rainfall + soil moisture (from ERA5-Land), this function produces the four arrays NB13 expects.
Minimum requirement: only discharge is strictly required. Rainfall and soil moisture are added progressively — the FSI physics prior still works with discharge-only inputs.
[8]:
def build_nb13_arrays(
discharge_series, # pd.Series: hourly discharge (m³/s), DatetimeIndex
precip_series=None, # pd.Series: hourly precip (mm/h)
soil_moist_series=None, # pd.Series: hourly soil moisture (0-1)
temp_series=None, # pd.Series: hourly temperature (°C)
bankfull_quantile=0.95, # percentile used as flood threshold (≡ FSI=1)
lookback=LOOKBACK,
horizons=HORIZONS_H,
stride=6,
static_attrs=None, # dict of static attributes (optional)
nwp_noise_frac=0.35, # simulated NWP forecast noise fraction
):
"""
Convert any hourly discharge + optional ERA5 series into NB13-compatible arrays.
Returns
-------
X_static : (N, 8)
X_dyn_raw : (N, 24, 6)
X_future_raw : (N, N_H, 2)
Y_raw : (N, N_H)
"""
q = discharge_series.resample('h').mean().fillna(method='ffill')
bankfull = q.quantile(bankfull_quantile)
# Align optional series to discharge index
def align(s, default_val=0.0):
if s is None:
return pd.Series(default_val, index=q.index, dtype='float32')
return s.reindex(q.index, method='nearest').fillna(default_val).astype('float32')
precip = align(precip_series)
sm = align(soil_moist_series, 0.3)
temp = align(temp_series, 15.0)
# FSI (water level proxy = normalised discharge)
fsi = (q / (bankfull + 1e-3)).clip(0, 3)
windows, futures, labels = [], [], []
for t in range(lookback, len(q)-max(horizons), stride):
win_q = q.iloc[t-lookback:t].values.astype('float32')
win_fsi = fsi.iloc[t-lookback:t].values.astype('float32')
win_prec = precip.iloc[t-lookback:t].values.astype('float32')
win_sm = sm.iloc[t-lookback:t].values.astype('float32')
win_temp = temp.iloc[t-lookback:t].values.astype('float32')
local_r = win_prec * 0.8
dyn = np.stack([win_prec, local_r, win_fsi,
win_q, win_sm, win_temp], axis=1)
# NWP forecasts (true future + noise)
r3 = float(precip.iloc[t:t+3].sum())
r6 = float(precip.iloc[t:t+6].sum())
r3_nwp = r3 * (1 + np.random.normal(0, nwp_noise_frac))
r6_nwp = r6 * (1 + np.random.normal(0, nwp_noise_frac))
fut = np.array([[r3_nwp, r6_nwp]] * len(horizons), dtype='float32')
lbl = [float(q.iloc[t:t+h].max() >= bankfull) for h in horizons]
windows.append(dyn); futures.append(fut); labels.append(lbl)
X_dyn_raw = np.stack(windows).astype('float32')
X_future_raw = np.stack(futures).astype('float32')
Y_raw = np.array(labels, dtype='float32')
# Static (fill with defaults if not provided)
sa = static_attrs or {}
static_row = np.array([
sa.get('basin_area', 500.0),
sa.get('slope', 0.01),
sa.get('imperv', 0.2),
sa.get('soil_perm', 15.0),
sa.get('ndvi', 0.5),
sa.get('dist_channel',3.0),
sa.get('elevation', 200.0),
sa.get('flood_hist', 2.0),
], dtype='float32')
X_static_raw = np.tile(static_row, (len(X_dyn_raw), 1))
print(f'Windows extracted : {len(X_dyn_raw)}')
print(f'Flood prevalence :',
' '.join(f'+{h}h={Y_raw[:,hi].mean():.2f}'
for hi, h in enumerate(horizons)))
return X_static_raw, X_dyn_raw, X_future_raw, Y_raw
# ── Validation on a synthetic discharge series ─────────────────────────────────
rng = np.random.default_rng(99)
dates = pd.date_range('2010-01-01', periods=365*24, freq='h')
q_syn = pd.Series(
np.maximum(0, rng.normal(50, 20, len(dates)) +
50*rng.exponential(1, len(dates)) * (rng.random(len(dates)) < 0.05)),
index=dates
)
Xs, Xd, Xf, Yr = build_nb13_arrays(q_syn)
print(f'\nShape check: X_static={Xs.shape} X_dyn={Xd.shape} '
f'X_future={Xf.shape} Y={Yr.shape}')
print('\nTo plug into NB13: assign these four arrays and run from Cell 9 onwards.')
Windows extracted : 1452
Flood prevalence : +1h=0.06 +3h=0.15 +6h=0.27 +12h=0.47 +24h=0.71
Shape check: X_static=(1452, 8) X_dyn=(1452, 24, 6) X_future=(1452, 5, 2) Y=(1452, 5)
To plug into NB13: assign these four arrays and run from Cell 9 onwards.
8 — Normalise & Hand Off to NB13
After loading any dataset into X_static_raw, X_dyn_raw, X_future_raw, Y_raw, run the cells below to normalise and create the train/test split. These are identical to NB13 Cells 9–10 and can be copy-pasted directly.
[9]:
# ── Copy-paste replacement for NB13 Cells 9–10 ───────────────────────────────
# Uses arrays from Section 7 demo run if real data was not loaded above.
if 'X_dyn_raw' not in dir() or X_dyn_raw is None:
print('Real data not loaded — using Section 7 demo arrays (Xs, Xd, Xf, Yr).')
X_static_raw, X_dyn_raw, X_future_raw, Y_raw = Xs, Xd, Xf, Yr
N_BASINS = len(X_dyn_raw)
TRAIN_SIZE = int(0.80 * N_BASINS)
TEST_SIZE = N_BASINS - TRAIN_SIZE
def znorm(a):
return ((a - a.mean(axis=0)) / (a.std(axis=0) + 1e-8)).astype('float32')
# Static: z-normalise each column
X_static = znorm(X_static_raw)
# Dynamic: z-normalise per feature across all patients and time steps
X_dyn = X_dyn_raw.copy()
for fi in range(X_dyn.shape[2]):
v = X_dyn[:, :, fi]
X_dyn[:, :, fi] = ((v - v.mean()) / (v.std() + 1e-8)).astype('float32')
# Future: z-normalise
X_fut = X_future_raw.copy()
for fi in range(X_fut.shape[2]):
v = X_fut[:, :, fi]
X_fut[:, :, fi] = ((v - v.mean()) / (v.std() + 1e-8)).astype('float32')
# FSI physics prior from last observed water level (channel 2)
fsi_now = X_dyn_raw[:, -1, 2].clip(0, 3)
fsi_prior = 1.0 / (1.0 + np.exp(-(fsi_now - 0.80) / 0.15))
# Labels
Y_labels = Y_raw[:, :, None].astype('float32')
is_flood_6h = Y_raw[:, PRIMARY_H].astype('float32')
# Temporal split
RNG_split = np.random.default_rng(42)
perm = RNG_split.permutation(N_BASINS)
tr, te = perm[:TRAIN_SIZE], perm[TRAIN_SIZE:]
Xs_tr, Xs_te = X_static[tr], X_static[te]
Xd_tr, Xd_te = X_dyn[tr], X_dyn[te]
Xf_tr, Xf_te = X_fut[tr], X_fut[te]
Y_tr, Y_te = Y_labels[tr], Y_labels[te]
sep_tr, sep_te = is_flood_6h[tr], is_flood_6h[te]
fsi_tr, fsi_te = fsi_prior[tr], fsi_prior[te]
N_STATIC = X_static.shape[1]
N_DYNAMIC = X_dyn.shape[2]
N_FUTURE = X_fut.shape[2]
HORIZON = N_H
print(f'N_BASINS : {N_BASINS} (train {TRAIN_SIZE}, test {TEST_SIZE})')
print(f'N_STATIC : {N_STATIC}')
print(f'N_DYNAMIC : {N_DYNAMIC}')
print(f'N_FUTURE : {N_FUTURE}')
print(f'HORIZON : {HORIZON}')
print(f'Flood +6h : train={sep_tr.mean():.3f} test={sep_te.mean():.3f}')
print()
print('All variables match NB13 names. Proceed from NB13 Cell 14 (Section 4).')
Real data not loaded — using Section 7 demo arrays (Xs, Xd, Xf, Yr).
N_BASINS : 1452 (train 1161, test 291)
N_STATIC : 8
N_DYNAMIC : 6
N_FUTURE : 2
HORIZON : 5
Flood +6h : train=0.264 test=0.278
All variables match NB13 names. Proceed from NB13 Cell 14 (Section 4).
9 — Summary: Which Dataset for Which Purpose?
Research goal |
Recommended dataset |
Why |
|---|---|---|
Benchmark / first paper |
CAMELS-US |
671 basins, standard benchmark, many published results to compare against |
Global application |
GloFAS + ERA5 |
Global coverage, consistent reanalysis |
Hourly resolution (US) |
USGS NWIS + ERA5 |
10 000+ gauges, 15-min data |
European rivers |
GRDC + ERA5 |
Good coverage for Rhine, Danube, Po, Loire |
UK catchments |
UK NRFA (OpenHydro) |
1 500 gauges, catchment attributes available |
Developing countries |
GloFAS |
Gauge-independent (model-based) |
Recommended workflow for a first real-data run
Download CAMELS-US (~4 GB, free)
Use
build_camels_dataset()from Section 1 withn_basins=671Replace NB13 Cells 3–5 with the normalised arrays from Section 8
Run NB13 from Cell 9 onwards — no other changes needed
Compare your AUC to published CAMELS benchmarks (Kratzert et al. 2019 LSTM baseline: AUC ≈ 0.89 on test set)
Key published baselines on CAMELS
Model |
AUC (flood +6h) |
Reference |
|---|---|---|
LSTM |
~0.89 |
Kratzert et al. 2019 |
Transformer |
~0.91 |
Feng et al. 2022 |
LSTM + static |
~0.90 |
Rahimzad et al. 2021 |
BA (this framework) |
to be measured |
— |