zs5200258 posted 2 days ago at 10:27

Real-time CDC capture of GreatSQL data fails with "Unsupported table metadata field type 0"

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_392]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_392]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_392]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_392]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_392]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_392]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_392]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_392]
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392]
Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_392]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_392]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_392]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_392]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392]
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:85)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onEventDeserializationFailure(MySqlStreamingChangeEventSource.java:1553)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1064)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392]
Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1755740154000, eventType=TABLE_MAP, serverId=1003313, headerLength=19, dataLength=210, nextPosition=619854, flags=0}
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1488)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onEventDeserializationFailure(MySqlStreamingChangeEventSource.java:1553)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1064)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392]
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1755740154000, eventType=TABLE_MAP, serverId=1003313, headerLength=19, dataLength=210, nextPosition=619854, flags=0}
    at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:341)
    at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeTableMapEventData(EventDeserializer.java:313)
    at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:237)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392]
Caused by: java.io.IOException: Unsupported table metadata field type 0
    at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventMetadataDeserializer.deserialize(TableMapEventMetadataDeserializer.java:52)
    at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:59)
    at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:37)
    at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:335)
    at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeTableMapEventData(EventDeserializer.java:313)
    at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:237)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)

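yejr posted 2 days ago at 13:11

Please post the table structure first, along with the relevant CDC configuration.

Also, as a test, you could switch the data source to MySQL 8.0.32 and see whether the error still occurs; the problem may lie in CDC itself.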

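zs5200258 posted 2 days ago at 15:01

yejr posted on 2025-8-21 13:11
Please post the table structure first, along with the relevant CDC configuration.

Also, as a test, you could switch the data source to MySQL 8.0.32 and see whether ...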

-- zs_db.greatdb_mb_0804 definition

CREATE TABLE `greatdb_mb_0804` (
`id` int NOT NULL COMMENT 'Primary key, integer value',
`idUn_column` int unsigned DEFAULT NULL COMMENT 'Unsigned integer, can only store non-negative integers (0 included)',
`idUnZe_column` int(10) unsigned zerofill DEFAULT NULL COMMENT 'Unsigned integer, zero-filled, used to guarantee the number of digits',
`varchar_column` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'Variable-length string',
`bit1_column` bit(1) DEFAULT NULL COMMENT 'Bit type',
`bit8_column` bit(8) DEFAULT NULL COMMENT 'Bit type',
`tinyint_column` tinyint(1) DEFAULT NULL COMMENT 'Very small integer',
`tinyint5_column` tinyint DEFAULT NULL COMMENT 'Very small integer',
`tinyintUn_column` tinyint unsigned DEFAULT NULL COMMENT 'Unsigned very small integer',
`tinyintUnZe_column` tinyint(3) unsigned zerofill DEFAULT NULL COMMENT 'Unsigned very small integer, zero-filled',
`smallint_column` smallint DEFAULT NULL COMMENT 'Small integer',
`smallintUn_column` smallint unsigned DEFAULT NULL COMMENT 'Unsigned small integer',
`smallintUnZe_column` smallint(5) unsigned zerofill DEFAULT NULL COMMENT 'Unsigned small integer',
`mediumint_column` mediumint DEFAULT NULL COMMENT 'Medium-sized integer',
`mediumintUn_column` mediumint unsigned DEFAULT NULL COMMENT 'Unsigned medium-sized integer',
`mediumintUnZe_column` mediumint(8) unsigned zerofill DEFAULT NULL COMMENT 'Unsigned medium-sized integer, zero-filled',
`bigint_column` bigint DEFAULT NULL COMMENT 'Big integer',
`bigintUn_column` bigint unsigned DEFAULT NULL COMMENT 'Unsigned big integer',
`bigintUnZe_column` bigint(20) unsigned zerofill DEFAULT NULL COMMENT 'Unsigned big integer, zero-filled',
`real_column` double DEFAULT NULL COMMENT 'Single-precision floating-point number, approximate value',
`realUn_column` double unsigned DEFAULT NULL COMMENT 'Unsigned single-precision floating-point number',
`realUnZe_column` double unsigned zerofill DEFAULT NULL COMMENT 'Unsigned single-precision floating-point number, zero-filled',
`float_column` float(4,1) DEFAULT NULL COMMENT 'Single-precision floating-point number, 4 digits in total, 1 after the decimal point',
`floatUn_column` float unsigned DEFAULT NULL COMMENT 'Unsigned single-precision floating-point number',
`floatUnZe_column` float unsigned zerofill DEFAULT NULL COMMENT 'Unsigned single-precision floating-point number, zero-filled',
`double_column` double(4,2) DEFAULT NULL COMMENT 'Double-precision floating-point number, 4 digits in total, 2 after the decimal point',
`doubleUn_column` double unsigned DEFAULT NULL COMMENT 'Unsigned double-precision floating-point number',
`doubleUnZe_column` double unsigned zerofill DEFAULT NULL COMMENT 'Unsigned double-precision floating-point number, zero-filled',
`doublePr_column` double DEFAULT NULL COMMENT 'Double-precision floating-point number, same as DOUBLE',
`doublePrUn_column` double unsigned DEFAULT NULL COMMENT 'Unsigned double-precision floating-point number',
`doublePrUnZe_column` double unsigned zerofill DEFAULT NULL COMMENT 'Unsigned double-precision floating-point number, zero-filled',
`decimal_column` decimal(10,2) DEFAULT NULL COMMENT 'Exact fixed-point number, 10 digits in total, 2 after the decimal point',
`numeric_column` decimal(10,0) DEFAULT NULL COMMENT 'Fixed-point number, same as DECIMAL',
`tinyblob_column` tinyblob COMMENT 'Very small BLOB, used to store binary data',
`blob_column` blob COMMENT 'BLOB, used to store binary data',
`binary_column` binary(50) DEFAULT NULL COMMENT 'Fixed-length binary data',
`varbinary_column` varbinary(255) DEFAULT NULL COMMENT 'Variable-length binary data',
`mediumblob_column` mediumblob COMMENT 'Medium-sized BLOB, used to store binary data',
`longblob_column` longblob COMMENT 'Large BLOB, used to store binary data',
`date_column` date DEFAULT NULL COMMENT 'Date value, format YYYY-MM-DD',
`time_column` time DEFAULT NULL COMMENT 'Time value or duration, format HH:MM:SS',
`year_column` year DEFAULT NULL COMMENT 'Year value, format YYYY',
`datetime_column` datetime DEFAULT NULL COMMENT 'Combined date and time value, format YYYY-MM-DD HH:MM:SS',
`timestamp_column` timestamp NULL DEFAULT NULL COMMENT 'Timestamp, records when the row was inserted or updated, auto-updated',
`char_column` char(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'Fixed-length string',
`tinytext_column` tinytext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT 'Short text string',
`text_column` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT 'Long text data',
`mediumtext_column` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT 'Medium-length text data',
`longtext_column` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT 'Very large text data',
`enum_column` enum('value1','value2','value3','value4','value5','value6') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'Enumerated value',
`serial_column` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment integer, typically used as a primary key, increases automatically',
`boolean_column` tinyint(1) DEFAULT NULL COMMENT 'Boolean type, used to store TRUE or FALSE',
`json_column` json DEFAULT NULL COMMENT 'JSON type, used to store JSON-formatted data',
`set_column` set('option1','option2','option3','option4','option5','option6') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'Set type',
`geometry_column` geometry DEFAULT NULL COMMENT 'Geometry type, used to store geometric shapes such as points, lines, and polygons',
`point_column` point DEFAULT NULL COMMENT 'Point type, used to store a geographic coordinate point',
`linestring_column` linestring DEFAULT NULL COMMENT 'LineString type, used to store a series of points forming a line',
`polygon_column` polygon DEFAULT NULL COMMENT 'Polygon type, used to store a closed geometric shape',
`multipoint_column` multipoint DEFAULT NULL COMMENT 'MultiPoint type, used to store multiple points',
`multilinestring_column` multilinestring DEFAULT NULL COMMENT 'MultiLineString type, used to store multiple lines',
`multipolygon_column` multipolygon DEFAULT NULL COMMENT 'MultiPolygon type, used to store multiple polygons',
`geometrycollection_column` geomcollection DEFAULT NULL COMMENT 'Geometry collection type, used to store a collection of multiple geometry types',
PRIMARY KEY (`id`),
UNIQUE KEY `serial_column` (`serial_column`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='MySQL integration data standard, all character types';

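zs5200258 posted 2 days ago at 15:02

yejr posted on 2025-8-21 13:11
Please post the table structure first, along with the relevant CDC configuration.

Also, as a test, you could switch the data source to MySQL 8.0.32 and see whether ...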



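yejr posted 2 days ago at 15:44

zs5200258 posted on 2025-8-21 15:02


I noticed that the collation specified for your test table (utf8mb4_general_ci) is not the default collation for utf8mb4 (utf8mb4_0900_ai_ci). Please change that first and then check whether it works.

Also, is the MySQL version on which you ran this successfully 8.0.32, or a different one?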

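zs5200258 posted 2 days ago at 16:30

yejr posted on 2025-8-21 15:44
I noticed that the collation specified for your test table (utf8mb4_general_ci) is not the default collation for utf8mb4 (utf8mb4_0900_ai_c ...

After upgrading flink-connector-mysql-cdc to 3.0.1 the problem went away. Previously we had only tested against MySQL v5.7.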

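zs5200258 posted 2 days ago at 16:31

zs5200258 posted on 2025-8-21 16:30
After upgrading flink-connector-mysql-cdc to 3.0.1 the problem went away. Previously we had only tested against MySQL v5.7.

The class involved is TableMapEventMetadataDeserializer. The main change in 3.0.1 is that this class was rewritten, and with that the error no longer occurs.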
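
For readers unfamiliar with this class: as far as I understand it, MySQL 8.0 can attach optional metadata to a TABLE_MAP binlog event as a run of (type, length, value) fields, and a reader that meets a type code it does not recognize has to either fail or skip that field. The Python sketch below only illustrates that idea and is not the connector's code. The function name `parse_optional_metadata`, the `strict` flag, the one-byte length simplification, and the sample bytes are all assumptions made for illustration; this thread does not say what exactly 3.0.1 changed inside the class.

```python
# Illustrative sketch only: NOT the mysql-binlog-connector-java implementation.
# Type codes follow the MySQL 8.0 optional-metadata definitions for TABLE_MAP
# events as I understand them; code 0 is not one of them.
KNOWN_FIELD_TYPES = {
    1: "SIGNEDNESS",
    2: "DEFAULT_CHARSET",
    3: "COLUMN_CHARSET",
    4: "COLUMN_NAME",
    5: "SET_STR_VALUE",
    6: "ENUM_STR_VALUE",
    7: "GEOMETRY_TYPE",
    8: "SIMPLE_PRIMARY_KEY",
    9: "PRIMARY_KEY_WITH_PREFIX",
    10: "ENUM_AND_SET_DEFAULT_CHARSET",
    11: "ENUM_AND_SET_COLUMN_CHARSET",
    12: "COLUMN_VISIBILITY",
}


def parse_optional_metadata(data: bytes, strict: bool = True) -> list:
    """Walk a run of (type, length, value) fields and return the known ones.

    Assumption: every length fits in one byte (below 251). The real format
    stores the length as a packed integer, which this sketch does not handle.
    """
    fields = []
    pos = 0
    while pos < len(data):
        if pos + 2 > len(data):
            raise ValueError("Truncated metadata field header")
        code, length = data[pos], data[pos + 1]
        value = data[pos + 2 : pos + 2 + length]
        if len(value) != length:
            raise ValueError("Truncated metadata field value")
        pos += 2 + length
        name = KNOWN_FIELD_TYPES.get(code)
        if name is None:
            if strict:
                # Same wording as the IOException at the bottom of the trace.
                raise ValueError(f"Unsupported table metadata field type {code}")
            continue  # tolerant mode: skip the unknown field and keep reading
        fields.append((name, value))
    return fields


if __name__ == "__main__":
    # Hypothetical bytes: SIGNEDNESS, an unknown type 0, then SIMPLE_PRIMARY_KEY.
    sample = bytes([1, 1, 0x80, 0, 2, 0xAA, 0xBB, 8, 1, 0])
    print(parse_optional_metadata(sample, strict=False))
    try:
        parse_optional_metadata(sample, strict=True)
    except ValueError as exc:
        print(exc)
```

With `strict=True` the sample stops at the field whose type code is 0 and raises the same message seen in the trace. With `strict=False` that field is skipped and the two known fields are returned. Whether 3.0.1 behaves like the tolerant branch is not something this thread confirms.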

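yejr posted 2 days ago at 16:45

zs5200258 posted on 2025-8-21 16:31
The class involved is TableMapEventMetadataDeserializer. The main change in 3.0.1 is that this class was rewritten, and with that the error no longer occurs.

OK, thanks for the feedback.

Some of the underlying metadata differs between 5.7 and 8.0.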