0. Table Of Contents

 

 

1. 많은 인덱스를 생성하지 않는다.

다음 사진과 같이 인덱스를 하나의 사전이라고 생각해보자.

위 같은 구조에서 collection에 name이라는 property가 “AB“라는 document를 추가한다고 가정하자. 현재 collection에 등록된 document의 name property에 의한 순서는 “A-B-C-D”로 되어있지만, 추가가 되게 되면 “A-AB-B-C-D“로 변경이 되어야 한다.

먄약에 위처럼 데이터가 엄청 많이 추가되는 상황이라면 위 사전의 목차는 계속해서 업데이트 되어야 할 것이다. 즉, collection의 업데이트가 많은 구조의 index 또한 업데이트가 많이 일어날 것이며 index가 많을 수록 이러한 현상이 많아질 가능성이 높아진다.

또한 인덱스는 시스템 메모리에 상주하고 있는데, 인덱스가 많아져서 메모 디스크 공간을 메모리 처럼 사용하기 위해 가상메모리를 형성하게 된다. 이러한 상태가 지속되게되면 mongodb의 전체적인 퍼포먼스에 부정적인 영향을 줄 수 있다.

이러한 이유로 index를 많이 만드는 것을 지양하는 것이 좋다.

 

 

2. Index Prefix를 적극적으로 이용하자.

MongoDB를 이용해본 개발자라면 compound index의 개념을 알 것이다. 모르는 분은 여기(MongoDB Official Document)에서 개념을 참고한다. 아래와 같은 compound index를 예를 들어보자.

 

db.students.createIndex({name:1, grade:1, createdAt:1})

 

 

위같은 compound index를 해석하면 다음과 같다.

“name으로 asc 정렬한 다음 grade를 asc정렬하고, 마지막으로 createdAt으로 asc 정렬한 index“

 

위 처럼 해석된 compound index가 존재하려면 다음과 같은 조건이 필요하다.

“name으로 asc 정렬한 다음 grade를 asc 정렬한 index”

 

또한, 위처럼 해석된 compound index가 존재하려면 다음과 같은 조건이 필요하다.

“name으로 asc 정렬한 index“

즉, compound index의 경우에 순서를 지킨 (compound)index가 자동으로 적용된다는 것을 알 수 있으며, 공식문서에도 이에 대한 내용을 볼 수 있다. 정리하면, 위 index를 생성함으로써 부가적으로 사용가능해진 index는 다음과 같다.

 

db.students.createIndex({name:1, grade:1, createdAt:1})
db.students.createIndex({name:1, grade:1})
db.students.createIndex({name:1})

 

 

 

 

3. Multi Sorting의 경우 sort 방향 신경써서 index를 설계하자

single index의 경우에는 sort 방향이 필요 없다. 위에서 언급한 사전을 예로 들면, 단순히 사전의 목차를 처음부터 보느냐 마지막꺼부터 역방향으로 보느냐 차이기 때문이다. 그러나 compound index의 경우, 정렬된 것을 다른 것을 기준으로 정렬하기 때문에 sorting 방향을 엄격하게 지킬 필요가 있다. 이를 지키지 않을 시, index를 찾는 것이 아닌 collection 상에서 full scan이 일어나게 된다.

 

 

아래 처럼 index를 만들었다고 가정하자.

 

db.students.createIndex({name:1, grade:1})
db.students.createIndex({name:1, grade:-1})
db.students.createIndex({grade:1, name:1})
db.students.createIndex({grade:1, name:-1})

 


첫 번째 라인의 index를 이용하면 아래와 같은 index들을 사용할 수 있다.

 

db.students.createIndex({name:1, grade:1})    -> original index
db.students.createIndex({name:1})             -> can be used by index prefix
db.students.createIndex({name:-1})            -> can be used inverse of index prefix
db.students.createIndex({name:-1, grade:-1}). -> can be used by inverse of original index

 

 

 

나머지 index들도 위 케이스와 같이 동일하게 적용하면 아래와 같은 find method에 대해 full scan을 할 필요 없이 index만으로 sorting 처리할 수 있게 된다.

 

db.find().sort({name:1, grade:1})
db.find().sort({name:1, grade:-1})
db.find().sort({name:-1, grade:1})
db.find().sort({name:-1, grade:-1})
db.find().sort({grade:1, name:1})
db.find().sort({grade:1, name:-1})
db.find().sort({grade:-1, name:1})
db.find().sort({grade:-1, name:-1})
db.find().sort({grade:1})
db.find().sort({grade:-1})
db.find().sort({name:1})
db.find().sort({name:-1})

 

 

이 외의 sort를 이용하기 위해서는 추가적인 index를 설계해야할 것이며, 현재 가지고 있는 index가 얼마만큼의 sorting을 cover할 수 있는지 잘 판단하여야 한다.

 

 

4. 하나의 collection을 여러개의 collection으로 분리하자

하나의 collection 내부에 많은 document를 가지게 되면 아래와 같은 현상을 자연스럽게 수반하게 된다.

  • index의 size가 증가한다.
  • index의 카디널리티가 증가한다

위 상황은 아래와 같은 상황에서 인덱스를 이용하여 query result를 return 할 때, query processor가 불필요한 index 를 참조하기 때문에 퍼포먼스가 낮아질 수 있다. 따라서, 여러 collection으로 분리하는 전략이 하나의 선택이 될 수 있다.

 

 

5. MongoDB를 4.0이상 버전으로 유지하자.

 

5.1. Non blocking Secondary Read

4.0 이전 버전에는 write가 primary에 완전히 commit 된 데이터들을 secondary에 전달완료 하기 전까지는 read operation을 block한다.

이를 mongodb는 WiredTiger timestamp를 이용하여 update에 대한 order를 보장하고, consistency snapshot를 이용하여 secondary 복제가 일어나는 순간에는 secondary가 아닌 snapshot를 읽게 함으로써 non blocking secondary read를 구현하였다.

 

 

5.2. Multi Transaction

RDB의 경우, 서로 관계가 있는 데이터의 경우는 정규화를 해서 데이터를 insert하지만, mongodb의 경우 이를 정규화 하지 않고 하나의 document에 저장하는 성질을 가지고 있다. 이 이론에 기반하여 MongoDB에서 제시하는 이상적인 collection 설계에 따르면 document 하나로 데이터의 무결성이 보장이 된다. 그러나 모든것이 이상적일 수가 없기 때문에 이를 개발자들은 관계를 가진 여러개의 document에 대해 무결성을 보장하기 위해 pending, rollback을 모두 직접 구현한 2-Phase-Commit 패턴으로 해결하였다. 그러나 4.0 이상의 버전에는 multi trransaction을 지원하므로 이를 사용하는 것을 권장한다.

 

 

 

6. Reference

0. Table Of Contents

 

1. 설계 배경

개인 프로젝트를 진행하던 도중 업로드한 개인 프로필 사진을 저장해야하는 기능을 개발하게 되었다. 프로필 사진을 저장하고 불러오는 단계에서 최대한의 성능을 내게 하고자 고민하게 되었다.

 

 

2. 고려해야할 항목

  • 사진파일이 올라갈 때 너무 큰 크기의 사진이 올라가게 되면 돈을 많이 내게 된다.
  • 사진 파일이 올라간 aws s3경로를 보고 이 사진이 어떤 사진인지 확실하게 알면 좋다.
  • AWS S3도 내부적으로 파일을 찾을 때 성능적인 문제가 생긴다고 하던데, 이를 고려한 설계가 들어가면 좋다.

 

 

3. AWS S3 Reference 분석

AWS 내부적으로 S3 Bucket에서 파일을 어떻게 분포시키느냐에 따라 search 성능이 떨어질 수 있다고 들은게 있기 때문에 이에 대해서 성능적인 이슈를 하나씩 찾아본 결과, aws official blog를 찾을 수 있었고, 이를 아래에 요약을 해보았다.

S3에는 splitting이 필요한 keyspace를 지속적으로 모니터링하는 자동화기능이 있습니다. 내부적으로 high rate request, 너무 많은 key(aws s3 object) 등의 요소에 따라 파티션을 새로 생성하여 key를 이동시킵니다. 이러한 작업이 성능적으로 큰 영향을 미치진 않지만, 단일 파티션에 많은 key에 대해 request rate가 증가하면 이러한 작업이 많이 발생하기 때문에 사전에 내부적으로 파티셔닝이 많이 일어나지 않는 key path 설계를 하는 것이 중요합니다.

 

3.1. 문제상황 예시

위 요약 내용을 설명하기 위해 아래에 예시를 통해 알아보도록 하자.

당신이 쓰기 및 읽기가 많이 이루어지는 파일들을 다음과 같은 형식으로 Service라는 bucket안에 s3 key로 만들었다고 가정하자.

bucket/user/schedule/<userId>
bucket/user/info/<userId>
bucket/user/secret/<userId>
bucket/user/company/<userId>
bucket/user/log/<userId>

 

위 같은 경우처럼 설계되었을 시, 유저가 요청하면 aws는 아래 그림과 같이 aws key를 탐색한다.

 

위 그림과 같은 아키텍처에서 엄청나게 많은 요청이 들어왔을 때, user 디렉토리에 부하가 많이 걸릴 것이다. 이 결과로 aws s3 내부적으로는 부하를 해결하기 위해 파티셔닝이 일어날 것이다.

 

3.2. 문제상황 해결

위 3.1 그림에서 한 디렉토리로 request가 몰리지 않도록해보자. 가장 분포가 넓은 것을 위주로 key path를 설계를 한다면 기존 user라는 디렉토리로 과부하가 read/write 요청이 몰리지 않을 것이다.

위 사진에서 가장 분포가 넓고 공통점을 많이 가진 key path를 재설계해보았을 때 아래와 같은 구조를 생각해볼 수 있다.

bucket/<userId>/schedule
bucket/<userId>/info
bucket/<userId>/secret
bucket/<userId>/etc
bucket/<userId>/log
.
.
.

위 구조를 모식화 하면 아래 사진과 같다.

 

 

 

초안보다 개선된 점은 다음과 같다.

1. 기존 user 디렉토리를 여러명의 유저 개개인 관점으로 쪼개어 하나의 디렉토리로 몰리지 않게 함.
2. 유저의 속성으로 쪼갤 수 있었으나, 유저의 속성(schedule, info, secret…)보다는 userId 값이 분포가 훨씬 넓기 때문에 userId로 쪼갠 것이 부하분산에 유리하다.

 

 

