# Scheduler

Examples of running recurring RAIT jobs on a schedule using the Scheduler class.

Basic Setup

from rait_connector import RAITClient, Scheduler

client = RAITClient()
scheduler = Scheduler(client)

# Fetch and post Azure Monitor telemetry every day
scheduler.add_telemetry_job(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    interval="daily",
)

# Collect and post calibration responses every week
scheduler.add_calibration_job(
    model_name="gpt-4",
    model_version="1.0",
    environment="production",
    model_purpose="monitoring",
    invoke_model=lambda prompt: my_llm.generate(prompt),
    interval="weekly",
)

scheduler.start()  # runs in background; program continues

Interval Options

The interval parameter accepts several formats:

from datetime import timedelta

# Named shortcuts
scheduler.add_telemetry_job(model_name="gpt-4", model_version="1.0", model_environment="production", model_purpose="monitoring", interval="hourly")
scheduler.add_telemetry_job(model_name="gpt-4", model_version="1.0", model_environment="production", model_purpose="monitoring", interval="daily")
scheduler.add_telemetry_job(model_name="gpt-4", model_version="1.0", model_environment="production", model_purpose="monitoring", interval="weekly")

# Cron expression (weekdays at 09:00)
scheduler.add_telemetry_job(model_name="gpt-4", model_version="1.0", model_environment="production", model_purpose="monitoring", interval="0 9 * * MON-FRI")

# timedelta
scheduler.add_telemetry_job(model_name="gpt-4", model_version="1.0", model_environment="production", model_purpose="monitoring", interval=timedelta(hours=6))

# Seconds (int or float)
scheduler.add_telemetry_job(model_name="gpt-4", model_version="1.0", model_environment="production", model_purpose="monitoring", interval=3600)

Telemetry Job with Callback

from rait_connector import RAITClient, Scheduler
from datetime import timedelta

client = RAITClient()
scheduler = Scheduler(client)

def on_telemetry(result):
    for table, rows in result.items():
        print(f"{table}: {len(rows)} rows")

scheduler.add_telemetry_job(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    interval="daily",
    on_result=on_telemetry,
    timespan=timedelta(days=7),  # passed to fetch_telemetry
)

scheduler.start()

Custom Job (Decorator Style)

from rait_connector import RAITClient, Scheduler

client = RAITClient()
scheduler = Scheduler(client)

@scheduler.job(interval="daily")
def nightly_evaluation():
    prompts = fetch_prompts_from_db()
    client.evaluate_batch(prompts)

scheduler.start()

Custom Job (Imperative Style)

scheduler.add_job(
    func=nightly_evaluation,
    interval="daily",
    name="nightly_eval",
    run_immediately=True,  # also runs once on start
)

scheduler.start()

Blocking Mode (Standalone Script)

Use blocking=True to keep the script alive until Ctrl+C:

scheduler.start(blocking=True)

Wait for Running Jobs to Finish

If you want the script to exit only after all currently executing jobs complete (e.g. after triggering jobs with run_immediately=True):

import time

scheduler.start()

while scheduler.running_jobs():
    time.sleep(0.5)

scheduler.stop()

Inspecting Jobs

# All registered jobs with next run time
for job in scheduler.status():
    print(job["id"], job["next_run"], "running:", job["is_executing"])

# Jobs currently executing
print(scheduler.running_jobs())

# Past execution history (newest first)
for record in scheduler.history(limit=10):
    status = "OK" if record["success"] else f"ERROR: {record['error']}"
    print(record["job_id"], record["started_at"], status)

# Filter history by job ID
for record in scheduler.history(job_id="rait_telemetry"):
    print(record)

Stopping the Scheduler

scheduler.stop()  # waits for running jobs to finish

Multiple Schedulers

You can run multiple Scheduler instances simultaneously, e.g. one per environment:

scheduler_prod = Scheduler(client)
scheduler_staging = Scheduler(client)

scheduler_prod.add_calibration_job(
    model_name="gpt-4", model_version="1.0",
    environment="production", model_purpose="monitoring",
    invoke_model=lambda p: my_llm.generate(p),
    interval="weekly",
)

scheduler_staging.add_calibration_job(
    model_name="gpt-4", model_version="1.0",
    environment="staging", model_purpose="testing",
    invoke_model=lambda p: my_llm.generate(p),
    interval="daily",
)

scheduler_prod.start()
scheduler_staging.start()