개요
회사에서 Kafka 클러스터링 구성을 진행하다보니 KIP-848에 대해서 알게 되었다.
나름 Kafka동작방식에 있어서 획기적인 아이디어를 적용하였으니 한 번 알아보면 좋을 것 같아서 좀 찾아보았다.
KIP848
- KIP-848은 Kafka의 새로운 Consumer Rebalance Protocol을 도입
기존 리밸런싱 방식의 문제점
1.모든 Consumer들이 Coordinator에게 Join 요청
- Coordinator가 모든 Consumer의 metadata 수집
- 리더 Consumer(보통 첫 번째 consumer)가 전체 assignment를 계산
- Coordinator가 각 Consumer에게 할당
- 모든 Consumer가 Join 완료되어야만 리밸런스가 가능
- 느린 Consumer 하나 때문에 전체 Group이 지연됨
- Rebalance 중에는 메시지 처리 중단
- Consumer 수가 많을수록 느려짐
메세지 처리중단은 클러스터를 구성하는데 나름 크리티컬한 문제가 될 수 있다고 생각하여 KIP848를 적용하는게 낫다고 생각했다. 적용방식도 크게 어렵지는 않다.
KIP848의 이점
KIP848은 리밸런싱을 서버가 담당하여 처리하게 되는데 이로인해서 다음과 같은 이점을 얻을 수 있다.
항목 | 설명 |
---|---|
group.protocol=consumer | 새로운 리밸런스 방식을 활성화하는 설정 |
Server-side assignment | 기존 Consumer 리더가 아닌 Broker가 직접 파티션 할당 |
Epoch 기반 상태 관리 | Partition 할당 정보를 Epoch으로 버전 관리 |
Partial Join 지원 | 느린 Consumer를 기다릴 필요 없이 리밸런스 가능 |
Poll() 병렬 처리 가능 | 일부 Consumer가 늦게 Join해도 다른 Consumer는 메시지를 처리 가능 |
조건
KIP848은 kafka client가 3.5버전 이상이여야 적용이 가능하며 Spring-kafka의존성은 3.1버전 이상에서 적용가능하다.
kafka Docker Compose 작성
먼저 kafka를 띄워야하기에 Kafka Docker compose를 작성하도록 한다.
services:
broker:
image: bitnami/kafka:3.7.0
hostname: broker
container_name: broker
ports:
- "7092:9092"
- "10000:9094"
environment:
# Kraft Settings
KAFKA_CFG_NODE_ID: 0
KAFKA_KRAFT_CLUSTER_ID: HsDBs9l6UUmQq7Y5E6bNlw
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_PROCESS_ROLES: controller,broker
# Listeners
KAFKA_CFG_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://0.0.0.0:9092, EXTERNAL://:9094, CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://broker:9092, EXTERNAL://127.0.0.1:10000
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
# Clustering
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1
# KIP-848
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer"
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false"
Kafka Producer 설정
@Component
public class ProducerRunner implements ApplicationRunner {
private final KafkaTemplate<String, String> template;
ProducerRunner(KafkaTemplate<String, String> template) {
this.template = template;
}
@Bean
public NewTopic myTopic() {
return new NewTopic("KIP848-topic", 5, (short) 1); // 파티션 5개, replication factor 1
}
@Override
public void run(ApplicationArguments args) {
for (int i = 0; i < 20; i++) {
String key = String.valueOf(i % 5); // 파티션 균등 분산
String message = (i == 3 || i == 10 || i == 17) ? "slow-message-" + i : "message-" + i;
template.send("KIP848-topic", key, message);
}
}
}
//properties
spring.kafka.bootstrap-servers=localhost:10000
파티션을 5개로 나누고 새로운 토픽을 만든다.
Consumer 설정
@Component
@KafkaListener(groupId = "rebalance-consumer", topics = "KIP848-topic")
public class KafkaConsumerService {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
@KafkaHandler
public void consume(String message) {
String threadName = Thread.currentThread().getName();
log.info("[{}][{}] START - message: {}", threadName, LocalDateTime.now(), message);
if (message.contains("slow")) {
try {
Thread.sleep(5000); // 일부 Consumer는 일부러 느리게 동작
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
log.info("[{}][{}] END - message: {}", threadName, LocalDateTime.now(), message);
}
}
//application-consumer1.properties
spring.kafka.consumer.properties.group.protocol=consumer
spring.kafka.consumer.group-id=rebalance-consumer
spring.kafka.consumer.client-id=consumer1
spring.kafka.bootstrap-servers=localhost:10000
spring.kafka.consumer.properties.group.protocol=consumer
이 문법이 있어야 KIP848를 적용할 수 있고,
실제 구동을 하게 되면
이런 문구가 뜨게 된다. 이러면 잘 적용된 것이다.
사실 KIP-848에 대한 테스트를 완벽하게 진행하고 싶었지만
리밸런싱 타이밍을 정확히 맞추기는 어려워 완벽한 테스트를 진행하기 쉽지 않았다.
컨슈머가 10대 정도는 있고 대규모 처리시에 리밸런싱에 대한 관측이 쉬울 것 같으므로 실무에 적용하고 이에 따른 모니터링 결과가 있으면 추가로 적도록 하겠다.