Skip to content

HTTP Client API Reference

Complete API reference for HTTP clients.

Base Interface

HttpClient

Bases: ABC

Abstract base class for HTTP clients.

This is the unified HTTP client interface for the stkai SDK. All HTTP operations in the SDK should use this interface.

Implementations handle authentication and can be wrapped with decorators for rate limiting, retries, and other cross-cutting concerns.

Example

class MyHttpClient(HttpClient):
    def get(self, url, headers=None, timeout=30):
        return requests.get(url, headers=headers, timeout=timeout)

    def post(self, url, data=None, headers=None, timeout=30):
        return requests.post(url, json=data, headers=headers, timeout=timeout)

Source code in src/stkai/_http.py
class HttpClient(ABC):
    """
    Abstract contract shared by every HTTP client in the stkai SDK.

    SDK code is expected to perform all of its HTTP operations through this
    single interface. Concrete implementations take care of authentication,
    and they can be wrapped with decorators that add rate limiting, retries,
    and other cross-cutting concerns.

    Example:
        >>> class MyHttpClient(HttpClient):
        ...     def get(self, url, headers=None, timeout=30):
        ...         return requests.get(url, headers=headers, timeout=timeout)
        ...     def post(self, url, data=None, headers=None, timeout=30):
        ...         return requests.post(url, json=data, headers=headers, timeout=timeout)
    """

    @abstractmethod
    def get(
        self,
        url: str,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Perform an authenticated GET request.

        Args:
            url: Full URL of the resource to request.
            headers: Extra headers to send; they are merged with the auth headers.
            timeout: How long to wait for the request, in seconds.

        Returns:
            The HTTP response.

        Raises:
            requests.RequestException: When the HTTP request fails.
        """
        ...

    @abstractmethod
    def post(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Perform an authenticated POST request carrying a JSON body.

        Args:
            url: Full URL of the resource to request.
            data: JSON-serializable payload placed in the request body.
            headers: Extra headers to send; they are merged with the auth headers.
            timeout: How long to wait for the request, in seconds.

        Returns:
            The HTTP response.

        Raises:
            requests.RequestException: When the HTTP request fails.
        """
        ...

    def post_stream(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Perform an authenticated POST request whose response is streamed.

        The response comes back with ``stream=True``: its body has NOT been
        downloaded up front, so the caller MUST iterate over it and close the
        response afterwards.

        Streaming lives in its own method rather than in ``post()`` because a
        streamed response behaves fundamentally differently — the body has to
        be iterated and the connection closed explicitly.

        This base implementation is not abstract: it always raises, so
        subclasses opt in to streaming by overriding it.

        Args:
            url: Full URL of the resource to request.
            data: JSON-serializable payload placed in the request body.
            headers: Extra headers to send; they are merged with the auth headers.
            timeout: How long to wait for the request, in seconds.

        Returns:
            The HTTP response with stream=True.

        Raises:
            NotImplementedError: When the implementation does not support streaming.
            requests.RequestException: When the HTTP request fails.
        """
        # Report the concrete subclass name so the error points at the client in use.
        client_name = type(self).__name__
        raise NotImplementedError(f"{client_name} does not support streaming.")

Functions

get abstractmethod

get(url: str, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Execute an authenticated GET request.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
headers dict[str, str] | None

Additional headers to include (merged with auth headers).

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Raises:

Type Description
RequestException

If the HTTP request fails.

Source code in src/stkai/_http.py
@abstractmethod
def get(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Perform an authenticated GET request.

    Args:
        url: Full URL of the resource to request.
        headers: Extra headers to send; they are merged with the auth headers.
        timeout: How long to wait for the request, in seconds.

    Returns:
        The HTTP response.

    Raises:
        requests.RequestException: When the HTTP request fails.
    """
    ...

post abstractmethod

post(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Execute an authenticated POST request with JSON body.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
data dict[str, Any] | None

JSON-serializable data to send in the request body.

None
headers dict[str, str] | None

Additional headers to include (merged with auth headers).

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Raises:

Type Description
RequestException

If the HTTP request fails.

Source code in src/stkai/_http.py
@abstractmethod
def post(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Perform an authenticated POST request carrying a JSON body.

    Args:
        url: Full URL of the resource to request.
        data: JSON-serializable payload placed in the request body.
        headers: Extra headers to send; they are merged with the auth headers.
        timeout: How long to wait for the request, in seconds.

    Returns:
        The HTTP response.

    Raises:
        requests.RequestException: When the HTTP request fails.
    """
    ...

post_stream

post_stream(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Execute an authenticated POST request with streaming response.

The returned response has stream=True, meaning the body is NOT pre-downloaded. The caller MUST iterate and close the response.

This is a separate method from post() because a streaming response behaves fundamentally differently — the body must be iterated and the connection explicitly closed.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
data dict[str, Any] | None

JSON-serializable data to send in the request body.

None
headers dict[str, str] | None

Additional headers to include (merged with auth headers).

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response with stream=True.

Raises:

Type Description
NotImplementedError

If the implementation does not support streaming.

RequestException

If the HTTP request fails.

Source code in src/stkai/_http.py
def post_stream(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Perform an authenticated POST request whose response is streamed.

    The response comes back with ``stream=True``: its body has NOT been
    downloaded up front, so the caller MUST iterate over it and close the
    response afterwards.

    Streaming lives in its own method rather than in ``post()`` because a
    streamed response behaves fundamentally differently — the body has to
    be iterated and the connection closed explicitly.

    This default implementation always raises; implementations that support
    streaming override it.

    Args:
        url: Full URL of the resource to request.
        data: JSON-serializable payload placed in the request body.
        headers: Extra headers to send; they are merged with the auth headers.
        timeout: How long to wait for the request, in seconds.

    Returns:
        The HTTP response with stream=True.

    Raises:
        NotImplementedError: When the implementation does not support streaming.
        requests.RequestException: When the HTTP request fails.
    """
    # Report the concrete class name so the error points at the client in use.
    client_name = type(self).__name__
    raise NotImplementedError(f"{client_name} does not support streaming.")

Implementations

StkCLIHttpClient

Bases: HttpClient

HTTP client using StackSpot CLI (oscli) for authentication.

This client delegates authentication to the StackSpot CLI, which must be installed and logged in for this client to work.

The CLI handles token management, refresh, and injection of authorization headers into HTTP requests.

Note

Requires the oscli package to be installed and configured. Install via: `pip install oscli`. Log in via: `stk login`.

Example

from stkai._http import StkCLIHttpClient

client = StkCLIHttpClient()
response = client.post("https://api.example.com/endpoint", data={"key": "value"})

See Also

StandaloneHttpClient: Alternative that doesn't require oscli.

Source code in src/stkai/_http.py
class StkCLIHttpClient(HttpClient):
    """
    HTTP client using StackSpot CLI (oscli) for authentication.

    This client delegates authentication to the StackSpot CLI,
    which must be installed and logged in for this client to work.

    The CLI handles token management, refresh, and injection of
    authorization headers into HTTP requests.

    Note:
        Requires the `oscli` package to be installed and configured.
        Install via: pip install oscli
        Login via: stk login

    Example:
        >>> from stkai._http import StkCLIHttpClient
        >>> client = StkCLIHttpClient()
        >>> response = client.post("https://api.example.com/endpoint", data={"key": "value"})

    See Also:
        StandaloneHttpClient: Alternative that doesn't require oscli.
    """

    @override
    def get(
        self,
        url: str,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Execute an authenticated GET request using oscli.

        Args:
            url: The full URL to request.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response.

        Raises:
            AssertionError: If url is empty or timeout is invalid.
            requests.RequestException: If the HTTP request fails.
        """
        # Raise explicitly instead of using `assert`: assert statements are
        # removed under `python -O`, which would silently disable validation.
        if not url:
            raise AssertionError("URL cannot be empty.")
        if timeout is None:
            raise AssertionError("Timeout cannot be None.")
        if not timeout > 0:
            raise AssertionError("Timeout must be greater than 0.")

        # Imported at call time, so oscli is only needed once a request is made.
        from oscli.core.http import get_with_authorization

        response: requests.Response = get_with_authorization(
            url=url,
            timeout=timeout,
            headers=headers,
            use_cache=False,  # disables client-side caching
        )
        return response

    @override
    def post(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Execute an authenticated POST request using oscli.

        Args:
            url: The full URL to request.
            data: JSON-serializable data to send in the request body.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response.

        Raises:
            AssertionError: If url is empty or timeout is invalid.
            requests.RequestException: If the HTTP request fails.
        """
        # Raise explicitly instead of using `assert`: assert statements are
        # removed under `python -O`, which would silently disable validation.
        if not url:
            raise AssertionError("URL cannot be empty.")
        if timeout is None:
            raise AssertionError("Timeout cannot be None.")
        if not timeout > 0:
            raise AssertionError("Timeout must be greater than 0.")

        # Imported at call time, so oscli is only needed once a request is made.
        from oscli.core.http import post_with_authorization

        response: requests.Response = post_with_authorization(
            url=url,
            body=data,
            timeout=timeout,
            headers=headers,
        )
        return response

    @override
    def post_stream(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Execute an authenticated streaming POST request using oscli.

        Delegates to oscli's ``post_with_authorization`` with ``stream=True``
        passed via kwargs (forwarded to ``requests.post``).

        Args:
            url: The full URL to request.
            data: JSON-serializable data to send in the request body.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response with stream=True.

        Raises:
            AssertionError: If url is empty or timeout is invalid.
            requests.RequestException: If the HTTP request fails.
        """
        # Raise explicitly instead of using `assert`: assert statements are
        # removed under `python -O`, which would silently disable validation.
        if not url:
            raise AssertionError("URL cannot be empty.")
        if timeout is None:
            raise AssertionError("Timeout cannot be None.")
        if not timeout > 0:
            raise AssertionError("Timeout must be greater than 0.")

        # Imported at call time, so oscli is only needed once a request is made.
        from oscli.core.http import post_with_authorization

        response: requests.Response = post_with_authorization(
            url=url,
            body=data,
            timeout=timeout,
            headers=headers,
            stream=True,  # body is not pre-downloaded; caller iterates and closes
        )
        return response

Functions

get

get(url: str, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Execute an authenticated GET request using oscli.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Raises:

Type Description
AssertionError

If url is empty or timeout is invalid.

RequestException

If the HTTP request fails.

Source code in src/stkai/_http.py
@override
def get(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Execute an authenticated GET request using oscli.

    Args:
        url: The full URL to request.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response.

    Raises:
        AssertionError: If url is empty or timeout is invalid.
        requests.RequestException: If the HTTP request fails.
    """
    assert url, "URL cannot be empty."
    assert timeout is not None, "Timeout cannot be None."
    assert timeout > 0, "Timeout must be greater than 0."

    from oscli.core.http import get_with_authorization

    response: requests.Response = get_with_authorization(
        url=url,
        timeout=timeout,
        headers=headers,
        use_cache=False, # disables client-side caching
    )
    return response

post

post(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Execute an authenticated POST request using oscli.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
data dict[str, Any] | None

JSON-serializable data to send in the request body.

None
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Raises:

Type Description
AssertionError

If url is empty or timeout is invalid.

RequestException

If the HTTP request fails.

Source code in src/stkai/_http.py
@override
def post(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Execute an authenticated POST request using oscli.

    Args:
        url: The full URL to request.
        data: JSON-serializable data to send in the request body.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response.

    Raises:
        AssertionError: If url is empty or timeout is invalid.
        requests.RequestException: If the HTTP request fails.
    """
    assert url, "URL cannot be empty."
    assert timeout is not None, "Timeout cannot be None."
    assert timeout > 0, "Timeout must be greater than 0."

    from oscli.core.http import post_with_authorization

    response: requests.Response = post_with_authorization(
        url=url,
        body=data,
        timeout=timeout,
        headers=headers,
    )
    return response

post_stream

post_stream(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Execute an authenticated streaming POST request using oscli.

Delegates to oscli's post_with_authorization with stream=True passed via kwargs (forwarded to requests.post).

Parameters:

Name Type Description Default
url str

The full URL to request.

required
data dict[str, Any] | None

JSON-serializable data to send in the request body.

None
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response with stream=True.

Source code in src/stkai/_http.py
@override
def post_stream(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Execute an authenticated streaming POST request using oscli.

    Delegates to oscli's ``post_with_authorization`` with ``stream=True``
    passed via kwargs (forwarded to ``requests.post``).

    Args:
        url: The full URL to request.
        data: JSON-serializable data to send in the request body.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response with stream=True.
    """
    assert url, "URL cannot be empty."
    assert timeout is not None, "Timeout cannot be None."
    assert timeout > 0, "Timeout must be greater than 0."

    from oscli.core.http import post_with_authorization

    response: requests.Response = post_with_authorization(
        url=url,
        body=data,
        timeout=timeout,
        headers=headers,
        stream=True,
    )
    return response

StandaloneHttpClient

Bases: HttpClient

HTTP client using AuthProvider for standalone authentication.

This client uses an AuthProvider to obtain authorization tokens, enabling standalone operation without the StackSpot CLI.

Use this client when:

- You want to run without the StackSpot CLI dependency
- You need to use client credentials directly
- You're deploying to an environment without CLI access

Example

from stkai._auth import ClientCredentialsAuthProvider
from stkai._http import StandaloneHttpClient

auth = ClientCredentialsAuthProvider(
    client_id="my-client-id",
    client_secret="my-client-secret",
)
client = StandaloneHttpClient(auth_provider=auth)
response = client.post("https://api.example.com/endpoint", data={"key": "value"})

Parameters:

Name Type Description Default
auth_provider AuthProvider

Provider for authorization tokens.

required
See Also

ClientCredentialsAuthProvider: OAuth2 client credentials implementation.

StkCLIHttpClient: Alternative that uses oscli for authentication.

Source code in src/stkai/_http.py
class StandaloneHttpClient(HttpClient):
    """
    HTTP client using AuthProvider for standalone authentication.

    This client uses an AuthProvider to obtain authorization tokens,
    enabling standalone operation without the StackSpot CLI.

    Use this client when:
    - You want to run without the StackSpot CLI dependency
    - You need to use client credentials directly
    - You're deploying to an environment without CLI access

    Example:
        >>> from stkai._auth import ClientCredentialsAuthProvider
        >>> from stkai._http import StandaloneHttpClient
        >>>
        >>> auth = ClientCredentialsAuthProvider(
        ...     client_id="my-client-id",
        ...     client_secret="my-client-secret",
        ... )
        >>> client = StandaloneHttpClient(auth_provider=auth)
        >>> response = client.post("https://api.example.com/endpoint", data={"key": "value"})

    Args:
        auth_provider: Provider for authorization tokens.

    See Also:
        ClientCredentialsAuthProvider: OAuth2 client credentials implementation.
        StkCLIHttpClient: Alternative that uses oscli for authentication.
    """

    def __init__(self, auth_provider: "AuthProvider"):
        """
        Initialize the standalone HTTP client.

        Args:
            auth_provider: Provider for authorization tokens.

        Raises:
            AssertionError: If auth_provider is None or invalid type.
        """
        from stkai._auth import AuthProvider

        # Raise explicitly instead of using `assert`: assert statements are
        # removed under `python -O`, which would silently disable validation.
        if auth_provider is None:
            raise AssertionError("auth_provider cannot be None")
        if not isinstance(auth_provider, AuthProvider):
            raise AssertionError("auth_provider must be an AuthProvider instance")

        self._auth = auth_provider

    @override
    def get(
        self,
        url: str,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Execute an authenticated GET request.

        Args:
            url: The full URL to request.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response.

        Raises:
            AssertionError: If url is empty or timeout is invalid.
            requests.RequestException: If the HTTP request fails.
            AuthenticationError: If unable to obtain authorization token.
        """
        # Raise explicitly instead of using `assert`: assert statements are
        # removed under `python -O`, which would silently disable validation.
        if not url:
            raise AssertionError("URL cannot be empty.")
        if timeout is None:
            raise AssertionError("Timeout cannot be None.")
        if not timeout > 0:
            raise AssertionError("Timeout must be greater than 0.")

        # Caller-supplied headers come last, so they win on key collisions.
        merged_headers = {**self._auth.get_auth_headers(), **(headers or {})}

        return requests.get(
            url,
            headers=merged_headers,
            timeout=timeout,
        )

    @override
    def post(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Execute an authenticated POST request with JSON body.

        Args:
            url: The full URL to request.
            data: JSON-serializable data to send in the request body.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response.

        Raises:
            AssertionError: If url is empty or timeout is invalid.
            requests.RequestException: If the HTTP request fails.
            AuthenticationError: If unable to obtain authorization token.
        """
        # Raise explicitly instead of using `assert`: assert statements are
        # removed under `python -O`, which would silently disable validation.
        if not url:
            raise AssertionError("URL cannot be empty.")
        if timeout is None:
            raise AssertionError("Timeout cannot be None.")
        if not timeout > 0:
            raise AssertionError("Timeout must be greater than 0.")

        # Caller-supplied headers come last, so they win on key collisions.
        merged_headers = {**self._auth.get_auth_headers(), **(headers or {})}

        return requests.post(
            url,
            json=data,
            headers=merged_headers,
            timeout=timeout,
        )

    @override
    def post_stream(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Execute an authenticated streaming POST request.

        Args:
            url: The full URL to request.
            data: JSON-serializable data to send in the request body.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response with stream=True.

        Raises:
            AssertionError: If url is empty or timeout is invalid.
            requests.RequestException: If the HTTP request fails.
            AuthenticationError: If unable to obtain authorization token.
        """
        # Raise explicitly instead of using `assert`: assert statements are
        # removed under `python -O`, which would silently disable validation.
        if not url:
            raise AssertionError("URL cannot be empty.")
        if timeout is None:
            raise AssertionError("Timeout cannot be None.")
        if not timeout > 0:
            raise AssertionError("Timeout must be greater than 0.")

        # Caller-supplied headers come last, so they win on key collisions.
        merged_headers = {**self._auth.get_auth_headers(), **(headers or {})}

        return requests.post(
            url,
            json=data,
            headers=merged_headers,
            timeout=timeout,
            stream=True,  # body is not pre-downloaded; caller iterates and closes
        )

Functions

__init__

__init__(auth_provider: AuthProvider)

Initialize the standalone HTTP client.

Parameters:

Name Type Description Default
auth_provider AuthProvider

Provider for authorization tokens.

required

Raises:

Type Description
AssertionError

If auth_provider is None or invalid type.

Source code in src/stkai/_http.py
def __init__(self, auth_provider: "AuthProvider"):
    """
    Initialize the standalone HTTP client.

    Args:
        auth_provider: Provider for authorization tokens.

    Raises:
        AssertionError: If auth_provider is None or invalid type.
    """
    from stkai._auth import AuthProvider

    # Raise explicitly instead of using `assert`: assert statements are
    # removed under `python -O`, which would silently disable validation.
    if auth_provider is None:
        raise AssertionError("auth_provider cannot be None")
    if not isinstance(auth_provider, AuthProvider):
        raise AssertionError("auth_provider must be an AuthProvider instance")

    self._auth = auth_provider

get

get(url: str, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Execute an authenticated GET request.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Raises:

Type Description
AssertionError

If url is empty or timeout is invalid.

RequestException

If the HTTP request fails.

AuthenticationError

If unable to obtain authorization token.

Source code in src/stkai/_http.py
@override
def get(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Execute an authenticated GET request.

    Args:
        url: The full URL to request.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response.

    Raises:
        AssertionError: If url is empty or timeout is invalid.
        requests.RequestException: If the HTTP request fails.
        AuthenticationError: If unable to obtain authorization token.
    """
    assert url, "URL cannot be empty."
    assert timeout is not None, "Timeout cannot be None."
    assert timeout > 0, "Timeout must be greater than 0."

    merged_headers = {**self._auth.get_auth_headers(), **(headers or {})}

    return requests.get(
        url,
        headers=merged_headers,
        timeout=timeout,
    )

post

post(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Execute an authenticated POST request with JSON body.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
data dict[str, Any] | None

JSON-serializable data to send in the request body.

None
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Raises:

Type Description
AssertionError

If url is empty or timeout is invalid.

RequestException

If the HTTP request fails.

AuthenticationError

If unable to obtain authorization token.

Source code in src/stkai/_http.py
@override
def post(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Execute an authenticated POST request with JSON body.

    Args:
        url: The full URL to request.
        data: JSON-serializable data to send in the request body.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response.

    Raises:
        AssertionError: If url is empty or timeout is invalid.
        requests.RequestException: If the HTTP request fails.
        AuthenticationError: If unable to obtain authorization token.
    """
    assert url, "URL cannot be empty."
    assert timeout is not None, "Timeout cannot be None."
    assert timeout > 0, "Timeout must be greater than 0."

    merged_headers = {**self._auth.get_auth_headers(), **(headers or {})}

    return requests.post(
        url,
        json=data,
        headers=merged_headers,
        timeout=timeout,
    )

post_stream

post_stream(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Execute an authenticated streaming POST request.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
data dict[str, Any] | None

JSON-serializable data to send in the request body.

None
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response with stream=True.

Source code in src/stkai/_http.py
@override
def post_stream(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Execute an authenticated streaming POST request.

    Args:
        url: The full URL to request.
        data: JSON-serializable data to send in the request body.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response with stream=True.
    """
    assert url, "URL cannot be empty."
    assert timeout is not None, "Timeout cannot be None."
    assert timeout > 0, "Timeout must be greater than 0."

    merged_headers = {**self._auth.get_auth_headers(), **(headers or {})}

    return requests.post(
        url,
        json=data,
        headers=merged_headers,
        timeout=timeout,
        stream=True,
    )

EnvironmentAwareHttpClient

Bases: HttpClient

Environment-aware HTTP client that automatically selects the appropriate implementation.

This client detects the runtime environment and lazily creates the appropriate HTTP client implementation:

  1. If StackSpot CLI (oscli) is installed → uses StkCLIHttpClient
  2. If credentials are configured → uses StandaloneHttpClient
  3. Otherwise → raises ValueError with clear instructions

The detection happens lazily on the first request, allowing configuration via STKAI.configure() after import.

This implementation is thread-safe, using the double-checked locking pattern.

Example

from stkai._http import EnvironmentAwareHttpClient

client = EnvironmentAwareHttpClient()
# Automatically uses CLI or standalone based on environment
response = client.post("https://api.example.com/endpoint", data={"key": "value"})

Note

CLI takes precedence over credentials if both are available.

See Also

StkCLIHttpClient: Explicit CLI-based client.

StandaloneHttpClient: Explicit standalone client.

Source code in src/stkai/_http.py
class EnvironmentAwareHttpClient(HttpClient):
    """
    HTTP client that picks its concrete implementation from the runtime environment.

    The concrete client is resolved on the first request rather than at
    construction time, which allows `STKAI.configure()` to be called after
    import. Resolution order:

    1. StackSpot CLI (oscli) is installed → uses StkCLIHttpClient
    2. Credentials are configured → uses StandaloneHttpClient
    3. Neither → raises ValueError with setup instructions

    If rate limiting is enabled in `STKAI.config.rate_limit`, the resolved
    client is wrapped in the matching rate-limiting decorator.

    Lazy initialization is guarded by a lock using double-checked locking,
    so the delegate is created once even with concurrent first requests.

    Example:
        >>> from stkai._http import EnvironmentAwareHttpClient
        >>> client = EnvironmentAwareHttpClient()
        >>> # Automatically uses CLI or standalone based on environment
        >>> response = client.post("https://api.example.com/endpoint", data={"key": "value"})

    Note:
        The CLI wins over credentials when both are available.

    See Also:
        StkCLIHttpClient: Explicit CLI-based client.
        StandaloneHttpClient: Explicit standalone client.
    """

    def __init__(self) -> None:
        """Create the lock and an empty delegate slot; nothing is resolved yet."""
        self._lock = threading.Lock()
        self._delegate: HttpClient | None = None

    def _get_delegate(self) -> HttpClient:
        """
        Return the delegate HTTP client, creating it on first use.

        Returns:
            The HttpClient implementation selected for the current environment.

        Raises:
            ValueError: If no authentication method is available.
        """
        # Fast path: the delegate already exists, no locking needed.
        if self._delegate is not None:
            return self._delegate

        with self._lock:
            # Second check under the lock: another thread may have created
            # the delegate while this one was waiting.
            if self._delegate is None:
                self._delegate = self._create_delegate()
            return self._delegate

    def _create_delegate(self) -> HttpClient:
        """
        Build the delegate: base client first, then optional rate limiting.

        Returns:
            StkCLIHttpClient if oscli is available, otherwise StandaloneHttpClient,
            wrapped with rate limiting when `STKAI.config.rate_limit` enables it.

        Raises:
            ValueError: If no authentication method is available.
        """
        return self._apply_rate_limiting(self._create_base_client())

    def _create_base_client(self) -> HttpClient:
        """
        Select the un-decorated HTTP client for the detected environment.

        Returns:
            StkCLIHttpClient if oscli is available, otherwise StandaloneHttpClient.

        Raises:
            ValueError: If neither oscli nor credentials are available.
        """
        # The CLI has priority over credentials.
        if self._is_cli_available():
            logger.debug(
                "EnvironmentAwareHttpClient: StackSpot CLI (oscli) detected. "
                "Using StkCLIHttpClient."
            )
            from stkai._config import STKAI

            # Credentials configured alongside the CLI are ignored; tell the user.
            credentials_present = STKAI.config.auth.has_credentials()
            if credentials_present:
                logger.warning(
                    "[STKAI-SDK] ⚠️ Auth credentials detected (via env vars or configure) but running in CLI mode. "
                    "Authentication will be handled by oscli. Credentials will be ignored."
                )
            return StkCLIHttpClient()

        # No CLI: fall back to standalone mode, which requires credentials.
        from stkai._auth import create_standalone_auth
        from stkai._config import STKAI

        if not STKAI.config.auth.has_credentials():
            logger.debug(
                "EnvironmentAwareHttpClient: No authentication method available. "
                "Neither oscli nor client credentials found."
            )
            raise ValueError(
                "No authentication method available. Either:\n"
                "  1. Install and login to StackSpot CLI: pip install oscli && stk login\n"
                "  2. Set credentials via environment variables:\n"
                "     STKAI_AUTH_CLIENT_ID and STKAI_AUTH_CLIENT_SECRET\n"
                "  3. Call STKAI.configure(auth={...}) at startup"
            )

        logger.debug(
            "EnvironmentAwareHttpClient: Client credentials detected. "
            "Using StandaloneHttpClient."
        )
        return StandaloneHttpClient(auth_provider=create_standalone_auth())

    def _apply_rate_limiting(self, client: HttpClient) -> HttpClient:
        """
        Decorate `client` with the configured rate limiter, if any.

        Args:
            client: The base HTTP client to potentially wrap.

        Returns:
            `client` itself when rate limiting is disabled, otherwise a
            rate-limiting wrapper around it.

        Raises:
            ValueError: If an unknown rate limit strategy is configured.
        """
        from stkai._config import STKAI

        rl_config = STKAI.config.rate_limit
        if not rl_config.enabled:
            return client

        # Lazy import to avoid circular dependency
        from stkai._rate_limit import (
            AdaptiveRateLimitedHttpClient,
            TokenBucketRateLimitedHttpClient,
        )

        if rl_config.strategy == "token_bucket":
            logger.debug(
                "EnvironmentAwareHttpClient: Applying token_bucket rate limiting "
                f"(max_requests={rl_config.max_requests}, time_window={rl_config.time_window}s)."
            )
            return TokenBucketRateLimitedHttpClient(
                delegate=client,
                max_requests=rl_config.max_requests,
                time_window=rl_config.time_window,
                max_wait_time=rl_config.max_wait_time,
            )

        if rl_config.strategy == "adaptive":
            logger.debug(
                "EnvironmentAwareHttpClient: Applying adaptive rate limiting "
                f"(max_requests={rl_config.max_requests}, time_window={rl_config.time_window}s)."
            )
            return AdaptiveRateLimitedHttpClient(
                delegate=client,
                max_requests=rl_config.max_requests,
                time_window=rl_config.time_window,
                max_wait_time=rl_config.max_wait_time,
                min_rate_floor=rl_config.min_rate_floor,
                penalty_factor=rl_config.penalty_factor,
                recovery_factor=rl_config.recovery_factor,
            )

        raise ValueError(
            f"Unknown rate limit strategy: {rl_config.strategy}. "
            f"Valid strategies are: 'token_bucket', 'adaptive'."
        )

    def _is_cli_available(self) -> bool:
        """
        Report whether the StackSpot CLI (oscli) is available.

        Returns:
            The result of `StkCLI.is_available()`.
        """
        from stkai._cli import StkCLI

        cli_found = StkCLI.is_available()
        return cli_found

    @override
    def get(
        self,
        url: str,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Forward a GET request to the lazily resolved delegate.

        Args:
            url: The full URL to request.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response.
        """
        delegate = self._get_delegate()
        return delegate.get(url, headers, timeout)

    @override
    def post(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Forward a POST request to the lazily resolved delegate.

        Args:
            url: The full URL to request.
            data: JSON-serializable data to send in the request body.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response.
        """
        delegate = self._get_delegate()
        return delegate.post(url, data, headers, timeout)

    @override
    def post_stream(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Forward a streaming POST request to the lazily resolved delegate.

        Args:
            url: The full URL to request.
            data: JSON-serializable data to send in the request body.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response with stream=True.
        """
        delegate = self._get_delegate()
        return delegate.post_stream(url, data, headers, timeout)

Functions

__init__

__init__() -> None

Initialize the environment-aware HTTP client.

Source code in src/stkai/_http.py
def __init__(self) -> None:
    """Create the lock and an empty delegate slot; nothing is resolved yet."""
    # The lock guards the lazy creation of the delegate.
    self._lock = threading.Lock()
    self._delegate: HttpClient | None = None

get

get(url: str, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Delegate GET request to the appropriate HTTP client.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Source code in src/stkai/_http.py
@override
def get(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Delegate GET request to the appropriate HTTP client.

    Args:
        url: The full URL to request.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response.
    """
    return self._get_delegate().get(url, headers, timeout)

post

post(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Delegate POST request to the appropriate HTTP client.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
data dict[str, Any] | None

JSON-serializable data to send in the request body.

None
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Source code in src/stkai/_http.py
@override
def post(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Delegate POST request to the appropriate HTTP client.

    Args:
        url: The full URL to request.
        data: JSON-serializable data to send in the request body.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response.
    """
    return self._get_delegate().post(url, data, headers, timeout)

post_stream

post_stream(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Delegate streaming POST request to the appropriate HTTP client.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
data dict[str, Any] | None

JSON-serializable data to send in the request body.

None
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response with stream=True.

Source code in src/stkai/_http.py
@override
def post_stream(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Delegate streaming POST request to the appropriate HTTP client.

    Args:
        url: The full URL to request.
        data: JSON-serializable data to send in the request body.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response with stream=True.
    """
    return self._get_delegate().post_stream(url, data, headers, timeout)

Rate Limiting

TokenBucketRateLimitedHttpClient

Bases: HttpClient

HTTP client decorator that applies rate limiting to requests.

Uses the Token Bucket algorithm to limit the rate of requests. Only POST requests are rate-limited; GET requests (typically polling) pass through without limiting.

This decorator is thread-safe and can be used with concurrent requests.

Example

>>> from stkai._rate_limit import TokenBucketRateLimitedHttpClient
>>> from stkai._http import StkCLIHttpClient
>>> # Limit to 10 requests per minute, give up after 45s waiting
>>> client = TokenBucketRateLimitedHttpClient(
...     delegate=StkCLIHttpClient(),
...     max_requests=10,
...     time_window=60.0,
...     max_wait_time=45.0,
... )

Parameters:

Name Type Description Default
delegate HttpClient

The underlying HTTP client to delegate requests to.

required
max_requests int

Maximum number of requests allowed in the time window.

required
time_window float

Time window in seconds for the rate limit.

required
max_wait_time float | None

Maximum time in seconds to wait for a token. If None, waits indefinitely. Default is 45 seconds.

45.0

Raises:

Type Description
TokenAcquisitionTimeoutError

If max_wait_time is exceeded while waiting for a token.

Source code in src/stkai/_rate_limit.py
class TokenBucketRateLimitedHttpClient(HttpClient):
    """
    HTTP client decorator that applies rate limiting to requests.

    Uses the Token Bucket algorithm to limit the rate of requests.
    Only POST requests (regular and streaming) are rate-limited; GET requests
    (typically polling) pass through without limiting.

    The token bucket state is protected by a lock, so the decorator can be
    used with concurrent requests.

    Example:
        >>> from stkai._rate_limit import TokenBucketRateLimitedHttpClient
        >>> from stkai._http import StkCLIHttpClient
        >>> # Limit to 10 requests per minute, give up after 45s waiting
        >>> client = TokenBucketRateLimitedHttpClient(
        ...     delegate=StkCLIHttpClient(),
        ...     max_requests=10,
        ...     time_window=60.0,
        ...     max_wait_time=45.0,
        ... )

    Args:
        delegate: The underlying HTTP client to delegate requests to.
        max_requests: Maximum number of requests allowed in the time window.
        time_window: Time window in seconds for the rate limit.
        max_wait_time: Maximum time in seconds to wait for a token. If None,
            waits indefinitely. Default is 45 seconds.

    Raises:
        TokenAcquisitionTimeoutError: If max_wait_time is exceeded while waiting for a token.
    """

    def __init__(
        self,
        delegate: HttpClient,
        max_requests: int,
        time_window: float,
        max_wait_time: float | None = 45.0,
    ) -> None:
        """
        Initialize the rate-limited HTTP client.

        The bucket starts full, i.e. with `max_requests` tokens available.

        Args:
            delegate: The underlying HTTP client to delegate requests to.
            max_requests: Maximum number of requests allowed in the time window.
            time_window: Time window in seconds for the rate limit.
            max_wait_time: Maximum time in seconds to wait for a token.
                If None, waits indefinitely. Default is 45 seconds.

        Raises:
            AssertionError: If any parameter is invalid.
        """
        assert delegate is not None, "Delegate HTTP client is required."
        assert max_requests is not None, "max_requests cannot be None."
        assert max_requests > 0, "max_requests must be greater than 0."
        assert time_window is not None, "time_window cannot be None."
        assert time_window > 0, "time_window must be greater than 0."
        assert max_wait_time is None or max_wait_time > 0, "max_wait_time must be > 0 or None."

        self.delegate = delegate
        self.max_requests = max_requests
        self.time_window = time_window
        self.max_wait_time = max_wait_time

        # Token bucket state
        self._tokens = float(max_requests)
        self._last_refill = time.monotonic()
        self._lock = threading.Lock()

    def _acquire_token(self) -> None:
        """
        Acquire a token, blocking if necessary until one is available.

        Uses Token Bucket algorithm:
        - Refills tokens based on elapsed time
        - Waits if no tokens are available
        - Raises TokenAcquisitionTimeoutError if max_wait_time is exceeded

        Raises:
            TokenAcquisitionTimeoutError: If the time already waited plus the
                next required wait would exceed max_wait_time.
        """
        start_time = time.monotonic()

        while True:
            with self._lock:
                now = time.monotonic()
                # Refill tokens based on elapsed time, capped at the bucket size
                elapsed_since_refill = now - self._last_refill
                refill_rate = self.max_requests / self.time_window
                self._tokens = min(
                    float(self.max_requests),
                    self._tokens + elapsed_since_refill * refill_rate
                )
                self._last_refill = now

                if self._tokens >= 1.0:
                    self._tokens -= 1.0
                    return

                # Calculate wait time for next token
                wait_time = (1.0 - self._tokens) / refill_rate

            # Check timeout before sleeping: fail fast instead of sleeping
            # through a wait that would exceed the budget anyway.
            if self.max_wait_time is not None:
                total_waited = time.monotonic() - start_time
                if total_waited + wait_time > self.max_wait_time:
                    raise TokenAcquisitionTimeoutError(
                        waited=total_waited,
                        max_wait_time=self.max_wait_time,
                    )

            # Sleep outside the lock to allow other threads to proceed
            time.sleep(wait_time)

    @override
    def get(
        self,
        url: str,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Delegate GET request without rate limiting.

        GET requests (typically polling) are not rate-limited as they
        usually don't count against API rate limits.

        Args:
            url: The full URL to request.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response.
        """
        return self.delegate.get(url, headers, timeout)

    @override
    def post(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Acquire a rate limit token, then delegate POST request.

        This method blocks until a token is available if the rate limit
        has been reached.

        Args:
            url: The full URL to request.
            data: JSON-serializable data to send in the request body.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response.

        Raises:
            TokenAcquisitionTimeoutError: If max_wait_time is exceeded while
                waiting for a token.
        """
        self._acquire_token()
        return self.delegate.post(url, data, headers, timeout)

    @override
    def post_stream(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Acquire a rate limit token, then delegate streaming POST request.

        This method blocks until a token is available if the rate limit
        has been reached.

        Args:
            url: The full URL to request.
            data: JSON-serializable data to send in the request body.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The response returned by the delegate's `post_stream`.

        Raises:
            TokenAcquisitionTimeoutError: If max_wait_time is exceeded while
                waiting for a token.
        """
        self._acquire_token()
        return self.delegate.post_stream(url, data, headers, timeout)

Functions

__init__

__init__(delegate: HttpClient, max_requests: int, time_window: float, max_wait_time: float | None = 45.0)

Initialize the rate-limited HTTP client.

Parameters:

Name Type Description Default
delegate HttpClient

The underlying HTTP client to delegate requests to.

required
max_requests int

Maximum number of requests allowed in the time window.

required
time_window float

Time window in seconds for the rate limit.

required
max_wait_time float | None

Maximum time in seconds to wait for a token. If None, waits indefinitely. Default is 45 seconds.

45.0

Raises:

Type Description
AssertionError

If any parameter is invalid.

Source code in src/stkai/_rate_limit.py
def __init__(
    self,
    delegate: HttpClient,
    max_requests: int,
    time_window: float,
    max_wait_time: float | None = 45.0,
) -> None:
    """
    Initialize the rate-limited HTTP client.

    The bucket starts full, i.e. with `max_requests` tokens available.

    Args:
        delegate: The underlying HTTP client to delegate requests to.
        max_requests: Maximum number of requests allowed in the time window.
        time_window: Time window in seconds for the rate limit.
        max_wait_time: Maximum time in seconds to wait for a token.
            If None, waits indefinitely. Default is 45 seconds.

    Raises:
        AssertionError: If any parameter is invalid.
    """
    assert delegate is not None, "Delegate HTTP client is required."
    assert max_requests is not None, "max_requests cannot be None."
    assert max_requests > 0, "max_requests must be greater than 0."
    assert time_window is not None, "time_window cannot be None."
    assert time_window > 0, "time_window must be greater than 0."
    assert max_wait_time is None or max_wait_time > 0, "max_wait_time must be > 0 or None."

    self.delegate = delegate
    self.max_requests = max_requests
    self.time_window = time_window
    self.max_wait_time = max_wait_time

    # Token bucket state
    self._tokens = float(max_requests)
    # Monotonic clock: refill arithmetic must not be affected by wall-clock jumps
    self._last_refill = time.monotonic()
    self._lock = threading.Lock()

get

get(url: str, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Delegate GET request without rate limiting.

GET requests (typically polling) are not rate-limited as they usually don't count against API rate limits.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Source code in src/stkai/_rate_limit.py
@override
def get(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Delegate GET request without rate limiting.

    GET requests (typically polling) are not rate-limited as they
    usually don't count against API rate limits.

    Args:
        url: The full URL to request.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response.
    """
    return self.delegate.get(url, headers, timeout)

post

post(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Acquire a rate limit token, then delegate POST request.

This method blocks until a token is available if the rate limit has been reached.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
data dict[str, Any] | None

JSON-serializable data to send in the request body.

None
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Source code in src/stkai/_rate_limit.py
@override
def post(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Acquire a rate limit token, then delegate POST request.

    This method blocks until a token is available if the rate limit
    has been reached.

    Args:
        url: The full URL to request.
        data: JSON-serializable data to send in the request body.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response.
    """
    self._acquire_token()
    return self.delegate.post(url, data, headers, timeout)

post_stream

post_stream(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Acquire a rate limit token, then delegate streaming POST request.

Source code in src/stkai/_rate_limit.py
@override
def post_stream(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """Acquire a rate limit token, then delegate streaming POST request."""
    self._acquire_token()
    return self.delegate.post_stream(url, data, headers, timeout)

AdaptiveRateLimitedHttpClient

Bases: HttpClient

HTTP client decorator with adaptive rate limiting using AIMD algorithm.

Extends rate limiting with:

- AIMD algorithm to adapt rate based on server responses
- Floor protection to prevent deadlock
- Configurable timeout to prevent indefinite blocking

When an HTTP 429 response is received, this client:

1. Applies AIMD penalty (reduces effective rate)
2. Raises requests.HTTPError for the caller/Retrying to handle

This follows the pattern of Resilience4J, Polly, and AWS SDK where rate limiting and retry are separate concerns. Use this client with Retrying for complete 429 handling with backoff.

Example

>>> from stkai._rate_limit import AdaptiveRateLimitedHttpClient
>>> from stkai._http import StkCLIHttpClient
>>> client = AdaptiveRateLimitedHttpClient(
...     delegate=StkCLIHttpClient(),
...     max_requests=100,
...     time_window=60.0,
...     min_rate_floor=0.1,  # Never below 10 req/min
...     max_wait_time=45.0,  # Give up after 45s waiting
... )

Parameters:

Name Type Description Default
delegate HttpClient

The underlying HTTP client to delegate requests to.

required
max_requests int

Maximum number of requests allowed in the time window.

required
time_window float

Time window in seconds for the rate limit.

required
min_rate_floor float

Minimum rate as fraction of max_requests (default: 0.1 = 10%).

0.1
penalty_factor float

Rate reduction factor on 429 (default: 0.3 = -30%).

0.3
recovery_factor float

Rate increase factor on success (default: 0.05 = +5%).

0.05
max_wait_time float | None

Maximum time in seconds to wait for a token. If None, waits indefinitely. Default is 45 seconds.

45.0

Raises:

Type Description
TokenAcquisitionTimeoutError

If max_wait_time is exceeded while waiting for a token.

HTTPError

When server returns HTTP 429 (after AIMD penalty applied).

Source code in src/stkai/_rate_limit.py
class AdaptiveRateLimitedHttpClient(HttpClient):
    """
    HTTP client decorator with adaptive rate limiting using AIMD algorithm.

    Wraps another HttpClient and gates POST requests behind a token bucket
    whose capacity (``_effective_max``) shrinks on HTTP 429 responses and
    grows back on successful responses:
    - AIMD algorithm to adapt rate based on server responses
    - Floor protection (``min_rate_floor``) so the rate never reaches zero
    - Configurable timeout to prevent indefinite blocking

    When an HTTP 429 response is received, this client:
    1. Applies AIMD penalty (reduces effective rate)
    2. Raises ServerSideRateLimitError for the caller/Retrying to handle

    This follows the pattern of Resilience4J, Polly, and AWS SDK where rate
    limiting and retry are separate concerns. Use this client with Retrying
    for complete 429 handling with backoff.

    Example:
        >>> from stkai._rate_limit import AdaptiveRateLimitedHttpClient
        >>> from stkai._http import StkCLIHttpClient
        >>> client = AdaptiveRateLimitedHttpClient(
        ...     delegate=StkCLIHttpClient(),
        ...     max_requests=100,
        ...     time_window=60.0,
        ...     min_rate_floor=0.1,  # Never below 10 req/min
        ...     max_wait_time=45.0,  # Give up after 45s waiting
        ... )

    Args:
        delegate: The underlying HTTP client to delegate requests to.
        max_requests: Maximum number of requests allowed in the time window.
        time_window: Time window in seconds for the rate limit.
        min_rate_floor: Minimum rate as fraction of max_requests (default: 0.1 = 10%).
        penalty_factor: Rate reduction factor on 429 (default: 0.3 = -30%).
        recovery_factor: Rate increase factor on success (default: 0.05 = +5%).
        max_wait_time: Maximum time in seconds to wait for a token. If None,
            waits indefinitely. Default is 45 seconds.

    Raises:
        TokenAcquisitionTimeoutError: If max_wait_time is exceeded while waiting for a token.
        ServerSideRateLimitError: When server returns HTTP 429 (after AIMD penalty applied).
    """

    # Structural jitter applied to AIMD factors and sleep times.
    # ±20% desynchronizes processes sharing a quota, preventing
    # thundering herd effects and synchronized oscillations.
    _JITTER_FACTOR = 0.20

    def __init__(
        self,
        delegate: HttpClient,
        max_requests: int,
        time_window: float,
        min_rate_floor: float = 0.1,
        penalty_factor: float = 0.3,
        recovery_factor: float = 0.05,
        max_wait_time: float | None = 45.0,
    ):
        """
        Initialize the adaptive rate-limited HTTP client.

        The bucket starts full (``max_requests`` tokens) with the effective
        capacity equal to ``max_requests``.

        Args:
            delegate: The underlying HTTP client to delegate requests to.
            max_requests: Maximum number of requests allowed in the time window.
            time_window: Time window in seconds for the rate limit.
            min_rate_floor: Minimum rate as fraction of max_requests (default: 0.1 = 10%).
            penalty_factor: Rate reduction factor on 429 (default: 0.3 = -30%).
            recovery_factor: Rate increase factor on success (default: 0.05 = +5%).
            max_wait_time: Maximum time in seconds to wait for a token.
                If None, waits indefinitely. Default is 45 seconds.

        Raises:
            AssertionError: If any parameter is invalid.
        """
        # NOTE: validation relies on `assert`, so it is skipped when Python
        # runs with -O; callers catch AssertionError per the documented contract.
        assert delegate is not None, "Delegate HTTP client is required."
        assert max_requests is not None, "max_requests cannot be None."
        assert max_requests > 0, "max_requests must be greater than 0."
        assert time_window is not None, "time_window cannot be None."
        assert time_window > 0, "time_window must be greater than 0."
        assert min_rate_floor is not None, "min_rate_floor cannot be None."
        assert 0 < min_rate_floor <= 1, "min_rate_floor must be between 0 (exclusive) and 1 (inclusive)."
        assert penalty_factor is not None, "penalty_factor cannot be None."
        assert 0 < penalty_factor < 1, "penalty_factor must be between 0 and 1 (exclusive)."
        assert recovery_factor is not None, "recovery_factor cannot be None."
        assert 0 < recovery_factor < 1, "recovery_factor must be between 0 and 1 (exclusive)."
        assert max_wait_time is None or max_wait_time > 0, "max_wait_time must be > 0 or None."

        self.delegate = delegate
        self.max_requests = max_requests
        self.time_window = time_window
        self.min_rate_floor = min_rate_floor
        self.penalty_factor = penalty_factor
        self.recovery_factor = recovery_factor
        self.max_wait_time = max_wait_time

        # Token bucket state (adaptive)
        # _effective_max: current bucket capacity, adjusted by AIMD.
        # _min_effective: lower bound for _effective_max (the floor).
        # _tokens / _last_refill: bucket level and the monotonic time of the last refill.
        # _lock guards all of the mutable state above.
        self._effective_max = float(max_requests)
        self._min_effective = max_requests * min_rate_floor
        self._tokens = float(max_requests)
        self._last_refill = time.monotonic()
        self._lock = threading.Lock()

        # Structural jitter for desynchronizing processes
        self._jitter = Jitter(factor=self._JITTER_FACTOR)

    def _acquire_token(self) -> None:
        """
        Acquire a token using adaptive effective_max.

        Uses Token Bucket algorithm with adaptive rate based on 429 responses:
        the bucket is refilled lazily from the elapsed monotonic time, and the
        caller sleeps (outside the lock) until a whole token is available.

        Raises:
            TokenAcquisitionTimeoutError: If the time already waited plus the
                next required wait would exceed max_wait_time.
        """
        start_time = time.monotonic()

        while True:
            with self._lock:
                # Lazy refill: credit tokens for the time elapsed since the
                # last refill, capped at the current adaptive capacity.
                now = time.monotonic()
                elapsed_since_refill = now - self._last_refill
                refill_rate = self._effective_max / self.time_window
                self._tokens = min(
                    self._effective_max,
                    self._tokens + elapsed_since_refill * refill_rate
                )
                self._last_refill = now

                if self._tokens >= 1.0:
                    self._tokens -= 1.0
                    return

                # Time needed for the missing fraction of a token to refill.
                wait_time = (1.0 - self._tokens) / refill_rate

            # Check timeout before sleeping
            # (fails fast instead of sleeping past the deadline).
            if self.max_wait_time is not None:
                total_waited = time.monotonic() - start_time
                if total_waited + wait_time > self.max_wait_time:
                    raise TokenAcquisitionTimeoutError(
                        waited=total_waited,
                        max_wait_time=self.max_wait_time,
                    )

            # Sleep with jitter to prevent thundering herd
            # (the lock is released here so other threads can proceed).
            sleep_with_jitter(wait_time, jitter_factor=self._JITTER_FACTOR)

    def _on_success(self) -> None:
        """
        Additive increase after successful request.

        Adds ``max_requests * recovery_factor`` (scaled by the jitter object)
        to the effective capacity, never exceeding the original max_requests
        ceiling.

        Uses jittered recovery factor to desynchronize processes
        and prevent collective oscillations.
        """
        with self._lock:
            # NOTE(review): relies on Jitter supporting multiplication with a
            # float (defined elsewhere) — presumably yields a randomized value.
            recovery = self.max_requests * self.recovery_factor * self._jitter
            self._effective_max = min(
                float(self.max_requests),
                self._effective_max + recovery
            )

    def _on_rate_limited(self) -> None:
        """
        Multiplicative decrease after receiving 429.

        Reduces the effective rate to adapt to server-side rate limits,
        but never below the configured floor.

        Uses jittered penalty factor to desynchronize processes
        and prevent collective oscillations.

        Also clamps _tokens to maintain Token Bucket invariant: tokens <= effective_max.
        Without this, after penalization the tokens could exceed the new effective_max,
        breaking the bucket's capacity constraint.
        """
        with self._lock:
            jittered_penalty = self.penalty_factor * self._jitter

            old_max = self._effective_max
            self._effective_max = max(
                self._min_effective,
                self._effective_max * (1.0 - jittered_penalty)
            )
            # Clamp tokens to maintain invariant: tokens <= effective_max
            self._tokens = min(self._tokens, self._effective_max)
            logger.warning(
                f"Rate limit adapted: effective_max reduced from {old_max:.1f} to {self._effective_max:.1f}"
            )

    @override
    def get(
        self,
        url: str,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Delegate GET request without rate limiting.

        GET requests (typically polling) are not rate-limited as they
        usually don't count against API rate limits. No token is acquired
        and the AIMD state is not touched.

        Args:
            url: The full URL to request.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response.
        """
        return self.delegate.get(url, headers, timeout)

    @override
    def post(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Acquire token, delegate request, adapt rate based on response.

        This method:
        1. Acquires a rate limit token (blocking if necessary)
        2. Delegates the request to the underlying client
        3. On any non-429 response: gradually increases the effective rate (AIMD recovery)
        4. On 429: reduces effective rate (AIMD penalty) and raises ServerSideRateLimitError

        The 429 handling follows the separation of concerns pattern:
        - Rate limiter: applies AIMD penalty and raises exception
        - Retrying: handles retry with Retry-After header support

        Args:
            url: The full URL to request.
            data: JSON-serializable data to send in the request body.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response (non-429 responses only).

        Raises:
            ServerSideRateLimitError: When server returns HTTP 429.
            TokenAcquisitionTimeoutError: When max_wait_time is exceeded.
        """
        self._acquire_token()
        response = self.delegate.post(url, data, headers, timeout)

        if response.status_code == 429:
            self._on_rate_limited()
            raise ServerSideRateLimitError(response)

        # Every non-429 status (including other error codes) counts as success here.
        self._on_success()
        return response

    @override
    def post_stream(
        self,
        url: str,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: int = 30,
    ) -> requests.Response:
        """
        Acquire token, delegate streaming POST, adapt rate based on response.

        Mirrors post(): a 429 status applies the AIMD penalty and raises,
        any other status applies the AIMD recovery and returns the response.

        Args:
            url: The full URL to request.
            data: JSON-serializable data to send in the request body.
            headers: Additional headers to include.
            timeout: Request timeout in seconds.

        Returns:
            The HTTP response (non-429 responses only).

        Raises:
            ServerSideRateLimitError: When server returns HTTP 429.
            TokenAcquisitionTimeoutError: When max_wait_time is exceeded.
        """
        self._acquire_token()
        response = self.delegate.post_stream(url, data, headers, timeout)

        if response.status_code == 429:
            self._on_rate_limited()
            raise ServerSideRateLimitError(response)

        self._on_success()
        return response

Functions

__init__

__init__(delegate: HttpClient, max_requests: int, time_window: float, min_rate_floor: float = 0.1, penalty_factor: float = 0.3, recovery_factor: float = 0.05, max_wait_time: float | None = 45.0)

Initialize the adaptive rate-limited HTTP client.

Parameters:

Name Type Description Default
delegate HttpClient

The underlying HTTP client to delegate requests to.

required
max_requests int

Maximum number of requests allowed in the time window.

required
time_window float

Time window in seconds for the rate limit.

required
min_rate_floor float

Minimum rate as fraction of max_requests (default: 0.1 = 10%).

0.1
penalty_factor float

Rate reduction factor on 429 (default: 0.3 = -30%).

0.3
recovery_factor float

Rate increase factor on success (default: 0.05 = +5%).

0.05
max_wait_time float | None

Maximum time in seconds to wait for a token. If None, waits indefinitely. Default is 45 seconds.

45.0

Raises:

Type Description
AssertionError

If any parameter is invalid.

Source code in src/stkai/_rate_limit.py
def __init__(
    self,
    delegate: HttpClient,
    max_requests: int,
    time_window: float,
    min_rate_floor: float = 0.1,
    penalty_factor: float = 0.3,
    recovery_factor: float = 0.05,
    max_wait_time: float | None = 45.0,
):
    """
    Initialize the adaptive rate-limited HTTP client.

    The bucket starts full (``max_requests`` tokens) with the effective
    capacity equal to ``max_requests``.

    Args:
        delegate: The underlying HTTP client to delegate requests to.
        max_requests: Maximum number of requests allowed in the time window.
        time_window: Time window in seconds for the rate limit.
        min_rate_floor: Minimum rate as fraction of max_requests (default: 0.1 = 10%).
        penalty_factor: Rate reduction factor on 429 (default: 0.3 = -30%).
        recovery_factor: Rate increase factor on success (default: 0.05 = +5%).
        max_wait_time: Maximum time in seconds to wait for a token.
            If None, waits indefinitely. Default is 45 seconds.

    Raises:
        AssertionError: If any parameter is invalid.
    """
    # NOTE: validation relies on `assert`, so it is skipped when Python
    # runs with -O; callers catch AssertionError per the documented contract.
    assert delegate is not None, "Delegate HTTP client is required."
    assert max_requests is not None, "max_requests cannot be None."
    assert max_requests > 0, "max_requests must be greater than 0."
    assert time_window is not None, "time_window cannot be None."
    assert time_window > 0, "time_window must be greater than 0."
    assert min_rate_floor is not None, "min_rate_floor cannot be None."
    assert 0 < min_rate_floor <= 1, "min_rate_floor must be between 0 (exclusive) and 1 (inclusive)."
    assert penalty_factor is not None, "penalty_factor cannot be None."
    assert 0 < penalty_factor < 1, "penalty_factor must be between 0 and 1 (exclusive)."
    assert recovery_factor is not None, "recovery_factor cannot be None."
    assert 0 < recovery_factor < 1, "recovery_factor must be between 0 and 1 (exclusive)."
    assert max_wait_time is None or max_wait_time > 0, "max_wait_time must be > 0 or None."

    self.delegate = delegate
    self.max_requests = max_requests
    self.time_window = time_window
    self.min_rate_floor = min_rate_floor
    self.penalty_factor = penalty_factor
    self.recovery_factor = recovery_factor
    self.max_wait_time = max_wait_time

    # Token bucket state (adaptive)
    # _effective_max: current bucket capacity, adjusted by AIMD.
    # _min_effective: lower bound for _effective_max (the floor).
    # _tokens / _last_refill: bucket level and the monotonic time of the last refill.
    self._effective_max = float(max_requests)
    self._min_effective = max_requests * min_rate_floor
    self._tokens = float(max_requests)
    self._last_refill = time.monotonic()
    self._lock = threading.Lock()

    # Structural jitter for desynchronizing processes
    self._jitter = Jitter(factor=self._JITTER_FACTOR)

get

get(url: str, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Delegate GET request without rate limiting.

GET requests (typically polling) are not rate-limited as they usually don't count against API rate limits.

Parameters:

Name Type Description Default
url str

The full URL to request.

required
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response.

Source code in src/stkai/_rate_limit.py
@override
def get(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Forward a GET request straight to the wrapped client.

    No rate-limit token is acquired and the adaptive rate is left
    untouched: GET requests (typically polling) usually don't count
    against API rate limits.

    Args:
        url: The full URL to request.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response produced by the delegate.
    """
    wrapped = self.delegate
    return wrapped.get(url, headers, timeout)

post

post(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Acquire token, delegate request, adapt rate based on response.

This method: 1. Acquires a rate limit token (blocking if necessary) 2. Delegates the request to the underlying client 3. On success: gradually increases the effective rate (AIMD recovery) 4. On 429: reduces effective rate (AIMD penalty) and raises ServerSideRateLimitError

The 429 handling follows the separation of concerns pattern: - Rate limiter: applies AIMD penalty and raises exception - Retrying: handles retry with Retry-After header support

Parameters:

Name Type Description Default
url str

The full URL to request.

required
data dict[str, Any] | None

JSON-serializable data to send in the request body.

None
headers dict[str, str] | None

Additional headers to include.

None
timeout int

Request timeout in seconds.

30

Returns:

Type Description
Response

The HTTP response (non-429 responses only).

Raises:

Type Description
ServerSideRateLimitError

When server returns HTTP 429.

TokenAcquisitionTimeoutError

When max_wait_time is exceeded.

Source code in src/stkai/_rate_limit.py
@override
def post(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Send a rate-limited POST and feed the outcome back into the AIMD state.

    Steps:
    1. Take a rate limit token (may block until one is available)
    2. Hand the request to the wrapped client
    3. Non-429 response: nudge the effective rate back up (AIMD recovery)
    4. 429 response: cut the effective rate (AIMD penalty), then raise
       ServerSideRateLimitError

    Retrying is deliberately not done here:
    - Rate limiter: applies AIMD penalty and raises exception
    - Retrying: handles retry with Retry-After header support

    Args:
        url: The full URL to request.
        data: JSON-serializable data to send in the request body.
        headers: Additional headers to include.
        timeout: Request timeout in seconds.

    Returns:
        The HTTP response (non-429 responses only).

    Raises:
        ServerSideRateLimitError: When server returns HTTP 429.
        TokenAcquisitionTimeoutError: When max_wait_time is exceeded.
    """
    self._acquire_token()
    resp = self.delegate.post(url, data, headers, timeout)

    throttled = resp.status_code == 429
    if not throttled:
        # Anything other than 429 counts as success for rate adaptation.
        self._on_success()
        return resp

    self._on_rate_limited()
    raise ServerSideRateLimitError(resp)

post_stream

post_stream(url: str, data: dict[str, Any] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> requests.Response

Acquire token, delegate streaming POST, adapt rate based on response.

Source code in src/stkai/_rate_limit.py
@override
def post_stream(
    self,
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> requests.Response:
    """
    Send a rate-limited streaming POST and adapt the rate to the outcome.

    Takes a token first, delegates to the wrapped client's post_stream,
    then applies the AIMD recovery on a non-429 status, or the AIMD
    penalty followed by ServerSideRateLimitError on a 429 status.
    """
    self._acquire_token()
    resp = self.delegate.post_stream(url, data, headers, timeout)

    throttled = resp.status_code == 429
    if not throttled:
        self._on_success()
        return resp

    self._on_rate_limited()
    raise ServerSideRateLimitError(resp)

Exceptions

ClientSideRateLimitError

Bases: RetryableError

Base exception for client-side rate limiting errors.

This is the base class for all rate limiting errors that originate from the client's rate limiter (TokenBucket, Adaptive, etc.), as opposed to server-side rate limiting (HTTP 429).

Extends RetryableError so all client-side rate limit errors are automatically retried by the Retrying context manager.

Example

try: ... client.post(url, data) ... except ClientSideRateLimitError as e: ... print(f"Client-side rate limit: {e}")

Source code in src/stkai/_rate_limit.py
class ClientSideRateLimitError(RetryableError):
    """
    Common parent for rate limit errors raised by the client itself.

    Errors deriving from this class come from the client's own rate
    limiter (TokenBucket, Adaptive, etc.), in contrast with rate limiting
    enforced by the server (HTTP 429).

    Because it derives from RetryableError, every client-side rate limit
    error is automatically retried by the Retrying context manager.

    Example:
        >>> try:
        ...     client.post(url, data)
        ... except ClientSideRateLimitError as e:
        ...     print(f"Client-side rate limit: {e}")
    """

TokenAcquisitionTimeoutError

Bases: ClientSideRateLimitError

Raised when rate limiter exceeds max_wait_time waiting for a token.

This exception indicates that a thread waited too long to acquire a rate limit token and gave up. This prevents threads from blocking indefinitely when rate limits are very restrictive.

Extends ClientSideRateLimitError (which extends RetryableError) so it's automatically retried by the Retrying context manager, following the pattern used by Resilience4J, Polly, failsafe-go, AWS SDK, and Spring Retry - where rate limit/throttling exceptions are retryable by default.

Attributes:

Name Type Description
waited

Time in seconds the thread waited before giving up.

max_wait_time

The configured maximum wait time.

Example

try: ... client.post(url, data) ... except TokenAcquisitionTimeoutError as e: ... print(f"Rate limit timeout after {e.waited:.1f}s")

Source code in src/stkai/_rate_limit.py
class TokenAcquisitionTimeoutError(ClientSideRateLimitError):
    """
    Signals that waiting for a rate limit token hit max_wait_time.

    A thread that would have to wait longer than the configured maximum
    gives up with this error instead of blocking indefinitely when rate
    limits are very restrictive.

    Derives from ClientSideRateLimitError (itself a RetryableError), so the
    Retrying context manager retries it automatically. This mirrors
    Resilience4J, Polly, failsafe-go, AWS SDK, and Spring Retry, where
    rate limit/throttling exceptions are retryable by default.

    Attributes:
        waited: Time in seconds the thread waited before giving up.
        max_wait_time: The configured maximum wait time.

    Example:
        >>> try:
        ...     client.post(url, data)
        ... except TokenAcquisitionTimeoutError as e:
        ...     print(f"Rate limit timeout after {e.waited:.1f}s")
    """

    def __init__(self, waited: float, max_wait_time: float):
        # Attributes are set before delegating to the base initializer.
        self.waited = waited
        self.max_wait_time = max_wait_time
        description = f"Rate limit timeout: waited {waited:.2f}s, max_wait_time={max_wait_time:.2f}s"
        super().__init__(description)

ServerSideRateLimitError

Bases: RetryableError

Raised when server returns HTTP 429 (Too Many Requests).

This exception indicates that the server has rate-limited the request. It wraps the original response so the Retry-After header can be extracted for calculating the appropriate wait time before retrying.

Extends RetryableError so it's automatically retried by the Retrying context manager. The Retrying class will extract the Retry-After header from the wrapped response to determine the wait time.

Only raised by AdaptiveRateLimitedHttpClient after applying AIMD penalty. Other clients (TokenBucket, no rate-limit) let HTTPError propagate directly.

Attributes:

Name Type Description
response

The original HTTP response with status code 429.

Example

try: ... client.post(url, data) ... except ServerSideRateLimitError as e: ... retry_after = e.response.headers.get("Retry-After") ... print(f"Server rate limited. Retry after: {retry_after}s")

Source code in src/stkai/_rate_limit.py
class ServerSideRateLimitError(RetryableError):
    """
    Signals that the server answered with HTTP 429 (Too Many Requests).

    The original response is kept on the exception so the Retry-After
    header can be read when computing how long to wait before retrying.

    Derives from RetryableError, so the Retrying context manager retries
    it automatically. The Retrying class will extract the Retry-After
    header from the wrapped response to determine the wait time.

    Only raised by AdaptiveRateLimitedHttpClient after applying AIMD penalty.
    Other clients (TokenBucket, no rate-limit) let HTTPError propagate directly.

    Attributes:
        response: The original HTTP response with status code 429.

    Example:
        >>> try:
        ...     client.post(url, data)
        ... except ServerSideRateLimitError as e:
        ...     retry_after = e.response.headers.get("Retry-After")
        ...     print(f"Server rate limited. Retry after: {retry_after}s")
    """

    def __init__(self, response: requests.Response):
        # Keep the response first, then initialize the base with a fixed message.
        self.response = response
        description = "Server rate limit exceeded (HTTP 429)"
        super().__init__(description)

Authentication

AuthProvider

Bases: ABC

Abstract base class for authentication providers.

Implementations are responsible for obtaining and managing access tokens. All implementations must be thread-safe.

Example

class MyAuthProvider(AuthProvider): ... def get_access_token(self) -> str: ... return "my-token" ... auth = MyAuthProvider() headers = auth.get_auth_headers()

{"Authorization": "Bearer my-token"}

Source code in src/stkai/_auth.py
class AuthProvider(ABC):
    """
    Interface for objects that supply access tokens.

    Subclasses implement get_access_token(); get_auth_headers() is derived
    from it. Implementations are responsible for obtaining and managing
    access tokens, and all implementations must be thread-safe.

    Example:
        >>> class MyAuthProvider(AuthProvider):
        ...     def get_access_token(self) -> str:
        ...         return "my-token"
        ...
        >>> auth = MyAuthProvider()
        >>> headers = auth.get_auth_headers()
        >>> # {"Authorization": "Bearer my-token"}
    """

    @abstractmethod
    def get_access_token(self) -> str:
        """
        Produce a currently valid access token.

        Returns:
            Access token string (without "Bearer" prefix).

        Raises:
            AuthenticationError: If unable to obtain a valid token.
        """

    def get_auth_headers(self) -> dict[str, str]:
        """
        Build the Authorization header for an HTTP request.

        Returns:
            Dict with Authorization header containing Bearer token.

        Example:
            >>> headers = auth.get_auth_headers()
            >>> # {"Authorization": "Bearer eyJ..."}
        """
        bearer_value = f"Bearer {self.get_access_token()}"
        return {"Authorization": bearer_value}

Functions

get_access_token abstractmethod

get_access_token() -> str

Obtain a valid access token.

Returns:

Type Description
str

Access token string (without "Bearer" prefix).

Raises:

Type Description
AuthenticationError

If unable to obtain a valid token.

Source code in src/stkai/_auth.py
@abstractmethod
def get_access_token(self) -> str:
    """
    Produce a currently valid access token.

    Abstract: concrete providers supply the implementation.

    Returns:
        Access token string (without "Bearer" prefix).

    Raises:
        AuthenticationError: If unable to obtain a valid token.
    """

get_auth_headers

get_auth_headers() -> dict[str, str]

Return authorization headers for HTTP requests.

Returns:

Type Description
dict[str, str]

Dict with Authorization header containing Bearer token.

Example

headers = auth.get_auth_headers()

{"Authorization": "Bearer eyJ..."}
Source code in src/stkai/_auth.py
def get_auth_headers(self) -> dict[str, str]:
    """
    Build the Authorization header for an HTTP request.

    The token comes from get_access_token() and is prefixed with "Bearer ".

    Returns:
        Dict with Authorization header containing Bearer token.

    Example:
        >>> headers = auth.get_auth_headers()
        >>> # {"Authorization": "Bearer eyJ..."}
    """
    bearer_value = f"Bearer {self.get_access_token()}"
    return {"Authorization": bearer_value}

ClientCredentialsAuthProvider

Bases: AuthProvider

OAuth2 Client Credentials flow for StackSpot.

This provider implements the OAuth2 client credentials grant type, which is used for machine-to-machine authentication.

Features
  • Token caching: Avoids unnecessary token requests.
  • Auto-refresh: Automatically refreshes tokens before expiration.
  • Thread-safe: Safe for use across multiple threads.

Attributes:

Name Type Description
DEFAULT_TOKEN_URL

Default StackSpot OAuth2 token endpoint.

DEFAULT_REFRESH_MARGIN

Seconds before expiration to refresh (60s).

Example

auth = ClientCredentialsAuthProvider( ... client_id="my-client-id", ... client_secret="my-client-secret", ... ) headers = auth.get_auth_headers()

{"Authorization": "Bearer eyJ..."}

Parameters:

Name Type Description Default
client_id str

StackSpot client ID.

required
client_secret str

StackSpot client secret.

required
token_url str

OAuth2 token endpoint URL.

DEFAULT_TOKEN_URL
refresh_margin int

Seconds before expiration to trigger refresh.

DEFAULT_REFRESH_MARGIN
Source code in src/stkai/_auth.py
class ClientCredentialsAuthProvider(AuthProvider):
    """
    OAuth2 Client Credentials flow for StackSpot.

    This provider implements the OAuth2 client credentials grant type,
    which is used for machine-to-machine authentication.

    Features:
        - Token caching: Avoids unnecessary token requests.
        - Auto-refresh: Automatically refreshes tokens before expiration.
        - Thread-safe: Token access is serialized by an internal lock.

    Attributes:
        DEFAULT_TOKEN_URL: Default StackSpot OAuth2 token endpoint.
        DEFAULT_REFRESH_MARGIN: Seconds before expiration to refresh (60s).

    Example:
        >>> auth = ClientCredentialsAuthProvider(
        ...     client_id="my-client-id",
        ...     client_secret="my-client-secret",
        ... )
        >>> headers = auth.get_auth_headers()
        >>> # {"Authorization": "Bearer eyJ..."}

    Args:
        client_id: StackSpot client ID.
        client_secret: StackSpot client secret.
        token_url: OAuth2 token endpoint URL.
        refresh_margin: Seconds before expiration to trigger refresh.
    """

    DEFAULT_TOKEN_URL = "https://idm.stackspot.com/stackspot-dev/oidc/oauth/token"
    DEFAULT_REFRESH_MARGIN = 60  # Refresh 1 min before expiration

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        token_url: str = DEFAULT_TOKEN_URL,
        refresh_margin: int = DEFAULT_REFRESH_MARGIN,
    ):
        """
        Store the credentials and start with an empty token cache.

        Args:
            client_id: StackSpot client ID.
            client_secret: StackSpot client secret.
            token_url: OAuth2 token endpoint URL.
            refresh_margin: Seconds before expiration to trigger refresh.

        Raises:
            AssertionError: If client_id or client_secret is empty.
        """
        assert client_id, "client_id cannot be empty"
        assert client_secret, "client_secret cannot be empty"

        self._client_id = client_id
        self._client_secret = client_secret
        self._token_url = token_url
        self._refresh_margin = refresh_margin

        # Cached token (None until the first fetch); guarded by _lock.
        self._token: TokenInfo | None = None
        self._lock = threading.Lock()

    def get_access_token(self) -> str:
        """
        Obtain a valid access token, fetching a new one if necessary.

        This method is thread-safe. If the current token is valid (not expired
        and not within the refresh margin), it returns the cached token.
        Otherwise, it fetches a new token from the OAuth2 endpoint. The fetch
        happens while holding the lock, so concurrent callers wait for it
        instead of issuing duplicate token requests.

        Returns:
            Valid access token string.

        Raises:
            AuthenticationError: If unable to obtain a valid token.
        """
        with self._lock:
            if self._is_token_valid():
                assert self._token is not None  # for type checker
                return self._token.access_token

            self._token = self._fetch_new_token()
            return self._token.access_token

    def _is_token_valid(self) -> bool:
        """Check if current token exists and is not near expiration."""
        if self._token is None:
            return False
        return time.time() < (self._token.expires_at - self._refresh_margin)

    def _fetch_new_token(self) -> TokenInfo:
        """
        Fetch a new token from the OAuth2 endpoint.

        Sends a form-encoded client_credentials grant request and converts
        the response's ``expires_in`` into an absolute expiration time.

        Returns:
            TokenInfo with the new access token and expiration time.

        Raises:
            AuthenticationError: If the token request fails or the response
                lacks a required field.
        """
        try:
            response = requests.post(
                self._token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self._client_id,
                    "client_secret": self._client_secret,
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                timeout=30,
            )
            response.raise_for_status()

            data = response.json()
            # Fall back to 1199 seconds when the server omits expires_in.
            expires_in = data.get("expires_in", 1199)

            return TokenInfo(
                access_token=data["access_token"],
                expires_at=time.time() + expires_in,
            )

        except requests.HTTPError as e:
            raise AuthenticationError(
                f"Failed to obtain access token (HTTP {e.response.status_code}): {e}",
                cause=e,
            ) from e
        except requests.RequestException as e:
            raise AuthenticationError(
                f"Failed to obtain access token: {e}",
                cause=e,
            ) from e
        except KeyError as e:
            # str(KeyError) is the repr of the key (already quoted), which
            # would render as missing ''access_token''; use the raw key.
            missing_field = e.args[0] if e.args else e
            raise AuthenticationError(
                f"Invalid token response: missing '{missing_field}' field",
                cause=e,
            ) from e

Functions

get_access_token

get_access_token() -> str

Obtain a valid access token, fetching a new one if necessary.

This method is thread-safe. If the current token is valid (not expired and not within the refresh margin), it returns the cached token. Otherwise, it fetches a new token from the OAuth2 endpoint.

Returns:

Type Description
str

Valid access token string.

Raises:

Type Description
AuthenticationError

If unable to obtain a valid token.

Source code in src/stkai/_auth.py
def get_access_token(self) -> str:
    """
    Return a usable access token, requesting a fresh one when needed.

    The validity check and the optional refresh both run while holding
    ``self._lock``, so concurrent callers are serialized. When
    ``_is_token_valid()`` reports the cached token as usable, that token
    is returned unchanged; otherwise ``_fetch_new_token()`` is called and
    its result replaces the cached token.

    Returns:
        The access token string of the cached or newly fetched token.

    Raises:
        AuthenticationError: If a new token is needed and cannot be obtained.
    """
    with self._lock:
        if not self._is_token_valid():
            # Nothing usable is cached: fetch and store a replacement
            # before handing it out.
            self._token = self._fetch_new_token()
            return self._token.access_token

        cached = self._token
        assert cached is not None  # for type checker
        return cached.access_token

AuthenticationError

Bases: Exception

Raised when authentication fails.

This exception is raised when the authentication provider fails to obtain or refresh an access token.

Attributes:

Name Type Description
message

Description of the authentication failure.

cause

The underlying exception that caused the failure, if any.

Example

>>> try:
...     token = auth.get_access_token()
... except AuthenticationError as e:
...     print(f"Auth failed: {e}")

Source code in src/stkai/_auth.py
class AuthenticationError(Exception):
    """
    Raised when authentication fails.

    This exception is raised when the authentication provider fails to
    obtain or refresh an access token.

    Attributes:
        message: Description of the authentication failure.
        cause: The underlying exception that caused the failure, if any.

    Example:
        >>> try:
        ...     token = auth.get_access_token()
        ... except AuthenticationError as e:
        ...     print(f"Auth failed: {e}")
    """

    def __init__(self, message: str, cause: Exception | None = None):
        super().__init__(message)
        self.message = message
        self.cause = cause

create_standalone_auth

create_standalone_auth(config: AuthConfig | None = None) -> ClientCredentialsAuthProvider

Create a ClientCredentialsAuthProvider from configuration.

This helper function creates an auth provider using credentials from the provided config or from the global STKAI.config.

Parameters:

Name Type Description Default
config AuthConfig | None

Optional AuthConfig with credentials. If None, uses STKAI.config.auth from global configuration.

None

Returns:

Type Description
ClientCredentialsAuthProvider

Configured ClientCredentialsAuthProvider instance.

Raises:

Type Description
ValueError

If credentials are not configured.

Example

>>> from stkai import STKAI
>>> STKAI.configure(auth={"client_id": "x", "client_secret": "y"})
>>> auth = create_standalone_auth()
>>> # Uses credentials from global config

Source code in src/stkai/_auth.py
def create_standalone_auth(config: AuthConfig | None = None) -> ClientCredentialsAuthProvider:
    """
    Build a ClientCredentialsAuthProvider from an auth configuration.

    When no configuration is passed, ``STKAI.config.auth`` from the global
    configuration is used instead. The provider is constructed from the
    configuration's ``client_id``, ``client_secret`` and ``token_url``.

    Args:
        config: Optional AuthConfig holding the credentials. If None, the
            global ``STKAI.config.auth`` is used.

    Returns:
        A ClientCredentialsAuthProvider built from the chosen configuration.

    Raises:
        ValueError: If the configuration reports that it has no credentials.

    Example:
        >>> from stkai import STKAI
        >>> STKAI.configure(auth={"client_id": "x", "client_secret": "y"})
        >>> auth = create_standalone_auth()
        >>> # Uses credentials from global config
    """
    auth_config = config
    if auth_config is None:
        # Imported here rather than at module level, as in the original.
        from stkai._config import STKAI

        auth_config = STKAI.config.auth

    if auth_config.has_credentials():
        return ClientCredentialsAuthProvider(
            client_id=auth_config.client_id,  # type: ignore[arg-type]
            client_secret=auth_config.client_secret,  # type: ignore[arg-type]
            token_url=auth_config.token_url,
        )

    raise ValueError(
        "Client credentials not configured. "
        "Set client_id and client_secret via STKAI.configure() or environment variables "
        "(STKAI_AUTH_CLIENT_ID, STKAI_AUTH_CLIENT_SECRET)."
    )