Skip to content

Topic

Event streams with structured schemas - the foundation of event-driven architecture


Overview

The Topic entity represents event streams, message topics, and queues across messaging platforms. Topics contain structured events with schemas defined in Avro, Protobuf, JSON Schema, or other formats, enabling type-safe event-driven communication.

Hierarchy:

graph LR
    SVC[Messaging Service] --> TOPIC[Topic]
    TOPIC --> SCHEMA[Schema Fields]

    style SVC fill:#667eea,color:#fff
    style TOPIC fill:#4facfe,color:#fff,stroke:#4c51bf,stroke-width:3px
    style SCHEMA fill:#00f2fe,color:#333

Relationships

Topic has comprehensive relationships with entities across the metadata platform:

graph TD
    subgraph Hierarchy
        SVC[MessagingService<br/>kafka_prod]
        SVC --> TOPIC[Topic<br/>customer_events]
    end

    subgraph Schema
        TOPIC --> SCH[Schema<br/>Avro/Protobuf]
        SCH --> F1[Field<br/>customer_id]
        SCH --> F2[Field<br/>event_type]
        SCH --> F3[Field<br/>timestamp]
        SCH --> F4[Field<br/>payload]
    end

    subgraph Producers
        PROD1[Service<br/>customer_service] -.->|publishes to| TOPIC
        PROD2[Pipeline<br/>cdc_pipeline] -.->|publishes to| TOPIC
        PROD3[Application<br/>web_app] -.->|publishes to| TOPIC
    end

    subgraph Consumers
        TOPIC -.->|consumed by| CONS1[Service<br/>analytics_service]
        TOPIC -.->|consumed by| CONS2[Pipeline<br/>realtime_etl]
        TOPIC -.->|consumed by| CONS3[Application<br/>notification_service]
    end

    subgraph Data Lineage
        TBL1[Table<br/>customers] -.->|changes tracked in| TOPIC
        TOPIC -.->|materialized to| TBL2[Table<br/>customer_events_log]
        TOPIC -.->|feeds| TBL3[Table<br/>customer_aggregates]
    end

    subgraph Ownership
        TOPIC -.->|owned by| TEAM[Team<br/>Platform Team]
        TOPIC -.->|owned by| USER[User<br/>stream.admin]
    end

    subgraph Governance
        TOPIC -.->|in domain| DOM[Domain<br/>Customer Platform]
        TOPIC -.->|tagged| TAG1[Tag<br/>Tier.Gold]
        TOPIC -.->|tagged| TAG2[Tag<br/>PII.Sensitive]
        TOPIC -.->|tagged| TAG3[Tag<br/>RealTime]
        TOPIC -.->|linked to| GT[GlossaryTerm<br/>Customer Events]
    end

    subgraph Quality
        TC1[TestCase<br/>schema_validation] -.->|validates| SCH
        TC2[TestCase<br/>message_throughput] -.->|monitors| TOPIC
        TC3[TestCase<br/>lag_check] -.->|monitors| CONS1
    end

    subgraph Pipelines
        PIPE1[Pipeline<br/>stream_processor] -.->|transforms| TOPIC
        TOPIC -.->|feeds| PIPE2[Pipeline<br/>sink_connector]
    end

    style SVC fill:#667eea,color:#fff
    style TOPIC fill:#4facfe,color:#fff,stroke:#4c51bf,stroke-width:3px
    style SCH fill:#00f2fe,color:#333
    style F1 fill:#00f2fe,color:#333
    style F2 fill:#00f2fe,color:#333
    style F3 fill:#00f2fe,color:#333
    style F4 fill:#00f2fe,color:#333
    style PROD1 fill:#00ac69,color:#fff
    style PROD2 fill:#00ac69,color:#fff
    style PROD3 fill:#00ac69,color:#fff
    style CONS1 fill:#f5576c,color:#fff
    style CONS2 fill:#f5576c,color:#fff
    style CONS3 fill:#f5576c,color:#fff
    style TBL1 fill:#764ba2,color:#fff
    style TBL2 fill:#764ba2,color:#fff
    style TBL3 fill:#764ba2,color:#fff
    style TEAM fill:#43e97b,color:#fff
    style USER fill:#43e97b,color:#fff
    style DOM fill:#fa709a,color:#fff
    style TAG1 fill:#f093fb,color:#fff
    style TAG2 fill:#f093fb,color:#fff
    style TAG3 fill:#f093fb,color:#fff
    style GT fill:#ffd700,color:#333
    style TC1 fill:#9b59b6,color:#fff
    style TC2 fill:#9b59b6,color:#fff
    style TC3 fill:#9b59b6,color:#fff
    style PIPE1 fill:#ff6b6b,color:#fff
    style PIPE2 fill:#ff6b6b,color:#fff

Relationship Types:

  • Solid lines (→): Hierarchical containment (Service contains Topic, Topic contains Schema)
  • Dashed lines (-.->): References and associations (ownership, governance, lineage, producers/consumers)

Parent Entities

  • MessagingService: The messaging platform service hosting this topic

Child Entities

  • Schema: Event schema (Avro, Protobuf, JSON Schema)
  • Field: Schema fields defining event structure

Associated Entities

  • Owners: Users or teams owning this topic
  • Followers: Users following this topic for notifications
  • Domains: Business domains this topic belongs to
  • DataProducts: Data products this entity is part of
  • Tags: Classification tags
  • GlossaryTerm: Business terminology
  • Votes: User votes on the entity
  • LifeCycle: Life cycle properties of the entity
  • Certification: Asset certification information
  • Table: Source tables (CDC) and target tables (materialization)
  • Pipeline: Stream processing pipelines, CDC pipelines, sink connectors
  • Service: Producer and consumer microservices
  • TestCase: Schema validation, throughput monitoring, consumer lag checks

