Skip to content

solshade.terrain

compute_hillshade

compute_hillshade(slope, aspect, azimuth_deg=315.0, altitude_deg=45.0)

Compute a hillshade array using slope and aspect with Lambertian illumination.

Parameters:

Name Type Description Default
slope DataArray

Slope in degrees.

required
aspect DataArray

Aspect in degrees clockwise from north.

required
azimuth_deg float

Azimuth angle of the sun (0° = north, 90° = east). Default is 315°.

315.0
altitude_deg float

Altitude angle of the sun above the horizon. Default is 45°.

45.0

Returns:

Name Type Description
hillshade DataArray

Normalized hillshade values from 0 (dark) to 1 (bright), preserving coordinates and CRS metadata.

Notes
  • Uses a Lambertian reflection model.
  • All input and output arrays are 2D.
Logging
  • DEBUG: sun azimuth/altitude inputs.
  • INFO: completion of hillshade.
Source code in src/solshade/terrain.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def compute_hillshade(
    slope: xr.DataArray,
    aspect: xr.DataArray,
    azimuth_deg: float = 315.0,
    altitude_deg: float = 45.0,
) -> xr.DataArray:
    """
    Compute a hillshade array using slope and aspect with Lambertian illumination.

    Parameters
    ----------
    slope : xarray.DataArray
        Slope in degrees.
    aspect : xarray.DataArray
        Aspect in degrees clockwise from north.
    azimuth_deg : float, optional
        Azimuth angle of the sun (0° = north, 90° = east). Default is 315°.
    altitude_deg : float, optional
        Altitude angle of the sun above the horizon. Default is 45°.

    Returns
    -------
    hillshade : xarray.DataArray
        Normalized hillshade values from 0 (dark) to 1 (bright),
        preserving coordinates and CRS metadata.

    Notes
    -----
    - Uses a Lambertian reflection model.
    - All input and output arrays are 2D.

    Logging
    -------
    - DEBUG: sun azimuth/altitude inputs.
    - INFO: completion of hillshade.
    """
    log.debug(f"Computing hillshade: azimuth={azimuth_deg:.2f}°, altitude={altitude_deg:.2f}°")
    slope_rad = np.radians(slope)
    aspect_rad = np.radians(aspect)
    az_rad = np.radians(azimuth_deg)
    alt_rad = np.radians(altitude_deg)

    shaded = np.sin(alt_rad) * np.cos(slope_rad) + np.cos(alt_rad) * np.sin(slope_rad) * np.cos(az_rad - aspect_rad)
    hillshade = np.clip(shaded, 0, 1)

    log.info("Computed hillshade")
    return xr.DataArray(
        hillshade,
        coords=slope.coords,
        dims=slope.dims,
        attrs={"units": "unitless", "long_name": "hillshade"},
    )

compute_horizon_map

compute_horizon_map(dem, n_directions=360, max_distance=5000, step=20, chunk_size=32, n_jobs=-1, progress=True)

Compute a per-pixel horizon angle map from a digital elevation model (DEM) using chunked ray tracing.

For each pixel, rays are cast in n_directions azimuthal directions and sampled up to max_distance away. The maximum elevation angle encountered along each ray is recorded as the local horizon.

Parallel processing is done with Joblib, and a rich progress bar can optionally be shown.

Parameters:

Name Type Description Default
dem DataArray

Input digital elevation model with a defined CRS and affine transform. Must be a 2D array with shape (y, x) and spatial coordinates.

required
n_directions int

Number of azimuthal directions to trace (default is 64).

360
max_distance float

Maximum distance (in meters) to trace each ray from each pixel (default is 5000 m).

5000
step float

Distance step (in meters) between ray samples (default is 20 m).

20
chunk_size int

Size (in pixels) of square chunks to process independently (default is 32).

32
n_jobs int

Number of parallel jobs to run. Use -1 to use all available cores (default is -1).

-1
progress bool

If True, display a rich progress bar (default is True).

True

Returns:

Type Description
DataArray

A 3D xarray DataArray with dimensions (azimuth, y, x), representing the local horizon angle (in degrees) in each direction for each pixel. The azimuthal directions are stored in the azimuth coordinate.

Logging
  • INFO: parameters used and final shape.
  • DEBUG: chunking details, dispatching jobs, pixel sizes.
  • WARN: defensive warnings if a None chunk were returned.
