From 3074e6547443246e950ed6d675a4aec91c333d80 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Wed, 31 Aug 2022 15:22:20 +0200 Subject: Make FleetControllerOptions immutable and support builder pattern --- .../core/ClusterStateGenerator.java | 16 +- .../clustercontroller/core/FleetController.java | 97 ++- .../clustercontroller/core/FleetControllerId.java | 2 +- .../core/FleetControllerOptions.java | 796 +++++++++++++++++++-- .../clustercontroller/core/StateChangeHandler.java | 10 +- .../core/rpc/RPCCommunicator.java | 26 +- .../core/status/LegacyIndexPageRequestHandler.java | 94 +-- .../core/ClusterFeedBlockTest.java | 28 +- .../core/ClusterStateGeneratorTest.java | 34 +- .../vespa/clustercontroller/core/DatabaseTest.java | 16 +- .../core/DistributionBitCountTest.java | 28 +- .../core/FleetControllerTest.java | 58 +- .../core/GroupAutoTakedownLiveConfigTest.java | 51 +- .../clustercontroller/core/MasterElectionTest.java | 118 +-- .../clustercontroller/core/NoZooKeeperTest.java | 6 +- .../NodeSlobrokConfigurationMembershipTest.java | 53 +- .../clustercontroller/core/RpcServerTest.java | 105 +-- .../core/RpcVersionAutoDowngradeTest.java | 2 +- .../vespa/clustercontroller/core/SlobrokTest.java | 18 +- .../clustercontroller/core/StateChangeTest.java | 311 ++++---- .../clustercontroller/core/StateGatherTest.java | 16 +- .../core/rpc/RPCCommunicatorTest.java | 12 +- 22 files changed, 1282 insertions(+), 615 deletions(-) (limited to 'clustercontroller-core/src') diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java index 403c5c6089b..e62190b0ec8 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java @@ -112,14 +112,14 @@ public class ClusterStateGenerator { */ static Params fromOptions(FleetControllerOptions opts) { return new Params() - .maxPrematureCrashes(opts.maxPrematureCrashes) - .minStorageNodesUp(opts.minStorageNodesUp) - .minDistributorNodesUp(opts.minDistributorNodesUp) - .minRatioOfStorageNodesUp(opts.minRatioOfStorageNodesUp) - .minRatioOfDistributorNodesUp(opts.minRatioOfDistributorNodesUp) - .minNodeRatioPerGroup(opts.minNodeRatioPerGroup) - .idealDistributionBits(opts.distributionBits) - .transitionTimes(opts.maxTransitionTime); + .maxPrematureCrashes(opts.maxPrematureCrashes()) + .minStorageNodesUp(opts.minStorageNodesUp()) + .minDistributorNodesUp(opts.minDistributorNodesUp()) + .minRatioOfStorageNodesUp(opts.minRatioOfStorageNodesUp()) + .minRatioOfDistributorNodesUp(opts.minRatioOfDistributorNodesUp()) + .minNodeRatioPerGroup(opts.minNodeRatioPerGroup()) + .idealDistributionBits(opts.distributionBits()) + .transitionTimes(opts.maxTransitionTime()); } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 712c86f1e15..f58dfe553bf 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -129,7 +129,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta this.stateGatherer = nodeStateGatherer; this.stateChangeHandler = stateChangeHandler; this.systemStateBroadcaster = systemStateBroadcaster; - this.stateVersionTracker = new StateVersionTracker(options.minMergeCompletionRatio); + this.stateVersionTracker = new StateVersionTracker(options.minMergeCompletionRatio()); this.metricUpdater = metricUpdater; this.statusPageServer = Objects.requireNonNull(statusPage, "statusPage cannot be null"); this.rpcServer = server; @@ -155,26 +155,23 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta MetricReporter metricReporter) throws Exception { var context = new FleetControllerContextImpl(options); var timer = new RealTimer(); - var metricUpdater = new MetricUpdater(metricReporter, options.fleetControllerIndex, options.clusterName); + var metricUpdater = new MetricUpdater(metricReporter, options.fleetControllerIndex(), options.clusterName()); var log = new EventLog(timer, metricUpdater); - var cluster = new ContentCluster( - options.clusterName, - options.nodes, - options.storageDistribution); + var cluster = new ContentCluster(options.clusterName(), options.nodes(), options.storageDistribution()); var stateGatherer = new NodeStateGatherer(timer, timer, log); var communicator = new RPCCommunicator( RPCCommunicator.createRealSupervisor(), timer, - options.fleetControllerIndex, - options.nodeStateRequestTimeoutMS, - options.nodeStateRequestTimeoutEarliestPercentage, - options.nodeStateRequestTimeoutLatestPercentage, - options.nodeStateRequestRoundTripTimeMaxSeconds); - var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress, timer); + options.fleetControllerIndex(), + options.nodeStateRequestTimeoutMS(), + options.nodeStateRequestTimeoutEarliestPercentage(), + options.nodeStateRequestTimeoutLatestPercentage(), + options.nodeStateRequestRoundTripTimeMaxSeconds()); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress(), timer); var lookUp = new SlobrokClient(context, timer); var stateGenerator = new StateChangeHandler(context, timer, log); var stateBroadcaster = new SystemStateBroadcaster(context, timer, timer); - var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex(), options.fleetControllerCount(), timer, timer); var controller = new FleetController(context, timer, log, cluster, stateGatherer, communicator, statusPageServer, null, lookUp, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); @@ -235,7 +232,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta public FleetControllerOptions getOptions() { synchronized(monitor) { - return options.clone(); + return FleetControllerOptions.Builder.copy(options).build(); } } @@ -288,7 +285,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta synchronized(monitor) { assert newId.equals(context.id()); context.log(logger, Level.INFO, "FleetController has new options"); - nextOptions = options.clone(); + nextOptions = FleetControllerOptions.Builder.copy(options).build(); monitor.notifyAll(); } } @@ -331,7 +328,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta } private void triggerBundleRecomputationIfResourceExhaustionStateChanged(NodeInfo nodeInfo, HostInfo newHostInfo) { - if (!options.clusterFeedBlockEnabled) { + if (!options.clusterFeedBlockEnabled()) { return; } var calc = createResourceExhaustionCalculator(); @@ -372,7 +369,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta ClusterState baselineState = stateBundle.getBaselineClusterState(); newStates.add(stateBundle); metricUpdater.updateClusterStateMetrics(cluster, baselineState, - ResourceUsageStats.calculateFrom(cluster.getNodeInfos(), options.clusterFeedBlockLimit, stateBundle.getFeedBlock())); + ResourceUsageStats.calculateFrom(cluster.getNodeInfos(), options.clusterFeedBlockLimit(), stateBundle.getFeedBlock())); lastMetricUpdateCycleCount = cycleCount; systemStateBroadcaster.handleNewClusterStates(stateBundle); // Iff master, always store new version in ZooKeeper _before_ publishing to any @@ -391,7 +388,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta ClusterStateBundle stateBundle = stateVersionTracker.getVersionedClusterStateBundle(); ClusterState baselineState = stateBundle.getBaselineClusterState(); metricUpdater.updateClusterStateMetrics(cluster, baselineState, - ResourceUsageStats.calculateFrom(cluster.getNodeInfos(), options.clusterFeedBlockLimit, stateBundle.getFeedBlock())); + ResourceUsageStats.calculateFrom(cluster.getNodeInfos(), options.clusterFeedBlockLimit(), stateBundle.getFeedBlock())); lastMetricUpdateCycleCount = cycleCount; return true; } else { @@ -485,59 +482,59 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta verifyInControllerThread(); selfTerminateIfConfiguredNodeIndexHasChanged(); - if (changesConfiguredNodeSet(options.nodes)) { + if (changesConfiguredNodeSet(options.nodes())) { // Force slobrok node re-fetch in case of changes to the set of configured nodes cluster.setSlobrokGenerationCount(0); } configuredBucketSpaces = Set.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()); - stateVersionTracker.setMinMergeCompletionRatio(options.minMergeCompletionRatio); + stateVersionTracker.setMinMergeCompletionRatio(options.minMergeCompletionRatio()); communicator.propagateOptions(options); if (nodeLookup instanceof SlobrokClient) { - ((SlobrokClient) nodeLookup).setSlobrokConnectionSpecs(options.slobrokConnectionSpecs); + ((SlobrokClient) nodeLookup).setSlobrokConnectionSpecs(options.slobrokConnectionSpecs()); } - eventLog.setMaxSize(options.eventLogMaxSize, options.eventNodeLogMaxSize); - cluster.setPollingFrequency(options.statePollingFrequency); - cluster.setDistribution(options.storageDistribution); - cluster.setNodes(options.nodes, databaseContext.getNodeStateUpdateListener()); - database.setZooKeeperAddress(options.zooKeeperServerAddress, databaseContext); - database.setZooKeeperSessionTimeout(options.zooKeeperSessionTimeout, databaseContext); - stateGatherer.setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod); - stateGatherer.setNodeStateRequestTimeout(options.nodeStateRequestTimeoutMS); + eventLog.setMaxSize(options.eventLogMaxSize(), options.eventNodeLogMaxSize()); + cluster.setPollingFrequency(options.statePollingFrequency()); + cluster.setDistribution(options.storageDistribution()); + cluster.setNodes(options.nodes(), databaseContext.getNodeStateUpdateListener()); + database.setZooKeeperAddress(options.zooKeeperServerAddress(), databaseContext); + database.setZooKeeperSessionTimeout(options.zooKeeperSessionTimeout(), databaseContext); + stateGatherer.setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod()); + stateGatherer.setNodeStateRequestTimeout(options.nodeStateRequestTimeoutMS()); // TODO: remove as many temporal parameter dependencies as possible here. Currently duplication of state. stateChangeHandler.reconfigureFromOptions(options); stateChangeHandler.setStateChangedFlag(); // Always trigger state recomputation after reconfig - masterElectionHandler.setFleetControllerCount(options.fleetControllerCount); - masterElectionHandler.setMasterZooKeeperCooldownPeriod(options.masterZooKeeperCooldownPeriod); - masterElectionHandler.setUsingZooKeeper(options.zooKeeperServerAddress != null && !options.zooKeeperServerAddress.isEmpty()); + masterElectionHandler.setFleetControllerCount(options.fleetControllerCount()); + masterElectionHandler.setMasterZooKeeperCooldownPeriod(options.masterZooKeeperCooldownPeriod()); + masterElectionHandler.setUsingZooKeeper(options.zooKeeperServerAddress() != null && !options.zooKeeperServerAddress().isEmpty()); if (rpcServer != null) { rpcServer.setMasterElectionHandler(masterElectionHandler); try{ - rpcServer.setSlobrokConnectionSpecs(options.slobrokConnectionSpecs, options.rpcPort); + rpcServer.setSlobrokConnectionSpecs(options.slobrokConnectionSpecs(), options.rpcPort()); } catch (ListenFailedException e) { - context.log(logger, Level.WARNING, "Failed to bind RPC server to port " + options.rpcPort + ". This may be natural if cluster has altered the services running on this node: " + e.getMessage()); + context.log(logger, Level.WARNING, "Failed to bind RPC server to port " + options.rpcPort() + ". This may be natural if cluster has altered the services running on this node: " + e.getMessage()); } catch (Exception e) { context.log(logger, Level.WARNING, "Failed to initialize RPC server socket: " + e.getMessage()); } } try { - statusPageServer.setPort(options.httpPort); + statusPageServer.setPort(options.httpPort()); } catch (Exception e) { context.log(logger, Level.WARNING, "Failed to initialize status server socket. This may be natural if cluster has altered the services running on this node: " + e.getMessage()); } long currentTime = timer.getCurrentTimeInMillis(); - nextStateSendTime = Math.min(currentTime + options.minTimeBetweenNewSystemStates, nextStateSendTime); + nextStateSendTime = Math.min(currentTime + options.minTimeBetweenNewSystemStates(), nextStateSendTime); } private void selfTerminateIfConfiguredNodeIndexHasChanged() { - var newId = new FleetControllerId(options.clusterName, options.fleetControllerIndex); + var newId = new FleetControllerId(options.clusterName(), options.fleetControllerIndex()); if (!newId.equals(context.id())) { context.log(logger, Level.WARNING, context.id() + " got new configuration for " + newId + ". We do not support doing this live; " + "immediately exiting now to force new configuration"); @@ -597,7 +594,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta if ( ! isRunning()) { return; } - if (masterElectionHandler.isAmongNthFirst(options.stateGatherCount)) { + if (masterElectionHandler.isAmongNthFirst(options.stateGatherCount())) { didWork |= resyncLocallyCachedState(); // Calls to metricUpdate.forWork inside method } else { stepDownAsStateGatherer(); @@ -630,7 +627,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta metricUpdater.addTickTime(tickStopTime - tickStartTime, didWork); } // Always sleep some to use avoid using too much CPU and avoid starving waiting threads - monitor.wait(didWork || waitingForCycle ? 1 : options.cycleWaitTime); + monitor.wait(didWork || waitingForCycle ? 1 : options.cycleWaitTime()); if ( ! isRunning()) { return; } tickStartTime = timer.getCurrentTimeInMillis(); processingCycle = true; @@ -707,7 +704,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta databaseContext, communicator, database.getLastKnownStateBundleVersionWrittenBySelf()); if (sentAny) { // FIXME won't this inhibit resending to unresponsive nodes? - nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates; + nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates(); } } // Always allow activations if we've already broadcasted a state @@ -815,7 +812,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta return ""; } return String.format("the following nodes have not converged to at least version %d: %s", - taskConvergeVersion, stringifyListWithLimits(nodes, options.maxDivergentNodesPrintedInTaskErrorMessages)); + taskConvergeVersion, stringifyListWithLimits(nodes, options.maxDivergentNodesPrintedInTaskErrorMessages())); } private boolean completeSatisfiedVersionDependentTasks() { @@ -923,7 +920,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta final ClusterStateBundle candidateBundle = ClusterStateBundle.builder(candidate) .bucketSpaces(configuredBucketSpaces) .stateDeriver(createBucketSpaceStateDeriver()) - .deferredActivation(options.enableTwoPhaseClusterStateActivation) + .deferredActivation(options.enableTwoPhaseClusterStateActivation()) .feedBlock(createResourceExhaustionCalculator() .inferContentClusterFeedBlockOrNull(cluster.getNodeInfos())) .deriveAndBuild(); @@ -956,7 +953,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta } private ClusterStateDeriver createBucketSpaceStateDeriver() { - if (options.clusterHasGlobalDocumentTypes) { + if (options.clusterHasGlobalDocumentTypes()) { return new MaintenanceWhenPendingGlobalMerges(stateVersionTracker.createMergePendingChecker(), createDefaultSpaceMaintenanceTransitionConstraint()); } else { @@ -965,10 +962,10 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta } private ResourceExhaustionCalculator createResourceExhaustionCalculator() { - return new ResourceExhaustionCalculator( - options.clusterFeedBlockEnabled, options.clusterFeedBlockLimit, - stateVersionTracker.getLatestCandidateStateBundle().getFeedBlockOrNull(), - options.clusterFeedBlockNoiseLevel); + return new ResourceExhaustionCalculator(options.clusterFeedBlockEnabled(), + options.clusterFeedBlockLimit(), + stateVersionTracker.getLatestCandidateStateBundle().getFeedBlockOrNull(), + options.clusterFeedBlockNoiseLevel()); } private static ClusterStateDeriver createIdentityClonedBucketSpaceStateDeriver() { @@ -1073,11 +1070,11 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became fleetcontroller master. Bumped version to " + stateVersionTracker.getCurrentVersion() + " to be in line.", timer.getCurrentTimeInMillis())); long currentTime = timer.getCurrentTimeInMillis(); - firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast; + firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast(); isMaster = true; inMasterMoratorium = true; context.log(logger, Level.FINE, () -> "At time " + currentTime + " we set first system state broadcast time to be " - + options.minTimeBeforeFirstSystemStateBroadcast + " ms after at time " + firstAllowedStateBroadcast + "."); + + options.minTimeBeforeFirstSystemStateBroadcast() + " ms after at time " + firstAllowedStateBroadcast + "."); didWork = true; } if (wantedStateChanged) { @@ -1146,7 +1143,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta // To wait at least one complete cycle, if a cycle is already running we need to wait for the next one beyond. long wantedCycle = cycleCount + (processingCycle ? 2 : 1); waitingForCycle = true; - try{ + try { while (cycleCount < wantedCycle) { if (Instant.now().isAfter(endTime)) throw new IllegalStateException("Timed out waiting for cycle to complete. Not completed after " + timeout); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerId.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerId.java index fd0784d69c9..5a5d245eb69 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerId.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerId.java @@ -13,7 +13,7 @@ public class FleetControllerId { private final int index; public static FleetControllerId fromOptions(FleetControllerOptions options) { - return new FleetControllerId(options.clusterName, options.fleetControllerIndex); + return new FleetControllerId(options.clusterName(), options.fleetControllerIndex()); } public FleetControllerId(String clusterName, int index) { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java index 750d44c8fbf..6666066422a 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java @@ -9,66 +9,64 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; /** - * This class represents all the options that can be set in the fleetcontroller. + * Immutable class representing all the options that can be set in the fleetcontroller. * Tests typically just generate an instance of this object to use in fleet controller for testing. - * A real application generate this object from config, and on config updates, post new options to the fleet controller. + * A real application generates this object from config, and on config updates, post new options to the fleet controller. */ -public class FleetControllerOptions implements Cloneable { +public class FleetControllerOptions { - // TODO: Make fields private + private final String fleetControllerConfigId; + private final String slobrokConfigId; - public String fleetControllerConfigId; - public String slobrokConfigId; + private final String clusterName; + private final int fleetControllerIndex; + private final int fleetControllerCount; + private final int stateGatherCount; - public String clusterName; - public int fleetControllerIndex = 0; - public int fleetControllerCount = 1; - public int stateGatherCount = 2; - - // TODO: This cannot be null but nonnull is not verified - public String[] slobrokConnectionSpecs; - public int rpcPort = 0; - public int httpPort = 0; - public int distributionBits = 16; + private final String[] slobrokConnectionSpecs; + private final int rpcPort; + private final int httpPort; + private final int distributionBits; /** Timeout before breaking zookeeper session (in milliseconds) */ - public int zooKeeperSessionTimeout = 5 * 60 * 1000; + private final int zooKeeperSessionTimeout; /** * Timeout between master disappearing before new master will take over. * (Grace period to allow old master to detect that it is disconnected from zookeeper) */ - public int masterZooKeeperCooldownPeriod = 15 * 1000; + private final int masterZooKeeperCooldownPeriod; - public String zooKeeperServerAddress = null; + private final String zooKeeperServerAddress; - public int statePollingFrequency = 5000; + private final int statePollingFrequency; /** * Max amount of time to keep a node, that has previously been available * in steady state, in maintenance mode, while node is unreachable, before setting it down. */ - public Map maxTransitionTime = new TreeMap<>(); + private final Map maxTransitionTime; /** * Max amount of time to keep a storage node, that is initializing, in maintenance mode, without any further * initializing progress being received, before setting it down. */ - public int maxInitProgressTime = 5000; + private final int maxInitProgressTime; - public int maxPrematureCrashes = 4; - public long stableStateTimePeriod = 2 * 60 * 60 * 1000; + private final int maxPrematureCrashes; + private final long stableStateTimePeriod; - public int eventLogMaxSize = 1024; - public int eventNodeLogMaxSize = 1024; + private final int eventLogMaxSize; + private final int eventNodeLogMaxSize; - public int minDistributorNodesUp = 1; - public int minStorageNodesUp = 1; - public double minRatioOfDistributorNodesUp = 0.50; - public double minRatioOfStorageNodesUp = 0.50; + private final int minDistributorNodesUp; + private final int minStorageNodesUp; + private final double minRatioOfDistributorNodesUp; + private final double minRatioOfStorageNodesUp; /** * Minimum ratio of nodes in an "available" state (up, initializing or maintenance) @@ -78,13 +76,13 @@ public class FleetControllerOptions implements Cloneable { * * A value of 0.0 implies group auto-takedown feature is effectively disabled. */ - public double minNodeRatioPerGroup = 0.0; + private final double minNodeRatioPerGroup; /** * Milliseconds to sleep after doing a work cycle where we did no work. Some events do not interrupt the sleeping, * such as slobrok changes, so shouldn't set this too high. */ - public int cycleWaitTime = 100; + private final int cycleWaitTime; /** * Minimum time to pass (in milliseconds) before broadcasting our first systemstate. Set small in unit tests, * but should be a few seconds in a real system to prevent new nodes taking over from disturbing the system by @@ -93,7 +91,7 @@ public class FleetControllerOptions implements Cloneable { * reported their state in Slobrok and getnodestate. This value should typically be in the order of * maxSlobrokDisconnectGracePeriod and nodeStateRequestTimeoutMS. */ - public long minTimeBeforeFirstSystemStateBroadcast = 0; + private final long minTimeBeforeFirstSystemStateBroadcast; /** * StateRequestTimeout for the request are randomized a bit to avoid congestion on replies. The effective @@ -101,72 +99,728 @@ public class FleetControllerOptions implements Cloneable { * [nodeStateRequestTimeoutEarliestPercentage * nodeStateRequestTimeoutMS / 100, * nodeStateRequestTimeoutLatestPercentage * nodeStateRequestTimeoutMS / 100]. */ - public int nodeStateRequestTimeoutMS = 5 * 60 * 1000; - public int nodeStateRequestTimeoutEarliestPercentage = 80; - public int nodeStateRequestTimeoutLatestPercentage = 95; - public int nodeStateRequestRoundTripTimeMaxSeconds = 5; + private final int nodeStateRequestTimeoutMS; + private final int nodeStateRequestTimeoutEarliestPercentage; + private final int nodeStateRequestTimeoutLatestPercentage; + private final int nodeStateRequestRoundTripTimeMaxSeconds; - public int minTimeBetweenNewSystemStates = 0; - public boolean showLocalSystemStatesInEventLog = true; + private final int minTimeBetweenNewSystemStates; + private final boolean showLocalSystemStatesInEventLog; /** Maximum time a node can be missing from slobrok before it is tagged down. */ - public int maxSlobrokDisconnectGracePeriod = 1000; + private final int maxSlobrokDisconnectGracePeriod; /** Set by tests to retry often. */ - public BackOffPolicy slobrokBackOffPolicy = null; + private final BackOffPolicy slobrokBackOffPolicy; - public Distribution storageDistribution; + private final Distribution storageDistribution; // TODO: Get rid of this by always getting nodes by distribution.getNodes() - public Set nodes; + private final Set nodes; - public Duration maxDeferredTaskVersionWaitTime = Duration.ofSeconds(30); + private final Duration maxDeferredTaskVersionWaitTime; - public boolean clusterHasGlobalDocumentTypes = false; + private final boolean clusterHasGlobalDocumentTypes; - public boolean enableTwoPhaseClusterStateActivation = false; + private final boolean enableTwoPhaseClusterStateActivation; - // TODO: Choose a default value - public double minMergeCompletionRatio = 1.0; + private final double minMergeCompletionRatio; - public int maxDivergentNodesPrintedInTaskErrorMessages = 10; + private final int maxDivergentNodesPrintedInTaskErrorMessages; - public boolean clusterFeedBlockEnabled = false; + private final boolean clusterFeedBlockEnabled; // Resource type -> limit in [0, 1] - public Map clusterFeedBlockLimit = Collections.emptyMap(); + private final Map clusterFeedBlockLimit; + + private final double clusterFeedBlockNoiseLevel; + + private FleetControllerOptions(String fleetControllerConfigId, + String slobrokConfigId, + String clusterName, + int fleetControllerIndex, + int fleetControllerCount, + int stateGatherCount, + String[] slobrokConnectionSpecs, + int rpcPort, + int httpPort, + int distributionBits, + int zooKeeperSessionTimeout, + int masterZooKeeperCooldownPeriod, + String zooKeeperServerAddress, + int statePollingFrequency, + Map maxTransitionTime, + int maxInitProgressTime, + int maxPrematureCrashes, + long stableStateTimePeriod, + int eventLogMaxSize, + int eventNodeLogMaxSize, + int minDistributorNodesUp, + int minStorageNodesUp, + double minRatioOfDistributorNodesUp, + double minRatioOfStorageNodesUp, + double minNodeRatioPerGroup, + int cycleWaitTime, + long minTimeBeforeFirstSystemStateBroadcast, + int nodeStateRequestTimeoutMS, + int nodeStateRequestTimeoutEarliestPercentage, + int nodeStateRequestTimeoutLatestPercentage, + int nodeStateRequestRoundTripTimeMaxSeconds, + int minTimeBetweenNewSystemStates, + boolean showLocalSystemStatesInEventLog, + int maxSlobrokDisconnectGracePeriod, + BackOffPolicy slobrokBackOffPolicy, + Distribution storageDistribution, + Set nodes, + Duration maxDeferredTaskVersionWaitTime, + boolean clusterHasGlobalDocumentTypes, + boolean enableTwoPhaseClusterStateActivation, + double minMergeCompletionRatio, + int maxDivergentNodesPrintedInTaskErrorMessages, + boolean clusterFeedBlockEnabled, + Map clusterFeedBlockLimit, + double clusterFeedBlockNoiseLevel) { + this.fleetControllerConfigId = fleetControllerConfigId; + this.slobrokConfigId = slobrokConfigId; + this.clusterName = clusterName; + this.fleetControllerIndex = fleetControllerIndex; + this.fleetControllerCount = fleetControllerCount; + this.stateGatherCount = stateGatherCount; + this.slobrokConnectionSpecs = slobrokConnectionSpecs; + this.rpcPort = rpcPort; + this.httpPort = httpPort; + this.distributionBits = distributionBits; + this.zooKeeperSessionTimeout = zooKeeperSessionTimeout; + this.masterZooKeeperCooldownPeriod = masterZooKeeperCooldownPeriod; + this.zooKeeperServerAddress = zooKeeperServerAddress; + this.statePollingFrequency = statePollingFrequency; + this.maxTransitionTime = maxTransitionTime; + this.maxInitProgressTime = maxInitProgressTime; + this.maxPrematureCrashes = maxPrematureCrashes; + this.stableStateTimePeriod = stableStateTimePeriod; + this.eventLogMaxSize = eventLogMaxSize; + this.eventNodeLogMaxSize = eventNodeLogMaxSize; + this.minDistributorNodesUp = minDistributorNodesUp; + this.minStorageNodesUp = minStorageNodesUp; + this.minRatioOfDistributorNodesUp = minRatioOfDistributorNodesUp; + this.minRatioOfStorageNodesUp = minRatioOfStorageNodesUp; + this.minNodeRatioPerGroup = minNodeRatioPerGroup; + this.cycleWaitTime = cycleWaitTime; + this.minTimeBeforeFirstSystemStateBroadcast = minTimeBeforeFirstSystemStateBroadcast; + this.nodeStateRequestTimeoutMS = nodeStateRequestTimeoutMS; + this.nodeStateRequestTimeoutEarliestPercentage = nodeStateRequestTimeoutEarliestPercentage; + this.nodeStateRequestTimeoutLatestPercentage = nodeStateRequestTimeoutLatestPercentage; + this.nodeStateRequestRoundTripTimeMaxSeconds = nodeStateRequestRoundTripTimeMaxSeconds; + this.minTimeBetweenNewSystemStates = minTimeBetweenNewSystemStates; + this.showLocalSystemStatesInEventLog = showLocalSystemStatesInEventLog; + this.maxSlobrokDisconnectGracePeriod = maxSlobrokDisconnectGracePeriod; + this.slobrokBackOffPolicy = slobrokBackOffPolicy; + this.storageDistribution = storageDistribution; + this.nodes = nodes; + this.maxDeferredTaskVersionWaitTime = maxDeferredTaskVersionWaitTime; + this.clusterHasGlobalDocumentTypes = clusterHasGlobalDocumentTypes; + this.enableTwoPhaseClusterStateActivation = enableTwoPhaseClusterStateActivation; + this.minMergeCompletionRatio = minMergeCompletionRatio; + this.maxDivergentNodesPrintedInTaskErrorMessages = maxDivergentNodesPrintedInTaskErrorMessages; + this.clusterFeedBlockEnabled = clusterFeedBlockEnabled; + this.clusterFeedBlockLimit = clusterFeedBlockLimit; + this.clusterFeedBlockNoiseLevel = clusterFeedBlockNoiseLevel; + } - public double clusterFeedBlockNoiseLevel = 0.01; + public Duration getMaxDeferredTaskVersionWaitTime() { + return maxDeferredTaskVersionWaitTime; + } - public FleetControllerOptions(String clusterName, Collection nodes) { - this.clusterName = clusterName; - maxTransitionTime.put(NodeType.DISTRIBUTOR, 0); - maxTransitionTime.put(NodeType.STORAGE, 5000); - this.nodes = new TreeSet<>(nodes); + public long storageNodeMaxTransitionTimeMs() { + return maxTransitionTime.getOrDefault(NodeType.STORAGE, 10_000); } - /** Called on reconfiguration of this cluster */ - public void setStorageDistribution(Distribution distribution) { - this.storageDistribution = distribution; + public String fleetControllerConfigId() {return fleetControllerConfigId;} + + public String slobrokConfigId() { + return slobrokConfigId; } - public Duration getMaxDeferredTaskVersionWaitTime() { + public String clusterName() { + return clusterName; + } + + public int fleetControllerIndex() { + return fleetControllerIndex; + } + + public int fleetControllerCount() { + return fleetControllerCount; + } + + public int stateGatherCount() { + return stateGatherCount; + } + + public String[] slobrokConnectionSpecs() { + return slobrokConnectionSpecs; + } + + public int rpcPort() { + return rpcPort; + } + + public int httpPort() { + return httpPort; + } + + public int distributionBits() { + return distributionBits; + } + + public int zooKeeperSessionTimeout() { + return zooKeeperSessionTimeout; + } + + public int masterZooKeeperCooldownPeriod() { + return masterZooKeeperCooldownPeriod; + } + + public String zooKeeperServerAddress() { + return zooKeeperServerAddress; + } + + public int statePollingFrequency() { + return statePollingFrequency; + } + + public Map maxTransitionTime() { + return maxTransitionTime; + } + + public int maxInitProgressTime() { + return maxInitProgressTime; + } + + public int maxPrematureCrashes() { + return maxPrematureCrashes; + } + + public long stableStateTimePeriod() { + return stableStateTimePeriod; + } + + public int eventLogMaxSize() { + return eventLogMaxSize; + } + + public int eventNodeLogMaxSize() { + return eventNodeLogMaxSize; + } + + public int minDistributorNodesUp() { + return minDistributorNodesUp; + } + + public int minStorageNodesUp() { + return minStorageNodesUp; + } + + public double minRatioOfDistributorNodesUp() { + return minRatioOfDistributorNodesUp; + } + + public double minRatioOfStorageNodesUp() { + return minRatioOfStorageNodesUp; + } + + public double minNodeRatioPerGroup() { + return minNodeRatioPerGroup; + } + + public int cycleWaitTime() { + return cycleWaitTime; + } + + public long minTimeBeforeFirstSystemStateBroadcast() { + return minTimeBeforeFirstSystemStateBroadcast; + } + + public int nodeStateRequestTimeoutMS() { + return nodeStateRequestTimeoutMS; + } + + public int nodeStateRequestTimeoutEarliestPercentage() { + return nodeStateRequestTimeoutEarliestPercentage; + } + + public int nodeStateRequestTimeoutLatestPercentage() { + return nodeStateRequestTimeoutLatestPercentage; + } + + public int nodeStateRequestRoundTripTimeMaxSeconds() { + return nodeStateRequestRoundTripTimeMaxSeconds; + } + + public int minTimeBetweenNewSystemStates() { + return minTimeBetweenNewSystemStates; + } + + public boolean showLocalSystemStatesInEventLog() { + return showLocalSystemStatesInEventLog; + } + + public int maxSlobrokDisconnectGracePeriod() { + return maxSlobrokDisconnectGracePeriod; + } + + public BackOffPolicy slobrokBackOffPolicy() { + return slobrokBackOffPolicy; + } + + public Distribution storageDistribution() { + return storageDistribution; + } + + public Set nodes() { + return nodes; + } + + public Duration maxDeferredTaskVersionWaitTime() { return maxDeferredTaskVersionWaitTime; } - public void setMaxDeferredTaskVersionWaitTime(Duration maxDeferredTaskVersionWaitTime) { - this.maxDeferredTaskVersionWaitTime = maxDeferredTaskVersionWaitTime; + public boolean clusterHasGlobalDocumentTypes() { + return clusterHasGlobalDocumentTypes; } - public long storageNodeMaxTransitionTimeMs() { - return maxTransitionTime.getOrDefault(NodeType.STORAGE, 10_000); + public boolean enableTwoPhaseClusterStateActivation() { + return enableTwoPhaseClusterStateActivation; + } + + public double minMergeCompletionRatio() { + return minMergeCompletionRatio; + } + + public int maxDivergentNodesPrintedInTaskErrorMessages() { + return maxDivergentNodesPrintedInTaskErrorMessages; + } + + public boolean clusterFeedBlockEnabled() { + return clusterFeedBlockEnabled; } - public FleetControllerOptions clone() { - try { - // TODO: This should deep clone - return (FleetControllerOptions) super.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeException("Will not happen"); + public Map clusterFeedBlockLimit() { + return clusterFeedBlockLimit; + } + + public double clusterFeedBlockNoiseLevel() { + return clusterFeedBlockNoiseLevel; + } + + public static class Builder { + + private String fleetControllerConfigId; + private String slobrokConfigId; + private String clusterName; + private int index = 0; + private int count = 1; + private int stateGatherCount = 2; + private String[] slobrokConnectionSpecs; + private int rpcPort = 0; + private int httpPort = 0; + private int distributionBits = 16; + private int zooKeeperSessionTimeout = 5 * 60 * 1000; + private int masterZooKeeperCooldownPeriod = 15 * 1000; + private String zooKeeperServerAddress = null; + private int statePollingFrequency = 5000; + private Map maxTransitionTime = new TreeMap<>(); + private int maxInitProgressTime = 5000; + private int maxPrematureCrashes = 4; + private long stableStateTimePeriod = 2 * 60 * 60 * 1000; + private int eventLogMaxSize = 1024; + private int eventNodeLogMaxSize = 1024; + private int minDistributorNodesUp = 1; + private int minStorageNodesUp = 1; + private double minRatioOfDistributorNodesUp = 0.50; + private double minRatioOfStorageNodesUp = 0.50; + private double minNodeRatioPerGroup = 0.0; + private int cycleWaitTime = 100; + private long minTimeBeforeFirstSystemStateBroadcast = 0; + private int nodeStateRequestTimeoutMS = 5 * 60 * 1000; + private int nodeStateRequestTimeoutEarliestPercentage = 80; + private int nodeStateRequestTimeoutLatestPercentage = 95; + private int nodeStateRequestRoundTripTimeMaxSeconds = 5; + private int minTimeBetweenNewSystemStates = 0; + private boolean showLocalSystemStatesInEventLog = true; + private int maxSlobrokDisconnectGracePeriod = 1000; + private BackOffPolicy slobrokBackOffPolicy = null; + private Distribution storageDistribution; + private Set nodes; + private Duration maxDeferredTaskVersionWaitTime = Duration.ofSeconds(30); + private boolean clusterHasGlobalDocumentTypes = false; + private boolean enableTwoPhaseClusterStateActivation = false; + private double minMergeCompletionRatio = 1.0; + private int maxDivergentNodesPrintedInTaskErrorMessages = 10; + private boolean clusterFeedBlockEnabled = false; + private Map clusterFeedBlockLimit = Collections.emptyMap(); + private double clusterFeedBlockNoiseLevel = 0.01; + + public Builder(String clusterName, Collection nodes) { + this.clusterName = clusterName; + this.nodes = new TreeSet<>(nodes); + maxTransitionTime.put(NodeType.DISTRIBUTOR, 0); + maxTransitionTime.put(NodeType.STORAGE, 5000); + } + + public String clusterName() { + return clusterName; + } + + public Builder setClusterName(String clusterName) { + this.clusterName = clusterName; + return this; + } + + public int fleetControllerIndex() { + return index; + } + + public Builder setIndex(int index) { + this.index = index; + return this; + } + + public Builder setCount(int count) { + this.count = count; + return this; + } + + public Builder setStateGatherCount(int stateGatherCount) { + this.stateGatherCount = stateGatherCount; + return this; + } + + public String[] slobrokConnectionSpecs() { + return slobrokConnectionSpecs; + } + + public Builder setSlobrokConnectionSpecs(String[] slobrokConnectionSpecs) { + Objects.requireNonNull(slobrokConnectionSpecs, "slobrokConnectionSpecs cannot be null"); + this.slobrokConnectionSpecs = slobrokConnectionSpecs; + return this; + } + + public Builder setRpcPort(int rpcPort) { + this.rpcPort = rpcPort; + return this; + } + + public Builder setHttpPort(int httpPort) { + this.httpPort = httpPort; + return this; + } + + public Builder setDistributionBits(int distributionBits) { + this.distributionBits = distributionBits; + return this; + } + + public Builder setZooKeeperSessionTimeout(int zooKeeperSessionTimeout) { + this.zooKeeperSessionTimeout = zooKeeperSessionTimeout; + return this; + } + + public Builder setMasterZooKeeperCooldownPeriod(int masterZooKeeperCooldownPeriod) { + this.masterZooKeeperCooldownPeriod = masterZooKeeperCooldownPeriod; + return this; + } + + public String zooKeeperServerAddress() { + return zooKeeperServerAddress; + } + + public Builder setZooKeeperServerAddress(String zooKeeperServerAddress) { + if (zooKeeperServerAddress == null || "".equals(zooKeeperServerAddress)) { + throw new IllegalArgumentException("zookeeper server address must be set, was '" + zooKeeperServerAddress + "'"); + } + this.zooKeeperServerAddress = zooKeeperServerAddress; + return this; + } + + public Builder setStatePollingFrequency(int statePollingFrequency) { + this.statePollingFrequency = statePollingFrequency; + return this; + } + + public Map maxTransitionTime() { + return maxTransitionTime; + } + + public Builder setMaxTransitionTime(NodeType nodeType, Integer maxTransitionTime) { + this.maxTransitionTime.put(nodeType, maxTransitionTime); + return this; + } + + public int maxInitProgressTime() { + return maxInitProgressTime; + } + + public Builder setMaxInitProgressTime(int maxInitProgressTime) { + this.maxInitProgressTime = maxInitProgressTime; + return this; + } + + public int maxPrematureCrashes() { + return maxPrematureCrashes; + } + + public Builder setMaxPrematureCrashes(int maxPrematureCrashes) { + this.maxPrematureCrashes = maxPrematureCrashes; + return this; + } + + public long stableStateTimePeriod() { + return stableStateTimePeriod; + } + + public Builder setStableStateTimePeriod(long stableStateTimePeriod) { + this.stableStateTimePeriod = stableStateTimePeriod; + return this; + } + + public Builder setEventLogMaxSize(int eventLogMaxSize) { + this.eventLogMaxSize = eventLogMaxSize; + return this; + } + + public Builder setEventNodeLogMaxSize(int eventNodeLogMaxSize) { + this.eventNodeLogMaxSize = eventNodeLogMaxSize; + return this; + } + + public Builder setMinDistributorNodesUp(int minDistributorNodesUp) { + this.minDistributorNodesUp = minDistributorNodesUp; + return this; + } + + public Builder setMinStorageNodesUp(int minStorageNodesUp) { + this.minStorageNodesUp = minStorageNodesUp; + return this; + } + + public Builder setMinRatioOfDistributorNodesUp(double minRatioOfDistributorNodesUp) { + this.minRatioOfDistributorNodesUp = minRatioOfDistributorNodesUp; + return this; + } + + public Builder setMinRatioOfStorageNodesUp(double minRatioOfStorageNodesUp) { + this.minRatioOfStorageNodesUp = minRatioOfStorageNodesUp; + return this; + } + + public Builder setMinNodeRatioPerGroup(double minNodeRatioPerGroup) { + this.minNodeRatioPerGroup = minNodeRatioPerGroup; + return this; + } + + public Builder setCycleWaitTime(int cycleWaitTime) { + this.cycleWaitTime = cycleWaitTime; + return this; + } + + public Builder setMinTimeBeforeFirstSystemStateBroadcast(long minTimeBeforeFirstSystemStateBroadcast) { + this.minTimeBeforeFirstSystemStateBroadcast = minTimeBeforeFirstSystemStateBroadcast; + return this; + } + + public int nodeStateRequestTimeoutMS() { + return nodeStateRequestTimeoutMS; + } + + public Builder setNodeStateRequestTimeoutMS(int nodeStateRequestTimeoutMS) { + this.nodeStateRequestTimeoutMS = nodeStateRequestTimeoutMS; + return this; + } + + public Builder setNodeStateRequestTimeoutEarliestPercentage(int nodeStateRequestTimeoutEarliestPercentage) { + this.nodeStateRequestTimeoutEarliestPercentage = nodeStateRequestTimeoutEarliestPercentage; + return this; + } + + public Builder setNodeStateRequestTimeoutLatestPercentage(int nodeStateRequestTimeoutLatestPercentage) { + this.nodeStateRequestTimeoutLatestPercentage = nodeStateRequestTimeoutLatestPercentage; + return this; + } + + public Builder setMinTimeBetweenNewSystemStates(int minTimeBetweenNewSystemStates) { + this.minTimeBetweenNewSystemStates = minTimeBetweenNewSystemStates; + return this; + } + + public Builder setShowLocalSystemStatesInEventLog(boolean showLocalSystemStatesInEventLog) { + this.showLocalSystemStatesInEventLog = showLocalSystemStatesInEventLog; + return this; + } + + public int maxSlobrokDisconnectGracePeriod() { + return maxSlobrokDisconnectGracePeriod; + } + + public Builder setMaxSlobrokDisconnectGracePeriod(int maxSlobrokDisconnectGracePeriod) { + this.maxSlobrokDisconnectGracePeriod = maxSlobrokDisconnectGracePeriod; + return this; + } + + public Builder setStorageDistribution(Distribution storageDistribution) { + this.storageDistribution = storageDistribution; + return this; + } + + public Set nodes() { + return nodes; + } + + public Builder setNodes(Set nodes) { + this.nodes = nodes; + return this; + } + + public Builder setMaxDeferredTaskVersionWaitTime(Duration maxDeferredTaskVersionWaitTime) { + this.maxDeferredTaskVersionWaitTime = maxDeferredTaskVersionWaitTime; + return this; + } + + public Builder setClusterHasGlobalDocumentTypes(boolean clusterHasGlobalDocumentTypes) { + this.clusterHasGlobalDocumentTypes = clusterHasGlobalDocumentTypes; + return this; + } + + public Builder enableTwoPhaseClusterStateActivation(boolean enableTwoPhaseClusterStateActivation) { + this.enableTwoPhaseClusterStateActivation = enableTwoPhaseClusterStateActivation; + return this; + } + + public double minMergeCompletionRatio() { + return minMergeCompletionRatio; + } + + public Builder setMinMergeCompletionRatio(double minMergeCompletionRatio) { + this.minMergeCompletionRatio = minMergeCompletionRatio; + return this; + } + + public Builder setMaxDivergentNodesPrintedInTaskErrorMessages(int maxDivergentNodesPrintedInTaskErrorMessages) { + this.maxDivergentNodesPrintedInTaskErrorMessages = maxDivergentNodesPrintedInTaskErrorMessages; + return this; + } + + public Builder setClusterFeedBlockEnabled(boolean clusterFeedBlockEnabled) { + this.clusterFeedBlockEnabled = clusterFeedBlockEnabled; + return this; + } + + public Builder setClusterFeedBlockLimit(Map clusterFeedBlockLimit) { + this.clusterFeedBlockLimit = Map.copyOf(clusterFeedBlockLimit); + return this; + } + + public Builder setClusterFeedBlockNoiseLevel(double clusterFeedBlockNoiseLevel) { + this.clusterFeedBlockNoiseLevel = clusterFeedBlockNoiseLevel; + return this; + } + + public FleetControllerOptions build() { + return new FleetControllerOptions(fleetControllerConfigId, + slobrokConfigId, + clusterName, + index, + count, + stateGatherCount, + slobrokConnectionSpecs, + rpcPort, + httpPort, + distributionBits, + zooKeeperSessionTimeout, + masterZooKeeperCooldownPeriod, + zooKeeperServerAddress, + statePollingFrequency, + maxTransitionTime, + maxInitProgressTime, + maxPrematureCrashes, + stableStateTimePeriod, + eventLogMaxSize, + eventNodeLogMaxSize, + minDistributorNodesUp, + minStorageNodesUp, + minRatioOfDistributorNodesUp, + minRatioOfStorageNodesUp, + minNodeRatioPerGroup, + cycleWaitTime, + minTimeBeforeFirstSystemStateBroadcast, + nodeStateRequestTimeoutMS, + nodeStateRequestTimeoutEarliestPercentage, + nodeStateRequestTimeoutLatestPercentage, + nodeStateRequestRoundTripTimeMaxSeconds, + minTimeBetweenNewSystemStates, + showLocalSystemStatesInEventLog, + maxSlobrokDisconnectGracePeriod, + slobrokBackOffPolicy, + storageDistribution, + nodes, + maxDeferredTaskVersionWaitTime, + clusterHasGlobalDocumentTypes, + enableTwoPhaseClusterStateActivation, + minMergeCompletionRatio, + maxDivergentNodesPrintedInTaskErrorMessages, + clusterFeedBlockEnabled, + clusterFeedBlockLimit, + clusterFeedBlockNoiseLevel); + } + + public static Builder copy(FleetControllerOptions options) { + Builder builder = new Builder(options.clusterName(), options.nodes()); + builder.fleetControllerConfigId = options.fleetControllerConfigId; + builder.slobrokConfigId = options.slobrokConfigId; + builder.clusterName = options.clusterName; + builder.index = options.fleetControllerIndex; + builder.count = options.fleetControllerCount; + builder.stateGatherCount = options.stateGatherCount; + builder.slobrokConnectionSpecs = options.slobrokConnectionSpecs; + builder.rpcPort = options.rpcPort; + builder.httpPort = options.httpPort; + builder.distributionBits = options.distributionBits; + builder.zooKeeperSessionTimeout = options.zooKeeperSessionTimeout; + builder.masterZooKeeperCooldownPeriod = options.masterZooKeeperCooldownPeriod; + builder.zooKeeperServerAddress = options.zooKeeperServerAddress; + builder.statePollingFrequency = options.statePollingFrequency; + builder.maxTransitionTime = Map.copyOf(options.maxTransitionTime); + builder.maxInitProgressTime = options.maxInitProgressTime; + builder.maxPrematureCrashes = options.maxPrematureCrashes; + builder.stableStateTimePeriod = options.stableStateTimePeriod; + builder.eventLogMaxSize = options.eventLogMaxSize; + builder.eventNodeLogMaxSize = options.eventNodeLogMaxSize; + builder.minDistributorNodesUp = options.minDistributorNodesUp; + builder.minStorageNodesUp = options.minStorageNodesUp; + builder.minRatioOfDistributorNodesUp = options.minRatioOfStorageNodesUp; + builder.minRatioOfStorageNodesUp = options.minRatioOfStorageNodesUp; + builder.minNodeRatioPerGroup = options.minNodeRatioPerGroup; + builder.cycleWaitTime = options.cycleWaitTime; + builder.minTimeBeforeFirstSystemStateBroadcast = options.minTimeBeforeFirstSystemStateBroadcast; + builder.nodeStateRequestTimeoutMS = options.nodeStateRequestTimeoutMS; + builder.nodeStateRequestTimeoutEarliestPercentage = options.nodeStateRequestTimeoutEarliestPercentage; + builder.nodeStateRequestTimeoutLatestPercentage = options.nodeStateRequestTimeoutLatestPercentage; + builder.nodeStateRequestRoundTripTimeMaxSeconds = options.nodeStateRequestRoundTripTimeMaxSeconds; + builder.minTimeBetweenNewSystemStates = options.minTimeBetweenNewSystemStates; + builder.showLocalSystemStatesInEventLog = options.showLocalSystemStatesInEventLog; + builder.maxSlobrokDisconnectGracePeriod = options.maxSlobrokDisconnectGracePeriod; + builder.slobrokBackOffPolicy = options.slobrokBackOffPolicy; + builder.storageDistribution = options.storageDistribution; + builder.nodes = Set.copyOf(options.nodes); + builder.maxDeferredTaskVersionWaitTime = options.maxDeferredTaskVersionWaitTime; + builder.clusterHasGlobalDocumentTypes = options.clusterHasGlobalDocumentTypes; + builder.enableTwoPhaseClusterStateActivation = options.enableTwoPhaseClusterStateActivation; + builder.minMergeCompletionRatio = options.minMergeCompletionRatio; + builder.maxDivergentNodesPrintedInTaskErrorMessages = options.maxDivergentNodesPrintedInTaskErrorMessages; + builder.clusterFeedBlockEnabled = options.clusterFeedBlockEnabled; + builder.clusterFeedBlockLimit = Map.copyOf(options.clusterFeedBlockLimit); + builder.clusterFeedBlockNoiseLevel = options.clusterFeedBlockNoiseLevel; + + return builder; } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java index 9897e3cf04c..4ab80ec6d7a 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java @@ -228,11 +228,11 @@ public class StateChangeHandler { } void reconfigureFromOptions(FleetControllerOptions options) { - setMaxPrematureCrashes(options.maxPrematureCrashes); - setStableStateTimePeriod(options.stableStateTimePeriod); - setMaxInitProgressTime(options.maxInitProgressTime); - setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod); - setMaxTransitionTime(options.maxTransitionTime); + setMaxPrematureCrashes(options.maxPrematureCrashes()); + setStableStateTimePeriod(options.stableStateTimePeriod()); + setMaxInitProgressTime(options.maxInitProgressTime()); + setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod()); + setMaxTransitionTime(options.maxTransitionTime()); } // TODO too many hidden behavior dependencies between this and the actually diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java index e223ad12fb9..3738a17f4b6 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java @@ -30,8 +30,8 @@ import java.util.logging.Logger; import static com.google.common.base.Preconditions.checkArgument; /** - * Responsible for doing RPC requests to VDS nodes. - This class is not thread-safe. + * Responsible for doing RPC requests to storage nodes. + * This class is not thread-safe. */ public class RPCCommunicator implements Communicator { @@ -56,7 +56,6 @@ public class RPCCommunicator implements Communicator { public static Supervisor createRealSupervisor() { return new Supervisor(new Transport("rpc-communicator")).setDropEmptyBuffers(true); - } public RPCCommunicator(Supervisor supervisor, @@ -95,17 +94,16 @@ public class RPCCommunicator implements Communicator { @Override public void propagateOptions(FleetControllerOptions options) { - checkArgument(options.nodeStateRequestTimeoutMS > 0); - checkArgument(options.nodeStateRequestTimeoutEarliestPercentage >= 0); - checkArgument(options.nodeStateRequestTimeoutEarliestPercentage <= 100); - checkArgument(options.nodeStateRequestTimeoutLatestPercentage - >= options.nodeStateRequestTimeoutEarliestPercentage); - checkArgument(options.nodeStateRequestTimeoutLatestPercentage <= 100); - checkArgument(options.nodeStateRequestRoundTripTimeMaxSeconds >= 0); - this.nodeStateRequestTimeoutIntervalMax = Duration.ofMillis(options.nodeStateRequestTimeoutMS); - this.nodeStateRequestTimeoutIntervalStartPercentage = options.nodeStateRequestTimeoutEarliestPercentage; - this.nodeStateRequestTimeoutIntervalStopPercentage = options.nodeStateRequestTimeoutLatestPercentage; - this.nodeStateRequestRoundTripTimeMax = Duration.ofSeconds(options.nodeStateRequestRoundTripTimeMaxSeconds); + checkArgument(options.nodeStateRequestTimeoutMS() > 0); + checkArgument(options.nodeStateRequestTimeoutEarliestPercentage() >= 0); + checkArgument(options.nodeStateRequestTimeoutEarliestPercentage() <= 100); + checkArgument(options.nodeStateRequestTimeoutLatestPercentage() >= options.nodeStateRequestTimeoutEarliestPercentage()); + checkArgument(options.nodeStateRequestTimeoutLatestPercentage() <= 100); + checkArgument(options.nodeStateRequestRoundTripTimeMaxSeconds() >= 0); + this.nodeStateRequestTimeoutIntervalMax = Duration.ofMillis(options.nodeStateRequestTimeoutMS()); + this.nodeStateRequestTimeoutIntervalStartPercentage = options.nodeStateRequestTimeoutEarliestPercentage(); + this.nodeStateRequestTimeoutIntervalStopPercentage = options.nodeStateRequestTimeoutLatestPercentage(); + this.nodeStateRequestRoundTripTimeMax = Duration.ofSeconds(options.nodeStateRequestRoundTripTimeMaxSeconds()); } @Override diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java index 0611d754b69..a7ec2ddaa8f 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java @@ -68,7 +68,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa response.setContentType("text/html"); StringBuilder content = new StringBuilder(); content.append("\n"); - response.writeHtmlHeader(content, cluster.getName() + " Cluster Controller " + options.fleetControllerIndex + " Status Page"); + response.writeHtmlHeader(content, cluster.getName() + " Cluster Controller " + options.fleetControllerIndex() + " Status Page"); content.append("

") .append(" [ Current config") .append(" | Cluster states") @@ -76,7 +76,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa .append(" ]

\n"); content.append(""); content.append("
UTC time when creating this page:").append(RealTimer.printDateNoMilliSeconds(currentTime, tz)).append("
Cluster controller uptime:" + RealTimer.printDuration(currentTime - startedTime) + "
"); - if (masterElectionHandler.isAmongNthFirst(options.stateGatherCount)) { + if (masterElectionHandler.isAmongNthFirst(options.stateGatherCount())) { // Table overview of all the nodes writeHtmlState(cluster, content, timer, stateVersionTracker, options, eventLog); // Current cluster state and cluster state history @@ -86,7 +86,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa writeHtmlState(content, options); } // State of master election - masterElectionHandler.writeHtmlState(content, options.stateGatherCount); + masterElectionHandler.writeHtmlState(content, options.stateGatherCount()); // Overview of current config writeHtmlState(content, options); // Event log @@ -172,7 +172,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa HtmlTable.escape(state.getFeedBlockOrNull().getDescription()))); } - List groups = LeafGroups.enumerateFrom(options.storageDistribution.getRootGroup()); + List groups = LeafGroups.enumerateFrom(options.storageDistribution().getRootGroup()); for (Group group : groups) { assert (group != null); @@ -189,14 +189,14 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa timer, state, stateVersionTracker.getAggregatedClusterStats(), - options.minMergeCompletionRatio, - options.maxPrematureCrashes, - options.clusterFeedBlockLimit, + options.minMergeCompletionRatio(), + options.maxPrematureCrashes(), + options.clusterFeedBlockLimit(), eventLog, cluster.getName(), localName); } - table.addTable(sb, options.stableStateTimePeriod); + table.addTable(sb, options.stableStateTimePeriod()); } private void storeNodeInfo(ContentCluster cluster, int nodeIndex, NodeType nodeType, Map nodeInfoByIndex) { @@ -207,59 +207,59 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa public void writeHtmlState(StringBuilder sb, FleetControllerOptions options) { String slobrokspecs = ""; - for (int i = 0; i < options.slobrokConnectionSpecs.length; ++i) { + for (int i = 0; i < options.slobrokConnectionSpecs().length; ++i) { if (i != 0) slobrokspecs += "
"; - slobrokspecs += options.slobrokConnectionSpecs[i]; + slobrokspecs += options.slobrokConnectionSpecs()[i]; } sb.append("

Current config

\n") - .append("

Fleet controller config id: ").append(options.fleetControllerConfigId == null ? null : options.fleetControllerConfigId.replaceAll("\n", "
\n")).append("

\n") - .append("

Slobrok config id: ").append(options.slobrokConfigId == null ? null : options.slobrokConfigId.replaceAll("\n", "
\n")).append("

\n") + .append("

Fleet controller config id: ").append(options.fleetControllerConfigId() == null ? null : options.fleetControllerConfigId().replaceAll("\n", "
\n")).append("

\n") + .append("

Slobrok config id: ").append(options.slobrokConfigId() == null ? null : options.slobrokConfigId().replaceAll("\n", "
\n")).append("

\n") .append("\n"); - sb.append(""); - sb.append(""); - sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - String zooKeeperAddress = (options.zooKeeperServerAddress == null ? "Not using Zookeeper" : splitZooKeeperAddress(options.zooKeeperServerAddress)); + sb.append(""); + sb.append(""); + sb.append(""); + String zooKeeperAddress = (options.zooKeeperServerAddress() == null ? "Not using Zookeeper" : splitZooKeeperAddress(options.zooKeeperServerAddress())); sb.append(""); - sb.append(""); + sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); - sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); sb.append(""); + .append(options.clusterFeedBlockEnabled()).append(""); sb.append(""); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java index 5a9d28bc327..39878928944 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java @@ -44,21 +44,21 @@ public class ClusterFeedBlockTest extends FleetControllerTest { private void initialize(FleetControllerOptions options) throws Exception { List nodes = new ArrayList<>(); - for (int i = 0; i < options.nodes.size(); ++i) { + for (int i = 0; i < options.nodes().size(); ++i) { nodes.add(new Node(NodeType.STORAGE, i)); nodes.add(new Node(NodeType.DISTRIBUTOR, i)); } var context = new TestFleetControllerContext(options); communicator = new DummyCommunicator(nodes, timer); - var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); + var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex(), options.clusterName()); var eventLog = new EventLog(timer, metricUpdater); - var cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); + var cluster = new ContentCluster(options.clusterName(), options.nodes(), options.storageDistribution()); var stateGatherer = new NodeStateGatherer(timer, timer, eventLog); - var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress, timer); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress(), timer); var stateGenerator = new StateChangeHandler(context, timer, eventLog); var stateBroadcaster = new SystemStateBroadcaster(context, timer, timer); - var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex(), options.fleetControllerCount(), timer, timer); var status = new StatusHandler.ContainerStatusPageServer(); ctrl = new FleetController(context, timer, eventLog, cluster, stateGatherer, communicator, status, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); @@ -69,7 +69,7 @@ public class ClusterFeedBlockTest extends FleetControllerTest { } private void markAllNodesAsUp(FleetControllerOptions options) throws Exception { - for (int i = 0; i < options.nodes.size(); ++i) { + for (int i = 0; i < options.nodes().size(); ++i) { communicator.setNodeState(new Node(NodeType.STORAGE, i), State.UP, ""); communicator.setNodeState(new Node(NodeType.DISTRIBUTOR, i), State.UP, ""); } @@ -84,15 +84,13 @@ public class ClusterFeedBlockTest extends FleetControllerTest { super.tearDown(); } - private static FleetControllerOptions createOptions(Map feedBlockLimits, - double clusterFeedBlockNoiseLevel) { - FleetControllerOptions options = defaultOptions("mycluster"); - options.setStorageDistribution(DistributionBuilder.forFlatCluster(NODE_COUNT)); - options.nodes = new HashSet<>(DistributionBuilder.buildConfiguredNodes(NODE_COUNT)); - options.clusterFeedBlockEnabled = true; - options.clusterFeedBlockLimit = Map.copyOf(feedBlockLimits); - options.clusterFeedBlockNoiseLevel = clusterFeedBlockNoiseLevel; - return options; + private static FleetControllerOptions createOptions(Map feedBlockLimits, double clusterFeedBlockNoiseLevel) { + return defaultOptions("mycluster") + .setStorageDistribution(DistributionBuilder.forFlatCluster(NODE_COUNT)) + .setNodes(new HashSet<>(DistributionBuilder.buildConfiguredNodes(NODE_COUNT))) + .setClusterFeedBlockEnabled(true) + .setClusterFeedBlockLimit(feedBlockLimits) + .setClusterFeedBlockNoiseLevel(clusterFeedBlockNoiseLevel).build(); } private static FleetControllerOptions createOptions(Map feedBlockLimits) { diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGeneratorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGeneratorTest.java index ef1d676fc4a..30c90ee0664 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGeneratorTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGeneratorTest.java @@ -861,23 +861,25 @@ public class ClusterStateGeneratorTest { @Test void generator_params_can_inherit_values_from_controller_options() { - FleetControllerOptions options = new FleetControllerOptions("foocluster", Set.of(new ConfiguredNode(0, false))); - options.maxPrematureCrashes = 1; - options.minStorageNodesUp = 2; - options.minDistributorNodesUp = 3; - options.minRatioOfStorageNodesUp = 0.4; - options.minRatioOfDistributorNodesUp = 0.5; - options.minNodeRatioPerGroup = 0.6; - options.distributionBits = 7; - options.maxTransitionTime = ClusterStateGenerator.Params.buildTransitionTimeMap(1000, 2000); + FleetControllerOptions options = new FleetControllerOptions.Builder("foocluster", Set.of(new ConfiguredNode(0, false))) + .setMaxPrematureCrashes(1) + .setMinStorageNodesUp(2) + .setMinDistributorNodesUp(3) + .setMinRatioOfStorageNodesUp(0.4) + .setMinRatioOfDistributorNodesUp(0.5) + .setMinNodeRatioPerGroup(0.6) + .setDistributionBits(7) + .setMaxTransitionTime(NodeType.DISTRIBUTOR, 1000) + .setMaxTransitionTime(NodeType.STORAGE, 2000).build(); + final ClusterStateGenerator.Params params = ClusterStateGenerator.Params.fromOptions(options); - assertThat(params.maxPrematureCrashes, equalTo(options.maxPrematureCrashes)); - assertThat(params.minStorageNodesUp, equalTo(options.minStorageNodesUp)); - assertThat(params.minDistributorNodesUp, equalTo(options.minDistributorNodesUp)); - assertThat(params.minRatioOfStorageNodesUp, equalTo(options.minRatioOfStorageNodesUp)); - assertThat(params.minRatioOfDistributorNodesUp, equalTo(options.minRatioOfDistributorNodesUp)); - assertThat(params.minNodeRatioPerGroup, equalTo(options.minNodeRatioPerGroup)); - assertThat(params.transitionTimes, equalTo(options.maxTransitionTime)); + assertThat(params.maxPrematureCrashes, equalTo(options.maxPrematureCrashes())); + assertThat(params.minStorageNodesUp, equalTo(options.minStorageNodesUp())); + assertThat(params.minDistributorNodesUp, equalTo(options.minDistributorNodesUp())); + assertThat(params.minRatioOfStorageNodesUp, equalTo(options.minRatioOfStorageNodesUp())); + assertThat(params.minRatioOfDistributorNodesUp, equalTo(options.minRatioOfDistributorNodesUp())); + assertThat(params.minNodeRatioPerGroup, equalTo(options.minNodeRatioPerGroup())); + assertThat(params.transitionTimes, equalTo(options.maxTransitionTime())); } @Test diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseTest.java index 9a6a9e063ac..a383c225e89 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseTest.java @@ -28,9 +28,9 @@ public class DatabaseTest extends FleetControllerTest { @Test void testWantedStatesInZooKeeper() throws Exception { startingTest("DatabaseTest::testWantedStatesInZooKeeper"); - FleetControllerOptions options = defaultOptions("mycluster"); - options.zooKeeperServerAddress = "127.0.0.1"; - setUpFleetController(true, options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster"); + builder.setZooKeeperServerAddress("127.0.0.1"); + setUpFleetController(true, builder); setUpVdsNodes(true, new DummyVdsNodeOptions()); log.info("WAITING FOR STABLE SYSTEM"); waitForStableSystem(); @@ -82,11 +82,11 @@ public class DatabaseTest extends FleetControllerTest { @Test void testWantedStateOfUnknownNode() throws Exception { startingTest("DatabaseTest::testWantedStatesOfUnknownNode"); - FleetControllerOptions options = defaultOptions("mycluster"); - options.minRatioOfDistributorNodesUp = 0; - options.minRatioOfStorageNodesUp = 0; - options.zooKeeperServerAddress = "localhost"; - setUpFleetController(true, options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster") + .setMinRatioOfDistributorNodesUp(0) + .setMinRatioOfStorageNodesUp(0) + .setZooKeeperServerAddress("localhost"); + setUpFleetController(true, builder); setUpVdsNodes(true, new DummyVdsNodeOptions()); waitForStableSystem(); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java index d3b0addbb13..bf8bb722e3d 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java @@ -15,14 +15,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class DistributionBitCountTest extends FleetControllerTest { - private void setUpSystem(String testName) throws Exception { + private FleetControllerOptions setUpSystem(String testName) throws Exception { List configuredNodes = new ArrayList<>(); for (int i = 0 ; i < 10; i++) { configuredNodes.add(new ConfiguredNode(i, false)); } - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.distributionBits = 17; - setUpFleetController(false, options); + var builder = defaultOptions("mycluster", configuredNodes); + builder.setDistributionBits(17); + setUpFleetController(false, builder); startingTest(testName); List nodes = setUpVdsNodes(false, new DummyVdsNodeOptions(), true, configuredNodes); for (DummyVdsNode node : nodes) { @@ -30,6 +30,7 @@ public class DistributionBitCountTest extends FleetControllerTest { node.connect(); } waitForState("version:\\d+ bits:17 distributor:10 storage:10"); + return builder.build(); } /** @@ -38,14 +39,15 @@ public class DistributionBitCountTest extends FleetControllerTest { */ @Test void testDistributionBitCountConfigIncrease() throws Exception { - setUpSystem("DistributionBitCountTest::testDistributionBitCountConfigIncrease"); - options.distributionBits = 20; - fleetController.updateOptions(options); + var options = setUpSystem("DistributionBitCountTest::testDistributionBitCountConfigIncrease"); + var builder = FleetControllerOptions.Builder.copy(options); + builder.setDistributionBits(20); + fleetController.updateOptions(builder.build()); ClusterState currentState = waitForState("version:\\d+ bits:20 distributor:10 storage:10"); int version = currentState.getVersion(); - options.distributionBits = 23; - fleetController.updateOptions(options); + builder.setDistributionBits(23); + fleetController.updateOptions(builder.build()); assertEquals(version, currentState.getVersion()); } @@ -54,13 +56,13 @@ public class DistributionBitCountTest extends FleetControllerTest { */ @Test void testDistributionBitCountConfigDecrease() throws Exception { - setUpSystem("DistributionBitCountTest::testDistributionBitCountConfigDecrease"); - options.distributionBits = 12; - fleetController.updateOptions(options); + FleetControllerOptions options = setUpSystem("DistributionBitCountTest::testDistributionBitCountConfigDecrease"); + var builder = FleetControllerOptions.Builder.copy(options); + builder.setDistributionBits(12); + fleetController.updateOptions(builder.build()); waitForState("version:\\d+ bits:12 distributor:10 storage:10"); } - /** * Test that when storage node reports higher bit count, but another storage * node has equally low bitcount, the fleetcontroller does nothing. diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java index c39a5c52836..dd723cf3d37 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java @@ -111,53 +111,53 @@ public abstract class FleetControllerTest implements Waiter { testName = name; } - static protected FleetControllerOptions defaultOptions(String clusterName) { + static protected FleetControllerOptions.Builder defaultOptions(String clusterName) { return defaultOptions(clusterName, DEFAULT_NODE_COUNT); } - static protected FleetControllerOptions defaultOptions(String clusterName, int nodeCount) { + static protected FleetControllerOptions.Builder defaultOptions(String clusterName, int nodeCount) { return defaultOptions(clusterName, IntStream.range(0, nodeCount) .mapToObj(i -> new ConfiguredNode(i, false)) .collect(Collectors.toSet())); } - static protected FleetControllerOptions defaultOptions(String clusterName, Collection nodes) { - var opts = new FleetControllerOptions(clusterName, nodes); - opts.enableTwoPhaseClusterStateActivation = true; // Enable by default, tests can explicitly disable. - return opts; + static protected FleetControllerOptions.Builder defaultOptions(String clusterName, Collection nodes) { + var builder = new FleetControllerOptions.Builder(clusterName, nodes); + builder.enableTwoPhaseClusterStateActivation(true); // Enable by default, tests can explicitly disable. + return builder; } - void setUpSystem(FleetControllerOptions options) throws Exception { + void setUpSystem(FleetControllerOptions.Builder builder) throws Exception { log.log(Level.FINE, "Setting up system"); slobrok = new Slobrok(); - this.options = options; - if (options.zooKeeperServerAddress != null) { + if (builder.zooKeeperServerAddress() != null) { zooKeeperServer = new ZooKeeperTestServer(); - this.options.zooKeeperServerAddress = zooKeeperServer.getAddress(); - log.log(Level.FINE, "Set up new zookeeper server at " + this.options.zooKeeperServerAddress); + builder.setZooKeeperServerAddress(zooKeeperServer.getAddress()); + log.log(Level.FINE, "Set up new zookeeper server at " + zooKeeperServer.getAddress()); } - this.options.slobrokConnectionSpecs = getSlobrokConnectionSpecs(slobrok); + builder.setSlobrokConnectionSpecs(getSlobrokConnectionSpecs(slobrok)); + this.options = builder.build(); } FleetController createFleetController(boolean useFakeTimer, FleetControllerOptions options) throws Exception { var context = new TestFleetControllerContext(options); Timer timer = useFakeTimer ? this.timer : new RealTimer(); - var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); + var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex(), options.clusterName()); var log = new EventLog(timer, metricUpdater); - var cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); + var cluster = new ContentCluster(options.clusterName(), options.nodes(), options.storageDistribution()); var stateGatherer = new NodeStateGatherer(timer, timer, log); var communicator = new RPCCommunicator( RPCCommunicator.createRealSupervisor(), timer, - options.fleetControllerIndex, - options.nodeStateRequestTimeoutMS, - options.nodeStateRequestTimeoutEarliestPercentage, - options.nodeStateRequestTimeoutLatestPercentage, - options.nodeStateRequestRoundTripTimeMaxSeconds); + options.fleetControllerIndex(), + options.nodeStateRequestTimeoutMS(), + options.nodeStateRequestTimeoutEarliestPercentage(), + options.nodeStateRequestTimeoutLatestPercentage(), + options.nodeStateRequestRoundTripTimeMaxSeconds()); var lookUp = new SlobrokClient(context, timer); lookUp.setSlobrokConnectionSpecs(new String[0]); - var rpcServer = new RpcServer(timer, timer, options.clusterName, options.fleetControllerIndex, options.slobrokBackOffPolicy); - var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress, timer); + var rpcServer = new RpcServer(timer, timer, options.clusterName(), options.fleetControllerIndex(), options.slobrokBackOffPolicy()); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress(), timer); // Setting this <1000 ms causes ECONNREFUSED on socket trying to connect to ZK server, in ZooKeeper, // after creating a new ZooKeeper (session). This causes ~10s extra time to connect after connection loss. @@ -166,7 +166,7 @@ public abstract class FleetControllerTest implements Waiter { var stateGenerator = new StateChangeHandler(context, timer, log); var stateBroadcaster = new SystemStateBroadcaster(context, timer, timer); - var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex(), options.fleetControllerCount(), timer, timer); var status = new StatusHandler.ContainerStatusPageServer(); var controller = new FleetController(context, timer, log, cluster, stateGatherer, communicator, status, rpcServer, lookUp, @@ -175,13 +175,15 @@ public abstract class FleetControllerTest implements Waiter { return controller; } - protected void setUpFleetController(boolean useFakeTimer, FleetControllerOptions options) throws Exception { - if (slobrok == null) setUpSystem(options); + protected FleetControllerOptions setUpFleetController(boolean useFakeTimer, FleetControllerOptions.Builder builder) throws Exception { + if (slobrok == null) setUpSystem(builder); + options = builder.build(); if (fleetController == null) { fleetController = createFleetController(useFakeTimer, options); } else { throw new Exception("called setUpFleetcontroller but it was already setup"); } + return options; } void stopFleetController() throws Exception { @@ -214,9 +216,9 @@ public abstract class FleetControllerTest implements Waiter { protected void setUpVdsNodes(boolean useFakeTimer, DummyVdsNodeOptions options, boolean startDisconnected, Set nodeIndexes) throws Exception { String[] connectionSpecs = getSlobrokConnectionSpecs(slobrok); for (int nodeIndex : nodeIndexes) { - nodes.add(new DummyVdsNode(useFakeTimer ? timer : new RealTimer(), options, connectionSpecs, this.options.clusterName, true, nodeIndex)); + nodes.add(new DummyVdsNode(useFakeTimer ? timer : new RealTimer(), options, connectionSpecs, this.options.clusterName(), true, nodeIndex)); if ( ! startDisconnected) nodes.get(nodes.size() - 1).connect(); - nodes.add(new DummyVdsNode(useFakeTimer ? timer : new RealTimer(), options, connectionSpecs, this.options.clusterName, false, nodeIndex)); + nodes.add(new DummyVdsNode(useFakeTimer ? timer : new RealTimer(), options, connectionSpecs, this.options.clusterName(), false, nodeIndex)); if ( ! startDisconnected) nodes.get(nodes.size() - 1).connect(); } } @@ -232,9 +234,9 @@ public abstract class FleetControllerTest implements Waiter { nodes = new ArrayList<>(); final boolean distributor = true; for (ConfiguredNode configuredNode : configuredNodes) { - nodes.add(new DummyVdsNode(useFakeTimer ? timer : new RealTimer(), options, connectionSpecs, this.options.clusterName, distributor, configuredNode.index())); + nodes.add(new DummyVdsNode(useFakeTimer ? timer : new RealTimer(), options, connectionSpecs, this.options.clusterName(), distributor, configuredNode.index())); if ( ! startDisconnected) nodes.get(nodes.size() - 1).connect(); - nodes.add(new DummyVdsNode(useFakeTimer ? timer : new RealTimer(), options, connectionSpecs, this.options.clusterName, !distributor, configuredNode.index())); + nodes.add(new DummyVdsNode(useFakeTimer ? timer : new RealTimer(), options, connectionSpecs, this.options.clusterName(), !distributor, configuredNode.index())); if ( ! startDisconnected) nodes.get(nodes.size() - 1).connect(); } return nodes; diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownLiveConfigTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownLiveConfigTest.java index 7f6684e595d..2d2790119e9 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownLiveConfigTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownLiveConfigTest.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +import com.yahoo.vdslib.state.NodeType; import org.junit.jupiter.api.Test; import java.util.HashSet; @@ -9,39 +10,39 @@ import static org.junit.jupiter.api.Assertions.assertFalse; public class GroupAutoTakedownLiveConfigTest extends FleetControllerTest { - private static FleetControllerOptions createOptions(DistributionBuilder.GroupBuilder groupBuilder, double minNodeRatio) { - FleetControllerOptions options = defaultOptions("mycluster"); - options.setStorageDistribution(DistributionBuilder.forHierarchicCluster(groupBuilder)); - options.nodes = new HashSet<>(DistributionBuilder.buildConfiguredNodes(groupBuilder.totalNodeCount())); - options.minNodeRatioPerGroup = minNodeRatio; - options.maxTransitionTime = transitionTimes(0); - return options; + private static FleetControllerOptions.Builder createOptions(DistributionBuilder.GroupBuilder groupBuilder, double minNodeRatio) { + return defaultOptions("mycluster") + .setStorageDistribution(DistributionBuilder.forHierarchicCluster(groupBuilder)) + .setNodes(new HashSet<>(DistributionBuilder.buildConfiguredNodes(groupBuilder.totalNodeCount()))) + .setMinNodeRatioPerGroup(minNodeRatio) + .setMaxTransitionTime(NodeType.DISTRIBUTOR, 0) + .setMaxTransitionTime(NodeType.STORAGE, 0); } private void updateConfigLive(FleetControllerOptions newOptions) { this.fleetController.updateOptions(newOptions); } - private void reconfigureWithMinNodeRatio(double minNodeRatio) { - FleetControllerOptions newOptions = this.options.clone(); - newOptions.minNodeRatioPerGroup = minNodeRatio; - updateConfigLive(newOptions); + private void reconfigureWithMinNodeRatio(FleetControllerOptions options, double minNodeRatio) { + FleetControllerOptions.Builder newOptions = FleetControllerOptions.Builder.copy(options); + newOptions.setMinNodeRatioPerGroup(minNodeRatio); + updateConfigLive(newOptions.build()); } - private void reconfigureWithDistribution(DistributionBuilder.GroupBuilder groupBuilder) { - FleetControllerOptions newOptions = this.options.clone(); - newOptions.nodes = new HashSet<>(DistributionBuilder.buildConfiguredNodes(groupBuilder.totalNodeCount())); - newOptions.storageDistribution = DistributionBuilder.forHierarchicCluster(groupBuilder); - updateConfigLive(newOptions); + private void reconfigureWithDistribution(FleetControllerOptions options, DistributionBuilder.GroupBuilder groupBuilder) { + FleetControllerOptions.Builder builder = + FleetControllerOptions.Builder.copy(options) + .setNodes(new HashSet<>(DistributionBuilder.buildConfiguredNodes(groupBuilder.totalNodeCount()))) + .setStorageDistribution(DistributionBuilder.forHierarchicCluster(groupBuilder)); + updateConfigLive(builder.build()); } - private void setUp3x3ClusterWithMinNodeRatio(double minNodeRatio) throws Exception { - FleetControllerOptions options = createOptions( - DistributionBuilder.withGroups(3).eachWithNodeCount(3), - minNodeRatio); + private FleetControllerOptions setUp3x3ClusterWithMinNodeRatio(double minNodeRatio) throws Exception { + FleetControllerOptions.Builder options = createOptions(DistributionBuilder.withGroups(3).eachWithNodeCount(3), minNodeRatio); setUpFleetController(true, options); setUpVdsNodes(true, new DummyVdsNodeOptions(), false, 9); waitForState("version:\\d+ distributor:9 storage:9"); + return options.build(); } private void takeDownContentNode(int index) { @@ -62,28 +63,28 @@ public class GroupAutoTakedownLiveConfigTest extends FleetControllerTest { @Test void min_ratio_live_reconfig_immediately_takes_effect() throws Exception { // Initially, arbitrarily many nodes may be down in a group. - setUp3x3ClusterWithMinNodeRatio(0.0); + var options = setUp3x3ClusterWithMinNodeRatio(0.0); takeDownContentNode(3); waitForStateExcludingNodeSubset("version:\\d+ distributor:9 storage:9 .3.s:d", asIntSet(3)); - reconfigureWithMinNodeRatio(0.67); + reconfigureWithMinNodeRatio(options, 0.67); waitForStateExcludingNodeSubset("version:\\d+ distributor:9 storage:9 .3.s:d .4.s:d .5.s:d", asIntSet(3)); - reconfigureWithMinNodeRatio(0.0); + reconfigureWithMinNodeRatio(options, 0.0); // Aaaand back up again! waitForStateExcludingNodeSubset("version:\\d+ distributor:9 storage:9 .3.s:d", asIntSet(3)); } @Test void live_distribution_config_changes_trigger_cluster_state_change() throws Exception { - setUp3x3ClusterWithMinNodeRatio(0.65); + var options = setUp3x3ClusterWithMinNodeRatio(0.65); takeDownContentNode(6); // Not enough nodes down to trigger group take-down yet waitForStateExcludingNodeSubset("version:\\d+ distributor:9 storage:9 .6.s:d", asIntSet(6)); // Removing a node from the same group as node 6 will dip it under the configured threshold, // taking down the entire group. In this case we configure out node 8. - reconfigureWithDistribution(DistributionBuilder.withGroupNodes(3, 3, 2)); + reconfigureWithDistribution(options, DistributionBuilder.withGroupNodes(3, 3, 2)); waitForStateExcludingNodeSubset("version:\\d+ distributor:8 storage:6", asIntSet(6, 8)); } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java index 702a18b7bbc..0594fc13ef5 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java @@ -29,7 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(FleetControllerTest.CleanupZookeeperLogsOnSuccess.class) -@Timeout(120) +@Timeout(20) public class MasterElectionTest extends FleetControllerTest { private static final Logger log = Logger.getLogger(MasterElectionTest.class.getName()); @@ -39,31 +39,32 @@ public class MasterElectionTest extends FleetControllerTest { private static int defaultZkSessionTimeoutInMillis() { return 30_000; } - protected void setUpFleetController(int count, boolean useFakeTimer, FleetControllerOptions options) throws Exception { + protected FleetControllerOptions setUpFleetController(int count, boolean useFakeTimer, FleetControllerOptions.Builder builder) throws Exception { if (zooKeeperServer == null) { zooKeeperServer = new ZooKeeperTestServer(); } slobrok = new Slobrok(); - this.options = options; - this.options.zooKeeperSessionTimeout = defaultZkSessionTimeoutInMillis(); - this.options.zooKeeperServerAddress = zooKeeperServer.getAddress(); - this.options.slobrokConnectionSpecs = getSlobrokConnectionSpecs(slobrok); - this.options.fleetControllerCount = count; - for (int i=0; i nodeIndices = asIntSet(0, 1, 2, 3); - private final int foreignNode = 6; + private final int foreignNodeIndex = 6; - private void setUpClusterWithForeignNode(Set validIndices, final int foreignNodeIndex) throws Exception { - final Set configuredNodes = asConfiguredNodes(validIndices); - FleetControllerOptions options = optionsForConfiguredNodes(configuredNodes); + private FleetControllerOptions setUpClusterWithForeignNode(Set validIndices) throws Exception { + Set configuredNodes = asConfiguredNodes(validIndices); + FleetControllerOptions.Builder options = optionsForConfiguredNodes(configuredNodes); setUpFleetController(true, options); Set nodesWithStranger = new TreeSet<>(validIndices); nodesWithStranger.add(foreignNodeIndex); setUpVdsNodes(true, new DummyVdsNodeOptions(), false, nodesWithStranger); + return options.build(); } - private FleetControllerOptions optionsForConfiguredNodes(Set configuredNodes) { - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.maxSlobrokDisconnectGracePeriod = 60 * 1000; - options.nodeStateRequestTimeoutMS = 10000 * 60 * 1000; - options.maxTransitionTime = transitionTimes(0); - return options; + private FleetControllerOptions.Builder optionsForConfiguredNodes(Set configuredNodes) { + return defaultOptions("mycluster", configuredNodes) + .setMaxSlobrokDisconnectGracePeriod(60 * 1000) + .setNodeStateRequestTimeoutMS(10000 * 60 * 1000) + .setMaxTransitionTime(NodeType.DISTRIBUTOR, 0) + .setMaxTransitionTime(NodeType.STORAGE, 0); } @Test void testSlobrokNodeOutsideConfiguredIndexSetIsNotIncludedInCluster() throws Exception { - setUpClusterWithForeignNode(nodeIndices, foreignNode); - waitForStateExcludingNodeSubset("version:\\d+ distributor:4 storage:4", asIntSet(foreignNode)); + setUpClusterWithForeignNode(nodeIndices); + waitForStateExcludingNodeSubset("version:\\d+ distributor:4 storage:4", asIntSet(foreignNodeIndex)); } @Test void testNodeSetReconfigurationForcesFreshSlobrokFetch() throws Exception { - setUpClusterWithForeignNode(nodeIndices, foreignNode); - waitForStateExcludingNodeSubset("version:\\d+ distributor:4 storage:4", asIntSet(foreignNode)); + var options = setUpClusterWithForeignNode(nodeIndices); + waitForStateExcludingNodeSubset("version:\\d+ distributor:4 storage:4", asIntSet(foreignNodeIndex)); // If we get a configuration with the node present, we have to accept it into // cluster. If we do not re-fetch state from slobrok we risk racing - nodeIndices.add(foreignNode); - options.nodes = asConfiguredNodes(nodeIndices); + nodeIndices.add(foreignNodeIndex); + var a = FleetControllerOptions.Builder.copy(options); + a.setNodes(asConfiguredNodes(nodeIndices)); + options = a.build(); fleetController.updateOptions(options); // Need to treat cluster as having 6 nodes due to ideal state algo semantics. // Note that we do not use subsetWaiter here since we want node 6 included. @@ -54,9 +59,9 @@ public class NodeSlobrokConfigurationMembershipTest extends FleetControllerTest @Test void test_removed_retired_node_is_not_included_in_state() throws Exception { - final Set configuredNodes = asConfiguredNodes(nodeIndices); - FleetControllerOptions options = optionsForConfiguredNodes(configuredNodes); - setUpFleetController(true, options); + Set configuredNodes = asConfiguredNodes(nodeIndices); + FleetControllerOptions.Builder builder = optionsForConfiguredNodes(configuredNodes); + options = setUpFleetController(true, builder); setUpVdsNodes(true, new DummyVdsNodeOptions(), false, nodeIndices); waitForState("version:\\d+ distributor:4 storage:4"); @@ -64,13 +69,19 @@ public class NodeSlobrokConfigurationMembershipTest extends FleetControllerTest // Update options with 1 node config-retired assertTrue(configuredNodes.remove(new ConfiguredNode(0, false))); configuredNodes.add(new ConfiguredNode(0, true)); - options.nodes = configuredNodes; + + builder = FleetControllerOptions.Builder.copy(options); + builder.setNodes(configuredNodes); + options = builder.build(); fleetController.updateOptions(options); waitForState("version:\\d+ distributor:4 storage:4 .0.s:r"); // Now remove the retired node entirely from config assertTrue(configuredNodes.remove(new ConfiguredNode(0, true))); + builder = FleetControllerOptions.Builder.copy(options); + builder.setNodes(configuredNodes); + options = builder.build(); fleetController.updateOptions(options); // The previously retired node should now be marked as down, as it no longer diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java index 789c86bd6cf..c91bb7b19b7 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java @@ -94,7 +94,7 @@ public class RpcServerTest extends FleetControllerTest { void testGetSystemState() throws Exception { LogFormatter.initializeLogging(); startingTest("RpcServerTest::testGetSystemState"); - FleetControllerOptions options = defaultOptions("mycluster"); + FleetControllerOptions.Builder options = defaultOptions("mycluster"); setUpFleetController(true, options); setUpVdsNodes(true, new DummyVdsNodeOptions()); waitForStableSystem(); @@ -104,7 +104,7 @@ public class RpcServerTest extends FleetControllerTest { nodes.get(0).disconnect(); nodes.get(19).disconnect(); fleetController.waitForNodesInSlobrok(9, 9, timeout()); - timer.advanceTime(options.nodeStateRequestTimeoutMS + options.maxSlobrokDisconnectGracePeriod); + timer.advanceTime(options.nodeStateRequestTimeoutMS() + options.maxSlobrokDisconnectGracePeriod()); wait(new WaitCondition.StateWait(fleetController, fleetController.getMonitor()) { @Override @@ -161,11 +161,11 @@ public class RpcServerTest extends FleetControllerTest { Set configuredNodes = new TreeSet<>(); for (int i = 0; i < 10; i++) configuredNodes.add(new ConfiguredNode(i, false)); - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.minRatioOfStorageNodesUp = 0; - options.maxInitProgressTime = 30000; - options.stableStateTimePeriod = 60000; - setUpFleetController(true, options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", configuredNodes); + builder.setMinRatioOfStorageNodesUp(0); + builder.setMaxInitProgressTime(30000); + builder.setStableStateTimePeriod(60000); + setUpFleetController(true, builder); setUpVdsNodes(true, new DummyVdsNodeOptions()); waitForStableSystem(); @@ -256,11 +256,11 @@ public class RpcServerTest extends FleetControllerTest { for (int i = 0; i < 4; i++) configuredNodes.add(new ConfiguredNode(i, false)); configuredNodes.add(new ConfiguredNode(4, true)); // Last node is configured retired - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.minRatioOfStorageNodesUp = 0; - options.maxInitProgressTime = 30000; - options.stableStateTimePeriod = 60000; - setUpFleetController(true, options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", configuredNodes) + .setMinRatioOfStorageNodesUp(0) + .setMaxInitProgressTime(30000) + .setStableStateTimePeriod(60000); + setUpFleetController(true, builder); setUpVdsNodes(true, new DummyVdsNodeOptions(), false, configuredNodes); waitForState("version:\\d+ distributor:5 storage:5 .4.s:r"); @@ -291,10 +291,10 @@ public class RpcServerTest extends FleetControllerTest { List configuredNodes = new ArrayList<>(); for (int i = 0; i < 5; i++) configuredNodes.add(new ConfiguredNode(i, false)); - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.maxInitProgressTime = 30000; - options.stableStateTimePeriod = 60000; - setUpFleetController(true, options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", configuredNodes) + .setMaxInitProgressTime(30000) + .setStableStateTimePeriod(60000); + setUpFleetController(true, builder); setUpVdsNodes(true, new DummyVdsNodeOptions(), false, configuredNodes); waitForState("version:\\d+ distributor:5 storage:5"); } @@ -315,11 +315,11 @@ public class RpcServerTest extends FleetControllerTest { configuredNodes.add(new ConfiguredNode(i, true)); configuredNodes.add(new ConfiguredNode(5, false)); configuredNodes.add(new ConfiguredNode(6, false)); - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; - this.options.maxInitProgressTime = 30000; - this.options.stableStateTimePeriod = 60000; - fleetController.updateOptions(options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", configuredNodes) + .setSlobrokConnectionSpecs(this.options.slobrokConnectionSpecs()) + .setMaxInitProgressTime(30000) + .setStableStateTimePeriod(60000); + fleetController.updateOptions(builder.build()); waitForState("version:\\d+ distributor:7 storage:7 .0.s:m .1.s:m .2.s:r .3.s:r .4.s:r"); } @@ -345,11 +345,11 @@ public class RpcServerTest extends FleetControllerTest { Set configuredNodes = new TreeSet<>(); for (int i = 0; i < 7; i++) configuredNodes.add(new ConfiguredNode(i, false)); - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; - this.options.maxInitProgressTime = 30000; - this.options.stableStateTimePeriod = 60000; - fleetController.updateOptions(options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", configuredNodes) + .setSlobrokConnectionSpecs(this.options.slobrokConnectionSpecs()) + .setMaxInitProgressTime(30000) + .setStableStateTimePeriod(60000); + fleetController.updateOptions(builder.build()); waitForState("version:\\d+ distributor:7 storage:7 .0.s:m .1.s:m"); } @@ -372,10 +372,11 @@ public class RpcServerTest extends FleetControllerTest { List configuredNodes = new ArrayList<>(); for (int i = 0; i < 5; i++) configuredNodes.add(new ConfiguredNode(i, false)); - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.maxInitProgressTime = 30000; - options.stableStateTimePeriod = 60000; - setUpFleetController(true, options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", configuredNodes) + .setMaxInitProgressTime(30000) + .setStableStateTimePeriod(60000); + options = builder.build(); + setUpFleetController(true, builder); setUpVdsNodes(true, new DummyVdsNodeOptions(), false, configuredNodes); waitForState("version:\\d+ distributor:5 storage:5"); } @@ -384,11 +385,11 @@ public class RpcServerTest extends FleetControllerTest { Set configuredNodes = new TreeSet<>(); for (int i = 0; i < 5; i++) configuredNodes.add(new ConfiguredNode(i, false)); - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; - this.options.maxInitProgressTime = 30000; - this.options.stableStateTimePeriod = 60000; - fleetController.updateOptions(options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", configuredNodes) + .setSlobrokConnectionSpecs(options.slobrokConnectionSpecs()) + .setMaxInitProgressTime(30000) + .setStableStateTimePeriod(60000); + fleetController.updateOptions(builder.build()); waitForState("version:\\d+ distributor:5 storage:5"); } @@ -399,11 +400,11 @@ public class RpcServerTest extends FleetControllerTest { configuredNodes.add(new ConfiguredNode(i, true)); configuredNodes.add(new ConfiguredNode(5, false)); configuredNodes.add(new ConfiguredNode(6, false)); - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; - this.options.maxInitProgressTime = 30000; - this.options.stableStateTimePeriod = 60000; - fleetController.updateOptions(options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", configuredNodes) + .setSlobrokConnectionSpecs(options.slobrokConnectionSpecs()) + .setMaxInitProgressTime(30000) + .setStableStateTimePeriod(60000); + fleetController.updateOptions(builder.build()); waitForState("version:\\d+ distributor:7 storage:7 .0.s:r .1.s:r .2.s:r .3.s:r .4.s:r"); } @@ -413,11 +414,11 @@ public class RpcServerTest extends FleetControllerTest { configuredNodes.add(new ConfiguredNode(i, true)); configuredNodes.add(new ConfiguredNode(5, false)); configuredNodes.add(new ConfiguredNode(6, false)); - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); - options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; - this.options.maxInitProgressTime = 30000; - this.options.stableStateTimePeriod = 60000; - fleetController.updateOptions(options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", configuredNodes) + .setSlobrokConnectionSpecs(options.slobrokConnectionSpecs()) + .setMaxInitProgressTime(30000) + .setStableStateTimePeriod(60000); + fleetController.updateOptions(builder.build()); waitForState("version:\\d+ distributor:7 storage:7 .0.s:r .1.s:r .2.s:r .3.s:r .4.s:r"); } @@ -428,10 +429,10 @@ public class RpcServerTest extends FleetControllerTest { Set configuredNodes = new TreeSet<>(); configuredNodes.add(new ConfiguredNode(5, false)); configuredNodes.add(new ConfiguredNode(6, false)); - FleetControllerOptions options = new FleetControllerOptions("mycluster", configuredNodes); - options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; - this.options.maxInitProgressTimeMs = 30000; - this.options.stableStateTimePeriod = 60000; + FleetControllerOptions.Builder builder = new FleetControllerOptions.Builder("mycluster", configuredNodes) + .setSlobrokConnectionSpecs(options.slobrokConnectionSpecs()) + .setMaxInitProgressTimeMs(30000) + .setStableStateTimePeriod(60000); fleetController.updateOptions(options, 0); for (int i = 0; i < 5*2; i++) { nodes.get(i).disconnectSlobrok(); @@ -447,7 +448,7 @@ public class RpcServerTest extends FleetControllerTest { startingTest("RpcServerTest::testSetNodeState"); Set nodeIndexes = new TreeSet<>(List.of(4, 6, 9, 10, 14, 16, 21, 22, 23, 25)); Set configuredNodes = nodeIndexes.stream().map(i -> new ConfiguredNode(i, false)).collect(Collectors.toSet()); - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); + FleetControllerOptions.Builder options = defaultOptions("mycluster", configuredNodes); //options.setStorageDistribution(new Distribution(getDistConfig(nodeIndexes))); setUpFleetController(true, options); setUpVdsNodes(true, new DummyVdsNodeOptions(), false, nodeIndexes); @@ -486,7 +487,7 @@ public class RpcServerTest extends FleetControllerTest { @Test void testSetNodeStateOutOfRange() throws Exception { startingTest("RpcServerTest::testSetNodeStateOutOfRange"); - FleetControllerOptions options = defaultOptions("mycluster"); + FleetControllerOptions.Builder options = defaultOptions("mycluster"); options.setStorageDistribution(new Distribution(Distribution.getDefaultDistributionConfig(2, 10))); setUpFleetController(true, options); setUpVdsNodes(true, new DummyVdsNodeOptions()); @@ -514,7 +515,7 @@ public class RpcServerTest extends FleetControllerTest { @Test void testGetMaster() throws Exception { startingTest("RpcServerTest::testGetMaster"); - FleetControllerOptions options = defaultOptions("mycluster"); + FleetControllerOptions.Builder options = defaultOptions("mycluster"); options.setStorageDistribution(new Distribution(Distribution.getDefaultDistributionConfig(2, 10))); setUpFleetController(true, options); setUpVdsNodes(true, new DummyVdsNodeOptions()); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java index df0b873e25b..2c68c498e68 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java @@ -16,7 +16,7 @@ public class RpcVersionAutoDowngradeTest extends FleetControllerTest { for (int i = 0 ; i < 10; i++) { configuredNodes.add(new ConfiguredNode(i, false)); } - FleetControllerOptions options = defaultOptions("mycluster", configuredNodes); + FleetControllerOptions.Builder options = defaultOptions("mycluster", configuredNodes); setUpFleetController(false, options); DummyVdsNodeOptions nodeOptions = new DummyVdsNodeOptions(); nodeOptions.stateCommunicationVersion = nodeRpcVersion; diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SlobrokTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SlobrokTest.java index 78576f9600c..be3f7f27488 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SlobrokTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SlobrokTest.java @@ -16,10 +16,10 @@ public class SlobrokTest extends FleetControllerTest { @Test void testSingleSlobrokRestart() throws Exception { startingTest("SlobrokTest::testSingleSlobrokRestart"); - FleetControllerOptions options = defaultOptions("mycluster"); - options.nodeStateRequestTimeoutMS = 60 * 60 * 1000; - options.maxSlobrokDisconnectGracePeriod = 60 * 60 * 1000; - setUpFleetController(true, options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster") + .setNodeStateRequestTimeoutMS(60 * 60 * 1000) + .setMaxSlobrokDisconnectGracePeriod(60 * 60 * 1000); + setUpFleetController(true, builder); setUpVdsNodes(true, new DummyVdsNodeOptions()); waitForStableSystem(); @@ -70,10 +70,10 @@ public class SlobrokTest extends FleetControllerTest { @Test void testNodeTooLongOutOfSlobrok() throws Exception { startingTest("SlobrokTest::testNodeTooLongOutOfSlobrok"); - FleetControllerOptions options = defaultOptions("mycluster"); - options.maxSlobrokDisconnectGracePeriod = 60 * 1000; - options.nodeStateRequestTimeoutMS = 10000 * 60 * 1000; - setUpFleetController(true, options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster") + .setMaxSlobrokDisconnectGracePeriod(60 * 1000) + .setNodeStateRequestTimeoutMS(10000 * 60 * 1000); + setUpFleetController(true, builder); setUpVdsNodes(true, new DummyVdsNodeOptions()); waitForStableSystem(); @@ -95,7 +95,7 @@ public class SlobrokTest extends FleetControllerTest { log.log(Level.INFO, "JUMPING TIME. NODE SHOULD BE MARKED DOWN"); // At this point the fleetcontroller might not have noticed that the node is out of slobrok yet. // Thus we keep advancing time another minute such that it should get down. - timer.advanceTime(options.nodeStateRequestTimeoutMS + options.maxSlobrokDisconnectGracePeriod); + timer.advanceTime(builder.nodeStateRequestTimeoutMS() + builder.maxSlobrokDisconnectGracePeriod()); waitForState("version:\\d+ distributor:10 .0.s:d storage:10"); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index fd6ff30a08f..30eae09724e 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -43,33 +43,33 @@ public class StateChangeTest extends FleetControllerTest { private void initialize(FleetControllerOptions options) throws Exception { List nodes = new ArrayList<>(); - for (int i = 0; i < options.nodes.size(); ++i) { + for (int i = 0; i < options.nodes().size(); ++i) { nodes.add(new Node(NodeType.STORAGE, i)); nodes.add(new Node(NodeType.DISTRIBUTOR, i)); } var context = new TestFleetControllerContext(options); communicator = new DummyCommunicator(nodes, timer); - var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); + var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex(), options.clusterName()); eventLog = new EventLog(timer, metricUpdater); - var cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); + var cluster = new ContentCluster(options.clusterName(), options.nodes(), options.storageDistribution()); var stateGatherer = new NodeStateGatherer(timer, timer, eventLog); - var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress, timer); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(context), timer, options.zooKeeperServerAddress(), timer); var stateGenerator = new StateChangeHandler(context, timer, eventLog); var stateBroadcaster = new SystemStateBroadcaster(context, timer, timer); - var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex(), options.fleetControllerCount(), timer, timer); var status = new StatusHandler.ContainerStatusPageServer(); ctrl = new FleetController(context, timer, eventLog, cluster, stateGatherer, communicator, status, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); ctrl.tick(); - if (options.fleetControllerCount == 1) { + if (options.fleetControllerCount() == 1) { markAllNodesAsUp(options); } } private void markAllNodesAsUp(FleetControllerOptions options) throws Exception { - for (int i = 0; i < options.nodes.size(); ++i) { + for (int i = 0; i < options.nodes().size(); ++i) { communicator.setNodeState(new Node(NodeType.STORAGE, i), State.UP, ""); communicator.setNodeState(new Node(NodeType.DISTRIBUTOR, i), State.UP, ""); } @@ -103,10 +103,10 @@ public class StateChangeTest extends FleetControllerTest { @Test void testNormalStartup() throws Exception { - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxInitProgressTime = 50000; + FleetControllerOptions.Builder options = defaultOptions("mycluster", createNodes(10)); + options.setMaxInitProgressTime(50000); - initialize(options); + initialize(options.build()); // Should now pick up previous node states ctrl.tick(); @@ -117,7 +117,7 @@ public class StateChangeTest extends FleetControllerTest { } for (int i = 0; i < 100; i += 10) { - timer.advanceTime(options.maxInitProgressTime / 20); + timer.advanceTime(options.maxInitProgressTime() / 20); ctrl.tick(); for (int j = 0; j < 10; ++j) { communicator.setNodeState(new Node(NodeType.STORAGE, j), new NodeState(NodeType.STORAGE, State.INITIALIZING).setInitProgress(i / 100.0f), ""); @@ -135,21 +135,21 @@ public class StateChangeTest extends FleetControllerTest { ".4.s:i .4.i:0.1 .5.s:i .5.i:0.1 .6.s:i .6.i:0.1 .7.s:i .7.i:0.1 .8.s:i .8.i:0.1 .9.s:i .9.i:0.1", ctrl.consolidatedClusterState().toString()); - timer.advanceTime(options.maxInitProgressTime / 20); + timer.advanceTime(options.maxInitProgressTime() / 20); ctrl.tick(); for (int i = 0; i < 10; ++i) { communicator.setNodeState(new Node(NodeType.STORAGE, i), new NodeState(NodeType.STORAGE, State.UP), ""); } - timer.advanceTime(options.maxInitProgressTime / 20); + timer.advanceTime(options.maxInitProgressTime() / 20); ctrl.tick(); for (int i = 0; i < 10; ++i) { communicator.setNodeState(new Node(NodeType.DISTRIBUTOR, i), new NodeState(NodeType.STORAGE, State.UP), ""); } - timer.advanceTime(options.maxInitProgressTime / 20); + timer.advanceTime(options.maxInitProgressTime() / 20); ctrl.tick(); assertEquals("version:8 distributor:10 storage:10", ctrl.getSystemState().toString()); @@ -175,15 +175,15 @@ public class StateChangeTest extends FleetControllerTest { @Test void testNodeGoingDownAndUp() throws Exception { - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.nodeStateRequestTimeoutMS = 60 * 60 * 1000; - options.minTimeBetweenNewSystemStates = 0; - options.maxInitProgressTime = 50000; - // This test makes very specific assumptions about the amount of work done in a single tick. - // Two-phase cluster state activation changes this quite a bit, so disable it. At least for now. - options.enableTwoPhaseClusterStateActivation = false; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)) + .setNodeStateRequestTimeoutMS(60 * 60 * 1000) + .setMinTimeBetweenNewSystemStates(0) + .setMaxInitProgressTime(50000) + // This test makes very specific assumptions about the amount of work done in a single tick. + // Two-phase cluster state activation changes this quite a bit, so disable it. At least for now. + .enableTwoPhaseClusterStateActivation(false); - initialize(options); + initialize(builder.build()); ctrl.tick(); @@ -211,7 +211,7 @@ public class StateChangeTest extends FleetControllerTest { desc = ctrl.getReportedNodeState(new Node(NodeType.STORAGE, 0)).getDescription(); assertTrue(desc.contains("Closed at other end"), desc); - timer.advanceTime(options.maxTransitionTime.get(NodeType.STORAGE) + 1); + timer.advanceTime(builder.maxTransitionTime().get(NodeType.STORAGE) + 1); ctrl.tick(); @@ -265,15 +265,15 @@ public class StateChangeTest extends FleetControllerTest { @Test void testNodeGoingDownAndUpNotifying() throws Exception { // Same test as above, but node manages to notify why it is going down first. - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.nodeStateRequestTimeoutMS = 60 * 60 * 1000; - options.maxSlobrokDisconnectGracePeriod = 100000; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)) + .setNodeStateRequestTimeoutMS(60 * 60 * 1000) + .setMaxSlobrokDisconnectGracePeriod(100000); - initialize(options); + initialize(builder.build()); ctrl.tick(); - tick((int) options.stableStateTimePeriod + 1); + tick((int) builder.stableStateTimePeriod() + 1); communicator.setNodeState(new Node(NodeType.DISTRIBUTOR, 0), State.DOWN, "controlled shutdown"); @@ -297,7 +297,7 @@ public class StateChangeTest extends FleetControllerTest { assertTrue(desc.contains("Received signal 15 (SIGTERM - Termination signal)") || desc.contains("controlled shutdown"), desc); - tick(options.maxTransitionTime.get(NodeType.STORAGE) + 1); + tick(builder.maxTransitionTime().get(NodeType.STORAGE) + 1); assertEquals("version:6 distributor:10 storage:10 .0.s:d", ctrl.getSystemState().toString()); desc = ctrl.getReportedNodeState(new Node(NodeType.STORAGE, 0)).getDescription(); @@ -336,10 +336,10 @@ public class StateChangeTest extends FleetControllerTest { @Test void testNodeGoingDownAndUpFast() throws Exception { - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxSlobrokDisconnectGracePeriod = 60 * 1000; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)) + .setMaxSlobrokDisconnectGracePeriod(60 * 1000); - initialize(options); + initialize(builder.build()); ctrl.tick(); @@ -377,10 +377,10 @@ public class StateChangeTest extends FleetControllerTest { @Test void testMaintenanceWhileNormalStorageNodeRestart() throws Exception { - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxSlobrokDisconnectGracePeriod = 60 * 1000; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)) + .setMaxSlobrokDisconnectGracePeriod(60 * 1000); - initialize(options); + initialize(builder.build()); communicator.setNodeState(new Node(NodeType.STORAGE, 6), State.DOWN, "Connection error: Closed at other end"); @@ -437,10 +437,10 @@ public class StateChangeTest extends FleetControllerTest { nodes.add(new ConfiguredNode(i, retired)); } - FleetControllerOptions options = defaultOptions("mycluster", nodes); - options.maxSlobrokDisconnectGracePeriod = 60 * 1000; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", nodes) + .setMaxSlobrokDisconnectGracePeriod(60 * 1000); - initialize(options); + initialize(builder.build()); communicator.setNodeState(new Node(NodeType.STORAGE, 6), State.DOWN, "Connection error: Closed at other end"); @@ -496,10 +496,9 @@ public class StateChangeTest extends FleetControllerTest { nodes.add(new ConfiguredNode(i, retired)); } - FleetControllerOptions options = defaultOptions("mycluster", nodes); - options.maxSlobrokDisconnectGracePeriod = 60 * 1000; - - initialize(options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", nodes) + .setMaxSlobrokDisconnectGracePeriod(60 * 1000); + initialize(builder.build()); communicator.setNodeState(new Node(NodeType.STORAGE, 6), State.DOWN, "Connection error: Closed at other end"); @@ -519,14 +518,14 @@ public class StateChangeTest extends FleetControllerTest { @Test void testDownNodeInitializing() throws Exception { // Actually report initializing state if node has been down steadily for a while - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxTransitionTime.put(NodeType.STORAGE, 5000); - options.maxInitProgressTime = 5000; - options.stableStateTimePeriod = 20000; - options.nodeStateRequestTimeoutMS = 1000000; - options.maxSlobrokDisconnectGracePeriod = 1000000; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)) + .setMaxTransitionTime(NodeType.STORAGE, 5000) + .setMaxInitProgressTime(5000) + .setStableStateTimePeriod(20000) + .setNodeStateRequestTimeoutMS(1000000) + .setMaxSlobrokDisconnectGracePeriod(1000000); - initialize(options); + initialize(builder.build()); timer.advanceTime(100000); // Node has been in steady state up ctrl.tick(); @@ -582,13 +581,13 @@ public class StateChangeTest extends FleetControllerTest { @Test void testNodeInitializationStalled() throws Exception { // Node should eventually be marked down, and not become initializing next time, but stay down until up - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxTransitionTime.put(NodeType.STORAGE, 5000); - options.maxInitProgressTime = 5000; - options.stableStateTimePeriod = 1000000; - options.maxSlobrokDisconnectGracePeriod = 10000000; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)) + .setMaxTransitionTime(NodeType.STORAGE, 5000) + .setMaxInitProgressTime(5000) + .setStableStateTimePeriod(1000000) + .setMaxSlobrokDisconnectGracePeriod(10000000); - initialize(options); + initialize(builder.build()); timer.advanceTime(1000000); // Node has been in steady state up @@ -612,7 +611,7 @@ public class StateChangeTest extends FleetControllerTest { assertEquals("version:6 distributor:10 storage:10 .6.s:i .6.i:0.1", ctrl.getSystemState().toString()); - timer.advanceTime(options.maxInitProgressTime + 1); + timer.advanceTime(builder.maxInitProgressTime() + 1); ctrl.tick(); @@ -625,7 +624,7 @@ public class StateChangeTest extends FleetControllerTest { ctrl.tick(); - tick(options.nodeStateRequestTimeoutMS + 1); + tick(builder.nodeStateRequestTimeoutMS() + 1); communicator.setNodeState(new Node(NodeType.STORAGE, 6), new NodeState(NodeType.STORAGE, State.INITIALIZING).setInitProgress(0.0f), ""); @@ -668,14 +667,14 @@ public class StateChangeTest extends FleetControllerTest { @Test void testBackwardsInitializationProgress() throws Exception { // Same as stalled. Mark down, keep down until up - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxTransitionTime.put(NodeType.STORAGE, 5000); - options.maxInitProgressTime = 5000; - options.stableStateTimePeriod = 1000000; - // Set long so we dont time out RPC requests and mark nodes down due to advancing time to get in steady state - options.nodeStateRequestTimeoutMS = (int) options.stableStateTimePeriod * 2; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)); + builder.setMaxTransitionTime(NodeType.STORAGE, 5000); + builder.setMaxInitProgressTime(5000); + builder.setStableStateTimePeriod(1000000); + // Set long so we don't time out RPC requests and mark nodes down due to advancing time to get in steady state + builder.setNodeStateRequestTimeoutMS((int) builder.stableStateTimePeriod() * 2); - initialize(options); + initialize(builder.build()); timer.advanceTime(1000000); // Node has been in steady state up @@ -711,13 +710,14 @@ public class StateChangeTest extends FleetControllerTest { @Test void testNodeGoingDownWhileInitializing() throws Exception { // Same as stalled. Mark down, keep down until up - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxTransitionTime.put(NodeType.STORAGE, 5000); - options.maxInitProgressTime = 5000; - options.stableStateTimePeriod = 1000000; - options.nodeStateRequestTimeoutMS = 365 * 24 * 60 * 1000; // Set very high so the advanceTime don't start sending state replies right before we disconnect. + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)) + .setMaxTransitionTime(NodeType.STORAGE, 5000) + .setMaxInitProgressTime(5000) + .setStableStateTimePeriod(1000000) + // Set very high so the advanceTime don't start sending state replies right before we disconnect. + .setNodeStateRequestTimeoutMS(365 * 24 * 60 * 1000); - initialize(options); + initialize(builder.build()); timer.advanceTime(1000000); // Node has been in steady state up @@ -770,14 +770,14 @@ public class StateChangeTest extends FleetControllerTest { void testContinuousCrashRightAfterInit() throws Exception { startingTest("StateChangeTest::testContinuousCrashRightAfterInit"); // If node does this too many times, take it out of service - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxTransitionTime.put(NodeType.STORAGE, 5000); - options.maxInitProgressTime = 5000; - options.maxPrematureCrashes = 2; - options.stableStateTimePeriod = 1000000; - options.maxSlobrokDisconnectGracePeriod = 10000000; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)) + .setMaxTransitionTime(NodeType.STORAGE, 5000) + .setMaxInitProgressTime(5000) + .setMaxPrematureCrashes(2) + .setStableStateTimePeriod(1000000) + .setMaxSlobrokDisconnectGracePeriod(10000000); - initialize(options); + initialize(builder.build()); timer.advanceTime(1000000); // Node has been in steady state up @@ -795,22 +795,22 @@ public class StateChangeTest extends FleetControllerTest { assertEquals("version:5 distributor:10 storage:10 .6.s:d", ctrl.getSystemState().toString()); - for (int j = 0; j <= options.maxPrematureCrashes; ++j) { + for (int j = 0; j <= builder.maxPrematureCrashes(); ++j) { ctrl.tick(); - tick(options.nodeStateRequestTimeoutMS + 1); + tick(builder.nodeStateRequestTimeoutMS() + 1); communicator.setNodeState(new Node(NodeType.STORAGE, 6), State.DOWN, "Connection error: Closed at other end"); ctrl.tick(); - tick(options.nodeStateRequestTimeoutMS + 1); + tick(builder.nodeStateRequestTimeoutMS() + 1); communicator.setNodeState(new Node(NodeType.STORAGE, 6), new NodeState(NodeType.STORAGE, State.INITIALIZING).setInitProgress(0.0f), ""); ctrl.tick(); - tick(options.nodeStateRequestTimeoutMS + 1); + tick(builder.nodeStateRequestTimeoutMS() + 1); communicator.setNodeState(new Node(NodeType.STORAGE, 6), new NodeState(NodeType.STORAGE, State.INITIALIZING).setInitProgress(0.1f), ""); @@ -824,15 +824,15 @@ public class StateChangeTest extends FleetControllerTest { void testClusterStateMinNodes() throws Exception { startingTest("StateChangeTest::testClusterStateMinNodes"); // If node does this too many times, take it out of service - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxTransitionTime.put(NodeType.STORAGE, 0); - options.maxInitProgressTime = 0; - options.minDistributorNodesUp = 6; - options.minStorageNodesUp = 8; - options.minRatioOfDistributorNodesUp = 0.0; - options.minRatioOfStorageNodesUp = 0.0; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)) + .setMaxTransitionTime(NodeType.STORAGE, 0) + .setMaxInitProgressTime(0) + .setMinDistributorNodesUp(6) + .setMinStorageNodesUp(8) + .setMinRatioOfDistributorNodesUp(0.0) + .setMinRatioOfStorageNodesUp(0.0); - initialize(options); + initialize(builder.build()); timer.advanceTime(1000000); // Node has been in steady state up @@ -879,15 +879,15 @@ public class StateChangeTest extends FleetControllerTest { void testClusterStateMinFactor() throws Exception { startingTest("StateChangeTest::testClusterStateMinFactor"); // If node does this too many times, take it out of service - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxTransitionTime.put(NodeType.STORAGE, 0); - options.maxInitProgressTime = 0; - options.minDistributorNodesUp = 0; - options.minStorageNodesUp = 0; - options.minRatioOfDistributorNodesUp = 0.6; - options.minRatioOfStorageNodesUp = 0.8; + FleetControllerOptions.Builder options = defaultOptions("mycluster", createNodes(10)); + options.setMaxTransitionTime(NodeType.STORAGE, 0); + options.setMaxInitProgressTime(0); + options.setMinDistributorNodesUp(0); + options.setMinStorageNodesUp(0); + options.setMinRatioOfDistributorNodesUp(0.6); + options.setMinRatioOfStorageNodesUp(0.8); - initialize(options); + initialize(options.build()); timer.advanceTime(1000000); // Node has been in steady state up @@ -952,9 +952,9 @@ public class StateChangeTest extends FleetControllerTest { @Test void testNoSystemStateBeforeInitialTimePeriod() throws Exception { startingTest("StateChangeTest::testNoSystemStateBeforeInitialTimePeriod()"); - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.minTimeBeforeFirstSystemStateBroadcast = 3 * 60 * 1000; - setUpSystem(options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)); + builder.setMinTimeBeforeFirstSystemStateBroadcast(3 * 60 * 1000); + setUpSystem(builder); boolean useFakeTimer = true; setUpVdsNodes(useFakeTimer, new DummyVdsNodeOptions(), true); // Leave one node down to avoid sending cluster state due to having seen all node states. @@ -963,7 +963,7 @@ public class StateChangeTest extends FleetControllerTest { nodes.get(i).connect(); } } - setUpFleetController(useFakeTimer, options); + setUpFleetController(useFakeTimer, builder); StateWaiter waiter = new StateWaiter(timer); fleetController.addSystemStateListener(waiter); @@ -998,11 +998,11 @@ public class StateChangeTest extends FleetControllerTest { @Test void testSystemStateSentWhenNodesReplied() throws Exception { startingTest("StateChangeTest::testSystemStateSentWhenNodesReplied()"); - final FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.minTimeBeforeFirstSystemStateBroadcast = 300 * 60 * 1000; + FleetControllerOptions.Builder builder = defaultOptions("mycluster", createNodes(10)); + builder.setMinTimeBeforeFirstSystemStateBroadcast(300 * 60 * 1000); boolean useFakeTimer = true; - setUpSystem(options); + setUpSystem(builder); setUpVdsNodes(useFakeTimer, new DummyVdsNodeOptions(), true); @@ -1012,7 +1012,7 @@ public class StateChangeTest extends FleetControllerTest { // Marking one node as 'initializing' improves testing of state later on. nodes.get(3).setNodeState(State.INITIALIZING); - setUpFleetController(useFakeTimer, options); + setUpFleetController(useFakeTimer, builder); final StateWaiter waiter = new StateWaiter(timer); @@ -1036,7 +1036,7 @@ public class StateChangeTest extends FleetControllerTest { @Test void testDontTagFailingSetSystemStateOk() throws Exception { startingTest("StateChangeTest::testDontTagFailingSetSystemStateOk()"); - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); + FleetControllerOptions.Builder options = defaultOptions("mycluster", createNodes(10)); setUpFleetController(true, options); setUpVdsNodes(true, new DummyVdsNodeOptions()); waitForStableSystem(); @@ -1067,10 +1067,10 @@ public class StateChangeTest extends FleetControllerTest { @Test void testAlteringDistributionSplitCount() throws Exception { startingTest("StateChangeTest::testAlteringDistributionSplitCount"); - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.distributionBits = 17; + FleetControllerOptions.Builder options = defaultOptions("mycluster", createNodes(10)); + options.setDistributionBits(17); - initialize(options); + initialize(options.build()); timer.advanceTime(1000000); // Node has been in steady state up @@ -1114,7 +1114,7 @@ public class StateChangeTest extends FleetControllerTest { @Test void testSetAllTimestampsAfterDowntime() throws Exception { startingTest("StateChangeTest::testSetAllTimestampsAfterDowntime"); - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); + FleetControllerOptions.Builder options = defaultOptions("mycluster", createNodes(10)); setUpFleetController(true, options); setUpVdsNodes(true, new DummyVdsNodeOptions()); waitForStableSystem(); @@ -1142,19 +1142,19 @@ public class StateChangeTest extends FleetControllerTest { } if (node.getNode().equals(new Node(NodeType.DISTRIBUTOR, 0))) { - for (ConfiguredNode i : options.nodes) { + for (ConfiguredNode i : options.nodes()) { Node nodeId = new Node(NodeType.STORAGE, i.index()); long ts = lastState.getNodeState(nodeId).getStartTimestamp(); assertTrue(ts > 0, nodeId + "\n" + stateHistory + "\nWas " + ts + " should be " + fleetController.getCluster().getNodeInfo(nodeId).getStartTimestamp()); } } else { - for (ConfiguredNode i : options.nodes) { + for (ConfiguredNode i : options.nodes()) { Node nodeId = new Node(NodeType.STORAGE, i.index()); assertEquals(0, lastState.getNodeState(nodeId).getStartTimestamp(), nodeId.toString()); } } - for (ConfiguredNode i : options.nodes) { + for (ConfiguredNode i : options.nodes()) { Node nodeId = new Node(NodeType.DISTRIBUTOR, i.index()); assertEquals(0, lastState.getNodeState(nodeId).getStartTimestamp(), nodeId.toString()); } @@ -1163,11 +1163,11 @@ public class StateChangeTest extends FleetControllerTest { @Test void consolidated_cluster_state_reflects_node_changes_when_cluster_is_down() throws Exception { - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxTransitionTime.put(NodeType.STORAGE, 0); - options.minStorageNodesUp = 10; - options.minDistributorNodesUp = 10; - initialize(options); + FleetControllerOptions.Builder options = defaultOptions("mycluster", createNodes(10)); + options.setMaxTransitionTime(NodeType.STORAGE, 0); + options.setMinStorageNodesUp(10); + options.setMinDistributorNodesUp(10); + initialize(options.build()); ctrl.tick(); assertThat(ctrl.consolidatedClusterState().toString(), equalTo("version:3 distributor:10 storage:10")); @@ -1197,11 +1197,11 @@ public class StateChangeTest extends FleetControllerTest { // of previous timer invocations (with subsequent state generation) would not be visible. @Test void timer_events_during_cluster_down_observe_most_recent_node_changes() throws Exception { - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxTransitionTime.put(NodeType.STORAGE, 1000); - options.minStorageNodesUp = 10; - options.minDistributorNodesUp = 10; - initialize(options); + FleetControllerOptions.Builder options = defaultOptions("mycluster", createNodes(10)); + options.setMaxTransitionTime(NodeType.STORAGE, 1000); + options.setMinStorageNodesUp(10); + options.setMinDistributorNodesUp(10); + initialize(options.build()); ctrl.tick(); communicator.setNodeState(new Node(NodeType.STORAGE, 2), State.DOWN, "foo"); @@ -1232,8 +1232,8 @@ public class StateChangeTest extends FleetControllerTest { @Test void do_not_emit_multiple_events_when_node_state_does_not_match_versioned_state() throws Exception { - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - initialize(options); + FleetControllerOptions.Builder options = defaultOptions("mycluster", createNodes(10)); + initialize(options.build()); ctrl.tick(); communicator.setNodeState( @@ -1366,7 +1366,7 @@ public class StateChangeTest extends FleetControllerTest { void sendAllDeferredDistributorClusterStateAcks() throws Exception { communicator.sendAllDeferredDistributorClusterStateAcks(); ctrl.tick(); // Process cluster state bundle ACKs - if (ctrl.getOptions().enableTwoPhaseClusterStateActivation) { + if (ctrl.getOptions().enableTwoPhaseClusterStateActivation()) { ctrl.tick(); // Send activations ctrl.tick(); // Process activation ACKs } @@ -1374,7 +1374,7 @@ public class StateChangeTest extends FleetControllerTest { void processScheduledTask() throws Exception { ctrl.tick(); // Cluster state recompute iteration and send - if (ctrl.getOptions().enableTwoPhaseClusterStateActivation) { + if (ctrl.getOptions().enableTwoPhaseClusterStateActivation()) { ctrl.tick(); // Send activations ctrl.tick(); // Process activation ACKs } @@ -1402,20 +1402,20 @@ public class StateChangeTest extends FleetControllerTest { } } - private static FleetControllerOptions defaultOptions() { + private static FleetControllerOptions.Builder defaultOptions() { return defaultOptions("mycluster", createNodes(10)); } - private static FleetControllerOptions optionsWithZeroTransitionTime() { - FleetControllerOptions options = defaultOptions("mycluster", createNodes(10)); - options.maxTransitionTime.put(NodeType.STORAGE, 0); + private static FleetControllerOptions.Builder optionsWithZeroTransitionTime() { + FleetControllerOptions.Builder options = defaultOptions("mycluster", createNodes(10)); + options.setMaxTransitionTime(NodeType.STORAGE, 0); return options; } - private static FleetControllerOptions optionsAllowingZeroNodesDown() { - FleetControllerOptions options = optionsWithZeroTransitionTime(); - options.minStorageNodesUp = 10; - options.minDistributorNodesUp = 10; + private static FleetControllerOptions.Builder optionsAllowingZeroNodesDown() { + FleetControllerOptions.Builder options = optionsWithZeroTransitionTime(); + options.setMinStorageNodesUp(10); + options.setMinDistributorNodesUp(10); return options; } @@ -1424,7 +1424,7 @@ public class StateChangeTest extends FleetControllerTest { } private RemoteTaskFixture createDefaultFixture() throws Exception { - return new RemoteTaskFixture(defaultOptions()); + return new RemoteTaskFixture(defaultOptions().build()); } @Test @@ -1457,7 +1457,7 @@ public class StateChangeTest extends FleetControllerTest { @Test void no_op_synchronous_remote_task_can_complete_immediately_if_current_state_already_acked() throws Exception { - RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime()); + RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime().build()); fixture.markStorageNodeDown(0); MockTask task = fixture.scheduleNoOpVersionDependentTask(); // Tries to set node 0 into Down; already in that state @@ -1470,7 +1470,7 @@ public class StateChangeTest extends FleetControllerTest { @Test void no_op_synchronous_remote_task_waits_until_current_state_is_acked() throws Exception { - RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime()); + RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime().build()); communicator.setShouldDeferDistributorClusterStateAcks(true); fixture.markStorageNodeDown(0); @@ -1494,7 +1494,7 @@ public class StateChangeTest extends FleetControllerTest { // the cluster down-state to have been published. @Test void immediately_complete_sync_remote_task_when_cluster_is_down() throws Exception { - RemoteTaskFixture fixture = createFixtureWith(optionsAllowingZeroNodesDown()); + RemoteTaskFixture fixture = createFixtureWith(optionsAllowingZeroNodesDown().build()); // Controller options require 10/10 nodes up, so take one down to trigger a cluster Down edge. fixture.markStorageNodeDown(1); MockTask task = fixture.scheduleVersionDependentTaskWithSideEffects(); @@ -1526,12 +1526,12 @@ public class StateChangeTest extends FleetControllerTest { @Test void synchronous_task_immediately_failed_when_leadership_lost() throws Exception { - FleetControllerOptions options = optionsWithZeroTransitionTime(); - options.fleetControllerCount = 3; - RemoteTaskFixture fixture = createFixtureWith(options); + FleetControllerOptions.Builder options = optionsWithZeroTransitionTime(); + options.setCount(3); + RemoteTaskFixture fixture = createFixtureWith(options.build()); fixture.winLeadership(); - markAllNodesAsUp(options); + markAllNodesAsUp(options.build()); MockTask task = fixture.scheduleVersionDependentTaskWithSideEffects(); @@ -1551,9 +1551,9 @@ public class StateChangeTest extends FleetControllerTest { @Test void cluster_state_ack_is_not_dependent_on_state_send_grace_period() throws Exception { - FleetControllerOptions options = defaultOptions(); - options.minTimeBetweenNewSystemStates = 10_000; - RemoteTaskFixture fixture = createFixtureWith(options); + FleetControllerOptions.Builder options = defaultOptions(); + options.setMinTimeBetweenNewSystemStates(10_000); + RemoteTaskFixture fixture = createFixtureWith(options.build()); // Have to increment timer here to be able to send state generated by the scheduled task timer.advanceTime(10_000); @@ -1571,8 +1571,9 @@ public class StateChangeTest extends FleetControllerTest { @Test void synchronous_task_immediately_answered_when_not_leader() throws Exception { - FleetControllerOptions options = optionsWithZeroTransitionTime(); - options.fleetControllerCount = 3; + FleetControllerOptions.Builder builder = optionsWithZeroTransitionTime(); + builder.setCount(3); + var options = builder.build(); RemoteTaskFixture fixture = createFixtureWith(options); fixture.loseLeadership(); @@ -1586,9 +1587,9 @@ public class StateChangeTest extends FleetControllerTest { @Test void task_not_completed_within_deadline_is_failed_with_deadline_exceeded_error() throws Exception { - FleetControllerOptions options = defaultOptions(); - options.setMaxDeferredTaskVersionWaitTime(Duration.ofSeconds(60)); - RemoteTaskFixture fixture = createFixtureWith(options); + FleetControllerOptions.Builder builder = defaultOptions(); + builder.setMaxDeferredTaskVersionWaitTime(Duration.ofSeconds(60)); + RemoteTaskFixture fixture = createFixtureWith(builder.build()); MockTask task = fixture.scheduleVersionDependentTaskWithSideEffects(); communicator.setShouldDeferDistributorClusterStateAcks(true); @@ -1610,11 +1611,11 @@ public class StateChangeTest extends FleetControllerTest { } private void doTestTaskDeadlineExceeded(boolean deferredActivation, String expectedMessage) throws Exception { - FleetControllerOptions options = defaultOptions(); + FleetControllerOptions.Builder options = defaultOptions(); options.setMaxDeferredTaskVersionWaitTime(Duration.ofSeconds(60)); - options.enableTwoPhaseClusterStateActivation = deferredActivation; - options.maxDivergentNodesPrintedInTaskErrorMessages = 10; - RemoteTaskFixture fixture = createFixtureWith(options); + options.enableTwoPhaseClusterStateActivation(deferredActivation); + options.setMaxDivergentNodesPrintedInTaskErrorMessages(10); + RemoteTaskFixture fixture = createFixtureWith(options.build()); MockTask task = fixture.scheduleVersionDependentTaskWithSideEffects(); communicator.setShouldDeferDistributorClusterStateAcks(true); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateGatherTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateGatherTest.java index 656d5a187d1..712ed9172d5 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateGatherTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateGatherTest.java @@ -27,16 +27,16 @@ public class StateGatherTest extends FleetControllerTest { void testAlwaysHavePendingGetNodeStateRequestTowardsNodes() throws Exception { Logger.getLogger(NodeStateGatherer.class.getName()).setLevel(Level.FINEST); startingTest("StateGatherTest::testOverlappingGetNodeStateRequests"); - FleetControllerOptions options = defaultOptions("mycluster"); - options.nodeStateRequestTimeoutMS = 10 * 60 * 1000; - // Force actual message timeout to be lower than request timeout. - options.nodeStateRequestTimeoutEarliestPercentage = 80; - options.nodeStateRequestTimeoutLatestPercentage = 80; - setUpFleetController(true, options); + FleetControllerOptions.Builder builder = defaultOptions("mycluster") + .setNodeStateRequestTimeoutMS(10 * 60 * 1000) + // Force actual message timeout to be lower than request timeout. + .setNodeStateRequestTimeoutEarliestPercentage(80) + .setNodeStateRequestTimeoutLatestPercentage(80); + setUpFleetController(true, builder); String[] connectionSpecs = getSlobrokConnectionSpecs(slobrok); DummyVdsNodeOptions dummyOptions = new DummyVdsNodeOptions(); - DummyVdsNode dnode = new DummyVdsNode(timer, dummyOptions, connectionSpecs, this.options.clusterName, true, 0); - DummyVdsNode snode = new DummyVdsNode(timer, dummyOptions, connectionSpecs, this.options.clusterName, false, 0); + DummyVdsNode dnode = new DummyVdsNode(timer, dummyOptions, connectionSpecs, builder.clusterName(), true, 0); + DummyVdsNode snode = new DummyVdsNode(timer, dummyOptions, connectionSpecs, builder.clusterName(), false, 0); dnode.connect(); snode.connect(); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java index b9a9d7fbf8f..7e0b7f6d953 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java @@ -23,7 +23,6 @@ import com.yahoo.vespa.clustercontroller.core.SetClusterStateRequest; import com.yahoo.vespa.clustercontroller.core.Timer; import org.junit.jupiter.api.Test; import org.mockito.Mockito; - import java.time.Duration; import java.util.HashSet; import java.util.Set; @@ -34,7 +33,6 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -79,11 +77,11 @@ public class RPCCommunicatorTest { @Test void testGenerateNodeStateRequestTimeoutMsWithUpdates() { final RPCCommunicator communicator = new RPCCommunicator(RPCCommunicator.createRealSupervisor(), null /* Timer */, INDEX, 1, 1, 100, 0); - FleetControllerOptions fleetControllerOptions = new FleetControllerOptions(null /*clustername*/, Set.of(new ConfiguredNode(0, false))); - fleetControllerOptions.nodeStateRequestTimeoutEarliestPercentage = 100; - fleetControllerOptions.nodeStateRequestTimeoutLatestPercentage = 100; - fleetControllerOptions.nodeStateRequestTimeoutMS = NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS; - communicator.propagateOptions(fleetControllerOptions); + FleetControllerOptions.Builder builder = new FleetControllerOptions.Builder(null /*clustername*/, Set.of(new ConfiguredNode(0, false))); + builder.setNodeStateRequestTimeoutEarliestPercentage(100); + builder.setNodeStateRequestTimeoutLatestPercentage(100); + builder.setNodeStateRequestTimeoutMS(NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS); + communicator.propagateOptions(builder.build()); long timeOutMs = communicator.generateNodeStateRequestTimeout().toMillis(); assertEquals(timeOutMs, NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS); } -- cgit v1.2.3
PropertyValue
Cluster name").append(options.clusterName).append("
Fleet controller index").append(options.fleetControllerIndex).append("/").append(options.fleetControllerCount).append("
Number of fleetcontrollers gathering states from nodes").append(options.stateGatherCount).append("
Cluster name").append(options.clusterName()).append("
Fleet controller index").append(options.fleetControllerIndex()).append("/").append(options.fleetControllerCount()).append("
Number of fleetcontrollers gathering states from nodes").append(options.stateGatherCount()).append("
Slobrok connection spec").append(slobrokspecs).append("
RPC port").append(options.rpcPort == 0 ? "Pick random available" : options.rpcPort).append("
HTTP port").append(options.httpPort == 0 ? "Pick random available" : options.httpPort).append("
Master cooldown period").append(RealTimer.printDuration(options.masterZooKeeperCooldownPeriod)).append("
RPC port").append(options.rpcPort() == 0 ? "Pick random available" : options.rpcPort()).append("
HTTP port").append(options.httpPort() == 0 ? "Pick random available" : options.httpPort()).append("
Master cooldown period").append(RealTimer.printDuration(options.masterZooKeeperCooldownPeriod())).append("
Zookeeper server address").append(zooKeeperAddress).append("
Zookeeper session timeout").append(RealTimer.printDuration(options.zooKeeperSessionTimeout)).append("
Zookeeper session timeout").append(RealTimer.printDuration(options.zooKeeperSessionTimeout())).append("
Cycle wait time").append(options.cycleWaitTime).append(" ms
Minimum time before first clusterstate broadcast as master").append(RealTimer.printDuration(options.minTimeBeforeFirstSystemStateBroadcast)).append("
Minimum time between official cluster states").append(RealTimer.printDuration(options.minTimeBetweenNewSystemStates)).append("
Slobrok mirror backoff policy").append(options.slobrokBackOffPolicy == null ? "default" : "overridden").append("
Cycle wait time").append(options.cycleWaitTime()).append(" ms
Minimum time before first clusterstate broadcast as master").append(RealTimer.printDuration(options.minTimeBeforeFirstSystemStateBroadcast())).append("
Minimum time between official cluster states").append(RealTimer.printDuration(options.minTimeBetweenNewSystemStates())).append("
Slobrok mirror backoff policy").append(options.slobrokBackOffPolicy() == null ? "default" : "overridden").append("
Node state request timeout").append(RealTimer.printDuration(options.nodeStateRequestTimeoutMS)).append("
VDS 4.1 node state polling frequency").append(RealTimer.printDuration(options.statePollingFrequency)).append("
Maximum distributor transition time").append(RealTimer.printDuration(options.maxTransitionTime.get(NodeType.DISTRIBUTOR))).append("
Maximum storage transition time").append(RealTimer.printDuration(options.maxTransitionTime.get(NodeType.STORAGE))).append("
Maximum initialize without progress time").append(RealTimer.printDuration(options.maxInitProgressTime)).append("
Maximum premature crashes").append(options.maxPrematureCrashes).append("
Stable state time period").append(RealTimer.printDuration(options.stableStateTimePeriod)).append("
Slobrok disconnect grace period").append(RealTimer.printDuration(options.maxSlobrokDisconnectGracePeriod)).append("
Node state request timeout").append(RealTimer.printDuration(options.nodeStateRequestTimeoutMS())).append("
VDS 4.1 node state polling frequency").append(RealTimer.printDuration(options.statePollingFrequency())).append("
Maximum distributor transition time").append(RealTimer.printDuration(options.maxTransitionTime().get(NodeType.DISTRIBUTOR))).append("
Maximum storage transition time").append(RealTimer.printDuration(options.maxTransitionTime().get(NodeType.STORAGE))).append("
Maximum initialize without progress time").append(RealTimer.printDuration(options.maxInitProgressTime())).append("
Maximum premature crashes").append(options.maxPrematureCrashes()).append("
Stable state time period").append(RealTimer.printDuration(options.stableStateTimePeriod())).append("
Slobrok disconnect grace period").append(RealTimer.printDuration(options.maxSlobrokDisconnectGracePeriod())).append("
Number of distributor nodes").append(options.nodes == null ? "Autodetect" : options.nodes.size()).append("
Number of storage nodes").append(options.nodes == null ? "Autodetect" : options.nodes.size()).append("
Minimum distributor nodes being up for cluster to be up").append(options.minDistributorNodesUp).append("
Minimum storage nodes being up for cluster to be up").append(options.minStorageNodesUp).append("
Minimum percentage of distributor nodes being up for cluster to be up").append(DecimalDot2.format(100 * options.minRatioOfDistributorNodesUp)).append(" %
Minimum percentage of storage nodes being up for cluster to be up").append(DecimalDot2.format(100 * options.minRatioOfStorageNodesUp)).append(" %
Number of distributor nodes").append(options.nodes() == null ? "Autodetect" : options.nodes().size()).append("
Number of storage nodes").append(options.nodes() == null ? "Autodetect" : options.nodes().size()).append("
Minimum distributor nodes being up for cluster to be up").append(options.minDistributorNodesUp()).append("
Minimum storage nodes being up for cluster to be up").append(options.minStorageNodesUp()).append("
Minimum percentage of distributor nodes being up for cluster to be up").append(DecimalDot2.format(100 * options.minRatioOfDistributorNodesUp())).append(" %
Minimum percentage of storage nodes being up for cluster to be up").append(DecimalDot2.format(100 * options.minRatioOfStorageNodesUp())).append(" %
Show local cluster state changes").append(options.showLocalSystemStatesInEventLog).append("
Maximum event log size").append(options.eventLogMaxSize).append("
Maximum node event log size").append(options.eventNodeLogMaxSize).append("
Wanted distribution bits").append(options.distributionBits).append("
Max deferred task version wait time").append(options.maxDeferredTaskVersionWaitTime.toMillis()).append("ms
Cluster has global document types configured").append(options.clusterHasGlobalDocumentTypes).append("
Enable 2-phase cluster state activation protocol").append(options.enableTwoPhaseClusterStateActivation).append("
Show local cluster state changes").append(options.showLocalSystemStatesInEventLog()).append("
Maximum event log size").append(options.eventLogMaxSize()).append("
Maximum node event log size").append(options.eventNodeLogMaxSize()).append("
Wanted distribution bits").append(options.distributionBits()).append("
Max deferred task version wait time").append(options.maxDeferredTaskVersionWaitTime().toMillis()).append("ms
Cluster has global document types configured").append(options.clusterHasGlobalDocumentTypes()).append("
Enable 2-phase cluster state activation protocol").append(options.enableTwoPhaseClusterStateActivation()).append("
Cluster auto feed block on resource exhaustion enabled") - .append(options.clusterFeedBlockEnabled).append("
Feed block limits") - .append(options.clusterFeedBlockLimit.entrySet().stream() + .append(options.clusterFeedBlockLimit().entrySet().stream() .map(kv -> String.format("%s: %.2f%%", kv.getKey(), kv.getValue() * 100.0)) .collect(Collectors.joining("
"))).append("