Topic¶
Event streams with structured schemas - the foundation of event-driven architecture
Overview¶
The Topic entity represents event streams, message topics, and queues across messaging platforms. Topics contain structured events with schemas defined in Avro, Protobuf, JSON Schema, or other formats, enabling type-safe event-driven communication.
Hierarchy:
graph LR
SVC[Messaging Service] --> TOPIC[Topic]
TOPIC --> SCHEMA[Schema Fields]
style SVC fill:#667eea,color:#fff
style TOPIC fill:#4facfe,color:#fff,stroke:#4c51bf,stroke-width:3px
style SCHEMA fill:#00f2fe,color:#333 Relationships¶
Topic has comprehensive relationships with entities across the metadata platform:
graph TD
subgraph Hierarchy
SVC[MessagingService<br/>kafka_prod]
SVC --> TOPIC[Topic<br/>customer_events]
end
subgraph Schema
TOPIC --> SCH[Schema<br/>Avro/Protobuf]
SCH --> F1[Field<br/>customer_id]
SCH --> F2[Field<br/>event_type]
SCH --> F3[Field<br/>timestamp]
SCH --> F4[Field<br/>payload]
end
subgraph Producers
PROD1[Service<br/>customer_service] -.->|publishes to| TOPIC
PROD2[Pipeline<br/>cdc_pipeline] -.->|publishes to| TOPIC
PROD3[Application<br/>web_app] -.->|publishes to| TOPIC
end
subgraph Consumers
TOPIC -.->|consumed by| CONS1[Service<br/>analytics_service]
TOPIC -.->|consumed by| CONS2[Pipeline<br/>realtime_etl]
TOPIC -.->|consumed by| CONS3[Application<br/>notification_service]
end
subgraph Data Lineage
TBL1[Table<br/>customers] -.->|changes tracked in| TOPIC
TOPIC -.->|materialized to| TBL2[Table<br/>customer_events_log]
TOPIC -.->|feeds| TBL3[Table<br/>customer_aggregates]
end
subgraph Ownership
TOPIC -.->|owned by| TEAM[Team<br/>Platform Team]
TOPIC -.->|owned by| USER[User<br/>stream.admin]
end
subgraph Governance
TOPIC -.->|in domain| DOM[Domain<br/>Customer Platform]
TOPIC -.->|tagged| TAG1[Tag<br/>Tier.Gold]
TOPIC -.->|tagged| TAG2[Tag<br/>PII.Sensitive]
TOPIC -.->|tagged| TAG3[Tag<br/>RealTime]
TOPIC -.->|linked to| GT[GlossaryTerm<br/>Customer Events]
end
subgraph Quality
TC1[TestCase<br/>schema_validation] -.->|validates| SCH
TC2[TestCase<br/>message_throughput] -.->|monitors| TOPIC
TC3[TestCase<br/>lag_check] -.->|monitors| CONS1
end
subgraph Pipelines
PIPE1[Pipeline<br/>stream_processor] -.->|transforms| TOPIC
TOPIC -.->|feeds| PIPE2[Pipeline<br/>sink_connector]
end
style SVC fill:#667eea,color:#fff
style TOPIC fill:#4facfe,color:#fff,stroke:#4c51bf,stroke-width:3px
style SCH fill:#00f2fe,color:#333
style F1 fill:#00f2fe,color:#333
style F2 fill:#00f2fe,color:#333
style F3 fill:#00f2fe,color:#333
style F4 fill:#00f2fe,color:#333
style PROD1 fill:#00ac69,color:#fff
style PROD2 fill:#00ac69,color:#fff
style PROD3 fill:#00ac69,color:#fff
style CONS1 fill:#f5576c,color:#fff
style CONS2 fill:#f5576c,color:#fff
style CONS3 fill:#f5576c,color:#fff
style TBL1 fill:#764ba2,color:#fff
style TBL2 fill:#764ba2,color:#fff
style TBL3 fill:#764ba2,color:#fff
style TEAM fill:#43e97b,color:#fff
style USER fill:#43e97b,color:#fff
style DOM fill:#fa709a,color:#fff
style TAG1 fill:#f093fb,color:#fff
style TAG2 fill:#f093fb,color:#fff
style TAG3 fill:#f093fb,color:#fff
style GT fill:#ffd700,color:#333
style TC1 fill:#9b59b6,color:#fff
style TC2 fill:#9b59b6,color:#fff
style TC3 fill:#9b59b6,color:#fff
style PIPE1 fill:#ff6b6b,color:#fff
style PIPE2 fill:#ff6b6b,color:#fff Relationship Types:
- Solid lines (→): Hierarchical containment (Service contains Topic, Topic contains Schema)
- Dashed lines (-.->): References and associations (ownership, governance, lineage, producers/consumers)
Parent Entities¶
- MessagingService: The messaging platform service hosting this topic
Child Entities¶
- Schema: Event schema (Avro, Protobuf, JSON Schema)
- Field: Schema fields defining event structure
Associated Entities¶
- Owners: Users or teams owning this topic
- Followers: Users following this topic for notifications
- Domains: Business domains this topic belongs to
- DataProducts: Data products this entity is part of
- Tags: Classification tags
- GlossaryTerm: Business terminology
- Votes: User votes on the entity
- LifeCycle: Life cycle properties of the entity
- Certification: Asset certification information
- Table: Source tables (CDC) and target tables (materialization)
- Pipeline: Stream processing pipelines, CDC pipelines, sink connectors
- Service: Producer and consumer microservices
- TestCase: Schema validation, throughput monitoring, consumer lag checks
Schema Specifications¶
View the complete Topic schema in your preferred format:
Complete JSON Schema Definition
{
"$id": "https://open-metadata.org/schema/entity/data/topic.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Topic",
"$comment": "@om-entity-type",
"description": "A `Topic` is a feed or an event stream in a `Messaging Service` into which publishers publish messages and consumed by consumers.",
"type": "object",
"javaType": "org.openmetadata.schema.entity.data.Topic",
"javaInterfaces": ["org.openmetadata.schema.EntityInterface"],
"definitions": {
"cleanupPolicy": {
"javaType": "org.openmetadata.schema.type.topic.CleanupPolicy",
"description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.",
"enum": ["delete", "compact"]
},
"topicConfig": {
"javaType": "org.openmetadata.schema.type.topic.TopicConfig",
"description": "Contains key/value pair of topic configuration.",
"type": "object"
},
"topicSampleData": {
"type": "object",
"javaType": "org.openmetadata.schema.type.topic.TopicSampleData",
"description": "This schema defines the type to capture sample data for a topic.",
"properties": {
"messages": {
"description": "List of local sample messages for a topic.",
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false
}
},
"properties": {
"id": {
"description": "Unique identifier that identifies this topic instance.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"name": {
"description": "Name that identifies the topic.",
"$ref": "../../type/basic.json#/definitions/entityName"
},
"fullyQualifiedName": {
"description": "Name that uniquely identifies a topic in the format 'messagingServiceName.topicName'.",
"$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
},
"displayName": {
"description": "Display Name that identifies this topic. It could be title or label from the source services.",
"type": "string"
},
"description": {
"description": "Description of the topic instance.",
"$ref": "../../type/basic.json#/definitions/markdown"
},
"version": {
"description": "Metadata version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt": {
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy": {
"description": "User who made the update.",
"type": "string"
},
"impersonatedBy": {
"description": "Bot user that performed the action on behalf of the actual user.",
"$ref": "../../type/basic.json#/definitions/impersonatedBy"
},
"service": {
"description": "Link to the messaging cluster/service where this topic is hosted in.",
"$ref": "../../type/entityReference.json"
},
"serviceType": {
"description": "Service type where this topic is hosted in.",
"$ref": "../services/messagingService.json#/definitions/messagingServiceType"
},
"messageSchema": {
"$ref": "../../type/schema.json"
},
"partitions": {
"description": "Number of partitions into which the topic is divided.",
"type": "integer",
"minimum": 1
},
"cleanupPolicies": {
"description": "Topic clean up policies. For Kafka - `cleanup.policy` configuration.",
"type": "array",
"items": {
"$ref": "#/definitions/cleanupPolicy"
}
},
"retentionTime": {
"description": "Retention time in milliseconds. For Kafka - `retention.ms` configuration.",
"type": "number"
},
"replicationFactor": {
"description": "Replication Factor in integer (more than 1).",
"type": "integer"
},
"maximumMessageSize": {
"description": "Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.",
"type": "integer"
},
"minimumInSyncReplicas": {
"description": "Minimum number replicas in sync to control durability. For Kafka - `min.insync.replicas` configuration.",
"type": "integer"
},
"retentionSize": {
"description": "Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.",
"type": "number",
"default": "-1"
},
"topicConfig": {
"description": "Contains key/value pair of topic configuration.",
"$ref": "#/definitions/topicConfig"
},
"sampleData": {
"description": "Sample data for a topic.",
"$ref": "#/definitions/topicSampleData",
"default": null
},
"owners": {
"description": "Owners of this topic.",
"$ref": "../../type/entityReferenceList.json"
},
"followers": {
"description": "Followers of this table.",
"$ref": "../../type/entityReferenceList.json",
"default": null
},
"tags": {
"description": "Tags for this table.",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
},
"default": []
},
"href": {
"description": "Link to the resource corresponding to this entity.",
"$ref": "../../type/basic.json#/definitions/href"
},
"changeDescription": {
"description": "Change that lead to this version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/changeDescription"
},
"incrementalChangeDescription": {
"description": "Change that lead to this version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/changeDescription"
},
"deleted": {
"description": "When `true` indicates the entity has been soft deleted.",
"type": "boolean",
"default": false
},
"extension": {
"description": "Entity extension data with custom attributes added to the entity.",
"$ref": "../../type/basic.json#/definitions/entityExtension"
},
"sourceUrl": {
"description": "Source URL of topic.",
"$ref": "../../type/basic.json#/definitions/sourceUrl"
},
"domains": {
"description": "Domains the Topic belongs to. When not set, the Topic inherits the domain from the messaging service it belongs to.",
"$ref": "../../type/entityReferenceList.json"
},
"dataProducts": {
"description": "List of data products this entity is part of.",
"$ref": "../../type/entityReferenceList.json"
},
"votes": {
"description": "Votes on the entity.",
"$ref": "../../type/votes.json"
},
"lifeCycle": {
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"certification": {
"$ref": "../../type/assetCertification.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
},
"entityStatus": {
"description": "Status of the Topic.",
"$ref": "../../type/status.json"
}
},
"required": ["id", "name", "partitions", "service"],
"additionalProperties": false
}
RDF/OWL Ontology Definition
@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2001/XMLSchema#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
# Topic Class Definition
om:Topic a owl:Class ;
rdfs:subClassOf om:DataAsset ;
rdfs:label "Topic" ;
rdfs:comment "An event stream or message topic with a defined schema" ;
om:hierarchyLevel 2 .
# Properties
om:topicName a owl:DatatypeProperty ;
rdfs:domain om:Topic ;
rdfs:range xsd:string ;
rdfs:label "name" ;
rdfs:comment "Name of the topic" .
om:fullyQualifiedName a owl:DatatypeProperty ;
rdfs:domain om:Topic ;
rdfs:range xsd:string ;
rdfs:label "fullyQualifiedName" ;
rdfs:comment "Complete hierarchical name: service.topic" .
om:schemaType a owl:DatatypeProperty ;
rdfs:domain om:Topic ;
rdfs:range om:SchemaType ;
rdfs:label "schemaType" ;
rdfs:comment "Schema format: Avro, Protobuf, JSON, etc." .
om:schemaText a owl:DatatypeProperty ;
rdfs:domain om:Topic ;
rdfs:range xsd:string ;
rdfs:label "schemaText" ;
rdfs:comment "Full schema definition" .
om:partitionCount a owl:DatatypeProperty ;
rdfs:domain om:Topic ;
rdfs:range xsd:integer ;
rdfs:label "partitions" ;
rdfs:comment "Number of partitions" .
om:replicationFactor a owl:DatatypeProperty ;
rdfs:domain om:Topic ;
rdfs:range xsd:integer ;
rdfs:label "replicationFactor" ;
rdfs:comment "Replication factor" .
om:hasSchemaField a owl:ObjectProperty ;
rdfs:domain om:Topic ;
rdfs:range om:SchemaField ;
rdfs:label "hasSchemaField" ;
rdfs:comment "Fields in the message schema" .
om:belongsToService a owl:ObjectProperty ;
rdfs:domain om:Topic ;
rdfs:range om:MessagingService ;
rdfs:label "belongsToService" ;
rdfs:comment "Parent messaging service" .
om:hasOwners a owl:ObjectProperty ;
rdfs:domain om:Topic ;
rdfs:range om:Owner ;
rdfs:label "hasOwners" ;
rdfs:comment "Users or teams that own this topic" .
om:hasTag a owl:ObjectProperty ;
rdfs:domain om:Topic ;
rdfs:range om:Tag ;
rdfs:label "hasTag" ;
rdfs:comment "Classification tags applied to topic" .
om:linkedToGlossaryTerm a owl:ObjectProperty ;
rdfs:domain om:Topic ;
rdfs:range om:GlossaryTerm ;
rdfs:label "linkedToGlossaryTerm" ;
rdfs:comment "Business glossary terms" .
# SchemaType Enumeration
om:SchemaType a owl:Class ;
owl:oneOf (
om:AvroSchema
om:ProtobufSchema
om:JSONSchema
om:NoSchema
) .
om:hasDomains a owl:ObjectProperty ;
rdfs:domain om:Topic ;
rdfs:range om:Domain ;
rdfs:label "hasDomains" ;
rdfs:comment "Domains the Topic belongs to" .
om:hasFollowers a owl:ObjectProperty ;
rdfs:domain om:Topic ;
rdfs:range om:User ;
rdfs:label "hasFollowers" ;
rdfs:comment "Users following this topic" .
om:hasDataProducts a owl:ObjectProperty ;
rdfs:domain om:Topic ;
rdfs:range om:DataProduct ;
rdfs:label "hasDataProducts" ;
rdfs:comment "Data products this topic is part of" .
# Example Instance
ex:userEventsTopic a om:Topic ;
om:topicName "user_events" ;
om:fullyQualifiedName "kafka_prod.user_events" ;
om:partitionCount 12 ;
om:replicationFactor 3 ;
om:belongsToService ex:kafkaProduction ;
om:hasOwners ex:analyticsTeam ;
om:hasTag ex:tierGold ;
om:hasTag ex:complianceGDPR ;
om:linkedToGlossaryTerm ex:userEventTerm ;
om:hasSchemaField ex:userIdField ;
om:hasSchemaField ex:eventTypeField ;
om:hasSchemaField ex:timestampField .
JSON-LD Context and Example
{
"@context": {
"@vocab": "https://open-metadata.org/schema/",
"om": "https://open-metadata.org/schema/",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"xsd": "http://www.w3.org/2001/XMLSchema#",
"Topic": "om:Topic",
"name": {
"@id": "om:topicName",
"@type": "xsd:string"
},
"fullyQualifiedName": {
"@id": "om:fullyQualifiedName",
"@type": "xsd:string"
},
"displayName": {
"@id": "om:displayName",
"@type": "xsd:string"
},
"description": {
"@id": "om:description",
"@type": "xsd:string"
},
"schemaType": {
"@id": "om:schemaType",
"@type": "@vocab"
},
"schemaText": {
"@id": "om:schemaText",
"@type": "xsd:string"
},
"messageSchema": {
"@id": "om:hasSchemaField",
"@type": "@id",
"@container": "@list"
},
"service": {
"@id": "om:belongsToService",
"@type": "@id"
},
"owners": {
"@id": "om:hasOwners",
"@type": "@id",
"@container": "@set"
},
"followers": {
"@id": "om:hasFollowers",
"@type": "@id",
"@container": "@set"
},
"domains": {
"@id": "om:hasDomains",
"@type": "@id",
"@container": "@set"
},
"dataProducts": {
"@id": "om:hasDataProducts",
"@type": "@id",
"@container": "@set"
},
"tags": {
"@id": "om:hasTag",
"@type": "@id",
"@container": "@set"
},
"glossaryTerms": {
"@id": "om:linkedToGlossaryTerm",
"@type": "@id",
"@container": "@set"
},
"votes": {
"@id": "om:hasVotes",
"@type": "@id"
},
"lifeCycle": {
"@id": "om:hasLifeCycle",
"@type": "@id"
},
"certification": {
"@id": "om:hasCertification",
"@type": "@id"
}
}
}
Example JSON-LD Instance:
{
"@context": "https://open-metadata.org/context/topic.jsonld",
"@type": "Topic",
"@id": "https://example.com/topics/user_events",
"name": "user_events",
"fullyQualifiedName": "kafka_prod.user_events",
"displayName": "User Activity Events",
"description": "Real-time user activity event stream",
"schemaType": "Avro",
"partitions": {
"partitions": 12,
"replicationFactor": 3
},
"cleanupPolicies": ["delete"],
"retentionTime": 604800000,
"service": {
"@id": "https://example.com/services/kafka_prod",
"@type": "MessagingService",
"name": "kafka_prod"
},
"owners": [
{
"@id": "https://example.com/teams/analytics",
"@type": "Team",
"name": "Analytics"
}
],
"tags": [
{
"@id": "https://open-metadata.org/tags/Tier/Gold",
"tagFQN": "Tier.Gold"
},
{
"@id": "https://open-metadata.org/tags/Compliance/GDPR",
"tagFQN": "Compliance.GDPR"
}
],
"glossaryTerms": [
{
"@id": "https://example.com/glossary/UserEvent",
"@type": "GlossaryTerm",
"fullyQualifiedName": "BusinessGlossary.UserEvent"
}
],
"messageSchema": [
{
"@type": "SchemaField",
"name": "user_id",
"dataType": "string",
"description": "Unique user identifier",
"tags": [
{
"@id": "https://open-metadata.org/tags/PII/Sensitive/UserID"
}
]
},
{
"@type": "SchemaField",
"name": "event_type",
"dataType": "string",
"description": "Type of user event"
},
{
"@type": "SchemaField",
"name": "timestamp",
"dataType": "long",
"description": "Event timestamp in milliseconds"
}
]
}
Use Cases¶
- Catalog Kafka topics, Kinesis streams, Pulsar topics
- Document event schemas (Avro, Protobuf, JSON Schema)
- Track schema evolution and compatibility
- Apply governance tags to event fields (PII, PHI)
- Map event lineage from producers to consumers
- Discover event-driven data flows
- Monitor topic configuration (partitions, retention, replication)
- Integrate with schema registries
- Define data contracts for event streams
- Track event volume and lag metrics
JSON Schema Specification¶
Core Properties¶
id (uuid)¶
Type: string (UUID format) Required: Yes (system-generated) Description: Unique identifier for this topic instance
name (entityName)¶
Type: string Required: Yes Pattern: ^[^.]*$ (no dots allowed) Min Length: 1 Max Length: 256 Description: Name of the topic (unqualified)
fullyQualifiedName (fullyQualifiedEntityName)¶
Type: string Required: Yes (system-generated) Pattern: ^((?!::).)*$ Description: Fully qualified name in the format service.topic
displayName¶
Type: string Required: No Description: Human-readable display name
description (markdown)¶
Type: string (Markdown format) Required: No Description: Rich text description of the topic's purpose and usage
{
"description": "# User Activity Events\n\nReal-time stream of user activity events from web and mobile applications.\n\n## Event Types\n- page_view\n- button_click\n- form_submit\n- session_start\n- session_end\n\n## Consumers\n- Analytics pipeline\n- Recommendation engine\n- Audit log system"
}
Schema Properties¶
schemaType (topicSchemaType)¶
Type: string enum Required: No Allowed Values:
Avro- Apache Avro schemaProtobuf- Protocol Buffers schemaJSON- JSON SchemaNone- No schema enforcement
schemaText¶
Type: string Required: No Description: Full schema definition in Avro, Protobuf, or JSON Schema format
Avro Example:
{
"schemaText": "{\"type\":\"record\",\"name\":\"UserEvent\",\"namespace\":\"com.example.events\",\"fields\":[{\"name\":\"user_id\",\"type\":\"string\",\"doc\":\"Unique user identifier\"},{\"name\":\"event_type\",\"type\":\"string\",\"doc\":\"Type of event\"},{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Event timestamp in milliseconds\"},{\"name\":\"properties\",\"type\":{\"type\":\"map\",\"values\":\"string\"},\"default\":{}}]}"
}
Protobuf Example:
{
"schemaText": "syntax = \"proto3\";\n\npackage com.example.events;\n\nmessage UserEvent {\n string user_id = 1;\n string event_type = 2;\n int64 timestamp = 3;\n map<string, string> properties = 4;\n}"
}
messageSchema[] (SchemaField[])¶
Type: array of SchemaField objects Required: No (parsed from schemaText) Description: Structured representation of message fields
SchemaField Object Properties:
| Property | Type | Required | Description |
|---|---|---|---|
name | string | Yes | Field name |
dataType | string | Yes | Field data type (string, int, long, double, boolean, record, array, map, etc.) |
dataTypeDisplay | string | No | Formatted type display |
description | markdown | No | Field description |
children | SchemaField[] | No | Nested fields (for record/struct types) |
tags | TagLabel[] | No | Tags applied to field |
glossaryTerms | GlossaryTerm[] | No | Business terms |
Example:
{
"messageSchema": [
{
"name": "user_id",
"dataType": "string",
"dataTypeDisplay": "string",
"description": "Unique user identifier (UUID format)",
"tags": [
{
"tagFQN": "PII.Sensitive.UserID",
"source": "Classification"
}
]
},
{
"name": "event_type",
"dataType": "string",
"dataTypeDisplay": "string",
"description": "Type of user event (page_view, click, etc.)"
},
{
"name": "timestamp",
"dataType": "long",
"dataTypeDisplay": "long",
"description": "Event timestamp in milliseconds since epoch"
},
{
"name": "user_profile",
"dataType": "record",
"dataTypeDisplay": "record",
"description": "User profile information",
"children": [
{
"name": "email",
"dataType": "string",
"description": "User email address",
"tags": [
{
"tagFQN": "PII.Sensitive.Email",
"source": "Classification"
}
]
},
{
"name": "country",
"dataType": "string",
"description": "User country code"
}
]
},
{
"name": "properties",
"dataType": "map",
"dataTypeDisplay": "map<string, string>",
"description": "Additional event properties",
"children": [
{
"name": "key",
"dataType": "string"
},
{
"name": "value",
"dataType": "string"
}
]
}
]
}
Configuration Properties¶
partitions¶
Type: integer Required: Yes Minimum: 1 Description: Number of partitions into which the topic is divided
replicationFactor¶
Type: integer Required: No Description: Replication Factor in integer (more than 1)
cleanupPolicies[] (cleanupPolicy[])¶
Type: array of enums Required: No Allowed Values:
delete- Delete old segments based on retentioncompact- Keep only latest value per key
retentionSize¶
Type: number (bytes) Required: No Default: -1 Description: Maximum size of a partition in bytes before old data is discarded. For Kafka - retention.bytes configuration
retentionTime¶
Type: number (milliseconds) Required: No Description: Retention time in milliseconds. For Kafka - retention.ms configuration
maximumMessageSize¶
Type: integer (bytes) Required: No Description: Maximum message size in bytes. For Kafka - max.message.bytes configuration
minimumInSyncReplicas¶
Type: integer Required: No Description: Minimum number replicas in sync to control durability. For Kafka - min.insync.replicas configuration
topicConfig¶
Type: object Required: No Description: Contains key/value pair of topic configuration
sampleData¶
Type: object (topicSampleData) Required: No Default: null Description: Sample data for a topic
topicSampleData Properties:
| Property | Type | Description |
|---|---|---|
messages | string[] | List of local sample messages for a topic |
{
"sampleData": {
"messages": [
"{\"user_id\":\"123\",\"event_type\":\"page_view\",\"timestamp\":1704240000000}",
"{\"user_id\":\"456\",\"event_type\":\"button_click\",\"timestamp\":1704240001000}"
]
}
}
Location Properties¶
service (EntityReference)¶
Type: object Required: Yes Description: Reference to parent messaging service
{
"service": {
"id": "service-uuid",
"type": "messagingService",
"name": "kafka_prod",
"fullyQualifiedName": "kafka_prod"
}
}
Governance Properties¶
owners (EntityReferenceList)¶
Type: array of EntityReference Required: No Description: Owners of this topic
{
"owners": [
{
"id": "team-uuid",
"type": "team",
"name": "Analytics",
"displayName": "Analytics Team"
}
]
}
followers (EntityReferenceList)¶
Type: array of EntityReference Required: No Default: null Description: Followers of this table
{
"followers": [
{
"id": "user-uuid",
"type": "user",
"name": "john.doe",
"displayName": "John Doe"
}
]
}
domains (EntityReferenceList)¶
Type: array of EntityReference Required: No Description: Domains the Topic belongs to. When not set, the Topic inherits the domain from the messaging service it belongs to
{
"domains": [
{
"id": "domain-uuid",
"type": "domain",
"name": "UserEngagement",
"fullyQualifiedName": "UserEngagement"
}
]
}
dataProducts (EntityReferenceList)¶
Type: array of EntityReference Required: No Description: List of data products this entity is part of
{
"dataProducts": [
{
"id": "dataproduct-uuid",
"type": "dataProduct",
"name": "CustomerAnalytics",
"fullyQualifiedName": "CustomerAnalytics"
}
]
}
tags[] (TagLabel[])¶
Type: array Required: No Description: Classification tags applied to the topic
{
"tags": [
{
"tagFQN": "Tier.Gold",
"description": "Critical data stream",
"source": "Classification",
"labelType": "Manual",
"state": "Confirmed"
},
{
"tagFQN": "Compliance.GDPR",
"source": "Classification",
"labelType": "Automated",
"state": "Confirmed"
}
]
}
glossaryTerms[] (GlossaryTerm[])¶
Type: array Required: No Description: Business glossary terms linked to this topic
{
"glossaryTerms": [
{
"fullyQualifiedName": "BusinessGlossary.UserEvent"
},
{
"fullyQualifiedName": "BusinessGlossary.RealTimeAnalytics"
}
]
}
votes¶
Type: object (Votes) Required: No Description: Votes on the entity
{
"votes": {
"upVotes": 15,
"downVotes": 2,
"upVoters": ["user1-uuid", "user2-uuid"],
"downVoters": ["user3-uuid"]
}
}
lifeCycle¶
Type: object (LifeCycle) Required: No Description: Life Cycle properties of the entity
{
"lifeCycle": {
"created": {
"timestamp": 1704067200000,
"user": "admin"
},
"updated": {
"timestamp": 1704240000000,
"user": "data.engineer"
}
}
}
certification¶
Type: object (AssetCertification) Required: No Description: Asset certification information
Versioning Properties¶
version (entityVersion)¶
Type: number Required: Yes (system-managed) Description: Metadata version number, incremented on changes
updatedAt (timestamp)¶
Type: integer (Unix epoch milliseconds) Required: Yes (system-managed) Description: Last update timestamp
updatedBy (string)¶
Type: string Required: Yes (system-managed) Description: User who made the update
impersonatedBy¶
Type: object (ImpersonatedBy) Required: No Description: Bot user that performed the action on behalf of the actual user
serviceType¶
Type: string enum Required: No Description: Service type where this topic is hosted in
href¶
Type: string (URI) Required: No (system-generated) Description: Link to the resource corresponding to this entity
changeDescription¶
Type: object (ChangeDescription) Required: No Description: Change that lead to this version of the entity
{
"changeDescription": {
"fieldsAdded": [],
"fieldsUpdated": [
{
"name": "partitions",
"oldValue": 6,
"newValue": 12
}
],
"fieldsDeleted": [],
"previousVersion": 2.0
}
}
incrementalChangeDescription¶
Type: object (ChangeDescription) Required: No Description: Change that lead to this version of the entity
deleted¶
Type: boolean Required: No Default: false Description: When true indicates the entity has been soft deleted
extension¶
Type: object (EntityExtension) Required: No Description: Entity extension data with custom attributes added to the entity
sourceUrl¶
Type: string (URI) Required: No Description: Source URL of topic
sourceHash¶
Type: string Required: No Min Length: 1 Max Length: 32 Description: Source hash of the entity
entityStatus¶
Type: object (Status) Required: No Description: Status of the Topic
Complete Example¶
{
"id": "t1o2p3i4-c5u6-7u8i-9d0a-b1c2d3e4f5g6",
"name": "user_events",
"fullyQualifiedName": "kafka_prod.user_events",
"displayName": "User Activity Events",
"description": "# User Activity Events\n\nReal-time stream of user activity events.",
"version": 3.1,
"updatedAt": 1704240000000,
"updatedBy": "data.engineer",
"service": {
"id": "service-uuid",
"type": "messagingService",
"name": "kafka_prod",
"fullyQualifiedName": "kafka_prod"
},
"serviceType": "Kafka",
"messageSchema": {
"schemaType": "Avro",
"schemaText": "{\"type\":\"record\",\"name\":\"UserEvent\",\"fields\":[{\"name\":\"user_id\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"}]}",
"schemaFields": [
{
"name": "user_id",
"dataType": "STRING",
"description": "Unique user identifier",
"tags": [
{"tagFQN": "PII.Sensitive.UserID"}
]
},
{
"name": "event_type",
"dataType": "STRING",
"description": "Type of user event"
},
{
"name": "timestamp",
"dataType": "LONG",
"description": "Event timestamp in milliseconds"
}
]
},
"partitions": 12,
"replicationFactor": 3,
"cleanupPolicies": ["delete"],
"retentionTime": 604800000,
"retentionSize": -1,
"maximumMessageSize": 1048576,
"minimumInSyncReplicas": 2,
"topicConfig": {
"compression.type": "snappy"
},
"sampleData": {
"messages": [
"{\"user_id\":\"123\",\"event_type\":\"page_view\",\"timestamp\":1704240000000}"
]
},
"owners": [
{
"id": "team-uuid",
"type": "team",
"name": "Analytics",
"displayName": "Analytics Team"
}
],
"followers": [
{
"id": "user-uuid",
"type": "user",
"name": "john.doe"
}
],
"tags": [
{
"tagFQN": "Tier.Gold",
"source": "Classification"
},
{
"tagFQN": "Compliance.GDPR",
"source": "Classification"
}
],
"domains": [
{
"id": "domain-uuid",
"type": "domain",
"name": "UserEngagement",
"fullyQualifiedName": "UserEngagement"
}
],
"dataProducts": [
{
"id": "dataproduct-uuid",
"type": "dataProduct",
"name": "CustomerAnalytics"
}
],
"votes": {
"upVotes": 15,
"downVotes": 2
},
"lifeCycle": {
"created": {
"timestamp": 1704067200000,
"user": "admin"
}
},
"certification": {
"tagLabel": {
"tagFQN": "Certification.Gold"
}
},
"href": "https://example.com/api/v1/topics/t1o2p3i4-c5u6-7u8i-9d0a-b1c2d3e4f5g6",
"deleted": false
}
RDF Representation¶
Ontology Class¶
@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2001/XMLSchema#> .
om:Topic a owl:Class ;
rdfs:subClassOf om:DataAsset ;
rdfs:label "Topic" ;
rdfs:comment "A Topic is a feed or an event stream in a Messaging Service into which publishers publish messages and consumed by consumers" ;
om:hasProperties [
om:name "string" ;
om:messageSchema "Schema" ;
om:partitions "integer" ;
om:replicationFactor "integer" ;
om:service "MessagingService" ;
om:owners "Owner[]" ;
om:followers "User[]" ;
om:domains "Domain[]" ;
om:dataProducts "DataProduct[]" ;
om:tags "Tag[]" ;
om:votes "Votes" ;
om:lifeCycle "LifeCycle" ;
om:certification "AssetCertification" ;
] .
Instance Example¶
@prefix om: <https://open-metadata.org/schema/> .
@prefix ex: <https://example.com/topics/> .
ex:user_events a om:Topic ;
om:name "user_events" ;
om:fullyQualifiedName "kafka_prod.user_events" ;
om:displayName "User Activity Events" ;
om:description "Real-time user activity event stream" ;
om:partitionCount 12 ;
om:replicationFactor 3 ;
om:retentionTime 604800000 ;
om:maximumMessageSize 1048576 ;
om:belongsToService ex:kafka_prod ;
om:hasOwners ex:analyticsTeam ;
om:hasFollowers ex:johnDoe ;
om:hasTag ex:tierGold ;
om:hasTag ex:complianceGDPR ;
om:hasDomains ex:userEngagement ;
om:hasDataProducts ex:customerAnalytics ;
om:linkedToGlossaryTerm ex:userEventTerm ;
om:hasSchemaField ex:userIdField ;
om:hasSchemaField ex:eventTypeField .
JSON-LD Context¶
{
"@context": {
"@vocab": "https://open-metadata.org/schema/",
"om": "https://open-metadata.org/schema/",
"Topic": "om:Topic",
"name": "om:name",
"fullyQualifiedName": "om:fullyQualifiedName",
"displayName": "om:displayName",
"partitions": "om:partitionCount",
"replicationFactor": "om:replicationFactor",
"messageSchema": {
"@id": "om:messageSchema",
"@type": "@id"
},
"service": {
"@id": "om:belongsToService",
"@type": "@id"
},
"owners": {
"@id": "om:hasOwners",
"@type": "@id",
"@container": "@set"
},
"followers": {
"@id": "om:hasFollowers",
"@type": "@id",
"@container": "@set"
},
"domains": {
"@id": "om:hasDomains",
"@type": "@id",
"@container": "@set"
},
"dataProducts": {
"@id": "om:hasDataProducts",
"@type": "@id",
"@container": "@set"
},
"tags": {
"@id": "om:hasTag",
"@type": "@id",
"@container": "@set"
},
"votes": {
"@id": "om:hasVotes",
"@type": "@id"
},
"lifeCycle": {
"@id": "om:hasLifeCycle",
"@type": "@id"
},
"certification": {
"@id": "om:hasCertification",
"@type": "@id"
}
}
}
JSON-LD Example¶
{
"@context": "https://open-metadata.org/context/topic.jsonld",
"@type": "Topic",
"@id": "https://example.com/topics/user_events",
"name": "user_events",
"fullyQualifiedName": "kafka_prod.user_events",
"partitions": 12,
"replicationFactor": 3,
"cleanupPolicies": ["delete"],
"retentionTime": 604800000,
"service": {
"@id": "https://example.com/services/kafka_prod",
"@type": "MessagingService"
},
"owners": [
{
"@id": "https://example.com/teams/analytics",
"@type": "Team"
}
],
"domains": [
{
"@id": "https://example.com/domains/userEngagement",
"@type": "Domain"
}
],
"tags": [
{
"@id": "https://open-metadata.org/tags/Tier/Gold"
}
]
}
Relationships¶
Parent Entities¶
- MessagingService: The messaging platform hosting this topic
Associated Entities¶
- Owner: User or team owning this topic
- Domain: Business domain assignment
- Tag: Classification tags
- GlossaryTerm: Business terminology
- Pipeline: Data pipelines producing/consuming events
- Dashboard: Dashboards monitoring this topic
- DataContract: Schema contracts and SLAs
Custom Properties¶
This entity supports custom properties through the extension field. Common custom properties include:
- Data Classification: Sensitivity level
- Cost Center: Billing allocation
- Retention Period: Data retention requirements
- Application Owner: Owning application/team
See Custom Properties for details on defining and using custom properties.
Followers¶
Users can follow topics to receive notifications about schema changes, partition updates, and configuration modifications. See Followers for details.
API Operations¶
All Topic operations are available under the /v1/topics endpoint.
List Topics¶
Get a list of topics, optionally filtered by service.
GET /v1/topics
Query Parameters:
- fields: Fields to include (messageSchema, tags, owner, sampleData, etc.)
- service: Filter by messaging service name
- limit: Number of results (1-1000000, default 10)
- before/after: Cursor-based pagination
- include: all | deleted | non-deleted (default: non-deleted)
Response: TopicList
Create Topic¶
Create a new topic under a messaging service.
POST /v1/topics
Content-Type: application/json
{
"name": "user_events",
"service": "kafka_prod",
"description": "User activity event stream",
"partitions": 12,
"replicationFactor": 3,
"retentionTime": 604800000,
"schemaType": "Avro",
"messageSchema": {
"schemaText": "{\"type\":\"record\",\"name\":\"UserEvent\",\"fields\":[{\"name\":\"user_id\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"}]}",
"schemaFields": [
{
"name": "user_id",
"dataType": "STRING",
"description": "User identifier",
"tags": [{"tagFQN": "PII.UserId"}]
},
{
"name": "event_type",
"dataType": "STRING"
},
{
"name": "timestamp",
"dataType": "LONG"
}
]
},
"topicConfig": {
"cleanupPolicy": "delete",
"compressionType": "snappy"
}
}
Response: Topic
Get Topic by Name¶
Get a topic by its fully qualified name.
GET /v1/topics/name/{fqn}
Query Parameters:
- fields: Fields to include (messageSchema, tags, owner, sampleData, etc.)
- include: all | deleted | non-deleted
Example:
GET /v1/topics/name/kafka_prod.user_events?fields=messageSchema,tags,owner,topicConfig
Response: Topic
Get Topic by ID¶
Get a topic by its unique identifier.
GET /v1/topics/{id}
Query Parameters:
- fields: Fields to include
- include: all | deleted | non-deleted
Response: Topic
Update Topic¶
Update a topic using JSON Patch.
PATCH /v1/topics/name/{fqn}
Content-Type: application/json-patch+json
[
{"op": "add", "path": "/tags/-", "value": {"tagFQN": "Tier.Gold"}},
{"op": "replace", "path": "/partitions", "value": 24},
{"op": "replace", "path": "/description", "value": "Updated event stream"}
]
Response: Topic
Create or Update Topic¶
Create a new topic or update if it exists.
PUT /v1/topics
Content-Type: application/json
{
"name": "order_events",
"service": "kafka_prod",
"partitions": 6,
"messageSchema": {...}
}
Response: Topic
Delete Topic¶
Delete a topic by fully qualified name.
DELETE /v1/topics/name/{fqn}
Query Parameters:
- hardDelete: Permanently delete (default: false)
Response: 200 OK
Update Message Schema¶
Update the schema for messages in the topic.
PUT /v1/topics/{id}/messageSchema
Content-Type: application/json
{
"schemaType": "Avro",
"schemaText": "{\"type\":\"record\",\"name\":\"OrderEvent\",\"fields\":[...]}",
"schemaFields": [...]
}
Response: Topic
Get Topic Sample Data¶
Get sample messages from the topic.
Get Topic Versions¶
Get all versions of a topic.
Follow Topic¶
Add a follower to a topic.
Get Followers¶
Get all followers of a topic.
Bulk Operations¶
Create or update multiple topics.
PUT /v1/topics/bulk
Content-Type: application/json
{
"entities": [...]
}
Response: BulkOperationResult
Related Documentation¶
- MessagingService - Service configuration
- Messaging Overview - Messaging assets overview
- Schema Registry - Schema management
- Event Lineage - Event-driven lineage
- Data Quality - Event validation
- Data Contracts - Schema contracts