4. 실전 적용

표현해야하는 정보 및 고려사항은 다음과 같다.

  • 유저 프로필 사진을 업로드하고, 경로를 보고 이 사진이 어떤 사진인지 알 수 있어야 한다.
  • S3 key 이름에 특정인이 암시되지 않아야 한다.
  • key를 가지고 버전관리가 되어야하며, 지난 사진은 삭제시켜야한다.
  • 프로필 사진의 크기가 여러개 존재할 수 있다.

이 사항들과 위 내용을 고려한 결과, 다음과 같은 s3 key path를 설계할 수 있었다.

 

bucket/<USER_ID>/profile/<IMG_SIZE>/<TIMESTAMP>

 

  • <USER_ID> : 유저의 고유 id
  • profile : 유저의 프로필 사진이라는 뜻의 path, 유저의 다른 부분들이 저장될 수 있기 때문에 이 부분을 유저의 어떤 자원인지 명시해주기로 함.
  • <IMG_SIZE> : 200x200, 400x400같이 해당 사진의 크기가 몇인지 표시해주는 path
  • <TIMESTAMP> : 원본 사진 파일 이름을 알 필요가 없으며, 주기적으로 aws lambda를 이용해 timestamp가 오래된것들은 삭제시킬 수 있다.

 

 

5. 결론 및 느낀점

  • Cloud 자원 역시 근본은 데이터센터 자원을 사용하는 것이기 때문에, 최대한의 효율을 낼 수 있는 방법을 충분히 리서치를 하여 적용시키는 방법이 더 바람직하다.
  • 위 aws s3 key설계가 현재 학습 중인 mongoDB의 get 부하분산이랑 되게 비슷하다는 느낌을 받았다.

 

 

6. Reference

0. Table Of Contents

1. Coldstart

FaaS Service는 쓰지 않는 상태일 때는 function instance 대기 상태가 아닌 생성되지 않은 상태로 유지되다가 request가 들어올 경우, function instance가 생성되며 이에 대한 요청을 핸들링하기 시작한다. 이 때, request를 handling할 instance가 없으면 delay가 생길 수 있다.

 

cloud function은 다음과 같은 경우 새로운 함수 인스턴스가 생성된다.

  • cloud function을 새로 배포할 경우
  • auto scaling으로 인해 확장되는 경우
  • 긴 시간동안 function이 호출이 되지 않았을 경우
  • 내부적인 오류로 인스턴스를 대체할 경우

 

위의 경우에 대해 대책을 세우지 않으면 무거운 FaaS를 설계하였을 때, delay가 길어질 수 있으니 주의해야한다.

이를 극복할 수 있는 방법으로 해당 function instance가 cold start가 되지 않도록 지속적으로 health check를 하는 방법이 있다.

 

 

2. Stateless Environment

function이 실행되더라도 각 요청마다 다른 함수 인스턴스에서 요청처리를 할 수 있기 때문에, 전역변수를 이용한 출력은 매번 달라질 수 있다. 그렇기 때문에 이러한 데이터는 db, cloud storage 등의 서비스를 이용하여 제어를 해야하는 것이 바람직하다. 그에 대한 이유는 아래 코드 및 실행결과를 보도록 하자.

 

cloud function sample code

 

7월 15일 오후 2시 37분 실행했을 때의 상태

 

 

 

7월 15일 오후 6시 8분 실행했을 때의 상태

 

 

2시 37분 cloud function 을 invoke하고 나서, 약 3시간 30분 정도를 실행하고 있지 않다가 실행한 결과, 전역변수가 초기화 된 것을 볼 수 있다. 즉, function instance를 cold start를 하였기 때문에 전역변수가 초기화 된 것을 볼 수 있다.

이를 방지하기 위해 상태값을 가진 변수 및 객체는 DB를 이용하여 다른 곳에서 캐싱 또는 저장이 되어있어야 한다.

 

 

 

3. 한도

GCP Cloud Function의 한도의 경우, 아래 사진을 참고하도록 한다. 자유자재로 customizing 한 다른 resource와 달리, FaaS Service는 경량화된 서비스이기 때문에 이러한 부분에서 약점을 지닌다. 서비스 설계시 정말로 FaaS를 사용해서 설계를 해도 문제가 없는 아키텍처인지 분석이 필요하다.

 

 

  1. 누구게여 2021.07.17 01:56

    멋진사람 !!

0. Table Of Contents

 

 

1. 문제 현황 분석

 

1.1. 문제 상황

docker version을 업그레이드 하고 나서 다음과 같이 잘 되던 .env file parsing이 잘 되지 않는 오류가 발생하였다.

 

 

1.2. 기존 docker version과 upgrade한 docker version

Mac Docker Desktop 기준으로 작성되었습니다.
  • 기존 Docker version : 3.0.0
    • Docker compose version : 1.27.4
    • Docker version : ?
  • 업그레이드 된 Docker version : 3.4.0
    • Docker compose version : 1.29.0
    • Docker version : 20.10.7

 

1.3. 프로젝트 폴더 구조

 

1.4. 명령어 call 순서

  • docker.sh를 이용하여 docker-start-local.sh 실행
  • docker-start-local.sh./docker-compose/docker-compose.postgres.yml를 참조하여 아래와 같은 명령어를 실행시킨다.
  • 업그레이드 전 아래 명령어는 정상적으로 동작하여 .env파일을 정상적으로 파싱하고 있었다.
docker-compose \
  -p {PROJECT_NAME} \
  -f ./docker-compose/docker-compose.postgres.yml \
  -f ./docker-compose/api/docker-compose.base.yml \
  -f ./docker-compose/api/docker-compose.local.yml \
  up $build --remove-orphans

 

 

1.5. 주요 파일 구성 확인

혹시나 싶어서 docker-compose.postgres.yml 파일과 .env가 제대로 구성이 안되어있는지 확인을 해본 결과 다음과 같았다.

version: '3'

services:
  postgres:
    image: <PROJECT_NAME>/postgres
    container_name: "<PROJECT_NAME>-postgres"
    build:
      context: ../docker/postgres
      dockerfile: Dockerfile
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - ../data/postgres/data:/var/lib/postgresql/data
      - ../docker/postgres/init/:/docker-entrypoint-initdb.d/
    ports:
      - "${POSTGRES_PORT}:${POSTGRES_PORT}"
    restart: unless-stopped
    ulimits:
      nproc: 65535
      nofile:
        soft: 65535
        hard: 65535
    healthcheck:
      test: ["CMD", "docker-healthcheck"]
      interval: 30s
      timeout: 10s
      retries: 3
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "5"

 

# .env
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_DB=postgres
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres

 

위와 같이 .envdocker-comspose.postgres.yml의 환경변수가 잘 매핑이 되어 있었기 때문에 parsing error가 날리가 없다고 생각했다.

 

 

 

 

2. 문제 해결 삽질

 

2.1. 문제 해결을 위한 searching

지금까지 어깨 너머로 배워서 docker-dompose를 사용하였지만, 문서를 상세히 보고 사용한 적이 없기 때문에 공식 document를 보면서 제일 먼저 정리하기로 했다.

Docker-Compose Command로 env를 정의하고, 실행했기 때문에, env file configuration 쪽을 찾아보는 도중 다음과 같은 문구를 발견 할 수 있었다.

 

출처 : Docker-compose Document

 

위 문서 중 내가 겪은 문제와 관련된 부분을 한글화하여 요약하면 다음과 같다.

1.28 미만의 Docker Compose의 경우, command가 실행된 현재 작업중인 directory에서 .env파일을 가지고 오거나, --project-directory argument에서 설장된 path에서 .env 파일을 가지고 옵니다.


이러한 모호함을 1.28 이상의 버전에서는 .env의 default path를 project directory로 제한하는 것으로 개선하였습니다. --env-file 옵션을 이용하여 기본 .env default path를 override 할 수 있습니다.


project directory는 다음 순서로 정의됩니다.

1. --project-directory flag에 정의된 path

2. 첫번째 --file (-f)로 flag에 정의된 directory

3. 현재 directory

 

 

2.2. 문제 디버깅

기존에 사용한 docker-compose version은 1.27.4였기 때문에 위 사항에 일치하였다. 위 사항을 이용하여 docker desktop 업데이트 이후 발생한 오류를 디버깅 해보면 다음과 같다.

  • 프로젝트 루트 폴더에서 shell script를 이용하여 docker-compose command를 실행시켰다.
  • 위에서 정의된 docker-compose command에는 project directory가 정의되어 있지 않다.
  • docker compose에 이용할 yml파일로 ./docker-compose/docker-compose.postgres.yml가 입력되었다.
  • project directory 정의 순서에 따라, ./docker-compose/가 project directory로 정의된다.
  • ./docker-compose/에는 .env 파일이 없다.
  • 없는 파일을 참조하였기 때문에 yaml파일 내부에서 ${}로 정의한 변수들이 전부 빈 string으로 대체된다.
  • postgresql port는 필수로 필요하지만, 입력이 되지 않았기 때문에 에러가 발생하였다.

 

 

2.3. 디버깅내용이 맞는지에 대한 검증

project directory인 ./docker-compose/에 루트 프로젝트 디렉토리에 있는 .env 파일을 다음 사진과 같이 복붙하여 .env파일을 하나 더 만든 다음 기존에 만들어 놓은 쉘스크립트를 이용하여 docker-compose를 실행시킨 결과, 정상적으로 docker가 빌드가 되어 정상적으로 실행까지 되었다.

 

2.4. 최종 문제 해결 flow

docker-compose 1.28.6 version release note를 보면 --env-file flag는 현재 작업중인 directory를 참조하도록 고쳐졌다고 한다.

 

 

env file을 제대로 인식 시켜주지 못하고 있기 때문에 이에 대해 명확히 설정을 해 줄 필요가 있다. compose에 사용되는 파일이 root project directory에 존재하기 때문에, docker-compose command 를 실행시킬 때, --env-file flag를 이용하여 명확하게 내가 사용하고자 하는 .env를 아래와 같이 명시해주었다.

 

 

 

3. 삽질을 통해 얻은 결론 및 개인적인 프로젝트 구조에 대한 회고

