Flood Early Warning — Real Data Integration Guide

Companion to Notebook 13 (13_flood_early_warning.ipynb).
This notebook is a drop-in replacement for Cells 3–5 of NB13.
All downstream model cells (Sections 4–9) run unchanged.

Covered datasets

Dataset

Coverage

Spatial

Temporal

Access

CAMELS-US

671 US basins

Basin-level

Daily 1980–2014

Free

USGS NWIS

~10 000 US gauges

Point gauge

15-min / hourly

Free API

ERA5-Land

Global

0.1° (~9 km)

Hourly 1950–

Free (CDS)

GloFAS

Global

0.1°

Daily reanalysis

Free (CDS)

GRDC

10 000+ global gauges

Point gauge

Daily

Free (registration)

OpenHydro (UK)

~1 500 UK gauges

Point gauge

15-min

Free API

How to use

  1. Run Section 1 to download and cache your chosen dataset

  2. Run Section 2 to build X_static, X_dyn_raw, X_future_raw, Y_raw in the same shape as NB13

  3. Copy those four arrays back into NB13 and re-run from Cell 9 onwards

No other change is needed — the BaseAttentive model architecture, FSI prior, and alarm system are fully dataset-agnostic.

[1]:
import os, warnings, time, glob, zipfile, requests
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
from pathlib import Path

# Optional — install if needed:
#   pip install dataretrieval cdsapi hydrofunctions netCDF4 xarray
NWIS_OK = False
try:
    import dataretrieval.nwis as nwis
    NWIS_OK = True
except ImportError:
    print("dataretrieval not installed: pip install dataretrieval")

CDS_OK = False
try:
    import cdsapi
    CDS_OK = True
except ImportError:
    print("cdsapi not installed: pip install cdsapi")

# NB13 constants — must match
LOOKBACK   = 24
HORIZONS_H = [1, 3, 6, 12, 24]
N_H        = len(HORIZONS_H)
MAX_H      = max(HORIZONS_H)
PRIMARY_H  = 2   # +6h
DATA_CACHE = Path('data/flood_ews')
DATA_CACHE.mkdir(parents=True, exist_ok=True)
print(f'Cache directory: {DATA_CACHE.resolve()}')

dataretrieval not installed: pip install dataretrieval
cdsapi not installed: pip install cdsapi
Cache directory: /home/daniel/projects/base-attentive/examples/data/flood_ews

1 — CAMELS-US: 671 Basins with Static Attributes

CAMELS (Catchment Attributes and MEteorology for Large-sample Studies) provides:

  • Daily streamflow + meteorological forcing for 671 CONUS basins (1980–2014)

  • 33 basin attributes: topology, climate, soil, vegetation, geology, hydrology

Download (one-time, ~4 GB total)

https://gdex.ucar.edu/dataset/camels.html

Files needed:

  • basin_timeseries_v1p2_metForcing_obsFlow.tar.gz (~3.2 GB)

  • camels_attributes_v2.0.xlsx (~1.5 MB)

Or use the Kaggle mirror (no login required for cached versions):

kaggle datasets download -d htagholdings/camels-us-streamflow-catchment-attributes

Column mapping to NB13 features

NB13 feature

CAMELS column

Notes

basin_area

area_gages2 (km²)

Direct

slope

slope_mean (m/km → m/m: ÷1000)

Direct

imperv

frac_urban (fraction)

Urbanisation proxy

soil_perm

soil_conductivity (cm/h → mm/h: ×10)

Direct

ndvi

frac_forest (fraction)

Vegetation proxy

dist_channel

derived from area_gages2, slope_mean

See code below

elevation

elev_mean (m)

Direct

flood_hist

derived from annual max flow exceedances

See code below

rain_upstream

prcp (mm/day → mm/h: ÷24)

Forcing file

water_level

QObs (mm/day) / q_mean (normalise to bankfull)

Scaled discharge

discharge

QObs (mm/day)

Direct (convert units)

soil_moisture

swe / API computed from prcp

Antecedent

temperature

tmean (°C)

Direct

[2]:
# ── CAMELS-US loader ──────────────────────────────────────────────────────────
CAMELS_DIR = DATA_CACHE / 'camels_us'

def load_camels_attributes(attr_file):
    """Load 33 CAMELS basin attributes from the Excel file."""
    df = pd.read_excel(attr_file, sheet_name=None)
    # merge all sheets on gauge_id
    merged = None
    for sheet, dfs in df.items():
        if 'gauge_id' in dfs.columns:
            merged = dfs if merged is None else merged.merge(dfs, on='gauge_id', how='outer')
    return merged

