개발일지

kafka 성능 개선기 (feat. 배치 리스너)

Tommy__Kim 2024. 9. 2. 21:18

들어가며 

최근 사용자들의 행동 로그를 활용해 추천 서비스 기업에 관련 데이터를 보내는 프로젝트를 진행했었다. 

아직 어리둥절한 신입에게 다소 무서웠던 프로젝트였지만, 어찌저찌 잘 해결해나가고 있었다.

해당 프로젝트를 진행하며 처음으로 kafka를 활용했었고, 현재 회사 내 다른 프로젝트에서 사용하고 있는 설정법을 보며 똑같이 설정을 했었다.

서비스 기업에서 어느날 피드백이 왔었는데, 지금 데이터들이 20시간가량 밀려있다고 해서, 확인을 했었는데, 잘못된 kafka 사용으로 인해 lag가 14,500,000개 정도 쌓여있었다. 

이를 해결하기 위해 했던 과정들을 기록하고자 한다. 

문제 원인

1. consumer와 producer의 차이 

현재 회사의 로그 관련한 서비스 구조는 다음과 같다. 

기존에 추천 서비스 기업을 하나 사용하고 있었다.

kafka 메세지를 발행하고, 소비하면서 추천 기업에 데이터를 가공하여 보내는 구조를 가지고 있었다. 

앞으로 로그를 활용해 다양한 시도를 할 수 있을 것 같다는 판단을 하였고, 로그를 받아서 처리하는 서버를 새로 증축하였다. 

 

LogServer에서 kafka topic에 연결된 producer의 수는 4개였다, 그에 반해 consumer의 개수는 3개밖에 되지 않았다.

 

2. 빠른 속도의 메시지 발행 속도 

서비스를 이용하는 고객들의 행동이 일어날 때마다, 메시지를 발행하기 때문에, 많은 양의 데이터가 producing 되는 상황.

대략적으로 평균적으로 초당 5000 건 정도의 메세지가 발행되고 있었다.

 

해결 방법

1. consumer와 producer의 개수 맞추기

consumer의 개수를 producer의 개수와 동일하게 맞춰주었다.

둘 간의 개수를 맞춰준 이유는 다음과 같다.

  1. 데이터 처리량 균형 유지
    1. 프로듀서가 생성하는 데이터 수를 컨슈머의 처리속도가 따라가지 못한다면, 큐에 데이터가 쌓인다.
    2. 이는 결과적으로 메시지 지연 혹은 오버플로우를 야기할 수 있음
  2. 데이터 처리 연속성 유지
    1. 프로듀서와 컨슈머의 처리속도가 균형을 이룰 때 데이터 처리의 연속성이 보장됨

 

2. BatchListener 적용

 

BatchListener란

카프카에서 메시지를 소비할 때 여러 개의 메시지를 한번에 배치로 처리하는 기능을 제공하는 리스너
배치리스너를 활용하게 되면 여러 개의 메세지를 모아 한 번에 처리할 수 있기 때문에, 성능 향상과 시스템 리소스 최적화에 도움이 된다

 

1의 해결방법을 적용한 후에 모니터링을 해 본 결과 전보다는 덜하긴 하지만, 여전히 lag가 쌓이는 것을 확인할 수 있었다.

카프카의 파티션의 경우 증가시킬 순 있지만, 줄일 순 없기 때문에, 다른 방식으로 해결하는 방법을 찾아봤다. 

배치 리스너의 경우, 지정된 크기만큼의 메시지 묶음을 한번에 가져와 처리한다. 

이러한 일괄 처리 방식의 경우 네트워크 요청의 수를 줄이고, 외부 시스템에 대한 입출력 작업을 최적화할 수 있기 때문에 큰 성능 개선을 이끌 수 있을 것이라 판단했다. 

 

구현 방식은 간단하게 다음과 같다. 

 

1. ConcurrentKafkaListenerContainerFactory 구현 

해당 빈의 경우 어떠한 방식으로 동작하게끔 할 것인지 튜닝하는 빈이다. 

@Bean
public ConcurrentKafkaListenerContainerFactory<String, WebLogKafkaDto> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WebLogKafkaDto> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // application에서 몇개의 consumer가 동작하게 끔 할 것인지
    factory.setConcurrency(1);
    // batchlistener 활성 여부
    factory.setBatchListener(true);
    // 커밋 설정
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    // timeout 설정 
    factory.getContainerProperties().setPollTimeout(3000L);
    factory.setCommonErrorHandler(new DefaultErrorHandler());
    return factory;
}

 

2. ConsumerFactory 구현 

kafka의 consumerfactory 설정을 해주는 부분이다. 

json deserializer 설정의 경우 카프카에서 메시지를 받아 json을 통해 deserialize 하는 경우 필요한 설정이다. 

아래 설정값 중 VALUE_DEFAULT_TYPE은 변환해야하는 Dto의 경로를 적어주면 된다.