Schema Specifications

View the complete Topic schema in your preferred format:

Complete JSON Schema Definition

{
  "$id": "https://open-metadata.org/schema/entity/data/topic.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Topic",
  "$comment": "@om-entity-type",
  "description": "A `Topic` is a feed or an event stream in a `Messaging Service` into which publishers publish messages and consumed by consumers.",
  "type": "object",
  "javaType": "org.openmetadata.schema.entity.data.Topic",
  "javaInterfaces": ["org.openmetadata.schema.EntityInterface"],

  "definitions": {
    "cleanupPolicy": {
      "javaType": "org.openmetadata.schema.type.topic.CleanupPolicy",
      "description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.",
      "enum": ["delete", "compact"]
    },
    "topicConfig": {
      "javaType": "org.openmetadata.schema.type.topic.TopicConfig",
      "description": "Contains key/value pair of topic configuration.",
      "type": "object"
    },
    "topicSampleData": {
      "type": "object",
      "javaType": "org.openmetadata.schema.type.topic.TopicSampleData",
      "description": "This schema defines the type to capture sample data for a topic.",
      "properties": {
        "messages": {
          "description": "List of local sample messages for a topic.",
          "type": "array",
          "items": {
            "type": "string"
          }
        }
      },
      "additionalProperties": false
    }
  },

  "properties": {
    "id": {
      "description": "Unique identifier that identifies this topic instance.",
      "$ref": "../../type/basic.json#/definitions/uuid"
    },
    "name": {
      "description": "Name that identifies the topic.",
      "$ref": "../../type/basic.json#/definitions/entityName"
    },
    "fullyQualifiedName": {
      "description": "Name that uniquely identifies a topic in the format 'messagingServiceName.topicName'.",
      "$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
    },
    "displayName": {
      "description": "Display Name that identifies this topic. It could be title or label from the source services.",
      "type": "string"
    },
    "description": {
      "description": "Description of the topic instance.",
      "$ref": "../../type/basic.json#/definitions/markdown"
    },
    "version": {
      "description": "Metadata version of the entity.",
      "$ref": "../../type/entityHistory.json#/definitions/entityVersion"
    },
    "updatedAt": {
      "description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
      "$ref": "../../type/basic.json#/definitions/timestamp"
    },
    "updatedBy": {
      "description": "User who made the update.",
      "type": "string"
    },
    "impersonatedBy": {
      "description": "Bot user that performed the action on behalf of the actual user.",
      "$ref": "../../type/basic.json#/definitions/impersonatedBy"
    },
    "service": {
      "description": "Link to the messaging cluster/service where this topic is hosted in.",
      "$ref": "../../type/entityReference.json"
    },
    "serviceType": {
      "description": "Service type where this topic is hosted in.",
      "$ref": "../services/messagingService.json#/definitions/messagingServiceType"
    },
    "messageSchema": {
      "$ref": "../../type/schema.json"
    },
    "partitions": {
      "description": "Number of partitions into which the topic is divided.",
      "type": "integer",
      "minimum": 1
    },
    "cleanupPolicies": {
      "description": "Topic clean up policies. For Kafka - `cleanup.policy` configuration.",
      "type": "array",
      "items": {
        "$ref": "#/definitions/cleanupPolicy"
      }
    },
    "retentionTime": {
      "description": "Retention time in milliseconds. For Kafka - `retention.ms` configuration.",
      "type": "number"
    },
    "replicationFactor": {
      "description": "Replication Factor in integer (more than 1).",
      "type": "integer"
    },
    "maximumMessageSize": {
      "description": "Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.",
      "type": "integer"
    },
    "minimumInSyncReplicas": {
      "description": "Minimum number replicas in sync to control durability. For Kafka - `min.insync.replicas` configuration.",
      "type": "integer"
    },
    "retentionSize": {
      "description": "Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.",
      "type": "number",
      "default": "-1"
    },
    "topicConfig": {
      "description": "Contains key/value pair of topic configuration.",
      "$ref": "#/definitions/topicConfig"
    },
    "sampleData": {
      "description": "Sample data for a topic.",
      "$ref": "#/definitions/topicSampleData",
      "default": null
    },
    "owners": {
      "description": "Owners of this topic.",
      "$ref": "../../type/entityReferenceList.json"
    },
    "followers": {
      "description": "Followers of this table.",
      "$ref": "../../type/entityReferenceList.json",
      "default": null
    },
    "tags": {
      "description": "Tags for this table.",
      "type": "array",
      "items": {
        "$ref": "../../type/tagLabel.json"
      },
      "default": []
    },
    "href": {
      "description": "Link to the resource corresponding to this entity.",
      "$ref": "../../type/basic.json#/definitions/href"
    },
    "changeDescription": {
      "description": "Change that lead to this version of the entity.",
      "$ref": "../../type/entityHistory.json#/definitions/changeDescription"
    },
    "incrementalChangeDescription": {
      "description": "Change that lead to this version of the entity.",
      "$ref": "../../type/entityHistory.json#/definitions/changeDescription"
    },
    "deleted": {
      "description": "When `true` indicates the entity has been soft deleted.",
      "type": "boolean",
      "default": false
    },
    "extension": {
      "description": "Entity extension data with custom attributes added to the entity.",
      "$ref": "../../type/basic.json#/definitions/entityExtension"
    },
    "sourceUrl": {
      "description": "Source URL of topic.",
      "$ref": "../../type/basic.json#/definitions/sourceUrl"
    },
    "domains": {
      "description": "Domains the Topic belongs to. When not set, the Topic inherits the domain from the messaging service it belongs to.",
      "$ref": "../../type/entityReferenceList.json"
    },
    "dataProducts": {
      "description": "List of data products this entity is part of.",
      "$ref": "../../type/entityReferenceList.json"
    },
    "votes": {
      "description": "Votes on the entity.",
      "$ref": "../../type/votes.json"
    },
    "lifeCycle": {
      "description": "Life Cycle properties of the entity",
      "$ref": "../../type/lifeCycle.json"
    },
    "certification": {
      "$ref": "../../type/assetCertification.json"
    },
    "sourceHash": {
      "description": "Source hash of the entity",
      "type": "string",
      "minLength": 1,
      "maxLength": 32
    },
    "entityStatus": {
      "description": "Status of the Topic.",
      "$ref": "../../type/status.json"
    }
  },

  "required": ["id", "name", "partitions", "service"],
  "additionalProperties": false
}