def load_camels_forcing(basin_id, forcing_dir, source='daymet'):
    """Load daily meteorological forcing for one basin."""
    pat = str(forcing_dir / '**' / f'{basin_id}_*{source}*.txt')
    files = glob.glob(pat, recursive=True)
    if not files:
        return None
    df = pd.read_csv(files[0], sep=r'\s+', skiprows=3,
                     names=['Year','Mnth','Day','Hr','dayl','prcp','srad',
                            'swe','tmax','tmin','vp'])
    df['date'] = pd.to_datetime(df[['Year','Mnth','Day']].rename(
        columns={'Year':'year','Mnth':'month','Day':'day'}))
    df['tmean'] = (df['tmax'] + df['tmin']) / 2
    return df.set_index('date')

def load_camels_streamflow(basin_id, flow_dir):
    """Load daily observed streamflow (mm/day)."""
    pat = str(flow_dir / '**' / f'{basin_id}_*.txt')
    files = glob.glob(pat, recursive=True)
    if not files:
        return None
    df = pd.read_csv(files[0], sep=r'\s+', header=None,
                     names=['basin','Year','Mnth','Day','QObs','flag'])
    df['date'] = pd.to_datetime(df[['Year','Mnth','Day']].rename(
        columns={'Year':'year','Mnth':'month','Day':'day'}))
    return df.set_index('date')[['QObs']]

