Pipeline¶
Data workflows and DAGs - orchestrating data transformations
Overview¶
The Pipeline entity represents a data workflow or DAG (Directed Acyclic Graph) that orchestrates data processing tasks. Pipelines define the sequence and dependencies of the tasks that move and transform data across systems.
Hierarchy:
graph LR
SVC[Pipeline Service] --> PIPE[Pipeline]
PIPE --> TASK[Task]
style SVC fill:#667eea,color:#fff
style PIPE fill:#4facfe,color:#fff,stroke:#4c51bf,stroke-width:3px
style TASK fill:#00f2fe,color:#333
Schema Specifications¶
View the complete Pipeline schema in your preferred format:
Complete JSON Schema Definition
{
"$id": "https://open-metadata.org/schema/entity/data/pipeline.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Pipeline",
"description": "A `Pipeline` entity represents a workflow or DAG that orchestrates data processing tasks.",
"type": "object",
"javaType": "org.openmetadata.schema.entity.data.Pipeline",
"definitions": {
"pipelineStatus": {
"description": "Pipeline execution status",
"type": "string",
"enum": [
"Successful", "Failed", "Pending", "Running",
"Stopped", "Skipped", "UpForRetry", "Queued"
]
},
"scheduleInterval": {
"description": "Pipeline schedule",
"type": "object",
"properties": {
"scheduleExpression": {
"description": "Cron or rate expression",
"type": "string"
},
"startDate": {
"description": "Schedule start date",
"type": "string",
"format": "date-time"
},
"endDate": {
"description": "Schedule end date",
"type": "string",
"format": "date-time"
}
}
}
},
"properties": {
"id": {
"description": "Unique identifier",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"name": {
"description": "Pipeline name",
"$ref": "../../type/basic.json#/definitions/entityName"
},
"fullyQualifiedName": {
"description": "Fully qualified name: service.pipeline",
"$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
},
"displayName": {
"description": "Display name",
"type": "string"
},
"description": {
"description": "Markdown description",
"$ref": "../../type/basic.json#/definitions/markdown"
},
"pipelineUrl": {
"description": "URL to pipeline in orchestration tool",
"type": "string",
"format": "uri"
},
"sourceUrl": {
"description": "URL to pipeline source code",
"type": "string",
"format": "uri"
},
"tasks": {
"description": "Pipeline tasks",
"type": "array",
"items": {
"$ref": "../../type/entityReference.json"
}
},
"scheduleInterval": {
"$ref": "#/definitions/scheduleInterval"
},
"pipelineStatus": {
"$ref": "#/definitions/pipelineStatus"
},
"startDate": {
"description": "Pipeline creation/start date",
"type": "string",
"format": "date-time"
},
"concurrency": {
"description": "Maximum concurrent runs",
"type": "integer"
},
"pipelineLocation": {
"description": "Pipeline code location",
"type": "string"
},
"service": {
"description": "Pipeline service",
"$ref": "../../type/entityReference.json"
},
"owner": {
"description": "Owner (user or team)",
"$ref": "../../type/entityReference.json"
},
"domain": {
"description": "Data domain",
"$ref": "../../type/entityReference.json"
},
"tags": {
"description": "Classification tags",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
}
},
"glossaryTerms": {
"description": "Business glossary terms",
"type": "array",
"items": {
"$ref": "../../type/entityReference.json"
}
},
"version": {
"description": "Metadata version",
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
}
},
"required": ["id", "name", "service"]
}
RDF/OWL Ontology Definition
@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ex: <https://example.com/> .   # namespace for the example instances below
# Pipeline Class Definition
om:Pipeline a owl:Class ;
rdfs:subClassOf om:DataAsset ;
rdfs:label "Pipeline" ;
rdfs:comment "A workflow or DAG that orchestrates data processing tasks" ;
om:hierarchyLevel 2 .
# Properties
om:pipelineName a owl:DatatypeProperty ;
rdfs:domain om:Pipeline ;
rdfs:range xsd:string ;
rdfs:label "name" ;
rdfs:comment "Name of the pipeline" .
om:fullyQualifiedName a owl:DatatypeProperty ;
rdfs:domain om:Pipeline ;
rdfs:range xsd:string ;
rdfs:label "fullyQualifiedName" ;
rdfs:comment "Complete hierarchical name: service.pipeline" .
om:pipelineUrl a owl:DatatypeProperty ;
rdfs:domain om:Pipeline ;
rdfs:range xsd:anyURI ;
rdfs:label "pipelineUrl" ;
rdfs:comment "URL to pipeline in orchestration tool" .
om:scheduleInterval a owl:DatatypeProperty ;
rdfs:domain om:Pipeline ;
rdfs:range xsd:string ;
rdfs:label "scheduleInterval" ;
rdfs:comment "Cron or rate expression for pipeline schedule" .
om:pipelineStatus a owl:DatatypeProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:PipelineStatus ;
rdfs:label "pipelineStatus" ;
rdfs:comment "Current execution status" .
om:hasTask a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:Task ;
rdfs:label "hasTask" ;
rdfs:comment "Tasks in this pipeline" .
om:belongsToPipelineService a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:PipelineService ;
rdfs:label "belongsToService" ;
rdfs:comment "Service managing this pipeline" .
om:pipelineOwnedBy a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:Owner ;
rdfs:label "ownedBy" ;
rdfs:comment "User or team that owns this pipeline" .
om:pipelineHasTag a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:Tag ;
rdfs:label "hasTag" ;
rdfs:comment "Classification tags applied to pipeline" .
om:pipelineLinkedToGlossaryTerm a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:GlossaryTerm ;
rdfs:label "linkedToGlossaryTerm" ;
rdfs:comment "Business glossary terms" .
# Pipeline Status Enumeration
om:PipelineStatus a owl:Class ;
    owl:oneOf (
        om:Successful
        om:Failed
        om:Pending
        om:Running
        om:Stopped
        om:Skipped
        om:UpForRetry
        om:Queued
    ) .
# Example Instance
ex:customerEtlPipeline a om:Pipeline ;
om:pipelineName "customer_etl" ;
om:fullyQualifiedName "airflow_prod.customer_etl" ;
om:displayName "Customer ETL Pipeline" ;
om:scheduleInterval "0 2 * * *" ;
om:pipelineStatus om:Successful ;
om:belongsToPipelineService ex:airflowProdService ;
om:pipelineOwnedBy ex:dataEngTeam ;
om:pipelineHasTag ex:tierGold ;
om:hasTask ex:extractCustomersTask ;
om:hasTask ex:transformCustomersTask ;
om:hasTask ex:loadCustomersTask .
JSON-LD Context and Example
{
"@context": {
"@vocab": "https://open-metadata.org/schema/",
"om": "https://open-metadata.org/schema/",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"xsd": "http://www.w3.org/2001/XMLSchema#",
"Pipeline": "om:Pipeline",
"name": {
"@id": "om:pipelineName",
"@type": "xsd:string"
},
"fullyQualifiedName": {
"@id": "om:fullyQualifiedName",
"@type": "xsd:string"
},
"displayName": {
"@id": "om:displayName",
"@type": "xsd:string"
},
"description": {
"@id": "om:description",
"@type": "xsd:string"
},
"pipelineUrl": {
"@id": "om:pipelineUrl",
"@type": "xsd:anyURI"
},
"scheduleInterval": {
"@id": "om:scheduleInterval"
},
"pipelineStatus": {
"@id": "om:pipelineStatus",
"@type": "@vocab"
},
"tasks": {
"@id": "om:hasTask",
"@type": "@id",
"@container": "@list"
},
"service": {
"@id": "om:belongsToPipelineService",
"@type": "@id"
},
"owner": {
"@id": "om:pipelineOwnedBy",
"@type": "@id"
},
"domain": {
"@id": "om:inDomain",
"@type": "@id"
},
"tags": {
"@id": "om:pipelineHasTag",
"@type": "@id",
"@container": "@set"
},
"glossaryTerms": {
"@id": "om:pipelineLinkedToGlossaryTerm",
"@type": "@id",
"@container": "@set"
}
}
}
Example JSON-LD Instance:
{
"@context": "https://open-metadata.org/context/pipeline.jsonld",
"@type": "Pipeline",
"@id": "https://example.com/pipelines/customer_etl",
"name": "customer_etl",
"fullyQualifiedName": "airflow_prod.customer_etl",
"displayName": "Customer ETL Pipeline",
"description": "Daily ETL pipeline for customer data",
"pipelineUrl": "https://airflow.company.com/dags/customer_etl",
"sourceUrl": "https://github.com/company/pipelines/blob/main/dags/customer_etl.py",
"scheduleInterval": {
"scheduleExpression": "0 2 * * *",
"startDate": "2024-01-01T00:00:00Z"
},
"pipelineStatus": "Successful",
"concurrency": 1,
"service": {
"@id": "https://example.com/services/airflow_prod",
"@type": "PipelineService",
"name": "airflow_prod"
},
"owner": {
"@id": "https://example.com/teams/data-engineering",
"@type": "Team",
"name": "data-engineering",
"displayName": "Data Engineering"
},
"tags": [
{
"@id": "https://open-metadata.org/tags/Tier/Gold",
"tagFQN": "Tier.Gold"
},
{
"@id": "https://open-metadata.org/tags/Schedule/Daily",
"tagFQN": "Schedule.Daily"
}
],
"glossaryTerms": [
{
"@id": "https://example.com/glossary/ETL",
"@type": "GlossaryTerm",
"fullyQualifiedName": "BusinessGlossary.ETL"
}
],
"tasks": [
{
"@type": "Task",
"@id": "https://example.com/pipelines/customer_etl/tasks/extract_customers",
"name": "extract_customers"
},
{
"@type": "Task",
"@id": "https://example.com/pipelines/customer_etl/tasks/transform_customers",
"name": "transform_customers"
},
{
"@type": "Task",
"@id": "https://example.com/pipelines/customer_etl/tasks/load_customers",
"name": "load_customers"
}
]
}
Use Cases¶
- Document ETL/ELT workflows and data pipelines
- Track pipeline schedules and execution history
- Monitor pipeline success rates and failures
- Capture data lineage through pipelines
- Define pipeline ownership and responsibilities
- Apply governance tags to critical pipelines
- Link pipelines to business processes
- Audit data transformation logic
JSON Schema Specification¶
Core Properties¶
id (uuid)¶
- Type: string (UUID format)
- Required: Yes (system-generated)
- Description: Unique identifier for this pipeline instance
name (entityName)¶
- Type: string
- Required: Yes
- Pattern: ^[^.]*$ (no dots allowed)
- Min Length: 1
- Max Length: 256
- Description: Name of the pipeline (unqualified)
fullyQualifiedName (fullyQualifiedEntityName)¶
- Type: string
- Required: Yes (system-generated)
- Pattern: ^((?!::).)*$
- Description: Fully qualified name in the format service.pipeline
displayName¶
- Type: string
- Required: No
- Description: Human-readable display name
description (markdown)¶
- Type: string (Markdown format)
- Required: No
- Description: Rich text description of the pipeline's purpose and logic
{
"description": "# Customer ETL Pipeline\n\nDaily pipeline that extracts customer data from MongoDB, transforms it, and loads into PostgreSQL.\n\n## Schedule\n- Runs daily at 2 AM UTC\n- Average duration: 45 minutes\n\n## Data Flow\n1. Extract from MongoDB `customers` collection\n2. Deduplicate and validate\n3. Enrich with geographic data\n4. Load to PostgreSQL `public.customers` table"
}
Pipeline Configuration Properties¶
pipelineUrl (URI)¶
- Type: string (URI format)
- Required: No
- Description: URL to view the pipeline in the orchestration tool
sourceUrl (URI)¶
- Type: string (URI format)
- Required: No
- Description: URL to the pipeline source code repository
scheduleInterval (ScheduleInterval)¶
- Type: object
- Required: No
- Description: Pipeline execution schedule
ScheduleInterval Object Properties:
| Property | Type | Required | Description |
|---|---|---|---|
| scheduleExpression | string | No | Cron expression or rate |
| startDate | string (ISO 8601) | No | Schedule start date |
| endDate | string (ISO 8601) | No | Schedule end date |
Example - Cron Schedule:
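{
  "scheduleInterval": {
    "scheduleExpression": "0 2 * * *",
    "startDate": "2024-01-01T00:00:00Z"
  }
}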
Example - Preset Expression:
{
"scheduleInterval": {
"scheduleExpression": "@hourly",
"startDate": "2024-01-01T00:00:00Z",
"endDate": "2024-12-31T23:59:59Z"
}
}
pipelineStatus (PipelineStatus enum)¶
- Type: string enum
- Required: No (system-populated from latest run)
Allowed Values:
- Successful - Last run completed successfully
- Failed - Last run failed
- Pending - Waiting to start
- Running - Currently executing
- Stopped - Manually stopped
- Skipped - Skipped execution
- UpForRetry - Failed, waiting for retry
- Queued - In execution queue
concurrency (integer)¶
- Type: integer
- Required: No
- Description: Maximum number of concurrent pipeline runs
pipelineLocation (string)¶
- Type: string
- Required: No
- Description: File path or location of the pipeline definition
startDate (timestamp)¶
- Type: string (ISO 8601 date-time)
- Required: No
- Description: Pipeline creation or first run date
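Together, these configuration properties appear on a pipeline as in the fragment below (values mirror the Complete Example later on this page; the pipelineLocation value is illustrative):
{
  "pipelineUrl": "https://airflow.company.com/dags/customer_etl",
  "sourceUrl": "https://github.com/company/pipelines/blob/main/dags/customer_etl.py",
  "scheduleInterval": {
    "scheduleExpression": "0 2 * * *",
    "startDate": "2024-01-01T00:00:00Z"
  },
  "pipelineStatus": "Successful",
  "concurrency": 1,
  "pipelineLocation": "dags/customer_etl.py",
  "startDate": "2024-01-01T00:00:00Z"
}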
Structure Properties¶
tasks[] (Task[])¶
- Type: array of EntityReference
- Required: No
- Description: List of tasks in the pipeline
{
"tasks": [
{
"id": "3b4c5d6e-7f8a-9b0c-1d2e-3f4a5b6c7d8e",
"type": "task",
"name": "extract_customers",
"fullyQualifiedName": "airflow_prod.customer_etl.extract_customers"
},
{
"id": "4c5d6e7f-8a9b-0c1d-2e3f-4a5b6c7d8e9f",
"type": "task",
"name": "transform_customers",
"fullyQualifiedName": "airflow_prod.customer_etl.transform_customers"
},
{
"id": "5d6e7f8a-9b0c-1d2e-3f4a-5b6c7d8e9f0a",
"type": "task",
"name": "load_customers",
"fullyQualifiedName": "airflow_prod.customer_etl.load_customers"
}
]
}
Location Properties¶
service (EntityReference)¶
- Type: object
- Required: Yes
- Description: Reference to the parent pipeline service
{
"service": {
"id": "1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d",
"type": "pipelineService",
"name": "airflow_prod",
"fullyQualifiedName": "airflow_prod"
}
}
Governance Properties¶
owner (EntityReference)¶
- Type: object
- Required: No
- Description: User or team that owns this pipeline
{
"owner": {
"id": "6e7f8a9b-0c1d-2e3f-4a5b-6c7d8e9f0a1b",
"type": "team",
"name": "data-engineering",
"displayName": "Data Engineering Team"
}
}
domain (EntityReference)¶
- Type: object
- Required: No
- Description: Data domain this pipeline belongs to
{
"domain": {
"id": "7f8a9b0c-1d2e-3f4a-5b6c-7d8e9f0a1b2c",
"type": "domain",
"name": "CustomerData",
"fullyQualifiedName": "CustomerData"
}
}
tags[] (TagLabel[])¶
- Type: array
- Required: No
- Description: Classification tags applied to the pipeline
{
"tags": [
{
"tagFQN": "Tier.Gold",
"description": "Critical production pipeline",
"source": "Classification",
"labelType": "Manual",
"state": "Confirmed"
},
{
"tagFQN": "Schedule.Daily",
"source": "Classification",
"labelType": "Automated",
"state": "Confirmed"
}
]
}
glossaryTerms[] (GlossaryTerm[])¶
- Type: array
- Required: No
- Description: Business glossary terms linked to this pipeline
{
"glossaryTerms": [
{
"fullyQualifiedName": "BusinessGlossary.ETL"
},
{
"fullyQualifiedName": "BusinessGlossary.CustomerData"
}
]
}
Lineage Properties¶
upstream (EntityLineage)¶
- Type: array
- Required: No (system-populated)
- Description: Upstream data assets (tables, files, etc.)
{
"upstream": [
{
"id": "source-table-uuid",
"type": "table",
"name": "customers",
"fullyQualifiedName": "mongodb_prod.crm.customers"
}
]
}
downstream (EntityLineage)¶
- Type: array
- Required: No (system-populated)
- Description: Downstream data assets
{
"downstream": [
{
"id": "target-table-uuid",
"type": "table",
"name": "customers",
"fullyQualifiedName": "postgres_prod.ecommerce.public.customers"
}
]
}
Versioning Properties¶
version (entityVersion)¶
- Type: number
- Required: Yes (system-managed)
- Description: Metadata version number, incremented on changes
updatedAt (timestamp)¶
- Type: integer (Unix epoch milliseconds)
- Required: Yes (system-managed)
- Description: Last update timestamp
updatedBy (string)¶
- Type: string
- Required: Yes (system-managed)
- Description: User who made the update
changeDescription (ChangeDescription)¶
- Type: object
- Required: No
- Description: Details of what changed in this version
{
"changeDescription": {
"fieldsAdded": [
{
"name": "tasks.data_quality_check",
"newValue": "Task for validating data quality"
}
],
"fieldsUpdated": [
{
"name": "scheduleInterval.scheduleExpression",
"oldValue": "0 1 * * *",
"newValue": "0 2 * * *"
}
],
"fieldsDeleted": [],
"previousVersion": 3.0
}
}
Complete Example¶
{
"id": "2a3b4c5d-6e7f-8a9b-0c1d-2e3f4a5b6c7d",
"name": "customer_etl",
"fullyQualifiedName": "airflow_prod.customer_etl",
"displayName": "Customer ETL Pipeline",
"description": "# Customer ETL Pipeline\n\nDaily pipeline that extracts customer data from MongoDB, transforms it, and loads into PostgreSQL.",
"pipelineUrl": "https://airflow.company.com/dags/customer_etl",
"sourceUrl": "https://github.com/company/pipelines/blob/main/dags/customer_etl.py",
"scheduleInterval": {
"scheduleExpression": "0 2 * * *",
"startDate": "2024-01-01T00:00:00Z"
},
"pipelineStatus": "Successful",
"concurrency": 1,
"startDate": "2024-01-01T00:00:00Z",
"tasks": [
{
"id": "3b4c5d6e-7f8a-9b0c-1d2e-3f4a5b6c7d8e",
"type": "task",
"name": "extract_customers"
},
{
"id": "4c5d6e7f-8a9b-0c1d-2e3f-4a5b6c7d8e9f",
"type": "task",
"name": "transform_customers"
},
{
"id": "5d6e7f8a-9b0c-1d2e-3f4a-5b6c7d8e9f0a",
"type": "task",
"name": "load_customers"
}
],
"service": {
"id": "1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d",
"type": "pipelineService",
"name": "airflow_prod"
},
"owner": {
"id": "6e7f8a9b-0c1d-2e3f-4a5b-6c7d8e9f0a1b",
"type": "team",
"name": "data-engineering"
},
"domain": {
"id": "7f8a9b0c-1d2e-3f4a-5b6c-7d8e9f0a1b2c",
"type": "domain",
"name": "CustomerData"
},
"tags": [
{"tagFQN": "Tier.Gold"},
{"tagFQN": "Schedule.Daily"}
],
"glossaryTerms": [
{"fullyQualifiedName": "BusinessGlossary.ETL"}
],
"version": 3.1,
"updatedAt": 1704240000000,
"updatedBy": "john.doe"
}
RDF Representation¶
Ontology Class¶
@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
om:Pipeline a owl:Class ;
rdfs:subClassOf om:DataAsset ;
rdfs:label "Pipeline" ;
rdfs:comment "A workflow or DAG that orchestrates data processing" ;
om:hasProperties [
om:name "string" ;
om:tasks "Task[]" ;
om:scheduleInterval "string" ;
om:service "PipelineService" ;
om:owner "Owner" ;
om:tags "Tag[]" ;
] .
Instance Example¶
@prefix om: <https://open-metadata.org/schema/> .
@prefix ex: <https://example.com/pipelines/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
ex:customer_etl a om:Pipeline ;
om:pipelineName "customer_etl" ;
om:fullyQualifiedName "airflow_prod.customer_etl" ;
om:displayName "Customer ETL Pipeline" ;
om:description "Daily pipeline that extracts customer data" ;
om:pipelineUrl "https://airflow.company.com/dags/customer_etl"^^xsd:anyURI ;
om:scheduleInterval "0 2 * * *" ;
om:pipelineStatus "Successful" ;
om:belongsToPipelineService ex:airflow_prod ;
om:pipelineOwnedBy ex:data_engineering_team ;
om:pipelineHasTag ex:tier_gold ;
om:hasTask ex:extract_customers ;
om:hasTask ex:transform_customers ;
om:hasTask ex:load_customers .
JSON-LD Context¶
{
"@context": {
"@vocab": "https://open-metadata.org/schema/",
"om": "https://open-metadata.org/schema/",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"Pipeline": "om:Pipeline",
"name": "om:pipelineName",
"fullyQualifiedName": "om:fullyQualifiedName",
"displayName": "om:displayName",
"description": "om:description",
"pipelineUrl": "om:pipelineUrl",
"scheduleInterval": "om:scheduleInterval",
"tasks": {
"@id": "om:hasTask",
"@type": "@id",
"@container": "@list"
},
"service": {
"@id": "om:belongsToPipelineService",
"@type": "@id"
},
"owner": {
"@id": "om:pipelineOwnedBy",
"@type": "@id"
},
"tags": {
"@id": "om:pipelineHasTag",
"@type": "@id",
"@container": "@set"
}
}
}
JSON-LD Example¶
{
"@context": "https://open-metadata.org/context/pipeline.jsonld",
"@type": "Pipeline",
"@id": "https://example.com/pipelines/customer_etl",
"name": "customer_etl",
"fullyQualifiedName": "airflow_prod.customer_etl",
"displayName": "Customer ETL Pipeline",
"pipelineUrl": "https://airflow.company.com/dags/customer_etl",
"scheduleInterval": {
"scheduleExpression": "0 2 * * *"
},
"service": {
"@id": "https://example.com/services/airflow_prod",
"@type": "PipelineService"
},
"owner": {
"@id": "https://example.com/teams/data-engineering",
"@type": "Team"
},
"tags": [
{"@id": "https://open-metadata.org/tags/Tier/Gold"}
]
}
Relationships¶
Pipeline has comprehensive relationships with entities across the metadata platform:
graph TD
subgraph Hierarchy
SVC[PipelineService<br/>airflow_prod]
SVC --> PIPE[Pipeline<br/>customer_etl]
end
subgraph Tasks
PIPE --> TASK1[Task<br/>extract_customers]
PIPE --> TASK2[Task<br/>transform_customers]
PIPE --> TASK3[Task<br/>load_customers]
PIPE --> TASK4[Task<br/>data_quality_check]
TASK1 --> TASK2
TASK2 --> TASK3
TASK3 --> TASK4
end
subgraph Source Tables
SRC1[Table<br/>mongodb.crm.customers] -.->|extracted by| PIPE
SRC2[Table<br/>postgres.raw.customer_data] -.->|read by| PIPE
end
subgraph Target Tables
PIPE -.->|loads to| TGT1[Table<br/>warehouse.customers]
PIPE -.->|creates| TGT2[Table<br/>warehouse.customer_summary]
PIPE -.->|updates| TGT3[Table<br/>warehouse.customer_metrics]
end
subgraph Downstream
TGT1 -.->|feeds| DASH[Dashboard<br/>Customer Analytics]
TGT1 -.->|trains| ML[MLModel<br/>churn_predictor]
TGT2 -.->|consumed by| API[ApiEndpoint<br/>customer_stats]
end
subgraph Ownership
PIPE -.->|owned by| TEAM[Team<br/>Data Engineering]
PIPE -.->|owned by| USER[User<br/>etl.admin]
end
subgraph Governance
PIPE -.->|in domain| DOM[Domain<br/>Customer Data]
PIPE -.->|tagged| TAG1[Tag<br/>Tier.Gold]
PIPE -.->|tagged| TAG2[Tag<br/>Schedule.Daily]
PIPE -.->|tagged| TAG3[Tag<br/>Production]
PIPE -.->|linked to| GT[GlossaryTerm<br/>ETL Process]
end
subgraph Quality
TC1[TestCase<br/>execution_success] -.->|validates| PIPE
TC2[TestCase<br/>data_freshness] -.->|monitors| TGT1
TC3[TestCase<br/>row_count_check] -.->|validates| TGT1
TC4[TestCase<br/>schema_validation] -.->|checks| TGT2
end
subgraph Monitoring
MON[Dashboard<br/>Pipeline Monitor] -.->|tracks| PIPE
ALERT[Alert<br/>failure_notification] -.->|watches| PIPE
LOG[Logs<br/>execution_history] -.->|records| PIPE
end
subgraph Configuration
REPO[Repository<br/>github.com/pipelines] -.->|source code| PIPE
SCHED[Schedule<br/>Daily at 2 AM UTC] -.->|triggers| PIPE
CONFIG[Config<br/>pipeline_config.yaml] -.->|configures| PIPE
end
style SVC fill:#667eea,color:#fff
style PIPE fill:#4facfe,color:#fff,stroke:#4c51bf,stroke-width:3px
style TASK1 fill:#00f2fe,color:#333
style TASK2 fill:#00f2fe,color:#333
style TASK3 fill:#00f2fe,color:#333
style TASK4 fill:#00f2fe,color:#333
style SRC1 fill:#764ba2,color:#fff
style SRC2 fill:#764ba2,color:#fff
style TGT1 fill:#764ba2,color:#fff
style TGT2 fill:#764ba2,color:#fff
style TGT3 fill:#764ba2,color:#fff
style DASH fill:#ff6b6b,color:#fff
style ML fill:#ff6b6b,color:#fff
style API fill:#ff6b6b,color:#fff
style TEAM fill:#43e97b,color:#fff
style USER fill:#43e97b,color:#fff
style DOM fill:#fa709a,color:#fff
style TAG1 fill:#f093fb,color:#fff
style TAG2 fill:#f093fb,color:#fff
style TAG3 fill:#f093fb,color:#fff
style GT fill:#ffd700,color:#333
style TC1 fill:#9b59b6,color:#fff
style TC2 fill:#9b59b6,color:#fff
style TC3 fill:#9b59b6,color:#fff
style TC4 fill:#9b59b6,color:#fff
style MON fill:#00ac69,color:#fff
style ALERT fill:#00ac69,color:#fff
style LOG fill:#00ac69,color:#fff
style REPO fill:#f5576c,color:#fff
style SCHED fill:#f5576c,color:#fff
style CONFIG fill:#f5576c,color:#fff
Relationship Types:
- Solid lines (→): Hierarchical containment (Service contains Pipeline, Pipeline contains Tasks)
- Dashed lines (-.->): References and associations (ownership, governance, lineage)
Parent Entities¶
- PipelineService: The service managing this pipeline
Child Entities¶
- Task: Individual tasks/steps within the pipeline
Associated Entities¶
- Owner: User or team owning this pipeline
- Domain: Business domain assignment
- Tag: Classification tags
- GlossaryTerm: Business terminology
- Table: Source tables (upstream) and target tables (downstream)
- Dashboard: Dashboards consuming pipeline outputs or monitoring pipeline health
- MLModel: ML models trained on pipeline outputs
- ApiEndpoint: APIs serving pipeline outputs
- TestCase: Pipeline execution, data quality, and schema validation tests
- Alert: Failure notifications and monitoring alerts
Custom Properties¶
This entity supports custom properties through the extension field. Common custom properties include:
- Data Classification: Sensitivity level
- Cost Center: Billing allocation
- Retention Period: Data retention requirements
- Application Owner: Owning application/team
See Custom Properties for details on defining and using custom properties.
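As a sketch, a pipeline with such properties might carry an extension block like the following (the property names and values are hypothetical and must first be defined as custom properties for the Pipeline entity type):
{
  "extension": {
    "dataClassification": "Internal",
    "costCenter": "CC-1042",
    "retentionPeriod": "365 days",
    "applicationOwner": "crm-platform"
  }
}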
Followers¶
Users can follow pipelines to receive notifications about configuration changes, task modifications, and schedule updates. See Followers for details.
API Operations¶
All Pipeline operations are available under the /v1/pipelines endpoint.
List Pipelines¶
Get a list of pipelines, optionally filtered by service.
GET /v1/pipelines
Query Parameters:
- fields: Fields to include (tasks, tags, owner, lineage, pipelineStatus, etc.)
- service: Filter by pipeline service name
- limit: Number of results (1-1000000, default 10)
- before/after: Cursor-based pagination
- include: all | deleted | non-deleted (default: non-deleted)
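Example:
GET /v1/pipelines?service=airflow_prod&fields=tasks,owner,tags&limit=25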
Response: PipelineList
Create Pipeline¶
Create a new pipeline under a pipeline service.
POST /v1/pipelines
Content-Type: application/json
{
"name": "customer_etl",
"service": "airflow_prod",
"description": "Customer data ETL pipeline",
"scheduleInterval": {
"scheduleExpression": "0 2 * * *",
"scheduleType": "CRON"
},
"tasks": [
{
"name": "extract_customers",
"taskType": "PythonOperator",
"description": "Extract customer data from source"
},
{
"name": "transform_data",
"taskType": "SparkSubmitOperator",
"downstreamTasks": ["extract_customers"]
},
{
"name": "load_to_warehouse",
"taskType": "PostgresOperator",
"downstreamTasks": ["transform_data"]
}
]
}
Response: Pipeline
Get Pipeline by Name¶
Get a pipeline by its fully qualified name.
GET /v1/pipelines/name/{fqn}
Query Parameters:
- fields: Fields to include (tasks, tags, owner, lineage, etc.)
- include: all | deleted | non-deleted
Example:
GET /v1/pipelines/name/airflow_prod.customer_etl?fields=tasks,tags,owner,lineage,pipelineStatus
Response: Pipeline
Get Pipeline by ID¶
Get a pipeline by its unique identifier.
GET /v1/pipelines/{id}
Query Parameters:
- fields: Fields to include
- include: all | deleted | non-deleted
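Example:
GET /v1/pipelines/2a3b4c5d-6e7f-8a9b-0c1d-2e3f4a5b6c7d?fields=tasks,owner,pipelineStatus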
Response: Pipeline
Update Pipeline¶
Update a pipeline using JSON Patch.
PATCH /v1/pipelines/name/{fqn}
Content-Type: application/json-patch+json
[
{"op": "replace", "path": "/scheduleInterval/scheduleExpression", "value": "0 3 * * *"},
{"op": "add", "path": "/tags/-", "value": {"tagFQN": "Critical"}},
{"op": "replace", "path": "/description", "value": "Updated ETL pipeline"}
]
Response: Pipeline
Create or Update Pipeline¶
Create a new pipeline or update if it exists.
PUT /v1/pipelines
Content-Type: application/json
{
"name": "orders_pipeline",
"service": "airflow_prod",
"scheduleInterval": {...},
"tasks": [...]
}
Response: Pipeline
Delete Pipeline¶
Delete a pipeline by fully qualified name.
DELETE /v1/pipelines/name/{fqn}
Query Parameters:
- hardDelete: Permanently delete (default: false)
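Example:
DELETE /v1/pipelines/name/airflow_prod.customer_etl?hardDelete=true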
Response: 200 OK
Update Pipeline Tasks¶
Update tasks in a pipeline.
PUT /v1/pipelines/{id}/tasks
Content-Type: application/json
{
"tasks": [
{
"name": "data_quality_check",
"taskType": "PythonOperator",
"description": "Run data quality tests"
}
]
}
Response: Pipeline
Get Pipeline Status¶
Get the execution status of a pipeline.
GET /v1/pipelines/{id}/pipelineStatus
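Example:
GET /v1/pipelines/2a3b4c5d-6e7f-8a9b-0c1d-2e3f4a5b6c7d/pipelineStatus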
Response: PipelineStatus (latest runs, success/failure, execution time)
Update Pipeline Status¶
Update the status of a pipeline execution.
PUT /v1/pipelines/{id}/pipelineStatus
Content-Type: application/json
{
"executionStatus": "Successful",
"timestamp": 1700000000,
"executionDate": "2024-01-15"
}
Response: PipelineStatus
Get Pipeline Versions¶
Get all versions of a pipeline.
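Versions follow the same pattern as other entity endpoints:
GET /v1/pipelines/{id}/versions
A specific version can then be fetched with GET /v1/pipelines/{id}/versions/{version}.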
Follow Pipeline¶
Add a follower to a pipeline.
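Following the follower endpoints used across entities, the follower is identified by user ID in the request body:
PUT /v1/pipelines/{id}/followers
Content-Type: application/json
"a1b2c3d4-e5f6-7a8b-9c0d-1e2f3a4b5c6d"
The UUID shown is illustrative; pass the ID of the user who should follow the pipeline.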
Get Followers¶
Get all followers of a pipeline.
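Followers can be retrieved as a field on the pipeline itself, for example:
GET /v1/pipelines/{id}?fields=followers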
Bulk Operations¶
Create or update multiple pipelines.
PUT /v1/pipelines/bulk
Content-Type: application/json
{
"entities": [...]
}
Response: BulkOperationResult
Related Documentation¶
- Pipeline Service - Service configuration
- Task - Task specification
- Lineage - Pipeline lineage tracking
- Data Quality - Testing pipeline outputs