amesa-core-dev

0.20.5.dev13 Not latest — view latest

the Amesa core functionality required to apply the Machine Teaching paradigm

License
Unknown license
Published
February 25, 2026
11d ago
Package Registry
README badge Customize →
License Sources
Source | License | Class
Licensie (detected)
Pending-
PyPI (reported)
Not reported-

License detection is still in progress for this version.

Loading dependencies…
License File
# Copyright (C) Amesa, Inc - All Rights Reserved
# Unauthorized copying of this file, via any medium is strictly prohibited
# Proprietary and confidential

"""USB License Key Validator Library.

This library provides JWT-based license validation for software distributed
with USB dongle licenses. It validates cryptographically signed license files
stored on USB drives and verifies hardware binding to prevent license copying.

Typical usage example:

    import sys

    from src.license_checker import LicenseValidator

    validator = LicenseValidator(
        public_key_pem=public_key,
        issuer="Amesa Inc",
        product_id="amesa-sdk"
    )

    if validator.license_key_valid():
        print("License valid - starting application")
        info = validator.get_license_info()
        print(f"Licensed to: {info['customer_id']}")
    else:
        print("Invalid or missing license")
        sys.exit(1)
"""

import glob
import logging
from datetime import datetime
import os
from pathlib import Path
from typing import Dict, Optional
from pathlib import Path

import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend

from .exceptions import (
    InvalidSignatureError,
    InvalidIssuerError,
    InvalidProductError,
    MissingFieldError,
    USBSerialMismatchError,
    USBSerialDetectionError,
    MalformedLicenseError,
    FutureIssuedLicenseError,
)
from .usb_detection import get_usb_drives, get_usb_serial

# Configure logging
logger = logging.getLogger(__name__)

# Default license file name
DEFAULT_LICENSE_FILE_NAME = "license.lic"


