← Proyectos/pipeline-datos-serverless
Planned💻 Full Stack

Pipeline de Datos Serverless

Ingesta y procesamiento de datos en tiempo real con Kinesis + Lambda + S3 + Glue + Athena. 100% Terraform.

KinesisLambdaS3GlueAthenaPythonTerraformServerless

Problema & Solución

Problema

Un sistema de e-commerce necesita procesar eventos de usuario en tiempo real (clicks, views, purchases) para analytics. El volumen es de ~10,000 eventos/minuto con picos de 100,000/minuto. Los datos deben estar disponibles para consultas SQL ad-hoc en menos de 5 minutos desde que se generaron.

Solución

Pipeline event streaming con Kinesis Data Streams como ingestor de alta velocidad. Lambda consume el stream en micro-batches, transforma los eventos (normalización, enriquecimiento) y los escribe en S3 en formato Parquet particionado por fecha/hora. AWS Glue Crawler actualiza el Data Catalog automáticamente. Athena permite queries SQL serverless directamente sobre S3 sin ETL adicional. Todo el stack definido en Terraform.

Diagrama de Arquitectura


  Productores de eventos:
  ┌──────────┐ ┌──────────┐ ┌──────────────┐
  │ Frontend │ │ Backend  │ │ Mobile App   │
  │ (clicks) │ │ (orders) │ │ (sessions)   │
  └────┬─────┘ └────┬─────┘ └──────┬───────┘
       │             │              │ PutRecords (SDK)
       └─────────────┴──────────────┘
                     │
                     ▼
  ┌─────────────────────────────────────────┐
  │  Kinesis Data Stream                    │
  │  Shards: 2 (20,000 records/s capacity) │
  │  Retention: 24 horas                   │
  └──────────────────────┬──────────────────┘
                         │ Event Source Mapping
                         │ (BatchSize: 100, BisectOnError)
                         ▼
  ┌────────────────────────────────────────────┐
  │  Lambda (Python) — Transformer             │
  │  1. Decodifica base64                      │
  │  2. Parsea JSON                            │
  │  3. Valida schema (Pydantic)               │
  │  4. Enriquece (geo-IP, user-agent parsing) │
  │  5. Convierte a Parquet (pyarrow)          │
  │  6. Escribe a S3 particionado             │
  └──────────────────────┬─────────────────────┘
                         │ s3://data-lake/events/
                         │   year=2025/month=05/day=17/hour=14/
                         │   part-00001.parquet
                         ▼
  ┌─────────────────────────────────────────────────┐
  │  S3 Data Lake (Parquet, Snappy compression)     │
  │  Particionado: year/month/day/hour              │
  └──────────────────────┬──────────────────────────┘
                         │ (trigger: s3:ObjectCreated)
                         ▼
  ┌──────────────────────────────┐
  │  Glue Crawler (cada 15 min) │
  │  → actualiza Data Catalog   │
  └──────────────────┬───────────┘
                     │
                     ▼
  ┌──────────────────────────────────────────┐
  │  Amazon Athena (SQL serverless)          │
  │  SELECT event_type, COUNT(*) as total    │
  │  FROM events                             │
  │  WHERE year='2025' AND month='05'        │
  │  GROUP BY event_type                     │
  │  → $0.005 por query típica              │
  └──────────────────────────────────────────┘

Implementación

1

Ingesta con Kinesis Data Streams

Los productores (frontend, backend, mobile) envían eventos a Kinesis con PutRecords (batch de hasta 500 records). Cada record tiene un partition key (userId) que determina el shard destino — records del mismo usuario van al mismo shard, garantizando orden por usuario. El stream tiene 2 shards iniciales (20,000 records/s de ingesta, 4MB/s). Con Shard-level Enhanced Fan-Out, cada consumidor Lambda tiene su propio throughput de 2MB/s por shard.

2

Transformación en Lambda con batching

Lambda consume el stream con BisectOnFunctionError=true: si un batch falla, lo divide a la mitad y reintenta, aislando el record problemático. Cada record del stream contiene el evento en base64. Lambda: decodifica, valida con Pydantic (descarta eventos malformados con logging), enriquece (parsing de user-agent, geo-lookup si hay IP), agrupa 100 records y convierte a Parquet con pyarrow (columnar, compresión Snappy: ~10x menos espacio que JSON).

3

Escritura en S3 particionada

Los archivos Parquet se escriben con el esquema de particionado Hive-compatible: s3://data-lake/events/year=2025/month=05/day=17/hour=14/part-XXXXX.parquet. Este esquema permite a Athena hacer partition pruning — si se consulta solo el día de hoy, solo lee las particiones de hoy, no todo el dataset. Archivo típico: ~5MB Parquet = ~50MB JSON original.

4

Glue Crawler y Data Catalog

Glue Crawler escanea el bucket S3 cada 15 minutos (o se puede disparar vía Lambda al escribir nuevas particiones). Detecta el schema Parquet automáticamente y registra/actualiza la tabla en el Glue Data Catalog. Una vez en el catalog, Athena puede consultar los datos con SQL estándar sin ninguna configuración adicional.

5

Queries SQL con Athena

Athena ejecuta queries SQL directamente sobre los archivos Parquet en S3 sin necesidad de cargar datos en una base de datos. El costo es $5/TB de datos escaneados — con particionado y Parquet, una query típica escanea <1GB ($0.005). Los resultados se guardan en un bucket S3 de resultados y pueden consumirse desde cualquier herramienta BI (QuickSight, Tableau, Grafana con el plugin de Athena).

Tech Stack

