Examples & Tutorials
This section provides end-to-end examples of using Luminara for various data processing tasks.
1. Basic ETL: CSV to JSON
A simple batch pipeline that reads a CSV file, parses the data, and writes it to a JSON Lines file.
import asyncio
import csv
import json
from luminara import Pipeline, stages
class CSVSource(stages.Source):
    """Source stage that yields each row of a CSV file as a dict.

    The first row of the file is used as the header (``csv.DictReader``).
    Reading is done with blocking file I/O, which is fine for a small
    batch example.
    """

    def __init__(self, filepath):
        # Path of the CSV file to read.
        self.filepath = filepath

    async def read(self):
        # newline="" is required by the csv module so that newlines embedded
        # in quoted fields are parsed correctly; encoding is pinned so the
        # result does not depend on the platform default.
        with open(self.filepath, newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                yield row
class JSONSink(stages.Sink):
    """Sink stage that appends each record as one line of JSON (JSON Lines).

    The file is re-opened in append mode for every record, which keeps the
    example simple. Note that repeated runs keep appending to the same file.
    """

    def __init__(self, filepath):
        # Path of the output .jsonl file.
        self.filepath = filepath

    async def write(self, record):
        # Explicit UTF-8 avoids platform-dependent default encodings.
        with open(self.filepath, 'a', encoding="utf-8") as f:
            f.write(json.dumps(record) + '\n')
async def main():
    """Wire the CSV source to the JSON Lines sink and run the batch job."""
    source = CSVSource('data.csv')
    sink = JSONSink('output.jsonl')
    pipeline = Pipeline(source=source, sink=sink)
    await pipeline.run()
# Script entry point: start the asyncio event loop and run the pipeline.
if __name__ == "__main__":
    asyncio.run(main())
2. Streaming Data Processing
This example demonstrates how to consume a continuous stream of data from a WebSocket API and process it in real-time.
import aiohttp
from luminara import Pipeline, stages
class WebSocketSource(stages.Source):
    """Source stage that yields JSON-decoded text messages from a WebSocket feed."""

    async def read(self):
        # Imported locally so this snippet is self-contained: the original
        # example called json.loads without importing json (NameError).
        import json

        async with aiohttp.ClientSession() as session:
            async with session.ws_connect('wss://api.example.com/feed') as ws:
                async for msg in ws:
                    # Skip non-text frames (binary, ping/pong, close, error).
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        yield json.loads(msg.data)
class FilterTrades(stages.Transform):
    """Transform that keeps only trades above $1000."""

    async def process(self, record):
        # Returning None tells the pipeline to drop the record.
        return record if record['amount'] > 1000 else None
async def main():
    """Stream trades from the WebSocket feed, filter them, and print results."""
    transforms = [FilterTrades()]
    pipeline = Pipeline(
        source=WebSocketSource(),
        stages=transforms,
        sink=stages.ConsoleSink(),  # Built-in sink that prints each record
    )
    # run_forever keeps consuming the stream until interrupted.
    await pipeline.run_forever()
3. Creating a Custom Plugin
Luminara is designed to be extensible. You can easily create reusable plugins by subclassing the core stage types.
from luminara.stages import Transform
import hashlib
class HashField(Transform):
    """
    A reusable transform that hashes a specific field using SHA256.

    Adds a new ``<field>_hash`` key to the record; records that do not
    contain the field pass through unchanged.
    """

    def __init__(self, field_name: str, salt: str = ""):
        # Field to hash, plus an optional salt appended before hashing.
        self.field = field_name
        self.salt = salt

    async def process(self, record: dict) -> dict:
        # Guard clause: nothing to do when the field is absent.
        if self.field not in record:
            return record
        salted = str(record[self.field]) + self.salt
        record[f"{self.field}_hash"] = hashlib.sha256(salted.encode()).hexdigest()
        return record
# Usage
# pipeline = Pipeline(..., stages=[HashField("email", salt="s3cr3t")], ...)
4. Error Handling Patterns
Handling errors gracefully is critical in data pipelines. Luminara provides a mechanism to redirect failed records to a "Dead Letter Queue" (DLQ).
from luminara import Pipeline
from luminara.stages import DropRecord, FileSink, Transform
class RiskyTransform(Transform):
    """Transform whose processing can fail; failed records go to the DLQ.

    Demonstrates the dead-letter pattern: tag the failing record with the
    error message, then raise ``DropRecord`` so the pipeline routes it to
    the configured ``dlq_sink``.
    """

    async def process(self, record):
        try:
            # Simulate a risky operation (may raise ZeroDivisionError / KeyError).
            record['value'] = 100 / record['divisor']
        except Exception as e:  # broad on purpose: any failure goes to the DLQ
            # Tag the record with the error for later inspection.
            record['error'] = str(e)
            # Route to DLQ (handled by pipeline config). Note: the original
            # snippet used ``stages.DropRecord``, but ``stages`` was never
            # imported here; DropRecord is imported directly instead.
            raise DropRecord(reason="calculation_error", record=record) from e
        else:
            return record
async def main():
    """Run a pipeline that writes failed records to errors.jsonl."""
    dead_letters = FileSink("errors.jsonl")
    pipeline = Pipeline(
        source=...,
        stages=[RiskyTransform()],
        sink=...,
        dlq_sink=dead_letters,  # Failures go here
    )
    await pipeline.run()