이처럼 실행환경에 대해서 정의할 때에는 최대한 사용할 파일들에 대해 command 또는 yaml, script에 정확히 명시를 하여 이러한 오류를 피해가게 하고, 다른 사람이 script를 읽었을 때 어떤 파일을 사용하는지 이해하기 쉽게 하는 것이 중요하다고 생각이 되었다. 향후, 위 문제처럼 다른 파일을 참조하고 있지만 묵시적인 방식으로 파일을 참조하고 있다면 명시적으로 나는 이걸 쓸거다라는 코드를 추가를 해야겠다.

 

+ env에 docker compose에서 쓰는 environment와 Dockerfile에서 사용하는 environment가 혼재하고있는데 향후 체계적으로 environment를 관리하기 위해서는 이를 분리하여 명확하게 어디서 쓰는 environment인지 정의하면 훨씬 더 좋을거같다.

 

 

 

4. Reference

Environment variables in Compose

Docker Compose release notes

 

  1. 민트초코 2021.06.24 18:17

    저한테는 어려운 말이지만 무척 멋있네요!!

  2. 크로커스개발자 2021.06.28 10:50

    오우 엄청난 글이네요

 

0. Table Of Content

 

 

 

1. Server Gateway Interface가 왜 필요한가?

일반적으로 우리가 보고 있는 웹 서비스는 브라우저를 통해서 흘러나온 웹서버의 내용들이다.
대부분의 어플리케이션의 경우 웹과 소통하는 미들웨어를 가장 널리 사용되는 tomcat, apache를 채택하여 사용하고 있다. 아쉽게도 Tomcat, Apache는 Java기반으로 만들어졌기 때문에, Python기반의 프레임워크에서는 가장 널리 사용되는 웹 서버를 사용하기 위해서는 중간에서 Java기반 미들웨어가 말하는 것을 해석해 줄 또다른 미들웨어가 필요하게 된 것이다. 물론, 파이썬 기반의 미들웨어를 사용해도 괜찮지만, 이미 검증된 것을 포기할만큼 매력이 없거나 큰 리스크를 동반해야하기 때문에 Apache, Tomcat을 그대로 사용하고 이를 중간에서 번역해주는 python framework 전용 미들웨어를 하나 만들게 되었다. 그것이 바로 Server Gateway Interface이다. Python Server Gateway Interface의 경우, 널리 사용되는 것이 WSGI와 ASGI가 있는데, 상세 설명은 다음 챕터부터 진행 하겠다.

 

 

 

 

2. WSGI (Web Server Gateway Interface)

기존 python 기반 framework가 Java Middleware와 통신하기 위해서는 Medusa(Python으로 작성된 middleware), mod_python(embed Python), CGI / FastCGI(invoke Python via a gateway protocol) 같은 API를 사용해야 했었다. 그러나 위에서 명시된 API들은 특정 요소만을 고려해서 제작된 API기 때문에, 해당 API에 맞는 부분만을 개발자가 바라보게 했기 때문에, 개발자들이 선호하는 특정 영역에만 시야가 한정되었다.
그러나, 범용으로 쓰일 수 있는 WSGI의 등장으로 위의 문제점들이 사라지게 되었으며 WSGI는 PEP3333에 정식으로 채용(?)이 되었다.

 

 

 

3. ASGI (Asynchronous Server Gateway Interface)

그러나 WSGI도 시간이 지나면서 문제점이 발생하기 시작했다.

 

3.1. 기존 WSGI에는 어떤 문제점이 있었는가?

WSGI가 개발 중일 당시, WSGI는 오직 웹개발을 위한 공통 기반을 제공하는 프로토콜을 만드는 것이었다. 이 덕분에 파이썬 기반 웹개발자는 프레임워크 세부사항에 신경 쓰지 않고 여러 프레임워크에서 쉽게 작업을 할 수 있었다. 그러나, WebSocket 개념이 웹 개발자 사이에서 인기를 얻기 시작했을 때, WSGI는 single, synchoronous callable한 특성을 가지고 있었기 때문에, 다음과 같은 특성을 지니고 있어 webSocket과는 맞지 않았다.

  • HTTP는 Connection이 짧게 유지되는 특성을 지니고 있었기 때문에, Long-Polling HTTP와 WebSocket 같이 상대적으로 connection이 긴 특성을 지닌 Protocol과는 맞지 않았다.
  • HTTP Request는 application내부에서 오직 하나의 path를 가질 수 있기 때문에, 여러개의 path를 통해 이벤트를 수신하는 WebSocket의 이벤트를 처리할 수 없었다.



3.2. ASGI는 어떤 방식으로 WSGI의 문제점을 해결했는가?

ASGI의 구성요소와 책임은 다음과 같다.

  • 소켓을 종료하고 이를 connection에 매핑하는 프로토콜 서버
  • 포로토콜 서버 내부에서 실행되는 어플리케이션 연결을 인스턴스화(per 1 connection) 하며, 이벤트 메시지의 처리


WSGI와 비슷하게 ASGI도 기능이 비슷한 것처럼 보인다. 그러나, 다음요소에서 차이가 난다.

  • Connection의 lifetime과 protocol을 정의하는 Connection Scope
  • Application으로 보내지는 Connection 동안 일어날 사건에 대한 명세 Event

 

 

 

 

ASGI Application은 단일, 비동기 callable 속성을 지니고 있다. 수신 요청에 대한 정보를 포함하는 scope를 accept하고, 클라이언트에 이벤트를 보내고 받을 수 있는 awaitable를 보내고 받을 수 있다. 이 덕분에, ASGI Application은 WSGI의 한계점을 뛰어넘는 수신 / 발신 event를 허영할 수 있다. 그 뿐만아니라 ASGI Application은 background coroutine을 허용하기 때문에 application은 요청을 처리하면서 background에서 다른 작업도 수행할 수 있게 되었다. (ex. 외부 event를 listening 하고 있는 redis queue 등)

ASGI Application을 통해서 보내거나 받는 모든 event는 Python Dictionary Type이다. 이러한 사전 정의된 event의 format은 ASGI Application가 쉽게 다른 웹 서버에서 다른 웹서버로 쉽게 전환할 수 있게 한다.

 

 

 

3.3. 간단한 ASGI Application 예제

async def application(scope, receive, send):
    event = await receive()    
    ....     
    await send({"type": "websocket.send", ...}

 

 

 

4. Reference

1. 서론 및 얽힌 이야기

내부 시스템에 대해 지속적으로 healthCheck를 하는 Spring Boot기반 서비스를 개발한 적이 있다. DB에 등록된 시스템 리스트를 가져와 순차적으로 healthCheck를 실행하는 방식의 서비스이다.

그러나, 10/12 HealthCheck Target별로 HealthCheck 시점의 정합성이 맞지 않는 문제가 발생하였다. 

 

문제 상황 서술 전에 적용중인 시스템 아키텍처 및 HealthChecker 서비스에 대한 개요를 먼저 풀어본다.

  • A Group의 WAS에 대해 배포를 진행하였다.
  • 배포가 진행되면, 해당 인스턴스로의 직접적인 호출에 대해서는 동작하지 않게 된다.
  • static file을 제외한 모든 경로에 대해서 reverse proxy를 통하여 데이터가 was로 전달된다.
  • healthCheck를 하는 경로 /api/healthcheck는 was에 설계되어 있기 때문에 was가 동작하지 않으면 web도 동작하지 않는다.
  • WAS에 APP이 deploy되기 까지 약 25초의 시간이 소모되며 WEB이 배포되기 까지 5초의 시간이 소모된다.
  • 최초의 healthCheck가 실패하면, 5초 간격으로 추가 5회의 재시도를 진행하며 모두 실패하여야만 최종 실패로 간주된다.

 

 

 

위 서술된 환경에서 발생한 문제를 시간 순서대로 다이어그램과 함께 아래에 서술하겠다.

 

 

 

 

 

위 같이 같은 시점에서 healthCheck를 진행했으면 같은 시점의 결과가 출력되어야 하나, 순차적으로 실행하는 도중 healthCheck Error로 인해 재시도를 함으로써 다음 healthCheck가 미뤄졌다.

그 결과, WAS와 WEB의 healthCheck의 시점의 차이가 커져 시점에 대한 일관성이 맞지 않는 문제가 발생하게 되었다.

 

이 문제를 해결하기 위해 delay없이 최대한 모든 target에 대해 비슷한 시점에 실행하여야 했기때문에, thread를 이용함으로써 동시다발적으로 최대한 같은 시점에서 target healthCheck를 진행해보기로 하였다.

그 중 @Async를 활용한 Thread를 이용하여 개선작업을 하기로 하고 문제가 되는 코드를 추려낸 결과, 다음과 같았다.

 

@Service
public class MonitoringSystemService {
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
 
    @Autowired
    private MessageRepository messageRepo;
 
    @Autowired
    private MonitoringSystemRepository monitoringRepo;
 
 
 
    /*
     * 스케줄러 method
     *
     * @parameter
     * @return
     *          Type : void
     * @cycle period : 모든 target의 healthCheck가 끝난 후, 15초 대기
     */
 
    @Scheduled(fixedDelay = 15000)
    @Transactional
    public void scheduledMonitoringService() throws Exception {
 
 
        try {
 
            this.setPid();
 
            List<MonitoringSystem> systemInfoList = monitoringRepo.getAllMonitoringSystemList();
 
 
            //시스템별 상태 체크 및 톡 발송 실행
            for( MonitoringSystem systemInfo : systemInfoList) {
 
 
 
                switch(systemInfo.getMonFlagCd()) {
 
                    //RealTime 호출 (엔드포인트 호출형)
                    case "R":
                            logger.info("모니터링 유형 : 엔드포인트 호출");
                            logger.info("타겟 엔드포인트 : " + systemInfo.getMonSysUrl());
                            logger.info("타겟 엔드포인트 포트 : " + systemInfo.getMonSysPort());
                            logger.info("HttpRequest Method : " + systemInfo.getReqMtd());
                            boolean httpRequestResult = HealthChecker.run(systemInfo.getMonSysUrl(), systemInfo.getReqMtd());
 
 
                            systemInfo = this.sendMessage(systemInfo, httpRequestResult);
                            this.modifySystemInfo(systemInfo, httpRequestResult);
 
                        break;
 
                    //Static 호출 (직접적인 쿼리 실행)
                    case "S":
                        logger.info("모니터링 유형 : 직접적인 쿼리");
                        logger.info("실행 쿼리 : " + systemInfo.getMonSysQury());
 
                        boolean scheduledSystemStatus = monitoringRepo.getScheduledSystemStatus(systemInfo.getMonSysQury());
                        logger.info("쿼리 업데이트 정상 여부 : " + String.valueOf(scheduledSystemStatus));
 
                        systemInfo = this.sendMessage(systemInfo, scheduledSystemStatus);
                        this.modifySystemInfo(systemInfo, scheduledSystemStatus);
                        break;
                }
 
                logger.info("=======================================================================================================");
            }
 
        }catch (Exception e) {
            logger.error(e.getMessage());
        }
 
        logger.info("#################################################################################################스케줄러 끝난 시간 : " + getNowTime());
 
 
    }
 
 
}

 

 

public class Http5xxErrorRetryHandler implements ServiceUnavailableRetryStrategy {
 
    private final Set<Integer> retryableErrorCodes = new HashSet<>(Arrays.asList(500, 503, 502));
 
    private static final Logger logger = LoggerFactory.getLogger(Http5xxErrorRetryHandler.class);
 
    @Override
    public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
 
 
        int statusCode = response.getStatusLine().getStatusCode();
 
 
 
        if (executionCount > 5) {
            logger.error("재시도 5회 초과로 에러처리 합니다.");
            return false;
        }
 
        if (retryableErrorCodes.contains(statusCode)) {
            logger.warn(statusCode + "오류 발생");
            logger.info("재시도 횟수 : " + executionCount);
            return true;
        }
 
        return false;
    }
 
    @Override
    public long getRetryInterval() {
        return 5000;
    }
 
}

