| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | ||||
| 4 | 5 | 6 | 7 | 8 | 9 | 10 |
| 11 | 12 | 13 | 14 | 15 | 16 | 17 |
| 18 | 19 | 20 | 21 | 22 | 23 | 24 |
| 25 | 26 | 27 | 28 | 29 | 30 | 31 |
- 유효성검사
- Kafka
- Thymeleaf
- 중복되지 않는 값 만들기
- msa
- UUID
- auto-offset-reset
- 관계형 데이터베이스
- 공백검사
- JPA
- springboot
- 스프링 시큐리티
- ORM
- 모던 자바스크립트 Deep Dive
- spring security
- 자바 ORM 표준 JPA 프로그래밍
- 자바스크립트
- 게시판 작성자를 아이디로
- 출처 모던 자바스크립트 Deep Dive
- java
- 자바
- Today
- Total
인지용
SpringBoot Kafka auto-offset-reset 옵션이 적용 안되던 건에 대하여 본문
auto-offset-reset 옵션이란
컨슈머가 오프셋 정보가 없을 때 파티션을 읽을 때 데이터를 어디서부터 읽을건지 정의한다.
(오프셋을 커밋한 적이 없거나, 오프셋의 레코드가 브로커에서 삭제된 경우)
ex) 컨슈머 그룹이 처음 만들어졌을 때
옵션은 3가지가 있다.
earliest: 파티션의 맨 처음부터 모든 데이터를 읽음
latest (기본값): 가장 최신 레코드부터 읽음 (컨슈머 작동하기 시작한 다음부터 쓰여진 레코드부터)
none: 유효하지 않은 오프셋부터 읽으려고 하면 예외 발생
auto-offset-reset 옵션 테스트 해보기
적용이 안되는 문제의 코드
package com.min.mockcoin.infrastructure.kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.support.serializer.JsonSerializer
@EnableKafka
@Configuration
class KafkaConfig {
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
val configProps: MutableMap<String, Any> = HashMap()
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mockcoin")
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
return DefaultKafkaConsumerFactory(configProps)
}
@Bean
fun producerFactory(): ProducerFactory<String, String> {
val configProps: MutableMap<String, Any> = HashMap()
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
return DefaultKafkaProducerFactory(configProps)
}
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
return factory
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
}
spring:
config:
activate:
on-profile: local
data:
redis:
host: localhost
port: 6379
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@KafkaListener(topics = arrayOf("mysqlserver.test1.TB_MARKET_ORDER"), groupId = "test-group3")
fun consume3(message: String) {
println("컨슈머3 데이터 받기 성공!!!! message = ${message}")
}
mysqlserver.test1.TB_MARKET_ORDER 토픽에는 이미 여러 데이터들이 있는 상황.
@KafkaListener에 새로운 컨슈머 그룹 (test-group3)을 적어주고 서버를 실행했다.
예상대로라면 맨 처음 데이터부터 데이터 개수만큼 println 구문이 실행돼야했다.
근데 안 찍히네... 뭐지?
원인
원인은 설정에 있었다.
yml에 카프카 설정을 적어두면 KafkaTemplate, ConsumerFactory 등이 자동으로 빈으로 등록된다.
근데 나는 또 ConsumerFactory를 직접 빈으로 정의했고, auto-offset-reset 옵션을 안 넣었었다.
즉 yml이 만든 빈을 덮어쓴 것이지..
그래서 아무리 yml에 auto-offset-reset 설정을 적용해도 소용이 없었다.
해결방법
1. ConsumerFactory Bean 등록 하지말고 yml만 설정하기
2. 직접 선언한 ConsumerFactory Bean에 auto-offset-reset 옵션 적용 하기
나는 1번 방법을 사용했다.
새로운 그룹을 만들고 다시 서버를 실행하니 정상 동작!
예상대로 토픽 맨 처음 데이터부터 끝까지 println이 찍혔다. bbb
(나는 컨슈머, 프로듀서 모두 yml 설정으로 쓰기 위해서 클래스 자체를 주석처리함)
정상 동작 코드
//package com.min.mockcoin.infrastructure.kafka
//
//import org.apache.kafka.clients.consumer.ConsumerConfig
//import org.apache.kafka.clients.producer.ProducerConfig
//import org.apache.kafka.common.serialization.StringDeserializer
//import org.apache.kafka.common.serialization.StringSerializer
//import org.springframework.context.annotation.Bean
//import org.springframework.context.annotation.Configuration
//import org.springframework.kafka.annotation.EnableKafka
//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
//import org.springframework.kafka.core.*
//import org.springframework.kafka.support.serializer.JsonSerializer
//
//
//@EnableKafka
//@Configuration
//class KafkaConfig {
//
// @Bean
// fun consumerFactory(): ConsumerFactory<String, String> {
// val configProps: MutableMap<String, Any> = HashMap()
// configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mockcoin")
// configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
// configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
// return DefaultKafkaConsumerFactory(configProps)
// }
//
// @Bean
// fun producerFactory(): ProducerFactory<String, String> {
// val configProps: MutableMap<String, Any> = HashMap()
// configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
// configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
// return DefaultKafkaProducerFactory(configProps)
// }
//
// @Bean
// fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
// val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
// factory.consumerFactory = consumerFactory()
// return factory
// }
//
// @Bean
// fun kafkaTemplate(): KafkaTemplate<String, String> {
// return KafkaTemplate(producerFactory())
// }
//
//}
spring:
config:
activate:
on-profile: local
data:
redis:
host: localhost
port: 6379
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: market-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@KafkaListener(topics = arrayOf("mysqlserver.test1.TB_MARKET_ORDER"), groupId = "new-test-group4")
fun consume3(message: String) {
println("컨슈머3 데이터 받기 성공!!!! message = ${message}")
}
결론
1. 간단하게 쓰려면 yml에만 카프카 설정 정보 적어주기
2. yml에 카프카 설정을 썼다면 같은 빈을 직접 등록하지 말기
3. 직접 빈을 등록할 거면 필요한 옵션 다 넣어주기
'정보들' 카테고리의 다른 글
| Mysql load data local infile 방법 대용량 인서트 (0) | 2022.01.26 |
|---|---|
| 스프링부트 초반 셋팅 방법 (0) | 2022.01.26 |
| 자바 is a 관계와(상속관계) has a 관계(포함관계) (0) | 2022.01.21 |
| 메모리란 (0) | 2021.08.27 |
| java UUID란 (0) | 2021.08.25 |