diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml index 6ba4a534a0d9..844d760a0dc3 100644 --- a/pinot-broker/pom.xml +++ b/pinot-broker/pom.xml @@ -100,5 +100,10 @@ mockito-core test + + org.assertj + assertj-core + test + diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheck.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheckResource.java similarity index 74% rename from pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheck.java rename to pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheckResource.java index 8743f961407b..7dbdd4982c33 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheck.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerHealthCheckResource.java @@ -29,6 +29,7 @@ import java.time.Duration; import java.time.Instant; import java.time.format.DateTimeFormatter; +import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Inject; import javax.inject.Named; import javax.ws.rs.GET; @@ -39,6 +40,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.pinot.broker.broker.BrokerAdminApiApplication; +import org.apache.pinot.broker.broker.helix.BaseBrokerStarter; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.utils.ServiceStatus; @@ -54,7 +56,12 @@ HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY, description = "The format of the key is ```\"Basic \" or \"Bearer \"```"))) @Path("/") -public class PinotBrokerHealthCheck { +public class PinotBrokerHealthCheckResource { + + @Inject + @Named(BrokerAdminApiApplication.SHUTDOWN_IN_PROGRESS) + private AtomicBoolean _shutdownInProgress; + @Inject @Named(BrokerAdminApiApplication.BROKER_INSTANCE_ID) private String _instanceId; @@ -88,6 +95,32 @@ public String getBrokerHealth() { throw new WebApplicationException(errMessage, response); } + @GET + @Produces(MediaType.TEXT_PLAIN) + @Path("health/readiness") + @ApiOperation(value = "Checking broker readiness status") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Broker is ready to serve queries"), @ApiResponse(code = 503, message = + "Broker is not ready to serve queries") + }) + public String checkReadiness() { + if (_shutdownInProgress.get()) { + String errMessage = "Broker is shutting down"; + throw new WebApplicationException(errMessage, + Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(errMessage).build()); + } + ServiceStatus.Status status = + ServiceStatus.getServiceStatus(_instanceId + BaseBrokerStarter.READINESS_CALLBACK_SUFFIX); + if (status == ServiceStatus.Status.GOOD) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.READINESS_CHECK_OK_CALLS, 1); + return "OK"; + } + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.READINESS_CHECK_BAD_CALLS, 1); + String errMessage = String.format("Pinot broker readiness status is %s", status); + Response response = Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(errMessage).build(); + throw new WebApplicationException(errMessage, response); + } + @GET @Produces(MediaType.TEXT_PLAIN) @Path("uptime") diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java index 6381e76098c4..1ba733058aa0 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.hc.core5.http.io.SocketConfig; @@ -65,9 +66,11 @@ public class BrokerAdminApiApplication extends ResourceConfig { private static final Logger LOGGER = LoggerFactory.getLogger(BrokerAdminApiApplication.class); public static final String PINOT_CONFIGURATION = "pinotConfiguration"; public static final String BROKER_INSTANCE_ID = "brokerInstanceId"; + public static final String SHUTDOWN_IN_PROGRESS = "shutdownInProgress"; public static final String START_TIME = "brokerStartTime"; + private final AtomicBoolean _shutdownInProgress = new AtomicBoolean(); private final String _brokerResourcePackages; private final boolean _useHttps; private final boolean _swaggerBrokerEnabled; @@ -102,6 +105,7 @@ public BrokerAdminApiApplication(BrokerRoutingManager routingManager, BrokerRequ register(new AbstractBinder() { @Override protected void configure() { + bind(_shutdownInProgress).named(SHUTDOWN_IN_PROGRESS).to(AtomicBoolean.class); bind(connMgr).to(HttpClientConnectionManager.class); bind(_executorService).to(Executor.class); bind(helixManager).to(HelixManager.class); @@ -168,6 +172,10 @@ private BrokerManagedAsyncExecutorProvider buildBrokerManagedAsyncExecutorProvid return new BrokerManagedAsyncExecutorProvider(corePoolSize, maximumPoolSize, queueSize, brokerMetrics); } + public void startShuttingDown() { + _shutdownInProgress.set(true); + } + public void stop() { if (_httpServer != null) { LOGGER.info("Shutting down http server"); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index baa96367ff5a..1aba20fe1cfe 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -123,6 +123,7 @@ @SuppressWarnings("unused") public abstract class BaseBrokerStarter implements ServiceStartable { private static final Logger LOGGER = LoggerFactory.getLogger(BaseBrokerStarter.class); + public static final String READINESS_CALLBACK_SUFFIX = "-readiness"; protected PinotConfiguration _brokerConf; protected List _listenerConfigs; @@ -701,6 +702,16 @@ private void registerServiceStatusHandler() { _clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup), new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager, _clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup)))); + + /* + * Register readiness callbacks for /health/readiness endpoint. + * TenantTagReadinessCallback must precede RoutingReadinessCallback because an untagged broker + * has no tables assigned, so RoutingReadinessCallback would return GOOD prematurely. + */ + ServiceStatus.setServiceStatusCallback(_instanceId + READINESS_CALLBACK_SUFFIX, + new ServiceStatus.MultipleCallbackServiceStatusCallback(List.of( + new TenantTagReadinessCallback(_participantHelixManager, _clusterName, _instanceId), + new RoutingReadinessCallback(_helixAdmin, _routingManager, _clusterName, _instanceId)))); } private String getDefaultBrokerId() { @@ -735,6 +746,7 @@ private boolean updatePortIfNeeded(Map instanceConfigSimpleField public void stop() { LOGGER.info("Shutting down Pinot broker"); _isShuttingDown = true; + _brokerAdminApplication.startShuttingDown(); LOGGER.info("Disconnecting participant Helix manager"); _participantHelixManager.disconnect(); @@ -781,6 +793,7 @@ public void stop() { LOGGER.info("Deregistering service status handler"); ServiceStatus.removeServiceStatusCallback(_instanceId); + ServiceStatus.removeServiceStatusCallback(_instanceId + READINESS_CALLBACK_SUFFIX); LOGGER.info("Shutdown Broker Metrics Registry"); _metricsRegistry.shutdown(); LOGGER.info("Finish shutting down Pinot broker for {}", _instanceId); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallback.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallback.java new file mode 100644 index 000000000000..178f8fac95c4 --- /dev/null +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallback.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.broker.helix; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.helix.HelixAdmin; +import org.apache.helix.model.ExternalView; +import org.apache.pinot.common.utils.ServiceStatus; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.spi.utils.CommonConstants.Helix; + +import static java.util.Objects.requireNonNull; + + +/** + * Service status callback that checks whether routing is ready for all tables assigned to this broker. + * Returns STARTING if any assigned table is missing routing entries. + * Returns GOOD once routing exists for all assigned tables or if no tables are assigned. + */ +public class RoutingReadinessCallback implements ServiceStatus.ServiceStatusCallback { + + private static final String STATUS_EXTERNAL_VIEW_NOT_FOUND = "Broker resource external view not found"; + + private final HelixAdmin _helixAdmin; + private final RoutingManager _routingManager; + private final String _clusterName; + private final String _instanceId; + private volatile ServiceStatus.Status _serviceStatus = ServiceStatus.Status.STARTING; + + public RoutingReadinessCallback(HelixAdmin helixAdmin, RoutingManager routingManager, + String clusterName, String instanceId) { + _helixAdmin = requireNonNull(helixAdmin, "helixAdmin"); + _routingManager = requireNonNull(routingManager, "routingManager"); + _clusterName = requireNonNull(clusterName, "clusterName"); + _instanceId = requireNonNull(instanceId, "instanceId"); + } + + @Override + public synchronized ServiceStatus.Status getServiceStatus() { + // Return cached GOOD status to avoid re-checking + if (_serviceStatus == ServiceStatus.Status.GOOD) { + return ServiceStatus.Status.GOOD; + } + + ExternalView brokerResourceExternalView = + _helixAdmin.getResourceExternalView(_clusterName, Helix.BROKER_RESOURCE_INSTANCE); + + if (brokerResourceExternalView == null) { + return ServiceStatus.Status.STARTING; + } + + // Find all tables this broker instance is online for + List onlineTables = getOnlineTables(brokerResourceExternalView); + + // If no tables are online, consider it GOOD + if (onlineTables.isEmpty()) { + _serviceStatus = ServiceStatus.Status.GOOD; + return _serviceStatus; + } + + // Check routing exists for all online tables + for (String tableName : onlineTables) { + if (!_routingManager.routingExists(tableName)) { + return ServiceStatus.Status.STARTING; + } + } + + _serviceStatus = ServiceStatus.Status.GOOD; + return _serviceStatus; + } + + @Override + public synchronized String getStatusDescription() { + if (_serviceStatus == ServiceStatus.Status.GOOD) { + return ServiceStatus.STATUS_DESCRIPTION_NONE; + } + + ExternalView brokerResourceExternalView = + _helixAdmin.getResourceExternalView(_clusterName, Helix.BROKER_RESOURCE_INSTANCE); + + if (brokerResourceExternalView == null) { + return STATUS_EXTERNAL_VIEW_NOT_FOUND; + } + + List onlineTables = getOnlineTables(brokerResourceExternalView); + + if (onlineTables.isEmpty()) { + return ServiceStatus.STATUS_DESCRIPTION_NONE; + } + + // Find tables missing routing + List missingRoutingTables = new ArrayList<>(); + for (String tableName : onlineTables) { + if (!_routingManager.routingExists(tableName)) { + missingRoutingTables.add(tableName); + } + } + + if (missingRoutingTables.isEmpty()) { + return ServiceStatus.STATUS_DESCRIPTION_NONE; + } + + return String.format("Waiting for routing to be ready for %d/%d tables: %s", + missingRoutingTables.size(), onlineTables.size(), missingRoutingTables); + } + + private List getOnlineTables(ExternalView brokerResourceExternalView) { + List onlineTables = new ArrayList<>(); + for (String tableName : brokerResourceExternalView.getPartitionSet()) { + Map stateMap = brokerResourceExternalView.getStateMap(tableName); + if (stateMap != null && stateMap.containsKey(_instanceId)) { + onlineTables.add(tableName); + } + } + return onlineTables; + } +} diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallback.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallback.java new file mode 100644 index 000000000000..8910c09a6c7a --- /dev/null +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallback.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.broker.helix; + +import java.util.List; +import org.apache.helix.HelixManager; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.utils.ServiceStatus; +import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.spi.utils.CommonConstants.Helix; + +import static java.util.Objects.requireNonNull; + + +/** + * Service status callback that checks whether the broker has valid tenant tags assigned. + * Returns STARTING if the broker only has the untagged broker instance tag or no valid broker tags. + * Returns GOOD once the broker has at least one valid tenant-specific broker tag (ending with _BROKER). + */ +public class TenantTagReadinessCallback implements ServiceStatus.ServiceStatusCallback { + + private static final String STATUS_INSTANCE_CONFIG_NOT_FOUND = "Instance config not found"; + private static final String STATUS_NO_TAGS_ASSIGNED = "No tags assigned to broker instance"; + private static final String STATUS_NO_VALID_BROKER_TAGS = "No valid tenant broker tags"; + + private final HelixManager _helixManager; + private final String _clusterName; + private final String _instanceId; + private volatile ServiceStatus.Status _serviceStatus = ServiceStatus.Status.STARTING; + + public TenantTagReadinessCallback(HelixManager helixManager, String clusterName, String instanceId) { + _helixManager = requireNonNull(helixManager, "helixManager"); + _clusterName = requireNonNull(clusterName, "clusterName"); + _instanceId = requireNonNull(instanceId, "instanceId"); + } + + @Override + public synchronized ServiceStatus.Status getServiceStatus() { + // Return cached GOOD status to avoid re-checking + if (_serviceStatus == ServiceStatus.Status.GOOD) { + return _serviceStatus; + } + + InstanceConfig instanceConfig = + _helixManager.getConfigAccessor().getInstanceConfig(_clusterName, _instanceId); + + if (instanceConfig == null) { + return ServiceStatus.Status.STARTING; + } + + List tags = instanceConfig.getTags(); + + if (tags == null || tags.isEmpty()) { + return ServiceStatus.Status.STARTING; + } + + // Check if any tag is a valid broker tenant tag (ends with _BROKER) + // and is not just the untagged broker instance + for (String tag : tags) { + if (TagNameUtils.isBrokerTag(tag) && !Helix.UNTAGGED_BROKER_INSTANCE.equals(tag)) { + _serviceStatus = ServiceStatus.Status.GOOD; + return _serviceStatus; + } + } + + return ServiceStatus.Status.STARTING; + } + + @Override + public synchronized String getStatusDescription() { + if (_serviceStatus == ServiceStatus.Status.GOOD) { + return ServiceStatus.STATUS_DESCRIPTION_NONE; + } + + InstanceConfig instanceConfig = + _helixManager.getConfigAccessor().getInstanceConfig(_clusterName, _instanceId); + + if (instanceConfig == null) { + return STATUS_INSTANCE_CONFIG_NOT_FOUND; + } + + List tags = instanceConfig.getTags(); + + if (tags == null || tags.isEmpty()) { + return STATUS_NO_TAGS_ASSIGNED; + } + + // Check if any tag is a valid broker tenant tag + for (String tag : tags) { + if (TagNameUtils.isBrokerTag(tag) && !Helix.UNTAGGED_BROKER_INSTANCE.equals(tag)) { + return ServiceStatus.STATUS_DESCRIPTION_NONE; + } + } + + return STATUS_NO_VALID_BROKER_TAGS + ". Current tags: " + tags; + } +} diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index 0118907e840e..430a7bb26d38 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -29,6 +29,7 @@ import org.apache.pinot.broker.routing.manager.BrokerRoutingManager; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.controller.api.exception.InvalidTableConfigException; import org.apache.pinot.controller.helix.ControllerTest; @@ -137,6 +138,16 @@ private Map getStreamConfigs() { return streamConfigs; } + @Test + public void testServiceStatusReturnsGood() { + // Verify that ServiceStatus returns GOOD after broker is fully started + // This implicitly tests TenantTagReadinessCallback and RoutingReadinessCallback + String instanceId = _brokerStarter.getInstanceId(); + ServiceStatus.Status status = ServiceStatus.getServiceStatus(instanceId); + assertEquals(status, ServiceStatus.Status.GOOD, + "Expected GOOD status but got " + status + ": " + ServiceStatus.getStatusDescription(instanceId)); + } + @Test public void testClusterConfigOverride() { PinotConfiguration config = _brokerStarter.getConfig(); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallbackTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallbackTest.java new file mode 100644 index 000000000000..d95c1ce1123c --- /dev/null +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/RoutingReadinessCallbackTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.broker.helix; + +import org.apache.helix.HelixAdmin; +import org.apache.helix.model.ExternalView; +import org.apache.pinot.common.utils.ServiceStatus; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.spi.utils.CommonConstants.Helix; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class RoutingReadinessCallbackTest { + + private static final String CLUSTER_NAME = "testCluster"; + private static final String INSTANCE_ID = "Broker_localhost_8099"; + private static final String TABLE_1 = "table1_OFFLINE"; + private static final String TABLE_2 = "table2_REALTIME"; + + private HelixAdmin _helixAdmin; + private RoutingManager _routingManager; + private RoutingReadinessCallback _callback; + + @BeforeMethod + public void setUp() { + _helixAdmin = mock(HelixAdmin.class); + _routingManager = mock(RoutingManager.class); + _callback = new RoutingReadinessCallback(_helixAdmin, _routingManager, CLUSTER_NAME, INSTANCE_ID); + } + + @Test + public void testReturnsStartingWhenExternalViewNotFound() { + when(_helixAdmin.getResourceExternalView(CLUSTER_NAME, Helix.BROKER_RESOURCE_INSTANCE)).thenReturn(null); + + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.STARTING); + assertThat(_callback.getStatusDescription()).contains("Broker resource external view not found"); + } + + @Test + public void testReturnsGoodWhenNoTablesOnline() { + ExternalView externalView = createExternalViewWithTables(); + when(_helixAdmin.getResourceExternalView(CLUSTER_NAME, Helix.BROKER_RESOURCE_INSTANCE)).thenReturn(externalView); + + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD); + assertThat(_callback.getStatusDescription()).isEqualTo(ServiceStatus.STATUS_DESCRIPTION_NONE); + } + + @Test + public void testReturnsStartingWhenRoutingMissing() { + ExternalView externalView = createExternalViewWithTables(TABLE_1, TABLE_2); + when(_helixAdmin.getResourceExternalView(CLUSTER_NAME, Helix.BROKER_RESOURCE_INSTANCE)).thenReturn(externalView); + when(_routingManager.routingExists(TABLE_1)).thenReturn(true); + when(_routingManager.routingExists(TABLE_2)).thenReturn(false); + + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.STARTING); + assertThat(_callback.getStatusDescription()).contains("Waiting for routing"); + assertThat(_callback.getStatusDescription()).contains(TABLE_2); + } + + @Test + public void testReturnsGoodWhenAllRoutingExists() { + ExternalView externalView = createExternalViewWithTables(TABLE_1, TABLE_2); + when(_helixAdmin.getResourceExternalView(CLUSTER_NAME, Helix.BROKER_RESOURCE_INSTANCE)).thenReturn(externalView); + when(_routingManager.routingExists(TABLE_1)).thenReturn(true); + when(_routingManager.routingExists(TABLE_2)).thenReturn(true); + + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD); + assertThat(_callback.getStatusDescription()).isEqualTo(ServiceStatus.STATUS_DESCRIPTION_NONE); + } + + @Test + public void testExternalViewPartitionSetReturnsTableNames() { + // Verify that ExternalView.getPartitionSet() returns table names + // and getStateMap(tableName) returns broker instance to state mapping + ExternalView externalView = createExternalViewWithTables(TABLE_1, TABLE_2); + + assertThat(externalView.getPartitionSet()).containsExactlyInAnyOrder(TABLE_1, TABLE_2); + assertThat(externalView.getStateMap(TABLE_1)).containsKey(INSTANCE_ID); + assertThat(externalView.getStateMap(TABLE_2)).containsKey(INSTANCE_ID); + } + + @Test + public void testStatusIsCachedOnceGood() { + ExternalView externalView = createExternalViewWithTables(TABLE_1); + when(_helixAdmin.getResourceExternalView(CLUSTER_NAME, Helix.BROKER_RESOURCE_INSTANCE)).thenReturn(externalView); + when(_routingManager.routingExists(TABLE_1)).thenReturn(true); + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD); + + // Change mock to return STARTING conditions + when(_routingManager.routingExists(TABLE_1)).thenReturn(false); + + // Should still return GOOD (cached) + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD); + assertThat(_callback.getStatusDescription()).isEqualTo(ServiceStatus.STATUS_DESCRIPTION_NONE); + } + + private ExternalView createExternalViewWithTables(String... tableNames) { + ExternalView externalView = new ExternalView(Helix.BROKER_RESOURCE_INSTANCE); + for (String tableName : tableNames) { + externalView.setState(tableName, INSTANCE_ID, "ONLINE"); + } + return externalView; + } +} diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallbackTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallbackTest.java new file mode 100644 index 000000000000..0dcd46b61604 --- /dev/null +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/TenantTagReadinessCallbackTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.broker.helix; + +import java.util.Collections; +import java.util.List; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.utils.ServiceStatus; +import org.apache.pinot.spi.utils.CommonConstants.Helix; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TenantTagReadinessCallbackTest { + + private static final String CLUSTER_NAME = "testCluster"; + private static final String INSTANCE_ID = "Broker_localhost_8099"; + + private HelixManager _helixManager; + private ConfigAccessor _configAccessor; + private InstanceConfig _instanceConfig; + private TenantTagReadinessCallback _callback; + + @BeforeMethod + public void setUp() { + _helixManager = mock(HelixManager.class); + _configAccessor = mock(ConfigAccessor.class); + _instanceConfig = mock(InstanceConfig.class); + + when(_helixManager.getConfigAccessor()).thenReturn(_configAccessor); + when(_configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID)).thenReturn(_instanceConfig); + + _callback = new TenantTagReadinessCallback(_helixManager, CLUSTER_NAME, INSTANCE_ID); + } + + @Test + public void testReturnsStartingWhenInstanceConfigNotFound() { + when(_configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID)).thenReturn(null); + + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.STARTING); + assertThat(_callback.getStatusDescription()).contains("Instance config not found"); + } + + @Test + public void testReturnsStartingWhenOnlyUntaggedBroker() { + List tags = Collections.singletonList(Helix.UNTAGGED_BROKER_INSTANCE); + when(_instanceConfig.getTags()).thenReturn(tags); + + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.STARTING); + assertThat(_callback.getStatusDescription()).contains("No valid tenant broker tags"); + } + + @Test + public void testReturnsGoodWhenValidBrokerTag() { + List tags = Collections.singletonList("DefaultTenant_BROKER"); + when(_instanceConfig.getTags()).thenReturn(tags); + + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD); + assertThat(_callback.getStatusDescription()).isEqualTo(ServiceStatus.STATUS_DESCRIPTION_NONE); + } + + @Test + public void testStatusIsCachedOnceGood() { + when(_instanceConfig.getTags()).thenReturn(Collections.singletonList("DefaultTenant_BROKER")); + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD); + + // Change mock to return STARTING conditions + when(_instanceConfig.getTags()).thenReturn(Collections.emptyList()); + + // Should still return GOOD (cached) + assertThat(_callback.getServiceStatus()).isEqualTo(ServiceStatus.Status.GOOD); + assertThat(_callback.getStatusDescription()).isEqualTo(ServiceStatus.STATUS_DESCRIPTION_NONE); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index e1d4778d12b7..f6b10f4660c9 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -39,6 +39,8 @@ public class BrokerMeter implements AbstractMetrics.Meter { public static final BrokerMeter WEB_APPLICATION_EXCEPTIONS = create("WEB_APPLICATION_EXCEPTIONS", "exceptions", true); public static final BrokerMeter HEALTHCHECK_BAD_CALLS = create("HEALTHCHECK_BAD_CALLS", "healthcheck", true); public static final BrokerMeter HEALTHCHECK_OK_CALLS = create("HEALTHCHECK_OK_CALLS", "healthcheck", true); + public static final BrokerMeter READINESS_CHECK_BAD_CALLS = create("READINESS_CHECK_BAD_CALLS", "healthcheck", true); + public static final BrokerMeter READINESS_CHECK_OK_CALLS = create("READINESS_CHECK_OK_CALLS", "healthcheck", true); /** * Number of queries executed. *