View Full JSON Schema →

RDF/OWL Ontology Definition

@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2001/XMLSchema#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

# Topic Class Definition
om:Topic a owl:Class ;
    rdfs:subClassOf om:DataAsset ;
    rdfs:label "Topic" ;
    rdfs:comment "An event stream or message topic with a defined schema" ;
    om:hierarchyLevel 2 .

# Properties
om:topicName a owl:DatatypeProperty ;
    rdfs:domain om:Topic ;
    rdfs:range xsd:string ;
    rdfs:label "name" ;
    rdfs:comment "Name of the topic" .

om:fullyQualifiedName a owl:DatatypeProperty ;
    rdfs:domain om:Topic ;
    rdfs:range xsd:string ;
    rdfs:label "fullyQualifiedName" ;
    rdfs:comment "Complete hierarchical name: service.topic" .

om:schemaType a owl:DatatypeProperty ;
    rdfs:domain om:Topic ;
    rdfs:range om:SchemaType ;
    rdfs:label "schemaType" ;
    rdfs:comment "Schema format: Avro, Protobuf, JSON, etc." .

om:schemaText a owl:DatatypeProperty ;
    rdfs:domain om:Topic ;
    rdfs:range xsd:string ;
    rdfs:label "schemaText" ;
    rdfs:comment "Full schema definition" .

om:partitionCount a owl:DatatypeProperty ;
    rdfs:domain om:Topic ;
    rdfs:range xsd:integer ;
    rdfs:label "partitions" ;
    rdfs:comment "Number of partitions" .

om:replicationFactor a owl:DatatypeProperty ;
    rdfs:domain om:Topic ;
    rdfs:range xsd:integer ;
    rdfs:label "replicationFactor" ;
    rdfs:comment "Replication factor" .

om:hasSchemaField a owl:ObjectProperty ;
    rdfs:domain om:Topic ;
    rdfs:range om:SchemaField ;
    rdfs:label "hasSchemaField" ;
    rdfs:comment "Fields in the message schema" .

om:belongsToService a owl:ObjectProperty ;
    rdfs:domain om:Topic ;
    rdfs:range om:MessagingService ;
    rdfs:label "belongsToService" ;
    rdfs:comment "Parent messaging service" .

om:hasOwners a owl:ObjectProperty ;
    rdfs:domain om:Topic ;
    rdfs:range om:Owner ;
    rdfs:label "hasOwners" ;
    rdfs:comment "Users or teams that own this topic" .

om:hasTag a owl:ObjectProperty ;
    rdfs:domain om:Topic ;
    rdfs:range om:Tag ;
    rdfs:label "hasTag" ;
    rdfs:comment "Classification tags applied to topic" .

om:linkedToGlossaryTerm a owl:ObjectProperty ;
    rdfs:domain om:Topic ;
    rdfs:range om:GlossaryTerm ;
    rdfs:label "linkedToGlossaryTerm" ;
    rdfs:comment "Business glossary terms" .

# SchemaType Enumeration
om:SchemaType a owl:Class ;
    owl:oneOf (
        om:AvroSchema
        om:ProtobufSchema
        om:JSONSchema
        om:NoSchema
    ) .

om:hasDomains a owl:ObjectProperty ;
    rdfs:domain om:Topic ;
    rdfs:range om:Domain ;
    rdfs:label "hasDomains" ;
    rdfs:comment "Domains the Topic belongs to" .

om:hasFollowers a owl:ObjectProperty ;
    rdfs:domain om:Topic ;
    rdfs:range om:User ;
    rdfs:label "hasFollowers" ;
    rdfs:comment "Users following this topic" .

om:hasDataProducts a owl:ObjectProperty ;
    rdfs:domain om:Topic ;
    rdfs:range om:DataProduct ;
    rdfs:label "hasDataProducts" ;
    rdfs:comment "Data products this topic is part of" .

# Example Instance
ex:userEventsTopic a om:Topic ;
    om:topicName "user_events" ;
    om:fullyQualifiedName "kafka_prod.user_events" ;
    om:partitionCount 12 ;
    om:replicationFactor 3 ;
    om:belongsToService ex:kafkaProduction ;
    om:hasOwners ex:analyticsTeam ;
    om:hasTag ex:tierGold ;
    om:hasTag ex:complianceGDPR ;
    om:linkedToGlossaryTerm ex:userEventTerm ;
    om:hasSchemaField ex:userIdField ;
    om:hasSchemaField ex:eventTypeField ;
    om:hasSchemaField ex:timestampField .

View Full RDF Ontology →

JSON-LD Context and Example