위 2개의 코드로 인해 최초 health check가 실패한 경우, 다음 healthCheck 대상으로 넘어가기까지 최소 30초 이상의 시간이 소모되기 때문에 동시성이 보장되지 않고 있는 상황이다.

 

 

 

 

 

 

 

2. 개선코드 및 작업내용

 

순차적으로 healthCheck를 하는 것이 아닌 thread를 이용하여 동시다발적으로 처리할 수 있도록 하기 위해 가장 먼저 thread를 실행시킬 executor를 생성해주어야 한다.

나는 아래와 같이 AsyncConfigurer를 이용하여 세부설정이 완료된 executor 객체를 bean으로 만들어 어디서든 이용 할 수 있도록 하였으며, 세부 설정은 아래 코드를 참조한다.

@Configuration
@EnableAsync
public class AsyncThreadConfig implements AsyncConfigurer{
 
 
 
    // 기본 thread 개수
    private static int THREAD_CORE_POOL_SIZE = 5;
 
    // 최대 thread 개수
    private int THREAD_MAX_POOL_SIZE = 10;
 
    // Thread Queue 사이즈
    private static int THREAD_QUEUE_CAPACITY = 5;
 
    private static String THREAD_NAME = "healthCheckExecutor";
 
    @Resource(name = "healthCheckExecutor")
    private ThreadPoolTaskExecutor healthCheckExecutor;
 
    @Override
    @Bean(name = "healthCheckExecutor")
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //THREAD_MAX_POOL_SIZE = monitoringSystemRepository.getMonitoringSystemCount();
 
        executor.setCorePoolSize(THREAD_CORE_POOL_SIZE);
        executor.setMaxPoolSize(THREAD_MAX_POOL_SIZE);
        executor.setQueueCapacity(THREAD_QUEUE_CAPACITY);
        executor.setBeanName(THREAD_NAME);
        executor.initialize();
        return executor;
    }
 
 
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // TODO Auto-generated method stub
        return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler();
    }
 
 
    public boolean isThreadPoolAvailable(int createCnt) {
 
        boolean threadStatus = true;
 
        if ((healthCheckExecutor.getActiveCount() + createCnt) > (THREAD_MAX_POOL_SIZE + THREAD_QUEUE_CAPACITY)) {
            threadStatus = false;
        }
 
        return threadStatus;
    }
 
 
    public boolean isThreadPoolAvailable() {
 
        boolean threadStatus = true;
 
        if ((healthCheckExecutor.getActiveCount()) > (THREAD_MAX_POOL_SIZE + THREAD_QUEUE_CAPACITY)) {
            threadStatus = false;
        }
 
        return threadStatus;
    }
}

 

 

 

 

위와 같이 executor를 등록하였으면, 이제 executor에 넣을 비동기프로세스를 작성할 차례이다.

실질적인 처리 로직이 담긴 부분에서 비동기 처리할 메소드에 @Async  Annotation을 선언하여 비동기 스레드를 통해 처리되도록 하였다.

다음과 같이 비동기식으로 처리할 메소드에 @Async Annotation과 함께 앞서 선언한 Executor Bean 이름을 명시해주어야 한다.

@Service
public class ThreadExecutorService {
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
    @Autowired
    private MessageRepository messageRepo;
 
    @Autowired
    private MonitoringSystemRepository monitoringRepo;
 
 
    @Async("healthCheckExecutor")
    public void threadExecutor(MonitoringSystem systemInfo) {
        //시스템별 상태 체크 및 톡 발송 실행
        this.setPid();
 
        try {
            switch(systemInfo.getMonFlagCd()) {
 
                //RealTime 호출 (엔드포인트 호출형)
                case "R":
                        logger.info("모니터링 유형 : 엔드포인트 호출");
                        logger.info("타겟 엔드포인트 : " + systemInfo.getMonSysUrl());
                        logger.info("타겟 엔드포인트 포트 : " + systemInfo.getMonSysPort());
                        logger.info("HttpRequest Method : " + systemInfo.getReqMtd());
                        boolean httpRequestResult = HealthChecker.run(systemInfo.getMonSysUrl(), systemInfo.getReqMtd());
 
 
                        systemInfo = this.sendMessage(systemInfo, httpRequestResult);
                        this.modifySystemInfo(systemInfo, httpRequestResult);
 
                    break;
 
                //Static 호출 (직접적인 쿼리 실행)
                case "S":
                    logger.info("모니터링 유형 : 직접적인 쿼리");
                    logger.info("실행 쿼리 : " + systemInfo.getMonSysQury());
 
                    boolean scheduledSystemStatus = monitoringRepo.getScheduledSystemStatus(systemInfo.getMonSysQury());
                    logger.info("---------------------------------------시스템 정상 여부 : " + String.valueOf(scheduledSystemStatus));
 
                    systemInfo = this.sendMessage(systemInfo, scheduledSystemStatus);
                    this.modifySystemInfo(systemInfo, scheduledSystemStatus);
                    break;
            }
        }catch (Exception e) {
            logger.error(e.getMessage());
        }
            logger.info("=======================================================================================================");
    }
}

 

 

 

 

이제 필요할때 마다 비동기식으로 처리되도록 선언한 메소드를 호출해보자.

앞서 선언한 AsyncThreadConfig와 비동기 프로세스에 대해 작성한 ThreadExecutorService를 IOC Container에서 불러온다.

Bean을 불러온 뒤, 다음과 같이 원하는 비동기 메소드를 호출한다.

@Service
public class HealthCheckSchedulerService {
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
 
    @Autowired
    private MonitoringSystemRepository monitoringRepo;
 
    @Autowired
    private ThreadExecutorService threadExecutorService;
 
    @Autowired
    private AsyncThreadConfig asyncConfig;
 
 
    @Scheduled(fixedDelay = 15000)
    @Transactional
    public void scheduledMonitoringService() throws Exception {
 
 
        try {
            List<MonitoringSystem> systemInfoList = monitoringRepo.getAllMonitoringSystemList();
 
            for( MonitoringSystem systemInfo : systemInfoList) {
 
                 try {
                        // 등록 가능 여부 체크
                        if (asyncConfig.isThreadPoolAvailable()) {
                            // task 사용
                            threadExecutorService.threadExecutor(systemInfo);
                        } else {
                            logger.info("Thread 한도 초과");
                        }
                    } catch (TaskRejectedException e) {
                        logger.info(e.getLocalizedMessage());
                    }
 
            }
 
        }catch (Exception e) {
            logger.error(e.getMessage());
        }
 
 
    }
 
}

 

 

 

 

 

 

 

3. 개선 결과

 

 

먼저 개선전 서비스 로그를 보면 하나의 스레드를 이용하여 순차적으로 실행하기 때문에, 중간에 내부 서비스가 장애가 날시, 그 다음 서비스에 대해 healthCheck하는 것이 재시도 한만큼 지연되고있다.

 

 

 

그러나 개선된 다음에는 재시도 및 여러개의 요청이 각기 다른 스레드를 통해 진행되는 것을 확인할 수 있었으며 시간의 정합성을 조금 더 개선할 수 있었다.

 

 

Table of content

1. What is the Apache Kafka

Apache Kafka는 대량의 데이터를 처리할 수 있으며, 엔드포인트간 메시지를 전달 할 수 있는 분산 발행-구독 메시징 시스템이다. Kafka 메시지는 스토리지에 유지되고 데이터 복제를 통해 데이터 손실을 방지 할 수 있다. ZooKeeper라는 동기화 서비스 기반의 시스템이며, 실시간 데이터 스트리밍 분석 툴인 Apache Storm과 Spark와 통합되어 자주 사용된다.

 

 

2. Benefits of Apache Kafka

  • 신뢰성 : 데이터의 분산, 분할, 복제를 통해 데이터의 신뢰성을 보장한다.

  • 확장성 : topic의 발행만으로 down time 없이 쉽게 확장이 가능하다

  • 내구성 : 분산된 commit log를 통해 클러스터간 동기화를 하며, failure시 이 로그를 통해 빠른 복구가 가능하다.

  • 퍼포먼스 : TB단위의 메시지가 시스템에 저장되어도 안정된 퍼포먼스를 제공한다.

 

 

 

 

 

 

3. Apache Kafka 아키텍처

 

 

 

3.1. 주체 단위 아키텍처

 

3.1.1. Producer

  • 하나 이상의 topic으로 메시지를 발행하여 broker로 전송한다.

  • 각 메시지는 key, value, timestamp로 이루어져 있다.

  • 새로운 메시지를 publish 할 때, 몇개의 파티션으로 나눌 것인지 결정한다. (이미 발행 된 메시지에 대해서 도 가능)

출처 : https://kafka-python.readthedocs.io/ (Kafka-Python API Document Homepage)

 

 

 

3.1.2. Broker

