Skip to content
5 changes: 5 additions & 0 deletions pinot-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,10 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.GET;
Expand All @@ -39,6 +40,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.ServiceStatus;
Expand All @@ -54,7 +56,12 @@
HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
description = "The format of the key is ```\"Basic <token>\" or \"Bearer <token>\"```")))
@Path("/")
public class PinotBrokerHealthCheck {
public class PinotBrokerHealthCheckResource {

@Inject
@Named(BrokerAdminApiApplication.SHUTDOWN_IN_PROGRESS)
private AtomicBoolean _shutdownInProgress;

@Inject
@Named(BrokerAdminApiApplication.BROKER_INSTANCE_ID)
private String _instanceId;
Expand Down Expand Up @@ -88,6 +95,32 @@ public String getBrokerHealth() {
throw new WebApplicationException(errMessage, response);
}

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("health/readiness")
@ApiOperation(value = "Checking broker readiness status")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Broker is ready to serve queries"), @ApiResponse(code = 503, message =
"Broker is not ready to serve queries")
})
public String checkReadiness() {
if (_shutdownInProgress.get()) {
String errMessage = "Broker is shutting down";
throw new WebApplicationException(errMessage,
Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(errMessage).build());
}
ServiceStatus.Status status =
ServiceStatus.getServiceStatus(_instanceId + BaseBrokerStarter.READINESS_CALLBACK_SUFFIX);
if (status == ServiceStatus.Status.GOOD) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.READINESS_CHECK_OK_CALLS, 1);
return "OK";
}
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.READINESS_CHECK_BAD_CALLS, 1);
String errMessage = String.format("Pinot broker readiness status is %s", status);
Response response = Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(errMessage).build();
throw new WebApplicationException(errMessage, response);
}

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("uptime")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.core5.http.io.SocketConfig;
Expand Down Expand Up @@ -65,9 +66,11 @@ public class BrokerAdminApiApplication extends ResourceConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerAdminApiApplication.class);
public static final String PINOT_CONFIGURATION = "pinotConfiguration";
public static final String BROKER_INSTANCE_ID = "brokerInstanceId";
public static final String SHUTDOWN_IN_PROGRESS = "shutdownInProgress";

public static final String START_TIME = "brokerStartTime";

private final AtomicBoolean _shutdownInProgress = new AtomicBoolean();
private final String _brokerResourcePackages;
private final boolean _useHttps;
private final boolean _swaggerBrokerEnabled;
Expand Down Expand Up @@ -102,6 +105,7 @@ public BrokerAdminApiApplication(BrokerRoutingManager routingManager, BrokerRequ
register(new AbstractBinder() {
@Override
protected void configure() {
bind(_shutdownInProgress).named(SHUTDOWN_IN_PROGRESS).to(AtomicBoolean.class);
bind(connMgr).to(HttpClientConnectionManager.class);
bind(_executorService).to(Executor.class);
bind(helixManager).to(HelixManager.class);
Expand Down Expand Up @@ -168,6 +172,10 @@ private BrokerManagedAsyncExecutorProvider buildBrokerManagedAsyncExecutorProvid
return new BrokerManagedAsyncExecutorProvider(corePoolSize, maximumPoolSize, queueSize, brokerMetrics);
}

public void startShuttingDown() {
_shutdownInProgress.set(true);
}

public void stop() {
if (_httpServer != null) {
LOGGER.info("Shutting down http server");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
@SuppressWarnings("unused")
public abstract class BaseBrokerStarter implements ServiceStartable {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseBrokerStarter.class);
public static final String READINESS_CALLBACK_SUFFIX = "-readiness";

protected PinotConfiguration _brokerConf;
protected List<ListenerConfig> _listenerConfigs;
Expand Down Expand Up @@ -701,6 +702,16 @@ private void registerServiceStatusHandler() {
_clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup),
new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager,
_clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup))));

/*
* Register readiness callbacks for /health/readiness endpoint.
* TenantTagReadinessCallback must precede RoutingReadinessCallback because an untagged broker
* has no tables assigned, so RoutingReadinessCallback would return GOOD prematurely.
*/
ServiceStatus.setServiceStatusCallback(_instanceId + READINESS_CALLBACK_SUFFIX,
new ServiceStatus.MultipleCallbackServiceStatusCallback(List.of(
new TenantTagReadinessCallback(_participantHelixManager, _clusterName, _instanceId),
new RoutingReadinessCallback(_helixAdmin, _routingManager, _clusterName, _instanceId))));
}