{
  "@context": {
    "@vocab": "https://open-metadata.org/schema/",
    "om": "https://open-metadata.org/schema/",
    "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
    "xsd": "http://www.w3.org/2001/XMLSchema#",

    "Topic": "om:Topic",
    "name": {
      "@id": "om:topicName",
      "@type": "xsd:string"
    },
    "fullyQualifiedName": {
      "@id": "om:fullyQualifiedName",
      "@type": "xsd:string"
    },
    "displayName": {
      "@id": "om:displayName",
      "@type": "xsd:string"
    },
    "description": {
      "@id": "om:description",
      "@type": "xsd:string"
    },
    "schemaType": {
      "@id": "om:schemaType",
      "@type": "@vocab"
    },
    "schemaText": {
      "@id": "om:schemaText",
      "@type": "xsd:string"
    },
    "messageSchema": {
      "@id": "om:hasSchemaField",
      "@type": "@id",
      "@container": "@list"
    },
    "service": {
      "@id": "om:belongsToService",
      "@type": "@id"
    },
    "owners": {
      "@id": "om:hasOwners",
      "@type": "@id",
      "@container": "@set"
    },
    "followers": {
      "@id": "om:hasFollowers",
      "@type": "@id",
      "@container": "@set"
    },
    "domains": {
      "@id": "om:hasDomains",
      "@type": "@id",
      "@container": "@set"
    },
    "dataProducts": {
      "@id": "om:hasDataProducts",
      "@type": "@id",
      "@container": "@set"
    },
    "tags": {
      "@id": "om:hasTag",
      "@type": "@id",
      "@container": "@set"
    },
    "glossaryTerms": {
      "@id": "om:linkedToGlossaryTerm",
      "@type": "@id",
      "@container": "@set"
    },
    "votes": {
      "@id": "om:hasVotes",
      "@type": "@id"
    },
    "lifeCycle": {
      "@id": "om:hasLifeCycle",
      "@type": "@id"
    },
    "certification": {
      "@id": "om:hasCertification",
      "@type": "@id"
    }
  }
}

Example JSON-LD Instance:

{
  "@context": "https://open-metadata.org/context/topic.jsonld",
  "@type": "Topic",
  "@id": "https://example.com/topics/user_events",

  "name": "user_events",
  "fullyQualifiedName": "kafka_prod.user_events",
  "displayName": "User Activity Events",
  "description": "Real-time user activity event stream",
  "schemaType": "Avro",

  "partitions": {
    "partitions": 12,
    "replicationFactor": 3
  },

  "cleanupPolicies": ["delete"],
  "retentionTime": 604800000,

  "service": {
    "@id": "https://example.com/services/kafka_prod",
    "@type": "MessagingService",
    "name": "kafka_prod"
  },

  "owners": [
    {
      "@id": "https://example.com/teams/analytics",
      "@type": "Team",
      "name": "Analytics"
    }
  ],

  "tags": [
    {
      "@id": "https://open-metadata.org/tags/Tier/Gold",
      "tagFQN": "Tier.Gold"
    },
    {
      "@id": "https://open-metadata.org/tags/Compliance/GDPR",
      "tagFQN": "Compliance.GDPR"
    }
  ],

  "glossaryTerms": [
    {
      "@id": "https://example.com/glossary/UserEvent",
      "@type": "GlossaryTerm",
      "fullyQualifiedName": "BusinessGlossary.UserEvent"
    }
  ],

  "messageSchema": [
    {
      "@type": "SchemaField",
      "name": "user_id",
      "dataType": "string",
      "description": "Unique user identifier",
      "tags": [
        {
          "@id": "https://open-metadata.org/tags/PII/Sensitive/UserID"
        }
      ]
    },
    {
      "@type": "SchemaField",
      "name": "event_type",
      "dataType": "string",
      "description": "Type of user event"
    },
    {
      "@type": "SchemaField",
      "name": "timestamp",
      "dataType": "long",
      "description": "Event timestamp in milliseconds"
    }
  ]
}

View Full JSON-LD Context →


Use Cases

  • Catalog Kafka topics, Kinesis streams, Pulsar topics
  • Document event schemas (Avro, Protobuf, JSON Schema)
  • Track schema evolution and compatibility
  • Apply governance tags to event fields (PII, PHI)
  • Map event lineage from producers to consumers
  • Discover event-driven data flows
  • Monitor topic configuration (partitions, retention, replication)
  • Integrate with schema registries
  • Define data contracts for event streams
  • Track event volume and lag metrics

JSON Schema Specification

Core Properties

id (uuid)

Type: string (UUID format) Required: Yes (system-generated) Description: Unique identifier for this topic instance

{
  "id": "t1o2p3i4-c5u6-7u8i-9d0a-b1c2d3e4f5g6"
}

name (entityName)

Type: string Required: Yes Pattern: ^[^.]*$ (no dots allowed) Min Length: 1 Max Length: 256 Description: Name of the topic (unqualified)

{
  "name": "user_events"
}

fullyQualifiedName (fullyQualifiedEntityName)

Type: string Required: Yes (system-generated) Pattern: ^((?!::).)*$ Description: Fully qualified name in the format service.topic

{
  "fullyQualifiedName": "kafka_prod.user_events"
}

displayName

Type: string Required: No Description: Human-readable display name

{
  "displayName": "User Activity Events"
}

description (markdown)

Type: string (Markdown format) Required: No Description: Rich text description of the topic's purpose and usage

{
  "description": "# User Activity Events\n\nReal-time stream of user activity events from web and mobile applications.\n\n## Event Types\n- page_view\n- button_click\n- form_submit\n- session_start\n- session_end\n\n## Consumers\n- Analytics pipeline\n- Recommendation engine\n- Audit log system"
}

Schema Properties

schemaType (topicSchemaType)

