Scheduler
rait_connector.scheduler
Scheduler for running RAIT tasks at regular intervals.
Wraps APScheduler 3.x to provide a simple interface for scheduling recurring RAIT operations such as telemetry fetching and calibration runs.
Example
```python
>>> from rait_connector import RAITClient, Scheduler
>>> client = RAITClient()
>>> scheduler = Scheduler(client)
>>> scheduler.add_telemetry_job(interval="daily")
>>> scheduler.add_calibration_job(
...     model_name="gpt-4",
...     model_version="1.0",
...     environment="production",
...     model_purpose="monitoring",
...     invoke_model=lambda prompt: my_llm(prompt),
...     interval="weekly",
... )
>>> scheduler.start()            # runs in background; program continues
>>> print(scheduler.status())    # see all jobs and their state
>>> print(scheduler.history())   # see past execution records
```
Scheduler
Runs registered jobs at configured intervals using APScheduler.
Supports:

- Named shortcuts: `"hourly"`, `"daily"`, `"weekly"`
- Cron expressions: `"0 9 * * MON-FRI"`
- `datetime.timedelta` objects
- Integer / float seconds

Built-in RAIT jobs:

- `add_telemetry_job` — calls `RAITClient.fetch_telemetry`
- `add_calibration_job` — runs calibration prompt evaluation

Multiple `Scheduler` instances can run simultaneously.

Tracking:

- `status` — all registered jobs with next run time and a live `is_executing` flag
- `running_jobs` — IDs of jobs currently executing
- `history` — past execution records (timestamp, success/error)
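The four accepted interval forms above could be normalized to a trigger specification roughly as follows. This is an illustrative sketch, not the library's internals: `parse_interval`, the shortcut table, and this use of `SchedulerError` are assumptions.

```python
from datetime import timedelta

class SchedulerError(Exception):
    """Stand-in for the library's SchedulerError (illustrative)."""

# Named shortcuts mapped to fixed periods.
_SHORTCUTS = {
    "hourly": timedelta(hours=1),
    "daily": timedelta(days=1),
    "weekly": timedelta(weeks=1),
}

def parse_interval(interval):
    """Normalize a spec to ('cron', expr) or ('interval', seconds)."""
    if isinstance(interval, timedelta):
        return ("interval", interval.total_seconds())
    if isinstance(interval, (int, float)):
        return ("interval", float(interval))
    if isinstance(interval, str):
        if interval in _SHORTCUTS:
            return ("interval", _SHORTCUTS[interval].total_seconds())
        if len(interval.split()) == 5:  # looks like a 5-field cron expression
            return ("cron", interval)
    raise SchedulerError(f"Cannot parse interval: {interval!r}")

print(parse_interval("daily"))            # ('interval', 86400.0)
print(parse_interval("0 9 * * MON-FRI"))  # ('cron', '0 9 * * MON-FRI')
```

Whatever the real implementation looks like, each form ultimately has to map onto an APScheduler interval or cron trigger, which is why a single `interval` parameter can accept all four.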
Example
```python
>>> scheduler = Scheduler(client)
>>> scheduler.add_telemetry_job(interval="daily")
>>> scheduler.add_calibration_job(
...     model_name="gpt-4", model_version="1.0",
...     environment="production", model_purpose="monitoring",
...     invoke_model=lambda p: my_llm(p), interval="weekly",
... )
>>> scheduler.start()
>>> scheduler.status()
[{'id': 'rait_telemetry', 'trigger': '...', 'next_run': '...', 'is_executing': False}]
```
Decorator style
```python
>>> @scheduler.job(interval="daily")
... def nightly_eval():
...     client.evaluate_batch(fetch_prompts())
>>> scheduler.start()
```
__init__(client, max_history=_DEFAULT_HISTORY_SIZE)
Initialize the scheduler.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `client` | `Any` | A `RAITClient` instance. | *required* |
| `max_history` | `int` | Maximum number of execution records to keep in memory. Older records are discarded automatically. Defaults to 100. | `_DEFAULT_HISTORY_SIZE` |
add_calibration_job(model_name, model_version, environment, model_purpose, invoke_model, interval='weekly', run_immediately=False)
Schedule automatic calibration response collection.
On each run, two calibration flows are executed.

Flow 1 — Calibrator prompt responses:

- Fetches prompts that need model responses via `RAITClient.get_prompts_response`.
- Invokes `invoke_model` for each prompt to collect responses.
- Submits the responses back via `RAITClient.update_prompts_response`.

Flow 2 — Model-registry calibration prompts:

- Fetches calibration prompts from the model registry via `RAITClient.get_model_calibration_prompts`.
- Invokes `invoke_model` for each prompt to collect responses.
- Posts the responses to the RAIT ingest URL with `log_type="calibration"` via `RAITClient.post_calibration_responses`.
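Wired together, one calibration run might look like the sketch below. The client here is a stand-in so the example runs without a RAIT backend, and the prompt dict shapes (`"id"`, `"text"`) and exact method signatures are assumptions, not the real `RAITClient` API.

```python
def run_calibration(client, invoke_model, model_name, model_version):
    """One calibration run: answer pending prompts, then registry prompts."""
    # Flow 1: prompts awaiting model responses.
    pending = client.get_prompts_response()
    answered = [{"prompt_id": p["id"], "response": invoke_model(p["text"])}
                for p in pending]
    client.update_prompts_response(answered)

    # Flow 2: calibration prompts from the model registry.
    registry_prompts = client.get_model_calibration_prompts(
        model_name, model_version)
    responses = [invoke_model(p["text"]) for p in registry_prompts]
    client.post_calibration_responses(responses, log_type="calibration")
    return len(answered), len(responses)

# Stand-in client so the sketch runs without a RAIT backend.
class FakeClient:
    def get_prompts_response(self):
        return [{"id": 1, "text": "2+2?"}]
    def update_prompts_response(self, answered):
        self.answered = answered
    def get_model_calibration_prompts(self, name, version):
        return [{"text": "Say hi"}, {"text": "Say bye"}]
    def post_calibration_responses(self, responses, log_type):
        self.posted = (responses, log_type)

client = FakeClient()
counts = run_calibration(client, lambda prompt: f"echo: {prompt}",
                         "gpt-4", "1.0")
print(counts)  # (1, 2)
```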
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `model_name` | `str` | Name of the LLM model to calibrate. | *required* |
| `model_version` | `str` | Version of the LLM model. | *required* |
| `environment` | `str` | Execution environment (e.g. `"production"`). | *required* |
| `model_purpose` | `str` | Evaluation purpose label. | *required* |
| `invoke_model` | `Callable[[str], str]` | Callback that takes a prompt text string and returns the model's response string. | *required* |
| `interval` | `Union[str, timedelta, int, float]` | How often to run. Defaults to `"weekly"`. | `'weekly'` |
| `run_immediately` | `bool` | Run once when the scheduler starts. | `False` |
Example
```python
>>> scheduler.add_calibration_job(
...     model_name="gpt-4",
...     model_version="1.0",
...     environment="production",
...     model_purpose="monitoring",
...     invoke_model=lambda prompt: openai_client.complete(prompt),
...     interval="weekly",
... )
```
add_job(func, interval, name=None, run_immediately=False)
Register a callable to run at a given interval.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable` | The callable to execute on each run. | *required* |
| `interval` | `Union[str, timedelta, int, float]` | How often to run. Accepts the named shortcuts (`"hourly"`, `"daily"`, `"weekly"`), a cron expression, a `timedelta`, or a number of seconds. | *required* |
| `name` | `Optional[str]` | Optional display name / job ID. Defaults to the function's name. | `None` |
| `run_immediately` | `bool` | If `True`, run once as soon as the scheduler starts. | `False` |

Raises:

| Type | Description |
|---|---|
| `SchedulerError` | If the interval cannot be parsed. |
add_telemetry_job(model_name, model_version, model_environment, model_purpose, interval='daily', on_result=None, run_immediately=False, **fetch_kwargs)
Schedule automatic telemetry fetching and posting.
Calls `RAITClient.fetch_telemetry` on the configured interval, posts the results to the RAIT ingest URL via `RAITClient.post_telemetry`, and optionally passes the result to a callback.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `model_name` | `str` | Name of the LLM model. | *required* |
| `model_version` | `str` | Version of the LLM model. | *required* |
| `model_environment` | `str` | Execution environment (e.g. `"production"`). | *required* |
| `model_purpose` | `str` | Evaluation purpose label. | *required* |
| `interval` | `Union[str, timedelta, int, float]` | How often to fetch telemetry. Defaults to `"daily"`. | `'daily'` |
| `on_result` | `Optional[Callable[[Dict[str, Any]], None]]` | Optional callback invoked with the telemetry result dict after each successful fetch. | `None` |
| `run_immediately` | `bool` | Run once when the scheduler starts. | `False` |
| `**fetch_kwargs` | `Any` | Additional keyword arguments forwarded to `RAITClient.fetch_telemetry`. | `{}` |
Example
```python
>>> scheduler.add_telemetry_job(
...     model_name="gpt-4",
...     model_version="1.0",
...     model_environment="production",
...     model_purpose="monitoring",
...     interval="daily",
...     on_result=lambda r: print(r.keys()),
...     timespan=timedelta(days=7),
... )
```
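Per the description above, each run of a telemetry job amounts to fetch, post, then callback. A minimal sketch of that pipeline, using a stand-in client (the `run_telemetry_once` helper, the stub's return shape, and the `window` kwarg are illustrative, not the library's API):

```python
def run_telemetry_once(client, on_result=None, **fetch_kwargs):
    """One telemetry cycle: fetch, post to the ingest URL, notify callback."""
    result = client.fetch_telemetry(**fetch_kwargs)
    client.post_telemetry(result)
    if on_result is not None:
        on_result(result)  # only reached after a successful fetch
    return result

# Stand-in client so the sketch runs without a RAIT backend.
class FakeClient:
    def fetch_telemetry(self, **kwargs):
        return {"records": 3, "kwargs": kwargs}
    def post_telemetry(self, result):
        self.posted = result

seen = []
client = FakeClient()
run_telemetry_once(client, on_result=seen.append, window="7d")
print(seen[0]["records"])  # 3
```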
history(job_id=None, limit=None)
Return past execution records, most recent first.
Each record contains:
- `job_id` — which job ran
- `started_at` — ISO 8601 start timestamp
- `finished_at` — ISO 8601 finish timestamp
- `success` — `True` if the job completed without raising
- `error` — error message string if `success` is `False`, else `None`
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `job_id` | `Optional[str]` | Filter records to a specific job. Returns all jobs if omitted. | `None` |
| `limit` | `Optional[int]` | Cap the number of records returned. Returns all available records if omitted. | `None` |

Returns:

| Type | Description |
|---|---|
| `List[Dict[str, Any]]` | List of execution record dicts, newest first. |
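A bounded history with this filtering behavior could be built on a `deque` with `maxlen=max_history`, which matches the `__init__` note that older records are discarded automatically. The `HistoryBook` class below is an illustrative sketch, not the scheduler's internals; only the record fields follow the list above.

```python
from collections import deque
from datetime import datetime, timezone

class HistoryBook:
    """Illustrative record store: newest-first queries, bounded memory."""

    def __init__(self, max_history=100):
        self._records = deque(maxlen=max_history)  # old records drop off

    def record(self, job_id, success, error=None):
        now = datetime.now(timezone.utc).isoformat()
        self._records.append({"job_id": job_id, "started_at": now,
                              "finished_at": now, "success": success,
                              "error": error})

    def history(self, job_id=None, limit=None):
        records = [r for r in reversed(self._records)  # most recent first
                   if job_id is None or r["job_id"] == job_id]
        return records[:limit] if limit is not None else records

book = HistoryBook(max_history=2)
book.record("rait_telemetry", success=True)
book.record("rait_calibration", success=False, error="timeout")
book.record("rait_telemetry", success=True)   # evicts the oldest record
print([r["job_id"] for r in book.history()])  # newest first, only 2 kept
```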
is_running()
Return True if the scheduler is currently running.
job(interval, name=None, run_immediately=False)
Decorator to register a function as a scheduled job.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `interval` | `Union[str, timedelta, int, float]` | How often to run (same as `add_job`). | *required* |
| `name` | `Optional[str]` | Optional job name. Defaults to the decorated function name. | `None` |
| `run_immediately` | `bool` | Whether to run once when the scheduler starts. | `False` |

Returns:

| Type | Description |
|---|---|
| `Callable` | The original function, unchanged. |
Example
```python
>>> @scheduler.job(interval="daily")
... def my_job():
...     pass
```
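Since `job()` returns the original function unchanged, it can be modeled as a register-and-return decorator factory. A sketch under that assumption (the module-level `registered` dict and this `job` function are illustrative, not the library's code):

```python
registered = {}

def job(interval, name=None, run_immediately=False):
    """Register the decorated function as a job, then hand it back as-is."""
    def decorator(func):
        job_id = name or func.__name__  # default job name: the function name
        registered[job_id] = {"func": func, "interval": interval,
                              "run_immediately": run_immediately}
        return func  # unchanged, so it stays directly callable
    return decorator

@job(interval="daily")
def my_job():
    return "ran"

print(my_job())                          # still callable directly: ran
print(registered["my_job"]["interval"])  # daily
```

Returning `func` unchanged is what lets the same function be both scheduled and invoked by hand, e.g. in tests.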
running_jobs()
Return the IDs of jobs that are currently executing.
A job appears here from the moment its function starts until it returns (or raises). Useful for checking whether a long-running job is still in progress.
Returns:

| Type | Description |
|---|---|
| `List[str]` | List of job ID strings. |
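One way to drive `running_jobs()` and the `is_executing` flag is to wrap each job function so its ID is tracked for exactly the duration of the call, including when it raises. The sketch below illustrates that pattern; the helper names and the lock-protected set are assumptions about the real implementation.

```python
import threading

_running = set()
_lock = threading.Lock()

def tracked(job_id, func):
    """Wrap func so job_id is listed while the call is in flight."""
    def wrapper(*args, **kwargs):
        with _lock:
            _running.add(job_id)
        try:
            return func(*args, **kwargs)
        finally:  # removed whether func returns or raises
            with _lock:
                _running.discard(job_id)
    return wrapper

def running_jobs():
    with _lock:
        return sorted(_running)

def snapshotting_job():
    return running_jobs()  # observes itself while executing

wrapped = tracked("rait_telemetry", snapshotting_job)
print(wrapped())       # ['rait_telemetry'] while running
print(running_jobs())  # [] once it has returned
```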
start(blocking=False)
Start the scheduler.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `blocking` | `bool` | If `True`, block the calling thread until the scheduler is stopped; if `False`, run in a background thread and return immediately. | `False` |

Raises:

| Type | Description |
|---|---|
| `SchedulerError` | If the underlying APScheduler fails to start. |
status()
Return a snapshot of all registered jobs.
Each dict contains:
- `id` — job ID / name
- `trigger` — human-readable trigger description
- `next_run` — ISO 8601 timestamp of next scheduled run, or `None`
- `is_executing` — `True` if the job is currently running
Returns:

| Type | Description |
|---|---|
| `List[Dict[str, Any]]` | List of job status dictionaries. |
stop()
Stop the scheduler gracefully.
Waits for any currently executing jobs to finish before returning. Safe to call even if the scheduler is not running.