(한 VM 안에 있는 broker 전체집합을 카프카라고 한다.)

  • producer에게서 받은 메시지를 세부 설정(파티션, 오프셋 정책…)에 따라서 저장하는 역할

  • 저장된 메시지는 정책에 따라 관리되며, consumer의 메시지 pull요청에 의해서만 메시지를 전송할 수 있다.

  • 클러스터 내부에 대표 브로커를 선정하여 controller의 역할을 부여한다.

  • Controller는 클러스터 내부 모든 브로커에 대해 partition을 관리한다.

  • 1.0버전 부터는 offset 정보를 broker에서 관리한다.

 

 

 

3.1.3. Zookeeper

  • 클러스터 내부의 브로커 간 통신하기 위해서는 zookeeper를 거쳐야 한다.

  • VM 내부의 브로커들이 구동되기 위해서는 zookeeper가 반드시 선행으로 실행되어야 한다.

  • 다른 VM의 Zookeeper와 논리적으로 결합되어 있다.

  • 외부에서 들어온 요청에 대해서 해당 Leader Partition이 존재하는 브로커로 연결시켜준다.

 

 

 

3.2. 데이터 단위의 아키텍처

 

3.2.1. Topic

  • 메시지가 분류되는 키워드.

  • 쉽게 말하면 메시지의 주제를 의미한다.

  • 하나의 토픽은 여러개의 파티션으로 구성되어 있다.

 

 

 

3.2.2. Partition

  • 토픽 발행 시, partitioning factor * replication factor의 수만큼 partition이 생성된다.

  • Partition에는 2가지 유형이 있다.

    • Leader Partition : 한 토픽의 구성에 대해 외부와 인터페이스를 담당하는 Partition이다. 데이터 조회시, 이 반드시 leader Partition을 통해 데이터를 얻을 수 있으며, Partition Factor의 수만큼 Leader Partition이 생성된다.

    • Follower Partition : Leader Partition의 failure에 대비한 예비 leader Partition이다. Leader가 failure시, 이들 중 하나가 자동으로 Leader로 승급된다.

    • Follower는 Leader의 변경사항을 pull받는 방식으로 synchronize한다. 이러한 방식을 카프카에서는 ISR(In Sync Replica)라고 명명한다.

    • Follower는 Leader에 대해 pull 방식으로 동기화를 하며, 리더는 이 동기화 요청을 일정시간 받지 못하게 되면 ISR에서 해당 partition을 제외시키게 된다.

  • Partitioning 한만큼, 데이터 병렬처리가 이루어지기 때문에 처리속도가 빨라진다.

3개의 공을 한명의 포수가 받는 시간과 3명이 나누어서 받을 때 걸리는 시간을 비교해보자.

 

 

 

3.3.3. Offset

  • Kafka의 최소 데이터 단위

  • 1 Message publish = 1 Offset

  • Producer의 Partitioning Algorithm에 따라 데이터가 위치할 Partition이 결정된다.

  • 오프셋별로 commit log가 남아 이를 이용하여 Fail Over를 수행한다.

  • 별도의 카프카 데이터 보존 정책에 의해 데이터를 유지한다.

 

 

 

 

 

4. Failover 과정

 

FailOver가 어떻게 이루어지는지에 대해 아키텍처 기반으로 설명하겠다.

다음 그림은 2개의 토픽에 대해 “replication factor = 3, partition factor = 2”의 설정으로 메시지가 발행되어 정상적으로 Broker서버에 저장되어 있는 상태이다.

 

 

 

 

위 아키텍처에서 Broker 0 과 Broker 3에서 장애가 나면 아키텍처는 다음 그림과 같아진다. 이 때, Topic1의 P0 Partition과 Topic2의 P1 Partition의 Leader Partition이 Broker가 장애가 나면서 더이상 Partition에 대해 접근을 할 수 없게 되었다.

 

 

하지만, Kafka에는 다음과 같은 규칙이 있다.

“모든 Partition은 Leader Partition이 될 수 있다.“

 

위 규칙에 의해 Kafka 내부 알고리즘에 의해 기존에 존재하던 Follower Partition이 새로운 Leader로 뽑히게 되어 다음 그림과 같이 장애에 대해 자동으로 Failover가 수행된다.

 

 

 

 

5. 가장 이상적인 Kafka 아키텍처

 

지금까지의 내용을 기억한다면, 데이터에 대한 접근은 오직 “Leader Partition“에 대해서만 접근이 이루어진다는 것을 알 수 있다. 그러므로, Leader Partition이 클러스터 내부의 broker에 골고루 분포가 된다면 한 broker로 트래픽이 몰려 timeout이나 다른 에러가 나는 상황을 방지 할 수 있다.

다시 말해, Leader Partition을 최대한 많은 브로커에 분산시켜 트래픽에 대한 병목 현상을 방지하며, failover에 대해서도 이를 유지할 수 있는 아키텍처가 이상적인 아키텍처이다.

또한 설계하기 나름이지만, offset 설계를 잘하여 데이터의 정합성까지 지켜준다면 가상 이상적인 아키텍처가 될 것이다.

 

'IT > OpenSource' 카테고리의 다른 글

[OpenSource] Apache Kafka  (0) 2020.09.07
[Apache Kafka] Kafka Cluster 구성하기(수정중)  (0) 2020.07.01

Table Of Contents

0. 서론

1. Connection Pool이 무엇인가

2. Connection Pool이 왜 필요한가?

3. HikariCP의 주요 파라미터

4. 문제 분석

5. 후기 및 조치

6. 참고 문헌

 

클릭하시면 해당 content로 넘어갑니다.

 


 

 

0. 서론

 

지난주, 플랫폼 각 내부의 엔드포인트를 호출하여 각 서비스 및 데이터베이스가 살아있는지에 대해 모니터링 하는 Spring Boot Application에서 다음과 같은 에러 메시지를 보았다.