Type: string enum Required: No Allowed Values:

  • Avro - Apache Avro schema
  • Protobuf - Protocol Buffers schema
  • JSON - JSON Schema
  • None - No schema enforcement
{
  "schemaType": "Avro"
}

schemaText

Type: string Required: No Description: Full schema definition in Avro, Protobuf, or JSON Schema format

Avro Example:

{
  "schemaText": "{\"type\":\"record\",\"name\":\"UserEvent\",\"namespace\":\"com.example.events\",\"fields\":[{\"name\":\"user_id\",\"type\":\"string\",\"doc\":\"Unique user identifier\"},{\"name\":\"event_type\",\"type\":\"string\",\"doc\":\"Type of event\"},{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Event timestamp in milliseconds\"},{\"name\":\"properties\",\"type\":{\"type\":\"map\",\"values\":\"string\"},\"default\":{}}]}"
}

Protobuf Example:

{
  "schemaText": "syntax = \"proto3\";\n\npackage com.example.events;\n\nmessage UserEvent {\n  string user_id = 1;\n  string event_type = 2;\n  int64 timestamp = 3;\n  map<string, string> properties = 4;\n}"
}

messageSchema[] (SchemaField[])

Type: array of SchemaField objects Required: No (parsed from schemaText) Description: Structured representation of message fields

SchemaField Object Properties:

Property Type Required Description
name string Yes Field name
dataType string Yes Field data type (string, int, long, double, boolean, record, array, map, etc.)
dataTypeDisplay string No Formatted type display
description markdown No Field description
children SchemaField[] No Nested fields (for record/struct types)
tags TagLabel[] No Tags applied to field
glossaryTerms GlossaryTerm[] No Business terms

Example:

{
  "messageSchema": [
    {
      "name": "user_id",
      "dataType": "string",
      "dataTypeDisplay": "string",
      "description": "Unique user identifier (UUID format)",
      "tags": [
        {
          "tagFQN": "PII.Sensitive.UserID",
          "source": "Classification"
        }
      ]
    },
    {
      "name": "event_type",
      "dataType": "string",
      "dataTypeDisplay": "string",
      "description": "Type of user event (page_view, click, etc.)"
    },
    {
      "name": "timestamp",
      "dataType": "long",
      "dataTypeDisplay": "long",
      "description": "Event timestamp in milliseconds since epoch"
    },
    {
      "name": "user_profile",
      "dataType": "record",
      "dataTypeDisplay": "record",
      "description": "User profile information",
      "children": [
        {
          "name": "email",
          "dataType": "string",
          "description": "User email address",
          "tags": [
            {
              "tagFQN": "PII.Sensitive.Email",
              "source": "Classification"
            }
          ]
        },
        {
          "name": "country",
          "dataType": "string",
          "description": "User country code"
        }
      ]
    },
    {
      "name": "properties",
      "dataType": "map",
      "dataTypeDisplay": "map<string, string>",
      "description": "Additional event properties",
      "children": [
        {
          "name": "key",
          "dataType": "string"
        },
        {
          "name": "value",
          "dataType": "string"
        }
      ]
    }
  ]
}

Configuration Properties

partitions

Type: integer Required: Yes Minimum: 1 Description: Number of partitions into which the topic is divided

{
  "partitions": 12
}

replicationFactor

Type: integer Required: No Description: Replication Factor in integer (more than 1)

{
  "replicationFactor": 3
}

cleanupPolicies[] (cleanupPolicy[])

Type: array of enums Required: No Allowed Values:

  • delete - Delete old segments based on retention
  • compact - Keep only latest value per key
{
  "cleanupPolicies": ["delete"]
}

retentionSize

Type: number (bytes) Required: No Default: -1 Description: Maximum size of a partition in bytes before old data is discarded. For Kafka - retention.bytes configuration

{
  "retentionSize": 10737418240
}

retentionTime

Type: number (milliseconds) Required: No Description: Retention time in milliseconds. For Kafka - retention.ms configuration

{
  "retentionTime": 604800000
}

maximumMessageSize

Type: integer (bytes) Required: No Description: Maximum message size in bytes. For Kafka - max.message.bytes configuration

{
  "maximumMessageSize": 1048576
}

minimumInSyncReplicas

Type: integer Required: No Description: Minimum number replicas in sync to control durability. For Kafka - min.insync.replicas configuration

{
  "minimumInSyncReplicas": 2
}

topicConfig

Type: object Required: No Description: Contains key/value pair of topic configuration

{
  "topicConfig": {
    "compression.type": "snappy",
    "segment.bytes": "1073741824"
  }
}

sampleData

Type: object (topicSampleData) Required: No Default: null Description: Sample data for a topic

topicSampleData Properties:

Property Type Description
messages string[] List of local sample messages for a topic
{
  "sampleData": {
    "messages": [
      "{\"user_id\":\"123\",\"event_type\":\"page_view\",\"timestamp\":1704240000000}",
      "{\"user_id\":\"456\",\"event_type\":\"button_click\",\"timestamp\":1704240001000}"
    ]
  }
}

Location Properties

service (EntityReference)

Type: object Required: Yes Description: Reference to parent messaging service

{
  "service": {
    "id": "service-uuid",
    "type": "messagingService",
    "name": "kafka_prod",
    "fullyQualifiedName": "kafka_prod"
  }
}

Governance Properties

owners (EntityReferenceList)

Type: array of EntityReference Required: No Description: Owners of this topic

{
  "owners": [
    {
      "id": "team-uuid",
      "type": "team",
      "name": "Analytics",
      "displayName": "Analytics Team"
    }
  ]
}

followers (EntityReferenceList)

Type: array of EntityReference Required: No Default: null Description: Followers of this table

