RQC API Reference

Complete API reference for Remote Quick Commands.

Main Classes

RemoteQuickCommand

Synchronous client for executing Remote QuickCommands (RQC).

This client provides a high-level interface for executing Remote Quick Commands against the StackSpot AI API with built-in support for:

  • Automatic polling until execution completes
  • Exponential backoff with retries for transient failures
  • Thread-based concurrency for batch execution
  • Request/response logging to disk for debugging

Example

    rqc = RemoteQuickCommand(slug_name="my-quick-command")
    request = RqcRequest(payload={"prompt": "Hello!"})
    response = rqc.execute(request)
    if response.is_completed():
        print(response.result)
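
The "exponential backoff with retries" item in the feature list can be pictured with a small standalone sketch. This is not the library's own `Retrying` helper: the names `backoff_delays` and `call_with_retries`, the doubling factor, the 10% jitter, and the choice of `ConnectionError` as the transient failure are all assumptions made for illustration.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delays(max_retries: int, initial_delay: float, factor: float = 2.0) -> list[float]:
    """Delay before each retry: initial_delay, initial_delay * factor, and so on."""
    return [initial_delay * (factor ** i) for i in range(max_retries)]


def call_with_retries(
    func: Callable[[], T],
    max_retries: int = 3,
    initial_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on ConnectionError with exponentially growing delays."""
    delays = backoff_delays(max_retries, initial_delay)
    for attempt in range(max_retries + 1):
        try:
            return func()
        except ConnectionError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            # Jitter of up to 10% spreads out retries from concurrent clients.
            sleep(delays[attempt] * (1 + random.uniform(0, 0.1)))
    raise AssertionError("unreachable")
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting; a production client would normally keep the default `time.sleep`.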

Attributes:

Name         Type                    Description
slug_name                            The Quick Command slug name to execute.
base_url                             The base URL for the StackSpot AI API.
options                              Consolidated configuration options (resolved with defaults from config).
http_client  HttpClient              HTTP client for API calls (default: EnvironmentAwareHttpClient).
listeners    list[RqcEventListener]  List of event listeners for observing execution lifecycle.
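
The constructor docstring in the source listing distinguishes `listeners=None` (register the default `FileLoggingListener`) from `listeners=[]` (disable default logging). A minimal sketch of that "None versus empty list" convention follows; `PrintListener` and `resolve_listeners` are hypothetical stand-ins, not part of the library.

```python
from __future__ import annotations


class PrintListener:
    """Hypothetical stand-in for a default listener such as FileLoggingListener."""


def resolve_listeners(listeners: list[object] | None = None) -> list[object]:
    # None means "the caller did not choose": install the default listener.
    if listeners is None:
        return [PrintListener()]
    # Any explicit list, including an empty one, is used as given.
    return list(listeners)
```

Testing with `is None` rather than truthiness is what lets an empty list carry a different meaning from an omitted argument.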

Source code in src/stkai/rqc/_remote_quick_command.py
class RemoteQuickCommand:
    """
    Synchronous client for executing Remote QuickCommands (RQC).

    This client provides a high-level interface for executing Remote Quick Commands
    against the StackSpot AI API with built-in support for:

    - Automatic polling until execution completes
    - Exponential backoff with retries for transient failures
    - Thread-based concurrency for batch execution
    - Request/response logging to disk for debugging

    Example:
        >>> rqc = RemoteQuickCommand(slug_name="my-quick-command")
        >>> request = RqcRequest(payload={"prompt": "Hello!"})
        >>> response = rqc.execute(request)
        >>> if response.is_completed():
        ...     print(response.result)

    Attributes:
        slug_name: The Quick Command slug name to execute.
        base_url: The base URL for the StackSpot AI API.
        options: Consolidated configuration options (resolved with defaults from config).
        http_client: HTTP client for API calls (default: EnvironmentAwareHttpClient).
        listeners: List of event listeners for observing execution lifecycle.
    """

    def __init__(
        self,
        slug_name: str,
        base_url: str | None = None,
        options: RqcOptions | None = None,
        http_client: HttpClient | None = None,
        listeners: list[RqcEventListener] | None = None,
    ):
        """
        Initialize the RemoteQuickCommand client.

        By default, a FileLoggingListener is registered to persist request/response
        to JSON files in `output/rqc/{slug_name}/`. Pass `listeners=[]` to disable
        this behavior, or provide your own list of listeners.

        Args:
            slug_name: The Quick Command slug name (identifier) to execute.
            base_url: Base URL for the StackSpot AI API.
                If None, uses global config (STKAI.config.rqc.base_url).
            options: Configuration options for execution behavior.
                If None, uses defaults from global config (STKAI.config.rqc).
                Partial options are merged with config defaults via with_defaults_from().
            http_client: Custom HTTP client implementation for API calls.
                If None, uses EnvironmentAwareHttpClient (auto-detects CLI or standalone).
            listeners: Event listeners for observing execution lifecycle.
                If None (default), registers a FileLoggingListener.
                If [] (empty list), disables default logging.

        Raises:
            AssertionError: If any required parameter is invalid.
        """
        # Get global config for defaults
        from stkai._config import STKAI
        cfg = STKAI.config.rqc

        # Resolve options with defaults from config (Single Source of Truth)
        resolved_options = (options or RqcOptions()).with_defaults_from(cfg)

        # Resolve base_url
        if base_url is None:
            base_url = cfg.base_url

        if not http_client:
            from stkai._http import EnvironmentAwareHttpClient
            http_client = EnvironmentAwareHttpClient()

        # Setup default FileLoggingListener when no listeners are specified (None).
        # To disable logging, pass an empty list: `listeners=[]`
        if listeners is None:
            from stkai.rqc._event_listeners import FileLoggingListener
            listeners = [
                FileLoggingListener(output_dir=f"output/rqc/{slug_name}")
            ]

        assert slug_name, "RQC slug_name can not be empty."
        assert base_url, "RQC base_url can not be empty."
        assert resolved_options.max_workers is not None, "Thread-pool max_workers can not be None."
        assert resolved_options.max_workers > 0, "Thread-pool max_workers must be greater than 0."
        assert http_client is not None, "RQC http_client can not be None."
        assert listeners is not None, "RQC listeners can not be None."

        self.slug_name = slug_name
        self.base_url = base_url.rstrip("/")
        self.options = resolved_options
        self.max_workers = resolved_options.max_workers
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
        self.http_client: HttpClient = http_client
        self._notifier = RqcEventNotifier(listeners)

    @property
    def listeners(self) -> list[RqcEventListener]:
        return self._notifier.listeners

    # ======================
    # Public API
    # ======================

    def execute_many(
        self,
        request_list: list[RqcRequest],
        result_handler: RqcResultHandler | None = None,
    ) -> list[RqcResponse]:
        """
        Executes multiple RQC requests concurrently, waits for their completion (blocking),
        and returns their responses.

        Each request is executed in parallel threads using the internal thread-pool.
        Returns a list of RqcResponse objects in the same order as `request_list`.

        Args:
            request_list: List of RqcRequest objects to execute.
            result_handler: Optional custom handler used to process the raw response.

        Returns:
            List[RqcResponse]: One response per request.
        """
        if not request_list:
            return []

        logger.info(
            f"{'RQC-Batch-Execution'[:26]:<26} | RQC | "
            f"🛜 Starting batch execution of {len(request_list)} requests."
        )
        logger.info(f"{'RQC-Batch-Execution'[:26]:<26} | RQC |    ├ base_url={self.base_url}")
        logger.info(f"{'RQC-Batch-Execution'[:26]:<26} | RQC |    ├ slug_name='{self.slug_name}'")
        logger.info(f"{'RQC-Batch-Execution'[:26]:<26} | RQC |    └ max_concurrent={self.max_workers}")

        # Use thread-pool for parallel calls to `_execute_workflow`
        future_to_index = {
            self.executor.submit(
                self._execute_workflow,           # function ref
                request=req,                      # arg-1
                result_handler=result_handler     # arg-2
            ): idx
            for idx, req in enumerate(request_list)
        }

        # Block until all responses have finished
        responses_map: dict[int, RqcResponse] = {}

        for future in as_completed(future_to_index):
            idx = future_to_index[future]
            correlated_request = request_list[idx]
            try:
                responses_map[idx] = future.result()
            except Exception as e:
                logger.error(
                    f"{correlated_request.id[:26]:<26} | RQC | ❌ Execution failed in batch(seq={idx}). {e}",
                    exc_info=logger.isEnabledFor(logging.DEBUG)
                )
                responses_map[idx] = RqcResponse(
                    request=correlated_request,
                    status=RqcExecutionStatus.ERROR,
                    error=str(e),
                )

        # Rebuild responses list in the same order of requests list
        responses = [
            responses_map[i] for i in range(len(request_list))
        ]

        # Race-condition check: ensure both lists have the same length
        assert len(responses) == len(request_list), (
            f"🌀 Sanity check | Unexpected mismatch: responses(size={len(responses)}) is different from requests(size={len(request_list)})."
        )
        # Race-condition check: ensure each response points to its respective request
        assert all(resp.request is req for req, resp in zip(request_list, responses, strict=True)), (
            "🌀 Sanity check | Unexpected mismatch: some responses do not reference their corresponding requests."
        )

        logger.info(
            f"{'RQC-Batch-Execution'[:26]:<26} | RQC | 🛜 Batch execution finished."
        )
        logger.info(f"{'RQC-Batch-Execution'[:26]:<26} | RQC |    ├ total of responses = {len(responses)}")

        from collections import Counter
        totals_per_status = Counter(r.status for r in responses)
        items = totals_per_status.items()
        for idx, (status, total) in enumerate(items):
            icon = "└" if idx == (len(items) - 1) else "├"
            logger.info(f"{'RQC-Batch-Execution'[:26]:<26} | RQC |    {icon} total of responses with status {status:<9} = {total}")

        return responses

    def execute(
        self,
        request: RqcRequest,
        result_handler: RqcResultHandler | None = None,
    ) -> RqcResponse:
        """
        Executes a Remote QuickCommand synchronously and waits for its completion (blocking).

        This method sends a single request to the remote StackSpot AI QuickCommand API,
        monitors its execution until a terminal status is reached (COMPLETED, FAILURE, ERROR, or TIMEOUT),
        and returns an RqcResponse object containing the result or error details.

        Args:
            request: The RqcRequest instance representing the command to execute.
            result_handler: Optional custom handler used to process the raw response.

        Returns:
            RqcResponse: The final response object, always returned even if an error occurs.
        """
        logger.info(f"{request.id[:26]:<26} | RQC | 🛜 Starting execution of a single request.")
        logger.info(f"{request.id[:26]:<26} | RQC |    ├ base_url={self.base_url}")
        logger.info(f"{request.id[:26]:<26} | RQC |    └ slug_name='{self.slug_name}'")

        response = self._execute_workflow(
            request=request, result_handler=result_handler
        )

        logger.info(
            f"{request.id[:26]:<26} | RQC | 🛜 Execution finished."
        )
        if response.is_completed():
            logger.info(f"{request.id[:26]:<26} | RQC |    └ with status = {response.status}")
        else:
            logger.info(f"{request.id[:26]:<26} | RQC |    ├ with status = {response.status}")
            logger.info(f"{request.id[:26]:<26} | RQC |    └ with error message = \"{response.error}\"")

        assert response.request is request, \
            "🌀 Sanity check | Unexpected mismatch: response does not reference its corresponding request."
        return response

    def _execute_workflow(
        self,
        request: RqcRequest,
        result_handler: RqcResultHandler | None = None,
    ) -> RqcResponse:
        """
        Internal workflow that executes a Remote QuickCommand.

        This method contains the actual execution logic: creating the execution,
        polling for status, and processing the result. It is called by both
        `execute()` (for single requests) and `execute_many()` (for batch requests).

        Args:
            request: The RqcRequest instance representing the command to execute.
            result_handler: Optional custom handler used to process the raw response.

        Returns:
            RqcResponse: The final response object, always returned even if an error occurs.
        """
        assert request, "🌀 Sanity check | RQC-Request can not be None."
        assert request.id, "🌀 Sanity check | RQC-Request ID can not be None."

        # Create context for listeners (shared across all listener calls for this execution)
        event_context: dict[str, Any] = {}

        # Notify listeners: before execute
        self._notifier.notify_before_execute(
            request=request, context=event_context
        )

        response = None
        try:
            # Phase-1: Create execution
            execution = self._create_execution(request=request, context=event_context)
            if not execution.is_created():
                response = execution.to_response()
                return response

            # Sanity checks after Phase-1
            assert execution.execution_id, "🌀 Sanity check | Execution was created but `execution_id` is missing."

            # Make execution_id available to listeners via context
            event_context["execution_id"] = execution.execution_id

            # Phase-2: Poll
            if not result_handler:
                from stkai.rqc._handlers import DEFAULT_RESULT_HANDLER
                result_handler = DEFAULT_RESULT_HANDLER

            response = self._poll_until_done(
                execution=execution, handler=result_handler, context=event_context
            )

            # Sanity check after Phase-2
            assert response, "🌀 Sanity check | RQC-Response was not created during the polling phase."
            return response
        finally:
            # Single point of notification: on_after_execute (called whenever a response was produced)
            if response is not None:
                self._notifier.notify_after_execute(
                    request=request, response=response, context=event_context
                )

    # ======================
    # Internals
    # ======================

    def _create_execution(
        self,
        request: RqcRequest,
        context: dict[str, Any],
    ) -> RqcExecution:
        """
        Creates an RQC execution via POST with retries and exponential backoff.

        Instantiates an RqcExecution tracker and attempts to create the execution
        on the server. On success, the execution has status CREATED and execution_id
        set. On failure, the execution has status ERROR or TIMEOUT with an error message.

        Args:
            request: The RqcRequest to execute.
            context: Shared event context for listeners.

        Returns:
            RqcExecution with status CREATED on success, or ERROR/TIMEOUT on failure.
        """
        assert request, "🌀 Sanity check | RQC-Request not provided to create-execution phase."
        assert context is not None, "🌀 Sanity check | Event context not provided to create-execution phase."

        # Get options and assert for type narrowing
        opts = self.options.create_execution
        assert opts is not None, "create_execution options must be set after with_defaults_from()"
        assert opts.retry_max_retries is not None, "retry_max_retries must be set after with_defaults_from()"
        assert opts.retry_initial_delay is not None, "retry_initial_delay must be set after with_defaults_from()"
        assert opts.request_timeout is not None, "request_timeout must be set after with_defaults_from()"

        request_id = request.id
        input_data = request.to_input_data()

        # Create execution tracker (internal mutable state for this execution)
        execution = RqcExecution(request=request)
        # Build full URL using base_url
        url = f"{self.base_url}/v1/quick-commands/create-execution/{self.slug_name}"

        try:
            for attempt in Retrying(
                max_retries=opts.retry_max_retries,
                initial_delay=opts.retry_initial_delay,
                logger_prefix=f"{request_id[:26]:<26} | RQC",
            ):
                with attempt:
                    logger.info(
                        f"{request_id[:26]:<26} | RQC | "
                        f"Sending request to create execution (attempt {attempt.attempt_number}/{attempt.max_attempts})..."
                    )
                    response = self.http_client.post(
                        url=url, data=input_data, timeout=opts.request_timeout
                    )
                    assert isinstance(response, requests.Response), \
                        f"🌀 Sanity check | Object returned by `post` method is not an instance of `requests.Response`. ({response.__class__})"

                    response.raise_for_status()
                    execution_id: str = response.json()
                    if not execution_id:
                        raise ExecutionIdIsMissingError("No `execution_id` returned in the create execution response by server.")

                    # Register execution state on the execution tracker
                    execution.mark_as_submitted(execution_id=execution_id)
                    logger.info(
                        f"{request_id[:26]:<26} | RQC | Execution successfully created ({execution_id})"
                    )

                    # Transition and notify: PENDING → CREATED
                    self._transition_and_notify(
                        execution=execution, new_status=RqcExecutionStatus.CREATED, context=context
                    )
                    return execution

            # Should never reach here - Retrying raises MaxRetriesExceededError
            raise RuntimeError(
                "Unexpected error while creating execution: "
                "reached end of `_create_execution` method without returning the execution ID."
            )

        except Exception as e:
            error_status = RqcExecutionStatus.from_exception(e)
            error_msg = f"Failed to create execution: {e}"
            if isinstance(e, requests.HTTPError) and e.response is not None:
                error_msg = f"Failed to create execution due to an HTTP error {e.response.status_code}: {e.response.text}"
            logger.error(
                f"{request_id[:26]:<26} | RQC | ❌ {error_msg}",
                exc_info=logger.isEnabledFor(logging.DEBUG)
            )

            # Transition and notify: PENDING → ERROR or TIMEOUT
            self._transition_and_notify(
                execution=execution, new_status=error_status, context=context, error=error_msg
            )
            return execution

    def _poll_until_done(
        self,
        execution: RqcExecution,
        handler: RqcResultHandler,
        context: dict[str, Any],
    ) -> RqcResponse:
        """Polls the status endpoint until the execution reaches a terminal state."""
        assert execution, "🌀 Sanity check | RQC-Execution not provided to polling phase."
        assert handler, "🌀 Sanity check | Result Handler not provided to polling phase."
        assert context is not None, "🌀 Sanity check | Event context not provided to polling phase."
        assert execution.execution_id, "🌀 Sanity check | Execution ID not provided to polling phase."

        # Get options and assert for type narrowing
        opts = self.options.get_result
        assert opts is not None, "get_result options must be set after with_defaults_from()"
        assert opts.poll_interval is not None, "poll_interval must be set after with_defaults_from()"
        assert opts.poll_max_duration is not None, "poll_max_duration must be set after with_defaults_from()"
        assert opts.poll_overload_timeout is not None, "poll_overload_timeout must be set after with_defaults_from()"
        assert opts.request_timeout is not None, "request_timeout must be set after with_defaults_from()"

        start_time = time.time()
        execution_id = execution.execution_id

        logger.info(f"{execution_id} | RQC | Starting polling loop...")

        try:
            while True:
                # Gives up after poll max-duration (it prevents infinite loop)
                if time.time() - start_time > opts.poll_max_duration:
                    raise TimeoutError(
                        f"Timeout after {opts.poll_max_duration} seconds waiting for RQC execution to complete. "
                        f"Last status: `{execution.status}`."
                    )

                try:
                    # Build the full URL from base_url and add a parameter that prevents client-side caching
                    import uuid
                    nocache_param = str(uuid.uuid4())
                    url = f"{self.base_url}/v1/quick-commands/callback/{execution_id}?nocache={nocache_param}"
                    nocache_headers = {
                        "Cache-Control": "no-cache, no-store",
                        "Pragma": "no-cache",
                    }

                    response = self.http_client.get(
                        url=url, headers=nocache_headers, timeout=opts.request_timeout
                    )
                    assert isinstance(response, requests.Response), \
                        f"🌀 Sanity check | Object returned by `get` method is not an instance of `requests.Response`. ({response.__class__})"

                    response.raise_for_status()
                    response_data = response.json()
                except requests.RequestException as e:
                    # Don't retry on 4xx errors
                    _response = getattr(e, "response", None)
                    if _response is not None and 400 <= _response.status_code < 500:
                        raise
                    # Sleeps a little bit before trying again
                    logger.warning(
                        f"{execution_id} | RQC | ⚠️ Temporary polling failure: {e}"
                    )
                    sleep_with_jitter(opts.poll_interval)
                    continue

                raw_status = response_data.get('progress', {}).get('status')
                status = RqcExecutionStatus.from_server(raw_status)
                if status is None:
                    logger.warning(
                        f"{execution_id} | RQC | ⚠️ Unknown server status: '{raw_status}', continuing to poll..."
                    )
                    sleep_with_jitter(opts.poll_interval)
                    continue

                if status != execution.status:
                    logger.info(f"{execution_id} | RQC | Current status: {status}")
                    self._transition_and_notify(
                        execution=execution, new_status=status, context=context
                    )

                if status == RqcExecutionStatus.COMPLETED:
                    try:
                        logger.info(f"{execution_id} | RQC | Processing the execution result...")
                        raw_result = response_data.get("result")
                        request = execution.request
                        processed_result = handler.handle_result(
                            context=RqcResultContext(request, raw_result, execution_id)
                        )
                        logger.info(f"{execution_id} | RQC | ✅ Execution completed in {execution.elapsed_since_submitted():.2f}s")
                        return execution.to_response(
                            result=processed_result, raw_response=response_data
                        )
                    except Exception as e:
                        handler_name = handler.__class__.__name__
                        logger.error(
                            f"{execution_id} | RQC | ❌ It's not possible to handle the result (handler={handler_name}).",
                        )
                        raise RqcResultHandlerError(
                            cause=e,
                            result_handler=handler,
                            message=(
                                f"Execution completed successfully on the server, "
                                f"but a client-side error occurred while processing the result "
                                f"in the handler ({handler_name}): {e}"
                            ),
                        ) from e

                elif status == RqcExecutionStatus.FAILURE:
                    logger.error(
                        f"{execution_id} | RQC | ❌ Execution failed on the server-side with the following response: "
                        f"\n{json.dumps(response_data, indent=2)}"
                    )
                    return execution.to_response(raw_response=response_data)

                elif status == RqcExecutionStatus.CREATED:
                    # Track how long we've been in CREATED status (possible server overload)
                    elapsed_in_created = execution.elapsed_since_submitted()
                    if elapsed_in_created > opts.poll_overload_timeout:
                        raise TimeoutError(
                            f"Execution stuck in CREATED status for {elapsed_in_created:.2f}s. "
                            f"The server may be overloaded (queue backpressure)."
                        )

                    logger.warning(
                        f"{execution_id} | RQC | ⚠️ Execution is still in CREATED status "
                        f"({elapsed_in_created:.2f}s/{opts.poll_overload_timeout}s). Possible server overload..."
                    )
                    sleep_with_jitter(opts.poll_interval)
                else:
                    logger.info(
                        f"{execution_id} | RQC | Execution is still running. Retrying in {int(opts.poll_interval)} seconds..."
                    )
                    sleep_with_jitter(opts.poll_interval)

        except Exception as e:
            # Catch-all for TimeoutError, HTTPError 4xx, RqcResultHandlerError, etc.
            error_status = RqcExecutionStatus.from_exception(e)
            error_msg = f"Error during polling: {e}"
            if isinstance(e, requests.HTTPError) and e.response is not None:
                error_msg = f"Error during polling due to an HTTP error {e.response.status_code}: {e.response.text}"
            logger.error(
                f"{execution_id} | RQC | ❌ {error_msg}",
                exc_info=logger.isEnabledFor(logging.DEBUG)
            )
            # Transition and notify: current_status → ERROR/TIMEOUT
            self._transition_and_notify(
                execution=execution, new_status=error_status, context=context, error=error_msg
            )
            return execution.to_response()

        # It should never happen
        raise RuntimeError(
            "Unexpected error while polling the status of execution: "
            "reached end of `_poll_until_done` method without returning the execution result."
        )

    def _transition_and_notify(
        self,
        execution: RqcExecution,
        new_status: RqcExecutionStatus,
        context: dict[str, Any],
        error: str | None = None,
    ) -> None:
        """
        Transitions execution status and notifies listeners of the change.

        Args:
            execution: The RqcExecution tracker.
            new_status: The target status to transition to.
            context: Shared event context for listeners.
            error: Optional error message to associate with this transition.
                For FAILURE status, a default message is used when not provided.
        """
        if new_status == RqcExecutionStatus.FAILURE and error is None:
            error = "Execution failed on the server-side with status 'FAILURE'. There's no details at all! Try to look at the logs."

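        # Capture the status before transitioning; reading it afterwards would
        # report the new status as the old one.
        old_status = execution.status
        execution.transition_to(new_status, error=error)
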
        self._notifier.notify_status_change(
            request=execution.request,
            old_status=old_status, new_status=new_status,
            context=context,
        )

Functions

execute

execute(request: RqcRequest, result_handler: RqcResultHandler | None = None) -> RqcResponse

Executes a Remote QuickCommand synchronously and waits for its completion (blocking).

This method sends a single request to the remote StackSpot AI QuickCommand API, monitors its execution until a terminal status is reached (COMPLETED, FAILURE, ERROR, or TIMEOUT), and returns an RqcResponse object containing the result or error details.

Parameters:

Name            Type                     Description                                                   Default
request         RqcRequest               The RqcRequest instance representing the command to execute.  required
result_handler  RqcResultHandler | None  Optional custom handler used to process the raw response.     None

Returns:

Name         Type         Description
RqcResponse  RqcResponse  The final response object, always returned even if an error occurs.
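
The "poll until a terminal status, and always return a response" behaviour described for `execute` can be sketched in isolation. This is an illustration under assumptions, not the library's `_poll_until_done`: the `Status` enum, the `poll_until_done` function, and its injectable `sleep` and `clock` parameters are all hypothetical.

```python
import time
from enum import Enum
from typing import Callable


class Status(Enum):
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILURE = "FAILURE"
    TIMEOUT = "TIMEOUT"


# Statuses reported by the (simulated) server that end the polling loop.
TERMINAL = {Status.COMPLETED, Status.FAILURE}


def poll_until_done(
    fetch_status: Callable[[], Status],
    poll_interval: float = 1.0,
    poll_max_duration: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Status:
    """Poll fetch_status until a terminal status or the deadline is reached."""
    start = clock()
    while True:
        if clock() - start > poll_max_duration:
            # The timeout is reported as a status instead of an exception.
            return Status.TIMEOUT
        status = fetch_status()
        if status in TERMINAL:
            return status
        sleep(poll_interval)
```

Returning `Status.TIMEOUT` rather than raising mirrors the documented contract that callers inspect the response instead of wrapping the call in `try`/`except`.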

Source code in src/stkai/rqc/_remote_quick_command.py
def execute(
    self,
    request: RqcRequest,
    result_handler: RqcResultHandler | None = None,
) -> RqcResponse:
    """
    Executes a Remote QuickCommand synchronously and waits for its completion (blocking).

    This method sends a single request to the remote StackSpot AI QuickCommand API,
    monitors its execution until a terminal status is reached (COMPLETED, FAILURE, ERROR, or TIMEOUT),
    and returns an RqcResponse object containing the result or error details.

    Args:
        request: The RqcRequest instance representing the command to execute.
        result_handler: Optional custom handler used to process the raw response.

    Returns:
        RqcResponse: The final response object, always returned even if an error occurs.
    """
    logger.info(f"{request.id[:26]:<26} | RQC | 🛜 Starting execution of a single request.")
    logger.info(f"{request.id[:26]:<26} | RQC |    ├ base_url={self.base_url}")
    logger.info(f"{request.id[:26]:<26} | RQC |    └ slug_name='{self.slug_name}'")

    response = self._execute_workflow(
        request=request, result_handler=result_handler
    )

    logger.info(
        f"{request.id[:26]:<26} | RQC | 🛜 Execution finished."
    )
    if response.is_completed():
        logger.info(f"{request.id[:26]:<26} | RQC |    └ with status = {response.status}")
    else:
        logger.info(f"{request.id[:26]:<26} | RQC |    ├ with status = {response.status}")
        logger.info(f"{request.id[:26]:<26} | RQC |    └ with error message = \"{response.error}\"")

    assert response.request is request, \
        "🌀 Sanity check | Unexpected mismatch: response does not reference its corresponding request."
    return response

execute_many

execute_many(request_list: list[RqcRequest], result_handler: RqcResultHandler | None = None) -> list[RqcResponse]

Executes multiple RQC requests concurrently, waits for their completion (blocking), and returns their responses.

Each request runs on a worker thread from the internal thread pool. Returns a list of RqcResponse objects in the same order as request_list.

Parameters:

Name Type Description Default
request_list list[RqcRequest]

List of RqcRequest objects to execute.

required
result_handler RqcResultHandler | None

Optional custom handler used to process the raw response.

None

Returns:

Type Description
list[RqcResponse]

One response per request, in the same order as request_list.
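The ordering guarantee comes from mapping each future back to the index of its request. The following is a simplified, standalone sketch of that pattern using only the standard library; `run_in_order` and `flaky_square` are illustrative names, and failures are recorded as tuples here rather than as RqcResponse objects.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_in_order(items, worker, max_workers=4):
    """Run worker(item) concurrently and return outcomes in input order."""
    if not items:
        return []
    outcomes = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember which input position each future belongs to.
        future_to_index = {
            pool.submit(worker, item): idx for idx, item in enumerate(items)
        }
        # Futures finish in arbitrary order; the index restores the input order.
        for future in as_completed(future_to_index):
            idx = future_to_index[future]
            try:
                outcomes[idx] = ("ok", future.result())
            except Exception as exc:
                # A failing item becomes an error entry instead of aborting the batch.
                outcomes[idx] = ("error", str(exc))
    return [outcomes[i] for i in range(len(items))]


def flaky_square(n):
    if n == 3:
        raise ValueError("boom")
    return n * n


outcomes = run_in_order([1, 2, 3, 4], flaky_square)
```

One failing item does not affect its neighbours: the third entry holds the error while the others hold their results.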

Source code in src/stkai/rqc/_remote_quick_command.py
def execute_many(
    self,
    request_list: list[RqcRequest],
    result_handler: RqcResultHandler | None = None,
) -> list[RqcResponse]:
    """
    Executes multiple RQC requests concurrently, waits for their completion (blocking),
    and returns their responses.

    Each request runs on a worker thread from the internal thread pool.
    Returns a list of RqcResponse objects in the same order as `request_list`.

    Args:
        request_list: List of RqcRequest objects to execute.
        result_handler: Optional custom handler used to process the raw response.

    Returns:
        List[RqcResponse]: One response per request.
    """
    if not request_list:
        return []

    logger.info(
        f"{'RQC-Batch-Execution'[:26]:<26} | RQC | "
        f"🛜 Starting batch execution of {len(request_list)} requests."
    )
    logger.info(f"{'RQC-Batch-Execution'[:26]:<26} | RQC |    ├ base_url={self.base_url}")
    logger.info(f"{'RQC-Batch-Execution'[:26]:<26} | RQC |    ├ slug_name='{self.slug_name}'")
    logger.info(f"{'RQC-Batch-Execution'[:26]:<26} | RQC |    └ max_concurrent={self.max_workers}")

    # Use thread-pool for parallel calls to `_execute_workflow`
    future_to_index = {
        self.executor.submit(
            self._execute_workflow,           # function ref
            request=req,                      # arg-1
            result_handler=result_handler     # arg-2
        ): idx
        for idx, req in enumerate(request_list)
    }

    # Block and wait for all responses to finish
    responses_map: dict[int, RqcResponse] = {}

    for future in as_completed(future_to_index):
        idx = future_to_index[future]
        correlated_request = request_list[idx]
        try:
            responses_map[idx] = future.result()
        except Exception as e:
            logger.error(
                f"{correlated_request.id[:26]:<26} | RQC | ❌ Execution failed in batch(seq={idx}). {e}",
                exc_info=logger.isEnabledFor(logging.DEBUG)
            )
            responses_map[idx] = RqcResponse(
                request=correlated_request,
                status=RqcExecutionStatus.ERROR,
                error=str(e),
            )

    # Rebuild the responses list in the same order as the requests list
    responses = [
        responses_map[i] for i in range(len(request_list))
    ]

    # Race-condition check: ensure both lists have the same length
    assert len(responses) == len(request_list), (
        f"🌀 Sanity check | Unexpected mismatch: responses(size={len(responses)}) is different from requests(size={len(request_list)})."
    )
    # Race-condition check: ensure each response points to its respective request
    assert all(resp.request is req for req, resp in zip(request_list, responses, strict=True)), (
        "🌀 Sanity check | Unexpected mismatch: some responses do not reference their corresponding requests."
    )

    logger.info(
        f"{'RQC-Batch-Execution'[:26]:<26} | RQC | 🛜 Batch execution finished."
    )
    logger.info(f"{'RQC-Batch-Execution'[:26]:<26} | RQC |    ├ total of responses = {len(responses)}")

    from collections import Counter
    totals_per_status = Counter(r.status for r in responses)
    items = totals_per_status.items()
    for idx, (status, total) in enumerate(items):
        icon = "└" if idx == (len(items) - 1) else "├"
        logger.info(f"{'RQC-Batch-Execution'[:26]:<26} | RQC |    {icon} total of responses with status {status:<9} = {total}")

    return responses

Data Models

RqcRequest dataclass

Represents a Remote QuickCommand request.

This immutable class encapsulates all data needed to execute a Remote Quick Command, including the payload to send and optional metadata for tracking purposes.

Attributes:

Name Type Description
payload Any

The input data to send to the Quick Command. Can be any JSON-serializable object.

id str

Unique identifier for this request. Auto-generated as UUID if not provided.

metadata dict[str, Any]

Optional dictionary for storing custom metadata (e.g., source file, context).

Example

request = RqcRequest(
    payload={"prompt": "Analyze this code", "code": "def foo(): pass"},
    id="my-custom-id",
    metadata={"source": "main.py", "line": 42}
)

Source code in src/stkai/rqc/_models.py
@dataclass(frozen=True)
class RqcRequest:
    """
    Represents a Remote QuickCommand request.

    This immutable class encapsulates all data needed to execute a Remote Quick Command,
    including the payload to send and optional metadata for tracking purposes.

    Attributes:
        payload: The input data to send to the Quick Command. Can be any JSON-serializable object.
        id: Unique identifier for this request. Auto-generated as UUID if not provided.
        metadata: Optional dictionary for storing custom metadata (e.g., source file, context).

    Example:
        >>> request = RqcRequest(
        ...     payload={"prompt": "Analyze this code", "code": "def foo(): pass"},
        ...     id="my-custom-id",
        ...     metadata={"source": "main.py", "line": 42}
        ... )
    """
    payload: Any
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        assert self.id, "Request ID can not be empty."
        assert self.payload, "Request payload can not be empty."

    def to_input_data(self) -> dict[str, Any]:
        """Converts the request payload to the format expected by the RQC API."""
        return {
            "input_data": self.payload,
        }

    def write_to_file(self, output_dir: Path, tracking_id: str | None = None) -> Path:
        """
        Persists the request payload to a JSON file for debugging purposes.

        Args:
            output_dir: Directory where the JSON file will be saved.
            tracking_id: Optional tracking ID for file naming. If None, uses request id.

        Returns:
            Path to the created JSON file.

        The file is named `{tracking_id}-request.json` where tracking_id is either
        the provided tracking_id or the request id.
        """
        assert output_dir, "Output directory is required."
        assert output_dir.is_dir(), f"Output directory is not a directory ({output_dir})."

        _tracking_id = tracking_id or self.id
        _tracking_id = re.sub(r'[^\w.$-]', '_', _tracking_id)

        target_file = output_dir / f"{_tracking_id}-request.json"
        save_json_file(
            data=self.to_input_data(),
            file_path=target_file
        )
        return target_file

Functions

to_input_data

to_input_data() -> dict[str, Any]

Converts the request payload to the format expected by the RQC API.
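The payload is wrapped under an `input_data` key, which is why it has to be JSON-serializable. The sketch below mirrors that wrapping with a plain function and adds a hypothetical `is_json_serializable` check; it assumes the body is encoded with the standard `json` module, which the source does not confirm.

```python
import json


def to_input_data(payload):
    """Wrap a payload the same way the documented method does."""
    return {"input_data": payload}


def is_json_serializable(payload) -> bool:
    """Hypothetical pre-flight check: can the wrapped payload be encoded as JSON?"""
    try:
        json.dumps(to_input_data(payload))
        return True
    except (TypeError, ValueError):
        return False
```

A dictionary of strings passes the check, while a payload containing a `set` does not, because `json.dumps` cannot encode sets.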

Source code in src/stkai/rqc/_models.py
def to_input_data(self) -> dict[str, Any]:
    """Converts the request payload to the format expected by the RQC API."""
    return {
        "input_data": self.payload,
    }

write_to_file

write_to_file(output_dir: Path, tracking_id: str | None = None) -> Path

Persists the request payload to a JSON file for debugging purposes.

Parameters:

Name Type Description Default
output_dir Path

Directory where the JSON file will be saved.

required
tracking_id str | None

Optional tracking ID for file naming. If None, uses request id.

None

Returns:

Type Description
Path

Path to the created JSON file.

The file is named {tracking_id}-request.json where tracking_id is either the provided tracking_id or the request id.
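Before the file name is built, characters outside the set of word characters, `.`, `$`, and `-` are replaced with underscores. A minimal sketch of the naming rule, using the same regular expression as the source listing (`request_file_name` is an illustrative helper, not a library function):

```python
import re


def request_file_name(request_id, tracking_id=None):
    """Build the request file name the way the documented method names it."""
    chosen = tracking_id or request_id
    # Anything that is not a word character, '.', '$' or '-' becomes '_'.
    safe = re.sub(r"[^\w.$-]", "_", chosen)
    return f"{safe}-request.json"
```

For example, a tracking ID such as `exec/01:retry 2` yields `exec_01_retry_2-request.json`, which avoids path separators in the file name.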

Source code in src/stkai/rqc/_models.py
def write_to_file(self, output_dir: Path, tracking_id: str | None = None) -> Path:
    """
    Persists the request payload to a JSON file for debugging purposes.

    Args:
        output_dir: Directory where the JSON file will be saved.
        tracking_id: Optional tracking ID for file naming. If None, uses request id.

    Returns:
        Path to the created JSON file.

    The file is named `{tracking_id}-request.json` where tracking_id is either
    the provided tracking_id or the request id.
    """
    assert output_dir, "Output directory is required."
    assert output_dir.is_dir(), f"Output directory is not a directory ({output_dir})."

    _tracking_id = tracking_id or self.id
    _tracking_id = re.sub(r'[^\w.$-]', '_', _tracking_id)

    target_file = output_dir / f"{_tracking_id}-request.json"
    save_json_file(
        data=self.to_input_data(),
        file_path=target_file
    )
    return target_file

RqcResponse dataclass

Represents the full Remote QuickCommand response.

This immutable class contains the execution result, status, and any error information from a Remote Quick Command execution.

Attributes:

Name Type Description
request RqcRequest

The original RqcRequest that generated this response.

status RqcExecutionStatus

The final status of the execution (COMPLETED, FAILURE, ERROR, or TIMEOUT).

result Any | None

The processed result from the result handler (only set when COMPLETED).

error str | None

Error message describing what went wrong (only set on non-COMPLETED status).

raw_response Any | None

The raw JSON response from the StackSpot AI API (for debugging).

execution_id str | None

The server-assigned execution ID, or None if execution was never created.

Example

response = rqc.execute(request)
if response.is_completed():
    print(response.result)
else:
    print(f"Error: {response.error}")

Source code in src/stkai/rqc/_models.py
@dataclass(frozen=True)
class RqcResponse:
    """
    Represents the full Remote QuickCommand response.

    This immutable class contains the execution result, status, and any error
    information from a Remote Quick Command execution.

    Attributes:
        request: The original RqcRequest that generated this response.
        status: The final status of the execution (COMPLETED, FAILURE, ERROR, or TIMEOUT).
        result: The processed result from the result handler (only set when COMPLETED).
        error: Error message describing what went wrong (only set on non-COMPLETED status).
        raw_response: The raw JSON response from the StackSpot AI API (for debugging).
        execution_id: The server-assigned execution ID, or None if execution was never created.

    Example:
        >>> response = rqc.execute(request)
        >>> if response.is_completed():
        ...     print(response.result)
        ... else:
        ...     print(f"Error: {response.error}")
    """
    request: RqcRequest
    status: RqcExecutionStatus
    result: Any | None = None
    error: str | None = None
    raw_response: Any | None = None
    execution_id: str | None = None

    def __post_init__(self) -> None:
        assert self.request, "RQC-Request can not be empty."
        assert self.status, "Status can not be empty."

    @property
    def raw_result(self) -> Any:
        """Extracts the 'result' field from the raw API response, if available."""
        if not self.raw_response:
            return None

        _raw_result = None
        if isinstance(self.raw_response, dict):
            _raw_result = self.raw_response.get("result")

        return _raw_result

    def is_pending(self) -> bool:
        """Returns True if the request has not been submitted yet."""
        return self.status == RqcExecutionStatus.PENDING

    def is_created(self) -> bool:
        """Returns True if the execution was created but not yet running."""
        return self.status == RqcExecutionStatus.CREATED

    def is_running(self) -> bool:
        """Returns True if the execution is currently being processed."""
        return self.status == RqcExecutionStatus.RUNNING

    def is_completed(self) -> bool:
        """Returns True if the execution completed successfully."""
        return self.status == RqcExecutionStatus.COMPLETED

    def is_failure(self) -> bool:
        """Returns True if the execution failed on the server-side."""
        return self.status == RqcExecutionStatus.FAILURE

    def is_error(self) -> bool:
        """Returns True if a client-side error occurred during execution."""
        return self.status == RqcExecutionStatus.ERROR

    def is_timeout(self) -> bool:
        """Returns True if the execution timed out waiting for completion."""
        return self.status == RqcExecutionStatus.TIMEOUT

    def error_with_details(self) -> dict[str, Any]:
        """Returns a dictionary with error details for non-completed responses."""
        if self.is_completed():
            return {}

        return {
            "status": self.status,
            "error_message": self.error,
            "response_body": self.raw_response or {},
        }

    def write_to_file(self, output_dir: Path) -> Path:
        """
        Persists the response to a JSON file for debugging purposes.

        Args:
            output_dir: Directory where the JSON file will be saved.

        Returns:
            Path to the created JSON file.

        The file is named `{tracking_id}-response-{status}.json` where tracking_id
        is either the execution_id (if available) or the request id.
        """
        assert output_dir, "Output directory is required."
        assert output_dir.is_dir(), f"Output directory is not a directory ({output_dir})."

        response_result = self.raw_response or {}
        if not self.is_completed():
            response_result = self.error_with_details()

        _tracking_id = self.execution_id or self.request.id
        _tracking_id = re.sub(r'[^\w.$-]', '_', _tracking_id)

        target_file = output_dir / f"{_tracking_id}-response-{self.status}.json"
        save_json_file(
            data=response_result,
            file_path=target_file
        )
        return target_file

Attributes

raw_result property

raw_result: Any

Extracts the 'result' field from the raw API response, if available.
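The property returns None in more cases than a missing key. The standalone sketch below restates the logic from the class source as a plain function (`extract_raw_result` is an illustrative name) so the edge cases are easy to see:

```python
def extract_raw_result(raw_response):
    """Return raw_response['result'] when raw_response is a non-empty dict."""
    if not raw_response:
        # None, {} and other empty values all yield None.
        return None
    if isinstance(raw_response, dict):
        return raw_response.get("result")
    # Non-dict bodies (for example a list or a string) have no 'result' field.
    return None
```

Callers therefore cannot distinguish "no raw response", "raw response without a result key", and "raw response that is not a dictionary" from the property alone.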

Functions

is_pending

is_pending() -> bool

Returns True if the request has not been submitted yet.

Source code in src/stkai/rqc/_models.py
def is_pending(self) -> bool:
    """Returns True if the request has not been submitted yet."""
    return self.status == RqcExecutionStatus.PENDING

is_created

is_created() -> bool

Returns True if the execution was created but not yet running.

Source code in src/stkai/rqc/_models.py
def is_created(self) -> bool:
    """Returns True if the execution was created but not yet running."""
    return self.status == RqcExecutionStatus.CREATED

is_running

is_running() -> bool

Returns True if the execution is currently being processed.

Source code in src/stkai/rqc/_models.py
def is_running(self) -> bool:
    """Returns True if the execution is currently being processed."""
    return self.status == RqcExecutionStatus.RUNNING

is_completed

is_completed() -> bool

Returns True if the execution completed successfully.

Source code in src/stkai/rqc/_models.py
def is_completed(self) -> bool:
    """Returns True if the execution completed successfully."""
    return self.status == RqcExecutionStatus.COMPLETED

is_failure

is_failure() -> bool

Returns True if the execution failed on the server-side.

Source code in src/stkai/rqc/_models.py
def is_failure(self) -> bool:
    """Returns True if the execution failed on the server-side."""
    return self.status == RqcExecutionStatus.FAILURE

is_error

is_error() -> bool

Returns True if a client-side error occurred during execution.

Source code in src/stkai/rqc/_models.py
def is_error(self) -> bool:
    """Returns True if a client-side error occurred during execution."""
    return self.status == RqcExecutionStatus.ERROR

is_timeout

is_timeout() -> bool

Returns True if the execution timed out waiting for completion.

Source code in src/stkai/rqc/_models.py
def is_timeout(self) -> bool:
    """Returns True if the execution timed out waiting for completion."""
    return self.status == RqcExecutionStatus.TIMEOUT

error_with_details

error_with_details() -> dict[str, Any]

Returns a dictionary with error details for non-completed responses.

Source code in src/stkai/rqc/_models.py
def error_with_details(self) -> dict[str, Any]:
    """Returns a dictionary with error details for non-completed responses."""
    if self.is_completed():
        return {}

    return {
        "status": self.status,
        "error_message": self.error,
        "response_body": self.raw_response or {},
    }

write_to_file

write_to_file(output_dir: Path) -> Path

Persists the response to a JSON file for debugging purposes.

Parameters:

Name Type Description Default
output_dir Path

Directory where the JSON file will be saved.

required

Returns:

Type Description
Path

Path to the created JSON file.

The file is named {tracking_id}-response-{status}.json where tracking_id is either the execution_id (if available) or the request id.
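Both the file name and the file content depend on the status. As a rough sketch based on the source listing, the hypothetical `response_file_plan` below returns the name and the data that would be written, without touching the disk:

```python
import re


def response_file_plan(status, request_id, execution_id=None, raw_response=None, error=None):
    """Return (file_name, data) following the documented naming and content rules."""
    if status == "COMPLETED":
        # Completed responses persist the raw API response.
        data = raw_response or {}
    else:
        # Any other status persists the error details instead.
        data = {
            "status": status,
            "error_message": error,
            "response_body": raw_response or {},
        }
    # Prefer the server-assigned execution ID, fall back to the request ID.
    tracking_id = re.sub(r"[^\w.$-]", "_", execution_id or request_id)
    return f"{tracking_id}-response-{status}.json", data
```

A response that never received an execution ID is therefore filed under its request ID, for example `req-1-response-TIMEOUT.json`.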

Source code in src/stkai/rqc/_models.py
def write_to_file(self, output_dir: Path) -> Path:
    """
    Persists the response to a JSON file for debugging purposes.

    Args:
        output_dir: Directory where the JSON file will be saved.

    Returns:
        Path to the created JSON file.

    The file is named `{tracking_id}-response-{status}.json` where tracking_id
    is either the execution_id (if available) or the request id.
    """
    assert output_dir, "Output directory is required."
    assert output_dir.is_dir(), f"Output directory is not a directory ({output_dir})."

    response_result = self.raw_response or {}
    if not self.is_completed():
        response_result = self.error_with_details()

    _tracking_id = self.execution_id or self.request.id
    _tracking_id = re.sub(r'[^\w.$-]', '_', _tracking_id)

    target_file = output_dir / f"{_tracking_id}-response-{self.status}.json"
    save_json_file(
        data=response_result,
        file_path=target_file
    )
    return target_file

RqcExecutionStatus

Bases: StrEnum

Status of an RQC execution lifecycle.

Attributes:

Name Type Description
PENDING

Client-side status before request is submitted to server.

CREATED

Server acknowledged the request and created an execution.

RUNNING

Execution is currently being processed by the server.

COMPLETED

Execution finished successfully with a result.

FAILURE

Execution failed on the server-side (StackSpot AI returned an error).

ERROR

Client-side error occurred (network issues, invalid response, handler errors).

TIMEOUT

Any timeout, client or server-side (e.g., poll_max_duration, poll_overload_timeout, HTTP request timeout, HTTP 408, or HTTP 504).

Source code in src/stkai/rqc/_models.py
class RqcExecutionStatus(enum.StrEnum):
    """
    Status of an RQC execution lifecycle.

    Attributes:
        PENDING: Client-side status before request is submitted to server.
        CREATED: Server acknowledged the request and created an execution.
        RUNNING: Execution is currently being processed by the server.
        COMPLETED: Execution finished successfully with a result.
        FAILURE: Execution failed on the server-side (StackSpot AI returned an error).
        ERROR: Client-side error occurred (network issues, invalid response, handler errors).
        TIMEOUT: Any timeout, client or server-side (e.g., poll_max_duration, poll_overload_timeout, HTTP request timeout, HTTP 408, or HTTP 504).
    """
    PENDING = "PENDING"
    CREATED = "CREATED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILURE = "FAILURE"
    ERROR = "ERROR"
    TIMEOUT = "TIMEOUT"

    def __str__(self) -> str:
        return self.value

    @classmethod
    def from_server(cls, value: str | None) -> "RqcExecutionStatus | None":
        """
        Safely converts a server status string to an RqcExecutionStatus.

        Only accepts server-side statuses (CREATED, RUNNING, COMPLETED, FAILURE).
        Returns None for None, empty strings, unknown values, or client-side-only
        statuses (PENDING, ERROR, TIMEOUT).

        Args:
            value: The status string to convert, or None.

        Returns:
            The corresponding RqcExecutionStatus, or None if the value is not
            a recognized server-side status.
        """
        if not value:
            return None

        try:
            status = cls(value.upper())
        except ValueError:
            return None

        return status if status in (cls.CREATED, cls.RUNNING, cls.COMPLETED, cls.FAILURE) else None

    @classmethod
    def from_exception(cls, exc: Exception) -> "RqcExecutionStatus":
        """
        Determine the appropriate status for an exception.

        Args:
            exc: The exception that occurred during RQC execution.

        Returns:
            TIMEOUT for timeout exceptions, ERROR for all others.

        Example:
            >>> try:
            ...     execution_id = rqc._create_execution(request)
            ... except Exception as e:
            ...     status = RqcExecutionStatus.from_exception(e)
            ...     # status is TIMEOUT if e is a timeout, ERROR otherwise
        """
        from stkai._utils import is_timeout_exception
        return cls.TIMEOUT if is_timeout_exception(exc) else cls.ERROR

Functions

from_server classmethod

from_server(value: str | None) -> RqcExecutionStatus | None

Safely converts a server status string to an RqcExecutionStatus.

Only accepts server-side statuses (CREATED, RUNNING, COMPLETED, FAILURE). Returns None for None, empty strings, unknown values, or client-side-only statuses (PENDING, ERROR, TIMEOUT).

Parameters:

Name Type Description Default
value str | None

The status string to convert, or None.

required

Returns:

Type Description
RqcExecutionStatus | None

The corresponding RqcExecutionStatus, or None if the value is not a recognized server-side status.
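The filtering rule can be tried in isolation. This sketch re-creates the behavior with a stand-in enum (`Status` is a hypothetical substitute for RqcExecutionStatus, written with `str, Enum` so it does not depend on `enum.StrEnum`):

```python
from enum import Enum
from typing import Optional


class Status(str, Enum):
    PENDING = "PENDING"
    CREATED = "CREATED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILURE = "FAILURE"
    ERROR = "ERROR"
    TIMEOUT = "TIMEOUT"


# Only these statuses are ever reported by the server.
SERVER_SIDE = (Status.CREATED, Status.RUNNING, Status.COMPLETED, Status.FAILURE)


def from_server(value: Optional[str]) -> Optional[Status]:
    """Convert a server status string, rejecting unknown and client-only values."""
    if not value:
        return None
    try:
        status = Status(value.upper())  # the lookup is case-insensitive
    except ValueError:
        return None
    return status if status in SERVER_SIDE else None
```

Note that `"TIMEOUT"` is a valid enum value but still maps to None, because the client assigns it and the server never sends it.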

Source code in src/stkai/rqc/_models.py
@classmethod
def from_server(cls, value: str | None) -> "RqcExecutionStatus | None":
    """
    Safely converts a server status string to an RqcExecutionStatus.

    Only accepts server-side statuses (CREATED, RUNNING, COMPLETED, FAILURE).
    Returns None for None, empty strings, unknown values, or client-side-only
    statuses (PENDING, ERROR, TIMEOUT).

    Args:
        value: The status string to convert, or None.

    Returns:
        The corresponding RqcExecutionStatus, or None if the value is not
        a recognized server-side status.
    """
    if not value:
        return None

    try:
        status = cls(value.upper())
    except ValueError:
        return None

    return status if status in (cls.CREATED, cls.RUNNING, cls.COMPLETED, cls.FAILURE) else None

from_exception classmethod

from_exception(exc: Exception) -> RqcExecutionStatus

Determine the appropriate status for an exception.

Parameters:

Name Type Description Default
exc Exception

The exception that occurred during RQC execution.

required

Returns:

Type Description
RqcExecutionStatus

TIMEOUT for timeout exceptions, ERROR for all others.

Example

try:
    execution_id = rqc._create_execution(request)
except Exception as e:
    status = RqcExecutionStatus.from_exception(e)
    # status is TIMEOUT if e is a timeout, ERROR otherwise
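The mapping itself is a two-way split. The sketch below shows its shape under a loud assumption: `looks_like_timeout` is a hypothetical stand-in that only recognizes Python's built-in `TimeoutError`, whereas the library's own `is_timeout_exception` helper is not shown in this reference and may well recognize other exception types (such as HTTP client timeouts).

```python
def looks_like_timeout(exc: Exception) -> bool:
    """Hypothetical stand-in; the library's real timeout detection may differ."""
    return isinstance(exc, TimeoutError)


def status_for(exc: Exception) -> str:
    """Map an exception to 'TIMEOUT' or 'ERROR', mirroring from_exception's contract."""
    return "TIMEOUT" if looks_like_timeout(exc) else "ERROR"
```

Every exception that is not classified as a timeout falls through to ERROR, so the function never returns any other status.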

Source code in src/stkai/rqc/_models.py
@classmethod
def from_exception(cls, exc: Exception) -> "RqcExecutionStatus":
    """
    Determine the appropriate status for an exception.

    Args:
        exc: The exception that occurred during RQC execution.

    Returns:
        TIMEOUT for timeout exceptions, ERROR for all others.

    Example:
        >>> try:
        ...     execution_id = rqc._create_execution(request)
        ... except Exception as e:
        ...     status = RqcExecutionStatus.from_exception(e)
        ...     # status is TIMEOUT if e is a timeout, ERROR otherwise
    """
    from stkai._utils import is_timeout_exception
    return cls.TIMEOUT if is_timeout_exception(exc) else cls.ERROR

Configuration Options

RqcOptions dataclass

Consolidated configuration options for RemoteQuickCommand.

Groups all options for RQC execution into a single configuration object. Fields set to None will use values from global config (STKAI.config.rqc).

Attributes:

Name Type Description
create_execution CreateExecutionOptions | None

Options for the create-execution phase.

get_result GetResultOptions | None

Options for the get-result (polling) phase.

max_workers int | None

Maximum number of threads for batch execution (execute_many).

Example

# Customize only what you need - rest comes from STKAI.config
options = RqcOptions(
    create_execution=CreateExecutionOptions(retry_max_retries=5),
)
rqc = RemoteQuickCommand(slug_name="my-command", options=options)

Source code in src/stkai/rqc/_remote_quick_command.py
@dataclass(frozen=True)
class RqcOptions:
    """
    Consolidated configuration options for RemoteQuickCommand.

    Groups all options for RQC execution into a single configuration object.
    Fields set to None will use values from global config (STKAI.config.rqc).

    Attributes:
        create_execution: Options for the create-execution phase.
        get_result: Options for the get-result (polling) phase.
        max_workers: Maximum number of threads for batch execution (execute_many).

    Example:
        >>> # Customize only what you need - rest comes from STKAI.config
        >>> options = RqcOptions(
        ...     create_execution=CreateExecutionOptions(retry_max_retries=5),
        ... )
        >>> rqc = RemoteQuickCommand(slug_name="my-command", options=options)
    """
    create_execution: CreateExecutionOptions | None = None
    get_result: GetResultOptions | None = None
    max_workers: int | None = None

    def with_defaults_from(self, cfg: "RqcConfig") -> "RqcOptions":
        """
        Returns a new RqcOptions with None values filled from config.

        User-provided values take precedence; None values use config defaults.
        This follows the Single Source of Truth principle where STKAI.config
        is the authoritative source for default values.

        Args:
            cfg: The RqcConfig to use for default values.

        Returns:
            A new RqcOptions with all fields resolved (no None values).

        Example:
            >>> options = RqcOptions(
            ...     create_execution=CreateExecutionOptions(retry_max_retries=5),
            ... )
            >>> resolved = options.with_defaults_from(STKAI.config.rqc)
            >>> resolved.create_execution.retry_max_retries  # 5 (user-defined)
            >>> resolved.create_execution.retry_initial_delay  # from config
        """
        ce = self.create_execution or CreateExecutionOptions()
        gr = self.get_result or GetResultOptions()

        return RqcOptions(
            create_execution=CreateExecutionOptions(
                retry_max_retries=ce.retry_max_retries if ce.retry_max_retries is not None else cfg.retry_max_retries,
                retry_initial_delay=ce.retry_initial_delay if ce.retry_initial_delay is not None else cfg.retry_initial_delay,
                request_timeout=ce.request_timeout if ce.request_timeout is not None else cfg.request_timeout,
            ),
            get_result=GetResultOptions(
                poll_interval=gr.poll_interval if gr.poll_interval is not None else cfg.poll_interval,
                poll_max_duration=gr.poll_max_duration if gr.poll_max_duration is not None else cfg.poll_max_duration,
                poll_overload_timeout=gr.poll_overload_timeout if gr.poll_overload_timeout is not None else cfg.poll_overload_timeout,
                request_timeout=gr.request_timeout if gr.request_timeout is not None else cfg.request_timeout,
            ),
            max_workers=self.max_workers if self.max_workers is not None else cfg.max_workers,
        )

Functions

with_defaults_from

with_defaults_from(cfg: RqcConfig) -> RqcOptions

Returns a new RqcOptions with None values filled from config.

User-provided values take precedence; None values use config defaults. This follows the Single Source of Truth principle where STKAI.config is the authoritative source for default values.

Parameters:

Name Type Description Default
cfg RqcConfig

The RqcConfig to use for default values.

required

Returns:

Type Description
RqcOptions

A new RqcOptions with all fields resolved (no None values).

Example

options = RqcOptions(
    create_execution=CreateExecutionOptions(retry_max_retries=5),
)
resolved = options.with_defaults_from(STKAI.config.rqc)
resolved.create_execution.retry_max_retries  # 5 (user-defined)
resolved.create_execution.retry_initial_delay  # from config
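The source compares each field with `is not None` rather than using `or`, which matters because 0 is a meaningful value (for instance, 0 retries). A small standalone sketch of that merge rule follows; the `Options` and `Defaults` classes, the `fill_from` helper, and the default values 3 and 0.5 are all hypothetical.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class Options:
    retry_max_retries: Optional[int] = None
    retry_initial_delay: Optional[float] = None


@dataclass(frozen=True)
class Defaults:
    # Illustrative values only, not the library's real defaults.
    retry_max_retries: int = 3
    retry_initial_delay: float = 0.5


def fill_from(options: Options, defaults: Defaults) -> Options:
    """Return a copy of options with every None field taken from defaults."""
    updates = {
        f.name: getattr(defaults, f.name)
        for f in fields(options)
        if getattr(options, f.name) is None  # 0 and 0.0 are kept as user choices
    }
    return replace(options, **updates)


resolved = fill_from(Options(retry_max_retries=0), Defaults())
```

Here `resolved.retry_max_retries` stays 0 while `resolved.retry_initial_delay` is filled from the defaults; an `or`-based merge would have silently replaced the 0.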

Source code in src/stkai/rqc/_remote_quick_command.py
def with_defaults_from(self, cfg: "RqcConfig") -> "RqcOptions":
    """
    Returns a new RqcOptions with None values filled from config.

    User-provided values take precedence; None values use config defaults.
    This follows the Single Source of Truth principle where STKAI.config
    is the authoritative source for default values.

    Args:
        cfg: The RqcConfig to use for default values.

    Returns:
        A new RqcOptions with all fields resolved (no None values).

    Example:
        >>> options = RqcOptions(
        ...     create_execution=CreateExecutionOptions(retry_max_retries=5),
        ... )
        >>> resolved = options.with_defaults_from(STKAI.config.rqc)
        >>> resolved.create_execution.retry_max_retries  # 5 (user-defined)
        >>> resolved.create_execution.retry_initial_delay  # from config
    """
    ce = self.create_execution or CreateExecutionOptions()
    gr = self.get_result or GetResultOptions()

    return RqcOptions(
        create_execution=CreateExecutionOptions(
            retry_max_retries=ce.retry_max_retries if ce.retry_max_retries is not None else cfg.retry_max_retries,
            retry_initial_delay=ce.retry_initial_delay if ce.retry_initial_delay is not None else cfg.retry_initial_delay,
            request_timeout=ce.request_timeout if ce.request_timeout is not None else cfg.request_timeout,
        ),
        get_result=GetResultOptions(
            poll_interval=gr.poll_interval if gr.poll_interval is not None else cfg.poll_interval,
            poll_max_duration=gr.poll_max_duration if gr.poll_max_duration is not None else cfg.poll_max_duration,
            poll_overload_timeout=gr.poll_overload_timeout if gr.poll_overload_timeout is not None else cfg.poll_overload_timeout,
            request_timeout=gr.request_timeout if gr.request_timeout is not None else cfg.request_timeout,
        ),
        max_workers=self.max_workers if self.max_workers is not None else cfg.max_workers,
    )
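
The resolution rule in `with_defaults_from` can be seen in isolation with a minimal sketch. `Defaults` and `Options` below are hypothetical stand-ins for `RqcConfig` and `CreateExecutionOptions`, reduced to two fields; the default values are invented for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Defaults:
    """Hypothetical stand-in for RqcConfig; the values are illustrative only."""
    retry_max_retries: int = 3
    retry_initial_delay: float = 0.5


@dataclass(frozen=True)
class Options:
    """Hypothetical stand-in for CreateExecutionOptions."""
    retry_max_retries: Optional[int] = None
    retry_initial_delay: Optional[float] = None

    def with_defaults_from(self, cfg: Defaults) -> "Options":
        # `is not None` (rather than `or`) keeps explicit falsy values such as 0.
        return Options(
            retry_max_retries=(
                self.retry_max_retries
                if self.retry_max_retries is not None
                else cfg.retry_max_retries
            ),
            retry_initial_delay=(
                self.retry_initial_delay
                if self.retry_initial_delay is not None
                else cfg.retry_initial_delay
            ),
        )


# The user disables retries explicitly; the delay falls back to the config default.
resolved = Options(retry_max_retries=0).with_defaults_from(Defaults())
print(resolved)
```

Comparing against `None` rather than relying on truthiness is what lets `retry_max_retries=0` survive resolution instead of being silently replaced by the default.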

CreateExecutionOptions dataclass

Options for the create-execution phase.

Controls retry behavior and timeouts when creating a new RQC execution. Fields set to None will use values from global config (STKAI.config.rqc).

Attributes:

Name Type Description
retry_max_retries int | None

Maximum retry attempts for failed create-execution calls. Use 0 to disable retries (single attempt only). Use 3 for 4 total attempts (1 original + 3 retries).

retry_initial_delay float | None

Initial delay in seconds for the first retry attempt. Subsequent retries use exponential backoff (delay doubles each attempt).

request_timeout int | None

HTTP request timeout in seconds.
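
As a rough illustration of the retry schedule described above, the sketch below computes the wait before each retry. It assumes pure doubling with no jitter and no upper bound, which the library may or may not apply; `backoff_delays` is a hypothetical helper, not part of the API.

```python
def backoff_delays(retry_max_retries: int, retry_initial_delay: float) -> list[float]:
    """Return the delay (in seconds) before each retry, doubling every time."""
    return [retry_initial_delay * (2 ** attempt) for attempt in range(retry_max_retries)]


# 3 retries after the original attempt, starting at half a second.
delays = backoff_delays(retry_max_retries=3, retry_initial_delay=0.5)
print(delays)  # [0.5, 1.0, 2.0]
```

With `retry_max_retries=0` the list is empty, matching the "single attempt only" behavior.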

Source code in src/stkai/rqc/_remote_quick_command.py
@dataclass(frozen=True)
class CreateExecutionOptions:
    """
    Options for the create-execution phase.

    Controls retry behavior and timeouts when creating a new RQC execution.
    Fields set to None will use values from global config (STKAI.config.rqc).

    Attributes:
        retry_max_retries: Maximum retry attempts for failed create-execution calls.
            Use 0 to disable retries (single attempt only).
            Use 3 for 4 total attempts (1 original + 3 retries).
        retry_initial_delay: Initial delay in seconds for the first retry attempt.
            Subsequent retries use exponential backoff (delay doubles each attempt).
        request_timeout: HTTP request timeout in seconds.
    """
    retry_max_retries: int | None = None
    retry_initial_delay: float | None = None
    request_timeout: int | None = None

GetResultOptions dataclass

Options for the get-result (polling) phase.

Controls polling behavior and timeouts when waiting for execution completion. Fields set to None will use values from global config (STKAI.config.rqc).

Attributes:

Name Type Description
poll_interval float | None

Seconds to wait between polling status checks.

poll_max_duration float | None

Maximum seconds to wait before timing out.

poll_overload_timeout float | None

Maximum seconds to tolerate CREATED status before assuming server overload.

request_timeout int | None

HTTP request timeout in seconds.
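
How the three polling options might interact can be sketched with a small loop. This is an assumption-laden illustration, not the library's implementation: `poll_until_done`, the injected `sleep` and `clock` callables, and the choice to report `"TIMEOUT"` on overload are all hypothetical.

```python
from typing import Callable


def poll_until_done(
    fetch_status: Callable[[], str],
    poll_interval: float,
    poll_max_duration: float,
    poll_overload_timeout: float,
    sleep: Callable[[float], None],
    clock: Callable[[], float],
) -> str:
    """Poll until a terminal status, an overload timeout, or the overall deadline."""
    start = clock()
    while True:
        status = fetch_status()
        if status in ("COMPLETED", "FAILURE"):
            return status
        elapsed = clock() - start
        # Still CREATED after the overload window: assume the server is overloaded.
        if status == "CREATED" and elapsed >= poll_overload_timeout:
            return "TIMEOUT"
        # Overall deadline for the polling phase.
        if elapsed >= poll_max_duration:
            return "TIMEOUT"
        sleep(poll_interval)


class FakeClock:
    """Deterministic clock so the example runs instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


fake = FakeClock()
statuses = iter(["CREATED", "RUNNING", "RUNNING", "COMPLETED"])
outcome = poll_until_done(lambda: next(statuses), 1.0, 10.0, 5.0, fake.sleep, fake.time)
print(outcome, fake.now)
```

Injecting the clock and sleep functions keeps the loop testable without real waiting.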

Source code in src/stkai/rqc/_remote_quick_command.py
@dataclass(frozen=True)
class GetResultOptions:
    """
    Options for the get-result (polling) phase.

    Controls polling behavior and timeouts when waiting for execution completion.
    Fields set to None will use values from global config (STKAI.config.rqc).

    Attributes:
        poll_interval: Seconds to wait between polling status checks.
        poll_max_duration: Maximum seconds to wait before timing out.
        poll_overload_timeout: Maximum seconds to tolerate CREATED status before assuming server overload.
        request_timeout: HTTP request timeout in seconds.
    """
    poll_interval: float | None = None
    poll_max_duration: float | None = None
    poll_overload_timeout: float | None = None
    request_timeout: int | None = None

Result Handlers

RqcResultHandler

Bases: ABC

Abstract base class for result handlers.

Result handlers are responsible for transforming the raw API response into a more useful format. Implement this class to create custom handlers.

Example

class MyHandler(RqcResultHandler):
    def handle_result(self, context: RqcResultContext) -> Any:
        return context.raw_result.upper()
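
A slightly more defensive variant of the handler above is sketched here. So that it runs on its own, `ResultContext` and `ResultHandler` are hypothetical stand-ins for `RqcResultContext` and `RqcResultHandler` (the `request` field is omitted); with the real library you would subclass `RqcResultHandler` directly.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ResultContext:
    """Hypothetical stand-in for RqcResultContext (request field omitted)."""
    raw_result: Any
    execution_id: str


class ResultHandler(ABC):
    """Hypothetical stand-in for RqcResultHandler."""

    @abstractmethod
    def handle_result(self, context: ResultContext) -> Any:
        ...


class UppercaseHandler(ResultHandler):
    def handle_result(self, context: ResultContext) -> Any:
        # raw_result is typed Any, so check before calling a str-only method.
        if not isinstance(context.raw_result, str):
            type_name = type(context.raw_result).__name__
            raise TypeError(f"{context.execution_id} | expected str, got {type_name}")
        return context.raw_result.upper()


shout = UppercaseHandler().handle_result(
    ResultContext(raw_result="done", execution_id="exec-1")
)
print(shout)  # DONE
```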

Source code in src/stkai/rqc/_handlers.py
class RqcResultHandler(ABC):
    """
    Abstract base class for result handlers.

    Result handlers are responsible for transforming the raw API response
    into a more useful format. Implement this class to create custom handlers.

    Example:
        >>> class MyHandler(RqcResultHandler):
        ...     def handle_result(self, context: RqcResultContext) -> Any:
        ...         return context.raw_result.upper()
    """

    @abstractmethod
    def handle_result(self, context: RqcResultContext) -> Any:
        """
        Process the result and return the transformed value.

        Args:
            context: The RqcResultContext containing the raw result and request info.

        Returns:
            The transformed result value.

        Note:
            Any exception raised will be wrapped in RqcResultHandlerError.
        """
        pass

Functions

handle_result abstractmethod

handle_result(context: RqcResultContext) -> Any

Process the result and return the transformed value.

Parameters:

Name Type Description Default
context RqcResultContext

The RqcResultContext containing the raw result and request info.

required

Returns:

Type Description
Any

The transformed result value.

Note

Any exception raised will be wrapped in RqcResultHandlerError.

Source code in src/stkai/rqc/_handlers.py
@abstractmethod
def handle_result(self, context: RqcResultContext) -> Any:
    """
    Process the result and return the transformed value.

    Args:
        context: The RqcResultContext containing the raw result and request info.

    Returns:
        The transformed result value.

    Note:
        Any exception raised will be wrapped in RqcResultHandlerError.
    """
    pass

RqcResultContext dataclass

Context passed to result handlers during processing.

This immutable class provides result handlers with all the information needed to process an execution result, including the original request and the raw result from the API.

Attributes:

Name Type Description
request RqcRequest

The original RqcRequest.

raw_result Any

The unprocessed result from the StackSpot AI API.

execution_id str

The server-assigned execution ID for this execution.

handled bool

Flag indicating if a previous handler has already processed this result.

Source code in src/stkai/rqc/_handlers.py
@dataclass(frozen=True)
class RqcResultContext:
    """
    Context passed to result handlers during processing.

    This immutable class provides result handlers with all the information
    needed to process an execution result, including the original request
    and the raw result from the API.

    Attributes:
        request: The original RqcRequest.
        raw_result: The unprocessed result from the StackSpot AI API.
        execution_id: The server-assigned execution ID for this execution.
        handled: Flag indicating if a previous handler has already processed this result.
    """
    request: RqcRequest
    raw_result: Any
    execution_id: str
    handled: bool = False

    def __post_init__(self) -> None:
        assert self.request, "RQC-Request can not be empty."
        assert self.execution_id, "Execution ID can not be empty."
        assert self.handled is not None, "Context's handled flag can not be None."

    def with_result(self, result: Any) -> "RqcResultContext":
        """Returns a new context with the given result and handled=True."""
        return replace(self, raw_result=result, handled=True)

Functions

with_result

with_result(result: Any) -> RqcResultContext

Returns a new context with the given result and handled=True.

Source code in src/stkai/rqc/_handlers.py
def with_result(self, result: Any) -> "RqcResultContext":
    """Returns a new context with the given result and handled=True."""
    return replace(self, raw_result=result, handled=True)
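
The copy-on-write behavior of `with_result` comes from `dataclasses.replace` on a frozen dataclass. The sketch below shows the effect with `Context`, a hypothetical reduced stand-in for `RqcResultContext` (no `request` field, no `__post_init__` checks).

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Any


@dataclass(frozen=True)
class Context:
    """Hypothetical reduced stand-in for RqcResultContext."""
    raw_result: Any
    execution_id: str
    handled: bool = False

    def with_result(self, result: Any) -> "Context":
        # replace() builds a new instance; the original is left untouched.
        return replace(self, raw_result=result, handled=True)


original = Context(raw_result='{"a": 1}', execution_id="exec-1")
updated = original.with_result({"a": 1})

# Direct mutation is rejected because the dataclass is frozen.
try:
    original.handled = True  # type: ignore[misc]
    mutated = True
except FrozenInstanceError:
    mutated = False

print(original.handled, updated.handled, mutated)  # False True False
```

Because every handler receives its own immutable context, a handler cannot accidentally alter what a later handler or the caller sees.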

JsonResultHandler

Bases: RqcResultHandler

Handler that parses JSON results into Python objects.

This is the default handler used by RemoteQuickCommand. It handles common response formats from StackSpot AI, including:

  • Raw JSON strings
  • JSON wrapped in markdown code blocks (```json ... ```)
  • Already-parsed dict objects (returns a deep copy)

Example

handler = JsonResultHandler()
context = RqcResultContext(request=req, raw_result='{"key": "value"}', execution_id="exec-123")
result = handler.handle_result(context)
print(result)  # {'key': 'value'}

Source code in src/stkai/rqc/_handlers.py
class JsonResultHandler(RqcResultHandler):
    """
    Handler that parses JSON results into Python objects.

    This is the default handler used by RemoteQuickCommand. It handles
    common response formats from StackSpot AI, including:

    - Raw JSON strings
    - JSON wrapped in markdown code blocks (```json ... ```)
    - Already-parsed dict objects (returns a deep copy)

    Example:
        >>> handler = JsonResultHandler()
        >>> context = RqcResultContext(
        ...     request=req, raw_result='{"key": "value"}', execution_id="exec-123"
        ... )
        >>> result = handler.handle_result(context)
        >>> print(result)  # {'key': 'value'}
    """

    @override
    def handle_result(self, context: RqcResultContext) -> Any:
        """
        Parses the raw result as JSON.

        Handles markdown code block wrappers (```json ... ```) automatically.

        Args:
            context: The result context containing the raw result.

        Returns:
            Parsed Python object (dict, list, etc.). An empty or otherwise falsy
            raw result (None, '', {}) is returned unchanged.

        Raises:
            json.JSONDecodeError: If the result is not valid JSON.
            TypeError: If the result is neither a string nor a dict.

        Examples:
            - '{"ok": true}' -> {'ok': True}
            - '```json\\n{"x":1}\\n```' -> {'x': 1}
        """
        result = context.raw_result
        if not result:
            return result

        if isinstance(result, dict):
            return deepcopy(result)

        if not isinstance(result, str):
            _type_name = type(result).__name__
            raise TypeError(
                f"{context.execution_id} | RQC | Cannot parse JSON from non-string result (type={_type_name})"
            )

        # Remove Markdown code block wrappers (```json ... ```)
        sanitized = result.replace("```json", "").replace("```", "").strip()

        # Tries to convert JSON to Python object
        try:
            return json.loads(sanitized)
        except json.JSONDecodeError:
            # Log contextual warning with a short preview of the raw text
            preview = result.strip().splitlines(keepends=True)[:3]
            logger.warning(
                f"{context.execution_id} | RQC | ⚠️ Response result not in JSON format. Treating it as plain text. "
                f"Preview:\n | {' | '.join(preview)}"
            )
            raise

    @staticmethod
    def chain_with(other_handler: RqcResultHandler) -> RqcResultHandler:
        """
        Creates a chained handler with JSON parsing followed by another handler.

        This is a convenience method for the common pattern of first parsing
        JSON and then applying additional transformations.

        Args:
            other_handler: Handler to execute after JSON parsing.

        Returns:
            A ChainedResultHandler that first parses JSON, then applies other_handler.

        Example:
            >>> handler = JsonResultHandler.chain_with(MyCustomHandler())
        """
        json_handler = JsonResultHandler()
        return ChainedResultHandler.of([json_handler, other_handler])

Functions

handle_result

handle_result(context: RqcResultContext) -> Any

Parses the raw result as JSON.

Handles markdown code block wrappers (```json ... ```) automatically.

Parameters:

Name Type Description Default
context RqcResultContext

The result context containing the raw result.

required

Returns:

Type Description
Any

Parsed Python object (dict, list, etc.). An empty or otherwise falsy raw result (None, '', {}) is returned unchanged.

Raises:

Type Description
JSONDecodeError

If the result is not valid JSON.

TypeError

If the result is neither a string nor a dict.

Examples:

  • '{"ok": true}' -> {'ok': True}
  • '```json\n{"x":1}\n```' -> {'x': 1}

Source code in src/stkai/rqc/_handlers.py
@override
def handle_result(self, context: RqcResultContext) -> Any:
    """
    Parses the raw result as JSON.

    Handles markdown code block wrappers (```json ... ```) automatically.

    Args:
        context: The result context containing the raw result.

    Returns:
        Parsed Python object (dict, list, etc.). An empty or otherwise falsy
        raw result (None, '', {}) is returned unchanged.

    Raises:
        json.JSONDecodeError: If the result is not valid JSON.
        TypeError: If the result is neither a string nor a dict.

    Examples:
        - '{"ok": true}' -> {'ok': True}
        - '```json\\n{"x":1}\\n```' -> {'x': 1}
    """
    result = context.raw_result
    if not result:
        return result

    if isinstance(result, dict):
        return deepcopy(result)

    if not isinstance(result, str):
        _type_name = type(result).__name__
        raise TypeError(
            f"{context.execution_id} | RQC | Cannot parse JSON from non-string result (type={_type_name})"
        )

    # Remove Markdown code block wrappers (```json ... ```)
    sanitized = result.replace("```json", "").replace("```", "").strip()

    # Tries to convert JSON to Python object
    try:
        return json.loads(sanitized)
    except json.JSONDecodeError:
        # Log contextual warning with a short preview of the raw text
        preview = result.strip().splitlines(keepends=True)[:3]
        logger.warning(
            f"{context.execution_id} | RQC | ⚠️ Response result not in JSON format. Treating it as plain text. "
            f"Preview:\n | {' | '.join(preview)}"
        )
        raise
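
To see the fence-stripping step in isolation, here is a minimal sketch that mirrors the two `replace` calls from the listing above. `parse_json_result` and `FENCE` are hypothetical names introduced only for this example; the fence marker is built with string multiplication purely so the example can be embedded in a code block.

```python
import json
from typing import Any

FENCE = "`" * 3  # three backticks


def parse_json_result(raw: str) -> Any:
    """Strip markdown code-fence markers, then parse the remainder as JSON."""
    sanitized = raw.replace(FENCE + "json", "").replace(FENCE, "").strip()
    return json.loads(sanitized)


wrapped = FENCE + 'json\n{"x": 1}\n' + FENCE
print(parse_json_result(wrapped))         # {'x': 1}
print(parse_json_result('{"ok": true}'))  # {'ok': True}

# Caveat: replace() removes every fence marker, including one inside a string value.
print(parse_json_result('{"md": "' + FENCE + '"}'))  # {'md': ''}
```

The last call shows a side effect of the simple string replacement: fence markers that are part of the JSON payload itself are also removed.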

chain_with staticmethod

chain_with(other_handler: RqcResultHandler) -> RqcResultHandler

Creates a chained handler with JSON parsing followed by another handler.

This is a convenience method for the common pattern of first parsing JSON and then applying additional transformations.

Parameters:

Name Type Description Default
other_handler RqcResultHandler

Handler to execute after JSON parsing.

required

Returns:

Type Description
RqcResultHandler

A ChainedResultHandler that first parses JSON, then applies other_handler.

Example

handler = JsonResultHandler.chain_with(MyCustomHandler())

Source code in src/stkai/rqc/_handlers.py
@staticmethod
def chain_with(other_handler: RqcResultHandler) -> RqcResultHandler:
    """
    Creates a chained handler with JSON parsing followed by another handler.

    This is a convenience method for the common pattern of first parsing
    JSON and then applying additional transformations.

    Args:
        other_handler: Handler to execute after JSON parsing.

    Returns:
        A ChainedResultHandler that first parses JSON, then applies other_handler.

    Example:
        >>> handler = JsonResultHandler.chain_with(MyCustomHandler())
    """
    json_handler = JsonResultHandler()
    return ChainedResultHandler.of([json_handler, other_handler])

RawResultHandler

Bases: RqcResultHandler

Handler that returns the raw result without transformation.

Use this handler when you want to access the unprocessed API response, for example when debugging or when the result is not JSON.

Source code in src/stkai/rqc/_handlers.py
class RawResultHandler(RqcResultHandler):
    """
    Handler that returns the raw result without transformation.

    Use this handler when you want to access the unprocessed API response,
    for example when debugging or when the result is not JSON.
    """

    @override
    def handle_result(self, context: RqcResultContext) -> Any:
        """Returns the raw result as-is without any transformation."""
        return context.raw_result

Functions

handle_result

handle_result(context: RqcResultContext) -> Any

Returns the raw result as-is without any transformation.

Source code in src/stkai/rqc/_handlers.py
@override
def handle_result(self, context: RqcResultContext) -> Any:
    """Returns the raw result as-is without any transformation."""
    return context.raw_result

ChainedResultHandler

Bases: RqcResultHandler

Handler that chains multiple handlers in sequence.

Each handler processes the result from the previous handler, allowing for complex transformation pipelines.

Example

handler = ChainedResultHandler.of([JsonResultHandler(), MyCustomHandler()])
# First parses JSON, then applies custom transformation

Attributes:

Name Type Description
chained_handlers

Sequence of handlers to execute in order.
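
The pipeline idea can be sketched without the library by treating each handler as a plain function. `run_chain` and `extract_names` are hypothetical, and `RuntimeError` stands in for `RqcResultHandlerError`; the real class passes a context object between handlers rather than the bare result.

```python
import json
from typing import Any, Callable, Sequence

Handler = Callable[[Any], Any]  # hypothetical simplification: handlers as functions


def run_chain(handlers: Sequence[Handler], raw_result: Any) -> Any:
    """Feed the output of each handler into the next one."""
    result = raw_result
    for handler in handlers:
        try:
            result = handler(result)
        except Exception as exc:
            # Wrap the failure so the caller learns which step broke.
            raise RuntimeError(f"Handler '{handler.__name__}' failed in chain: {exc}") from exc
    return result


def extract_names(parsed: dict) -> list:
    """Second step: pull one field out of the parsed JSON."""
    return [item["name"] for item in parsed["items"]]


names = run_chain([json.loads, extract_names], '{"items": [{"name": "a"}, {"name": "b"}]}')
print(names)  # ['a', 'b']
```

Ordering matters: `extract_names` only works because `json.loads` has already turned the string into a dict.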

Source code in src/stkai/rqc/_handlers.py
class ChainedResultHandler(RqcResultHandler):
    """
    Handler that chains multiple handlers in sequence.

    Each handler processes the result from the previous handler,
    allowing for complex transformation pipelines.

    Example:
        >>> handler = ChainedResultHandler.of([JsonResultHandler(), MyCustomHandler()])
        >>> # First parses JSON, then applies custom transformation

    Attributes:
        chained_handlers: Sequence of handlers to execute in order.
    """

    def __init__(self, chained_handlers: Sequence[RqcResultHandler]):
        """
        Initialize with a sequence of handlers.

        Args:
            chained_handlers: Handlers to execute in order.
        """
        self.chained_handlers = chained_handlers

    @override
    def handle_result(self, context: RqcResultContext) -> Any:
        """Executes each handler in sequence, passing results through the chain."""
        total = len(self.chained_handlers)
        handler_names = [h.__class__.__name__ for h in self.chained_handlers]
        logger.debug(
            f"{context.execution_id} | RQC | Processing chain with {total} handler(s): {handler_names}"
        )

        result = context.raw_result
        for i, next_handler in enumerate(self.chained_handlers):
            handler_name = next_handler.__class__.__name__
            try:
                result = next_handler.handle_result(context)
            except RqcResultHandlerError:
                raise
            except Exception as e:
                raise RqcResultHandlerError(
                    cause=e,
                    result_handler=next_handler,
                    message=f"Handler '{handler_name}' failed in chain: {e}",
                ) from e
            # Only advance context on success; on error the exception propagates
            context = context.with_result(result)
            logger.debug(
                f"{context.execution_id} | RQC | Handler '{handler_name}' completed ({i + 1}/{total})"
            )
        return result

    @staticmethod
    def of(handlers: RqcResultHandler | Sequence[RqcResultHandler]) -> "ChainedResultHandler":
        """
        Factory method to create a ChainedResultHandler.

        Args:
            handlers: A single handler or sequence of handlers.

        Returns:
            A ChainedResultHandler wrapping the provided handlers.
        """
        return ChainedResultHandler(
            [handlers] if isinstance(handlers, RqcResultHandler) else list(handlers)
        )

Functions

__init__

__init__(chained_handlers: Sequence[RqcResultHandler])

Initialize with a sequence of handlers.

Parameters:

Name Type Description Default
chained_handlers Sequence[RqcResultHandler]

Handlers to execute in order.

required
Source code in src/stkai/rqc/_handlers.py
def __init__(self, chained_handlers: Sequence[RqcResultHandler]):
    """
    Initialize with a sequence of handlers.

    Args:
        chained_handlers: Handlers to execute in order.
    """
    self.chained_handlers = chained_handlers

handle_result

handle_result(context: RqcResultContext) -> Any

Executes each handler in sequence, passing results through the chain.

Source code in src/stkai/rqc/_handlers.py
@override
def handle_result(self, context: RqcResultContext) -> Any:
    """Executes each handler in sequence, passing results through the chain."""
    total = len(self.chained_handlers)
    handler_names = [h.__class__.__name__ for h in self.chained_handlers]
    logger.debug(
        f"{context.execution_id} | RQC | Processing chain with {total} handler(s): {handler_names}"
    )

    result = context.raw_result
    for i, next_handler in enumerate(self.chained_handlers):
        handler_name = next_handler.__class__.__name__
        try:
            result = next_handler.handle_result(context)
        except RqcResultHandlerError:
            raise
        except Exception as e:
            raise RqcResultHandlerError(
                cause=e,
                result_handler=next_handler,
                message=f"Handler '{handler_name}' failed in chain: {e}",
            ) from e
        # Only advance context on success; on error the exception propagates
        context = context.with_result(result)
        logger.debug(
            f"{context.execution_id} | RQC | Handler '{handler_name}' completed ({i + 1}/{total})"
        )
    return result

of staticmethod

of(handlers: RqcResultHandler | Sequence[RqcResultHandler]) -> ChainedResultHandler

Factory method to create a ChainedResultHandler.

Parameters:

Name Type Description Default
handlers RqcResultHandler | Sequence[RqcResultHandler]

A single handler or sequence of handlers.

required

Returns:

Type Description
ChainedResultHandler

A ChainedResultHandler wrapping the provided handlers.

Source code in src/stkai/rqc/_handlers.py
@staticmethod
def of(handlers: RqcResultHandler | Sequence[RqcResultHandler]) -> "ChainedResultHandler":
    """
    Factory method to create a ChainedResultHandler.

    Args:
        handlers: A single handler or sequence of handlers.

    Returns:
        A ChainedResultHandler wrapping the provided handlers.
    """
    return ChainedResultHandler(
        [handlers] if isinstance(handlers, RqcResultHandler) else list(handlers)
    )

Event Listeners

RqcEventListener

Base class for observing RQC execution lifecycle events.

Listeners are read-only observers: they can react to events, log, notify, or collect metrics, but should NOT modify the request or response.

The context dict is shared across all listener calls for a single execution, allowing listeners to store and retrieve state (e.g., start time for telemetry).

All methods have default empty implementations, so subclasses only need to override the methods they care about.

Example

class MetricsListener(RqcEventListener):
    def on_before_execute(self, request, context):
        context['start_time'] = time.time()

    def on_after_execute(self, request, response, context):
        duration = time.time() - context['start_time']
        statsd.timing('rqc.duration', duration)
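
A self-contained variant of the timing listener is sketched below, with `EventListener` as a hypothetical stand-in for `RqcEventListener` and an in-memory list in place of a metrics client. It uses `time.perf_counter`, a monotonic clock, which is generally a safer choice than `time.time` for measuring durations.

```python
import time
from typing import Any


class EventListener:
    """Hypothetical stand-in for RqcEventListener (no-op defaults)."""

    def on_before_execute(self, request: Any, context: dict) -> None:
        pass

    def on_after_execute(self, request: Any, response: Any, context: dict) -> None:
        pass


class TimingListener(EventListener):
    def __init__(self) -> None:
        self.durations: list = []

    def on_before_execute(self, request: Any, context: dict) -> None:
        # Store state in the shared context dict for the later callback.
        context["start_time"] = time.perf_counter()

    def on_after_execute(self, request: Any, response: Any, context: dict) -> None:
        start = context.get("start_time")
        if start is not None:  # tolerate a missing key instead of raising KeyError
            self.durations.append(time.perf_counter() - start)


# Simulate one execution: the same dict is passed to both callbacks.
listener = TimingListener()
context: dict = {}
listener.on_before_execute("request", context)
listener.on_after_execute("request", "response", context)
print(len(listener.durations))  # 1
```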

Source code in src/stkai/rqc/_event_listeners.py
class RqcEventListener:
    """
    Base class for observing RQC execution lifecycle events.

    Listeners are read-only observers: they can react to events, log, notify,
    or collect metrics, but should NOT modify the request or response.

    The `context` dict is shared across all listener calls for a single execution,
    allowing listeners to store and retrieve state (e.g., start time for telemetry).

    All methods have default empty implementations, so subclasses only need to
    override the methods they care about.

    Example:
        >>> class MetricsListener(RqcEventListener):
        ...     def on_before_execute(self, request, context):
        ...         context['start_time'] = time.time()
        ...
        ...     def on_after_execute(self, request, response, context):
        ...         duration = time.time() - context['start_time']
        ...         statsd.timing('rqc.duration', duration)
    """

    def on_before_execute(self, request: RqcRequest, context: dict[str, Any]) -> None:
        """
        Called before starting the execution.

        Args:
            request: The request about to be executed.
            context: Mutable dict for sharing state between listener calls.
        """
        pass

    def on_status_change(
        self,
        request: RqcRequest,
        old_status: RqcExecutionStatus,
        new_status: RqcExecutionStatus,
        context: dict[str, Any],
    ) -> None:
        """
        Called when the execution status changes throughout the lifecycle.

        This method is invoked at key state transitions:
        - PENDING → CREATED: Execution was successfully created on the server.
        - PENDING → ERROR/TIMEOUT: Failed to create execution (network error, timeout, etc.).
        - CREATED → RUNNING: Server started processing the execution.
        - RUNNING → COMPLETED: Execution finished successfully.
        - RUNNING → FAILURE: Execution failed on the server-side.
        - Any → TIMEOUT: Polling timed out waiting for completion.

        Args:
            request: The request being executed.
            old_status: The previous status.
            new_status: The new status.
            context: Mutable dict for sharing state between listener calls.
        """
        pass

    def on_after_execute(
        self,
        request: RqcRequest,
        response: RqcResponse,
        context: dict[str, Any],
    ) -> None:
        """
        Called after execution completes (success or failure).

        Check response.status or response.is_completed() to determine the outcome.

        Args:
            request: The executed request.
            response: The final response (always provided, check status for outcome).
            context: Mutable dict for sharing state between listener calls.
        """
        pass

Functions

on_before_execute

on_before_execute(request: RqcRequest, context: dict[str, Any]) -> None

Called before starting the execution.

Parameters:

Name Type Description Default
request RqcRequest

The request about to be executed.

required
context dict[str, Any]

Mutable dict for sharing state between listener calls.

required
Source code in src/stkai/rqc/_event_listeners.py
def on_before_execute(self, request: RqcRequest, context: dict[str, Any]) -> None:
    """
    Called before starting the execution.

    Args:
        request: The request about to be executed.
        context: Mutable dict for sharing state between listener calls.
    """
    pass

on_status_change

on_status_change(request: RqcRequest, old_status: RqcExecutionStatus, new_status: RqcExecutionStatus, context: dict[str, Any]) -> None

Called when the execution status changes throughout the lifecycle.

This method is invoked at key state transitions:

  • PENDING → CREATED: Execution was successfully created on the server.
  • PENDING → ERROR/TIMEOUT: Failed to create execution (network error, timeout, etc.).
  • CREATED → RUNNING: Server started processing the execution.
  • RUNNING → COMPLETED: Execution finished successfully.
  • RUNNING → FAILURE: Execution failed on the server-side.
  • Any → TIMEOUT: Polling timed out waiting for completion.
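
One way to use these transitions is to record them and flag the terminal one. In this sketch `Status` is a hypothetical stand-in for `RqcExecutionStatus` (the member names come from the list above, the string values are assumptions), and `TransitionRecorder` is an invented listener that does not inherit from the real base class.

```python
from enum import Enum
from typing import Any


class Status(Enum):
    """Hypothetical stand-in for RqcExecutionStatus; values are assumptions."""
    PENDING = "PENDING"
    CREATED = "CREATED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILURE = "FAILURE"
    ERROR = "ERROR"
    TIMEOUT = "TIMEOUT"


TERMINAL_STATUSES = frozenset(
    {Status.COMPLETED, Status.FAILURE, Status.ERROR, Status.TIMEOUT}
)


class TransitionRecorder:
    def __init__(self) -> None:
        self.transitions: list = []

    def on_status_change(
        self, request: Any, old_status: Status, new_status: Status, context: dict
    ) -> None:
        self.transitions.append((old_status.name, new_status.name))
        if new_status in TERMINAL_STATUSES:
            # Share the outcome with later callbacks through the context dict.
            context["final_status"] = new_status.name


recorder = TransitionRecorder()
context: dict = {}
happy_path = [
    (Status.PENDING, Status.CREATED),
    (Status.CREATED, Status.RUNNING),
    (Status.RUNNING, Status.COMPLETED),
]
for old, new in happy_path:
    recorder.on_status_change("request", old, new, context)

print(recorder.transitions)
print(context["final_status"])  # COMPLETED
```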

Parameters:

Name Type Description Default
request RqcRequest

The request being executed.

required
old_status RqcExecutionStatus

The previous status.

required
new_status RqcExecutionStatus

The new status.

required
context dict[str, Any]

Mutable dict for sharing state between listener calls.

required
Source code in src/stkai/rqc/_event_listeners.py
def on_status_change(
    self,
    request: RqcRequest,
    old_status: RqcExecutionStatus,
    new_status: RqcExecutionStatus,
    context: dict[str, Any],
) -> None:
    """
    Called when the execution status changes throughout the lifecycle.

    This method is invoked at key state transitions:
    - PENDING → CREATED: Execution was successfully created on the server.
    - PENDING → ERROR/TIMEOUT: Failed to create execution (network error, timeout, etc.).
    - CREATED → RUNNING: Server started processing the execution.
    - RUNNING → COMPLETED: Execution finished successfully.
    - RUNNING → FAILURE: Execution failed on the server-side.
    - Any → TIMEOUT: Polling timed out waiting for completion.

    Args:
        request: The request being executed.
        old_status: The previous status.
        new_status: The new status.
        context: Mutable dict for sharing state between listener calls.
    """
    pass

on_after_execute

on_after_execute(request: RqcRequest, response: RqcResponse, context: dict[str, Any]) -> None

Called after execution completes (success or failure).

Check response.status or response.is_completed() to determine the outcome.

Parameters:

Name Type Description Default
request RqcRequest

The executed request.

required
response RqcResponse

The final response (always provided, check status for outcome).

required
context dict[str, Any]

Mutable dict for sharing state between listener calls.

required
Source code in src/stkai/rqc/_event_listeners.py
def on_after_execute(
    self,
    request: RqcRequest,
    response: RqcResponse,
    context: dict[str, Any],
) -> None:
    """
    Called after execution completes (success or failure).

    Check response.status or response.is_completed() to determine the outcome.

    Args:
        request: The executed request.
        response: The final response (always provided, check status for outcome).
        context: Mutable dict for sharing state between listener calls.
    """
    pass

RqcPhasedEventListener

Bases: RqcEventListener

Abstract listener that exposes granular hooks for each execution phase.

This class implements the base RqcEventListener methods and delegates to phase-specific methods, making it easier to implement listeners focused on specific phases of the execution lifecycle.

Phase 1 - Create Execution
  • on_create_execution_start: Before POST to create execution
  • on_create_execution_end: After creation (success or failure)

Phase 2 - Get Result (Polling)
  • on_get_result_start: Before polling loop starts
  • on_get_result_end: After polling completes (any terminal status)

Example

class MetricsListener(RqcPhasedEventListener):
    def on_create_execution_start(self, request, context):
        context['create_start'] = time.time()

    def on_create_execution_end(self, request, status, response, context):
        duration = time.time() - context['create_start']
        if status == RqcExecutionStatus.CREATED:
            statsd.timing('rqc.create.success', duration)
        else:
            statsd.timing('rqc.create.failure', duration)
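
The delegation from the base callbacks to the phase hooks can be sketched as follows. `PhasedListener` is a hypothetical, reduced stand-in that covers only the two delegations documented for this class (`on_before_execute` and the PENDING → CREATED transition); statuses are plain strings here as a simplifying assumption.

```python
from typing import Any, Optional


class PhasedListener:
    """Hypothetical reduced stand-in for RqcPhasedEventListener."""

    def on_create_execution_start(self, request: Any, context: dict) -> None:
        pass

    def on_create_execution_end(
        self, request: Any, status: str, response: Optional[Any], context: dict
    ) -> None:
        pass

    def on_get_result_start(self, request: Any, context: dict) -> None:
        pass

    # Base callbacks translate generic events into phase-specific hooks.
    def on_before_execute(self, request: Any, context: dict) -> None:
        self.on_create_execution_start(request, context)

    def on_status_change(
        self, request: Any, old_status: str, new_status: str, context: dict
    ) -> None:
        if (old_status, new_status) == ("PENDING", "CREATED"):
            # Creation succeeded: no response yet, polling is about to start.
            self.on_create_execution_end(request, new_status, None, context)
            self.on_get_result_start(request, context)


class Recorder(PhasedListener):
    def __init__(self) -> None:
        self.calls: list = []

    def on_create_execution_start(self, request: Any, context: dict) -> None:
        self.calls.append("create_start")

    def on_create_execution_end(
        self, request: Any, status: str, response: Optional[Any], context: dict
    ) -> None:
        self.calls.append(f"create_end:{status}")

    def on_get_result_start(self, request: Any, context: dict) -> None:
        self.calls.append("get_result_start")


recorder = Recorder()
shared: dict = {}
recorder.on_before_execute("request", shared)
recorder.on_status_change("request", "PENDING", "CREATED", shared)
print(recorder.calls)
```

Subclasses override only the phase hooks; the base callbacks stay untouched so the client can keep calling the generic listener interface.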

Source code in src/stkai/rqc/_event_listeners.py
class RqcPhasedEventListener(RqcEventListener):
    """
    Abstract listener that exposes granular hooks for each execution phase.

    This class implements the base RqcEventListener methods and delegates
    to phase-specific methods, making it easier to implement listeners
    focused on specific phases of the execution lifecycle.

    Phase 1 - Create Execution:
        - on_create_execution_start: Before POST to create execution
        - on_create_execution_end: After creation (success or failure)

    Phase 2 - Get Result (Polling):
        - on_get_result_start: Before polling loop starts
        - on_get_result_end: After polling completes (any terminal status)

    Example:
        >>> class MetricsListener(RqcPhasedEventListener):
        ...     def on_create_execution_start(self, request, context):
        ...         context['create_start'] = time.time()
        ...
        ...     def on_create_execution_end(self, request, status, response, context):
        ...         duration = time.time() - context['create_start']
        ...         if status == RqcExecutionStatus.CREATED:
        ...             statsd.timing('rqc.create.success', duration)
        ...         else:
        ...             statsd.timing('rqc.create.failure', duration)
    """

    # ==================
    # Phase 1: Create Execution
    # ==================

    def on_create_execution_start(
        self,
        request: RqcRequest,
        context: dict[str, Any],
    ) -> None:
        """
        Called before attempting to create the execution on the server.

        Args:
            request: The request about to be submitted.
            context: Mutable dict for sharing state between listener calls.
        """
        pass

    def on_create_execution_end(
        self,
        request: RqcRequest,
        status: RqcExecutionStatus,
        response: RqcResponse | None,
        context: dict[str, Any],
    ) -> None:
        """
        Called after the create-execution phase completes (success or failure).

        Args:
            request: The request that was submitted.
            status: The resulting status (CREATED on success, ERROR/TIMEOUT on failure).
            response: The RqcResponse if creation failed (contains error details),
                      None if creation succeeded (polling will start).
            context: Mutable dict for sharing state between listener calls.
        """
        pass

    # ==================
    # Phase 2: Get Result (Polling)
    # ==================

    def on_get_result_start(
        self,
        request: RqcRequest,
        context: dict[str, Any],
    ) -> None:
        """
        Called before starting the polling loop.

        Only called if create-execution succeeded (request.execution_id is set).

        Args:
            request: The request with execution_id already assigned.
            context: Mutable dict for sharing state between listener calls.
        """
        pass

    def on_get_result_end(
        self,
        request: RqcRequest,
        response: RqcResponse,
        context: dict[str, Any],
    ) -> None:
        """
        Called after the polling phase completes (any terminal status).

        Args:
            request: The executed request.
            response: The final response (COMPLETED, FAILURE, ERROR, or TIMEOUT).
            context: Mutable dict for sharing state between listener calls.
        """
        pass

    # ==================
    # Base method implementations (delegation logic)
    # ==================

    @override
    def on_before_execute(
        self,
        request: RqcRequest,
        context: dict[str, Any],
    ) -> None:
        """Delegates to on_create_execution_start."""
        self.on_create_execution_start(request, context)

    @override
    def on_status_change(
        self,
        request: RqcRequest,
        old_status: RqcExecutionStatus,
        new_status: RqcExecutionStatus,
        context: dict[str, Any],
    ) -> None:
        """
        Delegates to phase-specific methods based on status transitions.

        - PENDING → CREATED: Calls on_create_execution_end (success) + on_get_result_start
        - PENDING → ERROR/TIMEOUT: Handled in on_after_execute (failure case)
        """
        # Create-execution succeeded, polling is about to start
        if old_status == RqcExecutionStatus.PENDING and new_status == RqcExecutionStatus.CREATED:
            self.on_create_execution_end(request, new_status, None, context)
            self.on_get_result_start(request, context)

    @override
    def on_after_execute(
        self,
        request: RqcRequest,
        response: RqcResponse,
        context: dict[str, Any],
    ) -> None:
        """
        Delegates to phase-specific methods based on execution outcome.

        - No execution_id: Create-execution failed → on_create_execution_end
        - Has execution_id: Polling finished → on_get_result_end
        """
        if response.execution_id is None:
            # Failed during create-execution phase
            self.on_create_execution_end(request, response.status, response, context)
        else:
            # Polling phase completed
            self.on_get_result_end(request, response, context)

Functions

on_create_execution_start

on_create_execution_start(request: RqcRequest, context: dict[str, Any]) -> None

Called before attempting to create the execution on the server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | RqcRequest | The request about to be submitted. | required |
| context | dict[str, Any] | Mutable dict for sharing state between listener calls. | required |

Source code in src/stkai/rqc/_event_listeners.py
def on_create_execution_start(
    self,
    request: RqcRequest,
    context: dict[str, Any],
) -> None:
    """
    Called before attempting to create the execution on the server.

    Args:
        request: The request about to be submitted.
        context: Mutable dict for sharing state between listener calls.
    """
    pass

on_create_execution_end

on_create_execution_end(request: RqcRequest, status: RqcExecutionStatus, response: RqcResponse | None, context: dict[str, Any]) -> None

Called after the create-execution phase completes (success or failure).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | RqcRequest | The request that was submitted. | required |
| status | RqcExecutionStatus | The resulting status (CREATED on success, ERROR/TIMEOUT on failure). | required |
| response | RqcResponse \| None | The RqcResponse if creation failed (contains error details), None if creation succeeded (polling will start). | required |
| context | dict[str, Any] | Mutable dict for sharing state between listener calls. | required |

Source code in src/stkai/rqc/_event_listeners.py
def on_create_execution_end(
    self,
    request: RqcRequest,
    status: RqcExecutionStatus,
    response: RqcResponse | None,
    context: dict[str, Any],
) -> None:
    """
    Called after the create-execution phase completes (success or failure).

    Args:
        request: The request that was submitted.
        status: The resulting status (CREATED on success, ERROR/TIMEOUT on failure).
        response: The RqcResponse if creation failed (contains error details),
                  None if creation succeeded (polling will start).
        context: Mutable dict for sharing state between listener calls.
    """
    pass

on_get_result_start

on_get_result_start(request: RqcRequest, context: dict[str, Any]) -> None

Called before starting the polling loop.

Only called if create-execution succeeded (request.execution_id is set).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | RqcRequest | The request with execution_id already assigned. | required |
| context | dict[str, Any] | Mutable dict for sharing state between listener calls. | required |

Source code in src/stkai/rqc/_event_listeners.py
def on_get_result_start(
    self,
    request: RqcRequest,
    context: dict[str, Any],
) -> None:
    """
    Called before starting the polling loop.

    Only called if create-execution succeeded (request.execution_id is set).

    Args:
        request: The request with execution_id already assigned.
        context: Mutable dict for sharing state between listener calls.
    """
    pass

on_get_result_end

on_get_result_end(request: RqcRequest, response: RqcResponse, context: dict[str, Any]) -> None

Called after the polling phase completes (any terminal status).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | RqcRequest | The executed request. | required |
| response | RqcResponse | The final response (COMPLETED, FAILURE, ERROR, or TIMEOUT). | required |
| context | dict[str, Any] | Mutable dict for sharing state between listener calls. | required |

Source code in src/stkai/rqc/_event_listeners.py
def on_get_result_end(
    self,
    request: RqcRequest,
    response: RqcResponse,
    context: dict[str, Any],
) -> None:
    """
    Called after the polling phase completes (any terminal status).

    Args:
        request: The executed request.
        response: The final response (COMPLETED, FAILURE, ERROR, or TIMEOUT).
        context: Mutable dict for sharing state between listener calls.
    """
    pass

on_before_execute

on_before_execute(request: RqcRequest, context: dict[str, Any]) -> None

Delegates to on_create_execution_start.

Source code in src/stkai/rqc/_event_listeners.py
@override
def on_before_execute(
    self,
    request: RqcRequest,
    context: dict[str, Any],
) -> None:
    """Delegates to on_create_execution_start."""
    self.on_create_execution_start(request, context)

on_status_change

on_status_change(request: RqcRequest, old_status: RqcExecutionStatus, new_status: RqcExecutionStatus, context: dict[str, Any]) -> None

Delegates to phase-specific methods based on status transitions.

  • PENDING → CREATED: Calls on_create_execution_end (success) + on_get_result_start
  • PENDING → ERROR/TIMEOUT: Handled in on_after_execute (failure case)
Source code in src/stkai/rqc/_event_listeners.py
@override
def on_status_change(
    self,
    request: RqcRequest,
    old_status: RqcExecutionStatus,
    new_status: RqcExecutionStatus,
    context: dict[str, Any],
) -> None:
    """
    Delegates to phase-specific methods based on status transitions.

    - PENDING → CREATED: Calls on_create_execution_end (success) + on_get_result_start
    - PENDING → ERROR/TIMEOUT: Handled in on_after_execute (failure case)
    """
    # Create-execution succeeded, polling is about to start
    if old_status == RqcExecutionStatus.PENDING and new_status == RqcExecutionStatus.CREATED:
        self.on_create_execution_end(request, new_status, None, context)
        self.on_get_result_start(request, context)

on_after_execute

on_after_execute(request: RqcRequest, response: RqcResponse, context: dict[str, Any]) -> None

Delegates to phase-specific methods based on execution outcome.

  • No execution_id: Create-execution failed → on_create_execution_end
  • Has execution_id: Polling finished → on_get_result_end
Source code in src/stkai/rqc/_event_listeners.py
@override
def on_after_execute(
    self,
    request: RqcRequest,
    response: RqcResponse,
    context: dict[str, Any],
) -> None:
    """
    Delegates to phase-specific methods based on execution outcome.

    - No execution_id: Create-execution failed → on_create_execution_end
    - Has execution_id: Polling finished → on_get_result_end
    """
    if response.execution_id is None:
        # Failed during create-execution phase
        self.on_create_execution_end(request, response.status, response, context)
    else:
        # Polling phase completed
        self.on_get_result_end(request, response, context)
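
The delegation rules above can be traced with a small, self-contained simulation. This is only a sketch: `Status`, `Response`, and `PhasedListener` are stand-ins that copy the logic from the listing so the example runs without the library installed, `RecordingListener` and `simulate` are hypothetical names, and the order in which the client calls the three base hooks is an assumption.

```python
from __future__ import annotations

from enum import Enum
from typing import Any


class Status(Enum):
    """Stand-in for RqcExecutionStatus (only the members this sketch needs)."""
    PENDING = "PENDING"
    CREATED = "CREATED"
    COMPLETED = "COMPLETED"
    ERROR = "ERROR"


class Response:
    """Stand-in for RqcResponse: a status plus an optional execution_id."""

    def __init__(self, status: Status, execution_id: str | None) -> None:
        self.status = status
        self.execution_id = execution_id


class PhasedListener:
    """Stand-in base class that copies the delegation logic listed above."""

    def on_create_execution_start(self, request, context) -> None:
        pass

    def on_create_execution_end(self, request, status, response, context) -> None:
        pass

    def on_get_result_start(self, request, context) -> None:
        pass

    def on_get_result_end(self, request, response, context) -> None:
        pass

    def on_before_execute(self, request, context) -> None:
        self.on_create_execution_start(request, context)

    def on_status_change(self, request, old_status, new_status, context) -> None:
        if old_status == Status.PENDING and new_status == Status.CREATED:
            self.on_create_execution_end(request, new_status, None, context)
            self.on_get_result_start(request, context)

    def on_after_execute(self, request, response, context) -> None:
        if response.execution_id is None:
            self.on_create_execution_end(request, response.status, response, context)
        else:
            self.on_get_result_end(request, response, context)


class RecordingListener(PhasedListener):
    """Hypothetical listener that records each phase hook as it fires."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def on_create_execution_start(self, request, context) -> None:
        self.calls.append("create_start")

    def on_create_execution_end(self, request, status, response, context) -> None:
        self.calls.append(f"create_end:{status.name}")

    def on_get_result_start(self, request, context) -> None:
        self.calls.append("get_result_start")

    def on_get_result_end(self, request, response, context) -> None:
        self.calls.append(f"get_result_end:{response.status.name}")


def simulate(final: Response) -> list[str]:
    """Drive the base hooks in the order a client is assumed to call them."""
    listener = RecordingListener()
    context: dict[str, Any] = {}
    listener.on_before_execute("request", context)
    if final.execution_id is not None:
        # Creation succeeded, so the client would report PENDING -> CREATED.
        listener.on_status_change("request", Status.PENDING, Status.CREATED, context)
    listener.on_after_execute("request", final, context)
    return listener.calls


success = simulate(Response(Status.COMPLETED, "exec-1"))
failure = simulate(Response(Status.ERROR, None))
print(success)  # all four phase hooks, in order
print(failure)  # only the two create-execution hooks
```

In the success path, on_create_execution_end fires from on_status_change with response set to None. In the failure path, it fires from on_after_execute with the error response, and neither polling hook runs.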

FileLoggingListener

Bases: RqcEventListener

Listener that persists request and response to JSON files for debugging.

This listener writes files to the specified output directory:

  • {tracking_id}-request.json: The request payload
  • {tracking_id}-response-{status}.json: The response result or error details

Example

listener = FileLoggingListener(Path("./output/rqc"))
rqc = RemoteQuickCommand(slug_name="my-rqc", listeners=[listener])

Source code in src/stkai/rqc/_event_listeners.py
class FileLoggingListener(RqcEventListener):
    """
    Listener that persists request and response to JSON files for debugging.

    This listener writes files to the specified output directory:
    - `{tracking_id}-request.json`: The request payload
    - `{tracking_id}-response-{status}.json`: The response result or error details

    Example:
        >>> listener = FileLoggingListener(Path("./output/rqc"))
        >>> rqc = RemoteQuickCommand(slug_name="my-rqc", listeners=[listener])
    """

    def __init__(self, output_dir: Path | str):
        """
        Initialize the listener with an output directory.

        Args:
            output_dir: Directory where JSON files will be saved (Path or str).
                       Created automatically if it doesn't exist.
        """
        assert output_dir, "Output directory is required."

        self.output_dir: Path = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)


    @override
    def on_status_change(
        self,
        request: RqcRequest,
        old_status: RqcExecutionStatus,
        new_status: RqcExecutionStatus,
        context: dict[str, Any],
    ) -> None:
        """Writes request file when status transitions from PENDING (for debugging)."""
        if old_status == RqcExecutionStatus.PENDING:
            request.write_to_file(output_dir=self.output_dir, tracking_id=context.get("execution_id"))

    @override
    def on_after_execute(
        self,
        request: RqcRequest,
        response: RqcResponse,
        context: dict[str, Any],
    ) -> None:
        """Writes response to JSON file after execution completes."""
        response.write_to_file(output_dir=self.output_dir)

Functions

__init__

__init__(output_dir: Path | str)

Initialize the listener with an output directory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| output_dir | Path \| str | Directory where JSON files will be saved (Path or str). Created automatically if it doesn't exist. | required |

Source code in src/stkai/rqc/_event_listeners.py
def __init__(self, output_dir: Path | str):
    """
    Initialize the listener with an output directory.

    Args:
        output_dir: Directory where JSON files will be saved (Path or str).
                   Created automatically if it doesn't exist.
    """
    assert output_dir, "Output directory is required."

    self.output_dir: Path = Path(output_dir)
    self.output_dir.mkdir(parents=True, exist_ok=True)

on_status_change

on_status_change(request: RqcRequest, old_status: RqcExecutionStatus, new_status: RqcExecutionStatus, context: dict[str, Any]) -> None

Writes request file when status transitions from PENDING (for debugging).

Source code in src/stkai/rqc/_event_listeners.py
@override
def on_status_change(
    self,
    request: RqcRequest,
    old_status: RqcExecutionStatus,
    new_status: RqcExecutionStatus,
    context: dict[str, Any],
) -> None:
    """Writes request file when status transitions from PENDING (for debugging)."""
    if old_status == RqcExecutionStatus.PENDING:
        request.write_to_file(output_dir=self.output_dir, tracking_id=context.get("execution_id"))

on_after_execute

on_after_execute(request: RqcRequest, response: RqcResponse, context: dict[str, Any]) -> None

Writes response to JSON file after execution completes.

Source code in src/stkai/rqc/_event_listeners.py
@override
def on_after_execute(
    self,
    request: RqcRequest,
    response: RqcResponse,
    context: dict[str, Any],
) -> None:
    """Writes response to JSON file after execution completes."""
    response.write_to_file(output_dir=self.output_dir)
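
Because the file names follow the documented `{tracking_id}-request.json` and `{tracking_id}-response-{status}.json` patterns, the files for one execution can be gathered with a glob. The sketch below is an illustration only: `write_json` is a hypothetical stand-in for the library's `write_to_file` methods (whose implementation is not shown here), `files_for` is a hypothetical helper, and the tracking IDs and the upper-case status in the file name are assumptions.

```python
import json
import tempfile
from pathlib import Path


def write_json(path: Path, data: dict) -> Path:
    """Hypothetical stand-in for the library's write_to_file methods."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2), encoding="utf-8")
    return path


def files_for(output_dir: Path, tracking_id: str) -> list[str]:
    """Return the names of all JSON files logged for one tracking ID, sorted."""
    return sorted(p.name for p in output_dir.glob(f"{tracking_id}-*.json"))


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "output" / "rqc"
    # Simulate what the listener is documented to leave behind for two executions.
    write_json(out / "abc123-request.json", {"prompt": "Hello!"})
    write_json(out / "abc123-response-COMPLETED.json", {"result": "Hi"})
    write_json(out / "zzz999-request.json", {"prompt": "Other"})
    found = files_for(out, "abc123")

print(found)
```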

Errors

MaxRetriesExceededError

Bases: Exception

Raised when all retry attempts are exhausted.

This exception wraps the last exception that occurred during retry attempts, providing access to the original error for debugging.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| message | | Human-readable error message. |
| last_exception | | The original exception from the last retry attempt. |

Example

try:
    for attempt in Retrying(max_retries=3):
        with attempt:
            raise ConnectionError("Server unavailable")
except MaxRetriesExceededError as e:
    print(f"Failed after retries: {e}")
    print(f"Original error: {e.last_exception}")

Source code in src/stkai/_retry.py
class MaxRetriesExceededError(Exception):
    """
    Raised when all retry attempts are exhausted.

    This exception wraps the last exception that occurred during retry attempts,
    providing access to the original error for debugging.

    Attributes:
        message: Human-readable error message.
        last_exception: The original exception from the last retry attempt.

    Example:
        >>> try:
        ...     for attempt in Retrying(max_retries=3):
        ...         with attempt:
        ...             raise ConnectionError("Server unavailable")
        ... except MaxRetriesExceededError as e:
        ...     print(f"Failed after retries: {e}")
        ...     print(f"Original error: {e.last_exception}")
    """

    def __init__(self, message: str, last_exception: Exception | None = None):
        super().__init__(message)
        self.last_exception = last_exception
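
To show how last_exception can be used by callers, here is a minimal sketch. The exception class is copied from the listing above so the example runs on its own. `call_with_retries` is a hypothetical helper written for this illustration and is not the library's Retrying class; in particular, treating max_retries as the number of retries after the first attempt is an assumption.

```python
from __future__ import annotations

import time


class MaxRetriesExceededError(Exception):
    """Copied from the listing above so the sketch runs without the library."""

    def __init__(self, message: str, last_exception: Exception | None = None):
        super().__init__(message)
        self.last_exception = last_exception


def call_with_retries(func, max_retries: int = 3, initial_delay: float = 0.0):
    """Hypothetical helper: one initial attempt plus up to max_retries retries."""
    last_exc: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            return func()
        except ConnectionError as exc:
            last_exc = exc
            if attempt < max_retries:
                # Exponential backoff between attempts (no sleep after the last one).
                time.sleep(initial_delay * (2 ** attempt))
    raise MaxRetriesExceededError(
        f"Max retries exceeded after {max_retries + 1} attempts",
        last_exception=last_exc,
    ) from last_exc


def unavailable():
    raise ConnectionError("Server unavailable")


try:
    call_with_retries(unavailable, max_retries=2)
except MaxRetriesExceededError as e:
    print(f"Failed after retries: {e}")
    # last_exception defaults to None, so guard before inspecting it.
    if e.last_exception is not None:
        print(f"Original error: {e.last_exception!r}")
```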

RqcResultHandlerError

Bases: RuntimeError

Raised when the result handler fails to process the result.

This exception wraps any error that occurs during result processing, providing access to the original cause and the handler that failed.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| cause | | The original exception that caused the handler to fail. |
| result_handler | | The handler instance that raised the error. |

Source code in src/stkai/rqc/_handlers.py
class RqcResultHandlerError(RuntimeError):
    """
    Raised when the result handler fails to process the result.

    This exception wraps any error that occurs during result processing,
    providing access to the original cause and the handler that failed.

    Attributes:
        cause: The original exception that caused the handler to fail.
        result_handler: The handler instance that raised the error.
    """

    def __init__(self, message: str, cause: Exception | None = None, result_handler: "RqcResultHandler | None" = None):
        super().__init__(message)
        self.cause = cause
        self.result_handler = result_handler
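
The wrapping behaviour described above might look like the following sketch. The exception class is copied from the listing, with the result_handler annotation loosened to `object` so the example is self-contained. `JsonHandler` and `apply_handler` are hypothetical names for illustration; the real RqcResultHandler interface is not shown in this section, so the `handle` method is an assumption.

```python
from __future__ import annotations

import json


class RqcResultHandlerError(RuntimeError):
    """Copied from the listing above so the sketch runs without the library."""

    def __init__(self, message: str, cause: Exception | None = None, result_handler: object | None = None):
        super().__init__(message)
        self.cause = cause
        self.result_handler = result_handler


class JsonHandler:
    """Hypothetical handler that parses a raw result string as JSON."""

    def handle(self, raw: str):
        return json.loads(raw)


def apply_handler(handler, raw: str):
    """Run a handler and wrap any failure, keeping the cause and the handler."""
    try:
        return handler.handle(raw)
    except Exception as exc:
        raise RqcResultHandlerError(
            f"Result handler {type(handler).__name__} failed: {exc}",
            cause=exc,
            result_handler=handler,
        ) from exc


handler = JsonHandler()
print(apply_handler(handler, '{"answer": 42}'))

try:
    apply_handler(handler, "not json")
except RqcResultHandlerError as e:
    print(type(e.cause).__name__, type(e.result_handler).__name__)
```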

ExecutionIdIsMissingError

Bases: RuntimeError

Raised when the execution ID is missing or not provided.

This exception indicates that the StackSpot AI API returned an invalid response without an execution ID after a create-execution request.

Source code in src/stkai/rqc/_remote_quick_command.py
class ExecutionIdIsMissingError(RuntimeError):
    """
    Raised when the execution ID is missing or not provided.

    This exception indicates that the StackSpot AI API returned an invalid
    response without an execution ID after a create-execution request.
    """

    def __init__(self, message: str):
        super().__init__(message)