Vladimir Chavkov

Apache Kafka: Complete Event Streaming Platform Guide


Apache Kafka is a distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Originally developed at LinkedIn and later donated to the Apache Software Foundation, Kafka has become the de facto standard for real-time event streaming. This guide covers Kafka architecture, deployment, and production best practices.

What is Apache Kafka?

Apache Kafka is a distributed, fault-tolerant, high-throughput event streaming platform.

Key Features

  1. High Throughput: Millions of messages per second with low latency
  2. Distributed: Horizontal scaling across multiple brokers
  3. Fault Tolerant: Replication protects against data loss from broker failures (when configured with acks=all and min.insync.replicas)
  4. Durable: Persistent storage with configurable retention
  5. Exactly-Once Semantics: Transactional message delivery
  6. Stream Processing: Built-in Kafka Streams library
  7. Connectors: Kafka Connect for source/sink integrations
  8. Schema Registry: Enforce data contracts with Avro/Protobuf/JSON Schema

Kafka vs. Other Messaging Systems

| Feature | Apache Kafka | RabbitMQ | Apache Pulsar | Amazon SQS | NATS |
|---|---|---|---|---|---|
| Throughput | Very High (millions/sec) | Moderate | Very High | Moderate | Very High |
| Ordering | Per-partition | Per-queue | Per-partition | FIFO queues | Per-subject |
| Retention | Configurable (time/size) | Until consumed | Configurable | 14 days max | In-memory |
| Replay | Yes (offset-based) | No | Yes (cursor) | No | JetStream only |
| Protocol | Custom binary | AMQP | Custom binary | HTTP/SQS API | NATS protocol |
| Stream Processing | Kafka Streams/ksqlDB | No | Pulsar Functions | No | No |
| Multi-Tenancy | Topic-level ACLs | Vhosts | Namespace isolation | Queue-level | Accounts |
| Exactly-Once | Yes (transactions) | No | Yes | No | JetStream |
| License | Apache 2.0 | MPL 2.0 | Apache 2.0 | Proprietary | Apache 2.0 |
| Operational Complexity | High | Low | High | Managed | Low |
| Ecosystem | Very large | Large | Growing | AWS-native | Growing |

Architecture

```
┌──────────────────────────────────────────────────────────┐
│                      Kafka Cluster                       │
│                                                          │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐    │
│  │   Broker 1   │  │   Broker 2   │  │   Broker 3   │    │
│  │ (Controller) │  │              │  │              │    │
│  │  Topic A     │  │  Topic A     │  │  Topic A     │    │
│  │   P0 (L)     │  │   P0 (F)     │  │   P1 (F)     │    │
│  │   P1 (L)     │  │   P2 (L)     │  │   P2 (F)     │    │
│  │  Topic B     │  │  Topic B     │  │  Topic B     │    │
│  │   P0 (F)     │  │   P0 (L)     │  │   P1 (L)     │    │
│  │   P1 (F)     │  │              │  │              │    │
│  └──────────────┘  └──────────────┘  └──────────────┘    │
│              L = Leader   F = Follower                   │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │             KRaft Controller Quorum                │  │
│  │          (replaces ZooKeeper since 3.3)            │  │
│  └────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘
       │                  │                  │
  ┌────┴────┐        ┌────┴────┐        ┌────┴────┐
  │Producer │        │Consumer │        │Consumer │
  │   App   │        │ Group A │        │ Group B │
  └─────────┘        └─────────┘        └─────────┘
```

Core Concepts

| Concept | Description |
|---|---|
| Broker | Server that stores data and serves client requests |
| Topic | Named stream of records (like a database table) |
| Partition | Ordered, immutable sequence of records within a topic |
| Offset | Unique sequential ID for each record within a partition |
| Producer | Client that publishes records to topics |
| Consumer | Client that reads records from topics |
| Consumer Group | Set of consumers sharing a subscription |
| Replica | Copy of a partition for fault tolerance |
| Leader | Replica that handles all reads and writes for a partition |
| ISR | In-Sync Replicas — followers that are caught up to the leader |
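
The relationship between topics, partitions, and offsets can be sketched as a toy in-memory model (illustrative only — it says nothing about Kafka's storage format): each partition is an append-only log, and a record's offset is simply its index within that log.

```python
class ToyTopic:
    """Toy model of a Kafka topic: each partition is an append-only log."""

    def __init__(self, name: str, num_partitions: int):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition: int, record: str) -> int:
        """Append a record; its offset is its position in the partition's log."""
        log = self.partitions[partition]
        log.append(record)
        return len(log) - 1  # the new record's offset

topic = ToyTopic("orders", num_partitions=3)
o0 = topic.append(0, "order-1")
o1 = topic.append(0, "order-2")
o2 = topic.append(1, "order-3")
# Offsets are per-partition: partition 0 holds offsets 0 and 1,
# while partition 1 independently starts at offset 0.
```

Note that ordering is only guaranteed within a partition, never across partitions — which is why key selection (covered below) matters.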

Installation

Since Kafka 3.3, KRaft mode has been production-ready, eliminating the ZooKeeper dependency:

```bash
# Download Kafka
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0

# Generate a cluster UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

# Format storage directories
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID \
  -c config/kraft/server.properties

# Start Kafka
bin/kafka-server-start.sh config/kraft/server.properties
```

KRaft Configuration

config/kraft/server.properties

```properties
# Roles: broker, controller, or both
process.roles=broker,controller

# Node ID (unique per node)
node.id=1

# Controller quorum voters
controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093

# Listeners
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
advertised.listeners=PLAINTEXT://broker1:9092

# Log directories
log.dirs=/var/kafka-logs

# Topic defaults
num.partitions=6
default.replication.factor=3
min.insync.replicas=2

# Log retention
log.retention.hours=168
log.retention.bytes=-1
log.segment.bytes=1073741824

# Network threads
num.network.threads=8
num.io.threads=16
```

Docker Compose Deployment

docker-compose.yml

```yaml
version: '3.8'

services:
  kafka-1:
    image: apache/kafka:3.7.0
    hostname: kafka-1
    container_name: kafka-1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LOG_DIRS: /var/kafka-logs
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-1-data:/var/kafka-logs

  kafka-2:
    image: apache/kafka:3.7.0
    hostname: kafka-2
    container_name: kafka-2
    ports:
      - "9093:9092"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LOG_DIRS: /var/kafka-logs
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-2-data:/var/kafka-logs

  kafka-3:
    image: apache/kafka:3.7.0
    hostname: kafka-3
    container_name: kafka-3
    ports:
      - "9094:9092"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LOG_DIRS: /var/kafka-logs
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-3-data:/var/kafka-logs

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
    depends_on:
      - kafka-1
      - schema-registry

volumes:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:
```

Kubernetes Deployment with Strimzi

Create the namespace and install the Strimzi operator via Helm:

```bash
kubectl create namespace kafka
helm repo add strimzi https://strimzi.io/charts/
helm install strimzi strimzi/strimzi-kafka-operator -n kafka
```

kafka-cluster.yaml

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: production-cluster
  namespace: kafka
  annotations:
    # KRaft mode: recent Strimzi versions run without ZooKeeper via
    # these annotations plus KafkaNodePool resources (not shown here)
    strimzi.io/kraft: enabled
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 3.7.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      num.partitions: 6
      log.retention.hours: 168
      log.segment.bytes: 1073741824
      auto.create.topics.enable: false
    storage:
      type: persistent-claim
      size: 500Gi
      class: fast-ssd
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 16Gi
        cpu: "4"
    jvmOptions:
      -Xms: 4096m
      -Xmx: 4096m
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  entityOperator:
    topicOperator: {}
    userOperator: {}
```

Topic Management

Creating Topics

```bash
# Create a topic with specific configuration
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete \
  --config compression.type=lz4 \
  --config min.insync.replicas=2

# Create a compacted topic (for changelogs/state)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic user-profiles \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5 \
  --config delete.retention.ms=86400000
```

Topic Configuration

```bash
# Describe a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

# Alter topic configuration
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name orders \
  --add-config retention.ms=259200000

# Increase partitions (cannot decrease)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic orders \
  --partitions 24

# List all topics
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Delete a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --delete --topic old-topic
```

Partition Strategy

| Strategy | Use Case | Key Selection |
|---|---|---|
| Round-robin | Even distribution, no ordering needed | No key (null) |
| Key-based | Ordering per entity (user, order) | Entity ID as key |
| Custom partitioner | Specific routing logic | Custom `Partitioner` class |
| Single partition | Total ordering required | All to partition 0 |

Producer API

Java Producer

```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class OrderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        // Reliability settings
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Performance settings
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("orders", "order-123",
                            "{\"id\":\"123\",\"amount\":99.99}");

            // Async send with callback
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Send failed: " + exception);
                } else {
                    System.out.printf("Sent to %s-%d offset %d%n",
                            metadata.topic(), metadata.partition(),
                            metadata.offset());
                }
            });
        }
    }
}
```

Python Producer (confluent-kafka)

```python
from confluent_kafka import Producer
import json

conf = {
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
    'batch.size': 32768,
    'linger.ms': 20,
    'retries': 2147483647,
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

# Send messages
for i in range(100):
    order = {'id': f'order-{i}', 'amount': 99.99 + i}
    producer.produce(
        topic='orders',
        key=f'order-{i}',
        value=json.dumps(order).encode('utf-8'),
        callback=delivery_callback,
    )

# Wait for all messages to be delivered
producer.flush()
```

Consumer API

Java Consumer

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class OrderConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // Consumer configuration
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Key: %s, Value: %s, Partition: %d, Offset: %d%n",
                            record.key(), record.value(),
                            record.partition(), record.offset());
                    // Process the record
                    processOrder(record.value());
                }
                // Manual commit after processing
                consumer.commitSync();
            }
        }
    }

    private static void processOrder(String orderJson) {
        // Business logic here
    }
}
```

Python Consumer

```python
from confluent_kafka import Consumer
import json

conf = {
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

def process_order(order):
    pass  # business logic here

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        order = json.loads(msg.value().decode('utf-8'))
        print(f"Received: {order}")
        # Process and commit
        process_order(order)
        consumer.commit(asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```

Kafka Connect

Kafka Connect is a framework for streaming data between Kafka and external systems.

Source Connector — PostgreSQL CDC

```json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "${vault:postgres/password}",
    "database.dbname": "orders_db",
    "database.server.name": "orders",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_orders",
    "publication.name": "dbz_publication",
    "topic.prefix": "cdc",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.public\\.(.*)",
    "transforms.route.replacement": "cdc-$1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
```

Sink Connector — Elasticsearch

```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "cdc-orders",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": false,
    "schema.ignore": true,
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "batch.size": 2000,
    "max.buffered.records": 20000,
    "flush.timeout.ms": 120000,
    "transforms": "extractKey,dropPrefix",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "id",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "cdc-(.*)",
    "transforms.dropPrefix.replacement": "$1"
  }
}
```

Sink Connector — Amazon S3

```json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "orders,customers",
    "s3.region": "eu-west-1",
    "s3.bucket.name": "data-lake-raw",
    "s3.part.size": 67108864,
    "flush.size": 10000,
    "rotate.interval.ms": 600000,
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": 3600000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at"
  }
}
```

Kafka Streams

Stream Processing Topology

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;

public class OrderStreamProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();

        // Read from input topic
        KStream<String, String> orders = builder.stream("orders");

        // Filter high-value orders (parseOrder and enrichOrder are
        // application helpers, omitted for brevity)
        KStream<String, String> highValue = orders
                .filter((key, value) -> parseOrder(value).amount() > 1000);

        // Branch into categories
        orders
                .split(Named.as("order-"))
                .branch((key, value) -> parseOrder(value).amount() > 1000,
                        Branched.as("high-value"))
                .branch((key, value) -> parseOrder(value).amount() > 100,
                        Branched.as("medium-value"))
                .defaultBranch(Branched.as("low-value"));

        // Windowed aggregation: orders per minute per region
        orders
                .groupBy((key, value) -> parseOrder(value).region())
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
                .count(Materialized.as("orders-per-minute"))
                .toStream()
                .to("order-counts");

        // Join orders with customer data
        KTable<String, String> customers = builder.table("customers");
        orders
                .selectKey((key, value) -> parseOrder(value).customerId())
                .join(customers,
                        (order, customer) -> enrichOrder(order, customer))
                .to("enriched-orders");

        // Build and start
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

Schema Registry

Avro Schema Management

```bash
# Register a schema
curl -X POST http://schema-registry:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"customer_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}"
  }'

# Get latest schema version
curl http://schema-registry:8081/subjects/orders-value/versions/latest

# Check compatibility
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"customer_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"status\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
  }'

# Set compatibility mode
curl -X PUT http://schema-registry:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'
```

Compatibility Modes

| Mode | Description | Allowed Changes |
|---|---|---|
| BACKWARD | New schema can read old data | Add optional fields, remove fields |
| FORWARD | Old schema can read new data | Remove optional fields, add fields |
| FULL | Both backward and forward | Add/remove optional fields |
| NONE | No compatibility checks | Any change |
| BACKWARD_TRANSITIVE | Backward against all previous versions | Stricter BACKWARD |
| FULL_TRANSITIVE | Full against all previous versions | Strictest mode |
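
The intuition behind BACKWARD compatibility — a new reader must cope with data written under the old schema — can be captured in a toy checker. This is a deliberate simplification: it only enforces the "new fields need defaults" rule on dicts shaped roughly like Avro record schemas, ignoring type promotion, aliases, and the other rules the real Schema Registry applies.

```python
def backward_compatible(old_schema: dict, new_schema: dict) -> bool:
    """New readers see old data: any field that exists only in the new
    schema must carry a default, so the reader can fill it in.
    Fields removed by the new schema are simply ignored on read."""
    old_fields = {f["name"] for f in old_schema["fields"]}
    return all(
        f["name"] in old_fields or "default" in f
        for f in new_schema["fields"]
    )

old = {"fields": [{"name": "id"}, {"name": "amount"}]}
# Adding an optional field with a default: backward compatible.
new_ok = {"fields": [{"name": "id"}, {"name": "amount"},
                     {"name": "status", "default": None}]}
# Adding a required field without a default: old data cannot satisfy it.
new_bad = {"fields": [{"name": "id"}, {"name": "amount"},
                      {"name": "status"}]}
```

This mirrors the `status` field with `"default": null` in the curl example above: that default is precisely what makes the evolution pass a BACKWARD check.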

Security

SASL/SCRAM Authentication

server.properties

```properties
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

# SSL settings
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=${KEYSTORE_PASSWORD}
ssl.key.password=${KEY_PASSWORD}
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=${TRUSTSTORE_PASSWORD}
ssl.client.auth=required
ssl.endpoint.identification.algorithm=https
```

ACL Configuration

```bash
# Create user credentials
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --add-config 'SCRAM-SHA-512=[password=secret]' \
  --entity-type users --entity-name order-service

# Grant producer permissions
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:order-service \
  --operation Write --operation Describe \
  --topic orders

# Grant consumer permissions
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:analytics-service \
  --operation Read --operation Describe \
  --topic orders \
  --group analytics-consumer

# Grant topic management
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:admin \
  --operation All \
  --topic '*' \
  --group '*' \
  --cluster

# List ACLs
bin/kafka-acls.sh --bootstrap-server localhost:9092 --list
```

Performance Tuning

Broker Optimization

```properties
# server.properties — Performance tuning

# Network and I/O threads
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600

# Log configuration
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# Replication
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
replica.socket.receive.buffer.bytes=1048576
num.replica.fetchers=4

# Compression
compression.type=producer

# Message sizes
message.max.bytes=10485760
```

Producer Tuning

| Parameter | Low Latency | High Throughput |
|---|---|---|
| acks | 1 | all |
| batch.size | 16384 | 65536 |
| linger.ms | 0 | 50 |
| compression.type | none | lz4 |
| buffer.memory | 33554432 | 134217728 |
| max.in.flight.requests | 1 | 5 |
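
How `batch.size` and `linger.ms` trade latency for throughput can be seen in a tiny simulation (illustrative only — the real producer batches per partition and measures time differently): a batch ships when it fills up or when the linger window expires, whichever comes first.

```python
def simulate_batches(arrival_times_ms, record_bytes, batch_size, linger_ms):
    """Group records the way a producer roughly would: close the open
    batch when it would exceed batch_size bytes, or when linger_ms has
    elapsed since the batch was opened."""
    batches, current, opened_at = [], [], None
    for t in arrival_times_ms:
        full = current and (len(current) + 1) * record_bytes > batch_size
        expired = current and t - opened_at >= linger_ms
        if full or expired:
            batches.append(current)
            current, opened_at = [], None
        if opened_at is None:
            opened_at = t
        current.append(t)
    if current:
        batches.append(current)
    return batches

# 10 records of 1 KiB arriving 1 ms apart, 16 KiB batch limit.
arrivals = list(range(10))
low_latency = simulate_batches(arrivals, 1024, 16384, linger_ms=0)   # each record ships alone
high_throughput = simulate_batches(arrivals, 1024, 16384, linger_ms=20)  # one combined batch
```

Fewer, larger batches mean fewer network round trips and better compression ratios, at the cost of each record waiting up to `linger.ms` before it is sent.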

Consumer Tuning

| Parameter | Low Latency | High Throughput |
|---|---|---|
| fetch.min.bytes | 1 | 65536 |
| fetch.max.wait.ms | 100 | 500 |
| max.poll.records | 100 | 1000 |
| max.partition.fetch.bytes | 1048576 | 10485760 |

JVM Settings

```bash
# KAFKA_HEAP_OPTS for brokers
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"

# G1GC settings
export KAFKA_JVM_PERFORMANCE_OPTS="
  -server
  -XX:+UseG1GC
  -XX:MaxGCPauseMillis=20
  -XX:InitiatingHeapOccupancyPercent=35
  -XX:+ExplicitGCInvokesConcurrent
  -XX:MaxInlineLevel=15
  -Djava.awt.headless=true
"
```

Monitoring

Prometheus JMX Exporter

kafka-jmx-exporter.yml

```yaml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  # Broker metrics
  - pattern: kafka.server<type=BrokerTopicMetrics, name=(.+), topic=(.+)><>(.+)
    name: kafka_server_brokertopicmetrics_$1_$3
    labels:
      topic: "$2"
  # Request metrics
  - pattern: kafka.network<type=RequestMetrics, name=(.+), request=(.+)><>(.+)
    name: kafka_network_requestmetrics_$1_$3
    labels:
      request: "$2"
  # Partition metrics
  - pattern: kafka.server<type=ReplicaManager, name=(.+)><>(.+)
    name: kafka_server_replicamanager_$1_$2
  # Consumer group lag
  - pattern: kafka.server<type=FetcherLagMetrics, name=(.+), clientId=(.+), topic=(.+), partition=(.+)><>Value
    name: kafka_server_fetcherlagmetrics_$1
    labels:
      clientId: "$2"
      topic: "$3"
      partition: "$4"
```

Key Metrics to Monitor

| Metric | Description | Alert Threshold |
|---|---|---|
| UnderReplicatedPartitions | Partitions where ISR < replicas | > 0 |
| OfflinePartitionsCount | Partitions with no active leader | > 0 |
| ActiveControllerCount | Number of active controllers | != 1 |
| MessagesInPerSec | Producer throughput | Baseline deviation |
| BytesInPerSec | Network ingress | Capacity threshold |
| BytesOutPerSec | Network egress | Capacity threshold |
| RequestLatency99th | P99 request latency | > 100ms |
| ConsumerLag | Messages behind producer | Growing continuously |
| LogFlushLatency | Disk write latency | > 10ms |
| IsrShrinkRate | ISR shrink events | > 0 |

Grafana Dashboard Query Examples

```promql
# Messages produced per second per topic
sum(rate(kafka_server_brokertopicmetrics_messagesinpersec_count[5m])) by (topic)

# Consumer group lag (requires a lag exporter such as kafka-lag-exporter;
# the JMX rules above do not produce this metric)
sum(kafka_consumergroup_lag) by (consumergroup, topic)

# Under-replicated partitions
kafka_server_replicamanager_underreplicatedpartitions

# Broker CPU usage
rate(process_cpu_seconds_total{job="kafka"}[5m])

# Request rate by type
sum(rate(kafka_network_requestmetrics_requestspersec_count[5m])) by (request)
```

Common Patterns

Event Sourcing

```
┌──────────┐     ┌───────────┐     ┌──────────────┐
│ Command  │────>│   Kafka   │────>│ Event Store  │
│ Service  │     │   Topic   │     │ (Compacted)  │
└──────────┘     └─────┬─────┘     └──────────────┘
                       │
                  ┌────┴────┐
                  │Consumer │
                  │ (CQRS   │
                  │  Read)  │
                  └─────────┘
```
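
The "compacted" event store keeps only the latest record per key. Compaction can be sketched as a single pass over the log (the real log cleaner works segment by segment and only removes tombstones after `delete.retention.ms`):

```python
def compact(log):
    """Keep the newest value per key. A None value is a tombstone
    that deletes the key from the compacted state."""
    latest = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)  # tombstone removes the key
        else:
            latest[key] = value
    return latest

log = [
    ("user-1", "v1"),
    ("user-2", "v1"),
    ("user-1", "v2"),   # supersedes user-1's earlier value
    ("user-2", None),   # tombstone: user-2 is deleted
]
state = compact(log)
```

This is exactly why compacted topics work as changelogs: replaying one from the beginning rebuilds the latest state per key, which is how Kafka Streams restores its state stores.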

Dead Letter Queue

```java
// Handle poison pills with a DLQ
try {
    processRecord(record);
    consumer.commitSync();
} catch (DeserializationException e) {
    // Send to DLQ
    producer.send(new ProducerRecord<>(
            "orders.dlq",
            record.key(),
            record.value()
    ));
    consumer.commitSync();
}
```

Transactional Outbox Pattern

```sql
-- Application writes to the orders table and the outbox table
-- in a single database transaction
BEGIN;

INSERT INTO orders (id, customer_id, amount)
VALUES ('order-123', 'cust-456', 99.99);

INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload)
VALUES (gen_random_uuid(), 'Order', 'order-123', 'OrderCreated',
        '{"id":"order-123","amount":99.99}');

COMMIT;

-- A Debezium CDC connector reads the outbox table
-- and publishes the events to Kafka
```
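
When CDC is not available, the same pattern can be driven by a polling publisher that reads unpublished outbox rows and forwards them. A minimal sketch using an in-memory SQLite database as a stand-in for the application database, and a plain callback as a stand-in for `producer.produce`:

```python
import sqlite3

# In-memory stand-in for the application database.
db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE outbox (
    id INTEGER PRIMARY KEY,
    aggregate_id TEXT, event_type TEXT,
    payload TEXT, published INTEGER DEFAULT 0)""")
db.execute("INSERT INTO outbox (aggregate_id, event_type, payload) "
           "VALUES ('order-123', 'OrderCreated', '{\"amount\": 99.99}')")
db.commit()

published = []  # stand-in for a Kafka producer

def poll_outbox(conn, publish):
    """Read unpublished rows in insertion order, publish each event,
    then mark the row as published so it is not re-sent."""
    rows = conn.execute(
        "SELECT id, aggregate_id, event_type, payload "
        "FROM outbox WHERE published = 0 ORDER BY id").fetchall()
    for row_id, aggregate_id, event_type, payload in rows:
        publish((aggregate_id, event_type, payload))
        conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    conn.commit()
    return len(rows)

sent = poll_outbox(db, published.append)  # publishes the one pending event
```

A crash between publish and the UPDATE re-sends the event on the next poll, so this variant gives at-least-once delivery — consumers should be idempotent, which is also true of the CDC-based variant.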

Troubleshooting

Common Issues

```bash
# Inspect KRaft cluster metadata
bin/kafka-metadata-shell.sh \
  --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log

# Check consumer group status
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-processor

# Reset consumer group offsets (the group must be inactive)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --to-earliest --execute

# Check partition reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --verify --reassignment-json-file reassignment.json

# Dump log segments
bin/kafka-dump-log.sh --files /var/kafka-logs/orders-0/00000000000000000000.log \
  --print-data-log

# Verify cluster health
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
```

Performance Debugging

```bash
# Check disk I/O
iostat -x 1 5

# Check network throughput
sar -n DEV 1 5

# Check JVM garbage collection
jstat -gcutil $(pgrep -f kafka) 1000

# Producer performance test
bin/kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092 \
    acks=all \
    compression.type=lz4 \
    batch.size=65536 \
    linger.ms=20

# Consumer performance test
bin/kafka-consumer-perf-test.sh \
  --bootstrap-server localhost:9092 \
  --topic perf-test \
  --messages 1000000 \
  --threads 3
```

Production Best Practices

Cluster Sizing

| Workload | Brokers | Partitions per Topic | Replication Factor |
|---|---|---|---|
| Small (< 100 MB/s) | 3 | 6 | 3 |
| Medium (100 MB/s - 1 GB/s) | 5-10 | 12-24 | 3 |
| Large (> 1 GB/s) | 10+ | 24-64 | 3 |
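
A common rule of thumb behind the partition counts above: take the larger of (target throughput / per-partition producer throughput) and (target throughput / per-partition consumer throughput), then round up to a multiple of the broker count so leadership spreads evenly. The per-partition throughput figures below are placeholders — measure your own with the perf-test tools above.

```python
import math

def partitions_needed(target_mb_s: float,
                      produce_mb_s_per_partition: float,
                      consume_mb_s_per_partition: float,
                      brokers: int) -> int:
    """Rule-of-thumb partition count: enough partitions to hit the
    target on both the produce and the consume side, rounded up to
    a multiple of the broker count."""
    need = max(target_mb_s / produce_mb_s_per_partition,
               target_mb_s / consume_mb_s_per_partition)
    return math.ceil(need / brokers) * brokers

# e.g. 300 MB/s target, 25 MB/s in and 50 MB/s out per partition, 6 brokers
p = partitions_needed(300, 25, 50, 6)
```

Err on the high side: partitions can be added later but never removed, and adding them breaks key-to-partition mapping for keyed topics.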

Checklist

- Replication factor 3 with min.insync.replicas=2 for critical topics
- acks=all and idempotence enabled on producers
- auto.create.topics.enable=false to prevent accidental topics
- TLS plus SASL authentication with least-privilege ACLs
- Alerts on under-replicated partitions, offline partitions, and consumer lag
- Schema Registry with an explicit compatibility mode
- Capacity headroom validated with the producer/consumer perf tests
- Documented runbooks for offset resets and partition reassignment

Transform Your Team’s Kafka Skills

Building a reliable event streaming platform requires deep expertise in distributed systems, performance tuning, and operational best practices. At chavkov.com, I deliver hands-on Apache Kafka training that takes your team from fundamentals to production mastery.

Training Options

| Format | Duration | Focus |
|---|---|---|
| Kafka Fundamentals | 2 days | Core concepts, producers, consumers, topic design |
| Kafka Operations | 3 days | Cluster management, monitoring, security, DR |
| Kafka Streams & ksqlDB | 2 days | Stream processing, stateful operations, windowing |
| Event-Driven Architecture | 3 days | Patterns, CQRS, event sourcing, schema evolution |

All trainings include hands-on labs with real Kafka clusters. Contact me to discuss your team’s training needs and schedule a customized program.

