AsyncDataStream.java
@@ -20,8 +20,10 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.api.operators.async.AsyncBatchWaitOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.util.Preconditions;
@@ -319,4 +321,84 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWaitWithRetry(
OutputMode.ORDERED,
asyncRetryStrategy);
}

// ================================================================================
// Batch Async Operations
// ================================================================================

/**
* Adds an AsyncBatchWaitOperator to process elements in batches. The order of output stream
* records may be reordered (unordered mode).
*
* <p>This method is particularly useful for high-latency workloads, such as machine learning
* model inference, where batching can significantly improve throughput.
*
* <p>The operator buffers incoming elements and triggers the async batch function when the
* buffer reaches {@code maxBatchSize}. Remaining elements are flushed when the input ends.
*
* @param in Input {@link DataStream}
* @param func {@link AsyncBatchFunction} to process batches of elements
* @param maxBatchSize Maximum number of elements to batch before triggering async invocation
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitBatch(
DataStream<IN> in, AsyncBatchFunction<IN, OUT> func, int maxBatchSize) {
return unorderedWaitBatch(in, func, maxBatchSize, 0L);
}

/**
* Adds an AsyncBatchWaitOperator to process elements in batches with timeout support. The order
* of output stream records may be reordered (unordered mode).
*
* <p>This method is particularly useful for high-latency workloads, such as machine learning
* model inference, where batching can significantly improve throughput.
*
* <p>The operator buffers incoming elements and triggers the async batch function when either:
*
* <ul>
* <li>The buffer reaches {@code maxBatchSize}
* <li>The {@code batchTimeoutMs} has elapsed since the first buffered element (if timeout is
* enabled)
* </ul>
*
* <p>Remaining elements are flushed when the input ends.
*
* @param in Input {@link DataStream}
* @param func {@link AsyncBatchFunction} to process batches of elements
* @param maxBatchSize Maximum number of elements to batch before triggering async invocation
* @param batchTimeoutMs Batch timeout in milliseconds; a value {@code <= 0} disables the timeout
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitBatch(
DataStream<IN> in,
AsyncBatchFunction<IN, OUT> func,
int maxBatchSize,
long batchTimeoutMs) {
Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0");

TypeInformation<OUT> outTypeInfo =
TypeExtractor.getUnaryOperatorReturnType(
func,
AsyncBatchFunction.class,
0,
1,
new int[] {1, 0},
in.getType(),
Utils.getCallLocationName(),
true);

// create transform
AsyncBatchWaitOperatorFactory<IN, OUT> operatorFactory =
new AsyncBatchWaitOperatorFactory<>(
in.getExecutionEnvironment().clean(func), maxBatchSize, batchTimeoutMs);

return in.transform("async batch wait operator", outTypeInfo, operatorFactory);
}

// TODO: Add orderedWaitBatch in follow-up PR
// TODO: Add event-time based batching support in follow-up PR
}
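
As a usage sketch (assuming a hypothetical EmbeddingBatchFunction that implements AsyncBatchFunction<String, String>, sketched after the new interface file below), the new method can be applied to an existing stream like the other AsyncDataStream helpers:

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

public class AsyncBatchWaitExample {

    // Applies the hypothetical EmbeddingBatchFunction to an existing stream.
    public static DataStream<String> applyBatchInference(DataStream<String> input) {
        // Batch up to 32 elements per async call; flush a partial batch after 50 ms
        // if fewer elements have arrived.
        return AsyncDataStream.unorderedWaitBatch(input, new EmbeddingBatchFunction(), 32, 50L);
    }
}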
AsyncBatchFunction.java
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.async;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;

import java.io.Serializable;
import java.util.List;

/**
* A function to trigger Async I/O operations in batches.
*
* <p>For each batch of inputs, an async I/O operation can be triggered via {@link
* #asyncInvokeBatch}, and once it has been done, the results can be collected by calling {@link
* ResultFuture#complete}. This is particularly useful for high-latency inference workloads where
* batching can significantly improve throughput.
*
* <p>Unlike {@link AsyncFunction} which processes one element at a time, this interface allows
* processing multiple elements together, which is beneficial for scenarios like:
*
* <ul>
* <li>Machine learning model inference where batching improves GPU utilization
* <li>External service calls that support batch APIs
* <li>Database queries that can be batched for efficiency
* </ul>
*
* <p>Example usage:
*
* <pre>{@code
* public class BatchInferenceFunction implements AsyncBatchFunction<String, String> {
*
* public void asyncInvokeBatch(List<String> inputs, ResultFuture<String> resultFuture) {
* // Submit batch inference request
* CompletableFuture.supplyAsync(() -> {
* List<String> results = modelService.batchInference(inputs);
* return results;
* }).thenAccept(results -> resultFuture.complete(results));
* }
* }
* }</pre>
*
* @param <IN> The type of the input elements.
* @param <OUT> The type of the returned elements.
*/
@PublicEvolving
public interface AsyncBatchFunction<IN, OUT> extends Function, Serializable {

/**
* Trigger async operation for a batch of stream inputs.
*
* <p>The implementation should process all inputs in the batch and complete the result future
* with all corresponding outputs. The number of outputs does not need to match the number of
* inputs; it depends on the specific use case.
*
* @param inputs a batch of elements coming from upstream tasks
* @param resultFuture to be completed with the result data for the entire batch
* @throws Exception in case of a user code error. An exception will make the task fail and
* trigger fail-over process.
*/
void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT> resultFuture) throws Exception;

// TODO: Add timeout handling in follow-up PR
// TODO: Add open/close lifecycle methods in follow-up PR
}
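
A sketch of what an implementation could look like; the batch work here is a stand-in (upper-casing each input) where a real job would call a batch-capable model or service:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Hypothetical implementation used in the usage sketch above.
public class EmbeddingBatchFunction implements AsyncBatchFunction<String, String> {

    @Override
    public void asyncInvokeBatch(List<String> inputs, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(
                        () ->
                                inputs.stream()
                                        .map(String::toUpperCase) // placeholder for a real batch call
                                        .collect(Collectors.toList()))
                .whenComplete(
                        (results, error) -> {
                            if (error != null) {
                                // Fail the whole batch so the task fails over.
                                resultFuture.completeExceptionally(error);
                            } else {
                                // One output per input in this sketch; the contract
                                // does not require matching counts.
                                resultFuture.complete(results);
                            }
                        });
    }
}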