def build_camels_dataset(camels_dir, n_basins=None, lookback_days=1,
                         label_threshold_quantile=0.90):
    """
    Build X_static, X_dyn_raw, X_future_raw, Y_raw from CAMELS.

    Parameters
    ----------
    camels_dir : Path   root of extracted CAMELS archive
    n_basins   : int    limit to first n basins (None = all 671)
    lookback_days: int  days of history (converted to 24h window by taking
                        the last `lookback_days` daily values per window)
    label_threshold_quantile: float  percentile of annual max flow used as
                                     flood threshold per basin
    Returns
    -------
    X_static  : (N, 8)   static basin attributes
    X_dyn_raw : (N, LOOKBACK, 6)  daily forcing window
    X_future_raw: (N, N_H, 2)     NWP-style lag features
    Y_raw     : (N, N_H)  binary flood labels
    """
    attr_file  = camels_dir / 'camels_attributes_v2.0.xlsx'
    forcing_dir= camels_dir / 'basin_mean_forcing'
    flow_dir   = camels_dir / 'usgs_streamflow'

    attrs = load_camels_attributes(attr_file)
    if attrs is None:
        raise FileNotFoundError(f'Attributes file not found: {attr_file}')

    basin_ids = attrs['gauge_id'].astype(str).str.zfill(8).tolist()
    if n_basins:
        basin_ids = basin_ids[:n_basins]

    X_static_list, X_dyn_list, X_fut_list, Y_list = [], [], [], []
    skipped = 0

    for bi, bid in enumerate(basin_ids):
        if bi % 50 == 0: print(f'  Loading basin {bi}/{len(basin_ids)}...')
        forcing = load_camels_forcing(bid, forcing_dir)
        flow    = load_camels_streamflow(bid, flow_dir)
        if forcing is None or flow is None:
            skipped += 1; continue

        merged = forcing.join(flow, how='inner').dropna()
        if len(merged) < lookback_days + max(HORIZONS_H):
            skipped += 1; continue

        row = attrs[attrs['gauge_id'].astype(str).str.zfill(8) == bid]
        if len(row) == 0:
            skipped += 1; continue
        row = row.iloc[0]

        # ── Static features ────────────────────────────────────────────────
        area     = float(row.get('area_gages2', 500))
        slope    = float(row.get('slope_mean', 10)) / 1000      # m/km → m/m
        imperv   = float(row.get('frac_urban', 0.1))
        soil_c   = float(row.get('soil_conductivity', 15)) * 10  # cm/h→mm/h
        ndvi_p   = float(row.get('frac_forest', 0.4))
        elev     = float(row.get('elev_mean', 300))
        q_mean   = float(row.get('q_mean', 1.0)) + 1e-3
        q_max    = merged['QObs'].quantile(label_threshold_quantile)
        dist_ch  = np.sqrt(area) / (slope * 1000 + 1e-3)         # rough proxy

        # Flood history: fraction of years where annual max > flood threshold
        ann_max  = merged['QObs'].resample('YE').max()
        flood_thr= np.percentile(merged['QObs'], 95)
        flood_hist = float((ann_max > flood_thr).mean() * 10)     # /decade

        sta = np.array([area, slope, imperv, soil_c, ndvi_p,
                        dist_ch, elev, flood_hist], dtype='float32')

        # ── Dynamic window (last `lookback_days` rows before flood event) ──
        # For each year, extract window before annual max
        windows_added = 0
        for yr in merged.index.year.unique():
            yr_data = merged[merged.index.year == yr]
            if len(yr_data) < lookback_days + max(HORIZONS_H):
                continue
            peak_idx = yr_data['QObs'].idxmax()
            peak_pos = yr_data.index.get_loc(peak_idx)
            if peak_pos < lookback_days:
                continue

            obs_end  = peak_pos
            obs_start= peak_pos - lookback_days
            win      = yr_data.iloc[obs_start:obs_end]

            # 6 dynamic features (daily, resampled to LOOKBACK=24 points)
            # daily data → repeat each day's value 1x (or resample if hourly)
            n_pts = min(len(win), LOOKBACK)
            pad   = LOOKBACK - n_pts

            def pad_series(arr, n_pts=n_pts, pad=pad):
                arr = np.array(arr[-n_pts:], dtype='float32')
                if pad > 0:
                    arr = np.concatenate([np.full(pad, arr[0]), arr])
                return arr

            rain_up   = pad_series(win['prcp'].values)          # mm/day
            rain_loc  = rain_up * 0.8 + np.random.normal(0,0.5,LOOKBACK).clip(0)
            wl        = pad_series((win['QObs'] / (q_mean*24+1e-3)).values.clip(0,3))
            disch     = pad_series(win['QObs'].values)
            sm        = pad_series(win['swe'].values if 'swe' in win else
                                   np.zeros(len(win)))
            temp      = pad_series(win['tmean'].values)

            dyn = np.stack([rain_up, rain_loc, wl, disch, sm, temp],
                           axis=1).astype('float32')

            # ── Future NWP features (lag from observation end) ─────────────
            future_win = yr_data.iloc[obs_end:obs_end+max(HORIZONS_H)]
            r3h = future_win['prcp'].values[:3].sum()   if len(future_win)>=3 else 0.0
            r6h = future_win['prcp'].values[:6].sum()   if len(future_win)>=6 else 0.0
            fut = np.array([r3h, r6h], dtype='float32')[None, :].repeat(N_H, axis=0)

            # ── Labels ─────────────────────────────────────────────────────
            labels = []
            for h in HORIZONS_H:
                fut_q = yr_data.iloc[obs_end:obs_end+h]['QObs'].max()                         if obs_end+h <= len(yr_data) else 0.0
                labels.append(float(fut_q > q_max))

            X_static_list.append(sta)
            X_dyn_list.append(dyn)
            X_fut_list.append(fut)
            Y_list.append(labels)
            windows_added += 1
            if windows_added >= 3:   # max 3 windows per basin
                break

    print(f'Loaded {len(X_dyn_list)} windows from {len(basin_ids)-skipped} basins '
          f'(skipped {skipped})')
    return (np.stack(X_static_list), np.stack(X_dyn_list),
            np.stack(X_fut_list).astype('float32'),
            np.array(Y_list, dtype='float32'))

# ── Run if CAMELS data is present ─────────────────────────────────────────────
if (CAMELS_DIR / 'camels_attributes_v2.0.xlsx').exists():
    X_static_raw, X_dyn_raw, X_future_raw, Y_raw = build_camels_dataset(
        CAMELS_DIR, n_basins=100)
    print('\nShape check:')
    for name, arr in [('X_static_raw', X_static_raw), ('X_dyn_raw', X_dyn_raw),
                      ('X_future_raw', X_future_raw), ('Y_raw', Y_raw)]:
        print(f'  {name}: {arr.shape}')
else:
    print('CAMELS data not found at', CAMELS_DIR)
    print('Download from: https://gdex.ucar.edu/dataset/camels.html')
    print('Running in demo mode — using NB13 synthetic arrays as placeholder.')

CAMELS data not found at data/flood_ews/camels_us
Download from: https://gdex.ucar.edu/dataset/camels.html
Running in demo mode — using NB13 synthetic arrays as placeholder.

2 — USGS NWIS: Hourly Gauge Data (US)

