Building Robust Data Pipelines with Metadata: A Python MCP Pattern Demo
Data pipelines are the backbone of modern data systems, but they often become complex and brittle. A common pain point is managing state and passing context between different processing stages. Relying on cryptic filename conventions or implicit directory structures often leads to errors, difficult debugging, and maintenance headaches.
What if there was a better way? What if we could explicitly pass instructions and track the state of our data directly alongside it? This is where metadata-driven pipelines come in.
This post explores a pattern for building more robust pipelines using explicit metadata files for context passing. It showcases a simple Python project demonstrating this technique, inspired by the Model Context Protocol (MCP) concept, to manage a file validation and loading workflow.
Find the complete project code on GitHub: https://github.com/scriptstar/metadata-driven-mcp-pipeline
The Problem: Fragile Pipeline Choreography
Imagine a typical scenario:
- Files arrive from different departments (Sales, Marketing).
- They need different validation rules applied.
- Validated files need to be loaded into different target locations (e.g., database tables).
Traditional approaches might involve:
- Filename Parsing: `SALES_Leads_US_20231101_validated.csv`. Prone to typos and hard to extend.
- Directory Watching: Moving files between `/pending/`, `/validated/`, and `/failed/`. State management becomes complex and race conditions are possible.
- Implicit Knowledge: Scripts "just know" that files starting with "SALES_" go to the sales table. This breaks as soon as requirements change.
These methods lack clear traceability and make debugging a nightmare when something inevitably goes wrong. Where did this file come from? What rules should have been applied? Why did it end up here?
The Solution: Metadata-Driven Context Passing
The core idea is simple: separate the context about the data from the data itself, and pass this context explicitly between pipeline stages.
We achieve this using a metadata sidecar file. For every data file (e.g., `sales_data_xyz.csv`), we create a corresponding JSON file (`sales_data_xyz.csv.mcp.json`) that acts as its passport or manifest.
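Because the sidecar name is derived mechanically from the data file name, no stage ever has to parse meaning out of the filename itself. A minimal sketch of the convention (the helper name `mcp_path_for` is mine, not the project's):

```python
from pathlib import Path

MCP_SUFFIX = ".mcp.json"  # sidecar naming convention used throughout this post

def mcp_path_for(data_path: Path) -> Path:
    """Return the sidecar metadata path that travels with a data file."""
    return data_path.with_name(data_path.name + MCP_SUFFIX)

# mcp_path_for(Path("incoming/sales_data_xyz.csv"))
# -> incoming/sales_data_xyz.csv.mcp.json
```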
Our "MCP" Implementation: The JSON Context File
Inspired by the concept of Model Context Protocols (providing context around an ML model), we use a structured JSON file. Note that this is a project-specific convention, not a formal, standardised I/O protocol. It's a pattern for information exchange.
Key fields in our `.mcp.json` file include:
- `job_id`: A unique ID for this specific file's journey through the pipeline.
- `source_context`: Information about the origin (e.g., `department: "Sales"`).
- `processing_directives`: Instructions for downstream tasks (e.g., `validation_ruleset_id: "SALES_LEADS_V1"`, `load_target_destination: "sales_leads_table"`).
- `status_info`: Tracks the `current_status` ("Uploaded", "Validated", "Loaded", "Failed", etc.) and maintains a `status_history` log.
- `current_*_filepath`: Explicitly stores the current paths of the data and MCP files as they move.
```
// Example MCP Structure (abbreviated)
{
  "mcp_version": "1.0",
  "job_id": "abc-123",
  "current_data_filepath": "/path/to/incoming/sales_data_xyz.csv",
  "current_mcp_filepath": "/path/to/incoming/sales_data_xyz.csv.mcp.json",
  "source_context": { "department": "Sales", "file_type": "leads" },
  "processing_directives": {
    "validation_ruleset_id": "SALES_LEADS_V1",
    "load_target_destination": "sales_leads_table"
  },
  "status_info": {
    "current_status": "Uploaded",
    "status_history": [ /* ... */ ],
    "error_message": null
  }
}
```
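Every stage's interaction with this file boils down to read, update, write. Below is a minimal sketch of such helpers, assuming the structure above; the project keeps its own versions of these utilities in `scripts/common_utils.py`, which will differ in detail:

```python
import json
from datetime import datetime, timezone
from pathlib import Path

def load_mcp(mcp_path: Path) -> dict:
    """Read the sidecar metadata file into a dict."""
    return json.loads(mcp_path.read_text(encoding="utf-8"))

def save_mcp(mcp: dict, mcp_path: Path) -> None:
    """Write the metadata back, pretty-printed for easy inspection."""
    mcp_path.write_text(json.dumps(mcp, indent=2), encoding="utf-8")

def update_status(mcp: dict, new_status: str, message: str | None = None) -> None:
    """Set current_status and append an entry to the status_history log."""
    info = mcp.setdefault("status_info", {})
    info["current_status"] = new_status
    info.setdefault("status_history", []).append({
        "status": new_status,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "message": message,
    })
```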
Showcasing the Project
To see this pattern in action, I built a simple Python project simulating a file processing workflow:
Project Structure Overview
```
metadata-driven-mcp-pipeline/
├── archive/                  # Final resting place for processed files
│   ├── failed/               # Files that failed validation or loading
│   └── success/              # Files that were successfully processed and loaded
├── config/                   # Configuration files
│   └── validation_rules.json # Defines required CSV columns for different rulesets
├── incoming/                 # Landing zone for newly "uploaded" files (data + MCP)
├── processing_loading/       # Staging area for files that passed validation, awaiting loading
└── scripts/                  # Python scripts driving the workflow
    ├── __init__.py           # Makes 'scripts' a Python package
    ├── common_utils.py       # Helper functions for MCP I/O, status updates, file moves
    ├── create_upload.py      # Simulates file upload, creates initial data + MCP
    ├── data_generator.py     # Generates dummy CSV data files
    ├── load_files.py         # Performs the (simulated) loading stage
    └── validate_files.py     # Performs the validation stage
```
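For reference, `config/validation_rules.json` maps each `validation_ruleset_id` to the CSV columns that ruleset requires. The ruleset names and the `required_columns` key below are illustrative assumptions, not copied from the repository:

```json
{
  "SALES_LEADS_V1": {
    "required_columns": ["lead_id", "company", "contact_email", "region"]
  },
  "MARKETING_CAMPAIGNS_V1": {
    "required_columns": ["campaign_id", "channel", "start_date", "budget"]
  }
}
```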
Workflow Stages & MCP Interaction
1. Upload (`create_upload.py`):
   - Simulates an upload (e.g., from "Sales").
   - Creates a dummy data CSV (`data_generator.py`).
   - Generates the initial `.mcp.json` file in `incoming/`.
   - Crucially, it populates the `source_context` and determines the correct `processing_directives` (like `validation_ruleset_id`) based on the source department.
   - Sets the initial `current_status` to "Uploaded" and logs this in `status_history`.

2. Validation (`validate_files.py`):
   - Scans `incoming/` for MCP files with `current_status == "Uploaded"`.
   - Reads the `validation_ruleset_id` from the MCP file.
   - Loads the corresponding rules from `config/validation_rules.json`.
   - Performs validation (here, just checking CSV headers); a simplified sketch of this pass appears after the list.
   - Updates the MCP: Sets `current_status` to "Validated" or "ValidationFailed" (with error details).
   - Moves Files: Relocates both data and MCP files to `processing_loading/` (on success) or `archive/failed/` (on failure).
   - Updates Paths: Critically, updates the `current_*_filepath` fields within the MCP before saving it in the new location.

3. Loading (`load_files.py`):
   - Scans `processing_loading/` for MCP files with `current_status == "Validated"`.
   - Reads the `load_target_destination` from the MCP file.
   - Performs a simulated load operation based on the target.
   - Updates the MCP: Sets `current_status` to "Loaded" or "LoadFailed".
   - Moves Files: Relocates both files to `archive/success/` or `archive/failed/`.
   - Updates Paths & Saves: Saves the final MCP state in the archive directory.
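To tie the stages back to the MCP contract, here is the simplified validation pass promised above. It reuses the `load_mcp`/`save_mcp`/`update_status` helpers sketched earlier and assumes the illustrative rules format shown with the project structure; the real `validate_files.py` will differ in detail:

```python
import csv
import json
import shutil
from pathlib import Path

INCOMING = Path("incoming")
PROCESSING = Path("processing_loading")
FAILED = Path("archive/failed")
RULES = json.loads(Path("config/validation_rules.json").read_text())

def move_pair(mcp: dict, data_path: Path, mcp_path: Path, target_dir: Path) -> None:
    """Move the data + MCP pair and record the new locations inside the MCP."""
    target_dir.mkdir(parents=True, exist_ok=True)
    new_data = Path(shutil.move(str(data_path), str(target_dir / data_path.name)))
    new_mcp = target_dir / mcp_path.name
    mcp["current_data_filepath"] = str(new_data)
    mcp["current_mcp_filepath"] = str(new_mcp)
    save_mcp(mcp, new_mcp)  # write the updated MCP in its new home...
    mcp_path.unlink()       # ...and drop the stale copy left behind

for mcp_path in INCOMING.glob("*.mcp.json"):
    mcp = load_mcp(mcp_path)
    if mcp["status_info"]["current_status"] != "Uploaded":
        continue
    data_path = Path(mcp["current_data_filepath"])
    ruleset_id = mcp["processing_directives"]["validation_ruleset_id"]
    required = set(RULES[ruleset_id]["required_columns"])
    with data_path.open(newline="") as f:
        header = set(next(csv.reader(f), []))
    if required <= header:  # all required columns present
        update_status(mcp, "Validated")
        move_pair(mcp, data_path, mcp_path, PROCESSING)
    else:
        missing = sorted(required - header)
        update_status(mcp, "ValidationFailed", f"missing columns: {missing}")
        mcp["status_info"]["error_message"] = f"missing columns: {missing}"
        move_pair(mcp, data_path, mcp_path, FAILED)
```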
Benefits Observed
- Clarity: Each stage knows exactly what to do based on explicit instructions in the MCP.
- Configuration: Validation rules are externalised, not hardcoded.
- Traceability: The `status_history` and `error_message` in the final MCP provide a clear audit trail.
- State Management: Combining MCP status and dedicated directories provides robust state tracking.
- Decoupling: Stages operate independently based on the MCP contract.
Why Adopt This Pattern?
Using metadata files for context passing offers significant advantages:
- Robustness: Less prone to errors caused by implicit conventions.
- Maintainability: Easier to modify or extend individual stages or add new rules.
- Testability: Stages can be tested more easily by providing mock MCP files (see the sketch after this list).
- Observability: Explicit state and history simplify monitoring and debugging.
- Scalability: While our example is sequential, this pattern lends itself well to parallel processing managed by orchestrators, where each file's context travels with it independently.
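On the testability point above: a stage can be exercised in isolation by writing a mock MCP file into a temporary directory and pointing the stage at it. `run_validation_stage` below is a hypothetical entry point standing in for whatever `validate_files.py` actually exposes, and the column names match the illustrative ruleset from earlier:

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory

def make_mock_mcp(tmp_dir: Path) -> Path:
    """Write a tiny data file plus a matching MCP sidecar for a test run."""
    data = tmp_dir / "sales_data_test.csv"
    data.write_text("lead_id,company,contact_email,region\n1,Acme,a@acme.io,US\n")
    mcp_path = tmp_dir / (data.name + ".mcp.json")
    mcp = {
        "mcp_version": "1.0",
        "job_id": "test-001",
        "current_data_filepath": str(data),
        "current_mcp_filepath": str(mcp_path),
        "source_context": {"department": "Sales", "file_type": "leads"},
        "processing_directives": {
            "validation_ruleset_id": "SALES_LEADS_V1",
            "load_target_destination": "sales_leads_table",
        },
        "status_info": {"current_status": "Uploaded",
                        "status_history": [], "error_message": None},
    }
    mcp_path.write_text(json.dumps(mcp, indent=2))
    return mcp_path

with TemporaryDirectory() as tmp:
    mcp_path = make_mock_mcp(Path(tmp))
    # run_validation_stage(incoming_dir=Path(tmp))  # hypothetical entry point
    # ...then assert on the resulting status_info and file locations
```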
Limitations & Next Steps
This project is a simplified demonstration. Real-world implementations could be enhanced by:
- Schema Validation: Validating the structure of the MCP JSON itself (e.g., using JSON Schema); a minimal sketch follows this list.
- Richer Validation: Implementing more complex data validation (data types, formats, cross-field checks) using libraries like Pandera or Great Expectations.
- Real Loading: Interacting with actual databases, APIs, or storage systems.
- Parallelism/Orchestration: Integrating these scripts into tools like Airflow, Prefect, or Dagster to manage execution, dependencies, retries, and enable parallel processing.
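The first of these is cheap to bolt on with the `jsonschema` package. The schema below covers only a handful of fields and is a starting point rather than the full contract:

```python
import json
from pathlib import Path

from jsonschema import validate  # pip install jsonschema

MCP_SCHEMA = {
    "type": "object",
    "required": ["mcp_version", "job_id", "processing_directives", "status_info"],
    "properties": {
        "mcp_version": {"type": "string"},
        "job_id": {"type": "string"},
        "processing_directives": {
            "type": "object",
            "required": ["validation_ruleset_id", "load_target_destination"],
        },
        "status_info": {
            "type": "object",
            "required": ["current_status", "status_history"],
        },
    },
}

def check_mcp(mcp_path: Path) -> None:
    """Raise jsonschema.ValidationError if the sidecar file is malformed."""
    validate(instance=json.loads(mcp_path.read_text()), schema=MCP_SCHEMA)
```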
Conclusion
Moving away from brittle, implicit pipeline conventions towards explicit, metadata-driven context passing can dramatically improve the robustness, traceability, and maintainability of data workflows. While the "MCP" concept often relates to ML models, the underlying pattern of using structured metadata sidecar files is a valuable technique for many data engineering challenges. Our simple Python example demonstrates the core principles and benefits of this approach.
Consider giving this pattern a try in your next pipeline project! Check out the code and experiment.
Link to Repository: https://github.com/scriptstar/metadata-driven-mcp-pipeline