
Typed Headers

Typr Events generates strongly-typed Kafka headers for correlation, tracing, and metadata.

Defining Header Schemas

Configure header schemas in your generation options:

val options = AvroOptions.default(...).copy(
  headerSchemas = Map(
    "standard" -> HeaderSchema(List(
      HeaderField("correlationId", HeaderType.UUID, required = true),
      HeaderField("timestamp", HeaderType.Instant, required = true),
      HeaderField("source", HeaderType.String, required = false)
    ))
  ),
  defaultHeaderSchema = Some("standard")
)

Generated Header Class

public record StandardHeaders(
    UUID correlationId,
    Instant timestamp,
    Optional<String> source
) {
    // Serialize to Kafka headers
    public Headers toKafkaHeaders() { ... }

    // Deserialize from Kafka headers
    public static StandardHeaders fromKafkaHeaders(Headers headers) { ... }
}
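The bodies of `toKafkaHeaders` and `fromKafkaHeaders` are elided above. As a rough illustration of how required and optional fields might round-trip, here is a minimal sketch. It is not the generated code: `StandardHeadersSketch`, `toHeaderMap`, and `fromHeaderMap` are hypothetical names, a plain `Map<String, byte[]>` stands in for Kafka's `Headers` so the example runs without the Kafka client, and the choice to throw on a missing required header is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for the generated record (assumption, not the real output).
// A Map<String, byte[]> replaces Kafka's Headers type to keep the sketch self-contained.
record StandardHeadersSketch(UUID correlationId, Instant timestamp, Optional<String> source) {

    // Write each field as the UTF-8 bytes of its string form.
    // An absent optional field produces no header entry at all.
    Map<String, byte[]> toHeaderMap() {
        Map<String, byte[]> out = new LinkedHashMap<>();
        out.put("correlationId", utf8(correlationId.toString()));
        out.put("timestamp", utf8(timestamp.toString()));
        source.ifPresent(s -> out.put("source", utf8(s)));
        return out;
    }

    // Read the fields back; a missing required header is rejected (assumed behavior).
    static StandardHeadersSketch fromHeaderMap(Map<String, byte[]> headers) {
        return new StandardHeadersSketch(
            UUID.fromString(required(headers, "correlationId")),
            Instant.parse(required(headers, "timestamp")),
            optional(headers, "source"));
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    private static Optional<String> optional(Map<String, byte[]> headers, String name) {
        return Optional.ofNullable(headers.get(name))
            .map(bytes -> new String(bytes, StandardCharsets.UTF_8));
    }

    private static String required(Map<String, byte[]> headers, String name) {
        return optional(headers, name)
            .orElseThrow(() -> new IllegalArgumentException("Missing required header: " + name));
    }
}
```

Mapping `required = false` to `Optional` keeps the absent case explicit in the type, so callers cannot forget to handle a header that was never sent.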

Supported Header Types

| Header Type | Java Type | Serialization |
| --- | --- | --- |
| HeaderType.String | String | UTF-8 bytes |
| HeaderType.UUID | UUID | String representation |
| HeaderType.Instant | Instant | ISO-8601 string |
| HeaderType.Long | Long | String representation |
| HeaderType.Int | Integer | String representation |
| HeaderType.Boolean | Boolean | "true" / "false" |
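To make the table concrete, the following sketch encodes and decodes each supported type using only the JDK. `HeaderCodecSketch` and its methods are hypothetical helpers, not part of the generated API, and the sketch assumes that every "string representation" is written to the header value as UTF-8 bytes.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.UUID;

// Hypothetical helper mirroring the table above; not part of the generated API.
final class HeaderCodecSketch {
    private HeaderCodecSketch() {}

    // Assumption: every type is stored as the UTF-8 bytes of its toString() form.
    // Instant.toString() already yields an ISO-8601 string.
    static byte[] encode(Object value) {
        return value.toString().getBytes(StandardCharsets.UTF_8);
    }

    static String decodeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static UUID decodeUuid(byte[] bytes) {
        return UUID.fromString(decodeString(bytes));
    }

    static Instant decodeInstant(byte[] bytes) {
        return Instant.parse(decodeString(bytes));
    }

    static Long decodeLong(byte[] bytes) {
        return Long.valueOf(decodeString(bytes));
    }

    static Integer decodeInt(byte[] bytes) {
        return Integer.valueOf(decodeString(bytes));
    }

    // Boolean.valueOf is lenient: anything other than "true" (ignoring case) decodes to false.
    static Boolean decodeBoolean(byte[] bytes) {
        return Boolean.valueOf(decodeString(bytes));
    }
}
```

For example, `HeaderCodecSketch.decodeInstant(HeaderCodecSketch.encode(Instant.parse("2024-01-01T00:00:00Z")))` returns the original instant, since `Instant.parse` accepts the ISO-8601 text that `Instant.toString()` produces.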

Using Headers

Producing

var headers = new StandardHeaders(
    UUID.randomUUID(),
    Instant.now(),
    Optional.of("order-service")
);

producer.send("order-123", event, headers);

Consuming

Headers are automatically deserialized and passed to handlers:

@Override
public void handleOrderPlaced(String key, OrderPlaced event, StandardHeaders headers) {
    log.info("Processing order {} with correlation {}",
        event.orderId(), headers.correlationId());

    // Propagate correlation ID to downstream calls
    downstreamClient.call(headers.correlationId());
}

Multiple Header Schemas

You can define multiple header schemas for different use cases:

val options = AvroOptions.default(...).copy(
  headerSchemas = Map(
    "standard" -> HeaderSchema(List(
      HeaderField("correlationId", HeaderType.UUID, required = true),
      HeaderField("timestamp", HeaderType.Instant, required = true)
    )),
    "audit" -> HeaderSchema(List(
      HeaderField("correlationId", HeaderType.UUID, required = true),
      HeaderField("userId", HeaderType.String, required = true),
      HeaderField("action", HeaderType.String, required = true),
      HeaderField("timestamp", HeaderType.Instant, required = true)
    ))
  ),
  defaultHeaderSchema = Some("standard")
)
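By analogy with `StandardHeaders`, the "audit" schema would presumably produce a second record whose components follow the field order in the configuration. The sketch below is an inference rather than actual generator output: the name `AuditHeadersSketch` and the null checks are assumptions, and the Kafka conversion methods are omitted.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.UUID;

// Hypothetical shape of the record for the "audit" schema, inferred from StandardHeaders.
// The real generated name and members may differ.
record AuditHeadersSketch(UUID correlationId, String userId, String action, Instant timestamp) {

    // All four fields are declared required = true, so none is wrapped in Optional.
    // Rejecting nulls here is an assumption about how "required" could be enforced.
    AuditHeadersSketch {
        Objects.requireNonNull(correlationId, "correlationId");
        Objects.requireNonNull(userId, "userId");
        Objects.requireNonNull(action, "action");
        Objects.requireNonNull(timestamp, "timestamp");
    }
}
```

Because every field in this schema is required, the record has no `Optional` components, unlike `source` in the standard schema.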

Tracing Integration

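Headers are a natural place to carry distributed tracing context. The example below reads the current span's trace ID (the call shown matches the OpenTelemetry API) and stores it in the optional source field: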

var headers = new StandardHeaders(
    UUID.randomUUID(),
    Instant.now(),
    Optional.of(Span.current().getSpanContext().getTraceId())
);