카테고리 없음

[Apache Kafka] Kafka 튜닝 기록 아카이빙 (2023.05.01)

JeongHyeongKim 2023. 5. 22. 15:42

0. Table Of Content

 

 

1. Kafka Batch data produce 도중 발생 한 문제

{"level":"ERROR","timestamp":"2023-04-24T08:40:10.368Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 7)","broker":"211.49.126.109:9092","clientId":"rmkr","error":"The request included a message larger than the max message size the server will accept","correlationId":9,"size":77}
{"level":"ERROR","timestamp":"2023-04-24T08:40:10.370Z","logger":"kafkajs","message":"[Producer] The request included a message larger than the max message size the server will accept","retryCount":0,"retryTime":287}
KafkaJSProtocolError: The request included a message larger than the max message size the server will accept
     at createErrorFromCode (/home/crocus/project/acelo-grid-local-data-producer/node_modules/kafkajs/src/protocol/error.js:581:10)
     at Object.parse (/home/crocus/project/acelo-grid-local-data-producer/node_modules/kafkajs/src/protocol/requests/produce/v3/response.js:45:11)
     at Connection.send (/home/crocus/project/acelo-grid-local-data-producer/node_modules/kafkajs/src/network/connection.js:433:35)
     at processTicksAndRejections (internal/process/task_queues.js:95:5)
     at async Broker.[private:Broker:sendRequest] (/home/crocus/project/acelo-grid-local-data-producer/node_modules/kafkajs/src/broker/index.js:904:14)
     at async Broker.produce (/home/crocus/project/acelo-grid-local-data-producer/node_modules/kafkajs/src/broker/index.js:241:12)
     at async /home/crocus/project/acelo-grid-local-data-producer/node_modules/kafkajs/src/producer/sendMessages.js:94:24
     at async Promise.all (index 0)
     at async /home/crocus/project/acelo-grid-local-data-producer/node_modules/kafkajs/src/producer/sendMessages.js:133:9
     at async sendBatch (/home/crocus/project/acelo-grid-local-data-producer/node_modules/kafkajs/src/producer/messageProducer.js:95:12) {
   retriable: false,
   helpUrl: undefined,
   cause: undefined,
   type: 'MESSAGE_TOO_LARGE',
   code: 10
 }

여러 iot 데이터를 수집하는 도중 아래와 같은 로그가 발생하며 정상적으로 produce 되지 않는 오류가 발생하였다.

 

 

위 로그를 정리하면 , Kafka Broker에 설정된 최대 메시지 크기보다 더 큰메시지를 받게 된다는 의미이다. 실제로 kafka broker의 해당 topic에 데이터가 들어왔는지 확인한 결과 어떠한 데이터도 들어와 있지 않았다.

이를 해결하기 위해 batch 및 데이터 전송, 처리 관련 파라미터를 리서치 하다 아래와 같은 프로퍼티를 발견할 수 있었다.

 

2. 문제를 해결하기 위해 튜닝 해야 할 파라미터에 대한 추측

위 문제를 해결하기 위해 문제가 된 파라미터들을 유추하여 나열한 결과 아래와 같은 파라미터를 찾을 수 있었다.

 

message.max.bytes (Broker Config)

  • Broker가 허용할 Producer측에서 전송한 하나의 배치를 통해 생산된 메시지의 최대 크기(압축이 적용 되었다면, 압축 후 크기)를 의미한다.

 

socket.request.max.bytes (Broker Config)

  • socket 통신으로 kafka broker에 데이터를 전송 할 때, 허용할 최대 byte 수

 

fetch.max.bytes (Consumer Config)

  • Consumer instance가 한번의 fetch에 가장 많이 들고올 수 있는 데이터의 양

 

fetch.min.bytes (Consumer Config)

  • Consumer Instance가 한번의 fetch에 가장 적게 들고 올 수 있는 데이터의 양

 

fetch.max.wait.ms (Consumer Config)

  • fetch.min.byte보다 반환할 데이터의 양이 적지만 Consume 요청에 응답해야할 때, fetch.min.byte를 충족시키기 위해 기다릴 수 있는 최대의 시간 값.

 

fetch.max.wait.ms (Consumer Config)

  • fetch.min.byte보다 반환할 데이터의 양이 적지만 Consume 요청에 응답해야할 때, fetch.min.byte를 충족시키기 위해 기다릴 수 있는 최대의 시간 값.

 

maxBytesPerPartition (Consumer Config)

  • 파티션 별 얼마나 많은 데이터를 들고 올지에 대한 파라미터이며, kafkaJS에만 있는 설정 값으로 보인다.

 

 

3. 조치 사항

우선적으로, Kafka Broker에서 메시지 크기가 제한되어 이를 늘려주기로 결정하였으며, 현재 사내에서는 kafka가 bitnami 에서 제작한 docker로 운영되고 있었기 때문에, 이에 대한 설정을 docker-compose.yaml에 해주기로 하였으며 문서를 참조하여 아래와 같이 설정하였다.

 
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.2
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_MESSAGE_MAX_BYTES=10485880
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://211.49.126.109:9092
    depends_on:
      - zookeeper

 

처음에는 env 설정하는 방법을 찾지 못해 해매다가 bitnami kafka docker document를 찾아보니 명시되지 않은 parameter들은 KAFKA_CFG prefix를 붙이면 된다고 한다. 이를 이용하여 docker-compose.yaml에 KAFKA_CFG_MESSAGE_MAX_BYTES=10485880 를 선언하여 임시로 10mb 까지 늘려주었다. 현장에 디바이스 수가 많아짐에 따라 한번의 배치에 많은 데이터를 담아 produce하게 되니 위와 같은 문제가 발생 한 것으로 보인다.

 

추가적으로 kafkaJS에만 존재하던 값을 튜닝해보기로 하였다.

현재 테스트를 진행 중인 상태이므로, iot data stream을 담당하는 topic의 경우 현재 partition을 하나만 사용하고 있다. 이 때, partition별로 가지고 올 수 있는 데이터를 더 크게 늘리면 한번에 더 많은 사이즈의 데이터를 batch를 통해 elasticsearch로 전달 할 수 있을 것이라 생각했다.

실제로 이 값을 바꾸어 돌려본 결과, 아래와 같았다.

 

partitionMaxByte가 10mb 일 때
partitionMaxByte가 1mb 일 때

  • partitionMaxByte가 10mb 일 때 : 58초에 19739 byte
  • partitionMaxByte가 1mb 일 때 : 19초에 7063 byte

 

계산해보면 거의 비슷한 성능을 내는것 처럼 보이지만, 한번에 많은 데이터를 밀어넣는 10mb로 세팅하였을 때가 elasticsearch bulk query의 성능을 내는데에는 더 좋다고 생각이 되었다.