# Data Platforms Overview

Integrate Fiddler with your existing data infrastructure to seamlessly ingest training data, production events, and ground truth labels. From cloud data warehouses to real-time streaming platforms, Fiddler connects to the data sources you already use.

## Why Data Integration Matters

AI observability requires continuous data flow from your ML pipelines and applications. Fiddler's data platform integrations enable:

* **Automated Data Ingestion** - Pull training datasets and production events without manual uploads
* **Real-Time Monitoring** - Stream prediction events for immediate drift and performance detection
* **Unified Data Pipeline** - Single integration point for all your ML data sources
* **Ground Truth Enrichment** - Automatically join production predictions with delayed labels
* **Historical Analysis** - Query your data warehouse for model performance over time

## Integration Categories

### 🏢 Data Warehouses

Connect Fiddler to your cloud data warehouse for batch data ingestion and historical analysis.

**Supported Platforms:**

* [**Snowflake**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/snowflake-integration) - Cloud data warehouse with zero-copy data sharing ✓ **GA**
* [**Google BigQuery**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/bigquery-integration) - Serverless data warehouse with SQL analytics ✓ **GA**

**Common Use Cases:**

* Import training datasets from data warehouse tables
* Query historical model predictions for performance analysis
* Join production events with delayed ground truth labels
* Export Fiddler metrics back to warehouse for BI tools

### 📊 Data Streaming

Stream real-time prediction events and feedback directly to Fiddler for immediate observability.

**Supported Platforms:**

* [**Apache Kafka**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/kafka-integration) - Distributed event streaming platform ✓ **GA**
* [**Amazon S3**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/integration-with-s3) - Object storage with event notifications ✓ **GA**

**Common Use Cases:**

* Stream model predictions in real-time from production services
* Monitor agentic AI interactions as they occur
* Trigger alerts on data quality issues within seconds
* Capture ground truth feedback from user interactions

### 🔄 Orchestration & Pipelines

Integrate Fiddler into your ML workflow orchestration for automated monitoring at every pipeline stage.

**Supported Platforms:**

* [**Apache Airflow**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/airflow-integration) - Workflow orchestration platform ✓ **GA**
* [**AWS SageMaker Pipelines**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/sagemaker-integration) - Managed ML pipeline service ✓ **GA**

**Common Use Cases:**

* Automatically upload datasets when training pipelines complete
* Trigger model evaluation as part of CI/CD workflows
* Schedule periodic drift checks and performance reports
* Orchestrate ground truth label collection and enrichment

## Data Warehouse Integrations

### Snowflake

**Why Snowflake + Fiddler:**

* **Zero-Copy Data Sharing** - No data duplication, direct queries to Snowflake
* **Secure Data Access** - OAuth 2.0 and key-pair authentication
* **Scalable Analytics** - Leverage Snowflake's compute for large datasets
* **Cost-Effective** - Pay only for the compute your queries use, with no separate data transfer fees

**Quick Start:**

```sql
-- Create Fiddler integration user
CREATE USER fiddler_user PASSWORD='...';
CREATE ROLE fiddler_role;
GRANT ROLE fiddler_role TO USER fiddler_user;

-- Grant permissions to your ML data
GRANT USAGE ON DATABASE ml_data TO ROLE fiddler_role;
GRANT USAGE ON SCHEMA ml_data.predictions TO ROLE fiddler_role;
GRANT SELECT ON ALL TABLES IN SCHEMA ml_data.predictions TO ROLE fiddler_role;
```
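
With the role in place, the connection can be registered from the Fiddler client. A minimal sketch, following the same `add_snowflake_source` pattern used later on this page (parameter names are illustrative; see the full setup guide for the authoritative ones):

```python
from fiddler import FiddlerClient

client = FiddlerClient(api_key="fid_...")

# Register the Snowflake connection; parameter names are illustrative
client.add_snowflake_source(
    account="your-account",
    user="fiddler_user",
    role="fiddler_role",
    warehouse="COMPUTE_WH",
    database="ml_data",
    schema="predictions"
)
```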

[**Full Snowflake Setup Guide →**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/snowflake-integration)

### Google BigQuery

**Why BigQuery + Fiddler:**

* **Serverless Architecture** - No infrastructure management
* **SQL-Based Queries** - Familiar interface for data teams
* **Federated Queries** - Join Fiddler data with other GCP sources
* **Machine Learning** - BigQuery ML model monitoring integration

**Quick Start:**

```python
from fiddler import FiddlerClient

# Connect Fiddler to BigQuery
client = FiddlerClient(api_key="fid_...")
client.add_bigquery_connection(
    project_id="your-gcp-project",
    dataset_id="ml_predictions",
    credentials_path="service-account-key.json"
)

# Import dataset from BigQuery table
client.upload_dataset_from_bigquery(
    project="your-project",
    dataset="ml_predictions",
    table="credit_risk_predictions"
)
```

[**Full BigQuery Setup Guide →**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/bigquery-integration)

## Streaming Integrations

### Apache Kafka

**Why Kafka + Fiddler:**

* **Real-Time Monitoring** - Sub-second latency from prediction to observability
* **High Throughput** - Handle millions of events per second
* **Event Replay** - Replay historical events for testing and validation
* **Exactly-Once Semantics** - Critical predictions are processed once, with no drops or duplicates

**Architecture Pattern:**

```
Prediction Service
  ↓ (publish)
Kafka Topic: model-predictions
  ↓ (consume)
Fiddler Kafka Consumer
  ↓ (ingest)
Fiddler Platform
```

**Quick Start:**

```python
from fiddler import FiddlerClient

# Configure Kafka consumer for Fiddler
client = FiddlerClient(api_key="fid_...")
client.add_kafka_source(
    bootstrap_servers="kafka-broker:9092",
    topic="model-predictions",
    group_id="fiddler-consumer",

    # Message format
    value_deserializer="json",
    schema={
        "prediction_id": "string",
        "model_id": "string",
        "prediction": "float",
        "features": "object"
    }
)
```

[**Full Kafka Setup Guide →**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/kafka-integration)

### Amazon S3

**Why S3 + Fiddler:**

* **Batch Processing** - Ingest large datasets efficiently
* **Event Notifications** - Automatic processing when new files arrive
* **Data Lake Integration** - Monitor models trained on S3 data lakes
* **Cost-Effective Storage** - Archive historical predictions in S3

**Integration Patterns:**

**Pattern 1: Scheduled Batch Upload**

```python
from fiddler import FiddlerClient

client = FiddlerClient(api_key="fid_...")

# Upload dataset from S3
client.upload_dataset_from_s3(
    project="fraud-detection",
    dataset="production-predictions",
    s3_uri="s3://my-bucket/predictions/2024-11-10.parquet",
    format="parquet"
)
```

**Pattern 2: Event-Driven Upload**

```python
# AWS Lambda triggered by S3 event
import os

from fiddler import FiddlerClient

def lambda_handler(event, context):
    # Extract S3 bucket and key from event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    # Upload to Fiddler
    client = FiddlerClient(api_key=os.environ['FIDDLER_API_KEY'])
    client.upload_dataset_from_s3(
        project="my-project",
        dataset="streaming-data",
        s3_uri=f"s3://{bucket}/{key}"
    )
```

[**Full S3 Setup Guide →**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/integration-with-s3)

## Orchestration & Pipeline Integrations

### Apache Airflow

**Why Airflow + Fiddler:**

* **Automated Workflows** - Trigger Fiddler operations as DAG tasks
* **Dependency Management** - Ensure data quality before model training
* **Scheduling** - Periodic drift checks and model evaluations
* **Observability** - Monitor ML pipelines and models in one platform

**Example DAG:**

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from fiddler_airflow import FiddlerDatasetUploadOperator, FiddlerModelEvaluationOperator
from datetime import datetime

dag = DAG(
    'ml_pipeline_with_monitoring',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily'
)

# Placeholder for your model training logic
def train_model_func():
    ...

# Train model
train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model_func,
    dag=dag
)

# Upload training data to Fiddler
upload_task = FiddlerDatasetUploadOperator(
    task_id='upload_to_fiddler',
    project='credit-risk',
    dataset='training_data',
    data_source='s3://my-bucket/training-data.csv',
    dag=dag
)

# Evaluate model in Fiddler
eval_task = FiddlerModelEvaluationOperator(
    task_id='evaluate_model',
    project='credit-risk',
    model='risk_model_v2',
    baseline_dataset='training_data',
    dag=dag
)

train_task >> upload_task >> eval_task
```

[**Full Airflow Setup Guide →**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/airflow-integration)

### AWS SageMaker Pipelines

**Why SageMaker Pipelines + Fiddler:**

* **Native AWS Integration** - Seamless integration via the SageMaker Partner AI App
* **End-to-End ML Workflows** - From data prep to model monitoring
* **Model Registry Integration** - Automatic monitoring setup for registered models
* **Cost Optimization** - Leverage existing SageMaker infrastructure

**Example Pipeline:**

```python
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from fiddler_sagemaker import FiddlerMonitoringStep

# Define SageMaker pipeline steps
processing_step = ProcessingStep(...)
training_step = TrainingStep(...)

# Add Fiddler monitoring step
fiddler_step = FiddlerMonitoringStep(
    name="SetupFiddlerMonitoring",
    model_name=training_step.properties.ModelName,
    baseline_dataset_s3_uri=processing_step.properties.ProcessingOutputConfig.Outputs["baseline"].S3Output.S3Uri,
    fiddler_project="sagemaker-models",
    depends_on=[training_step]
)

# Create pipeline
pipeline = Pipeline(
    name="MLPipelineWithFiddler",
    steps=[processing_step, training_step, fiddler_step]
)
```

[**Full SageMaker Pipelines Guide →**](https://docs.fiddler.ai/integrations/data-platforms-and-pipelines/data-platforms/sagemaker-integration)

## Integration Selector

Not sure which data integration to use? Here's a quick decision guide:

| Your Data Source             | Recommended Integration | Why                                    |
| ---------------------------- | ----------------------- | -------------------------------------- |
| Snowflake data warehouse     | **Snowflake connector** | Zero-copy sharing, direct queries      |
| BigQuery tables              | **BigQuery connector**  | Serverless, SQL-based, GCP-native      |
| Real-time prediction streams | **Kafka integration**   | Sub-second latency, high throughput    |
| S3 data lake                 | **S3 integration**      | Batch processing, event-driven uploads |
| Airflow ML pipelines         | **Airflow operators**   | Automated workflows, task dependencies |
| SageMaker workflows          | **SageMaker Pipelines** | Native AWS integration, model registry |

## Getting Started

### Prerequisites

Before setting up data integrations, ensure you have:

* **Fiddler Account** - Cloud or on-premises instance
* **API Key** - Generate from Fiddler UI Settings
* **Data Source Access** - Credentials with read permissions
* **Network Connectivity** - Firewall rules allowing Fiddler → Data Source

### General Setup Pattern

All data integrations follow this pattern:

**1. Configure Connection**

```python
from fiddler import FiddlerClient

client = FiddlerClient(
    api_key="fid_...",
    url="https://app.fiddler.ai"
)

# Add data source
client.add_data_source(
    name="my-data-warehouse",
    type="snowflake",  # or bigquery, kafka, s3
    connection_params={...}
)
```

**2. Test Connection**

```python
# Verify connectivity
status = client.test_data_source("my-data-warehouse")
print(f"Connection status: {status}")
```

**3. Ingest Data**

```python
# Upload dataset from data source
client.upload_dataset_from_source(
    project="my-project",
    dataset="training-data",
    source="my-data-warehouse",
    query="SELECT * FROM predictions WHERE date > '2024-01-01'"
)
```

## Advanced Patterns

### Pattern 1: Multi-Source Data Enrichment

Combine data from multiple sources for comprehensive monitoring:

```python
# Production predictions from Kafka
client.add_kafka_source(
    topic="production-predictions",
    ...
)

# Ground truth labels from Snowflake
client.add_snowflake_source(
    database="ml_data",
    schema="labels",
    ...
)

# Join predictions with labels
client.configure_ground_truth_enrichment(
    project="fraud-detection",
    prediction_source="production-predictions",
    label_source="ml_data.labels.fraud_labels",
    join_key="transaction_id",
    time_window_hours=24
)
```

### Pattern 2: Data Quality Validation

Validate data quality before ingestion:

```python
from fiddler import DataValidator

validator = DataValidator()
validator.add_rule("age", min_value=18, max_value=100)
validator.add_rule("income", not_null=True)
validator.add_rule("credit_score", pattern=r'^\d{3}$')

# Validate before upload
is_valid = client.upload_dataset_from_source(
    ...,
    validator=validator,
    on_validation_failure="abort"  # or "warn" or "ignore"
)
```

### Pattern 3: Incremental Updates

Efficiently update datasets with only new data:

```python
# Initial full load
client.upload_dataset_from_source(
    project="my-project",
    dataset="user-events",
    source="my-bigquery",
    query="SELECT * FROM events"
)

# Daily incremental updates (scheduled via Airflow)
client.update_dataset_from_source(
    project="my-project",
    dataset="user-events",
    source="my-bigquery",
    query="SELECT * FROM events WHERE date = CURRENT_DATE()",
    mode="append"  # or "upsert"
)
```

## Data Format Requirements

### Baseline/Training Data

Must include:

* **Features** - All model input features
* **Predictions** - Model outputs (for validation)
* **Metadata** (optional) - Additional context fields

**Example Schema:**

```json
{
  "transaction_id": "string",
  "amount": "float",
  "merchant_category": "string",
  "prediction": "float",
  "timestamp": "datetime"
}
```
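
For reference, a minimal sketch of uploading a baseline that matches this schema, reusing this page's generic client pattern (the `upload_dataset` call and its `dataframe` parameter are illustrative assumptions; your client version may expose a different method):

```python
import pandas as pd
from fiddler import FiddlerClient

# Build a baseline frame matching the schema above
baseline_df = pd.DataFrame({
    "transaction_id": ["txn_001", "txn_002"],
    "amount": [125.40, 18.99],
    "merchant_category": ["electronics", "grocery"],
    "prediction": [0.12, 0.03],
    "timestamp": pd.to_datetime(["2024-11-10T14:32:01Z", "2024-11-10T14:35:44Z"]),
})

client = FiddlerClient(api_key="fid_...")

# Illustrative upload call following this page's client pattern
client.upload_dataset(
    project="fraud-detection",
    dataset="baseline-data",
    dataframe=baseline_df
)
```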

### Production Event Data

Must include:

* **Event ID** - Unique identifier
* **Timestamp** - Event time
* **Features** - Model inputs
* **Predictions** - Model outputs
* **Model Version** (optional) - For multi-model monitoring

**Example Kafka Message:**

```json
{
  "event_id": "evt_12345",
  "timestamp": "2024-11-10T14:32:01Z",
  "model": "fraud_detector_v3",
  "features": {
    "amount": 127.50,
    "merchant": "AMZN"
  },
  "prediction": 0.23
}
```
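
For context, a small sketch of a prediction service producing such a message with `kafka-python`, reusing the broker and topic from the Kafka quick start above (any producer with JSON serialization works the same way):

```python
import json
from datetime import datetime, timezone

from kafka import KafkaProducer

# JSON-serialize event payloads before sending to the topic
producer = KafkaProducer(
    bootstrap_servers="kafka-broker:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {
    "event_id": "evt_12345",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "model": "fraud_detector_v3",
    "features": {"amount": 127.50, "merchant": "AMZN"},
    "prediction": 0.23,
}

# Publish to the topic Fiddler's Kafka consumer reads from
producer.send("model-predictions", value=event)
producer.flush()
```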

## Security & Compliance

### Authentication Methods

**Snowflake:**

* Username/Password
* Key Pair Authentication (recommended for production)
* OAuth 2.0

**BigQuery:**

* Service Account JSON key
* Application Default Credentials
* Workload Identity (GKE)

**Kafka:**

* SASL/PLAIN
* SASL/SCRAM
* mTLS

**S3:**

* IAM Role (recommended for AWS deployments)
* Access Key / Secret Key
* Cross-account access via IAM role assumption
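
To illustrate how these options map onto connection configuration, here are two sketches using this page's generic `add_data_source` pattern; parameter names such as `private_key_path` and `role_arn` are assumptions, and the authoritative names are in each platform's setup guide:

```python
from fiddler import FiddlerClient

client = FiddlerClient(api_key="fid_...", url="https://app.fiddler.ai")

# Snowflake with key-pair authentication (recommended for production).
# Parameter names are illustrative; see the Snowflake setup guide.
client.add_data_source(
    name="snowflake-ml-data",
    type="snowflake",
    connection_params={
        "account": "your-account",
        "user": "fiddler_user",
        "private_key_path": "/secrets/fiddler_rsa_key.p8",
        "role": "fiddler_role",
    }
)

# S3 via cross-account IAM role assumption.
# Parameter names are illustrative; see the S3 setup guide.
client.add_data_source(
    name="s3-prediction-archive",
    type="s3",
    connection_params={
        "bucket": "my-bucket",
        "role_arn": "arn:aws:iam::123456789012:role/FiddlerReadOnly",
    }
)
```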

### Data Privacy

* **Encryption in Transit** - TLS 1.3 for all data transfers
* **Encryption at Rest** - Data encrypted in Fiddler storage
* **PII Redaction** - Automatically detect and redact sensitive fields
* **Data Retention** - Configurable retention policies per dataset

### Network Security

**Firewall Rules:**

```
Source: Fiddler Platform IP ranges
Destination: Your Data Source
Ports:
  - Snowflake: 443 (HTTPS)
  - BigQuery: 443 (HTTPS)
  - Kafka: 9092/9093 (SASL/SSL)
  - S3: 443 (HTTPS)
```

**Private Connectivity:**

* **AWS PrivateLink** - For SageMaker Partner AI App
* **VPC Peering** - Direct connection to data sources
* **VPN Tunnels** - Secure connectivity for on-premises sources

## Monitoring Data Pipeline Health

### Connection Health Checks

```python
# Check all data source connections
sources = client.list_data_sources()
for source in sources:
    status = client.test_data_source(source.name)
    print(f"{source.name}: {status.status} (latency: {status.latency_ms}ms)")
```

### Data Ingestion Metrics

Monitor data pipeline performance:

* **Ingestion Latency** - Time from source to Fiddler
* **Throughput** - Events per second processed
* **Error Rate** - Failed ingestion attempts
* **Data Freshness** - Time since last successful update

### Alerts on Pipeline Failures

```python
# Configure alert for data pipeline issues
client.create_alert(
    name="Data Pipeline Failure",
    trigger="data_ingestion_failed",
    source="kafka-production-predictions",
    notification_channels=["email", "pagerduty"]
)
```

## Troubleshooting

### Common Issues

**Connection Timeouts:**

* Check network connectivity and firewall rules
* Verify credentials are current and have proper permissions
* Ensure data source is reachable from Fiddler's network

**Schema Mismatches:**

* Validate data types match Fiddler's expected schema
* Check for null values in required fields
* Ensure timestamp fields use supported formats (ISO 8601)
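
A quick pre-ingestion check with plain pandas can catch most of these issues before upload:

```python
import pandas as pd

df = pd.read_parquet("predictions/2024-11-10.parquet")

# Required fields should contain no nulls
required = ["event_id", "timestamp", "prediction"]
print(df[required].isnull().sum())

# Timestamps should parse as ISO 8601; unparseable values become NaT
parsed = pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
print(f"Unparseable timestamps: {parsed.isna().sum()}")
```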

**High Latency:**

* For Kafka: Check consumer lag and partition count (see the lag-check sketch below)
* For data warehouses: Optimize queries and use partitioning or clustering keys
* For S3: Use Parquet or ORC instead of CSV
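
A minimal consumer-lag check with `kafka-python`, assuming the consumer group and topic from the Kafka quick start above:

```python
from kafka import KafkaConsumer, TopicPartition

# Join as the same group so committed offsets are visible
consumer = KafkaConsumer(
    bootstrap_servers="kafka-broker:9092",
    group_id="fiddler-consumer",
    enable_auto_commit=False,
)

topic = "model-predictions"
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
end_offsets = consumer.end_offsets(partitions)

for tp in partitions:
    committed = consumer.committed(tp) or 0
    print(f"partition {tp.partition}: lag = {end_offsets[tp] - committed}")
```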

**Data Quality Issues:**

* Enable data validation rules before ingestion
* Set up alerts for out-of-range values
* Configure automatic PII redaction

## Related Integrations

* [**Cloud Platforms**](https://docs.fiddler.ai/integrations/cloud-platforms-and-deployment/cloud-platforms) - Deploy Fiddler on AWS, Azure, GCP
* [**ML Platforms**](https://docs.fiddler.ai/integrations/ml-platforms-and-tools/ml-platforms) - Integrate with Databricks, MLflow
* [**Agentic AI**](https://docs.fiddler.ai/integrations/agentic-ai-and-llm-frameworks/agentic-ai) - Monitor LangGraph and Strands Agents
* [**Monitoring & Alerting**](https://docs.fiddler.ai/integrations/monitoring-and-alerting/monitoring-alerting) - Send alerts to incident management tools

***
