diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml
index 6ba4a534a0d9..844d760a0dc3 100644
--- a/pinot-broker/pom.xml
+++ b/pinot-broker/pom.xml
@@ -100,5 +100,10 @@
mockito-core
test
+
+ org.assertj
+ assertj-core
+ test
+
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheck.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheckResource.java
similarity index 74%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheck.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheckResource.java
index 8743f961407b..7dbdd4982c33 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheck.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheckResource.java
@@ -29,6 +29,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.GET;
@@ -39,6 +40,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
+import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.ServiceStatus;
@@ -54,7 +56,12 @@
HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
description = "The format of the key is ```\"Basic \" or \"Bearer \"```")))
@Path("/")
-public class PinotBrokerHealthCheck {
+public class PinotBrokerHealthCheckResource {
+
+ @Inject
+ @Named(BrokerAdminApiApplication.SHUTDOWN_IN_PROGRESS)
+ private AtomicBoolean _shutdownInProgress;
+
@Inject
@Named(BrokerAdminApiApplication.BROKER_INSTANCE_ID)
private String _instanceId;
@@ -88,6 +95,32 @@ public String getBrokerHealth() {
throw new WebApplicationException(errMessage, response);
}
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ @Path("health/readiness")
+ @ApiOperation(value = "Checking broker readiness status")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Broker is ready to serve queries"), @ApiResponse(code = 503, message =
+ "Broker is not ready to serve queries")
+ })
+ public String checkReadiness() {
+ if (_shutdownInProgress.get()) {
+ String errMessage = "Broker is shutting down";
+ throw new WebApplicationException(errMessage,
+ Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(errMessage).build());
+ }
+ ServiceStatus.Status status =
+ ServiceStatus.getServiceStatus(_instanceId + BaseBrokerStarter.READINESS_CALLBACK_SUFFIX);
+ if (status == ServiceStatus.Status.GOOD) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.READINESS_CHECK_OK_CALLS, 1);
+ return "OK";
+ }
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.READINESS_CHECK_BAD_CALLS, 1);
+ String errMessage = String.format("Pinot broker readiness status is %s", status);
+ Response response = Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(errMessage).build();
+ throw new WebApplicationException(errMessage, response);
+ }
+
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("uptime")
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
index 6381e76098c4..1ba733058aa0 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.core5.http.io.SocketConfig;
@@ -65,9 +66,11 @@ public class BrokerAdminApiApplication extends ResourceConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerAdminApiApplication.class);
public static final String PINOT_CONFIGURATION = "pinotConfiguration";
public static final String BROKER_INSTANCE_ID = "brokerInstanceId";
+ public static final String SHUTDOWN_IN_PROGRESS = "shutdownInProgress";
public static final String START_TIME = "brokerStartTime";
+ private final AtomicBoolean _shutdownInProgress = new AtomicBoolean();
private final String _brokerResourcePackages;
private final boolean _useHttps;
private final boolean _swaggerBrokerEnabled;
@@ -102,6 +105,7 @@ public BrokerAdminApiApplication(BrokerRoutingManager routingManager, BrokerRequ
register(new AbstractBinder() {
@Override
protected void configure() {
+ bind(_shutdownInProgress).named(SHUTDOWN_IN_PROGRESS).to(AtomicBoolean.class);
bind(connMgr).to(HttpClientConnectionManager.class);
bind(_executorService).to(Executor.class);
bind(helixManager).to(HelixManager.class);
@@ -168,6 +172,10 @@ private BrokerManagedAsyncExecutorProvider buildBrokerManagedAsyncExecutorProvid
return new BrokerManagedAsyncExecutorProvider(corePoolSize, maximumPoolSize, queueSize, brokerMetrics);
}
+ public void startShuttingDown() {
+ _shutdownInProgress.set(true);
+ }
+
public void stop() {
if (_httpServer != null) {
LOGGER.info("Shutting down http server");
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index baa96367ff5a..1aba20fe1cfe 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -123,6 +123,7 @@
@SuppressWarnings("unused")
public abstract class BaseBrokerStarter implements ServiceStartable {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseBrokerStarter.class);
+ public static final String READINESS_CALLBACK_SUFFIX = "-readiness";
protected PinotConfiguration _brokerConf;
protected List _listenerConfigs;
@@ -701,6 +702,16 @@ private void registerServiceStatusHandler() {
_clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup),
new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager,
_clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup))));
+
+ /*
+ * Register readiness callbacks for /health/readiness endpoint.
+ * TenantTagReadinessCallback must precede RoutingReadinessCallback because an untagged broker
+ * has no tables assigned, so RoutingReadinessCallback would return GOOD prematurely.
+ */
+ ServiceStatus.setServiceStatusCallback(_instanceId + READINESS_CALLBACK_SUFFIX,
+ new ServiceStatus.MultipleCallbackServiceStatusCallback(List.of(
+ new TenantTagReadinessCallback(_participantHelixManager, _clusterName, _instanceId),
+ new RoutingReadinessCallback(_helixAdmin, _routingManager, _clusterName, _instanceId))));
}
private String getDefaultBrokerId() {
@@ -735,6 +746,7 @@ private boolean updatePortIfNeeded(Map instanceConfigSimpleField
public void stop() {
LOGGER.info("Shutting down Pinot broker");
_isShuttingDown = true;
+ _brokerAdminApplication.startShuttingDown();
LOGGER.info("Disconnecting participant Helix manager");
_participantHelixManager.disconnect();
@@ -781,6 +793,7 @@ public void stop() {
LOGGER.info("Deregistering service status handler");
ServiceStatus.removeServiceStatusCallback(_instanceId);
+ ServiceStatus.removeServiceStatusCallback(_instanceId + READINESS_CALLBACK_SUFFIX);
LOGGER.info("Shutdown Broker Metrics Registry");
_metricsRegistry.shutdown();
LOGGER.info("Finish shutting down Pinot broker for {}", _instanceId);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallback.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallback.java
new file mode 100644
index 000000000000..178f8fac95c4
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallback.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker.helix;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+
+import static java.util.Objects.requireNonNull;
+
+
+/**
+ * Service status callback that checks whether routing is ready for all tables assigned to this broker.
+ * Returns STARTING if any assigned table is missing routing entries.
+ * Returns GOOD once routing exists for all assigned tables or if no tables are assigned.
+ */
+public class RoutingReadinessCallback implements ServiceStatus.ServiceStatusCallback {
+
+ private static final String STATUS_EXTERNAL_VIEW_NOT_FOUND = "Broker resource external view not found";
+
+ private final HelixAdmin _helixAdmin;
+ private final RoutingManager _routingManager;
+ private final String _clusterName;
+ private final String _instanceId;
+ private volatile ServiceStatus.Status _serviceStatus = ServiceStatus.Status.STARTING;
+
+ public RoutingReadinessCallback(HelixAdmin helixAdmin, RoutingManager routingManager,
+ String clusterName, String instanceId) {
+ _helixAdmin = requireNonNull(helixAdmin, "helixAdmin");
+ _routingManager = requireNonNull(routingManager, "routingManager");
+ _clusterName = requireNonNull(clusterName, "clusterName");
+ _instanceId = requireNonNull(instanceId, "instanceId");
+ }
+
+ @Override
+ public synchronized ServiceStatus.Status getServiceStatus() {
+ // Return cached GOOD status to avoid re-checking
+ if (_serviceStatus == ServiceStatus.Status.GOOD) {
+ return ServiceStatus.Status.GOOD;
+ }
+
+ ExternalView brokerResourceExternalView =
+ _helixAdmin.getResourceExternalView(_clusterName, Helix.BROKER_RESOURCE_INSTANCE);
+
+ if (brokerResourceExternalView == null) {
+ return ServiceStatus.Status.STARTING;
+ }
+
+ // Find all tables this broker instance is online for
+ List onlineTables = getOnlineTables(brokerResourceExternalView);
+
+ // If no tables are online, consider it GOOD
+ if (onlineTables.isEmpty()) {
+ _serviceStatus = ServiceStatus.Status.GOOD;
+ return _serviceStatus;
+ }
+
+ // Check routing exists for all online tables
+ for (String tableName : onlineTables) {
+ if (!_routingManager.routingExists(tableName)) {
+ return ServiceStatus.Status.STARTING;
+ }
+ }
+
+ _serviceStatus = ServiceStatus.Status.GOOD;
+ return _serviceStatus;
+ }
+
+ @Override
+ public synchronized String getStatusDescription() {
+ if (_serviceStatus == ServiceStatus.Status.GOOD) {
+ return ServiceStatus.STATUS_DESCRIPTION_NONE;
+ }
+
+ ExternalView brokerResourceExternalView =
+ _helixAdmin.getResourceExternalView(_clusterName, Helix.BROKER_RESOURCE_INSTANCE);
+
+ if (brokerResourceExternalView == null) {
+ return STATUS_EXTERNAL_VIEW_NOT_FOUND;
+ }
+
+ List onlineTables = getOnlineTables(brokerResourceExternalView);
+
+ if (onlineTables.isEmpty()) {
+ return ServiceStatus.STATUS_DESCRIPTION_NONE;
+ }
+
+ // Find tables missing routing
+ List missingRoutingTables = new ArrayList<>();
+ for (String tableName : onlineTables) {
+ if (!_routingManager.routingExists(tableName)) {
+ missingRoutingTables.add(tableName);
+ }
+ }
+
+ if (missingRoutingTables.isEmpty()) {
+ return ServiceStatus.STATUS_DESCRIPTION_NONE;
+ }
+
+ return String.format("Waiting for routing to be ready for %d/%d tables: %s",
+ missingRoutingTables.size(), onlineTables.size(), missingRoutingTables);
+ }
+
+ private List getOnlineTables(ExternalView brokerResourceExternalView) {
+ List onlineTables = new ArrayList<>();
+ for (String tableName : brokerResourceExternalView.getPartitionSet()) {
+ Map stateMap = brokerResourceExternalView.getStateMap(tableName);
+ if (stateMap != null && stateMap.containsKey(_instanceId)) {
+ onlineTables.add(tableName);
+ }
+ }
+ return onlineTables;
+ }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallback.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallback.java
new file mode 100644
index 000000000000..8910c09a6c7a
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallback.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker.helix;
+
+import java.util.List;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+
+import static java.util.Objects.requireNonNull;
+
+
+/**
+ * Service status callback that checks whether the broker has valid tenant tags assigned.
+ * Returns STARTING if the broker only has the untagged broker instance tag or no valid broker tags.
+ * Returns GOOD once the broker has at least one valid tenant-specific broker tag (ending with _BROKER).
+ */
+public class TenantTagReadinessCallback implements ServiceStatus.ServiceStatusCallback {
+
+ private static final String STATUS_INSTANCE_CONFIG_NOT_FOUND = "Instance config not found";
+ private static final String STATUS_NO_TAGS_ASSIGNED = "No tags assigned to broker instance";
+ private static final String STATUS_NO_VALID_BROKER_TAGS = "No valid tenant broker tags";
+
+ private final HelixManager _helixManager;
+ private final String _clusterName;
+ private final String _instanceId;
+ private volatile ServiceStatus.Status _serviceStatus = ServiceStatus.Status.STARTING;
+
+ public TenantTagReadinessCallback(HelixManager helixManager, String clusterName, String instanceId) {
+ _helixManager = requireNonNull(helixManager, "helixManager");
+ _clusterName = requireNonNull(clusterName, "clusterName");
+ _instanceId = requireNonNull(instanceId, "instanceId");
+ }
+
+ @Override
+ public synchronized ServiceStatus.Status getServiceStatus() {
+ // Return cached GOOD status to avoid re-checking
+ if (_serviceStatus == ServiceStatus.Status.GOOD) {
+ return _serviceStatus;
+ }
+
+ InstanceConfig instanceConfig =
+ _helixManager.getConfigAccessor().getInstanceConfig(_clusterName, _instanceId);
+
+ if (instanceConfig == null) {
+ return ServiceStatus.Status.STARTING;
+ }
+
+ List tags = instanceConfig.getTags();
+
+ if (tags == null || tags.isEmpty()) {
+ return ServiceStatus.Status.STARTING;
+ }
+
+ // Check if any tag is a valid broker tenant tag (ends with _BROKER)
+ // and is not just the untagged broker instance
+ for (String tag : tags) {
+ if (TagNameUtils.isBrokerTag(tag) && !Helix.UNTAGGED_BROKER_INSTANCE.equals(tag)) {
+ _serviceStatus = ServiceStatus.Status.GOOD;
+ return _serviceStatus;
+ }
+ }
+
+ return ServiceStatus.Status.STARTING;
+ }
+
+ @Override
+ public synchronized String getStatusDescription() {
+ if (_serviceStatus == ServiceStatus.Status.GOOD) {
+ return ServiceStatus.STATUS_DESCRIPTION_NONE;
+ }
+
+ InstanceConfig instanceConfig =
+ _helixManager.getConfigAccessor().getInstanceConfig(_clusterName, _instanceId);
+
+ if (instanceConfig == null) {
+ return STATUS_INSTANCE_CONFIG_NOT_FOUND;
+ }
+
+ List tags = instanceConfig.getTags();
+
+ if (tags == null || tags.isEmpty()) {
+ return STATUS_NO_TAGS_ASSIGNED;
+ }
+
+ // Check if any tag is a valid broker tenant tag
+ for (String tag : tags) {
+ if (TagNameUtils.isBrokerTag(tag) && !Helix.UNTAGGED_BROKER_INSTANCE.equals(tag)) {
+ return ServiceStatus.STATUS_DESCRIPTION_NONE;
+ }
+ }
+
+ return STATUS_NO_VALID_BROKER_TAGS + ". Current tags: " + tags;
+ }
+}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 0118907e840e..430a7bb26d38 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -29,6 +29,7 @@
import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.helix.ControllerTest;
@@ -137,6 +138,16 @@ private Map getStreamConfigs() {
return streamConfigs;
}
+ @Test
+ public void testServiceStatusReturnsGood() {
+ // Verify that ServiceStatus returns GOOD after broker is fully started
+ // This implicitly tests TenantTagReadinessCallback and RoutingReadinessCallback
+ String instanceId = _brokerStarter.getInstanceId();
+ ServiceStatus.Status status = ServiceStatus.getServiceStatus(instanceId);
+ assertEquals(status, ServiceStatus.Status.GOOD,
+ "Expected GOOD status but got " + status + ": " + ServiceStatus.getStatusDescription(instanceId));
+ }
+
@Test
public void testClusterConfigOverride() {
PinotConfiguration config = _brokerStarter.getConfig();
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallbackTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallbackTest.java
new file mode 100644
index 000000000000..d95c1ce1123c
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallbackTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker.helix;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class RoutingReadinessCallbackTest {
+
+ private static final String CLUSTER_NAME = "testCluster";
+ private static final String INSTANCE_ID = "Broker_localhost_8099";
+ private static final String TABLE_1 = "table1_OFFLINE";
+ private static final String TABLE_2 = "table2_REALTIME";
+
+ private HelixAdmin _helixAdmin;
+ private RoutingManager _routingManager;
+ private RoutingReadinessCallback _callback;
+
+ @BeforeMethod
+ public void setUp() {
+ _helixAdmin = mock(HelixAdmin.class);
+ _routingManager = mock(RoutingManager.class);
+ _callback = new RoutingReadinessCallback(_helixAdmin, _routingManager, CLUSTER_NAME, INSTANCE_ID);
+ }
+
+ @Test
+ public void testReturnsStartingWhenExternalViewNotFound() {
+ when(_helixAdmin.getResourceExternalView(CLUSTER_NAME, Helix.BROKER_RESOURCE_INSTANCE)).thenReturn(null);
+
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.STARTING);
+ assertThat(_callback.getStatusDescription()).contains("Broker resource external view not found");
+ }
+
+ @Test
+ public void testReturnsGoodWhenNoTablesOnline() {
+ ExternalView externalView = createExternalViewWithTables();
+ when(_helixAdmin.getResourceExternalView(CLUSTER_NAME, Helix.BROKER_RESOURCE_INSTANCE)).thenReturn(externalView);
+
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD);
+ assertThat(_callback.getStatusDescription()).isEqualTo(ServiceStatus.STATUS_DESCRIPTION_NONE);
+ }
+
+ @Test
+ public void testReturnsStartingWhenRoutingMissing() {
+ ExternalView externalView = createExternalViewWithTables(TABLE_1, TABLE_2);
+ when(_helixAdmin.getResourceExternalView(CLUSTER_NAME, Helix.BROKER_RESOURCE_INSTANCE)).thenReturn(externalView);
+ when(_routingManager.routingExists(TABLE_1)).thenReturn(true);
+ when(_routingManager.routingExists(TABLE_2)).thenReturn(false);
+
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.STARTING);
+ assertThat(_callback.getStatusDescription()).contains("Waiting for routing");
+ assertThat(_callback.getStatusDescription()).contains(TABLE_2);
+ }
+
+ @Test
+ public void testReturnsGoodWhenAllRoutingExists() {
+ ExternalView externalView = createExternalViewWithTables(TABLE_1, TABLE_2);
+ when(_helixAdmin.getResourceExternalView(CLUSTER_NAME, Helix.BROKER_RESOURCE_INSTANCE)).thenReturn(externalView);
+ when(_routingManager.routingExists(TABLE_1)).thenReturn(true);
+ when(_routingManager.routingExists(TABLE_2)).thenReturn(true);
+
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD);
+ assertThat(_callback.getStatusDescription()).isEqualTo(ServiceStatus.STATUS_DESCRIPTION_NONE);
+ }
+
+ @Test
+ public void testExternalViewPartitionSetReturnsTableNames() {
+ // Verify that ExternalView.getPartitionSet() returns table names
+ // and getStateMap(tableName) returns broker instance to state mapping
+ ExternalView externalView = createExternalViewWithTables(TABLE_1, TABLE_2);
+
+ assertThat(externalView.getPartitionSet()).containsExactlyInAnyOrder(TABLE_1, TABLE_2);
+ assertThat(externalView.getStateMap(TABLE_1)).containsKey(INSTANCE_ID);
+ assertThat(externalView.getStateMap(TABLE_2)).containsKey(INSTANCE_ID);
+ }
+
+ @Test
+ public void testStatusIsCachedOnceGood() {
+ ExternalView externalView = createExternalViewWithTables(TABLE_1);
+ when(_helixAdmin.getResourceExternalView(CLUSTER_NAME, Helix.BROKER_RESOURCE_INSTANCE)).thenReturn(externalView);
+ when(_routingManager.routingExists(TABLE_1)).thenReturn(true);
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD);
+
+ // Change mock to return STARTING conditions
+ when(_routingManager.routingExists(TABLE_1)).thenReturn(false);
+
+ // Should still return GOOD (cached)
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD);
+ assertThat(_callback.getStatusDescription()).isEqualTo(ServiceStatus.STATUS_DESCRIPTION_NONE);
+ }
+
+ private ExternalView createExternalViewWithTables(String... tableNames) {
+ ExternalView externalView = new ExternalView(Helix.BROKER_RESOURCE_INSTANCE);
+ for (String tableName : tableNames) {
+ externalView.setState(tableName, INSTANCE_ID, "ONLINE");
+ }
+ return externalView;
+ }
+}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallbackTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallbackTest.java
new file mode 100644
index 000000000000..0dcd46b61604
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallbackTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker.helix;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TenantTagReadinessCallbackTest {
+
+ private static final String CLUSTER_NAME = "testCluster";
+ private static final String INSTANCE_ID = "Broker_localhost_8099";
+
+ private HelixManager _helixManager;
+ private ConfigAccessor _configAccessor;
+ private InstanceConfig _instanceConfig;
+ private TenantTagReadinessCallback _callback;
+
+ @BeforeMethod
+ public void setUp() {
+ _helixManager = mock(HelixManager.class);
+ _configAccessor = mock(ConfigAccessor.class);
+ _instanceConfig = mock(InstanceConfig.class);
+
+ when(_helixManager.getConfigAccessor()).thenReturn(_configAccessor);
+ when(_configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID)).thenReturn(_instanceConfig);
+
+ _callback = new TenantTagReadinessCallback(_helixManager, CLUSTER_NAME, INSTANCE_ID);
+ }
+
+ @Test
+ public void testReturnsStartingWhenInstanceConfigNotFound() {
+ when(_configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID)).thenReturn(null);
+
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.STARTING);
+ assertThat(_callback.getStatusDescription()).contains("Instance config not found");
+ }
+
+ @Test
+ public void testReturnsStartingWhenOnlyUntaggedBroker() {
+ List tags = Collections.singletonList(Helix.UNTAGGED_BROKER_INSTANCE);
+ when(_instanceConfig.getTags()).thenReturn(tags);
+
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.STARTING);
+ assertThat(_callback.getStatusDescription()).contains("No valid tenant broker tags");
+ }
+
+ @Test
+ public void testReturnsGoodWhenValidBrokerTag() {
+ List tags = Collections.singletonList("DefaultTenant_BROKER");
+ when(_instanceConfig.getTags()).thenReturn(tags);
+
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD);
+ assertThat(_callback.getStatusDescription()).isEqualTo(ServiceStatus.STATUS_DESCRIPTION_NONE);
+ }
+
+ @Test
+ public void testStatusIsCachedOnceGood() {
+ when(_instanceConfig.getTags()).thenReturn(Collections.singletonList("DefaultTenant_BROKER"));
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD);
+
+ // Change mock to return STARTING conditions
+ when(_instanceConfig.getTags()).thenReturn(Collections.emptyList());
+
+ // Should still return GOOD (cached)
+ assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD);
+ assertThat(_callback.getStatusDescription()).isEqualTo(ServiceStatus.STATUS_DESCRIPTION_NONE);
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index e1d4778d12b7..f6b10f4660c9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -39,6 +39,8 @@ public class BrokerMeter implements AbstractMetrics.Meter {
public static final BrokerMeter WEB_APPLICATION_EXCEPTIONS = create("WEB_APPLICATION_EXCEPTIONS", "exceptions", true);
public static final BrokerMeter HEALTHCHECK_BAD_CALLS = create("HEALTHCHECK_BAD_CALLS", "healthcheck", true);
public static final BrokerMeter HEALTHCHECK_OK_CALLS = create("HEALTHCHECK_OK_CALLS", "healthcheck", true);
+ public static final BrokerMeter READINESS_CHECK_BAD_CALLS = create("READINESS_CHECK_BAD_CALLS", "healthcheck", true);
+ public static final BrokerMeter READINESS_CHECK_OK_CALLS = create("READINESS_CHECK_OK_CALLS", "healthcheck", true);
/**
* Number of queries executed.
*