Basin Delineation#

../_images/basin_batch.png

Fig. 10 Pour points and derived basins for batch 1 of 12, Vancouver Island.#

Large-sample basin delineation is a computationally intensive operation, but the Whitebox unnest_basins function performs well: Whitebox is written in Rust, and many of its functions handle parallelization implicitly to run efficiently. We'll carry out the operation in batches to keep disk usage and RAM in check.

import os
import time
import numpy as np
import pandas as pd
import geopandas as gpd

import multiprocessing as mp

from shapely.geometry import Point
from shapely.validation import make_valid

from utilities import *

import whitebox 

wbt = whitebox.WhiteboxTools()
wbt.verbose = False
# set the base working directory
base_dir = os.getcwd()
def raster_to_vector_basins_batch(inputs):
    """
    Convert one unnested basin raster into simplified vector polygons.
    Batches are kept small because sending too many pour points per batch
    to the Whitebox "unnest" function generates a huge number of temporary
    raster files that could easily exceed current SSD disk capacities.
    """

    raster_fname, raster_crs, resolution, min_area, temp_folder = inputs
    raster_path = os.path.join(temp_folder, raster_fname)
    raster_no = int(raster_fname.split('_')[-1].split('.')[0])
    polygon_path = os.path.join(temp_folder, f'temp_polygons_{raster_no:05}.shp')
    
    # convert the raster of labelled, non-overlapping
    # basins into vector polygons
    wbt.raster_to_vector_polygons(
        raster_path,
        polygon_path,
    )

    gdf = gpd.read_file(polygon_path)
    gdf = gdf.set_crs(raster_crs, allow_override=True)

    # simplify the polygon geometry to avoid self-intersecting polygons
    simplify_dim = abs(resolution[0])
    # buffer out and back in (distance in CRS units, i.e. metres) to close
    # pinch points that would otherwise produce invalid geometries
    buffer_dim = 10
    gdf.geometry = gdf.geometry.buffer(buffer_dim)
    gdf.geometry = gdf.geometry.simplify(simplify_dim)
    gdf.geometry = gdf.geometry.buffer(-1.0 * buffer_dim)
    
    gdf = filter_and_explode_geoms(gdf, min_area)

    assert (gdf.geometry.geom_type == 'Polygon').all()
        
    gdf.drop(labels=['area'], inplace=True, axis=1)
    gdf.to_file(polygon_path.replace('.shp', '.geojson'))
    
    return gdf
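filter_and_explode_geoms is imported from utilities and isn't shown in this notebook. Below is a minimal sketch of the behaviour the function above relies on, assuming it explodes multipart geometries, repairs any invalid ones, and filters on a minimum area in km²; the real implementation may differ.

def filter_and_explode_geoms(gdf, min_area):
    # hypothetical sketch: split MultiPolygons into single-part rows
    gdf = gdf.explode(index_parts=False).reset_index(drop=True)
    # repair any invalid geometries left over from the buffer/simplify step
    gdf.geometry = gdf.geometry.apply(make_valid)
    # area in km^2 (assumes a projected CRS in metres, e.g. EPSG:3005)
    gdf['area'] = gdf.geometry.area / 1E6
    # keep polygons of at least min_area km^2; 'area' is dropped by the caller
    return gdf[gdf['area'] >= min_area].copy()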
region = 'Vancouver_Island'

fdir_path = os.path.join(base_dir, f'data/processed_dem/{region}_d8_pointer.tif')

# get the file size in MB
filesize = (os.path.getsize(fdir_path) >> 20 )

temp_dir = os.path.join(base_dir, 'data/temp/')
ppt_dir = os.path.join(base_dir, 'data/pour_points/')
basin_output_dir = os.path.join(base_dir, 'data/basins/')
# check for an output file to allow for partial updating
# by tracking (unique) ppt indices
output_fpath = os.path.join(base_dir, f'data/basins/{region}_basins.geojson')

for d in [temp_dir, basin_output_dir]:
    if not os.path.exists(d):
        os.mkdir(d)
region_raster, region_raster_crs, _ = retrieve_raster(fdir_path)
raster_resolution = tuple(abs(e) for e in region_raster.rio.resolution())
print(f'    {region} raster crs = {region_raster_crs}.')
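retrieve_raster also comes from the utilities module. A plausible minimal version, assuming it wraps rioxarray (the third return value, taken here to be the affine transform, is discarded above):

import rioxarray as rxr

def retrieve_raster(fpath):
    # hypothetical sketch: open a raster with rioxarray and return
    # the array, its CRS, and its affine transform
    raster = rxr.open_rasterio(fpath, masked=True)
    return raster, raster.rio.crs, raster.rio.transform()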
# Break the pour points into unique sets
# to avoid duplicate delineations
ppt_file = os.path.join(ppt_dir, f'{region}_pour_points_filtered.geojson')

ppt_gdf = gpd.read_file(ppt_file)
ppt_crs = ppt_gdf.crs
print(f'    ppt crs = {ppt_crs}.')
ppt_gdf
# check if the ppt batches have already been created
batch_dir = os.path.join(temp_dir, 'ppt_batches/')
batches_exist = check_for_ppt_batches(batch_dir)

# break the pour point dataframe into batches to limit the RAM used in 
# processing basins for each pour point.
temp_ppt_filepath = os.path.join(batch_dir, 'pts.shp')

if not batches_exist:
    print('    Generating new batch paths.')
    batch_ppt_paths = create_ppt_file_batches(ppt_gdf, filesize, temp_ppt_filepath)
else:
    print('    Retrieving existing batch paths.')
    batch_ppt_files = sorted(f for f in os.listdir(batch_dir) if f.endswith('.shp'))
    batch_ppt_paths = [os.path.join(batch_dir, f) for f in batch_ppt_files]
n_batch_files = len(batch_ppt_paths)
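check_for_ppt_batches and create_ppt_file_batches are utilities helpers that aren't shown here. A rough sketch of what they might look like, assuming the batch size is scaled to the flow direction raster's file size; the 50 GB disk budget and the file naming pattern are illustrative assumptions:

def check_for_ppt_batches(batch_dir):
    # hypothetical: True if batch shapefiles have already been written
    return os.path.exists(batch_dir) and any(
        f.endswith('.shp') for f in os.listdir(batch_dir))

def create_ppt_file_batches(ppt_gdf, filesize_MB, temp_fpath, max_batch_GB=50):
    # hypothetical: split the pour points into shapefile batches sized so
    # the temporary unnested rasters stay within a disk budget
    os.makedirs(os.path.dirname(temp_fpath), exist_ok=True)
    # heuristic cap: each unnested raster is roughly the size
    # of the flow direction raster
    pts_per_batch = max(1, int(max_batch_GB * 1E3 / filesize_MB))
    batch_paths = []
    for i, start in enumerate(range(0, len(ppt_gdf), pts_per_batch), start=1):
        batch_path = temp_fpath.replace('.shp', f'_{i:04d}.shp')
        ppt_gdf.iloc[start:start + pts_per_batch].to_file(batch_path)
        batch_paths.append(batch_path)
    return batch_paths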
# skip any batches that already have a saved output file
output_stem = output_fpath.split('/')[-1].split('.')[0]
batch_output_files = sorted([f for f in os.listdir(basin_output_dir) if f.startswith(output_stem)])
if len(batch_output_files) > 0:
    batch_matches = [b.replace('.geojson', '.shp').split('_')[-1] for b in batch_output_files]
    batch_ppt_paths = sorted([f for f in batch_ppt_paths if f.split('_')[-1] not in batch_matches])
basin_output_files = []
min_basin_area = 1 # km^2
for ppt_batch_path in batch_ppt_paths:
    t_batch_start = time.time()
    batch_no = int(ppt_batch_path.split('_')[-1].split('.')[0])
    print(f'  Processing {region} batch {batch_no}/{n_batch_files}')
    temp_fname = 'temp_raster.tif'
    temp_basin_raster_path = os.path.join(temp_dir, temp_fname)            
    batch_output_fpath = output_fpath.replace('.geojson', f'_{batch_no:04d}.geojson')

    ppt_batch = gpd.read_file(ppt_batch_path)
    
    # creates the minimum set of rasters with non-overlapping polygons
    wbt.unnest_basins(
        fdir_path, 
        ppt_batch_path, 
        temp_basin_raster_path,
        esri_pntr=False, 
    )
    
    batch_rasters = sorted(e for e in os.listdir(temp_dir) if e.endswith('.tif'))

    tb1 = time.time()
    print(f'    Basins delineated for {region} ppt batch {batch_no} in {(tb1-t_batch_start)/60:.1f}min. {len(batch_rasters)} raster sub-batches to process ({len(ppt_batch)} pour points).')

    # process the raster batches in parallel
    crs_array = ['EPSG:3005'] * len(batch_rasters)  # NAD83 / BC Albers
    resolution_array = [raster_resolution] * len(batch_rasters)
    temp_folder_array = [temp_dir] * len(batch_rasters)
    min_areas = [min_basin_area] * len(batch_rasters)
    path_inputs = list(zip(batch_rasters, crs_array, resolution_array, min_areas, temp_folder_array))
    batch_size_GB = len(batch_rasters) * filesize / 1E3
    
    # Each processor core will load and process a full size DEM, and
    # total ram usage can be multiples of the raster file size for each process. 
    # You need to balance the number of cores with RAM usage.
    n_procs = 4
    print(f'    Setting n_procs={n_procs} for a batch size of {batch_size_GB:.1f}GB')
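    # n_procs is set manually above; a hedged alternative is to size the pool
    # from available memory (psutil is an extra dependency, and the assumed
    # ~4x file size per worker is a rough rule of thumb, not a measured figure):
    # import psutil
    # mem_GB = psutil.virtual_memory().available / 1E9
    # n_procs = max(1, min(mp.cpu_count(), int(mem_GB / (4 * filesize / 1E3))))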

    # use a context manager so the worker processes are released after each batch
    trv0 = time.time()
    with mp.Pool(n_procs) as p:
        all_polygons = p.map(raster_to_vector_basins_batch, path_inputs)
    trv1 = time.time()
    print(f'    {trv1-trv0:.2f}s to convert {len(path_inputs)} rasters to vector basins.')
        
    # concatenate polygons and pour point info; sorting by VALUE preserves
    # the ordering from the ppt batching / raster basin unnesting operation
    trc0 = time.time()
    batch_output = format_batch_geometries(all_polygons, ppt_batch, region_raster_crs)
    batch_output.to_file(batch_output_fpath)
    trc1 = time.time()
    print(f'    {trc1-trc0:.2f}s to concatenate and save basin vector polygons.')
    basin_output_files.append(batch_output_fpath)
    
    # remove all temporary files (don't delete the raster batches yet!)
    clean_up_temp_files(temp_dir, batch_rasters)
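clean_up_temp_files is the last utilities helper used in the loop. A minimal sketch, assuming that (as the comment above suggests) it clears intermediate files from the temp folder while leaving the unnested raster batches and the ppt_batches subdirectory in place:

def clean_up_temp_files(temp_folder, batch_rasters):
    # hypothetical sketch: delete intermediate temp files, but keep the
    # raster batches passed in and leave subdirectories untouched
    for f in os.listdir(temp_folder):
        fpath = os.path.join(temp_folder, f)
        if os.path.isfile(fpath) and f not in batch_rasters:
            os.remove(fpath)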

Concatenate Batch Polygon Files into a Single File (optional)#

You can work with the batch GeoJSON output files directly (they're smaller), or concatenate them and convert to other useful formats such as GeoPackage or Parquet.

batch_dfs = []
# basin_output_files already holds full paths to the batch outputs
for fpath in sorted(set(basin_output_files)):
    layer = gpd.read_file(fpath)
    layer.drop(labels=['FID', 'VALUE'], axis=1, inplace=True)
    batch_dfs.append(layer)

all_data = gpd.GeoDataFrame(pd.concat(batch_dfs), crs=layer.crs)
all_data.to_file(output_fpath)

# save to Parquet
all_data.to_parquet(
    output_fpath.replace('.geojson', '.parquet'),
)

# alternatively, you can use the geopackage format
# all_data.to_file(
#     output_fpath.replace('.geojson', '.gpkg'), 
#     driver='GPKG', 
#     layer=f'pour_points')
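The Parquet output round-trips cleanly through geopandas; reading it back requires pyarrow:

# read the combined basins back in, geometry and CRS included
basins = gpd.read_parquet(output_fpath.replace('.geojson', '.parquet'))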
# remove the temporary pour point batch files
for f in os.listdir(batch_dir):
    os.remove(os.path.join(batch_dir, f))

# once the combined output exists, the per-batch files can be removed
if os.path.exists(output_fpath):
    for f in basin_output_files:
        os.remove(f)