diff options
Diffstat (limited to 'clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java')
-rw-r--r-- | clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java | 43 |
1 files changed, 25 insertions, 18 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 3137dfff606..7f385c6077c 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -12,8 +12,8 @@ import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory; import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo; -import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener; -import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; +import com.yahoo.vespa.clustercontroller.core.listeners.SlobrokListener; +import com.yahoo.vespa.clustercontroller.core.listeners.NodeListener; import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener; import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; import com.yahoo.vespa.clustercontroller.core.rpc.RpcServer; @@ -47,7 +47,7 @@ import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.Stream; -public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAddedOrRemovedListener, SystemStateListener, +public class FleetController implements NodeListener, SlobrokListener, SystemStateListener, Runnable, RemoteClusterControllerTaskScheduler { private static final Logger logger = Logger.getLogger(FleetController.class.getName()); @@ -332,6 +332,13 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } @Override + public void handleRemovedNode(Node node) { + verifyInControllerThread(); + // Prune orphaned wanted states + wantedStateChanged = true; + } + + @Override public void handleUpdatedHostInfo(NodeInfo nodeInfo, HostInfo newHostInfo) { verifyInControllerThread(); triggerBundleRecomputationIfResourceExhaustionStateChanged(nodeInfo, newHostInfo); @@ -380,7 +387,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd ClusterState baselineState = stateBundle.getBaselineClusterState(); newStates.add(stateBundle); metricUpdater.updateClusterStateMetrics(cluster, baselineState, - ResourceUsageStats.calculateFrom(cluster.getNodeInfo(), options.clusterFeedBlockLimit, stateBundle.getFeedBlock())); + ResourceUsageStats.calculateFrom(cluster.getNodeInfos(), options.clusterFeedBlockLimit, stateBundle.getFeedBlock())); lastMetricUpdateCycleCount = cycleCount; systemStateBroadcaster.handleNewClusterStates(stateBundle); // Iff master, always store new version in ZooKeeper _before_ publishing to any @@ -399,7 +406,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd ClusterStateBundle stateBundle = stateVersionTracker.getVersionedClusterStateBundle(); ClusterState baselineState = stateBundle.getBaselineClusterState(); metricUpdater.updateClusterStateMetrics(cluster, baselineState, - ResourceUsageStats.calculateFrom(cluster.getNodeInfo(), options.clusterFeedBlockLimit, stateBundle.getFeedBlock())); + ResourceUsageStats.calculateFrom(cluster.getNodeInfos(), options.clusterFeedBlockLimit, stateBundle.getFeedBlock())); lastMetricUpdateCycleCount = cycleCount; return true; } else { @@ -511,7 +518,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd eventLog.setMaxSize(options.eventLogMaxSize, options.eventNodeLogMaxSize); cluster.setPollingFrequency(options.statePollingFrequency); cluster.setDistribution(options.storageDistribution); - cluster.setNodes(options.nodes); + cluster.setNodes(options.nodes, databaseContext.getNodeStateUpdateListener()); database.setZooKeeperAddress(options.zooKeeperServerAddress, databaseContext); database.setZooKeeperSessionTimeout(options.zooKeeperSessionTimeout, databaseContext); stateGatherer.setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod); @@ -790,8 +797,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd @Override public boolean inMasterMoratorium() { return inMasterMoratorium; } }; - context.nodeStateOrHostInfoChangeHandler = this; - context.nodeAddedOrRemovedListener = this; + context.nodeListener = this; + context.slobrokListener = this; return context; } @@ -806,10 +813,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if (bundle == null) { return List.of(); } - return cluster.getNodeInfo().stream(). - filter(n -> effectiveActivatedStateVersion(n, bundle) < version). - map(NodeInfo::getNode). - collect(Collectors.toList()); + return cluster.getNodeInfos().stream(). + filter(n -> effectiveActivatedStateVersion(n, bundle) < version). + map(NodeInfo::getNode). + collect(Collectors.toList()); } private static <E> String stringifyListWithLimits(List<E> list, int limit) { @@ -939,7 +946,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd .stateDeriver(createBucketSpaceStateDeriver()) .deferredActivation(options.enableTwoPhaseClusterStateActivation) .feedBlock(createResourceExhaustionCalculator() - .inferContentClusterFeedBlockOrNull(cluster.getNodeInfo())) + .inferContentClusterFeedBlockOrNull(cluster.getNodeInfos())) .deriveAndBuild(); stateVersionTracker.updateLatestCandidateStateBundle(candidateBundle); invokeCandidateStateListeners(candidateBundle); @@ -1095,7 +1102,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd didWork = true; } if (wantedStateChanged) { - database.saveWantedStates(databaseContext); + didWork |= database.saveWantedStates(databaseContext); wantedStateChanged = false; } } else { @@ -1150,9 +1157,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd @Override public FleetController getFleetController() { return FleetController.this; } @Override - public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener() { return FleetController.this; } + public SlobrokListener getNodeAddedOrRemovedListener() { return FleetController.this; } @Override - public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener() { return FleetController.this; } + public NodeListener getNodeStateUpdateListener() { return FleetController.this; } }; public void waitForCompleteCycle(long timeoutMS) { @@ -1183,7 +1190,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd synchronized (monitor) { while (true) { int ackedNodes = 0; - for (NodeInfo node : cluster.getNodeInfo()) { + for (NodeInfo node : cluster.getNodeInfos()) { if (node.getClusterStateVersionBundleAcknowledged() >= version) { ++ackedNodes; } @@ -1206,7 +1213,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd synchronized (monitor) { while (true) { int distCount = 0, storCount = 0; - for (NodeInfo info : cluster.getNodeInfo()) { + for (NodeInfo info : cluster.getNodeInfos()) { if (!info.isRpcAddressOutdated()) { if (info.isDistributor()) ++distCount; else ++storCount; |