Source code for liesel_gam.basis_builder

from __future__ import annotations

import logging
from collections.abc import Callable, Mapping, Sequence
from math import ceil
from typing import Any, Literal, get_args

import formulaic as fo
import jax
import jax.numpy as jnp
import liesel.model as lsl
import numpy as np
import pandas as pd

try:
    # readthedocs safeguard: R is not installed in the readthedocs build environment
    import smoothcon as scon
    from ryp import r, to_py, to_r
except RuntimeError as e:
    import os

    on_rtd = os.environ.get("READTHEDOCS", "False") == "True"
    if on_rtd:
        scon = None
        r = None
        to_py = None
        to_r = None
        pass
    else:
        raise e

from .basis import Basis, LinBasis, MRFBasis, MRFSpec
from .names import NameManager
from .registry import CategoryMapping, PandasRegistry

InferenceTypes = Any

Array = jax.Array
ArrayLike = jax.typing.ArrayLike

BasisTypes = Literal["tp", "ts", "cr", "cs", "cc", "bs", "ps", "cp", "gp"]


logger = logging.getLogger(__name__)


def _validate_bs(bs):
    if isinstance(bs, str):
        bs = [bs]
    allowed = get_args(BasisTypes)
    for bs_str in bs:
        if bs_str not in allowed:
            raise ValueError(f"Allowed values for 'bs' are: {allowed}; got {bs=}.")


def _validate_formula(formula: str) -> None:
    if "~" in formula:
        raise ValueError("'~' in formulas is not supported.")

    terms = ["".join(x.split()) for x in formula.split("+")]
    for term in terms:
        if term == "1":
            raise ValueError(
                "Using '1 +' is not supported. To add an intercept, use the "
                "argument 'include_intercept'."
            )
        if term == "0" or term == "-1":
            raise ValueError(
                "Using '0 +' or '-1' is not supported. Intercepts are not included "
                "by default and can be added manually with the argument "
                "'include_intercept'."
            )


def _validate_penalty_order(penalty_order: int):
    if not isinstance(penalty_order, int):
        raise TypeError(
            f"'penalty_order' must be int or None, got {type(penalty_order)}"
        )
    if not penalty_order > 0:
        raise ValueError(f"'penalty_order' must be >0, got {penalty_order}")