{
  "followers": [
    {
      "id": "user-uuid",
      "type": "user",
      "name": "john.doe",
      "displayName": "John Doe"
    }
  ]
}

domains (EntityReferenceList)

Type: array of EntityReference Required: No Description: Domains the Topic belongs to. When not set, the Topic inherits the domain from the messaging service it belongs to

{
  "domains": [
    {
      "id": "domain-uuid",
      "type": "domain",
      "name": "UserEngagement",
      "fullyQualifiedName": "UserEngagement"
    }
  ]
}

dataProducts (EntityReferenceList)

Type: array of EntityReference Required: No Description: List of data products this entity is part of

{
  "dataProducts": [
    {
      "id": "dataproduct-uuid",
      "type": "dataProduct",
      "name": "CustomerAnalytics",
      "fullyQualifiedName": "CustomerAnalytics"
    }
  ]
}

tags[] (TagLabel[])

Type: array Required: No Description: Classification tags applied to the topic

{
  "tags": [
    {
      "tagFQN": "Tier.Gold",
      "description": "Critical data stream",
      "source": "Classification",
      "labelType": "Manual",
      "state": "Confirmed"
    },
    {
      "tagFQN": "Compliance.GDPR",
      "source": "Classification",
      "labelType": "Automated",
      "state": "Confirmed"
    }
  ]
}

glossaryTerms[] (GlossaryTerm[])

Type: array Required: No Description: Business glossary terms linked to this topic

{
  "glossaryTerms": [
    {
      "fullyQualifiedName": "BusinessGlossary.UserEvent"
    },
    {
      "fullyQualifiedName": "BusinessGlossary.RealTimeAnalytics"
    }
  ]
}

votes

Type: object (Votes) Required: No Description: Votes on the entity

{
  "votes": {
    "upVotes": 15,
    "downVotes": 2,
    "upVoters": ["user1-uuid", "user2-uuid"],
    "downVoters": ["user3-uuid"]
  }
}

lifeCycle

Type: object (LifeCycle) Required: No Description: Life Cycle properties of the entity

{
  "lifeCycle": {
    "created": {
      "timestamp": 1704067200000,
      "user": "admin"
    },
    "updated": {
      "timestamp": 1704240000000,
      "user": "data.engineer"
    }
  }
}

certification

Type: object (AssetCertification) Required: No Description: Asset certification information

{
  "certification": {
    "tagLabel": {
      "tagFQN": "Certification.Gold",
      "source": "Classification"
    }
  }
}

Versioning Properties

version (entityVersion)

Type: number Required: Yes (system-managed) Description: Metadata version number, incremented on changes

{
  "version": 3.1
}

updatedAt (timestamp)

Type: integer (Unix epoch milliseconds) Required: Yes (system-managed) Description: Last update timestamp

{
  "updatedAt": 1704240000000
}

updatedBy (string)

Type: string Required: Yes (system-managed) Description: User who made the update

{
  "updatedBy": "data.engineer"
}

impersonatedBy

Type: object (ImpersonatedBy) Required: No Description: Bot user that performed the action on behalf of the actual user

{
  "impersonatedBy": {
    "id": "bot-uuid",
    "type": "bot",
    "name": "ingestion-bot"
  }
}

serviceType

Type: string enum Required: No Description: Service type where this topic is hosted in

{
  "serviceType": "Kafka"
}

href

Type: string (URI) Required: No (system-generated) Description: Link to the resource corresponding to this entity

{
  "href": "https://example.com/api/v1/topics/t1o2p3i4-c5u6-7u8i-9d0a-b1c2d3e4f5g6"
}

changeDescription

Type: object (ChangeDescription) Required: No Description: Change that lead to this version of the entity

{
  "changeDescription": {
    "fieldsAdded": [],
    "fieldsUpdated": [
      {
        "name": "partitions",
        "oldValue": 6,
        "newValue": 12
      }
    ],
    "fieldsDeleted": [],
    "previousVersion": 2.0
  }
}

incrementalChangeDescription

Type: object (ChangeDescription) Required: No Description: Change that lead to this version of the entity

{
  "incrementalChangeDescription": {
    "fieldsAdded": [],
    "fieldsUpdated": [],
    "fieldsDeleted": []
  }
}

deleted

Type: boolean Required: No Default: false Description: When true indicates the entity has been soft deleted

{
  "deleted": false
}

extension

Type: object (EntityExtension) Required: No Description: Entity extension data with custom attributes added to the entity

{
  "extension": {
    "customField1": "value1",
    "customField2": "value2"
  }
}

sourceUrl

Type: string (URI) Required: No Description: Source URL of topic

{
  "sourceUrl": "https://kafka-ui.example.com/topics/user_events"
}

sourceHash

Type: string Required: No Min Length: 1 Max Length: 32 Description: Source hash of the entity

{
  "sourceHash": "a1b2c3d4e5f6g7h8"
}

entityStatus

Type: object (Status) Required: No Description: Status of the Topic

{
  "entityStatus": {
    "status": "Active"
  }
}

Complete Example

