Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,11 @@ public enum Property {
"The number of threads used to run fault-tolerant executions (FATE)."
+ " These are primarily table operations like merge.",
"1.4.3"),
MANAGER_TSERVER_HALT_DURATION("manager.tservers.halt.grace.period", "0",
PropertyType.TIMEDURATION,
"Allows the manager to force tserver halting by setting the max duration of time spent attempting to halt a tserver "
+ " requests before deleting the tserver's zlock. A value of zero (default) disables this feature.",
"2.1.5"),
@Deprecated(since = "2.1.0")
MANAGER_REPLICATION_SCAN_INTERVAL("manager.replication.status.scan.interval", "30s",
PropertyType.TIMEDURATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,9 @@ public long getSessionId() throws KeeperException, InterruptedException {
/**
* This method will delete all server locks for a given path according the predicate conditions.
*
* @param zk zookeeper instance
* @param zPath can be a path directly to a host or a general path like @{link
* org.apache.accumulo.core.Constants.ZTSERVERS} or a resource group
* @param hostPortPredicate conditional predicate for determining if the lock should be removed.
* @param messageOutput function for setting where the output from the lockPath goes
* @param dryRun allows lock format validation and the messageOutput to be sent without actually
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.manager.metrics.BalancerMetrics;
Expand Down Expand Up @@ -195,6 +196,8 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener,
final AuditedSecurityOperation security;
final Map<TServerInstance,AtomicInteger> badServers =
Collections.synchronizedMap(new HashMap<>());
final Map<TServerInstance,GracefulHaltTimer> tserverHaltRpcAttempts =
Collections.synchronizedMap(new HashMap<>());
final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
final Migrations migrations = new Migrations();
final EventCoordinator nextEvent = new EventCoordinator();
Expand Down Expand Up @@ -1143,6 +1146,33 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,

}

/**
* This class tracks the duration of a haltRPC and is used to determine when the manager should
* delete the server zLock.
*/
private static class GracefulHaltTimer {

Duration maxHaltGraceDuration;
Timer timer;

public GracefulHaltTimer(AccumuloConfiguration config) {
timer = null;
maxHaltGraceDuration =
Duration.ofMillis(config.getTimeInMillis(Property.MANAGER_TSERVER_HALT_DURATION));
}

public synchronized void startTimer() {
if (timer == null) {
timer = Timer.startNew();
}
}

public synchronized boolean shouldForceHalt() {
return maxHaltGraceDuration.toMillis() != 0 && timer != null
&& timer.hasElapsed(maxHaltGraceDuration);
}
}

private SortedMap<TServerInstance,TabletServerStatus>
gatherTableInformation(Set<TServerInstance> currentServers) {
final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
Expand Down Expand Up @@ -1192,15 +1222,35 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
> MAX_BAD_STATUS_COUNT) {
if (shutdownServerRateLimiter.tryAcquire()) {
log.warn("attempting to stop {}", server);
try {
TServerConnection connection2 = tserverSet.getConnection(server);
if (connection2 != null) {
connection2.halt(managerLock);
var gracefulHaltTimer = tserverHaltRpcAttempts.computeIfAbsent(server,
s -> new GracefulHaltTimer(getConfiguration()));
if (gracefulHaltTimer.shouldForceHalt()) {
log.warn("tserver {} is not responding to halt requests, deleting zlock", server);
var zk = getContext().getZooReaderWriter();
var iid = getContext().getInstanceID();
String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS;
try {
ServiceLock.deleteLocks(zk, tserversPath, server.getHostAndPort()::equals,
log::info, false);
tserverHaltRpcAttempts.remove(server);
badServers.remove(server);
} catch (KeeperException | InterruptedException e) {
log.error("Failed to delete zlock for server {}", server, e);
}
} else {
try {
TServerConnection connection2 = tserverSet.getConnection(server);
if (connection2 != null) {
connection2.halt(managerLock);
}
} catch (TTransportException e1) {
// ignore: it's probably down so log the exception at trace
log.trace("error attempting to halt tablet server {}", server, e1);
} catch (Exception e2) {
log.info("error talking to troublesome tablet server {}", server, e2);
} finally {
gracefulHaltTimer.startTimer();
}
} catch (TTransportException e1) {
// ignore: it's probably down
} catch (Exception e2) {
log.info("error talking to troublesome tablet server", e2);
}
} else {
log.warn("Unable to shutdown {} as over the shutdown limit of {} per minute", server,
Expand All @@ -1227,6 +1277,12 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
badServers.keySet().retainAll(currentServers);
badServers.keySet().removeAll(info.keySet());
}

synchronized (tserverHaltRpcAttempts) {
tserverHaltRpcAttempts.keySet().retainAll(currentServers);
tserverHaltRpcAttempts.keySet().removeAll(info.keySet());
}

log.debug(String.format("Finished gathering information from %d of %d servers in %.2f seconds",
info.size(), currentServers.size(), (System.currentTimeMillis() - start) / 1000.));

Expand Down Expand Up @@ -1729,14 +1785,17 @@ public void update(LiveTServerSet current, Set<TServerInstance> deleted,
}
serversToShutdown.removeAll(deleted);
badServers.keySet().removeAll(deleted);
tserverHaltRpcAttempts.keySet().removeAll(deleted);
// clear out any bad server with the same host/port as a new server
synchronized (badServers) {
cleanListByHostAndPort(badServers.keySet(), deleted, added);
}
synchronized (serversToShutdown) {
cleanListByHostAndPort(serversToShutdown, deleted, added);
}

synchronized (tserverHaltRpcAttempts) {
cleanListByHostAndPort(tserverHaltRpcAttempts.keySet(), deleted, added);
}
migrations.removeServers(deleted);
nextEvent.event("There are now %d tablet servers", current.size());
}
Expand Down