class LicenseValidator:
    """Validates USB dongle licenses using JWT tokens.

    This class provides license validation functionality with configurable
    parameters for public key, issuer, product ID, and USB serial validation.

    Args:
        public_key_pem: PEM-formatted RSA public key string for signature verification.
        issuer: Expected issuer name in license (default: "Amesa Inc").
        product_id: Expected product ID in license (default: "amesa-sdk").
        usb_serial_validation: Enable USB hardware binding (default: True).
        license_file_name: Name of license file on USB drive (default: "license.lic").

    Raises:
        ValueError: If the public key cannot be loaded.

    Example:
        >>> validator = LicenseValidator(public_key_pem=my_public_key)
        >>> if validator.license_key_valid():
        ...     info = validator.get_license_info()
        ...     print(f"Licensed to: {info['customer_id']}")
    """

    def __init__(
        self,
        public_key_pem: str,
        issuer: str = "Amesa Inc",
        product_id: str = "amesa-sdk",
        usb_serial_validation: bool = True,
        license_file_name: str = DEFAULT_LICENSE_FILE_NAME,
    ):
        self.public_key_pem = public_key_pem
        self.expected_issuer = issuer
        self.expected_product_id = product_id
        self.usb_serial_validation = usb_serial_validation
        self.license_file_name = license_file_name

        # Load and cache the public key once so every validation reuses it.
        # Any failure (bad PEM, wrong argument type) surfaces as ValueError.
        try:
            self._public_key = serialization.load_pem_public_key(
                public_key_pem.encode(),
                backend=default_backend()
            )
        except Exception as e:
            raise ValueError(f"Failed to load public key: {e}") from e

    def _locate_license_on_drive(self, drive) -> Optional[Path]:
        """Return the first regular file named like the license file under *drive*.

        The search is recursive. The drive path is escaped so that glob
        metacharacters in a mount path (``[``, ``*``, ``?``) are matched
        literally; the configured file name is passed through unchanged.

        Args:
            drive: Root of the drive to search (string or path-like).

        Returns:
            Path to the license file, or None when the drive holds none.
        """
        pattern = f"{glob.escape(str(drive))}/**/{self.license_file_name}"
        # iglob is lazy, so the walk stops as soon as a usable match appears.
        for candidate in glob.iglob(pattern, recursive=True):
            if os.path.isfile(candidate):
                return Path(candidate)
        return None

    @staticmethod
    def _resolve_mount_point(license_path: Path, drive) -> Path:
        """Find the mount point that actually holds *license_path*.

        Walks upward from the license file towards *drive*. If an intermediate
        directory is itself a mount point, that nested mount is returned;
        otherwise the drive root is returned. The walk never climbs above
        *drive* and stops when the path has no further ancestors.

        Args:
            license_path: Path of the license file found under *drive*.
            drive: Root of the drive that was searched (string or path-like).

        Returns:
            The nearest mount point at or below *drive*, as a Path.
        """
        drive_path = Path(drive)
        current = license_path
        while current != drive_path:
            if os.path.ismount(current):
                return current
            parent = current.parent
            if parent == current:
                # Ran out of ancestors without meeting the drive root.
                break
            current = parent
        return drive_path

    def _find_license_file(self) -> Optional[tuple[Path, Optional[str]]]:
        """Search all USB drives for license file.

        Returns immediately after finding the first usable license file.
        Drives that do not contain a license file are skipped. If USB serial
        validation is enabled and serial detection fails, continues to check
        other drives in case another USB has a valid license.

        Whether a differently-cased name (e.g. "License.LIC") is matched
        depends on the filesystem of the drive being searched.

        Returns:
            Tuple of (license_file_path, usb_serial) if found, None otherwise.
            usb_serial is None when USB serial validation is disabled.
        """
        for drive in get_usb_drives():
            license_path = self._locate_license_on_drive(drive)
            if license_path is None:
                # Nothing on this drive; another drive may still hold the license.
                continue

            logger.info(f"Found license file on USB: {drive}")

            if not self.usb_serial_validation:
                # USB serial validation disabled, return first found license
                return (license_path, None)

            # The license may sit on a mount nested inside the reported drive,
            # so query the serial of the mount that really contains the file.
            mount_point = self._resolve_mount_point(license_path, drive)
            usb_serial = get_usb_serial(mount_point)
            if usb_serial:
                return (license_path, usb_serial)

            logger.warning(
                f"Found license file at {license_path} but could not "
                f"determine USB serial number. Checking other drives..."
            )

        return None

    def _validate_jwt_claims(self, payload: Dict) -> None:
        """Validate JWT payload claims for license requirements.

        Args:
            payload: Decoded JWT payload dictionary.

        Raises:
            InvalidIssuerError: If issuer doesn't match expected value.
            InvalidProductError: If product_id doesn't match expected value.
            MissingFieldError: If required fields are missing.
            FutureIssuedLicenseError: If license was issued in the future.
        """
        # Check issuer
        if payload.get("iss") != self.expected_issuer:
            raise InvalidIssuerError(
                f"Expected issuer '{self.expected_issuer}', got '{payload.get('iss')}'"
            )

        # Check product ID
        if payload.get("product_id") != self.expected_product_id:
            raise InvalidProductError(
                f"Expected product_id '{self.expected_product_id}', got '{payload.get('product_id')}'"
            )

        # Check issued at time (should not be in the future). The comparison
        # is done on epoch seconds: comparing naive local datetimes can
        # misorder two instants inside the repeated hour of a DST change.
        iat = payload.get("iat")
        if iat and iat > datetime.now().timestamp():
            issued_at = datetime.fromtimestamp(iat)
            raise FutureIssuedLicenseError(
                f"License issued in the future: {issued_at}"
            )

        # Check for required fields
        required_fields = ["customer_id"]

        # USB serial is only required if hardware validation is enabled
        if self.usb_serial_validation:
            required_fields.append("usb_serial")

        for field in required_fields:
            if field not in payload:
                raise MissingFieldError(f"Missing required field: {field}")

    def _validate_license_file(
        self,
        license_path: Path,
        license_token: str,
        detected_usb_serial: Optional[str]
    ) -> Dict:
        """Validate a license file from USB drive.

        Args:
            license_path: Path to the license file on USB (not read here; the
                token is supplied separately).
            license_token: JWT token string from the license file.
            detected_usb_serial: USB serial number detected from hardware (can be None).

        Returns:
            Decoded JWT payload dictionary if valid.

        Raises:
            InvalidSignatureError: If signature is invalid.
            MalformedLicenseError: If license is malformed, or JWT decoding
                fails for any other reason.
            InvalidIssuerError: If issuer doesn't match.
            InvalidProductError: If product_id doesn't match.
            MissingFieldError: If required fields are missing.
            FutureIssuedLicenseError: If license was issued in the future.
            USBSerialMismatchError: If USB serial doesn't match.
            USBSerialDetectionError: If USB serial cannot be determined.
        """
        # Decode and verify JWT. The accepted algorithm is pinned to RS256.
        # The underlying jwt error is chained so the root cause stays visible.
        try:
            payload = jwt.decode(
                license_token,
                self._public_key,
                algorithms=["RS256"],
                options={"verify_signature": True}
            )
        except jwt.InvalidSignatureError as exc:
            raise InvalidSignatureError(
                "License signature is invalid or tampered"
            ) from exc
        except jwt.DecodeError as exc:
            raise MalformedLicenseError(
                "License file is malformed or corrupted"
            ) from exc
        except Exception as e:
            raise MalformedLicenseError(f"JWT validation failed: {str(e)}") from e

        # Validate claims
        self._validate_jwt_claims(payload)

        # Verify USB serial number matches (only if validation is enabled)
        if self.usb_serial_validation:
            if detected_usb_serial is None:
                raise USBSerialDetectionError("Could not determine USB serial number")

            license_usb_serial = payload.get("usb_serial")
            if license_usb_serial != detected_usb_serial:
                logger.warning(
                    f"USB serial mismatch. Expected: {license_usb_serial}, "
                    f"Detected: {detected_usb_serial}"
                )
                raise USBSerialMismatchError(
                    f"License serial '{license_usb_serial}' does not match "
                    f"USB device serial '{detected_usb_serial}'"
                )
        else:
            logger.info("USB serial validation disabled - skipping hardware binding check")

        # All checks passed
        logger.info(
            f"License valid for customer {payload.get('customer_id')}"
        )
        return payload

    def license_key_valid(self) -> bool:
        """Check if a valid license key exists on attached USB drive.

        This function searches all connected USB drives for a license file,
        verifies the JWT signature using the configured public key, validates
        all claims, and (when enabled) ensures the USB hardware serial number
        matches the license. It never raises: every failure is logged and
        reported as False.

        Returns:
            True if a valid license is found, False otherwise.

        Example:
            >>> validator = LicenseValidator(public_key_pem=key)
            >>> if validator.license_key_valid():
            ...     start_application()
            ... else:
            ...     print("No valid license found")
            ...     sys.exit(1)
        """
        try:
            # Find license file on USB drives
            result = self._find_license_file()
            if result is None:
                logger.warning("No license file found on any USB drive")
                return False

            license_path, usb_serial = result

            # Read license file with an explicit encoding so the result does
            # not depend on the platform's locale.
            try:
                with open(license_path, 'r', encoding="utf-8") as f:
                    license_token = f.read().strip()
            except FileNotFoundError:
                logger.warning(f"License file not found: {license_path}")
                return False
            except PermissionError as e:
                logger.warning(f"Permission denied reading license file: {e}")
                return False

            # Validate the license
            try:
                self._validate_license_file(license_path, license_token, usb_serial)
                return True
            except (
                InvalidSignatureError,
                InvalidIssuerError,
                InvalidProductError,
                MissingFieldError,
                USBSerialMismatchError,
                USBSerialDetectionError,
                MalformedLicenseError,
                FutureIssuedLicenseError,
            ) as e:
                logger.error(f"License validation failed: {e}")
                return False

        except Exception as e:
            # Safety net for anything not handled above (drive detection
            # failures, I/O or decoding errors): log with traceback and treat
            # the license as invalid rather than crashing the caller.
            logger.exception(f"Unexpected error checking license: {e}")
            return False

    def get_license_info(self) -> Optional[Dict]:
        """Retrieve license information from USB drive if valid.

        Performs the same search and validation as ``license_key_valid`` and
        never raises.

        Returns:
            Dictionary containing license claims if valid, None otherwise.

        Example:
            >>> validator = LicenseValidator(public_key_pem=key)
            >>> info = validator.get_license_info()
            >>> if info:
            ...     print(f"Licensed to: {info['customer_id']}")
        """
        try:
            result = self._find_license_file()
            if result is None:
                return None

            license_path, usb_serial = result

            # Read license file with an explicit encoding so the result does
            # not depend on the platform's locale.
            try:
                with open(license_path, 'r', encoding="utf-8") as f:
                    license_token = f.read().strip()
            except (FileNotFoundError, PermissionError) as e:
                logger.error(f"Could not read license file: {e}")
                return None

            # Validate and return payload; validation failures mean "no info".
            try:
                payload = self._validate_license_file(license_path, license_token, usb_serial)
                return payload
            except (
                InvalidSignatureError,
                InvalidIssuerError,
                InvalidProductError,
                MissingFieldError,
                USBSerialMismatchError,
                USBSerialDetectionError,
                MalformedLicenseError,
                FutureIssuedLicenseError,
            ):
                return None

        except Exception as e:
            logger.error(f"Could not retrieve license info: {e}")
            return None