Source code in src/solshade/terrain.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
def compute_horizon_map(
    dem: xr.DataArray,
    n_directions: int = 360,
    max_distance: float = 5000,
    step: float = 20,
    chunk_size: int = 32,
    n_jobs: int = -1,
    progress: bool = True,
) -> xr.DataArray:
    """
    Compute a per-pixel horizon angle map from a digital elevation model (DEM)
    using chunked ray tracing.

    For each pixel, rays are cast in `n_directions` azimuthal directions
    and sampled up to `max_distance` away. The maximum elevation angle
    encountered along each ray is recorded as the local horizon.

    Parallel processing is done with Joblib, and a rich progress bar can
    optionally be shown.

    Parameters
    ----------
    dem : xr.DataArray
        Input digital elevation model with a defined CRS and affine transform.
        Must be a 2D array with shape (y, x) and spatial coordinates.
    n_directions : int, optional
        Number of azimuthal directions to trace (default is 64).
    max_distance : float, optional
        Maximum distance (in meters) to trace each ray from each pixel
        (default is 5000 m).
    step : float, optional
        Distance step (in meters) between ray samples (default is 20 m).
    chunk_size : int, optional
        Size (in pixels) of square chunks to process independently
        (default is 32).
    n_jobs : int, optional
        Number of parallel jobs to run. Use -1 to use all available cores
        (default is -1).
    progress : bool, optional
        If True, display a rich progress bar (default is True).

    Returns
    -------
    xr.DataArray
        A 3D xarray DataArray with dimensions (azimuth, y, x), representing
        the local horizon angle (in degrees) in each direction for each pixel.
        The azimuthal directions are stored in the `azimuth` coordinate.

    Logging
    -------
    - INFO: parameters used and final shape.
    - DEBUG: chunking details, dispatching jobs, pixel sizes.
    - WARN: defensive warnings if a None chunk were returned.
    """
    if dem.rio.crs is None or dem.rio.transform() is None:
        log.error("DEM missing CRS or affine transform")
        raise ValueError("DEM must have CRS and affine transform defined.")

    log.info(
        "Computing horizon map (n_directions=%d, max_distance=%.1f, step=%.1f, chunk_size=%d, n_jobs=%d, progress=%s)",
        n_directions,
        max_distance,
        step,
        chunk_size,
        n_jobs,
        progress,
    )

    transform = dem.rio.transform()
    res_x, res_y = transform.a, -transform.e
    ny, nx = dem.shape
    log.debug(f"DEM size: nx={nx}, ny={ny}; pixel size: ({res_x:.6f}, {res_y:.6f})")

    azimuths = np.linspace(0, 360, n_directions, endpoint=False)
    distances = np.arange(0, max_distance + step, step)
    ns = len(distances)

    dx_pix = np.cos(np.deg2rad(azimuths))[:, np.newaxis] * distances / res_x
    dy_pix = -np.sin(np.deg2rad(azimuths))[:, np.newaxis] * distances / res_y
    dx_pix = dx_pix[np.newaxis, np.newaxis, :, :]
    dy_pix = dy_pix[np.newaxis, np.newaxis, :, :]

    @njit(parallel=True)  # pragma: no cover
    def _compute_horizon(elev_prof, distances, out_arr):
        nyc, nxc, nd, ns = elev_prof.shape
        for iy in prange(nyc):
            for ix in prange(nxc):
                for idir in range(nd):
                    elev0 = elev_prof[iy, ix, idir, 0]
                    if np.isnan(elev0):
                        out_arr[iy, ix, idir] = np.nan
                        continue
                    max_ang = -np.inf
                    for idist in range(1, ns):
                        elev_sample = elev_prof[iy, ix, idir, idist]
                        if np.isnan(elev_sample):
                            continue
                        dz = elev_sample - elev0
                        angle = np.arctan2(dz, distances[idist])
                        max_ang = max(max_ang, angle)
                    out_arr[iy, ix, idir] = np.nan if max_ang == -np.inf else np.rad2deg(max_ang)

    def process_chunk(x0, y0):
        y1 = min(y0 + chunk_size, ny)
        x1 = min(x0 + chunk_size, nx)

        iy, ix = np.mgrid[y0:y1, x0:x1]
        iy = iy[..., np.newaxis, np.newaxis]
        ix = ix[..., np.newaxis, np.newaxis]

        sample_y = iy + dy_pix
        sample_x = ix + dx_pix
        sample_coords = np.stack([sample_y, sample_x], axis=0)

        elev_profiles = map_coordinates(
            dem.values,
            sample_coords.reshape(2, -1),
            order=1,
            mode="constant",
            cval=np.nan,
        ).reshape(y1 - y0, x1 - x0, n_directions, ns)

        chunk_out = np.full((y1 - y0, x1 - x0, n_directions), np.nan, dtype=np.float32)
        _compute_horizon(elev_profiles, distances, chunk_out)

        return y0, y1, x0, x1, chunk_out

    tasks = [(x0, y0) for y0 in range(0, ny, chunk_size) for x0 in range(0, nx, chunk_size)]
    log.debug(f"Prepared {len(tasks)} chunks")
    horizon_data = np.full((n_directions, ny, nx), np.nan, dtype=np.float32)

    warnings.filterwarnings(
        "ignore", category=UserWarning, message=".*worker stopped while some jobs were given to the executor.*"
    )

    def run_all():
        return Parallel(n_jobs=n_jobs, return_as="generator")(delayed(process_chunk)(x0, y0) for x0, y0 in tasks)

    log.debug(f"Dispatching {len(tasks)} jobs with n_jobs={n_jobs}")
    if progress:
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Computing horizon map"),
            BarColumn(),
            "[progress.percentage]{task.percentage:>3.0f}%",
            TimeElapsedColumn(),
            TimeRemainingColumn(),
        ) as bar:
            task_id = bar.add_task("computing", total=len(tasks))
            results = run_all()
            for result in results:
                if result is not None:
                    y0, y1, x0, x1, chunk = result
                    horizon_data[:, y0:y1, x0:x1] = np.moveaxis(chunk, -1, 0)
                else:  # pragma: no cover (defensive)
                    log.warning("Received None result for a chunk")
                bar.advance(task_id)
    else:
        for result in run_all():
            if result is not None:
                y0, y1, x0, x1, chunk = result
                horizon_data[:, y0:y1, x0:x1] = np.moveaxis(chunk, -1, 0)
            else:  # pragma: no cover (defensive)
                log.warning("Received None result for a chunk (no-progress mode)")

    horizon_da = xr.DataArray(
        horizon_data,
        dims=("azimuth", "y", "x"),
        coords={"azimuth": azimuths, "y": dem.y, "x": dem.x},
        name="horizon_angle",
        attrs={"units": "degrees"},
    )
    horizon_da.rio.write_crs(dem.rio.crs, inplace=True)
    horizon_da.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=True)
    horizon_da.rio.write_transform(dem.rio.transform(), inplace=True)
    horizon_da.attrs.update(
        {
            "max_distance_m": max_distance,
            "step_m": step,
            "n_directions": n_directions,
            "azimuth_meaning": "Azimuthal directions clockwise from North",
            "azimuths_deg": json.dumps(azimuths.tolist()),
        }
    )

    log.info(f"Horizon map computed: shape={horizon_da.shape} (azimuth,y,x) with {n_directions} directions")
    return horizon_da

