Source code for ncollpyde.main

import logging
import random
import warnings
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Optional, Tuple, Union

import numpy as np
from numpy.typing import ArrayLike, NDArray

    import trimesh
except ImportError:
    trimesh = None

from .ncollpyde import (
    configure_threadpool as _configure_threadpool,

    import meshio

logger = logging.getLogger(__name__)

N_CPUS = cpu_count()

PRECISION = np.dtype(_precision())
INDEX = np.dtype(_index())

def configure_threadpool(n_threads: Optional[int], name_prefix: Optional[str]):
    """Configure the thread pool used for parallelisation.

    Must be called a maximum of once,
    and only before the first parallelised ncollpyde query.
    This will be used for all parallelised ncollpyde queries.

    n_threads : Optional[int]
        Number of threads to use.
        If None or 0, will use the default
    name_prefix : Optional[str]
        How to name threads created by this library.
        Will be suffixed with the thread index.
        If not given, will use the rayon default.

        If the pool could not be built for any reason.
    _configure_threadpool(n_threads, name_prefix)

def interpret_threads(threads: Optional[Union[int, bool]], default=DEFAULT_THREADS):
    if isinstance(threads, bool):
        return threads

    if threads is None:
        return interpret_threads(default)

    threads = int(threads)

        "ncollpyde's API has changed; `threads` should now be a boolean. "
        "See for more details",
    t = bool(threads)
    logger.warning("Interpreting deprecated `threads=%s` as %s", threads, t)
    return t

[docs]class Volume: dtype: np.dtype = PRECISION """Float data type used internally""" threads: bool = DEFAULT_THREADS """Whether to use threading""" def __init__( self, vertices: ArrayLike, triangles: ArrayLike, validate=False, threads: Optional[bool] = None, n_rays=DEFAULT_RAYS, ray_seed=DEFAULT_SEED, ): f""" Create a volume described by a triangular mesh with N vertices and M triangles. :param vertices: Nx3 array-like of floats, coordinates of triangle corners :param triangles: Mx3 array-like of ints, indices of ``vertices`` which describe each triangle :param validate: bool, whether to validate mesh. If trimesh is installed, the mesh is checked for watertightness and correct winding, and repairs made if possible. Otherwise, only very basic checks are made. :param threads: optional bool, whether to parallelise queries. :param n_rays: int (default {DEFAULT_RAYS}), rays used to check containment. The underlying library sometimes reports false positives: casting multiple rays drastically reduces the chances of this. As the bug only affects ray casts and only produces false positives, unnecessary ray casts are short-circuited if: - the point is not in the bounding box - the point is on the hull - one ray reports that the point is external. :param ray_seed: int >=0 (default {DEFAULT_SEED}), used for generating rays. If None, use a random seed. """ vert = np.asarray(vertices, self.dtype) if len(vert) > np.iinfo(INDEX).max: raise ValueError(f"Cannot represent {len(vert)} vertices with {INDEX}") tri = np.asarray(triangles, INDEX) if validate: vert, tri = self._validate(vert, tri) self.threads = self._interpret_threads(threads) if ray_seed is None: logger.warning( "Using unseeded random number generator for containment-checking rays; " "results may be inconsistent across repeats." ) ray_seed = random.randrange(0, 2**64) self._impl = TriMeshWrapper(vert, tri, int(n_rays), ray_seed) def _validate( self, vertices: np.ndarray, triangles: np.ndarray ) -> Tuple[NDArray[np.float64], NDArray[np.uint32]]: if trimesh: tm = trimesh.Trimesh(vertices, triangles, validate=True) if not tm.is_volume:"Mesh not valid, attempting to fix") tm.fill_holes() tm.fix_normals() if not tm.is_volume: raise ValueError( "Mesh is not a volume " "(e.g. not watertight, incorrect winding) " "and could not be fixed" ) return tm.vertices.astype(self.dtype), tm.faces.astype(np.uint32) else: warnings.warn("trimesh not installed; full validation not possible") if vertices.shape[1:] != (3,): raise ValueError("Vertices are not in 3D") if triangles.shape[1:] != (3,): raise ValueError("Triangles do not have 3 points") if triangles.max() >= len(vertices): raise ValueError("Some triangle vertices do not exist in points") return vertices, triangles def __contains__(self, item: ArrayLike) -> bool: """Check whether a single point is in the volume.""" return self.contains(np.asarray([item]), False)[0] def _interpret_threads(self, threads: Optional[Union[int, bool]]) -> bool: return interpret_threads(threads, self.threads)
[docs] def distance( self, coords: ArrayLike, signed: bool = True, threads: Optional[bool] = None, ) -> np.ndarray: """Check the distance from the volume to multiple points (as a Px3 array-like). Distances are reported to the boundary of the volume. By default, if the point is inside the volume, the distance will be reported as negative. ``signed=False`` is faster but cannot distinguish between internal and external points. :param coords: :param signed: bool, default True. Whether distances to points inside the volume should be reported as negative. :param threads: None, Whether to parallelise the queries. If ``None`` (default), refer to the instance's ``threads`` attribute. :return: np.ndarray of float, the distance from the volume to each given point """ coords = np.asarray(coords, self.dtype) if coords.shape[1:] != (3,): raise ValueError("Coords is not a Nx3 array-like") return self._impl.distance(coords, signed, self._interpret_threads(threads))
[docs] def contains( self, coords: ArrayLike, threads: Optional[bool] = None ) -> NDArray[np.bool_]: """Check whether multiple points (as a Px3 array-like) are in the volume. :param coords: :param threads: None, Whether to parallelise the queries. If ``None`` (default), refer to the instance's ``threads`` attribute. :return: np.ndarray of bools, whether each point is inside the volume """ coords = np.asarray(coords, self.dtype) if coords.shape[1:] != (3,): raise ValueError("Coords is not a Nx3 array-like") return self._impl.contains(coords, self._interpret_threads(threads))
[docs] def intersections( self, src_points: ArrayLike, tgt_points: ArrayLike, threads: Optional[bool] = None, ) -> Tuple[NDArray[np.uint64], NDArray[np.float64], NDArray[np.bool_]]: """Get intersections between line segments and volume. Line segments are defined by their start (source) and end (target) points. Only the first intersection for a given line segment is reported. Even if there is only one line segment to check, the argument arrays must have 2 dimensions, e.g. ``[[0, 0, 0]], [[1, 1, 1]]``. The output arrays are: * Which line segment the intersection refers to, as an index into the argument arrays * The point of intersection * Whether the intersection is with the inside face of the mesh N.B. the inside face check will report True for cases where a line touches ("skims") an external edge; see . N.B. threads=True here uses a slightly different implementation, so you may not see the performance increase as with other methods. :param src_points: Nx3 array-like :param tgt_points: Nx3 array-like :param threads: None, Whether to parallelise the queries. If ``None`` (default), refer to the instance's ``threads`` attribute. :raises ValueError: Inputs have different shapes or are not Nx3 :return: tuple of uint array of indices (N), float array of locations (Nx3), bool array of is_backface (N) """ src = np.asarray(src_points, self.dtype) tgt = np.asarray(tgt_points, self.dtype) if src.shape != tgt.shape: raise ValueError("Source and target points arrays must be the same shape") if src.shape[1:] != (3,): raise ValueError("Points must be Nx3 array-like") if self._interpret_threads(threads): idxs, points, bfs = self._impl.intersections_many_threaded( src.tolist(), tgt.tolist() ) return ( np.array(idxs, INDEX), np.array(points, self.dtype), np.array(bfs, bool), ) else: return self._impl.intersections_many(src, tgt)
[docs] @classmethod def from_meshio( cls, mesh: "meshio.Mesh", validate=False, threads=None, n_rays=DEFAULT_RAYS, ray_seed=DEFAULT_SEED, ) -> "Volume": """ Convenience function for instantiating a Volume from a meshio Mesh. :param mesh: meshio Mesh whose only cells are triangles. :param validate: as passed to ``__init__``, defaults to False :param threads: as passed to ``__init__``, defaults to None :param n_rays: as passed to ``__init__``, defaults to 3 :param ray_seed: as passed to ``__init__``, defaults to None (random) :raises ValueError: if Mesh does not have triangle cells :return: Volume instance """ try: triangles = mesh.cells_dict["triangle"] except KeyError: raise ValueError("Must have triangle cells") return cls( mesh.points, triangles, validate, threads, n_rays, ray_seed, )
@property def points(self) -> NDArray[np.float64]: """ Nx3 array of float describing vertices """ return self._impl.points() @property def faces(self) -> NDArray[np.uint32]: """ Mx3 array of uint32 describing indexes into points array making up triangles """ return self._impl.faces() @property def extents(self) -> NDArray[np.float64]: """Axis-aligned bounding box of the volume. :return: 2x3 numpy array of floats, where the first row is mins and the second row is maxes. """ return self._impl.aabb() @property def rays(self) -> NDArray[np.float64]: """ Rays used to detect containment as an ndarray of shape (n_rays, 3). These rays are in random directions (set with the ray seed on construction), and are the length of the diameter of the volume's bounding sphere. """ return self._impl.rays()