The USGS National Water Information System provides free real-time and historical streamflow data via the dataretrieval Python package.

Install

pip install dataretrieval

Key USGS parameter codes

Code

Parameter

Unit

00060

Discharge

ft³/s

00065

Gauge height

ft

00010

Water temperature

°C

00045

Precipitation

in


3 — ERA5-Land: Global Hourly Rainfall & Soil Moisture

ERA5-Land provides hourly global coverage at 0.1° (~9 km) since 1950. It fills the rainfall and soil-moisture channels missing from USGS NWIS.

One-time setup

  1. Register at https://cds.climate.copernicus.eu (free)

  2. Install client: pip install cdsapi

  3. Create ~/.cdsapirc:

url: https://cds.climate.copernicus.eu/api/v2
key: <your-uid>:<your-api-key>

Variables used

ERA5-Land variable

NB13 channel

Conversion

total_precipitation (m)

rain_upstream / rain_local

×1000 → mm/h

volumetric_soil_water_layer_1 (m³/m³)

soil_moisture

direct

2m_temperature (K)

temperature

−273.15 → °C

Spatial matching

For each gauged basin, extract the ERA5 grid cell containing the gauge coordinates. For upstream rainfall, average over the basin polygon (use HydroSHEDS or CAMELS catchment boundaries).

[4]:
# ── ERA5-Land download helper ─────────────────────────────────────────────────
ERA5_CACHE = DATA_CACHE / 'era5'
ERA5_CACHE.mkdir(exist_ok=True)

def download_era5_land(year, month, bbox, output_path=None):
    """
    Download ERA5-Land hourly precipitation, soil moisture, temperature
    for a bounding box [N, W, S, E] in degrees.

    Parameters
    ----------
    bbox : [float, float, float, float]   [north, west, south, east]
    output_path : Path   where to save the .nc file (auto-named if None)
    """
    if not CDS_OK:
        print('pip install cdsapi'); return None

    fname = output_path or ERA5_CACHE / f'era5_{year}_{month:02d}.nc'
    if Path(fname).exists():
        print(f'Cached: {fname}'); return fname

    c = cdsapi.Client()
    c.retrieve(
        'reanalysis-era5-land',
        {
            'variable': [
                'total_precipitation',
                'volumetric_soil_water_layer_1',
                '2m_temperature',
            ],
            'year':  str(year),
            'month': f'{month:02d}',
            'day':   [f'{d:02d}' for d in range(1, 32)],
            'time':  [f'{h:02d}:00' for h in range(24)],
            'area':  bbox,           # [N, W, S, E]
            'format': 'netcdf',
        },
        str(fname)
    )
    print(f'Downloaded: {fname}')
    return fname

def load_era5_at_point(nc_file, lat, lon):
    """
    Extract hourly time series at nearest grid point.
    Returns DataFrame with: precip_mm_h, soil_moisture, temp_c
    """
    try:
        import xarray as xr
    except ImportError:
        print('pip install xarray'); return None

    ds = xr.open_dataset(nc_file)
    # Nearest grid point
    pt = ds.sel(latitude=lat, longitude=lon, method='nearest')
    df = pd.DataFrame({
        'precip_mm_h':  pt['tp'].values  * 1000,        # m → mm/h
        'soil_moisture':pt['swvl1'].values,              # m³/m³
        'temp_c':       pt['t2m'].values - 273.15,       # K → °C
    }, index=pd.to_datetime(pt['time'].values))
    return df

# ── Example: download ERA5 for Ohio River basin ───────────────────────────────
# ohio_bbox = [42, -90, 36, -80]   # [N, W, S, E]
# fname = download_era5_land(2023, 6, ohio_bbox)
# if fname:
#     era5_df = load_era5_at_point(fname, lat=38.25, lon=-85.75)
#     print(era5_df.head())

print('ERA5-Land helpers ready.')
print('Usage: download_era5_land(year, month, bbox=[N,W,S,E])')

ERA5-Land helpers ready.
Usage: download_era5_land(year, month, bbox=[N,W,S,E])

4 — GloFAS: Global Flood Awareness System (Copernicus)

GloFAS provides:

  • Reanalysis (ERA5-driven): 40-year daily river discharge at 0.1° globally

  • Forecast: ensemble flood forecasts at 3-day / 7-day / 15-day / 30-day

  • Return period thresholds: 2-yr, 5-yr, 10-yr, 20-yr, 100-yr per grid cell

