본문 바로가기

IT/OpenSource

[Apache Kafka] Kafka Data Ordering issue (2023.01.26)

0. Table Of Contents

 

1. 서론

현재 기획중인 데이터 파이프라인의 경우 아래와 같은 특성을 지니고 있다.

  • 하나의 source가 아닌 다양한 환경의 source에서 데이터를 받는다.
  • 현장 local server가 존재하지만, 통계 및 분석을 local server에서 지원하게되면 disk 및 성능 문제가 발생 할 수 있으며 network가 단절될 시 큰 문제가 발생할 수 있다.
  • 여러 현장의 데이터를 중앙에서 관리하고 주기적으로 아카이빙 하기 용이해야한다.

위의 이유로 인해 kafka를 도입하게 되었다. 그러나 kafka를 도입하면서 partition을 늘릴경우, data consume 하였을 때 data sorting이 의도된 대로 동작을 하지 않는 것을 보게 되었다.

 

2. 문제점

partition이 하나의 경우를 시뮬레이션 하였을 때 아래와 같다.

 

 

2.1. partition이 하나일 경우의 데이터 ordering test

superdev@dev-Jhonason kafka-3.3.1-src % bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic order-test-single-partition
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/superdev/Downloads/kafka-3.3.1-src/tools/build/dependant-libs-2.13.8/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/superdev/Downloads/kafka-3.3.1-src/trogdor/build/dependant-libs-2.13.8/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/superdev/Downloads/kafka-3.3.1-src/connect/runtime/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/superdev/Downloads/kafka-3.3.1-src/connect/mirror/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
Created topic order-test-single-partition.

 

superdev@dev-Jhonason kafka-3.3.1-src % bin/kafka-console-producer.sh --topic order-test-single-partition --bootstrap-server 127.0.0.1:9092 
>1
>2
>3
>4
>5
>6
>7
>8
>9
>0
superdev@dev-Jhonason kafka-3.3.1-src % bin/kafka-console-consumer.sh --topic testtest11 --bootstrap-server 127.0.0.1:9092 --from-beginning        
1
2
3
4
5
6
7
8
9
0

2.2. partition이 복수개의 경우

 

2.2.1. kafka clustering

clustering을 하기 위해 하나의 zookeeper과 3개의 kafka broker를 구성 할 것이다.

 

아래와 같이 server.properties를 3개 작성해준다.

############################# Server Basics #############################

# 각 브로커마다 id값이 달라야 하며 유일한 값이어야 한다.
broker.id=0

############################# Socket Server Settings #############################
#각 브로커가 사용할 port를 명시해주어야한다.
listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
# 외부에 공개할 별칭
advertised.listeners=PLAINTEXT://127.0.0.1:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################
# log dir의 경우 각 kafka server마다 달라야한다.
log.dirs=/tmp/kafka-logs1

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
#log.flush.interval.messages=10000

#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168


log.retention.check.interval.ms=300000

############################# Zookeeper #############################
# 연결할 zookeeper node정보
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0

 

listeners, advertised.listeners, broker.id를 각 broker별로 다르게 구성한 다음, 아래 명령어를 각 broker를 아래 명령어를 이용하여 실행시켜준다.

superdev@dev-Jhonason kafka-3.3.1-src % bin/kafka-server-start.sh {{YOUR_CONFIG_FILE_PATH}}

2.2.2. data ordering test

superdev@dev-Jhonason kafka-3.3.1-src % bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 10 --topic order-test-multi-partition
Created topic order-test-single-partition.

 

superdev@dev-Jhonason kafka-3.3.1-src % bin/kafka-console-producer.sh --topic order-test-single-partition --bootstrap-server 127.0.0.1:9092 
>1
>2
>3
>4
>5
>6
>7
>8
>9
>0

 

superdev@dev-Jhonason kafka-3.3.1-src % bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1 --topic order-test
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/superdev/Downloads/kafka-3.3.1-src/tools/build/dependant-libs-2.13.8/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/superdev/Downloads/kafka-3.3.1-src/trogdor/build/dependant-libs-2.13.8/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/superdev/Downloads/kafka-3.3.1-src/connect/runtime/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/superdev/Downloads/kafka-3.3.1-src/connect/mirror/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
Created topic order-test.
superdev@dev-Jhonason kafka-3.3.1-src % bin/kafka-console-consumer.sh --topic order-test-single-partition --bootstrap-server 127.0.0.1:9092 --from-beginning        
0
9
6
2
7
6
3
8
4
1

 

파티션이 1개일 경우와 확연한 차이를 볼 수 있다.

 

3. 문제점 분석

partition 전략이 round robin(or spraying) 전략일 경우에 파티션에 돌아가면서 데이터가 적재되게 된다.

 

 

또한, 파티션별로 실시간 접근 속도가 다르기 때문에 가져올 때 빠른 파티션의 데이터를 먼저 가지고 온다. 따라서 위 그림과 같은 데이터 출력이 이루어지게 된다. 이로 인해 우리가 기대하던 데이터 ordering이 되지 않는 상황이 발생하게 되었다.

 

 

4. 해결법

4.1. partitioning key

데이터를 입력할 시, 파티셔닝 키를 참조하여 데이터를 insert 할 수 있다. kafka는 파티셔닝 키를 이용하여 해쉬를 계산한 다음 kafka 자체 알고리즘에 의해 파티션을 결정하여 데이터를 produce 하게 된다.

그러나 이 방법은 해쉬값에 의해 파티션이 고정되기 때문에 최적화를 개발자가 하지 않으면 한 파티션으로 데이터가 몰릴 수 있기 때문에 조심해야한다.

 

 

 

4.2. sticky partition

참고로, kafka 2.4부터 sticky session을 기본 파티셔닝 전략으로 채택했다고 한다.

partition 전략 관련 kafka parameter 공식 문서를 보면 아래와 같다.

 

partitioner.class가 설정되지 않으면 기본 파티셔너가 사용된다. 이 전략은 batch.size 만큼의 데이터가 파티션에 차기 전까지 특정 파티션에 데이터를 고정시키는 전략이다. 이 전략이 동작하는 세부적인 사항은 다음과 같다.

  • 파티션이 명시되지 않았으나 키가 존재한다면, 키의 해쉬 값에 의해 파티션이 선택된다.
  • 파티션과 키 둘다 존재하지 않는다면 batch.size 만큼의 데이터가 특정 파티션에 고정되어 입력되고, 이를 넘길 시 다른 파티션에 고정되어 입력된다.

 

이를 이용하게 되면 아래 그림과 같이 파티션에 데이터들이 프로듀싱 될 것이다.

 

 

kafka 세팅시 batch.size, linger.ms 설정을 배치 서비스에 맞게 잘 설정하였다면, 배치 데이터가 하나의 파티션에 묶이기 때문에 최소 하나의 배치에 대해서는 데이터 순서를 보장받을 수 있다.