[WARN ] 2020-07-30 16:43:39 299545737 [PoolBase.java][isConnectionAlive](184) : HikariPool-1 - Failed to validate connection org.mariadb.jdbc.MariaDbConnection@23d06b10
(Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value.

[WARN ] 2020-07-30 16:43:39 299545737 [PoolBase.java][isConnectionAlive](184) : HikariPool-1 - Failed to validate connection org.mariadb.jdbc.MariaDbConnection@17387ed0
(Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value.

[WARN ] 2020-07-30 16:43:39 299545737 [PoolBase.java][isConnectionAlive](184) : HikariPool-1 - Failed to validate connection org.mariadb.jdbc.MariaDbConnection@709f57f6
(Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value.

[WARN ] 2020-07-30 16:43:39 299545737 [PoolBase.java][isConnectionAlive](184) : HikariPool-1 - Failed to validate connection org.mariadb.jdbc.MariaDbConnection@5b2ab008
(Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value.

 

 

 

위 에러 메시지를 의역하면 다음과 같다.

 

Failed to validate connection MariaDbConnection
MariaDB Connection의 유효성을 검증하는데 실패하였습니다.

(Connection setNetworkTimeout cannot be called on a closed connection)
(Connection setNetworkTimeout은 닫힌 connection에서 호출할 수 없습니다.)

Possibly consider usdig shorter maxLifetime value.
현재보다 더 짧은 maxLifetime 값을 사용하는 것을 고려해보시기 바랍니다.

 

사실, 위 메시지대로 maxLifetime값만 바꿔주면 뜨지 않을 warning message였지만, 내가 코드를 작성하였기 때문에, 끝까지 책임지고 고도화를 하고 싶었다.

 

위 warning이 왜 일어났는지에 대해 알기 앞서 Connection Pool이 무엇인지부터 알아보려고 한다.

 

 

 

 

1. Connection Pool이 무엇인가

Connection Pool은 Application(WAS, Container 등)이 실행되면서, DB와 Connection에 대해 정의된 객체를 여러개 저장하고 있다. Application이 DB와 Connection이 필요할 때, Connection Pool은 Connection 객체를 제공하며 DB Connection을 이용한 작업이 끝나게 되면 Connection 객체를 다시 회수받아 다른 작업시 Connection 객체를 사용할 수 있도록 수명 관리를 하게된다.

 

 

 

 

2. Connection Pool이 왜 필요한가?

 

다음 코드는 DB와 connection부터 시작해서 query실행, connection 종료까지의 샘플 코드이다.

try {
    sql = "SELECT * FROM SOME_TABLE";

    connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);

    resultSet = connection.createStatement();

    resultSet = statement.executeQuery(sql);
    } catch (Exception e) {
    	logger.info(e.getMessage())
    } finally {
        connection.close();
        statement.close();
        resultSet.close();
    }
}

 

 

샘플 코드를 절차적으로 정리하면 다음과 같다.

1. DB Connection을 맺기 위해 JDBC Driver를 Load한다.
2. DB Connection Parameter를 이용하여 DB Connection 객체를 생성한다.
3. Connection 객체로부터 쿼리를 수행하기 위한 statement를 생성한다.
4. 쿼리를 수행하여 결과로 ResultSet 객체를 받아 필요한 데이터를 파싱하여 처리한다.
5. 처리가 끝나면 사용된 리소스를 반환한다.

 

위 절차 중 시간적인 cost가 가장 높은 부분은 connection 객체를 생성하는 부분이다. Connection을 생성할 시, DB와 파라미터 교환 뿐만 아니라 각 시스템에서 체크해야할 요소들이 많기 때문이다. 또한, 웹 어플리케이션은 클라이언트의 요청에 따라 스레드를 생성한다. 스레드를 생성할 때 마다 Connection 객체를 생성한다는 것은 물리적으로 계속하여 DB에 접근을 해야하는 것과 같으며 이는 곧 요청 수에 따라 Connection을 생성하게 되므로 서버에 과부하가 걸리기 쉽다.

 

이러한 상황을 방지하기 위해 미리 여유분의 Connection 객체를 만들어 Pool에 저장해놓았다가 클라이언트 요청이 생기면 Connection객체를 지원하주고, 작업 종료시 Connection 객체를 다시 반환하여 보관하는 방식으로 과도한 수의 Connection 객체가 생기는 것을 방지한다. 한번 맺은 DB Connection 객체를 작업이 끝나자마자 close하지 않고, Pool에 저장한 뒤, 다음 번에 동일한 Connection을 요청하면 바로 Pool에서 꺼내 제공을 함으로써 보다 개선된 Connection Time Cost를 보장해준다.

 

 

 

3. HikariCP의 주요 파라미터

서론의 에러메시지에서 문제가 된 파라미터의 기본값을 가장 먼저 찾아보기로 하였다. 이왕 이렇게 된 김에 HikariPool의 주요 파라미터들을 공부해보기로 했다.

HikariPool의 주요 파라미터는 다음과 같다.

 

  • autoCommit
    • 풀에서 반환된 connection의 자동 커밋을 설정한다.
  • connectionTimeout
    • 클라이언트가 connection으로부터 최대 대기할 수 있는 시간, 시간초과시 SQLException이 발생한다. 밀리세컨드 단위이며 최소 250ms, 디폴트 30000이다.
  • idleTimeout
    • Pool 안의 유휴 connection의 최대 유효기간을 설정한다. 이 설정은 minimumIdle이 maximumPoolSize보다 작을때만 적용되며, minimumIdle 값보다 pool안의 connection object의 개수가 작거나 같으면 유휴 연결이 폐기되지 않는다.
    • connection이 유휴상태로 전환되기까지 일반적으로 평균 15초에서 최대 30초까지의 시간이 추가로 걸린다. 이 시간이 초과되기 전까지는 connection이 유휴상태로 전환되지 않는다.
    • 최소 10000ms 값은 적용되어야 하며, 기본적으로 600000ms(10분)이 적용되어있다.
  • maxLifetime
    • Pool의 Connection의 최대수명을 제어한다. 현재 사용중인 connection은 폐기되지 않으며, connection이 closed상태일 때만 제거된다. Pool안에 존재하는 connection의 순간적인 대량 삭제를 방지하기 위해 약간의 음감쇠가 적용된다.
    • 의 공식문서를 보게되면, 이 파라미터를 데이터베이스 또는 인프라가 권고하는 connection time limit보다 짧게 잡을 것을 권장하고 있다.
    • idleTimeout에 따라 다르지만, 파라미터에 0을 setting하면 최대 생명주기는 infinite로 적용된다.
    • 최소 30000ms(30초) 값은 적용되어야 하며, 기본값으로 1800000(30분)이 적용되어 있다

 

  • minimumIdle
    • Connection Pool에서 유지하는 최소 유휴 Connection의 개수를 설정한다. 유휴 Connection이 minimumIdle 값보다 작을 시, HikariPool은 추가 connection을 만들어 채워넣는다.
    • HikariCP의 공식문서에서는 급격한 수요에 대해서 최상의 퍼포먼스와 반응성을 위해서 이 값을 수정하지 않고 사용하는 것을 권장하고 있다.
    • 기본 값은 maximumPoolSize와 같은 값으로 설정되어있다.
  • maximumPoolSize
    • Pool에서 유지할 유휴 connection, 사용중인 connection을 합친 최대의 connection 개수를 설정한다. 기본적으로 이 값은 실제 DB 벡엔드와의 연결 최대수를 결정한다.
    • Pool이 최대 사이즈에 도달했고, 유휴 Connection이 없을때 getConnection을 호출하면 connectionTimeout설정 값 전까지 getConnection이 지연되며, connectionTimeout을 초과시, SQLException이 발생한다.
    • 최적의 maximumPoolSize를 설정하는 것은 HikariPool을 적용한 환경에 대해 맞추는 것이 가장 좋으며, 이에 대해서는 다음 링크를 참고한다.

 

 

 

 

 

4. 문제 분석

 

위 주요 파라미터 중 우리가 주목해야 할 것은 HikarikPool의 maxLifetime과 DB 설정값 중의 wait_timeout이다. 

HikariPool이 적용된 환경은 따로 별도의 설정이 없었으며, connection을 맺고있는 DB의 설정은 다음과 같이 wait_timeout이 1200(20분)으로 설정되어 있었다.

 

 

 

서론에서의 로그는 HikariPool의 로그를 찍지 않고 있었기 때문에, HikariPool의 로그가 찍힌 화면을 다시 캡처해보았다.

 

 

위 로그를 보고 추측한 warning 원인은 다음과 같다.


  1. 위 로그에서는 보이지 않지만, 같은 MariaDbConnection 객체를 이용하여 commit을 하고 있었으며, 아래와 같이 Wrapping하고 있는 Proxy객체는 같지만, 본질적인 MariaDBConnection의 객체는 같은 것을 쓰고있었다.
    • [DEBUG] 2020-08-11 12:02:23 613936563 [DataSourceTransactionManager.java][doCommit](326) : Committing JDBC transaction on Connection [HikariProxyConnection@861338066 wrapping org.mariadb.jdbc.MariaDbConnection@17b01e2a]
    • [DEBUG] 2020-08-11 12:02:08 390098262 [DataSourceTransactionManager.java][doCommit](326) : Committing JDBC transaction on Connection [HikariProxyConnection@205121260 wrapping org.mariadb.jdbc.MariaDbConnection@17b01e2a]
  2. Spring Boot의 HikariCP 설정 프로퍼티 중, maxLifetime으로 인해, 사용하던 모든 Connection은 30분 주기로 위 사진과 같이 connection을 closing 후, 다시 새로운 connection을 생성하고 있었다.
  3. 사용하고 있던 connection도 closing 하면서 새로운 connection이 필요하여 HikariCP에 새로운 connection 객체를 요청한 것으로 보인다.
  4. 비교적 가장 오래전에 만들어진 객체를 HikariPool에서 발급 받은 후, HikariPool의 정책에 따라 이 connection이 유효한지를 검사한다.
  5. 이미 DB에서는 20분이 지나 닫힌 connection에 대해 validate하게 되므로 위와 같이 닫힌 connection에 대해서 아무것도 할수 없다고 로그가 찍혔다.

 

 

위 추측이 맞는지에 대해 직접 HikariCP의 코드를 뜯어보기로 하였다.

HikariPool.class

 /**
    * Get a connection from the pool, or timeout after the specified number of milliseconds.
    *
    * @param hardTimeout the maximum time to wait for a connection from the pool
    * @return a java.sql.Connection instance
    * @throws SQLException thrown if a timeout occurs trying to obtain a connection
    */
   public Connection getConnection(final long hardTimeout) throws SQLException
   {
      suspendResumeLock.acquire();
      final long startTime = currentTime();

      try {
         long timeout = hardTimeout;
         do {
            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break; // We timed out... break and throw exception
            }

            final long now = currentTime();
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            }
            else {
               metricsTracker.recordBorrowStats(poolEntry, startTime);
               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);

         metricsTracker.recordBorrowTimeoutStats(startTime);
         throw createTimeoutException(startTime);
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
      }
      finally {
         suspendResumeLock.release();
      }
   }

 

위 HikariPool.class의 코드를 보면, 중간에 closeConnection을 할지에 대해 판단하는 로직이 포함되어 있었다. 해당 로직에서 검사하는 것을 정리하면 다음과 같다.

  • 해당 connection이 HikariCP에서 evict(직역하면 축출하다라고 함)되기로 한 객체인가?
  • Connection이 살아있는지 Bypass할 수 있는 시간보다 최근 접근 시간이 더 크고, connection이 아직 살아있는가?

 

 

로그 상에서 connection이 evict하다고 하지 않았으므로, 두번째 검사 조건에서 true를 반환하였기 때문에  closeConnection을 하도록 하였을 것이다.

두번째 조건을 warning이 발생한 조건에 맞추에 분석해보면 다음과 같다.

  • 하나의 connection 객체만 사용하고 있었으므로, 500ms보다는 훨씬 더 크기 때문에 true를 출력하였을 것이다.
  • DB에서 wait_timeout 값이 20분이므로, getConnection하였을 때, false를 출력하였을 것이다.

 

 

다음, isConnectionAlive의 코드를 살펴보면 문제가 된 상황을 한눈에 볼 수 있을 것이다.

PoolBase.class

boolean isConnectionAlive(final Connection connection)
   {
      try {
         try {
            setNetworkTimeout(connection, validationTimeout);

            final int validationSeconds = (int) Math.max(1000L, validationTimeout) / 1000;

            if (isUseJdbc4Validation) {
               return connection.isValid(validationSeconds);
            }

            try (Statement statement = connection.createStatement()) {
               if (isNetworkTimeoutSupported != TRUE) {
                  setQueryTimeout(statement, validationSeconds);
               }

               statement.execute(config.getConnectionTestQuery());
            }
         }
         finally {
            setNetworkTimeout(connection, networkTimeout);

            if (isIsolateInternalQueries && !isAutoCommit) {
               connection.rollback();
            }
         }

         return true;
      }
      catch (Exception e) {
         lastConnectionFailure.set(e);
         logger.warn("{} - Failed to validate connection {} ({}). Possibly consider using a shorter maxLifetime value.",
                     poolName, connection, e.getMessage());
         return false;
      }
   }

위 코드를 보면, try문 첫번째에서 setNetworkTimeout을 실행하고 있다. 그러나 connection은 DB측에서 이미 close한 상태이기 때문에, close된 connection에서 작업을 할 수 없기 때문에 catch문의 로직이 실행 되었을것이다.

catch문을 보면 우리가 보았던 해당 로그가 보일 것이다.

 

위 분석을 통해 내가 예측한 추측한 결과와 실제 코드의 로직이 일치하는 것을 볼 수 있었다.

 

 

5. 후기 및 조치

HikariCP의 공식문서에서 maxLifeTime을 데이터베이스 또는 인프라가 권고하는 connection time limit보다 짧게 잡을 것을 권장하여 maxLifeTime을 기존 30분에서 15분으로 변경하고 jenkins를 통해 deploy한 결과, 그 이후로는 warning이 발생하지 않았다.

그러나 30분 내내 HikariCP에서 처음 받은 단 하나의 connection을 사용하는 현상은 아직 내가 이해를 하지 못하여 이에 대해서는 조금 더 보충하여 새로 글을 작성할 계획이다. 이번 기회를 통해 Connection관리가 App안에서 어떻게 이루어 지는지에 대한 개론을 파악할 수 있었다.

 

 

 

6. 참고 문헌

  • https://github.com/brettwooldridge/HikariCP

0. Table Of Content

 

 

 

 

1. 서론

배치 프로세스에 대한 공통 프레임워크를 담당하여 작업을 진행하고 있었다. Spring Batch Starter를 사용하여 작업을 진행하는 것이 가장 효율적이었지만,

기존 만들어진 테이블에 맞춰야 했기 때문에 해당 테이블에 맞게 배치 프레임워크를 새로 개발하게 되었다. 

프레임워크에서 build된 결과물인 jar파일은 crontab에 등록이 되어 jar 실행 시 입력받은 augument값으로 해당 batch job이 실행되어야 하기 때문에, 

입력 argument에 따라서 분기되는 프로세스가 필요하였다. 이에 해당하는 코드는 아래와 같다.

@Component
public class BatchArgumentListener implements ApplicationListener<ApplicationReadyEvent>{
 
    @Autowired
    private ApplicationArguments applicationArgument;
 
    @Autowired
    private ConfigurableApplicationContext context;
 
    @Autowired
    private DBEncKeyProperties dbProps;
 
    @Autowired
    private MemberActionLogService memberLogService;
 
    @Autowired
    private EdiDailyTxScheduler ediDailyTxScheduler;
 
    @Autowired
    private EdiDailySettleScheduler dailySettleScheduler;
 
    @Autowired
    private EdiMonthlyTxScheduler ediMonthlyTxScheduler;
 
    @Autowired
    private EdiVbankIncomeScheduler ediVbankIncomeScheduler;
 
    @Autowired
    private MemberDormantService memberDormantService;
 
    @Autowired
    private MemberStatisticsService memberStatisticsService;
 
    @Autowired
    private MemberUseService memberUseService;
 
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
 
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        String publicKey = this.argumentListValidator(applicationArgument, BatchArgumentConstant.ARG_PUBLIC_KEY);
        dbProps.setPublicKey(publicKey);
 
        String batchType = this.argumentListValidator(applicationArgument, BatchArgumentConstant.ARG_TYPE);
 
        String targetDate = this.dateValidator(applicationArgument);
 
        logger.info("batchType : " + batchType);
        logger.info("targetDate : " + targetDate);
 
        try {
            switch(batchType) {
                case BatchArgumentConstant.BATCH_MEMBER_ACTION:
                        memberLogService.memberActionTask(targetDate);
                        memberLogService.memberLastActionTask(targetDate);
                break;
 
                case BatchArgumentConstant.BATCH_DAILY_SETTLE:
                    dailySettleScheduler.getDailySettle();
                    break;
 
                case BatchArgumentConstant.BATCH_DAILY_TX:
                    ediDailyTxScheduler.getDailyTxNormal();
                    break;
 
                case BatchArgumentConstant.BATCH_MONTHLY_TX:
                    ediMonthlyTxScheduler.getMonthlyTxNormal();
                    break;
 
                case BatchArgumentConstant.BATCH_VBANK_INCOME:
                    ediVbankIncomeScheduler.getVbankIncomeDate();
                    break;
 
                case BatchArgumentConstant.BATCH_MEMBER_DORMANT:
                    memberDormantService.memberDormantMailTask(targetDate);
                    memberDormantService.memberDormantTask(targetDate);
                    break;
 
                case BatchArgumentConstant.BATCH_STATISTICS:
                    memberStatisticsService.memberStatisticsTask();
                    break;
                case BatchArgumentConstant.BATCH_MEMBER_USE:
                    memberUseService.memberUseMailTask();
                    break;
 
                default:
                    logger.info(BatchArgumentConstant.MESSAGE_INVALID_ARG);
                }
        }catch (Exception e) {
            logger.info("Process is not completed.");
        } finally {
            System.exit(SpringApplication.exit(context));
        }
    }
}

 

