Amazon SageMaker Integration

Introduction

Integrate Amazon SageMaker with Fiddler to monitor your deployed models. This guide shows you how to create an AWS Lambda function that uses the Fiddler Python client to read SageMaker inference logs from Amazon S3 and publish them to your Fiddler instance. Because the function runs each time a new log file arrives in S3, Fiddler receives inference data shortly after it is captured, giving you near-real-time insight into your model's performance and behavior.

Fiddler AI Observability Platform is now available within Amazon SageMaker AI in SageMaker Unified Studio. This native integration lets SageMaker customers monitor ML models privately and securely without leaving the SageMaker environment.

Learn more about the Amazon SageMaker AI with Fiddler native integration here.

Prerequisites

Before you begin, ensure you have:

  1. An active SageMaker model with:

    • Data capture enabled

    • Inference logs saved to S3 in JSONL format

  2. Access to a Fiddler environment

  3. Your SageMaker model onboarded to Fiddler (See the ML Monitoring Quick Start Guide)

  4. Latest Fiddler Python client version

Implementation Steps

1. Configure SageMaker Data Capture

Ensure your SageMaker endpoint has data capture properly configured:

  1. Open the SageMaker console

  2. Navigate to your model endpoint

  3. Verify data capture is enabled and configured to save to your S3 bucket

  4. Confirm captured data is in JSONL format
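
The console checks above can also be scripted. The following is a minimal sketch, assuming the response shape of boto3's SageMaker `describe_endpoint` call (a `DataCaptureConfig` summary containing `EnableCapture` and `DestinationS3Uri`). The helper names `check_data_capture` and `fetch_endpoint_description` are hypothetical and not part of any Fiddler or AWS library.

```python
from typing import Any, Dict, List


def check_data_capture(endpoint_description: Dict[str, Any], expected_bucket: str) -> List[str]:
    """Return a list of problems found in an endpoint's data capture settings.

    `endpoint_description` is assumed to be the dict returned by boto3's
    SageMaker `describe_endpoint` call.
    """
    capture = endpoint_description.get('DataCaptureConfig')
    if not capture:
        return ['Data capture is not configured on this endpoint']

    problems = []
    if not capture.get('EnableCapture'):
        problems.append('Data capture is configured but not enabled')

    # The destination must be the expected bucket itself or a path inside it
    destination = capture.get('DestinationS3Uri', '')
    bucket_uri = f's3://{expected_bucket}'
    if destination != bucket_uri and not destination.startswith(bucket_uri + '/'):
        problems.append(f'Capture destination {destination!r} is not in bucket {expected_bucket!r}')
    return problems


def fetch_endpoint_description(endpoint_name: str) -> Dict[str, Any]:
    # Imported lazily so check_data_capture can be used without AWS credentials
    import boto3
    return boto3.client('sagemaker').describe_endpoint(EndpointName=endpoint_name)
```

An empty list from `check_data_capture(fetch_endpoint_description('my-endpoint'), 'my-capture-bucket')` would suggest capture is enabled and pointed at the expected bucket; both names here are placeholders. The check does not confirm the file format, so it is still worth opening one captured file to verify it contains one JSON object per line.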

2. Create an AWS Lambda Function

  1. Open the AWS Lambda console

  2. Click "Create function"

  3. Configure the basic settings:

    • Name your function (for example, "fiddler-sagemaker-integration")

    • Select Python 3.9 or later as the runtime

    • Choose execution permissions that allow S3 access
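
As an illustration of what "execution permissions that allow S3 access" might look like, the sketch below builds a read-only IAM policy document for the bucket that holds your inference logs. The function name `build_s3_read_policy`, the statement ID, and the bucket and prefix values are assumptions made for this example. Your organization's policies may require a different scope, and the function's CloudWatch logging permissions (typically granted by the AWS managed policy AWSLambdaBasicExecutionRole) are not included.

```python
import json
from typing import Any, Dict


def build_s3_read_policy(bucket: str, prefix: str = '') -> Dict[str, Any]:
    """Build an IAM policy document that allows reading objects under a prefix."""
    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Sid': 'ReadSageMakerCaptureLogs',  # Hypothetical statement ID
                'Effect': 'Allow',
                'Action': ['s3:GetObject'],
                'Resource': f'arn:aws:s3:::{bucket}/{prefix}*',
            }
        ],
    }


if __name__ == '__main__':
    # Placeholder bucket and prefix; substitute your own values
    policy = build_s3_read_policy('my-sagemaker-capture-bucket', 'datacapture/')
    print(json.dumps(policy, indent=2))
```

The printed JSON can be attached to the Lambda execution role as an inline policy. Limiting the resource to the capture prefix keeps the function from reading unrelated objects in the same bucket.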

3. Set Up Environment Variables

Configure these environment variables in your Lambda function:

| Variable | Description | Example |
| --- | --- | --- |
| FIDDLER_URL | Your Fiddler environment URL | https://your_company.fiddler.ai |
| FIDDLER_TOKEN | Your Fiddler authorization token | (secure token value) |
| FIDDLER_MODEL_UUID | Your model's unique identifier in Fiddler | 8a86cc43-71c1-49e7-a01b-d98ae91975bb |
| MODEL_COLUMNS | Comma-separated list of input column names | feature1,feature2,feature3 |
| MODEL_OUTPUT | Name of the model output column | prediction |
| MODEL_TIMESTAMP | Name of the timestamp column (optional) | event_time |

If you provisioned Fiddler via the SageMaker AI marketplace, add these additional variables:

  • AWS_PARTNER_APP_AUTH: Set to True

  • AWS_PARTNER_APP_ARN: The ARN of your SageMaker AI Fiddler instance

  • AWS_PARTNER_APP_URL: The URL of your SageMaker AI Fiddler instance

Figure: The environment variables tab of the AWS Lambda console, showing the example variables used in this guide.
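
A missing or misspelled variable otherwise surfaces only as a failure deep inside the handler, so it can help to validate the configuration up front. The sketch below is one possible approach. The function name `load_settings` and the returned dictionary keys are hypothetical, and it treats every variable in the table except MODEL_TIMESTAMP as required.

```python
import os
from typing import Any, Dict, Mapping, Optional

# Variables from the table above that the function cannot run without
REQUIRED_VARS = (
    'FIDDLER_URL',
    'FIDDLER_TOKEN',
    'FIDDLER_MODEL_UUID',
    'MODEL_COLUMNS',
    'MODEL_OUTPUT',
)


def load_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Read and validate the integration settings from environment variables."""
    env = os.environ if env is None else env

    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

    return {
        'url': env['FIDDLER_URL'],
        'token': env['FIDDLER_TOKEN'],
        'model_uuid': env['FIDDLER_MODEL_UUID'],
        # Split the comma-separated list and drop stray whitespace or empty entries
        'model_columns': [c.strip() for c in env['MODEL_COLUMNS'].split(',') if c.strip()],
        'model_output': env['MODEL_OUTPUT'],
        # MODEL_TIMESTAMP is optional; None means no timestamp column is configured
        'timestamp_column': env.get('MODEL_TIMESTAMP') or None,
    }
```

Calling `load_settings()` at module import time makes a misconfigured function fail during initialization with a message that names the missing variables.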

4. Configure S3 Trigger

Set up your Lambda to run automatically when new data arrives:

  1. In the Lambda console, select your function

  2. Choose the "Add trigger" option

  3. Select "S3" as the trigger type

  4. Configure these settings:

    • Bucket: Select your SageMaker inference logs bucket

    • Event type: "All object create events"

    • Prefix: (Optional) Specify a path prefix if needed

    • Suffix: .jsonl (to only process JSON Lines files)

Figure: The triggers tab of the AWS Lambda console, showing the details of the trigger used in this guide.
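
When the trigger fires, Lambda passes the function an event containing one or more S3 records. The sketch below shows how the bucket and object key might be pulled out of that event. The helper name `extract_s3_objects` and the sample bucket and key are made up for illustration. Object keys in S3 event notifications are URL-encoded, so the sketch decodes them with `unquote_plus` before use.

```python
from typing import Any, Dict, List, Tuple
from urllib.parse import unquote_plus


def extract_s3_objects(event: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return (bucket, key) pairs for every S3 record in a Lambda event."""
    objects = []
    for record in event.get('Records', []):
        bucket = record['s3']['bucket']['name']
        # Keys arrive URL-encoded (spaces as '+', '=' as '%3D', and so on)
        key = unquote_plus(record['s3']['object']['key'])
        objects.append((bucket, key))
    return objects


# A trimmed-down sample event with placeholder values
SAMPLE_EVENT = {
    'Records': [
        {
            's3': {
                'bucket': {'name': 'my-sagemaker-capture-bucket'},
                'object': {'key': 'datacapture/my+endpoint/run%3D1/capture.jsonl'},
            }
        }
    ]
}

if __name__ == '__main__':
    for bucket, key in extract_s3_objects(SAMPLE_EVENT):
        print(bucket, key)
```

Passing the raw, still-encoded key to S3 would fail with a missing-object error for any file whose path contains spaces or special characters.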

5. Add Lambda Function Code

Figure: The code tab on the main view of the AWS Lambda console, showing some of the code used in this guide.

Copy this code into your Lambda function editor:

import os
import json
import uuid
import boto3
import logging
from typing import Any, Dict, List
from urllib.parse import unquote_plus
import fiddler as fdl

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Load environment variables; customize these for your model and use case
url = os.getenv('FIDDLER_URL')
token = os.getenv('FIDDLER_TOKEN')
model_uuid = os.getenv('FIDDLER_MODEL_UUID')
model_columns = os.getenv('MODEL_COLUMNS', '')
model_output_column = os.getenv('MODEL_OUTPUT')
timestamp_column = os.getenv('MODEL_TIMESTAMP')  # Optional

# Initialize AWS clients
s3_client = boto3.client('s3')

# Initialize the Fiddler connection and the Fiddler model that receives events
fdl.init(url=url, token=token)
fiddler_model = fdl.Model.get(id_=model_uuid)

def get_all_columns() -> List[str]:
    # The types of columns needed when publishing depend on use case. Typically,
    # you would expect to pass at least your model inputs and output(s) and often
    # metadata such as IDs, dates, data segments, etc.
    columns = [column.strip() for column in model_columns.split(',')]

    # MODEL_TIMESTAMP is optional, so include it only when it is set
    if timestamp_column:
        columns.append(timestamp_column)

    return columns + [model_output_column]

def process_jsonl_content(event_data: Dict[str, Any]) -> Dict[str, Any]:
    # Extract the model inputs from 'captureData/endpointInput/data'
    input_data = event_data['captureData']['endpointInput']['data']
    input_values = input_data.strip().split(',')  # Split the CSV string into a list

    # Extract the model prediction from 'captureData/endpointOutput/data'
    model_prediction = event_data['captureData']['endpointOutput']['data']

    # Combine inputs and any metadata values with the output into a single row,
    # in the same order as get_all_columns()
    all_values = list(input_values)

    # Optionally, you can set your own timestamp value on the inference occurrence time,
    # or let Fiddler default it to the time of publish.
    if timestamp_column:
        all_values.append(event_data['eventMetadata']['inferenceTime'])

    all_values.append(model_prediction)

    # Create dictionary using zip to pair column names with their values
    return dict(zip(get_all_columns(), all_values))

def parse_sagemaker_log(log_file_path: str) -> Dict[str, Any]:
    try:
        # Collect all events in a list, one per JSON line in the file
        event_rows = []
        with open(log_file_path, 'r') as file:
            for line in file:
                line = line.strip()
                if not line:
                    continue  # Skip blank lines, which are not valid JSON
                event = json.loads(line)
                row = process_jsonl_content(event)
                event_rows.append(row)

        return {
            'status': 'success',
            'record_count': len(event_rows),
            'data': event_rows
        }

    except json.JSONDecodeError as e:
        logger.error(f'Error parsing JSONL content: {str(e)}')
        raise

def publish_to_fiddler(inferences: List[Dict[str, Any]], model: fdl.Model):
    # There are multiple options for publishing data to Fiddler, check
    # the online documentation for batch, streaming, and REST API options.
    # The below publish call will use a streaming approach managed by
    # the Fiddler Python client internally based on the volume of inferences.

    event_ids = model.publish(
        source=inferences,
        environment=fdl.EnvType.PRODUCTION
    )

    return event_ids

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    # Holds the most recent parse results; stays empty if the event has no records
    results: Dict[str, Any] = {}

    # Process each record in the event, publishing each log file to Fiddler
    for record in event['Records']:
        # Extract bucket and key information; keys in S3 events are URL-encoded
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])

        logger.info(f'Processing new file: {key} from bucket: {bucket}')

        # Persist log file to a temporary location
        tmp_key = key.replace('/', '')
        download_path = f'/tmp/{uuid.uuid4()}{tmp_key}'
        s3_client.download_file(bucket, key, download_path)

        # Retrieve the inference event(s) from the log file
        try:
            results = parse_sagemaker_log(download_path)
        finally:
            # /tmp persists across warm invocations, so clean up after each file
            os.remove(download_path)

        # Check if the log file was processed successfully
        if results['status'] != 'success':
            logger.error(f'Error processing log file: {key}')
            return {
                'statusCode': 500,
                'body': {'message': 'Error processing log file', 'results': results},
            }

        # Push the inference events to Fiddler
        event_ids = publish_to_fiddler(results['data'], fiddler_model)
        logger.info(f'Published events to Fiddler with ID(s): {event_ids}')

    return {
        'statusCode': 200,
        'body': {'message': 'Successfully processed events', 'results': results},
    }
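
To see how a single captured inference maps onto the row published to Fiddler, it can help to run the parsing logic on one log line outside Lambda. The sketch below is a self-contained variant of that logic with no AWS or Fiddler dependencies. The function name `capture_record_to_row` and every value in the sample record are invented for illustration, and the sketch assumes the endpoint input and output are captured as plain CSV text rather than base64.

```python
import json
from typing import Any, Dict, List, Optional


def capture_record_to_row(
    line: str,
    input_columns: List[str],
    output_column: str,
    timestamp_column: Optional[str] = None,
) -> Dict[str, Any]:
    """Convert one line of a SageMaker data capture file into a flat row."""
    record = json.loads(line)
    capture = record['captureData']

    # Inputs arrive as a single CSV string, in the same order as input_columns
    values = capture['endpointInput']['data'].strip().split(',')
    row: Dict[str, Any] = dict(zip(input_columns, values))

    # Use the inference time as the event timestamp when a column is configured
    if timestamp_column:
        row[timestamp_column] = record['eventMetadata']['inferenceTime']

    # Strip the trailing newline that CSV model output often carries
    row[output_column] = capture['endpointOutput']['data'].strip()
    return row


# A made-up capture record with the fields the parser reads
SAMPLE_LINE = json.dumps({
    'captureData': {
        'endpointInput': {'mode': 'INPUT', 'encoding': 'CSV', 'data': '1.5,0,42'},
        'endpointOutput': {'mode': 'OUTPUT', 'encoding': 'CSV', 'data': '0.87\n'},
    },
    'eventMetadata': {'eventId': 'example-event-id', 'inferenceTime': '2024-01-01T12:00:00Z'},
})

if __name__ == '__main__':
    print(capture_record_to_row(
        SAMPLE_LINE, ['feature1', 'feature2', 'feature3'], 'prediction', 'event_time'))
```

All values in the resulting row are strings. If your Fiddler model schema expects numeric types, you may need to cast the values before publishing.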