Streaming & Ingesta

Amazon Kinesis Data StreamsPutRecords API (SDK v3)

Procesamiento

AWS Lambda (Python 3.12)pyarrowPydanticboto3

Storage & Format

Amazon S3Apache ParquetSnappy compression

Analytics

AWS Glue CrawlerAWS Glue Data CatalogAmazon Athena

IaC & CI/CD

TerraformGitHub ActionsAWS SAM (alternativa)

Decisiones Técnicas

Kinesis vs SQS para ingesta de eventos

Elegido

Kinesis Data Streams

Alternativas

  • SQS — más simple, sin orden garantizado, sin replay, sin múltiples consumidores
  • Kinesis Data Firehose — más simple que Streams, sin replay, latencia mínima 60s
  • MSK (Kafka managed) — máxima flexibilidad y ecosistema, mucho más complejo y caro

Razón

Kinesis preserva el orden dentro de cada shard (eventos del mismo usuario van al mismo shard via partition key). Retiene datos 24h (configurable hasta 365 días) — permite replay de eventos. Múltiples consumidores pueden leer el mismo stream independientemente (Enhanced Fan-Out). SQS no garantiza orden en Standard Queues, y un mensaje solo puede ser consumido por un consumidor.

Parquet vs JSON en S3

Elegido

Apache Parquet con compresión Snappy

Alternativas

  • JSON — más simple, legible, mayor costo de almacenamiento y queries en Athena
  • ORC — similar a Parquet en performance, menos popular en el ecosistema AWS
  • Avro — mejor para streaming/serialización, no tan eficiente para analytical queries

Razón

Parquet es columnar: Athena solo lee las columnas que necesita la query (si haces SELECT event_type, COUNT(*), solo lee la columna event_type, no todas). Compresión Snappy: ~10x menos espacio que JSON. Athena cobra por datos escaneados — Parquet reduce el costo dramáticamente. Para 1TB de eventos JSON, en Parquet son ~100GB → queries 10x más baratas.

Snippets de Código

Python — Productor de eventos a Kinesispython
import boto3
import json
import uuid
from datetime import datetime, timezone
from typing import Any

kinesis = boto3.client("kinesis", region_name="us-east-1")
STREAM_NAME = "ecommerce-events"


def publish_event(event_type: str, payload: dict[str, Any], user_id: str) -> None:
    """
    Publica un evento al stream de Kinesis.

    Args:
        event_type: Tipo de evento ('page_view', 'add_to_cart', 'purchase')
        payload: Datos del evento
        user_id: ID del usuario (usado como partition key para preservar orden)
    """
    record = {
        "eventId": str(uuid.uuid4()),
        "eventType": event_type,
        "userId": user_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "payload": payload,
        "metadata": {
            "source": "backend-api",
            "version": "1.0",
        },
    }

    kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(record),
        PartitionKey=user_id,  # mismo shard para el mismo usuario → orden garantizado
    )


def publish_batch(events: list[dict]) -> dict:
    """Publica hasta 500 eventos en un solo request."""
    records = [
        {
            "Data": json.dumps(event),
            "PartitionKey": event.get("userId", str(uuid.uuid4())),
        }
        for event in events
    ]

    response = kinesis.put_records(StreamName=STREAM_NAME, Records=records)

    failed = response.get("FailedRecordCount", 0)
    if failed > 0:
        print(f"WARNING: {failed} records failed to publish")

    return response
Python — Lambda Consumer: Kinesis → Parquet en S3python
import base64
import json
import boto3
import pyarrow as pa
import pyarrow.parquet as pq
import io
from datetime import datetime, timezone

s3 = boto3.client("s3")
OUTPUT_BUCKET = "my-data-lake"


def handler(event: dict, context) -> dict:
    """
    Consume records de Kinesis, transforma a Parquet y escribe en S3.
    BisectOnFunctionError: si falla, Kinesis divide el batch y reintenta.
    """
    records = []
    failed_items = []

    for i, record in enumerate(event["Records"]):
        try:
            # Kinesis codifica el data en base64
            raw = base64.b64decode(record["kinesis"]["data"])
            event_data = json.loads(raw)
            records.append(transform(event_data))
        except Exception as e:
            print(f"Failed to process record {i}: {e}")
            failed_items.append(
                {"itemIdentifier": record["kinesis"]["sequenceNumber"]}
            )

    if records:
        write_parquet_to_s3(records)

    return {"batchItemFailures": failed_items}


def transform(raw: dict) -> dict:
    """Normaliza y enriquece el evento."""
    ts = datetime.fromisoformat(raw["timestamp"])
    return {
        "event_id": raw["eventId"],
        "event_type": raw["eventType"],
        "user_id": raw["userId"],
        "ts": raw["timestamp"],
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
        "hour": ts.hour,
        # Flatten payload
        **raw.get("payload", {}),
    }


def write_parquet_to_s3(records: list[dict]) -> None:
    now = datetime.now(timezone.utc)
    # Particionado Hive-compatible para partition pruning en Athena
    key = (
        f"events/year={now.year}/month={now.month:02d}/"
        f"day={now.day:02d}/hour={now.hour:02d}/"
        f"part-{now.strftime('%H%M%S%f')}.parquet"
    )

    table = pa.Table.from_pylist(records)
    buffer = io.BytesIO()
    pq.write_table(table, buffer, compression="snappy")
    buffer.seek(0)

    s3.put_object(Bucket=OUTPUT_BUCKET, Key=key, Body=buffer)
    print(f"Wrote {len(records)} records to s3://{OUTPUT_BUCKET}/{key}")