GloFAS thresholds are the closest real-world equivalent to NB13’s FSI bankfull threshold — directly usable as the flood label criterion.

Download via CDS

import cdsapi
c = cdsapi.Client()
c.retrieve('cems-glofas-historical',
           {'system_version':'version_4_0',
            'variable':'river_discharge_in_the_last_24_hours',
            'hyear': [str(y) for y in range(1990,2023)],
            'format':'netcdf'},
           'glofas_reanalysis.nc')

Return-period thresholds (flood labels)

GloFAS publishes pre-computed return-period discharge thresholds:

https://confluence.ecmwf.int/display/CEMS/GloFAS+Thresholds

Use the 2-year return period as the NB13 flood threshold (FSI ≥ 1 equivalent).

[5]:
# ── GloFAS reader ─────────────────────────────────────────────────────────────
GLOFAS_CACHE = DATA_CACHE / 'glofas'
GLOFAS_CACHE.mkdir(exist_ok=True)

def load_glofas_at_point(nc_reanalysis, nc_thresholds, lat, lon):
    """
    Extract GloFAS river discharge and flood labels at a grid point.

    Parameters
    ----------
    nc_reanalysis   : Path  GloFAS reanalysis .nc (dis24 variable)
    nc_thresholds   : Path  GloFAS thresholds .nc (rl2, rl5, rl20 variables)
    lat, lon        : float  target coordinates

    Returns
    -------
    DataFrame with: discharge_m3s, fsi, label_2yr, label_5yr, label_20yr
    """
    try:
        import xarray as xr
    except ImportError:
        print('pip install xarray'); return None

    ds_r  = xr.open_dataset(nc_reanalysis)
    ds_t  = xr.open_dataset(nc_thresholds)

    q   = ds_r['dis24'].sel(lat=lat, lon=lon, method='nearest')
    t2  = float(ds_t['rl2' ].sel(lat=lat, lon=lon, method='nearest').values)
    t5  = float(ds_t['rl5' ].sel(lat=lat, lon=lon, method='nearest').values)
    t20 = float(ds_t['rl20'].sel(lat=lat, lon=lon, method='nearest').values)

    df = pd.DataFrame({
        'discharge_m3s': q.values,
        'fsi':           q.values / (t2 + 1e-3),    # FSI relative to 2-yr threshold
        'label_2yr':     (q.values >= t2 ).astype(float),
        'label_5yr':     (q.values >= t5 ).astype(float),
        'label_20yr':    (q.values >= t20).astype(float),
    }, index=pd.to_datetime(q['time'].values))
    return df

def build_glofas_windows(nc_reanalysis, nc_thresholds, lat, lon,
                         lookback=LOOKBACK, stride=6):
    """Build NB13-compatible windows from GloFAS reanalysis."""
    df = load_glofas_at_point(nc_reanalysis, nc_thresholds, lat, lon)
    if df is None: return None

    windows, labels = [], []
    for t in range(lookback, len(df)-max(HORIZONS_H), stride):
        win = df.iloc[t-lookback:t]
        dyn = np.zeros((lookback, 6), 'float32')
        dyn[:, 2] = win['fsi'].values.clip(0, 3).astype('float32')
        dyn[:, 3] = win['discharge_m3s'].values.astype('float32')

        fut = np.zeros((N_H, 2), 'float32')  # fill with ERA5 precip

        lbl = []
        for h in HORIZONS_H:
            lbl.append(df['label_2yr'].iloc[t:t+h].max())
        windows.append(dyn); labels.append(lbl)

    return np.stack(windows), np.array(labels, 'float32')

print('GloFAS helpers ready.')

GloFAS helpers ready.

5 — GRDC: Global Runoff Data Centre

The Global Runoff Data Centre provides daily discharge for > 10 000 stations worldwide (Europe, Asia, Africa, South America) — regions not covered by USGS.

Download

  1. Register (free): https://www.bafg.de/GRDC

  2. Request data via the online portal (data arrives by email as .zip)

  3. Each station is a fixed-format text file

File format

# GRDC-No.: 6973000
# River   : AMAZON
# Station : OBIDOS
# ...
# YYYY-MM-DD;hh:mm; Value
1960-01-01;--:--;  95800
1960-01-02;--:--;  94000
[6]:
# ── GRDC loader ───────────────────────────────────────────────────────────────

