oubliette-commerce

0.2.0

Shared revenue SDK for the Oubliette product suite — config-driven licensing, metering, key issuance, and sale webhooks

License
Unknown license
Published
July 4, 2026
9h ago
Package Registry
README badge Customize →
License Sources
SourceLicenseClass
Licensie (detected)
Pending-
PyPI (reported)
Not reported-

License detection is still in progress for this version.

Loading dependencies…
License File
"""
Oubliette Commerce - License & Metering Layer
=============================================
Soft enforcement of feature gating and usage metering.

Tiers:
- **free**: analyze(), scan_input(), basic session tracking, pre-filter + ML.
- **pro**: Unlocks scan_output, drift_monitor, webhooks, stix_export,
  agent_policy, mcp_guard, tenant_manager, rbac.
- **enterprise**: Everything, no warnings.

License key is a base64-encoded JSON blob with HMAC-SHA256 signature.
"""

from __future__ import annotations

import base64
import datetime
import hashlib
import hmac
import json
import logging
import os
import threading
import time
from typing import Any

log = logging.getLogger(__name__)

# Pro features are PRODUCT-SPECIFIC. The shared package ships an empty default;
# each product passes its own set to LicenseManager(pro_features=...) / FeatureGate.
PRO_FEATURES: frozenset[str] = frozenset()

# Default soft quota for free tier (monthly analyze() calls)
_DEFAULT_MONTHLY_QUOTA = 10_000

# How long to cache a validated license (seconds)
_VALIDATION_CACHE_TTL = 3600  # 1 hour

# Ed25519 public key (base64 of the raw 32-byte key) used to verify
# asymmetrically-signed licenses. Ships EMPTY on purpose: the vendor runs
# ``python -m oubliette_commerce.license_issuer keygen``, keeps the PRIVATE key
# server-side (the license issuer), and pastes the PUBLIC key here — or sets the
# ``OUBLIETTE_LICENSE_PUBLIC_KEY`` env var — before publishing. Distributing the
# public key is safe; it can only verify, never mint. An empty/unset public key
# means Ed25519 licenses cannot be verified and fail closed to the free tier.
_BUNDLED_PUBLIC_KEY = "Unm7yP9qaz6wHIGKiVKq8z5rQL05lEplzUZx2D1lMOE="


def _canonical_payload(data: dict[str, Any]) -> str:
    """Serialize a license payload for signing/verification.

    Excludes the signature envelope fields (``sig``/``sig_alg``) so issuer and
    verifier sign/verify exactly the same bytes. Sorted + compact so the
    encoding is deterministic across processes and versions.
    """
    body = {k: v for k, v in data.items() if k not in ("sig", "sig_alg")}
    return json.dumps(body, sort_keys=True, separators=(",", ":"))


def generate_keypair() -> tuple[str, str]:
    """Generate an Ed25519 keypair for license signing.

    Returns ``(private_b64, public_b64)`` — base64 of the raw 32-byte seed and
    the raw 32-byte public key. Keep the private value secret (server-side);
    embed/distribute the public value with the client.
    """
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

    priv = Ed25519PrivateKey.generate()
    priv_raw = priv.private_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    pub_raw = priv.public_key().public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    return (
        base64.b64encode(priv_raw).decode("ascii"),
        base64.b64encode(pub_raw).decode("ascii"),
    )


class LicenseInfo:
    """Parsed and validated license data."""

    __slots__ = ("expires", "features", "issued", "org", "quota", "tier", "valid")

    def __init__(
        self,
        tier: str = "free",
        org: str = "",
        issued: str = "",
        expires: str = "",
        quota: int = 0,
        features: list[str] | None = None,
        valid: bool = True,
    ):
        self.tier = tier
        self.org = org
        self.issued = issued
        self.expires = expires
        self.quota = quota
        self.features = set(features or [])
        self.valid = valid

    def has_feature(self, feature: str) -> bool:
        if self.tier == "enterprise":
            return True
        return feature in self.features

    def to_dict(self) -> dict[str, Any]:
        return {
            "tier": self.tier,
            "org": self.org,
            "issued": self.issued,
            "expires": self.expires,
            "quota": self.quota,
            "features": sorted(self.features),
            "valid": self.valid,
        }


_FREE_LICENSE = LicenseInfo(tier="free", quota=_DEFAULT_MONTHLY_QUOTA)


