Commit 6ffb7ed

[FLINK-38189][core][python] Introduce RowFieldExtractorSchema for Row field serialization (#27353)
This commit introduces RowFieldExtractorSchema, a new SerializationSchema that extracts and serializes a specific field from a Row object. This is particularly useful for Kafka scenarios where keys and values need separate serialization.

Co-authored-by: Noufal Rijal <Nflrijal>
1 parent 0ffc910 commit 6ffb7ed
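
For context, a minimal end-to-end sketch of the intended usage (the bootstrap servers, topic name, and the upstream DataStream<Tuple2<String, String>> named pairs are illustrative assumptions, not part of this commit): the caller converts each record to a Row of byte[] fields up front, then points the key and value serialization schemas at different field indexes.

import org.apache.flink.api.common.serialization.RowFieldExtractorSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;

// Convert each (key, value) pair into a Row of two byte[] fields; the
// conversion to bytes happens explicitly in user code, as the schema requires.
DataStream<Row> rows =
        pairs.map(
                        (Tuple2<String, String> pair) ->
                                Row.of(
                                        pair.f0.getBytes(StandardCharsets.UTF_8),
                                        pair.f1.getBytes(StandardCharsets.UTF_8)))
                .returns(
                        Types.ROW(
                                Types.PRIMITIVE_ARRAY(Types.BYTE),
                                Types.PRIMITIVE_ARRAY(Types.BYTE)));

// Field 0 becomes the Kafka message key, field 1 the message value.
KafkaSink<Row> sink =
        KafkaSink.<Row>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("my-topic")
                                .setKeySerializationSchema(new RowFieldExtractorSchema(0))
                                .setValueSerializationSchema(new RowFieldExtractorSchema(1))
                                .build())
                .build();

rows.sinkTo(sink);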

File tree

5 files changed: +584 −2 lines changed
Lines changed: 122 additions & 0 deletions
@@ -0,0 +1,122 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.common.serialization;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.types.Row;

import javax.annotation.Nullable;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
 * Serialization schema that extracts a specific field from a {@link Row} and returns it as a byte
 * array.
 *
 * <p>The field is required to be of type {@code byte[]}. This schema is particularly useful when
 * using Flink with Kafka, where you may want to use one Row field as the message key and another as
 * the value and perform the conversion to bytes explicitly in user code.
 *
 * <p>Example usage with Kafka:
 *
 * <pre>{@code
 * KafkaSink<Row> sink = KafkaSink.<Row>builder()
 *     .setBootstrapServers(bootstrapServers)
 *     .setRecordSerializer(
 *         KafkaRecordSerializationSchema.builder()
 *             .setTopic("my-topic")
 *             .setKeySerializationSchema(new RowFieldExtractorSchema(0)) // field 0 as key
 *             .setValueSerializationSchema(new RowFieldExtractorSchema(1)) // field 1 as value
 *             .build())
 *     .build();
 * }</pre>
 */
@PublicEvolving
public class RowFieldExtractorSchema implements SerializationSchema<Row> {

    private static final long serialVersionUID = 1L;

    /** The index of the field to extract from the Row. */
    private final int fieldIndex;

    /**
     * Creates a new RowFieldExtractorSchema that extracts the field at the specified index.
     *
     * @param fieldIndex the zero-based index of the field to extract
     * @throws IllegalArgumentException if fieldIndex is negative
     */
    public RowFieldExtractorSchema(int fieldIndex) {
        checkArgument(fieldIndex >= 0, "Field index must be non-negative, got: %s", fieldIndex);
        this.fieldIndex = fieldIndex;
    }

    /**
     * Gets the field index being extracted.
     *
     * @return the field index
     */
    @VisibleForTesting
    public int getFieldIndex() {
        return fieldIndex;
    }

    @Override
    public byte[] serialize(@Nullable Row element) {
        if (element == null) {
            return new byte[0];
        }

        checkArgument(
                fieldIndex < element.getArity(),
                "Cannot access field %s in Row with arity %s",
                fieldIndex,
                element.getArity());

        Object field = element.getField(fieldIndex);
        if (field == null) {
            return new byte[0];
        }

        if (!(field instanceof byte[])) {
            throw new IllegalArgumentException(
                    String.format(
                            "Field at index %s must be of type byte[], but was %s",
                            fieldIndex, field.getClass().getName()));
        }

        return (byte[]) field;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        RowFieldExtractorSchema that = (RowFieldExtractorSchema) o;
        return fieldIndex == that.fieldIndex;
    }

    @Override
    public int hashCode() {
        return fieldIndex;
    }
}
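
Taken together, the serialize contract is: a null Row or a null field yields an empty byte array, an out-of-range index or a non-byte[] field raises IllegalArgumentException, and a byte[] field is returned as-is. A quick sketch with made-up values (not from the commit):

RowFieldExtractorSchema keySchema = new RowFieldExtractorSchema(0);

// Field 0 holds byte[], so its bytes are returned unchanged.
byte[] key = keySchema.serialize(Row.of("user-42".getBytes(StandardCharsets.UTF_8), 7));

// A null Row (or a null field value) maps to an empty byte array rather than null.
byte[] empty = keySchema.serialize(null);

The test class below exercises each of these branches.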
Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.common.serialization;

import org.apache.flink.types.Row;
import org.apache.flink.util.InstantiationUtil;

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link RowFieldExtractorSchema}. */
class RowFieldExtractorSchemaTest {

    @Test
    void testSerializeByteArrayField() {
        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(0);
        byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
        Row row = Row.of(value, 123);

        byte[] result = schema.serialize(row);

        assertThat(result).isEqualTo(value);
    }

    @Test
    void testSerializeNonByteArrayFieldThrowsException() {
        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(1);
        Row row = Row.of("key", 42); // field 1 is Integer, not byte[]

        assertThatThrownBy(() -> schema.serialize(row))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("must be of type byte[]");
    }

    @Test
    void testSerializeNullRow() {
        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(0);

        byte[] result = schema.serialize(null);

        assertThat(result).isEmpty();
    }

    @Test
    void testSerializeNullField() {
        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(0);
        Row row = Row.of(null, "value");

        byte[] result = schema.serialize(row);

        assertThat(result).isEmpty();
    }

    @Test
    void testSerializeOutOfBoundsIndex() {
        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(5);
        Row row = Row.of("field0", "field1");

        assertThatThrownBy(() -> schema.serialize(row))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Cannot access field 5 in Row with arity 2");
    }

    @Test
    void testNegativeFieldIndexThrowsException() {
        assertThatThrownBy(() -> new RowFieldExtractorSchema(-1))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Field index must be non-negative");
    }

    @Test
    void testSerializability() throws IOException, ClassNotFoundException {
        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(3);

        RowFieldExtractorSchema deserialized =
                InstantiationUtil.deserializeObject(
                        InstantiationUtil.serializeObject(schema), getClass().getClassLoader());

        assertThat(deserialized.getFieldIndex()).isEqualTo(3);
    }

    @Test
    void testEquals() {
        RowFieldExtractorSchema schema1 = new RowFieldExtractorSchema(1);
        RowFieldExtractorSchema schema2 = new RowFieldExtractorSchema(1);
        RowFieldExtractorSchema schema3 = new RowFieldExtractorSchema(2);

        assertThat(schema1).isEqualTo(schema2);
        assertThat(schema1).isNotEqualTo(schema3);
    }

    @Test
    void testHashCode() {
        RowFieldExtractorSchema schema1 = new RowFieldExtractorSchema(1);
        RowFieldExtractorSchema schema2 = new RowFieldExtractorSchema(1);

        assertThat(schema1.hashCode()).isEqualTo(schema2.hashCode());
    }
}

flink-python/pyflink/common/serialization.py

Lines changed: 54 additions & 1 deletion
@@ -25,7 +25,8 @@
     'SimpleStringSchema',
     'ByteArraySchema',
     'Encoder',
-    'BulkWriterFactory'
+    'BulkWriterFactory',
+    'RowFieldExtractorSchema',
 ]
 
 
@@ -35,6 +36,7 @@ class SerializationSchema(object):
     into a different serialized representation. Most data sinks (for example Apache Kafka) require
     the data to be handed to them in a specific format (for example as byte strings).
     """
+
     def __init__(self, j_serialization_schema=None):
         self._j_serialization_schema = j_serialization_schema
 
@@ -48,6 +50,7 @@ class DeserializationSchema(object):
     In addition, the DeserializationSchema describes the produced type which lets Flink create
     internal serializers and structures to handle the type.
     """
+
     def __init__(self, j_deserialization_schema=None):
         self._j_deserialization_schema = j_deserialization_schema
 
@@ -126,3 +129,53 @@ def __init__(self, j_bulk_writer_factory, row_type):
 
     def get_row_type(self):
         return self._row_type
+
+
+class RowFieldExtractorSchema(SerializationSchema):
+    """
+    Serialization schema that extracts a specific field from a Row and returns it as a
+    byte array. The field at the specified index MUST be of type bytes (byte array).
+    This schema is particularly useful when using Flink with Kafka, where you may want to use a
+    specific field as the message key for partition routing.
+    The field being extracted must already be a byte array. Users are responsible for
+    converting their data to bytes before passing it to this schema.
+
+    Example usage with Kafka:
+    >>> from pyflink.common.serialization import RowFieldExtractorSchema
+    >>> from pyflink.datastream.connectors.kafka import KafkaSink, \
+            KafkaRecordSerializationSchema
+    >>>
+    >>> # User must convert data to bytes beforehand
+    >>> # For example: Row.of(b"key-bytes", b"value-bytes")
+    >>>
+    >>> sink = KafkaSink.builder() \\
+    ...     .set_bootstrap_servers("localhost:9092") \\
+    ...     .set_record_serializer(
+    ...         KafkaRecordSerializationSchema.builder()
+    ...             .set_topic("my-topic")
+    ...             .set_key_serialization_schema(RowFieldExtractorSchema(0))
+    ...             # Field 0 (must be bytes) as key
+    ...             .set_value_serialization_schema(RowFieldExtractorSchema(1))
+    ...             # Field 1 (must be bytes) as value
+    ...             .build()
+    ...     ) \\
+    ...     .build()
+
+    :param field_index: The zero-based index of the field to extract from the Row.
+                        The field at this index must be of type bytes.
+    """
+
+    def __init__(self, field_index: int):
+        """
+        Creates a new RowFieldExtractorSchema that extracts the field at the specified index.
+
+        :param field_index: The zero-based index of the field to extract (must be non-negative).
+        :raises ValueError: If field_index is negative.
+        """
+        if field_index < 0:
+            raise ValueError(f"Field index must be non-negative, got: {field_index}")
+        gateway = get_gateway()
+        j_row_field_extractor_schema = gateway.jvm.org.apache.flink.api.common.serialization \
+            .RowFieldExtractorSchema(field_index)
+        super(RowFieldExtractorSchema, self).__init__(
+            j_serialization_schema=j_row_field_extractor_schema)