def load_grdc_station(filepath):
    """
    Parse a GRDC fixed-format text file.
    Returns DataFrame: index=date, columns=[discharge_m3s]
    """
    rows = []
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as fh:
        for line in fh:
            if line.startswith('#') or not line.strip():
                continue
            parts = line.strip().split(';')
            if len(parts) < 3:
                continue
            try:
                dt  = pd.to_datetime(parts[0].strip())
                val = float(parts[2].strip())
                if val >= 0:
                    rows.append((dt, val))
            except (ValueError, IndexError):
                continue
    if not rows:
        return None
    df = pd.DataFrame(rows, columns=['date','discharge_m3s']).set_index('date')
    return df.sort_index()

def load_grdc_catalogue(catalogue_csv):
    """
    Load the GRDC station catalogue (downloaded as CSV from the portal).
    Returns DataFrame with: grdc_no, river, station, country, lat, lon, area
    """
    return pd.read_csv(catalogue_csv, sep=';', encoding='latin-1',
                       usecols=['grdc_no','river','station','country',
                                'lat','long','area','altitude'])

# ── Example ────────────────────────────────────────────────────────────────────
# grdc_df = load_grdc_station('data/flood_ews/grdc/6973000_Q_Day.Cmd.txt')
# if grdc_df is not None:
#     print(grdc_df.head(), grdc_df.shape)
print('GRDC loader ready.')
print('Download stations: https://www.bafg.de/GRDC/EN/02_srvcs/21_tmsrs/riverdischarge_node.html')

GRDC loader ready.
Download stations: https://www.bafg.de/GRDC/EN/02_srvcs/21_tmsrs/riverdischarge_node.html

6 — OpenHydrology (UK): 15-min Gauge Data

The UK National River Flow Archive provides 15-min and daily flow data for ~1 500 gauging stations through the hydrofunctions Python package.

Install

pip install hydrofunctions
[7]:
# ── UK NRFA via hydrofunctions ────────────────────────────────────────────────
try:
    import hydrofunctions as hf
    HF_OK = True
except ImportError:
    HF_OK = False
    print('pip install hydrofunctions')

def load_uk_gauge(station_id, start_date, end_date, period='PT15M'):
    """
    Download UK NRFA gauge data via hydrofunctions.
    station_id: e.g. '39001' (Thames at Kingston)
    period    : 'PT15M' (15-min), 'P1D' (daily)
    """
    if not HF_OK:
        return None
    site = hf.NWIS(station_id, period, start_date, end_date)
    df   = site.df()
    df.columns = ['discharge_m3s', 'flag']
    return df[['discharge_m3s']].resample('h').mean()

# ── Recommended UK test gauges ──────────────────────────────────────────────
# '39001'  Thames at Kingston-upon-Thames
# '27041'  Severn at Bewdley
# '17001'  Tay at Ballathie (Scotland)
# '76017'  Exe at Thorverton (South West)

# Example:
# uk_df = load_uk_gauge('39001', '2000-01-01', '2023-12-31')
# print(uk_df.head() if uk_df is not None else 'Not loaded')
print('UK NRFA loader ready. Station list: https://nrfa.ceh.ac.uk/data/search')

pip install hydrofunctions
UK NRFA loader ready. Station list: https://nrfa.ceh.ac.uk/data/search

7 — Unified Pipeline: Any Source → NB13 Arrays

Once you have discharge data (from any source above) and rainfall + soil moisture (from ERA5-Land), this function produces the four arrays NB13 expects.

Minimum requirement: only discharge is strictly required. Rainfall and soil moisture are added progressively — the FSI physics prior still works with discharge-only inputs.

