# Data Platforms Overview

Integrate Fiddler with your existing data infrastructure to seamlessly ingest training data, production events, and ground truth labels. From cloud data warehouses to real-time streaming platforms, Fiddler connects to the data sources you already use.

## Why Data Integration Matters

AI observability requires continuous data flow from your ML pipelines and applications. Fiddler's data platform integrations enable:

* **Automated Data Ingestion** - Pull training datasets and production events without manual uploads
* **Real-Time Monitoring** - Stream prediction events for immediate drift and performance detection
* **Unified Data Pipeline** - Single integration point for all your ML data sources
* **Ground Truth Enrichment** - Automatically join production predictions with delayed labels
* **Historical Analysis** - Query your data warehouse for model performance over time

## Integration Categories

### 🏢 Data Warehouses

Connect Fiddler to your cloud data warehouse for batch data ingestion and historical analysis.

**Supported Platforms:**

* [**Snowflake**](/integrations/data-platforms-and-pipelines/data-platforms/snowflake-integration.md) - Cloud data warehouse with zero-copy data sharing ✓ **GA**
* [**Google BigQuery**](/integrations/data-platforms-and-pipelines/data-platforms/bigquery-integration.md) - Serverless data warehouse with SQL analytics ✓ **GA**

**Common Use Cases:**

* Import training datasets from data warehouse tables
* Query historical model predictions for performance analysis
* Join production events with delayed ground truth labels (see the sketch after this list)
* Export Fiddler metrics back to the warehouse for BI tools
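
For example, the ground-truth join above can often be expressed as a single query against the warehouse connection. The sketch below reuses the hypothetical `upload_dataset_from_source` call shown later on this page; the database, table, and column names are placeholders:

```python
from fiddler import FiddlerClient

client = FiddlerClient(api_key="fid_...")

# Join production predictions with labels that arrive later (placeholder table names)
client.upload_dataset_from_source(
    project="fraud-detection",
    dataset="labeled-predictions",
    source="my-data-warehouse",
    query="""
        SELECT p.*, l.is_fraud AS ground_truth
        FROM ml_data.predictions.events AS p
        JOIN ml_data.labels.fraud_labels AS l
          ON p.transaction_id = l.transaction_id
    """
)
```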

### 📊 Data Streaming

Stream real-time prediction events and feedback directly to Fiddler for immediate observability.

**Supported Platforms:**

* [**Apache Kafka**](/integrations/data-platforms-and-pipelines/data-platforms/kafka-integration.md) - Distributed event streaming platform ✓ **GA**
* [**Amazon S3**](/integrations/data-platforms-and-pipelines/data-platforms/integration-with-s3.md) - Object storage with event notifications ✓ **GA**

**Common Use Cases:**

* Stream model predictions in real-time from production services (see the sketch after this list)
* Monitor agentic AI interactions as they occur
* Trigger alerts on data quality issues within seconds
* Capture ground truth feedback from user interactions
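
On the producer side, a prediction service publishes events to the topic Fiddler consumes. The sketch below uses the `confluent-kafka` Python client as one option; the broker address and topic mirror the Kafka quick start later on this page:

```python
import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka-broker:9092"})

# Publish one prediction event to the topic Fiddler consumes
event = {
    "prediction_id": "pred_001",
    "model_id": "fraud_detector_v3",
    "prediction": 0.23,
    "features": {"amount": 127.50, "merchant": "AMZN"},
}
producer.produce("model-predictions", value=json.dumps(event).encode("utf-8"))
producer.flush()  # block until the event is delivered
```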

### 🔄 Orchestration & Pipelines

Integrate Fiddler into your ML workflow orchestration for automated monitoring at every pipeline stage.

**Supported Platforms:**

* [**Apache Airflow**](/integrations/data-platforms-and-pipelines/data-platforms/airflow-integration.md) - Workflow orchestration platform ✓ **GA**
* [**AWS SageMaker Pipelines**](/integrations/data-platforms-and-pipelines/data-platforms/sagemaker-integration.md) - Managed ML pipeline service ✓ **GA**

**Common Use Cases:**

* Automatically upload datasets when training pipelines complete
* Trigger model evaluation as part of CI/CD workflows
* Schedule periodic drift checks and performance reports
* Orchestrate ground truth label collection and enrichment

## Data Warehouse Integrations

### Snowflake

**Why Snowflake + Fiddler:**

* **Zero-Copy Data Sharing** - No data duplication, direct queries to Snowflake
* **Secure Data Access** - OAuth 2.0 and key-pair authentication
* **Scalable Analytics** - Leverage Snowflake's compute for large datasets
* **Cost-Effective** - Pay only for queries executed, no data transfer fees

**Quick Start:**

```sql
-- Create Fiddler integration user
CREATE USER fiddler_user PASSWORD='...';
CREATE ROLE fiddler_role;
GRANT ROLE fiddler_role TO USER fiddler_user;

-- Grant permissions to your ML data
GRANT USAGE ON DATABASE ml_data TO ROLE fiddler_role;
GRANT USAGE ON SCHEMA ml_data.predictions TO ROLE fiddler_role;
GRANT SELECT ON ALL TABLES IN SCHEMA ml_data.predictions TO ROLE fiddler_role;
```
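
With the user and role in place, register the connection from the Fiddler client. This sketch uses the hypothetical `add_snowflake_source` call that also appears in the Advanced Patterns section below; the account and warehouse names are placeholders:

```python
from fiddler import FiddlerClient

client = FiddlerClient(api_key="fid_...")

# Register the Snowflake connection created by the SQL above (placeholder values)
client.add_snowflake_source(
    account="your-account.snowflakecomputing.com",
    user="fiddler_user",
    role="fiddler_role",
    warehouse="FIDDLER_WH",
    database="ml_data",
    schema="predictions",
)
```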

[**Full Snowflake Setup Guide →**](/integrations/data-platforms-and-pipelines/data-platforms/snowflake-integration.md)

### Google BigQuery

**Why BigQuery + Fiddler:**

* **Serverless Architecture** - No infrastructure management
* **SQL-Based Queries** - Familiar interface for data teams
* **Federated Queries** - Join Fiddler data with other GCP sources
* **Machine Learning** - BigQuery ML model monitoring integration

**Quick Start:**

```python
from fiddler import FiddlerClient

# Connect Fiddler to BigQuery
client = FiddlerClient(api_key="fid_...")
client.add_bigquery_connection(
    project_id="your-gcp-project",
    dataset_id="ml_predictions",
    credentials_path="service-account-key.json"
)

# Import dataset from BigQuery table
client.upload_dataset_from_bigquery(
    project="your-project",
    dataset="ml_predictions",
    table="credit_risk_predictions"
)
```

[**Full BigQuery Setup Guide →**](/integrations/data-platforms-and-pipelines/data-platforms/bigquery-integration.md)

## Streaming Integrations

### Apache Kafka

**Why Kafka + Fiddler:**

* **Real-Time Monitoring** - Sub-second latency from prediction to observability
* **High Throughput** - Handle millions of events per second
* **Event Replay** - Replay historical events for testing and validation
* **Exactly-Once Semantics** - No dropped or duplicated events for critical predictions

**Architecture Pattern:**

```
Prediction Service
  ↓ (publish)
Kafka Topic: model-predictions
  ↓ (consume)
Fiddler Kafka Consumer
  ↓ (ingest)
Fiddler Platform
```

**Quick Start:**

```python
from fiddler import FiddlerClient

# Configure Kafka consumer for Fiddler
client = FiddlerClient(api_key="fid_...")
client.add_kafka_source(
    bootstrap_servers="kafka-broker:9092",
    topic="model-predictions",
    group_id="fiddler-consumer",

    # Message format
    value_deserializer="json",
    schema={
        "prediction_id": "string",
        "model_id": "string",
        "prediction": "float",
        "features": "object"
    }
)
```

[**Full Kafka Setup Guide →**](/integrations/data-platforms-and-pipelines/data-platforms/kafka-integration.md)

### Amazon S3

**Why S3 + Fiddler:**

* **Batch Processing** - Ingest large datasets efficiently
* **Event Notifications** - Automatic processing when new files arrive
* **Data Lake Integration** - Monitor models trained on S3 data lakes
* **Cost-Effective Storage** - Archive historical predictions in S3

**Integration Patterns:**

**Pattern 1: Scheduled Batch Upload**

```python
from fiddler import FiddlerClient

client = FiddlerClient(api_key="fid_...")

# Upload dataset from S3
client.upload_dataset_from_s3(
    project="fraud-detection",
    dataset="production-predictions",
    s3_uri="s3://my-bucket/predictions/2024-11-10.parquet",
    format="parquet"
)
```

**Pattern 2: Event-Driven Upload**

```python
# AWS Lambda function triggered by an S3 event notification
import os

from fiddler import FiddlerClient

def lambda_handler(event, context):
    # Extract S3 bucket and key from event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    # Upload to Fiddler
    client = FiddlerClient(api_key=os.environ['FIDDLER_API_KEY'])
    client.upload_dataset_from_s3(
        project="my-project",
        dataset="streaming-data",
        s3_uri=f"s3://{bucket}/{key}"
    )
```
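
Pattern 2 assumes the Lambda function is subscribed to the bucket's object-created events. One way to wire up that notification is with boto3 (bucket name, prefix, and function ARN are placeholders), roughly:

```python
import boto3

s3 = boto3.client("s3")

# Invoke the Lambda whenever a new prediction file lands under the prefix.
# The Lambda's resource policy must already allow S3 to invoke it.
s3.put_bucket_notification_configuration(
    Bucket="my-bucket",
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:fiddler-s3-ingest",
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": "predictions/"}]}
                },
            }
        ]
    },
)
```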

[**Full S3 Setup Guide →**](/integrations/data-platforms-and-pipelines/data-platforms/integration-with-s3.md)

## Orchestration & Pipeline Integrations

### Apache Airflow

**Why Airflow + Fiddler:**

* **Automated Workflows** - Trigger Fiddler operations as DAG tasks
* **Dependency Management** - Ensure data quality before model training
* **Scheduling** - Periodic drift checks and model evaluations
* **Observability** - Monitor ML pipelines and models in one platform

**Example DAG:**

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from fiddler_airflow import FiddlerDatasetUploadOperator, FiddlerModelEvaluationOperator
from datetime import datetime

dag = DAG(
    'ml_pipeline_with_monitoring',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily'
)

# Train model (train_model_func is your existing training function, defined elsewhere)
train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model_func,
    dag=dag
)

# Upload training data to Fiddler
upload_task = FiddlerDatasetUploadOperator(
    task_id='upload_to_fiddler',
    project='credit-risk',
    dataset='training_data',
    data_source='s3://my-bucket/training-data.csv',
    dag=dag
)

# Evaluate model in Fiddler
eval_task = FiddlerModelEvaluationOperator(
    task_id='evaluate_model',
    project='credit-risk',
    model='risk_model_v2',
    baseline_dataset='training_data',
    dag=dag
)

train_task >> upload_task >> eval_task
```

[**Full Airflow Setup Guide →**](/integrations/data-platforms-and-pipelines/data-platforms/airflow-integration.md)

### AWS SageMaker Pipelines

**Why SageMaker Pipelines + Fiddler:**

* **Native AWS Integration** - Seamless with SageMaker Partner AI App
* **End-to-End ML Workflows** - From data prep to model monitoring
* **Model Registry Integration** - Automatic monitoring setup for registered models
* **Cost Optimization** - Leverage existing SageMaker infrastructure

**Example Pipeline:**

```python
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from fiddler_sagemaker import FiddlerMonitoringStep

# Define SageMaker pipeline steps
processing_step = ProcessingStep(...)
training_step = TrainingStep(...)

# Add Fiddler monitoring step
fiddler_step = FiddlerMonitoringStep(
    name="SetupFiddlerMonitoring",
    model_name=training_step.properties.ModelName,
    baseline_dataset_s3_uri=processing_step.properties.ProcessingOutputConfig.Outputs["baseline"].S3Output.S3Uri,
    fiddler_project="sagemaker-models",
    depends_on=[training_step]
)

# Create pipeline
pipeline = Pipeline(
    name="MLPipelineWithFiddler",
    steps=[processing_step, training_step, fiddler_step]
)
```
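
Once defined, the pipeline is registered and started with the standard SageMaker SDK calls (the execution role ARN is a placeholder):

```python
# Register (create or update) the pipeline definition, then start an execution
pipeline.upsert(role_arn="arn:aws:iam::123456789012:role/SageMakerExecutionRole")
execution = pipeline.start()
execution.wait()  # block until all steps, including the Fiddler step, complete
```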

[**Full SageMaker Pipelines Guide →**](/integrations/data-platforms-and-pipelines/data-platforms/sagemaker-integration.md)

## Integration Selector

Not sure which data integration to use? Here's a quick decision guide:

| Your Data Source             | Recommended Integration | Why                                    |
| ---------------------------- | ----------------------- | -------------------------------------- |
| Snowflake data warehouse     | **Snowflake connector** | Zero-copy sharing, direct queries      |
| BigQuery tables              | **BigQuery connector**  | Serverless, SQL-based, GCP-native      |
| Real-time prediction streams | **Kafka integration**   | Sub-second latency, high throughput    |
| S3 data lake                 | **S3 integration**      | Batch processing, event-driven uploads |
| Airflow ML pipelines         | **Airflow operators**   | Automated workflows, task dependencies |
| SageMaker workflows          | **SageMaker Pipelines** | Native AWS integration, model registry |

## Getting Started

### Prerequisites

Before setting up data integrations, ensure you have:

* **Fiddler Account** - Cloud or on-premises instance
* **API Key** - Generate one from the Settings page in the Fiddler UI
* **Data Source Access** - Credentials with read permissions
* **Network Connectivity** - Firewall rules allowing Fiddler → Data Source

### General Setup Pattern

All data integrations follow this pattern:

**1. Configure Connection**

```python
from fiddler import FiddlerClient

client = FiddlerClient(
    api_key="fid_...",
    url="https://app.fiddler.ai"
)

# Add data source
client.add_data_source(
    name="my-data-warehouse",
    type="snowflake",  # or bigquery, kafka, s3
    connection_params={...}
)
```

**2. Test Connection**

```python
# Verify connectivity
status = client.test_data_source("my-data-warehouse")
print(f"Connection status: {status}")
```

**3. Ingest Data**

```python
# Upload dataset from data source
client.upload_dataset_from_source(
    project="my-project",
    dataset="training-data",
    source="my-data-warehouse",
    query="SELECT * FROM predictions WHERE date > '2024-01-01'"
)
```

## Advanced Patterns

### Pattern 1: Multi-Source Data Enrichment

Combine data from multiple sources for comprehensive monitoring:

```python
# "client" is the FiddlerClient configured in the General Setup Pattern above

# Production predictions from Kafka
client.add_kafka_source(
    topic="production-predictions",
    ...
)

# Ground truth labels from Snowflake
client.add_snowflake_source(
    database="ml_data",
    schema="labels",
    ...
)

# Join predictions with labels
client.configure_ground_truth_enrichment(
    project="fraud-detection",
    prediction_source="production-predictions",
    label_source="ml_data.labels.fraud_labels",
    join_key="transaction_id",
    time_window_hours=24
)
```

### Pattern 2: Data Quality Validation

Validate data quality before ingestion:

```python
from fiddler import DataValidator

validator = DataValidator()
validator.add_rule("age", min_value=18, max_value=100)
validator.add_rule("income", not_null=True)
validator.add_rule("credit_score", pattern=r'^\d{3}$')

# Validate before upload
is_valid = client.upload_dataset_from_source(
    ...,
    validator=validator,
    on_validation_failure="abort"  # or "warn" or "ignore"
)
```

### Pattern 3: Incremental Updates

Efficiently update datasets with only new data:

```python
# Initial full load
client.upload_dataset_from_source(
    project="my-project",
    dataset="user-events",
    source="my-bigquery",
    query="SELECT * FROM events"
)

# Daily incremental updates (scheduled via Airflow)
client.update_dataset_from_source(
    project="my-project",
    dataset="user-events",
    source="my-bigquery",
    query="SELECT * FROM events WHERE date = CURRENT_DATE()",
    mode="append"  # or "upsert"
)
```
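
If the daily increment runs from Airflow, it can be wrapped in an ordinary task. This sketch follows the DAG conventions from the Airflow example earlier on this page:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from fiddler import FiddlerClient


def incremental_update():
    client = FiddlerClient(api_key="fid_...")
    client.update_dataset_from_source(
        project="my-project",
        dataset="user-events",
        source="my-bigquery",
        query="SELECT * FROM events WHERE date = CURRENT_DATE()",
        mode="append",
    )


with DAG(
    "fiddler_incremental_update",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
) as dag:
    PythonOperator(task_id="incremental_update", python_callable=incremental_update)
```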

## Data Format Requirements

### Baseline/Training Data

Must include:

* **Features** - All model input features
* **Predictions** - Model outputs (for validation)
* **Metadata** (optional) - Additional context fields

**Example Schema:**

```json
{
  "transaction_id": "string",
  "amount": "float",
  "merchant_category": "string",
  "prediction": "float",
  "timestamp": "datetime"
}
```
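
Before pushing a baseline through any of the connectors above, a quick local check that the frame matches the expected schema can catch type problems early. A minimal sketch with pandas, using the column names from the example schema (the file path is a placeholder):

```python
import pandas as pd

expected_columns = {
    "transaction_id": "object",     # string
    "amount": "float64",
    "merchant_category": "object",  # string
    "prediction": "float64",
    "timestamp": "datetime64[ns]",
}

df = pd.read_parquet("baseline.parquet")  # placeholder path

# Verify all expected columns are present and have the expected dtypes
missing = set(expected_columns) - set(df.columns)
assert not missing, f"Missing columns: {missing}"

for column, dtype in expected_columns.items():
    actual = str(df[column].dtype)
    assert actual == dtype, f"{column}: expected {dtype}, got {actual}"
```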

### Production Event Data

Must include:

* **Event ID** - Unique identifier
* **Timestamp** - Event time
* **Features** - Model inputs
* **Predictions** - Model outputs
* **Model Version** (optional) - For multi-model monitoring

**Example Kafka Message:**

```json
{
  "event_id": "evt_12345",
  "timestamp": "2024-11-10T14:32:01Z",
  "model": "fraud_detector_v3",
  "features": {
    "amount": 127.50,
    "merchant": "AMZN"
  },
  "prediction": 0.23
}
```

## Security & Compliance

### Authentication Methods

**Snowflake:**

* Username/Password
* Key Pair Authentication (recommended for production)
* OAuth 2.0

**BigQuery:**

* Service Account JSON key
* Application Default Credentials
* Workload Identity (GKE)

**Kafka:**

* SASL/PLAIN
* SASL/SCRAM
* mTLS

**S3:**

* IAM Role (recommended for AWS deployments)
* Access Key / Secret Key
* Cross-account access via IAM role assumption (see the sketch below)
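
For the cross-account case, a role in the data account is assumed and the temporary credentials are used for the S3 reads. A sketch with boto3 (role ARN, bucket, and key are placeholders):

```python
import boto3

# Assume the role that grants read access to the data account's bucket
sts = boto3.client("sts")
assumed = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/FiddlerDataAccess",
    RoleSessionName="fiddler-ingest",
)
creds = assumed["Credentials"]

# Use the temporary credentials for the S3 read
s3 = boto3.client(
    "s3",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
s3.download_file("my-bucket", "predictions/2024-11-10.parquet", "/tmp/predictions.parquet")
```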

### Data Privacy

* **Encryption in Transit** - TLS 1.3 for all data transfers
* **Encryption at Rest** - Data encrypted in Fiddler storage
* **PII Redaction** - Automatically detect and redact sensitive fields
* **Data Retention** - Configurable retention policies per dataset

### Network Security

**Firewall Rules:**

```
Source: Fiddler Platform IP ranges
Destination: Your Data Source
Ports:
  - Snowflake: 443 (HTTPS)
  - BigQuery: 443 (HTTPS)
  - Kafka: 9092/9093 (SASL/SSL)
  - S3: 443 (HTTPS)
```

**Private Connectivity:**

* **AWS PrivateLink** - For SageMaker Partner AI App
* **VPC Peering** - Direct connection to data sources
* **VPN Tunnels** - Secure connectivity for on-premises sources

## Monitoring Data Pipeline Health

### Connection Health Checks

```python
# Check all data source connections
sources = client.list_data_sources()
for source in sources:
    status = client.test_data_source(source.name)
    print(f"{source.name}: {status.status} (latency: {status.latency_ms}ms)")
```

### Data Ingestion Metrics

Monitor data pipeline performance:

* **Ingestion Latency** - Time from source to Fiddler
* **Throughput** - Events per second processed
* **Error Rate** - Failed ingestion attempts
* **Data Freshness** - Time since last successful update (see the sketch below)
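
These metrics can feed a scheduled freshness check along the following lines. Note that `get_data_source_metrics` is a hypothetical method used here only for illustration, in the same client style as the rest of this page:

```python
from datetime import datetime, timezone

from fiddler import FiddlerClient

client = FiddlerClient(api_key="fid_...")

# Hypothetical call returning ingestion metrics for one data source
metrics = client.get_data_source_metrics("kafka-production-predictions")

# Warn if no event has been ingested successfully in the last hour
age = datetime.now(timezone.utc) - metrics.last_successful_update
if age.total_seconds() > 3600:
    print(f"WARNING: no successful ingestion for {age}; check the pipeline")
```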

### Alerts on Pipeline Failures

```python
# Configure alert for data pipeline issues
client.create_alert(
    name="Data Pipeline Failure",
    trigger="data_ingestion_failed",
    source="kafka-production-predictions",
    notification_channels=["email", "pagerduty"]
)
```

## Troubleshooting

### Common Issues

**Connection Timeouts:**

* Check network connectivity and firewall rules
* Verify credentials are current and have proper permissions
* Ensure the data source is reachable from Fiddler's network

**Schema Mismatches:**

* Validate data types match Fiddler's expected schema
* Check for null values in required fields
* Ensure timestamp fields use supported formats (ISO 8601)

**High Latency:**

* For Kafka: Check consumer lag and partition count (see the sketch below)
* For Data Warehouses: Optimize queries and use partitioning or clustering keys
* For S3: Use Parquet or ORC instead of CSV
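
Consumer lag can be checked directly from Python. The sketch below uses the `kafka-python` package, with the broker, topic, and group id matching the earlier Kafka example:

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers="kafka-broker:9092",
    group_id="fiddler-consumer",
    enable_auto_commit=False,
)

topic = "model-predictions"
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
end_offsets = consumer.end_offsets(partitions)

# Lag = latest offset in the partition minus the consumer group's committed offset
for tp in partitions:
    committed = consumer.committed(tp) or 0
    print(f"partition {tp.partition}: lag = {end_offsets[tp] - committed}")
```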

**Data Quality Issues:**

* Enable data validation rules before ingestion
* Set up alerts for out-of-range values
* Configure automatic PII redaction

## Related Integrations

* [**Cloud Platforms**](/integrations/cloud-platforms-and-deployment/cloud-platforms.md) - Deploy Fiddler on AWS, Azure, GCP
* [**ML Platforms**](/integrations/ml-platforms-and-tools/ml-platforms.md) - Integrate with Databricks, MLflow
* [**Agentic AI**](/integrations/agentic-ai-and-llm-frameworks/agentic-ai.md) - Monitor LangGraph and Strands Agents
* [**Monitoring & Alerting**](/integrations/monitoring-and-alerting/monitoring-alerting.md) - Send alerts to incident management tools

***