{
  "id": "t1o2p3i4-c5u6-7u8i-9d0a-b1c2d3e4f5g6",
  "name": "user_events",
  "fullyQualifiedName": "kafka_prod.user_events",
  "displayName": "User Activity Events",
  "description": "# User Activity Events\n\nReal-time stream of user activity events.",
  "version": 3.1,
  "updatedAt": 1704240000000,
  "updatedBy": "data.engineer",
  "service": {
    "id": "service-uuid",
    "type": "messagingService",
    "name": "kafka_prod",
    "fullyQualifiedName": "kafka_prod"
  },
  "serviceType": "Kafka",
  "messageSchema": {
    "schemaType": "Avro",
    "schemaText": "{\"type\":\"record\",\"name\":\"UserEvent\",\"fields\":[{\"name\":\"user_id\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"}]}",
    "schemaFields": [
      {
        "name": "user_id",
        "dataType": "STRING",
        "description": "Unique user identifier",
        "tags": [
          {"tagFQN": "PII.Sensitive.UserID"}
        ]
      },
      {
        "name": "event_type",
        "dataType": "STRING",
        "description": "Type of user event"
      },
      {
        "name": "timestamp",
        "dataType": "LONG",
        "description": "Event timestamp in milliseconds"
      }
    ]
  },
  "partitions": 12,
  "replicationFactor": 3,
  "cleanupPolicies": ["delete"],
  "retentionTime": 604800000,
  "retentionSize": -1,
  "maximumMessageSize": 1048576,
  "minimumInSyncReplicas": 2,
  "topicConfig": {
    "compression.type": "snappy"
  },
  "sampleData": {
    "messages": [
      "{\"user_id\":\"123\",\"event_type\":\"page_view\",\"timestamp\":1704240000000}"
    ]
  },
  "owners": [
    {
      "id": "team-uuid",
      "type": "team",
      "name": "Analytics",
      "displayName": "Analytics Team"
    }
  ],
  "followers": [
    {
      "id": "user-uuid",
      "type": "user",
      "name": "john.doe"
    }
  ],
  "tags": [
    {
      "tagFQN": "Tier.Gold",
      "source": "Classification"
    },
    {
      "tagFQN": "Compliance.GDPR",
      "source": "Classification"
    }
  ],
  "domains": [
    {
      "id": "domain-uuid",
      "type": "domain",
      "name": "UserEngagement",
      "fullyQualifiedName": "UserEngagement"
    }
  ],
  "dataProducts": [
    {
      "id": "dataproduct-uuid",
      "type": "dataProduct",
      "name": "CustomerAnalytics"
    }
  ],
  "votes": {
    "upVotes": 15,
    "downVotes": 2
  },
  "lifeCycle": {
    "created": {
      "timestamp": 1704067200000,
      "user": "admin"
    }
  },
  "certification": {
    "tagLabel": {
      "tagFQN": "Certification.Gold"
    }
  },
  "href": "https://example.com/api/v1/topics/t1o2p3i4-c5u6-7u8i-9d0a-b1c2d3e4f5g6",
  "deleted": false
}

RDF Representation

Ontology Class

@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2001/XMLSchema#> .

om:Topic a owl:Class ;
    rdfs:subClassOf om:DataAsset ;
    rdfs:label "Topic" ;
    rdfs:comment "A Topic is a feed or an event stream in a Messaging Service into which publishers publish messages and consumed by consumers" ;
    om:hasProperties [
        om:name "string" ;
        om:messageSchema "Schema" ;
        om:partitions "integer" ;
        om:replicationFactor "integer" ;
        om:service "MessagingService" ;
        om:owners "Owner[]" ;
        om:followers "User[]" ;
        om:domains "Domain[]" ;
        om:dataProducts "DataProduct[]" ;
        om:tags "Tag[]" ;
        om:votes "Votes" ;
        om:lifeCycle "LifeCycle" ;
        om:certification "AssetCertification" ;
    ] .

Instance Example

@prefix om: <https://open-metadata.org/schema/> .
@prefix ex: <https://example.com/topics/> .

ex:user_events a om:Topic ;
    om:name "user_events" ;
    om:fullyQualifiedName "kafka_prod.user_events" ;
    om:displayName "User Activity Events" ;
    om:description "Real-time user activity event stream" ;
    om:partitionCount 12 ;
    om:replicationFactor 3 ;
    om:retentionTime 604800000 ;
    om:maximumMessageSize 1048576 ;
    om:belongsToService ex:kafka_prod ;
    om:hasOwners ex:analyticsTeam ;
    om:hasFollowers ex:johnDoe ;
    om:hasTag ex:tierGold ;
    om:hasTag ex:complianceGDPR ;
    om:hasDomains ex:userEngagement ;
    om:hasDataProducts ex:customerAnalytics ;
    om:linkedToGlossaryTerm ex:userEventTerm ;
    om:hasSchemaField ex:userIdField ;
    om:hasSchemaField ex:eventTypeField .

JSON-LD Context

{
  "@context": {
    "@vocab": "https://open-metadata.org/schema/",
    "om": "https://open-metadata.org/schema/",
    "Topic": "om:Topic",
    "name": "om:name",
    "fullyQualifiedName": "om:fullyQualifiedName",
    "displayName": "om:displayName",
    "partitions": "om:partitionCount",
    "replicationFactor": "om:replicationFactor",
    "messageSchema": {
      "@id": "om:messageSchema",
      "@type": "@id"
    },
    "service": {
      "@id": "om:belongsToService",
      "@type": "@id"
    },
    "owners": {
      "@id": "om:hasOwners",
      "@type": "@id",
      "@container": "@set"
    },
    "followers": {
      "@id": "om:hasFollowers",
      "@type": "@id",
      "@container": "@set"
    },
    "domains": {
      "@id": "om:hasDomains",
      "@type": "@id",
      "@container": "@set"
    },
    "dataProducts": {
      "@id": "om:hasDataProducts",
      "@type": "@id",
      "@container": "@set"
    },
    "tags": {
      "@id": "om:hasTag",
      "@type": "@id",
      "@container": "@set"
    },
    "votes": {
      "@id": "om:hasVotes",
      "@type": "@id"
    },
    "lifeCycle": {
      "@id": "om:hasLifeCycle",
      "@type": "@id"
    },
    "certification": {
      "@id": "om:hasCertification",
      "@type": "@id"
    }
  }
}

