# Telemetry
Azure Monitor telemetry is collected and posted to the RAIT API by the scheduler,
not embedded in individual `evaluate()` calls. Use `add_telemetry_job()` to run
this on a recurring schedule.
## Scheduled Telemetry (Recommended)
```python
from rait_connector import RAITClient, Scheduler

client = RAITClient(
    azure_log_analytics_workspace_id="a1a4fc6d-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
)

scheduler = Scheduler(client)
scheduler.add_telemetry_job(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    interval="daily",
)
scheduler.start()
```
On each run the scheduler:

- Fetches data from `AppDependencies`, `AppExceptions`, and `AppAvailabilityResults`.
- Posts the data to the RAIT ingest URL with `log_type="telemetry"`.
## Custom Timespan and Tables

Pass extra keyword arguments to control what `fetch_telemetry()` fetches:
```python
from datetime import timedelta

scheduler.add_telemetry_job(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    interval="daily",
    timespan=timedelta(hours=6),                  # last 6 hours
    tables=["AppDependencies", "AppExceptions"],  # specific tables
    limit=500,                                    # max rows per table
)
```
## Inspect Telemetry Before Posting

Use `fetch_telemetry()` directly to inspect data without posting:
```python
from datetime import timedelta
from rait_connector import RAITClient

client = RAITClient(
    azure_log_analytics_workspace_id="a1a4fc6d-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
)

telemetry = client.fetch_telemetry(
    tables=["AppDependencies", "AppExceptions", "AppAvailabilityResults"],
    timespan=timedelta(days=1),
    limit=100,
)

for table, rows in telemetry.items():
    print(f"{table}: {len(rows)} rows")
    if rows:
        print(f"  Columns: {list(rows[0].keys())}")
```
## Post Telemetry Manually
If you want to fetch and post outside the scheduler:
```python
from datetime import timedelta
from rait_connector import RAITClient

client = RAITClient(
    azure_log_analytics_workspace_id="a1a4fc6d-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
)

telemetry = client.fetch_telemetry(timespan=timedelta(days=1))
client.post_telemetry(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    telemetry_data=telemetry,
)
```
## Telemetry Callback

React to each telemetry fetch with an `on_result` callback:
```python
def on_telemetry(result):
    for table, rows in result.items():
        print(f"{table}: {len(rows)} rows fetched")

scheduler.add_telemetry_job(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    interval="daily",
    on_result=on_telemetry,
)
```
## Supported Tables
| Table | Description |
|---|---|
| `AppDependencies` | External dependency calls (HTTP, DB, etc.) |
| `AppExceptions` | Application exceptions and errors |
| `AppAvailabilityResults` | Availability test results |
## Data Structure

The `model_data_logs` blob posted to the API contains:
```json
{
  "app_dependencies": [...],
  "app_exceptions": [...],
  "app_availability_results": [...]
}
```
## Using `TelemetryClient` Directly
For advanced use cases:
```python
from datetime import timedelta
from azure.identity import DefaultAzureCredential
from rait_connector import TelemetryClient

credential = DefaultAzureCredential()
client = TelemetryClient(
    credential=credential,
    workspace_id="a1a4fc6d-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
)

# Fetch a single table
dependencies = client.fetch(
    table="AppDependencies",
    timespan=timedelta(days=1),
    limit=100
)

# Fetch all supported tables
all_data = client.fetch_all(
    tables=["AppDependencies", "AppExceptions"],
    timespan=timedelta(hours=12)
)
```