diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java index 88c5e9d4c1dc1..769e721a14de1 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java @@ -34,6 +34,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; @@ -53,6 +56,9 @@ public abstract class AbstractJsonDeserializationSchema implements Deserializati private static final long serialVersionUID = 1L; + private static final Logger LOG = + LoggerFactory.getLogger(AbstractJsonDeserializationSchema.class); + /** Flag indicating whether to fail if a field is missing. */ protected final boolean failOnMissingField; @@ -155,4 +161,20 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat); } + + /** + * Logs a debug message for parsing errors only when debug logs are enabled. 
+ * + * @param message the original JSON message that failed to parse (decoded as UTF-8 for the log) + * @param t the throwable that was caught + */ + protected void logParseErrorIfDebugEnabled(byte[] message, Throwable t) { + if (LOG.isDebugEnabled()) { + // Decode as UTF-8 explicitly instead of relying on the platform default charset. + LOG.debug( + "Failed to deserialize JSON '{}'.", + new String(message, java.nio.charset.StandardCharsets.UTF_8), + t); + } + } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java index eea1f607f1c58..8a21962ad7dd5 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java @@ -100,6 +100,7 @@ public void deserialize(byte[] message, Collector out) throws IOExcepti throw new IOException( format("Failed to deserialize JSON '%s'.", new String(message)), t); } + logParseErrorIfDebugEnabled(message, t); } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java index e8f76eca52aba..a34d4f8d9b687 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java @@ -84,6 +84,7 @@ public void deserialize(@Nullable byte[] message, Collector out) throws // will be caught by outer try-catch throw t; } + logParseErrorIfDebugEnabled(message, t); } } } else { @@ -97,6 +98,7 @@ public void deserialize(@Nullable byte[] message, Collector out) throws throw new IOException( format("Failed to deserialize JSON '%s'.", new String(message)), t); } + logParseErrorIfDebugEnabled(message, t); } } diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java index 5f4594343a7dc..a4c797f7a48d4 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java @@ -36,6 +36,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; @@ -62,6 +65,8 @@ public final class CanalJsonDeserializationSchema implements DeserializationSchema { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(CanalJsonDeserializationSchema.class); + private static final String FIELD_OLD = "old"; private static final String OP_INSERT = "INSERT"; private static final String OP_UPDATE = "UPDATE"; @@ -288,6 +293,12 @@ public void deserialize(@Nullable byte[] message, Collector out) throws "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'", type, new String(message))); } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Unknown \"type\" value '{}'. The Canal JSON message is '{}'.", + type, + new String(message)); + } } } catch (Throwable t) { // a big try catch to protect the processing. 
@@ -295,6 +306,9 @@ public void deserialize(@Nullable byte[] message, Collector out) throws throw new IOException( format("Corrupt Canal JSON message '%s'.", new String(message)), t); } + if (LOG.isDebugEnabled()) { + LOG.debug("Corrupt Canal JSON message '{}'.", new String(message), t); + } } for (GenericRowData genericRowData : genericRowDataList) { out.collect(genericRowData); diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java index 774e271cd0af0..9db88209aebda 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java @@ -33,6 +33,9 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -57,6 +60,9 @@ public final class DebeziumJsonDeserializationSchema implements DeserializationSchema { private static final long serialVersionUID = 1L; + private static final Logger LOG = + LoggerFactory.getLogger(DebeziumJsonDeserializationSchema.class); + private static final String OP_READ = "r"; // snapshot read private static final String OP_CREATE = "c"; // insert private static final String OP_UPDATE = "u"; // update @@ -176,6 +182,12 @@ public void deserialize(byte[] message, Collector out) throws IOExcepti "Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'", op, new String(message))); } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Unknown \"op\" value '{}'. The Debezium JSON message is '{}'.", + op, + new String(message)); + } } } catch (Throwable t) { // a big try catch to protect the processing. 
@@ -183,6 +195,9 @@ public void deserialize(byte[] message, Collector out) throws IOExcepti throw new IOException( format("Corrupt Debezium JSON message '%s'.", new String(message)), t); } + if (LOG.isDebugEnabled()) { + LOG.debug("Corrupt Debezium JSON message '{}'.", new String(message), t); + } } for (GenericRowData genericRowData : genericRowDataList) { out.collect(genericRowData); diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java index 0a31d6a95450d..5e369a380a881 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java @@ -34,6 +34,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -57,6 +60,9 @@ public class MaxwellJsonDeserializationSchema implements DeserializationSchema { private static final long serialVersionUID = 2L; + private static final Logger LOG = + LoggerFactory.getLogger(MaxwellJsonDeserializationSchema.class); + private static final String FIELD_OLD = "old"; private static final String OP_INSERT = "insert"; private static final String OP_UPDATE = "update"; @@ -171,6 +177,12 @@ public void deserialize(byte[] message, Collector out) throws IOExcepti "Unknown \"type\" value \"%s\". The Maxwell JSON message is '%s'", type, new String(message))); } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Unknown \"type\" value '{}'. The Maxwell JSON message is '{}'.", + type, + new String(message)); + } } } catch (Throwable t) { // a big try catch to protect the processing. 
@@ -178,6 +190,9 @@ public void deserialize(byte[] message, Collector out) throws IOExcepti throw new IOException( format("Corrupt Maxwell JSON message '%s'.", new String(message)), t); } + if (LOG.isDebugEnabled()) { + LOG.debug("Corrupt Maxwell JSON message '{}'.", new String(message), t); + } } for (GenericRowData genericRowData : genericRowDataList) { out.collect(genericRowData); diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java index 8fc5ca9fc3952..d6351f61c39aa 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java @@ -33,6 +33,9 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -57,6 +60,8 @@ public final class OggJsonDeserializationSchema implements DeserializationSchema { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(OggJsonDeserializationSchema.class); + private static final String OP_CREATE = "I"; // insert private static final String OP_UPDATE = "U"; // update private static final String OP_DELETE = "D"; // delete @@ -198,6 +203,12 @@ public void deserialize(byte[] message, Collector out) throws IOExcepti "Unknown \"op_type\" value \"%s\". The Ogg JSON message is '%s'", op, new String(message))); } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Unknown \"op_type\" value '{}'. The Ogg JSON message is '{}'.", + op, + new String(message)); + } } } catch (Throwable t) { // a big try catch to protect the processing. 
@@ -205,6 +216,9 @@ public void deserialize(byte[] message, Collector out) throws IOExcepti throw new IOException( format("Corrupt Ogg JSON message '%s'.", new String(message)), t); } + if (LOG.isDebugEnabled()) { + LOG.debug("Corrupt Ogg JSON message '{}'.", new String(message), t); + } } for (GenericRowData genericRowData : genericRowDataList) { out.collect(genericRowData); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java index 94f59dbba364a..43d06dc92abf0 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java @@ -34,6 +34,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.testutils.logging.LoggerAuditingExtension; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.apache.flink.util.jackson.JacksonMapperFactory; @@ -45,6 +46,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; import java.math.BigDecimal; import java.sql.Timestamp; @@ -87,6 +89,7 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.slf4j.event.Level.DEBUG; /** * Tests for {@link JsonRowDataDeserializationSchema}, {@link @@ -97,6 +100,10 @@ public class JsonRowDataSerDeSchemaTest { private static 
final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + @RegisterExtension + public final LoggerAuditingExtension loggerExtension = + new LoggerAuditingExtension(AbstractJsonDeserializationSchema.class, DEBUG); + @Parameter public boolean isJsonParser; @Parameters(name = "isJsonParser={0}") @@ -839,6 +846,30 @@ private void testParseErrors(TestSpec spec) { .hasMessageContaining(spec.errorMessage); } + @TestTemplate + void testIgnoreParseErrorsLogsDebugMessage() throws Exception { + RowType rowType = (RowType) ROW(FIELD("id", INT())).getLogicalType(); + String invalidJson = "{\"id\":\"not_a_number\"}"; + + DeserializationSchema schema = + createDeserializationSchema( + isJsonParser, rowType, false, true, TimestampFormat.SQL); + schema.open(new DummyInitializationContext()); + + // Deserialize invalid JSON - should not throw but should log + RowData result = schema.deserialize(invalidJson.getBytes()); + + // Result should be null since parsing failed + assertThat(result).isNull(); + + // Verify the error was logged at DEBUG level + assertThat(loggerExtension.getMessages()) + .anyMatch( + msg -> + msg.contains("Failed to deserialize JSON") + && msg.contains(invalidJson)); + } + private static List testData = Arrays.asList( TestSpec.json("{\"id\": \"trueA\"}") diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java index 74fc6648900c4..6441321537d70 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java @@ -28,9 +28,11 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.DataTypeUtils; +import 
org.apache.flink.testutils.logging.LoggerAuditingExtension; import org.apache.flink.util.Collector; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.io.File; import java.io.IOException; @@ -54,10 +56,15 @@ import static org.apache.flink.table.api.DataTypes.STRING; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.slf4j.event.Level.DEBUG; /** Tests for {@link CanalJsonSerializationSchema} and {@link CanalJsonDeserializationSchema}. */ class CanalJsonSerDeSchemaTest { + @RegisterExtension + public final LoggerAuditingExtension loggerExtension = + new LoggerAuditingExtension(CanalJsonDeserializationSchema.class, DEBUG); + private static final DataType PHYSICAL_DATA_TYPE = ROW( FIELD("id", INT().notNull()), @@ -105,6 +112,59 @@ void testIgnoreParseErrors() throws Exception { "An error occurred while collecting data.")); } + @Test + void testIgnoreParseErrorsLogsDebugMessage() throws Exception { + String corruptMessage = + "{\"data\":null,\"old\":null,\"type\":\"UNKNOWN_TYPE\",\"database\":\"test\",\"table\":\"test\"}"; + CanalJsonDeserializationSchema deserializationSchema = + CanalJsonDeserializationSchema.builder( + PHYSICAL_DATA_TYPE, + Collections.emptyList(), + InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType())) + .setIgnoreParseErrors(true) + .build(); + open(deserializationSchema); + + SimpleCollector collector = new SimpleCollector(); + deserializationSchema.deserialize( + corruptMessage.getBytes(StandardCharsets.UTF_8), collector); + + // Verify no records were collected + assertThat(collector.list).isEmpty(); + + // Verify the error was logged at DEBUG level + assertThat(loggerExtension.getMessages()) + .anyMatch( + msg -> + msg.contains("Unknown \"type\" value") + || msg.contains("Corrupt Canal JSON")); + } + + @Test + void testDeserializationThrowsExceptionWhenIgnoreParseErrorsIsFalse() { + String 
corruptMessage = "{\"type\":\"INVALID\"}"; + + CanalJsonDeserializationSchema deserializationSchema = + CanalJsonDeserializationSchema.builder( + PHYSICAL_DATA_TYPE, + Collections.emptyList(), + InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType())) + .setIgnoreParseErrors(false) + .build(); + open(deserializationSchema); + + SimpleCollector collector = new SimpleCollector(); + + // The IOException raised for the unknown type is caught by the surrounding try/catch in + // deserialize() and rethrown wrapped, so its text is only visible in the cause chain. + assertThatThrownBy( + () -> + deserializationSchema.deserialize( + corruptMessage.getBytes(StandardCharsets.UTF_8), collector)) + .isInstanceOf(IOException.class) + .hasStackTraceContaining("Unknown \"type\" value"); + } + @Test void testDeserializeNullRow() throws Exception { final List requestedMetadata = Arrays.asList(ReadableMetadata.values()); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java index 6c207c78bcfe0..3252c090f5983 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java @@ -27,9 +27,11 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.testutils.logging.LoggerAuditingExtension; import org.apache.flink.util.Collector; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.io.File; import java.io.IOException; @@ -53,12 +55,17 @@ import static org.apache.flink.table.api.DataTypes.STRING; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.slf4j.event.Level.DEBUG; /** * Tests for {@link DebeziumJsonSerializationSchema} and {@link 
DebeziumJsonDeserializationSchema}. */ class DebeziumJsonSerDeSchemaTest { + @RegisterExtension + public final LoggerAuditingExtension loggerExtension = + new LoggerAuditingExtension(DebeziumJsonDeserializationSchema.class, DEBUG); + private static final DataType PHYSICAL_DATA_TYPE = ROW( FIELD("id", INT().notNull()), @@ -142,6 +149,34 @@ void testIgnoreParseErrors() throws Exception { "An error occurred while collecting data.")); } + @Test + void testIgnoreParseErrorsLogsDebugMessage() throws Exception { + String corruptMessage = "{\"before\":null,\"after\":null,\"op\":\"UNKNOWN_OP\"}"; + DebeziumJsonDeserializationSchema deserializationSchema = + new DebeziumJsonDeserializationSchema( + PHYSICAL_DATA_TYPE, + Collections.emptyList(), + InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()), + false, + true, // ignoreParseErrors + TimestampFormat.ISO_8601); + open(deserializationSchema); + + SimpleCollector collector = new SimpleCollector(); + deserializationSchema.deserialize( + corruptMessage.getBytes(StandardCharsets.UTF_8), collector); + + // Verify no records were collected + assertThat(collector.list).isEmpty(); + + // Verify the error was logged at DEBUG level + assertThat(loggerExtension.getMessages()) + .anyMatch( + msg -> + msg.contains("Unknown \"op\" value") + || msg.contains("Corrupt Debezium JSON")); + } + @Test void testDeserializationWithMetadata() throws Exception { testDeserializationWithMetadata( diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java index d0fbb2d49ce18..a25fc6551cbdf 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java @@ -27,9 +27,11 @@ import org.apache.flink.table.types.DataType; import 
org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.testutils.logging.LoggerAuditingExtension; import org.apache.flink.util.Collector; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.io.File; import java.io.IOException; @@ -52,12 +54,17 @@ import static org.apache.flink.table.api.DataTypes.STRING; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.slf4j.event.Level.DEBUG; /** * Tests for {@link MaxwellJsonSerializationSchema} and {@link MaxwellJsonDeserializationSchema}. */ class MaxwellJsonSerDerTest { + @RegisterExtension + public final LoggerAuditingExtension loggerExtension = + new LoggerAuditingExtension(MaxwellJsonDeserializationSchema.class, DEBUG); + private static final DataType PHYSICAL_DATA_TYPE = ROW( FIELD("id", INT().notNull()), @@ -130,6 +137,33 @@ void testIgnoreParseErrors() throws Exception { "An error occurred while collecting data.")); } + @Test + void testIgnoreParseErrorsLogsDebugMessage() throws Exception { + String corruptMessage = "{\"data\":null,\"old\":null,\"type\":\"UNKNOWN_TYPE\"}"; + MaxwellJsonDeserializationSchema deserializationSchema = + new MaxwellJsonDeserializationSchema( + PHYSICAL_DATA_TYPE, + Collections.emptyList(), + InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()), + true, // ignoreParseErrors + TimestampFormat.ISO_8601); + open(deserializationSchema); + + SimpleCollector collector = new SimpleCollector(); + deserializationSchema.deserialize( + corruptMessage.getBytes(StandardCharsets.UTF_8), collector); + + // Verify no records were collected + assertThat(collector.list).isEmpty(); + + // Verify the error was logged at DEBUG level + assertThat(loggerExtension.getMessages()) + .anyMatch( + msg -> + msg.contains("Unknown \"type\" value") + || msg.contains("Corrupt Maxwell JSON")); + } + @Test 
void testSerializationDeserialization() throws Exception { List lines = readLines("maxwell-data.txt"); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java index 00cdf512ef679..b5a81e40d37ed 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java @@ -27,10 +27,12 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.testutils.logging.LoggerAuditingExtension; import org.apache.flink.util.Collector; import org.assertj.core.data.Percentage; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.io.File; import java.io.IOException; @@ -53,10 +55,15 @@ import static org.apache.flink.table.api.DataTypes.STRING; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.slf4j.event.Level.DEBUG; /** Tests for {@link OggJsonSerializationSchema} and {@link OggJsonDeserializationSchema}. 
*/ class OggJsonSerDeSchemaTest { + @RegisterExtension + public final LoggerAuditingExtension loggerExtension = + new LoggerAuditingExtension(OggJsonDeserializationSchema.class, DEBUG); + private static final DataType PHYSICAL_DATA_TYPE = ROW( FIELD("id", INT().notNull()), @@ -108,6 +115,33 @@ void testIgnoreParseErrors() throws Exception { "An error occurred while collecting data.")); } + @Test + void testIgnoreParseErrorsLogsDebugMessage() throws Exception { + String corruptMessage = "{\"before\":null,\"after\":null,\"op_type\":\"UNKNOWN_OP\"}"; + OggJsonDeserializationSchema deserializationSchema = + new OggJsonDeserializationSchema( + PHYSICAL_DATA_TYPE, + Collections.emptyList(), + InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()), + true, // ignoreParseErrors + TimestampFormat.ISO_8601); + open(deserializationSchema); + + SimpleCollector collector = new SimpleCollector(); + deserializationSchema.deserialize( + corruptMessage.getBytes(StandardCharsets.UTF_8), collector); + + // Verify no records were collected + assertThat(collector.list).isEmpty(); + + // Verify the error was logged at DEBUG level + assertThat(loggerExtension.getMessages()) + .anyMatch( + msg -> + msg.contains("Unknown \"op_type\" value") + || msg.contains("Corrupt Ogg JSON")); + } + @Test void testTombstoneMessages() throws Exception { OggJsonDeserializationSchema deserializationSchema =