class LicenseManager:
    """Manages license validation, feature gating, and usage metering.

    Thread-safe with RLock on all shared state.

    Args:
        signing_key: HMAC signing key for license validation. Defaults to
            ``OUBLIETTE_LICENSE_SIGNING_KEY`` env var.
        storage_backend: Optional storage backend for persisting usage data.
    """

    def __init__(
        self,
        signing_key: str | None = None,
        storage_backend: Any = None,
        pro_features: frozenset[str] | set[str] | None = None,
        public_key: str | None = None,
    ) -> None:
        self._lock = threading.RLock()
        self._signing_key = signing_key or os.getenv("OUBLIETTE_LICENSE_SIGNING_KEY", "")
        # Ed25519 public key (base64) for verifying asymmetric licenses. Prefers
        # an explicit arg, then env, then the bundled constant.
        self._public_key = (
            public_key
            or os.getenv("OUBLIETTE_LICENSE_PUBLIC_KEY", "")
            or _BUNDLED_PUBLIC_KEY
        )
        self._storage = storage_backend
        # Product-supplied Pro feature set (falls back to the module default).
        self._pro_features: frozenset[str] = (
            frozenset(pro_features) if pro_features is not None else PRO_FEATURES
        )
        self._license: LicenseInfo | None = None
        self._validated_at: float = 0.0
        self._usage: dict[str, dict[str, Any]] = {}  # {month: {total, by_feature}}
        self._quota = int(os.getenv("OUBLIETTE_MONTHLY_QUOTA", str(_DEFAULT_MONTHLY_QUOTA)))
        self._warned_80 = False
        self._warned_100 = False

        # Auto-load license from env
        raw = os.getenv("OUBLIETTE_LICENSE_KEY", "")
        if raw:
            self._load_license(raw)
        else:
            self._license = _FREE_LICENSE
            log.info("[LICENSE] No license key set -- running in free tier")

    # ------------------------------------------------------------------
    # License validation
    # ------------------------------------------------------------------

    def _load_license(self, raw_key: str) -> None:
        """Parse and validate a base64-encoded license key."""
        try:
            decoded = base64.b64decode(raw_key)
            data = json.loads(decoded)
        except Exception:
            log.warning("[LICENSE] Invalid license key format -- falling back to free tier")
            self._license = _FREE_LICENSE
            return

        sig = data.pop("sig", "")
        # Signature algorithm. Legacy blobs predate this field and are HMAC.
        sig_alg = data.pop("sig_alg", "hmac")
        payload = _canonical_payload(data)

        # Ed25519 (asymmetric) is the preferred scheme: the issuer signs with a
        # private key and the client verifies with an embedded/configured PUBLIC
        # key, which is safe to distribute (it can only verify, never mint).
        if sig_alg == "ed25519":
            if not self._verify_ed25519(payload, sig):
                log.warning(
                    "[LICENSE] Ed25519 signature verification failed -- "
                    "falling back to free tier"
                )
                self._license = _FREE_LICENSE
                return
        elif sig_alg == "hmac":
            # FAIL CLOSED: a shared *symmetric* HMAC secret cannot be verified
            # on the client without also shipping the secret that mints
            # licenses, so with no signing key configured any blob is
            # unauthenticated and must be treated as free tier. (Skipping this
            # check previously let anyone forge an enterprise license by leaving
            # the key unset.) Prefer Ed25519; HMAC is retained only for legacy
            # licenses issued before the asymmetric switch.
            if not self._signing_key:
                log.warning(
                    "[LICENSE] No signing key configured -- cannot verify HMAC "
                    "license signature; falling back to free tier"
                )
                self._license = _FREE_LICENSE
                return
            expected_sig = hmac.new(
                self._signing_key.encode("utf-8"),
                payload.encode("utf-8"),
                hashlib.sha256,
            ).hexdigest()
            if not hmac.compare_digest(sig, expected_sig):
                log.warning("[LICENSE] Invalid license signature -- falling back to free tier")
                self._license = _FREE_LICENSE
                return
        else:
            log.warning(
                "[LICENSE] Unknown signature algorithm %r -- falling back to free tier",
                sig_alg,
            )
            self._license = _FREE_LICENSE
            return

        # Check expiry
        expires = data.get("expires", "")
        if expires:
            try:
                exp_date = datetime.date.fromisoformat(expires)
                if exp_date < datetime.date.today():
                    log.warning(
                        "[LICENSE] License expired on %s -- falling back to free tier", expires
                    )
                    self._license = _FREE_LICENSE
                    return
            except ValueError:
                pass

        self._license = LicenseInfo(
            tier=data.get("tier", "free"),
            org=data.get("org", ""),
            issued=data.get("issued", ""),
            expires=expires,
            quota=data.get("quota", 0),
            features=data.get("features", []),
        )
        self._validated_at = time.time()
        if self._license.quota:
            self._quota = self._license.quota
        log.info(
            "[LICENSE] Loaded %s license for %s (expires %s)",
            self._license.tier,
            self._license.org,
            self._license.expires,
        )

    def _verify_ed25519(self, payload: str, sig_b64: str) -> bool:
        """Verify an Ed25519 license signature. Fails closed on any error."""
        if not self._public_key:
            log.warning(
                "[LICENSE] No Ed25519 public key configured -- cannot verify "
                "asymmetric license"
            )
            return False
        try:
            from cryptography.exceptions import InvalidSignature
            from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
        except ImportError:
            log.warning(
                "[LICENSE] 'cryptography' not installed -- cannot verify Ed25519 "
                "license; install oubliette-commerce[licensing]"
            )
            return False
        try:
            pub = Ed25519PublicKey.from_public_bytes(base64.b64decode(self._public_key))
            pub.verify(base64.b64decode(sig_b64), payload.encode("utf-8"))
            return True
        except InvalidSignature:
            return False
        except Exception:  # malformed key/sig, bad base64, etc.
            log.warning("[LICENSE] Malformed Ed25519 key or signature")
            return False

    @property
    def license(self) -> LicenseInfo:
        """Get current license info, re-validating if cache expired."""
        with self._lock:
            if self._license is None:
                return _FREE_LICENSE
            # Re-validate from env if cache expired
            if time.time() - self._validated_at > _VALIDATION_CACHE_TTL:
                raw = os.getenv("OUBLIETTE_LICENSE_KEY", "")
                if raw:
                    self._load_license(raw)
            return self._license

    # ------------------------------------------------------------------
    # Feature gating (soft enforcement)
    # ------------------------------------------------------------------

    def check_feature(self, feature: str) -> bool:
        """Check if a feature is available. Logs warning if not.

        Returns True if feature is allowed (always True, but logs warning).
        """
        lic = self.license
        if lic.tier == "enterprise":
            return True
        if feature in self._pro_features and not lic.has_feature(feature):
            log.warning("[LICENSE] Feature '%s' requires Pro tier (current: %s)", feature, lic.tier)
            return False
        return True

    # ------------------------------------------------------------------
    # Usage metering
    # ------------------------------------------------------------------

    def _month_key(self) -> str:
        return datetime.date.today().strftime("%Y-%m")

    def record_usage(self, feature: str = "analyze") -> None:
        """Record a usage event. Thread-safe."""
        with self._lock:
            month = self._month_key()
            if month not in self._usage:
                self._usage[month] = {"total": 0, "by_feature": {}}
                self._warned_80 = False
                self._warned_100 = False
            self._usage[month]["total"] += 1
            self._usage[month]["by_feature"][feature] = (
                self._usage[month]["by_feature"].get(feature, 0) + 1
            )

            total = self._usage[month]["total"]
            if self._quota > 0:
                pct = total / self._quota
                if pct >= 1.0 and not self._warned_100:
                    log.warning(
                        "[LICENSE] Monthly quota reached: %d/%d (100%%). "
                        "Usage continues but upgrade recommended.",
                        total,
                        self._quota,
                    )
                    self._warned_100 = True
                elif pct >= 0.8 and not self._warned_80:
                    log.warning(
                        "[LICENSE] Approaching monthly quota: %d/%d (80%%)",
                        total,
                        self._quota,
                    )
                    self._warned_80 = True

    def get_usage(self, month: str | None = None) -> dict[str, Any]:
        """Get usage summary for a given month (default: current)."""
        with self._lock:
            key = month or self._month_key()
            usage = self._usage.get(key, {"total": 0, "by_feature": {}})
            return {
                "month": key,
                "total": usage["total"],
                "by_feature": dict(usage["by_feature"]),
                "quota": self._quota,
                "tier": self.license.tier,
            }

    def get_usage_all(self) -> dict[str, dict[str, Any]]:
        """Get usage for all tracked months."""
        with self._lock:
            return {k: dict(v) for k, v in self._usage.items()}


