Apache Kafka: Complete Event Streaming Platform Guide
Apache Kafka is a distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Originally developed at LinkedIn and open-sourced through the Apache Software Foundation, Kafka has become the de facto standard for real-time event streaming. This comprehensive guide covers Kafka architecture, deployment, and production best practices.
What is Apache Kafka?
Apache Kafka is a distributed, fault-tolerant, high-throughput event streaming platform that provides:
Key Features
- High Throughput: Millions of messages per second with low latency
- Distributed: Horizontal scaling across multiple brokers
- Fault Tolerant: Replication protects against data loss when brokers fail
- Durable: Persistent storage with configurable retention
- Exactly-Once Semantics: Transactional message delivery
- Stream Processing: Built-in Kafka Streams library
- Connectors: Kafka Connect for source/sink integrations
- Schema Registry: Enforce data contracts with Avro/Protobuf/JSON Schema
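To build intuition for the idempotence behind the exactly-once guarantee, here is a toy, stdlib-only Python model — not Kafka's actual implementation — of how a broker can deduplicate retried writes using a per-producer sequence number:

```python
class TinyPartition:
    """Toy model of broker-side idempotent-producer dedup (illustration only)."""

    def __init__(self):
        self.log = []        # the partition's record log
        self.last_seq = {}   # producer_id -> last accepted sequence number

    def append(self, producer_id, seq, value):
        # A retry of an already-acknowledged batch carries the same sequence
        # number, so the broker can recognize and discard the duplicate.
        if seq <= self.last_seq.get(producer_id, -1):
            return False     # duplicate: ignored
        self.last_seq[producer_id] = seq
        self.log.append(value)
        return True

p = TinyPartition()
p.append("producer-1", 0, "order-a")
p.append("producer-1", 1, "order-b")
p.append("producer-1", 1, "order-b")   # retry after a lost ack: deduplicated
print(p.log)                           # ['order-a', 'order-b']
```

The real producer enables this with enable.idempotence=true; transactions then extend the same machinery across multiple partitions.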
Kafka vs. Other Messaging Systems
| Feature | Apache Kafka | RabbitMQ | Apache Pulsar | Amazon SQS | NATS |
|---|---|---|---|---|---|
| Throughput | Very High (millions/sec) | Moderate | Very High | Moderate | Very High |
| Ordering | Per-partition | Per-queue | Per-partition | FIFO queues | Per-subject |
| Retention | Configurable (time/size) | Until consumed | Configurable | 14 days max | In-memory |
| Replay | Yes (offset-based) | No | Yes (cursor) | No | JetStream only |
| Protocol | Custom binary | AMQP | Custom binary | HTTP/SQS API | NATS protocol |
| Stream Processing | Kafka Streams/ksqlDB | No | Pulsar Functions | No | No |
| Multi-Tenancy | Topic-level ACLs | Vhosts | Namespace isolation | Queue-level | Accounts |
| Exactly-Once | Yes (transactions) | No | Yes | No | JetStream |
| License | Apache 2.0 | MPL 2.0 | Apache 2.0 | Proprietary | Apache 2.0 |
| Operational Complexity | High | Low | High | Managed | Low |
| Ecosystem | Very large | Large | Growing | AWS-native | Growing |
Architecture
```
┌──────────────────────────────────────────────────────────┐
│                      Kafka Cluster                       │
│                                                          │
│  ┌────────────┐    ┌────────────┐    ┌────────────┐      │
│  │  Broker 1  │    │  Broker 2  │    │  Broker 3  │      │
│  │(Controller)│    │            │    │            │      │
│  │            │    │            │    │            │      │
│  │  Topic A   │    │  Topic A   │    │  Topic A   │      │
│  │   P0 (L)   │    │   P0 (F)   │    │   P1 (F)   │      │
│  │   P1 (L)   │    │   P2 (L)   │    │   P2 (F)   │      │
│  │            │    │            │    │            │      │
│  │  Topic B   │    │  Topic B   │    │  Topic B   │      │
│  │   P0 (F)   │    │   P0 (L)   │    │   P1 (L)   │      │
│  │   P1 (F)   │    │            │    │            │      │
│  └────────────┘    └────────────┘    └────────────┘      │
│              L = Leader    F = Follower                  │
│                                                          │
│  ┌────────────────────────────────────────┐              │
│  │        KRaft Controller Quorum         │              │
│  │      (replaces ZooKeeper in 3.3+)      │              │
│  └────────────────────────────────────────┘              │
└──────────────────────────────────────────────────────────┘
        │                │                │
   ┌────┴────┐      ┌────┴────┐      ┌────┴────┐
   │Producer │      │Consumer │      │Consumer │
   │   App   │      │ Group A │      │ Group B │
   └─────────┘      └─────────┘      └─────────┘
```
Core Concepts
| Concept | Description |
|---|---|
| Broker | Server that stores data and serves client requests |
| Topic | Named stream of records (like a database table) |
| Partition | Ordered, immutable sequence of records within a topic |
| Offset | Unique sequential ID for each record within a partition |
| Producer | Client that publishes records to topics |
| Consumer | Client that reads records from topics |
| Consumer Group | Set of consumers sharing a subscription |
| Replica | Copy of a partition for fault tolerance |
| Leader | Replica that handles all reads and writes for a partition |
| ISR | In-Sync Replicas — followers that are caught up to leader |
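The relationship between partitions, offsets, and consumer position can be sketched in a few lines of plain Python (a toy model for intuition, not a client API):

```python
# Toy model: a partition is an append-only list; the offset is the index.
partition = []

def produce(value):
    """Append a record and return the offset it was assigned."""
    partition.append(value)
    return len(partition) - 1

def consume_from(offset):
    """Return (offset, value) pairs starting at a committed position."""
    return list(enumerate(partition))[offset:]

assert produce("order-1") == 0
assert produce("order-2") == 1
assert produce("order-3") == 2

# A consumer that last committed offset 1 resumes reading at offset 1:
print(consume_from(1))   # [(1, 'order-2'), (2, 'order-3')]
```

Because records are never mutated in place, any consumer can replay history simply by rewinding its offset.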
Installation
KRaft Mode (Recommended — No ZooKeeper)
As of Kafka 3.3, KRaft mode is production-ready and eliminates the ZooKeeper dependency:
```bash
# Download Kafka
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0

# Generate a cluster UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

# Format storage directories
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID \
  -c config/kraft/server.properties

# Start Kafka
bin/kafka-server-start.sh config/kraft/server.properties
```
KRaft Configuration
```properties
# Roles: broker, controller, or both
process.roles=broker,controller

# Node ID (unique per node)
node.id=1

# Controller quorum voters
controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093

# Listeners
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
advertised.listeners=PLAINTEXT://broker1:9092

# Log directories
log.dirs=/var/kafka-logs

# Topic defaults
num.partitions=6
default.replication.factor=3
min.insync.replicas=2

# Log retention
log.retention.hours=168
log.retention.bytes=-1
log.segment.bytes=1073741824

# Network threads
num.network.threads=8
num.io.threads=16
```
Docker Compose Deployment
```yaml
version: '3.8'

services:
  kafka-1:
    image: apache/kafka:3.7.0
    hostname: kafka-1
    container_name: kafka-1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LOG_DIRS: /var/kafka-logs
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-1-data:/var/kafka-logs

  kafka-2:
    image: apache/kafka:3.7.0
    hostname: kafka-2
    container_name: kafka-2
    ports:
      - "9093:9092"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LOG_DIRS: /var/kafka-logs
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-2-data:/var/kafka-logs

  kafka-3:
    image: apache/kafka:3.7.0
    hostname: kafka-3
    container_name: kafka-3
    ports:
      - "9094:9092"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LOG_DIRS: /var/kafka-logs
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-3-data:/var/kafka-logs

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
    depends_on:
      - kafka-1
      - schema-registry

volumes:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:
```
Kubernetes Deployment with Strimzi
```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: kafka
---
# Install Strimzi operator via Helm
# helm repo add strimzi https://strimzi.io/charts/
# helm install strimzi strimzi/strimzi-kafka-operator -n kafka
```

```yaml
# kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: production-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.7.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      num.partitions: 6
      log.retention.hours: 168
      log.segment.bytes: 1073741824
      auto.create.topics.enable: false
    storage:
      type: persistent-claim
      size: 500Gi
      class: fast-ssd
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 16Gi
        cpu: "4"
    jvmOptions:
      -Xms: 4096m
      -Xmx: 4096m
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  zookeeper:
    replicas: 0  # KRaft mode
  entityOperator:
    topicOperator: {}
    userOperator: {}
```
Topic Management
Creating Topics
```bash
# Create a topic with specific configuration
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete \
  --config compression.type=lz4 \
  --config min.insync.replicas=2

# Create a compacted topic (for changelogs/state)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic user-profiles \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5 \
  --config delete.retention.ms=86400000
```
Topic Configuration
```bash
# Describe a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

# Alter topic configuration
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name orders \
  --add-config retention.ms=259200000

# Increase partitions (cannot decrease)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic orders \
  --partitions 24

# List all topics
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Delete a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --delete --topic old-topic
```
Partition Strategy
| Strategy | Use Case | Key Selection |
|---|---|---|
| Round-robin | Even distribution, no ordering needed | No key (null) |
| Key-based | Ordering per entity (user, order) | Entity ID as key |
| Custom partitioner | Specific routing logic | Custom Partitioner class |
| Single partition | Total ordering required | All to partition 0 |
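The key-based strategy works because the default partitioner maps a key deterministically to one partition. A simplified Python sketch of the principle — Kafka's actual partitioner uses murmur2 on the serialized key, not CRC32, so this is an illustration only:

```python
import zlib

NUM_PARTITIONS = 12

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    # Deterministic hash mod partition count: the same key always lands
    # on the same partition, which preserves per-key ordering.
    return zlib.crc32(key) % num_partitions

p1 = partition_for(b"user-42")
p2 = partition_for(b"user-42")
assert p1 == p2                      # stable routing for one entity
assert 0 <= p1 < NUM_PARTITIONS
```

Note the corollary: increasing the partition count changes the key-to-partition mapping, so plan partition counts before relying on key ordering.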
Producer API
Java Producer
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class OrderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());

        // Reliability settings
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Performance settings
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                "orders", "order-123", "{\"id\":\"123\",\"amount\":99.99}");

            // Async send with callback
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Send failed: " + exception);
                } else {
                    System.out.printf("Sent to %s-%d offset %d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        }
    }
}
```
Python Producer (confluent-kafka)
```python
from confluent_kafka import Producer
import json

conf = {
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
    'batch.size': 32768,
    'linger.ms': 20,
    'retries': 2147483647,
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

# Send messages
for i in range(100):
    order = {'id': f'order-{i}', 'amount': 99.99 + i}
    producer.produce(
        topic='orders',
        key=f'order-{i}',
        value=json.dumps(order).encode('utf-8'),
        callback=delivery_callback
    )

# Wait for all messages to be delivered
producer.flush()
```
Consumer API
Java Consumer
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class OrderConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());

        // Consumer configuration
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));

            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Key: %s, Value: %s, Partition: %d, Offset: %d%n",
                        record.key(), record.value(),
                        record.partition(), record.offset());

                    // Process the record
                    processOrder(record.value());
                }

                // Manual commit after processing
                consumer.commitSync();
            }
        }
    }

    private static void processOrder(String orderJson) {
        // Business logic here
    }
}
```
Python Consumer
```python
from confluent_kafka import Consumer
import json

conf = {
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        order = json.loads(msg.value().decode('utf-8'))
        print(f"Received: {order}")

        # Process and commit
        process_order(order)
        consumer.commit(asynchronous=False)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
Kafka Connect
Kafka Connect is a framework for streaming data between Kafka and external systems.
Source Connector — PostgreSQL CDC
```json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "${vault:postgres/password}",
    "database.dbname": "orders_db",
    "database.server.name": "orders",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_orders",
    "publication.name": "dbz_publication",
    "topic.prefix": "cdc",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.public\\.(.*)",
    "transforms.route.replacement": "cdc-$1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
```
Sink Connector — Elasticsearch
```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "cdc-orders",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": false,
    "schema.ignore": true,
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "batch.size": 2000,
    "max.buffered.records": 20000,
    "flush.timeout.ms": 120000,
    "transforms": "extractKey,dropPrefix",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "id",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "cdc-(.*)",
    "transforms.dropPrefix.replacement": "$1"
  }
}
```
Sink Connector — Amazon S3
```json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "orders,customers",
    "s3.region": "eu-west-1",
    "s3.bucket.name": "data-lake-raw",
    "s3.part.size": 67108864,
    "flush.size": 10000,
    "rotate.interval.ms": 600000,
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": 3600000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at"
  }
}
```
Kafka Streams
Stream Processing Topology
```java
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;

import java.time.Duration;
import java.util.Properties;

public class OrderStreamProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
            "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
            StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();

        // Read from input topic
        KStream<String, String> orders = builder.stream("orders");

        // Filter high-value orders
        KStream<String, String> highValue = orders
            .filter((key, value) -> {
                Order order = parseOrder(value);
                return order.amount() > 1000;
            });

        // Branch into categories
        orders
            .split(Named.as("order-"))
            .branch((key, value) -> parseOrder(value).amount() > 1000,
                Branched.as("high-value"))
            .branch((key, value) -> parseOrder(value).amount() > 100,
                Branched.as("medium-value"))
            .defaultBranch(Branched.as("low-value"));

        // Windowed aggregation: orders per minute per region
        orders
            .groupBy((key, value) -> parseOrder(value).region())
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(Materialized.as("orders-per-minute"))
            .toStream()
            .to("order-counts");

        // Join orders with customer data
        KTable<String, String> customers = builder.table("customers");

        orders
            .selectKey((key, value) -> parseOrder(value).customerId())
            .join(customers,
                (order, customer) -> enrichOrder(order, customer))
            .to("enriched-orders");

        // Build and start
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    // The Order record and the parseOrder/enrichOrder helpers are
    // application-specific and omitted here for brevity.
}
```
Schema Registry
Avro Schema Management
```bash
# Register a schema
curl -X POST http://schema-registry:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"customer_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}"
  }'

# Get latest schema version
curl http://schema-registry:8081/subjects/orders-value/versions/latest

# Check compatibility
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"customer_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"status\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
  }'

# Set compatibility mode
curl -X PUT http://schema-registry:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'
```
Compatibility Modes
| Mode | Description | Allowed Changes |
|---|---|---|
| BACKWARD | New schema can read old data | Add optional fields, remove fields |
| FORWARD | Old schema can read new data | Remove optional fields, add fields |
| FULL | Both backward and forward | Add/remove optional fields |
| NONE | No compatibility checks | Any change |
| BACKWARD_TRANSITIVE | Backward for all versions | Stricter BACKWARD |
| FULL_TRANSITIVE | Full for all versions | Strictest mode |
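BACKWARD compatibility means a consumer built against the new schema can still read records written with the old one, because every added field carries a default. A schemaless Python sketch of the idea (the "status" field mirrors the optional field used in the compatibility-check example above; field handling is illustrative, not Avro's actual decoder):

```python
# Old records were written before the optional "status" field existed.
old_record = {"id": "order-1", "amount": 99.99}
new_record = {"id": "order-2", "amount": 10.0, "status": "SHIPPED"}

def read_with_new_schema(record):
    # The new schema declares "status" with a null default, so old data
    # still deserializes cleanly instead of failing.
    return {
        "id": record["id"],
        "amount": record["amount"],
        "status": record.get("status", None),
    }

assert read_with_new_schema(old_record)["status"] is None
assert read_with_new_schema(new_record)["status"] == "SHIPPED"
```

Avro applies exactly this defaulting during schema resolution, which is why the Schema Registry can reject a new version that adds a required field without a default.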
Security
SASL/SCRAM Authentication
```properties
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

# SSL settings
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=${KEYSTORE_PASSWORD}
ssl.key.password=${KEY_PASSWORD}
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=${TRUSTSTORE_PASSWORD}
ssl.client.auth=required
ssl.endpoint.identification.algorithm=https
```
ACL Configuration
```bash
# Create user credentials
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --add-config 'SCRAM-SHA-512=[password=secret]' \
  --entity-type users --entity-name order-service

# Grant producer permissions
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:order-service \
  --operation Write --operation Describe \
  --topic orders

# Grant consumer permissions
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:analytics-service \
  --operation Read --operation Describe \
  --topic orders \
  --group analytics-consumer

# Grant topic management
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:admin \
  --operation All \
  --topic '*' \
  --group '*' \
  --cluster

# List ACLs
bin/kafka-acls.sh --bootstrap-server localhost:9092 --list
```
Performance Tuning
Broker Optimization
```properties
# server.properties — Performance tuning

# Network and I/O threads
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600

# Log configuration
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# Replication
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
replica.socket.receive.buffer.bytes=1048576
num.replica.fetchers=4

# Compression
compression.type=producer

# Message sizes
message.max.bytes=10485760
```
Producer Tuning
| Parameter | Low Latency | High Throughput |
|---|---|---|
| acks | 1 | all |
| batch.size | 16384 | 65536 |
| linger.ms | 0 | 50 |
| compression.type | none | lz4 |
| buffer.memory | 33554432 | 134217728 |
| max.in.flight.requests | 1 | 5 |
Consumer Tuning
| Parameter | Low Latency | High Throughput |
|---|---|---|
| fetch.min.bytes | 1 | 65536 |
| fetch.max.wait.ms | 100 | 500 |
| max.poll.records | 100 | 1000 |
| max.partition.fetch.bytes | 1048576 | 10485760 |
JVM Settings
```bash
# KAFKA_HEAP_OPTS for brokers
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"

# G1GC settings
export KAFKA_JVM_PERFORMANCE_OPTS="
  -server
  -XX:+UseG1GC
  -XX:MaxGCPauseMillis=20
  -XX:InitiatingHeapOccupancyPercent=35
  -XX:+ExplicitGCInvokesConcurrent
  -XX:MaxInlineLevel=15
  -Djava.awt.headless=true"
```
Monitoring
Prometheus JMX Exporter
```yaml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  # Broker metrics
  - pattern: kafka.server<type=BrokerTopicMetrics, name=(.+), topic=(.+)><>(.+)
    name: kafka_server_brokertopicmetrics_$1_$3
    labels:
      topic: "$2"

  # Request metrics
  - pattern: kafka.network<type=RequestMetrics, name=(.+), request=(.+)><>(.+)
    name: kafka_network_requestmetrics_$1_$3
    labels:
      request: "$2"

  # Partition metrics
  - pattern: kafka.server<type=ReplicaManager, name=(.+)><>(.+)
    name: kafka_server_replicamanager_$1_$2

  # Consumer group lag
  - pattern: kafka.server<type=FetcherLagMetrics, name=(.+), clientId=(.+), topic=(.+), partition=(.+)><>Value
    name: kafka_server_fetcherlagmetrics_$1
    labels:
      clientId: "$2"
      topic: "$3"
      partition: "$4"
```
Key Metrics to Monitor
| Metric | Description | Alert Threshold |
|---|---|---|
| UnderReplicatedPartitions | Partitions where ISR < replicas | > 0 |
| OfflinePartitionsCount | Partitions with no active leader | > 0 |
| ActiveControllerCount | Number of active controllers | != 1 |
| MessagesInPerSec | Producer throughput | Baseline deviation |
| BytesInPerSec | Network ingress | Capacity threshold |
| BytesOutPerSec | Network egress | Capacity threshold |
| RequestLatency99th | P99 request latency | > 100ms |
| ConsumerLag | Messages behind producer | Growing continuously |
| LogFlushLatency | Disk write latency | > 10ms |
| IsrShrinkRate | ISR shrink events | > 0 |
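Consumer lag is simply the distance between each partition's log-end offset and the group's committed offset, summed over partitions. A minimal calculation sketch with hypothetical offsets:

```python
# (log_end_offset, committed_offset) per partition — illustrative values
partitions = {
    0: (1500, 1480),
    1: (2300, 2300),
    2: (900,  750),
}

def total_lag(parts):
    """Sum of per-partition lag across a consumer group."""
    return sum(end - committed for end, committed in parts.values())

lag = total_lag(partitions)
print(lag)   # 170: 20 on partition 0, 0 on partition 1, 150 on partition 2
```

This is the same figure kafka-consumer-groups.sh reports per partition in its LAG column; alert on the trend (continuously growing), not the absolute value.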
Grafana Dashboard Query Examples
```promql
# Messages produced per second per topic
sum(rate(kafka_server_brokertopicmetrics_messagesinpersec_count[5m])) by (topic)

# Consumer group lag
sum(kafka_consumergroup_lag) by (consumergroup, topic)

# Under-replicated partitions
kafka_server_replicamanager_underreplicatedpartitions

# Broker CPU usage
rate(process_cpu_seconds_total{job="kafka"}[5m])

# Request rate by type
sum(rate(kafka_network_requestmetrics_requestspersec_count[5m])) by (request)
```
Common Patterns
Event Sourcing
```
┌──────────┐     ┌───────────┐     ┌──────────────┐
│ Command  │────>│   Kafka   │────>│ Event Store  │
│ Service  │     │   Topic   │     │ (Compacted)  │
└──────────┘     └───────────┘     └──────────────┘
                       │
                  ┌────┴────┐
                  │ Consumer│
                  │  (CQRS  │
                  │  Read)  │
                  └─────────┘
```
Dead Letter Queue
```java
// Handle poison pills with DLQ
try {
    processRecord(record);
    consumer.commitSync();
} catch (DeserializationException e) {
    // Send to DLQ
    producer.send(new ProducerRecord<>(
        "orders.dlq",
        record.key(),
        record.value()
    ));
    consumer.commitSync();
}
```
Transactional Outbox Pattern
```sql
-- Application writes to orders table and outbox table
-- in a single database transaction
BEGIN;

INSERT INTO orders (id, customer_id, amount)
VALUES ('order-123', 'cust-456', 99.99);

INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload)
VALUES (gen_random_uuid(), 'Order', 'order-123', 'OrderCreated',
        '{"id":"order-123","amount":99.99}');

COMMIT;

-- Debezium CDC connector reads the outbox table
-- and publishes events to Kafka
```
Troubleshooting
Common Issues
```bash
# Check broker status
bin/kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log \
  --cluster-id $CLUSTER_ID

# Check consumer group status
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-processor

# Reset consumer group offsets
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --to-earliest --execute

# Check partition reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --verify --reassignment-json-file reassignment.json

# Dump log segments
bin/kafka-dump-log.sh --files /var/kafka-logs/orders-0/00000000000000000000.log \
  --print-data-log

# Verify cluster health
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
```
Performance Debugging
```bash
# Check disk I/O
iostat -x 1 5

# Check network throughput
sar -n DEV 1 5

# Check JVM garbage collection
jstat -gcutil $(pgrep -f kafka) 1000

# Producer performance test
bin/kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092 \
    acks=all \
    compression.type=lz4 \
    batch.size=65536 \
    linger.ms=20

# Consumer performance test
bin/kafka-consumer-perf-test.sh \
  --bootstrap-server localhost:9092 \
  --topic perf-test \
  --messages 1000000 \
  --threads 3
```
Production Best Practices
Cluster Sizing
| Workload | Brokers | Partitions per Topic | Replication Factor |
|---|---|---|---|
| Small (< 100 MB/s) | 3 | 6 | 3 |
| Medium (100 MB/s - 1 GB/s) | 5-10 | 12-24 | 3 |
| Large (> 1 GB/s) | 10+ | 24-64 | 3 |
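A common rule of thumb behind these partition counts: divide the target throughput by the measured per-partition throughput of your producers and consumers, and size for the larger of the two. The per-partition figures below are illustrative assumptions, not benchmarks:

```python
import math

def required_partitions(target_mb_s, producer_mb_s_per_partition,
                        consumer_mb_s_per_partition):
    # Need enough partitions to satisfy both the write path and the
    # (usually slower) read path.
    return max(
        math.ceil(target_mb_s / producer_mb_s_per_partition),
        math.ceil(target_mb_s / consumer_mb_s_per_partition),
    )

# e.g. 500 MB/s target, ~50 MB/s per partition producing, ~25 MB/s consuming
print(required_partitions(500, 50, 25))   # 20
```

Since partitions can be added but never removed, round this estimate up with headroom for growth.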
Checklist
- Use KRaft mode (no ZooKeeper) for new deployments
- Set min.insync.replicas=2 with acks=all
- Disable auto.create.topics.enable in production
- Use Schema Registry for all topics
- Implement idempotent producers
- Use manual offset commits with at-least-once processing
- Monitor consumer lag and under-replicated partitions
- Configure log retention based on business requirements
- Use LZ4 compression for best throughput/CPU balance
- Deploy across multiple availability zones
- Use dedicated disks for Kafka log directories
- Set up alerting for key broker metrics
- Implement Dead Letter Queues for error handling
- Plan partition count for future growth (cannot decrease)
- Use SSL/TLS and SASL for production security
- Regular backup of topic configurations and ACLs
- Test disaster recovery procedures periodically
- Document topic naming conventions and schemas
Transform Your Team’s Kafka Skills
Building a reliable event streaming platform requires deep expertise in distributed systems, performance tuning, and operational best practices. At chavkov.com, I deliver hands-on Apache Kafka training that takes your team from fundamentals to production mastery.
Training Options
| Format | Duration | Focus |
|---|---|---|
| Kafka Fundamentals | 2 days | Core concepts, producers, consumers, topic design |
| Kafka Operations | 3 days | Cluster management, monitoring, security, DR |
| Kafka Streams & ksqlDB | 2 days | Stream processing, stateful operations, windowing |
| Event-Driven Architecture | 3 days | Patterns, CQRS, event sourcing, schema evolution |
All trainings include hands-on labs with real Kafka clusters. Contact me to discuss your team’s training needs and schedule a customized program.