compute_slope_aspect_normals

compute_slope_aspect_normals(dem)

Compute slope, aspect, and ENU unit normal vectors from a DEM.

Slope and aspect follow GIS conventions: - Slope: angle from horizontal (0° flat, 90° vertical). - Aspect: compass direction of steepest descent, clockwise from North (0°=N, 90°=E).

Parameters:

Name Type Description Default
dem DataArray

2D DEM with projected CRS and linear units (e.g., meters).

required

Returns:

Name Type Description
slope DataArray(y, x)

Slope in degrees.

aspect DataArray(y, x)

Aspect in degrees clockwise from North.

normal_enu DataArray(3, y, x)

ENU unit normal vectors. Bands: [east, north, up].

Notes

Normal vector components: E = sin(slope) * sin(aspect) N = sin(slope) * cos(aspect) U = cos(slope)

Logging
  • DEBUG: DEM shape and pixel resolution.
  • INFO: completion of slope/aspect/normal computation.
Source code in src/solshade/terrain.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def compute_slope_aspect_normals(
    dem: xr.DataArray,
) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]:
    """
    Compute slope, aspect, and ENU unit normal vectors from a DEM.

    Slope and aspect follow GIS conventions:
    - Slope: angle from horizontal (0° flat, 90° vertical).
    - Aspect: compass direction of steepest descent, clockwise from North (0°=N, 90°=E).

    Parameters
    ----------
    dem : xarray.DataArray
        2D DEM with projected CRS and linear units (e.g., meters).

    Returns
    -------
    slope : xarray.DataArray (y, x)
        Slope in degrees.
    aspect : xarray.DataArray (y, x)
        Aspect in degrees clockwise from North.
    normal_enu : xarray.DataArray (3, y, x)
        ENU unit normal vectors. Bands: [east, north, up].

    Notes
    -----
    Normal vector components:
        E = sin(slope) * sin(aspect)
        N = sin(slope) * cos(aspect)
        U = cos(slope)

    Logging
    -------
    - DEBUG: DEM shape and pixel resolution.
    - INFO: completion of slope/aspect/normal computation.
    """
    log.debug(f"Computing slope/aspect/normals for DEM with shape={dem.shape}")
    z = dem.values
    dy, dx = dem.rio.resolution()
    dx = abs(dx)
    dy = abs(dy)
    log.debug(f"DEM resolution: dx={dx:.6f}, dy={dy:.6f}")

    dzdy, dzdx = np.gradient(z, dy, dx)

    slope_rad = np.arctan(np.hypot(dzdx, dzdy))
    slope_deg = np.degrees(slope_rad)

    aspect_rad = np.arctan2(-dzdx, dzdy)
    aspect_deg = (np.degrees(aspect_rad) + 360) % 360

    slope = xr.DataArray(
        slope_deg,
        coords=dem.coords,
        dims=dem.dims,
        attrs={"units": "degrees", "long_name": "slope"},
    )

    aspect = xr.DataArray(
        aspect_deg,
        coords=dem.coords,
        dims=dem.dims,
        attrs={"units": "degrees", "long_name": "aspect"},
    )

    # Convert to radians
    slope_r = np.deg2rad(slope_deg)
    aspect_r = np.deg2rad(aspect_deg)

    # ENU normal components
    east = np.sin(slope_r) * np.sin(aspect_r)
    north = np.sin(slope_r) * np.cos(aspect_r)
    up = np.cos(slope_r)

    normal_vec = np.stack([east, north, up], axis=0)  # (3, y, x)

    # Normalize
    norms = np.linalg.norm(normal_vec, axis=0, keepdims=True)
    normal_vec = np.divide(normal_vec, norms, out=np.zeros_like(normal_vec), where=norms > 0)

    normal_enu = xr.DataArray(
        normal_vec,
        coords={"band": ["east", "north", "up"], dem.dims[0]: dem[dem.dims[0]], dem.dims[1]: dem[dem.dims[1]]},
        dims=("band", *dem.dims),
        attrs={"description": "Terrain normal unit vector in ENU coordinates"},
    )

    log.info("Computed slope/aspect/normals")
    return slope, aspect, normal_enu

