Scheduler
Examples of running recurring RAIT jobs on a schedule using the Scheduler class.
Basic Setup
from rait_connector import RAITClient, Scheduler
client = RAITClient()
scheduler = Scheduler(client)


def generate_response(prompt):
    """Send a calibration prompt to your own model and return its reply."""
    return my_llm.generate(prompt)


# Every day: fetch Azure Monitor telemetry and post it
scheduler.add_telemetry_job(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    interval="daily",
)

# Every week: collect calibration responses from the model and post them
scheduler.add_calibration_job(
    model_name="gpt-4",
    model_version="1.0",
    environment="production",
    model_purpose="monitoring",
    invoke_model=generate_response,
    interval="weekly",
)

# start() returns immediately; the jobs keep running in the background
scheduler.start()
Interval Options
The `interval` parameter accepts several formats. The calls below are alternatives; each one adds its own job, so use only the format you need:
from datetime import timedelta
# Model metadata shared by every example call below
telemetry_model = {
    "model_name": "gpt-4",
    "model_version": "1.0",
    "model_environment": "production",
    "model_purpose": "monitoring",
}
# These calls are alternatives: each one adds a telemetry job,
# so in real code pick the single interval format you need.

# Named shortcuts
scheduler.add_telemetry_job(**telemetry_model, interval="hourly")
scheduler.add_telemetry_job(**telemetry_model, interval="daily")
scheduler.add_telemetry_job(**telemetry_model, interval="weekly")

# Cron expression (weekdays at 09:00)
scheduler.add_telemetry_job(**telemetry_model, interval="0 9 * * MON-FRI")

# A timedelta
scheduler.add_telemetry_job(**telemetry_model, interval=timedelta(hours=6))

# Plain seconds (int or float)
scheduler.add_telemetry_job(**telemetry_model, interval=3600)
Telemetry Job with Callback
from rait_connector import RAITClient, Scheduler
from datetime import timedelta
# Create the RAIT client and a scheduler that uses it
client = RAITClient()
scheduler = Scheduler(client)
def on_telemetry(result):
    """Print how many rows each telemetry table returned.

    Used as the ``on_result`` callback of a telemetry job, so it receives
    the job's result. ``result`` is expected to map each table name to
    its rows.
    """
    for table, rows in result.items():
        print(f"{table}: {len(rows)} rows")
# Forwarded to fetch_telemetry as its timespan argument
lookback = timedelta(days=7)

scheduler.add_telemetry_job(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    interval="daily",
    on_result=on_telemetry,  # handed each run's telemetry result
    timespan=lookback,
)

scheduler.start()
Custom Job (Decorator Style)
from rait_connector import RAITClient, Scheduler
client = RAITClient()
scheduler = Scheduler(client)


@scheduler.job(interval="daily")
def nightly_evaluation():
    """Fetch prompts and evaluate them as one batch, once a day."""
    # fetch_prompts_from_db() stands in for your own prompt source
    prompts = fetch_prompts_from_db()
    client.evaluate_batch(prompts)


scheduler.start()
Custom Job (Imperative Style)
# A plain (undecorated) function: add_job registers it explicitly.
# Reusing the decorated function from the decorator example would
# register the same job twice.
def nightly_evaluation():
    """Fetch prompts and evaluate them as one batch."""
    # fetch_prompts_from_db() stands in for your own prompt source
    prompts = fetch_prompts_from_db()
    client.evaluate_batch(prompts)


scheduler.add_job(
    func=nightly_evaluation,
    interval="daily",
    name="nightly_eval",
    run_immediately=True,  # also runs once on start
)
scheduler.start()
Blocking Mode (Standalone Script)
Pass `blocking=True` to `start()` to keep the script running until you press Ctrl+C:
# Keeps the script running until Ctrl+C, instead of returning right away
scheduler.start(blocking=True)
Wait for Running Jobs to Finish
If you want the script to exit only after all currently executing jobs have completed
(e.g., after triggering jobs with `run_immediately=True`), poll `running_jobs()` before calling `stop()`:
import time
scheduler.start()
try:
    # Poll until no job is executing any more
    while scheduler.running_jobs():
        time.sleep(0.5)
finally:
    # Runs even if the wait is interrupted (e.g. by Ctrl+C)
    scheduler.stop()
Inspecting Jobs
# All registered jobs with their next run time
for job in scheduler.status():
    print(job["id"], job["next_run"], "running:", job["is_executing"])

# Jobs currently executing
print(scheduler.running_jobs())

# Past execution history (newest first)
for record in scheduler.history(limit=10):
    status = "OK" if record["success"] else f"ERROR: {record['error']}"
    print(record["job_id"], record["started_at"], status)

# Filter history by job ID
for record in scheduler.history(job_id="rait_telemetry"):
    print(record)
Stopping the Scheduler
# Shut the scheduler down once you are finished with it
scheduler.stop() # waits for running jobs to finish
Multiple Schedulers
You can run multiple Scheduler instances simultaneously, e.g. one per environment (here, production and staging):
scheduler_prod = Scheduler(client)
scheduler_staging = Scheduler(client)

# Both schedulers calibrate the same model version
gpt4 = {"model_name": "gpt-4", "model_version": "1.0"}

# Production: calibrate once a week
scheduler_prod.add_calibration_job(
    **gpt4,
    environment="production",
    model_purpose="monitoring",
    invoke_model=lambda prompt: my_llm.generate(prompt),
    interval="weekly",
)

# Staging: calibrate every day
scheduler_staging.add_calibration_job(
    **gpt4,
    environment="staging",
    model_purpose="testing",
    invoke_model=lambda prompt: my_llm.generate(prompt),
    interval="daily",
)

scheduler_prod.start()
scheduler_staging.start()