
io.debezium.DebeziumException: The db history topic is missing error

인지용 2025. 10. 14. 00:11

 

I ran into an error while using Kafka.

io.debezium.DebeziumException: The db history topic is missing

 

 

Full error message


{
  "name": "mysql-coin-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "127.0.0.1:8083",
      "trace": "io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to recovery.\n\tat io.debezium.connector.common.BaseSourceTask.validateSchemaHistory(BaseSourceTask.java:135)\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:138)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:260)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:279)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doStart(WorkerTask.java:176)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:79)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n"
    }
  ],
  "type": "source"
}
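Note that in the status output the connector itself is RUNNING while only the task is FAILED, so it is easy to miss. As a rough sketch (not part of the original setup), the status can be pulled from the Kafka Connect REST endpoint `GET /connectors/<name>/status` and scanned for failed tasks. The base URL is assumed from the `worker_id` shown above, and the helper names `fetch_status` and `failed_tasks` are made up for illustration.

```python
import json
from urllib.request import urlopen

# Assumption: the Kafka Connect REST API listens on the worker address above.
CONNECT_URL = "http://127.0.0.1:8083"


def fetch_status(name, base_url=CONNECT_URL):
    """Fetch GET /connectors/<name>/status and return the parsed JSON."""
    with urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def failed_tasks(status):
    """Return (task id, first line of the trace) for every FAILED task."""
    result = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            first_line = trace.splitlines()[0] if trace else ""
            result.append((task["id"], first_line))
    return result
```

Calling `failed_tasks(fetch_status("mysql-coin-connector"))` against a live worker would list each failed task with the first line of its stack trace, which is where the Debezium message appears.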

 

 

After a DDL change, things got tangled and CDC stopped working,

so I deleted the topic configured in the "schema.history.internal.kafka.topic" option of the MySQL source connector JSON.

 

The error started after that.

 

 

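Temporary workaround

1. Change the name in the connector JSON

2. Add the snapshot.mode: initial option

As far as I understand, Kafka Connect stores source offsets per connector name, so a renamed connector starts with no saved offsets, and Debezium then takes a fresh initial snapshot that rebuilds the schema history.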

 

{
  "name": "mysql-coin-connector",  <- 1. change the name
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "test",
    "database.password": "test",
    "database.include.list": "test1",
    "database.server.id": "184055",
    "snapshot.mode": "initial"  <- 2. add this
    ...
  }
}
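The two changes can also be applied programmatically to an existing connector definition. The following is only a minimal sketch: the helper name `with_workaround` and the new connector name `mysql-coin-connector-v2` are hypothetical, chosen for illustration.

```python
import copy


def with_workaround(connector, new_name):
    """Return a copy of a connector definition with both changes applied."""
    updated = copy.deepcopy(connector)              # leave the original untouched
    updated["name"] = new_name                      # 1. change the name
    updated["config"]["snapshot.mode"] = "initial"  # 2. add snapshot.mode
    return updated


original = {
    "name": "mysql-coin-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.include.list": "test1",
    },
}

# Hypothetical new name; any name not used before should do.
fixed = with_workaround(original, "mysql-coin-connector-v2")
```

Working on a deep copy keeps the old definition around, which is handy for comparing the two configs if the new connector misbehaves.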

 

 

 

 

 

Full JSON

{
  "name": "mysql-coin-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "test",
    "database.password": "test",
    "database.include.list": "test1",
    "database.server.id": "184057",
    "database.connectionTimeZone": "Asia/Seoul",
    "table.include.list": "test1.TB_MARKET_ORDER,test1.TB_COIN_TRADE_HIST",
    "topic.prefix": "mysqlserver",
    
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schemahistory.v3",
    "snapshot.mode": "initial",
    
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    
    "transforms": "unwrap, extractKey",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "id",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    
    "errors.tolerance": "all",
    "errors.log.enable": "true"
  }
}
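For completeness, a definition like the one above is normally registered by sending it to the Kafka Connect REST endpoint `POST /connectors`. This is a sketch under assumptions, not the exact procedure used here: the base URL is taken from the worker address in the error output, and the helper names `build_create_request` and `register` are made up for illustration.

```python
import json
from urllib.request import Request, urlopen

# Assumption: the Kafka Connect REST API listens on this address.
CONNECT_URL = "http://127.0.0.1:8083"


def build_create_request(connector, base_url=CONNECT_URL):
    """Build a POST /connectors request carrying the {"name", "config"} JSON."""
    body = json.dumps(connector).encode("utf-8")
    return Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register(connector, base_url=CONNECT_URL):
    """Send the request and return the parsed response from Kafka Connect."""
    with urlopen(build_create_request(connector, base_url)) as resp:
        return json.load(resp)
```

Splitting request construction from sending makes it possible to inspect the payload before anything reaches the worker.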

 

 

Source

https://groups.google.com/g/debezium/c/IO55ZeAEM14