horizon_interp

horizon_interp(azimuths, horizon_vals, fallback_frac=0.5)

Interpolate horizon angles as a function of azimuth using cubic interpolation, ensuring periodic continuity. Falls back to linear if data is sparse or if cubic interpolation yields unphysical results.

Parameters:

Name Type Description Default
azimuths ndarray

1D array of azimuth angles in degrees [0, 360).

required
horizon_vals ndarray

1D array of horizon angles (in degrees), same shape as azimuths. May contain NaNs.

required
fallback_frac float

Threshold fraction of NaNs above which to fall back to linear interpolation.

0.5

Returns:

Name Type Description
interp_func callable or None

A function that takes azimuth(s) in degrees and returns interpolated horizon angle(s). Returns None if no valid data.

Logging
  • DEBUG: data completeness, ranges, and interpolation path taken.
  • INFO: chosen interpolation (cubic vs linear) and output range when applicable.
  • WARN: cubic spline unphysical output fallback.
Source code in src/solshade/terrain.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
def horizon_interp(
    azimuths: np.ndarray,
    horizon_vals: np.ndarray,
    fallback_frac: float = 0.5,
) -> Optional[Callable]:
    """
    Interpolate horizon angles as a function of azimuth using cubic interpolation,
    ensuring periodic continuity. Falls back to linear if data is sparse or if
    cubic interpolation yields unphysical results.

    Parameters
    ----------
    azimuths : np.ndarray
        1D array of azimuth angles in degrees [0, 360).
    horizon_vals : np.ndarray
        1D array of horizon angles (in degrees), same shape as azimuths.
        May contain NaNs.
    fallback_frac : float, optional
        Threshold fraction of NaNs above which to fall back to linear interpolation.

    Returns
    -------
    interp_func : callable or None
        A function that takes azimuth(s) in degrees and returns interpolated
        horizon angle(s). Returns None if no valid data.

    Logging
    -------
    - DEBUG: data completeness, ranges, and interpolation path taken.
    - INFO: chosen interpolation (cubic vs linear) and output range when applicable.
    - WARN: cubic spline unphysical output fallback.
    """
    log.debug(f"Building horizon interpolation (fallback_frac={fallback_frac:.2f})")
    az_valid = azimuths[~np.isnan(horizon_vals)]
    vals_valid = horizon_vals[~np.isnan(horizon_vals)]

    if len(vals_valid) == 0:
        log.info("No valid horizon data — returning None.")
        return None

    nan_frac = np.isnan(horizon_vals).sum() / len(horizon_vals)
    log.debug(
        "Valid points: %d / %d (%.1f%% valid); azimuth range=[%.2f, %.2f]; value range=[%.2f, %.2f]",
        len(vals_valid),
        len(horizon_vals),
        (1 - nan_frac) * 100,
        float(az_valid.min()),
        float(az_valid.max()),
        float(vals_valid.min()),
        float(vals_valid.max()),
    )

    # Enforce periodicity only if we already have a valid value at 0.0 deg
    if az_valid[0] == 0.0:
        log.debug("Enforcing periodicity by copying value at 0.0° to 360.0°")
        az_valid = np.append(az_valid, 360.0)
        vals_valid = np.append(vals_valid, vals_valid[0])

    # Attempt cubic interpolation if data is sufficiently complete
    if nan_frac <= fallback_frac:
        try:
            interp_func = CubicSpline(az_valid, vals_valid, bc_type="periodic")
            test_vals = interp_func(np.arange(361))
            min_val, max_val = np.nanmin(test_vals), np.nanmax(test_vals)

            if min_val < -90 or max_val > 90:
                log.warning(
                    f"CubicSpline produced unphysical output: min={min_val:.2f}, max={max_val:.2f} — falling back to linear"
                )
                raise ValueError("Unphysical cubic spline result")

            log.info(f"Using CubicSpline interpolation; output range=[{min_val:.2f}, {max_val:.2f}]")
        except Exception as e:
            log.debug(f"CubicSpline interpolation failed: {e} — using linear interp1d")
            interp_func = interp1d(
                az_valid,
                vals_valid,
                kind="linear",
                bounds_error=False,
                fill_value="extrapolate",  # type: ignore[arg-type]
            )
    else:
        log.info(f"Too many NaNs ({nan_frac * 100:.1f}%) — using linear interp1d")
        interp_func = interp1d(
            az_valid,
            vals_valid,
            kind="linear",
            bounds_error=False,
            fill_value="extrapolate",  # type: ignore[arg-type]
        )

    def wrapped_interp(query_az):
        return interp_func(np.mod(query_az, 360))

    return wrapped_interp

