MessagingService¶
Event streaming and message broker platforms - the backbone of event-driven architecture
Overview¶
The MessagingService entity represents message broker and event streaming platforms like Apache Kafka, Amazon Kinesis, Apache Pulsar, RabbitMQ, and Google Pub/Sub. It serves as the top-level container for organizing topics, queues, and event streams.
Hierarchy:
graph LR
A[MessagingService] --> B[Topic]
B --> C[Schema]
style A fill:#667eea,color:#fff
style B fill:#4facfe,color:#fff
style C fill:#00f2fe,color:#333 Relationships¶
MessagingService has comprehensive relationships with entities across the metadata platform:
graph TD
subgraph Service Layer
A[MessagingService<br/>kafka_prod]
end
subgraph Topics & Schemas
A --> B1[Topic:<br/>customer.events]
A --> B2[Topic:<br/>order.events]
A --> B3[Topic:<br/>payment.events]
B1 --> C1[Schema:<br/>CustomerEvent v2<br/>Avro]
B1 --> C2[Partition: 0-9<br/>Replicas: 3]
B2 --> C3[Schema:<br/>OrderEvent v3<br/>JSON]
B2 --> C4[Partition: 0-15<br/>Replicas: 3]
B3 --> C5[Schema:<br/>PaymentEvent v1<br/>Protobuf]
B3 --> C6[Partition: 0-5<br/>Replicas: 5]
end
subgraph Ownership
A -.->|owned by| D1[Team:<br/>Data Platform]
A -.->|owned by| D2[User:<br/>kafka.admin]
end
subgraph Governance
A -.->|in domain| E[Domain:<br/>Event Streaming]
A -.->|has tags| F1[Tag:<br/>Production]
A -.->|has tags| F2[Tag:<br/>Real-time]
A -.->|has tags| F3[Tag:<br/>Tier.Gold]
end
subgraph Producers
G1[Pipeline:<br/>customer_cdc] -.->|produces to| B1
G2[Application:<br/>Order Service] -.->|produces to| B2
G3[Pipeline:<br/>payment_processor] -.->|produces to| B3
H1[Table:<br/>customers] -.->|CDC to| B1
H2[Table:<br/>orders] -.->|CDC to| B2
end
subgraph Consumers
I1[Pipeline:<br/>customer_analytics] -.->|consumes from| B1
I2[Pipeline:<br/>order_enrichment] -.->|consumes from| B2
I3[Application:<br/>Fraud Detection] -.->|consumes from| B3
B1 -.->|writes to| J1[Table:<br/>customer_events_history]
B2 -.->|writes to| J2[Table:<br/>order_events_history]
B3 -.->|writes to| J3[Table:<br/>payment_events_history]
end
subgraph Monitoring
K1[Dashboard:<br/>Kafka Monitoring] -.->|monitors| A
K2[Dashboard:<br/>Topic Lag Monitor] -.->|tracks| B1
K3[Dashboard:<br/>Event Analytics] -.->|analyzes| B2
end
subgraph Data Quality
L1[TestCase:<br/>schema_validation] -.->|validates| C1
L2[TestCase:<br/>message_lag_check] -.->|monitors| B1
L3[TestCase:<br/>partition_balance] -.->|checks| B2
end
style A fill:#667eea,color:#fff,stroke:#4c51bf,stroke-width:3px
style B1 fill:#4facfe,color:#fff
style B2 fill:#4facfe,color:#fff
style B3 fill:#4facfe,color:#fff
style C1 fill:#00f2fe,color:#333
style C2 fill:#00f2fe,color:#333
style C3 fill:#00f2fe,color:#333
style C4 fill:#00f2fe,color:#333
style C5 fill:#00f2fe,color:#333
style C6 fill:#00f2fe,color:#333
style D1 fill:#43e97b,color:#fff
style D2 fill:#43e97b,color:#fff
style E fill:#fa709a,color:#fff
style F1 fill:#f093fb,color:#fff
style F2 fill:#f093fb,color:#fff
style F3 fill:#f093fb,color:#fff
style G1 fill:#00ac69,color:#fff
style G2 fill:#00ac69,color:#fff
style G3 fill:#00ac69,color:#fff
style H1 fill:#764ba2,color:#fff
style H2 fill:#764ba2,color:#fff
style I1 fill:#f5576c,color:#fff
style I2 fill:#f5576c,color:#fff
style I3 fill:#f5576c,color:#fff
style J1 fill:#ff6b6b,color:#fff
style J2 fill:#ff6b6b,color:#fff
style J3 fill:#ff6b6b,color:#fff
style K1 fill:#ffd700,color:#333
style K2 fill:#ffd700,color:#333
style K3 fill:#ffd700,color:#333
style L1 fill:#9b59b6,color:#fff
style L2 fill:#9b59b6,color:#fff
style L3 fill:#9b59b6,color:#fff Relationship Types:
- Solid lines (→): Hierarchical containment (Service manages Topics, Topics have Schemas and Partitions)
- Dashed lines (-.->): References and associations (ownership, governance, producers, consumers, monitoring, quality)
Child Entities¶
- Pipeline: Ingestion pipelines for extracting topic configs and schemas
Associated Entities¶
- Owners: Users or teams owning this service (plural)
- Domains: Business domain assignments (plural)
- DataProducts: Data products this service is part of
- Followers: Users following this service
- Tags: Classification tags
- IngestionRunner: The ingestion agent responsible for executing pipelines
- TestConnectionResult: Connection test status and results
Schema Specifications¶
View the complete MessagingService schema in your preferred format:
Complete JSON Schema Definition
{
"$id": "https://open-metadata.org/schema/entity/services/messagingService.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Messaging Service",
"description": "This schema defines the Messaging Service entity, such as Kafka and Pulsar.",
"type": "object",
"javaType": "org.openmetadata.schema.entity.services.MessagingService",
"javaInterfaces": [
"org.openmetadata.schema.EntityInterface",
"org.openmetadata.schema.ServiceEntityInterface"
],
"definitions": {
"messagingServiceType": {
"description": "Type of messaging service - Kafka or Pulsar.",
"type": "string",
"javaInterfaces": ["org.openmetadata.schema.EnumInterface"],
"enum": ["Kafka", "Redpanda", "Kinesis", "CustomMessaging"],
"javaEnums": [
{ "name": "Kafka" },
{ "name": "Redpanda" },
{ "name": "Kinesis" },
{ "name": "CustomMessaging" }
]
},
"brokers": {
"description": "Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.",
"type": "array",
"items": {
"type": "string"
},
"default": null
},
"messagingConnection": {
"type": "object",
"javaType": "org.openmetadata.schema.type.MessagingConnection",
"description": "Dashboard Connection.",
"javaInterfaces": [
"org.openmetadata.schema.ServiceConnectionEntityInterface"
],
"properties": {
"config": {
"mask": true,
"oneOf": [
{ "$ref": "./connections/messaging/kafkaConnection.json" },
{ "$ref": "./connections/messaging/redpandaConnection.json" },
{ "$ref": "./connections/messaging/kinesisConnection.json" },
{ "$ref": "connections/messaging/customMessagingConnection.json" }
]
}
},
"additionalProperties": false
}
},
"properties": {
"id": {
"description": "Unique identifier of this messaging service instance.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"name": {
"description": "Name that identifies this messaging service.",
"$ref": "../../type/basic.json#/definitions/entityName"
},
"fullyQualifiedName": {
"description": "FullyQualifiedName same as `name`.",
"$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
},
"serviceType": {
"description": "Type of messaging service such as Kafka or Pulsar...",
"$ref": "#/definitions/messagingServiceType"
},
"description": {
"description": "Description of a messaging service instance.",
"$ref": "../../type/basic.json#/definitions/markdown"
},
"displayName": {
"description": "Display Name that identifies this messaging service. It could be title or label from the source services.",
"type": "string"
},
"connection": {
"$ref": "#/definitions/messagingConnection"
},
"pipelines": {
"description": "References to pipelines deployed for this messaging service to extract topic configs and schemas.",
"$ref": "../../type/entityReferenceList.json"
},
"testConnectionResult": {
"description": "Last test connection results for this service",
"$ref": "connections/testConnectionResult.json"
},
"tags": {
"description": "Tags for this Message Service.",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
},
"default": []
},
"version": {
"description": "Metadata version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt": {
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy": {
"description": "User who made the update.",
"type": "string"
},
"impersonatedBy": {
"description": "Bot user that performed the action on behalf of the actual user.",
"$ref": "../../type/basic.json#/definitions/impersonatedBy"
},
"owners": {
"description": "Owners of this messaging service.",
"$ref": "../../type/entityReferenceList.json"
},
"href": {
"description": "Link to the resource corresponding to this messaging service.",
"$ref": "../../type/basic.json#/definitions/href"
},
"changeDescription": {
"description": "Change that lead to this version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/changeDescription"
},
"incrementalChangeDescription": {
"description": "Change that lead to this version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/changeDescription"
},
"deleted": {
"description": "When `true` indicates the entity has been soft deleted.",
"type": "boolean",
"default": false
},
"dataProducts": {
"description": "List of data products this entity is part of.",
"$ref": "../../type/entityReferenceList.json"
},
"domains": {
"description": "Domains the Messaging service belongs to.",
"$ref": "../../type/entityReferenceList.json"
},
"followers": {
"description": "Followers of this entity.",
"$ref": "../../type/entityReferenceList.json"
},
"ingestionRunner": {
"description": "The ingestion agent responsible for executing the ingestion pipeline.",
"$ref": "../../type/entityReference.json"
}
},
"required": ["id", "name", "serviceType"],
"additionalProperties": false
}
RDF/OWL Ontology Definition
@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
# MessagingService Class Definition
om:MessagingService a owl:Class ;
rdfs:subClassOf om:Service ;
rdfs:label "MessagingService" ;
rdfs:comment "This schema defines the Messaging Service entity, such as Kafka and Pulsar." ;
om:hierarchyLevel 1 .
# Datatype Properties
om:serviceName a owl:DatatypeProperty ;
rdfs:domain om:MessagingService ;
rdfs:range xsd:string ;
rdfs:label "name" ;
rdfs:comment "Name that identifies this messaging service" .
om:fullyQualifiedName a owl:DatatypeProperty ;
rdfs:domain om:MessagingService ;
rdfs:range xsd:string ;
rdfs:label "fullyQualifiedName" ;
rdfs:comment "FullyQualifiedName same as name" .
om:displayName a owl:DatatypeProperty ;
rdfs:domain om:MessagingService ;
rdfs:range xsd:string ;
rdfs:label "displayName" ;
rdfs:comment "Display Name that identifies this messaging service" .
om:description a owl:DatatypeProperty ;
rdfs:domain om:MessagingService ;
rdfs:range xsd:string ;
rdfs:label "description" ;
rdfs:comment "Description of a messaging service instance" .
om:serviceType a owl:DatatypeProperty ;
rdfs:domain om:MessagingService ;
rdfs:range om:MessagingServiceType ;
rdfs:label "serviceType" ;
rdfs:comment "Type of messaging service such as Kafka or Pulsar" .
om:deleted a owl:DatatypeProperty ;
rdfs:domain om:MessagingService ;
rdfs:range xsd:boolean ;
rdfs:label "deleted" ;
rdfs:comment "When true indicates the entity has been soft deleted" .
# Object Properties
om:hasConnection a owl:ObjectProperty ;
rdfs:domain om:MessagingService ;
rdfs:range om:MessagingConnection ;
rdfs:label "connection" ;
rdfs:comment "Connection configuration for the messaging service" .
om:hasPipeline a owl:ObjectProperty ;
rdfs:domain om:MessagingService ;
rdfs:range om:Pipeline ;
rdfs:label "pipelines" ;
rdfs:comment "References to pipelines deployed for this messaging service" .
om:hasOwner a owl:ObjectProperty ;
rdfs:domain om:MessagingService ;
rdfs:range om:EntityReference ;
rdfs:label "owners" ;
rdfs:comment "Owners of this messaging service" .
om:hasTag a owl:ObjectProperty ;
rdfs:domain om:MessagingService ;
rdfs:range om:TagLabel ;
rdfs:label "tags" ;
rdfs:comment "Tags for this Message Service" .
om:inDomain a owl:ObjectProperty ;
rdfs:domain om:MessagingService ;
rdfs:range om:Domain ;
rdfs:label "domains" ;
rdfs:comment "Domains the Messaging service belongs to" .
om:hasDataProduct a owl:ObjectProperty ;
rdfs:domain om:MessagingService ;
rdfs:range om:DataProduct ;
rdfs:label "dataProducts" ;
rdfs:comment "List of data products this entity is part of" .
om:hasFollower a owl:ObjectProperty ;
rdfs:domain om:MessagingService ;
rdfs:range om:EntityReference ;
rdfs:label "followers" ;
rdfs:comment "Followers of this entity" .
om:hasIngestionRunner a owl:ObjectProperty ;
rdfs:domain om:MessagingService ;
rdfs:range om:EntityReference ;
rdfs:label "ingestionRunner" ;
rdfs:comment "The ingestion agent responsible for executing the ingestion pipeline" .
# MessagingServiceType Enumeration
om:MessagingServiceType a owl:Class ;
owl:oneOf (
om:Kafka
om:Redpanda
om:Kinesis
om:CustomMessaging
) .
# Example Instance
ex:kafkaProduction a om:MessagingService ;
om:serviceName "kafka_prod" ;
om:fullyQualifiedName "kafka_prod" ;
om:displayName "Kafka Production Cluster" ;
om:description "Production Kafka cluster for real-time event streaming" ;
om:serviceType om:Kafka ;
om:hasConnection ex:kafkaProdConnection ;
om:hasOwner ex:dataEngineeringTeam ;
om:hasTag ex:tierCritical ;
om:inDomain ex:engineeringDomain ;
om:deleted false .
JSON-LD Context and Example
{
"@context": {
"@vocab": "https://open-metadata.org/schema/",
"om": "https://open-metadata.org/schema/",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"xsd": "http://www.w3.org/2001/XMLSchema#",
"MessagingService": "om:MessagingService",
"name": {
"@id": "om:serviceName",
"@type": "xsd:string"
},
"fullyQualifiedName": {
"@id": "om:fullyQualifiedName",
"@type": "xsd:string"
},
"displayName": {
"@id": "om:displayName",
"@type": "xsd:string"
},
"description": {
"@id": "om:description",
"@type": "xsd:string"
},
"serviceType": {
"@id": "om:serviceType",
"@type": "@vocab"
},
"connection": {
"@id": "om:hasConnection",
"@type": "@id"
},
"pipelines": {
"@id": "om:hasPipeline",
"@type": "@id",
"@container": "@set"
},
"owners": {
"@id": "om:hasOwner",
"@type": "@id",
"@container": "@set"
},
"domains": {
"@id": "om:inDomain",
"@type": "@id",
"@container": "@set"
},
"dataProducts": {
"@id": "om:hasDataProduct",
"@type": "@id",
"@container": "@set"
},
"followers": {
"@id": "om:hasFollower",
"@type": "@id",
"@container": "@set"
},
"tags": {
"@id": "om:hasTag",
"@type": "@id",
"@container": "@set"
},
"ingestionRunner": {
"@id": "om:hasIngestionRunner",
"@type": "@id"
},
"deleted": {
"@id": "om:deleted",
"@type": "xsd:boolean"
}
}
}
Example JSON-LD Instance:
{
"@context": "https://open-metadata.org/context/messagingService.jsonld",
"@type": "MessagingService",
"@id": "https://example.com/services/kafka_prod",
"name": "kafka_prod",
"fullyQualifiedName": "kafka_prod",
"displayName": "Kafka Production Cluster",
"description": "Production Kafka cluster for real-time event streaming",
"serviceType": "Kafka",
"connection": {
"@id": "https://example.com/connections/kafka_prod_conn",
"config": {
"@type": "KafkaConnection",
"bootstrapServers": "kafka-prod.example.com:9092",
"schemaRegistryURL": "https://schema-registry.example.com"
}
},
"owners": [
{
"@id": "https://example.com/teams/data-engineering",
"@type": "Team",
"name": "DataEngineering"
}
],
"domains": [
{
"@id": "https://example.com/domains/Engineering",
"@type": "Domain",
"name": "Engineering"
}
],
"tags": [
{
"@id": "https://open-metadata.org/tags/Tier/Critical",
"tagFQN": "Tier.Critical"
},
{
"@id": "https://open-metadata.org/tags/Environment/Production",
"tagFQN": "Environment.Production"
}
],
"pipelines": [
{
"@id": "https://example.com/pipelines/kafka_ingestion",
"@type": "Pipeline",
"name": "kafka_ingestion"
}
],
"deleted": false
}
Use Cases¶
- Catalog Kafka, Pulsar, Kinesis, and other messaging platforms
- Document event streaming architecture and topology
- Track topic ownership and governance
- Monitor schema evolution across topics
- Discover event-driven data flows
- Apply compliance tags to sensitive event streams
- Integrate with schema registries (Confluent, Apicurio)
- Map event lineage from producers to consumers
- Manage real-time data pipelines
JSON Schema Specification¶
Core Properties¶
id (uuid)¶
Type: string (UUID format) Required: Yes (system-generated) Description: Unique identifier for this messaging service instance
name (entityName)¶
Type: string Required: Yes Pattern: ^[^.]*$ (no dots allowed) Min Length: 1 Max Length: 256 Description: Name of the messaging service
fullyQualifiedName (fullyQualifiedEntityName)¶
Type: string Required: Yes (system-generated) Pattern: ^((?!::).)*$ Description: Fully qualified name (same as name for services)
displayName¶
Type: string Required: No Description: Human-readable display name
description (markdown)¶
Type: string (Markdown format) Required: No Description: Rich text description of the service's purpose and configuration
{
"description": "# Kafka Production Cluster\n\nProduction Kafka cluster for real-time event streaming.\n\n## Configuration\n- 12 brokers across 3 availability zones\n- Replication factor: 3\n- Min ISR: 2\n\n## Usage\n- User activity events\n- Order transaction streams\n- Audit logs"
}
Service Configuration¶
serviceType (messagingServiceType)¶
Type: string enum Required: Yes Allowed Values:
Kafka- Apache KafkaRedpanda- RedpandaKinesis- Amazon KinesisCustomMessaging- Custom messaging service
connection (messagingConnection)¶
Type: object Required: No Description: Connection configuration for the messaging platform
The connection object contains a config property that uses oneOf to specify the connection type. The config can be one of:
kafkaConnection.json- Apache Kafka connection configurationredpandaConnection.json- Redpanda connection configurationkinesisConnection.json- Amazon Kinesis connection configurationcustomMessagingConnection.json- Custom messaging service connection
Example:
{
"connection": {
"config": {
"type": "Kafka",
"bootstrapServers": "kafka-prod.example.com:9092",
"schemaRegistryURL": "https://schema-registry.example.com",
"securityProtocol": "SASL_SSL",
"saslMechanism": "SCRAM-SHA-256"
}
}
}
Child Resources¶
pipelines (EntityReferenceList)¶
Type: array of Pipeline references Required: No Description: References to pipelines deployed for this messaging service to extract topic configs and schemas
{
"pipelines": [
{
"id": "pipeline-uuid-1",
"type": "pipeline",
"name": "kafka_metadata_ingestion",
"fullyQualifiedName": "kafka_prod.kafka_metadata_ingestion"
}
]
}
Governance Properties¶
owners (EntityReferenceList)¶
Type: array of EntityReference Required: No Description: Owners of this messaging service
{
"owners": [
{
"id": "team-uuid",
"type": "team",
"name": "DataEngineering",
"displayName": "Data Engineering Team"
},
{
"id": "user-uuid",
"type": "user",
"name": "john.doe",
"displayName": "John Doe"
}
]
}
domains (EntityReferenceList)¶
Type: array of EntityReference Required: No Description: Domains the Messaging service belongs to
{
"domains": [
{
"id": "domain-uuid",
"type": "domain",
"name": "Engineering",
"fullyQualifiedName": "Engineering"
}
]
}
tags[] (TagLabel[])¶
Type: array Required: No Default: [] Description: Tags for this Message Service
{
"tags": [
{
"tagFQN": "Tier.Critical",
"description": "Critical infrastructure",
"source": "Classification",
"labelType": "Manual",
"state": "Confirmed"
},
{
"tagFQN": "Environment.Production",
"source": "Classification",
"labelType": "Manual",
"state": "Confirmed"
}
]
}
dataProducts (EntityReferenceList)¶
Type: array of EntityReference Required: No Description: List of data products this entity is part of
{
"dataProducts": [
{
"id": "dataproduct-uuid",
"type": "dataProduct",
"name": "CustomerDataProduct",
"fullyQualifiedName": "CustomerDataProduct"
}
]
}
followers (EntityReferenceList)¶
Type: array of EntityReference Required: No Description: Followers of this entity
{
"followers": [
{
"id": "user-uuid",
"type": "user",
"name": "jane.smith",
"displayName": "Jane Smith"
}
]
}
Connection and Runtime Properties¶
testConnectionResult (TestConnectionResult)¶
Type: object Required: No Description: Last test connection results for this service
{
"testConnectionResult": {
"status": "successful",
"timestamp": 1704240000000,
"message": "Connection test successful"
}
}
ingestionRunner (EntityReference)¶
Type: object Required: No Description: The ingestion agent responsible for executing the ingestion pipeline
{
"ingestionRunner": {
"id": "runner-uuid",
"type": "ingestionPipeline",
"name": "kafka_ingestion_runner"
}
}
System Properties¶
href (string)¶
Type: string (URI) Required: No (system-generated) Description: Link to the resource corresponding to this messaging service
{
"href": "https://openmetadata.example.com/api/v1/services/messagingServices/a1b2c3d4-e5f6-7g8h-9i0j-k1l2m3n4o5p6"
}
deleted (boolean)¶
Type: boolean Required: No Default: false Description: When true indicates the entity has been soft deleted
Versioning Properties¶
version (entityVersion)¶
Type: number Required: Yes (system-managed) Description: Metadata version number, incremented on changes
updatedAt (timestamp)¶
Type: integer (Unix epoch milliseconds) Required: Yes (system-managed) Description: Last update timestamp
updatedBy (string)¶
Type: string Required: No (system-managed) Description: User who made the update
impersonatedBy (EntityReference)¶
Type: object Required: No Description: Bot user that performed the action on behalf of the actual user
changeDescription (ChangeDescription)¶
Type: object Required: No (system-managed) Description: Change that lead to this version of the entity
{
"changeDescription": {
"fieldsAdded": [],
"fieldsUpdated": [
{
"name": "tags",
"oldValue": [],
"newValue": [{"tagFQN": "Tier.Critical"}]
}
],
"fieldsDeleted": [],
"previousVersion": 1.1
}
}
incrementalChangeDescription (ChangeDescription)¶
Type: object Required: No (system-managed) Description: Change that lead to this version of the entity (incremental)
{
"incrementalChangeDescription": {
"fieldsAdded": [],
"fieldsUpdated": [
{
"name": "displayName",
"oldValue": "Kafka Prod",
"newValue": "Kafka Production Cluster"
}
],
"fieldsDeleted": [],
"previousVersion": 1.1
}
}
Complete Example¶
{
"id": "a1b2c3d4-e5f6-7g8h-9i0j-k1l2m3n4o5p6",
"name": "kafka_prod",
"fullyQualifiedName": "kafka_prod",
"displayName": "Kafka Production Cluster",
"description": "Production Kafka cluster for real-time event streaming",
"serviceType": "Kafka",
"connection": {
"config": {
"type": "Kafka",
"bootstrapServers": "kafka-prod.example.com:9092",
"schemaRegistryURL": "https://schema-registry.example.com",
"securityProtocol": "SASL_SSL",
"saslMechanism": "SCRAM-SHA-256"
}
},
"pipelines": [
{
"id": "pipeline-uuid-1",
"type": "pipeline",
"name": "kafka_metadata_ingestion",
"fullyQualifiedName": "kafka_prod.kafka_metadata_ingestion"
}
],
"testConnectionResult": {
"status": "successful",
"timestamp": 1704240000000
},
"owners": [
{
"id": "team-uuid",
"type": "team",
"name": "DataEngineering",
"displayName": "Data Engineering Team"
}
],
"domains": [
{
"id": "domain-uuid",
"type": "domain",
"name": "Engineering",
"fullyQualifiedName": "Engineering"
}
],
"dataProducts": [
{
"id": "dataproduct-uuid",
"type": "dataProduct",
"name": "CustomerDataProduct"
}
],
"followers": [
{
"id": "user-uuid",
"type": "user",
"name": "jane.smith"
}
],
"tags": [
{"tagFQN": "Tier.Critical"},
{"tagFQN": "Environment.Production"}
],
"href": "https://openmetadata.example.com/api/v1/services/messagingServices/a1b2c3d4-e5f6-7g8h-9i0j-k1l2m3n4o5p6",
"deleted": false,
"version": 1.2,
"updatedAt": 1704240000000,
"updatedBy": "admin"
}
RDF Representation¶
Ontology Class¶
@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
om:MessagingService a owl:Class ;
rdfs:subClassOf om:Service ;
rdfs:label "MessagingService" ;
rdfs:comment "This schema defines the Messaging Service entity, such as Kafka and Pulsar." .
Instance Example¶
@prefix om: <https://open-metadata.org/schema/> .
@prefix ex: <https://example.com/services/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
ex:kafka_prod a om:MessagingService ;
om:serviceName "kafka_prod" ;
om:fullyQualifiedName "kafka_prod" ;
om:displayName "Kafka Production Cluster" ;
om:description "Production Kafka cluster for real-time event streaming" ;
om:serviceType om:Kafka ;
om:hasConnection ex:kafkaProdConnection ;
om:hasPipeline ex:kafkaMetadataIngestion ;
om:hasOwner ex:dataEngineeringTeam ;
om:inDomain ex:engineeringDomain ;
om:hasDataProduct ex:customerDataProduct ;
om:hasFollower ex:janeSmith ;
om:hasTag ex:tierCritical ;
om:hasTag ex:envProduction ;
om:deleted "false"^^xsd:boolean .
JSON-LD Context¶
{
"@context": {
"@vocab": "https://open-metadata.org/schema/",
"om": "https://open-metadata.org/schema/",
"xsd": "http://www.w3.org/2001/XMLSchema#",
"MessagingService": "om:MessagingService",
"name": {
"@id": "om:serviceName",
"@type": "xsd:string"
},
"fullyQualifiedName": {
"@id": "om:fullyQualifiedName",
"@type": "xsd:string"
},
"displayName": {
"@id": "om:displayName",
"@type": "xsd:string"
},
"description": {
"@id": "om:description",
"@type": "xsd:string"
},
"serviceType": {
"@id": "om:serviceType",
"@type": "@vocab"
},
"connection": {
"@id": "om:hasConnection",
"@type": "@id"
},
"pipelines": {
"@id": "om:hasPipeline",
"@type": "@id",
"@container": "@set"
},
"owners": {
"@id": "om:hasOwner",
"@type": "@id",
"@container": "@set"
},
"domains": {
"@id": "om:inDomain",
"@type": "@id",
"@container": "@set"
},
"dataProducts": {
"@id": "om:hasDataProduct",
"@type": "@id",
"@container": "@set"
},
"followers": {
"@id": "om:hasFollower",
"@type": "@id",
"@container": "@set"
},
"tags": {
"@id": "om:hasTag",
"@type": "@id",
"@container": "@set"
},
"deleted": {
"@id": "om:deleted",
"@type": "xsd:boolean"
}
}
}
JSON-LD Example¶
{
"@context": "https://open-metadata.org/context/messagingService.jsonld",
"@type": "MessagingService",
"@id": "https://example.com/services/kafka_prod",
"name": "kafka_prod",
"fullyQualifiedName": "kafka_prod",
"displayName": "Kafka Production Cluster",
"description": "Production Kafka cluster for real-time event streaming",
"serviceType": "Kafka",
"connection": {
"@id": "https://example.com/connections/kafka_prod_conn",
"config": {
"@type": "KafkaConnection",
"bootstrapServers": "kafka-prod.example.com:9092",
"schemaRegistryURL": "https://schema-registry.example.com"
}
},
"owners": [
{
"@id": "https://example.com/teams/data-engineering",
"@type": "Team",
"name": "DataEngineering"
}
],
"domains": [
{
"@id": "https://example.com/domains/Engineering",
"@type": "Domain",
"name": "Engineering"
}
],
"tags": [
{
"@id": "https://open-metadata.org/tags/Tier/Critical",
"tagFQN": "Tier.Critical"
}
],
"pipelines": [
{
"@id": "https://example.com/pipelines/kafka_ingestion",
"@type": "Pipeline",
"name": "kafka_ingestion"
}
],
"deleted": false
}
Custom Properties¶
This entity supports custom properties through the extension field. Common custom properties include:
- Data Classification: Sensitivity level
- Cost Center: Billing allocation
- Retention Period: Data retention requirements
- Application Owner: Owning application/team
See Custom Properties for details on defining and using custom properties.
API Operations¶
Create MessagingService¶
POST /api/v1/services/messagingServices
Content-Type: application/json
{
"name": "kafka_prod",
"serviceType": "Kafka",
"connection": {
"type": "Kafka",
"brokerUrl": "kafka-prod.example.com:9092",
"schemaRegistry": "https://schema-registry.example.com"
}
}
Get MessagingService¶
Update MessagingService¶
PATCH /api/v1/services/messagingServices/{id}
Content-Type: application/json-patch+json
[
{
"op": "add",
"path": "/tags/-",
"value": {"tagFQN": "Tier.Critical"}
}
]
Test Connection¶
POST /api/v1/services/messagingServices/testConnection
Content-Type: application/json
{
"connection": {
"type": "Kafka",
"brokerUrl": "kafka-prod.example.com:9092"
}
}
Related Documentation¶
- Topic - Topic entity specification
- Messaging Overview - Messaging assets overview
- Schema Registry Integration - Schema management
- Event Lineage - Event-driven lineage
- Data Pipelines - Pipeline integration