if __name__ == "__main__":
    # This module is a library: executed directly it has no public key to
    # work with, so it only prints usage guidance and exits with status 1.
    import sys

    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    usage_lines = (
        "ERROR: LicenseValidator requires a public key to be configured.",
        "Please use the LicenseValidator class in your code:",
        "  from src.license_checker import LicenseValidator",
        "  validator = LicenseValidator(public_key_pem=your_public_key)",
    )
    for usage_line in usage_lines:
        print(usage_line)
    sys.exit(1)
Versions
13 versions
Version | License | Published | Status
0.20.7.dev16 Latest -Mar 4, 2026 Pending
0.20.7.dev15 -Mar 4, 2026 Pending
0.20.7.dev14 -Mar 3, 2026 Pending
0.20.7.dev13 -Mar 3, 2026 Pending
0.20.7.dev12 -Mar 2, 2026 Pending
0.20.7.dev11 -Mar 2, 2026 Pending
0.20.7.dev9 -Mar 2, 2026 Pending
0.20.7.dev8 -Mar 2, 2026 Pending
0.20.7.dev7 -Feb 27, 2026 Pending
0.20.7.dev6 -Feb 27, 2026 Pending
0.20.7.dev5 -Feb 27, 2026 Pending
0.20.7.dev4 -Feb 26, 2026 Pending
0.20.5.dev13 Viewing-Feb 25, 2026 Pending