Source code for mfv2d.solving

"""Code related to solving the system of equations."""

from __future__ import annotations

from collections.abc import Callable, Sequence
from dataclasses import dataclass
from typing import Self, TypeVar, cast

import numpy as np
import numpy.typing as npt
from scipy import linalg as la
from scipy import sparse as sp

from mfv2d._mfv2d import DenseVector, MatrixCRS, SparseVector, TraceVector
from mfv2d._mfv2d import LinearSystem as CLinearSystem
from mfv2d.mimetic2d import Constraint
from mfv2d.system import ElementFormSpecification


[docs] class LinearSystem(CLinearSystem): """Class used to represent a linear system with element equations and constraints. Parameters ---------- n_elem : int Number of elements in the system. form_spec : ElementFormSpecification Specification of forms in the element system. orders : array Array of element orders. element_matrices : Sequence of arrays Sequence of element matrices. constraints : Sequence of Constraint Sequences of constraints the element systems are subject to. """
[docs] def __new__( cls, n_elem: int, form_spec: ElementFormSpecification, orders: npt.NDArray[np.uint32], element_matrices: Sequence[npt.NDArray[np.float64]], constraints: Sequence[Constraint], ) -> Self: """Create a new LinearSystem.""" assert n_elem == len(element_matrices) assert orders.shape == (n_elem, 2) for ie in range(n_elem): size = form_spec.total_size(*orders[ie]) assert element_matrices[ie].shape == (size, size) element_constraint_counts = np.zeros(n_elem, np.uintc) for con in constraints: for ec in con.element_constraints: element_constraint_counts[ec.i_e] += 1 system_blocks = tuple( ( element_matrices[ie], MatrixCRS(len(constraints), element_matrices[ie].shape[1]), ) for ie in range(n_elem) ) c_vals: list[float] = list() c_elems: list[npt.NDArray[np.uint32]] = list() c_lens: list[int] = [0] for ic, con in enumerate(constraints): present_elements: list[int] = list() for ec in con.element_constraints: present_elements.append(ec.i_e) system = system_blocks[ec.i_e] system[1].build_row( ic, SparseVector.from_entries(system[1].shape[1], ec.dofs, ec.coeffs), ) c_elems.append( np.array([ec.i_e for ec in con.element_constraints], np.uint32) ) c_vals.append(con.rhs) c_lens.append(c_elems[-1].size) for ie in (i for i in range(n_elem) if i not in present_elements): system_blocks[ie][1].build_row(ic) return super().__new__( cls, system_blocks, np.cumsum(c_lens, dtype=np.uint32), np.concatenate(c_elems, dtype=np.uint32), )
[docs] def apply_full_trace_system( self, x: TraceVector, /, out: TraceVector, tmp1: DenseVector, tmp2: DenseVector ) -> None: """Apply the Schur's trace system. Parameters ---------- TraceVector Trace vector to which this is applied to. out : TraceVector Trace vector, which receives output. Can be the same as input. tmp1 : DenseVector Dense vector used to store intermediate result. Must be unique. tmp2 : DenseVector Dense vector used to store intermediate result. Must be unique. """ if tmp1 is tmp2: raise ValueError("Temporary dense vectors must not be the same.") self.apply_trace_transpose(x, tmp1) self.apply_diagonal_inverse(tmp1, tmp2) self.apply_trace(tmp2, out)
[docs] def combined_system_matrix(self) -> sp.csr_array: """Combine the system matrix into a scipy CSR array.""" diagonal_part = sp.block_diag(self.get_dense_blocks()) lagrange_block = sp.block_array( [ [ sp.csc_array( ( constraint_matrix.values, ( constraint_matrix.row_indices, constraint_matrix.column_indices, ), ), shape=constraint_matrix.shape, ) for constraint_matrix in self.get_constraint_blocks() ] ] ) return cast( sp.csr_array, sp.block_array( [ [diagonal_part, lagrange_block.T], [lagrange_block, None], ], format="csr", ), )
@dataclass(frozen=True) class ConvergenceSettings: """Settings used to specify convergence of an iterative solver.""" maximum_iterations: int = 100 """Maximum number of iterations to improve the solution.""" absolute_tolerance: float = 1e-6 """When the largest update in the solution drops bellow this value, consider it converged.""" relative_tolerance: float = 1e-5 """When the largest update in the solution drops bellow the largest value of solution degrees of freedom scaled by this value, consider it converged.""" _Mat = TypeVar("_Mat") _Vec = TypeVar("_Vec")
[docs] def gmres_general( mat: _Mat, rhs: _Vec, initial_guess: _Vec, convergence: ConvergenceSettings, system_application_function: Callable[[_Mat, _Vec, _Vec], None], vec_dot_function: Callable[[_Vec, _Vec], float], vec_add_to_function: Callable[[_Vec, _Vec, _Vec, float], None], vec_sub_from_scaled_function: Callable[[_Vec, _Vec, _Vec, float], None], vec_scale_by_function: Callable[[_Vec, float, _Vec], None], vec_copy_function: Callable[[_Vec], _Vec], ) -> tuple[_Vec, float, int]: """General implementation of GMRES to use for any data type with operators. Returns ------- _Vec Computed solution. float Estimated residual. int Iterations done. """ m = convergence.maximum_iterations g = np.zeros(m, np.float64) h = np.zeros(m, np.float64) sk = np.zeros(m, np.float64) ck = np.zeros(m, np.float64) r = np.zeros((m, m), np.float64) k = 0 p_vecs: list[_Vec] = list() # Find stopping criterion rhs_mag = np.sqrt(vec_dot_function(rhs, rhs)) if rhs_mag * convergence.relative_tolerance > convergence.absolute_tolerance: tol = convergence.absolute_tolerance else: tol = rhs_mag * convergence.relative_tolerance res = vec_copy_function(rhs) system_application_function(mat, initial_guess, res) vec_sub_from_scaled_function(rhs, res, res, 1.0) # First residual p = res # Get magnitude of the residual r_mag = np.sqrt(vec_dot_function(p, p)) # Normalize the vector vec_scale_by_function(p, 1 / r_mag, p) # Add it to the current collection of basis and LSQR state p_vecs.append(p) g[0] = r_mag for k in range(1, m): # Make a new basis vector p = vec_copy_function(p) system_application_function(mat, p, p) # Make it orthogonal to other basis for li in range(k): p_old = p_vecs[li] pp_dp = vec_dot_function(p, p_old) h[li] = pp_dp vec_sub_from_scaled_function(p, p_old, p, pp_dp) # Get the magnitude and normalize it p_mag2 = vec_dot_function(p, p) # Surprise tool for later p_mag = np.sqrt(p_mag2) vec_scale_by_function(p, 1 / p_mag, p) p_vecs.append(p) # Apply previous Givens rotations to the new column for i in range(k - 1): tmp = ck[i] * h[i] + sk[i] * h[i + 1] h[i + 1] = -sk[i] * h[i] + ck[i] * h[i + 1] h[i] = tmp # Find new Givens rotation rho = np.sqrt(p_mag2 + h[k - 1] * h[k - 1]) c_new = h[k - 1] / rho s_new = p_mag / rho ck[k - 1] = c_new sk[k - 1] = s_new h[k - 1] = c_new * h[k - 1] + s_new * p_mag r[:k, k - 1] = h[:k] g[k] = -s_new * g[k - 1] g[k - 1] = c_new * g[k - 1] r_mag = np.abs(g[k]) if r_mag < tol: # k += 1 break # Iterations are done, time to solve the LSQR problem alpha = la.solve_triangular(r[:k, :k], g[:k]) sol = vec_copy_function(initial_guess) for i in range(k): vec_add_to_function(sol, p_vecs[i], sol, alpha[i]) return sol, r_mag, k
[docs] def pcg_general( mat: _Mat, rhs: _Vec, initial_guess: _Vec, convergence: ConvergenceSettings, system_application_function: Callable[[_Mat, _Vec, _Vec], None], precondition_function: Callable[[_Mat, _Vec, _Vec], None], vec_dot_function: Callable[[_Vec, _Vec], float], vec_add_to_scaled_function: Callable[[_Vec, _Vec, float, _Vec], None], vec_sub_from_scaled_function: Callable[[_Vec, _Vec, float, _Vec], None], vec_copy_function: Callable[[_Vec], _Vec], degen_limit: float = 1e-12, ) -> tuple[_Vec, float, int]: """General implementation of GMRES to use for any data type with operators. Returns ------- _Vec Computed solution. float Estimated residual. int Iterations done. """ # Find stopping criterion x = vec_copy_function(initial_guess) res = vec_copy_function(initial_guess) system_application_function(mat, x, res) vec_sub_from_scaled_function(rhs, res, 1.0, res) p = vec_copy_function(res) precondition_function(mat, res, p) z = vec_copy_function(p) ap = vec_copy_function(rhs) res_mag2 = vec_dot_function(rhs, rhs) if ( np.sqrt(res_mag2) * convergence.relative_tolerance > convergence.absolute_tolerance ): tol = convergence.absolute_tolerance else: tol = np.sqrt(res_mag2) * convergence.relative_tolerance rz_dp = vec_dot_function(res, z) iter_cnt = 0 for iter_cnt in range(convergence.maximum_iterations): system_application_function(mat, p, ap) apa = vec_dot_function(ap, p) if (np.log(abs(apa)) - np.log(res_mag2)) < np.log(degen_limit): raise RuntimeError("System degenerated (matrix was probably not SPD).") alpha = rz_dp / apa vec_add_to_scaled_function(x, p, alpha, x) vec_sub_from_scaled_function(res, ap, alpha, res) res_mag2 = vec_dot_function(res, res) if res_mag2 < tol**2: break precondition_function(mat, res, z) new_rz_dp = vec_dot_function(res, z) beta = new_rz_dp / rz_dp rz_dp = new_rz_dp vec_add_to_scaled_function(z, p, beta, p) return x, np.sqrt(res_mag2), iter_cnt
[docs] def cg_general( mat: _Mat, rhs: _Vec, initial_guess: _Vec, convergence: ConvergenceSettings, system_application_function: Callable[[_Mat, _Vec], None], vec_dot_function: Callable[[_Vec, _Vec], float], vec_add_to_scaled_function: Callable[[_Vec, _Vec, float], None], vec_sub_from_scaled_function: Callable[[_Vec, _Vec, float], None], vec_copy_function: Callable[[_Vec], _Vec], vec_set_function: Callable[[_Vec, _Vec], None], ) -> tuple[_Vec, float, int]: """General implementation of GMRES to use for any data type with operators. Parameters ---------- m : int Number of basis to use for the solution. mat : _Mat System matrix state. rhs : _Vec Right side of the system. convergence : ConvergenceSettings Settings to use for convergence. system_application_function : (_Mat, _Vec) -> _Vec Function that applies the system to the input vector. vec_dot_function : (_Vec, _Vec) -> float Function that computes the dot product of system vectors. vec_add_function : (_Vec, _Vec) -> _Vec Function that adds two vectors together. vec_sub_function : (_Vec, _Vec) -> _Vec Function which subtracts two vectors. vec_scale_function : (float, _Vec) -> _Vec Function which scales a vector by a constant Returns ------- _Vec Computed solution. float Estimated residual. int Iterations done. """ # Find stopping criterion res_mag2 = vec_dot_function(rhs, rhs) if ( np.sqrt(res_mag2) * convergence.relative_tolerance > convergence.absolute_tolerance ): tol = convergence.absolute_tolerance else: tol = np.sqrt(res_mag2) * convergence.relative_tolerance ap = vec_copy_function(rhs) p = vec_copy_function(rhs) res = vec_copy_function(rhs) x = vec_copy_function(initial_guess) iter_cnt = 0 for iter_cnt in range(convergence.maximum_iterations): system_application_function(mat, ap) apa = vec_dot_function(ap, p) alpha = res_mag2 / apa vec_add_to_scaled_function(x, p, alpha) vec_sub_from_scaled_function(res, ap, alpha) new_res_mag2 = vec_dot_function(res, res) if new_res_mag2 < tol**2: res_mag2 = new_res_mag2 break beta = new_res_mag2 / res_mag2 res_mag2 = new_res_mag2 vec_set_function(ap, res) vec_add_to_scaled_function(ap, p, beta) vec_set_function(p, ap) return x, np.sqrt(res_mag2), iter_cnt
def solve_schur_iterative( system: LinearSystem, rhs: DenseVector, constraints: TraceVector, convergence: ConvergenceSettings, ) -> tuple[DenseVector, TraceVector, float, int]: """Solve the system using Schur's compliment but iteratively.""" # Compute rhs forcing for the Lagrange multipliers ### A^{-1} y inv_a_y = DenseVector(system) system.apply_diagonal_inverse(rhs, inv_a_y) ### N A^{-1} y - phi trace_rhs = TraceVector(system) system.apply_trace(inv_a_y, trace_rhs) TraceVector.subtract(trace_rhs, constraints, trace_rhs, 1.0) tmp_d1 = DenseVector(system) tmp_d2 = DenseVector(system) def wrapped_apply_system(system: LinearSystem, v: TraceVector, /) -> None: """Apply the trace system in a wrapped way.""" system.apply_full_trace_system(v, v, tmp_d1, tmp_d2) # Iteratively solve the system for trace lambda # trace_lhs, tr, itr_cnt = gmres_general( # system, # trace_rhs, # convergence, # wrapped_apply_system, # TraceVector.dot, # TraceVector.add_to, # TraceVector.subtract_from_scaled, # TraceVector.scale_by, # TraceVector.copy, # ) def _wrap_add(v1: TraceVector, v2: TraceVector, k: float) -> None: TraceVector.add(v1, v2, v1, k) def _wrap_sub(v1: TraceVector, v2: TraceVector, k: float) -> None: TraceVector.subtract(v1, v2, v1, k) trace_lhs, tr, itr_cnt = cg_general( system, trace_rhs, TraceVector(system), convergence, wrapped_apply_system, TraceVector.dot, _wrap_add, _wrap_sub, TraceVector.copy, TraceVector.set_from, ) # Apply contribution of trace to the system x = A^{-1} y - A^{-1} N^T lambda system.apply_trace_transpose(trace_lhs, tmp_d1) system.apply_diagonal_inverse(tmp_d1, tmp_d2) DenseVector.subtract(inv_a_y, tmp_d2, inv_a_y, 1.0) return inv_a_y, trace_lhs, tr, itr_cnt
[docs] @dataclass class FullVector: """Class contaning a dense and trace vector.""" dense: DenseVector trace: TraceVector
[docs] def __post_init__(self) -> None: """Check both are from the same parent.""" if self.dense.parent is not self.trace.parent: raise ValueError("Both parts must have the same parent.")
[docs] @classmethod def make_empty(cls, system: LinearSystem) -> Self: """Create an empty vector.""" return cls(DenseVector(system), TraceVector(system))
[docs] @staticmethod def dot(v1: FullVector, v2: FullVector) -> float: """Compute dot product of two vectors.""" return DenseVector.dot(v1.dense, v2.dense) + TraceVector.dot(v1.trace, v2.trace)
[docs] def copy(self) -> FullVector: """Create a copy of itself.""" return FullVector(self.dense.copy(), self.trace.copy())
[docs] def set_from(self, other: FullVector) -> None: """Set the vector from another.""" self.dense.set_from(other.dense) self.trace.set_from(other.trace)
[docs] @staticmethod def add(v1: FullVector, v2: FullVector, v_out: FullVector, k: float, /) -> None: """Add two vectors.""" DenseVector.add(v1.dense, v2.dense, v_out.dense, k) TraceVector.add(v1.trace, v2.trace, v_out.trace, k)
[docs] @staticmethod def subtract(v1: FullVector, v2: FullVector, v_out: FullVector, k: float, /) -> None: """Subtracts two vectors.""" DenseVector.subtract(v1.dense, v2.dense, v_out.dense, k) TraceVector.subtract(v1.trace, v2.trace, v_out.trace, k)
[docs] @staticmethod def scale(v: FullVector, k: float, v_out: FullVector, /) -> None: """Scale the vector by a constant.""" DenseVector.scale(v.dense, k, v_out.dense) TraceVector.scale(v.trace, k, v_out.trace)
[docs] def solve_gmres_iterative( system: LinearSystem, rhs: DenseVector, constraints: TraceVector, convergence: ConvergenceSettings, ) -> tuple[DenseVector, TraceVector, float, int]: """Solve the system using GMRES.""" # Set up the right side rhs_full = FullVector(rhs, constraints) dense_buffer1 = DenseVector(system) def wrapped_apply_system( sys: LinearSystem, v_in: FullVector, v_out: FullVector ) -> None: sys.apply_diagonal(v_in.dense, dense_buffer1) sys.apply_trace_transpose(v_in.trace, v_out.dense) sys.apply_trace(v_in.dense, v_out.trace) DenseVector.add(v_out.dense, dense_buffer1, v_out.dense, 1.0) solution, tr, itr_cnt = gmres_general( system, rhs_full, FullVector.make_empty(system), convergence, wrapped_apply_system, FullVector.dot, FullVector.add, FullVector.subtract, FullVector.scale, FullVector.copy, ) return solution.dense, solution.trace, tr, itr_cnt
[docs] def solve_cg_iterative( system: LinearSystem, rhs: DenseVector, constraints: TraceVector, convergence: ConvergenceSettings, ) -> tuple[DenseVector, TraceVector, float, int]: """Solve the system using CG.""" # Set up the right side rhs_full = FullVector(rhs, constraints) dense_buffer1 = DenseVector(system) dense_buffer2 = DenseVector(system) def wrapped_apply_system(sys: LinearSystem, v_in: FullVector) -> None: sys.apply_diagonal(v_in.dense, dense_buffer1) sys.apply_trace_transpose(v_in.trace, dense_buffer2) sys.apply_trace(v_in.dense, v_in.trace) DenseVector.add(dense_buffer1, dense_buffer2, v_in.dense, 1.0) def wrapped_add(v1: FullVector, v2: FullVector, k: float) -> None: DenseVector.add(v1.dense, v2.dense, v1.dense, k) TraceVector.add(v1.trace, v2.trace, v1.trace, k) def wrapped_sub(v1: FullVector, v2: FullVector, k: float) -> None: DenseVector.subtract(v1.dense, v2.dense, v1.dense, k) TraceVector.subtract(v1.trace, v2.trace, v1.trace, k) solution, tr, itr_cnt = cg_general( system, rhs_full, FullVector.make_empty(system), convergence, wrapped_apply_system, FullVector.dot, wrapped_add, wrapped_sub, FullVector.copy, FullVector.set_from, ) return solution.dense, solution.trace, tr, itr_cnt
[docs] def solve_pcg_iterative( system: LinearSystem, rhs: DenseVector, constraints: TraceVector, convergence: ConvergenceSettings, ) -> tuple[DenseVector, TraceVector, float, int]: """Solve the system using preconditioned CG.""" # Set up the right side rhs_full = FullVector(rhs, constraints) dense_buffer1 = DenseVector(system) dense_buffer2 = DenseVector(system) def wrapped_apply_system( sys: LinearSystem, v_in: FullVector, v_out: FullVector ) -> None: sys.apply_diagonal(v_in.dense, dense_buffer1) sys.apply_trace_transpose(v_in.trace, dense_buffer2) sys.apply_trace(v_in.dense, v_out.trace) DenseVector.add(dense_buffer1, dense_buffer2, v_out.dense, 1.0) def precondition_block_jacobi( sys: LinearSystem, v_in: FullVector, v_out: FullVector ) -> None: # Block Jacobi precondition sys.apply_diagonal_inverse(v_in.dense, v_out.dense) v_out.trace.set_from(v_in.trace) def wrapped_add(v1: FullVector, v2: FullVector, k: float, v_out: FullVector) -> None: DenseVector.add(v1.dense, v2.dense, v_out.dense, k) TraceVector.add(v1.trace, v2.trace, v_out.trace, k) def wrapped_sub(v1: FullVector, v2: FullVector, k: float, v_out: FullVector) -> None: DenseVector.subtract(v1.dense, v2.dense, v_out.dense, k) TraceVector.subtract(v1.trace, v2.trace, v_out.trace, k) solution, tr, itr_cnt = pcg_general( system, rhs_full, FullVector.make_empty(system), convergence, wrapped_apply_system, precondition_block_jacobi, FullVector.dot, wrapped_add, wrapped_sub, FullVector.copy, ) return solution.dense, solution.trace, tr, itr_cnt