This is Part 3 of the data-contracts series. Part 1 introduced the problem. Part 2 explained how the metaclass, validation engine, and schema registry work. This post covers what happens when two schema versions don’t match — how breaking changes are detected, how renames are identified, and how the right people get notified automatically.
The engine is built. Now what?
In Part 2 we built the core of the framework: a metaclass that registers schemas automatically, a validation engine that type-checks incoming data, and a registry that stores every schema version by name.
But registration and validation only solve half the problem. The other half is this: what happens when a schema changes?
That’s what this post is about. We’ll look at three things:
- How SchemaDiff compares two schema versions and classifies every change
- How the rename heuristic detects that price became close_price
- How the NotificationBus alerts downstream consumers, and how Kafka fits in
Part 1: SchemaDiff — comparing two versions
The diff engine takes two SchemaVersion objects — snapshots from the registry — and produces a MigrationReport that tells you exactly what changed and whether any of it will break your consumers.
Here is the core of SchemaDiff.generate_report():
def generate_report(self) -> MigrationReport:
    old_fields = self.old.fields  # {"symbol": str, "price": float, "volume": int}
    new_fields = self.new.fields  # {"symbol": str, "close_price": float, "volume": int, "timestamp": str}

    old_keys = set(old_fields.keys())
    new_keys = set(new_fields.keys())

    removed = old_keys - new_keys  # in old, not in new
    added = new_keys - old_keys    # in new, not in old
    common = old_keys & new_keys   # in both

    # run rename heuristic first: it claims fields from removed + added
    renames = self._detect_renames(removed, added, old_fields, new_fields)
    renamed_old = {c.old_value for c in renames}
    renamed_new = {c.new_value for c in renames}
    changes = [*renames]

    # remaining removals (not part of a rename)
    for f in removed - renamed_old:
        changes.append(FieldChange(
            change_type=ChangeType.FIELD_REMOVED, field_name=f, ...
        ))

    # remaining additions (not the target of a rename)
    for f in added - renamed_new:
        changes.append(FieldChange(
            change_type=ChangeType.FIELD_ADDED, field_name=f, ...
        ))

    # type changes on fields that exist in both versions
    for f in common:
        if old_fields[f] != new_fields[f]:
            changes.append(FieldChange(
                change_type=ChangeType.TYPE_CHANGED, field_name=f, ...
            ))

    return MigrationReport(
        schema_name=self.old.name,
        old_version=self.old.version,
        new_version=self.new.version,
        changes=changes,
    )
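The set arithmetic at the top of generate_report() can be tried in isolation. This is a minimal sketch that uses plain dicts in place of SchemaVersion.fields (an assumption for illustration); the field names are the TradeSchema ones used throughout this post.

```python
# Plain dicts stand in for SchemaVersion.fields (assumption for illustration).
old_fields = {"symbol": str, "price": float, "volume": int}
new_fields = {"symbol": str, "close_price": float, "volume": int, "timestamp": str}

old_keys = set(old_fields)
new_keys = set(new_fields)

removed = old_keys - new_keys  # only in the old version
added = new_keys - old_keys    # only in the new version
common = old_keys & new_keys   # present in both versions

# Fields present in both versions whose declared type differs.
type_changed = {f for f in common if old_fields[f] != new_fields[f]}

print(sorted(removed))       # ['price']
print(sorted(added))         # ['close_price', 'timestamp']
print(sorted(common))        # ['symbol', 'volume']
print(sorted(type_changed))  # []
```

Everything that follows in the diff engine is a classification of these three sets.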
The four change types
Every change the diff engine produces falls into one of four categories. Three are breaking. One is safe. Understanding why is more important than memorising which is which.
| Change type | Breaking? | Why |
|---|---|---|
| FIELD_REMOVED | Yes | Any consumer reading that field will get a KeyError or None |
| FIELD_RENAMED | Yes | From the consumer's perspective, the old name disappeared, which has the same effect as removal |
| TYPE_CHANGED | Yes | Code expecting a float and receiving a str will raise a TypeError at the first arithmetic operation, or silently produce wrong results (str * int repeats the string) |
| FIELD_ADDED | No | Consumers that don't use the new field simply ignore it; they never asked for it |
The principle behind this table comes from distributed systems design, specifically forward compatibility: code written against the old schema should keep working when it reads data produced under the new one. The rule is: you can always add. You can never take away without coordination.
Imagine a restaurant menu. Adding a new dish is safe — diners who don’t want it just don’t order it. But removing a dish that regulars depend on, or renaming it without telling anyone, means people show up expecting something that no longer exists. Schema changes work the same way.
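To make the table concrete, here is a small sketch of a consumer written against the old schema and fed rows shaped like the new one. The notional() function and the sample rows are hypothetical and not part of the framework.

```python
def notional(row: dict):
    """A consumer written against v1: reads 'price' and 'volume'."""
    return row["price"] * row["volume"]


v1_row = {"symbol": "AAPL", "price": 190.5, "volume": 10}

# FIELD_ADDED: the extra key is never read, so the result is unchanged.
added_row = {**v1_row, "timestamp": "2024-01-02T09:30:00"}
assert notional(added_row) == notional(v1_row)

# FIELD_RENAMED / FIELD_REMOVED: the key the consumer reads is gone.
renamed_row = {"symbol": "AAPL", "close_price": 190.5, "volume": 10}
try:
    notional(renamed_row)
    outcome = "ok"
except KeyError as exc:
    outcome = f"KeyError: {exc}"
print(outcome)  # KeyError: 'price'

# TYPE_CHANGED: str * int is valid Python, so nothing raises here.
retyped_row = {"symbol": "AAPL", "price": "190.5", "volume": 2}
silent_result = notional(retyped_row)
print(silent_result)  # 190.5190.5
```

The last case is the uncomfortable one: no exception, just a wrong value flowing downstream.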
The MigrationReport — computed fields with __post_init__
The report doesn’t just store raw changes. It automatically splits them into breaking_changes and safe_changes, and sets an is_breaking flag. This happens in __post_init__ — a special method that dataclasses call immediately after the generated __init__ runs:
from dataclasses import dataclass, field


@dataclass
class MigrationReport:
    schema_name: str
    old_version: str
    new_version: str
    changes: list[FieldChange] = field(default_factory=list)

    # these are computed in __post_init__; any value the caller passes is overwritten
    breaking_changes: list[FieldChange] = field(default_factory=list)
    safe_changes: list[FieldChange] = field(default_factory=list)
    is_breaking: bool = False

    def __post_init__(self):
        self.breaking_changes = [c for c in self.changes if c.is_breaking]
        self.safe_changes = [c for c in self.changes if not c.is_breaking]
        self.is_breaking = len(self.breaking_changes) > 0
The caller passes in a flat list of changes. The report sorts them, sets the flag, and keeps everything consistent. Whatever else the caller passes, __post_init__ recomputes the derived fields from changes, so a freshly constructed report can never have is_breaking set to False while breaking_changes is non-empty. The one caveat: mutating changes after construction does not recompute them, so treat a report as read-only once it is built.
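If you want the derived fields to be impossible to pass in at all, dataclasses support that through field(init=False). The sketch below is a variation under that assumption, not the repository's code; ChangeType and FieldChange are simplified stand-ins, and the is_breaking property is a guess at how the real FieldChange decides.

```python
from dataclasses import dataclass, field
from enum import Enum


class ChangeType(Enum):
    FIELD_REMOVED = "FIELD_REMOVED"
    FIELD_RENAMED = "FIELD_RENAMED"
    TYPE_CHANGED = "TYPE_CHANGED"
    FIELD_ADDED = "FIELD_ADDED"


@dataclass
class FieldChange:
    # Simplified stand-in: the real FieldChange carries more attributes.
    change_type: ChangeType
    field_name: str

    @property
    def is_breaking(self) -> bool:
        # Only additive changes are treated as safe.
        return self.change_type is not ChangeType.FIELD_ADDED


@dataclass
class MigrationReport:
    schema_name: str
    old_version: str
    new_version: str
    changes: list[FieldChange] = field(default_factory=list)
    # init=False removes these from __init__, so callers cannot pass them.
    breaking_changes: list[FieldChange] = field(init=False, default_factory=list)
    safe_changes: list[FieldChange] = field(init=False, default_factory=list)
    is_breaking: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        self.breaking_changes = [c for c in self.changes if c.is_breaking]
        self.safe_changes = [c for c in self.changes if not c.is_breaking]
        self.is_breaking = bool(self.breaking_changes)


report = MigrationReport(
    "TradeSchemaV1", "1.0.0", "2.0.0",
    changes=[
        FieldChange(ChangeType.FIELD_RENAMED, "price"),
        FieldChange(ChangeType.FIELD_ADDED, "timestamp"),
    ],
)
print(report.is_breaking)                               # True
print([c.field_name for c in report.breaking_changes])  # ['price']
print([c.field_name for c in report.safe_changes])      # ['timestamp']
```

With init=False, calling MigrationReport(..., is_breaking=True) raises a TypeError instead of being silently overwritten.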
Seeing it in action
v1 = SchemaRegistry.get_latest("TradeSchemaV1")
v2 = SchemaRegistry.get_latest("TradeSchemaV2")
report = SchemaDiff(v1, v2).generate_report()
report.print_report()
============================================================
Schema migration report: TradeSchemaV1
1.0.0 -> 2.0.0
============================================================
Status: BREAKING
Changes: 2 (breaking: 1, safe: 1)
Breaking changes:
[FIELD_RENAMED] price
'price' -> 'close_price'. Update all consumers.
Safe changes:
[FIELD_ADDED] timestamp
Consumers unaffected (additive change).
============================================================
Part 2: The rename heuristic
A naive diff would look at the schema above and report two changes: price was removed (breaking), and close_price was added (safe). Technically correct — but not very useful.
What the engineer actually needs to know is: “this field was renamed. Update your consumer to use the new name.” That’s a completely different instruction from “a field was removed and an unrelated field was added.”
The rename heuristic detects this automatically:
If a field disappears from the schema AND a new field appears with exactly the same type — and there is only one such candidate — it’s almost certainly a rename, not an independent removal and addition.
def _detect_renames(self, removed, added, old_fields, new_fields):
    renames = []
    matched_added = set()

    for old_f in removed:
        old_type = old_fields[old_f]

        # find new fields with the same type, not already matched
        candidates = [
            f for f in added
            if f not in matched_added
            and new_fields[f] == old_type
        ]

        # only fire when there's exactly ONE match; ambiguity means no rename
        if len(candidates) == 1:
            new_f = candidates[0]
            renames.append(FieldChange(
                change_type=ChangeType.FIELD_RENAMED,
                field_name=old_f,
                old_value=old_f,
                new_value=new_f,
                description=f"'{old_f}' -> '{new_f}'. Update all consumers."
            ))
            matched_added.add(new_f)  # prevent matching again

    return renames
Walking through the example step by step
Let’s trace this with the TradeSchema case:
# Old fields: {"symbol": str, "price": float, "volume": int}
# New fields: {"symbol": str, "close_price": float, "volume": int, "timestamp": str}
removed = {"price"} # in old, not in new
added = {"close_price", "timestamp"} # in new, not in old
# Iteration 1: old_f = "price", old_type = float
# candidates = fields in added where type == float
# "close_price" is float ✓
# "timestamp" is str ✗
# candidates = ["close_price"] — exactly one match
# → FIELD_RENAMED: price -> close_price
# matched_added = {"close_price"}
# After rename detection:
# renamed_old = {"price"} → removed - renamed_old = set() (nothing left)
# renamed_new = {"close_price"} → added - renamed_new = {"timestamp"}
# "timestamp" falls through to FIELD_ADDED (safe)
The ambiguity guard — why len(candidates) == 1 matters
What if two fields of the same type disappear and two appear?
# Old: bid: float, ask: float
# New: bid_price: float, ask_price: float
# For "bid" (float), candidates = ["bid_price", "ask_price"] — two matches
# We can't know which is which. So we stay silent.
# Both "bid" and "ask" are reported as FIELD_REMOVED (breaking).
# Both "bid_price" and "ask_price" are reported as FIELD_ADDED (safe).
A wrong rename hint is worse than no hint at all. If the heuristic guessed bid → ask_price, it would send engineers down the wrong path entirely. Staying silent and reporting the raw changes is the safer outcome.
The matched_added set prevents a second problem: one new field being matched to multiple old fields. Once close_price is claimed by price, it’s off the table for every subsequent iteration.
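Both behaviours can be checked with a standalone version of the heuristic. This is a sketch on plain dicts, not the method from the repository: detect_renames is a hypothetical free function that returns a simple old-to-new mapping instead of FieldChange objects, and it sorts the field names only so that the run is deterministic.

```python
def detect_renames(old_fields: dict, new_fields: dict) -> dict:
    """Return {old_name: new_name} for unambiguous type-matched pairs."""
    removed = old_fields.keys() - new_fields.keys()
    added = new_fields.keys() - old_fields.keys()
    renames = {}
    matched_added = set()
    for old_f in sorted(removed):  # sorted only to make the run deterministic
        candidates = [
            f for f in sorted(added)
            if f not in matched_added and new_fields[f] == old_fields[old_f]
        ]
        if len(candidates) == 1:  # ambiguity means no rename
            renames[old_f] = candidates[0]
            matched_added.add(candidates[0])
    return renames


# Unique float candidate: reported as a rename.
print(detect_renames(
    {"symbol": str, "price": float, "volume": int},
    {"symbol": str, "close_price": float, "volume": int, "timestamp": str},
))  # {'price': 'close_price'}

# Two float candidates for each removed field: nothing is reported.
print(detect_renames(
    {"bid": float, "ask": float},
    {"bid_price": float, "ask_price": float},
))  # {}
```

One case worth knowing about: the guard only counts candidates on the added side. With two removed floats and a single added float, whichever removed field is visited first claims it. In this sketch that is decided by sort order; in a loop over a set it depends on iteration order.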
Part 3: The NotificationBus — who needs to know?
Detecting a breaking change is only useful if the right people find out about it. That’s the job of the NotificationBus.
The design follows the Observer pattern: consumers subscribe to schema names they care about. When a breaking report lands, the bus iterates every subscriber and calls them. The diff engine — the thing that detected the change — has no idea who the consumers are or what they’ll do when notified. The two sides are completely decoupled.
class NotificationBus:
    def __init__(self):
        self._consumers = []
        self._log = []

    def register(self, consumer: Consumer) -> None:
        self._consumers.append(consumer)

    def notify(self, report: MigrationReport) -> None:
        if not report.is_breaking:
            return  # safe changes don't trigger notifications

        for consumer in self._consumers:
            if report.schema_name in consumer.subscribed_schemas:
                self._log.append({...})
                print(
                    f"Notified {consumer.name}: breaking change in '{report.schema_name}'"
                )
Notice that notify() returns immediately if the report isn’t breaking. Safe changes — new fields being added — don’t generate noise. Consumers only hear about things that will actually break them.
Here’s the bus in action:
bus = NotificationBus()
bus.register(Consumer("Quant team", "quant@firm.com", ["TradeSchemaV1"]))
bus.register(Consumer("Risk system", "risk@firm.com", ["TradeSchemaV1"]))
bus.register(Consumer("ML pipeline", "ml@firm.com", ["TradeSchemaV1"]))
bus.register(Consumer("Reporting svc", "bi@firm.com", ["PriceSchema"])) # different schema
bus.notify(report)
# Notified Quant team: breaking change in 'TradeSchemaV1'
# Notified Risk system: breaking change in 'TradeSchemaV1'
# Notified ML pipeline: breaking change in 'TradeSchemaV1'
# (Reporting svc is NOT notified — subscribed to a different schema)
The reporting service is subscribed to PriceSchema, not TradeSchemaV1. It never receives the notification. Only the teams that actually consume the changed schema are disturbed. This is the Observer pattern working exactly as intended — targeted, scoped, and quiet to everyone else.
Extending to Kafka
Right now the bus calls print(). In a production data platform, you almost certainly want breaking change events to flow through the same infrastructure as everything else — which usually means Kafka.
The idea is straightforward: instead of printing to stdout, the bus publishes a message to a Kafka topic — something like schema.breaking-changes. Every downstream team subscribes to that topic using their own consumer group. When a message lands, their alerting system routes it to Slack, PagerDuty, email, or wherever their on-call lives.
# Conceptual shape: the notification handler as a strategy
import json
from datetime import datetime, timezone

from kafka import KafkaProducer  # assumes the kafka-python client


class KafkaNotificationHandler:
    def __init__(self, bootstrap_servers, topic):
        self.topic = topic
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def send(self, report: MigrationReport) -> None:
        message = {
            "schema": report.schema_name,
            "from_version": report.old_version,
            "to_version": report.new_version,
            "breaking_fields": [c.field_name for c in report.breaking_changes],
            # timezone-aware UTC so consumers in other regions can compare times
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        # the producer is given bytes here, so no value_serializer is needed
        self.producer.send(
            self.topic,
            json.dumps(message).encode("utf-8")
        )
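The payload shape can be exercised without a broker. The sketch below builds the same dictionary, encodes it to bytes the way send() does, and decodes it the way a downstream consumer might. The function names encode_event and decode_event are hypothetical, and the UTC timestamp is an assumption.

```python
import json
from datetime import datetime, timezone


def encode_event(schema, old_version, new_version, breaking_fields) -> bytes:
    """Serialize a breaking-change event to UTF-8 JSON bytes."""
    message = {
        "schema": schema,
        "from_version": old_version,
        "to_version": new_version,
        "breaking_fields": list(breaking_fields),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(message).encode("utf-8")


def decode_event(raw: bytes) -> dict:
    """What a consumer would do with the bytes it reads from the topic."""
    return json.loads(raw.decode("utf-8"))


event = decode_event(encode_event("TradeSchemaV1", "1.0.0", "2.0.0", ["price"]))
print(event["schema"], event["breaking_fields"])  # TradeSchemaV1 ['price']
```

Keeping the message to plain JSON types means any team can consume it without importing the framework.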
The key design insight here is the Strategy pattern. The NotificationBus shouldn’t care whether it’s talking to Kafka, Slack, PagerDuty, or a log file. You define a common send(report) interface, and the concrete implementation is swapped at configuration time — not hardcoded into the bus.
This also makes testing trivial: in your test suite you inject a MockHandler that appends calls to a list instead of hitting a real Kafka cluster. The bus doesn’t know the difference. We’ll see exactly that pattern in Part 4.
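As a rough preview of that idea, here is a minimal sketch of a bus with an injected handler. All names are assumptions for illustration: Report is a cut-down stand-in for MigrationReport, NotificationHandler is a hypothetical Protocol, and ListHandler plays the role of the mock. Subscriber filtering is left out to keep the focus on the handler swap.

```python
from dataclasses import dataclass, field
from typing import Protocol


@dataclass
class Report:
    # Stand-in for MigrationReport with only what the bus needs.
    schema_name: str
    is_breaking: bool


class NotificationHandler(Protocol):
    def send(self, report: Report) -> None: ...


@dataclass
class ListHandler:
    """Test double: records reports instead of sending them anywhere."""
    sent: list = field(default_factory=list)

    def send(self, report: Report) -> None:
        self.sent.append(report)


class NotificationBus:
    def __init__(self, handler: NotificationHandler):
        self._handler = handler  # injected, so the bus never names Kafka

    def notify(self, report: Report) -> None:
        if report.is_breaking:
            self._handler.send(report)


handler = ListHandler()
bus = NotificationBus(handler)
bus.notify(Report("TradeSchemaV1", is_breaking=True))
bus.notify(Report("TradeSchemaV1", is_breaking=False))  # safe: not forwarded
print(len(handler.sent))  # 1
```

Swapping ListHandler for a Kafka-backed handler changes one constructor argument and nothing inside the bus.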
The full picture — end to end
Across three posts, we’ve now built the complete lifecycle. Here it is from first line of code to consumer notification:
- Define time: class TradeSchema(ContractBase) triggers ContractMeta.__new__. Fields are extracted, a SchemaVersion is registered, and validate() is injected as a closure.
- Validation: Incoming data rows are checked with TradeSchema.validate(row). Missing fields and type mismatches are errors. Extra fields are warnings. int → float coercion passes silently.
- Change detected: A new schema version is defined. SchemaDiff pulls both versions from the registry and runs generate_report(). Set arithmetic produces removed, added, and common field sets.
- Rename heuristic: Before classifying removals and additions, the heuristic looks for type-matched pairs. Unambiguous matches are flagged as FIELD_RENAMED and consumed from the sets. Everything else is classified normally.
- Report assembled: MigrationReport.__post_init__ splits changes into breaking and safe lists and sets is_breaking automatically.
- Consumers notified: NotificationBus.notify(report) iterates subscribers, matches on schema name, and calls each handler. In production that handler publishes to a Kafka topic. In tests it writes to a list.
Try it yourself
git clone https://github.com/devminda/data-contracts
cd data-contracts
pip install -e ".[dev]"
python examples/trade_pipeline.py # runs the full end-to-end scenario
The examples/trade_pipeline.py file runs every concept from this post in about 60 lines — define two schema versions, validate data, diff the schemas, and notify consumers. It’s the fastest way to see it all working together.
What’s next — Part 4: testing everything
Part 4 is dedicated entirely to testing. There are some genuinely interesting challenges here that you don’t see in typical Python testing tutorials — how do you test a metaclass? How do you test code that runs at import time? How do you prevent global registry state from leaking between tests?
We’ll cover autouse fixtures, parametrized tests for every change type, testing the rename heuristic’s edge cases, and how to write tests that verify the absence of a false positive — which is harder than it sounds.