Task¶
Pipeline execution units - individual steps in data workflows
Overview¶
The Task entity represents individual units of work within a pipeline. Tasks are the atomic operations that perform specific actions like extracting data, transforming records, loading to destinations, or running quality checks.
Hierarchy:
graph LR
SVC[Pipeline Service] --> PIPE[Pipeline]
PIPE --> TASK[Task]
style SVC fill:#667eea,color:#fff
style PIPE fill:#4facfe,color:#fff
style TASK fill:#00f2fe,color:#333,stroke:#4c51bf,stroke-width:3px
Schema Specifications¶
View the complete Task schema in your preferred format:
Complete JSON Schema Definition
This schema is defined within the Pipeline schema at definitions.task:
{
"task": {
"type": "object",
"javaType": "org.openmetadata.schema.type.Task",
"properties": {
"name": {
"description": "Name that identifies this task instance uniquely.",
"type": "string"
},
"displayName": {
"description": "Display Name that identifies this Task. It could be title or label from the pipeline services.",
"type": "string"
},
"fullyQualifiedName": {
"description": "A unique name that identifies a pipeline in the format 'ServiceName.PipelineName.TaskName'.",
"type": "string"
},
"description": {
"description": "Description of this Task.",
"$ref": "../../type/basic.json#/definitions/markdown"
},
"sourceUrl": {
"description": "Task URL to visit/manage. This URL points to respective pipeline service UI.",
"$ref": "../../type/basic.json#/definitions/sourceUrl"
},
"downstreamTasks": {
"description": "All the tasks that are downstream of this task.",
"type": "array",
"items": {
"type": "string"
},
"default": null
},
"taskType": {
"description": "Type of the Task. Usually refers to the class it implements.",
"type": "string"
},
"taskSQL": {
"description": "SQL used in the task. Can be used to determine the lineage.",
"$ref": "../../type/basic.json#/definitions/sqlQuery"
},
"startDate": {
"description": "start date for the task.",
"type": "string"
},
"endDate": {
"description": "end date for the task.",
"type": "string"
},
"tags": {
"description": "Tags for this task.",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
},
"default": []
},
"owners": {
"description": "Owners of this task.",
"$ref": "../../type/entityReferenceList.json"
}
},
"required": ["name"],
"additionalProperties": false
}
}
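The two structural rules in this schema, `"required": ["name"]` and `"additionalProperties": false`, can be illustrated with a minimal sketch. It uses only the Python standard library and does not check value types or resolve `$ref` targets; a full JSON Schema validator would be needed for that. The helper name `check_task` is hypothetical.

```python
# Property names copied from the "properties" object of the schema above.
TASK_PROPERTIES = {
    "name", "displayName", "fullyQualifiedName", "description", "sourceUrl",
    "downstreamTasks", "taskType", "taskSQL", "startDate", "endDate",
    "tags", "owners",
}
REQUIRED = {"name"}


def check_task(task: dict) -> list[str]:
    """Return a list of problems; an empty list means both rules pass."""
    problems = []
    # "required": ["name"]
    for key in sorted(REQUIRED - set(task)):
        problems.append(f"missing required property: {key}")
    # "additionalProperties": false
    for key in sorted(set(task) - TASK_PROPERTIES):
        problems.append(f"unknown property: {key}")
    return problems


print(check_task({"name": "extract_customers", "taskType": "Python"}))
# []
print(check_task({"taskType": "Python", "taskCode": "print(1)"}))
# ['missing required property: name', 'unknown property: taskCode']
```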
RDF/OWL Ontology Definition
@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ex: <https://example.com/pipelines/> .
# Task Class Definition
om:Task a owl:Class ;
rdfs:subClassOf om:DataAsset ;
rdfs:label "Task" ;
rdfs:comment "A single unit of work within a pipeline workflow" ;
om:hierarchyLevel 3 .
# Properties
om:taskName a owl:DatatypeProperty ;
rdfs:domain om:Task ;
rdfs:range xsd:string ;
rdfs:label "name" ;
rdfs:comment "Name that identifies this task instance uniquely" .
om:displayName a owl:DatatypeProperty ;
rdfs:domain om:Task ;
rdfs:range xsd:string ;
rdfs:label "displayName" ;
rdfs:comment "Display name from the pipeline services" .
om:fullyQualifiedName a owl:DatatypeProperty ;
rdfs:domain om:Task ;
rdfs:range xsd:string ;
rdfs:label "fullyQualifiedName" ;
rdfs:comment "Complete hierarchical name: service.pipeline.task" .
om:description a owl:DatatypeProperty ;
rdfs:domain om:Task ;
rdfs:range xsd:string ;
rdfs:label "description" ;
rdfs:comment "Description of this task" .
om:sourceUrl a owl:DatatypeProperty ;
rdfs:domain om:Task ;
rdfs:range xsd:anyURI ;
rdfs:label "sourceUrl" ;
rdfs:comment "Task URL to visit/manage in pipeline service UI" .
om:taskType a owl:DatatypeProperty ;
rdfs:domain om:Task ;
rdfs:range xsd:string ;
rdfs:label "taskType" ;
rdfs:comment "Type of the task - usually refers to the class it implements" .
om:taskSQL a owl:DatatypeProperty ;
rdfs:domain om:Task ;
rdfs:range xsd:string ;
rdfs:label "taskSQL" ;
rdfs:comment "SQL used in the task - can be used to determine lineage" .
om:startDate a owl:DatatypeProperty ;
rdfs:domain om:Task ;
rdfs:range xsd:string ;
rdfs:label "startDate" ;
rdfs:comment "Start date for the task" .
om:endDate a owl:DatatypeProperty ;
rdfs:domain om:Task ;
rdfs:range xsd:string ;
rdfs:label "endDate" ;
rdfs:comment "End date for the task" .
om:hasDownstreamTask a owl:ObjectProperty ;
rdfs:domain om:Task ;
rdfs:range om:Task ;
rdfs:label "hasDownstreamTask" ;
rdfs:comment "All the tasks that are downstream of this task" .
om:taskOwnedBy a owl:ObjectProperty ;
rdfs:domain om:Task ;
rdfs:range om:Owner ;
rdfs:label "ownedBy" ;
rdfs:comment "Owners of this task" .
om:taskHasTag a owl:ObjectProperty ;
rdfs:domain om:Task ;
rdfs:range om:Tag ;
rdfs:label "hasTag" ;
rdfs:comment "Tags for this task" .
# Example Instance
ex:extractCustomersTask a om:Task ;
om:taskName "extract_customers" ;
om:fullyQualifiedName "airflow_prod.customer_etl.extract_customers" ;
om:displayName "Extract Customers" ;
om:description "Extracts customer records from MongoDB production cluster" ;
om:taskType "Python" ;
om:sourceUrl "https://airflow.company.com/dags/customer_etl/tasks/extract_customers"^^xsd:anyURI ;
om:hasDownstreamTask ex:transformCustomersTask ;
om:taskOwnedBy ex:dataEngTeam ;
om:taskHasTag ex:tierGold .
JSON-LD Context and Example
{
"@context": {
"@vocab": "https://open-metadata.org/schema/",
"om": "https://open-metadata.org/schema/",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"xsd": "http://www.w3.org/2001/XMLSchema#",
"Task": "om:Task",
"name": {
"@id": "om:taskName",
"@type": "xsd:string"
},
"fullyQualifiedName": {
"@id": "om:fullyQualifiedName",
"@type": "xsd:string"
},
"displayName": {
"@id": "om:displayName",
"@type": "xsd:string"
},
"description": {
"@id": "om:description",
"@type": "xsd:string"
},
"sourceUrl": {
"@id": "om:sourceUrl",
"@type": "xsd:anyURI"
},
"taskType": {
"@id": "om:taskType",
"@type": "xsd:string"
},
"taskSQL": {
"@id": "om:taskSQL",
"@type": "xsd:string"
},
"startDate": {
"@id": "om:startDate",
"@type": "xsd:string"
},
"endDate": {
"@id": "om:endDate",
"@type": "xsd:string"
},
"downstreamTasks": {
"@id": "om:hasDownstreamTask",
"@type": "@id",
"@container": "@set"
},
"owners": {
"@id": "om:taskOwnedBy",
"@type": "@id",
"@container": "@set"
},
"tags": {
"@id": "om:taskHasTag",
"@type": "@id",
"@container": "@set"
}
}
}
Example JSON-LD Instance:
{
"@context": "https://open-metadata.org/context/task.jsonld",
"@type": "Task",
"@id": "https://example.com/pipelines/customer_etl/tasks/extract_customers",
"name": "extract_customers",
"fullyQualifiedName": "airflow_prod.customer_etl.extract_customers",
"displayName": "Extract Customers from MongoDB",
"description": "Extracts customer records from MongoDB and stages them for transformation",
"taskType": "Python",
"sourceUrl": "https://airflow.company.com/dags/customer_etl/tasks/extract_customers",
"taskSQL": "SELECT * FROM customers WHERE status = 'active'",
"startDate": "2024-01-01",
"endDate": "2024-12-31",
"downstreamTasks": [
"transform_customers"
],
"owners": [
{
"@id": "https://example.com/teams/data-engineering",
"@type": "Team",
"name": "data-engineering",
"displayName": "Data Engineering"
}
],
"tags": [
{
"@id": "https://open-metadata.org/tags/Tier/Gold",
"tagFQN": "Tier.Gold"
},
{
"@id": "https://open-metadata.org/tags/Operation/Extract",
"tagFQN": "Operation.Extract"
}
]
}
Use Cases¶
- Document individual pipeline task logic and dependencies
- Track task execution status and history
- Monitor task performance and duration
- Capture task-level lineage (tables/files read/written)
- Define task ownership and responsibilities
- Apply governance tags to sensitive operations
- Link tasks to business processes
- Audit data transformation steps
JSON Schema Specification¶
Core Properties¶
name¶
Type: string Required: Yes Description: Name that identifies this task instance uniquely
displayName¶
Type: string Required: No Description: Display name that identifies this task. It can be the title or label used by the pipeline service.
fullyQualifiedName¶
Type: string Required: No (system-generated) Description: A unique name that identifies the task in the format 'ServiceName.PipelineName.TaskName'
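As an illustration of the 'ServiceName.PipelineName.TaskName' format, the following sketch joins the three levels with dots. The function name `build_task_fqn` is hypothetical, and the sketch assumes that none of the names contains a dot; how the platform handles such names is not covered here.

```python
def build_task_fqn(service_name: str, pipeline_name: str, task_name: str) -> str:
    """Join the three hierarchy levels as ServiceName.PipelineName.TaskName."""
    parts = [service_name, pipeline_name, task_name]
    for part in parts:
        # Assumption of this sketch: empty names and names containing '.'
        # are rejected rather than quoted or escaped.
        if not part or "." in part:
            raise ValueError(f"unsupported name: {part!r}")
    return ".".join(parts)


print(build_task_fqn("airflow_prod", "customer_etl", "extract_customers"))
# airflow_prod.customer_etl.extract_customers
```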
description (markdown)¶
Type: string (Markdown format) Required: No Description: Description of this task
{
"description": "# Extract Customers Task\n\nExtracts customer records from MongoDB production cluster.\n\n## Logic\n1. Connect to MongoDB `crm` database\n2. Query `customers` collection with filter for active customers\n3. Stage results in XCom for downstream processing"
}
Task Configuration Properties¶
sourceUrl¶
Type: string (URI format - sourceUrl reference) Required: No Description: Task URL to visit/manage. This URL points to the respective pipeline service UI.
taskType¶
Type: string Required: No Description: Type of the task. Usually refers to the class it implements.
taskSQL (sqlQuery)¶
Type: string Required: No Description: SQL used in the task. Can be used to determine the lineage.
{
"taskSQL": "SELECT \n customer_id,\n email,\n first_name,\n last_name,\n created_at\nFROM raw_customers\nWHERE status = 'active'\n AND updated_at >= CURRENT_DATE - INTERVAL '1 day'"
}
Dependency Properties¶
downstreamTasks[]¶
Type: array of strings Required: No Default: null Description: All the tasks that are downstream of this task
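Because each task lists only its downstream task names, upstream relationships and a valid run order have to be derived. The sketch below shows one way to do that with a standard topological sort (Kahn's algorithm). The function names and the sample task list are illustrative, and a missing or null `downstreamTasks` is treated as an empty list.

```python
from collections import deque


def upstream_map(tasks: list[dict]) -> dict[str, list[str]]:
    """Invert downstreamTasks: task name -> names of its upstream tasks."""
    upstream = {t["name"]: [] for t in tasks}
    for t in tasks:
        # The schema default is null, so fall back to an empty list.
        for child in t.get("downstreamTasks") or []:
            upstream.setdefault(child, []).append(t["name"])
    return upstream


def execution_order(tasks: list[dict]) -> list[str]:
    """Return task names so that every task comes after its upstream tasks."""
    upstream = upstream_map(tasks)
    downstream = {t["name"]: list(t.get("downstreamTasks") or []) for t in tasks}
    pending = {name: len(parents) for name, parents in upstream.items()}
    ready = deque(sorted(name for name, count in pending.items() if count == 0))
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for child in downstream.get(name, []):
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)
    if len(order) != len(pending):
        raise ValueError("downstreamTasks contains a cycle")
    return order


tasks = [
    {"name": "extract_customers", "downstreamTasks": ["transform_customers"]},
    {"name": "transform_customers",
     "downstreamTasks": ["load_customers", "data_quality_check"]},
    {"name": "load_customers", "downstreamTasks": None},
    {"name": "data_quality_check"},
]
print(execution_order(tasks))
# ['extract_customers', 'transform_customers', 'load_customers', 'data_quality_check']
```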
Timing Properties¶
startDate¶
Type: string Required: No Description: Start date for the task
endDate¶
Type: string Required: No Description: End date for the task
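The schema types both dates as plain strings and does not prescribe a format. The examples on this page use ISO 8601 calendar dates such as `2024-01-01`; assuming that format, a consumer could parse and sanity-check the pair as sketched below. The helper name `parse_task_window` is hypothetical.

```python
from datetime import date
from typing import Optional


def parse_task_window(task: dict) -> tuple[Optional[date], Optional[date]]:
    """Parse startDate/endDate, assuming ISO 8601 dates (YYYY-MM-DD)."""
    start = date.fromisoformat(task["startDate"]) if task.get("startDate") else None
    end = date.fromisoformat(task["endDate"]) if task.get("endDate") else None
    # Both fields are optional, so only compare when both are present.
    if start and end and end < start:
        raise ValueError("endDate is earlier than startDate")
    return start, end


start, end = parse_task_window({"startDate": "2024-01-01", "endDate": "2024-12-31"})
print((end - start).days)
# 365
```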
Governance Properties¶
owners (EntityReferenceList)¶
Type: EntityReferenceList Required: No Description: Owners of this task
{
"owners": [
{
"id": "8f9a0b1c-2d3e-4f5a-6b7c-8d9e0f1a2b3c",
"type": "team",
"name": "data-engineering",
"displayName": "Data Engineering Team"
}
]
}
tags[] (TagLabel[])¶
Type: array Required: No Default: [] Description: Tags for this task
{
"tags": [
{
"tagFQN": "Tier.Gold",
"description": "Critical task",
"source": "Classification",
"labelType": "Manual",
"state": "Confirmed"
},
{
"tagFQN": "Operation.Extract",
"source": "Classification",
"labelType": "Automated",
"state": "Confirmed"
}
]
}
Complete Example¶
{
"name": "extract_customers",
"displayName": "Extract Customers from MongoDB",
"fullyQualifiedName": "airflow_prod.customer_etl.extract_customers",
"description": "# Extract Customers Task\n\nExtracts customer records from MongoDB production cluster.",
"sourceUrl": "https://airflow.company.com/dags/customer_etl/tasks/extract_customers",
"taskType": "PythonOperator",
"taskSQL": "SELECT customer_id, email, first_name, last_name FROM customers WHERE status = 'active'",
"downstreamTasks": ["transform_customers", "data_quality_check"],
"startDate": "2024-01-01",
"endDate": "2024-12-31",
"owners": [
{
"id": "8f9a0b1c-2d3e-4f5a-6b7c-8d9e0f1a2b3c",
"type": "team",
"name": "data-engineering",
"displayName": "Data Engineering Team"
}
],
"tags": [
{"tagFQN": "Tier.Gold"},
{"tagFQN": "Operation.Extract"}
]
}
RDF Representation¶
Ontology Class¶
@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
om:Task a owl:Class ;
rdfs:subClassOf om:DataAsset ;
rdfs:label "Task" ;
rdfs:comment "A single unit of work within a pipeline workflow" ;
om:hasProperties [
om:name "string" ;
om:displayName "string" ;
om:fullyQualifiedName "string" ;
om:description "string" ;
om:sourceUrl "anyURI" ;
om:taskType "string" ;
om:taskSQL "string" ;
om:startDate "string" ;
om:endDate "string" ;
om:downstreamTasks "string[]" ;
om:owners "EntityReferenceList" ;
om:tags "TagLabel[]" ;
] .
Instance Example¶
@prefix om: <https://open-metadata.org/schema/> .
@prefix ex: <https://example.com/pipelines/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
ex:extract_customers a om:Task ;
om:taskName "extract_customers" ;
om:fullyQualifiedName "airflow_prod.customer_etl.extract_customers" ;
om:displayName "Extract Customers from MongoDB" ;
om:description "Extracts customer records from MongoDB" ;
om:sourceUrl "https://airflow.company.com/dags/customer_etl/tasks/extract_customers"^^xsd:anyURI ;
om:taskType "PythonOperator" ;
om:taskSQL "SELECT * FROM customers WHERE status = 'active'" ;
om:startDate "2024-01-01" ;
om:endDate "2024-12-31" ;
om:hasDownstreamTask ex:transform_customers ;
om:taskOwnedBy ex:data_engineering_team ;
om:taskHasTag ex:tier_gold ;
om:taskHasTag ex:operation_extract .
JSON-LD Context¶
{
"@context": {
"@vocab": "https://open-metadata.org/schema/",
"om": "https://open-metadata.org/schema/",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"xsd": "http://www.w3.org/2001/XMLSchema#",
"Task": "om:Task",
"name": "om:taskName",
"fullyQualifiedName": "om:fullyQualifiedName",
"displayName": "om:displayName",
"description": "om:description",
"sourceUrl": {
"@id": "om:sourceUrl",
"@type": "xsd:anyURI"
},
"taskType": "om:taskType",
"taskSQL": "om:taskSQL",
"startDate": "om:startDate",
"endDate": "om:endDate",
"downstreamTasks": {
"@id": "om:hasDownstreamTask",
"@type": "@id",
"@container": "@set"
},
"owners": {
"@id": "om:taskOwnedBy",
"@type": "@id",
"@container": "@set"
},
"tags": {
"@id": "om:taskHasTag",
"@type": "@id",
"@container": "@set"
}
}
}
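To show what the term mappings in this context mean, the sketch below hand-expands a few terms into subject, predicate, object tuples. It is not a JSON-LD processor: it covers only three terms, and the base IRI used to resolve `downstreamTasks` entries (which the context coerces to IRIs via `"@type": "@id"`) is an assumption. Real processing should go through a JSON-LD library.

```python
from urllib.parse import urljoin

OM = "https://open-metadata.org/schema/"

# term -> (predicate IRI, whether values are IRI references), per the context above.
TERMS = {
    "name": (OM + "taskName", False),
    "taskType": (OM + "taskType", False),
    "downstreamTasks": (OM + "hasDownstreamTask", True),
}


def to_triples(node: dict, base: str) -> list[tuple[str, str, str]]:
    """Expand the mapped terms of one node into (subject, predicate, object)."""
    subject = node["@id"]
    triples = []
    for term, (predicate, is_iri) in TERMS.items():
        if term not in node:
            continue
        values = node[term] if isinstance(node[term], list) else [node[term]]
        for value in values:
            # IRI-typed values are resolved against the (assumed) base IRI.
            obj = urljoin(base, value) if is_iri else value
            triples.append((subject, predicate, obj))
    return triples


node = {
    "@id": "https://example.com/pipelines/customer_etl/tasks/extract_customers",
    "name": "extract_customers",
    "taskType": "PythonOperator",
    "downstreamTasks": ["transform_customers"],
}
base = "https://example.com/pipelines/customer_etl/tasks/"  # hypothetical base IRI
for triple in to_triples(node, base):
    print(triple)
```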
JSON-LD Example¶
{
"@context": "https://open-metadata.org/context/task.jsonld",
"@type": "Task",
"@id": "https://example.com/pipelines/customer_etl/tasks/extract_customers",
"name": "extract_customers",
"fullyQualifiedName": "airflow_prod.customer_etl.extract_customers",
"displayName": "Extract Customers from MongoDB",
"description": "Extracts customer records from MongoDB",
"sourceUrl": "https://airflow.company.com/dags/customer_etl/tasks/extract_customers",
"taskType": "PythonOperator",
"taskSQL": "SELECT * FROM customers WHERE status = 'active'",
"startDate": "2024-01-01",
"endDate": "2024-12-31",
"downstreamTasks": ["transform_customers"],
"owners": [
{
"@id": "https://example.com/teams/data-engineering",
"@type": "Team",
"name": "data-engineering"
}
],
"tags": [
{"@id": "https://open-metadata.org/tags/Tier/Gold"}
]
}
Relationships¶
A Task relates to entities across the metadata platform, grouped below by hierarchy, dependencies, data lineage, ownership, governance, quality, execution, and code:
graph TD
subgraph Hierarchy
SVC[PipelineService<br/>airflow_prod]
PIPE[Pipeline<br/>customer_etl]
SVC --> PIPE
PIPE --> TASK[Task<br/>transform_customers]
end
subgraph Task Dependencies
TASK1[Task<br/>extract_customers] -->|upstream| TASK
TASK -->|downstream| TASK2[Task<br/>load_customers]
TASK -->|downstream| TASK3[Task<br/>data_quality_check]
end
subgraph Data Sources
SRC1[Table<br/>raw_customers] -.->|reads from| TASK
SRC2[Table<br/>customer_enrichment] -.->|joins| TASK
end
subgraph Data Targets
TASK -.->|writes to| TGT1[Table<br/>transformed_customers]
TASK -.->|creates| TGT2[Table<br/>customer_metrics]
end
subgraph Ownership
TASK -.->|owned by| TEAM[Team<br/>Data Engineering]
TASK -.->|owned by| USER[User<br/>pipeline.dev]
end
subgraph Governance
TASK -.->|in domain| DOM[Domain<br/>Customer Data]
TASK -.->|tagged| TAG1[Tag<br/>Transformation]
TASK -.->|tagged| TAG2[Tag<br/>Critical]
TASK -.->|linked to| GT[GlossaryTerm<br/>Data Transformation]
end
subgraph Quality
TC1[TestCase<br/>output_schema_check] -.->|validates| TGT1
TC2[TestCase<br/>row_count_validation] -.->|monitors| TASK
TC3[TestCase<br/>data_quality_check] -.->|tests| TGT1
end
subgraph Execution
STATUS[Status<br/>Success] -.->|tracks| TASK
LOG[Logs<br/>execution_logs] -.->|records| TASK
METRIC[Metrics<br/>duration: 45s] -.->|measures| TASK
end
subgraph Code
REPO[Repository<br/>github.com/pipelines] -.->|source code| TASK
CONFIG[Config<br/>transform_config.yaml] -.->|configures| TASK
SQL[SQL<br/>transformation.sql] -.->|defines logic| TASK
end
style SVC fill:#667eea,color:#fff
style PIPE fill:#4facfe,color:#fff
style TASK fill:#00f2fe,color:#333,stroke:#4c51bf,stroke-width:3px
style TASK1 fill:#00f2fe,color:#333
style TASK2 fill:#00f2fe,color:#333
style TASK3 fill:#00f2fe,color:#333
style SRC1 fill:#764ba2,color:#fff
style SRC2 fill:#764ba2,color:#fff
style TGT1 fill:#764ba2,color:#fff
style TGT2 fill:#764ba2,color:#fff
style TEAM fill:#43e97b,color:#fff
style USER fill:#43e97b,color:#fff
style DOM fill:#fa709a,color:#fff
style TAG1 fill:#f093fb,color:#fff
style TAG2 fill:#f093fb,color:#fff
style GT fill:#ffd700,color:#333
style TC1 fill:#9b59b6,color:#fff
style TC2 fill:#9b59b6,color:#fff
style TC3 fill:#9b59b6,color:#fff
style STATUS fill:#00ac69,color:#fff
style LOG fill:#00ac69,color:#fff
style METRIC fill:#00ac69,color:#fff
style REPO fill:#f5576c,color:#fff
style CONFIG fill:#f5576c,color:#fff
style SQL fill:#f5576c,color:#fff
Relationship Types:
- Solid lines (→): Hierarchical containment and task dependencies (Service → Pipeline → Task, Task → Task)
- Dashed lines (-.->): References and associations (ownership, governance, lineage)
Parent Entities¶
- Pipeline: The pipeline containing this task
- PipelineService: The service hosting the parent pipeline
Child Entities¶
- None (leaf node in hierarchy)
Associated Entities¶
- Owner: User or team owning this task
- Domain: Business domain assignment
- Tag: Classification tags
- GlossaryTerm: Business terminology
- Table: Source tables (reads) and target tables (writes)
- Task: Upstream and downstream task dependencies within the same pipeline
- TestCase: Output validation, data quality, and schema compliance tests
- Repository: Source code repository
- Config: Configuration files
Custom Properties¶
This entity supports custom properties through the extension field. Common custom properties include:
- Data Classification: Sensitivity level
- Cost Center: Billing allocation
- Retention Period: Data retention requirements
- Application Owner: Owning application/team
See Custom Properties for details on defining and using custom properties.
API Operations¶
Create Task¶
POST /api/v1/pipelines/{pipelineId}/tasks
Content-Type: application/json
{
"name": "extract_customers",
"taskType": "Python",
"downstreamTasks": ["transform_customers"]
}
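A client could assemble the request above as in the following sketch, which builds the request object with the standard library but does not send it. The host `https://metadata.example.com` and the pipeline ID are placeholders, the helper name is hypothetical, and authentication headers are omitted because they depend on the deployment.

```python
import json
from urllib.request import Request


def build_create_task_request(base_url: str, pipeline_id: str, task: dict) -> Request:
    """Build (but do not send) the POST request for creating a task."""
    url = f"{base_url.rstrip('/')}/api/v1/pipelines/{pipeline_id}/tasks"
    return Request(
        url,
        data=json.dumps(task).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_create_task_request(
    "https://metadata.example.com",  # placeholder host
    "1234",                          # placeholder pipeline ID
    {
        "name": "extract_customers",
        "taskType": "Python",
        "downstreamTasks": ["transform_customers"],
    },
)
print(req.get_method(), req.full_url)
```

Passing the request to `urllib.request.urlopen(req)` would send it.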
Get Task¶
Update Task¶
PATCH /api/v1/pipelines/{pipelineId}/tasks/{taskId}
Content-Type: application/json-patch+json
[
{
"op": "replace",
"path": "/description",
"value": "Extracts active customer records from MongoDB"
}
]
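The request body is a JSON Patch document (RFC 6902). The sketch below builds a single `replace` operation for a top-level task property and applies it to a local copy of the task, which can help when previewing a change before sending it. Both helper names are hypothetical, and JSON Pointer escaping (`~0`, `~1`) and nested paths are deliberately left out.

```python
import json


def replace_op(field: str, value) -> dict:
    """Build a JSON Patch 'replace' operation for a top-level property."""
    return {"op": "replace", "path": f"/{field}", "value": value}


def apply_replace(document: dict, op: dict) -> dict:
    """Apply one top-level 'replace'; RFC 6902 requires the target to exist."""
    field = op["path"].lstrip("/")
    if op["op"] != "replace":
        raise ValueError(f"unsupported operation: {op['op']}")
    if field not in document:
        raise ValueError(f"replace target does not exist: {op['path']}")
    # Return a new dict so the original task is left unchanged.
    return {**document, field: op["value"]}


task = {"name": "extract_customers", "taskType": "Python"}
patch = [replace_op("taskType", "PythonOperator")]
print(json.dumps(patch))
print(apply_replace(task, patch[0]))
# {'name': 'extract_customers', 'taskType': 'PythonOperator'}
```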
Get Task Status¶
Get Task Lineage¶
Related Documentation¶
- Pipeline Service - Service configuration
- Pipeline - Pipeline specification
- Lineage - Task-level lineage tracking
- Data Quality - Testing task outputs