load_dem

load_dem(path)

Load a single-band Digital Elevation Model (DEM) from a GeoTIFF file.

Uses rioxarray to preserve CRS and coordinate metadata.

Parameters:

Name Type Description Default
path str or Path

Path to a single-band GeoTIFF file containing elevation data. The file must contain exactly one raster band.

required

Returns:

Name Type Description
dem DataArray

A 2D array of elevation values with dimensions (y, x), including CRS, transform, and coordinate metadata.

Raises:

Type Description
TypeError

If the input file contains more than one band or does not reduce to a DataArray.

Notes
  • Elevation units are inherited from the input file (typically meters).
  • The spatial reference (CRS) must be projected (not geographic/latlon).
  • Squeezes band dimension if present.
  • Logging: this function logs basic file/metadata information at INFO level.
Source code in src/solshade/terrain.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def load_dem(path: str | Path) -> xr.DataArray:
    """
    Load a single-band Digital Elevation Model (DEM) from a GeoTIFF file.

    Uses rioxarray to preserve CRS and coordinate metadata.

    Parameters
    ----------
    path : str or Path
        Path to a single-band GeoTIFF file containing elevation data.
        The file must contain exactly one raster band.

    Returns
    -------
    dem : xarray.DataArray
        A 2D array of elevation values with dimensions (y, x),
        including CRS, transform, and coordinate metadata.

    Raises
    ------
    TypeError
        If the input file contains more than one band or does not reduce to a DataArray.

    Notes
    -----
    - Elevation units are inherited from the input file (typically meters).
    - The spatial reference (CRS) must be projected (not geographic/latlon).
    - Squeezes band dimension if present.
    - Logging: this function logs basic file/metadata information at INFO level.
    """
    log.debug(f"Loading DEM from {path}")
    raw = cast(xr.Dataset, rxr.open_rasterio(path, masked=True))
    squeezed = raw.squeeze()

    if not isinstance(squeezed, xr.DataArray):
        log.error(f"DEM at {path} has multiple bands or is not a DataArray")
        raise TypeError("DEM is not a single-band raster.")

    try:
        crs_str = squeezed.rio.crs.to_string() if squeezed.rio.crs else "None"
        res = squeezed.rio.resolution()
    except Exception:  # pragma: no cover (very defensive)
        crs_str, res = "Unknown", ("?", "?")

    log.info(f"Loaded DEM: shape={squeezed.shape}, CRS={crs_str}, resolution={res}")
    return squeezed