만약 AWS MSK를 사용하고 있다고 한다면 관련 설정을 넣어주면 된다.

@Bean
public ConsumerFactory<String, WebLogKafkaDto> consumerFactory() {
    Map<String, Object> props = new HashMap<>();

    // 필수 설정값
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerProperties.getBootstrapServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // JSON Deserializer 설정
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "microservices.processor.service.weblog.WebLogKafkaDto");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false"); // Kafka 메시지 헤더에 타입 정보가 없는 경우

    // tuning parameter
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerProperties.getAutoOffsetReset());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerProperties.isEnableAutoCommit());
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "150");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");

    // AWS MSK 관련 보안 설정
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required;");
    props.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

    return new DefaultKafkaConsumerFactory<>(props);
}

 

3. Consumer 변경 

consumer의 경우 비교적 변경이 간단하다. 

@KafkaListener에서 bean으로 등록해 주었던 containerFactory를 등록해 주면 된다. 

 

변경 전 

    @KafkaListener(topics = "${kafka.consumer.topic.web-log-domain-event}", groupId = "${kafka.consumer.group-id.example}")
    public void eventHandler(@Payload WebLogKafkaDto webLogDto, Acknowledgment acknowledgment) {
        try {
            eventHandler.handleEvent(webLogDto);
        } catch (Exception e) {
            log.error("Consumer Exception id: {}, message : {}", webLogDto.getWebLogDto().getId(), e.getMessage());
        }

        acknowledgment.acknowledge();
    }

변경 후

@KafkaListener(
            topics = "${kafka.consumer.topic.web-log-domain-event}",
            groupId = "${kafka.consumer.group-id.example}",
            containerFactory = "kafkaListenerContainerFactory")
    public void eventHandler(@Payload List<WebLogKafkaDto> webLogDtoList, Acknowledgment acknowledgment) {
        try {
            eventHandler.handleEvent(webLogDtoList);
        } catch (Exception e) {
            log.error("Error Occurred in handling Event, message = {}", e.getMessage());
            doLog(webLogDtoList);
        }

        acknowledgment.acknowledge();
    }

    private void doLog(List<WebLogKafkaDto> webLogKafkaDtoList) {
        webLogKafkaDtoList.forEach(webLogKafkaDto -> log.info("MUST RESEND DATA id = {}", webLogKafkaDto.getWebLogDto().getId()));
    }

 

결과 

튜닝 후의 경우 기존에 쌓여있던 메시지들을 모두 소비한 뒤에 안정적으로 운용 가능할 때를 기준으로 체크하였다. 

튜닝 후에 안정적으로 데이터들을 소비하고 있는 것을 확인할 수 있었다.

튜닝 전 

 

튜닝 후 

문제 상황 

배치 리스너를 구현하면서 몇 가지 문제 상황이 발생했었다. 

1. 너무 많은 POLL_RECORDS

처음에 1000개는 얼마 높지 않은 개수겠거니 하며, 1000개를 poll을 했더니 문제가 발생했었다. 

그 날의 끔찍했던 기억...

너무 많은 데이터들을 한 번에 들고 오다 보니, CPU 사용량이 200%가량 찍혔었고, GC도 오랜 시간이 발생했었다. 

error log 같은 경우 webclient를 사용하고 있는데, 기존에 max connection 수를 200개를 사용하고 있었다. 이보다 더 높은 connection이 발생해 제대로 값이 보내지지 않아 error log가 찍혔다.

POLL_RECORDS의 경우 모니터링하면서 안정적으로 값을 올릴 필요가 있다.

2. 수동 커밋 시 장애가난 message 확인 불가

현재 외부 서비스 기업에서 장애가 나는 경우 혹은 우리 측에서 문제가 있을 경우 예외를 잡아 로그로 찍어두고 커밋을 하는 방식을 사용하고 있다.

카프카 배치리스너가 동작 중간에 장애가 나는 경우 재시도 시 중복 데이터가 발생할 수 있고, 혹은 계속해서 consume 실패를 할 시에 lag가 쌓일 수 있는 문제가 있다고 판단하여 해당 방식을 채택했다. 

문제는 어떤 메시지가 실패했는 지 기록을 하지 않아 실패한 메세지 추적을 하기가 어려웠었다. 

이에 따라 consumer에서 예외 발생 시 실패했던 message들을 로그로 남겨두고 재 전송 하는 방식을 채택하였다. 

 

 

BatchListener를 사용할 경우, 한번 메시지를 poll해올 때 많은 양의 데이터를 들고 와 일괄처리를 할 수 있어 성능 향상과 시스템 리소스 최적화에 도움이 될 수 있다. 하지만 메모리 사용량 증가, 에러처리 복잡성 증가 등 다양한 고려 사항이 있기 때문에 무조건 적용하는 것을 권하지는 않는다.

 

많은 양의 데이터를 빠르게 처리해야 한다면 batchlistener를 고려해도 좋을 것 같다는 생각이 든다.