[8]:
def build_nb13_arrays(
    discharge_series,          # pd.Series: hourly discharge (m³/s), DatetimeIndex
    precip_series=None,        # pd.Series: hourly precip (mm/h)
    soil_moist_series=None,    # pd.Series: hourly soil moisture (0-1)
    temp_series=None,          # pd.Series: hourly temperature (°C)
    bankfull_quantile=0.95,    # percentile used as flood threshold (≡ FSI=1)
    lookback=LOOKBACK,
    horizons=HORIZONS_H,
    stride=6,
    static_attrs=None,         # dict of static attributes (optional)
    nwp_noise_frac=0.35,       # simulated NWP forecast noise fraction
):
    """
    Convert any hourly discharge + optional ERA5 series into NB13-compatible arrays.

    Returns
    -------
    X_static  : (N, 8)
    X_dyn_raw : (N, 24, 6)
    X_future_raw : (N, N_H, 2)
    Y_raw     : (N, N_H)
    """
    q      = discharge_series.resample('h').mean().fillna(method='ffill')
    bankfull = q.quantile(bankfull_quantile)

    # Align optional series to discharge index
    def align(s, default_val=0.0):
        if s is None:
            return pd.Series(default_val, index=q.index, dtype='float32')
        return s.reindex(q.index, method='nearest').fillna(default_val).astype('float32')

    precip = align(precip_series)
    sm     = align(soil_moist_series, 0.3)
    temp   = align(temp_series, 15.0)

    # FSI (water level proxy = normalised discharge)
    fsi = (q / (bankfull + 1e-3)).clip(0, 3)

    windows, futures, labels = [], [], []
    for t in range(lookback, len(q)-max(horizons), stride):
        win_q     = q.iloc[t-lookback:t].values.astype('float32')
        win_fsi   = fsi.iloc[t-lookback:t].values.astype('float32')
        win_prec  = precip.iloc[t-lookback:t].values.astype('float32')
        win_sm    = sm.iloc[t-lookback:t].values.astype('float32')
        win_temp  = temp.iloc[t-lookback:t].values.astype('float32')
        local_r   = win_prec * 0.8

        dyn = np.stack([win_prec, local_r, win_fsi,
                        win_q, win_sm, win_temp], axis=1)

        # NWP forecasts (true future + noise)
        r3 = float(precip.iloc[t:t+3].sum())
        r6 = float(precip.iloc[t:t+6].sum())
        r3_nwp = r3 * (1 + np.random.normal(0, nwp_noise_frac))
        r6_nwp = r6 * (1 + np.random.normal(0, nwp_noise_frac))
        fut = np.array([[r3_nwp, r6_nwp]] * len(horizons), dtype='float32')

        lbl = [float(q.iloc[t:t+h].max() >= bankfull) for h in horizons]

        windows.append(dyn); futures.append(fut); labels.append(lbl)

    X_dyn_raw    = np.stack(windows).astype('float32')
    X_future_raw = np.stack(futures).astype('float32')
    Y_raw        = np.array(labels,  dtype='float32')

    # Static (fill with defaults if not provided)
    sa = static_attrs or {}
    static_row = np.array([
        sa.get('basin_area',  500.0),
        sa.get('slope',       0.01),
        sa.get('imperv',      0.2),
        sa.get('soil_perm',   15.0),
        sa.get('ndvi',        0.5),
        sa.get('dist_channel',3.0),
        sa.get('elevation',   200.0),
        sa.get('flood_hist',  2.0),
    ], dtype='float32')
    X_static_raw = np.tile(static_row, (len(X_dyn_raw), 1))

    print(f'Windows extracted : {len(X_dyn_raw)}')
    print(f'Flood prevalence  :',
          '  '.join(f'+{h}h={Y_raw[:,hi].mean():.2f}'
                    for hi, h in enumerate(horizons)))
    return X_static_raw, X_dyn_raw, X_future_raw, Y_raw


# ── Validation on a synthetic discharge series ─────────────────────────────────
rng   = np.random.default_rng(99)
dates = pd.date_range('2010-01-01', periods=365*24, freq='h')
q_syn = pd.Series(
    np.maximum(0, rng.normal(50, 20, len(dates)) +
               50*rng.exponential(1, len(dates)) * (rng.random(len(dates)) < 0.05)),
    index=dates
)
Xs, Xd, Xf, Yr = build_nb13_arrays(q_syn)
print(f'\nShape check: X_static={Xs.shape}  X_dyn={Xd.shape}  '
      f'X_future={Xf.shape}  Y={Yr.shape}')
print('\nTo plug into NB13: assign these four arrays and run from Cell 9 onwards.')

Windows extracted : 1452
Flood prevalence  : +1h=0.06  +3h=0.15  +6h=0.27  +12h=0.47  +24h=0.71

Shape check: X_static=(1452, 8)  X_dyn=(1452, 24, 6)  X_future=(1452, 5, 2)  Y=(1452, 5)

To plug into NB13: assign these four arrays and run from Cell 9 onwards.

8 — Normalise & Hand Off to NB13

After loading any dataset into X_static_raw, X_dyn_raw, X_future_raw, Y_raw, run the cells below to normalise and create the train/test split. These are identical to NB13 Cells 9–10 and can be copy-pasted directly.

