diff options
author | Håkon Hallingstad <hakon@yahooinc.com> | 2021-10-15 16:59:45 +0200 |
---|---|---|
committer | Håkon Hallingstad <hakon@yahooinc.com> | 2021-10-15 16:59:45 +0200 |
commit | 4c1ca8d9b431c59895666256ad6e47193789b2b4 (patch) | |
tree | 4d4ae9ab2f2e1ccda7911e4cef2da3272a235989 /clustercontroller-core | |
parent | 5f13963aed2a5fe94d2d384954adf5c59e5231af (diff) |
Improve logging of FleetController and DatabaseHandler
Diffstat (limited to 'clustercontroller-core')
20 files changed, 370 insertions, 255 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Context.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Context.java new file mode 100644 index 00000000000..fbe252f3331 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Context.java @@ -0,0 +1,19 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Context for FleetController and all instances 1:1 with the FleetController. + * + * @author hakon + */ +public interface Context { + FleetControllerId id(); + + default void log(Logger logger, Level level, String message) { log(logger, level, () -> message); } + void log(Logger logger, Level level, String message, Throwable t); + void log(Logger logger, Level level, Supplier<String> message); +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ContextImpl.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ContextImpl.java new file mode 100644 index 00000000000..7dbb3ef9c67 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ContextImpl.java @@ -0,0 +1,36 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author hakon + */ +public class ContextImpl implements Context { + private final FleetControllerId id; + + public ContextImpl(FleetControllerOptions options) { + this(FleetControllerId.fromOptions(options)); + } + + public ContextImpl(FleetControllerId id) { + this.id = id; + } + + @Override + public FleetControllerId id() { return id; } + + @Override + public void log(Logger logger, Level level, String message, Throwable t) { + logger.log(level, withLogPrefix(message), t); + } + + @Override + public void log(Logger logger, Level level, Supplier<String> message) { + logger.log(level, () -> withLogPrefix(message.get())); + } + + protected String withLogPrefix(String message) { return "Cluster '" + id.clusterName() + "': " + message; } +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 20b62576ff5..f59b4111f37 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -50,8 +50,9 @@ import java.util.stream.Stream; public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAddedOrRemovedListener, SystemStateListener, Runnable, RemoteClusterControllerTaskScheduler { - private static final Logger log = Logger.getLogger(FleetController.class.getName()); + private static final Logger logger = Logger.getLogger(FleetController.class.getName()); + private final Context context; private final Timer timer; private final Object monitor; private final EventLog eventLog; @@ -70,7 +71,6 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private final AtomicBoolean running = new AtomicBoolean(true); private FleetControllerOptions options; private FleetControllerOptions nextOptions; - private final int configuredIndex; private final List<SystemStateListener> systemStateListeners = new CopyOnWriteArrayList<>(); private boolean processingCycle = false; private boolean wantedStateChanged = false; @@ -83,8 +83,6 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private final StatusPageServer.PatternRequestRouter statusRequestRouter = new StatusPageServer.PatternRequestRouter(); private final List<ClusterStateBundle> newStates = new ArrayList<>(); private final List<ClusterStateBundle> convergedStates = new ArrayList<>(); - private long configGeneration = -1; - private long nextConfigGeneration = -1; private final Queue<RemoteClusterControllerTask> remoteTasks = new LinkedList<>(); private final MetricUpdater metricUpdater; @@ -107,12 +105,13 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd @Override public FleetControllerOptions getOptions() { return options; } @Override - public long getConfigGeneration() { return configGeneration; } + public long getConfigGeneration() { return 0; } @Override public ContentCluster getCluster() { return cluster; } }; - public FleetController(Timer timer, + public FleetController(Context context, + Timer timer, EventLog eventLog, ContentCluster cluster, NodeStateGatherer nodeStateGatherer, @@ -126,8 +125,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd MasterElectionHandler masterElectionHandler, MetricUpdater metricUpdater, FleetControllerOptions options) { - log.info("Starting up cluster controller " + options.fleetControllerIndex + " for cluster " + cluster.getName()); - this.configuredIndex = options.fleetControllerIndex; + context.log(logger, Level.INFO, "Created"); + this.context = context; this.timer = timer; this.monitor = timer; this.eventLog = eventLog; @@ -169,15 +168,16 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public static FleetController create(FleetControllerOptions options, StatusPageServerInterface statusPageServer, MetricReporter metricReporter) throws Exception { - Timer timer = new RealTimer(); - MetricUpdater metricUpdater = new MetricUpdater(metricReporter, options.fleetControllerIndex, options.clusterName); - EventLog log = new EventLog(timer, metricUpdater); - ContentCluster cluster = new ContentCluster( + var context = new ContextImpl(options); + var timer = new RealTimer(); + var metricUpdater = new MetricUpdater(metricReporter, options.fleetControllerIndex, options.clusterName); + var log = new EventLog(timer, metricUpdater); + var cluster = new ContentCluster( options.clusterName, options.nodes, options.storageDistribution); - NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log); - Communicator communicator = new RPCCommunicator( + var stateGatherer = new NodeStateGatherer(timer, timer, log); + var communicator = new RPCCommunicator( RPCCommunicator.createRealSupervisor(), timer, options.fleetControllerIndex, @@ -185,13 +185,14 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd options.nodeStateRequestTimeoutEarliestPercentage, options.nodeStateRequestTimeoutLatestPercentage, options.nodeStateRequestRoundTripTimeMaxSeconds); - DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); - NodeLookup lookUp = new SlobrokClient(timer); - StateChangeHandler stateGenerator = new StateChangeHandler(timer, log); - SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); - MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); - FleetController controller = new FleetController( - timer, log, cluster, stateGatherer, communicator, statusPageServer, null, lookUp, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); + var lookUp = new SlobrokClient(timer); + var stateGenerator = new StateChangeHandler(timer, log); + var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + var controller = new FleetController(context, timer, log, cluster, stateGatherer, communicator, + statusPageServer, null, lookUp, database, stateGenerator, + stateBroadcaster, masterElectionHandler, metricUpdater, options); controller.start(); return controller; } @@ -227,7 +228,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public void schedule(RemoteClusterControllerTask task) { synchronized (monitor) { - log.fine("Scheduled remote task " + task.getClass().getName() + " for execution"); + context.log(logger, Level.FINE, "Scheduled remote task " + task.getClass().getName() + " for execution"); remoteTasks.add(task); } } @@ -280,12 +281,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public void shutdown() throws InterruptedException, java.io.IOException { if (runner != null && isRunning()) { - log.log(Level.INFO, "Joining event thread."); + context.log(logger, Level.INFO, "Joining event thread."); running.set(false); synchronized(monitor) { monitor.notifyAll(); } runner.join(); } - log.log(Level.INFO, "Fleetcontroller done shutting down event thread."); + context.log(logger, Level.INFO, "FleetController done shutting down event thread."); controllerThreadId = Thread.currentThread().getId(); database.shutdown(databaseContext); @@ -299,12 +300,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd nodeLookup.shutdown(); } - public void updateOptions(FleetControllerOptions options, long configGeneration) { + public void updateOptions(FleetControllerOptions options) { + var newId = FleetControllerId.fromOptions(options); synchronized(monitor) { - assert(this.options.fleetControllerIndex == options.fleetControllerIndex); - log.log(Level.INFO, "Fleetcontroller " + options.fleetControllerIndex + " has new options"); + assert newId.equals(context.id()); + context.log(logger, Level.INFO, "FleetController has new options"); nextOptions = options.clone(); - nextConfigGeneration = configGeneration; monitor.notifyAll(); } } @@ -348,8 +349,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd var previouslyExhausted = calc.enumerateNodeResourceExhaustions(nodeInfo); var nowExhausted = calc.resourceExhaustionsFromHostInfo(nodeInfo, newHostInfo); if (!previouslyExhausted.equals(nowExhausted)) { - log.fine(() -> String.format("Triggering state recomputation due to change in cluster feed block: %s -> %s", - previouslyExhausted, nowExhausted)); + context.log(logger, Level.FINE, () -> String.format("Triggering state recomputation due to change in cluster feed block: %s -> %s", + previouslyExhausted, nowExhausted)); stateChangeHandler.setStateChangedFlag(); } } @@ -421,7 +422,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd /** * This function gives data of the current state in master election. - * The keys in the given map are indexes of fleet controllers. + * The keys in the given map are indices of fleet controllers. * The values are what fleetcontroller that fleetcontroller wants to * become master. * @@ -431,7 +432,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd */ public void handleFleetData(Map<Integer, Integer> data) { verifyInControllerThread(); - log.log(Level.FINEST, "Sending fleet data event on to master election handler"); + context.log(logger, Level.FINEST, "Sending fleet data event on to master election handler"); metricUpdater.updateMasterElectionMetrics(data); masterElectionHandler.handleFleetData(data); } @@ -466,12 +467,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } /** Called when all distributors have acked newest cluster state version. */ - public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException { + public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.DatabaseContext dbContext) throws InterruptedException { Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values()); // TODO wouldn't it be better to always get bundle information from the state broadcaster? var currentBundle = stateVersionTracker.getVersionedClusterStateBundle(); - log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentBundle.getVersion())); - stateChangeHandler.handleAllDistributorsInSync(currentBundle.getBaselineClusterState(), nodes, database, context); + context.log(logger, Level.FINE, () -> String.format("All distributors have ACKed cluster state version %d", currentBundle.getVersion())); + stateChangeHandler.handleAllDistributorsInSync(currentBundle.getBaselineClusterState(), nodes, database, dbContext); convergedStates.add(currentBundle); } @@ -531,9 +532,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd try{ rpcServer.setSlobrokConnectionSpecs(options.slobrokConnectionSpecs, options.rpcPort); } catch (ListenFailedException e) { - log.log(Level.WARNING, "Failed to bind RPC server to port " + options.rpcPort +". This may be natural if cluster has altered the services running on this node: " + e.getMessage()); + context.log(logger, Level.WARNING, "Failed to bind RPC server to port " + options.rpcPort + ". This may be natural if cluster has altered the services running on this node: " + e.getMessage()); } catch (Exception e) { - log.log(Level.WARNING, "Failed to initialize RPC server socket: " + e.getMessage()); + context.log(logger, Level.WARNING, "Failed to initialize RPC server socket: " + e.getMessage()); } } @@ -541,21 +542,19 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd try{ statusPageServer.setPort(options.httpPort); } catch (Exception e) { - log.log(Level.WARNING, "Failed to initialize status server socket. This may be natural if cluster has altered the services running on this node: " + e.getMessage()); + context.log(logger, Level.WARNING, "Failed to initialize status server socket. This may be natural if cluster has altered the services running on this node: " + e.getMessage()); } } long currentTime = timer.getCurrentTimeInMillis(); nextStateSendTime = Math.min(currentTime + options.minTimeBetweenNewSystemStates, nextStateSendTime); - configGeneration = nextConfigGeneration; - nextConfigGeneration = -1; } private void selfTerminateIfConfiguredNodeIndexHasChanged() { - if (options.fleetControllerIndex != configuredIndex) { - log.warning(String.format("Got new configuration where CC index has changed from %d to %d. We do not support "+ - "doing this live; immediately exiting now to force new configuration", - configuredIndex, options.fleetControllerIndex)); + var newId = new FleetControllerId(options.clusterName, options.fleetControllerIndex); + if (!newId.equals(context.id())) { + context.log(logger, Level.WARNING, context.id() + " got new configuration for " + newId + ". We do not support doing this live; " + + "immediately exiting now to force new configuration"); prepareShutdownEdge(); System.exit(1); } @@ -565,7 +564,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd verifyInControllerThread(); StatusPageResponse.ResponseCode responseCode; String message; - String hiddenMessage = ""; + final String hiddenMessage; try { StatusPageServer.RequestHandler handler = statusRequestRouter.resolveHandler(httpRequest); if (handler == null) { @@ -575,12 +574,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } catch (FileNotFoundException e) { responseCode = StatusPageResponse.ResponseCode.NOT_FOUND; message = e.getMessage(); + hiddenMessage = ""; } catch (Exception e) { responseCode = StatusPageResponse.ResponseCode.INTERNAL_SERVER_ERROR; message = "Internal Server Error"; hiddenMessage = ExceptionUtils.getStackTraceAsString(e); - if (log.isLoggable(Level.FINE)) - log.log(Level.FINE, "Unknown exception thrown for request " + httpRequest.getRequest() + ": " + hiddenMessage); + context.log(logger, Level.FINE, () -> "Unknown exception thrown for request " + httpRequest.getRequest() + ": " + hiddenMessage); } TimeZone tz = TimeZone.getTimeZone("UTC"); @@ -664,7 +663,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } catch (InterruptedException e) { throw new RuntimeException(e); } catch (Exception e) { - log.log(Level.WARNING, "Failed to watch master election: " + e.toString()); + context.log(logger, Level.WARNING, "Failed to watch master election: " + e); } return false; } @@ -683,7 +682,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd try { propagateOptions(); } catch (Exception e) { - log.log(Level.SEVERE, "Failed to handle new fleet controller config", e); + context.log(logger, Level.SEVERE, "Failed to handle new fleet controller config", e); } } @@ -702,7 +701,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // If there's a pending DB store we have not yet been able to store the // current state bundle to ZK and must therefore _not_ allow it to be published. if (database.hasPendingClusterStateMetaDataStore()) { - log.log(Level.FINE, "Can't publish current cluster state as it has one or more pending ZooKeeper stores"); + context.log(logger, Level.FINE, "Can't publish current cluster state as it has one or more pending ZooKeeper stores"); return false; } boolean sentAny = false; @@ -713,7 +712,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd && currentTime >= nextStateSendTime) { if (inMasterMoratorium) { - log.info(currentTime < firstAllowedStateBroadcast ? + context.log(logger, Level.INFO, currentTime < firstAllowedStateBroadcast ? "Master moratorium complete: all nodes have reported in" : "Master moratorium complete: timed out waiting for all nodes to report in"); // Reset firstAllowedStateBroadcast to make sure all future times are after firstAllowedStateBroadcast @@ -763,14 +762,14 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd return false; } - final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext(); - log.finest(() -> String.format("Processing remote task of type '%s'", task.getClass().getName())); - task.doRemoteFleetControllerTask(context); + final RemoteClusterControllerTask.Context taskContext = createRemoteTaskProcessingContext(); + context.log(logger, Level.FINEST, () -> String.format("Processing remote task of type '%s'", task.getClass().getName())); + task.doRemoteFleetControllerTask(taskContext); if (taskMayBeCompletedImmediately(task)) { - log.finest(() -> String.format("Done processing remote task of type '%s'", task.getClass().getName())); + context.log(logger, Level.FINEST, () -> String.format("Done processing remote task of type '%s'", task.getClass().getName())); task.notifyCompleted(); } else { - log.finest(() -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName())); + context.log(logger, Level.FINEST, () -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName())); tasksPendingStateRecompute.add(task); } @@ -849,14 +848,14 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd VersionDependentTaskCompletion taskCompletion = taskCompletionQueue.peek(); // TODO expose and use monotonic clock instead of system clock if (publishedVersion >= taskCompletion.getMinimumVersion()) { - log.fine(() -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing", - taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion)); + context.log(logger, Level.FINE, () -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing", + taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion)); taskCompletion.getTask().notifyCompleted(); taskCompletionQueue.remove(); } else if (taskCompletion.getDeadlineTimePointMs() <= now) { var details = buildNodesNotYetConvergedMessage(taskCompletion.getMinimumVersion()); - log.log(Level.WARNING, () -> String.format("Deferred task of type '%s' has exceeded wait deadline; completing with failure (details: %s)", - taskCompletion.getTask().getClass().getName(), details)); + context.log(logger, Level.WARNING, () -> String.format("Deferred task of type '%s' has exceeded wait deadline; completing with failure (details: %s)", + taskCompletion.getTask().getClass().getName(), details)); taskCompletion.getTask().handleFailure(RemoteClusterControllerTask.Failure.of( RemoteClusterControllerTask.FailureCondition.DEADLINE_EXCEEDED, details)); taskCompletion.getTask().notifyCompleted(); @@ -1007,8 +1006,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // TODO expose and use monotonic clock instead of system clock final long maxDeadlineTimePointMs = timer.getCurrentTimeInMillis() + options.getMaxDeferredTaskVersionWaitTime().toMillis(); for (RemoteClusterControllerTask task : tasksPendingStateRecompute) { - log.finest(() -> String.format("Adding task of type '%s' to be completed at version %d", - task.getClass().getName(), completeAtVersion)); + context.log(logger, Level.FINEST, () -> String.format("Adding task of type '%s' to be completed at version %d", + task.getClass().getName(), completeAtVersion)); taskCompletionQueue.add(new VersionDependentTaskCompletion(completeAtVersion, task, maxDeadlineTimePointMs)); } tasksPendingStateRecompute.clear(); @@ -1085,7 +1084,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd database.loadWantedStates(databaseContext); // TODO determine if we need any specialized handling here if feed block is set in the loaded bundle - log.info(() -> String.format("Loaded previous cluster state bundle from ZooKeeper: %s", previousBundle)); + context.log(logger, Level.INFO, () -> String.format("Loaded previous cluster state bundle from ZooKeeper: %s", previousBundle)); stateVersionTracker.setClusterStateBundleRetrievedFromZooKeeper(previousBundle); eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became fleetcontroller master. Bumped version to " @@ -1094,8 +1093,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast; isMaster = true; inMasterMoratorium = true; - log.log(Level.FINE, () -> "At time " + currentTime + " we set first system state broadcast time to be " - + options.minTimeBeforeFirstSystemStateBroadcast + " ms after at time " + firstAllowedStateBroadcast + "."); + context.log(logger, Level.FINE, () -> "At time " + currentTime + " we set first system state broadcast time to be " + + options.minTimeBeforeFirstSystemStateBroadcast + " ms after at time " + firstAllowedStateBroadcast + "."); didWork = true; } if (wantedStateChanged) { @@ -1123,16 +1122,17 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd @Override public void run() { controllerThreadId = Thread.currentThread().getId(); + context.log(logger, Level.INFO, "Starting ticks"); try { processingCycle = true; while (isRunning()) { tick(); } } catch (InterruptedException e) { - log.log(Level.FINE, () -> "Event thread stopped by interrupt exception: " + e); + context.log(logger, Level.FINE, () -> "Event thread stopped by interrupt exception: " + e); } catch (Throwable t) { t.printStackTrace(); - log.log(Level.SEVERE, "Fatal error killed fleet controller", t); + context.log(logger, Level.SEVERE, "Fatal error killed fleet controller", t); synchronized (monitor) { running.set(false); } System.exit(1); } finally { @@ -1146,7 +1146,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd synchronized (monitor) { monitor.notifyAll(); } } - public DatabaseHandler.Context databaseContext = new DatabaseHandler.Context() { + public DatabaseHandler.DatabaseContext databaseContext = new DatabaseHandler.DatabaseContext() { @Override public ContentCluster getCluster() { return cluster; } @Override @@ -1191,7 +1191,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } } if (ackedNodes >= nodeCount) { - log.log(Level.INFO, ackedNodes + " nodes now have acked system state " + version + " or higher."); + context.log(logger, Level.INFO, ackedNodes + " nodes now have acked system state " + version + " or higher."); return; } long remainingTime = maxTime - System.currentTimeMillis(); @@ -1239,5 +1239,4 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public EventLog getEventLog() { return eventLog; } - } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerId.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerId.java new file mode 100644 index 00000000000..fd0784d69c9 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerId.java @@ -0,0 +1,44 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import java.util.Objects; + +/** + * Uniquely identifies a running FleetController: cluster name + index. + * + * @author hakon + */ +public class FleetControllerId { + private final String clusterName; + private final int index; + + public static FleetControllerId fromOptions(FleetControllerOptions options) { + return new FleetControllerId(options.clusterName, options.fleetControllerIndex); + } + + public FleetControllerId(String clusterName, int index) { + this.clusterName = clusterName; + this.index = index; + } + + public String clusterName() { return clusterName; } + public int index() { return index; } + + @Override + public String toString() { + return "FleetController " + index + " for cluster '" + clusterName + '\''; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FleetControllerId that = (FleetControllerId) o; + return index == that.index && Objects.equals(clusterName, that.clusterName); + } + + @Override + public int hashCode() { + return Objects.hash(clusterName, index); + } +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java index 9ae3d543121..4c2187eca4d 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java @@ -12,8 +12,9 @@ import java.util.logging.Logger; */ public class MasterElectionHandler implements MasterInterface { - private static final Logger log = Logger.getLogger(MasterElectionHandler.class.getName()); + private static final Logger logger = Logger.getLogger(MasterElectionHandler.class.getName()); + private final Context context; private final Object monitor; private final Timer timer; private final int index; @@ -27,7 +28,8 @@ public class MasterElectionHandler implements MasterInterface { private long masterZooKeeperCooldownPeriod; // The period in ms that we won't take over unless master come back. private boolean usingZooKeeper = false; // Unit tests may not use ZooKeeper at all. - public MasterElectionHandler(int index, int totalCount, Object monitor, Timer timer) { + public MasterElectionHandler(Context context, int index, int totalCount, Object monitor, Timer timer) { + this.context = context; this.monitor = monitor; this.timer = timer; this.index = index; @@ -35,7 +37,7 @@ public class MasterElectionHandler implements MasterInterface { this.nextInLineCount = Integer.MAX_VALUE; // Only a given set of nodes can ever become master if (index > (totalCount - 1) / 2) { - log.log(Level.FINE, () -> "Cluster controller " + index + ": We can never become master and will always stay a follower."); + context.log(logger, Level.FINE, () -> "We can never become master and will always stay a follower."); } // Tag current time as when we have not seen any other master. Make sure we're not taking over at once for master that is on the way down masterGoneFromZooKeeperTime = timer.getCurrentTimeInMillis(); @@ -119,38 +121,38 @@ public class MasterElectionHandler implements MasterInterface { public boolean isAmongNthFirst(int first) { return (nextInLineCount < first); } public boolean watchMasterElection(DatabaseHandler database, - DatabaseHandler.Context dbContext) throws InterruptedException { + DatabaseHandler.DatabaseContext dbContext) throws InterruptedException { if (totalCount == 1 && !usingZooKeeper) { return false; // Allow single configured node to become master implicitly if no ZK configured } if (nextMasterData == null) { if (masterCandidate == null) { - log.log(Level.FINEST, () -> "Cluster controller " + index + ": No current master candidate. Waiting for data to do master election."); + context.log(logger, Level.FINEST, () -> "No current master candidate. Waiting for data to do master election."); } return false; // Nothing have happened since last time. } // Move next data to temporary, such that we don't need to keep lock, and such that we don't retry // if we happen to fail processing the data. Map<Integer, Integer> state; - log.log(Level.INFO, "Cluster controller " + index + ": Handling new master election, as we have received " + nextMasterData.size() + " entries"); + context.log(logger, Level.INFO, "Handling new master election, as we have received " + nextMasterData.size() + " entries"); synchronized (monitor) { state = nextMasterData; nextMasterData = null; } - log.log(Level.INFO, "Cluster controller " + index + ": Got master election state " + toString(state) + "."); + context.log(logger, Level.INFO, "Got master election state " + toString(state) + "."); if (state.isEmpty()) throw new IllegalStateException("Database has no master data. We should at least have data for ourselves."); Map.Entry<Integer, Integer> first = state.entrySet().iterator().next(); Integer currentMaster = getMaster(); if (currentMaster != null && first.getKey().intValue() != currentMaster.intValue()) { - log.log(Level.INFO, "Cluster controller " + index + ": Master gone from ZooKeeper. Tagging timestamp. Will wait " + this.masterZooKeeperCooldownPeriod + " ms."); + context.log(logger, Level.INFO, "Master gone from ZooKeeper. Tagging timestamp. Will wait " + this.masterZooKeeperCooldownPeriod + " ms."); masterGoneFromZooKeeperTime = timer.getCurrentTimeInMillis(); masterCandidate = null; } if (first.getValue().intValue() != first.getKey().intValue()) { - log.log(Level.INFO, "Fleet controller " + index + ": First index is not currently trying to become master. Waiting for it to change state"); + context.log(logger, Level.INFO, "First index is not currently trying to become master. Waiting for it to change state"); masterCandidate = null; if (first.getKey() == index) { - log.log(Level.INFO, "Cluster controller " + index + ": We are next in line to become master. Altering our state to look for followers"); + context.log(logger, Level.INFO, "We are next in line to become master. Altering our state to look for followers"); database.setMasterVote(dbContext, index); } } else { @@ -164,21 +166,21 @@ public class MasterElectionHandler implements MasterInterface { if (2 * followers > totalCount) { Integer newMaster = getMaster(); if (newMaster != null && currentMaster != null && newMaster.intValue() == currentMaster.intValue()) { - log.log(Level.INFO, "MASTER_ELECTION: Cluster controller " + index + ": " + currentMaster + " is still the master"); + context.log(logger, Level.INFO, currentMaster + " is still the master"); } else if (newMaster != null && currentMaster != null) { - log.log(Level.INFO, "MASTER_ELECTION: Cluster controller " + index + ": " + newMaster + " took over for fleet controller " + currentMaster + " as master"); + context.log(logger, Level.INFO, newMaster + " took over for fleet controller " + currentMaster + " as master"); } else if (newMaster == null) { - log.log(Level.INFO, "MASTER_ELECTION: Cluster controller " + index + ": " + masterCandidate + " is new master candidate, but needs to wait before it can take over"); + context.log(logger, Level.INFO, masterCandidate + " is new master candidate, but needs to wait before it can take over"); } else { - log.log(Level.INFO, "MASTER_ELECTION: Cluster controller " + index + ": " + newMaster + " is newly elected master"); + context.log(logger, Level.INFO, newMaster + " is newly elected master"); } } else { - log.log(Level.INFO, "MASTER_ELECTION: Cluster controller " + index + ": Currently too few followers for cluster controller candidate " + masterCandidate + ". No current master. (" + followers + "/" + totalCount + " followers)"); + context.log(logger, Level.INFO, "Currently too few followers for cluster controller candidate " + masterCandidate + ". No current master. (" + followers + "/" + totalCount + " followers)"); } Integer ourState = state.get(index); if (ourState == null) throw new IllegalStateException("Database lacks data from ourselves. This should always be present."); if (ourState.intValue() != first.getKey().intValue()) { - log.log(Level.INFO, "Cluster controller " + index + ": Altering our state to follow new fleet controller master candidate " + first.getKey()); + context.log(logger, Level.INFO, "Altering our state to follow new fleet controller master candidate " + first.getKey()); database.setMasterVote(dbContext, first.getKey()); } } @@ -195,7 +197,7 @@ public class MasterElectionHandler implements MasterInterface { if (nextInLineCount != ourPosition) { nextInLineCount = ourPosition; if (ourPosition > 0) { - log.log(Level.FINE, () -> "Cluster controller " + index + ": We are now " + getPosition(nextInLineCount) + " in queue to take over being master."); + context.log(logger, Level.FINE, () -> "We are now " + getPosition(nextInLineCount) + " in queue to take over being master."); } } } @@ -225,7 +227,7 @@ public class MasterElectionHandler implements MasterInterface { } public void handleFleetData(Map<Integer, Integer> data) { - log.log(Level.INFO, "Cluster controller " + index + ": Got new fleet data with " + data.size() + " entries: " + data); + context.log(logger, Level.INFO, "Got new fleet data with " + data.size() + " entries: " + data); synchronized (monitor) { nextMasterData = data; monitor.notifyAll(); @@ -234,7 +236,7 @@ public class MasterElectionHandler implements MasterInterface { public void lostDatabaseConnection() { if (totalCount > 1 || usingZooKeeper) { - log.log(Level.INFO, "Cluster controller " + index + ": Clearing master data as we lost connection on node " + index); + context.log(logger, Level.INFO, "Clearing master data as we lost connection on node " + index); resetElectionProgress(); } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java index 4a57be7ddf1..5035ed1aa88 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java @@ -50,7 +50,7 @@ public class StateChangeHandler { public void handleAllDistributorsInSync(final ClusterState currentState, final Set<ConfiguredNode> nodes, final DatabaseHandler database, - final DatabaseHandler.Context dbContext) throws InterruptedException { + final DatabaseHandler.DatabaseContext dbContext) throws InterruptedException { int startTimestampsReset = 0; log.log(Level.FINE, "handleAllDistributorsInSync invoked for state version %d", currentState.getVersion()); for (NodeType nodeType : NodeType.getTypes()) { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 9a0ac182017..bc8d84c4634 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -178,7 +178,7 @@ public class SystemStateBroadcaster { return nodeIsReachable(node); } - private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) { + private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.DatabaseContext dbContext) { return dbContext.getCluster().getNodeInfo().stream() .filter(this::nodeNeedsClusterStateBundle) .filter(node -> !newestStateBundleAlreadySentToNode(node)) @@ -186,7 +186,7 @@ public class SystemStateBroadcaster { } // Precondition: no nodes in the cluster need to receive the current cluster state version bundle - private List<NodeInfo> resolveStateActivationSendSet(DatabaseHandler.Context dbContext) { + private List<NodeInfo> resolveStateActivationSendSet(DatabaseHandler.DatabaseContext dbContext) { return dbContext.getCluster().getNodeInfo().stream() .filter(this::nodeNeedsClusterStateActivation) .filter(node -> !newestStateActivationAlreadySentToNode(node)) @@ -207,7 +207,7 @@ public class SystemStateBroadcaster { * object and updates the broadcaster's last known in-sync cluster state version. */ void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database, - DatabaseHandler.Context dbContext, + DatabaseHandler.DatabaseContext dbContext, FleetController fleetController) throws InterruptedException { if ((clusterStateBundle == null) || currentClusterStateIsConverged()) { return; // Nothing to do for the current state @@ -248,7 +248,7 @@ public class SystemStateBroadcaster { lastStateVersionBundleAcked = clusterStateBundle.getVersion(); } - private void markCurrentClusterStateAsConverged(DatabaseHandler database, DatabaseHandler.Context dbContext, FleetController fleetController) throws InterruptedException { + private void markCurrentClusterStateAsConverged(DatabaseHandler database, DatabaseHandler.DatabaseContext dbContext, FleetController fleetController) throws InterruptedException { log.log(Level.FINE, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state"); lastClusterStateVersionConverged = clusterStateBundle.getVersion(); lastClusterStateBundleConverged = clusterStateBundle; @@ -267,7 +267,7 @@ public class SystemStateBroadcaster { lastOfficialStateVersion = clusterStateBundle.getVersion(); } - public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.Context dbContext, Communicator communicator, + public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.DatabaseContext dbContext, Communicator communicator, int lastClusterStateVersionWrittenToZooKeeper) { if (clusterStateBundle == null || clusterStateBundle.getVersion() == 0) { return false; @@ -302,7 +302,7 @@ public class SystemStateBroadcaster { return !recipients.isEmpty(); } - public boolean broadcastStateActivationsIfRequired(DatabaseHandler.Context dbContext, Communicator communicator) { + public boolean broadcastStateActivationsIfRequired(DatabaseHandler.DatabaseContext dbContext, Communicator communicator) { if (clusterStateBundle == null || clusterStateBundle.getVersion() == 0 || !currentBundleVersionIsTaggedOfficial()) { return false; } @@ -331,7 +331,7 @@ public class SystemStateBroadcaster { return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp(); } - private static ClusterState buildModifiedClusterState(ClusterState sourceState, DatabaseHandler.Context dbContext) { + private static ClusterState buildModifiedClusterState(ClusterState sourceState, DatabaseHandler.DatabaseContext dbContext) { ClusterState newState = sourceState.clone(); for (NodeInfo n : dbContext.getCluster().getNodeInfo()) { NodeState ns = newState.getNodeState(n.getNode()); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java index c2574f4c6cf..ad436442fd6 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java @@ -6,6 +6,7 @@ import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.ContentCluster; +import com.yahoo.vespa.clustercontroller.core.Context; import com.yahoo.vespa.clustercontroller.core.FleetController; import com.yahoo.vespa.clustercontroller.core.NodeInfo; import com.yahoo.vespa.clustercontroller.core.Timer; @@ -26,9 +27,9 @@ import java.util.logging.Logger; */ public class DatabaseHandler { - private static final Logger log = Logger.getLogger(DatabaseHandler.class.getName()); + private static final Logger logger = Logger.getLogger(DatabaseHandler.class.getName()); - public interface Context { + public interface DatabaseContext { ContentCluster getCluster(); FleetController getFleetController(); NodeAddedOrRemovedListener getNodeAddedOrRemovedListener(); @@ -56,7 +57,7 @@ public class DatabaseHandler { } private class DatabaseListener implements Database.DatabaseListener { public void handleZooKeeperSessionDown() { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Lost contact with zookeeper server"); + context.log(logger, Level.FINE, () -> "Lost contact with zookeeper server"); synchronized(monitor) { lostZooKeeperConnectionEvent = true; monitor.notifyAll(); @@ -66,7 +67,7 @@ public class DatabaseHandler { public void handleMasterData(Map<Integer, Integer> data) { synchronized (monitor) { if (masterDataEvent != null && masterDataEvent.equals(data)) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": New master data was the same as the last one. Not responding to it"); + context.log(logger, Level.FINE, () -> "New master data was the same as the last one. Not responding to it"); } else { masterDataEvent = data; } @@ -75,9 +76,9 @@ public class DatabaseHandler { } } + private final Context context; private final DatabaseFactory databaseFactory; private final Timer timer; - private final int nodeIndex; private final Object monitor; private String zooKeeperAddress; private int zooKeeperSessionTimeout = 5000; @@ -91,15 +92,14 @@ public class DatabaseHandler { private long lastZooKeeperConnectionAttempt = 0; private int minimumWaitBetweenFailedConnectionAttempts = 10000; private boolean lostZooKeeperConnectionEvent = false; - private boolean connectionEstablishmentIsAllowed = false; private Map<Integer, Integer> masterDataEvent = null; - public DatabaseHandler(DatabaseFactory databaseFactory, Timer timer, String zooKeeperAddress, int ourIndex, Object monitor) throws InterruptedException + public DatabaseHandler(Context context, DatabaseFactory databaseFactory, Timer timer, String zooKeeperAddress, Object monitor) throws InterruptedException { + this.context = context; this.databaseFactory = databaseFactory; this.timer = timer; - this.nodeIndex = ourIndex; - pendingStore.masterVote = ourIndex; // To begin with we'll vote for ourselves. + pendingStore.masterVote = context.id().index(); // To begin with we'll vote for ourselves. this.monitor = monitor; // TODO: Require non-null, not possible now since at least ClusterFeedBlockTest uses null address this.zooKeeperAddress = zooKeeperAddress; @@ -111,8 +111,8 @@ public class DatabaseHandler { } } - public void shutdown(Context context) { - relinquishDatabaseConnectivity(context); + public void shutdown(DatabaseContext databaseContext) { + relinquishDatabaseConnectivity(databaseContext); } public boolean isClosed() { return database == null || database.isClosed(); } @@ -125,21 +125,21 @@ public class DatabaseHandler { this.minimumWaitBetweenFailedConnectionAttempts = minimumWaitBetweenFailedConnectionAttempts; } - public void reset(Context context) { + public void reset(DatabaseContext databaseContext) { final boolean wasRunning; synchronized (databaseMonitor) { wasRunning = database != null; if (wasRunning) { - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Resetting database state"); + context.log(logger, Level.INFO, "Resetting database state"); database.close(); database = null; } } clearSessionMetaData(true); - context.getFleetController().lostDatabaseConnection(); + databaseContext.getFleetController().lostDatabaseConnection(); if (wasRunning) { - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Done resetting database state"); + context.log(logger, Level.INFO, "Done resetting database state"); } } @@ -157,24 +157,24 @@ public class DatabaseHandler { pendingStore.clearNonClusterStateFields(); } pendingStore.masterVote = currentVote; - log.log(Level.FINE, () -> "Cleared session metadata. Pending master vote is now " + pendingStore.masterVote); + context.log(logger, Level.FINE, () -> "Cleared session metadata. Pending master vote is now " + pendingStore.masterVote); } - public void setZooKeeperAddress(String address, Context context) { + public void setZooKeeperAddress(String address, DatabaseContext databaseContext) { if (address == null && zooKeeperAddress == null) return; if (address != null && address.equals(zooKeeperAddress)) return; if (zooKeeperAddress != null) { - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": " + (address == null ? "Stopped using ZooKeeper." : "Got new ZooKeeper address to use: " + address)); + context.log(logger, Level.INFO, "" + (address == null ? "Stopped using ZooKeeper." : "Got new ZooKeeper address to use: " + address)); } zooKeeperAddress = address; - reset(context); + reset(databaseContext); } - public void setZooKeeperSessionTimeout(int timeout, Context context) { + public void setZooKeeperSessionTimeout(int timeout, DatabaseContext databaseContext) { if (timeout == zooKeeperSessionTimeout) return; - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Got new ZooKeeper session timeout of " + timeout + " milliseconds."); + context.log(logger, Level.FINE, () -> "Got new ZooKeeper session timeout of " + timeout + " milliseconds."); zooKeeperSessionTimeout = timeout; - reset(context); + reset(databaseContext); } private boolean usingZooKeeper() { return (zooKeeperAddress != null); } @@ -190,31 +190,30 @@ public class DatabaseHandler { // Don't clear pending state writes in case they were attempted prior to connect() // being called, but after receiving a database loss event. clearSessionMetaData(false); - log.log(Level.INFO, - "Fleetcontroller " + nodeIndex + ": Setting up new ZooKeeper session at " + zooKeeperAddress); + context.log(logger, Level.INFO, "Setting up new ZooKeeper session at " + zooKeeperAddress); DatabaseFactory.Params params = new DatabaseFactory.Params() .cluster(cluster) - .nodeIndex(nodeIndex) + .nodeIndex(context.id().index()) .databaseAddress(zooKeeperAddress) .databaseSessionTimeout(zooKeeperSessionTimeout) .databaseListener(dbListener); database = databaseFactory.create(params); } } catch (KeeperException.NodeExistsException e) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Cannot create ephemeral fleetcontroller node. ZooKeeper server " - + "not seen old fleetcontroller instance disappear? It already exists. Will retry later: " + e.getMessage()); + context.log(logger, Level.FINE, () -> "Cannot create ephemeral fleetcontroller node. ZooKeeper server " + + "not seen old fleetcontroller instance disappear? It already exists. Will retry later: " + e.getMessage()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (KeeperException.ConnectionLossException e) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to connect to ZooKeeper at " + zooKeeperAddress - + " with session timeout " + zooKeeperSessionTimeout + ": " + e.getMessage()); + context.log(logger, Level.WARNING, "Failed to connect to ZooKeeper at " + zooKeeperAddress + + " with session timeout " + zooKeeperSessionTimeout + ": " + e.getMessage()); } catch (Exception e) { StringWriter sw = new StringWriter(); e.printStackTrace(new PrintWriter(sw)); - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to connect to ZooKeeper at " + zooKeeperAddress - + " with session timeout " + zooKeeperSessionTimeout + ": " + sw); + context.log(logger, Level.WARNING, "Failed to connect to ZooKeeper at " + zooKeeperAddress + + " with session timeout " + zooKeeperSessionTimeout + ": " + sw); } - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Done setting up new ZooKeeper session at " + zooKeeperAddress); + context.log(logger, Level.INFO, "Done setting up new ZooKeeper session at " + zooKeeperAddress); } /** @@ -222,27 +221,27 @@ public class DatabaseHandler { * * @return true if we did or attempted any work. */ - public boolean doNextZooKeeperTask(Context context) { + public boolean doNextZooKeeperTask(DatabaseContext databaseContext) { boolean didWork = false; synchronized (monitor) { if (lostZooKeeperConnectionEvent) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": doNextZooKeeperTask(): lost connection"); - context.getFleetController().lostDatabaseConnection(); + context.log(logger, Level.FINE, () -> "doNextZooKeeperTask(): lost connection"); + databaseContext.getFleetController().lostDatabaseConnection(); lostZooKeeperConnectionEvent = false; didWork = true; if (masterDataEvent != null) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Had new master data queued on disconnect. Removing master data event"); + context.log(logger, Level.FINE, () -> "Had new master data queued on disconnect. Removing master data event"); masterDataEvent = null; } } if (masterDataEvent != null) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": doNextZooKeeperTask(): new master data"); - if (!masterDataEvent.containsKey(nodeIndex)) { + context.log(logger, Level.FINE, () -> "doNextZooKeeperTask(): new master data"); + if (!masterDataEvent.containsKey(context.id().index())) { Integer currentVote = (pendingStore.masterVote != null ? pendingStore.masterVote : currentlyStored.masterVote); assert(currentVote != null); - masterDataEvent.put(nodeIndex, currentVote); + masterDataEvent.put(context.id().index(), currentVote); } - context.getFleetController().handleFleetData(masterDataEvent); + databaseContext.getFleetController().handleFleetData(masterDataEvent); masterDataEvent = null; didWork = true; } @@ -253,7 +252,7 @@ public class DatabaseHandler { return false; // Not time to attempt connection yet. } didWork = true; - connect(context.getCluster(), currentTime); + connect(databaseContext.getCluster(), currentTime); } try { synchronized (databaseMonitor) { @@ -263,11 +262,11 @@ public class DatabaseHandler { didWork |= performZooKeeperWrites(); } } catch (CasWriteFailed e) { - log.log(Level.WARNING, String.format("CaS write to ZooKeeper failed, another controller " + - "has likely taken over ownership: %s", e.getMessage())); + context.log(logger, Level.WARNING, String.format("CaS write to ZooKeeper failed, another controller " + + "has likely taken over ownership: %s", e.getMessage())); // Clear DB and master election state. This shall trigger a full re-fetch of all // version and election-related metadata. - relinquishDatabaseConnectivity(context); + relinquishDatabaseConnectivity(databaseContext); } return didWork; } @@ -277,32 +276,31 @@ public class DatabaseHandler { return zooKeeperAddress != null; } - private void relinquishDatabaseConnectivity(Context context) { + private void relinquishDatabaseConnectivity(DatabaseContext databaseContext) { // reset() will handle both session clearing and trigger a database loss callback into the CC. - reset(context); + reset(databaseContext); } private boolean performZooKeeperWrites() { boolean didWork = false; if (pendingStore.masterVote != null) { didWork = true; - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Attempting to store master vote " - + pendingStore.masterVote + " into zookeeper."); + context.log(logger, Level.FINE, () -> "Attempting to store master vote " + + pendingStore.masterVote + " into zookeeper."); if (database.storeMasterVote(pendingStore.masterVote)) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Managed to store master vote " - + pendingStore.masterVote + " into zookeeper."); + context.log(logger, Level.FINE, () -> "Managed to store master vote " + + pendingStore.masterVote + " into zookeeper."); currentlyStored.masterVote = pendingStore.masterVote; pendingStore.masterVote = null; } else { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store master vote"); + context.log(logger, Level.WARNING, "Failed to store master vote"); return true; } } if (pendingStore.lastSystemStateVersion != null) { didWork = true; - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex - + ": Attempting to store last system state version " + pendingStore.lastSystemStateVersion - + " into zookeeper."); + context.log(logger, Level.FINE, () -> "Attempting to store last system state version " + + pendingStore.lastSystemStateVersion + " into zookeeper."); if (database.storeLatestSystemStateVersion(pendingStore.lastSystemStateVersion)) { currentlyStored.lastSystemStateVersion = pendingStore.lastSystemStateVersion; pendingStore.lastSystemStateVersion = null; @@ -312,8 +310,8 @@ public class DatabaseHandler { } if (pendingStore.startTimestamps != null) { didWork = true; - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Attempting to store " - + pendingStore.startTimestamps.size() + " start timestamps into zookeeper."); + context.log(logger, Level.FINE, () -> "Attempting to store " + pendingStore.startTimestamps.size() + + " start timestamps into zookeeper."); if (database.storeStartTimestamps(pendingStore.startTimestamps)) { currentlyStored.startTimestamps = pendingStore.startTimestamps; pendingStore.startTimestamps = null; @@ -323,8 +321,8 @@ public class DatabaseHandler { } if (pendingStore.wantedStates != null) { didWork = true; - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Attempting to store " - + pendingStore.wantedStates.size() + " wanted states into zookeeper."); + context.log(logger, Level.FINE, () -> "Attempting to store " + + pendingStore.wantedStates.size() + " wanted states into zookeeper."); if (database.storeWantedStates(pendingStore.wantedStates)) { currentlyStored.wantedStates = pendingStore.wantedStates; pendingStore.wantedStates = null; @@ -334,8 +332,8 @@ public class DatabaseHandler { } if (pendingStore.clusterStateBundle != null) { didWork = true; - log.fine(() -> String.format("Fleetcontroller %d: Attempting to store last cluster state bundle with version %d into zookeeper.", - nodeIndex, pendingStore.clusterStateBundle.getVersion())); + context.log(logger, Level.FINE, () -> "Attempting to store last cluster state bundle with version " + + pendingStore.clusterStateBundle.getVersion() + " into zookeeper."); if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) { lastKnownStateBundleVersionWrittenBySelf = pendingStore.clusterStateBundle.getVersion(); currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle; @@ -347,8 +345,8 @@ public class DatabaseHandler { return didWork; } - public void setMasterVote(Context context, int wantedMasterCandidate) throws InterruptedException { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Checking if master vote has been updated and need to be stored."); + public void setMasterVote(DatabaseContext databaseContext, int wantedMasterCandidate) throws InterruptedException { + context.log(logger, Level.FINE, () -> "Checking if master vote has been updated and need to be stored."); // Schedule a write if one of the following is true: // - There is already a pending vote to be written, that may have been written already without our knowledge // - We don't know what is actually stored now @@ -356,14 +354,14 @@ public class DatabaseHandler { if (pendingStore.masterVote != null || currentlyStored.masterVote == null || currentlyStored.masterVote != wantedMasterCandidate) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Scheduling master vote " + wantedMasterCandidate + " to be stored in zookeeper."); + context.log(logger, Level.FINE, () -> "Scheduling master vote " + wantedMasterCandidate + " to be stored in zookeeper."); pendingStore.masterVote = wantedMasterCandidate; - doNextZooKeeperTask(context); + doNextZooKeeperTask(databaseContext); } } - public void saveLatestSystemStateVersion(Context context, int version) throws InterruptedException { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Checking if latest system state version has been updated and need to be stored."); + public void saveLatestSystemStateVersion(DatabaseContext databaseContext, int version) throws InterruptedException { + context.log(logger, Level.FINE, () -> "Checking if latest system state version has been updated and need to be stored."); // Schedule a write if one of the following is true: // - There is already a pending vote to be written, that may have been written already without our knowledge // - We don't know what is actually stored now @@ -371,14 +369,14 @@ public class DatabaseHandler { if (pendingStore.lastSystemStateVersion != null || currentlyStored.lastSystemStateVersion == null || currentlyStored.lastSystemStateVersion != version) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Scheduling new last system state version " + version + " to be stored in zookeeper."); + context.log(logger, Level.FINE, () -> "Scheduling new last system state version " + version + " to be stored in zookeeper."); pendingStore.lastSystemStateVersion = version; - doNextZooKeeperTask(context); + doNextZooKeeperTask(databaseContext); } } public int getLatestSystemStateVersion() { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Retrieving latest system state version."); + context.log(logger, Level.FINE, () -> "Retrieving latest system state version."); synchronized (databaseMonitor) { if (database != null && !database.isClosed()) { currentlyStored.lastSystemStateVersion = database.retrieveLatestSystemStateVersion(); @@ -387,23 +385,23 @@ public class DatabaseHandler { Integer version = currentlyStored.lastSystemStateVersion; if (version == null) { if (usingZooKeeper()) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve latest system state version from ZooKeeper. Returning version 0."); + context.log(logger, Level.WARNING, "Failed to retrieve latest system state version from ZooKeeper. Returning version 0."); } return 0; // FIXME "fail-oblivious" is not a good error handling mode for such a critical component! } return version; } - public void saveLatestClusterStateBundle(Context context, ClusterStateBundle clusterStateBundle) { - log.log(Level.FINE, () -> String.format("Fleetcontroller %d: Scheduling bundle %s to be saved to ZooKeeper", nodeIndex, clusterStateBundle)); + public void saveLatestClusterStateBundle(DatabaseContext databaseContext, ClusterStateBundle clusterStateBundle) { + context.log(logger, Level.FINE, () -> "Scheduling bundle " + clusterStateBundle + " to be saved to ZooKeeper"); pendingStore.clusterStateBundle = clusterStateBundle; - doNextZooKeeperTask(context); + doNextZooKeeperTask(databaseContext); // FIXME this is a nasty hack to get around the fact that a massive amount of unit tests // set up the system with a null ZooKeeper server address. If we don't fake that we have // written the state version, the tests will never progress past waiting for state broadcasts. if (zooKeeperAddress == null) { - log.warning(() -> String.format("Fleetcontroller %d: Simulating ZK write of version %d. This should not happen in production!", - nodeIndex, clusterStateBundle.getVersion())); + logger.warning(() -> "Simulating ZK write of version " + clusterStateBundle.getVersion() + + ". This should not happen in production!"); lastKnownStateBundleVersionWrittenBySelf = clusterStateBundle.getVersion(); } } @@ -418,7 +416,7 @@ public class DatabaseHandler { } public ClusterStateBundle getLatestClusterStateBundle() { - log.log(Level.FINE, () -> String.format("Fleetcontroller %d: Retrieving latest cluster state bundle from ZooKeeper", nodeIndex)); + context.log(logger, Level.FINE, () -> "Retrieving latest cluster state bundle from ZooKeeper"); synchronized (databaseMonitor) { if (database != null && !database.isClosed()) { return database.retrieveLastPublishedStateBundle(); @@ -428,10 +426,10 @@ public class DatabaseHandler { } } - public void saveWantedStates(Context context) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Checking whether wanted states have changed compared to zookeeper version."); + public void saveWantedStates(DatabaseContext databaseContext) { + context.log(logger, Level.FINE, () -> "Checking whether wanted states have changed compared to zookeeper version."); Map<Node, NodeState> wantedStates = new TreeMap<>(); - for (NodeInfo info : context.getCluster().getNodeInfo()) { + for (NodeInfo info : databaseContext.getCluster().getNodeInfo()) { if (!info.getUserWantedState().equals(new NodeState(info.getNode().getType(), State.UP))) { wantedStates.put(info.getNode(), info.getUserWantedState()); } @@ -443,14 +441,14 @@ public class DatabaseHandler { if (pendingStore.wantedStates != null || currentlyStored.wantedStates == null || !currentlyStored.wantedStates.equals(wantedStates)) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Scheduling new wanted states to be stored into zookeeper."); + context.log(logger, Level.FINE, () -> "Scheduling new wanted states to be stored into zookeeper."); pendingStore.wantedStates = wantedStates; - doNextZooKeeperTask(context); + doNextZooKeeperTask(databaseContext); } } - public boolean loadWantedStates(Context context) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Retrieving node wanted states."); + public boolean loadWantedStates(DatabaseContext databaseContext) { + context.log(logger, Level.FINE, () -> "Retrieving node wanted states."); synchronized (databaseMonitor) { if (database != null && !database.isClosed()) { currentlyStored.wantedStates = database.retrieveWantedStates(); @@ -461,43 +459,43 @@ public class DatabaseHandler { if (usingZooKeeper()) { // We get here if the ZooKeeper client has lost the connection to ZooKeeper. // TODO: Should instead fail the tick until connected!? - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Failed to retrieve wanted states from ZooKeeper. Assuming UP for all nodes."); + context.log(logger, Level.FINE, () -> "Failed to retrieve wanted states from ZooKeeper. Assuming UP for all nodes."); } wantedStates = new TreeMap<>(); } boolean altered = false; for (Node node : wantedStates.keySet()) { - NodeInfo nodeInfo = context.getCluster().getNodeInfo(node); + NodeInfo nodeInfo = databaseContext.getCluster().getNodeInfo(node); if (nodeInfo == null) continue; // ignore wanted state of nodes which doesn't exist NodeState wantedState = wantedStates.get(node); if ( ! nodeInfo.getUserWantedState().equals(wantedState)) { nodeInfo.setWantedState(wantedState); - context.getNodeStateUpdateListener().handleNewWantedNodeState(nodeInfo, wantedState); + databaseContext.getNodeStateUpdateListener().handleNewWantedNodeState(nodeInfo, wantedState); altered = true; } - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Node " + node + " has wanted state " + wantedState); + context.log(logger, Level.FINE, () -> "Node " + node + " has wanted state " + wantedState); } // Remove wanted state from any node having a wanted state set that is no longer valid - for (NodeInfo info : context.getCluster().getNodeInfo()) { + for (NodeInfo info : databaseContext.getCluster().getNodeInfo()) { NodeState wantedState = wantedStates.get(info.getNode()); if (wantedState == null && !info.getUserWantedState().equals(new NodeState(info.getNode().getType(), State.UP))) { info.setWantedState(null); - context.getNodeStateUpdateListener().handleNewWantedNodeState(info, info.getWantedState().clone()); + databaseContext.getNodeStateUpdateListener().handleNewWantedNodeState(info, info.getWantedState().clone()); altered = true; } } return altered; } - public void saveStartTimestamps(Context context) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Scheduling start timestamps to be stored into zookeeper."); - pendingStore.startTimestamps = context.getCluster().getStartTimestamps(); - doNextZooKeeperTask(context); + public void saveStartTimestamps(DatabaseContext databaseContext) { + context.log(logger, Level.FINE, () -> "Scheduling start timestamps to be stored into zookeeper."); + pendingStore.startTimestamps = databaseContext.getCluster().getStartTimestamps(); + doNextZooKeeperTask(databaseContext); } public boolean loadStartTimestamps(ContentCluster cluster) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Retrieving start timestamps"); + context.log(logger, Level.FINE, () -> "Retrieving start timestamps"); synchronized (databaseMonitor) { if (database == null || database.isClosed()) { return false; @@ -507,13 +505,13 @@ public class DatabaseHandler { Map<Node, Long> startTimestamps = currentlyStored.startTimestamps; if (startTimestamps == null) { if (usingZooKeeper()) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve start timestamps from ZooKeeper. Cluster state will be bloated with timestamps until we get them set."); + context.log(logger, Level.WARNING, "Failed to retrieve start timestamps from ZooKeeper. Cluster state will be bloated with timestamps until we get them set."); } startTimestamps = new TreeMap<>(); } for (Map.Entry<Node, Long> e : startTimestamps.entrySet()) { cluster.setStartTimestamp(e.getKey(), e.getValue()); - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Node " + e.getKey() + " has start timestamp " + e.getValue()); + context.log(logger, Level.FINE, () -> "Node " + e.getKey() + " has start timestamp " + e.getValue()); } return true; } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java index 308a78ee40e..80f88c57cb3 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java @@ -48,16 +48,17 @@ public class ClusterFeedBlockTest extends FleetControllerTest { nodes.add(new Node(NodeType.DISTRIBUTOR, i)); } + var context = new TestContext(options); communicator = new DummyCommunicator(nodes, timer); - MetricUpdater metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); - EventLog eventLog = new EventLog(timer, metricUpdater); - ContentCluster cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); - NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, eventLog); - DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); - StateChangeHandler stateGenerator = new StateChangeHandler(timer, eventLog); - SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); - MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); - ctrl = new FleetController(timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); + var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); + var eventLog = new EventLog(timer, metricUpdater); + var cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); + var stateGatherer = new NodeStateGatherer(timer, timer, eventLog); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); + var stateGenerator = new StateChangeHandler(timer, eventLog); + var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + ctrl = new FleetController(context, timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); ctrl.tick(); markAllNodesAsUp(options); @@ -134,8 +135,7 @@ public class ClusterFeedBlockTest extends FleetControllerTest { assertTrue(ctrl.getClusterStateBundle().clusterFeedIsBlocked()); // Increase cheese allowance. Should now automatically unblock since reported usage is lower. - int dummyConfigGeneration = 2; - ctrl.updateOptions(createOptions(mapOf(usage("cheese", 0.9), usage("wine", 0.4))), dummyConfigGeneration); + ctrl.updateOptions(createOptions(mapOf(usage("cheese", 0.9), usage("wine", 0.4)))); ctrl.tick(); // Options propagation ctrl.tick(); // State recomputation assertFalse(ctrl.getClusterStateBundle().clusterFeedIsBlocked()); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java index 630a4898dfa..57468ffa0e7 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java @@ -10,7 +10,6 @@ import org.junit.Test; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; @@ -41,8 +40,8 @@ public class DatabaseHandlerTest { when(mockTimer.getCurrentTimeInMillis()).thenReturn(1000000L); } - DatabaseHandler.Context createMockContext() { - return new DatabaseHandler.Context() { + DatabaseHandler.DatabaseContext createMockContext() { + return new DatabaseHandler.DatabaseContext() { @Override public ContentCluster getCluster() { return clusterFixture.cluster(); @@ -66,7 +65,9 @@ public class DatabaseHandlerTest { } DatabaseHandler createHandler() throws Exception { - return new DatabaseHandler(mockDbFactory, mockTimer, databaseAddress, 0, monitor); + Context context = mock(Context.class); + when(context.id()).thenReturn(new FleetControllerId("clusterName", 0)); + return new DatabaseHandler(context, mockDbFactory, mockTimer, databaseAddress, monitor); } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java index 0f172f6f248..e8f20a00d24 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java @@ -39,12 +39,12 @@ public class DistributionBitCountTest extends FleetControllerTest { public void testDistributionBitCountConfigIncrease() throws Exception { setUpSystem("DistributionBitCountTest::testDistributionBitCountConfigIncrease"); options.distributionBits = 20; - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); ClusterState currentState = waitForState("version:\\d+ bits:20 distributor:10 storage:10"); int version = currentState.getVersion(); options.distributionBits = 23; - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); assertEquals(version, currentState.getVersion()); } @@ -55,7 +55,7 @@ public class DistributionBitCountTest extends FleetControllerTest { public void testDistributionBitCountConfigDecrease() throws Exception { setUpSystem("DistributionBitCountTest::testDistributionBitCountConfigDecrease"); options.distributionBits = 12; - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); waitForState("version:\\d+ bits:12 distributor:10 storage:10"); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java index 55677782a3d..d9e20171681 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java @@ -157,15 +157,16 @@ public abstract class FleetControllerTest implements Waiter { FleetController createFleetController(boolean useFakeTimer, FleetControllerOptions options, boolean startThread, StatusPageServerInterface status) throws Exception { Objects.requireNonNull(status, "status server cannot be null"); + var context = new TestContext(options); Timer timer = useFakeTimer ? this.timer : new RealTimer(); - MetricUpdater metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); - EventLog log = new EventLog(timer, metricUpdater); - ContentCluster cluster = new ContentCluster( + var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); + var log = new EventLog(timer, metricUpdater); + var cluster = new ContentCluster( options.clusterName, options.nodes, options.storageDistribution); - NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log); - Communicator communicator = new RPCCommunicator( + var stateGatherer = new NodeStateGatherer(timer, timer, log); + var communicator = new RPCCommunicator( RPCCommunicator.createRealSupervisor(), timer, options.fleetControllerIndex, @@ -173,20 +174,20 @@ public abstract class FleetControllerTest implements Waiter { options.nodeStateRequestTimeoutEarliestPercentage, options.nodeStateRequestTimeoutLatestPercentage, options.nodeStateRequestRoundTripTimeMaxSeconds); - SlobrokClient lookUp = new SlobrokClient(timer); + var lookUp = new SlobrokClient(timer); lookUp.setSlobrokConnectionSpecs(new String[0]); - RpcServer rpcServer = new RpcServer(timer, timer, options.clusterName, options.fleetControllerIndex, options.slobrokBackOffPolicy); - DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); + var rpcServer = new RpcServer(timer, timer, options.clusterName, options.fleetControllerIndex, options.slobrokBackOffPolicy); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); // Setting this <1000 ms causes ECONNREFUSED on socket trying to connect to ZK server, in ZooKeeper, // after creating a new ZooKeeper (session). This causes ~10s extra time to connect after connection loss. // Reasons unknown. Larger values like the default 10_000 causes that much additional running time for some tests. database.setMinimumWaitBetweenFailedConnectionAttempts(2_000); - StateChangeHandler stateGenerator = new StateChangeHandler(timer, log); - SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); - MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); - FleetController controller = new FleetController(timer, log, cluster, stateGatherer, communicator, status, rpcServer, lookUp, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); + var stateGenerator = new StateChangeHandler(timer, log); + var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + var controller = new FleetController(context, timer, log, cluster, stateGatherer, communicator, status, rpcServer, lookUp, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); if (startThread) { controller.start(); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownLiveConfigTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownLiveConfigTest.java index 77bcff1e7d5..e6c5c31010f 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownLiveConfigTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownLiveConfigTest.java @@ -9,8 +9,6 @@ import static org.junit.Assert.assertFalse; public class GroupAutoTakedownLiveConfigTest extends FleetControllerTest { - private long mockConfigGeneration = 1; - private static FleetControllerOptions createOptions(DistributionBuilder.GroupBuilder groupBuilder, double minNodeRatio) { FleetControllerOptions options = defaultOptions("mycluster"); options.setStorageDistribution(DistributionBuilder.forHierarchicCluster(groupBuilder)); @@ -21,8 +19,7 @@ public class GroupAutoTakedownLiveConfigTest extends FleetControllerTest { } private void updateConfigLive(FleetControllerOptions newOptions) { - ++mockConfigGeneration; - this.fleetController.updateOptions(newOptions, mockConfigGeneration); + this.fleetController.updateOptions(newOptions); } private void reconfigureWithMinNodeRatio(double minNodeRatio) { diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java index 83167367a91..a3bc39bde39 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java @@ -307,7 +307,7 @@ public class GroupAutoTakedownTest { assertEquals("distributor:6 storage:6 .4.t:123456", fixture.generatedClusterState()); DatabaseHandler handler = mock(DatabaseHandler.class); - DatabaseHandler.Context context = mock(DatabaseHandler.Context.class); + DatabaseHandler.DatabaseContext context = mock(DatabaseHandler.DatabaseContext.class); when(context.getCluster()).thenReturn(fixture.cluster); Set<ConfiguredNode> nodes = new HashSet<>(fixture.cluster.clusterInfo().getConfiguredNodes().values()); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java index 8d32986bcaf..bf3efb17ddf 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java @@ -7,7 +7,6 @@ import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Target; import com.yahoo.jrt.Transport; import com.yahoo.jrt.slobrok.server.Slobrok; -import java.util.logging.Level; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.NodeType; @@ -22,6 +21,7 @@ import org.junit.rules.Timeout; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeoutException; +import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; @@ -296,7 +296,7 @@ public class MasterElectionTest extends FleetControllerTest { for (FleetController fc : fleetControllers) { FleetControllerOptions myoptions = fc.getOptions(); myoptions.zooKeeperServerAddress = zooKeeperServer.getAddress(); - fc.updateOptions(myoptions, 0); + fc.updateOptions(myoptions); log.log(Level.INFO, "Should now have sent out new zookeeper server address " + myoptions.zooKeeperServerAddress + " to fleetcontroller " + myoptions.fleetControllerIndex); } timer.advanceTime(10 * 1000); // Wait long enough for fleetcontroller wanting to retry zookeeper connection @@ -447,7 +447,7 @@ public class MasterElectionTest extends FleetControllerTest { FleetControllerOptions newOptions = options.clone(); for (int i=0; i<fleetControllers.size(); ++i) { FleetControllerOptions nodeOptions = adjustConfig(newOptions, i, fleetControllers.size()); - fleetControllers.get(i).updateOptions(nodeOptions, 2); + fleetControllers.get(i).updateOptions(nodeOptions); } waitForMaster(0); log.log(Level.INFO, "SHUTTING DOWN FLEET CONTROLLER 0"); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/NodeSlobrokConfigurationMembershipTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/NodeSlobrokConfigurationMembershipTest.java index c1f1d0e1bc1..ea539cc13e0 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/NodeSlobrokConfigurationMembershipTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/NodeSlobrokConfigurationMembershipTest.java @@ -46,7 +46,7 @@ public class NodeSlobrokConfigurationMembershipTest extends FleetControllerTest // cluster. If we do not re-fetch state from slobrok we risk racing nodeIndices.add(foreignNode); options.nodes = asConfiguredNodes(nodeIndices); - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); // Need to treat cluster as having 6 nodes due to ideal state algo semantics. // Note that we do not use subsetWaiter here since we want node 6 included. waitForState("version:\\d+ distributor:7 .4.s:d .5.s:d storage:7 .4.s:d .5.s:d"); @@ -65,13 +65,13 @@ public class NodeSlobrokConfigurationMembershipTest extends FleetControllerTest assertTrue(configuredNodes.remove(new ConfiguredNode(0, false))); configuredNodes.add(new ConfiguredNode(0, true)); options.nodes = configuredNodes; - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); waitForState("version:\\d+ distributor:4 storage:4 .0.s:r"); // Now remove the retired node entirely from config assertTrue(configuredNodes.remove(new ConfiguredNode(0, true))); - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); // The previously retired node should now be marked as down, as it no longer // exists from the point of view of the content cluster. We have to use a subset diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java index 05ee96500d5..fee1d33725c 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java @@ -346,7 +346,7 @@ public class RpcServerTest extends FleetControllerTest { options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; this.options.maxInitProgressTime = 30000; this.options.stableStateTimePeriod = 60000; - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); waitForState("version:\\d+ distributor:7 storage:7 .0.s:m .1.s:m .2.s:r .3.s:r .4.s:r"); } @@ -376,7 +376,7 @@ public class RpcServerTest extends FleetControllerTest { options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; this.options.maxInitProgressTime = 30000; this.options.stableStateTimePeriod = 60000; - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); waitForState("version:\\d+ distributor:7 storage:7 .0.s:m .1.s:m"); } @@ -415,7 +415,7 @@ public class RpcServerTest extends FleetControllerTest { options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; this.options.maxInitProgressTime = 30000; this.options.stableStateTimePeriod = 60000; - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); waitForState("version:\\d+ distributor:5 storage:5"); } @@ -430,7 +430,7 @@ public class RpcServerTest extends FleetControllerTest { options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; this.options.maxInitProgressTime = 30000; this.options.stableStateTimePeriod = 60000; - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); waitForState("version:\\d+ distributor:7 storage:7 .0.s:r .1.s:r .2.s:r .3.s:r .4.s:r"); } @@ -444,7 +444,7 @@ public class RpcServerTest extends FleetControllerTest { options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; this.options.maxInitProgressTime = 30000; this.options.stableStateTimePeriod = 60000; - fleetController.updateOptions(options, 0); + fleetController.updateOptions(options); waitForState("version:\\d+ distributor:7 storage:7 .0.s:r .1.s:r .2.s:r .3.s:r .4.s:r"); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index 2a0ab389865..e7aae087626 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -16,12 +16,6 @@ import com.yahoo.vespa.clustercontroller.utils.util.NoMetricReporter; import org.junit.Before; import org.junit.Test; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -29,6 +23,12 @@ import java.util.List; import java.util.Map; import java.util.logging.Logger; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class StateChangeTest extends FleetControllerTest { public static Logger log = Logger.getLogger(StateChangeTest.class.getName()); @@ -49,16 +49,17 @@ public class StateChangeTest extends FleetControllerTest { nodes.add(new Node(NodeType.DISTRIBUTOR, i)); } + var context = new TestContext(options); communicator = new DummyCommunicator(nodes, timer); - MetricUpdater metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); + var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); eventLog = new EventLog(timer, metricUpdater); - ContentCluster cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); - NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, eventLog); - DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); - StateChangeHandler stateGenerator = new StateChangeHandler(timer, eventLog); - SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); - MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); - ctrl = new FleetController(timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); + var cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); + var stateGatherer = new NodeStateGatherer(timer, timer, eventLog); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); + var stateGenerator = new StateChangeHandler(timer, eventLog); + var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + ctrl = new FleetController(context, timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); ctrl.tick(); if (options.fleetControllerCount == 1) { diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index dfbe4d5a8e8..84b479cfc29 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -55,8 +55,8 @@ public class SystemStateBroadcasterTest { } } - private static DatabaseHandler.Context dbContextFrom(ContentCluster cluster) { - return new DatabaseHandler.Context() { + private static DatabaseHandler.DatabaseContext dbContextFrom(ContentCluster cluster) { + return new DatabaseHandler.DatabaseContext() { @Override public ContentCluster getCluster() { return cluster; diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/TestContext.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/TestContext.java new file mode 100644 index 00000000000..1a7c2659bd6 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/TestContext.java @@ -0,0 +1,17 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +/** + * @author hakon + */ +public class TestContext extends ContextImpl { + public TestContext(FleetControllerOptions options) { + super(options); + } + + @Override + protected String withLogPrefix(String message) { + // Include fleet controller index in prefix in tests, since many may be running + return id() + ": " + message; + } +} |