import logging
import random
import warnings
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Optional, Tuple, Union
import numpy as np
from numpy.typing import ArrayLike, NDArray
try:
import trimesh
except ImportError:
trimesh = None
from .ncollpyde import (
TriMeshWrapper,
_index,
_precision,
configure_threadpool as _configure_threadpool,
)
if TYPE_CHECKING:
import meshio
logger = logging.getLogger(__name__)
N_CPUS = cpu_count()
DEFAULT_THREADS = True
DEFAULT_RAYS = 3
DEFAULT_SEED = 1991
PRECISION = np.dtype(_precision())
INDEX = np.dtype(_index())
def configure_threadpool(n_threads: Optional[int], name_prefix: Optional[str]):
"""Configure the thread pool used for parallelisation.
Must be called a maximum of once,
and only before the first parallelised ncollpyde query.
This will be used for all parallelised ncollpyde queries.
Parameters
----------
n_threads : Optional[int]
Number of threads to use.
If None or 0, will use the default
(see https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads).
name_prefix : Optional[str]
How to name threads created by this library.
Will be suffixed with the thread index.
If not given, will use the rayon default.
Raises
------
RuntimeError
If the pool could not be built for any reason.
"""
_configure_threadpool(n_threads, name_prefix)
def interpret_threads(threads: Optional[Union[int, bool]], default=DEFAULT_THREADS):
if isinstance(threads, bool):
return threads
if threads is None:
return interpret_threads(default)
threads = int(threads)
warnings.warn(
"ncollpyde's API has changed; `threads` should now be a boolean. "
"See https://github.com/clbarnes/ncollpyde/issues/27 for more details",
DeprecationWarning,
)
t = bool(threads)
logger.warning("Interpreting deprecated `threads=%s` as %s", threads, t)
return t
[docs]class Volume:
dtype: np.dtype = PRECISION
"""Float data type used internally"""
threads: bool = DEFAULT_THREADS
"""Whether to use threading"""
def __init__(
self,
vertices: ArrayLike,
triangles: ArrayLike,
validate=False,
threads: Optional[bool] = None,
n_rays=DEFAULT_RAYS,
ray_seed=DEFAULT_SEED,
):
f"""
Create a volume described by a triangular mesh with N vertices and M triangles.
:param vertices: Nx3 array-like of floats, coordinates of triangle corners
:param triangles: Mx3 array-like of ints,
indices of ``vertices`` which describe each triangle
:param validate: bool, whether to validate mesh.
If trimesh is installed, the mesh is checked for watertightness and correct
winding, and repairs made if possible.
Otherwise, only very basic checks are made.
:param threads: optional bool, whether to parallelise queries.
:param n_rays: int (default {DEFAULT_RAYS}), rays used to check containment.
The underlying library sometimes reports false positives:
casting multiple rays drastically reduces the chances of this.
As the bug only affects ray casts and only produces false positives,
unnecessary ray casts are short-circuited if:
- the point is not in the bounding box
- the point is on the hull
- one ray reports that the point is external.
:param ray_seed: int >=0 (default {DEFAULT_SEED}), used for generating rays.
If None, use a random seed.
"""
vert = np.asarray(vertices, self.dtype)
if len(vert) > np.iinfo(INDEX).max:
raise ValueError(f"Cannot represent {len(vert)} vertices with {INDEX}")
tri = np.asarray(triangles, INDEX)
if validate:
vert, tri = self._validate(vert, tri)
self.threads = self._interpret_threads(threads)
if ray_seed is None:
logger.warning(
"Using unseeded random number generator for containment-checking rays; "
"results may be inconsistent across repeats."
)
ray_seed = random.randrange(0, 2**64)
self._impl = TriMeshWrapper(vert, tri, int(n_rays), ray_seed)
def _validate(
self, vertices: np.ndarray, triangles: np.ndarray
) -> Tuple[NDArray[np.float64], NDArray[np.uint32]]:
if trimesh:
tm = trimesh.Trimesh(vertices, triangles, validate=True)
if not tm.is_volume:
logger.info("Mesh not valid, attempting to fix")
tm.fill_holes()
tm.fix_normals()
if not tm.is_volume:
raise ValueError(
"Mesh is not a volume "
"(e.g. not watertight, incorrect winding) "
"and could not be fixed"
)
return tm.vertices.astype(self.dtype), tm.faces.astype(np.uint32)
else:
warnings.warn("trimesh not installed; full validation not possible")
if vertices.shape[1:] != (3,):
raise ValueError("Vertices are not in 3D")
if triangles.shape[1:] != (3,):
raise ValueError("Triangles do not have 3 points")
if triangles.max() >= len(vertices):
raise ValueError("Some triangle vertices do not exist in points")
return vertices, triangles
def __contains__(self, item: ArrayLike) -> bool:
"""Check whether a single point is in the volume."""
return self.contains(np.asarray([item]), False)[0]
def _interpret_threads(self, threads: Optional[Union[int, bool]]) -> bool:
return interpret_threads(threads, self.threads)
[docs] def distance(
self,
coords: ArrayLike,
signed: bool = True,
threads: Optional[bool] = None,
) -> np.ndarray:
"""Check the distance from the volume to multiple points (as a Px3 array-like).
Distances are reported to the boundary of the volume.
By default, if the point is inside the volume,
the distance will be reported as negative.
``signed=False`` is faster but cannot distinguish
between internal and external points.
:param coords:
:param signed: bool, default True.
Whether distances to points inside the volume
should be reported as negative.
:param threads: None,
Whether to parallelise the queries. If ``None`` (default),
refer to the instance's ``threads`` attribute.
:return: np.ndarray of float, the distance from the volume to each given point
"""
coords = np.asarray(coords, self.dtype)
if coords.shape[1:] != (3,):
raise ValueError("Coords is not a Nx3 array-like")
return self._impl.distance(coords, signed, self._interpret_threads(threads))
[docs] def contains(
self, coords: ArrayLike, threads: Optional[bool] = None
) -> NDArray[np.bool_]:
"""Check whether multiple points (as a Px3 array-like) are in the volume.
:param coords:
:param threads: None,
Whether to parallelise the queries. If ``None`` (default),
refer to the instance's ``threads`` attribute.
:return: np.ndarray of bools, whether each point is inside the volume
"""
coords = np.asarray(coords, self.dtype)
if coords.shape[1:] != (3,):
raise ValueError("Coords is not a Nx3 array-like")
return self._impl.contains(coords, self._interpret_threads(threads))
[docs] def intersections(
self,
src_points: ArrayLike,
tgt_points: ArrayLike,
threads: Optional[bool] = None,
) -> Tuple[NDArray[np.uint64], NDArray[np.float64], NDArray[np.bool_]]:
"""Get intersections between line segments and volume.
Line segments are defined by their start (source) and end (target) points.
Only the first intersection for a given line segment is reported.
Even if there is only one line segment to check,
the argument arrays must have 2 dimensions, e.g. ``[[0, 0, 0]], [[1, 1, 1]]``.
The output arrays are:
* Which line segment the intersection refers to,
as an index into the argument arrays
* The point of intersection
* Whether the intersection is with the inside face of the mesh
N.B. the inside face check will report True
for cases where a line touches ("skims") an external edge; see
https://github.com/dimforge/ncollide/issues/335 .
N.B. threads=True here uses a slightly different implementation,
so you may not see the performance increase as with other methods.
:param src_points: Nx3 array-like
:param tgt_points: Nx3 array-like
:param threads: None,
Whether to parallelise the queries. If ``None`` (default),
refer to the instance's ``threads`` attribute.
:raises ValueError: Inputs have different shapes or are not Nx3
:return: tuple of
uint array of indices (N),
float array of locations (Nx3),
bool array of is_backface (N)
"""
src = np.asarray(src_points, self.dtype)
tgt = np.asarray(tgt_points, self.dtype)
if src.shape != tgt.shape:
raise ValueError("Source and target points arrays must be the same shape")
if src.shape[1:] != (3,):
raise ValueError("Points must be Nx3 array-like")
if self._interpret_threads(threads):
idxs, points, bfs = self._impl.intersections_many_threaded(
src.tolist(), tgt.tolist()
)
return (
np.array(idxs, INDEX),
np.array(points, self.dtype),
np.array(bfs, bool),
)
else:
return self._impl.intersections_many(src, tgt)
[docs] @classmethod
def from_meshio(
cls,
mesh: "meshio.Mesh",
validate=False,
threads=None,
n_rays=DEFAULT_RAYS,
ray_seed=DEFAULT_SEED,
) -> "Volume":
"""
Convenience function for instantiating a Volume from a meshio Mesh.
:param mesh: meshio Mesh whose only cells are triangles.
:param validate: as passed to ``__init__``, defaults to False
:param threads: as passed to ``__init__``, defaults to None
:param n_rays: as passed to ``__init__``, defaults to 3
:param ray_seed: as passed to ``__init__``, defaults to None (random)
:raises ValueError: if Mesh does not have triangle cells
:return: Volume instance
"""
try:
triangles = mesh.cells_dict["triangle"]
except KeyError:
raise ValueError("Must have triangle cells")
return cls(
mesh.points,
triangles,
validate,
threads,
n_rays,
ray_seed,
)
@property
def points(self) -> NDArray[np.float64]:
"""
Nx3 array of float describing vertices
"""
return self._impl.points()
@property
def faces(self) -> NDArray[np.uint32]:
"""
Mx3 array of uint32 describing indexes into points array making up triangles
"""
return self._impl.faces()
@property
def extents(self) -> NDArray[np.float64]:
"""Axis-aligned bounding box of the volume.
:return: 2x3 numpy array of floats,
where the first row is mins and the second row is maxes.
"""
return self._impl.aabb()
@property
def rays(self) -> NDArray[np.float64]:
"""
Rays used to detect containment as an ndarray of shape (n_rays, 3).
These rays are in random directions (set with the ray seed on construction),
and are the length of the diameter of the volume's bounding sphere.
"""
return self._impl.rays()