Pipeline de Datos Serverless
Ingesta y procesamiento de datos en tiempo real con Kinesis + Lambda + S3 + Glue + Athena. 100% Terraform.
Problema & Solución
Problema
Un sistema de e-commerce necesita procesar eventos de usuario en tiempo real (clicks, views, purchases) para analytics. El volumen es de ~10,000 eventos/minuto con picos de 100,000/minuto. Los datos deben estar disponibles para consultas SQL ad-hoc en menos de 5 minutos desde que se generaron.
Solución
Pipeline event streaming con Kinesis Data Streams como ingestor de alta velocidad. Lambda consume el stream en micro-batches, transforma los eventos (normalización, enriquecimiento) y los escribe en S3 en formato Parquet particionado por fecha/hora. AWS Glue Crawler actualiza el Data Catalog automáticamente. Athena permite queries SQL serverless directamente sobre S3 sin ETL adicional. Todo el stack definido en Terraform.
Diagrama de Arquitectura
Productores de eventos:
┌──────────┐ ┌──────────┐ ┌──────────────┐
│ Frontend │ │ Backend │ │ Mobile App │
│ (clicks) │ │ (orders) │ │ (sessions) │
└────┬─────┘ └────┬─────┘ └──────┬───────┘
│ │ │ PutRecords (SDK)
└─────────────┴──────────────┘
│
▼
┌─────────────────────────────────────────┐
│ Kinesis Data Stream │
│ Shards: 2 (20,000 records/s capacity) │
│ Retention: 24 horas │
└──────────────────────┬──────────────────┘
│ Event Source Mapping
│ (BatchSize: 100, BisectOnError)
▼
┌────────────────────────────────────────────┐
│ Lambda (Python) — Transformer │
│ 1. Decodifica base64 │
│ 2. Parsea JSON │
│ 3. Valida schema (Pydantic) │
│ 4. Enriquece (geo-IP, user-agent parsing) │
│ 5. Convierte a Parquet (pyarrow) │
│ 6. Escribe a S3 particionado │
└──────────────────────┬─────────────────────┘
│ s3://data-lake/events/
│ year=2025/month=05/day=17/hour=14/
│ part-00001.parquet
▼
┌─────────────────────────────────────────────────┐
│ S3 Data Lake (Parquet, Snappy compression) │
│ Particionado: year/month/day/hour │
└──────────────────────┬──────────────────────────┘
│ (trigger: s3:ObjectCreated)
▼
┌──────────────────────────────┐
│ Glue Crawler (cada 15 min) │
│ → actualiza Data Catalog │
└──────────────────┬───────────┘
│
▼
┌──────────────────────────────────────────┐
│ Amazon Athena (SQL serverless) │
│ SELECT event_type, COUNT(*) as total │
│ FROM events │
│ WHERE year='2025' AND month='05' │
│ GROUP BY event_type │
│ → $0.005 por query típica │
└──────────────────────────────────────────┘Implementación
Ingesta con Kinesis Data Streams
Los productores (frontend, backend, mobile) envían eventos a Kinesis con PutRecords (batch de hasta 500 records). Cada record tiene un partition key (userId) que determina el shard destino — records del mismo usuario van al mismo shard, garantizando orden por usuario. El stream tiene 2 shards iniciales (20,000 records/s de ingesta, 4MB/s). Con Shard-level Enhanced Fan-Out, cada consumidor Lambda tiene su propio throughput de 2MB/s por shard.
Transformación en Lambda con batching
Lambda consume el stream con BisectOnFunctionError=true: si un batch falla, lo divide a la mitad y reintenta, aislando el record problemático. Cada record del stream contiene el evento en base64. Lambda: decodifica, valida con Pydantic (descarta eventos malformados con logging), enriquece (parsing de user-agent, geo-lookup si hay IP), agrupa 100 records y convierte a Parquet con pyarrow (columnar, compresión Snappy: ~10x menos espacio que JSON).
Escritura en S3 particionada
Los archivos Parquet se escriben con el esquema de particionado Hive-compatible: s3://data-lake/events/year=2025/month=05/day=17/hour=14/part-XXXXX.parquet. Este esquema permite a Athena hacer partition pruning — si se consulta solo el día de hoy, solo lee las particiones de hoy, no todo el dataset. Archivo típico: ~5MB Parquet = ~50MB JSON original.
Glue Crawler y Data Catalog
Glue Crawler escanea el bucket S3 cada 15 minutos (o se puede disparar vía Lambda al escribir nuevas particiones). Detecta el schema Parquet automáticamente y registra/actualiza la tabla en el Glue Data Catalog. Una vez en el catalog, Athena puede consultar los datos con SQL estándar sin ninguna configuración adicional.
Queries SQL con Athena
Athena ejecuta queries SQL directamente sobre los archivos Parquet en S3 sin necesidad de cargar datos en una base de datos. El costo es $5/TB de datos escaneados — con particionado y Parquet, una query típica escanea <1GB ($0.005). Los resultados se guardan en un bucket S3 de resultados y pueden consumirse desde cualquier herramienta BI (QuickSight, Tableau, Grafana con el plugin de Athena).
Tech Stack
Streaming & Ingesta
Procesamiento
Storage & Format
Analytics
IaC & CI/CD
Decisiones Técnicas
Kinesis vs SQS para ingesta de eventos
Elegido
Kinesis Data Streams
Alternativas
- —SQS — más simple, sin orden garantizado, sin replay, sin múltiples consumidores
- —Kinesis Data Firehose — más simple que Streams, sin replay, latencia mínima 60s
- —MSK (Kafka managed) — máxima flexibilidad y ecosistema, mucho más complejo y caro
Razón
Kinesis preserva el orden dentro de cada shard (eventos del mismo usuario van al mismo shard via partition key). Retiene datos 24h (configurable hasta 365 días) — permite replay de eventos. Múltiples consumidores pueden leer el mismo stream independientemente (Enhanced Fan-Out). SQS no garantiza orden en Standard Queues, y un mensaje solo puede ser consumido por un consumidor.
Parquet vs JSON en S3
Elegido
Apache Parquet con compresión Snappy
Alternativas
- —JSON — más simple, legible, mayor costo de almacenamiento y queries en Athena
- —ORC — similar a Parquet en performance, menos popular en el ecosistema AWS
- —Avro — mejor para streaming/serialización, no tan eficiente para analytical queries
Razón
Parquet es columnar: Athena solo lee las columnas que necesita la query (si haces SELECT event_type, COUNT(*), solo lee la columna event_type, no todas). Compresión Snappy: ~10x menos espacio que JSON. Athena cobra por datos escaneados — Parquet reduce el costo dramáticamente. Para 1TB de eventos JSON, en Parquet son ~100GB → queries 10x más baratas.
Snippets de Código
import boto3
import json
import uuid
from datetime import datetime, timezone
from typing import Any
kinesis = boto3.client("kinesis", region_name="us-east-1")
STREAM_NAME = "ecommerce-events"
def publish_event(event_type: str, payload: dict[str, Any], user_id: str) -> None:
"""
Publica un evento al stream de Kinesis.
Args:
event_type: Tipo de evento ('page_view', 'add_to_cart', 'purchase')
payload: Datos del evento
user_id: ID del usuario (usado como partition key para preservar orden)
"""
record = {
"eventId": str(uuid.uuid4()),
"eventType": event_type,
"userId": user_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"payload": payload,
"metadata": {
"source": "backend-api",
"version": "1.0",
},
}
kinesis.put_record(
StreamName=STREAM_NAME,
Data=json.dumps(record),
PartitionKey=user_id, # mismo shard para el mismo usuario → orden garantizado
)
def publish_batch(events: list[dict]) -> dict:
"""Publica hasta 500 eventos en un solo request."""
records = [
{
"Data": json.dumps(event),
"PartitionKey": event.get("userId", str(uuid.uuid4())),
}
for event in events
]
response = kinesis.put_records(StreamName=STREAM_NAME, Records=records)
failed = response.get("FailedRecordCount", 0)
if failed > 0:
print(f"WARNING: {failed} records failed to publish")
return responseimport base64
import json
import boto3
import pyarrow as pa
import pyarrow.parquet as pq
import io
from datetime import datetime, timezone
s3 = boto3.client("s3")
OUTPUT_BUCKET = "my-data-lake"
def handler(event: dict, context) -> dict:
"""
Consume records de Kinesis, transforma a Parquet y escribe en S3.
BisectOnFunctionError: si falla, Kinesis divide el batch y reintenta.
"""
records = []
failed_items = []
for i, record in enumerate(event["Records"]):
try:
# Kinesis codifica el data en base64
raw = base64.b64decode(record["kinesis"]["data"])
event_data = json.loads(raw)
records.append(transform(event_data))
except Exception as e:
print(f"Failed to process record {i}: {e}")
failed_items.append(
{"itemIdentifier": record["kinesis"]["sequenceNumber"]}
)
if records:
write_parquet_to_s3(records)
return {"batchItemFailures": failed_items}
def transform(raw: dict) -> dict:
"""Normaliza y enriquece el evento."""
ts = datetime.fromisoformat(raw["timestamp"])
return {
"event_id": raw["eventId"],
"event_type": raw["eventType"],
"user_id": raw["userId"],
"ts": raw["timestamp"],
"year": ts.year,
"month": ts.month,
"day": ts.day,
"hour": ts.hour,
# Flatten payload
**raw.get("payload", {}),
}
def write_parquet_to_s3(records: list[dict]) -> None:
now = datetime.now(timezone.utc)
# Particionado Hive-compatible para partition pruning en Athena
key = (
f"events/year={now.year}/month={now.month:02d}/"
f"day={now.day:02d}/hour={now.hour:02d}/"
f"part-{now.strftime('%H%M%S%f')}.parquet"
)
table = pa.Table.from_pylist(records)
buffer = io.BytesIO()
pq.write_table(table, buffer, compression="snappy")
buffer.seek(0)
s3.put_object(Bucket=OUTPUT_BUCKET, Key=key, Body=buffer)
print(f"Wrote {len(records)} records to s3://{OUTPUT_BUCKET}/{key}")