diff options
author | Håkon Hallingstad <hakon@yahooinc.com> | 2021-12-13 15:42:23 +0100 |
---|---|---|
committer | Håkon Hallingstad <hakon@yahooinc.com> | 2021-12-13 15:42:23 +0100 |
commit | 313d8840720fde3f79d01f86226d7aae065b3579 (patch) | |
tree | 42b92d281f5d62d5abaa9c2771c4d5b22afa6938 /clustercontroller-core | |
parent | 49dd2469b58193513332f5e93133f882eea87001 (diff) |
Use FleetControllerContext in ZooKeeperDatabase
Diffstat (limited to 'clustercontroller-core')
16 files changed, 195 insertions, 110 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 948e416eb53..3137dfff606 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -183,10 +183,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd options.nodeStateRequestTimeoutEarliestPercentage, options.nodeStateRequestTimeoutLatestPercentage, options.nodeStateRequestRoundTripTimeMaxSeconds); - var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); - var lookUp = new SlobrokClient(timer); - var stateGenerator = new StateChangeHandler(timer, log); - var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress, timer); + var lookUp = new SlobrokClient(context, timer); + var stateGenerator = new StateChangeHandler(context, timer, log); + var stateBroadcaster = new SystemStateBroadcaster(context, timer, timer); var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); var controller = new FleetController(context, timer, log, cluster, stateGatherer, communicator, statusPageServer, null, lookUp, database, stateGenerator, diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContext.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContext.java index cc94dd88e60..d1aadf9d217 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContext.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContext.java @@ -16,4 +16,13 @@ public interface FleetControllerContext { default void log(Logger logger, Level level, String message) { log(logger, level, () -> message); } void log(Logger logger, Level level, String message, Throwable t); void log(Logger logger, Level level, Supplier<String> message); + + default void log(Logger logger, Level level, String format, Object first, Object... rest) { + log(logger, level, () -> { + var args = new Object[1 + rest.length]; + args[0] = first; + System.arraycopy(rest, 0, args, 1, rest.length); + return String.format(format, args); + }); + } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java index 5035ed1aa88..46fafddfade 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java @@ -28,6 +28,7 @@ public class StateChangeHandler { private static final Logger log = Logger.getLogger(StateChangeHandler.class.getName()); + private final FleetControllerContext context; private final Timer timer; private final EventLogInterface eventLog; private boolean stateMayHaveChanged = false; @@ -40,7 +41,8 @@ public class StateChangeHandler { private int maxSlobrokDisconnectGracePeriod = 1000; private static final boolean disableUnstableNodes = true; - public StateChangeHandler(Timer timer, EventLogInterface eventLog) { + public StateChangeHandler(FleetControllerContext context, Timer timer, EventLogInterface eventLog) { + this.context = context; this.timer = timer; this.eventLog = eventLog; maxTransitionTime.put(NodeType.DISTRIBUTOR, 5000); @@ -52,7 +54,7 @@ public class StateChangeHandler { final DatabaseHandler database, final DatabaseHandler.DatabaseContext dbContext) throws InterruptedException { int startTimestampsReset = 0; - log.log(Level.FINE, "handleAllDistributorsInSync invoked for state version %d", currentState.getVersion()); + context.log(log, Level.FINE, "handleAllDistributorsInSync invoked for state version %d", currentState.getVersion()); for (NodeType nodeType : NodeType.getTypes()) { for (ConfiguredNode configuredNode : nodes) { final Node node = new Node(nodeType, configuredNode.index()); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index bc8d84c4634..d061f7edbea 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -18,8 +18,9 @@ import java.util.stream.Collectors; public class SystemStateBroadcaster { - public static Logger log = Logger.getLogger(SystemStateBroadcaster.class.getName()); + private static Logger log = Logger.getLogger(SystemStateBroadcaster.class.getName()); + private final FleetControllerContext context; private final Timer timer; private final Object monitor; private ClusterStateBundle clusterStateBundle; @@ -37,7 +38,8 @@ public class SystemStateBroadcaster { private final SetClusterStateWaiter setClusterStateWaiter = new SetClusterStateWaiter(); private final ActivateClusterStateVersionWaiter activateClusterStateVersionWaiter = new ActivateClusterStateVersionWaiter(); - public SystemStateBroadcaster(Timer timer, Object monitor) { + public SystemStateBroadcaster(FleetControllerContext context, Timer timer, Object monitor) { + this.context = context; this.timer = timer; this.monitor = monitor; } @@ -70,7 +72,7 @@ public class SystemStateBroadcaster { long time = timer.getCurrentTimeInMillis(); Long lastReported = lastErrorReported.get(info.getNode()); boolean alreadySeen = (lastReported != null && time - lastReported < minTimeBetweenNodeErrorLogging); - log.log((nodeOk && !alreadySeen) ? Level.WARNING : Level.FINE, message); + context.log(log, nodeOk && !alreadySeen ? Level.WARNING : Level.FINE, message); if (!alreadySeen) { lastErrorReported.put(info.getNode(), time); } @@ -96,12 +98,17 @@ public class SystemStateBroadcaster { // NO_SUCH_METHOD implies node is on a version that does not understand explicit activations // and it has already merrily started using the state version. Treat as if it had been ACKed. if (reply.getReturnCode() != ErrorCode.NO_SUCH_METHOD) { - log.log(Level.FINE, () -> String.format("Activation NACK for node %s with version %d, message %s", - info, version, reply.getReturnMessage())); + context.log(log, + Level.FINE, + () -> String.format("Activation NACK for node %s with version %d, message %s", + info, version, reply.getReturnMessage())); success = false; } else { - log.log(Level.FINE, () -> String.format("Node %s did not understand state activation RPC; " + - "implicitly treating state %d as activated on node", info, version)); + context.log(log, + Level.FINE, + () -> String.format("Node %s did not understand state activation RPC; " + + "implicitly treating state %d as activated on node", + info, version)); } } else if (reply.getActualVersion() != version) { boolean nodeOk = nodeReportsSelfAsAvailable(info); @@ -113,8 +120,10 @@ public class SystemStateBroadcaster { version, info, reply.getActualVersion())); success = false; } else { - log.log(Level.FINE, () -> String.format("Node %s reports successful activation of state " + - "version %d", info, version)); + context.log(log, + Level.FINE, + () -> String.format("Node %s reports successful activation of state version %d", + info, version)); } info.setSystemStateVersionActivationAcked(version, success); // TODO we currently don't invoke reportNodeError here.. We assume that node errors will be reported @@ -144,7 +153,7 @@ public class SystemStateBroadcaster { } } else { info.setClusterStateBundleVersionAcknowledged(version, true); - log.log(Level.FINE, () -> String.format("Node %s ACKed system state version %d.", info, version)); + context.log(log, Level.FINE, () -> String.format("Node %s ACKed system state version %d.", info, version)); lastErrorReported.remove(info.getNode()); } } @@ -220,8 +229,10 @@ public class SystemStateBroadcaster { if (!anyDistributorsNeedStateBundle && (currentStateVersion > lastStateVersionBundleAcked)) { markCurrentClusterStateBundleAsReceivedByAllDistributors(); if (clusterStateBundle.deferredActivation()) { - log.log(Level.FINE, () -> String.format("All distributors have ACKed cluster state " + - "version %d, sending activation", currentStateVersion)); + context.log(log, + Level.FINE, + () -> String.format("All distributors have ACKed cluster state " + + "version %d, sending activation", currentStateVersion)); } else { markCurrentClusterStateAsConverged(database, dbContext, fleetController); } @@ -239,8 +250,10 @@ public class SystemStateBroadcaster { if (!anyDistributorsNeedActivation && (currentStateVersion > lastClusterStateVersionConverged)) { markCurrentClusterStateAsConverged(database, dbContext, fleetController); } else { - log.log(Level.FINE, () -> String.format("distributors still need activation in state %d (last converged: %d)", - currentStateVersion, lastClusterStateVersionConverged)); + context.log(log, + Level.FINE, + () -> String.format("distributors still need activation in state %d (last converged: %d)", + currentStateVersion, lastClusterStateVersionConverged)); } } @@ -249,7 +262,7 @@ public class SystemStateBroadcaster { } private void markCurrentClusterStateAsConverged(DatabaseHandler database, DatabaseHandler.DatabaseContext dbContext, FleetController fleetController) throws InterruptedException { - log.log(Level.FINE, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state"); + context.log(log, Level.FINE, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state"); lastClusterStateVersionConverged = clusterStateBundle.getVersion(); lastClusterStateBundleConverged = clusterStateBundle; fleetController.handleAllDistributorsInSync(database, dbContext); @@ -279,7 +292,7 @@ public class SystemStateBroadcaster { ClusterState baselineState = clusterStateBundle.getBaselineClusterState(); if (!currentBundleVersionIsTaggedOfficial()) { - log.log(Level.INFO, String.format("Publishing cluster state version %d", baselineState.getVersion())); + context.log(log, Level.INFO, "Publishing cluster state version " + baselineState.getVersion()); tagCurrentBundleVersionAsOfficial(); } @@ -288,13 +301,17 @@ public class SystemStateBroadcaster { if (nodeNeedsToObserveStartupTimestamps(node)) { // TODO this is the same for all nodes, compute only once ClusterStateBundle modifiedBundle = clusterStateBundle.cloneWithMapper(state -> buildModifiedClusterState(state, dbContext)); - log.log(Level.FINE, () -> String.format("Sending modified cluster state version %d" + - " to node %s: %s", baselineState.getVersion(), node, modifiedBundle)); + context.log(log, + Level.FINE, + () -> "Sending modified cluster state version " + baselineState.getVersion() + + " to node " + node + ": " + modifiedBundle); communicator.setSystemState(modifiedBundle, node, setClusterStateWaiter); } else { - log.log(Level.FINE, () -> String.format("Sending system state version %d to node %s. " + - "(went down time %d, node start time %d)", baselineState.getVersion(), node, - node.getWentDownWithStartTime(), node.getStartTimestamp())); + context.log(log, + Level.FINE, + () -> "Sending system state version " + baselineState.getVersion() + + " to node " + node + ". (went down time " + node.getWentDownWithStartTime() + + ", node start time " + node.getStartTimestamp() + ")"); communicator.setSystemState(clusterStateBundle, node, setClusterStateWaiter); } } @@ -313,8 +330,10 @@ public class SystemStateBroadcaster { var recipients = resolveStateActivationSendSet(dbContext); for (NodeInfo node : recipients) { - log.log(Level.FINE, () -> String.format("Sending cluster state activation to node %s for version %d", - node, clusterStateBundle.getVersion())); + context.log(log, + Level.FINE, + () -> "Sending cluster state activation to node " + node + " for version " + + clusterStateBundle.getVersion()); communicator.activateClusterStateVersion(clusterStateBundle.getVersion(), node, activateClusterStateVersionWaiter); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java index fe716eea288..4285ef83782 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java @@ -7,6 +7,7 @@ import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState; import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.ContentCluster; +import com.yahoo.vespa.clustercontroller.core.FleetControllerContext; import com.yahoo.vespa.clustercontroller.core.rpc.EnvelopedClusterStateBundleCodec; import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec; import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; @@ -42,18 +43,14 @@ public class ZooKeeperDatabase extends Database { private final ZooKeeperWatcher watcher = new ZooKeeperWatcher(); private final ZooKeeper session; private boolean sessionOpen = true; + private final FleetControllerContext context; private final int nodeIndex; private final MasterDataGatherer masterDataGatherer; - private boolean reportErrors = true; // Expected ZK znode versions. Note: these are _not_ -1 as that would match anything. // We expect the caller to invoke the load methods prior to calling any store methods. private int lastKnownStateBundleZNodeVersion = -2; private int lastKnownStateVersionZNodeVersion = -2; - public void stopErrorReporting() { - reportErrors = false; - } - private class ZooKeeperWatcher implements Watcher { private Event.KeeperState state = null; @@ -62,50 +59,51 @@ public class ZooKeeperDatabase extends Database { public void process(WatchedEvent watchedEvent) { // Shouldn't get events after we expire, but just be sure we stop them here. if (state != null && state.equals(Event.KeeperState.Expired)) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Got event from ZooKeeper session after it expired"); + context.log(log, Level.WARNING, "Got event from ZooKeeper session after it expired"); return; } Event.KeeperState newState = watchedEvent.getState(); if (state == null || !state.equals(newState)) switch (newState) { case Expired: - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Zookeeper session expired"); + context.log(log, Level.INFO, "Zookeeper session expired"); sessionOpen = false; listener.handleZooKeeperSessionDown(); break; case Disconnected: - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Lost connection to zookeeper server"); + context.log(log, Level.INFO, "Lost connection to zookeeper server"); sessionOpen = false; listener.handleZooKeeperSessionDown(); break; case SyncConnected: - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Connection to zookeeper server established. Refetching master data"); + context.log(log, Level.INFO, "Connection to zookeeper server established. Refetching master data"); if (masterDataGatherer != null) { masterDataGatherer.restart(); } } switch (watchedEvent.getType()) { case NodeChildrenChanged: // Fleetcontrollers have either connected or disconnected to ZooKeeper - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Got unexpected ZooKeeper event NodeChildrenChanged"); + context.log(log, Level.WARNING, "Got unexpected ZooKeeper event NodeChildrenChanged"); break; case NodeDataChanged: // A fleetcontroller have changed what node it is voting for - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Got unexpected ZooKeeper event NodeDataChanged"); + context.log(log, Level.WARNING, "Got unexpected ZooKeeper event NodeDataChanged"); break; case NodeCreated: // How can this happen? Can one leave watches on non-existing nodes? - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Got unexpected ZooKeeper event NodeCreated"); + context.log(log, Level.WARNING, "Got unexpected ZooKeeper event NodeCreated"); break; case NodeDeleted: // We're not watching any nodes for whether they are deleted or not. - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Got unexpected ZooKeeper event NodeDeleted"); + context.log(log, Level.WARNING, "Got unexpected ZooKeeper event NodeDeleted"); break; case None: if (state != null && state.equals(watchedEvent.getState())) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Got None type event that didn't even alter session state. What does that indicate?"); + context.log(log, Level.WARNING, "Got None type event that didn't even alter session state. What does that indicate?"); } } state = watchedEvent.getState(); } } - public ZooKeeperDatabase(ContentCluster cluster, int nodeIndex, String address, int timeout, Database.DatabaseListener zksl) throws IOException, KeeperException, InterruptedException { + public ZooKeeperDatabase(FleetControllerContext context, ContentCluster cluster, int nodeIndex, String address, int timeout, DatabaseListener zksl) throws IOException, KeeperException, InterruptedException { + this.context = context; this.nodeIndex = nodeIndex; zooKeeperRoot = "/vespa/fleetcontroller/" + cluster.getName() + "/"; session = new ZooKeeper(address, timeout, watcher, new ZkClientConfigBuilder().toConfig()); @@ -113,7 +111,7 @@ public class ZooKeeperDatabase extends Database { try{ this.listener = zksl; setupRoot(); - log.log(Level.FINEST, () -> "Fleetcontroller " + nodeIndex + ": Asking for initial data on master election"); + context.log(log, Level.FINEST, "Asking for initial data on master election"); masterDataGatherer = new MasterDataGatherer(session, zooKeeperRoot, listener, nodeIndex); completedOk = true; } finally { @@ -124,14 +122,14 @@ public class ZooKeeperDatabase extends Database { private void createNode(String prefix, String nodename, byte[] value) throws KeeperException, InterruptedException { try{ if (session.exists(prefix + nodename, false) != null) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Zookeeper node '" + prefix + nodename + "' already exists. Not creating it"); + context.log(log, Level.FINE, () -> "Zookeeper node '" + prefix + nodename + "' already exists. Not creating it"); return; } session.create(prefix + nodename, value, acl, CreateMode.PERSISTENT); - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Created zookeeper node '" + prefix + nodename + "'"); + context.log(log, Level.FINE, () -> "Created zookeeper node '" + prefix + nodename + "'"); } catch (KeeperException.NodeExistsException e) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Node to create existed, " - + "but this is normal as other nodes may create them at the same time."); + context.log(log, Level.FINE, "Node to create existed, but this is normal as other nodes " + + "may create them at the same time."); } } @@ -149,14 +147,13 @@ public class ZooKeeperDatabase extends Database { createNode(zooKeeperRoot, "published_state_bundle", new byte[0]); // TODO dedupe string constants byte[] val = String.valueOf(nodeIndex).getBytes(utf8); deleteNodeIfExists(getMyIndexPath()); - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + - ": Creating ephemeral master vote node with vote to self."); + context.log(log, Level.INFO, "Creating ephemeral master vote node with vote to self."); session.create(getMyIndexPath(), val, acl, CreateMode.EPHEMERAL); } private void deleteNodeIfExists(String path) throws KeeperException, InterruptedException { if (session.exists(path, false) != null) { - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Removing master vote node."); + context.log(log, Level.INFO, "Removing master vote node at " + path); session.delete(path, -1); } } @@ -172,11 +169,11 @@ public class ZooKeeperDatabase extends Database { public void close() { sessionOpen = false; try{ - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Trying to close ZooKeeper session 0x" + context.log(log, Level.FINE, () -> "Trying to close ZooKeeper session 0x" + Long.toHexString(session.getSessionId())); session.close(); } catch (InterruptedException e) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Got interrupt exception while closing session: " + e); + context.log(log, Level.WARNING, "Got interrupt exception while closing session: " + e); } } @@ -185,11 +182,10 @@ public class ZooKeeperDatabase extends Database { } private void maybeLogExceptionWarning(Exception e, String message) { - if (sessionOpen && reportErrors) { + if (sessionOpen) { StringWriter sw = new StringWriter(); e.printStackTrace(new PrintWriter(sw)); - log.log(Level.WARNING, String.format("Fleetcontroller %s: %s. Exception: %s\n%s", - nodeIndex, message, e.getMessage(), sw.toString())); + context.log(log, Level.WARNING, message + ". Exception: " + e.getMessage() + "\n" + sw); } } @@ -197,7 +193,7 @@ public class ZooKeeperDatabase extends Database { byte[] val = String.valueOf(wantedMasterIndex).getBytes(utf8); try{ session.setData(getMyIndexPath(), val, -1); - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Stored new vote in ephemeral node. " + nodeIndex + " -> " + wantedMasterIndex); + context.log(log, Level.INFO, "Stored new vote in ephemeral node. " + nodeIndex + " -> " + wantedMasterIndex); return true; } catch (InterruptedException e) { throw new RuntimeException(e); @@ -209,7 +205,7 @@ public class ZooKeeperDatabase extends Database { public boolean storeLatestSystemStateVersion(int version) { byte[] data = Integer.toString(version).getBytes(utf8); try{ - log.log(Level.INFO, String.format("Fleetcontroller %d: Storing new cluster state version in ZooKeeper: %d", nodeIndex, version)); + context.log(log, Level.INFO, "Storing new cluster state version in ZooKeeper: " + version); var stat = session.setData(zooKeeperRoot + "latestversion", data, lastKnownStateVersionZNodeVersion); lastKnownStateVersionZNodeVersion = stat.getVersion(); return true; @@ -227,13 +223,11 @@ public class ZooKeeperDatabase extends Database { public Integer retrieveLatestSystemStateVersion() { Stat stat = new Stat(); try{ - log.log(Level.FINE, () -> String.format("Fleetcontroller %d: Fetching latest cluster state at '%slatestversion'", - nodeIndex, zooKeeperRoot)); + context.log(log, Level.FINE, "Fetching latest cluster state at '%slatestversion'", zooKeeperRoot); byte[] data = session.getData(zooKeeperRoot + "latestversion", false, stat); lastKnownStateVersionZNodeVersion = stat.getVersion(); final Integer versionNumber = Integer.valueOf(new String(data, utf8)); - log.log(Level.INFO, String.format("Fleetcontroller %d: Read cluster state version %d from ZooKeeper " + - "(znode version %d)", nodeIndex, versionNumber, stat.getVersion())); + context.log(log, Level.INFO, "Read cluster state version %d from ZooKeeper (znode version %d)", versionNumber, stat.getVersion()); return versionNumber; } catch (InterruptedException e) { throw new RuntimeException(e); @@ -262,7 +256,7 @@ public class ZooKeeperDatabase extends Database { } byte[] val = sb.toString().getBytes(utf8); try{ - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Storing wanted states at '" + zooKeeperRoot + "wantedstates'"); + context.log(log, Level.FINE, () -> "Storing wanted states at '" + zooKeeperRoot + "wantedstates'"); session.setData(zooKeeperRoot + "wantedstates", val, -1); return true; } catch (InterruptedException e) { @@ -275,7 +269,7 @@ public class ZooKeeperDatabase extends Database { public Map<Node, NodeState> retrieveWantedStates() { try{ - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Fetching wanted states at '" + zooKeeperRoot + "wantedstates'"); + context.log(log, Level.FINE, () -> "Fetching wanted states at '" + zooKeeperRoot + "wantedstates'"); Stat stat = new Stat(); byte[] data = session.getData(zooKeeperRoot + "wantedstates", false, stat); Map<Node, NodeState> wanted = new TreeMap<>(); @@ -290,7 +284,7 @@ public class ZooKeeperDatabase extends Database { NodeState nodeState = NodeState.deserialize(node.getType(), token.substring(colon + 1)); wanted.put(node, nodeState); } catch (Exception e) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Ignoring invalid wantedstate line in zookeeper '" + token + "'."); + context.log(log, Level.WARNING, "Ignoring invalid wantedstate line in zookeeper '" + token + "'."); } } } @@ -313,7 +307,7 @@ public class ZooKeeperDatabase extends Database { } byte val[] = sb.toString().getBytes(utf8); try{ - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Storing start timestamps at '" + zooKeeperRoot + "starttimestamps"); + context.log(log, Level.FINE, () -> "Storing start timestamps at '" + zooKeeperRoot + "starttimestamps"); session.setData(zooKeeperRoot + "starttimestamps", val, -1); return true; } catch (InterruptedException e) { @@ -327,7 +321,7 @@ public class ZooKeeperDatabase extends Database { @Override public Map<Node, Long> retrieveStartTimestamps() { try{ - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Fetching start timestamps at '" + zooKeeperRoot + "starttimestamps'"); + context.log(log, Level.FINE, () -> "Fetching start timestamps at '" + zooKeeperRoot + "starttimestamps'"); Stat stat = new Stat(); byte[] data = session.getData(zooKeeperRoot + "starttimestamps", false, stat); Map<Node, Long> wanted = new TreeMap<Node, Long>(); @@ -342,7 +336,7 @@ public class ZooKeeperDatabase extends Database { Long timestamp = Long.valueOf(token.substring(colon + 1)); wanted.put(n, timestamp); } catch (Exception e) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Ignoring invalid starttimestamp line in zookeeper '" + token + "'."); + context.log(log, Level.WARNING, "Ignoring invalid starttimestamp line in zookeeper '" + token + "'."); } } } @@ -360,9 +354,11 @@ public class ZooKeeperDatabase extends Database { EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec(); byte[] encodedBundle = envelopedBundleCodec.encodeWithEnvelope(stateBundle); try{ - log.log(Level.FINE, () -> String.format("Fleetcontroller %d: Storing published state bundle %s at " + - "'%spublished_state_bundle' with expected znode version %d", - nodeIndex, stateBundle, zooKeeperRoot, lastKnownStateBundleZNodeVersion)); + context.log(log, + Level.FINE, + () -> String.format("Storing published state bundle %s at " + + "'%spublished_state_bundle' with expected znode version %d", + stateBundle, zooKeeperRoot, lastKnownStateBundleZNodeVersion)); var stat = session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, lastKnownStateBundleZNodeVersion); lastKnownStateBundleZNodeVersion = stat.getVersion(); } catch (InterruptedException e) { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java index 0f739eec1d0..71f39135609 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java @@ -1,12 +1,20 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core.database; +import com.yahoo.vespa.clustercontroller.core.FleetControllerContext; + public class ZooKeeperDatabaseFactory implements DatabaseFactory { + private final FleetControllerContext context; + + public ZooKeeperDatabaseFactory(FleetControllerContext context) { + this.context = context; + } + @Override public Database create(Params params) throws Exception { - return new ZooKeeperDatabase(params.cluster, params.nodeIndex, params.dbAddress, - params.dbSessionTimeout, params.listener); + return new ZooKeeperDatabase(context, params.cluster, params.nodeIndex, params.dbAddress, + params.dbSessionTimeout, params.listener); } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlobrokClient.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlobrokClient.java index c4894e41747..7487f9546b7 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlobrokClient.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlobrokClient.java @@ -2,17 +2,17 @@ package com.yahoo.vespa.clustercontroller.core.rpc; -import com.yahoo.jrt.slobrok.api.SlobrokList; -import com.yahoo.jrt.slobrok.api.Mirror; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Transport; -import com.yahoo.vdslib.state.NodeType; +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.jrt.slobrok.api.SlobrokList; import com.yahoo.vdslib.state.Node; -import java.util.logging.Level; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vespa.clustercontroller.core.ContentCluster; +import com.yahoo.vespa.clustercontroller.core.FleetControllerContext; import com.yahoo.vespa.clustercontroller.core.NodeInfo; import com.yahoo.vespa.clustercontroller.core.NodeLookup; import com.yahoo.vespa.clustercontroller.core.Timer; -import com.yahoo.vespa.clustercontroller.core.ContentCluster; import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener; import java.util.Iterator; @@ -21,19 +21,22 @@ import java.util.List; import java.util.Map; import java.util.StringTokenizer; import java.util.TreeMap; +import java.util.logging.Level; import java.util.logging.Logger; public class SlobrokClient implements NodeLookup { public static final Logger log = Logger.getLogger(SlobrokClient.class.getName()); + private final FleetControllerContext context; private final Timer timer; private String[] connectionSpecs; private Mirror mirror; private Supervisor supervisor; private boolean freshMirror = false; - public SlobrokClient(Timer timer) { + public SlobrokClient(FleetControllerContext context, Timer timer) { + this.context = context; this.timer = timer; } @@ -81,9 +84,7 @@ public class SlobrokClient implements NodeLookup { if (freshMirror) { freshMirror = false; } else if (cluster.getSlobrokGenerationCount() == mirrorVersion) { - if (log.isLoggable(Level.FINEST)) { - log.log(Level.FINEST, "Slobrok still at generation count " + cluster.getSlobrokGenerationCount() + ". Not updating."); - } + context.log(log, Level.FINEST, () -> "Slobrok still at generation count " + cluster.getSlobrokGenerationCount() + ". Not updating."); return false; } @@ -150,16 +151,18 @@ public class SlobrokClient implements NodeLookup { cluster.setSlobrokGenerationCount(mirrorVersion); for (NodeInfo nodeInfo : cluster.getNodeInfo()) { if (slobrokNodes.containsKey(nodeInfo.getNode()) && nodeInfo.isRpcAddressOutdated()) { - log.log(Level.WARNING, "Node " + nodeInfo - + " was tagged NOT in slobrok even though it is. It was in the following lists:" - + (newNodes.contains(nodeInfo.getNode()) ? " newNodes" : "") - + (missingNodeInfos.contains(nodeInfo) ? " missingNodes" : "") - + (alteredRpcAddressNodes.contains(nodeInfo.getNode()) ? " alteredNodes" : "") - + (returningNodeInfos.contains(nodeInfo) ? " returningNodes" : "")); + context.log(log, + Level.WARNING, + "Node " + nodeInfo + + " was tagged NOT in slobrok even though it is. It was in the following lists:" + + (newNodes.contains(nodeInfo.getNode()) ? " newNodes" : "") + + (missingNodeInfos.contains(nodeInfo) ? " missingNodes" : "") + + (alteredRpcAddressNodes.contains(nodeInfo.getNode()) ? " alteredNodes" : "") + + (returningNodeInfos.contains(nodeInfo) ? " returningNodes" : "")); nodeInfo.markRpcAddressLive(); } } - log.log(Level.FINEST, "Slobrok information updated to generation " + cluster.getSlobrokGenerationCount()); + context.log(log, Level.FINEST, () -> "Slobrok information updated to generation " + cluster.getSlobrokGenerationCount()); return true; } @@ -204,7 +207,7 @@ public class SlobrokClient implements NodeLookup { private Map<Node, SlobrokData> getSlobrokData(String pattern) { Map<Node, SlobrokData> result = new TreeMap<>(); List<Mirror.Entry> entries = mirror.lookup(pattern); - log.log(Level.FINEST, "Looking for slobrok entries with pattern '" + pattern + "'. Found " + entries.size() + " entries."); + context.log(log, Level.FINEST, () -> "Looking for slobrok entries with pattern '" + pattern + "'. Found " + entries.size() + " entries."); for (Mirror.Entry entry : entries) { StringTokenizer st = new StringTokenizer(entry.getName(), "/"); String addressType = st.nextToken(); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java index a52370a0654..f0b91102e8f 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java @@ -54,9 +54,9 @@ public class ClusterFeedBlockTest extends FleetControllerTest { var eventLog = new EventLog(timer, metricUpdater); var cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); var stateGatherer = new NodeStateGatherer(timer, timer, eventLog); - var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); - var stateGenerator = new StateChangeHandler(timer, eventLog); - var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress, timer); + var stateGenerator = new StateChangeHandler(context, timer, eventLog); + var stateBroadcaster = new SystemStateBroadcaster(context, timer, timer); var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); ctrl = new FleetController(context, timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java index 75c31898408..4ce32484098 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java @@ -33,14 +33,11 @@ public class ClusterFixture { this.distribution = distribution; this.timer = new FakeTimer(); this.eventLog = mock(EventLogInterface.class); - this.nodeStateChangeHandler = createNodeStateChangeHandlerForCluster(); + var context = new FleetControllerContextImpl(new FleetControllerId(cluster.getName(), 0)); + this.nodeStateChangeHandler = new StateChangeHandler(context, timer, eventLog); this.params.cluster(this.cluster); } - private StateChangeHandler createNodeStateChangeHandlerForCluster() { - return new StateChangeHandler(timer, eventLog); - } - public ClusterFixture bringEntireClusterUp() { cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> { reportStorageNodeState(idx, State.UP); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContextImplTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContextImplTest.java new file mode 100644 index 00000000000..450975076bb --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContextImplTest.java @@ -0,0 +1,43 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; + +import static org.junit.Assert.assertEquals; + +/** + * @author hakonhall + */ +public class FleetControllerContextImplTest { + private final MockLogger logger = new MockLogger(); + public final FleetControllerId id = new FleetControllerId("clustername", 1); + private final FleetControllerContextImpl context = new FleetControllerContextImpl(id); + + @Test + public void verify() { + context.log(logger, Level.INFO, "A %s message", "log"); + + assertEquals(1, logger.records.size()); + assertEquals(Level.INFO, logger.records.get(0).getLevel()); + assertEquals("Cluster 'clustername': A log message", logger.records.get(0).getMessage()); + } + + private static class MockLogger extends Logger { + public List<LogRecord> records = new ArrayList<>(); + + public MockLogger() { + super(MockLogger.class.getName(), null); + } + + @Override + public void log(LogRecord record) { + records.add(record); + } + } +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java index d115f9f0060..c56b3bbdc69 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java @@ -174,18 +174,18 @@ public abstract class FleetControllerTest implements Waiter { options.nodeStateRequestTimeoutEarliestPercentage, options.nodeStateRequestTimeoutLatestPercentage, options.nodeStateRequestRoundTripTimeMaxSeconds); - var lookUp = new SlobrokClient(timer); + var lookUp = new SlobrokClient(context, timer); lookUp.setSlobrokConnectionSpecs(new String[0]); var rpcServer = new RpcServer(timer, timer, options.clusterName, options.fleetControllerIndex, options.slobrokBackOffPolicy); - var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress, timer); // Setting this <1000 ms causes ECONNREFUSED on socket trying to connect to ZK server, in ZooKeeper, // after creating a new ZooKeeper (session). This causes ~10s extra time to connect after connection loss. // Reasons unknown. Larger values like the default 10_000 causes that much additional running time for some tests. database.setMinimumWaitBetweenFailedConnectionAttempts(2_000); - var stateGenerator = new StateChangeHandler(timer, log); - var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var stateGenerator = new StateChangeHandler(context, timer, log); + var stateBroadcaster = new SystemStateBroadcaster(context, timer, timer); var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); var controller = new FleetController(context, timer, log, cluster, stateGatherer, communicator, status, rpcServer, lookUp, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); if (startThread) { diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandlerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandlerTest.java index b370c29537d..95c097c5920 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandlerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandlerTest.java @@ -81,7 +81,8 @@ public class StateChangeHandlerTest { this.config = config; for (int i=0; i<config.nodeCount; ++i) configuredNodes.add(new ConfiguredNode(i, false)); cluster = new ContentCluster("testcluster", configuredNodes, distribution); - nodeStateChangeHandler = new StateChangeHandler(clock, eventLog); + var context = new FleetControllerContextImpl(new FleetControllerId(cluster.getName(), 0)); + nodeStateChangeHandler = new StateChangeHandler(context, clock, eventLog); params.minStorageNodesUp(1).minDistributorNodesUp(1) .minRatioOfStorageNodesUp(0.0).minRatioOfDistributorNodesUp(0.0) .maxPrematureCrashes(config.maxPrematureCrashes) diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index b601412ecc4..a5bb65e11d0 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -55,9 +55,9 @@ public class StateChangeTest extends FleetControllerTest { eventLog = new EventLog(timer, metricUpdater); var cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); var stateGatherer = new NodeStateGatherer(timer, timer, eventLog); - var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); - var stateGenerator = new StateChangeHandler(timer, eventLog); - var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress, timer); + var stateGenerator = new StateChangeHandler(context, timer, eventLog); + var stateBroadcaster = new SystemStateBroadcaster(context, timer, timer); var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); ctrl = new FleetController(context, timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index 84b479cfc29..45593375c0b 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -26,7 +26,8 @@ public class SystemStateBroadcasterTest { private static class Fixture { FakeTimer timer = new FakeTimer(); final Object monitor = new Object(); - SystemStateBroadcaster broadcaster = new SystemStateBroadcaster(timer, monitor); + FleetControllerContext context = mock(FleetControllerContext.class); + SystemStateBroadcaster broadcaster = new SystemStateBroadcaster(context, timer, monitor); Communicator mockCommunicator = mock(Communicator.class); DatabaseHandler mockDatabaseHandler = mock(DatabaseHandler.class); FleetController mockFleetController = mock(FleetController.class); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/TestFleetControllerContext.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/TestFleetControllerContext.java index b9d8474affb..6fe8f92ac97 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/TestFleetControllerContext.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/TestFleetControllerContext.java @@ -9,6 +9,10 @@ public class TestFleetControllerContext extends FleetControllerContextImpl { super(options); } + public TestFleetControllerContext(FleetControllerId id) { + super(id); + } + @Override protected String withLogPrefix(String message) { // Include fleet controller index in prefix in tests, since many may be running diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java index d2407541680..a71665fb364 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java @@ -35,8 +35,10 @@ public class ZooKeeperDatabaseTest { void createDatabase() throws Exception { closeDatabaseIfOpen(); - zkDatabase = new ZooKeeperDatabase(clusterFixture.cluster(), nodeIndex, zkServer.getAddress(), - (int)sessionTimeout.toMillis(), mockListener); + var id = new FleetControllerId(clusterFixture.cluster.getName(), nodeIndex); + var context = new TestFleetControllerContext(id); + zkDatabase = new ZooKeeperDatabase(context, clusterFixture.cluster(), nodeIndex, zkServer.getAddress(), + (int)sessionTimeout.toMillis(), mockListener); } ZooKeeperDatabase db() { return zkDatabase; } |