This is part 1 of an ongoing series on building production-grade Python tools for data engineering. If you haven’t already, check out my earlier posts on Python metaclasses and data schemas — this post builds directly on those ideas.
Explore Part 2, Part 3 and Part 4 through these links.
The day everything broke
Imagine you have 50 data pipelines. They all read from the same upstream API. One morning you come in and half the dashboards are red. Alerts are firing. The on-call engineer is confused.
The cause? The API provider silently renamed a field overnight.
# What your pipelines expected
{ "price": 182.5 }
# What the API now sends
{ "close_price": 182.5 }
No error was thrown when the schema changed. No warning was sent. By the time anyone noticed, stale data had been flowing silently for hours.
This is not a hypothetical. It happens constantly in data teams — at startups and at companies like Airbnb, Netflix, and Uber. It’s why the concept of data contracts exists.
What is a data contract?
Think of it like a legal contract between two parties — except the parties are your data producer (an API, a Kafka topic, a database) and your data consumers (pipelines, ML models, dashboards).
The contract says: “I promise to send you data that looks like this. If I ever change it in a way that breaks you, I’ll tell you first.”
Without a contract, the producer can change anything at any time and your pipelines just… break. With a contract, breaking changes are detected automatically — before they reach production.
Here’s the analogy I find most useful:
Imagine you hire a delivery driver. The contract says: “deliver packages to 42 Oak Street.” The driver knows the address. Everything works.
Now the street gets renamed to “Oak Avenue.” The driver still has the old address. Packages pile up. Nobody knows why.
A data contract would have caught the rename before the first failed delivery.
Why I built this framework
I wanted something that would:
- Detect when a field was renamed (like price→close_price)
- Detect when a field was removed entirely
- Detect when a field’s type changed (string instead of float)
- Know which downstream consumers are affected and notify them automatically
- Do all of this with zero boilerplate for the person defining the schema
That last point is the important one. I didn’t want engineers to have to remember to call register_schema(MySchema) after every class definition. Humans forget. I wanted the framework to be impossible to opt out of accidentally.
The result is data-contracts — an open source Python framework I built to solve exactly this problem.
How it works — the 60-second version
You define your schema like this:
from data_contracts import ContractBase, SchemaDiff
from data_contracts.registry import SchemaRegistry
# Version 1 — what 50 pipelines consume today
class TradeSchemaV1(ContractBase):
    __version__ = "1.0.0"
    symbol: str
    price: float  # this field is about to be renamed
    volume: int

# Version 2 — what the API now sends
class TradeSchemaV2(ContractBase):
    __version__ = "2.0.0"
    symbol: str
    close_price: float  # renamed — BREAKING
    volume: int
    timestamp: str  # new field — safe
Then you run a diff:
v1 = SchemaRegistry.get_latest("TradeSchemaV1")
v2 = SchemaRegistry.get_latest("TradeSchemaV2")
report = SchemaDiff(v1, v2).generate_report()
report.print_report()
And the output tells you exactly what happened:
============================================================
Schema migration report: TradeSchemaV1
1.0.0 -> 2.0.0
============================================================
Status: BREAKING
Changes: 2 (breaking: 1, safe: 1)
Breaking changes:
[FIELD_RENAMED] price
'price' -> 'close_price'. Update all consumers.
Safe changes:
[FIELD_ADDED] timestamp
Consumers unaffected (additive change).
============================================================
Notice the framework detected that price wasn’t just deleted — it was renamed to close_price. That’s the rename heuristic at work: if a field disappears and a new field of the same type appears, it’s almost certainly a rename, not an independent removal and addition.
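To make the heuristic concrete, here is a minimal sketch of how such a diff could work, assuming each schema is reduced to a plain dict of field name to type. The function name diff_fields and the tuple-based output are hypothetical and are not the framework's actual SchemaDiff API.

```python
from typing import Dict, List, Tuple

Change = Tuple[str, str, bool]  # (kind, detail, is_breaking)


def diff_fields(old: Dict[str, type], new: Dict[str, type]) -> List[Change]:
    """Compare two field maps (hypothetical stand-in for SchemaDiff)."""
    changes: List[Change] = []
    removed = [name for name in old if name not in new]
    added = [name for name in new if name not in old]

    # Rename heuristic: pair each removed field with the first added
    # field of the same type that has not been claimed yet.
    for name in list(removed):
        match = next((a for a in added if new[a] is old[name]), None)
        if match is not None:
            changes.append(("FIELD_RENAMED", f"{name} -> {match}", True))
            removed.remove(name)
            added.remove(match)

    # Whatever is left over is a genuine removal or addition.
    for name in removed:
        changes.append(("FIELD_REMOVED", name, True))
    for name in added:
        changes.append(("FIELD_ADDED", name, False))

    # Fields present in both versions: a type change is breaking.
    for name in old:
        if name in new and old[name] is not new[name]:
            detail = f"{name}: {old[name].__name__} -> {new[name].__name__}"
            changes.append(("TYPE_CHANGED", detail, True))
    return changes


v1 = {"symbol": str, "price": float, "volume": int}
v2 = {"symbol": str, "close_price": float, "volume": int, "timestamp": str}
changes = diff_fields(v1, v2)
for kind, detail, breaking in changes:
    print(kind, detail, "BREAKING" if breaking else "safe")
```

A type-only match like this can pair the wrong fields when several fields of the same type change at once, so a rename reported this way is best treated as a strong hint to confirm rather than a certainty.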
The magic under the hood — and why it matters
Here’s what makes this framework different from just writing a validation function: you never call anything manually.
When you write class TradeSchema(ContractBase), the schema registers itself automatically. No register(TradeSchema) call. No configuration file. It just happens.
This works through a Python concept called a metaclass. If you want to understand exactly how that works under the hood, I wrote a deep-dive on Python metaclasses here. The short version is this:
In Python, when you define a class, something has to build that class object. Normally that’s Python’s built-in type. A metaclass lets you replace type with your own logic, so you can intercept class creation and do work automatically. It’s like having a supervisor who processes every new employee’s paperwork the moment they join, without the employee having to ask.
The framework uses the same pattern that Django’s ORM uses internally. When you write class Post(models.Model), Django’s metaclass intercepts it, reads your field definitions, and builds the SQL table mapping — all automatically.
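The self-registration idea can be sketched in a few lines. This is an illustration of the general pattern only: RegistryMeta, Base, and the class-level registry dict are hypothetical names, and the framework's real ContractBase and SchemaRegistry may be structured differently.

```python
from typing import Dict


class RegistryMeta(type):
    """Hypothetical metaclass that records every subclass it creates."""

    registry: Dict[str, type] = {}

    def __new__(mcls, name, bases, namespace):
        cls = super().__new__(mcls, name, bases, namespace)
        # Skip the root base class: none of its bases were built by
        # this metaclass, so it is not a user-defined schema.
        if any(isinstance(base, RegistryMeta) for base in bases):
            mcls.registry[name] = cls
        return cls


class Base(metaclass=RegistryMeta):
    """Stand-in for ContractBase in this sketch."""


class TradeSchema(Base):
    __version__ = "1.0.0"
    symbol: str
    price: float


# No register() call was made, yet the schema is already known.
print(sorted(RegistryMeta.registry))
```

Defining the class is the registration step, which is why there is nothing for an engineer to forget.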
Validating data in real time
Schema diffing is only half the picture. The framework also validates incoming data rows against the schema at runtime:
# Good data — passes
good_row = {"symbol": "AAPL", "price": 182.5, "volume": 1000}
result = TradeSchemaV1.validate(good_row)
print(result.summary())
# ✅ Validation passed
# Bad data — wrong type + missing field
bad_row = {"symbol": "AAPL", "price": "one-eighty"}
result = TradeSchemaV1.validate(bad_row)
print(result.summary())
# ❌ Validation failed (2 errors):
# • Missing required field: 'volume'
# • Field 'price': expected float, got str = 'one-eighty'
The key design decision here: extra fields (fields in the data that aren’t in the schema) produce a warning, not an error. That’s because new fields being added upstream is generally safe — your existing consumers just ignore them. Only removals, renames, and type changes are truly breaking.
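Here is a rough sketch of that error-versus-warning split, assuming the schema is a plain dict of field name to type. The names Result and validate_row are hypothetical, and the strict isinstance check is a simplification (for example, it rejects an int where a float is expected); the framework's own validate() may behave differently.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Result:
    """Hypothetical validation result: errors fail, warnings do not."""
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return not self.errors


def validate_row(schema: Dict[str, type], row: Dict[str, Any]) -> Result:
    result = Result()
    for name, expected in schema.items():
        if name not in row:
            result.errors.append(f"Missing required field: {name!r}")
        elif not isinstance(row[name], expected):
            got = type(row[name]).__name__
            result.errors.append(
                f"Field {name!r}: expected {expected.__name__}, got {got}"
            )
    # Extra fields are additive upstream changes: warn, do not fail.
    for name in row:
        if name not in schema:
            result.warnings.append(f"Unexpected field: {name!r}")
    return result


schema = {"symbol": str, "price": float, "volume": int}
extra = validate_row(
    schema, {"symbol": "AAPL", "price": 182.5, "volume": 1000, "exchange": "NASDAQ"}
)
bad = validate_row(schema, {"symbol": "AAPL", "price": "one-eighty"})
print(extra.ok, extra.warnings)
print(bad.ok, bad.errors)
```

The row with the extra exchange field still passes, while the row with a missing field and a wrong type does not.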
Notifying consumers automatically
The last piece: when a breaking change is detected, the right people need to know. The framework includes a notification bus — teams subscribe to schema names they care about, and get alerted when a breaking report lands.
from data_contracts.notifications import NotificationBus, Consumer
bus = NotificationBus()
bus.register(Consumer("Quant team", "quant@firm.com", ["TradeSchemaV1"]))
bus.register(Consumer("Risk system", "risk@firm.com", ["TradeSchemaV1"]))
bus.register(Consumer("ML pipeline", "ml@firm.com", ["TradeSchemaV1"]))
bus.notify(report)
# 📬 Notified Quant team: breaking change in 'TradeSchemaV1'
# 📬 Notified Risk system: breaking change in 'TradeSchemaV1'
# 📬 Notified ML pipeline: breaking change in 'TradeSchemaV1'
This follows the Observer pattern — the diff engine fires an event and has no idea who’s listening or what they’ll do. In production you’d swap the print() calls for Slack webhooks, PagerDuty alerts, or a Kafka publish. The pattern is the same regardless.
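One way to keep the delivery mechanism swappable is to give each subscriber its own handler callable. The sketch below assumes that design; Subscriber, Bus, and the handler signature are hypothetical and do not describe the framework's actual NotificationBus.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class Subscriber:
    name: str
    schemas: List[str]
    # Called with (subscriber name, schema name); could wrap a Slack
    # webhook, a pager, or a message queue in a real deployment.
    handler: Callable[[str, str], None]


class Bus:
    """Hypothetical observer-style bus: the publisher never sees handlers' internals."""

    def __init__(self) -> None:
        self._subscribers: List[Subscriber] = []

    def register(self, subscriber: Subscriber) -> None:
        self._subscribers.append(subscriber)

    def notify(self, schema_name: str, breaking: bool) -> int:
        """Deliver to interested subscribers; return how many were notified."""
        if not breaking:
            return 0
        delivered = 0
        for sub in self._subscribers:
            if schema_name in sub.schemas:
                sub.handler(sub.name, schema_name)
                delivered += 1
        return delivered


sent: List[Tuple[str, str]] = []
record = lambda who, schema: sent.append((who, schema))

bus = Bus()
bus.register(Subscriber("Quant team", ["TradeSchemaV1"], record))
bus.register(Subscriber("Billing", ["InvoiceSchema"], record))
count = bus.notify("TradeSchemaV1", breaking=True)
print(count, sent)
```

Only the team subscribed to TradeSchemaV1 is contacted, and replacing record with a different callable changes the channel without touching the bus.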
What I learned building this
The most valuable insight was about the difference between making something possible and making it automatic.
A validation function that you have to remember to call is only slightly better than nothing, because the moment your team is under pressure, someone skips it. A framework that makes validation impossible to skip is a different thing entirely.
That’s the design philosophy behind the metaclass approach: don’t rely on discipline. Build the guardrail into the floor.
The other thing this project forced me to understand deeply was the difference between breaking and safe changes:
| Change type | Breaking? | Why |
|---|---|---|
| price removed | Yes | Consumers reading price will raise a KeyError |
| price renamed to close_price | Yes | Same effect as removal from the consumer’s perspective |
| price type changed float → str | Yes | Arithmetic on a string either raises a TypeError or silently produces a wrong result |
| timestamp field added | No | Consumers that don’t use it simply ignore it |
The additive-only rule is something distributed systems engineers call forward compatibility — you can add things freely, but you can never take things away without coordination.
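The effect of each change type on a consumer is easy to reproduce with plain dicts. In this small illustration, notional is a hypothetical consumer function and the rows are made-up sample data.

```python
def notional(row):
    """Hypothetical consumer: trade value computed from price and volume."""
    return row["price"] * row["volume"]


cases = {
    "original": {"symbol": "AAPL", "price": 182.5, "volume": 2},
    "field_added": {"symbol": "AAPL", "price": 182.5, "volume": 2, "timestamp": "t0"},
    "renamed": {"symbol": "AAPL", "close_price": 182.5, "volume": 2},
    "type_changed": {"symbol": "AAPL", "price": "182.5", "volume": 2},
}

outcomes = {}
for label, row in cases.items():
    try:
        outcomes[label] = notional(row)
    except KeyError as exc:
        outcomes[label] = f"KeyError: {exc}"

for label, outcome in outcomes.items():
    print(f"{label:13} {outcome!r}")
```

The added field changes nothing, the rename fails loudly, and the type change is the worst case: multiplying a str by an int repeats the string, so the consumer gets a wrong value with no exception at all.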
Try it yourself
git clone https://github.com/devminda/data-contracts
cd data-contracts
pip install -e ".[dev]"
python examples/trade_pipeline.py
The full source is on GitHub. The examples/trade_pipeline.py file runs the exact scenario from this post — define two schemas, diff them, validate data, notify consumers — in about 60 lines of code.
In the next post I’ll walk through the internals: how the metaclass works, how the rename heuristic is implemented, and how to extend the notification bus to publish breaking changes to a Kafka topic.