private String getDefaultBrokerId() {
Expand Down Expand Up @@ -735,6 +746,7 @@ private boolean updatePortIfNeeded(Map<String, String> instanceConfigSimpleField
public void stop() {
LOGGER.info("Shutting down Pinot broker");
_isShuttingDown = true;
_brokerAdminApplication.startShuttingDown();

LOGGER.info("Disconnecting participant Helix manager");
_participantHelixManager.disconnect();
Expand Down Expand Up @@ -781,6 +793,7 @@ public void stop() {

LOGGER.info("Deregistering service status handler");
ServiceStatus.removeServiceStatusCallback(_instanceId);
ServiceStatus.removeServiceStatusCallback(_instanceId + READINESS_CALLBACK_SUFFIX);
LOGGER.info("Shutdown Broker Metrics Registry");
_metricsRegistry.shutdown();
LOGGER.info("Finish shutting down Pinot broker for {}", _instanceId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.broker.helix;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.spi.utils.CommonConstants.Helix;

import static java.util.Objects.requireNonNull;


/**
* Service status callback that checks whether routing is ready for all tables assigned to this broker.
* Returns STARTING if any assigned table is missing routing entries.
* Returns GOOD once routing exists for all assigned tables or if no tables are assigned.
*/
public class RoutingReadinessCallback implements ServiceStatus.ServiceStatusCallback {

private static final String STATUS_EXTERNAL_VIEW_NOT_FOUND = "Broker resource external view not found";

private final HelixAdmin _helixAdmin;
private final RoutingManager _routingManager;
private final String _clusterName;
private final String _instanceId;
private volatile ServiceStatus.Status _serviceStatus = ServiceStatus.Status.STARTING;

public RoutingReadinessCallback(HelixAdmin helixAdmin, RoutingManager routingManager,
String clusterName, String instanceId) {
_helixAdmin = requireNonNull(helixAdmin, "helixAdmin");
_routingManager = requireNonNull(routingManager, "routingManager");
_clusterName = requireNonNull(clusterName, "clusterName");
_instanceId = requireNonNull(instanceId, "instanceId");
}

@Override
public synchronized ServiceStatus.Status getServiceStatus() {
// Return cached GOOD status to avoid re-checking
if (_serviceStatus == ServiceStatus.Status.GOOD) {
return ServiceStatus.Status.GOOD;
}

ExternalView brokerResourceExternalView =
_helixAdmin.getResourceExternalView(_clusterName, Helix.BROKER_RESOURCE_INSTANCE);

if (brokerResourceExternalView == null) {
return ServiceStatus.Status.STARTING;
}

// Find all tables this broker instance is online for
List<String> onlineTables = getOnlineTables(brokerResourceExternalView);

// If no tables are online, consider it GOOD
if (onlineTables.isEmpty()) {
_serviceStatus = ServiceStatus.Status.GOOD;
return _serviceStatus;
}

// Check routing exists for all online tables
for (String tableName : onlineTables) {
if (!_routingManager.routingExists(tableName)) {
return ServiceStatus.Status.STARTING;
}
}

_serviceStatus = ServiceStatus.Status.GOOD;
return _serviceStatus;
}

@Override
public synchronized String getStatusDescription() {
if (_serviceStatus == ServiceStatus.Status.GOOD) {
return ServiceStatus.STATUS_DESCRIPTION_NONE;
}

ExternalView brokerResourceExternalView =
_helixAdmin.getResourceExternalView(_clusterName, Helix.BROKER_RESOURCE_INSTANCE);

if (brokerResourceExternalView == null) {
return STATUS_EXTERNAL_VIEW_NOT_FOUND;
}

List<String> onlineTables = getOnlineTables(brokerResourceExternalView);

if (onlineTables.isEmpty()) {
return ServiceStatus.STATUS_DESCRIPTION_NONE;
}

// Find tables missing routing
List<String> missingRoutingTables = new ArrayList<>();
for (String tableName : onlineTables) {
if (!_routingManager.routingExists(tableName)) {
missingRoutingTables.add(tableName);
}
}

if (missingRoutingTables.isEmpty()) {
return ServiceStatus.STATUS_DESCRIPTION_NONE;
}

return String.format("Waiting for routing to be ready for %d/%d tables: %s",
missingRoutingTables.size(), onlineTables.size(), missingRoutingTables);
}

private List<String> getOnlineTables(ExternalView brokerResourceExternalView) {
List<String> onlineTables = new ArrayList<>();
for (String tableName : brokerResourceExternalView.getPartitionSet()) {
Map<String, String> stateMap = brokerResourceExternalView.getStateMap(tableName);
if (stateMap != null && stateMap.containsKey(_instanceId)) {
onlineTables.add(tableName);
}
}
return onlineTables;
}
}
Loading
Loading