Telemetry

The scheduler collects Azure Monitor telemetry and posts it to the RAIT API; telemetry is not embedded in individual evaluate() calls. Use add_telemetry_job() to run the collection on a recurring schedule.

from rait_connector import RAITClient, Scheduler

client = RAITClient(
    azure_log_analytics_workspace_id="a1a4fc6d-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
)
scheduler = Scheduler(client)

scheduler.add_telemetry_job(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    interval="daily",
)

scheduler.start()

On each run the scheduler:

  1. Fetches data from AppDependencies, AppExceptions, and AppAvailabilityResults.
  2. Posts the data to the RAIT ingest URL with log_type="telemetry".

Custom Timespan and Tables

Pass extra keyword arguments to add_telemetry_job() to control the time window, the tables, and the row limit that fetch_telemetry() uses:

from datetime import timedelta

scheduler.add_telemetry_job(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    interval="daily",
    timespan=timedelta(hours=6),                          # last 6 hours
    tables=["AppDependencies", "AppExceptions"],          # specific tables
    limit=500,                                            # max rows per table
)

Inspect Telemetry Before Posting

Use fetch_telemetry() directly to inspect data without posting:

from datetime import timedelta
from rait_connector import RAITClient

client = RAITClient(
    azure_log_analytics_workspace_id="a1a4fc6d-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
)

telemetry = client.fetch_telemetry(
    tables=["AppDependencies", "AppExceptions", "AppAvailabilityResults"],
    timespan=timedelta(days=1),
    limit=100,
)

for table, rows in telemetry.items():
    print(f"{table}: {len(rows)} rows")
    if rows:
        print(f"  Columns: {list(rows[0].keys())}")
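When the result needs to be checked programmatically rather than printed, a small summary helper can be handy. The sketch below assumes, as the loop above suggests, that the result is a dict mapping each table name to a list of row dicts; summarize is a hypothetical helper and the sample rows are made up for illustration.

```python
from typing import Dict, List


def summarize(telemetry: Dict[str, List[dict]]) -> Dict[str, dict]:
    """Return the row count and sorted column names for each table."""
    summary = {}
    for table, rows in telemetry.items():
        # Collect every key seen in any row, in case rows differ in shape.
        columns = sorted({key for row in rows for key in row})
        summary[table] = {"rows": len(rows), "columns": columns}
    return summary


# Made-up sample shaped like the assumed fetch_telemetry() result.
sample = {
    "AppDependencies": [
        {"Name": "GET /users", "DurationMs": 42.0},
        {"Name": "SELECT orders", "DurationMs": 7.5},
    ],
    "AppExceptions": [],
}

report = summarize(sample)
```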

Post Telemetry Manually

To fetch and post outside the scheduler, call fetch_telemetry() and pass the result to post_telemetry():

from datetime import timedelta
from rait_connector import RAITClient

client = RAITClient(
    azure_log_analytics_workspace_id="a1a4fc6d-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
)

telemetry = client.fetch_telemetry(timespan=timedelta(days=1))

client.post_telemetry(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    telemetry_data=telemetry,
)

Telemetry Callback

React to each telemetry fetch with an on_result callback:

def on_telemetry(result):
    for table, rows in result.items():
        print(f"{table}: {len(rows)} rows fetched")

scheduler.add_telemetry_job(
    model_name="gpt-4",
    model_version="1.0",
    model_environment="production",
    model_purpose="monitoring",
    interval="daily",
    on_result=on_telemetry,
)
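A callback can also act on the data instead of just printing it. The following is a sketch of a threshold alert, assuming the callback receives the same table-to-rows dict shown above; make_exception_alert and the alerts list are hypothetical names, and a real deployment would likely send a notification instead of appending to a list.

```python
from typing import Callable, Dict, List

Telemetry = Dict[str, List[dict]]


def make_exception_alert(
    threshold: int, alerts: List[str]
) -> Callable[[Telemetry], None]:
    """Build a callback that records an alert when AppExceptions is too large."""

    def on_telemetry(result: Telemetry) -> None:
        # A missing table is treated as zero rows.
        count = len(result.get("AppExceptions", []))
        if count > threshold:
            alerts.append(f"{count} exceptions exceeded threshold {threshold}")

    return on_telemetry


alerts: List[str] = []
callback = make_exception_alert(threshold=2, alerts=alerts)

# Simulate two scheduler runs with made-up data.
callback({"AppExceptions": [{}, {}]})          # at the threshold: no alert
callback({"AppExceptions": [{}, {}, {}, {}]})  # above the threshold: alert
```

The returned function could then be passed as on_result=callback to add_telemetry_job().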

Supported Tables

Table                   Description
AppDependencies         External dependency calls (HTTP, DB, etc.)
AppExceptions           Application exceptions and errors
AppAvailabilityResults  Availability test results
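Because only these three tables are listed as supported, it can help to validate a tables argument before scheduling a job. This is a minimal sketch: SUPPORTED_TABLES and validate_tables are hypothetical names defined here, not part of rait_connector, and how the library itself reacts to an unknown table name is not covered by this page.

```python
from typing import Iterable, List

# Copied from the table above; not imported from the library.
SUPPORTED_TABLES = ("AppDependencies", "AppExceptions", "AppAvailabilityResults")


def validate_tables(tables: Iterable[str]) -> List[str]:
    """Return the tables as a list, or raise if any name is not supported."""
    requested = list(tables)
    unknown = [name for name in requested if name not in SUPPORTED_TABLES]
    if unknown:
        raise ValueError(f"Unsupported tables: {', '.join(unknown)}")
    return requested


checked = validate_tables(["AppDependencies", "AppExceptions"])

try:
    validate_tables(["AppDependencies", "AppTraces"])
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```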

Data Structure

The model_data_logs blob posted to the API contains:

{
    "app_dependencies": [...],
    "app_exceptions": [...],
    "app_availability_results": [...]
}
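The payload keys are the snake_case forms of the table names. The sketch below shows that naming relationship only; payload_key and to_payload are hypothetical helpers, and whether the library performs the conversion this way internally is an assumption.

```python
import re
from typing import Dict, List


def payload_key(table: str) -> str:
    """Convert a PascalCase table name to its snake_case payload key."""
    # Insert an underscore before every capital letter except the first.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", table).lower()


def to_payload(telemetry: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
    """Re-key a table-to-rows dict with snake_case payload keys."""
    return {payload_key(table): rows for table, rows in telemetry.items()}


payload = to_payload(
    {
        "AppDependencies": [{"Name": "GET /health"}],
        "AppExceptions": [],
        "AppAvailabilityResults": [],
    }
)
```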

Using TelemetryClient Directly

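For advanced use cases, query the workspace through TelemetryClient with your own Azure credential instead of going through RAITClient: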

from datetime import timedelta
from azure.identity import DefaultAzureCredential
from rait_connector import TelemetryClient

credential = DefaultAzureCredential()
client = TelemetryClient(
    credential=credential,
    workspace_id="a1a4fc6d-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
)

# Fetch a single table
dependencies = client.fetch(
    table="AppDependencies",
    timespan=timedelta(days=1),
    limit=100
)

# Fetch several tables in one call
all_data = client.fetch_all(
    tables=["AppDependencies", "AppExceptions"],
    timespan=timedelta(hours=12)
)