Source code for rmsh.meshblocks

"""Mesh block implementation and the function that pre-processes inputs for C code."""

#   Internal imports
from collections.abc import Mapping, Sequence
from dataclasses import dataclass

#   External imports
import numpy as np

from rmsh.geometry import (
    BoundaryBlock,
    BoundaryCurve,
    BoundaryId,
    _BlockInfoTuple,
    _BoundaryInfoTuple,
)
from rmsh.mesh2d import Mesh2D


[docs] @dataclass(frozen=True) class MeshBlock: """Basic building block of the mesh, which describes a structured mesh block. Boundaries of the block can be either prescribed curves, or instead a "soft" boundary, where the only information specified is that it should connect to another block. Such "soft" boundaries allow for a potentially smoother transition between different blocks. When blocks share a boundary, the only requirement is that the boundary that is shared between them has such a number of nodes that the opposite boundary matches it. Parameters ---------- label : str Label by which this block will be referred to in the mesh, as well as by other blocks which have a block boundary to this block. boundaries : Mapping of BoundaryId to BoundaryCurve or BoundaryBlock Dictionary containing the boundaries with their respective IDs. All four might be specified, or only a few. Attributes ---------- label : str Label by which this block will be referred to in the mesh, as well as by other blocks which have a block boundary to this block. boundaries : dict[BoundaryId, BoundaryCurve|BoundaryBlock|None] Dictionary of boundaries, which has entries for the boundaries of the block. For a block to be complete, all four boundaries must be specified. """ label: str boundaries: dict[BoundaryId, BoundaryCurve | BoundaryBlock | None] def __init__( self, label: str, boundaries: Mapping[BoundaryId, BoundaryCurve | BoundaryBlock] ): object.__setattr__(self, "label", label) object.__setattr__(self, "boundaries", dict()) for k in boundaries: match k: case ( BoundaryId.BoundaryNorth | BoundaryId.BoundarySouth | BoundaryId.BoundaryEast | BoundaryId.BoundaryWest ): self.boundaries[k] = boundaries[k] case _: raise RuntimeError( f"Boundary has a key of an invalid type ({type(k)}, should be" f" {BoundaryId})" ) def set_boundary( self, bid: BoundaryId, b: BoundaryCurve | BoundaryBlock | None = None ) -> None: """Set a boundary of a block to a specified curve, block, or None. If a boundary that is being set is already set to a curve or a block and the new value is not None, a warning will be reported by the function. Parameters ---------- bid : BoundaryId ID of the boundary to be set b : BoundaryCurve | BoundaryBlock | None = None New value of the boundary specified by `bid`. """ prev = None match bid: case ( BoundaryId.BoundaryNorth | BoundaryId.BoundarySouth | BoundaryId.BoundaryEast | BoundaryId.BoundaryWest ): prev = self.boundaries[bid] self.boundaries[bid] = b case _: raise RuntimeError("Invalid value of boundary id was specified") if prev is not None and b is not None: raise RuntimeWarning( f"Boundary with id {bid.name} for block {self.label} was set, but was " f"not None previously (was {type(prev)} instead)" ) def has_all_boundaries(self) -> bool: """Check if a mesh block has all four boundaries specified as non-None values. Returns ------- bool True if the mesh block has all needed boundaries and False if that is not the case. """ return ( ( BoundaryId.BoundaryNorth in self.boundaries and self.boundaries[BoundaryId.BoundaryNorth] is not None ) and ( BoundaryId.BoundarySouth in self.boundaries and self.boundaries[BoundaryId.BoundarySouth] is not None ) and ( BoundaryId.BoundaryEast in self.boundaries and self.boundaries[BoundaryId.BoundaryEast] is not None ) and ( BoundaryId.BoundaryWest in self.boundaries and self.boundaries[BoundaryId.BoundaryWest] is not None ) )
def _find_boundary_size(bnd: BoundaryBlock, blcks: dict[str, tuple[int, MeshBlock]]): # if type(bnd) is BoundaryCurve: # bnd: BoundaryCurve # return len(bnd.x) checked: list[BoundaryBlock | BoundaryCurve] = [bnd] i = 0 boundary: BoundaryBlock | BoundaryCurve = bnd while True: if type(boundary) is BoundaryCurve: return len(boundary.x) assert type(boundary) is BoundaryBlock if boundary.n != 0: return boundary.n _, target = blcks[boundary.target] other_bnd = target.boundaries[boundary.target_id] assert other_bnd is not None if type(other_bnd) is BoundaryCurve: return len(other_bnd.x) # if type(other_bnd) is BoundaryBlock: match boundary.target_id: case BoundaryId.BoundaryNorth: new_bnd = target.boundaries[BoundaryId.BoundarySouth] case BoundaryId.BoundarySouth: new_bnd = target.boundaries[BoundaryId.BoundaryNorth] case BoundaryId.BoundaryWest: new_bnd = target.boundaries[BoundaryId.BoundaryEast] case BoundaryId.BoundaryEast: new_bnd = target.boundaries[BoundaryId.BoundaryWest] case _: raise RuntimeError( "Invalid boundary type for the block boundary encountered" ) assert new_bnd is not None boundary = new_bnd if new_bnd in checked: break else: checked.append(new_bnd) i += 1 raise RuntimeError( "Circular reference for block boundaries without specifying their size" ) def _curves_have_common_point(c1: BoundaryCurve, c2: BoundaryCurve) -> bool: p11 = (c1.x[0], c1.y[0]) p12 = (c1.x[-1], c1.y[-1]) p21 = (c2.x[0], c2.y[0]) p22 = (c2.x[-1], c2.y[-1]) return ( np.allclose(p11, p21) or np.allclose(p11, p22) or np.allclose(p12, p21) or np.allclose(p12, p22) ) @dataclass class SolverConfig: """Used to configure the solver used to solve the PDE for the positions of mesh nodes. Direct solver uses banded matrix representation to compute an LU decomposition for a matrix with a fixed bandwidth, which works well for small systems, where bandwidth is still small. After a specific mesh size, the solver will be changed to an iterative solver (Stabilized Bi-Conjugate Gradient Preconditioned with Incomplete Lower-Upper Decomposition - PILUBICG-STAB), which offers fast performance at a cost of loss of optimality guarantee. As such, it is necessary to use restarts after the Krylov subspace begins to degenerate. Parameters ---------- force_direct : bool, default: False Force the direct solver to be used regardless of the problem size. This may allow for solution of some cases where the mesh is ill-defined or very close to ill-defined. Not recommended for other cases, since it is very slow. tolerance : float, default: 1e-6 Tolerance used by the iterative solver to check for convergence. The solver will consider the iterative solution to be converged when ``tolerance * norm(y) >= norm(r)``, where ``norm(y)`` is the L2 norm of the RHS of the equation being solved and ``norm(r)`` is the L2 norm of the residual. A lower value means a solution is more converged, but a value which is too low might be unable to give a solution. smoother_rounds : int, default: 0 After each round of using an iterative solver, a Jacobi smoother can be used in order to "smooth" the intermediate iterative solution for up to ``smoother_rounds``. Since the system matrix being solved is not strictly diagonally dominant, there is no guarantee on error reduction, but there is at least a guarantee on the error not being increased by this. max_iterations : int, default: 128 How many iterations the iterative solver will perform each round at most, before restarting. Smaller values might cause the convergence to be very slow due discarding the entire Krylov subspace for that round. Very large values for ``max_iterations`` might also delay convergence, since Krylov subspace might degenerate and residual could start to grow again. If a solver detects that the Krylov subspace has degenerated, a round of iterative solver might end sooner. max_rounds : int, default: 8 How many round of iterative solver to run at most. This means that the total number of rounds that PILUBICG-STAB could run is ``max_rounds * max_iterations``, while the maximum number of smoother rounds is ``max_rounds * smoother_rounds``. """ force_direct: bool = False tolerance: float = 1e-6 smoother_rounds: int = 0 max_iterations: int = 128 max_rounds: int = 8 def create_elliptical_mesh( blocks: Sequence[MeshBlock], verbose: bool = False, allow_insane: bool = False, solver_cfg: SolverConfig | None = None, ) -> tuple[Mesh2D, float, float]: """Create a mesh from mesh blocks by solving the Laplace equation for mesh nodes. Parameters ---------- blocks : Sequence of MeshBlock A sequence of blocks which constitute the mesh. verbose : bool, default: False When set to True the solver and function will print extra information to stdout related to solver progress. When False, only warnings and exceptions will be produced. allow_insane : bool, default: False Disable boundary connectivity checks. By default, these are enabled, causing an error to be raised if neighboring curve boundaries of the same block don't share a single point. solver_cfg : SolverConfig, optional Contains additional options to configure the solver used to compute the mesh coordinates. Returns ------- Mesh2D Mesh generated from the provided blocks. float The L2 norm of the residual of the system solved for the X coordinate. float The L2 norm of the residual of the system solved for the Y coordinate. """ if solver_cfg is None: solver_cfg = SolverConfig( force_direct=False, tolerance=1e-6, smoother_rounds=0, max_iterations=128, max_rounds=8, ) bdict: dict[str, tuple[int, MeshBlock]] = ( dict() ) # Holds indices to which the blocks map to if verbose: print("Checking all blocks") for i, b in enumerate(blocks): if b.label in bdict: raise RuntimeError(f'Multiple blocks with the same label "{b.label}"') # Check if boundaries are correctly set up if (not b.has_all_boundaries()) or (len(b.boundaries) != 4): raise RuntimeError( f"Block {b.label} does not have all boundaries defined" f" (current: {b.boundaries})" ) # Finally set the label bdict[b.label] = (i, b) if verbose: print(f'Block "{b.label}" was good') if verbose: print("Finished checking all blocks") if verbose: print("Checking all boundaries") # Make sure that the boundaries are correctly set up n_bnds: list[dict[BoundaryId, int]] = [] for i, b in enumerate(blocks): bnd_lens: dict[BoundaryId, int] = dict() for bid in b.boundaries: bnd = b.boundaries[bid] # If boundary is BoundaryBlock, it should be sorted nbnd = 0 if type(bnd) is BoundaryBlock: iother, other = bdict[bnd.target] bother = other.boundaries[bnd.target_id] if iother > i and type(bother) is BoundaryCurve: # Boundaries must be swapped # Flip the x and y arrays, since they should be in reverse order flipped = BoundaryCurve(np.flip(bother.x), np.flip(bother.y)) b.boundaries[bid] = flipped other.boundaries[bnd.target_id] = BoundaryBlock(b.label, bid) nbnd = len(flipped.x) elif bnd.n != 0: nbnd = bnd.n else: nbnd = _find_boundary_size(bnd, bdict) # Check that the corners of curve match up correctly if check is enabled elif not allow_insane: assert type(bnd) is BoundaryCurve nbnd = len(bnd.x) bleft = None bright = None match bid: case BoundaryId.BoundaryNorth: bleft = BoundaryId.BoundaryEast bright = BoundaryId.BoundaryWest case BoundaryId.BoundaryWest: bleft = BoundaryId.BoundaryNorth bright = BoundaryId.BoundarySouth case BoundaryId.BoundarySouth: bleft = BoundaryId.BoundaryWest bright = BoundaryId.BoundaryEast case BoundaryId.BoundaryEast: bleft = BoundaryId.BoundarySouth bright = BoundaryId.BoundaryNorth bndleft = b.boundaries[bleft] bndright = b.boundaries[bright] if type(bndleft) is BoundaryCurve: if not _curves_have_common_point(bnd, bndleft): raise RuntimeWarning( f"Block {b.label} has curves as boundaries {bid.name} and" f" {bleft.name}, but they have no common points. To allow" " such meshes to be counted as valid, " 'call this function with "allow_insane=True"' ) if type(bndright) is BoundaryCurve: if not _curves_have_common_point(bnd, bndright): raise RuntimeWarning( f"Block {b.label} has curves as boundaries {bid.name} and" f" {bright.name}, but they have no common points. To allow" " such meshes to be counted as valid, " 'call this function with "allow_insane=True"' ) bnd_lens[bid] = nbnd n_bnds.append(bnd_lens) for i, (b, nb) in enumerate(zip(blocks, n_bnds)): n_north = nb[BoundaryId.BoundaryNorth] n_south = nb[BoundaryId.BoundarySouth] n_east = nb[BoundaryId.BoundaryEast] n_west = nb[BoundaryId.BoundaryWest] if n_north != n_south: raise RuntimeError( f"Block {b.label} has {n_north} points on the north boundary, but" f" {n_south} points on the south boundary" ) if n_east != n_west: raise RuntimeError( f"Block {b.label} has {n_east} points on the east boundary, but {n_west}" " points on the west boundary" ) # Convert the input blocks into the form which is demanded by the C part of the code if verbose: print("Converting inputs to for usable by the C code") inputs: list[_BlockInfoTuple] = [] for i, (b, nb) in enumerate(zip(blocks, n_bnds)): boundaries: dict[BoundaryId, _BoundaryInfoTuple] = dict() for bid in b.boundaries: bnd = b.boundaries[bid] if type(bnd) is BoundaryCurve: boundaries[bid] = ( 0, bid.value, nb[bid], np.array(bnd.x, dtype=np.float64), np.array(bnd.y, dtype=np.float64), ) elif type(bnd) is BoundaryBlock: iother, other = bdict[bnd.target] boundaries[bid] = (1, bid.value, nb[bid], iother, bnd.target_id.value) else: raise RuntimeError( f'Boundary {bid.name} of block "{b.label}" was of invalid type' f" {type(bnd)}" ) bv = ( b.label, boundaries[BoundaryId.BoundaryNorth], boundaries[BoundaryId.BoundarySouth], boundaries[BoundaryId.BoundaryEast], boundaries[BoundaryId.BoundaryWest], ) inputs.append(bv) extra = ( solver_cfg.force_direct, solver_cfg.tolerance, solver_cfg.smoother_rounds, solver_cfg.max_iterations, solver_cfg.max_rounds, ) name_map: dict[str, int] = dict() for k in bdict: name_map[k] = bdict[k][0] mesh, rx, ry = Mesh2D._create_elliptical_mesh_labeled( inputs, verbose, extra, name_map ) return mesh, rx, ry