from __future__ import annotations
import warnings
from abc import ABCMeta
from logging import getLogger
from numbers import Real
from typing import Any, Callable, Sequence, final
import attrs
import humps
import lightgbm as lgb
import numpy as np
import xgboost as xgb
from numpy.typing import NDArray
from typing_extensions import Self
# Module-level logger, named after this module via ``__name__``.
LOG = getLogger(__name__)
def _dataset_to_ndarray(
    y: NDArray | lgb.Dataset | xgb.DMatrix,
) -> tuple[NDArray, NDArray]:
    """Split a target container into ``(labels, weights)`` arrays.

    Parameters
    ----------
    y : NDArray | lgb.Dataset | xgb.DMatrix
        A LightGBM ``Dataset``, an XGBoost ``DMatrix``, or any other object
        (treated as an array-like of values).

    Returns
    -------
    tuple[NDArray, NDArray]
        The labels and the per-sample weights. When no usable weights are
        found, an array of ones shaped like the labels is returned instead.
        Objects that are neither ``Dataset`` nor ``DMatrix`` are returned
        unchanged together with unit weights.

    Raises
    ------
    ValueError
        If a LightGBM ``Dataset`` has no label.
    """
    if isinstance(y, lgb.Dataset):
        labels = y.get_label()
        if labels is None:
            raise ValueError("y is None")
        weights = y.get_weight()
        return labels, (np.ones_like(labels) if weights is None else weights)

    if isinstance(y, xgb.DMatrix):
        labels = y.get_label()
        weights = y.get_weight()
        # An absent *or* empty weight array is treated as "unweighted".
        has_weights = weights is not None and weights.size != 0
        return labels, (weights if has_weights else np.ones_like(labels))

    # Anything else is passed through as-is with unit weights.
    return y, np.ones_like(y)
