
Scheduler

rait_connector.scheduler

Scheduler for running RAIT tasks at regular intervals.

Wraps APScheduler 3.x to provide a simple interface for scheduling recurring RAIT operations such as telemetry fetching and calibration runs.

Example

```python
>>> from rait_connector import RAITClient, Scheduler

>>> client = RAITClient()
>>> scheduler = Scheduler(client)

>>> scheduler.add_telemetry_job(interval="daily")
>>> scheduler.add_calibration_job(
...     model_name="gpt-4",
...     model_version="1.0",
...     environment="production",
...     model_purpose="monitoring",
...     invoke_model=lambda prompt: my_llm(prompt),
...     interval="weekly",
... )

>>> scheduler.start()             # runs in background; program continues
>>> print(scheduler.status())     # see all jobs and their state
>>> print(scheduler.history())    # see past execution records
```

Scheduler

Runs registered jobs at configured intervals using APScheduler.

Supports:

  • Named shortcuts: "hourly", "daily", "weekly"
  • Cron expressions: "0 9 * * MON-FRI"
  • :class:`datetime.timedelta` objects
  • Integer / float seconds
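A hypothetical sketch of how these four interval forms might be normalized before being handed to APScheduler. The shortcut-to-seconds mapping and the five-field cron check are assumptions for illustration, not the library's actual parsing code:

```python
from datetime import timedelta
from typing import Tuple, Union

# Assumed shortcut values; the real Scheduler may map these differently.
_SHORTCUT_SECONDS = {"hourly": 3_600, "daily": 86_400, "weekly": 604_800}

def parse_interval(interval: Union[str, timedelta, int, float]) -> Tuple[str, object]:
    """Normalize an interval into ('interval', seconds) or ('cron', expression)."""
    if isinstance(interval, timedelta):
        return ("interval", interval.total_seconds())
    if isinstance(interval, (int, float)) and not isinstance(interval, bool):
        return ("interval", float(interval))
    if isinstance(interval, str):
        if interval in _SHORTCUT_SECONDS:
            return ("interval", float(_SHORTCUT_SECONDS[interval]))
        if len(interval.split()) == 5:  # crude five-field cron check
            return ("cron", interval)
    raise ValueError(f"Cannot parse interval: {interval!r}")
```

An unparseable value would surface as the documented `SchedulerError` in the real library; a plain `ValueError` stands in for it here.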

Built-in RAIT jobs:

  • :meth:`add_telemetry_job` — calls :meth:`~rait_connector.RAITClient.fetch_telemetry`
  • :meth:`add_calibration_job` — runs calibration prompt evaluation

Multiple :class:`Scheduler` instances can run simultaneously.

Tracking:

  • :meth:`status` — all registered jobs with next run time and live `is_executing` flag
  • :meth:`running_jobs` — IDs of jobs currently executing
  • :meth:`history` — past execution records (timestamp, success/error)
Example

```python
>>> scheduler = Scheduler(client)
>>> scheduler.add_telemetry_job(interval="daily")
>>> scheduler.add_calibration_job(
...     model_name="gpt-4", model_version="1.0",
...     environment="production", model_purpose="monitoring",
...     invoke_model=lambda p: my_llm(p), interval="weekly",
... )
>>> scheduler.start()
>>> scheduler.status()
[{'id': 'rait_telemetry', 'trigger': '...', 'next_run': '...', 'is_executing': False}]
```

Decorator style

```python
>>> @scheduler.job(interval="daily")
... def nightly_eval():
...     client.evaluate_batch(fetch_prompts())
>>> scheduler.start()
```

__init__(client, max_history=_DEFAULT_HISTORY_SIZE)

Initialize the scheduler.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `client` | `Any` | A :class:`~rait_connector.RAITClient` instance used by the built-in RAIT jobs. | *required* |
| `max_history` | `int` | Maximum number of execution records to keep in memory. Older records are discarded automatically. Defaults to 100. | `_DEFAULT_HISTORY_SIZE` |
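The "older records are discarded automatically" behavior maps naturally onto a bounded deque; a minimal sketch of that assumption (not necessarily how `Scheduler` stores history internally):

```python
from collections import deque

# A deque with maxlen silently drops the oldest entry once the cap is reached.
history = deque(maxlen=100)  # stand-in for max_history=100
for i in range(150):
    history.append({"job_id": "rait_telemetry", "run": i})
```

After 150 appends only the newest 100 records remain, with no manual pruning needed.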

add_calibration_job(model_name, model_version, environment, model_purpose, invoke_model, interval='weekly', run_immediately=False)

Schedule automatic calibration response collection.

On each run two calibration flows are executed:

Flow 1 — Calibrator prompt responses:

  1. Fetches prompts that need model responses via :meth:`~rait_connector.RAITClient.get_prompts_response`.
  2. Invokes `invoke_model` for each prompt to collect responses.
  3. Submits the responses back via :meth:`~rait_connector.RAITClient.update_prompts_response`.

Flow 2 — Model-registry calibration prompts:

  1. Fetches calibration prompts from the model registry via :meth:`~rait_connector.RAITClient.get_model_calibration_prompts`.
  2. Invokes `invoke_model` for each prompt to collect responses.
  3. Posts the responses to the RAIT ingest URL with `log_type="calibration"` via :meth:`~rait_connector.RAITClient.post_calibration_responses`.
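The two flows can be sketched against a stand-in client. The method names follow the docs above, but their exact signatures and payload shapes here are assumptions:

```python
from typing import Callable, Dict, List

class FakeClient:
    """Stand-in for RAITClient, exposing the four methods named above."""
    def __init__(self):
        self.updated: List[Dict] = []
        self.posted: List[Dict] = []
    def get_prompts_response(self) -> List[Dict]:
        return [{"id": 1, "prompt": "What is 2+2?"}]
    def update_prompts_response(self, responses: List[Dict]) -> None:
        self.updated = responses
    def get_model_calibration_prompts(self, model_name: str,
                                      model_version: str) -> List[Dict]:
        return [{"id": "c1", "prompt": "Define calibration."}]
    def post_calibration_responses(self, responses: List[Dict],
                                   log_type: str) -> None:
        self.posted = responses

def run_calibration_once(client, invoke_model: Callable[[str], str],
                         model_name: str, model_version: str) -> None:
    # Flow 1: fetch prompts awaiting responses, invoke the model, submit back.
    pending = client.get_prompts_response()
    client.update_prompts_response(
        [{**p, "response": invoke_model(p["prompt"])} for p in pending])
    # Flow 2: fetch registry calibration prompts, invoke, post as calibration logs.
    registry = client.get_model_calibration_prompts(model_name, model_version)
    client.post_calibration_responses(
        [{**p, "response": invoke_model(p["prompt"])} for p in registry],
        log_type="calibration")

client = FakeClient()
run_calibration_once(client, lambda prompt: f"echo: {prompt}", "gpt-4", "1.0")
```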

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_name` | `str` | Name of the LLM model to calibrate. | *required* |
| `model_version` | `str` | Version of the LLM model. | *required* |
| `environment` | `str` | Execution environment (e.g. `"production"`). | *required* |
| `model_purpose` | `str` | Evaluation purpose label. | *required* |
| `invoke_model` | `Callable[[str], str]` | Callback that takes a prompt text string and returns the model's response string. | *required* |
| `interval` | `Union[str, timedelta, int, float]` | How often to run. Defaults to `"weekly"`. | `'weekly'` |
| `run_immediately` | `bool` | Run once when the scheduler starts. | `False` |
Example

```python
>>> scheduler.add_calibration_job(
...     model_name="gpt-4",
...     model_version="1.0",
...     environment="production",
...     model_purpose="monitoring",
...     invoke_model=lambda prompt: openai_client.complete(prompt),
...     interval="weekly",
... )
```

add_job(func, interval, name=None, run_immediately=False)

Register a callable to run at a given interval.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `func` | `Callable` | The callable to execute on each run. | *required* |
| `interval` | `Union[str, timedelta, int, float]` | How often to run. Accepts `"hourly"`, `"daily"`, `"weekly"`, a cron expression, a :class:`datetime.timedelta`, or a number of seconds. | *required* |
| `name` | `Optional[str]` | Optional display name / job ID. Defaults to `func.__name__`. Must be unique per scheduler instance; re-registering with the same name replaces the previous job. | `None` |
| `run_immediately` | `bool` | If `True` the job runs once as soon as the scheduler starts; otherwise it waits for the first interval to elapse. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `SchedulerError` | If the interval cannot be parsed. |

add_telemetry_job(model_name, model_version, model_environment, model_purpose, interval='daily', on_result=None, run_immediately=False, **fetch_kwargs)

Schedule automatic telemetry fetching and posting.

Calls :meth:`~rait_connector.RAITClient.fetch_telemetry` on the configured interval, posts the results to the RAIT ingest URL via :meth:`~rait_connector.RAITClient.post_telemetry`, and optionally passes the result to a callback.
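A sketch of what a single telemetry run does, using a stand-in client. The two method names come from the docs; the payload shape and signatures are assumptions:

```python
from typing import Any, Callable, Dict, Optional

class FakeClient:
    """Stand-in for RAITClient with the two methods named above."""
    def __init__(self):
        self.posted = None
    def fetch_telemetry(self, **kwargs) -> Dict[str, Any]:
        return {"requests": [{"latency_ms": 120}]}
    def post_telemetry(self, result: Dict[str, Any]) -> None:
        self.posted = result

def run_telemetry_once(client, on_result: Optional[Callable] = None,
                       **fetch_kwargs) -> Dict[str, Any]:
    # Fetch, post to the ingest URL, then hand the result to the callback.
    result = client.fetch_telemetry(**fetch_kwargs)
    client.post_telemetry(result)
    if on_result is not None:
        on_result(result)
    return result

client = FakeClient()
seen = []
result = run_telemetry_once(client, on_result=lambda r: seen.append(sorted(r)))
```

The callback only fires after a successful fetch, which is why it sits last in the run.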

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_name` | `str` | Name of the LLM model. | *required* |
| `model_version` | `str` | Version of the LLM model. | *required* |
| `model_environment` | `str` | Execution environment (e.g. `"production"`). | *required* |
| `model_purpose` | `str` | Evaluation purpose label. | *required* |
| `interval` | `Union[str, timedelta, int, float]` | How often to fetch telemetry. Defaults to `"daily"`. | `'daily'` |
| `on_result` | `Optional[Callable[[Dict[str, Any]], None]]` | Optional callback invoked with the telemetry result dict after each successful fetch. Signature: `on_result(result: Dict[str, List[Dict]])`. | `None` |
| `run_immediately` | `bool` | Run once when the scheduler starts. | `False` |
| `**fetch_kwargs` | `Any` | Additional keyword arguments forwarded to :meth:`~rait_connector.RAITClient.fetch_telemetry` (e.g. `timespan`, `tables`, `limit`). | `{}` |
Example

```python
>>> scheduler.add_telemetry_job(
...     model_name="gpt-4",
...     model_version="1.0",
...     model_environment="production",
...     model_purpose="monitoring",
...     interval="daily",
...     on_result=lambda r: print(r.keys()),
...     timespan=timedelta(days=7),
... )
```

history(job_id=None, limit=None)

Return past execution records, most recent first.

Each record contains:

  • job_id — which job ran
  • started_at — ISO 8601 start timestamp
  • finished_at — ISO 8601 finish timestamp
  • success — True if the job completed without raising
  • error — error message string if success is False, else None
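Given records of that shape, filtering for failures is a one-liner; a small helper (hypothetical, not part of the API) over sample records:

```python
def failed_runs(records, job_id=None):
    """Return only failed execution records, preserving newest-first order."""
    return [r for r in records
            if not r["success"] and (job_id is None or r["job_id"] == job_id)]

# Sample records in the documented shape (timestamps elided for brevity).
records = [
    {"job_id": "rait_telemetry",   "success": True,  "error": None},
    {"job_id": "rait_calibration", "success": False, "error": "timeout"},
    {"job_id": "rait_telemetry",   "success": False, "error": "HTTP 500"},
]
```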

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_id` | `Optional[str]` | Filter records to a specific job. Returns all jobs if omitted. | `None` |
| `limit` | `Optional[int]` | Cap the number of records returned. Returns all available records if omitted. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `List[Dict[str, Any]]` | List of execution record dicts, newest first. |

is_running()

Return True if the scheduler is currently running.

job(interval, name=None, run_immediately=False)

Decorator to register a function as a scheduled job.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `interval` | `Union[str, timedelta, int, float]` | How often to run (same as :meth:`add_job`). | *required* |
| `name` | `Optional[str]` | Optional job name. Defaults to the decorated function name. | `None` |
| `run_immediately` | `bool` | Whether to run once when the scheduler starts. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Callable` | The original function, unchanged. |

Example

```python
>>> @scheduler.job(interval="daily")
... def my_job():
...     pass
```
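Because the decorator returns the original function unchanged, the decorated function stays directly callable. A minimal sketch of that register-then-return pattern, with a plain dict standing in for the scheduler's internal job table:

```python
jobs = {}  # stand-in for the scheduler's internal job registry

def job(interval, name=None, run_immediately=False):
    """Register the function as a job, then return it unchanged."""
    def decorator(func):
        jobs[name or func.__name__] = (func, interval, run_immediately)
        return func  # unchanged, so it remains directly callable
    return decorator

@job(interval="daily")
def my_job():
    return "ran"
```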

running_jobs()

Return the IDs of jobs that are currently executing.

A job appears here from the moment its function starts until it returns (or raises). Useful for checking whether a long-running job is still in progress.
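One way such live tracking could be implemented is a lock-guarded set updated around each job invocation; a sketch of that assumption, not the actual internals:

```python
import threading

class ExecutionTracker:
    """Marks a job as executing from function start until it returns or raises."""
    def __init__(self):
        self._lock = threading.Lock()
        self._running = set()

    def wrap(self, job_id, func):
        """Wrap a job function so it registers itself while running."""
        def runner(*args, **kwargs):
            with self._lock:
                self._running.add(job_id)
            try:
                return func(*args, **kwargs)
            finally:  # runs even when func raises
                with self._lock:
                    self._running.discard(job_id)
        return runner

    def running_jobs(self):
        with self._lock:
            return sorted(self._running)

tracker = ExecutionTracker()
observed = []
# While the wrapped job runs, it sees itself in running_jobs().
wrapped = tracker.wrap("nightly_eval",
                       lambda: observed.extend(tracker.running_jobs()))
wrapped()
```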

Returns:

| Type | Description |
| --- | --- |
| `List[str]` | List of job ID strings. |

start(blocking=False)

Start the scheduler.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `blocking` | `bool` | If `True` the calling thread blocks until the scheduler is stopped (e.g. via `KeyboardInterrupt`). Useful for standalone scripts. Defaults to `False`, which runs the scheduler in a background daemon thread so the program can continue. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `SchedulerError` | If the underlying APScheduler fails to start. |
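The blocking/background distinction, and the graceful stop described below, can be illustrated with a toy loop (not the real Scheduler, which delegates this to APScheduler):

```python
import threading

class MiniScheduler:
    """Toy loop showing blocking start vs a background daemon thread."""
    def __init__(self):
        self._stop = threading.Event()
        self._thread = None

    def _loop(self):
        while not self._stop.wait(timeout=0.01):
            pass  # a real scheduler would fire due jobs here

    def start(self, blocking=False):
        if blocking:
            self._loop()  # returns only after stop() is called elsewhere
        else:
            self._thread = threading.Thread(target=self._loop, daemon=True)
            self._thread.start()  # returns immediately

    def stop(self):
        # Graceful: signal the loop, then wait for the thread to finish.
        self._stop.set()
        if self._thread is not None:
            self._thread.join()

s = MiniScheduler()
s.start()  # background mode: the caller keeps running
alive_before = s._thread.is_alive()
s.stop()
alive_after = s._thread.is_alive()
```

The daemon flag mirrors the documented default: a background scheduler thread will not keep the interpreter alive on its own.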

status()

Return a snapshot of all registered jobs.

Each dict contains:

  • id — job ID / name
  • trigger — human-readable trigger description
  • next_run — ISO 8601 timestamp of next scheduled run, or None
  • is_executing — True if the job is currently running

Returns:

| Type | Description |
| --- | --- |
| `List[Dict[str, Any]]` | List of job status dictionaries. |

stop()

Stop the scheduler gracefully.

Waits for any currently executing jobs to finish before returning. Safe to call even if the scheduler is not running.