From 4084ebc35ed0b98baabdb42c393680c7c4983ea3 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Thu, 11 Dec 2025 17:15:53 +0000 Subject: [PATCH 01/16] add http:// to the uri --- .../controller/api/resources/PinotTableRestletResource.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 9d934c4d1841..ee74a723a0eb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -305,6 +305,9 @@ public CopyTableResponse copyTable( CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class); String sourceControllerUri = copyTablePayload.getSourceClusterUri(); + if (!sourceControllerUri.startsWith("http://") && !sourceControllerUri.startsWith("https://")) { + sourceControllerUri = "http://" + sourceControllerUri; + } Map requestHeaders = copyTablePayload.getHeaders(); LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri); From 8bbe56f06f5c15d5f79bfff9d0e815c931bd6a87 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Sat, 13 Dec 2025 00:03:51 +0000 Subject: [PATCH 02/16] enforce user input http:// or https:// --- .../controller/api/resources/PinotTableRestletResource.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index ee74a723a0eb..9d934c4d1841 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -305,9 +305,6 @@ public CopyTableResponse copyTable( CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class); String sourceControllerUri = copyTablePayload.getSourceClusterUri(); - if (!sourceControllerUri.startsWith("http://") && !sourceControllerUri.startsWith("https://")) { - sourceControllerUri = "http://" + sourceControllerUri; - } Map requestHeaders = copyTablePayload.getHeaders(); LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri); From b536ae640165f287d8b02d71b36c8359dfbf40fd Mon Sep 17 00:00:00 2001 From: xuanyili Date: Thu, 18 Dec 2025 22:45:03 +0000 Subject: [PATCH 03/16] [Pinot Copy Table] Create a Controller Job for historical segments backfilling --- .../controller/BaseControllerStarter.java | 5 + .../pinot/controller/ControllerConf.java | 5 + .../api/resources/CopyTablePayload.java | 20 +++ .../helix/core/PinotHelixResourceManager.java | 31 ++++- .../helix/core/WatermarkInductionResult.java | 11 +- .../controllerjob/ControllerJobTypes.java | 4 +- .../core/replication/NoOpSegmentCopier.java | 34 +++++ .../replication/RealtimeSegmentCopier.java | 126 ++++++++++++++++++ .../helix/core/replication/SegmentCopier.java | 39 ++++++ .../replication/TableReplicationObserver.java | 30 +++++ .../TableReplicationProgressStats.java | 69 ++++++++++ .../core/replication/TableReplicator.java | 124 +++++++++++++++++ .../ZkBasedTableReplicationObserver.java | 94 +++++++++++++ ...inotHelixResourceManagerStatelessTest.java | 20 +++ .../RealtimeSegmentCopierTest.java | 115 ++++++++++++++++ .../TableReplicationProgressStatsTest.java | 43 ++++++ .../core/replication/TableReplicatorTest.java | 121 +++++++++++++++++ .../ZkBasedTableReplicationObserverTest.java | 105 +++++++++++++++ .../pinot/spi/utils/CommonConstants.java | 2 + 19 files changed, 995 insertions(+), 3 deletions(-) create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/NoOpSegmentCopier.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/SegmentCopier.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationObserver.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStats.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java create mode 100644 pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java create mode 100644 pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStatsTest.java create mode 100644 pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java create mode 100644 pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 439d6d15ea17..d5aeadfc90c7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -117,6 +117,8 @@ import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceChecker; import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer; import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator; +import org.apache.pinot.controller.helix.core.replication.RealtimeSegmentCopier; +import org.apache.pinot.controller.helix.core.replication.TableReplicator; import org.apache.pinot.controller.helix.core.retention.RetentionManager; import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory; import org.apache.pinot.controller.helix.core.util.HelixSetupUtils; @@ -225,6 +227,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { protected TaskMetricsEmitter _taskMetricsEmitter; protected PoolingHttpClientConnectionManager _connectionManager; protected TenantRebalancer _tenantRebalancer; + protected TableReplicator _tableReplicator; // This executor should be used by all code paths for user initiated rebalances, so that the controller config // CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS is honored. protected ExecutorService _rebalancerExecutorService; @@ -647,6 +650,7 @@ private void setUpPinotController() { _rebalancerExecutorService); _tenantRebalancer = new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, _rebalancerExecutorService); + _tableReplicator = new TableReplicator(_helixResourceManager, _executorService, new RealtimeSegmentCopier(_config)); // Setting up periodic tasks List controllerPeriodicTasks = setupControllerPeriodicTasks(); @@ -708,6 +712,7 @@ protected void configure() { bind(_sqlQueryExecutor).to(SqlQueryExecutor.class); bind(_pinotLLCRealtimeSegmentManager).to(PinotLLCRealtimeSegmentManager.class); bind(_tenantRebalancer).to(TenantRebalancer.class); + bind(_tableReplicator).to(TableReplicator.class); bind(_tableSizeReader).to(TableSizeReader.class); bind(_storageQuotaChecker).to(StorageQuotaChecker.class); bind(_diskUtilizationChecker).to(DiskUtilizationChecker.class); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index a0decdabdded..f3603da96a59 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -405,6 +405,7 @@ public static long getRandomInitialDelayInSeconds() { public static final String CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK = "controller.tenant.rebalance.maxJobsInZK"; public static final String CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK = "controller.reload.segment.maxJobsInZK"; public static final String CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK = "controller.force.commit.maxJobsInZK"; + public static final String CONFIG_OF_MAX_TABLE_REPLICATION_JOBS_IN_ZK = "controller.table.replication.maxJobsInZK"; private final Map _invalidConfigs = new ConcurrentHashMap<>(); @@ -1401,4 +1402,8 @@ public List getTimeseriesLanguages() { public boolean getSegmentCompletionGroupCommitEnabled() { return getProperty(CONTROLLER_SEGMENT_COMPLETION_GROUP_COMMIT_ENABLED, true); } + + public int getMaxTableReplicationZkJobs() { + return getProperty(CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK, ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java index 8583ff36a3cb..6cadb3a0587d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java @@ -25,11 +25,17 @@ import java.util.Map; import javax.annotation.Nullable; +/** + * Payload for the copy table request. + */ @JsonIgnoreProperties(ignoreUnknown = true) public class CopyTablePayload { private String _sourceClusterUri; private Map _headers; + + private String _destinationClusterUri; + private Map _destinationClusterHeaders; /** * Broker tenant for the new table. * MUST NOT contain the tenant type suffix, i.e. _BROKER. @@ -51,11 +57,15 @@ public class CopyTablePayload { public CopyTablePayload( @JsonProperty(value = "sourceClusterUri", required = true) String sourceClusterUri, @JsonProperty("sourceClusterHeaders") Map headers, + @JsonProperty(value = "destinationClusterUri", required = true) String destinationClusterUri, + @JsonProperty(value = "destinationClusterHeaders") Map destinationClusterHeaders, @JsonProperty(value = "brokerTenant", required = true) String brokerTenant, @JsonProperty(value = "serverTenant", required = true) String serverTenant, @JsonProperty("tagPoolReplacementMap") @Nullable Map tagPoolReplacementMap) { _sourceClusterUri = sourceClusterUri; _headers = headers; + _destinationClusterUri = destinationClusterUri; + _destinationClusterHeaders = destinationClusterHeaders; _brokerTenant = brokerTenant; _serverTenant = serverTenant; _tagPoolReplacementMap = tagPoolReplacementMap; @@ -71,6 +81,16 @@ public Map getHeaders() { return _headers; } + @JsonGetter("destinationClusterUri") + public String getDestinationClusterUri() { + return _sourceClusterUri; + } + + @JsonGetter("destinationClusterHeaders") + public Map getDestinationClusterHeaders() { + return _destinationClusterHeaders; + } + @JsonGetter("brokerTenant") public String getBrokerTenant() { return _brokerTenant; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 7caa81d9d790..ce61ad4d4fbb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2519,6 +2519,19 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.FORCE_COMMIT); } + public boolean addNewSegmentCopyXClusterJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, + List consumingSegmentsCommitted) + throws JsonProcessingException { + Map jobMetadata = new HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REPLICATION.name()); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); + jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); + jobMetadata.put(CommonConstants.ControllerJob.SEGMENTS_TO_BE_COPIED, + JsonUtils.objectToString(consumingSegmentsCommitted)); + return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.TABLE_REPLICATION); + } + /** * Adds a new job metadata for controller job like table rebalance or reload into ZK * @param jobId job's UUID @@ -4931,7 +4944,23 @@ public WatermarkInductionResult getConsumerWatermarks(String tableName) throws T } return new WatermarkInductionResult.Watermark(status.getPartitionGroupId(), seq, startOffset); }).collect(Collectors.toList()); - return new WatermarkInductionResult(watermarks); + + Map partGroupToLatestSeq = watermarks.stream().collect( + Collectors.toMap(WatermarkInductionResult.Watermark::getPartitionGroupId, + WatermarkInductionResult.Watermark::getSequenceNumber)); + List historicalSegments = new ArrayList<>(); + for (String segment : idealState.getRecord().getMapFields().keySet()) { + LLCSegmentName llcSegmentName = LLCSegmentName.of(segment); + if (llcSegmentName != null) { + long partitionGroupId = llcSegmentName.getPartitionGroupId(); + int seq = llcSegmentName.getSequenceNumber(); + if (partGroupToLatestSeq.containsKey(partitionGroupId) && partGroupToLatestSeq.get(partitionGroupId) == seq) { + continue; + } + } + historicalSegments.add(segment); + } + return new WatermarkInductionResult(watermarks, historicalSegments); } /* diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java index 4d6a4aa8da0c..c1cd9776ed1c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java @@ -31,6 +31,8 @@ public class WatermarkInductionResult { private List _watermarks; + public List _historicalSegments; + /** * The @JsonCreator annotation marks this constructor to be used for deserializing * a JSON array back into a WaterMarks object. @@ -38,8 +40,10 @@ public class WatermarkInductionResult { * @param watermarks The list of watermarks. */ @JsonCreator - public WatermarkInductionResult(@JsonProperty("watermarks") List watermarks) { + public WatermarkInductionResult(@JsonProperty("watermarks") List watermarks, + @JsonProperty("historicalSegments") List historicalSegments) { _watermarks = watermarks; + _historicalSegments = historicalSegments; } /** @@ -52,6 +56,11 @@ public List getWatermarks() { return _watermarks; } + @JsonGetter("historicalSegments") + public List getHistoricalSegments() { + return _historicalSegments; + } + /** * Represents a single watermark with its partitionGroupId, sequence, and offset. */ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java index 18e67c69a10a..5eedf78079db 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java @@ -40,7 +40,8 @@ public enum ControllerJobTypes implements ControllerJobType { RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, - TENANT_REBALANCE; + TENANT_REBALANCE, + TABLE_REPLICATION; private static final Logger LOGGER = LoggerFactory.getLogger(ControllerJobTypes.class); private static final EnumMap ZK_NUM_JOBS_LIMIT = new EnumMap<>(ControllerJobTypes.class); @@ -55,6 +56,7 @@ public static void init(ControllerConf controllerConf) { ZK_NUM_JOBS_LIMIT.put(FORCE_COMMIT, controllerConf.getMaxForceCommitZkJobs()); ZK_NUM_JOBS_LIMIT.put(TABLE_REBALANCE, controllerConf.getMaxTableRebalanceZkJobs()); ZK_NUM_JOBS_LIMIT.put(TENANT_REBALANCE, controllerConf.getMaxTenantRebalanceZkJobs()); + ZK_NUM_JOBS_LIMIT.put(TABLE_REPLICATION, controllerConf.getMaxTableReplicationZkJobs()); } @Override diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/NoOpSegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/NoOpSegmentCopier.java new file mode 100644 index 000000000000..d82f156cae97 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/NoOpSegmentCopier.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import java.util.Map; +import org.apache.pinot.controller.api.resources.CopyTablePayload; + +/** + * A no-op segment copier for testing purposes. + */ +public class NoOpSegmentCopier implements SegmentCopier { + + @Override + public void copy(String tableNameWithType, String segmentName, CopyTablePayload copyTablePayload, + Map segmentZKMetadata) { + // No-op + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java new file mode 100644 index 000000000000..71627fdaab47 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import java.net.URI; +import java.util.Map; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.resources.CopyTablePayload; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Copies a realtime segment from source to destination. + */ +public class RealtimeSegmentCopier implements SegmentCopier { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentCopier.class); + private static final String SEGMENT_UPLOAD_ENDPOINT_TEMPLATE = "/segments?tableName=%s"; + + private final String _destinationDeepStoreUri; + private final HttpClient _httpClient; + + public RealtimeSegmentCopier(ControllerConf controllerConf) { + this(controllerConf, HttpClient.getInstance()); + } + + public RealtimeSegmentCopier(ControllerConf controllerConf, HttpClient httpClient) { + _destinationDeepStoreUri = controllerConf.getDataDir(); + _httpClient = httpClient; + } + + + /** + * Copies a segment to the destination cluster. + * + * This method performs the following steps: + * 1. Get the source segment URI from ZK metadata. + * 2. Copy the segment from the source deep store to the destination deep store. + * 3. Upload the segment to the destination controller. + * + * @param tableNameWithType Table name with type suffix + * @param segmentName Segment name + * @param copyTablePayload Payload for copying a table + * @param segmentZKMetadata ZK metadata for the segment + */ + @Override + public void copy(String tableNameWithType, String segmentName, CopyTablePayload copyTablePayload, + Map segmentZKMetadata) { + if (!tableNameWithType.endsWith("_REALTIME")) { + throw new IllegalArgumentException("Table name must end with _REALTIME"); + } + String tableName = tableNameWithType.substring(0, tableNameWithType.lastIndexOf("_REALTIME")); + try { + // 1. Get the the source segment uri + String downloadUrl = segmentZKMetadata.get("segment.download.url"); + if (downloadUrl == null) { + throw new RuntimeException("Download URL not found in segment ZK metadata for segment: " + segmentName); + } + + // 2. Copy the segment from the source deep store to the destination deep store + URI sourceSegmentUri = new URI(downloadUrl); + PinotFS sourcePinotFS = getPinotFS(sourceSegmentUri); + String destSegmentUriStr = _destinationDeepStoreUri + "/" + tableName + "/" + segmentName; + URI destSegmentUri = new URI(destSegmentUriStr); + + PinotFS destPinotFS = getPinotFS(destSegmentUri); + + // TODO: use local file system as an intermediate store to support different file system + if (sourcePinotFS != destPinotFS) { + throw new IllegalArgumentException("Copy files across different file system is not supported"); + } + + if (!destPinotFS.exists(destSegmentUri)) { + sourcePinotFS.copy(sourceSegmentUri, destSegmentUri); + LOGGER.info("Copied segment {} from {} to {}", segmentName, sourceSegmentUri, destSegmentUri); + } else { + LOGGER.info("Segment {} already exists at destination {}", segmentName, destSegmentUri); + } + + // 3. Upload the segment to the destination controller + String payload = "{\"segmentUri\":\"" + destSegmentUriStr + "\"}"; + URI uri = new URI(copyTablePayload.getDestinationClusterUri() + String.format(SEGMENT_UPLOAD_ENDPOINT_TEMPLATE, + tableName)); + SimpleHttpResponse response = + _httpClient.sendJsonPostRequest(uri, payload, copyTablePayload.getDestinationClusterHeaders()); + LOGGER.info("Uploaded segment {} to destination controller, status: {}", segmentName, response.getStatusCode()); + } catch (Exception e) { + LOGGER.error("Caught exception while copying segment {}", segmentName, e); + throw new RuntimeException(e); + } + } + + static String getScheme(URI uri) { + if (uri.getScheme() != null) { + return uri.getScheme(); + } + return PinotFSFactory.LOCAL_PINOT_FS_SCHEME; + } + + PinotFS getPinotFS(URI uri) { + String scheme = getScheme(uri); + if (!PinotFSFactory.isSchemeSupported(scheme)) { + throw new IllegalArgumentException("File scheme " + scheme + " is not supported."); + } + return PinotFSFactory.create(scheme); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/SegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/SegmentCopier.java new file mode 100644 index 000000000000..a656be2524e7 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/SegmentCopier.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import java.util.Map; +import org.apache.pinot.controller.api.resources.CopyTablePayload; + + +/** + * An interface to copy a segment to the destination cluster. + */ +public interface SegmentCopier { + + /** + * Copies a segment to the destination cluster. + * @param tableNameWithType Table name with type suffix + * @param segmentName Segment name + * @param copyTablePayload Payload for copying a table + * @param segmentZKMetadata ZK metadata for the segment + */ + void copy(String tableNameWithType, String segmentName, CopyTablePayload copyTablePayload, + Map segmentZKMetadata); +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationObserver.java new file mode 100644 index 000000000000..88b22e250e51 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationObserver.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +public interface TableReplicationObserver { + + enum Trigger { + START_TRIGGER, + SEGMENT_REPLICATE_COMPLETED_TRIGGER, + SEGMENT_REPLICATE_ERRORED_TRIGGER, + } + + void onTrigger(Trigger trigger, String segmentName); +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStats.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStats.java new file mode 100644 index 000000000000..c2a223df3625 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStats.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.controller.helix.core.replication; + +import com.fasterxml.jackson.annotation.JsonGetter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * Tracks the progress of table replication. + */ +public class TableReplicationProgressStats { + + public enum SegmentStatus { + COMPLETED, + ERROR, + } + + private final AtomicInteger _remainingSegments; + private final BlockingQueue _segmentsFailToCopy = new LinkedBlockingQueue<>(); + + public TableReplicationProgressStats(int segmentSize) { + _remainingSegments = new AtomicInteger(segmentSize); + } + + /** + * Updates the status of a segment and returns the number of remaining segments. + * @param segment The segment name. + * @param status The status of the segment replication. + * @return The number of remaining segments to be replicated. + */ + public int updateSegmentStatus(String segment, SegmentStatus status) { + if (status == SegmentStatus.ERROR) { + _segmentsFailToCopy.add(segment); + } + return _remainingSegments.addAndGet(-1); + } + + @JsonGetter("remainingSegments") + public int getRemainingSegments() { + return _remainingSegments.get(); + } + + @JsonGetter("segmentsFailToCopy") + public List getSegmentsFailToCopy() { + return new ArrayList<>(_segmentsFailToCopy); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java new file mode 100644 index 000000000000..5302f0a73f54 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.api.resources.CopyTablePayload; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.WatermarkInductionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Replicates a table from a source cluster to a destination cluster. + */ +public class TableReplicator { + private static final Logger LOGGER = LoggerFactory.getLogger(TableReplicator.class); + private static final String SEGMENT_ZK_METADATA_ENDPOINT_TEMPLATE = "/segments/%s/zkmetadata"; + + private final PinotHelixResourceManager _pinotHelixResourceManager; + private final ExecutorService _executorService; + private final SegmentCopier _segmentCopier; + private final HttpClient _httpClient; + + public TableReplicator(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService, + SegmentCopier segmentCopier) { + this(pinotHelixResourceManager, executorService, segmentCopier, HttpClient.getInstance()); + } + + public TableReplicator(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService, + SegmentCopier segmentCopier, HttpClient httpClient) { + _pinotHelixResourceManager = pinotHelixResourceManager; + _executorService = executorService; + _segmentCopier = segmentCopier; + _httpClient = httpClient; + } + + /** + * Replicates the table by copying segments from source to destination. + * + * This method performs the following steps: + * 1. Fetch ZK metadata for all segments of the table from the source cluster. + * 2. Register a new controller job in Zookeeper to track the replication progress. + * 3. Initialize a {@link ZkBasedTableReplicationObserver} to update the job status in Zookeeper. + * 4. Submit tasks to the executor service to copy segments in parallel. + * 5. Each task copies a segment and triggers the observer to update the progress. + * + * @param jobId The job ID. + * @param tableNameWithType The table name with type. + * @param copyTablePayload The payload containing the source and destination cluster information. + * @param res The watermark induction result. + * @throws Exception If an error occurs during replication. + */ + public void replicateTable(String jobId, String tableNameWithType, CopyTablePayload copyTablePayload, + WatermarkInductionResult res) + throws Exception { + // TODO: throw IllegalStateException if any previous jobs doesn't expire. + // TODO: replication job canceling mechanism + URI zkMetadataUri = new URI(copyTablePayload.getSourceClusterUri() + + String.format(SEGMENT_ZK_METADATA_ENDPOINT_TEMPLATE, tableNameWithType)); + SimpleHttpResponse zkMetadataResponse = HttpClient.wrapAndThrowHttpException( + _httpClient.sendGetRequest(zkMetadataUri, copyTablePayload.getHeaders())); + String zkMetadataJson = zkMetadataResponse.getResponse(); + Map> zkMetadataMap = + new ObjectMapper().readValue(zkMetadataJson, new TypeReference>>() { + }); + + List segments = new ArrayList<>(zkMetadataMap.keySet()); + long submitTS = System.currentTimeMillis(); + + if (!_pinotHelixResourceManager.addNewSegmentCopyXClusterJob(tableNameWithType, jobId, submitTS, segments)) { + throw new Exception("Failed to add segments to replicated table"); + } + ZkBasedTableReplicationObserver observer = new ZkBasedTableReplicationObserver(jobId, tableNameWithType, + res.getHistoricalSegments(), _pinotHelixResourceManager); + observer.onTrigger(TableReplicationObserver.Trigger.START_TRIGGER, null); + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue<>(segments); + for (int i = 0; i < 4; i++) { + _executorService.submit(() -> { + while (true) { + String segment = q.poll(); + if (segment == null) { + break; + } + try { + Map segmentZKMetadata = zkMetadataMap.get(segment); + if (segmentZKMetadata == null) { + throw new RuntimeException("Segment ZK metadata not found for segment: " + segment); + } + _segmentCopier.copy(tableNameWithType, segment, copyTablePayload, segmentZKMetadata); + observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_COMPLETED_TRIGGER, segment); + } catch (Exception e) { + LOGGER.error("Caught exception while replicating table segment", e); + observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_ERRORED_TRIGGER, segment); + } + } + }); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java new file mode 100644 index 000000000000..ab311a79e000 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.controller.helix.core.replication; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Observes the table replication progress and updates the status in Zookeeper. + */ +public class ZkBasedTableReplicationObserver implements TableReplicationObserver { + private static final Logger LOGGER = LoggerFactory.getLogger(ZkBasedTableReplicationObserver.class); + + private final PinotHelixResourceManager _pinotHelixResourceManager; + private final String _jobId; + private final String _tableNameWithType; + private final TableReplicationProgressStats _progressStats; + private final List _segmentsToCopy; + + public ZkBasedTableReplicationObserver(String jobId, String tableNameWithType, List segmentsToCopy, + PinotHelixResourceManager pinotHelixResourceManager) { + _jobId = jobId; + _tableNameWithType = tableNameWithType; + _segmentsToCopy = segmentsToCopy; + _pinotHelixResourceManager = pinotHelixResourceManager; + _progressStats = new TableReplicationProgressStats(segmentsToCopy.size()); + } + + @Override + public void onTrigger(Trigger trigger, String segmentName) { + switch (trigger) { + // Table + case START_TRIGGER: + break; + case SEGMENT_REPLICATE_COMPLETED_TRIGGER: + // Update progress stats and track in ZK every 100 segments + int remaining = _progressStats.updateSegmentStatus(segmentName, + TableReplicationProgressStats.SegmentStatus.COMPLETED); + if (remaining % 100 == 0) { + trackStatsInZk(); + } + break; + case SEGMENT_REPLICATE_ERRORED_TRIGGER: + // Update progress stats and track in ZK immediately on error + _progressStats.updateSegmentStatus(segmentName, TableReplicationProgressStats.SegmentStatus.ERROR); + trackStatsInZk(); + break; + default: + } + } + + private void trackStatsInZk() { + Map jobMetadata = new HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REPLICATION.name()); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, _tableNameWithType); + try { + jobMetadata.put(CommonConstants.ControllerJob.SEGMENTS_TO_BE_COPIED, + JsonUtils.objectToString(_segmentsToCopy)); + jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, + JsonUtils.objectToString(_progressStats)); + } catch (JsonProcessingException e) { + LOGGER.error("Error serialising replication stats to JSON for persisting to ZK {}", _jobId, e); + } + _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, ControllerJobTypes.TABLE_REPLICATION); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java index f4f876e4a553..101632df7843 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java @@ -1642,6 +1642,15 @@ public void testGetConsumerWatermarks() helixAdminField.set(_helixResourceManager, spyHelixAdmin); IdealState idealState = new IdealState(realtimeTableName); + idealState.setPartitionState(new LLCSegmentName(rawTableName, 0, 100, + System.currentTimeMillis()).getSegmentName(), SERVER_NAME_TAGGED, "ONLINE"); + idealState.setPartitionState(new LLCSegmentName(rawTableName, 0, 101, + System.currentTimeMillis()).getSegmentName(), SERVER_NAME_TAGGED, "CONSUMING"); + idealState.setPartitionState(new LLCSegmentName(rawTableName, 1, 199, + System.currentTimeMillis()).getSegmentName(), SERVER_NAME_TAGGED, "ONLINE"); + idealState.setPartitionState(new LLCSegmentName(rawTableName, 1, 200, + System.currentTimeMillis()).getSegmentName(), SERVER_NAME_TAGGED, "CONSUMING"); + doReturn(idealState).when(spyHelixAdmin).getResourceIdealState(any(), eq(realtimeTableName)); // Test happy path @@ -1663,6 +1672,17 @@ public void testGetConsumerWatermarks() assertEquals(inProgressWatermark.getSequenceNumber(), 200L); assertEquals(inProgressWatermark.getOffset(), 789L); + List historicalSegments = waterMarkInductionResult.getHistoricalSegments(); + assertEquals(historicalSegments.size(), 2); + for (String segment : historicalSegments) { + LLCSegmentName llcSegmentName = LLCSegmentName.of(segment); + if (llcSegmentName.getPartitionGroupId() == 0) { + assertEquals(llcSegmentName.getSequenceNumber(), 100); + } else { + assertEquals(llcSegmentName.getSequenceNumber(), 199); + } + } + // recover the original values helixAdminField.set(_helixResourceManager, originalHelixAdmin); llcManagerField.set(_helixResourceManager, originalLlcManager); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java new file mode 100644 index 000000000000..650511fdca5e --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import java.net.URI; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.resources.CopyTablePayload; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RealtimeSegmentCopierTest { + + @Mock + private ControllerConf _controllerConf; + @Mock + private HttpClient _httpClient; + @Mock + private PinotFS _pinotFS; + + private RealtimeSegmentCopier _copier; + private AutoCloseable _mocks; + + @BeforeMethod + public void setUp() { + _mocks = MockitoAnnotations.openMocks(this); + when(_controllerConf.getDataDir()).thenReturn("hdfs://data"); + _copier = spy(new RealtimeSegmentCopier(_controllerConf, _httpClient)); + } + + @AfterMethod + public void tearDown() throws Exception { + _mocks.close(); + } + + @Test + public void testCopy() throws Exception { + String tableNameWithType = "table1_REALTIME"; + String segmentName = "seg1"; + CopyTablePayload payload = new CopyTablePayload("http://src", Collections.emptyMap(), + "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap()); + + Map metadata = new HashMap<>(); + metadata.put("segment.download.url", "hdfs://src/data/seg1"); + + doReturn(_pinotFS).when(_copier).getPinotFS(any(URI.class)); + when(_pinotFS.exists(any(URI.class))).thenReturn(false); + + SimpleHttpResponse response = mock(SimpleHttpResponse.class); + when(response.getStatusCode()).thenReturn(200); + when(_httpClient.sendJsonPostRequest(any(URI.class), anyString(), anyMap())).thenReturn(response); + + _copier.copy(tableNameWithType, segmentName, payload, metadata); + + verify(_pinotFS).copy(any(URI.class), any(URI.class)); + verify(_httpClient).sendJsonPostRequest(any(URI.class), anyString(), anyMap()); + } + + @Test + public void testCopyExisting() throws Exception { + String tableNameWithType = "table1_REALTIME"; + String segmentName = "seg1"; + CopyTablePayload payload = new CopyTablePayload("http://src", Collections.emptyMap(), + "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap()); + + Map metadata = new HashMap<>(); + metadata.put("segment.download.url", "hdfs://src/data/seg1"); + + doReturn(_pinotFS).when(_copier).getPinotFS(any(URI.class)); + when(_pinotFS.exists(any(URI.class))).thenReturn(true); + + SimpleHttpResponse response = mock(SimpleHttpResponse.class); + when(response.getStatusCode()).thenReturn(200); + when(_httpClient.sendJsonPostRequest(any(URI.class), anyString(), anyMap())).thenReturn(response); + + _copier.copy(tableNameWithType, segmentName, payload, metadata); + + verify(_pinotFS, never()).copy(any(URI.class), any(URI.class)); + verify(_httpClient).sendJsonPostRequest(any(URI.class), anyString(), anyMap()); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStatsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStatsTest.java new file mode 100644 index 000000000000..4dcd259479ae --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStatsTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import java.util.List; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TableReplicationProgressStatsTest { + + @Test + public void testStats() { + TableReplicationProgressStats stats = new TableReplicationProgressStats(10); + Assert.assertEquals(stats.getRemainingSegments(), 10); + Assert.assertTrue(stats.getSegmentsFailToCopy().isEmpty()); + + stats.updateSegmentStatus("seg1", TableReplicationProgressStats.SegmentStatus.COMPLETED); + Assert.assertEquals(stats.getRemainingSegments(), 9); + Assert.assertTrue(stats.getSegmentsFailToCopy().isEmpty()); + + stats.updateSegmentStatus("seg2", TableReplicationProgressStats.SegmentStatus.ERROR); + Assert.assertEquals(stats.getRemainingSegments(), 8); + List failedSegments = stats.getSegmentsFailToCopy(); + Assert.assertEquals(failedSegments.size(), 1); + Assert.assertEquals(failedSegments.get(0), "seg2"); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java new file mode 100644 index 000000000000..cff490c7962f --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.api.resources.CopyTablePayload; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.WatermarkInductionResult; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyList; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TableReplicatorTest { + + @Mock + private PinotHelixResourceManager _pinotHelixResourceManager; + @Mock + private ExecutorService _executorService; + @Mock + private SegmentCopier _segmentCopier; + @Mock + private HttpClient _httpClient; + + private TableReplicator _tableReplicator; + private AutoCloseable _mocks; + + @BeforeMethod + public void setUp() { + _mocks = MockitoAnnotations.openMocks(this); + _tableReplicator = new TableReplicator(_pinotHelixResourceManager, _executorService, _segmentCopier, _httpClient); + } + + @AfterMethod + public void tearDown() throws Exception { + _mocks.close(); + } + + @Test + public void testReplicateTable() throws Exception { + String jobId = "job1"; + String tableName = "table1_REALTIME"; + String sourceClusterUri = "http://localhost:9000"; + CopyTablePayload copyTablePayload = new CopyTablePayload(sourceClusterUri, Collections.emptyMap(), + "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap()); + + WatermarkInductionResult watermarkInductionResult = new WatermarkInductionResult(Collections.emptyList(), + Arrays.asList("seg1", "seg2")); + + // Mock HttpClient response for ZK metadata + Map> zkMetadataMap = new HashMap<>(); + Map seg1Metadata = new HashMap<>(); + seg1Metadata.put("k1", "v1"); + zkMetadataMap.put("seg1", seg1Metadata); + + Map seg2Metadata = new HashMap<>(); + seg2Metadata.put("k2", "v2"); + zkMetadataMap.put("seg2", seg2Metadata); + + String zkMetadataJson = new ObjectMapper().writeValueAsString(zkMetadataMap); + SimpleHttpResponse response = mock(SimpleHttpResponse.class); + when(response.getResponse()).thenReturn(zkMetadataJson); + when(response.getStatusCode()).thenReturn(200); + + when(_httpClient.sendGetRequest(any(URI.class), anyMap())).thenReturn(response); + when(_pinotHelixResourceManager.addNewSegmentCopyXClusterJob(anyString(), anyString(), anyLong(), anyList())) + .thenReturn(true); + + _tableReplicator.replicateTable(jobId, tableName, copyTablePayload, watermarkInductionResult); + + ArgumentCaptor segmentsCaptor = ArgumentCaptor.forClass(List.class); + verify(_pinotHelixResourceManager).addNewSegmentCopyXClusterJob(eq(tableName), eq(jobId), anyLong(), + segmentsCaptor.capture()); + + List capturedSegments = segmentsCaptor.getValue(); + Assert.assertEquals(capturedSegments.size(), 2); + Assert.assertTrue(capturedSegments.contains("seg1")); + Assert.assertTrue(capturedSegments.contains("seg2")); + + verify(_executorService, times(4)).submit(any(Runnable.class)); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java new file mode 100644 index 000000000000..10a6db0df77a --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.spi.utils.CommonConstants; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +public class ZkBasedTableReplicationObserverTest { + + @Mock + private PinotHelixResourceManager _pinotHelixResourceManager; + + private AutoCloseable _mocks; + + @BeforeMethod + public void setUp() { + _mocks = MockitoAnnotations.openMocks(this); + } + + @AfterMethod + public void tearDown() throws Exception { + _mocks.close(); + } + + @Test + public void testObserver() { + String jobId = "job1"; + String tableName = "table1"; + List segments = Arrays.asList("seg1", "seg2", "seg3"); + + ZkBasedTableReplicationObserver observer = + new ZkBasedTableReplicationObserver(jobId, tableName, segments, _pinotHelixResourceManager); + + // Trigger completion (1st segment) - no ZK update (only every 100 or error) + // Total 3. remaining starts at 3. + // complete seg1 -> remaining 2. 2 % 100 != 0. + // complete seg2 -> remaining 1. + // complete seg3 -> remaining 0. 0 % 100 == 0 -> ZK update. + + observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_COMPLETED_TRIGGER, "seg1"); + verify(_pinotHelixResourceManager, never()).addControllerJobToZK(anyString(), anyMap(), any()); + + observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_COMPLETED_TRIGGER, "seg2"); + verify(_pinotHelixResourceManager, never()).addControllerJobToZK(anyString(), anyMap(), any()); + + observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_COMPLETED_TRIGGER, "seg3"); + + ArgumentCaptor> metadataCaptor = ArgumentCaptor.forClass(Map.class); + verify(_pinotHelixResourceManager).addControllerJobToZK(eq(jobId), metadataCaptor.capture(), + eq(ControllerJobTypes.TABLE_REPLICATION)); + + Map metadata = metadataCaptor.getValue(); + Assert.assertEquals(metadata.get(CommonConstants.ControllerJob.JOB_ID), jobId); + Assert.assertEquals(metadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE), tableName); + } + + @Test + public void testObserverError() { + String jobId = "job1"; + String tableName = "table1"; + List segments = Arrays.asList("seg1"); + + ZkBasedTableReplicationObserver observer = + new ZkBasedTableReplicationObserver(jobId, tableName, segments, _pinotHelixResourceManager); + + observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_ERRORED_TRIGGER, "seg1"); + + verify(_pinotHelixResourceManager).addControllerJobToZK(eq(jobId), anyMap(), + eq(ControllerJobTypes.TABLE_REPLICATION)); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index b76754e50b59..3b93423d0dce 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1645,6 +1645,8 @@ public static class ControllerJob { public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted"; public static final String CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST = "segmentsYetToBeCommitted"; public static final String NUM_CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED = "numberOfSegmentsYetToBeCommitted"; + // Table Replication job props + public static final String SEGMENTS_TO_BE_COPIED = "segmentsToBeCopied"; } // prefix for scheduler related features, e.g. query accountant From 235e4cd0f69eace6619f75aded8c93c24c3cb508 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Sun, 14 Dec 2025 06:33:09 +0000 Subject: [PATCH 04/16] persist watermarks in zk; record number of segments to copy instead of the actual list; add job status in zk --- .../helix/core/PinotHelixResourceManager.java | 18 +++++++-- .../replication/RealtimeSegmentCopier.java | 17 +++++---- .../core/replication/TableReplicator.java | 12 ++++-- .../ZkBasedTableReplicationObserver.java | 37 ++++++++++--------- .../core/replication/TableReplicatorTest.java | 13 +++---- .../ZkBasedTableReplicationObserverTest.java | 8 +++- .../pinot/spi/utils/CommonConstants.java | 3 ++ 7 files changed, 67 insertions(+), 41 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ce61ad4d4fbb..cf2ccf97f37f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2519,8 +2519,16 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.FORCE_COMMIT); } - public boolean addNewSegmentCopyXClusterJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, - List consumingSegmentsCommitted) + public boolean addNewTableReplicationJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, + WatermarkInductionResult res) + throws JsonProcessingException { + Map jobMetadata = + commonTableReplicationJobMetadata(tableNameWithType, jobId, jobSubmissionTimeMs, res); + return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.TABLE_REPLICATION); + } + + public Map commonTableReplicationJobMetadata(String tableNameWithType, String jobId, + long jobSubmissionTimeMs, WatermarkInductionResult res) throws JsonProcessingException { Map jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); @@ -2528,8 +2536,10 @@ public boolean addNewSegmentCopyXClusterJob(String tableNameWithType, String job jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); jobMetadata.put(CommonConstants.ControllerJob.SEGMENTS_TO_BE_COPIED, - JsonUtils.objectToString(consumingSegmentsCommitted)); - return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.TABLE_REPLICATION); + Integer.toString(res.getHistoricalSegments().size())); + jobMetadata.put(CommonConstants.ControllerJob.CONSUMER_WATERMARKS, JsonUtils.objectToString(res.getWatermarks())); + jobMetadata.put(CommonConstants.ControllerJob.REPLICATION_JOB_STATUS, "IN_PROGRESS"); + return jobMetadata; } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java index 71627fdaab47..cdfcc02a210d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java @@ -77,11 +77,12 @@ public void copy(String tableNameWithType, String segmentName, CopyTablePayload } // 2. Copy the segment from the source deep store to the destination deep store - URI sourceSegmentUri = new URI(downloadUrl); - PinotFS sourcePinotFS = getPinotFS(sourceSegmentUri); String destSegmentUriStr = _destinationDeepStoreUri + "/" + tableName + "/" + segmentName; + LOGGER.info("[copyTable] Copying segment: {} from url: {} to destination: {}", segmentName, downloadUrl, + destSegmentUriStr); + URI sourceSegmentUri = new URI(downloadUrl); URI destSegmentUri = new URI(destSegmentUriStr); - + PinotFS sourcePinotFS = getPinotFS(sourceSegmentUri); PinotFS destPinotFS = getPinotFS(destSegmentUri); // TODO: use local file system as an intermediate store to support different file system @@ -91,20 +92,22 @@ public void copy(String tableNameWithType, String segmentName, CopyTablePayload if (!destPinotFS.exists(destSegmentUri)) { sourcePinotFS.copy(sourceSegmentUri, destSegmentUri); - LOGGER.info("Copied segment {} from {} to {}", segmentName, sourceSegmentUri, destSegmentUri); + LOGGER.info("[copyTable] Copied segment {} from {} to {}", segmentName, sourceSegmentUri, destSegmentUri); } else { - LOGGER.info("Segment {} already exists at destination {}", segmentName, destSegmentUri); + LOGGER.info("[copyTable] Segment {} already exists at destination {}", segmentName, destSegmentUri); } // 3. Upload the segment to the destination controller String payload = "{\"segmentUri\":\"" + destSegmentUriStr + "\"}"; + LOGGER.info("[copyTable] Uploading segment {} to destination controller, payload: {}", segmentName, payload); URI uri = new URI(copyTablePayload.getDestinationClusterUri() + String.format(SEGMENT_UPLOAD_ENDPOINT_TEMPLATE, tableName)); SimpleHttpResponse response = _httpClient.sendJsonPostRequest(uri, payload, copyTablePayload.getDestinationClusterHeaders()); - LOGGER.info("Uploaded segment {} to destination controller, status: {}", segmentName, response.getStatusCode()); + LOGGER.info("[copyTable] Uploaded segment {} to destination controller, status: {}", segmentName, + response.getStatusCode()); } catch (Exception e) { - LOGGER.error("Caught exception while copying segment {}", segmentName, e); + LOGGER.error("[copyTable] Caught exception while copying segment {}", segmentName, e); throw new RuntimeException(e); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java index 5302f0a73f54..4e722187ba4b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java @@ -80,6 +80,7 @@ public void replicateTable(String jobId, String tableNameWithType, CopyTablePayl throws Exception { // TODO: throw IllegalStateException if any previous jobs doesn't expire. // TODO: replication job canceling mechanism + LOGGER.info("[copyTable] Start replicating table: {} with jobId: {}", tableNameWithType, jobId); URI zkMetadataUri = new URI(copyTablePayload.getSourceClusterUri() + String.format(SEGMENT_ZK_METADATA_ENDPOINT_TEMPLATE, tableNameWithType)); SimpleHttpResponse zkMetadataResponse = HttpClient.wrapAndThrowHttpException( @@ -88,15 +89,16 @@ public void replicateTable(String jobId, String tableNameWithType, CopyTablePayl Map> zkMetadataMap = new ObjectMapper().readValue(zkMetadataJson, new TypeReference>>() { }); + LOGGER.info("[copyTable] Fetched ZK metadata for {} segments", zkMetadataMap.size()); - List segments = new ArrayList<>(zkMetadataMap.keySet()); + List segments = new ArrayList<>(res.getHistoricalSegments()); long submitTS = System.currentTimeMillis(); - if (!_pinotHelixResourceManager.addNewSegmentCopyXClusterJob(tableNameWithType, jobId, submitTS, segments)) { + if (!_pinotHelixResourceManager.addNewTableReplicationJob(tableNameWithType, jobId, submitTS, res)) { throw new Exception("Failed to add segments to replicated table"); } - ZkBasedTableReplicationObserver observer = new ZkBasedTableReplicationObserver(jobId, tableNameWithType, - res.getHistoricalSegments(), _pinotHelixResourceManager); + ZkBasedTableReplicationObserver observer = new ZkBasedTableReplicationObserver(jobId, tableNameWithType, res, + _pinotHelixResourceManager); observer.onTrigger(TableReplicationObserver.Trigger.START_TRIGGER, null); ConcurrentLinkedQueue q = new ConcurrentLinkedQueue<>(segments); for (int i = 0; i < 4; i++) { @@ -107,6 +109,7 @@ public void replicateTable(String jobId, String tableNameWithType, CopyTablePayl break; } try { + LOGGER.info("[copyTable] Starting to copy segment: {} for table: {}", segment, tableNameWithType); Map segmentZKMetadata = zkMetadataMap.get(segment); if (segmentZKMetadata == null) { throw new RuntimeException("Segment ZK metadata not found for segment: " + segment); @@ -120,5 +123,6 @@ public void replicateTable(String jobId, String tableNameWithType, CopyTablePayl } }); } + LOGGER.info("[copyTable] Submitted replication tasks to executor service for job: {}", jobId); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java index ab311a79e000..ccdb714ce9c0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java @@ -20,12 +20,11 @@ package org.apache.pinot.controller.helix.core.replication; import com.fasterxml.jackson.core.JsonProcessingException; -import java.util.HashMap; -import java.util.List; +import com.fasterxml.jackson.databind.JsonNode; import java.util.Map; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.WatermarkInductionResult; import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; -import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.slf4j.Logger; @@ -41,15 +40,15 @@ public class ZkBasedTableReplicationObserver implements TableReplicationObserver private final String _jobId; private final String _tableNameWithType; private final TableReplicationProgressStats _progressStats; - private final List _segmentsToCopy; + private final WatermarkInductionResult _res; - public ZkBasedTableReplicationObserver(String jobId, String tableNameWithType, List segmentsToCopy, + public ZkBasedTableReplicationObserver(String jobId, String tableNameWithType, WatermarkInductionResult res, PinotHelixResourceManager pinotHelixResourceManager) { _jobId = jobId; _tableNameWithType = tableNameWithType; - _segmentsToCopy = segmentsToCopy; + _res = res; _pinotHelixResourceManager = pinotHelixResourceManager; - _progressStats = new TableReplicationProgressStats(segmentsToCopy.size()); + _progressStats = new TableReplicationProgressStats(res.getHistoricalSegments().size()); } @Override @@ -57,6 +56,8 @@ public void onTrigger(Trigger trigger, String segmentName) { switch (trigger) { // Table case START_TRIGGER: + // in case of zero segments to be copied, track stats in ZK + trackStatsInZk(); break; case SEGMENT_REPLICATE_COMPLETED_TRIGGER: // Update progress stats and track in ZK every 100 segments @@ -76,19 +77,21 @@ public void onTrigger(Trigger trigger, String segmentName) { } private void trackStatsInZk() { - Map jobMetadata = new HashMap<>(); - jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REPLICATION.name()); - jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); - jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, _tableNameWithType); + LOGGER.info("[copyTable] Tracking replication stats in ZK for job: {}", _jobId); try { - jobMetadata.put(CommonConstants.ControllerJob.SEGMENTS_TO_BE_COPIED, - JsonUtils.objectToString(_segmentsToCopy)); - jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, - JsonUtils.objectToString(_progressStats)); + Map jobMetadata = _pinotHelixResourceManager + .commonTableReplicationJobMetadata(_tableNameWithType, _jobId, System.currentTimeMillis(), _res); + String progress = JsonUtils.objectToString(_progressStats); + jobMetadata.put(CommonConstants.ControllerJob.REPLICATION_PROGRESS, progress); + int remaining = JsonUtils.stringToObject(progress, JsonNode.class).get("remainingSegments").asInt(); + if (remaining == 0) { + jobMetadata.put(CommonConstants.ControllerJob.REPLICATION_JOB_STATUS, "COMPLETED"); + } else { + jobMetadata.put(CommonConstants.ControllerJob.REPLICATION_JOB_STATUS, "IN_PROGRESS"); + } + _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, ControllerJobTypes.TABLE_REPLICATION); } catch (JsonProcessingException e) { LOGGER.error("Error serialising replication stats to JSON for persisting to ZK {}", _jobId, e); } - _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, ControllerJobTypes.TABLE_REPLICATION); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java index cff490c7962f..2b40b8b1ef4d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java @@ -40,7 +40,6 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.anyMap; import static org.mockito.Mockito.anyString; @@ -102,16 +101,16 @@ public void testReplicateTable() throws Exception { when(response.getStatusCode()).thenReturn(200); when(_httpClient.sendGetRequest(any(URI.class), anyMap())).thenReturn(response); - when(_pinotHelixResourceManager.addNewSegmentCopyXClusterJob(anyString(), anyString(), anyLong(), anyList())) - .thenReturn(true); + when(_pinotHelixResourceManager.addNewTableReplicationJob(anyString(), anyString(), anyLong(), + any(WatermarkInductionResult.class))).thenReturn(true); _tableReplicator.replicateTable(jobId, tableName, copyTablePayload, watermarkInductionResult); - ArgumentCaptor segmentsCaptor = ArgumentCaptor.forClass(List.class); - verify(_pinotHelixResourceManager).addNewSegmentCopyXClusterJob(eq(tableName), eq(jobId), anyLong(), - segmentsCaptor.capture()); + ArgumentCaptor resultCaptor = ArgumentCaptor.forClass(WatermarkInductionResult.class); + verify(_pinotHelixResourceManager).addNewTableReplicationJob(eq(tableName), eq(jobId), anyLong(), + resultCaptor.capture()); - List capturedSegments = segmentsCaptor.getValue(); + List capturedSegments = resultCaptor.getValue().getHistoricalSegments(); Assert.assertEquals(capturedSegments.size(), 2); Assert.assertTrue(capturedSegments.contains("seg1")); Assert.assertTrue(capturedSegments.contains("seg2")); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java index 10a6db0df77a..040c5273a55d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java @@ -19,9 +19,11 @@ package org.apache.pinot.controller.helix.core.replication; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.WatermarkInductionResult; import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.spi.utils.CommonConstants; import org.mockito.ArgumentCaptor; @@ -61,9 +63,10 @@ public void testObserver() { String jobId = "job1"; String tableName = "table1"; List segments = Arrays.asList("seg1", "seg2", "seg3"); + WatermarkInductionResult res = new WatermarkInductionResult(Collections.emptyList(), segments); ZkBasedTableReplicationObserver observer = - new ZkBasedTableReplicationObserver(jobId, tableName, segments, _pinotHelixResourceManager); + new ZkBasedTableReplicationObserver(jobId, tableName, res, _pinotHelixResourceManager); // Trigger completion (1st segment) - no ZK update (only every 100 or error) // Total 3. remaining starts at 3. @@ -93,9 +96,10 @@ public void testObserverError() { String jobId = "job1"; String tableName = "table1"; List segments = Arrays.asList("seg1"); + WatermarkInductionResult res = new WatermarkInductionResult(Collections.emptyList(), segments); ZkBasedTableReplicationObserver observer = - new ZkBasedTableReplicationObserver(jobId, tableName, segments, _pinotHelixResourceManager); + new ZkBasedTableReplicationObserver(jobId, tableName, res, _pinotHelixResourceManager); observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_ERRORED_TRIGGER, "seg1"); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 3b93423d0dce..3cba326da3d1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1647,6 +1647,9 @@ public static class ControllerJob { public static final String NUM_CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED = "numberOfSegmentsYetToBeCommitted"; // Table Replication job props public static final String SEGMENTS_TO_BE_COPIED = "segmentsToBeCopied"; + public static final String CONSUMER_WATERMARKS = "consumerWatermarks"; + public static final String REPLICATION_PROGRESS = "replicationProgress"; + public static final String REPLICATION_JOB_STATUS = "replicationJobStatus"; } // prefix for scheduler related features, e.g. query accountant From 340d4682a3f286b608f3ff32c81703d435c53bd0 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Mon, 15 Dec 2025 07:18:33 +0000 Subject: [PATCH 05/16] make WatermarkInductionResult fields private --- .../pinot/controller/helix/core/WatermarkInductionResult.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java index c1cd9776ed1c..2c7dadbf5b38 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java @@ -31,7 +31,7 @@ public class WatermarkInductionResult { private List _watermarks; - public List _historicalSegments; + private List _historicalSegments; /** * The @JsonCreator annotation marks this constructor to be used for deserializing From 673974fda2f2652ccc0e9423986a29f0ce3bdd8c Mon Sep 17 00:00:00 2001 From: xuanyili Date: Mon, 15 Dec 2025 09:22:41 +0000 Subject: [PATCH 06/16] segment v2 push request with retry policy --- .../api/resources/CopyTablePayload.java | 2 +- .../replication/RealtimeSegmentCopier.java | 70 ++++++++++++++++--- 2 files changed, 62 insertions(+), 10 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java index 6cadb3a0587d..30fbf82d1a98 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java @@ -83,7 +83,7 @@ public Map getHeaders() { @JsonGetter("destinationClusterUri") public String getDestinationClusterUri() { - return _sourceClusterUri; + return _destinationClusterUri; } @JsonGetter("destinationClusterHeaders") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java index cdfcc02a210d..5c892c635291 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java @@ -19,13 +19,22 @@ package org.apache.pinot.controller.helix.core.replication; import java.net.URI; +import java.util.Collections; import java.util.Map; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.apache.hc.core5.http.message.BasicNameValuePair; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.resources.CopyTablePayload; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.retry.RetryPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,27 +100,70 @@ public void copy(String tableNameWithType, String segmentName, CopyTablePayload } if (!destPinotFS.exists(destSegmentUri)) { - sourcePinotFS.copy(sourceSegmentUri, destSegmentUri); + if (!destPinotFS.copy(sourceSegmentUri, destSegmentUri)) { + throw new RuntimeException("Failed to copy segment " + segmentName + " from " + downloadUrl + " to " + + destSegmentUriStr); + } LOGGER.info("[copyTable] Copied segment {} from {} to {}", segmentName, sourceSegmentUri, destSegmentUri); } else { LOGGER.info("[copyTable] Segment {} already exists at destination {}", segmentName, destSegmentUri); } // 3. Upload the segment to the destination controller - String payload = "{\"segmentUri\":\"" + destSegmentUriStr + "\"}"; - LOGGER.info("[copyTable] Uploading segment {} to destination controller, payload: {}", segmentName, payload); - URI uri = new URI(copyTablePayload.getDestinationClusterUri() + String.format(SEGMENT_UPLOAD_ENDPOINT_TEMPLATE, - tableName)); - SimpleHttpResponse response = - _httpClient.sendJsonPostRequest(uri, payload, copyTablePayload.getDestinationClusterHeaders()); - LOGGER.info("[copyTable] Uploaded segment {} to destination controller, status: {}", segmentName, - response.getStatusCode()); + LOGGER.info("[copyTable] Uploading segment {} to destination controller", segmentName); + String dstControllerURIStr = copyTablePayload.getDestinationClusterUri(); + URI segmentPushURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(dstControllerURIStr)); + + // TODO: Refactor SegmentPushUtils.java and FileUploadDownloadClient to dedup code + RetryPolicies.exponentialBackoffRetryPolicy(1, 5000, 5).attempt(() -> { + try { + SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException( + _httpClient.sendRequest( + getSendSegmentUriRequest(segmentPushURI, destSegmentUriStr, + copyTablePayload.getDestinationClusterHeaders(), tableName), + HttpClient.DEFAULT_SOCKET_TIMEOUT_MS)); + LOGGER.info("[copyTable] Response for pushing table {} segment uri {} to location {} - {}: {}", tableName, + destSegmentUriStr, dstControllerURIStr, response.getStatusCode(), + response.getResponse()); + return true; + } catch (HttpErrorStatusException e) { + int statusCode = e.getStatusCode(); + if (statusCode >= 500) { + // Temporary exception + LOGGER.warn("[copyTable] Caught temporary error when pushing table: {} segment uri: {} to {}, will retry", + tableName, destSegmentUriStr, dstControllerURIStr, e); + return false; + } else { + // Permanent exception + LOGGER.error("[copyTable] Caught permanent error when pushing table: {} segment uri: {} to {}, won't retry", + tableName, destSegmentUriStr, dstControllerURIStr, e); + throw e; + } + } + }); } catch (Exception e) { LOGGER.error("[copyTable] Caught exception while copying segment {}", segmentName, e); throw new RuntimeException(e); } } + static ClassicHttpRequest getSendSegmentUriRequest(URI uri, String downloadUri, + Map headers, String tableNameWithoutType) { + ClassicRequestBuilder requestBuilder = ClassicRequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1) + .setHeader( + FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.URI.toString()) + .setHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, downloadUri) + .setHeader(HttpHeaders.CONTENT_TYPE, HttpClient.JSON_CONTENT_TYPE); + + for (Map.Entry pair: headers.entrySet()) { + requestBuilder.setHeader(pair.getKey(), pair.getValue()); + } + + HttpClient.addHeadersAndParameters(requestBuilder, Collections.emptyList(), Collections.singletonList( + new BasicNameValuePair("tableName", tableNameWithoutType))); + return requestBuilder.build(); + } + static String getScheme(URI uri) { if (uri.getScheme() != null) { return uri.getScheme(); From 997de4ef983a67eafb72c31565c6a7b43a2b9487 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Sun, 14 Dec 2025 06:48:09 +0000 Subject: [PATCH 07/16] add endpoint to fetch job status --- .../api/resources/PinotTableRestletResource.java | 12 ++++++++++++ .../java/org/apache/pinot/core/auth/Actions.java | 1 + 2 files changed, 13 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 9d934c4d1841..4d61c6939588 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -415,6 +415,18 @@ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, CopyTab } } + @GET + @Path("/tables/copyStatus/{jobId}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TABLE_COPY_STATUS) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted table replication job", + notes = "Get status for a submitted table replication job") + public JsonNode getForceCommitJobStatus( + @ApiParam(value = "job id", required = true) @PathParam("jobId") String id) { + return JsonUtils.objectToJsonNode( + _pinotHelixResourceManager.getControllerJobZKMetadata(id, ControllerJobTypes.TABLE_REPLICATION)); + } + @PUT @Produces(MediaType.APPLICATION_JSON) @Path("/tables/recommender") diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java index b8fc93363dfd..2c4108dbfd3c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java @@ -105,6 +105,7 @@ public static class Cluster { public static final String DELETE_QUERY_WORKLOAD_CONFIG = "DeleteQueryWorkloadConfig"; public static final String GET_GROOVY_STATIC_ANALYZER_CONFIG = "GetGroovyStaticAnalyzerConfig"; public static final String UPDATE_GROOVY_STATIC_ANALYZER_CONFIG = "UpdateGroovyStaticAnalyzerConfig"; + public static final String GET_TABLE_COPY_STATUS = "GetTableCopyStatus"; } // Action names for table From 3e0ecc2a75bea2a8730225b3a2b7978340902917 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Mon, 15 Dec 2025 20:47:06 +0000 Subject: [PATCH 08/16] add tableName as a path parameter --- .../core/replication/RealtimeSegmentCopier.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java index 5c892c635291..d86942e6d223 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java @@ -19,13 +19,12 @@ package org.apache.pinot.controller.helix.core.replication; import java.net.URI; -import java.util.Collections; +import java.net.URISyntaxException; import java.util.Map; import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; -import org.apache.hc.core5.http.message.BasicNameValuePair; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.SimpleHttpResponse; @@ -112,14 +111,13 @@ public void copy(String tableNameWithType, String segmentName, CopyTablePayload // 3. Upload the segment to the destination controller LOGGER.info("[copyTable] Uploading segment {} to destination controller", segmentName); String dstControllerURIStr = copyTablePayload.getDestinationClusterUri(); - URI segmentPushURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(dstControllerURIStr)); // TODO: Refactor SegmentPushUtils.java and FileUploadDownloadClient to dedup code RetryPolicies.exponentialBackoffRetryPolicy(1, 5000, 5).attempt(() -> { try { SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException( _httpClient.sendRequest( - getSendSegmentUriRequest(segmentPushURI, destSegmentUriStr, + getSendSegmentUriRequest(dstControllerURIStr, destSegmentUriStr, copyTablePayload.getDestinationClusterHeaders(), tableName), HttpClient.DEFAULT_SOCKET_TIMEOUT_MS)); LOGGER.info("[copyTable] Response for pushing table {} segment uri {} to location {} - {}: {}", tableName, @@ -147,20 +145,17 @@ public void copy(String tableNameWithType, String segmentName, CopyTablePayload } } - static ClassicHttpRequest getSendSegmentUriRequest(URI uri, String downloadUri, - Map headers, String tableNameWithoutType) { - ClassicRequestBuilder requestBuilder = ClassicRequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1) + static ClassicHttpRequest getSendSegmentUriRequest(String controllerUriStr, String downloadUri, + Map headers, String tableNameWithoutType) throws URISyntaxException { + URI segmentPushURI = new URI(controllerUriStr + "?tableName=" + tableNameWithoutType); + ClassicRequestBuilder requestBuilder = ClassicRequestBuilder.post(segmentPushURI).setVersion(HttpVersion.HTTP_1_1) .setHeader( FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.URI.toString()) .setHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, downloadUri) .setHeader(HttpHeaders.CONTENT_TYPE, HttpClient.JSON_CONTENT_TYPE); - for (Map.Entry pair: headers.entrySet()) { requestBuilder.setHeader(pair.getKey(), pair.getValue()); } - - HttpClient.addHeadersAndParameters(requestBuilder, Collections.emptyList(), Collections.singletonList( - new BasicNameValuePair("tableName", tableNameWithoutType))); return requestBuilder.build(); } From 15ea8ec555779d221c6e883727db725a57b1bffd Mon Sep 17 00:00:00 2001 From: xuanyili Date: Tue, 16 Dec 2025 01:25:13 +0000 Subject: [PATCH 09/16] forget to add v2/segments path --- .../helix/core/replication/RealtimeSegmentCopier.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java index d86942e6d223..d421a999b7c5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java @@ -147,7 +147,7 @@ public void copy(String tableNameWithType, String segmentName, CopyTablePayload static ClassicHttpRequest getSendSegmentUriRequest(String controllerUriStr, String downloadUri, Map headers, String tableNameWithoutType) throws URISyntaxException { - URI segmentPushURI = new URI(controllerUriStr + "?tableName=" + tableNameWithoutType); + URI segmentPushURI = new URI(controllerUriStr + "/v2/segments?tableName=" + tableNameWithoutType); ClassicRequestBuilder requestBuilder = ClassicRequestBuilder.post(segmentPushURI).setVersion(HttpVersion.HTTP_1_1) .setHeader( FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.URI.toString()) From 382d8cc65def4befc75fe53469ca3df8ad8d39b4 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Tue, 16 Dec 2025 04:37:19 +0000 Subject: [PATCH 10/16] need to specify the table type for REALTIME --- .../helix/core/replication/RealtimeSegmentCopier.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java index d421a999b7c5..52c99c550fe0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java @@ -118,7 +118,7 @@ public void copy(String tableNameWithType, String segmentName, CopyTablePayload SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException( _httpClient.sendRequest( getSendSegmentUriRequest(dstControllerURIStr, destSegmentUriStr, - copyTablePayload.getDestinationClusterHeaders(), tableName), + copyTablePayload.getDestinationClusterHeaders(), tableName, "REALTIME"), HttpClient.DEFAULT_SOCKET_TIMEOUT_MS)); LOGGER.info("[copyTable] Response for pushing table {} segment uri {} to location {} - {}: {}", tableName, destSegmentUriStr, dstControllerURIStr, response.getStatusCode(), @@ -146,8 +146,9 @@ public void copy(String tableNameWithType, String segmentName, CopyTablePayload } static ClassicHttpRequest getSendSegmentUriRequest(String controllerUriStr, String downloadUri, - Map headers, String tableNameWithoutType) throws URISyntaxException { - URI segmentPushURI = new URI(controllerUriStr + "/v2/segments?tableName=" + tableNameWithoutType); + Map headers, String tableNameWithoutType, String tableType) throws URISyntaxException { + URI segmentPushURI = new URI(controllerUriStr + "/v2/segments?tableName=" + tableNameWithoutType + "&tableType=" + + tableType); ClassicRequestBuilder requestBuilder = ClassicRequestBuilder.post(segmentPushURI).setVersion(HttpVersion.HTTP_1_1) .setHeader( FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.URI.toString()) From 65c86f4595687cb9e053b5fcd6d08308639929d3 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Fri, 23 Jan 2026 18:31:32 +0000 Subject: [PATCH 11/16] fix the compilation error due to long -> int change in previous PR --- .../controller/helix/core/PinotHelixResourceManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index cf2ccf97f37f..9407643ba25a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -4955,14 +4955,14 @@ public WatermarkInductionResult getConsumerWatermarks(String tableName) throws T return new WatermarkInductionResult.Watermark(status.getPartitionGroupId(), seq, startOffset); }).collect(Collectors.toList()); - Map partGroupToLatestSeq = watermarks.stream().collect( + Map partGroupToLatestSeq = watermarks.stream().collect( Collectors.toMap(WatermarkInductionResult.Watermark::getPartitionGroupId, WatermarkInductionResult.Watermark::getSequenceNumber)); List historicalSegments = new ArrayList<>(); for (String segment : idealState.getRecord().getMapFields().keySet()) { LLCSegmentName llcSegmentName = LLCSegmentName.of(segment); if (llcSegmentName != null) { - long partitionGroupId = llcSegmentName.getPartitionGroupId(); + int partitionGroupId = llcSegmentName.getPartitionGroupId(); int seq = llcSegmentName.getSequenceNumber(); if (partGroupToLatestSeq.containsKey(partitionGroupId) && partGroupToLatestSeq.get(partitionGroupId) == seq) { continue; From 302eec493223fdc9590f60da35e139f00dbd5f0b Mon Sep 17 00:00:00 2001 From: xuanyili Date: Fri, 23 Jan 2026 18:46:35 +0000 Subject: [PATCH 12/16] the claude code fails to pick up the important integration of table replicator --- .../api/resources/PinotTableRestletResource.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 4d61c6939588..02422439ba3e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -108,6 +109,7 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; +import org.apache.pinot.controller.helix.core.replication.TableReplicator; import org.apache.pinot.controller.recommender.RecommenderDriver; import org.apache.pinot.controller.tuner.TableConfigTunerUtils; import org.apache.pinot.controller.util.CompletionServiceHelper; @@ -205,6 +207,9 @@ public class PinotTableRestletResource { @Inject HttpClientConnectionManager _connectionManager; + @Inject + TableReplicator _tableReplicator; + /** * API to create a table. Before adding, validations will be done (min number of replicas, checking offline and * realtime table configs match, checking for tenants existing). @@ -372,6 +377,9 @@ public CopyTableResponse copyTable( response.setTableConfig(realtimeTableConfig); response.setWatermarkInductionResult(watermarkInductionResult); } + String jobID = UUID.randomUUID().toString(); + _tableReplicator.replicateTable(jobID, realtimeTableConfig.getTableName(), copyTablePayload, + watermarkInductionResult); return response; } catch (Exception e) { LOGGER.error("[copyTable] Error copying table: {}", tableName, e); From 741c6aec3b1385421866d4650036b69150fcfce9 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Fri, 23 Jan 2026 20:01:55 +0000 Subject: [PATCH 13/16] make parallism tunable --- .../controller/api/resources/CopyTablePayload.java | 10 +++++++++- .../helix/core/replication/TableReplicator.java | 5 ++++- .../helix/core/replication/TableReplicatorTest.java | 9 ++++++--- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java index 30fbf82d1a98..e46eeed8b42e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java @@ -46,6 +46,7 @@ public class CopyTablePayload { * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE. */ private String _serverTenant; + private Integer _backfillParallism; /** * The instanceAssignmentConfig's tagPoolConfig contains full tenant name. We will use this field to let user specify @@ -61,7 +62,8 @@ public CopyTablePayload( @JsonProperty(value = "destinationClusterHeaders") Map destinationClusterHeaders, @JsonProperty(value = "brokerTenant", required = true) String brokerTenant, @JsonProperty(value = "serverTenant", required = true) String serverTenant, - @JsonProperty("tagPoolReplacementMap") @Nullable Map tagPoolReplacementMap) { + @JsonProperty("tagPoolReplacementMap") @Nullable Map tagPoolReplacementMap, + @JsonProperty("backfillParallism") @Nullable Integer backfillParallism) { _sourceClusterUri = sourceClusterUri; _headers = headers; _destinationClusterUri = destinationClusterUri; @@ -69,6 +71,7 @@ public CopyTablePayload( _brokerTenant = brokerTenant; _serverTenant = serverTenant; _tagPoolReplacementMap = tagPoolReplacementMap; + _backfillParallism = backfillParallism; } @JsonGetter("sourceClusterUri") @@ -101,6 +104,11 @@ public String getServerTenant() { return _serverTenant; } + @JsonGetter("backfillParallism") + public Integer getBackfillParallism() { + return _backfillParallism; + } + @JsonGetter("tagPoolReplacementMap") public Map getTagPoolReplacementMap() { return _tagPoolReplacementMap; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java index 4e722187ba4b..dbabd32c1a32 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java @@ -101,7 +101,10 @@ public void replicateTable(String jobId, String tableNameWithType, CopyTablePayl _pinotHelixResourceManager); observer.onTrigger(TableReplicationObserver.Trigger.START_TRIGGER, null); ConcurrentLinkedQueue q = new ConcurrentLinkedQueue<>(segments); - for (int i = 0; i < 4; i++) { + int parallelism = copyTablePayload.getBackfillParallism() != null + ? copyTablePayload.getBackfillParallism() + : res.getWatermarks().size(); + for (int i = 0; i < parallelism; i++) { _executorService.submit(() -> { while (true) { String segment = q.poll(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java index 2b40b8b1ef4d..947ddcf29a22 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java @@ -80,9 +80,12 @@ public void testReplicateTable() throws Exception { String tableName = "table1_REALTIME"; String sourceClusterUri = "http://localhost:9000"; CopyTablePayload copyTablePayload = new CopyTablePayload(sourceClusterUri, Collections.emptyMap(), - "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap()); + "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null); - WatermarkInductionResult watermarkInductionResult = new WatermarkInductionResult(Collections.emptyList(), + List watermarks = Arrays.asList( + new WatermarkInductionResult.Watermark(0, 10, 100L), + new WatermarkInductionResult.Watermark(1, 11, 110L)); + WatermarkInductionResult watermarkInductionResult = new WatermarkInductionResult(watermarks, Arrays.asList("seg1", "seg2")); // Mock HttpClient response for ZK metadata @@ -115,6 +118,6 @@ public void testReplicateTable() throws Exception { Assert.assertTrue(capturedSegments.contains("seg1")); Assert.assertTrue(capturedSegments.contains("seg2")); - verify(_executorService, times(4)).submit(any(Runnable.class)); + verify(_executorService, times(watermarks.size())).submit(any(Runnable.class)); } } From dcd05eb34ff07a0e9e6d4a9258f99223f5531923 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Fri, 23 Jan 2026 20:14:27 +0000 Subject: [PATCH 14/16] get uri for zk metadata --- .../helix/core/replication/TableReplicator.java | 8 ++++---- .../spi/utils/builder/ControllerRequestURLBuilder.java | 4 ++++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java index dbabd32c1a32..7df558ee61bd 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java @@ -31,6 +31,7 @@ import org.apache.pinot.controller.api.resources.CopyTablePayload; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.WatermarkInductionResult; +import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,8 +40,6 @@ */ public class TableReplicator { private static final Logger LOGGER = LoggerFactory.getLogger(TableReplicator.class); - private static final String SEGMENT_ZK_METADATA_ENDPOINT_TEMPLATE = "/segments/%s/zkmetadata"; - private final PinotHelixResourceManager _pinotHelixResourceManager; private final ExecutorService _executorService; private final SegmentCopier _segmentCopier; @@ -81,8 +80,9 @@ public void replicateTable(String jobId, String tableNameWithType, CopyTablePayl // TODO: throw IllegalStateException if any previous jobs doesn't expire. // TODO: replication job canceling mechanism LOGGER.info("[copyTable] Start replicating table: {} with jobId: {}", tableNameWithType, jobId); - URI zkMetadataUri = new URI(copyTablePayload.getSourceClusterUri() - + String.format(SEGMENT_ZK_METADATA_ENDPOINT_TEMPLATE, tableNameWithType)); + ControllerRequestURLBuilder urlBuilder = + ControllerRequestURLBuilder.baseUrl(copyTablePayload.getSourceClusterUri()); + URI zkMetadataUri = new URI(urlBuilder.forSegmentZkMetadata(tableNameWithType)); SimpleHttpResponse zkMetadataResponse = HttpClient.wrapAndThrowHttpException( _httpClient.sendGetRequest(zkMetadataUri, copyTablePayload.getHeaders())); String zkMetadataJson = zkMetadataResponse.getResponse(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index a7fa09019824..af525911b4f5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -468,6 +468,10 @@ public String forSegmentMetadata(String tableName, TableType tableType) { return StringUtil.join("/", _baseUrl, "segments", tableName, "metadata") + "?type=" + tableType.name(); } + public String forSegmentZkMetadata(String tableNameWithType) { + return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, "zkmetadata"); + } + public String forListAllSegmentLineages(String tableName, String tableType) { return StringUtil.join("/", _baseUrl, "segments", tableName, "lineage?type=" + tableType); } From 7a9e1044bcfc239d7928d6d439bbd73554eb1deb Mon Sep 17 00:00:00 2001 From: xuanyili Date: Sat, 24 Jan 2026 06:49:32 +0000 Subject: [PATCH 15/16] fix the unit test failure --- .../PinotTableRestletResourceTest.java | 5 +++-- .../RealtimeSegmentCopierTest.java | 19 +++++++++++-------- .../ZkBasedTableReplicationObserverTest.java | 19 +++++++++++++++++-- 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java index abbfa71e3ace..29995023afb1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java @@ -36,8 +36,9 @@ public void testTweakRealtimeTableConfig() throws Exception { String brokerTenant = "testBroker"; String serverTenant = "testServer"; - CopyTablePayload copyTablePayload = new CopyTablePayload("http://localhost:9000", null, brokerTenant, - serverTenant, Map.of("server1_REALTIME", "testServer_REALTIME")); + CopyTablePayload copyTablePayload = new CopyTablePayload("http://localhost:9000", null, + "http://localhost:9000", null, brokerTenant, serverTenant, + Map.of("server1_REALTIME", "testServer_REALTIME"), null); PinotTableRestletResource.tweakRealtimeTableConfig(tableConfig, copyTablePayload); assertEquals(tableConfig.get("tenants").get("broker").asText(), brokerTenant); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java index 650511fdca5e..09d973a6998b 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.controller.ControllerConf; @@ -34,8 +35,7 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyMap; -import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -72,22 +72,24 @@ public void testCopy() throws Exception { String tableNameWithType = "table1_REALTIME"; String segmentName = "seg1"; CopyTablePayload payload = new CopyTablePayload("http://src", Collections.emptyMap(), - "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap()); + "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null); Map metadata = new HashMap<>(); metadata.put("segment.download.url", "hdfs://src/data/seg1"); doReturn(_pinotFS).when(_copier).getPinotFS(any(URI.class)); when(_pinotFS.exists(any(URI.class))).thenReturn(false); + when(_pinotFS.copy(any(URI.class), any(URI.class))).thenReturn(true); SimpleHttpResponse response = mock(SimpleHttpResponse.class); when(response.getStatusCode()).thenReturn(200); - when(_httpClient.sendJsonPostRequest(any(URI.class), anyString(), anyMap())).thenReturn(response); + when(response.getResponse()).thenReturn("{}"); + doReturn(response).when(_httpClient).sendRequest(any(ClassicHttpRequest.class), anyLong()); _copier.copy(tableNameWithType, segmentName, payload, metadata); verify(_pinotFS).copy(any(URI.class), any(URI.class)); - verify(_httpClient).sendJsonPostRequest(any(URI.class), anyString(), anyMap()); + verify(_httpClient).sendRequest(any(ClassicHttpRequest.class), anyLong()); } @Test @@ -95,7 +97,7 @@ public void testCopyExisting() throws Exception { String tableNameWithType = "table1_REALTIME"; String segmentName = "seg1"; CopyTablePayload payload = new CopyTablePayload("http://src", Collections.emptyMap(), - "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap()); + "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null); Map metadata = new HashMap<>(); metadata.put("segment.download.url", "hdfs://src/data/seg1"); @@ -105,11 +107,12 @@ public void testCopyExisting() throws Exception { SimpleHttpResponse response = mock(SimpleHttpResponse.class); when(response.getStatusCode()).thenReturn(200); - when(_httpClient.sendJsonPostRequest(any(URI.class), anyString(), anyMap())).thenReturn(response); + when(response.getResponse()).thenReturn("{}"); + doReturn(response).when(_httpClient).sendRequest(any(ClassicHttpRequest.class), anyLong()); _copier.copy(tableNameWithType, segmentName, payload, metadata); verify(_pinotFS, never()).copy(any(URI.class), any(URI.class)); - verify(_httpClient).sendJsonPostRequest(any(URI.class), anyString(), anyMap()); + verify(_httpClient).sendRequest(any(ClassicHttpRequest.class), anyLong()); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java index 040c5273a55d..4dec37ef0bb3 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -35,11 +36,13 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.anyMap; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class ZkBasedTableReplicationObserverTest { @@ -59,12 +62,18 @@ public void tearDown() throws Exception { } @Test - public void testObserver() { + public void testObserver() throws Exception { String jobId = "job1"; String tableName = "table1"; List segments = Arrays.asList("seg1", "seg2", "seg3"); WatermarkInductionResult res = new WatermarkInductionResult(Collections.emptyList(), segments); + Map baseMetadata = new HashMap<>(); + baseMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); + baseMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableName); + when(_pinotHelixResourceManager.commonTableReplicationJobMetadata(eq(tableName), eq(jobId), anyLong(), + eq(res))).thenReturn(baseMetadata); + ZkBasedTableReplicationObserver observer = new ZkBasedTableReplicationObserver(jobId, tableName, res, _pinotHelixResourceManager); @@ -92,12 +101,18 @@ public void testObserver() { } @Test - public void testObserverError() { + public void testObserverError() throws Exception { String jobId = "job1"; String tableName = "table1"; List segments = Arrays.asList("seg1"); WatermarkInductionResult res = new WatermarkInductionResult(Collections.emptyList(), segments); + Map baseMetadata = new HashMap<>(); + baseMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); + baseMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableName); + when(_pinotHelixResourceManager.commonTableReplicationJobMetadata(eq(tableName), eq(jobId), anyLong(), + eq(res))).thenReturn(baseMetadata); + ZkBasedTableReplicationObserver observer = new ZkBasedTableReplicationObserver(jobId, tableName, res, _pinotHelixResourceManager); From 5167f4fa21706961631757caa24c2d1db42d6f4f Mon Sep 17 00:00:00 2001 From: xuanyili Date: Tue, 24 Feb 2026 00:36:58 +0000 Subject: [PATCH 16/16] add controller jobtype --- .../api/resources/CopyTablePayload.java | 31 ++++++++++++++++++- .../resources/PinotTableRestletResource.java | 11 +++++++ .../PinotTableRestletResourceTest.java | 28 ++++++++++++++++- .../RealtimeSegmentCopierTest.java | 4 +-- .../core/replication/TableReplicatorTest.java | 2 +- 5 files changed, 71 insertions(+), 5 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java index e46eeed8b42e..5a70273d1822 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java @@ -31,6 +31,21 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class CopyTablePayload { + /** + * Job type for table copy operation. + */ + public enum JobType { + /** + * Controller-based job type (default). The copy job runs on the controller. + */ + CONTROLLER, + + /** + * Minion-based job type (not yet supported). The copy job would run on minion workers. + */ + MINION + } + private String _sourceClusterUri; private Map _headers; @@ -48,6 +63,13 @@ public class CopyTablePayload { private String _serverTenant; private Integer _backfillParallism; + /** + * Job type for the copy operation. + * Defaults to CONTROLLER if not specified. + * Currently only CONTROLLER is supported. + */ + private JobType _jobType; + /** * The instanceAssignmentConfig's tagPoolConfig contains full tenant name. We will use this field to let user specify * the replacement relation from source cluster's full tenant to target cluster's full tenant. @@ -63,7 +85,8 @@ public CopyTablePayload( @JsonProperty(value = "brokerTenant", required = true) String brokerTenant, @JsonProperty(value = "serverTenant", required = true) String serverTenant, @JsonProperty("tagPoolReplacementMap") @Nullable Map tagPoolReplacementMap, - @JsonProperty("backfillParallism") @Nullable Integer backfillParallism) { + @JsonProperty("backfillParallism") @Nullable Integer backfillParallism, + @JsonProperty("jobType") @Nullable JobType jobType) { _sourceClusterUri = sourceClusterUri; _headers = headers; _destinationClusterUri = destinationClusterUri; @@ -72,6 +95,7 @@ public CopyTablePayload( _serverTenant = serverTenant; _tagPoolReplacementMap = tagPoolReplacementMap; _backfillParallism = backfillParallism; + _jobType = jobType != null ? jobType : JobType.CONTROLLER; } @JsonGetter("sourceClusterUri") @@ -113,4 +137,9 @@ public Integer getBackfillParallism() { public Map getTagPoolReplacementMap() { return _tagPoolReplacementMap; } + + @JsonGetter("jobType") + public JobType getJobType() { + return _jobType; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 02422439ba3e..164a0dc9483f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -309,6 +309,17 @@ public CopyTableResponse copyTable( } CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class); + + // Validate jobType - only CONTROLLER is currently supported + CopyTablePayload.JobType jobType = copyTablePayload.getJobType(); + if (jobType == CopyTablePayload.JobType.MINION) { + throw new ControllerApplicationException(LOGGER, + String.format("Job type '%s' is not supported. Only 'CONTROLLER' job type is currently supported.", + jobType), + Response.Status.BAD_REQUEST); + } + LOGGER.info("[copyTable] Job type: {}", jobType); + String sourceControllerUri = copyTablePayload.getSourceClusterUri(); Map requestHeaders = copyTablePayload.getHeaders(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java index 29995023afb1..8f8f23f21983 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java @@ -25,6 +25,7 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.expectThrows; public class PinotTableRestletResourceTest { @@ -38,7 +39,7 @@ public void testTweakRealtimeTableConfig() throws Exception { String serverTenant = "testServer"; CopyTablePayload copyTablePayload = new CopyTablePayload("http://localhost:9000", null, "http://localhost:9000", null, brokerTenant, serverTenant, - Map.of("server1_REALTIME", "testServer_REALTIME"), null); + Map.of("server1_REALTIME", "testServer_REALTIME"), null, null); PinotTableRestletResource.tweakRealtimeTableConfig(tableConfig, copyTablePayload); assertEquals(tableConfig.get("tenants").get("broker").asText(), brokerTenant); @@ -47,4 +48,29 @@ public void testTweakRealtimeTableConfig() throws Exception { .asText(), serverTenant + "_REALTIME"); } } + + @Test + public void testCopyTablePayloadJobType() throws Exception { + // Test 1: Backwards compatibility - null jobType defaults to CONTROLLER + CopyTablePayload payload1 = new CopyTablePayload("http://src", null, "http://dest", null, + "broker", "server", null, null, null); + assertEquals(payload1.getJobType(), CopyTablePayload.JobType.CONTROLLER); + + // Test 2: Explicit CONTROLLER (uppercase) + String json2 = "{\"sourceClusterUri\":\"http://src\",\"destinationClusterUri\":\"http://dest\"," + + "\"brokerTenant\":\"broker\",\"serverTenant\":\"server\",\"jobType\":\"CONTROLLER\"}"; + CopyTablePayload payload2 = JsonUtils.stringToObject(json2, CopyTablePayload.class); + assertEquals(payload2.getJobType(), CopyTablePayload.JobType.CONTROLLER); + + // Test 3: MINION enum value deserializes correctly + String json3 = "{\"sourceClusterUri\":\"http://src\",\"destinationClusterUri\":\"http://dest\"," + + "\"brokerTenant\":\"broker\",\"serverTenant\":\"server\",\"jobType\":\"MINION\"}"; + CopyTablePayload payload3 = JsonUtils.stringToObject(json3, CopyTablePayload.class); + assertEquals(payload3.getJobType(), CopyTablePayload.JobType.MINION); + + // Test 4: Invalid job type throws exception during deserialization + String json4 = "{\"sourceClusterUri\":\"http://src\",\"destinationClusterUri\":\"http://dest\"," + + "\"brokerTenant\":\"broker\",\"serverTenant\":\"server\",\"jobType\":\"invalid\"}"; + expectThrows(Exception.class, () -> JsonUtils.stringToObject(json4, CopyTablePayload.class)); + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java index 09d973a6998b..f36b1365334f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java @@ -72,7 +72,7 @@ public void testCopy() throws Exception { String tableNameWithType = "table1_REALTIME"; String segmentName = "seg1"; CopyTablePayload payload = new CopyTablePayload("http://src", Collections.emptyMap(), - "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null); + "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null, null); Map metadata = new HashMap<>(); metadata.put("segment.download.url", "hdfs://src/data/seg1"); @@ -97,7 +97,7 @@ public void testCopyExisting() throws Exception { String tableNameWithType = "table1_REALTIME"; String segmentName = "seg1"; CopyTablePayload payload = new CopyTablePayload("http://src", Collections.emptyMap(), - "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null); + "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null, null); Map metadata = new HashMap<>(); metadata.put("segment.download.url", "hdfs://src/data/seg1"); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java index 947ddcf29a22..936dcb8008f9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java @@ -80,7 +80,7 @@ public void testReplicateTable() throws Exception { String tableName = "table1_REALTIME"; String sourceClusterUri = "http://localhost:9000"; CopyTablePayload copyTablePayload = new CopyTablePayload(sourceClusterUri, Collections.emptyMap(), - "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null); + "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null, null); List watermarks = Arrays.asList( new WatermarkInductionResult.Watermark(0, 10, 100L),