def _get_name_from_callable(obj: Callable[..., Any]) -> str:
if hasattr(obj, "__name__"):
return getattr(obj, "__name__")
if hasattr(obj, "__class__") and hasattr(getattr(obj, "__class__"), "__name__"):
return getattr(getattr(obj, "__class__"), "__name__")
raise ValueError(f"Could not get name from callable {obj}")
class LossBase(metaclass=ABCMeta):
    """Base class for loss functions.

    Inherit this class to implement custom loss function.
    A subclass must override either ``grad_hess`` or both ``grad`` and
    ``hess``; otherwise ``TypeError`` is raised when the subclass is defined.

    See Also
    --------
    Catboost:
    https://catboost.ai/en/docs/concepts/python-usages-examples#user-defined-loss-function
    LightGBM:
    https://lightgbm.readthedocs.io/en/latest/Advanced-Topics.html#custom-objective-function
    XGBoost:
    https://xgboost.readthedocs.io/en/latest/tutorials/custom_metric_obj.html

    Example
    -------
    >>> from boost_loss.base import LossBase
    >>> import numpy as np
    >>> from numpy.typing import NDArray
    >>>
    >>> class L2Loss(LossBase):
    ...     def loss(self, y_true: NDArray, y_pred: NDArray) -> NDArray:
    ...         return (y_true - y_pred) ** 2
    ...     def grad(self, y_true: NDArray, y_pred: NDArray) -> NDArray:  # dL/dy_pred
    ...         return -2 * (y_true - y_pred)  # or (y_pred - y_true)
    ...     def hess(self, y_true: NDArray, y_pred: NDArray) -> NDArray:  # d2L/dy_pred2
    ...         return 2 * np.ones_like(y_true)  # or np.ones_like(y_true)
    >>>
    >>> from boost.sklearn import apply_custom_loss
    >>> import lightgbm as lgb
    >>> apply_custom_loss(lgb.LGBMRegressor(), L2Loss()).fit(X, y)
    """

    is_higher_better: bool = False
    """Whether the result of loss function is better when it is higher."""

    @classmethod
    @final
    def from_callable(
        cls,
        loss: Callable[[NDArray, NDArray], NDArray | float],
        grad: Callable[[NDArray, NDArray], NDArray],
        hess: Callable[[NDArray, NDArray], NDArray],
        name: str | None = None,
        is_higher_better: bool = False,
    ) -> type[Self]:
        """Create this class from loss, grad, and hess callables.

        Parameters
        ----------
        loss : Callable[[NDArray, NDArray], NDArray | float]
            The loss function. If 1-D array is returned,
            the mean of array is calculated.
            Return 1-D array if possible in order to utilize weights in the dataset
            if available.
            (y_true, y_pred) -> loss
        grad : Callable[[NDArray, NDArray], NDArray]
            The 1st order derivative (gradient) of loss w.r.t. y_pred.
            (y_true, y_pred) -> grad
        hess : Callable[[NDArray, NDArray], NDArray]
            The 2nd order derivative (Hessian) of loss w.r.t. y_pred.
            (y_true, y_pred) -> hess
        name : str | None, optional
            The name of loss function.
            If None, it tries to infer from loss function, by default None
        is_higher_better : bool, optional
            Whether the result of loss function is better when it is higher,
            by default False

        Returns
        -------
        type[Self]
            The subclass of this class.

        Raises
        ------
        ValueError
            If name is None and it can't infer from loss function.
        """
        if name is None:
            try:
                name = _get_name_from_callable(loss)
            except ValueError as e:
                raise ValueError(
                    "Could not infer name from loss function. Please specify name."
                ) from e
        # The callables take (y_true, y_pred) without ``self``, hence staticmethod.
        namespace = {
            "loss": staticmethod(loss),
            "grad": staticmethod(grad),
            "hess": staticmethod(hess),
            "is_higher_better": is_higher_better,
        }
        return type(name, (cls,), namespace)

    @property
    def _grad_hess_sign(self) -> int:
        """-1 when higher is better (so the derivatives are flipped), else 1."""
        return -1 if self.is_higher_better else 1

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Reject subclasses that provide neither ``grad_hess`` nor ``grad`` + ``hess``.

        Raises
        ------
        TypeError
            If the subclass overrides neither ``grad_hess`` nor both of
            ``grad`` and ``hess``.
        """
        overrides_grad = cls.grad is not LossBase.grad
        overrides_hess = cls.hess is not LossBase.hess
        overrides_grad_hess = cls.grad_hess is not LossBase.grad_hess
        if not (overrides_grad_hess or (overrides_grad and overrides_hess)):
            raise TypeError(
                f"Can't instantiate abstract class {cls.__name__} "
                "with grad_hess or both grad and hess not implemented"
            )
        super().__init_subclass__(**kwargs)

    @property
    def name(self) -> str:
        """Name of loss function.

        Returns
        -------
        str
            Snake case of the class name with "Loss" removed.
            e.g. `LogCoshLoss` -> `log_cosh`.
        """
        return humps.decamelize(self.__class__.__name__.replace("Loss", ""))

    def grad(self, y_true: NDArray, y_pred: NDArray) -> NDArray:
        """The 1st order derivative (gradient) of loss w.r.t. y_pred.

        Parameters
        ----------
        y_true : NDArray
            The true target values.
        y_pred : NDArray
            The predicted target values.

        Returns
        -------
        NDArray
            The gradient of loss function. 1-D array with shape (n_samples,).

        Raises
        ------
        NotImplementedError
            If not implemented.
        """
        raise NotImplementedError()

    def hess(self, y_true: NDArray, y_pred: NDArray) -> NDArray:
        """The 2nd order derivative (hessian) of loss w.r.t. y_pred.

        Parameters
        ----------
        y_true : NDArray
            The true target values.
        y_pred : NDArray
            The predicted target values.

        Returns
        -------
        NDArray
            The hessian of loss function. 1-D array with shape (n_samples,).

        Raises
        ------
        NotImplementedError
            If not implemented.
        """
        raise NotImplementedError()

    def loss(self, y_true: NDArray, y_pred: NDArray) -> NDArray | float:
        """Loss function. If 1-D array is returned, the mean of array is calculated.

        Return 1-D array if possible in order to utilize weights in the dataset
        if available.

        Parameters
        ----------
        y_true : NDArray
            The true target values.
        y_pred : NDArray
            The predicted target values.

        Returns
        -------
        NDArray | float
            The loss function. 1-D array with shape (n_samples,) or float.

        Raises
        ------
        NotImplementedError
            If not implemented.
        """
        raise NotImplementedError()

    def grad_hess(self, y_true: NDArray, y_pred: NDArray) -> tuple[NDArray, NDArray]:
        """Gradient and hessian of loss function.

        Override this method if you want to calculate both gradient and
        hessian at the same time. The default calls ``grad`` and ``hess``.

        Parameters
        ----------
        y_true : NDArray
            The true target values.
        y_pred : NDArray
            The predicted target values.

        Returns
        -------
        tuple[NDArray, NDArray]
            The gradient and hessian of loss function.
            1-D array with shape (n_samples,).
        """
        return (
            self.grad(y_true=y_true, y_pred=y_pred),
            self.hess(y_true=y_true, y_pred=y_pred),
        )

    def _grad_hess_weighted(
        self, y_true: NDArray, y_pred: NDArray, weight: NDArray
    ) -> tuple[NDArray, NDArray]:
        """Return ``grad_hess`` scaled by ``weight`` and by ``_grad_hess_sign``.

        Emits a ``RuntimeWarning`` when any raw (unweighted) hessian value is
        negative.
        """
        grad, hess = self.grad_hess(y_true=y_true, y_pred=y_pred)
        negative = hess < 0
        if np.any(negative):
            negative_rate = np.mean(negative)
            warnings.warn(
                f"Found negative hessian in {negative_rate:.2%} samples. "
                "This may cause convergence issue and cause CatBoostError in CatBoost. "
                "If LightGBM or XGBoost is used, the estimator will return "
                "nonsense values (like all 0s if 100%).",
                RuntimeWarning,
            )
        sign = self._grad_hess_sign
        return grad * weight * sign, hess * weight * sign

    @final
    def __call__(
        self,
        y_true: NDArray | lgb.Dataset | xgb.DMatrix,
        y_pred: NDArray | lgb.Dataset | xgb.DMatrix,
    ) -> tuple[NDArray, NDArray]:
        """Sklearn-compatible interface (Sklearn, LightGBM, XGBoost)"""
        if isinstance(y_pred, (lgb.Dataset, xgb.DMatrix)):
            # NOTE: the dataset object arrived in the second position, so the
            # two arguments are swapped to put the labelled data in ``y_true``.
            y_true, y_pred = y_pred, y_true
        y_true, weight = _dataset_to_ndarray(y=y_true)
        y_pred, _ = _dataset_to_ndarray(y=y_pred)
        return self._grad_hess_weighted(y_true=y_true, y_pred=y_pred, weight=weight)

    @final
    def calc_ders_range(
        self,
        preds: Sequence[float],
        targets: Sequence[float],
        weights: Sequence[float] | None = None,
    ) -> list[tuple[float, float]]:
        """Catboost-compatible interface

        Returns one ``(grad, hess)`` pair per sample, both negated relative to
        ``_grad_hess_weighted``. Missing ``weights`` are treated as all ones.
        """
        y_pred = np.array(preds)
        y_true = np.array(targets)
        weight = np.array(weights) if weights is not None else np.ones_like(y_pred)
        grad, hess = self._grad_hess_weighted(
            y_true=y_true, y_pred=y_pred, weight=weight
        )
        # NOTE: in catboost, the definition of loss is the inverse
        return list(zip(-grad, -hess))

    @final
    def is_max_optimal(self) -> bool:
        """Catboost-compatible interface"""
        return self.is_higher_better

    @final
    def evaluate(
        self,
        approxes: Sequence[Sequence[float]],
        target: Sequence[float],
        weight: Sequence[float] | None = None,
    ) -> tuple[float, float]:
        """Catboost-compatible interface

        Only ``approxes[0]`` is used as the predictions.

        Returns
        -------
        tuple[float, float]
            ``(sum(loss * weight), sum(weight))``. When ``loss`` returns a
            float and the weights are not all 1.0, a warning is emitted and
            ``(loss, nan)`` is returned instead.
        """
        approxes_ = np.array(approxes[0])
        targets_ = np.array(target)
        weights_ = np.array(weight) if weight is not None else np.ones_like(approxes_)
        loss = self.loss(y_true=targets_, y_pred=approxes_)
        if isinstance(loss, float) and not np.allclose(weights_, 1.0):
            warnings.warn("loss() should return ndarray when weight is not all 1.0")
            # NOTE(review): if this pair is later fed to ``get_final_error``,
            # the NaN weight makes the final error NaN -- confirm this is intended.
            return loss, np.nan
        return float(np.sum(loss * weights_)), float(np.sum(weights_))

    @final
    def get_final_error(self, error: float, weight: float | None = None) -> float:
        """Catboost-compatible interface

        Returns ``error`` divided by ``weight`` (plus a tiny constant that
        avoids division by zero), or ``error`` itself when ``weight`` is None.
        """
        return error / (weight + 1e-38) if weight is not None else error

    @final
    def eval_metric_lgb(
        self,
        y_true: NDArray | lgb.Dataset | xgb.DMatrix,
        y_pred: NDArray | lgb.Dataset | xgb.DMatrix,
        sample_weight: NDArray | lgb.Dataset | xgb.DMatrix | None = None,
        # sample_weight exists for eval_metric_xgb_sklearn
    ) -> tuple[str, float, bool]:
        """LightGBM-compatible interface

        Returns
        -------
        tuple[str, float, bool]
            ``(name, weighted mean loss, is_higher_better)``. When ``loss``
            returns a float and the weights are not all 1.0, a warning is
            emitted and the float is returned as the metric value unchanged.
        """
        if isinstance(y_pred, (lgb.Dataset, xgb.DMatrix)):
            # NOTE: the dataset object arrived in the second position, so the
            # two arguments are swapped to put the labelled data in ``y_true``.
            y_true, y_pred = y_pred, y_true
        y_true, weight = _dataset_to_ndarray(y=y_true)
        if sample_weight is not None:
            # An explicitly passed weight overrides the one from the dataset.
            weight = sample_weight
        y_pred, _ = _dataset_to_ndarray(y=y_pred)
        loss = self.loss(y_true=y_true, y_pred=y_pred)
        if isinstance(loss, float) and not np.allclose(weight, 1.0):
            warnings.warn("loss() should return ndarray when weight is not all 1.0")
            return self.name, loss, self.is_higher_better
        return (
            self.name,
            # 1e-38 guards against division by zero when all weights are 0.
            float(np.sum(loss * weight) / (np.sum(weight) + 1e-38)),
            self.is_higher_better,
        )

    @final
    def eval_metric_xgb_native(
        self,
        y_true: NDArray | lgb.Dataset | xgb.DMatrix,
        y_pred: NDArray | lgb.Dataset | xgb.DMatrix,
    ) -> tuple[str, float]:
        """XGBoost-native-api-compatible interface"""
        metric_name, value, _ = self.eval_metric_lgb(y_true=y_true, y_pred=y_pred)
        return metric_name, value

    @final
    def eval_metric_xgb_sklearn(
        self,
        y_true: NDArray | lgb.Dataset | xgb.DMatrix,
        y_pred: NDArray | lgb.Dataset | xgb.DMatrix,
        sample_weight: NDArray | lgb.Dataset | xgb.DMatrix | None = None,
    ) -> float:
        """XGBoost-sklearn-api-compatible interface"""
        result = self.eval_metric_lgb(
            y_true=y_true, y_pred=y_pred, sample_weight=sample_weight
        )
        return result[1]

    def __add__(self, other: LossBase) -> LossBase:
        """``self + other``: a loss whose value/derivatives are the sums."""
        if not isinstance(other, LossBase):
            return NotImplemented  # type: ignore
        return _LossSum(self, other)

    def __sub__(self, other: LossBase) -> LossBase:
        """``self - other``, built as ``self + (-other)``."""
        # Check the type before negating so unsupported operands fall back to
        # Python's normal NotImplemented handling.
        if not isinstance(other, LossBase):
            return NotImplemented  # type: ignore
        return self.__add__(-other)

    def __mul__(self, other: float | int | Real) -> LossBase:
        """``self * other`` for a real-number factor."""
        if not isinstance(other, Real):
            return NotImplemented
        return _LossMul(self, other)

    def __truediv__(self, other: float | int | Real) -> LossBase:
        """``self / other`` for a real-number divisor (Python 3 division hook)."""
        if not isinstance(other, Real):
            return NotImplemented
        return self.__mul__(1.0 / other)

    def __div__(self, other: float | int | Real) -> LossBase:
        """Legacy (Python 2 style) division name, kept for direct callers."""
        return self.__mul__(1.0 / other)

    def __radd__(self, other: LossBase) -> LossBase:
        """``other + self``."""
        return self.__add__(other)

    def __rsub__(self, other: LossBase) -> LossBase:
        """``other - self``, built as ``other + (-self)``."""
        if not isinstance(other, LossBase):
            return NotImplemented  # type: ignore
        return _LossSum(other, -self)

    def __rmul__(self, other: float | int | Real) -> LossBase:
        """``other * self`` for a real-number factor."""
        return self.__mul__(other)

    def __rdiv__(self, other: float | int | Real) -> LossBase:
        """Legacy (Python 2 style) reflected-division name.

        Kept unchanged for direct callers; it scales this loss by
        ``1.0 / other`` exactly like ``__div__``.
        """
        return self.__div__(other)

    def __neg__(self) -> LossBase:
        """``-self``: this loss multiplied by -1.0."""
        return self.__mul__(-1.0)

    def __pos__(self) -> Self:
        """``+self``: returns this very instance."""
        return self
@attrs.define()
class _LossSum(LossBase):
    """Loss formed by adding two losses term by term."""

    loss1: LossBase
    loss2: LossBase

    @property
    def name(self) -> str:
        """Names of both operands joined with ``+``."""
        return "+".join((self.loss1.name, self.loss2.name))

    def loss(self, y_true: NDArray, y_pred: NDArray) -> NDArray | float:
        """Sum of the two operands' loss values."""
        first = self.loss1.loss(y_true=y_true, y_pred=y_pred)
        second = self.loss2.loss(y_true=y_true, y_pred=y_pred)
        return first + second

    def grad(self, y_true: NDArray, y_pred: NDArray) -> NDArray:
        """Sum of the two operands' gradients."""
        first = self.loss1.grad(y_true=y_true, y_pred=y_pred)
        second = self.loss2.grad(y_true=y_true, y_pred=y_pred)
        return first + second

    def hess(self, y_true: NDArray, y_pred: NDArray) -> NDArray:
        """Sum of the two operands' hessians."""
        first = self.loss1.hess(y_true=y_true, y_pred=y_pred)
        second = self.loss2.hess(y_true=y_true, y_pred=y_pred)
        return first + second

    def grad_hess(self, y_true: NDArray, y_pred: NDArray) -> tuple[NDArray, NDArray]:
        """Sums of the operands' gradients and hessians, via their ``grad_hess``."""
        (g_first, h_first), (g_second, h_second) = (
            self.loss1.grad_hess(y_true=y_true, y_pred=y_pred),
            self.loss2.grad_hess(y_true=y_true, y_pred=y_pred),
        )
        return g_first + g_second, h_first + h_second
