// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;

import com.yahoo.jrt.Request;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(FleetControllerTest.CleanupZookeeperLogsOnSuccess.class)
@Timeout(120)
public class MasterElectionTest extends FleetControllerTest {

    private static final Logger log = Logger.getLogger(MasterElectionTest.class.getName());

    private Supervisor supervisor;
    private final List<FleetController> fleetControllers = new ArrayList<>();

    private static int defaultZkSessionTimeoutInMillis() { return 30_000; }

    protected void setUpFleetController(int count, boolean useFakeTimer, FleetControllerOptions options) throws Exception {
        if (zooKeeperServer == null) {
            zooKeeperServer = new ZooKeeperTestServer();
        }
        slobrok = new Slobrok();
        this.options = options;
        this.options.zooKeeperSessionTimeout = defaultZkSessionTimeoutInMillis();
        this.options.zooKeeperServerAddress = zooKeeperServer.getAddress();
        this.options.slobrokConnectionSpecs = getSlobrokConnectionSpecs(slobrok);
        this.options.fleetControllerCount = count;
        for (int i = 0; i < count; ++i) {
            FleetControllerOptions nodeOptions = adjustConfig(options, i, count);
            fleetControllers.add(createFleetController(useFakeTimer, nodeOptions));
        }
    }
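
    // The "getMaster" RPC polled by the helper below is assumed to return two values,
    // matching how the tests in this class interpret req.returnValues():
    //   [0] int32:  index of the node currently considered master, or -1 if there is none
    //   [1] string: human-readable reason describing the current master state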

    private void waitForMasterReason(String reason, Integer master, List<Target> connections, int[] nodes) {
        Instant endTime = Instant.now().plus(timeout());
        while (Instant.now().isBefore(endTime)) {
            boolean allOk = true;
            for (int node : nodes) {
                Request req = new Request("getMaster");
                connections.get(node).invokeSync(req, timeoutInSeconds());
                if (req.isError()) { allOk = false; break; }
                if (master != null && master != req.returnValues().get(0).asInt32()) { allOk = false; break; }
                if (reason != null && !reason.equals(req.returnValues().get(1).asString())) { allOk = false; break; }
            }
            if (allOk) return;
            try { Thread.sleep(100); } catch (InterruptedException e) { /* ignore */ }
        }
        throw new IllegalStateException("Did not get master reason '" + reason + "' within timeout of " + timeout());
    }

    @Test
    void testGetMaster() throws Exception {
        startingTest("MasterElectionTest::testGetMaster");
        FleetControllerOptions options = defaultOptions("mycluster");
        options.masterZooKeeperCooldownPeriod = 3600 * 1000; // An hour
        setUpFleetController(3, true, options);
        waitForMaster(0);

        supervisor = new Supervisor(new Transport());
        List<Target> connections = new ArrayList<>();
        for (FleetController fleetController : fleetControllers) {
            int rpcPort = fleetController.getRpcPort();
            Target connection = supervisor.connect(new Spec("localhost", rpcPort));
            assertTrue(connection.isValid());
            connections.add(connection);
        }

        timer.advanceTime(24 * 3600 * 1000); // A day
        waitForCompleteCycles();

        Request req = new Request("getMaster");
        for (int nodeIndex = 0; nodeIndex < 3; ++nodeIndex) {
            for (int retry = 0; retry < timeoutInSeconds() * 10; ++retry) {
                req = new Request("getMaster");
                connections.get(nodeIndex).invokeSync(req, timeoutInSeconds());
                assertFalse(req.isError(), req.errorMessage());
                if (req.returnValues().get(0).asInt32() == 0 &&
                    req.returnValues().get(1).asString().equals("All 3 nodes agree that 0 is current master.")) {
                    break;
                }
            }
            assertEquals(0, req.returnValues().get(0).asInt32(), req.toString());
            assertEquals("All 3 nodes agree that 0 is current master.", req.returnValues().get(1).asString(), req.toString());
        }

        log.log(Level.INFO, "SHUTTING DOWN FLEET CONTROLLER 0");
        fleetControllers.get(0).shutdown();

        // Wait until fleet controllers 1 and 2 vote for node 1.
        waitForCompleteCycle(1);
        waitForCompleteCycle(2);

        // 5 minutes is not a long enough wait before letting this node become master.
        timer.advanceTime(300 * 1000); // 5 minutes

        int[] remainingNodes = {1, 2};
        waitForMasterReason(
                "2 of 3 nodes agree 1 should be master, but old master cooldown period of 3600000 ms has not passed yet. To ensure it has got time to realize it is no longer master before we elect a new one, currently there is no master.",
                -1, connections, remainingNodes);
        // Verify that fleet controller 1 is not master, and that the reason why not is correct.
        assertFalse(fleetControllers.get(1).isMaster());

        // But after an hour it should become one.
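        // (The hour matches the masterZooKeeperCooldownPeriod of 3600 * 1000 ms configured at the
        //  top of this test: a new master may only take over once the old master has had the full
        //  cooldown period to realize that it is no longer master.)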
        timer.advanceTime(3600 * 1000); // 60 minutes
        waitForMaster(1);

        // The connection to the downed fleet controller 0 should now fail.
        req = new Request("getMaster");
        connections.get(0).invokeSync(req, timeoutInSeconds());
        assertEquals(104, req.errorCode(), req.toString());
        assertEquals("Connection error", req.errorMessage(), req.toString());

        for (int i = 0; i < timeoutInSeconds() * 10; ++i) {
            req = new Request("getMaster");
            connections.get(1).invokeSync(req, timeoutInSeconds());
            assertFalse(req.isError(), req.errorMessage());
            if (req.returnValues().get(0).asInt32() != -1) break;
            // Bad timing may mean the node has not yet realized that it is master.
        }
        assertEquals(1, req.returnValues().get(0).asInt32(), req.toString());
        assertEquals("2 of 3 nodes agree 1 is master.", req.returnValues().get(1).asString(), req.toString());

        for (int i = 0; i < timeoutInSeconds() * 10; ++i) {
            req = new Request("getMaster");
            connections.get(2).invokeSync(req, timeoutInSeconds());
            assertFalse(req.isError(), req.errorMessage());
            if (req.returnValues().get(0).asInt32() != -1) break;
        }
        assertEquals(1, req.returnValues().get(0).asInt32(), req.toString());
        assertEquals("2 of 3 nodes agree 1 is master.", req.returnValues().get(1).asString(), req.toString());
    }

    @Test
    void testReconfigure() throws Exception {
        startingTest("MasterElectionTest::testReconfigure");
        FleetControllerOptions options = defaultOptions("mycluster");
        options.masterZooKeeperCooldownPeriod = 1;
        setUpFleetController(3, false, options);
        waitForMaster(0);

        FleetControllerOptions newOptions = options.clone();
        for (int i = 0; i < fleetControllers.size(); ++i) {
            FleetControllerOptions nodeOptions = adjustConfig(newOptions, i, fleetControllers.size());
            fleetControllers.get(i).updateOptions(nodeOptions);
        }
        waitForMaster(0);
        log.log(Level.INFO, "SHUTTING DOWN FLEET CONTROLLER 0");
        fleetControllers.get(0).shutdown();
        waitForMaster(1);
    }

    /**
     * A new cluster state version should always be written to ZooKeeper, even if the version will
     * not be published to any nodes. External services may still observe the version number via
     * the cluster REST API, so we must never risk rolling the version number back as part of a
     * re-election.
     */
    @Test
    void cluster_state_version_written_to_zookeeper_even_with_empty_send_set() throws Exception {
        startingTest("MasterElectionTest::cluster_state_version_written_to_zookeeper_even_with_empty_send_set");
        FleetControllerOptions options = defaultOptions("mycluster");
        options.masterZooKeeperCooldownPeriod = 1;
        options.minRatioOfDistributorNodesUp = 0;
        options.minRatioOfStorageNodesUp = 0;
        options.minDistributorNodesUp = 0;
        options.minStorageNodesUp = 1;
        setUpFleetController(3, false, options);
        setUpVdsNodes(false, new DummyVdsNodeOptions());
        fleetController = fleetControllers.get(0); // Required to prevent waitForStableSystem from NPE'ing
        waitForStableSystem();
        waitForMaster(0);

        // Explanation for this convoluted sequence of actions: we want to trigger a scenario where
        // we have a cluster state version bump _without_ there being any nodes to send the new state
        // to. If there is an "optimization" that skips writing the version to ZooKeeper when the
        // version send set is empty, a newly elected, different master will end up reusing the very
        // same version number.
        //
        // We mark all nodes' Reported states as Down (which gives an empty send set, as no nodes are
        // online), then set the Wanted state of one storage node to Maintenance. This forces a
        // cluster state change.
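        // Disconnect the nodes one at a time, letting controller 0 run a full cycle after each
        // disconnect so that every Reported state change is observed before the next one happens.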
        this.nodes.forEach(n -> {
            n.disconnectImmediately();
            waitForCompleteCycle(0);
        });
        setWantedState(this.nodes.get(2 * 10 - 1), State.MAINTENANCE, "bar");
        waitForCompleteCycle(0);

        // This receives the version number of the highest _working_ cluster state, with no
        // guarantee that it has been published to any nodes yet.
        final long preElectionVersionNumber = fleetControllers.get(0).getSystemState().getVersion();

        // Nuke controller 0, leaving controller 1 in charge. Controller 1 should have observed the
        // most recently written version number, and should increase it before publishing its own
        // new state.
        fleetControllers.get(0).shutdown();
        waitForMaster(1);
        waitForCompleteCycle(1);

        final long postElectionVersionNumber = fleetControllers.get(1).getSystemState().getVersion();
        assertTrue(postElectionVersionNumber > preElectionVersionNumber);
    }

    @Test
    void previously_published_state_is_taken_into_account_for_default_space_when_controller_bootstraps() throws Exception {
        startingTest("MasterElectionTest::previously_published_state_is_taken_into_account_for_default_space_when_controller_bootstraps");
        FleetControllerOptions options = defaultOptions("mycluster");
        options.clusterHasGlobalDocumentTypes = true;
        options.masterZooKeeperCooldownPeriod = 1;
        options.minTimeBeforeFirstSystemStateBroadcast = 100000;
        boolean useFakeTimer = false;
        setUpFleetController(3, useFakeTimer, options);
        setUpVdsNodes(false, new DummyVdsNodeOptions());
        fleetController = fleetControllers.get(0); // Required to prevent waitForStableSystem from NPE'ing
        waitForMaster(0);
        waitForStableSystem();

        log.info("Waiting for full maintenance mode in default space");
        waitForStateInSpace("default", "version:\\d+ distributor:10 storage:10 .0.s:m .1.s:m .2.s:m .3.s:m .4.s:m .5.s:m .6.s:m .7.s:m .8.s:m .9.s:m");

        log.info("Responding with zero global merges pending from all distributors");
        final int ackVersion = fleetControllers.get(0).getClusterStateBundle().getVersion();
        // ACKing with no merge state in host info (implied: no pending merges) should not cause a
        // new state to be published before the last node has ACKed. Consequently there should be no
        // potential for a race where a new version is published concurrently with our attempts at
        // ACKing a previous one.
        this.nodes.stream().filter(DummyVdsNode::isDistributor).forEach(node -> {
            node.setNodeState(new NodeState(NodeType.DISTRIBUTOR, State.UP),
                              String.format("{\"cluster-state-version\":%d}", ackVersion));
        });
        waitForStateInAllSpaces("version:\\d+ distributor:10 storage:10");

        log.info("Bundle before restart cycle: " + fleetControllers.get(0).getClusterStateBundle());
        log.info("Doing restart cycle of controller 0");
        fleetControllers.get(0).shutdown();
        waitForMaster(1);
        waitForCompleteCycle(1);

        fleetControllers.set(0, createFleetController(useFakeTimer, fleetControllers.get(0).getOptions()));
        waitForMaster(0);
        waitForCompleteCycle(0);

        // We should NOT publish a state where all storage nodes are in Maintenance, since they
        // were marked as Up in the last published cluster state.
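        // Presumably the restarted controller 0 recovers the previously published bundle from
        // ZooKeeper when it bootstraps; without that, clusterHasGlobalDocumentTypes = true combined
        // with the minTimeBeforeFirstSystemStateBroadcast delay would put the default space back
        // into the all-Maintenance state we waited for earlier in this test.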
log.info("Bundle after restart cycle: " + fleetControllers.get(0).getClusterStateBundle()); waitForStateInAllSpaces("version:\\d+ distributor:10 storage:10"); } @Test void default_space_nodes_not_marked_as_maintenance_when_cluster_has_no_global_document_types() throws Exception { startingTest("MasterElectionTest::default_space_nodes_not_marked_as_maintenance_when_cluster_has_no_global_document_types"); FleetControllerOptions options = defaultOptions("mycluster"); options.clusterHasGlobalDocumentTypes = false; options.masterZooKeeperCooldownPeriod = 1; options.minTimeBeforeFirstSystemStateBroadcast = 100000; setUpFleetController(3, false, options); setUpVdsNodes(false, new DummyVdsNodeOptions()); fleetController = fleetControllers.get(0); // Required to prevent waitForStableSystem from NPE'ing waitForMaster(0); waitForStableSystem(); waitForStateInAllSpaces("version:\\d+ distributor:10 storage:10"); } }