개인적인 회고시, 나는 내가 짠 프레임워크를 보고 참 더럽다고 생각이 들었으며 더많은 코드가 내가 작성한 프레임워크에 올라가기 전에 더러운 코드를 청소하고 싶었다.

우선 위 코드의 문제점을 정리하면 다음과 같았다.

  • 새로운 배치 프로세스가 프레임워크에 올라갈 때 마다 case문을 추가하여 분기시켜주는 귀찮음이 있다.
  • case문을 추가하기 위해서는 같은 파일에 여러명이 작업을 하게 되고, 그에 대한 결과로 git repository merge가 어려워진다.
  • static 변수가 프레임워크 내부에 많이 있었기 때문에 이를 조금이라도 줄이고 싶었다.
  • 여러개의 배치 프로세스가 모여 하나의 job형태로 이루어질 수 있었기 때문에 통합하여 관리하고 싶었다.

 

또한 작성된 모든 프로세스는 BatchJob이라는 공통점을 가지고 있었기 때문에, BatchJob이라는 키워드 하나로 통합할 수 있다고 생각을 하게 되었다.

 

위 문제점들을 해결하기 위해, Spring Design Pattern을 공부하다가 구글링을 통해 SpringFramework Guru에서 디자인 패턴에 관한 글을 보게 되었으며

디자인 패턴 중 프레임워크에 가장 적합한 Factory Method Design Pattern을 적용시켜보기로 하였다.

 

 

 

 

2. Factory Pattern이란 무엇인가

자바 어플리케이션을 보면 new 연산자를 이용하여 객체를 자주 생성한다. 작은 규모의 어플리케이션에서는 문제가 되는 것이 없지만, 큰규모의 코드를 작성하게 되면 Object의 개수가 자연스럽게 증가하며 Object들에 대한 관리 복잡도 역시 증가한다.

이러한 상황을 핸들링하기 위해서 팩토리 패턴을 적용할 수 있다. 이름에서 알 수 있듯이 펙토리 메소드 패턴은 펙토리 역할을 하는 객체를 생성하는 클래스를 사용한다.

팩토리 패턴은 직접 생성자를 이용하여 객체를 생성하는 대신 메소드를 통해 객체를 생성하는 것을 원칙으로 한다. 

펙토리 메소드 패턴에서 당신은 클래스를 생성하기 위해 Java interface 또는 abstract class 같은 인터페이스를 제공하여 인터페이스의 

인터페이스의 펙토리 메소드는 하나 이상의 서브클래스에 대한 오브젝트의 생성을 연기시킨다. 서브클래스는 어떤 객체가 만들어질지에 대해 선택하기 위해 팩토리를 implement하여야 한다.

 

 

 

2.1. Factory Parrern 예제 

글로만 이해하면 이해가 잘 가지 않으니, 코드로 표현하면서 이해를 해보자. 먼저 작성될 클래스 및 인터페이스 구조는 다음과 같다.

 

 

가장 처음 해야 할 일은 C++, Python, Java의 공통점을 찾는 것이다. 수많은 공통점 중 사용할 공통점은 다음과 같다.

  • 모두 프로그래밍 언어이다
  • 컴파일을 한다.
  • HTML은 프로그래밍 언어가 아닙니다.

 

위 요구사항을 담은 interface를 작성하면 다음과 같다.

Language.java

package com.jeonghyeong.sample;
 
public interface Language {
 
    public void compile();
 
    public String getLanguageType();
}

 

 

위 인터페이스를  생성한 다음, implement 받은 CPP, Java, Python 클래스를 구분해 줄 String 식별자를 담고 있는 class를 작성한다.

package com.jeonghyeong.sample;
 
public class LanguageType {
 
    public static final String CPP = "cpp";
 
    public static final String JAVA = "java";
 
    public static final String PYTHON = "python";
 
}

 

 

 

위 식별자를 이용하여 getLanguageType() 메소드에는 본인임을 나타내는 String을 반환하도록 하자.

C++, Java, Python에 대한 정의는 다음과 같다.

 

CPP.java

package com.jeonghyeong.sample;
 
public class CPP implements Language{
 
    @Override
    public void compile() {
        System.out.println("CPP Compile");
    }
 
    @Override
    public String getLanguageType() {
        return LanguageType.CPP;
    }
}

Java.java

package com.jeonghyeong.sample;
 
public class Java implements Language{
 
    @Override
    public void compile() {
        System.out.println("Java Compile");
    }
 
    @Override
    public String getLanguageType() {
        return LanguageType.JAVA;
    }
}

 

Python.java

package com.jeonghyeong.sample;
 
public class Python implements Language{
 
    @Override
    public void compile() {
        System.out.println("Python Compile");
    }
 
    @Override
    public String getLanguageType() {
        return LanguageType.PYTHON;
    }
}




간단하게 Language에 대한 여러개의 파생 클래스를 생성할 수 있었다. 위와 같은 패턴으로 일단 interface를 통해 각기 다른 언어들의 Object를 Language의 개념으로 묶을 수 있었다.


그런데, 이렇게 설계 해놓고, 객체를 생성할 때, new Python()이런 식으로 객체를 생성하면 논리적으로만 Language으로 묶은것과 다름이 없다. 파라미터에 따라 상황에 맞는 객체를 받아올 수 있도록 Factory클래스를 작성해보자.

LanguageFactory

package com.jeonghyeong.sample;
 
import java.util.List;
 
import org.springframework.beans.factory.annotation.Autowired;
 
 
@Component
public class LanguageFactory {
 
    
    public Language getLanguage(String languageType) {
        if(languageType.equal(LanguageType.JAVA)){
            return new Java();
        }else if(languageType.equal(LanguageType.Python)){
            return new Python();
        }else if(languageType.equal(LanguageType.CPP)){
            return new CPP();
        }else
            return null;
    }
 
 
}

 

위와 같이 LanguageFactory를 이용해 getLanguage 메소드를 이용해 상황에 맞는 Language를 상속받은 객체를 가져올 수 있게 되었다.

 

그러나 위 코드 역시 if - else if - else문을 사용하기 때문에, Language interface를 상속받은 class가 생겨날 때 마다 수작업으로 else if를 이용해 코드를 작성해주어야 한다.

 

위 코드를 개선하는 방법으로 나는 Spring의 Bean을 생각하게 되었다.

 

위 Language interface를 상속받은 클래스를 모두 @Component annotation을 이용하여 Bean을 만들게 되면, @Autowired를 이용하여 Language를 상속받은 모든 Bean을 가져올 수 있게 된다.

 

개선된 Factory코드는 다음과 같다.

 

package com.jeonghyeong.sample;
 
import java.util.List;
 
import org.springframework.beans.factory.annotation.Autowired;
 
 
public class LanguageFactory {
 
    @Autowired
    private List<Language> languageList;
 
    public Language getLanguage(String languageType) {
        for(Language language : languageList) {
            if(languageType.equals(language.getLanguageType()))
                return language;
        }
        return null;
    }
 
}

 

 

이제 Factory를 이용하여 메소드를 이용하여 상황에 맞는 객체를 얻어보자. 간단하게 ApplicationListener를 상속받은 클래스를 하나 생성하여 테스트해보았다.

package com.jeonghyeong.sample;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;
 
@Service
public class TestFactory  implements ApplicationListener<ApplicationReadyEvent>{
 
    @Autowired
    private LanguageFactory factory;
 
 
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        Language cpp = factory.getLanguage(LanguageType.CPP);
        Language java = factory.getLanguage(LanguageType.JAVA);
        Language python = factory.getLanguage(LanguageType.PYTHON);
 
