The following Python script defines an AWS Lambda function that moves your SageMaker inference logs from an S3 bucket to your Fiddler environment.
Setup
In addition to pasting this code into your Lambda function, you will need to ensure the following steps are completed before the integration will work.
Make sure your model is actively being served by SageMaker and that you have enabled data capture for your SageMaker-hosted models so that your model inferences are stored in an S3 bucket as JSONL files (a data capture sketch is shown after this checklist).
Make sure you have a Fiddler environment and your SageMaker model is onboarded with Fiddler. Check out our ML Monitoring - Simple Quick Start Guide for guidance on how to onboard your models.
Make sure to specify the environment variables in the “Configuration” section of your Lambda function so that the Lambda knows how to connect to your Fiddler environment and what inputs and outputs to expect in the JSONL files captured by your SageMaker model (a sample set of variables is shown after this checklist).
Make sure you have set up a trigger on your Lambda function so that the function is invoked on “Object creation” events in your model’s S3 bucket (a sample event is shown after this checklist).
Make sure you paste the following code into your new Lambda function.
Make sure that your Lambda function references the ARN of the Fiddler Layer that packages the Fiddler Python client (arn:aws:lambda:us-west-2:079310353266:layer:fiddler-client-0814:1).
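For the data capture step above, enabling capture on a SageMaker endpoint typically looks like the following minimal sketch using the SageMaker Python SDK. The bucket URI, endpoint name, and instance type are placeholders, and model is assumed to be a SageMaker Model object you have already created.

from sagemaker.model_monitor import DataCaptureConfig

# Placeholder values -- substitute your own bucket, prefix, endpoint name, and instance type.
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,  # capture every request
    destination_s3_uri='s3://your-bucket/sagemaker/capture/',  # JSONL capture files land here
)

# model is your already-built sagemaker.model.Model (or a model created from an estimator).
predictor = model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.large',
    endpoint_name='your-endpoint',
    data_capture_config=data_capture_config,
)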
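As a reference for the environment variable step above, these are the variables the script reads, shown with hypothetical example values. The first five are asserted by the script; FEATURE_INPUTS and MODEL_OUTPUT are also needed by the parsing logic.

FIDDLER_URL             = https://your_company.fiddler.ai
FIDDLER_ORG             = your_org
FIDDLER_TOKEN           = <your Fiddler API token>
FIDDLER_PROJECT         = bank_churn
FIDDLER_MODEL           = churn_classifier
FEATURE_INPUTS          = 'age','balance','tenure'   (input column names, in the order they appear in the captured CSV)
MODEL_OUTPUT            = probability_churn          (name of the model output column)
FIDDLER_TIMESTAMP_FIELD =                             (optional)
FIDDLER_ID_FIELD        =                             (optional)
FIDDLER_AWS_CREDENTIALS =                             (optional, JSON string)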
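For the trigger step above, the handler expects the standard S3 “Object creation” notification shape: it reads the bucket name and object key from each record. A trimmed, hypothetical event like the one below can also be passed to lambda_handler directly as a smoke test, assuming your local AWS credentials can read the referenced object.

# Trimmed S3 "Object created" notification with placeholder bucket and key values.
sample_event = {
    'Records': [
        {
            's3': {
                'bucket': {'name': 'your-bucket'},
                'object': {
                    'key': 'sagemaker/capture/your-endpoint/AllTraffic/2023/01/01/00/capture-file.jsonl'
                },
            }
        }
    ]
}

# Invoke the handler directly to exercise parsing and publishing outside of AWS.
lambda_handler(sample_event, None)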
Script
import csv
import json
import os
import uuid
from io import StringIO
from urllib.parse import unquote_plus

import boto3
import pandas as pd

import fiddler as fdl

s3_client = boto3.client('s3')

# Connection and schema details are read from the Lambda's environment variables.
url = os.getenv('FIDDLER_URL')
org = os.getenv('FIDDLER_ORG')
token = os.getenv('FIDDLER_TOKEN')
project = os.getenv('FIDDLER_PROJECT')
model = os.getenv('FIDDLER_MODEL')
timestamp_field = os.getenv('FIDDLER_TIMESTAMP_FIELD', None)  # optional
id_field = os.getenv('FIDDLER_ID_FIELD', None)  # optional
credentials = os.getenv('FIDDLER_AWS_CREDENTIALS', '{}')  # optional, JSON string
string_in_features = os.getenv('FEATURE_INPUTS')
out_feature = os.getenv('MODEL_OUTPUT')


def lambda_handler(event, context):
    # Download each newly created capture file from S3 and publish its contents to Fiddler.
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
        tmpkey = key.replace('/', '')
        download_path = '/tmp/{}{}'.format(uuid.uuid4(), tmpkey)
        s3_client.download_file(bucket, key, download_path)
        parse_sagemaker_log(download_path)
    return {
        'statusCode': 200,
        'body': json.dumps('Successful Lambda Publishing Run')
    }


def parse_sagemaker_log(log_file):
    # Each line of the capture file is a JSON record containing CSV-encoded
    # input and output payloads. Build one Fiddler event per input row.
    with open(log_file) as f:
        result_list = []
        in_features = string_in_features.replace("'", "").split(',')
        for line in f:
            pline = json.loads(line)
            input_data = pline['captureData']['endpointInput']['data']
            output_data = pline['captureData']['endpointOutput']['data']
            out_rows = list(csv.reader(StringIO(output_data), delimiter=','))
            out_values = [float(x) for x in out_rows[0]]
            csv_reader = csv.reader(StringIO(input_data), delimiter=',')
            for j, row in enumerate(csv_reader):
                # Build a fresh dict per row so rows do not overwrite each other.
                result = {in_features[i]: row[i] for i in range(len(row))}
                result[out_feature] = out_values[j]
                result['__event_type'] = 'execution_event'
                result_list.append(result)
        df = pd.DataFrame(result_list)
        print('Data frame : ', df)
        publish_event(df, log_file)


def assert_envs():
    """
    Assert the presence of the required environment variables:
     - FIDDLER_URL
     - FIDDLER_ORG
     - FIDDLER_TOKEN
     - FIDDLER_PROJECT
     - FIDDLER_MODEL
    """
    try:
        assert url is not None, '`FIDDLER_URL` env variable must be set.'
        assert org is not None, '`FIDDLER_ORG` env variable must be set.'
        assert token is not None, '`FIDDLER_TOKEN` env variable must be set.'
        assert project is not None, '`FIDDLER_PROJECT` env variable must be set.'
        assert model is not None, '`FIDDLER_MODEL` env variable must be set.'
        return None
    except Exception as e:
        log(f'ERROR: Env Variable assertion failed: {str(e)}')
        return {
            'statusCode': 500,
            'body': json.dumps(f'ERROR: Env Variable assertion failed: {str(e)}'),
        }


def log(out):
    print(out)


def publish_event(df, log_file):
    # Publish the parsed events to Fiddler as a single DataFrame batch.
    client = fdl.FiddlerApi(url=url, org_id=org, auth_token=token)
    log(f'Publishing events parsed from S3 file {log_file}')
    res = client.publish_events_batch(
        project_id=project,
        model_id=model,
        batch_source=df,
        data_source=fdl.BatchPublishType.DATAFRAME,
        timestamp_field=timestamp_field,
    )
    log(res)
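For context on what parse_sagemaker_log consumes: each line of a capture file is one JSON record, and the script only reads the CSV-encoded captureData.endpointInput.data and captureData.endpointOutput.data fields. A trimmed, hypothetical record (values are placeholders; other capture metadata fields are omitted) looks roughly like this:

{
  "captureData": {
    "endpointInput": {
      "data": "42,1500.0,3"
    },
    "endpointOutput": {
      "data": "0.27"
    }
  }
}

Here "42,1500.0,3" would correspond to the FEATURE_INPUTS columns for one inference, and "0.27" to the MODEL_OUTPUT value that Fiddler receives for that row.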