@attrs.define()
class _LossMul(LossBase):
    """Loss formed by scaling another loss with a real-number factor."""

    loss_: LossBase
    factor: float | int | Real

    @property
    def name(self) -> str:
        """The factor and the wrapped loss' name, joined with ``*``."""
        return f"{self.factor}*{self.loss_.name}"

    def loss(self, y_true: NDArray, y_pred: NDArray) -> NDArray | float:
        """Wrapped loss value multiplied by ``factor``."""
        scale = self.factor
        return scale * self.loss_.loss(y_true=y_true, y_pred=y_pred)

    def grad(self, y_true: NDArray, y_pred: NDArray) -> NDArray:
        """Wrapped gradient multiplied by ``factor``."""
        scale = self.factor
        return scale * self.loss_.grad(y_true=y_true, y_pred=y_pred)

    def hess(self, y_true: NDArray, y_pred: NDArray) -> NDArray:
        """Wrapped hessian multiplied by ``factor``."""
        scale = self.factor
        return scale * self.loss_.hess(y_true=y_true, y_pred=y_pred)

    def grad_hess(self, y_true: NDArray, y_pred: NDArray) -> tuple[NDArray, NDArray]:
        """Wrapped gradient and hessian, each multiplied by ``factor``."""
        inner_grad, inner_hess = self.loss_.grad_hess(y_true=y_true, y_pred=y_pred)
        return self.factor * inner_grad, self.factor * inner_hess