        cpp.compile();
        System.out.println(cpp.getClass());
 
        java.compile();
        System.out.println(java.getClass());
 
        python.compile();
        System.out.println(python.getClass());
 
    }
 
}

 

위 코드의 결과는 다음과 같으며, 정상적으로 잘 받아오는 것을 확인하였다.

 

 

 

 

3. Factory Pattern 사용으로 얻을 수 있는 기대 효과

  • 비슷한 성격의 객체를 인터페이스를 통해 하나로 관리할 수 있다.
  • 어떤 유형의 객체인지 판별하는 if-else문이 줄어들기 때문에 코드의 가독성이 증가한다.
  • 협업시, 공통코드를 건드리는 일이 없이 업무를 진행할 수 있기 때문에 효율성이 증가한다.
  • 추후 비슷한 유형의 객체가 새로 생성되어도 implement를 통해 쉽게 추가할 수 있다.

 

 

 

 

 

4. 실무에서 적용한 코드 일부

 

서론에 switch문으로 분기하던 더러운 코드를 다음과 같이 개선하여 가독성을 높였으며, 더이상 이 부분에 대해서 작업이 없어지게 되어 코드 관리가 용이해졌다.

또한, 무수히 많은 autowired가 사라졌으며 batchJob에 대한 로직을 각 job 패키지별로 따로 관리하게 되었다. 

개선된 BatchAgumentListener

@Component
public class BatchArgumentListener implements ApplicationListener<ApplicationReadyEvent>{
 
    @Autowired
    private ApplicationArguments applicationArgument;
 
    @Autowired
    private ConfigurableApplicationContext context;
 
    @Autowired
    private DBEncKeyProperties dbProps;
 
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
 
    @Autowired
    private BatchJobFactory batchJobFactory;
 
 
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
 
 
 
 
        String batchType = this.argumentListValidator(applicationArgument, BatchArgumentConstant.ARG_TYPE);
 
 
        logger.info("batchType : " + batchType);
 
 
        /*
         * DB암호화키를 얻기 위한 public key를 ApplicationArgument에서 가져온다
         */
        if(applicationArgument.containsOption(BatchArgumentConstant.ARG_PUBLIC_KEY)){
            String publicKey = applicationArgument.getOptionValues(BatchArgumentConstant.ARG_PUBLIC_KEY).get(0);
            dbProps.setPublicKey(publicKey);
        }
 
 
        /*
         * ApplicationArgument에서 type parameter의 value값을 가져온다.
         */
        if(applicationArgument.containsOption(BatchArgumentConstant.ARG_TYPE)) {
            batchType = applicationArgument.getOptionValues(BatchArgumentConstant.ARG_TYPE).get(0);
        }else {
            logger.info("Type parameter should be required.");
            System.exit(SpringApplication.exit(context));
        }
 
 
        /*
         * 가져온 type에 맞는 batchJob을 생성하여 프로세스를 실행시킨다.
         */
        try {
            BatchJob batchJob = batchJobFactory.getService(batchType);
            logger.info(batchJob.getBatchType());
            if(batchJob!=null) {
                batchJob.run();
            }
 
 
        }catch (Exception e) {
            logger.info(e.getMessage());
        }
 
 
    }
 
 
    /*
     * argument가 null인 상태에 대해 예외처리하는 메소드
     */
    private String argumentListValidator(ApplicationArguments arg, String argumentType) {
 
        String argValue = null;
        try {
            argValue = arg.getOptionValues(argumentType).get(0);
            return argValue;
        }catch (Exception e) {
            logger.info(BatchArgumentConstant.MESSAGE_NO_ARG_TYPE);
            System.exit(SpringApplication.exit(context));
        }
 
        return argValue;
    }
}

 

패키지별로 흩어져 있던 배치 job을 interface와 factory 패턴을 적용하여 관리가 용이하게 하였다. 

BatchJob

public interface BatchJob {
 
    public void run();
 
    public String getBatchType();
}

BatchJobFactory

@Component
public class BatchJobFactory {
 
    @Autowired
    private List<BatchJob> batchJobList;
 
 
 
    public BatchJob getService(String batchType) {
        for(BatchJob job : batchJobList) {
            if(batchType.equals(job.getBatchType()))
                return job;
        }
 
        return null;
    }
}

 

 

기존 BatchArgumentListener에서 볼 수 있었던 배치 job의 flow를 각 역할에 맞는 패키지의 implement된 BatchJob을 implement받은 객체에 작성함으로써 코드의 가독성과 관리의 용이성이 증가하였다.

또한 BatchArgumentListener에서 작업을 하지 않아도 되므로 여러사람이 협업시 git repo가 confilct날 일이 줄어들었다.

@BatchProcess
public class MemberActionLogJob implements BatchJob{
 
    @Autowired
    private MemberActionCollectService memberActionCollectService;
 
    @Autowired
    private MemberLastActionService memberLastActionService;
 
 
    @Override
    public void run() {
        /*
         * service 패키지에서 작성한 하나 이상의 프로세스가 올 수 있다.
         */
        memberActionCollectService.startBatch();
        memberLastActionService.startBatch();
    }
 
    @Override
    public String getBatchType() {
        /*
         * command line으로 받는 배치 type을 반환시켜준다.
         * BatchArgumentConstant에서 이를 추가 해주어야 한다.
         */
        return BatchArgumentConstant.BATCH_MEMBER_ACTION;
    }
}

 

 

 

 

5. 후기

처음 작성한 프레임워크인 만큼 열심히 하였으나, 여기저기서 git conflict가 발생하였고 코드가 너무 가독성이 떨어지며 쓰기 어렵다고 주변에서 컴플레인이 들어왔다.

이미 프레임워크에 여러 사람이 코드를 작성하고 있었고, 향후 더 많은 코드가 작성될 예정이라고 하였다.

먼저 코드를 작성한 분들께는 죄송하지만, 다음 작업을 더 편하게 하기 위해 프레임워크 개선을 진행하였고 관리 및 개인 업무가 더욱 용이하도록 factory 패턴을 적용하여 많은 개선을 할 수 있었으며 코드를 추가하기 쉬워졌다고 피드백을 받았다.

다음 프레임워크 또는 코드를 작성하기 전에 이러한 디자인 패턴을 적용 할 수 있는지도 고려를 해야겠다.

함께 일하는 직업인 만큼 모두가 웃을 수 있는 업무를 위해 많이 노력해야겠다.

 

0. Table of content

 

 

 

1. 목표 아키텍처

 

 

 

 

 

2. Apache Kafka Download

 

다음 명령어를 통해 Apache Kafka 최신 버전을 다운받고 java1.8을 설치한다.

sudo yum install -y java-1.8.0-openjdk-devel.x86_64
 
wget http://apache.mirror.cdnetworks.com/kafka/2.5.0/kafka-2.5.0-src.tgz
 
tar -xzf kafka-2.5.0-src.tgz

 

압축을 해제하면 다음과 같은 구조의 디렉토리를 볼 수 있을 것이다.

 

 

 

3. Zookeeper Cluster Configuration

 

Zookeeper Cluster를 구성하기 위해 3개의 VM을 생성하였으며, 상세 config 및 서버 스펙은 다음과 같다.

IP AddressZookeeper IdKafka Broker IdInstance Type

IP Address Zookeeper Id Kafka Broker Id Instance Type
10.0.6.218 1 1 t2.medium
10.0.7.5 2 2 t2.medium
10.0.6.195 3 3 t2.medium

 

ZooKeeper 데이터 디렉토리에는 특정 서빙 앙상블에 의해 저장된 znodes의 영구 사본 인 파일이 있습니다.

각 zookeeper의 설정을 {kafkaDir}/config/zookeeper.properties에 다음과 같이 3개의 vm에 입력한다.

# 여러 zookeeper에 의해 저장된 znode의 사본과 zookeeper id를 저장하는 dir
dataDir=/home/ec2-user/zookeeper

# zookeeper가 coordinate하고 있는 kafka에 접속하기 위한 클라이언트 port
clientPort=2181

# 클라이언트 connection 접속 제한 개수
maxClientCnxns=10

# Follower가 leader에 접속하고 싱크를 맞추기 위해 허용된 시간. 값이 증가할수록 zookeeper가 책임지는 데이터의 양이 증가한다.
initLimit=5

to allow followers to sync with ZooKeeper. If followers fall too far behind a leader, they will be dropped.
# Follower가 Zookeeper에 싱크를 맞추기 위해 허용된 시간. Follower가 leader에 대해 싱크를 맞추지 못하면 drop 된다고 햔다.
syncLimit=2

# Zookeeper VM의 정보
# 형식 : server.{id}={VM HostName or IP address}:{Cluster Sync Port}:{Leader Election Port}
# 입력시, localhost는 기입하지 않으며, 무조건 hostname, ip address를 이용하여 기입한더ㅏ.
server.1=10.0.6.218:2888:3888
server.2=10.0.7.5:2888:3888
server.3=10.0.6.195:2888:3888

 

 

또한, 각 Zookeeper id를 다음과 위에서 설정한 dataDir 경로에 myid라는 파일을 만들어 다음과 같이 입력한다.

echo 1 > {Your zookeeper dataDir in zookeeper.properties}/myid

 

입력한 후, 다음과 같이 각 VM의 zookeeper를 다음 명령어로 실행시킨다.

정상적으로 동작이 되는 경우는 첨부된 다음과 같은 사진과 같은 화면을 볼 수 있다.

# 테스트를 위해 데몬으로 실행시키지 않음.
# kafka cluster 실행을 위해서는 -daemon 옵션으로 백그라운드 실행시킨다.
bin/zookeeper-server-start.sh config/zookeeper.properties

 

 

4. Kafka Cluster Configuration

[업데이트 예정]

 

5. 참고자료

    - https://zookeeper.apache.org/doc/r3.4.7/zookeeperAdmin.html 

    - https://kafka.apache.org/documentation/#brokerconfigs 

 

6. 삽질프로젝트 로그

    - 20200630 : Zookeeper Cluster VM생성 완료

    - 20200701 : Zookeeper Cluster 구성완료

 

'IT > OpenSource' 카테고리의 다른 글

[OpenSource] Apache Kafka  (0) 2020.09.07
[Apache Kafka] Kafka Cluster 구성하기(수정중)  (0) 2020.07.01

+ Recent posts