"""Conversion and evaluation of kforms as a stack machine.
This module is concerned with converting expressions from the representation of the
:mod:`mfv2d.kform` module into a sequence of instructions which can be executed on a
stack machine.
"""
from __future__ import annotations
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from enum import IntEnum
from mfv2d.kform import (
Function2D,
KBoundaryProjection,
KElementProjection,
KForm,
KFormDerivative,
KFormUnknown,
KInnerProduct,
KInteriorProduct,
KInteriorProductLowered,
KSum,
KWeight,
UnknownFormOrder,
extract_base_form,
)
from mfv2d.system import KFormSystem
[docs]
@dataclass(frozen=True)
class MatOp:
"""Matrix operations which can be created.
This is just a base class for all other matrix operations.
"""
[docs]
@dataclass(frozen=True)
class Identity(MatOp):
"""Identity operation.
Do nothing to the matrix.
"""
[docs]
@dataclass(frozen=True)
class MassMat(MatOp):
"""Mass matrix multiplication.
Multiply by either mass matrix or its inverse.
Parameters
----------
order : UnknownFormOrder
Order of the k-form for the mass matrix.
inv : bool
Should the matrix be inverted.
"""
order: UnknownFormOrder
inv: bool
[docs]
@dataclass(frozen=True)
class Incidence(MatOp):
"""Incidence matrix.
Specifies application of an incidence matrix.
Parameters
----------
begin : UnknownFormOrder
Order of the k-form for the incidence matrix from which to apply it.
transpose : bool
Should the incidence matrix be transposed.
"""
begin: UnknownFormOrder
transpose: int
[docs]
@dataclass(frozen=True)
class Push(MatOp):
"""Push the matrix on the stack.
Used for matrix multiplication and summation.
"""
[docs]
@dataclass(frozen=True)
class Scale(MatOp):
"""Scale the matrix.
Mutliply the entire matrix with a constant.
Parameters
----------
k : float
Value of the constant by which to scale the matrix.
"""
k: float
[docs]
@dataclass(frozen=True)
class Sum(MatOp):
"""Sum matrices together.
Sum the top ``count`` matrices on the stack with the current matrix.
Parameters
----------
count : int
Number of matrices to sum to the current matrix. As such must be greater
than zero.
"""
count: int
[docs]
@dataclass(frozen=True)
class InterProd(MatOp):
"""Compute interior product.
This is the most complicated operation.
Parameters
----------
starting_order : UnknownFormOrder
Order of the k-form to which the interior product should be applied.
field : str or Function2D
Index of the vector/scalar field from which the values of are taken.
transpose : bool
Should the matrix be transposed.
"""
starting_order: UnknownFormOrder
field: str | Function2D
transpose: bool
[docs]
def simplify_expression(*operations: MatOp) -> list[MatOp]:
"""Simplify expressions as much as possible.
Tries to merge operations that can be combined, and removes/simplifies
as much as possible.
Parameters
----------
*operations : MatOp
Sequence of operations to simplify.
Returns
-------
list of MatOp
Simplified sequence of operations, which should be equivalent to the
input.
"""
ops = list(operations)
nops = len(ops)
initial_nops = 0
while initial_nops != nops:
i = 0
initial_nops = int(nops)
r: Identity | Scale
while i < nops:
if (
nops - i >= 2
and type(ops[i]) is Identity
and (type(ops[i + 1]) is not Sum and type(ops[i + 1]) is not Push)
and (i != 0 or type(ops[i - 1]) is not Push)
):
# Identity on anything except on sum or push is a no-op
del ops[i]
nops -= 1
continue
if nops - i >= 2 and (
type(ops[i]) is MassMat and type(ops[i + 1]) is MassMat
):
# Mass matrix and its inverse result in identity
m1 = ops[i]
m2 = ops[i + 1]
assert type(m1) is MassMat and type(m2) is MassMat
if m1.order == m2.order and m1.inv != m2.inv:
del ops[i + 1]
ops[i] = Identity()
nops -= 1
continue
if nops - i >= 2 and (
type(ops[i]) is Identity
and (
type(ops[i + 1]) is MassMat
or type(ops[i + 1]) is Incidence
or type(ops[i + 1]) is Push
or type(ops[i + 1]) is Scale
or type(ops[i + 1]) is InterProd
)
):
# Identity does nothing to these
del ops[i]
nops -= 1
continue
if nops - i >= 2 and (
(type(ops[i]) is Scale or type(ops[i]) is Identity)
and (type(ops[i + 1]) is Scale or type(ops[i + 1]) is Identity)
):
# Merge identities/scaling
v1 = ops[i]
v2 = ops[i + 1]
if type(v1) is Identity:
if type(v2) is Identity:
r = Identity()
else:
assert type(v2) is Scale
r = v2
else:
assert type(v1) is Scale
if type(v2) is Identity:
r = v1
else:
assert type(v2) is Scale
r = Scale(v1.k * v2.k)
del ops[i + 1]
ops[i] = r
nops -= 1
continue
if nops - i >= 1 and (type(ops[i]) is Sum):
# Sum of zero stack values is no-op
sop = ops[i]
assert type(sop) is Sum
if sop.count == 0:
del ops[i]
nops -= 1
continue
if nops - i >= 5 and (
type(ops[i]) is Push
and (type(ops[i + 1]) is Scale or type(ops[i + 1]) is Identity)
and type(ops[i + 2]) is Push
and (type(ops[i + 3]) is Scale or type(ops[i + 3]) is Identity)
and type(ops[i + 4]) is Sum
):
# Sum of two pure identity/scale matrices can be pre-computed
v1 = ops[i + 1]
v2 = ops[i + 3]
if type(v1) is Identity:
if type(v2) is Identity:
r = Scale(2)
else:
assert type(v2) is Scale
r = Scale(v2.k + 1)
else:
assert type(v1) is Scale
if type(v2) is Identity:
r = Scale(v1.k + 1)
else:
assert type(v2) is Scale
r = Scale(v1.k + v2.k)
ops[i + 1] = r
sop = ops[i + 4]
assert type(sop) is Sum
ops[i + 4] = Sum(sop.count - 1)
del ops[i + 3]
del ops[i + 2]
nops -= 2
continue
if i > 1 and (type(ops[i]) is Identity and type(ops[i - 1]) is not Push):
# Identity following anything but push is a no-op
del ops[i]
nops -= 1
else:
i += 1
return ops
[docs]
def translate_implicit_ksum(ks: KSum) -> dict[KFormUnknown, list[MatOp]]:
"""Compute matrix operations on different unknown blocks.
Parameter
---------
ks : KSum
Sum to translate.
Returns
-------
dict of (KFormUnknown, list[MatOp])
Dictionary mapping forms to either a matrix that represents the operation to
perform on them, or ``float``, if it should be multiplication with a constant.
"""
# Get instructions for each InnerProduct
instructions: dict[KFormUnknown, list[list[MatOp]]] = dict()
for k, ip in ks.pairs:
if type(ip) is not KInnerProduct:
raise TypeError("Can only translate implicit terms.")
ops: list[MatOp] = _translate_inner_prod(ip)
if k != 1.0:
ops += [Scale(k)]
base = extract_base_form(ip.unknown_form)
assert type(base) is KFormUnknown
if base not in instructions:
instructions[base] = list()
instructions[base].append(ops)
# Merge instructions as needed
v: dict[KFormUnknown, list[MatOp]] = dict()
for form in instructions:
op_list = instructions[form]
assert form not in v
merged: list[MatOp] = op_list[0]
cnt = len(op_list)
for i in range(cnt - 1):
new_ops = op_list[i + 1]
merged.append(Push())
merged.extend(new_ops)
if cnt > 1:
merged.append(Sum(cnt - 1))
v[form] = simplify_expression(*merged)
return v
[docs]
def explicit_ksum_as_string(ks: KSum) -> str:
"""Make the explicit terms in the KSum into a string.
Parameter
---------
ks : KSum
Sum to translate.
Returns
-------
str
String of explicit terms.
"""
res = str()
for k, ip in ks.pairs:
out = str()
match ip:
case KInnerProduct():
# Not explicit
continue
case KElementProjection() as ep:
if ep.func is None:
continue
out = "E" + ep.label
case KBoundaryProjection() as bp:
if bp.func is None:
continue
out = "B" + bp.label
if k != 1.0:
out = f"{abs(k):g} * {out}"
if k < 0:
out = "- " + out
else:
out = "+ " + out
res = res + " " + out
return res.strip()
[docs]
def _translate_inner_prod(inner: KInnerProduct) -> list[MatOp]:
"""Translate inner product."""
unknown_ops = _translate_form(inner.unknown_form)
weight_ops = _translate_form(inner.weight_form)
# Add mass matrix
unknown_ops.append(MassMat(inner.unknown_form.order, False))
# Add transposed weight ops to unknown ops.
for op in reversed(weight_ops):
match op:
case Identity():
# Ignore
pass
case Incidence() as inc:
unknown_ops.append(Incidence(inc.begin, not inc.transpose))
case MassMat() | Scale():
# Symmetric, so no transpose
unknown_ops.append(op)
case InterProd() as ip:
unknown_ops.append(
InterProd(ip.starting_order, ip.field, not ip.transpose)
)
case _:
raise TypeError("Unexpected type for an inner product instructions.")
if len(unknown_ops) > 1:
# Skip the first identity
return unknown_ops[1:]
return unknown_ops
[docs]
class MatOpCode(IntEnum):
"""Operation codes.
Notes
-----
These values must be kept in sync with the ``matrix_op_t`` enum in the C code,
since that is how Python and C communicate with each other.
"""
INVALID = 0
IDENTITY = 1
MASS = 2
INCIDENCE = 3
PUSH = 4
SCALE = 5
SUM = 6
INTERPROD = 7
_TranslatedInstruction = tuple[MatOpCode | int | float | Function2D | str, ...]
_TranslatedBlock = Sequence[_TranslatedInstruction]
_TranslatedRow = Sequence[_TranslatedBlock | None]
_TranslatedSystem2D = Sequence[_TranslatedRow]
[docs]
def translate_to_c_instructions(*ops: MatOp) -> _TranslatedBlock:
"""Translate the operations into C-compatible values.
This translation is done since the C code can't handle arbitrary Python objects
and instead only deals with integers (or int enums) and floats.
Parameters
----------
*ops : MatOp
Operations to translate.
Returns
-------
list of MatOpCode | int | float
List of translated operations.
"""
out: list[_TranslatedInstruction] = list()
for op in ops:
translated: _TranslatedInstruction
match op:
case Identity():
translated = (MatOpCode.IDENTITY,)
case MassMat():
translated = (MatOpCode.MASS, op.order, op.inv)
case Incidence():
translated = (MatOpCode.INCIDENCE, op.begin, op.transpose)
case Push():
translated = (MatOpCode.PUSH,)
case Scale():
translated = (MatOpCode.SCALE, op.k)
case Sum():
translated = (MatOpCode.SUM, op.count)
case InterProd():
translated = (
MatOpCode.INTERPROD,
op.starting_order,
op.field,
op.transpose,
)
case _:
raise TypeError(f"Unknown instruction type {type(op).__name__}.")
out.append(translated)
return tuple(out)
def translate_system(system: KFormSystem) -> _TranslatedSystem2D:
"""Create the two-dimensional instruction array for the C code to execute."""
bytecodes = [translate_implicit_ksum(eq.left) for eq in system.equations]
codes: list[_TranslatedRow] = list()
for bite in bytecodes:
row: list[_TranslatedBlock | None] = list()
for f in system.unknown_forms.iter_forms():
if f in bite:
row.append(translate_to_c_instructions(*bite[f]))
else:
row.append(None)
codes.append(tuple(row))
return tuple(codes)
[docs]
@dataclass(frozen=True, init=False)
class CompiledSystem:
"""System of equations compiled.
This is a convenience class which first compiles the system and splits
it into explicit, linear implicit, and non-linear implicit equations, which
are then further used by different parts of the solver.
Parameters
----------
system : KFormSystem
System to compile.
"""
lhs_codes: _TranslatedSystem2D
"""All left-hand side codes of the equations. When evaluated, this will
produce the full left side of the equation."""
rhs_codes: _TranslatedSystem2D | None
"""If not ``None``, contains the right-hand side codes of the equations."""
linear_codes: _TranslatedSystem2D
"""All left-hand side codes of the equations, which are linear."""
nonlin_codes: _TranslatedSystem2D | None
"""If not ``None``, contains the non-linear codes that can be used
for Newton-Raphson solver."""
@staticmethod
def _compile_system_part(system: KFormSystem, expr: KSum | None) -> _TranslatedRow:
"""Compile an expression and fit it into the row of the expression matrix."""
if expr is None:
return (None,) * len(system.unknown_forms)
bytecodes = translate_implicit_ksum(expr)
row: list[_TranslatedBlock | None] = list()
for f in system.unknown_forms.iter_forms():
if f in bytecodes:
row.append(translate_to_c_instructions(*bytecodes[f]))
else:
row.append(None)
return tuple(row)
def __init__(self, system: KFormSystem) -> None:
# Split the system into the explicit, linear implicit, and non-linear implicit
# explicit: list[tuple[tuple[float, KExplicit], ...] | None] = list()
implicit_rhs: list[KSum | None] = list()
linear_lhs: list[KSum | None] = list()
nonlin_lhs: list[KSum | None] = list()
# Loop over equations
for equation in system.equations:
assert not equation.left.explicit_terms
# rhs_expl = equation.right.explicit_terms
# explicit.append(rhs_expl if rhs_expl else None)
rhs_impl = equation.right.implicit_terms
implicit_rhs.append(KSum(*rhs_impl) if rhs_impl else None)
linear, nonlinear = equation.left.split_terms_linear_nonlinear()
linear_lhs.append(linear)
nonlin_lhs.append(nonlinear)
rhs_codes = tuple(
CompiledSystem._compile_system_part(system, expr) for expr in implicit_rhs
)
object.__setattr__(
self,
"rhs_codes",
rhs_codes
if any(any(e is not None for e in row) for row in rhs_codes)
else None,
)
linear_codes = tuple(
CompiledSystem._compile_system_part(system, expr) for expr in linear_lhs
)
object.__setattr__(self, "linear_codes", linear_codes)
nonlinear_codes = tuple(
CompiledSystem._compile_system_part(system, expr) for expr in nonlin_lhs
)
object.__setattr__(
self,
"nonlin_codes",
nonlinear_codes
if any(any(e is not None for e in row) for row in nonlinear_codes)
else None,
)
object.__setattr__(
self,
"lhs_codes",
tuple(
CompiledSystem._compile_system_part(system, eq.left)
for eq in system.equations
),
)
[docs]
def _translate_codes_to_str(*ops: MatOp) -> str:
"""Translate operation codes to a string."""
out: list[str] = list()
for op in reversed(ops):
match op:
case Identity():
out += "I"
case MassMat() as mm:
base = f"M({mm.order.value - 1})"
if mm.inv:
out.append(f"({base})^{{-1}}")
else:
out.append(base)
case Incidence() as inc:
base = f"E({inc.begin.value}, {inc.begin.value - 1})"
if inc.transpose:
out.append(f"({base})^T")
else:
out.append(base)
case InterProd() as ip:
base = (
f"P({ip.starting_order.value - 2}, {ip.starting_order.value - 1},"
f" {ip.field if type(ip.field) is str else ip.field.__name__})" # type: ignore[union-attr]
)
if ip.transpose:
out.append(f"({base})^T")
else:
out.append(base)
case Scale() as sc:
out.append(str(sc.k))
case _:
raise TypeError(f"Unsupported instruction type {type(op)}.")
return " ".join(out)
[docs]
def _translate_expr_to_str(*ops: MatOp) -> str:
"""Translate operations which may include Sum and Push into a string."""
s = ops[-1]
if type(s) is not Sum:
return _translate_codes_to_str(*ops)
out = str()
begin = 0
c = 0
for i, op in enumerate(ops):
if type(op) is Push:
subsection = _translate_codes_to_str(*ops[begin:i])
out += f"+ ({subsection})"
begin = i + 1
c += 1
assert c == s.count
# Do it for the last one, which is not ended by Push
subsection = _translate_codes_to_str(*ops[begin:-1])
out += f" + ({subsection})"
return out.strip()
[docs]
def system_as_string(system: KFormSystem, /) -> str:
"""Create the string representation of the system."""
left_bytecodes = [translate_implicit_ksum(eq.left) for eq in system.equations]
left_rows = bytecode_matrix_as_rows(system, left_bytecodes)
right_bytecodes = [
(
translate_implicit_ksum(KSum(*eq.right.implicit_terms))
if eq.right.implicit_terms
else dict()
)
for eq in system.equations
]
right_rows = bytecode_matrix_as_rows(system, right_bytecodes)
# Add brackets and unknowns to
unknowns = [str(w.base_form) for w in system.weight_forms]
uw = max([len(u) for u in unknowns])
unknowns = [u.ljust(uw) for u in unknowns]
left_rows = [f"[{row}] [{u}]" for u, row in zip(unknowns, left_rows, strict=True)]
right_rows = [f"[{row}] [{u}]" for u, row in zip(unknowns, right_rows, strict=True)]
explicit_rows = [explicit_ksum_as_string(eq.right) for eq in system.equations]
ew = 0
n = len(explicit_rows)
for i in range(n):
ew = max(ew, len(explicit_rows[i]))
for i in range(n):
if len(explicit_rows[i]) == 0:
explicit_rows[i] = "+ 0"
explicit_rows[i] = "[" + explicit_rows[i].ljust(ew) + "]"
full_rows = [
l_row
+ (" = " if row == n // 2 else " ")
+ r_exp
+ (" + " if row == n // 2 else " ")
+ r_row
for row, (l_row, r_row, r_exp) in enumerate(
zip(left_rows, right_rows, explicit_rows, strict=True)
)
]
return "\n".join(full_rows)
[docs]
def bytecode_matrix_as_rows(
system: KFormSystem, bytecodes: Sequence[Mapping[KFormUnknown, list[MatOp]]]
) -> list[str]:
"""Extract expression rows."""
expression_matrix = [
[
(_translate_expr_to_str(*codes[form]) if form in codes else "0")
for form in system.unknown_forms.iter_forms()
]
for codes in bytecodes
]
n = len(expression_matrix)
for col in range(n):
col_w = 1
for row in range(n):
w = len(expression_matrix[row][col])
col_w = max(w, col_w)
for row in range(n):
expression_matrix[row][col] = expression_matrix[row][col].ljust(col_w)
left_rows = [" | ".join(expr_row) for expr_row in expression_matrix]
return left_rows
if __name__ == "__main__":
vor = KFormUnknown("vor", UnknownFormOrder.FORM_ORDER_0)
vel = KFormUnknown("vel", UnknownFormOrder.FORM_ORDER_1)
pre = KFormUnknown("pre", UnknownFormOrder.FORM_ORDER_2)
w_vor = vor.weight
w_vel = vel.weight
w_pre = pre.weight
def vor_src(x, y):
"""Test vorticit function."""
return 0 * x * y
import numpy as np
def vel_src(x, y):
"""Test velocity function."""
return np.stack((0 * x * y, 0 * x * y), axis=-1)
def pre_src(x, y):
"""Test pressure function."""
return 0 * x * y
system = KFormSystem(
(w_vor @ vor) + (w_vor.derivative @ vel) == w_vor ^ vor_src,
(w_vel @ vor.derivative) + (w_vel.derivative @ pre)
== ((vel * w_vel) @ vor) + ((vel_src * w_vel) @ vor),
(w_pre @ vel.derivative) == w_pre @ pre_src,
)
print(system_as_string(system))