[9]:
# ── Copy-paste replacement for NB13 Cells 9–10 ───────────────────────────────
# Uses arrays from Section 7 demo run if real data was not loaded above.
if 'X_dyn_raw' not in dir() or X_dyn_raw is None:
    print('Real data not loaded — using Section 7 demo arrays (Xs, Xd, Xf, Yr).')
    X_static_raw, X_dyn_raw, X_future_raw, Y_raw = Xs, Xd, Xf, Yr

N_BASINS   = len(X_dyn_raw)
TRAIN_SIZE = int(0.80 * N_BASINS)
TEST_SIZE  = N_BASINS - TRAIN_SIZE

def znorm(a):
    return ((a - a.mean(axis=0)) / (a.std(axis=0) + 1e-8)).astype('float32')

# Static: z-normalise each column
X_static   = znorm(X_static_raw)

# Dynamic: z-normalise per feature across all patients and time steps
X_dyn = X_dyn_raw.copy()
for fi in range(X_dyn.shape[2]):
    v = X_dyn[:, :, fi]
    X_dyn[:, :, fi] = ((v - v.mean()) / (v.std() + 1e-8)).astype('float32')

# Future: z-normalise
X_fut = X_future_raw.copy()
for fi in range(X_fut.shape[2]):
    v = X_fut[:, :, fi]
    X_fut[:, :, fi] = ((v - v.mean()) / (v.std() + 1e-8)).astype('float32')

# FSI physics prior from last observed water level (channel 2)
fsi_now   = X_dyn_raw[:, -1, 2].clip(0, 3)
fsi_prior = 1.0 / (1.0 + np.exp(-(fsi_now - 0.80) / 0.15))

# Labels
Y_labels     = Y_raw[:, :, None].astype('float32')
is_flood_6h  = Y_raw[:, PRIMARY_H].astype('float32')

# Temporal split
RNG_split = np.random.default_rng(42)
perm = RNG_split.permutation(N_BASINS)
tr, te = perm[:TRAIN_SIZE], perm[TRAIN_SIZE:]

Xs_tr, Xs_te   = X_static[tr], X_static[te]
Xd_tr, Xd_te   = X_dyn[tr],    X_dyn[te]
Xf_tr, Xf_te   = X_fut[tr],    X_fut[te]
Y_tr,  Y_te    = Y_labels[tr], Y_labels[te]
sep_tr, sep_te = is_flood_6h[tr], is_flood_6h[te]
fsi_tr, fsi_te = fsi_prior[tr],   fsi_prior[te]

N_STATIC  = X_static.shape[1]
N_DYNAMIC = X_dyn.shape[2]
N_FUTURE  = X_fut.shape[2]
HORIZON   = N_H

print(f'N_BASINS   : {N_BASINS}  (train {TRAIN_SIZE}, test {TEST_SIZE})')
print(f'N_STATIC   : {N_STATIC}')
print(f'N_DYNAMIC  : {N_DYNAMIC}')
print(f'N_FUTURE   : {N_FUTURE}')
print(f'HORIZON    : {HORIZON}')
print(f'Flood +6h  : train={sep_tr.mean():.3f}  test={sep_te.mean():.3f}')
print()
print('All variables match NB13 names. Proceed from NB13 Cell 14 (Section 4).')

Real data not loaded — using Section 7 demo arrays (Xs, Xd, Xf, Yr).
N_BASINS   : 1452  (train 1161, test 291)
N_STATIC   : 8
N_DYNAMIC  : 6
N_FUTURE   : 2
HORIZON    : 5
Flood +6h  : train=0.264  test=0.278

All variables match NB13 names. Proceed from NB13 Cell 14 (Section 4).

9 — Summary: Which Dataset for Which Purpose?

Research goal

Recommended dataset

Why

Benchmark / first paper

CAMELS-US

671 basins, standard benchmark, many published results to compare against

Global application

GloFAS + ERA5

Global coverage, consistent reanalysis

Hourly resolution (US)

USGS NWIS + ERA5

10 000+ gauges, 15-min data

European rivers

GRDC + ERA5

Good coverage for Rhine, Danube, Po, Loire

UK catchments

UK NRFA (OpenHydro)

1 500 gauges, catchment attributes available

Developing countries

GloFAS

Gauge-independent (model-based)

Key published baselines on CAMELS

Model

AUC (flood +6h)

Reference

LSTM

~0.89

Kratzert et al. 2019

Transformer

~0.91

Feng et al. 2022

LSTM + static

~0.90

Rahimzad et al. 2021

BA (this framework)

to be measured