Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
Expand All @@ -53,6 +56,9 @@ public abstract class AbstractJsonDeserializationSchema implements Deserializati

private static final long serialVersionUID = 1L;

private static final Logger LOG =
LoggerFactory.getLogger(AbstractJsonDeserializationSchema.class);

/** Flag indicating whether to fail if a field is missing. */
protected final boolean failOnMissingField;

Expand Down Expand Up @@ -155,4 +161,16 @@ public boolean equals(Object o) {
public int hashCode() {
    // Fold the four configuration fields, in this order, into a single hash value.
    final int combinedHash =
            Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
    return combinedHash;
}

/**
 * Logs a parse failure at DEBUG level; does nothing when DEBUG logging is disabled.
 *
 * <p>The raw bytes are decoded explicitly as UTF-8 rather than with the JVM's default charset,
 * so the logged text does not depend on the platform the job happens to run on.
 *
 * @param message the original JSON message that failed to parse; a {@code null} array is
 *     rendered as {@code null} instead of raising a {@link NullPointerException}
 * @param t the throwable that was caught
 */
protected void logParseErrorIfDebugEnabled(byte[] message, Throwable t) {
    // Check the level first so the byte[] -> String conversion is skipped when DEBUG is off.
    if (!LOG.isDebugEnabled()) {
        return;
    }
    // Fully-qualified charset reference keeps this method independent of the import list.
    final String json =
            message == null
                    ? null
                    : new String(message, java.nio.charset.StandardCharsets.UTF_8);
    LOG.debug("Failed to deserialize JSON '{}'.", json, t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
throw new IOException(
format("Failed to deserialize JSON '%s'.", new String(message)), t);
}
logParseErrorIfDebugEnabled(message, t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
// will be caught by outer try-catch
throw t;
}
logParseErrorIfDebugEnabled(message, t);
}
}
} else {
Expand All @@ -97,6 +98,7 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
throw new IOException(
format("Failed to deserialize JSON '%s'.", new String(message)), t);
}
logParseErrorIfDebugEnabled(message, t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
Expand All @@ -62,6 +65,8 @@
public final class CanalJsonDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(CanalJsonDeserializationSchema.class);

private static final String FIELD_OLD = "old";
private static final String OP_INSERT = "INSERT";
private static final String OP_UPDATE = "UPDATE";
Expand Down Expand Up @@ -288,13 +293,22 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
"Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the IOException is the same as the debug content - can we use the same variable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit to the nit: It is not the same content. %s vs {}. Sharing this string would potentially make the whole code unnecessarily complex, and this part of the code is probably performance critical.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point - let's not do this nit.

type, new String(message)));
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Unknown \"type\" value '{}'. The Canal JSON message is '{}'.",
type,
new String(message));
}
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(
format("Corrupt Canal JSON message '%s'.", new String(message)), t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Corrupt Canal JSON message '{}'.", new String(message), t);
}
}
for (GenericRowData genericRowData : genericRowDataList) {
out.collect(genericRowData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
Expand All @@ -57,6 +60,9 @@
public final class DebeziumJsonDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;

private static final Logger LOG =
LoggerFactory.getLogger(DebeziumJsonDeserializationSchema.class);

private static final String OP_READ = "r"; // snapshot read
private static final String OP_CREATE = "c"; // insert
private static final String OP_UPDATE = "u"; // update
Expand Down Expand Up @@ -176,13 +182,22 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
"Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'",
op, new String(message)));
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Unknown \"op\" value '{}'. The Debezium JSON message is '{}'.",
op,
new String(message));
}
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(
format("Corrupt Debezium JSON message '%s'.", new String(message)), t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Corrupt Debezium JSON message '{}'.", new String(message), t);
}
}
for (GenericRowData genericRowData : genericRowDataList) {
out.collect(genericRowData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
Expand All @@ -57,6 +60,9 @@
public class MaxwellJsonDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 2L;

private static final Logger LOG =
LoggerFactory.getLogger(MaxwellJsonDeserializationSchema.class);

private static final String FIELD_OLD = "old";
private static final String OP_INSERT = "insert";
private static final String OP_UPDATE = "update";
Expand Down Expand Up @@ -171,13 +177,22 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
"Unknown \"type\" value \"%s\". The Maxwell JSON message is '%s'",
type, new String(message)));
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Unknown \"type\" value '{}'. The Maxwell JSON message is '{}'.",
type,
new String(message));
}
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(
format("Corrupt Maxwell JSON message '%s'.", new String(message)), t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Corrupt Maxwell JSON message '{}'.", new String(message), t);
}
}
for (GenericRowData genericRowData : genericRowDataList) {
out.collect(genericRowData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
Expand All @@ -57,6 +60,8 @@
public final class OggJsonDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(OggJsonDeserializationSchema.class);

private static final String OP_CREATE = "I"; // insert
private static final String OP_UPDATE = "U"; // update
private static final String OP_DELETE = "D"; // delete
Expand Down Expand Up @@ -198,13 +203,22 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
"Unknown \"op_type\" value \"%s\". The Ogg JSON message is '%s'",
op, new String(message)));
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Unknown \"op_type\" value '{}'. The Ogg JSON message is '{}'.",
op,
new String(message));
}
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(
format("Corrupt Ogg JSON message '%s'.", new String(message)), t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Corrupt Ogg JSON message '{}'.", new String(message), t);
}
}
for (GenericRowData genericRowData : genericRowDataList) {
out.collect(genericRowData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.jackson.JacksonMapperFactory;
Expand All @@ -45,6 +46,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.math.BigDecimal;
import java.sql.Timestamp;
Expand Down Expand Up @@ -87,6 +89,7 @@
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.slf4j.event.Level.DEBUG;

/**
* Tests for {@link JsonRowDataDeserializationSchema}, {@link
Expand All @@ -97,6 +100,10 @@ public class JsonRowDataSerDeSchemaTest {

private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();

// Registers a logger-auditing extension for AbstractJsonDeserializationSchema at DEBUG level;
// tests in this class inspect its output through loggerExtension.getMessages().
@RegisterExtension
public final LoggerAuditingExtension loggerExtension =
new LoggerAuditingExtension(AbstractJsonDeserializationSchema.class, DEBUG);

@Parameter public boolean isJsonParser;

@Parameters(name = "isJsonParser={0}")
Expand Down Expand Up @@ -839,6 +846,30 @@ private void testParseErrors(TestSpec spec) {
.hasMessageContaining(spec.errorMessage);
}

/**
 * Verifies that a message which fails to parse yields a {@code null} row and a debug log entry
 * containing both the failure text and the offending JSON.
 */
@TestTemplate
void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
    final String invalidJson = "{\"id\":\"not_a_number\"}";
    final RowType rowType = (RowType) ROW(FIELD("id", INT())).getLogicalType();

    // NOTE(review): the two boolean arguments are presumably failOnMissingField=false and
    // ignoreParseErrors=true - confirm against createDeserializationSchema's signature.
    DeserializationSchema<RowData> schema =
            createDeserializationSchema(
                    isJsonParser, rowType, false, true, TimestampFormat.SQL);
    schema.open(new DummyInitializationContext());

    // Encode with an explicit charset so the bytes do not depend on the platform default.
    final byte[] payload = invalidJson.getBytes(java.nio.charset.StandardCharsets.UTF_8);

    // Deserializing the invalid JSON must not throw; the failure is only logged.
    RowData result = schema.deserialize(payload);

    // Result should be null since parsing failed
    assertThat(result).isNull();

    // Verify the error was logged at DEBUG level
    assertThat(loggerExtension.getMessages())
            .anyMatch(
                    msg ->
                            msg.contains("Failed to deserialize JSON")
                                    && msg.contains(invalidJson));
}

private static List<TestSpec> testData =
Arrays.asList(
TestSpec.json("{\"id\": \"trueA\"}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.util.Collector;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.File;
import java.io.IOException;
Expand All @@ -54,10 +56,15 @@
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.slf4j.event.Level.DEBUG;

/** Tests for {@link CanalJsonSerializationSchema} and {@link CanalJsonDeserializationSchema}. */
class CanalJsonSerDeSchemaTest {

// Registers a logger-auditing extension for CanalJsonDeserializationSchema at DEBUG level;
// tests in this class inspect its output through loggerExtension.getMessages().
@RegisterExtension
public final LoggerAuditingExtension loggerExtension =
new LoggerAuditingExtension(CanalJsonDeserializationSchema.class, DEBUG);

private static final DataType PHYSICAL_DATA_TYPE =
ROW(
FIELD("id", INT().notNull()),
Expand Down Expand Up @@ -105,6 +112,57 @@ void testIgnoreParseErrors() throws Exception {
"An error occurred while collecting data."));
}

@Test
void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
Copy link
Contributor

@davidradl davidradl Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add a test case for fail on missing as well please. This also comes out of JsonParserToRowDataConverters as a JsonParseException.

// Canal-style message whose "type" is "UNKNOWN_TYPE" and whose "data" and "old" are null.
String corruptMessage =
"{\"data\":null,\"old\":null,\"type\":\"UNKNOWN_TYPE\",\"database\":\"test\",\"table\":\"test\"}";
// Build the schema with ignoreParseErrors enabled; the test expects deserialize() not to throw.
CanalJsonDeserializationSchema deserializationSchema =
CanalJsonDeserializationSchema.builder(
PHYSICAL_DATA_TYPE,
Collections.emptyList(),
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()))
.setIgnoreParseErrors(true)
.build();
open(deserializationSchema);

SimpleCollector collector = new SimpleCollector();
deserializationSchema.deserialize(
corruptMessage.getBytes(StandardCharsets.UTF_8), collector);

// Verify no records were collected
assertThat(collector.list).isEmpty();

// Verify the error was logged at DEBUG level
// Either debug message is accepted: the unknown-"type" one or the generic corrupt-message one.
assertThat(loggerExtension.getMessages())
.anyMatch(
msg ->
msg.contains("Unknown \"type\" value")
|| msg.contains("Corrupt Canal JSON"));
}

/**
 * Verifies that an unknown {@code "type"} value makes deserialization fail with an {@link
 * IOException} when parse errors are not ignored.
 */
@Test
void testDeserializationThrowsExceptionWhenIgnoreParseErrorsIsFalse() {
    String corruptMessage = "{\"type\":\"INVALID\"}";

    CanalJsonDeserializationSchema deserializationSchema =
            CanalJsonDeserializationSchema.builder(
                            PHYSICAL_DATA_TYPE,
                            Collections.emptyList(),
                            InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()))
                    .setIgnoreParseErrors(false)
                    .build();
    open(deserializationSchema);

    SimpleCollector collector = new SimpleCollector();

    // The unknown-"type" IOException is raised inside the schema's catch-all try block, which
    // rethrows it wrapped in a "Corrupt Canal JSON message" IOException. The unknown-"type"
    // text therefore sits on the cause, so search the whole stack trace rather than only the
    // top-level message.
    assertThatThrownBy(
                    () ->
                            deserializationSchema.deserialize(
                                    corruptMessage.getBytes(StandardCharsets.UTF_8), collector))
            .isInstanceOf(IOException.class)
            .hasStackTraceContaining("Unknown \"type\" value");
}

@Test
void testDeserializeNullRow() throws Exception {
final List<ReadableMetadata> requestedMetadata = Arrays.asList(ReadableMetadata.values());
Expand Down
Loading