[docs] class BasisBuilder: """ Initializes :class:`.Basis` objects from data in a :class:`.PandasRegistry`. Parameters ---------- registry A pandas registry, giving access to the data. names A name manager for creating unique names. See Also -------- .TermBuilder : Initializes structured additive terms. .Basis : Basic basis class. .LinBasis : Specialized basis for linear effects. .MRFBasis : Specialized basis for Gaussian Markov random fields. Examples -------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.ps("x_nonlin", k=20) Basis(name="B(x_nonlin)") """ def __init__( self, registry: PandasRegistry, names: NameManager | None = None ) -> None: self.registry = registry self.mappings: dict[str, CategoryMapping] = {} self.names = NameManager() if names is None else names def __repr__(self) -> str: return f"{type(self).__name__}(data_shape={self.registry.data.shape})" @property def data(self) -> pd.DataFrame: """The dataframe wrapped by this builder's registry.""" return self.registry.data
[docs] def basis( self, *x: str | lsl.Var, basis_fn: Callable[[Array], Array], use_callback: bool = True, cache_basis: bool = True, penalty: ArrayLike | lsl.Value | None = None, basis_name: str = "B", ) -> Basis: """ Initializes a general basis given a basis function. Parameters ---------- *x Names of input variables. basis_fn Basis function. Must take a 2d-array as input and return a 2d array. use_callback If *True*, the basis function is evaluated using a Python callback, which means that it does not have to be jit-compatible via JAX. This also means that the basis must remain constant throughout estimation. Passed on to :class:`.Basis`. cache_basis If ``True`` the computed basis is cached in a persistent calculation node (``lsl.Calc``), which avoids re-computation when not required. Passed on to :class:`.Basis`. penalty Penalty matrix associated with the basis. Passed on to :class:`.Basis`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. Examples -------- .. rubric:: Manually specified B-Spline basis >>> from liesel.contrib.splines import basis_matrix, equidistant_knots >>> from liesel.contrib.splines import pspline_penalty >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> knots = equidistant_knots(df["x_nonlin"].to_numpy(), n_param=20) >>> pen = pspline_penalty(d=20) The basis function should always expect a matrix-valued array as an input. >>> def bspline_basis(x_mat): ... # x_mat is shape (n, 1) ... x_vec = x_mat.squeeze() # shape (n,) ... return basis_matrix(x_vec, knots=knots) >>> bb.basis("x_nonlin", basis_fn=bspline_basis, penalty=pen) Basis(name="B(x_nonlin)") .. rubric:: Manually specified linear basis This is a minimal example for how a basis as a function of multiple variables works. >>> import jax.numpy as jnp >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> def linear_basis(x_mat): ... # x_mat is shape (n, 2) ... basis_mat = jnp.column_stack((jnp.ones(df.shape[0]), x_mat)) ... return basis_mat >>> basis = bb.basis("x_nonlin", "x_lin", basis_fn=linear_basis) >>> basis Basis(name="B(x_nonlin,x_lin)") >>> basis.value.shape (100, 3) """ if isinstance(penalty, lsl.Value): penalty.value = jnp.asarray(penalty.value) elif penalty is not None: penalty = jnp.asarray(penalty) x_vars = [] x_names = [] for x_name in x: x_var = self._get_var_and_value(x_name)[0] x_names.append(x_var.name) x_vars.append(x_var) Xname = self.registry.prefix + ",".join(x_names) Xvar = lsl.TransientCalc( lambda *x: jnp.column_stack(x), *x_vars, _name=Xname, ) basis = Basis( value=Xvar, basis_fn=basis_fn, name=self.names.create(basis_name + "(" + Xname + ")"), use_callback=use_callback, cache_basis=cache_basis, penalty=penalty, ) return basis
def _get_var_and_value(self, x: str | lsl.Var) -> tuple[lsl.Var, jax.Array]: if isinstance(x, str): x_array = jnp.asarray(self.registry.data[x].to_numpy()) x_var = self.registry.get_numeric_obs(x) elif isinstance(x, lsl.Var): if not x.name: raise ValueError("If you supply a variable for 'x', it must be named.") x_array = jnp.asarray(x.value) x_var = x else: raise TypeError(f"Type {type(x)} not supported for 'x'.") return x_var, x_array
[docs] def ps( self, x: str | lsl.Var, *, k: int, basis_degree: int = 3, penalty_order: int = 2, knots: ArrayLike | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", ) -> Basis: """ B-spline basis with a discrete (P-spline) penalty matrix. Parameters ---------- x Name of input variable. k Number of (unconstrained) bases. basis_degree Degree of the polynomials used in the B-spline basis function. Default is 3 for cubic B-splines. penalty_order Order of the penalty. knots Knots used to set up the basis. If ``None`` (default), a set of equidistant knots will be set up automatically, with the domain boundaries inferred from the minimum and maximum of the observed values. The number of knots must be ``k + basis_degree + 1``, and for the observed data, it must be true that ``knots[basis_degree] < min(x)`` and ``max(x) < knots[-basis_degree]``. absorb_cons Whether the default identification constraint should be applied by reparameterization and absorbing the reparameterization matrix into the basis and penalty matrices for computational efficiency. If ``False``, the basis is unconstrained, if ``True`` it receives a sum to zero constrained. Also see :meth:`.Basis.constrain`. diagonal_penalty Whether the penalty matrix associated with this term should be reparameterized into a diagonal matrix. In this case, the basis matrix is reparameterized accordingly. This can be beneficial for posterior geometry, which is why it is the default. Also see :meth:`.Basis.diagonalize_penalty`. scale_penalty Whether the penalty matrix should be scaled such that its infinity norm is one. This can improve numerical stability, which is why it is done by default. Also see :meth:`.Basis.scale_penalty`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. Notes ----- This basis is initialized with ``use_callback=True`` and ``cache_basis=True``. See :class:`.Basis` for details. This method internally calls the R package mgcv to set up the basis and penalty. References ---------- - Lang, S., & Brezger, A. (2004). Bayesian P-splines. Journal of Computational and Graphical Statistics, 13(1), 183–212. https://doi.org/10.1198/1061860043010 - Wood, S.N. (2017) Generalized Additive Models: An Introduction with R (2nd edition). Chapman and Hall/CRC. - R package mgcv https://cran.r-project.org/web/packages/mgcv/index.html Examples -------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.ps("x_nonlin", k=20) Basis(name="B(x_nonlin)") The default is a constrained basis: >>> bb.ps("x_nonlin", k=20).value.shape (100, 19) The constraint can be turned off by passing ``absorb_cons=False``: >>> bb.ps("x_nonlin", k=20, absorb_cons=False).value.shape (100, 20) """ _validate_penalty_order(penalty_order) if knots is not None: knots = np.asarray(knots) x_var, x_array = self._get_var_and_value(x) spec = ( f"s({x_var.name}, bs='ps', k={k}, m=c({basis_degree - 1}, {penalty_order}))" ) smooth = scon.SmoothCon( spec, data={x_var.name: x_array}, knots=knots, absorb_cons=absorb_cons, diagonal_penalty=diagonal_penalty, scale_penalty=scale_penalty, ) basis = Basis( x_var, name=self.names.create(basis_name + "(" + x_var.name + ")"), basis_fn=lambda x_: jnp.asarray(smooth.predict({x_var.name: x_})), penalty=smooth.penalty, use_callback=True, cache_basis=True, ) if absorb_cons: basis._constraint = "absorbed_via_mgcv" return basis
[docs] def cr( self, x: str | lsl.Var, *, k: int, penalty_order: int = 2, knots: ArrayLike | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", ) -> Basis: """ Cubic regression spline basis and penalty matrix. Parameters ---------- x Name of input variable. k Number of (unconstrained) bases. penalty_order Order of the penalty. knots Knots used to set up the basis. If ``None`` (default), a set of equidistant knots will be set up automatically, with the domain boundaries inferred from the minimum and maximum of the observed values. absorb_cons Whether the default identification constraint should be applied by reparameterization and absorbing the reparameterization matrix into the basis and penalty matrices for computational efficiency. If ``False``, the basis is unconstrained, if ``True`` it receives a sum to zero constrained. Also see :meth:`.Basis.constrain`. diagonal_penalty Whether the penalty matrix associated with this term should be reparameterized into a diagonal matrix. In this case, the basis matrix is reparameterized accordingly. This can be beneficial for posterior geometry, which is why it is the default. Also see :meth:`.Basis.diagonalize_penalty`. scale_penalty Whether the penalty matrix should be scaled such that its infinity norm is one. This can improve numerical stability, which is why it is done by default. Also see :meth:`.Basis.scale_penalty`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. See Also -------- .cs : Cubic regression splines with additinal shrinkage on the null space. Notes ----- This basis is initialized with ``use_callback=True`` and ``cache_basis=True``. See :class:`.Basis` for details. This method internally calls the R package mgcv to set up the basis and penalty. The mgcv documentation provides further details. References ---------- - Wood, S.N. (2017) Generalized Additive Models: An Introduction with R (2nd edition). Chapman and Hall/CRC. - R package mgcv https://cran.r-project.org/web/packages/mgcv/index.html Examples --------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.cr("x_nonlin", k=20) Basis(name="B(x_nonlin)") """ _validate_penalty_order(penalty_order) if knots is not None: knots = np.asarray(knots) x_var, x_array = self._get_var_and_value(x) spec = f"s({x_var.name}, bs='cr', k={k}, m=c({penalty_order}))" smooth = scon.SmoothCon( spec, data={x_var.name: x_array}, knots=knots, absorb_cons=absorb_cons, diagonal_penalty=diagonal_penalty, scale_penalty=scale_penalty, ) basis = Basis( x_var, name=self.names.create(basis_name + "(" + x_var.name + ")"), basis_fn=lambda x_: jnp.asarray(smooth.predict({x_var.name: x_})), penalty=smooth.penalty, use_callback=True, cache_basis=True, ) if absorb_cons: basis._constraint = "absorbed_via_mgcv" return basis
[docs] def cs( self, x: str | lsl.Var, *, k: int, penalty_order: int = 2, knots: ArrayLike | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", ) -> Basis: """ Cubic regression spline basis and penalty matrix with null space penalty. Parameters ---------- x Name of input variable. k Number of (unconstrained) bases. penalty_order Order of the penalty. knots Knots used to set up the basis. If ``None`` (default), a set of equidistant knots will be set up automatically, with the domain boundaries inferred from the minimum and maximum of the observed values. absorb_cons Whether the default identification constraint should be applied by reparameterization and absorbing the reparameterization matrix into the basis and penalty matrices for computational efficiency. If ``False``, the basis is unconstrained, if ``True`` it receives a sum to zero constrained. Also see :meth:`.Basis.constrain`. diagonal_penalty Whether the penalty matrix associated with this term should be reparameterized into a diagonal matrix. In this case, the basis matrix is reparameterized accordingly. This can be beneficial for posterior geometry, which is why it is the default. Also see :meth:`.Basis.diagonalize_penalty`. scale_penalty Whether the penalty matrix should be scaled such that its infinity norm is one. This can improve numerical stability, which is why it is done by default. Also see :meth:`.Basis.scale_penalty`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. Notes ----- This basis is initialized with ``use_callback=True`` and ``cache_basis=True``. See :class:`.Basis` for details. This method internally calls the R package mgcv to set up the basis and penalty. The mgcv documentation provides further details. References ---------- - Wood, S.N. (2017) Generalized Additive Models: An Introduction with R (2nd edition). Chapman and Hall/CRC. - R package mgcv https://cran.r-project.org/web/packages/mgcv/index.html Examples --------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.cs("x_nonlin", k=20) Basis(name="B(x_nonlin)") """ _validate_penalty_order(penalty_order) if knots is not None: knots = np.asarray(knots) x_var, x_array = self._get_var_and_value(x) spec = f"s({x_var.name}, bs='cs', k={k}, m=c({penalty_order}))" smooth = scon.SmoothCon( spec, data={x_var.name: x_array}, knots=knots, absorb_cons=absorb_cons, diagonal_penalty=diagonal_penalty, scale_penalty=scale_penalty, ) basis = Basis( x_var, name=self.names.create(basis_name + "(" + x_var.name + ")"), basis_fn=lambda x_: jnp.asarray(smooth.predict({x_var.name: x_})), penalty=smooth.penalty, use_callback=True, cache_basis=True, ) if absorb_cons: basis._constraint = "absorbed_via_mgcv" return basis
[docs] def cc( self, x: str | lsl.Var, *, k: int, penalty_order: int = 2, knots: ArrayLike | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", ) -> Basis: """ Cyclic cubic regression spline basis and penalty matrix. Basis for a penalized cubic regression spline whose ends match, up to second derivative. Parameters ---------- x Name of input variable. k Number of (unconstrained) bases. penalty_order Order of the penalty. knots Knots used to set up the basis. If ``None`` (default), a set of equidistant knots will be set up automatically, with the domain boundaries inferred from the minimum and maximum of the observed values. absorb_cons Whether the default identification constraint should be applied by reparameterization and absorbing the reparameterization matrix into the basis and penalty matrices for computational efficiency. If ``False``, the basis is unconstrained, if ``True`` it receives a sum to zero constrained. Also see :meth:`.Basis.constrain`. diagonal_penalty Whether the penalty matrix associated with this term should be reparameterized into a diagonal matrix. In this case, the basis matrix is reparameterized accordingly. This can be beneficial for posterior geometry, which is why it is the default. Also see :meth:`.Basis.diagonalize_penalty`. scale_penalty Whether the penalty matrix should be scaled such that its infinity norm is one. This can improve numerical stability, which is why it is done by default. Also see :meth:`.Basis.scale_penalty`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. Notes ----- This basis is initialized with ``use_callback=True`` and ``cache_basis=True``. See :class:`.Basis` for details. Cyclicity is enforced by matching the function and its derivatives at the domain boundaries. This method internally calls the R package mgcv to set up the basis and penalty. The mgcv documentation provides further details. References ---------- - Wood, S.N. (2017) Generalized Additive Models: An Introduction with R (2nd edition). Chapman and Hall/CRC. - R package mgcv https://cran.r-project.org/web/packages/mgcv/index.html Examples --------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.cc("x_nonlin", k=20) Basis(name="B(x_nonlin)") """ _validate_penalty_order(penalty_order) if knots is not None: knots = np.asarray(knots) x_var, x_array = self._get_var_and_value(x) spec = f"s({x_var.name}, bs='cc', k={k}, m=c({penalty_order}))" smooth = scon.SmoothCon( spec, data={x_var.name: x_array}, knots=knots, absorb_cons=absorb_cons, diagonal_penalty=diagonal_penalty, scale_penalty=scale_penalty, ) basis = Basis( x_var, name=self.names.create(basis_name + "(" + x_var.name + ")"), basis_fn=lambda x_: jnp.asarray(smooth.predict({x_var.name: x_})), penalty=smooth.penalty, use_callback=True, cache_basis=True, ) if absorb_cons: basis._constraint = "absorbed_via_mgcv" return basis
[docs] def bs( self, x: str | lsl.Var, *, k: int, basis_degree: int = 3, penalty_order: int | Sequence[int] = 2, knots: ArrayLike | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", ) -> Basis: """ B-spline basis with integrated squared derivative penalties. Parameters ---------- x Name of input variable. k Number of (unconstrained) bases. basis_degree Degree of the polynomials used in the B-spline basis function. Default is 3 for cubic B-splines. penalty_order Order of the penalty. If this is a sequence of integers, a penalty of the integer's order is added for each entry in the sequence. knots Knots used to set up the basis. If ``None`` (default), a set of equidistant knots will be set up automatically, with the domain boundaries inferred from the minimum and maximum of the observed values. absorb_cons Whether the default identification constraint should be applied by reparameterization and absorbing the reparameterization matrix into the basis and penalty matrices for computational efficiency. If ``False``, the basis is unconstrained, if ``True`` it receives a sum to zero constrained. Also see :meth:`.Basis.constrain`. diagonal_penalty Whether the penalty matrix associated with this term should be reparameterized into a diagonal matrix. In this case, the basis matrix is reparameterized accordingly. This can be beneficial for posterior geometry, which is why it is the default. Also see :meth:`.Basis.diagonalize_penalty`. scale_penalty Whether the penalty matrix should be scaled such that its infinity norm is one. This can improve numerical stability, which is why it is done by default. Also see :meth:`.Basis.scale_penalty`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. Notes ----- This basis is initialized with ``use_callback=True`` and ``cache_basis=True``. See :class:`.Basis` for details. This method internally calls the R package mgcv to set up the basis and penalty. The mgcv documentation provides further details. References ---------- - Wood, S.N. (2017) Generalized Additive Models: An Introduction with R (2nd edition). Chapman and Hall/CRC. - R package mgcv https://cran.r-project.org/web/packages/mgcv/index.html Examples --------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.bs("x_nonlin", k=20) Basis(name="B(x_nonlin)") """ if knots is not None: knots = np.asarray(knots) if isinstance(penalty_order, int): _validate_penalty_order(penalty_order) penalty_order_seq: Sequence[str] = [str(penalty_order)] else: [_validate_penalty_order(p) for p in penalty_order] penalty_order_seq = [str(p) for p in penalty_order] x_var, x_array = self._get_var_and_value(x) spec = ( f"s({x_var.name}, bs='bs', k={k}, " f"m=c({basis_degree}, {', '.join(penalty_order_seq)}))" ) smooth = scon.SmoothCon( spec, data={x_var.name: x_array}, knots=knots, absorb_cons=absorb_cons, diagonal_penalty=diagonal_penalty, scale_penalty=scale_penalty, ) basis = Basis( x_var, name=self.names.create(basis_name + "(" + x_var.name + ")"), basis_fn=lambda x_: jnp.asarray(smooth.predict({x_var.name: x_})), penalty=smooth.penalty, use_callback=True, cache_basis=True, ) if absorb_cons: basis._constraint = "absorbed_via_mgcv" return basis
[docs] def cp( self, x: str | lsl.Var, *, k: int, basis_degree: int = 3, penalty_order: int = 2, knots: ArrayLike | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", ) -> Basis: """ Cyclic P-spline basis and penalty matrix. Parameters ---------- x Name of input variable. k Number of (unconstrained) bases. basis_degree Degree of the polynomials used in the B-spline basis function. Default is 3 for cubic B-splines. penalty_order Order of the penalty. knots Knots used to set up the basis. If ``None`` (default), a set of equidistant knots will be set up automatically, with the domain boundaries inferred from the minimum and maximum of the observed values. The number of knots must be ``k + basis_degree + 1``, and for the observed data, it must be true that ``knots[basis_degree] < min(x)`` and ``max(x) < knots[-basis_degree]``. absorb_cons Whether the default identification constraint should be applied by reparameterization and absorbing the reparameterization matrix into the basis and penalty matrices for computational efficiency. If ``False``, the basis is unconstrained, if ``True`` it receives a sum to zero constrained. Also see :meth:`.Basis.constrain`. diagonal_penalty Whether the penalty matrix associated with this term should be reparameterized into a diagonal matrix. In this case, the basis matrix is reparameterized accordingly. This can be beneficial for posterior geometry, which is why it is the default. Also see :meth:`.Basis.diagonalize_penalty`. scale_penalty Whether the penalty matrix should be scaled such that its infinity norm is one. This can improve numerical stability, which is why it is done by default. Also see :meth:`.Basis.scale_penalty`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. Notes ----- This basis is initialized with ``use_callback=True`` and ``cache_basis=True``. See :class:`.Basis` for details. This method internally calls the R package mgcv to set up the basis and penalty. The mgcv documentation provides further details. References ---------- - Lang, S., & Brezger, A. (2004). Bayesian P-splines. Journal of Computational and Graphical Statistics, 13(1), 183–212. https://doi.org/10.1198/1061860043010 - Wood, S.N. (2017) Generalized Additive Models: An Introduction with R (2nd edition). Chapman and Hall/CRC. - R package mgcv https://cran.r-project.org/web/packages/mgcv/index.html Examples -------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.cp("x_nonlin", k=20) Basis(name="B(x_nonlin)") """ _validate_penalty_order(penalty_order) if knots is not None: knots = np.asarray(knots) x_var, x_array = self._get_var_and_value(x) spec = ( f"s({x_var.name}, bs='cp', k={k}, m=c({basis_degree - 1}, {penalty_order}))" ) smooth = scon.SmoothCon( spec, data={x_var.name: x_array}, knots=knots, absorb_cons=absorb_cons, diagonal_penalty=diagonal_penalty, scale_penalty=scale_penalty, ) basis = Basis( x_var, name=self.names.create(basis_name + "(" + x_var.name + ")"), basis_fn=lambda x_: jnp.asarray(smooth.predict({x_var.name: x_})), penalty=smooth.penalty, use_callback=True, cache_basis=True, ) if absorb_cons: basis._constraint = "absorbed_via_mgcv" return basis
def _s( self, *x: str | lsl.Var, k: int, bs: BasisTypes, m: str = "NA", knots: ArrayLike | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", ) -> Basis: if knots is not None: knots = np.asarray(knots) _validate_bs(bs) bs_arg = f"'{bs}'" obs_vars = {} for xname in x: xvar: lsl.Var | lsl.TransientCalc = self._get_var_and_value(xname)[0] obs_vars[xvar.name] = xvar obs_values = {k: np.asarray(v.value) for k, v in obs_vars.items()} obs_names = [v.name for v in obs_vars.values()] spec = f"s({','.join(obs_names)}, bs={bs_arg}, k={k}, m={m})" smooth = scon.SmoothCon( spec, data=pd.DataFrame.from_dict(obs_values), knots=knots, absorb_cons=absorb_cons, diagonal_penalty=diagonal_penalty, scale_penalty=scale_penalty, ) xname = ",".join([v.name for v in obs_vars.values()]) if len(obs_vars) > 1: xvar = lsl.TransientCalc( # for memory-efficiency lambda *args: jnp.vstack(args).T, *list(obs_vars.values()), _name=self.names.create(xname), ) else: xvar = obs_vars[xname] def basis_fn(x): df = pd.DataFrame(x, columns=list(obs_vars)) return jnp.asarray(smooth.predict(df)) basis = Basis( xvar, name=self.names.create(basis_name + "(" + xname + ")"), basis_fn=basis_fn, penalty=smooth.penalty, use_callback=True, cache_basis=True, ) if absorb_cons: basis._constraint = "absorbed_via_mgcv" return basis
[docs] def tp( self, *x: str | lsl.Var, k: int, penalty_order: int | None = None, knots: ArrayLike | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", remove_null_space_completely: bool = False, ) -> Basis: """ Thin plate spline basis and penalty matrix. Parameters ---------- *x Names of input variables (one or more). k Number of (unconstrained) bases. penalty_order Order of the penalty. Quote from mgcv: "The default is to set this to the smallest value satisfying ``2*penalty_order > d+1`` where ``d`` is the number of covariates of the term." knots Knots used to set up the basis. If ``None`` (default), a set knots will be set up automatically. absorb_cons Whether the default identification constraint should be applied by reparameterization and absorbing the reparameterization matrix into the basis and penalty matrices for computational efficiency. If ``False``, the basis is unconstrained, if ``True`` it receives a sum to zero constrained. Also see :meth:`.Basis.constrain`. diagonal_penalty Whether the penalty matrix associated with this term should be reparameterized into a diagonal matrix. In this case, the basis matrix is reparameterized accordingly. This can be beneficial for posterior geometry, which is why it is the default. Also see :meth:`.Basis.diagonalize_penalty`. scale_penalty Whether the penalty matrix should be scaled such that its infinity norm is one. This can improve numerical stability, which is why it is done by default. Also see :meth:`.Basis.scale_penalty`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. remove_null_space_completely If ``True``, the unpenalized part of the smooth, corresponding to the null space of the penalty matrix, is removed completely. Notes ----- This basis is initialized with ``use_callback=True`` and ``cache_basis=True``. See :class:`.Basis` for details. This method internally calls the R package mgcv to set up the basis and penalty. The mgcv documentation provides further details. References ---------- - Wood, S.N. (2003) Thin-plate regression splines. Journal of the Royal Statistical Society (B) 65(1):95-114. - Wood, S.N. (2017) Generalized Additive Models: An Introduction with R (2nd edition). Chapman and Hall/CRC. - R package mgcv https://cran.r-project.org/web/packages/mgcv/index.html Examples -------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.tp("x_nonlin", k=20) Basis(name="B(x_nonlin)") """ d = len(x) m_args = [] if penalty_order is None: penalty_order_default = ceil((d + 1) / 2) i = 0 while not 2 * penalty_order_default > (d + 1) and i < 20: penalty_order_default += 1 i += 1 m_args.append(str(penalty_order_default)) else: _validate_penalty_order(penalty_order) m_args.append(str(penalty_order)) if remove_null_space_completely: m_args.append("0") m_str = "c(" + ", ".join(m_args) + ")" basis = self._s( *x, k=k, bs="tp", m=m_str, knots=knots, absorb_cons=absorb_cons, diagonal_penalty=diagonal_penalty, scale_penalty=scale_penalty, basis_name=basis_name, ) return basis
[docs] def ts( self, *x: str | lsl.Var, k: int, penalty_order: int | None = None, knots: ArrayLike | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", ) -> Basis: """ Thin plate spline basis and penalty matrix with null space penalty. Parameters ---------- *x Names of input variables (one or more). k Number of (unconstrained) bases. penalty_order Order of the penalty. Quote from mgcv: "The default is to set this to the smallest value satisfying ``2*penalty_order > d+1`` where ``d`` is the number of covariates of the term." knots Knots used to set up the basis. If ``None`` (default), a set knots will be set up automatically. absorb_cons Whether the default identification constraint should be applied by reparameterization and absorbing the reparameterization matrix into the basis and penalty matrices for computational efficiency. If ``False``, the basis is unconstrained, if ``True`` it receives a sum to zero constrained. Also see :meth:`.Basis.constrain`. diagonal_penalty Whether the penalty matrix associated with this term should be reparameterized into a diagonal matrix. In this case, the basis matrix is reparameterized accordingly. This can be beneficial for posterior geometry, which is why it is the default. Also see :meth:`.Basis.diagonalize_penalty`. scale_penalty Whether the penalty matrix should be scaled such that its infinity norm is one. This can improve numerical stability, which is why it is done by default. Also see :meth:`.Basis.scale_penalty`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. Notes ----- This basis is initialized with ``use_callback=True`` and ``cache_basis=True``. See :class:`.Basis` for details. This method internally calls the R package mgcv to set up the basis and penalty. The mgcv documentation provides further details. References ---------- - Wood, S.N. (2003) Thin-plate regression splines. Journal of the Royal Statistical Society (B) 65(1):95-114. - Wood, S.N. (2017) Generalized Additive Models: An Introduction with R (2nd edition). Chapman and Hall/CRC. - R package mgcv https://cran.r-project.org/web/packages/mgcv/index.html Examples -------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.ts("x_nonlin", k=20) Basis(name="B(x_nonlin)") """ d = len(x) m_args = [] if not penalty_order: m_args.append(str(ceil((d + 1) / 2))) else: _validate_penalty_order(penalty_order) m_args.append(str(penalty_order)) m_str = "c(" + ", ".join(m_args) + ")" basis = self._s( *x, k=k, bs="ts", m=m_str, knots=knots, absorb_cons=absorb_cons, diagonal_penalty=diagonal_penalty, scale_penalty=scale_penalty, basis_name=basis_name, ) return basis
[docs] def kriging( self, *x: str | lsl.Var, k: int, kernel_name: Literal[ "spherical", "power_exponential", "matern1.5", "matern2.5", "matern3.5", ] = "matern1.5", linear_trend: bool = True, range: float | None = None, power_exponential_power: float = 1.0, knots: ArrayLike | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", ) -> Basis: """ Gaussian process models with a fixed range parameter in a basis-penalty-parameterization, often referred to as Kriging. Parameters ---------- *x Name of input variables (one or more). k Number of (unconstrained) bases. kernel_name Selects the kernel / covariance function to use. linear_trend Whether to include or remove a linear trend. range Range parameter. If ``None``, estimated as in Kamman & Wand (2003). power_exponential_power Power for the power exponential kernel. absorb_cons Whether the default identification constraint should be applied by reparameterization and absorbing the reparameterization matrix into the basis and penalty matrices for computational efficiency. If ``False``, the basis is unconstrained, if ``True`` it receives a sum to zero constrained. Also see :meth:`.Basis.constrain`. diagonal_penalty Whether the penalty matrix associated with this term should be reparameterized into a diagonal matrix. In this case, the basis matrix is reparameterized accordingly. This can be beneficial for posterior geometry, which is why it is the default. Also see :meth:`.Basis.diagonalize_penalty`. scale_penalty Whether the penalty matrix should be scaled such that its infinity norm is one. This can improve numerical stability, which is why it is done by default. Also see :meth:`.Basis.scale_penalty`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. Notes ----- This basis is initialized with ``use_callback=True`` and ``cache_basis=True``. See :class:`.Basis` for details. This method internally calls the R package mgcv to set up the basis and penalty. The mgcv documentation provides further details. References ---------- - Kammann, E. E. and M.P. Wand (2003) Geoadditive Models. Applied Statistics 52(1):1-18. - Wood, S.N. (2017) Generalized Additive Models: An Introduction with R (2nd edition). Chapman and Hall/CRC. - R package mgcv https://cran.r-project.org/web/packages/mgcv/index.html Examples -------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.kriging("x_nonlin", k=20) Basis(name="B(x_nonlin)") """ m_kernel_dict = { "spherical": 1, "power_exponential": 2, "matern1.5": 3, "matern2.5": 4, "matern3.5": 5, } m_linear = 1.0 if linear_trend else -1.0 m_args = [] m_kernel = str(int(m_linear * m_kernel_dict[kernel_name])) m_args.append(m_kernel) if range: m_range = str(range) m_args.append(m_range) if power_exponential_power: if not range: m_args.append(str(-1.0)) if not 0.0 < power_exponential_power <= 2.0: raise ValueError( "'power_exponential_power' must be in (0, 2.0], " f"got {power_exponential_power}" ) m_args.append(str(power_exponential_power)) m_str = "c(" + ", ".join(m_args) + ")" basis = self._s( *x, k=k, bs="gp", m=m_str, knots=knots, absorb_cons=absorb_cons, diagonal_penalty=diagonal_penalty, scale_penalty=scale_penalty, basis_name=basis_name, ) return basis
[docs] def lin( self, formula: str, xname: str = "", basis_name: str = "X", include_intercept: bool = False, context: dict[str, Any] | None = None, ) -> LinBasis: """ Linear design matrix without penalty. Parameters ---------- formula Right-hand side of a model formula, as understood by formulaic_. Most of formulaic's grammar_ is supported. See notes for details. xname If provided, the design matrix will be named ``{basis_name}({xname})``, for example ``B(x)``, is ``basis_name="B"`` and ``xname="x"``. basis_name Name of the basis variable. include_intercept Whether to include an intercept column in the basis. context Dictionary of additional Python objects that should be made available to formulaic when constructing the design matrix. Gets passed to ``formulaic.ModelSpec.get_model_matrix()``. Notes ----- The following formulaic syntax is supported: - ``+`` for adding a term - ``a:b`` for simple interactions - ``a*b`` for expanding to ``a + b + a:b`` - ``(a + b)**n`` for n-th order interactions - ``a / b`` for nesting - ``C(a, ...)`` for categorical effects - ``b %in% a`` for inverted nesting - ``{a+1}`` for quoted Python code to be executed - ```weird name``` backtick-strings for weird names - Other transformations like ``center(a)``, ``scale(a)``, or ``lag(a)``, see grammar_. - Python functions Not supported: - String literals - Numeric literals - Wildcard ``"."`` - ``\\|`` for splitting a formula - ``"~"`` in formula, since this method supports only the right-hand side of a Wilkinson formula. - ``1 +``, ``0 +``, or ``-1`` in formula, since intercept addition is handled via the argument ``include_intercept``. References ---------- - Python library formulaic: https://matthewwardrop.github.io/formulaic/latest/ Examples -------- Simple example: >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.lin("x_lin + x_nonlin + x_cat") LinBasis(name="X") Customized categorical encoding: >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.lin("x_lin + x_nonlin + C(x_cat, contr.sum)") LinBasis(name="X") Interaction: >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.lin("x_lin * x_cat") LinBasis(name="X") .. _formulaic: https://matthewwardrop.github.io/formulaic/latest/ .. _grammar: https://matthewwardrop.github.io/formulaic/latest/guides/grammar/ """ _validate_formula(formula) spec = fo.ModelSpec(formula, output="numpy") # evaluate model matrix once to get a spec with structure information # also necessary to populate spec with the correct information for # transformations like center, scale, standardize try: spec = spec.get_model_matrix(self.data, context=context).model_spec except Exception as e: raise RuntimeError( "Could not build model matrix. This could be caused by " "unsupported data dtypes like dates. Please check your input data. " "Also check the original error message, included above." ) from e # get column names. There may be a more efficient way to do it # that does not require building the model matrix a second time, but this # works robustly for now: we take the names that formulaic creates column_names = list( fo.ModelSpec(formula, output="pandas") .get_model_matrix(self.data, context=context) .columns )[1:] required = sorted(str(var) for var in spec.required_variables) df_subset = self.data.loc[:, required] df_colnames = df_subset.columns variables = dict() mappings = {} for col in df_colnames: result = self.registry.get_obs_and_mapping(col) variables[col] = result.var if result.mapping is not None: self.mappings[col] = result.mapping mappings[col] = result.mapping xvar = lsl.TransientCalc( # for memory-efficiency lambda *args: jnp.vstack(args).T, *list(variables.values()), _name=self.names.create(xname) if xname else xname, ) def basis_fn(x): df = pd.DataFrame(x, columns=df_colnames) # for categorical variables: convert integer representation back to # labels for col in df_colnames: if col in self.mappings: integers = df[col].to_numpy() df[col] = self.mappings[col].integers_to_labels(integers) basis = np.asarray(spec.get_model_matrix(df, context=context)) if not include_intercept: basis = basis[:, 1:] return jnp.asarray(basis, dtype=float) if xname: bname = self.names.create(basis_name + "(" + xvar.name + ")") else: bname = self.names.create(basis_name) basis = LinBasis( xvar, basis_fn=basis_fn, use_callback=True, cache_basis=True, name=bname, penalty=None, ) basis.model_spec = spec basis.mappings = mappings basis.column_names = column_names return basis
[docs] def ri( self, cluster: str, basis_name: str = "B", penalty: ArrayLike | None = None, ) -> Basis: """ Random intercept basis. Parameters ---------- cluster Name of the cluster variable. basis_name Name of the basis variable. penalty Custom penalty matrix to use. Default is an iid penalty. Notes ------ If the penalty is iid, then each column of the basis consists only of binary (0/1) entries, and each row has only one non-zero entry. In this case it is not necessary to store the full matrix in memory and evaluate the term as a dot product ``basis @ coef``. Instead, we can simply store a 1d array of indices, identifying the nonzero column for each row of the basis matrix, and use this index to access the corresponding coefficient. This scenario is common for independent random intercepts. This method returns such a sparse representation of the random intercept basis if ``penalty=None``. Examples -------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> bb.ri("x_cat") Basis(name="B(x_cat)") """ if penalty is not None: penalty = jnp.asarray(penalty) result = self.registry.get_obs_and_mapping(cluster) if not result.is_categorical: raise TypeError(f"{cluster=} must be categorical.") if result.mapping is not None: self.mappings[cluster] = result.mapping basis = Basis( value=result.var, basis_fn=lambda x: x, name=self.names.create(basis_name + "(" + cluster + ")"), use_callback=False, cache_basis=False, penalty=jnp.asarray(penalty) if penalty is not None else penalty, ) return basis
[docs] def mrf( self, x: str, k: int = -1, polys: dict[str, ArrayLike] | None = None, nb: Mapping[str, ArrayLike | list[str] | list[int]] | None = None, penalty: ArrayLike | None = None, penalty_labels: Sequence[str] | None = None, absorb_cons: bool = True, diagonal_penalty: bool = True, scale_penalty: bool = True, basis_name: str = "B", ) -> MRFBasis: """ Gaussian Markov random field basis and penalty. The preferred way to initialize these is by supplying ``polys``, because this enables plotting via :func:`.plot_regions`. Parameters ---------- x Name of the region variable. k If ``-1``, this is a "full-rank" (up to identifiability constraint) Markov random field. If ``k`` is an integer smaller than the number of unique regions, a low-rank field will be returned, see Wood (2017), Sections 5.8.1 and 5.4.2. polys Dictionary of arrays. The keys of the dict are the region labels. The corresponding values define the region by defining polygons. The neighborhood structure can be inferred from this polygon information. nb Dictionary of array. The keys of the dict are the region labels. The corresponding values indicate the neighbors of the region. If the values are lists or arrays of strings, the values are the labels of the neighbors. If they are lists or arrays of integers, the values are the indices of the neighbors. Indices correspond to regions based on an alphabetical ordering of regions. penalty If a penalty is supplied explicitly, it takes precedence over a potential penalty derived from both nb and polys. penalty_labels If a penalty is supplied explicitly, labels must also be specified. The labels create the association between penalty columns and region labels. The values of this sequence should be the string labels of unique regions in ``x``. absorb_cons Whether the default identification constraint should be applied by reparameterization and absorbing the reparameterization matrix into the basis and penalty matrices for computational efficiency. If ``False``, the basis is unconstrained, if ``True`` it receives a sum to zero constrained. Also see :meth:`.Basis.constrain`. diagonal_penalty Whether the penalty matrix associated with this term should be reparameterized into a diagonal matrix. In this case, the basis matrix is reparameterized accordingly. This can be beneficial for posterior geometry, which is why it is the default. Also see :meth:`.Basis.diagonalize_penalty`. scale_penalty Whether the penalty matrix should be scaled such that its infinity norm is one. This can improve numerical stability, which is why it is done by default. Also see :meth:`.Basis.scale_penalty`. basis_name Function-name for the basis matrix. If ``"B"``, and the basis is a function of the variable ``"x"``, the full name of the :class:`.Basis` object will be ``"B(x)"``. Names are made unique by appending a counter if necessary. See Also -------- .plot_regions : Plots MCMC results on a map of the regions. .plot_polys : Plots a map based on polygons. .plot_forest : Plots regions with uncertainty in a forest plot. Notes ----- This basis is initialized with ``use_callback=True`` and ``cache_basis=True``. See :class:`.Basis` for details. This method internally calls the R package mgcv to set up the basis and penalty. The mgcv documentation provides further details. Returns ------- Comments on the :class:`.MRFSpec` attached to the returned :class:`.MRFBasis` variable: - If either polys or nb are supplied, the returned MRFSpec will contain nb. - If only a penalty matrix is supplied, the returned MRFSpec will *not* contain nb. - Returning the label order only makes sense if the basis is *not* reparameterized, because only then we have a clear correspondence of parameters to labels. If the basis is reparameterized, with ``absorb_cons=True`` or of low rank with ``k ≠ -1``, there is no such correspondence in a clear way, so the label order is None. Examples -------- >>> import liesel_gam as gam >>> df = gam.demo_data(n=100) >>> print(df.x_cat.unique().tolist()) ['a', 'b', 'c'] >>> registry = gam.PandasRegistry(df) >>> bb = gam.BasisBuilder(registry) >>> nb = {"a": ["b", "c"], "b": ["a"], "c": ["a"]} >>> bb.mrf("x_cat", nb=nb) MRFBasis(name="B(x_cat)") To inspect the penalty and the dummy-coded basis matrix: >>> basis = bb.mrf( ... "x_cat", ... nb=nb, ... absorb_cons=False, ... diagonal_penalty=False, ... scale_penalty=False, ... ) >>> basis.penalty.value Array([[ 2., -1., -1.], [-1., 1., 0.], [-1., 0., 1.]], dtype=float32) >>> basis.value[:5, ...] Array([[1., 0., 0.], [0., 1., 0.], [1., 0., 0.], [0., 1., 0.], [0., 0., 1.]], dtype=float32) >>> basis.mrf_spec.ordered_labels ['a', 'b', 'c'] References ---------- - Wood, S.N. (2017) Generalized Additive Models: An Introduction with R (2nd edition). Chapman and Hall/CRC. - R package mgcv https://cran.r-project.org/web/packages/mgcv/index.html """ if not isinstance(k, int): raise TypeError(f"'k' must be int, got {type(k)}.") if k < -1: raise ValueError(f"'k' cannot be smaller than -1, got {k=}.") if polys is None and nb is None and penalty is None: raise ValueError("At least one of polys, nb, or penalty must be provided.") var, mapping = self.registry.get_categorical_obs(x) self.mappings[x] = mapping labels = set(list(mapping.labels_to_integers_map)) if penalty is not None: if penalty_labels is None: raise ValueError( "If 'penalty' is supplied, 'penalty_labels' must also be supplied." ) if len(penalty_labels) != len(labels): raise ValueError( f"Variable {x} has {len(labels)} unique entries, but " f"'penalty_labels' has {len(penalty_labels)}. Both must match." ) xt_args = [] pass_to_r: dict[str, np.typing.NDArray | dict[str, np.typing.NDArray]] = {} if polys is not None: xt_args.append("polys=polys") if not labels == set(list(polys)): raise ValueError( "Names in 'polys' must correspond to the levels of 'x'." ) pass_to_r["polys"] = {key: np.asarray(val) for key, val in polys.items()} if nb is not None: xt_args.append("nb=nb") if not labels == set(list(nb)): raise ValueError("Names in 'nb' must correspond to the levels of 'x'.") nb_processed = {} for key, val in nb.items(): val_arr = np.asarray(val) if val_arr.ndim != 1: raise ValueError( f"Expected 1d arrays in 'nb', got {val_arr.ndim=} for {key}." ) if np.isdtype(val_arr.dtype, np.dtype("int")): # add one to convert to 1-based indexing for R # and cast to float for R val_arr = np.astype(val_arr + 1, float) # val_arr = np.astype(val_arr, float) elif np.isdtype(val_arr.dtype, np.dtype("float")): # add one to convert to 1-based indexing for R val_arr = np.astype(np.astype(val_arr, int) + 1, float) elif val_arr.dtype.kind == "U": # must be unicode strings then pass else: raise TypeError(f"Unsupported dtype: {val_arr.dtype!r}") nb_processed[key] = val_arr pass_to_r["nb"] = nb_processed if penalty is not None: penalty = np.asarray(penalty) pen_rank = np.linalg.matrix_rank(penalty) pen_dim = penalty.shape[-1] if (pen_dim - pen_rank) != 1: logger.warning( f"Supplied penalty has dimension {penalty.shape} and rank " f"{pen_rank}. The expected rank deficiency is 1. " "This may indicate a problem. There might be disconnected sets " "of regions in the data represented by this penalty. " "In this case, you probably need more elaborate constraints " "than the ones provided here. You might consider splitting the " "disconnected regions into several mrf terms. " "Otherwise, please only continue if you are certain that you " "know what is happening." ) xt_args.append("penalty=penalty") if not np.shape(penalty)[0] == np.shape(penalty)[1]: raise ValueError(f"Penalty must be square, got {np.shape(penalty)=}") if not np.shape(penalty)[1] == len(labels): raise ValueError( "Dimensions of 'penalty' must correspond to the levels of 'x'." ) pass_to_r["penalty"] = penalty if "nb" in pass_to_r and "penalty" in pass_to_r: logger.warning( "Both 'nb' and 'penalty' were supplied. 'penalty' will be used to " "setup this basis." ) if "polys" in pass_to_r and "penalty" in pass_to_r: logger.warning( "Both 'polys' and 'penalty' were supplied. 'penalty' will be used " "to setup this basis." ) xt = "list(" xt += ",".join(xt_args) xt += ")" if penalty is not None: # removing penalty from the pass_to_r dict, because we are giving it # special treatment here. # specifically, we have to equip it with row and column names to make # sure that penalty entries get correctly matched to clusters by mgcv penalty_prelim_arr = np.asarray(pass_to_r.pop("penalty")) to_r(penalty_prelim_arr, "penalty") to_r(np.array(penalty_labels), "penalty_labels") r("colnames(penalty) <- penalty_labels") r("rownames(penalty) <- penalty_labels") spec = f"s({x}, k={k}, bs='mrf', xt={xt})" observed = mapping.integers_to_labels(var.value) regions = list(mapping.labels_to_integers_map) df = pd.DataFrame({x: pd.Categorical(observed, categories=regions)}) smooth = scon.SmoothCon( spec, data=df, diagonal_penalty=diagonal_penalty, absorb_cons=absorb_cons, scale_penalty=scale_penalty, pass_to_r=pass_to_r, ) x_name = x def basis_fun(x): """ The array outputted by this smooth contains column names. Here, we remove these column names and convert to jax. """ # disabling warnings about "mrf should be a factor" r("old_warn <- getOption('warn')") r("options(warn = -1)") labels = mapping.integers_to_labels(x) df = pd.DataFrame({x_name: pd.Categorical(labels, categories=regions)}) basis = jnp.asarray(np.astype(smooth.predict(df)[:, 1:], "float")) r("options(warn = old_warn)") return basis smooth_penalty = smooth.penalty if np.shape(smooth_penalty)[1] > len(labels): smooth_penalty = smooth_penalty[:, 1:] elif np.shape(smooth_penalty)[0] < np.shape(smooth_penalty)[1]: smooth_penalty = smooth_penalty[:, 1:] try: penalty_arr = jnp.asarray(np.astype(smooth_penalty, "float")) except ValueError: penalty_arr = jnp.asarray(np.astype(smooth_penalty[:, 1:], "float")) basis = MRFBasis( value=var, basis_fn=basis_fun, name=self.names.create(basis_name + "(" + x + ")"), cache_basis=True, use_callback=True, penalty=penalty_arr, ) if absorb_cons: basis._constraint = "absorbed_via_mgcv" try: nb_out = to_py(f"{smooth._smooth_r_name}[[1]]$xt$nb", format="numpy") except TypeError: nb_out = None # nb_out = {key: np.astype(val, "int") for key, val in nb_out.items()} if absorb_cons: label_order = None else: label_order = list( to_py(f"{smooth._smooth_r_name}[[1]]$X", format="pandas").columns ) label_order = [lab[1:] for lab in label_order] # removes leading x from R if nb_out is not None: def to_label(code): try: label_array = mapping.integers_to_labels(code - 1) except TypeError: label_array = code return np.atleast_1d(label_array).tolist() nb_out = {k: to_label(v) for k, v in nb_out.items()} basis.mrf_spec = MRFSpec(mapping, nb_out, label_order, polys) return basis