JSON-LD Example

{
  "@context": "https://open-metadata.org/context/topic.jsonld",
  "@type": "Topic",
  "@id": "https://example.com/topics/user_events",
  "name": "user_events",
  "fullyQualifiedName": "kafka_prod.user_events",
  "partitions": 12,
  "replicationFactor": 3,
  "cleanupPolicies": ["delete"],
  "retentionTime": 604800000,
  "service": {
    "@id": "https://example.com/services/kafka_prod",
    "@type": "MessagingService"
  },
  "owners": [
    {
      "@id": "https://example.com/teams/analytics",
      "@type": "Team"
    }
  ],
  "domains": [
    {
      "@id": "https://example.com/domains/userEngagement",
      "@type": "Domain"
    }
  ],
  "tags": [
    {
      "@id": "https://open-metadata.org/tags/Tier/Gold"
    }
  ]
}

Relationships

Parent Entities

  • MessagingService: The messaging platform hosting this topic

Associated Entities

  • Owner: User or team owning this topic
  • Domain: Business domain assignment
  • Tag: Classification tags
  • GlossaryTerm: Business terminology
  • Pipeline: Data pipelines producing/consuming events
  • Dashboard: Dashboards monitoring this topic
  • DataContract: Schema contracts and SLAs

Custom Properties

This entity supports custom properties through the extension field. Common custom properties include:

  • Data Classification: Sensitivity level
  • Cost Center: Billing allocation
  • Retention Period: Data retention requirements
  • Application Owner: Owning application/team

See Custom Properties for details on defining and using custom properties.


Followers

Users can follow topics to receive notifications about schema changes, partition updates, and configuration modifications. See Followers for details.


API Operations

All Topic operations are available under the /v1/topics endpoint.

List Topics

Get a list of topics, optionally filtered by service.

GET /v1/topics
Query Parameters:
  - fields: Fields to include (messageSchema, tags, owner, sampleData, etc.)
  - service: Filter by messaging service name
  - limit: Number of results (1-1000000, default 10)
  - before/after: Cursor-based pagination
  - include: all | deleted | non-deleted (default: non-deleted)

Response: TopicList

Create Topic

Create a new topic under a messaging service.

POST /v1/topics
Content-Type: application/json

{
  "name": "user_events",
  "service": "kafka_prod",
  "description": "User activity event stream",
  "partitions": 12,
  "replicationFactor": 3,
  "retentionTime": 604800000,
  "schemaType": "Avro",
  "messageSchema": {
    "schemaText": "{\"type\":\"record\",\"name\":\"UserEvent\",\"fields\":[{\"name\":\"user_id\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"}]}",
    "schemaFields": [
      {
        "name": "user_id",
        "dataType": "STRING",
        "description": "User identifier",
        "tags": [{"tagFQN": "PII.UserId"}]
      },
      {
        "name": "event_type",
        "dataType": "STRING"
      },
      {
        "name": "timestamp",
        "dataType": "LONG"
      }
    ]
  },
  "topicConfig": {
    "cleanupPolicy": "delete",
    "compressionType": "snappy"
  }
}

Response: Topic

Get Topic by Name

Get a topic by its fully qualified name.

GET /v1/topics/name/{fqn}
Query Parameters:
  - fields: Fields to include (messageSchema, tags, owner, sampleData, etc.)
  - include: all | deleted | non-deleted

Example:
GET /v1/topics/name/kafka_prod.user_events?fields=messageSchema,tags,owner,topicConfig

Response: Topic

Get Topic by ID

Get a topic by its unique identifier.

GET /v1/topics/{id}
Query Parameters:
  - fields: Fields to include
  - include: all | deleted | non-deleted

Response: Topic

Update Topic

Update a topic using JSON Patch.

PATCH /v1/topics/name/{fqn}
Content-Type: application/json-patch+json

[
  {"op": "add", "path": "/tags/-", "value": {"tagFQN": "Tier.Gold"}},
  {"op": "replace", "path": "/partitions", "value": 24},
  {"op": "replace", "path": "/description", "value": "Updated event stream"}
]

Response: Topic

Create or Update Topic

Create a new topic or update if it exists.

PUT /v1/topics
Content-Type: application/json

{
  "name": "order_events",
  "service": "kafka_prod",
  "partitions": 6,
  "messageSchema": {...}
}

Response: Topic

Delete Topic

Delete a topic by fully qualified name.

DELETE /v1/topics/name/{fqn}
Query Parameters:
  - hardDelete: Permanently delete (default: false)

Response: 200 OK

Update Message Schema

Update the schema for messages in the topic.

PUT /v1/topics/{id}/messageSchema
Content-Type: application/json

{
  "schemaType": "Avro",
  "schemaText": "{\"type\":\"record\",\"name\":\"OrderEvent\",\"fields\":[...]}",
  "schemaFields": [...]
}

Response: Topic

Get Topic Sample Data

Get sample messages from the topic.

GET /v1/topics/{id}/sampleData

Response: TopicSampleData (sample messages)

Get Topic Versions

Get all versions of a topic.

GET /v1/topics/{id}/versions

Response: EntityHistory

Follow Topic

Add a follower to a topic.

PUT /v1/topics/{id}/followers/{userId}

Response: ChangeEvent

Get Followers

Get all followers of a topic.

GET /v1/topics/{id}/followers

Response: EntityReference[]

Bulk Operations

Create or update multiple topics.

PUT /v1/topics/bulk
Content-Type: application/json

{
  "entities": [...]
}

Response: BulkOperationResult