인지용

SpringBoot Kafka auto-offset-reset 옵션이 적용 안되던 건에 대하여 본문

정보들

SpringBoot Kafka auto-offset-reset 옵션이 적용 안되던 건에 대하여

인지용 2025. 10. 3. 16:05

 

auto-offset-reset 옵션이란

더보기

컨슈머가 오프셋 정보가 없을 때 파티션을 읽을 때 데이터를 어디서부터 읽을건지 정의한다.

 

(오프셋을 커밋한 적이 없거나, 오프셋의 레코드가 브로커에서 삭제된 경우)

ex) 컨슈머 그룹이 처음 만들어졌을 때

 

 

옵션은 3가지가 있다.

earliest: 파티션의 맨 처음부터 모든 데이터를 읽음

latest (기본값): 가장 최신 레코드부터 읽음 (컨슈머 작동하기 시작한 다음부터 쓰여진 레코드부터)

none: 유효하지 않은 오프셋부터 읽으려고 하면 예외 발생

 

 

auto-offset-reset 옵션 테스트 해보기

적용이 안되는 문제의 코드

package com.min.mockcoin.infrastructure.kafka

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.support.serializer.JsonSerializer


@EnableKafka
@Configuration
class KafkaConfig {

    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        val configProps: MutableMap<String, Any> = HashMap()
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mockcoin")
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
        return DefaultKafkaConsumerFactory(configProps)
    }

    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        val configProps: MutableMap<String, Any> = HashMap()
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        return factory
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }

}
spring:
  config:
    activate:
      on-profile: local
  data:
    redis:
      host: localhost
      port: 6379
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

 

@KafkaListener(topics = arrayOf("mysqlserver.test1.TB_MARKET_ORDER"), groupId = "test-group3")
fun consume3(message: String) {
    println("컨슈머3 데이터 받기 성공!!!! message = ${message}")
}

 

 

mysqlserver.test1.TB_MARKET_ORDER 토픽에는 이미 여러 데이터들이 있는 상황.

 

@KafkaListener에 새로운 컨슈머 그룹 (test-group3)을 적어주고 서버를 실행했다.

 

예상대로라면 맨 처음 데이터부터 데이터 개수만큼 println 구문이 실행돼야했다.

 

근데 안 찍히네... 뭐지?

 

원인

원인은 설정에 있었다.

 

yml에 카프카 설정을 적어두면 KafkaTemplate, ConsumerFactory 등이 자동으로 빈으로 등록된다.

 

근데 나는 또 ConsumerFactory를 직접 빈으로 정의했고, auto-offset-reset 옵션을 안 넣었었다.

 

즉 yml이 만든 빈을 덮어쓴 것이지..

 

그래서 아무리 yml에 auto-offset-reset 설정을 적용해도 소용이 없었다.

 

 

해결방법

1. ConsumerFactory Bean 등록 하지말고 yml만 설정하기

2. 직접 선언한 ConsumerFactory Bean에 auto-offset-reset 옵션 적용 하기

 

나는 1번 방법을 사용했다.

 

새로운 그룹을 만들고 다시 서버를 실행하니 정상 동작!

예상대로 토픽 맨 처음 데이터부터 끝까지 println이 찍혔다. bbb

 

(나는 컨슈머, 프로듀서 모두 yml 설정으로 쓰기 위해서 클래스 자체를 주석처리함)

 

정상 동작 코드

//package com.min.mockcoin.infrastructure.kafka
//
//import org.apache.kafka.clients.consumer.ConsumerConfig
//import org.apache.kafka.clients.producer.ProducerConfig
//import org.apache.kafka.common.serialization.StringDeserializer
//import org.apache.kafka.common.serialization.StringSerializer
//import org.springframework.context.annotation.Bean
//import org.springframework.context.annotation.Configuration
//import org.springframework.kafka.annotation.EnableKafka
//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
//import org.springframework.kafka.core.*
//import org.springframework.kafka.support.serializer.JsonSerializer
//
//
//@EnableKafka
//@Configuration
//class KafkaConfig {
//
//    @Bean
//    fun consumerFactory(): ConsumerFactory<String, String> {
//        val configProps: MutableMap<String, Any> = HashMap()
//        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
//        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mockcoin")
//        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
//        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
//        return DefaultKafkaConsumerFactory(configProps)
//    }
//
//    @Bean
//    fun producerFactory(): ProducerFactory<String, String> {
//        val configProps: MutableMap<String, Any> = HashMap()
//        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
//        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
//        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
//        return DefaultKafkaProducerFactory(configProps)
//    }
//
//    @Bean
//    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
//        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
//        factory.consumerFactory = consumerFactory()
//        return factory
//    }
//
//    @Bean
//    fun kafkaTemplate(): KafkaTemplate<String, String> {
//        return KafkaTemplate(producerFactory())
//    }
//
//}

 

spring:
  config:
    activate:
      on-profile: local
  data:
    redis:
      host: localhost
      port: 6379
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: market-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

@KafkaListener(topics = arrayOf("mysqlserver.test1.TB_MARKET_ORDER"), groupId = "new-test-group4")
fun consume3(message: String) {
    println("컨슈머3 데이터 받기 성공!!!! message = ${message}")
}

 

 

 

결론

1. 간단하게 쓰려면 yml에만 카프카 설정 정보 적어주기

2. yml에 카프카 설정을 썼다면 같은 빈을 직접 등록하지 말기

3. 직접 빈을 등록할 거면 필요한 옵션 다 넣어주기

'정보들' 카테고리의 다른 글

Mysql load data local infile 방법 대용량 인서트  (0) 2022.01.26
스프링부트 초반 셋팅 방법  (0) 2022.01.26
자바 is a 관계와(상속관계) has a 관계(포함관계)  (0) 2022.01.21
메모리란  (0) 2021.08.27
java UUID란  (0) 2021.08.25