
Agents API Reference

Complete API reference for StackSpot AI Agents.

Main Classes

Agent

Synchronous client for interacting with StackSpot AI Agents.

This client provides a high-level interface for sending messages to Agents and receiving responses, with support for:

  • Single message requests (blocking)
  • Conversation context for multi-turn interactions
  • Knowledge source integration
  • Token usage tracking
Example

>>> from stkai.agents import Agent, ChatRequest
>>> agent = Agent(agent_id="my-agent-slug")
>>> response = agent.chat(
...     request=ChatRequest(user_prompt="What is SOLID?")
... )
>>> if response.is_success():
...     print(response.result)

Attributes:

  • agent_id: The Agent ID (slug) to interact with.
  • base_url: The base URL for the StackSpot AI API.
  • options: Configuration options for the client.
  • http_client (HttpClient): HTTP client for API calls (default: EnvironmentAwareHttpClient).
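A minimal construction sketch with explicit options. The field names below (request_timeout, retry_max_retries, retry_initial_delay, max_workers) appear in the class source further down; the import location of AgentOptions is an assumption, so verify it against your SDK version:

```python
from stkai.agents import Agent, AgentOptions  # AgentOptions import path assumed

agent = Agent(
    agent_id="my-agent-slug",
    options=AgentOptions(
        request_timeout=60,       # per-call HTTP timeout, passed to the HTTP client
        retry_max_retries=2,      # 3 attempts total: 1 original + 2 retries
        retry_initial_delay=1.0,  # delay before the first retry
        max_workers=4,            # thread-pool size used by chat_many()
    ),
)
```

Unset fields fall back to the global config defaults via with_defaults_from(), as described in the constructor documentation.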

Source code in src/stkai/agents/_agent.py
class Agent:
    """
    Synchronous client for interacting with StackSpot AI Agents.

    This client provides a high-level interface for sending messages to Agents
    and receiving responses, with support for:

    - Single message requests (blocking)
    - Conversation context for multi-turn interactions
    - Knowledge source integration
    - Token usage tracking

    Example:
        >>> from stkai.agents import Agent, ChatRequest
        >>> agent = Agent(agent_id="my-agent-slug")
        >>> response = agent.chat(
        ...     request=ChatRequest(user_prompt="What is SOLID?")
        ... )
        >>> if response.is_success():
        ...     print(response.result)

    Attributes:
        agent_id: The Agent ID (slug) to interact with.
        base_url: The base URL for the StackSpot AI API.
        options: Configuration options for the client.
        http_client: HTTP client for API calls (default: EnvironmentAwareHttpClient).
    """

    def __init__(
        self,
        agent_id: str,
        base_url: str | None = None,
        options: AgentOptions | None = None,
        http_client: HttpClient | None = None,
    ):
        """
        Initialize the Agent client.

        Args:
            agent_id: The Agent ID (slug) to interact with.
            base_url: Base URL for the StackSpot AI API.
                If None, uses global config (STKAI.config.agent.base_url).
            options: Configuration options for the client.
                If None, uses defaults from global config (STKAI.config.agent).
                Partial options are merged with config defaults via with_defaults_from().
            http_client: Custom HTTP client implementation for API calls.
                If None, uses EnvironmentAwareHttpClient (auto-detects CLI or standalone).

        Raises:
            AssertionError: If agent_id is empty.
        """
        # Get global config for defaults
        from stkai._config import STKAI
        cfg = STKAI.config.agent

        # Resolve options with defaults from config (Single Source of Truth)
        resolved_options = (options or AgentOptions()).with_defaults_from(cfg)

        # Resolve base_url
        if base_url is None:
            base_url = cfg.base_url

        if not http_client:
            from stkai._http import EnvironmentAwareHttpClient
            http_client = EnvironmentAwareHttpClient()

        # Validations
        assert agent_id, "Agent ID cannot be empty."
        assert base_url, "Agent base_url cannot be empty."
        assert http_client is not None, "Agent http_client cannot be None."

        assert resolved_options.max_workers is not None, "Thread-pool max_workers can not be None."
        assert resolved_options.max_workers > 0, "Thread-pool max_workers must be greater than 0."

        self.agent_id = agent_id
        self.base_url = base_url.rstrip("/")
        self.options = resolved_options
        self.max_workers = resolved_options.max_workers
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
        self.http_client: HttpClient = http_client

    def chat(
        self,
        request: ChatRequest,
        result_handler: ChatResultHandler | None = None,
    ) -> ChatResponse:
        """
        Send a message to the Agent and wait for the response (blocking).

        This method sends a user prompt to the Agent and blocks until
        a response is received or an error occurs.

        If retry is configured (retry_max_retries > 0), automatically retries on:
        - HTTP 5xx errors (500, 502, 503, 504)
        - Network errors (Timeout, ConnectionError)

        Does NOT retry on:
        - HTTP 4xx errors (client errors)

        Note:
            retry_max_retries=0 means 1 attempt (no retry).
            retry_max_retries=3 means 4 attempts (1 original + 3 retries).

        Args:
            request: The request containing the user prompt and options.
            result_handler: Optional handler to process the response message.
                If None, uses RawResultHandler (returns message as-is).
                Use JSON_RESULT_HANDLER to parse JSON responses.

        Returns:
            ChatResponse with the Agent's reply or error information.
            The `result` field contains the processed result from the handler.

        Raises:
            ChatResultHandlerError: If the result handler fails to process the response.

        Example:
            >>> # Single message (default RawResultHandler)
            >>> response = agent.chat(
            ...     request=ChatRequest(user_prompt="Hello!")
            ... )
            >>> print(response.result)  # Same as response.raw_result
            >>>
            >>> # Parse JSON response
            >>> from stkai.agents import JSON_RESULT_HANDLER
            >>> response = agent.chat(request, result_handler=JSON_RESULT_HANDLER)
            >>> print(response.result)  # Parsed dict
            >>>
            >>> # With conversation context
            >>> resp1 = agent.chat(
            ...     request=ChatRequest(
            ...         user_prompt="What is Python?",
            ...         use_conversation=True
            ...     )
            ... )
            >>> resp2 = agent.chat(
            ...     request=ChatRequest(
            ...         user_prompt="What are its main features?",
            ...         conversation_id=resp1.conversation_id,
            ...         use_conversation=True
            ...     )
            ... )
        """
        logger.info(f"{request.id[:26]:<26} | Agent | 🛜 Starting chat with agent '{self.agent_id}'.")
        logger.info(f"{request.id[:26]:<26} | Agent |    ├ base_url={self.base_url}")
        logger.info(f"{request.id[:26]:<26} | Agent |    └ agent_id='{self.agent_id}'")

        response = self._do_chat(
            request=request,
            result_handler=result_handler
        )

        logger.info(f"{request.id[:26]:<26} | Agent | 🛜 Chat finished.")
        if response.is_success():
            logger.info(f"{request.id[:26]:<26} | Agent |    └ with status = {response.status}")
        else:
            logger.info(f"{request.id[:26]:<26} | Agent |    ├ with status = {response.status}")
            logger.info(f"{request.id[:26]:<26} | Agent |    └ with error message = \"{response.error}\"")

        assert response.request is request, \
            "🌀 Sanity check | Unexpected mismatch: response does not reference its corresponding request."
        return response

    def chat_many(
        self,
        request_list: list[ChatRequest],
        result_handler: ChatResultHandler | None = None,
    ) -> list[ChatResponse]:
        """
        Send multiple chat messages concurrently, wait for all responses (blocking),
        and return them in the same order as `request_list`.

        Each request is executed in parallel threads using the internal thread-pool.
        Returns a list of ChatResponse objects in the same order as `request_list`.

        Args:
            request_list: List of ChatRequest objects to send.
            result_handler: Optional handler to process the response message.
                If None, uses RawResultHandler (returns message as-is).

        Returns:
            List[ChatResponse]: One response per request, in the same order.

        Example:
            >>> requests = [
            ...     ChatRequest(user_prompt="What is Python?"),
            ...     ChatRequest(user_prompt="What is Java?"),
            ... ]
            >>> responses = agent.chat_many(requests)
            >>> for resp in responses:
            ...     print(resp.result)
        """
        if not request_list:
            return []

        logger.info(
            f"{'Agent-Batch-Execution'[:26]:<26} | Agent | "
            f"🛜 Starting batch execution of {len(request_list)} requests."
        )
        logger.info(f"{'Agent-Batch-Execution'[:26]:<26} | Agent |    ├ base_url={self.base_url}")
        logger.info(f"{'Agent-Batch-Execution'[:26]:<26} | Agent |    ├ agent_id='{self.agent_id}'")
        logger.info(f"{'Agent-Batch-Execution'[:26]:<26} | Agent |    └ max_concurrent={self.max_workers}")

        # Warn about race condition: chat_many inside UseConversation without a pre-set conversation_id
        if len(request_list) > 1:
            conv_ctx = ConversationScope.get_current()
            if conv_ctx and not conv_ctx.has_conversation_id():
                logger.warning(
                    f"{'Agent-Batch-Execution'[:26]:<26} | Agent | "
                    "⚠️ chat_many() called inside UseConversation without a pre-set conversation_id. "
                    "Concurrent requests will race to capture the server-assigned ID, "
                    "likely starting independent conversations. "
                    "Consider using UseConversation.with_generated_id() or passing an explicit conversation_id."
                )

        # Use thread-pool for parallel calls to `_do_chat`
        # The ConversationScope::propagate method captures the active UseConversation context (if any)
        # and installs it in each worker thread — only conversation state is propagated.
        future_to_index = {
            self.executor.submit(
                ConversationScope.propagate(self._do_chat), # propagate conversation context
                request=req,                                # arg-1
                result_handler=result_handler,              # arg-2
            ): idx
            for idx, req in enumerate(request_list)
        }

        # Block and wait for all responses to be finished
        responses_map: dict[int, ChatResponse] = {}

        for future in as_completed(future_to_index):
            idx = future_to_index[future]
            correlated_request = request_list[idx]
            try:
                responses_map[idx] = future.result()
            except Exception as e:
                logger.error(
                    f"{correlated_request.id[:26]:<26} | Agent | ❌ Chat failed in batch(seq={idx}). {e}",
                    exc_info=logger.isEnabledFor(logging.DEBUG)
                )
                responses_map[idx] = ChatResponse(
                    request=correlated_request,
                    status=ChatStatus.ERROR,
                    error=str(e),
                )

        # Rebuild responses list in the same order of requests list
        responses = [
            responses_map[i] for i in range(len(request_list))
        ]

        # Race-condition check: ensure both lists have the same length
        assert len(responses) == len(request_list), (
            f"🌀 Sanity check | Unexpected mismatch: responses(size={len(responses)}) is different from requests(size={len(request_list)})."
        )
        # Race-condition check: ensure each response points to its respective request
        assert all(resp.request is req for req, resp in zip(request_list, responses, strict=True)), (
            "🌀 Sanity check | Unexpected mismatch: some responses do not reference their corresponding requests."
        )

        logger.info(
            f"{'Agent-Batch-Execution'[:26]:<26} | Agent | 🛜 Batch execution finished."
        )
        logger.info(f"{'Agent-Batch-Execution'[:26]:<26} | Agent |    ├ total of responses = {len(responses)}")

        from collections import Counter
        totals_per_status = Counter(r.status for r in responses)
        items = totals_per_status.items()
        for idx, (status, total) in enumerate(items):
            icon = "└" if idx == (len(items) - 1) else "├"
            logger.info(f"{'Agent-Batch-Execution'[:26]:<26} | Agent |    {icon} total of responses with status {status:<7} = {total}")

        return responses

    def chat_stream(
        self,
        request: ChatRequest,
        result_handler: ChatResultHandler | None = None,
        event_parser: SseEventParser | None = None,
    ) -> ChatResponseStream:
        """
        Send a message to the Agent and return a streaming response.

        Returns a ``ChatResponseStream`` context manager that yields SSE events
        as they arrive from the server. Must be used with ``with``::

            with agent.chat_stream(ChatRequest(user_prompt="Hello")) as stream:
                for event in stream:
                    if event.is_delta:
                        print(event.text, end="", flush=True)
                print(f"\\nTokens: {stream.response.tokens.total}")

        If retry is configured, retries only the initial connection (before
        the stream begins). Mid-stream errors are not retried.

        ``chat_many`` + stream is **not supported** — streaming is real-time
        by nature and batch execution defeats its purpose.

        Args:
            request: The request containing the user prompt and options.
            result_handler: Optional handler to process the final accumulated text.
                If None, uses RawResultHandler (returns accumulated text as-is).
                The handler is applied once after the stream is fully consumed,
                over the complete accumulated text — not per chunk.
            event_parser: Optional SSE event parser. If None, uses the default
                ``SseEventParser``. Subclass ``SseEventParser`` and pass an
                instance here to handle protocol changes without waiting for
                a new SDK release.

        Returns:
            A ChatResponseStream context manager for iterating SSE events.

        Raises:
            requests.HTTPError: If the initial HTTP request fails (after retries).
            RuntimeError: If the HTTP client does not support streaming.

        Example:
            >>> with agent.chat_stream(ChatRequest(user_prompt="Hello")) as stream:
            ...     for text in stream.text_stream:
            ...         print(text, end="", flush=True)
            >>>
            >>> # With JSON result handler
            >>> from stkai.agents import JSON_RESULT_HANDLER
            >>> with agent.chat_stream(request, result_handler=JSON_RESULT_HANDLER) as stream:
            ...     response = stream.get_final_response()
            ...     print(response.result)  # Parsed dict
        """
        logger.info(f"{request.id[:26]:<26} | Agent | 🛜 Starting streaming chat with agent '{self.agent_id}'.")
        logger.info(f"{request.id[:26]:<26} | Agent |    ├ base_url={self.base_url}")
        logger.info(f"{request.id[:26]:<26} | Agent |    └ agent_id='{self.agent_id}'")

        # Assertion for type narrowing (mypy)
        assert self.options.request_timeout is not None, \
            "🌀 Sanity check | request_timeout must be set after with_defaults_from()"
        assert self.options.retry_max_retries is not None, \
            "🌀 Sanity check | retry_max_retries must be set after with_defaults_from()"
        assert self.options.retry_initial_delay is not None, \
            "🌀 Sanity check | retry_initial_delay must be set after with_defaults_from()"

        # Build payload with streaming=True
        payload = request.to_api_payload()
        payload["streaming"] = True

        # Apply UseConversation context (if active and request has no explicit conversation_id)
        conv_ctx = ConversationScope.get_current()
        if conv_ctx is not None and not request.conversation_id:
            payload["use_conversation"] = True
            if conv_ctx.conversation_id:
                payload["conversation_id"] = conv_ctx.conversation_id

        url = f"{self.base_url}/v1/agent/{self.agent_id}/chat"

        # Retry only the initial connection
        for attempt in Retrying(
            max_retries=self.options.retry_max_retries,
            initial_delay=self.options.retry_initial_delay,
            logger_prefix=f"{request.id[:26]:<26} | Agent",
        ):
            with attempt:
                logger.info(
                    f"{request.id[:26]:<26} | Agent | "
                    f"Opening stream to agent '{self.agent_id}' (attempt {attempt.attempt_number}/{attempt.max_attempts})..."
                )

                http_response = self.http_client.post_stream(
                    url=url,
                    data=payload,
                    timeout=self.options.request_timeout,
                )
                assert isinstance(http_response, requests.Response), \
                    f"🌀 Sanity check | Object returned by `post_stream` is not an instance of `requests.Response`. ({http_response.__class__})"

                http_response.raise_for_status()

                logger.info(f"{request.id[:26]:<26} | Agent | 🛜 Stream opened successfully.")

                def _track_conversation(response: ChatResponse) -> None:
                    if conv_ctx is not None and response.conversation_id:
                        conv_ctx.update_if_absent(conversation_id=response.conversation_id)

                stream = ChatResponseStream(
                    request=request,
                    http_response=http_response,
                    result_handler=result_handler,
                    event_parser=event_parser,
                    on_response=_track_conversation if conv_ctx else None,
                )

                return stream

        # Should never reach here - Retrying raises MaxRetriesExceededError
        raise RuntimeError(
            "Unexpected error while opening stream: "
            "reached end of `chat_stream` method without returning a ChatResponseStream."
        )

    def _do_chat(
        self,
        request: ChatRequest,
        result_handler: ChatResultHandler | None = None,
    ) -> ChatResponse:
        """
        Internal method that executes the full chat workflow: retry, HTTP request,
        response parsing, and error handling.

        Always returns a ChatResponse (never raises exceptions).
        Called by both chat() and chat_many().

        Args:
            request: The request containing the user prompt and options.
            result_handler: Optional handler to process the response message.

        Returns:
            ChatResponse with the Agent's reply or error information.
        """
        assert request, "🌀 Sanity check | Chat-Request can not be None."
        assert request.id, "🌀 Sanity check | Chat-Request ID can not be None."

        # Assertion for type narrowing (mypy)
        assert self.options.request_timeout is not None, \
            "🌀 Sanity check | request_timeout must be set after with_defaults_from()"
        assert self.options.retry_max_retries is not None, \
            "🌀 Sanity check | retry_max_retries must be set after with_defaults_from()"
        assert self.options.retry_initial_delay is not None, \
            "🌀 Sanity check | retry_initial_delay must be set after with_defaults_from()"

        try:
            for attempt in Retrying(
                max_retries=self.options.retry_max_retries,
                initial_delay=self.options.retry_initial_delay,
                logger_prefix=f"{request.id[:26]:<26} | Agent",
            ):
                with attempt:
                    logger.info(
                        f"{request.id[:26]:<26} | Agent | "
                        f"Sending message to agent '{self.agent_id}' (attempt {attempt.attempt_number}/{attempt.max_attempts})..."
                    )

                    # HTTP request
                    payload = request.to_api_payload()

                    # Apply UseConversation context (if active and request has no explicit conversation_id)
                    conv_ctx = ConversationScope.get_current()
                    if conv_ctx is not None and not request.conversation_id:
                        payload["use_conversation"] = True
                        if conv_ctx.conversation_id:
                            payload["conversation_id"] = conv_ctx.conversation_id

                    sent_conv_id = payload.get("conversation_id")
                    if sent_conv_id:
                        logger.debug(
                            f"{request.id[:26]:<26} | Agent | Request sent with conversation_id='{sent_conv_id}'"
                        )

                    url = f"{self.base_url}/v1/agent/{self.agent_id}/chat"
                    http_response = self.http_client.post(
                        url=url,
                        data=payload,
                        timeout=self.options.request_timeout,
                    )
                    assert isinstance(http_response, requests.Response), \
                        f"🌀 Sanity check | Object returned by `post` method is not an instance of `requests.Response`. ({http_response.__class__})"

                    http_response.raise_for_status()
                    response_data = http_response.json()
                    raw_message = response_data.get("message")

                    # Process result through handler
                    if not result_handler:
                        from stkai.agents._handlers import DEFAULT_RESULT_HANDLER
                        result_handler = DEFAULT_RESULT_HANDLER

                    try:
                        from stkai.agents._handlers import ChatResultContext
                        context = ChatResultContext(request=request, raw_result=raw_message)
                        processed_result = result_handler.handle_result(context)
                    except Exception as e:
                        handler_name = type(result_handler).__name__
                        raise ChatResultHandlerError(
                            f"{request.id} | Agent | Result handler '{handler_name}' failed: {e}",
                            cause=e, result_handler=result_handler,
                        ) from e

                    response = ChatResponse(
                        request=request,
                        status=ChatStatus.SUCCESS,
                        result=processed_result,
                        raw_response=response_data,
                    )

                    # Auto-track conversation_id in UseConversation context
                    if conv_ctx is not None and response.conversation_id:
                        conv_ctx.update_if_absent(conversation_id=response.conversation_id)

                    logger.info(
                        f"{request.id[:26]:<26} | Agent | "
                        f"✅ Response received successfully (tokens: {response.tokens.total if response.tokens else 'N/A'})"
                    )
                    if response.conversation_id:
                        logger.debug(
                            f"{request.id[:26]:<26} | Agent | Response received with conversation_id='{response.conversation_id}'"
                        )

                    assert response.request is request, \
                        "🌀 Sanity check | Unexpected mismatch: response does not reference its corresponding request."
                    return response

            # Should never reach here - Retrying raises MaxRetriesExceededError
            raise RuntimeError(
                "Unexpected error while chatting the agent: "
                "reached end of `_do_chat` method without returning a response."
            )

        except Exception as e:
            error_status = ChatStatus.from_exception(e)
            error_msg = f"Chat message failed: {e}"
            if isinstance(e, requests.HTTPError) and e.response is not None:
                error_msg = f"Chat message failed due to an HTTP error {e.response.status_code}: {e.response.text}"
            logger.error(
                f"{request.id[:26]:<26} | Agent | ❌ {error_msg}",
                exc_info=logger.isEnabledFor(logging.DEBUG)
            )
            return ChatResponse(
                request=request,
                status=error_status,
                error=error_msg,
            )

Functions

chat

chat(request: ChatRequest, result_handler: ChatResultHandler | None = None) -> ChatResponse

Send a message to the Agent and wait for the response (blocking).

This method sends a user prompt to the Agent and blocks until a response is received or an error occurs.

If retry is configured (retry_max_retries > 0), automatically retries on:

  • HTTP 5xx errors (500, 502, 503, 504)
  • Network errors (Timeout, ConnectionError)

Does NOT retry on:

  • HTTP 4xx errors (client errors)

Note

retry_max_retries=0 means 1 attempt (no retry). retry_max_retries=3 means 4 attempts (1 original + 3 retries).

Parameters:

  • request (ChatRequest, required): The request containing the user prompt and options.
  • result_handler (ChatResultHandler | None, default None): Optional handler to process the response message. If None, uses RawResultHandler (returns the message as-is). Use JSON_RESULT_HANDLER to parse JSON responses.

Returns:

  • ChatResponse: The Agent's reply or error information. The result field contains the processed result from the handler.

Raises:

  • ChatResultHandlerError: If the result handler fails to process the response.

Example

>>> # Single message (default RawResultHandler)
>>> response = agent.chat(
...     request=ChatRequest(user_prompt="Hello!")
... )
>>> print(response.result)  # Same as response.raw_result

>>> # Parse JSON response
>>> from stkai.agents import JSON_RESULT_HANDLER
>>> response = agent.chat(request, result_handler=JSON_RESULT_HANDLER)
>>> print(response.result)  # Parsed dict

>>> # With conversation context
>>> resp1 = agent.chat(
...     request=ChatRequest(
...         user_prompt="What is Python?",
...         use_conversation=True
...     )
... )
>>> resp2 = agent.chat(
...     request=ChatRequest(
...         user_prompt="What are its main features?",
...         conversation_id=resp1.conversation_id,
...         use_conversation=True
...     )
... )
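The retry behavior described above (5xx and network errors are retryable, 4xx responses are returned as-is, and retry_max_retries counts extra attempts after the original) can be sketched independently of the SDK's Retrying helper. This is an illustrative sketch, not the SDK's implementation, and the exponential backoff policy shown is an assumption:

```python
import time

RETRYABLE_STATUS = {500, 502, 503, 504}

def call_with_retry(send, max_retries=3, initial_delay=0.0):
    """Run `send` up to 1 + max_retries times; max_retries=0 means a single attempt."""
    last_error = None
    for attempt in range(1 + max_retries):
        if attempt > 0 and initial_delay:
            time.sleep(initial_delay * (2 ** (attempt - 1)))  # backoff policy is assumed
        try:
            status, body = send()
        except (TimeoutError, ConnectionError) as e:  # network errors are retryable
            last_error = e
            continue
        if status in RETRYABLE_STATUS:                # HTTP 5xx is retryable
            last_error = RuntimeError(f"HTTP {status}")
            continue
        return status, body                           # 2xx and 4xx are returned as-is
    raise last_error
```

With max_retries=0 the loop body runs exactly once, matching the "1 attempt (no retry)" semantics in the Note above.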


chat_many

chat_many(request_list: list[ChatRequest], result_handler: ChatResultHandler | None = None) -> list[ChatResponse]

Send multiple chat messages concurrently, wait for all responses (blocking), and return them in the same order as request_list.

Each request is executed in parallel threads using the internal thread-pool. Returns a list of ChatResponse objects in the same order as request_list.

Parameters:

Name Type Description Default
request_list list[ChatRequest]

List of ChatRequest objects to send.

required
result_handler ChatResultHandler | None

Optional handler to process the response message. If None, uses RawResultHandler (returns message as-is).

None

Returns:

Type Description
list[ChatResponse]

List[ChatResponse]: One response per request, in the same order.

Example

>>> requests = [
...     ChatRequest(user_prompt="What is Python?"),
...     ChatRequest(user_prompt="What is Java?"),
... ]
>>> responses = agent.chat_many(requests)
>>> for resp in responses:
...     print(resp.result)

Source code in src/stkai/agents/_agent.py
def chat_many(
    self,
    request_list: list[ChatRequest],
    result_handler: ChatResultHandler | None = None,
) -> list[ChatResponse]:
    """
    Send multiple chat messages concurrently, wait for all responses (blocking),
    and return them in the same order as `request_list`.

    Each request is executed in parallel threads using the internal thread-pool.
    Returns a list of ChatResponse objects in the same order as `request_list`.

    Args:
        request_list: List of ChatRequest objects to send.
        result_handler: Optional handler to process the response message.
            If None, uses RawResultHandler (returns message as-is).

    Returns:
        List[ChatResponse]: One response per request, in the same order.

    Example:
        >>> requests = [
        ...     ChatRequest(user_prompt="What is Python?"),
        ...     ChatRequest(user_prompt="What is Java?"),
        ... ]
        >>> responses = agent.chat_many(requests)
        >>> for resp in responses:
        ...     print(resp.result)
    """
    if not request_list:
        return []

    logger.info(
        f"{'Agent-Batch-Execution'[:26]:<26} | Agent | "
        f"🛜 Starting batch execution of {len(request_list)} requests."
    )
    logger.info(f"{'Agent-Batch-Execution'[:26]:<26} | Agent |    ├ base_url={self.base_url}")
    logger.info(f"{'Agent-Batch-Execution'[:26]:<26} | Agent |    ├ agent_id='{self.agent_id}'")
    logger.info(f"{'Agent-Batch-Execution'[:26]:<26} | Agent |    └ max_concurrent={self.max_workers}")

    # Warn about race condition: chat_many inside UseConversation without a pre-set conversation_id
    if len(request_list) > 1:
        conv_ctx = ConversationScope.get_current()
        if conv_ctx and not conv_ctx.has_conversation_id():
            logger.warning(
                f"{'Agent-Batch-Execution'[:26]:<26} | Agent | "
                "⚠️ chat_many() called inside UseConversation without a pre-set conversation_id. "
                "Concurrent requests will race to capture the server-assigned ID, "
                "likely starting independent conversations. "
                "Consider using UseConversation.with_generated_id() or passing an explicit conversation_id."
            )

    # Use thread-pool for parallel calls to `_do_chat`
    # The ConversationScope::propagate method captures the active UseConversation context (if any)
    # and installs it in each worker thread — only conversation state is propagated.
    future_to_index = {
        self.executor.submit(
            ConversationScope.propagate(self._do_chat), # propagate conversation context
            request=req,                                # arg-1
            result_handler=result_handler,              # arg-2
        ): idx
        for idx, req in enumerate(request_list)
    }

    # Block and wait for all responses to be finished
    responses_map: dict[int, ChatResponse] = {}

    for future in as_completed(future_to_index):
        idx = future_to_index[future]
        correlated_request = request_list[idx]
        try:
            responses_map[idx] = future.result()
        except Exception as e:
            logger.error(
                f"{correlated_request.id[:26]:<26} | Agent | ❌ Chat failed in batch(seq={idx}). {e}",
                exc_info=logger.isEnabledFor(logging.DEBUG)
            )
            responses_map[idx] = ChatResponse(
                request=correlated_request,
                status=ChatStatus.ERROR,
                error=str(e),
            )

    # Rebuild responses list in the same order of requests list
    responses = [
        responses_map[i] for i in range(len(request_list))
    ]

    # Race-condition check: ensure both lists have the same length
    assert len(responses) == len(request_list), (
        f"🌀 Sanity check | Unexpected mismatch: responses(size={len(responses)}) is different from requests(size={len(request_list)})."
    )
    # Race-condition check: ensure each response points to its respective request
    assert all(resp.request is req for req, resp in zip(request_list, responses, strict=True)), (
        "🌀 Sanity check | Unexpected mismatch: some responses do not reference their corresponding requests."
    )

    logger.info(
        f"{'Agent-Batch-Execution'[:26]:<26} | Agent | 🛜 Batch execution finished."
    )
    logger.info(f"{'Agent-Batch-Execution'[:26]:<26} | Agent |    ├ total of responses = {len(responses)}")

    from collections import Counter
    totals_per_status = Counter(r.status for r in responses)
    items = totals_per_status.items()
    for idx, (status, total) in enumerate(items):
        icon = "└" if idx == (len(items) - 1) else "├"
        logger.info(f"{'Agent-Batch-Execution'[:26]:<26} | Agent |    {icon} total of responses with status {status:<7} = {total}")

    return responses

chat_stream

chat_stream(request: ChatRequest, result_handler: ChatResultHandler | None = None, event_parser: SseEventParser | None = None) -> ChatResponseStream

Send a message to the Agent and return a streaming response.

Returns a ChatResponseStream context manager that yields SSE events as they arrive from the server. It must be used in a with statement:

with agent.chat_stream(ChatRequest(user_prompt="Hello")) as stream:
    for event in stream:
        if event.is_delta:
            print(event.text, end="", flush=True)
    print(f"\nTokens: {stream.response.tokens.total}")

If retry is configured, retries only the initial connection (before the stream begins). Mid-stream errors are not retried.

chat_many + stream is not supported — streaming is real-time by nature and batch execution defeats its purpose.

Parameters:

Name Type Description Default
request ChatRequest

The request containing the user prompt and options.

required
result_handler ChatResultHandler | None

Optional handler to process the final accumulated text. If None, uses RawResultHandler (returns accumulated text as-is). The handler is applied once after the stream is fully consumed, over the complete accumulated text — not per chunk.

None
event_parser SseEventParser | None

Optional SSE event parser. If None, uses the default SseEventParser. Subclass SseEventParser and pass an instance here to handle protocol changes without waiting for a new SDK release.

None

Returns:

Type Description
ChatResponseStream

A ChatResponseStream context manager for iterating SSE events.

Raises:

Type Description
HTTPError

If the initial HTTP request fails (after retries).

RuntimeError

If the HTTP client does not support streaming.

Example

>>> with agent.chat_stream(ChatRequest(user_prompt="Hello")) as stream:
...     for text in stream.text_stream:
...         print(text, end="", flush=True)

With JSON result handler

>>> from stkai.agents import JSON_RESULT_HANDLER
>>> with agent.chat_stream(request, result_handler=JSON_RESULT_HANDLER) as stream:
...     response = stream.get_final_response()
...     print(response.result)  # Parsed dict
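The handler contract described above (applied once over the fully accumulated text, never per chunk) can be sketched with plain strings standing in for SSE delta events; the uppercase transform is a stand-in for a ChatResultHandler:

```python
# Plain list items stand in for SSE delta events; the real stream
# accumulates event text the same way and applies the result handler
# exactly once, over the complete accumulated text.
chunks = ["Hel", "lo, ", "world"]

accumulated_parts: list[str] = []
for chunk in chunks:
    accumulated_parts.append(chunk)  # done per delta event

accumulated_text = "".join(accumulated_parts)

# Stand-in for a ChatResultHandler applied to the final text:
result = accumulated_text.upper()
print(result)  # → HELLO, WORLD
```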

Source code in src/stkai/agents/_agent.py
def chat_stream(
    self,
    request: ChatRequest,
    result_handler: ChatResultHandler | None = None,
    event_parser: SseEventParser | None = None,
) -> ChatResponseStream:
    """
    Send a message to the Agent and return a streaming response.

    Returns a ``ChatResponseStream`` context manager that yields SSE events
    as they arrive from the server. Must be used with ``with``::

        with agent.chat_stream(ChatRequest(user_prompt="Hello")) as stream:
            for event in stream:
                if event.is_delta:
                    print(event.text, end="", flush=True)
            print(f"\\nTokens: {stream.response.tokens.total}")

    If retry is configured, retries only the initial connection (before
    the stream begins). Mid-stream errors are not retried.

    ``chat_many`` + stream is **not supported** — streaming is real-time
    by nature and batch execution defeats its purpose.

    Args:
        request: The request containing the user prompt and options.
        result_handler: Optional handler to process the final accumulated text.
            If None, uses RawResultHandler (returns accumulated text as-is).
            The handler is applied once after the stream is fully consumed,
            over the complete accumulated text — not per chunk.
        event_parser: Optional SSE event parser. If None, uses the default
            ``SseEventParser``. Subclass ``SseEventParser`` and pass an
            instance here to handle protocol changes without waiting for
            a new SDK release.

    Returns:
        A ChatResponseStream context manager for iterating SSE events.

    Raises:
        requests.HTTPError: If the initial HTTP request fails (after retries).
        RuntimeError: If the HTTP client does not support streaming.

    Example:
        >>> with agent.chat_stream(ChatRequest(user_prompt="Hello")) as stream:
        ...     for text in stream.text_stream:
        ...         print(text, end="", flush=True)
        >>>
        >>> # With JSON result handler
        >>> from stkai.agents import JSON_RESULT_HANDLER
        >>> with agent.chat_stream(request, result_handler=JSON_RESULT_HANDLER) as stream:
        ...     response = stream.get_final_response()
        ...     print(response.result)  # Parsed dict
    """
    logger.info(f"{request.id[:26]:<26} | Agent | 🛜 Starting streaming chat with agent '{self.agent_id}'.")
    logger.info(f"{request.id[:26]:<26} | Agent |    ├ base_url={self.base_url}")
    logger.info(f"{request.id[:26]:<26} | Agent |    └ agent_id='{self.agent_id}'")

    # Assertion for type narrowing (mypy)
    assert self.options.request_timeout is not None, \
        "🌀 Sanity check | request_timeout must be set after with_defaults_from()"
    assert self.options.retry_max_retries is not None, \
        "🌀 Sanity check | retry_max_retries must be set after with_defaults_from()"
    assert self.options.retry_initial_delay is not None, \
        "🌀 Sanity check | retry_initial_delay must be set after with_defaults_from()"

    # Build payload with streaming=True
    payload = request.to_api_payload()
    payload["streaming"] = True

    # Apply UseConversation context (if active and request has no explicit conversation_id)
    conv_ctx = ConversationScope.get_current()
    if conv_ctx is not None and not request.conversation_id:
        payload["use_conversation"] = True
        if conv_ctx.conversation_id:
            payload["conversation_id"] = conv_ctx.conversation_id

    url = f"{self.base_url}/v1/agent/{self.agent_id}/chat"

    # Retry only the initial connection
    for attempt in Retrying(
        max_retries=self.options.retry_max_retries,
        initial_delay=self.options.retry_initial_delay,
        logger_prefix=f"{request.id[:26]:<26} | Agent",
    ):
        with attempt:
            logger.info(
                f"{request.id[:26]:<26} | Agent | "
                f"Opening stream to agent '{self.agent_id}' (attempt {attempt.attempt_number}/{attempt.max_attempts})..."
            )

            http_response = self.http_client.post_stream(
                url=url,
                data=payload,
                timeout=self.options.request_timeout,
            )
            assert isinstance(http_response, requests.Response), \
                f"🌀 Sanity check | Object returned by `post_stream` is not an instance of `requests.Response`. ({http_response.__class__})"

            http_response.raise_for_status()

            logger.info(f"{request.id[:26]:<26} | Agent | 🛜 Stream opened successfully.")

            def _track_conversation(response: ChatResponse) -> None:
                if conv_ctx is not None and response.conversation_id:
                    conv_ctx.update_if_absent(conversation_id=response.conversation_id)

            stream = ChatResponseStream(
                request=request,
                http_response=http_response,
                result_handler=result_handler,
                event_parser=event_parser,
                on_response=_track_conversation if conv_ctx else None,
            )

            return stream

    # Should never reach here - Retrying raises MaxRetriesExceededError
    raise RuntimeError(
        "Unexpected error while opening stream: "
        "reached end of `chat_stream` method without returning a ChatResponseStream."
    )

Data Models

ChatRequest dataclass

Represents a chat request to be sent to a StackSpot AI Agent.

This class encapsulates all data needed to send a message to an Agent, including the prompt, conversation context, and knowledge source settings.

Attributes:

Name Type Description
user_prompt str

The message/prompt to send to the Agent.

id str

Unique identifier for this request. Auto-generated as UUID if not provided.

conversation_id str | None

Optional ID to continue an existing conversation.

use_conversation bool

Whether to maintain conversation context (default: False).

use_knowledge_sources bool

Whether to use StackSpot knowledge sources (default: True).

return_knowledge_sources bool

Whether to return knowledge source IDs in response (default: False).

metadata dict[str, Any]

Optional dictionary for storing custom metadata.

Example

>>> request = ChatRequest(
...     user_prompt="Explain what SOLID principles are",
...     use_knowledge_sources=True,
...     metadata={"source": "cli"}
... )

Source code in src/stkai/agents/_models.py
@dataclass(frozen=True)
class ChatRequest:
    """
    Represents a chat request to be sent to a StackSpot AI Agent.

    This class encapsulates all data needed to send a message to an Agent,
    including the prompt, conversation context, and knowledge source settings.

    Attributes:
        user_prompt: The message/prompt to send to the Agent.
        id: Unique identifier for this request. Auto-generated as UUID if not provided.
        conversation_id: Optional ID to continue an existing conversation.
        use_conversation: Whether to maintain conversation context (default: False).
        use_knowledge_sources: Whether to use StackSpot knowledge sources (default: True).
        return_knowledge_sources: Whether to return knowledge source IDs in response (default: False).
        metadata: Optional dictionary for storing custom metadata.

    Example:
        >>> request = ChatRequest(
        ...     user_prompt="Explain what SOLID principles are",
        ...     use_knowledge_sources=True,
        ...     metadata={"source": "cli"}
        ... )
    """
    user_prompt: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    conversation_id: str | None = None
    use_conversation: bool = False
    use_knowledge_sources: bool = True
    return_knowledge_sources: bool = False
    upload_ids: list[str] | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        assert self.id, "Request ID cannot be empty."
        assert self.user_prompt, "User prompt cannot be empty."

    def to_api_payload(self) -> dict[str, Any]:
        """
        Converts the request to the API payload format.

        Returns:
            Dictionary formatted for the Agent API.
        """
        payload: dict[str, Any] = {
            "user_prompt": self.user_prompt,
            "streaming": False,
            "use_conversation": self.use_conversation,
            "stackspot_knowledge": str(self.use_knowledge_sources).lower(),
            "return_ks_in_response": self.return_knowledge_sources,
        }

        if self.conversation_id:
            payload["conversation_id"] = self.conversation_id

        if self.upload_ids:
            payload["upload_ids"] = self.upload_ids

        return payload

Functions

to_api_payload

to_api_payload() -> dict[str, Any]

Converts the request to the API payload format.

Returns:

Type Description
dict[str, Any]

Dictionary formatted for the Agent API.
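For reference, a default ChatRequest serializes to a payload shaped like the sketch below (field names taken from the source that follows). Note the quirk that stackspot_knowledge is sent as a lowercase string while the other flags remain native booleans, and that conversation_id and upload_ids are omitted when unset:

```python
# Sketch of the payload shape produced by to_api_payload() for a
# request with default options.
user_prompt = "Explain what SOLID principles are"
use_knowledge_sources = True

payload = {
    "user_prompt": user_prompt,
    "streaming": False,
    "use_conversation": False,
    # Serialized as the string "true"/"false", not a native bool:
    "stackspot_knowledge": str(use_knowledge_sources).lower(),
    "return_ks_in_response": False,
}
# conversation_id and upload_ids would be added only when set.
print(payload["stackspot_knowledge"])  # → true
```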

Source code in src/stkai/agents/_models.py
def to_api_payload(self) -> dict[str, Any]:
    """
    Converts the request to the API payload format.

    Returns:
        Dictionary formatted for the Agent API.
    """
    payload: dict[str, Any] = {
        "user_prompt": self.user_prompt,
        "streaming": False,
        "use_conversation": self.use_conversation,
        "stackspot_knowledge": str(self.use_knowledge_sources).lower(),
        "return_ks_in_response": self.return_knowledge_sources,
    }

    if self.conversation_id:
        payload["conversation_id"] = self.conversation_id

    if self.upload_ids:
        payload["upload_ids"] = self.upload_ids

    return payload

ChatResponse dataclass

Represents a response from a StackSpot AI Agent.

This class encapsulates the Agent's response including the message, token usage, status, and any error information. Properties are lazily extracted from raw_response (source of truth).

Attributes:

Name Type Description
request ChatRequest

The original request that generated this response.

status ChatStatus

The status of the response (SUCCESS, ERROR, TIMEOUT).

result Any | None

The processed result from the result handler. By default (RawResultHandler), this is the same as raw_result. When using JsonResultHandler, this is the parsed JSON object.

error str | None

Error message if the request failed.

raw_response dict[str, Any] | None

The raw API response dictionary (source of truth for properties).

Properties (derived from raw_response):

- raw_result: The Agent's response message (raw text from API).
- stop_reason: Reason why the Agent stopped generating (e.g., "stop").
- tokens: Token usage information.
- conversation_id: ID for continuing the conversation.
- knowledge_sources: List of knowledge source IDs used in the response.

Example

>>> if response.is_success():
...     print(response.raw_result)  # Raw text
...     print(response.result)      # Processed by handler
...     print(f"Tokens used: {response.tokens.total}")
... elif response.is_timeout():
...     print("Request timed out")
... else:
...     print(f"Error: {response.error}")
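Since every derived property is a lazy read over raw_response (the source of truth), the extraction logic can be sketched against a plain dict mimicking the raw API body; the field names match the source shown below:

```python
# A plain dict standing in for the raw API response body.
raw_response = {
    "message": "SOLID is a set of five design principles...",
    "stop_reason": "stop",
    "tokens": {"user": 100, "enrichment": 50, "output": 200},
    "conversation_id": "conv-123",
}

# Equivalent of the raw_result and tokens properties, including the
# None-safe defaults the properties apply to missing token counts:
raw_result = raw_response.get("message")
tokens_data = raw_response.get("tokens") or {}
total = sum(tokens_data.get(k) or 0 for k in ("user", "enrichment", "output"))
print(total)  # → 350
```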

Source code in src/stkai/agents/_models.py
@dataclass(frozen=True)
class ChatResponse:
    """
    Represents a response from a StackSpot AI Agent.

    This class encapsulates the Agent's response including the message,
    token usage, status, and any error information. Properties are lazily
    extracted from raw_response (source of truth).

    Attributes:
        request: The original request that generated this response.
        status: The status of the response (SUCCESS, ERROR, TIMEOUT).
        result: The processed result from the result handler.
            By default (RawResultHandler), this is the same as raw_result.
            When using JsonResultHandler, this is the parsed JSON object.
        error: Error message if the request failed.
        raw_response: The raw API response dictionary (source of truth for properties).

    Properties (derived from raw_response):
        raw_result: The Agent's response message (raw text from API).
        stop_reason: Reason why the Agent stopped generating (e.g., "stop").
        tokens: Token usage information.
        conversation_id: ID for continuing the conversation.
        knowledge_sources: List of knowledge source IDs used in the response.

    Example:
        >>> if response.is_success():
        ...     print(response.raw_result)  # Raw text
        ...     print(response.result)      # Processed by handler
        ...     print(f"Tokens used: {response.tokens.total}")
        ... elif response.is_timeout():
        ...     print("Request timed out")
        ... else:
        ...     print(f"Error: {response.error}")
    """
    request: ChatRequest
    status: ChatStatus
    result: Any | None = None
    error: str | None = None
    raw_response: dict[str, Any] | None = None

    def __post_init__(self) -> None:
        assert self.request, "Request cannot be empty."
        assert self.status, "Status cannot be empty."

    @property
    def raw_result(self) -> str | None:
        """Extracts the 'message' field from the raw API response."""
        if not self.raw_response:
            return None
        return self.raw_response.get("message")

    @property
    def stop_reason(self) -> str | None:
        """Extracts the 'stop_reason' field from the raw API response."""
        if not self.raw_response:
            return None
        return self.raw_response.get("stop_reason")

    @property
    def tokens(self) -> ChatTokenUsage | None:
        """Extracts and parses token usage from the raw API response."""
        if not self.raw_response:
            return None
        tokens_data = self.raw_response.get("tokens")
        if tokens_data is None:
            return None
        return ChatTokenUsage(
            user=tokens_data.get("user") or 0,
            enrichment=tokens_data.get("enrichment") or 0,
            output=tokens_data.get("output") or 0,
        )

    @property
    def conversation_id(self) -> str | None:
        """Extracts the 'conversation_id' field from the raw API response."""
        if not self.raw_response:
            return None
        return self.raw_response.get("conversation_id")

    @property
    def knowledge_sources(self) -> list[str]:
        """Extracts the 'knowledge_source_id' field from the raw API response."""
        if not self.raw_response:
            return []
        result: list[str] = self.raw_response.get("knowledge_source_id", [])
        return result

    def is_success(self) -> bool:
        """Returns True if the response was successful."""
        return self.status == ChatStatus.SUCCESS

    def is_error(self) -> bool:
        """Returns True if there was an error."""
        return self.status == ChatStatus.ERROR

    def is_timeout(self) -> bool:
        """Returns True if the request timed out."""
        return self.status == ChatStatus.TIMEOUT

    def error_with_details(self) -> dict[str, Any]:
        """Returns a dictionary with error details for non-success responses."""
        if self.is_success():
            return {}

        return {
            "status": self.status,
            "error_message": self.error,
            "response_body": self.raw_response or {},
        }

Attributes

raw_result property

raw_result: str | None

Extracts the 'message' field from the raw API response.

stop_reason property

stop_reason: str | None

Extracts the 'stop_reason' field from the raw API response.

tokens property

tokens: ChatTokenUsage | None

Extracts and parses token usage from the raw API response.

conversation_id property

conversation_id: str | None

Extracts the 'conversation_id' field from the raw API response.

knowledge_sources property

knowledge_sources: list[str]

Extracts the 'knowledge_source_id' field from the raw API response.

Functions

is_success

is_success() -> bool

Returns True if the response was successful.

Source code in src/stkai/agents/_models.py
def is_success(self) -> bool:
    """Returns True if the response was successful."""
    return self.status == ChatStatus.SUCCESS

is_error

is_error() -> bool

Returns True if there was an error.

Source code in src/stkai/agents/_models.py
def is_error(self) -> bool:
    """Returns True if there was an error."""
    return self.status == ChatStatus.ERROR

is_timeout

is_timeout() -> bool

Returns True if the request timed out.

Source code in src/stkai/agents/_models.py
def is_timeout(self) -> bool:
    """Returns True if the request timed out."""
    return self.status == ChatStatus.TIMEOUT

error_with_details

error_with_details() -> dict[str, Any]

Returns a dictionary with error details for non-success responses.

Source code in src/stkai/agents/_models.py
def error_with_details(self) -> dict[str, Any]:
    """Returns a dictionary with error details for non-success responses."""
    if self.is_success():
        return {}

    return {
        "status": self.status,
        "error_message": self.error,
        "response_body": self.raw_response or {},
    }

ChatStatus

Bases: StrEnum

Status of a chat response.

Attributes:

Name Type Description
SUCCESS

Response received successfully from the Agent.

ERROR

Client-side error (HTTP error, network issue, parsing error).

TIMEOUT

Any timeout, client or server-side (e.g., HTTP request timeout, HTTP 408, or HTTP 504).

Source code in src/stkai/agents/_models.py
class ChatStatus(StrEnum):
    """
    Status of a chat response.

    Attributes:
        SUCCESS: Response received successfully from the Agent.
        ERROR: Client-side error (HTTP error, network issue, parsing error).
        TIMEOUT: Any timeout, client or server-side (e.g., HTTP request timeout, HTTP 408, or HTTP 504).
    """
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"
    TIMEOUT = "TIMEOUT"

    @classmethod
    def from_exception(cls, exc: Exception) -> "ChatStatus":
        """
        Determine the appropriate status for an exception.

        Args:
            exc: The exception that occurred during the chat request.

        Returns:
            TIMEOUT for timeout exceptions, ERROR for all others.

        Example:
            >>> try:
            ...     response = agent.chat(request)
            ... except Exception as e:
            ...     status = ChatStatus.from_exception(e)
            ...     # status is TIMEOUT if e is a timeout, ERROR otherwise
        """
        from stkai._utils import is_timeout_exception
        return cls.TIMEOUT if is_timeout_exception(exc) else cls.ERROR

Functions

from_exception classmethod

from_exception(exc: Exception) -> ChatStatus

Determine the appropriate status for an exception.

Parameters:

Name Type Description Default
exc Exception

The exception that occurred during the chat request.

required

Returns:

Type Description
ChatStatus

TIMEOUT for timeout exceptions, ERROR for all others.

Example

>>> try:
...     response = agent.chat(request)
... except Exception as e:
...     status = ChatStatus.from_exception(e)
...     # status is TIMEOUT if e is a timeout, ERROR otherwise

Source code in src/stkai/agents/_models.py
@classmethod
def from_exception(cls, exc: Exception) -> "ChatStatus":
    """
    Determine the appropriate status for an exception.

    Args:
        exc: The exception that occurred during the chat request.

    Returns:
        TIMEOUT for timeout exceptions, ERROR for all others.

    Example:
        >>> try:
        ...     response = agent.chat(request)
        ... except Exception as e:
        ...     status = ChatStatus.from_exception(e)
        ...     # status is TIMEOUT if e is a timeout, ERROR otherwise
    """
    from stkai._utils import is_timeout_exception
    return cls.TIMEOUT if is_timeout_exception(exc) else cls.ERROR

ChatTokenUsage dataclass

Token usage information from a chat response.

Tracks the number of tokens consumed in different stages of processing.

Attributes:

Name Type Description
user int

Tokens from the user prompt.

enrichment int

Tokens from knowledge source enrichment.

output int

Tokens in the generated output.

Example

>>> usage = ChatTokenUsage(user=100, enrichment=50, output=200)
>>> print(f"Total tokens: {usage.total}")
Total tokens: 350

Source code in src/stkai/agents/_models.py
@dataclass(frozen=True)
class ChatTokenUsage:
    """
    Token usage information from a chat response.

    Tracks the number of tokens consumed in different stages of processing.

    Attributes:
        user: Tokens from the user prompt.
        enrichment: Tokens from knowledge source enrichment.
        output: Tokens in the generated output.

    Example:
        >>> usage = ChatTokenUsage(user=100, enrichment=50, output=200)
        >>> print(f"Total tokens: {usage.total}")
        Total tokens: 350
    """
    user: int
    enrichment: int
    output: int

    @property
    def total(self) -> int:
        """Returns the total number of tokens used."""
        return self.user + self.enrichment + self.output

Attributes

total property

total: int

Returns the total number of tokens used.

Streaming (Experimental)

ChatResponseStream

Context manager and iterator for streaming Agent responses.

Wraps an HTTP response with stream=True and parses SSE events, providing auto-accumulation and a final ChatResponse after iteration.

Must be used as a context manager to ensure proper cleanup of the underlying HTTP connection:

with agent.chat_stream(request) as stream:
    for event in stream:
        ...
    response = stream.response

Error handling: following the SDK principle of "requests in, responses out", errors during streaming (SSE failures, handler errors) never propagate as exceptions. Instead, response is always available after iteration with an appropriate status:

  • SUCCESS — stream completed and handler (if any) succeeded.
  • ERROR — SSE connection failed or handler raised an exception.
  • TIMEOUT — SSE connection timed out.

On error, response.result contains the raw accumulated text (partial on SSE failure, complete on handler failure) so the caller can still inspect what the Agent returned.

Attributes:

Name Type Description
request ChatRequest

The original ChatRequest.

response ChatResponse

The final ChatResponse (available after iteration completes).

accumulated_text str

Text accumulated so far during iteration.

Source code in src/stkai/agents/_stream.py
class ChatResponseStream:
    """
    Context manager and iterator for streaming Agent responses.

    Wraps an HTTP response with ``stream=True`` and parses SSE events,
    providing auto-accumulation and a final ``ChatResponse`` after iteration.

    Must be used as a context manager to ensure proper cleanup of the
    underlying HTTP connection::

        with agent.chat_stream(request) as stream:
            for event in stream:
                ...
            response = stream.response

    **Error handling:** following the SDK principle of "requests in,
    responses out", errors during streaming (SSE failures, handler errors)
    never propagate as exceptions. Instead, ``response`` is always
    available after iteration with an appropriate status:

    - **SUCCESS** — stream completed and handler (if any) succeeded.
    - **ERROR** — SSE connection failed or handler raised an exception.
    - **TIMEOUT** — SSE connection timed out.

    On error, ``response.result`` contains the raw accumulated text
    (partial on SSE failure, complete on handler failure) so the caller
    can still inspect what the Agent returned.

    Attributes:
        request: The original ChatRequest.
        response: The final ChatResponse (available after iteration completes).
        accumulated_text: Text accumulated so far during iteration.
    """

    def __init__(
        self,
        request: ChatRequest,
        http_response: requests.Response,
        result_handler: ChatResultHandler | None = None,
        event_parser: SseEventParser | None = None,
        on_response: Callable[[ChatResponse], None] | None = None,
    ) -> None:
        self._request = request
        self._http_response = http_response
        self._result_handler = result_handler
        self._on_response = on_response
        self._accumulated_parts: list[str] = []
        self._response: ChatResponse | None = None
        self._closed = False
        self._iterated = False

        if event_parser is not None:
            self._event_parser = event_parser
        else:
            # Lazy import to avoid circular dependency (_sse_parser imports from _stream)
            from stkai.agents._sse_parser import SseEventParser
            self._event_parser = SseEventParser()

    @property
    def request(self) -> ChatRequest:
        """The original ChatRequest."""
        return self._request

    @property
    def response(self) -> ChatResponse:
        """
        The final ChatResponse, available after iteration completes.

        Always present after iteration, even on errors. Check
        ``response.is_success()`` / ``response.is_error()`` /
        ``response.is_timeout()`` to determine the outcome.
        On non-success, ``response.result`` holds the raw accumulated
        text and ``response.error`` describes what went wrong.

        Raises:
            RuntimeError: If accessed before the stream is fully consumed.
        """
        if self._response is None:
            raise RuntimeError(
                "ChatResponseStream.response is not available until the stream is fully consumed. "
                "Iterate over the stream first."
            )
        return self._response

    @property
    def accumulated_text(self) -> str:
        """Text accumulated so far (useful during iteration)."""
        return "".join(self._accumulated_parts)

    def __enter__(self) -> ChatResponseStream:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def __iter__(self) -> Iterator[ChatResponseStreamEvent]:
        """
        Iterate SSE events from the streaming response.

        Consumable only once. Yields ``ChatResponseStreamEvent`` objects parsed
        from the SSE stream. After iteration completes, ``self.response``
        becomes available.

        Following the SDK design principle of "requests in, responses out",
        iteration **never** propagates exceptions to the caller. Any error
        (SSE or handler) is captured in the final ``ChatResponse`` with an
        appropriate status (ERROR or TIMEOUT). In both cases, ``result``
        holds the raw accumulated text so the caller can inspect partial
        or full content regardless of the outcome.

        Yields:
            ChatResponseStreamEvent for each SSE event.

        Raises:
            RuntimeError: If iterated more than once.
        """
        if self._iterated:
            raise RuntimeError("ChatResponseStream can only be iterated once.")
        self._iterated = True

        try:
            lines = self._http_response.iter_lines(decode_unicode=True)
            for event in self._event_parser.parse(lines):
                if event.is_delta and event.text:
                    self._accumulated_parts.append(event.text)
                yield event
        except Exception as e:
            # SSE failed: build an error response with partial text (no handler).
            self._build_error_response(e)
            return
        # SSE completed: build a success response (handler runs here).
        self._build_response()
        if self._on_response is not None and self._response is not None:
            self._on_response(self._response)

    @property
    def text_stream(self) -> Iterator[str]:
        """
        Convenience iterator that yields only text chunks from DELTA events.

        Example:
            >>> with agent.chat_stream(request) as stream:
            ...     for text in stream.text_stream:
            ...         print(text, end="", flush=True)
        """
        for event in self:
            if event.is_delta and event.text:
                yield event.text

    def until_done(self) -> None:
        """Consume the stream silently, discarding all events.

        After this call, ``self.response`` is available.

        Example:
            >>> with agent.chat_stream(request) as stream:
            ...     stream.until_done()
            ...     print(stream.response.result)
        """
        for _ in self:
            pass

    def get_final_response(self) -> ChatResponse:
        """Consume the stream and return the final ``ChatResponse``.

        Equivalent to calling ``until_done()`` followed by ``self.response``.
        The returned response is always present — check its status to
        determine whether the stream completed successfully.

        Example:
            >>> with agent.chat_stream(request) as stream:
            ...     response = stream.get_final_response()
            ...     if response.is_success():
            ...         print(response.result)
            ...     else:
            ...         print(f"Error: {response.error}")
        """
        self.until_done()
        return self.response

    def close(self) -> None:
        """Close the underlying HTTP connection."""
        if not self._closed:
            self._closed = True
            self._http_response.close()

    def _build_response(self) -> None:
        """Build the final ``ChatResponse`` after successful SSE iteration.

        If a ``result_handler`` was provided, the accumulated text is
        processed through it (same as ``Agent.chat()``).
        If the handler fails, the response is built with ERROR status and
        ``result`` is set to the raw accumulated text so the caller can
        still inspect what the Agent returned.
        """
        if self._response is not None:
            return

        accumulated = self.accumulated_text
        raw_response = self._build_raw_response(accumulated)

        # Apply result handler (same pattern as Agent._do_chat)
        result: Any = accumulated
        if self._result_handler:
            from stkai.agents._handlers import ChatResultContext
            context = ChatResultContext(request=self._request, raw_result=accumulated)
            try:
                result = self._result_handler.handle_result(context)
            except Exception as e:
                handler_name = type(self._result_handler).__name__
                error_msg = f"Result handler '{handler_name}' failed: {e}"
                logger.error(f"{self._request.id} | Agent | ❌ {error_msg}")
                self._response = ChatResponse(
                    request=self._request,
                    status=ChatStatus.ERROR,
                    result=accumulated,
                    error=error_msg,
                    raw_response=raw_response,
                )
                return

        self._response = ChatResponse(
            request=self._request,
            status=ChatStatus.SUCCESS,
            result=result,
            raw_response=raw_response,
        )

    def _build_error_response(self, error: Exception) -> None:
        """Build an error ``ChatResponse`` from an SSE failure.

        The result handler is intentionally **not** applied here because
        the accumulated text is likely incomplete (e.g. half a JSON payload)
        and feeding it to a handler would be misleading.

        ``result`` is set to the raw accumulated text so the caller can
        still recover whatever partial content was received before the failure.
        """
        if self._response is not None:
            return

        accumulated = self.accumulated_text
        raw_response = self._build_raw_response(accumulated)
        status = ChatStatus.from_exception(error)
        error_msg = f"Streaming failed: {error}"
        logger.error(f"{self._request.id} | Agent | ❌ {error_msg}")

        self._response = ChatResponse(
            request=self._request,
            status=status,
            result=accumulated,
            error=error_msg,
            raw_response=raw_response,
        )

    def _build_raw_response(self, accumulated: str) -> dict[str, Any]:
        """Build the raw_response dict from accumulated text and DONE metadata."""
        raw_response: dict[str, Any] = {"message": accumulated}
        metadata = self._event_parser.metadata
        if metadata:
            for key in ("conversation_id", "tokens", "stop_reason", "knowledge_source_id"):
                if key in metadata:
                    raw_response[key] = metadata[key]
        return raw_response

Attributes

response property

response: ChatResponse

The final ChatResponse, available after iteration completes.

Always present after iteration, even on errors. Check response.is_success() / response.is_error() / response.is_timeout() to determine the outcome. On non-success, response.result holds the raw accumulated text and response.error describes what went wrong.

Raises:

- RuntimeError: If accessed before the stream is fully consumed.

accumulated_text property

accumulated_text: str

Text accumulated so far (useful during iteration).

text_stream property

text_stream: Iterator[str]

Convenience iterator that yields only text chunks from DELTA events.

Example

>>> with agent.chat_stream(request) as stream:
...     for text in stream.text_stream:
...         print(text, end="", flush=True)

Functions

until_done

until_done() -> None

Consume the stream silently, discarding all events.

After this call, self.response is available.

Example

>>> with agent.chat_stream(request) as stream:
...     stream.until_done()
...     print(stream.response.result)

Source code in src/stkai/agents/_stream.py
def until_done(self) -> None:
    """Consume the stream silently, discarding all events.

    After this call, ``self.response`` is available.

    Example:
        >>> with agent.chat_stream(request) as stream:
        ...     stream.until_done()
        ...     print(stream.response.result)
    """
    for _ in self:
        pass

get_final_response

get_final_response() -> ChatResponse

Consume the stream and return the final ChatResponse.

Equivalent to calling until_done() followed by self.response. The returned response is always present — check its status to determine whether the stream completed successfully.

Example

>>> with agent.chat_stream(request) as stream:
...     response = stream.get_final_response()
...     if response.is_success():
...         print(response.result)
...     else:
...         print(f"Error: {response.error}")

Source code in src/stkai/agents/_stream.py
def get_final_response(self) -> ChatResponse:
    """Consume the stream and return the final ``ChatResponse``.

    Equivalent to calling ``until_done()`` followed by ``self.response``.
    The returned response is always present — check its status to
    determine whether the stream completed successfully.

    Example:
        >>> with agent.chat_stream(request) as stream:
        ...     response = stream.get_final_response()
        ...     if response.is_success():
        ...         print(response.result)
        ...     else:
        ...         print(f"Error: {response.error}")
    """
    self.until_done()
    return self.response

close

close() -> None

Close the underlying HTTP connection.

Source code in src/stkai/agents/_stream.py
def close(self) -> None:
    """Close the underlying HTTP connection."""
    if not self._closed:
        self._closed = True
        self._http_response.close()

ChatResponseStreamEvent dataclass

A single event from a streaming Agent response.

Attributes:

- type (ChatResponseStreamEventType): The event type (DELTA, DONE, ERROR).
- text (str): Text content for DELTA events.
- raw_data (dict[str, Any] | None): Raw parsed SSE data dictionary.
- error (str | None): Error message for ERROR events.

Source code in src/stkai/agents/_stream.py
@dataclass(frozen=True)
class ChatResponseStreamEvent:
    """
    A single event from a streaming Agent response.

    Attributes:
        type: The event type (DELTA, DONE, ERROR).
        text: Text content for DELTA events.
        raw_data: Raw parsed SSE data dictionary.
        error: Error message for ERROR events.
    """

    type: ChatResponseStreamEventType
    text: str = ""
    raw_data: dict[str, Any] | None = None
    error: str | None = None

    @property
    def is_delta(self) -> bool:
        """Returns True if this is a text delta event."""
        return self.type == ChatResponseStreamEventType.DELTA

    @property
    def is_done(self) -> bool:
        """Returns True if the stream is complete."""
        return self.type == ChatResponseStreamEventType.DONE

    @property
    def is_error(self) -> bool:
        """Returns True if this is an error event."""
        return self.type == ChatResponseStreamEventType.ERROR

Attributes

is_delta property

is_delta: bool

Returns True if this is a text delta event.

is_done property

is_done: bool

Returns True if the stream is complete.

is_error property

is_error: bool

Returns True if this is an error event.

ChatResponseStreamEventType

Bases: StrEnum

Type of a streaming event from the Agent API.

Source code in src/stkai/agents/_stream.py
class ChatResponseStreamEventType(StrEnum):
    """Type of a streaming event from the Agent API."""

    DELTA = "delta"
    DONE = "done"
    ERROR = "error"

SseEventParser

Parses SSE (Server-Sent Events) lines into ChatResponseStreamEvent objects.

Call parse(lines) to iterate over parsed events. Metadata accumulated from chunks (conversation_id, tokens, stop_reason, etc.) is available via the metadata property after the returned iterator is fully consumed.

The parser is safe to reuse — each parse() call resets internal state. Subclass and override _extract_delta_text or _track_chunk_metadata to handle protocol changes.

Source code in src/stkai/agents/_sse_parser.py
class SseEventParser:
    """Parses SSE (Server-Sent Events) lines into ``ChatResponseStreamEvent`` objects.

    Call ``parse(lines)`` to iterate over parsed events. Metadata accumulated
    from chunks (conversation_id, tokens, stop_reason, etc.) is available via
    the ``metadata`` property after the returned iterator is fully consumed.

    The parser is safe to reuse — each ``parse()`` call resets internal state.
    Subclass and override ``_extract_delta_text`` or ``_track_chunk_metadata``
    to handle protocol changes.
    """

    def __init__(self) -> None:
        self._raw_done_data: dict[str, Any] | None = None

    @property
    def metadata(self) -> dict[str, Any] | None:
        """Accumulated metadata from chunks (conversation_id, tokens, etc.).

        Available after the iterator returned by ``parse()`` is fully consumed.
        Returns ``None`` if no metadata was found in any chunk.
        """
        return self._raw_done_data

    def parse(self, lines: Iterable[str | bytes]) -> Iterator[ChatResponseStreamEvent]:
        """Parse SSE lines and yield events.

        Each call resets internal state (including ``metadata``), making the
        parser safe to reuse across multiple streams.

        Args:
            lines: An iterable of SSE lines (typically from
                ``response.iter_lines(decode_unicode=True)``).

        Yields:
            ChatResponseStreamEvent for each parsed SSE event.
        """
        self._raw_done_data = None

        event_type: str | None = None
        data_buffer: list[str] = []

        for line in lines:
            if line is None:
                continue

            line = str(line)

            # Empty line = end of event
            if not line.strip():
                if data_buffer:
                    event = self._build_event(event_type, "\n".join(data_buffer))
                    if event is not None:
                        yield event
                    event_type = None
                    data_buffer = []
                continue

            # SSE field parsing
            if line.startswith("event:"):
                event_type = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_buffer.append(line[len("data:"):].strip())
            # Ignore comments (lines starting with :) and unknown fields

        # Handle last event if stream ends without trailing newline
        if data_buffer:
            event = self._build_event(event_type, "\n".join(data_buffer))
            if event is not None:
                yield event

    def _build_event(
        self, event_type: str | None, data: str,
    ) -> ChatResponseStreamEvent | None:
        """Build a ChatResponseStreamEvent from raw SSE fields.

        Supports the LiteLLM/OpenAI-compatible SSE format used by StackSpot AI::

            data: {"choices":[{"index":0,"delta":{"content":"Hello"}}]}
            data: [DONE]
        """
        # LiteLLM/OpenAI stream termination signal
        if data == "[DONE]":
            return ChatResponseStreamEvent(
                type=ChatResponseStreamEventType.DONE,
                raw_data=self._raw_done_data or {},
            )

        logger.debug(f"SSE raw | event_type={event_type!r} data={data!r}")
        parsed_data = self._try_parse_json(data)

        # Error events
        if event_type == "error" or (
            isinstance(parsed_data, dict) and parsed_data.get("type") == "error"
        ):
            error_msg = data
            if isinstance(parsed_data, dict):
                error_msg = str(parsed_data.get("message", parsed_data.get("error", data)))
            return ChatResponseStreamEvent(
                type=ChatResponseStreamEventType.ERROR,
                error=str(error_msg),
                raw_data=parsed_data if isinstance(parsed_data, dict) else {"raw": data},
            )

        # Explicit done event (non-LiteLLM servers)
        if event_type == "done" or (
            isinstance(parsed_data, dict) and parsed_data.get("type") == "done"
        ):
            self._raw_done_data = parsed_data if isinstance(parsed_data, dict) else {}
            return ChatResponseStreamEvent(
                type=ChatResponseStreamEventType.DONE,
                raw_data=self._raw_done_data,
            )

        # Delta event: extract text content
        text = ""
        if isinstance(parsed_data, dict):
            text = self._extract_delta_text(parsed_data)
            # Track metadata from the last chunk (finish_reason, usage, etc.)
            self._track_chunk_metadata(parsed_data)
        elif isinstance(parsed_data, str):
            text = parsed_data
        else:
            text = data

        text = str(text)

        return ChatResponseStreamEvent(
            type=ChatResponseStreamEventType.DELTA,
            text=text,
            raw_data=parsed_data if isinstance(parsed_data, dict) else {"raw": data},
        )

    @staticmethod
    def _extract_delta_text(data: dict[str, Any]) -> str:
        """Extract text content from a delta event.

        Supports two SSE formats (checked in order):

        1. **StackSpot native** — flat ``message`` field::

            data: {"message": "Hello", "knowledge_source_id": [], ...}

        2. **LiteLLM/OpenAI-compatible** — ``choices[0].delta.content``::

            data: {"choices":[{"index":0,"delta":{"content":"Hello"}}]}
        """
        # StackSpot native format: flat "message" field
        message = data.get("message")
        if message is not None and isinstance(message, str):
            return str(message)

        # LiteLLM/OpenAI format: choices[0].delta.content
        choices = data.get("choices")
        if isinstance(choices, list) and len(choices) > 0:
            choice = choices[0]
            if isinstance(choice, dict):
                delta = choice.get("delta")
                if isinstance(delta, dict):
                    content = delta.get("content")
                    if content is not None:
                        return str(content)

        # Flat fallback (forward-compatibility)
        for field in ("content", "text", "delta"):
            value = data.get(field)
            if value is not None and isinstance(value, str):
                return str(value)

        return ""

    def _track_chunk_metadata(self, data: dict[str, Any]) -> None:
        """Track metadata from streaming chunks for the final response.

        The last chunk before ``[DONE]`` typically contains ``finish_reason``
        and may contain ``usage`` data. StackSpot-specific fields like
        ``conversation_id`` may also appear in chunks.
        """
        # StackSpot-specific fields (may appear in any chunk)
        for field in ("conversation_id", "tokens", "knowledge_source_id", "stop_reason"):
            if field in data:
                if self._raw_done_data is None:
                    self._raw_done_data = {}
                self._raw_done_data[field] = data[field]

        # LiteLLM/OpenAI: finish_reason from choices[0]
        choices = data.get("choices")
        if isinstance(choices, list) and len(choices) > 0:
            choice = choices[0]
            if isinstance(choice, dict):
                finish_reason = choice.get("finish_reason")
                if finish_reason is not None:
                    if self._raw_done_data is None:
                        self._raw_done_data = {}
                    self._raw_done_data["stop_reason"] = finish_reason

        # LiteLLM/OpenAI: usage from top-level
        usage = data.get("usage")
        if isinstance(usage, dict) and "tokens" not in (self._raw_done_data or {}):
            if self._raw_done_data is None:
                self._raw_done_data = {}
            # Map LiteLLM usage fields to StackSpot token format
            self._raw_done_data["tokens"] = {
                "user": usage.get("prompt_tokens", 0),
                "enrichment": 0,
                "output": usage.get("completion_tokens", 0),
            }

    @staticmethod
    def _try_parse_json(data: str) -> dict[str, Any] | str:
        """Try to parse data as JSON; return original string on failure."""
        try:
            parsed = json.loads(data)
            if isinstance(parsed, dict):
                return parsed
            return data
        except (json.JSONDecodeError, ValueError):
            return data
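
The two delta payload shapes documented in ``_extract_delta_text`` can be exercised standalone. This sketch (hypothetical helper name, not part of the SDK) mirrors the extraction order — StackSpot's flat ``message`` field first, then the LiteLLM/OpenAI ``choices[0].delta.content`` path:

```python
import json
from typing import Any


def extract_delta_text(data: dict[str, Any]) -> str:
    """Pull the text chunk out of a parsed SSE payload.

    Checks the StackSpot native format first (flat "message" field),
    then falls back to the LiteLLM/OpenAI shape.
    """
    message = data.get("message")
    if isinstance(message, str):
        return message
    choices = data.get("choices")
    if isinstance(choices, list) and choices and isinstance(choices[0], dict):
        delta = choices[0].get("delta")
        if isinstance(delta, dict) and delta.get("content") is not None:
            return str(delta["content"])
    return ""


native = json.loads('{"message": "Hello", "knowledge_source_id": []}')
openai = json.loads('{"choices":[{"index":0,"delta":{"content":"Hello"}}]}')
```

Both shapes yield the same text chunk, which is why a single accumulation loop can serve either backend.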

Attributes

metadata property

metadata: dict[str, Any] | None

Accumulated metadata from chunks (conversation_id, tokens, etc.).

Available after the iterator returned by parse() is fully consumed. Returns None if no metadata was found in any chunk.

Functions

parse

parse(lines: Iterable[str | bytes]) -> Iterator[ChatResponseStreamEvent]

Parse SSE lines and yield events.

Each call resets internal state (including metadata), making the parser safe to reuse across multiple streams.

Parameters:

- lines (Iterable[str | bytes], required): An iterable of SSE lines (typically from response.iter_lines(decode_unicode=True)).

Yields:

- ChatResponseStreamEvent: One event per parsed SSE event.

Source code in src/stkai/agents/_sse_parser.py
def parse(self, lines: Iterable[str | bytes]) -> Iterator[ChatResponseStreamEvent]:
    """Parse SSE lines and yield events.

    Each call resets internal state (including ``metadata``), making the
    parser safe to reuse across multiple streams.

    Args:
        lines: An iterable of SSE lines (typically from
            ``response.iter_lines(decode_unicode=True)``).

    Yields:
        ChatResponseStreamEvent for each parsed SSE event.
    """
    self._raw_done_data = None

    event_type: str | None = None
    data_buffer: list[str] = []

    for line in lines:
        if line is None:
            continue

        line = str(line)

        # Empty line = end of event
        if not line.strip():
            if data_buffer:
                event = self._build_event(event_type, "\n".join(data_buffer))
                if event is not None:
                    yield event
                event_type = None
                data_buffer = []
            continue

        # SSE field parsing
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_buffer.append(line[len("data:"):].strip())
        # Ignore comments (lines starting with :) and unknown fields

    # Handle last event if stream ends without trailing newline
    if data_buffer:
        event = self._build_event(event_type, "\n".join(data_buffer))
        if event is not None:
            yield event

Conversation

UseConversation

Context manager that automatically tracks and propagates conversation_id across all Agent.chat() calls within the block.

Precedence rules:

1. ChatRequest.conversation_id (explicit) wins over UseConversation (implicit).
2. use_conversation=True is automatically set inside the block.
3. If no conversation_id is provided, it is captured from the first successful response.

Nestable: inner UseConversation overrides outer; restores on exit.

Parameters:

- conversation_id (str | None, default None): Optional initial conversation ID. If None, auto-captures from the first successful Agent.chat() response.
Example

>>> with UseConversation() as conv:
...     r1 = agent.chat(ChatRequest(user_prompt="Hello"))
...     print(conv.conversation_id)  # captured from r1
...     r2 = agent.chat(ChatRequest(user_prompt="Follow up"))

Source code in src/stkai/agents/_conversation.py
class UseConversation:
    """
    Context manager that automatically tracks and propagates ``conversation_id``
    across all ``Agent.chat()`` calls within the block.

    Precedence rules:
        1. ``ChatRequest.conversation_id`` (explicit) wins over ``UseConversation`` (implicit).
        2. ``use_conversation=True`` is automatically set inside the block.
        3. If no ``conversation_id`` is provided, captures from the first successful response.

    Nestable: inner ``UseConversation`` overrides outer; restores on exit.

    Args:
        conversation_id: Optional initial conversation ID. If None, auto-captures
            from the first successful ``Agent.chat()`` response.

    Example:
        >>> with UseConversation() as conv:
        ...     r1 = agent.chat(ChatRequest(user_prompt="Hello"))
        ...     print(conv.conversation_id)  # captured from r1
        ...     r2 = agent.chat(ChatRequest(user_prompt="Follow up"))
    """

    def __init__(self, conversation_id: str | None = None) -> None:
        if conversation_id is not None:
            self._warn_if_not_ulid(conversation_id)
        self._context = ConversationContext(conversation_id=conversation_id)
        self._token: Token[ConversationContext | None] | None = None

    @classmethod
    def with_generated_id(cls) -> "UseConversation":
        """
        Factory method that creates a ``UseConversation`` with a pre-generated
        conversation ID in ULID format.

        This is useful when you want the conversation ID available before
        the first request, especially with ``chat_many()`` where concurrent
        requests would otherwise race to capture the server-assigned ID.

        Example:
            >>> with UseConversation.with_generated_id() as conv:
            ...     print(conv.conversation_id)  # ULID already available
            ...     agent.chat(ChatRequest(user_prompt="Hello"))
        """
        return cls(conversation_id=str(ULID()))

    def __enter__(self) -> ConversationContext:
        self._token = ConversationScope._set(self._context)
        return self._context

    def __exit__(self, *args: object) -> None:
        assert self._token is not None, \
            "UseConversation.__exit__ called without __enter__"
        ConversationScope._reset(self._token)
        self._token = None

    @staticmethod
    def _warn_if_not_ulid(conversation_id: str) -> None:
        """Logs a warning if ``conversation_id`` is not a valid ULID."""
        try:
            ULID.from_str(conversation_id)
        except ValueError:
            logger.warning(
                "⚠️ conversation_id '%s' is not a valid ULID. "
                "The StackSpot AI API currently expects ULID format — "
                "an invalid ID may be ignored by the server or start a new conversation scope. "
                "Consider using UseConversation.with_generated_id() for automatic ULID generation.",
                conversation_id,
            )

Functions

with_generated_id classmethod

with_generated_id() -> UseConversation

Factory method that creates a UseConversation with a pre-generated conversation ID in ULID format.

This is useful when you want the conversation ID available before the first request, especially with chat_many() where concurrent requests would otherwise race to capture the server-assigned ID.

Example

>>> with UseConversation.with_generated_id() as conv:
...     print(conv.conversation_id)  # ULID already available
...     agent.chat(ChatRequest(user_prompt="Hello"))

Source code in src/stkai/agents/_conversation.py
@classmethod
def with_generated_id(cls) -> "UseConversation":
    """
    Factory method that creates a ``UseConversation`` with a pre-generated
    conversation ID in ULID format.

    This is useful when you want the conversation ID available before
    the first request, especially with ``chat_many()`` where concurrent
    requests would otherwise race to capture the server-assigned ID.

    Example:
        >>> with UseConversation.with_generated_id() as conv:
        ...     print(conv.conversation_id)  # ULID already available
        ...     agent.chat(ChatRequest(user_prompt="Hello"))
    """
    return cls(conversation_id=str(ULID()))

ConversationContext

Holds the mutable conversation state within a UseConversation block.

Thread-safe: update_if_absent() uses a lock for safe auto-tracking from concurrent threads (e.g., chat_many()).

Attributes:

- conversation_id (str | None): The current conversation ID, or None if not yet captured.

Source code in src/stkai/agents/_conversation.py
class ConversationContext:
    """
    Holds the mutable conversation state within a `UseConversation` block.

    Thread-safe: ``update_if_absent()`` uses a lock for safe auto-tracking
    from concurrent threads (e.g., ``chat_many()``).

    Attributes:
        conversation_id: The current conversation ID, or None if not yet captured.
    """

    def __init__(self, conversation_id: str | None = None) -> None:
        self._conversation_id = conversation_id
        self._lock = threading.Lock()

    @property
    def conversation_id(self) -> str | None:
        return self._conversation_id

    def has_conversation_id(self) -> bool:
        """Returns True if a conversation_id is already set."""
        return self._conversation_id is not None

    def enrich(self, request: ChatRequest) -> ChatRequest:
        """
        Returns a new ``ChatRequest`` enriched with the current conversation state.

        Sets ``use_conversation=True`` and ``conversation_id`` (if already captured).
        If the request already has a ``conversation_id``, returns it unchanged
        (explicit takes precedence).

        The original request is never mutated.

        Example:
            >>> with UseConversation() as conv:
            ...     request = conv.enrich(ChatRequest(user_prompt="Hello"))
            ...     response = agent.chat(request)
            ...     # response.request.conversation_id reflects what was sent
        """
        if request.conversation_id:
            return request
        return dataclasses.replace(
            request,
            use_conversation=True,
            conversation_id=self.conversation_id,
        )

    def update_if_absent(self, conversation_id: str) -> str:
        """
        Set the conversation_id only if not already set. Returns the
        current conversation_id (either the existing one or the newly set one).

        Thread-safe via lock so concurrent ``chat_many()`` workers
        can safely race to capture the first response's conversation_id.
        """
        if self._conversation_id is not None:
            return self._conversation_id
        with self._lock:
            if self._conversation_id is None:
                self._conversation_id = conversation_id
            return self._conversation_id

Attributes

conversation_id property

conversation_id: str | None

Functions

has_conversation_id

has_conversation_id() -> bool

Returns True if a conversation_id is already set.

Source code in src/stkai/agents/_conversation.py
def has_conversation_id(self) -> bool:
    """Returns True if a conversation_id is already set."""
    return self._conversation_id is not None

enrich

enrich(request: ChatRequest) -> ChatRequest

Returns a new ChatRequest enriched with the current conversation state.

Sets use_conversation=True and conversation_id (if already captured). If the request already has a conversation_id, returns it unchanged (explicit takes precedence).

The original request is never mutated.

Example

>>> with UseConversation() as conv:
...     request = conv.enrich(ChatRequest(user_prompt="Hello"))
...     response = agent.chat(request)
...     # response.request.conversation_id reflects what was sent

Source code in src/stkai/agents/_conversation.py
def enrich(self, request: ChatRequest) -> ChatRequest:
    """
    Returns a new ``ChatRequest`` enriched with the current conversation state.

    Sets ``use_conversation=True`` and ``conversation_id`` (if already captured).
    If the request already has a ``conversation_id``, returns it unchanged
    (explicit takes precedence).

    The original request is never mutated.

    Example:
        >>> with UseConversation() as conv:
        ...     request = conv.enrich(ChatRequest(user_prompt="Hello"))
        ...     response = agent.chat(request)
        ...     # response.request.conversation_id reflects what was sent
    """
    if request.conversation_id:
        return request
    return dataclasses.replace(
        request,
        use_conversation=True,
        conversation_id=self.conversation_id,
    )

update_if_absent

update_if_absent(conversation_id: str) -> str

Set the conversation_id only if not already set. Returns the current conversation_id (either the existing one or the newly set one).

Thread-safe via lock so concurrent chat_many() workers can safely race to capture the first response's conversation_id.

Source code in src/stkai/agents/_conversation.py
def update_if_absent(self, conversation_id: str) -> str:
    """
    Set the conversation_id only if not already set. Returns the
    current conversation_id (either the existing one or the newly set one).

    Thread-safe via lock so concurrent ``chat_many()`` workers
    can safely race to capture the first response's conversation_id.
    """
    if self._conversation_id is not None:
        return self._conversation_id
    with self._lock:
        if self._conversation_id is None:
            self._conversation_id = conversation_id
        return self._conversation_id
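
The first-writer-wins behavior of `update_if_absent()` can be reproduced with a small stand-in class (the names below are ours, not the library's). Many workers race to set a value, but every caller observes the same winner:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class FirstWriterWins:
    # Minimal stand-in mirroring the lock-protected check-then-set above.
    def __init__(self) -> None:
        self._value: str | None = None
        self._lock = threading.Lock()

    def update_if_absent(self, value: str) -> str:
        if self._value is not None:  # fast path: already captured
            return self._value
        with self._lock:
            if self._value is None:  # re-check under the lock
                self._value = value
            return self._value

ctx = FirstWriterWins()
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(ctx.update_if_absent, [f"conv-{i}" for i in range(100)]))

# Exactly one candidate value won the race; all callers saw it.
assert len(set(results)) == 1
```

The double check (before and inside the lock) keeps the common already-set path lock-free while still making the initial capture atomic.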

File Upload

FileUploader

Client for uploading files to the StackSpot platform.

The upload is a two-step process:

1. Request pre-signed S3 credentials from the Data Integration API (authenticated)
2. Upload the file to S3 using the pre-signed form (unauthenticated)

Note: File uploading via API is only available for Enterprise accounts.

Example

>>> from stkai import FileUploader, FileUploadRequest
>>> uploader = FileUploader()
>>> response = uploader.upload(FileUploadRequest(file_path="doc.pdf"))
>>> if response.is_success():
...     print(response.upload_id)
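
Step 2 of the flow posts the file to S3 as multipart form data, guessing the content type from the file name and falling back to a generic binary type when the extension is unknown. That fallback can be reproduced with the standard library alone (the helper name is ours, for illustration):

```python
import mimetypes
from pathlib import Path

def guess_content_type(file_path: str) -> str:
    # Same fallback the uploader uses in Step 2: unknown extensions
    # are sent as application/octet-stream.
    return mimetypes.guess_type(str(Path(file_path)))[0] or "application/octet-stream"

assert guess_content_type("doc.pdf") == "application/pdf"
assert guess_content_type("blob.unknownext") == "application/octet-stream"
```

The guessed type is then passed alongside the file name and handle in the multipart `files` field of the pre-signed POST.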

Attributes:

Name Type Description
base_url

The base URL for the Data Integration API.

options

Configuration options for the client.

http_client HttpClient

HTTP client for authenticated API calls.

Source code in src/stkai/_file_upload.py
class FileUploader:
    """
    Client for uploading files to the StackSpot platform.

    The upload is a two-step process:
    1. Request pre-signed S3 credentials from the Data Integration API (authenticated)
    2. Upload the file to S3 using the pre-signed form (unauthenticated)

    Note: File uploading via API is only available for Enterprise accounts.

    Example:
        >>> from stkai import FileUploader, FileUploadRequest
        >>> uploader = FileUploader()
        >>> response = uploader.upload(FileUploadRequest(file_path="doc.pdf"))
        >>> if response.is_success():
        ...     print(response.upload_id)

    Attributes:
        base_url: The base URL for the Data Integration API.
        options: Configuration options for the client.
        http_client: HTTP client for authenticated API calls.
    """

    def __init__(
        self,
        base_url: str | None = None,
        options: FileUploadOptions | None = None,
        http_client: HttpClient | None = None,
    ):
        """
        Initialize the FileUploader client.

        Args:
            base_url: Base URL for the Data Integration API.
                If None, uses global config (STKAI.config.file_upload.base_url).
            options: Configuration options for the client.
                If None, uses defaults from global config.
            http_client: Custom HTTP client for authenticated API calls (Step 1).
                If None, uses EnvironmentAwareHttpClient (auto-detects CLI or standalone).
        """
        from stkai._config import STKAI
        cfg = STKAI.config.file_upload

        resolved_options = (options or FileUploadOptions()).with_defaults_from(cfg)

        if base_url is None:
            base_url = cfg.base_url

        if not http_client:
            from stkai._http import EnvironmentAwareHttpClient
            http_client = EnvironmentAwareHttpClient()

        assert base_url, "FileUploader base_url cannot be empty."
        assert http_client is not None, "FileUploader http_client cannot be None."
        assert resolved_options.max_workers is not None, "Thread-pool max_workers cannot be None."
        assert resolved_options.max_workers > 0, "Thread-pool max_workers must be greater than 0."

        self.base_url = base_url.rstrip("/")
        self.options = resolved_options
        self.max_workers = resolved_options.max_workers
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
        self.http_client: HttpClient = http_client

    def upload(self, request: FileUploadRequest) -> FileUploadResponse:
        """
        Upload a file and wait for the response (blocking).

        Args:
            request: The file upload request.

        Returns:
            FileUploadResponse with the upload_id or error information.

        Example:
            >>> response = uploader.upload(FileUploadRequest(file_path="doc.pdf"))
            >>> if response.is_success():
            ...     print(response.upload_id)
        """
        logger.info(f"{request.id[:26]:<26} | FileUpload | 📤 Starting file upload.")
        logger.info(f"{request.id[:26]:<26} | FileUpload |    ├ base_url={self.base_url}")
        logger.info(f"{request.id[:26]:<26} | FileUpload |    └ file_name='{request.file_name}'")

        response = self._do_upload(request)

        logger.info(f"{request.id[:26]:<26} | FileUpload | 📤 File upload finished.")
        logger.info(f"{request.id[:26]:<26} | FileUpload |    ├ with status = {response.status}")
        if response.is_success():
            logger.info(f"{request.id[:26]:<26} | FileUpload |    └ with upload_id = {response.upload_id}")
        else:
            logger.info(f"{request.id[:26]:<26} | FileUpload |    └ with error message = \"{response.error}\"")

        assert response.request is request, \
            "🌀 Sanity check | Unexpected mismatch: response does not reference its corresponding request."
        return response

    def upload_many(self, request_list: list[FileUploadRequest]) -> list[FileUploadResponse]:
        """
        Upload multiple files concurrently, wait for all responses (blocking),
        and return them in the same order as `request_list`.

        Args:
            request_list: List of FileUploadRequest objects to upload.

        Returns:
            List[FileUploadResponse]: One response per request, in the same order.

        Example:
            >>> responses = uploader.upload_many([
            ...     FileUploadRequest(file_path="doc1.pdf"),
            ...     FileUploadRequest(file_path="doc2.pdf"),
            ... ])
            >>> upload_ids = [r.upload_id for r in responses if r.is_success()]
        """
        if not request_list:
            return []

        logger.info(
            f"{'FileUpload-Batch'[:26]:<26} | FileUpload | 📤 "
            f"Starting batch upload of {len(request_list)} files."
        )
        logger.info(f"{'FileUpload-Batch'[:26]:<26} | FileUpload |    ├ base_url={self.base_url}")
        logger.info(f"{'FileUpload-Batch'[:26]:<26} | FileUpload |    └ max_concurrent={self.max_workers}")

        future_to_index = {
            self.executor.submit(self._do_upload, req): idx
            for idx, req in enumerate(request_list)
        }

        responses_map: dict[int, FileUploadResponse] = {}

        for future in as_completed(future_to_index):
            idx = future_to_index[future]
            correlated_request = request_list[idx]
            try:
                responses_map[idx] = future.result()
            except Exception as e:
                logger.error(
                    f"{correlated_request.id[:26]:<26} | FileUpload | ❌ Upload failed in batch(seq={idx}). {e}",
                    exc_info=logger.isEnabledFor(logging.DEBUG)
                )
                responses_map[idx] = FileUploadResponse(
                    request=correlated_request,
                    status=FileUploadStatus.ERROR,
                    error=str(e),
                )

        responses = [responses_map[i] for i in range(len(request_list))]

        assert len(responses) == len(request_list), (
            f"🌀 Sanity check | Unexpected mismatch: responses(size={len(responses)}) is different from requests(size={len(request_list)})."
        )
        assert all(resp.request is req for req, resp in zip(request_list, responses, strict=True)), (
            "🌀 Sanity check | Unexpected mismatch: some responses do not reference their corresponding requests."
        )

        logger.info(
            f"{'FileUpload-Batch'[:26]:<26} | FileUpload | 📤 Batch upload finished."
        )

        from collections import Counter
        totals_per_status = Counter(r.status for r in responses)
        items = totals_per_status.items()
        for idx_s, (status, total) in enumerate(items):
            icon = "└" if idx_s == (len(items) - 1) else "├"
            logger.info(f"{'FileUpload-Batch'[:26]:<26} | FileUpload |    {icon} total with status {status:<7} = {total}")

        return responses

    def _do_upload(self, request: FileUploadRequest) -> FileUploadResponse:
        """
        Internal method that executes the full upload workflow.

        Always returns a FileUploadResponse (never raises exceptions).

        Args:
            request: The file upload request.

        Returns:
            FileUploadResponse with the upload_id or error information.
        """
        assert request, "🌀 Sanity check | FileUploadRequest cannot be None."
        assert request.id, "🌀 Sanity check | FileUploadRequest ID cannot be None."

        form_data: dict[str, Any] | None = None

        try:
            # Validate file exists before making API calls
            file_path = Path(request.file_path)
            if not file_path.exists():
                raise FileNotFoundError(f"File not found: {file_path}")
            if not file_path.is_file():
                raise ValueError(f"Path is not a file: {file_path}")

            # Step 1: Generate pre-signed upload form
            form_data = self._generate_presigned_form(request)

            upload_id = form_data.get("id")
            s3_url = form_data.get("url")
            s3_form_fields = form_data.get("form")

            assert upload_id, "🌀 Sanity check | Presigned upload form was created but `id` field is missing."
            assert s3_url, "🌀 Sanity check | Presigned upload form was created but `url` field is missing."
            assert s3_form_fields, "🌀 Sanity check | Presigned upload form was created but `form` field is missing."

            # Step 2: Upload file to S3
            self._upload_file_to_s3(request, s3_url, s3_form_fields)

            logger.info(
                f"{request.id[:26]:<26} | FileUpload | "
                f"✅ File uploaded successfully (upload_id={upload_id})"
            )

            return FileUploadResponse(
                request=request,
                status=FileUploadStatus.SUCCESS,
                upload_id=upload_id,
                raw_response=form_data,
            )

        except Exception as e:
            error_status = FileUploadStatus.from_exception(e)
            error_msg = f"File upload failed: {e}"
            if isinstance(e, requests.HTTPError) and e.response is not None:
                error_msg = f"File upload failed due to an HTTP error {e.response.status_code}: {e.response.text}"
            logger.error(
                f"{request.id[:26]:<26} | FileUpload | ❌ {error_msg}",
                exc_info=logger.isEnabledFor(logging.DEBUG)
            )
            return FileUploadResponse(
                request=request,
                status=error_status,
                error=error_msg,
                raw_response=form_data,
            )

    def _generate_presigned_form(self, request: FileUploadRequest) -> dict[str, Any]:
        """
        Step 1: Request pre-signed S3 upload form from the Data Integration API.

        Args:
            request: The file upload request.

        Returns:
            Dict with 'id', 'url', and 'form' keys from the API response.

        Raises:
            requests.HTTPError: On non-2xx response.
            MaxRetriesExceededError: When retries are exhausted.
        """
        assert request, "🌀 Sanity check | FileUpload-Request not provided to generate_presigned_form phase."

        assert self.options.request_timeout is not None, "request_timeout must be set after with_defaults_from()"
        assert self.options.retry_max_retries is not None, "retry_max_retries must be set after with_defaults_from()"
        assert self.options.retry_initial_delay is not None, "retry_initial_delay must be set after with_defaults_from()"

        for attempt in Retrying(
            max_retries=self.options.retry_max_retries,
            initial_delay=self.options.retry_initial_delay,
            logger_prefix=f"{request.id[:26]:<26} | FileUpload",
        ):
            with attempt:
                logger.info(
                    f"{request.id[:26]:<26} | FileUpload | "
                    f"Step 1: Generating presigned upload form (attempt {attempt.attempt_number}/{attempt.max_attempts})..."
                )

                url = f"{self.base_url}/v2/file-upload/form"
                http_response = self.http_client.post(
                    url=url,
                    data=request.to_api_payload(),
                    timeout=self.options.request_timeout,
                )
                assert isinstance(http_response, requests.Response), \
                    f"🌀 Sanity check | Object returned by `post` method is not an instance of `requests.Response`. ({http_response.__class__})"

                logger.debug(
                    f"{request.id[:26]:<26} | FileUpload | "
                    f"Step 1: Presigned upload form response: status={http_response.status_code}"
                    f"\n{http_response.text}"
                )

                http_response.raise_for_status()
                response_data: dict[str, Any] = http_response.json()

                logger.info(
                    f"{request.id[:26]:<26} | FileUpload | "
                    f"Step 1: Presigned upload form received (upload_id={response_data['id']})"
                )
                return response_data

        raise RuntimeError(
            "Unexpected error while generating presigned upload form: "
            "reached end of `_generate_presigned_form` method without returning a response."
        )

    def _upload_file_to_s3(
        self,
        request: FileUploadRequest,
        s3_url: str,
        form_fields: dict[str, str],
    ) -> None:
        """
        Step 2: Upload file to S3 using the pre-signed form data.

        Uses raw requests.post() since this is an unauthenticated multipart upload.

        Args:
            request: The file upload request.
            s3_url: The S3 pre-signed URL.
            form_fields: The form fields for the multipart upload.

        Raises:
            requests.HTTPError: On non-2xx response from S3.
            MaxRetriesExceededError: When retries are exhausted.
        """
        assert request, "🌀 Sanity check | FileUpload-Request not provided to upload_file_to_s3 phase."
        assert s3_url is not None, "🌀 Sanity check | S3 URL not provided to upload_file_to_s3 phase."
        assert form_fields is not None, "🌀 Sanity check | Form fields not provided to upload_file_to_s3 phase."

        assert self.options.transfer_timeout is not None, "transfer_timeout must be set after with_defaults_from()"
        assert self.options.retry_max_retries is not None, "retry_max_retries must be set after with_defaults_from()"
        assert self.options.retry_initial_delay is not None, "retry_initial_delay must be set after with_defaults_from()"

        for attempt in Retrying(
            max_retries=self.options.retry_max_retries,
            initial_delay=self.options.retry_initial_delay,
            logger_prefix=f"{request.id[:26]:<26} | FileUpload",
        ):
            with attempt:
                logger.info(
                    f"{request.id[:26]:<26} | FileUpload | "
                    f"Step 2: Uploading file to S3 (attempt {attempt.attempt_number}/{attempt.max_attempts})..."
                )

                file_path = Path(request.file_path)
                content_type = mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"

                with file_path.open("rb") as f:
                    http_response = requests.post(
                        s3_url,
                        data=form_fields,
                        files={"file": (request.file_name, f, content_type)},
                        timeout=self.options.transfer_timeout,
                    )

                logger.debug(
                    f"{request.id[:26]:<26} | FileUpload | "
                    f"Step 2: S3 response: status={http_response.status_code}"
                    f"\n{http_response.text}"
                )

                http_response.raise_for_status()

                logger.info(
                    f"{request.id[:26]:<26} | FileUpload | "
                    f"Step 2: File uploaded to S3 successfully"
                )
                return

        raise RuntimeError(
            "Unexpected error while uploading file to S3: "
            "reached end of `_upload_file_to_s3` method without returning."
        )

Functions

upload

upload(request: FileUploadRequest) -> FileUploadResponse

Upload a file and wait for the response (blocking).

Parameters:

Name Type Description Default
request FileUploadRequest

The file upload request.

required

Returns:

Type Description
FileUploadResponse

FileUploadResponse with the upload_id or error information.

Example

>>> response = uploader.upload(FileUploadRequest(file_path="doc.pdf"))
>>> if response.is_success():
...     print(response.upload_id)

Source code in src/stkai/_file_upload.py
def upload(self, request: FileUploadRequest) -> FileUploadResponse:
    """
    Upload a file and wait for the response (blocking).

    Args:
        request: The file upload request.

    Returns:
        FileUploadResponse with the upload_id or error information.

    Example:
        >>> response = uploader.upload(FileUploadRequest(file_path="doc.pdf"))
        >>> if response.is_success():
        ...     print(response.upload_id)
    """
    logger.info(f"{request.id[:26]:<26} | FileUpload | 📤 Starting file upload.")
    logger.info(f"{request.id[:26]:<26} | FileUpload |    ├ base_url={self.base_url}")
    logger.info(f"{request.id[:26]:<26} | FileUpload |    └ file_name='{request.file_name}'")

    response = self._do_upload(request)

    logger.info(f"{request.id[:26]:<26} | FileUpload | 📤 File upload finished.")
    logger.info(f"{request.id[:26]:<26} | FileUpload |    ├ with status = {response.status}")
    if response.is_success():
        logger.info(f"{request.id[:26]:<26} | FileUpload |    └ with upload_id = {response.upload_id}")
    else:
        logger.info(f"{request.id[:26]:<26} | FileUpload |    └ with error message = \"{response.error}\"")

    assert response.request is request, \
        "🌀 Sanity check | Unexpected mismatch: response does not reference its corresponding request."
    return response

upload_many

upload_many(request_list: list[FileUploadRequest]) -> list[FileUploadResponse]

Upload multiple files concurrently, wait for all responses (blocking), and return them in the same order as request_list.

Parameters:

Name Type Description Default
request_list list[FileUploadRequest]

List of FileUploadRequest objects to upload.

required

Returns:

Type Description
list[FileUploadResponse]

List[FileUploadResponse]: One response per request, in the same order.

Example

>>> responses = uploader.upload_many([
...     FileUploadRequest(file_path="doc1.pdf"),
...     FileUploadRequest(file_path="doc2.pdf"),
... ])
>>> upload_ids = [r.upload_id for r in responses if r.is_success()]
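
`upload_many` returns responses in request order even though futures complete out of order. The underlying pattern — map each future to its submission index, collect results into a dict keyed by index, then rebuild the list — can be reproduced with a dummy task:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_echo(value: int) -> int:
    # Later submissions sleep less, so completion order is scrambled.
    time.sleep(0.05 * (3 - value))
    return value

items = [0, 1, 2, 3]
with ThreadPoolExecutor(max_workers=4) as pool:
    future_to_index = {pool.submit(slow_echo, v): i for i, v in enumerate(items)}
    results_map = {future_to_index[f]: f.result() for f in as_completed(future_to_index)}

# Rebuild by submission index: output order matches input order
# regardless of which future finished first.
results = [results_map[i] for i in range(len(items))]
assert results == items
```

This is the same bookkeeping `upload_many` does with `responses_map`, plus its per-future exception handling so one failed upload never loses its slot in the output list.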

Source code in src/stkai/_file_upload.py
def upload_many(self, request_list: list[FileUploadRequest]) -> list[FileUploadResponse]:
    """
    Upload multiple files concurrently, wait for all responses (blocking),
    and return them in the same order as `request_list`.

    Args:
        request_list: List of FileUploadRequest objects to upload.

    Returns:
        List[FileUploadResponse]: One response per request, in the same order.

    Example:
        >>> responses = uploader.upload_many([
        ...     FileUploadRequest(file_path="doc1.pdf"),
        ...     FileUploadRequest(file_path="doc2.pdf"),
        ... ])
        >>> upload_ids = [r.upload_id for r in responses if r.is_success()]
    """
    if not request_list:
        return []

    logger.info(
        f"{'FileUpload-Batch'[:26]:<26} | FileUpload | 📤 "
        f"Starting batch upload of {len(request_list)} files."
    )
    logger.info(f"{'FileUpload-Batch'[:26]:<26} | FileUpload |    ├ base_url={self.base_url}")
    logger.info(f"{'FileUpload-Batch'[:26]:<26} | FileUpload |    └ max_concurrent={self.max_workers}")

    future_to_index = {
        self.executor.submit(self._do_upload, req): idx
        for idx, req in enumerate(request_list)
    }

    responses_map: dict[int, FileUploadResponse] = {}

    for future in as_completed(future_to_index):
        idx = future_to_index[future]
        correlated_request = request_list[idx]
        try:
            responses_map[idx] = future.result()
        except Exception as e:
            logger.error(
                f"{correlated_request.id[:26]:<26} | FileUpload | ❌ Upload failed in batch(seq={idx}). {e}",
                exc_info=logger.isEnabledFor(logging.DEBUG)
            )
            responses_map[idx] = FileUploadResponse(
                request=correlated_request,
                status=FileUploadStatus.ERROR,
                error=str(e),
            )

    responses = [responses_map[i] for i in range(len(request_list))]

    assert len(responses) == len(request_list), (
        f"🌀 Sanity check | Unexpected mismatch: responses(size={len(responses)}) is different from requests(size={len(request_list)})."
    )
    assert all(resp.request is req for req, resp in zip(request_list, responses, strict=True)), (
        "🌀 Sanity check | Unexpected mismatch: some responses do not reference their corresponding requests."
    )

    logger.info(
        f"{'FileUpload-Batch'[:26]:<26} | FileUpload | 📤 Batch upload finished."
    )

    from collections import Counter
    totals_per_status = Counter(r.status for r in responses)
    items = totals_per_status.items()
    for idx_s, (status, total) in enumerate(items):
        icon = "└" if idx_s == (len(items) - 1) else "├"
        logger.info(f"{'FileUpload-Batch'[:26]:<26} | FileUpload |    {icon} total with status {status:<7} = {total}")

    return responses

FileUploadRequest dataclass

Represents a file upload request.

Attributes:

Name Type Description
file_path str | Path

Path to the file to upload.

target_type FileUploadTargetType

Upload target type (default: CONTEXT). Use CONTEXT for agent chat context, KNOWLEDGE_SOURCE for knowledge sources.

target_id str | None

Knowledge source slug. Required when target_type is KNOWLEDGE_SOURCE.

expiration int

Expiration in minutes for the uploaded file (default: 60).

id str

Unique identifier for this request. Auto-generated as UUID if not provided.

metadata dict[str, Any]

Optional dictionary for storing custom metadata.

Example

>>> request = FileUploadRequest(file_path="document.pdf")
>>> request = FileUploadRequest(
...     file_path="doc.pdf",
...     target_type=FileUploadTargetType.KNOWLEDGE_SOURCE,
...     target_id="my-ks-slug",
... )

Source code in src/stkai/_file_upload.py
@dataclass(frozen=True)
class FileUploadRequest:
    """
    Represents a file upload request.

    Attributes:
        file_path: Path to the file to upload.
        target_type: Upload target type (default: CONTEXT).
            Use CONTEXT for agent chat context, KNOWLEDGE_SOURCE for knowledge sources.
        target_id: Knowledge source slug. Required when target_type is KNOWLEDGE_SOURCE.
        expiration: Expiration in minutes for the uploaded file (default: 60).
        id: Unique identifier for this request. Auto-generated as UUID if not provided.
        metadata: Optional dictionary for storing custom metadata.

    Example:
        >>> request = FileUploadRequest(file_path="document.pdf")
        >>> request = FileUploadRequest(
        ...     file_path="doc.pdf",
        ...     target_type=FileUploadTargetType.KNOWLEDGE_SOURCE,
        ...     target_id="my-ks-slug",
        ... )
    """
    file_path: str | Path
    target_type: FileUploadTargetType = FileUploadTargetType.CONTEXT
    target_id: str | None = None
    expiration: int = 60
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        assert self.id, "Request ID cannot be empty."
        assert self.file_path, "File path cannot be empty."
        assert self.expiration is not None, "Expiration cannot be None."
        assert self.expiration > 0, "Expiration must be greater than 0."

        assert self.target_type, "Target type cannot be None."
        assert isinstance(self.target_type, FileUploadTargetType), f"Target type must be a FileUploadTargetType, got {type(self.target_type).__name__}."
        if self.target_type == FileUploadTargetType.KNOWLEDGE_SOURCE:
            assert self.target_id and self.target_id.strip(), f"Target ID is required when Target type is {self.target_type}."

        file_path = Path(self.file_path)
        assert file_path.exists(), f"File path not found: {file_path}"
        assert file_path.is_file(), f"File path is not a file: {file_path}"

    @property
    def file_name(self) -> str:
        """Extract file name from the file path."""
        return Path(self.file_path).name

    def to_api_payload(self) -> dict[str, Any]:
        """Converts the request to the API payload format for the pre-signed form endpoint."""
        payload: dict[str, Any] = {
            "file_name": self.file_name,
            "target_type": self.target_type,
            "expiration": self.expiration,
        }
        if self.target_id is not None:
            payload["target_id"] = self.target_id
        return payload

Attributes

file_name property

file_name: str

Extract file name from the file path.

Functions

to_api_payload

to_api_payload() -> dict[str, Any]

Converts the request to the API payload format for the pre-signed form endpoint.

Source code in src/stkai/_file_upload.py
def to_api_payload(self) -> dict[str, Any]:
    """Converts the request to the API payload format for the pre-signed form endpoint."""
    payload: dict[str, Any] = {
        "file_name": self.file_name,
        "target_type": self.target_type,
        "expiration": self.expiration,
    }
    if self.target_id is not None:
        payload["target_id"] = self.target_id
    return payload

FileUploadResponse dataclass

Represents a response from a file upload operation.

Attributes:

Name Type Description
request FileUploadRequest

The original request that generated this response.

status FileUploadStatus

The status of the response (SUCCESS, ERROR, TIMEOUT).

upload_id str | None

The upload ID returned by the API on success.

error str | None

Error message if the upload failed.

raw_response dict[str, Any] | None

Raw API response from Step 1 (pre-signed form request).

Example

>>> if response.is_success():
...     print(response.upload_id)
... else:
...     print(f"Error: {response.error}")

Source code in src/stkai/_file_upload.py
@dataclass(frozen=True)
class FileUploadResponse:
    """
    Represents a response from a file upload operation.

    Attributes:
        request: The original request that generated this response.
        status: The status of the response (SUCCESS, ERROR, TIMEOUT).
        upload_id: The upload ID returned by the API on success.
        error: Error message if the upload failed.
        raw_response: Raw API response from Step 1 (pre-signed form request).

    Example:
        >>> if response.is_success():
        ...     print(response.upload_id)
        ... else:
        ...     print(f"Error: {response.error}")
    """
    request: FileUploadRequest
    status: FileUploadStatus
    upload_id: str | None = None
    error: str | None = None
    raw_response: dict[str, Any] | None = None

    def __post_init__(self) -> None:
        assert self.request, "Request cannot be empty."
        assert self.status, "Status cannot be empty."

    def is_success(self) -> bool:
        """Returns True if the upload was successful."""
        return self.status == FileUploadStatus.SUCCESS

    def is_error(self) -> bool:
        """Returns True if there was an error."""
        return self.status == FileUploadStatus.ERROR

    def is_timeout(self) -> bool:
        """Returns True if the upload timed out."""
        return self.status == FileUploadStatus.TIMEOUT

    def error_with_details(self) -> dict[str, Any]:
        """Returns a dictionary with error details for non-success responses."""
        if self.is_success():
            return {}

        return {
            "status": self.status,
            "error_message": self.error,
            "response_body": self.raw_response or {},
        }

Functions

is_success

is_success() -> bool

Returns True if the upload was successful.

Source code in src/stkai/_file_upload.py
def is_success(self) -> bool:
    """Returns True if the upload was successful."""
    return self.status == FileUploadStatus.SUCCESS

is_error

is_error() -> bool

Returns True if there was an error.

Source code in src/stkai/_file_upload.py
def is_error(self) -> bool:
    """Returns True if there was an error."""
    return self.status == FileUploadStatus.ERROR

is_timeout

is_timeout() -> bool

Returns True if the upload timed out.

Source code in src/stkai/_file_upload.py
def is_timeout(self) -> bool:
    """Returns True if the upload timed out."""
    return self.status == FileUploadStatus.TIMEOUT

error_with_details

error_with_details() -> dict[str, Any]

Returns a dictionary with error details for non-success responses.

Source code in src/stkai/_file_upload.py
def error_with_details(self) -> dict[str, Any]:
    """Returns a dictionary with error details for non-success responses."""
    if self.is_success():
        return {}

    return {
        "status": self.status,
        "error_message": self.error,
        "response_body": self.raw_response or {},
    }

FileUploadStatus

Bases: StrEnum

Status of a file upload response.

Attributes:

Name Type Description
SUCCESS

File uploaded successfully.

ERROR

Client-side error (HTTP error, network issue, file not found).

TIMEOUT

Any timeout, client or server-side.

Source code in src/stkai/_file_upload.py
class FileUploadStatus(StrEnum):
    """
    Status of a file upload response.

    Attributes:
        SUCCESS: File uploaded successfully.
        ERROR: Client-side error (HTTP error, network issue, file not found).
        TIMEOUT: Any timeout, client or server-side.
    """
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"
    TIMEOUT = "TIMEOUT"

    @classmethod
    def from_exception(cls, exc: Exception) -> "FileUploadStatus":
        """
        Determine the appropriate status for an exception.

        Args:
            exc: The exception that occurred during the upload.

        Returns:
            TIMEOUT for timeout exceptions, ERROR for all others.
        """
        from stkai._utils import is_timeout_exception
        return cls.TIMEOUT if is_timeout_exception(exc) else cls.ERROR

Functions

from_exception classmethod

from_exception(exc: Exception) -> FileUploadStatus

Determine the appropriate status for an exception.

Parameters:

Name Type Description Default
exc Exception

The exception that occurred during the upload.

required

Returns:

Type Description
FileUploadStatus

TIMEOUT for timeout exceptions, ERROR for all others.

Source code in src/stkai/_file_upload.py
@classmethod
def from_exception(cls, exc: Exception) -> "FileUploadStatus":
    """
    Determine the appropriate status for an exception.

    Args:
        exc: The exception that occurred during the upload.

    Returns:
        TIMEOUT for timeout exceptions, ERROR for all others.
    """
    from stkai._utils import is_timeout_exception
    return cls.TIMEOUT if is_timeout_exception(exc) else cls.ERROR
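The dispatch in `from_exception` can be approximated with stdlib exceptions alone. A hedged sketch, assuming `is_timeout_exception` recognizes `TimeoutError` (of which `socket.timeout` has been an alias since Python 3.10); the real helper may match additional exception types:

```python
from enum import Enum


class Status(Enum):
    """Stand-in for FileUploadStatus."""
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"
    TIMEOUT = "TIMEOUT"


def status_from_exception(exc: Exception) -> Status:
    # Timeouts map to TIMEOUT; any other failure maps to ERROR,
    # mirroring the documented from_exception contract.
    return Status.TIMEOUT if isinstance(exc, TimeoutError) else Status.ERROR


print(status_from_exception(TimeoutError("read timed out")))    # Status.TIMEOUT
print(status_from_exception(FileNotFoundError("report.pdf")))   # Status.ERROR
```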

FileUploadOptions dataclass

Configuration options for the FileUploader client.

Fields set to None will use values from global config (STKAI.config.file_upload).

Attributes:

Name Type Description
request_timeout int | None

HTTP timeout for Step 1 (get pre-signed form).

transfer_timeout int | None

HTTP timeout for Step 2 (file transfer to S3).

retry_max_retries int | None

Maximum retry attempts for transient failures.

retry_initial_delay float | None

Initial delay for first retry (exponential backoff).

max_workers int | None

Maximum threads for upload_many().

Example

options = FileUploadOptions(request_timeout=15, transfer_timeout=60)
uploader = FileUploader(options=options)


Source code in src/stkai/_file_upload.py
@dataclass(frozen=True)
class FileUploadOptions:
    """
    Configuration options for the FileUploader client.

    Fields set to None will use values from global config (STKAI.config.file_upload).

    Attributes:
        request_timeout: HTTP timeout for Step 1 (get pre-signed form).
        transfer_timeout: HTTP timeout for Step 2 (file transfer to S3).
        retry_max_retries: Maximum retry attempts for transient failures.
        retry_initial_delay: Initial delay for first retry (exponential backoff).
        max_workers: Maximum threads for upload_many().

    Example:
        >>> options = FileUploadOptions(request_timeout=15, transfer_timeout=60)
        >>> uploader = FileUploader(options=options)
    """
    request_timeout: int | None = None
    transfer_timeout: int | None = None
    retry_max_retries: int | None = None
    retry_initial_delay: float | None = None
    max_workers: int | None = None

    def with_defaults_from(self, cfg: FileUploadConfig) -> "FileUploadOptions":
        """
        Returns a new FileUploadOptions with None values filled from config.

        Args:
            cfg: The FileUploadConfig to use for default values.

        Returns:
            A new FileUploadOptions with all fields resolved (no None values).
        """
        return FileUploadOptions(
            request_timeout=self.request_timeout if self.request_timeout is not None else cfg.request_timeout,
            transfer_timeout=self.transfer_timeout if self.transfer_timeout is not None else cfg.transfer_timeout,
            retry_max_retries=self.retry_max_retries if self.retry_max_retries is not None else cfg.retry_max_retries,
            retry_initial_delay=self.retry_initial_delay if self.retry_initial_delay is not None else cfg.retry_initial_delay,
            max_workers=self.max_workers if self.max_workers is not None else cfg.max_workers,
        )

Functions

with_defaults_from

with_defaults_from(cfg: FileUploadConfig) -> FileUploadOptions

Returns a new FileUploadOptions with None values filled from config.

Parameters:

Name Type Description Default
cfg FileUploadConfig

The FileUploadConfig to use for default values.

required

Returns:

Type Description
FileUploadOptions

A new FileUploadOptions with all fields resolved (no None values).

Source code in src/stkai/_file_upload.py
def with_defaults_from(self, cfg: FileUploadConfig) -> "FileUploadOptions":
    """
    Returns a new FileUploadOptions with None values filled from config.

    Args:
        cfg: The FileUploadConfig to use for default values.

    Returns:
        A new FileUploadOptions with all fields resolved (no None values).
    """
    return FileUploadOptions(
        request_timeout=self.request_timeout if self.request_timeout is not None else cfg.request_timeout,
        transfer_timeout=self.transfer_timeout if self.transfer_timeout is not None else cfg.transfer_timeout,
        retry_max_retries=self.retry_max_retries if self.retry_max_retries is not None else cfg.retry_max_retries,
        retry_initial_delay=self.retry_initial_delay if self.retry_initial_delay is not None else cfg.retry_initial_delay,
        max_workers=self.max_workers if self.max_workers is not None else cfg.max_workers,
    )

Configuration

AgentOptions dataclass

Configuration options for the Agent client.

Fields set to None will use values from global config (STKAI.config.agent).

Attributes:

Name Type Description
request_timeout int | None

HTTP request timeout in seconds.

retry_max_retries int | None

Maximum number of retry attempts for failed chat calls. Use 0 to disable retries (single attempt only). Use 3 for 4 total attempts (1 original + 3 retries).

retry_initial_delay float | None

Initial delay in seconds for the first retry attempt. Subsequent retries use exponential backoff (delay doubles each attempt).

max_workers int | None

Maximum number of threads for batch execution (chat_many).

Example

Use all defaults from config

agent = Agent(agent_id="my-agent")

Customize timeout and enable retry

options = AgentOptions(request_timeout=120, retry_max_retries=3)
agent = Agent(agent_id="my-agent", options=options)

Source code in src/stkai/agents/_agent.py
@dataclass(frozen=True)
class AgentOptions:
    """
    Configuration options for the Agent client.

    Fields set to None will use values from global config (STKAI.config.agent).

    Attributes:
        request_timeout: HTTP request timeout in seconds.
        retry_max_retries: Maximum number of retry attempts for failed chat calls.
            Use 0 to disable retries (single attempt only).
            Use 3 for 4 total attempts (1 original + 3 retries).
        retry_initial_delay: Initial delay in seconds for the first retry attempt.
            Subsequent retries use exponential backoff (delay doubles each attempt).
        max_workers: Maximum number of threads for batch execution (chat_many).

    Example:
        >>> # Use all defaults from config
        >>> agent = Agent(agent_id="my-agent")
        >>>
        >>> # Customize timeout and enable retry
        >>> options = AgentOptions(request_timeout=120, retry_max_retries=3)
        >>> agent = Agent(agent_id="my-agent", options=options)
    """
    request_timeout: int | None = None
    retry_max_retries: int | None = None
    retry_initial_delay: float | None = None
    max_workers: int | None = None

    def with_defaults_from(self, cfg: AgentConfig) -> AgentOptions:
        """
        Returns a new AgentOptions with None values filled from config.

        User-provided values take precedence; None values use config defaults.
        This follows the Single Source of Truth principle where STKAI.config
        is the authoritative source for default values.

        Args:
            cfg: The AgentConfig to use for default values.

        Returns:
            A new AgentOptions with all fields resolved (no None values).

        Example:
            >>> options = AgentOptions(request_timeout=120)
            >>> resolved = options.with_defaults_from(STKAI.config.agent)
            >>> resolved.request_timeout  # 120 (user-defined)
        """
        return AgentOptions(
            request_timeout=self.request_timeout if self.request_timeout is not None else cfg.request_timeout,
            retry_max_retries=self.retry_max_retries if self.retry_max_retries is not None else cfg.retry_max_retries,
            retry_initial_delay=self.retry_initial_delay if self.retry_initial_delay is not None else cfg.retry_initial_delay,
            max_workers=self.max_workers if self.max_workers is not None else cfg.max_workers,
        )

Functions

with_defaults_from

with_defaults_from(cfg: AgentConfig) -> AgentOptions

Returns a new AgentOptions with None values filled from config.

User-provided values take precedence; None values use config defaults. This follows the Single Source of Truth principle where STKAI.config is the authoritative source for default values.

Parameters:

Name Type Description Default
cfg AgentConfig

The AgentConfig to use for default values.

required

Returns:

Type Description
AgentOptions

A new AgentOptions with all fields resolved (no None values).

Example

options = AgentOptions(request_timeout=120)
resolved = options.with_defaults_from(STKAI.config.agent)
resolved.request_timeout  # 120 (user-defined)

Source code in src/stkai/agents/_agent.py
def with_defaults_from(self, cfg: AgentConfig) -> AgentOptions:
    """
    Returns a new AgentOptions with None values filled from config.

    User-provided values take precedence; None values use config defaults.
    This follows the Single Source of Truth principle where STKAI.config
    is the authoritative source for default values.

    Args:
        cfg: The AgentConfig to use for default values.

    Returns:
        A new AgentOptions with all fields resolved (no None values).

    Example:
        >>> options = AgentOptions(request_timeout=120)
        >>> resolved = options.with_defaults_from(STKAI.config.agent)
        >>> resolved.request_timeout  # 120 (user-defined)
    """
    return AgentOptions(
        request_timeout=self.request_timeout if self.request_timeout is not None else cfg.request_timeout,
        retry_max_retries=self.retry_max_retries if self.retry_max_retries is not None else cfg.retry_max_retries,
        retry_initial_delay=self.retry_initial_delay if self.retry_initial_delay is not None else cfg.retry_initial_delay,
        max_workers=self.max_workers if self.max_workers is not None else cfg.max_workers,
    )
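The retry docstring states that the delay doubles on each attempt. The wait schedule implied by `retry_initial_delay` and `retry_max_retries` can be sketched as follows (an illustration of the documented behavior, not the library's internal retry code):

```python
def backoff_schedule(initial_delay: float, max_retries: int) -> list[float]:
    # Delay before retry i is initial_delay * 2**i (exponential backoff).
    return [initial_delay * (2 ** i) for i in range(max_retries)]


# retry_max_retries=3, retry_initial_delay=1.0 gives 4 total attempts
# (1 original + 3 retries), with waits of 1s, 2s, 4s before the retries.
print(backoff_schedule(1.0, 3))  # [1.0, 2.0, 4.0]
```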