# ======================================================================
# Feature Gate -- simplified tier-based access control
# ======================================================================


class FeatureGate:
    """Controls access to Pro features based on license key.

    Provides a simple boolean check for whether a feature is available
    under the current license tier.  Integrates with :class:`Shield` via
    the ``feature_gate`` constructor parameter.

    Tiers:
        - **community**: ``analyze``, ``health``, ``basic_session``
        - **pro**: Everything in community plus ``multi_tenant``,
          ``siem_export``, ``webhooks``, ``openc2``, ``rbac``,
          ``advanced_session``, ``threat_intel``

    Usage::

        gate = FeatureGate(license_key="my-key")
        gate.validate()
        if gate.is_allowed("openc2"):
            # enable OpenC2 adapter
            ...

    Args:
        license_key: A license key string.  Defaults to the
            ``OUBLIETTE_LICENSE_KEY`` environment variable.
        license_manager: Optional :class:`LicenseManager` to
            delegate validation to.  When provided, the gate uses
            the manager's tier information after validation.
    """

    # Product-specific feature sets. Supplied per-instance via __init__; the
    # shared package ships empty defaults so it stays product-neutral.
    COMMUNITY_FEATURES: frozenset[str] = frozenset()
    PRO_FEATURES: frozenset[str] = frozenset()
    ALL_FEATURES: frozenset[str] = frozenset()

    def __init__(
        self,
        license_key: str | None = None,
        license_manager: LicenseManager | None = None,
        *,
        community_features: frozenset[str] | set[str] | None = None,
        pro_features: frozenset[str] | set[str] | None = None,
        insecure_simple_mode: bool = False,
    ):
        self.license_key = license_key or os.getenv("OUBLIETTE_LICENSE_KEY", "")
        self._license_manager = license_manager
        self._insecure_simple_mode = insecure_simple_mode
        self._validated = False
        self._tier = "community"
        if community_features is not None:
            self.COMMUNITY_FEATURES = frozenset(community_features)
        if pro_features is not None:
            self.PRO_FEATURES = frozenset(pro_features)
        self.ALL_FEATURES = self.COMMUNITY_FEATURES | self.PRO_FEATURES

    def validate(self) -> bool:
        """Validate the license key and determine the tier.

        If a ``LicenseManager`` was provided, delegates to its
        validation logic and reads the resulting tier — this is the only
        path that actually verifies the license signature and is the
        supported production configuration.

        Without a manager the gate cannot cryptographically verify the key.
        It therefore denies Pro by default and stays at ``community`` unless
        the caller explicitly opts into ``insecure_simple_mode=True`` (for
        local development/testing only), which trusts any non-empty string.

        Returns:
            ``True`` if a Pro or Enterprise key was validated.
        """
        if self._license_manager is not None:
            lic = self._license_manager.license
            if lic.tier in ("pro", "enterprise"):
                self._tier = lic.tier
                self._validated = True
            else:
                self._tier = "community"
                self._validated = False
            return self._validated

        # No manager -> no signature verification is possible. Fail closed to
        # community unless the insecure dev-only shortcut is explicitly enabled.
        if self._insecure_simple_mode and self.license_key:
            log.warning(
                "[LICENSE] FeatureGate insecure_simple_mode is enabled -- any "
                "non-empty key grants Pro. Do NOT use in production."
            )
            self._tier = "pro"
            self._validated = True
        else:
            self._tier = "community"
            self._validated = False
        return self._validated

    def is_allowed(self, feature: str) -> bool:
        """Check if *feature* is available under the current tier.

        Community features are always allowed.  Pro features require
        a validated Pro or Enterprise license.

        Args:
            feature: Feature name to check.

        Returns:
            ``True`` if the feature is accessible.
        """
        if feature in self.COMMUNITY_FEATURES:
            return True
        if self._tier in ("pro", "enterprise") and feature in self.PRO_FEATURES:
            return True
        return False

    def require(self, feature: str) -> None:
        """Raise ``PermissionError`` if *feature* is not allowed."""
        if not self.is_allowed(feature):
            raise PermissionError(
                f"Feature '{feature}' requires a Pro license "
                f"(current tier: {self._tier}). "
                "Set OUBLIETTE_LICENSE_KEY or contact sales@oubliettesecurity.com"
            )

    @property
    def tier(self) -> str:
        """Return the current license tier."""
        return self._tier

    @property
    def validated(self) -> bool:
        """Return whether a license has been successfully validated."""
        return self._validated

    def to_dict(self) -> dict[str, Any]:
        """Serialize gate state for API responses."""
        return {
            "tier": self._tier,
            "validated": self._validated,
            "community_features": sorted(self.COMMUNITY_FEATURES),
            "pro_features": sorted(self.PRO_FEATURES),
        }
Versions
1 version
VersionLicensePublishedStatus
0.2.0 Latest Viewing-Jul 4, 2026 Pending