diff options
13 files changed, 301 insertions, 106 deletions
diff --git a/configdefinitions/src/vespa/configserver.def b/configdefinitions/src/vespa/configserver.def index d2cdc30882c..494dae8b086 100644 --- a/configdefinitions/src/vespa/configserver.def +++ b/configdefinitions/src/vespa/configserver.def @@ -4,7 +4,8 @@ namespace=cloud.config # Ports rpcport int default=19070 httpport int default=19071 -numthreads int default=32 +# 0 means use the number of CPU cores available +numRpcThreads int default=0 # ZooKeeper zookeeperserver[].hostname string diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java index b7567769afd..c39f85ec87f 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java @@ -57,7 +57,7 @@ public class FileDistributionImpl implements FileDistribution { log.log(LogLevel.DEBUG, "Executing " + request.methodName() + " against " + target.toString()); target.invokeSync(request, timeout); if (request.isError() && request.errorCode() != ErrorCode.CONNECTION) { - log.log(LogLevel.INFO, request.methodName() + " failed: " + request.errorCode() + " (" + request.errorMessage() + ")"); + log.log(LogLevel.DEBUG, request.methodName() + " failed: " + request.errorCode() + " (" + request.errorMessage() + ")"); } target.close(); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 08095c55373..a5f288bf254 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -116,7 +116,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { this.metrics = metrics.getOrCreateMetricUpdater(Collections.<String, String>emptyMap()); this.hostLivenessTracker = hostLivenessTracker; BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(config.maxgetconfigclients()); - executorService = new ThreadPoolExecutor(config.numthreads(), config.numthreads(), + int numberOfRpcThreads = (config.numRpcThreads() == 0) ? Runtime.getRuntime().availableProcessors() : config.numRpcThreads(); + executorService = new ThreadPoolExecutor(numberOfRpcThreads, numberOfRpcThreads, 0, TimeUnit.SECONDS, workQueue, ThreadFactoryFactory.getThreadFactory(THREADPOOL_NAME)); delayedConfigResponses = new DelayedConfigResponses(this, config.numDelayedResponseThreads()); spec = new Spec(null, config.rpcport()); @@ -461,7 +462,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { request.parameters().add(new StringValue(fileData.filename())); request.parameters().add(new StringValue(fileData.type().name())); request.parameters().add(new Int64Value(fileData.size())); - target.invokeSync(request, 600); + invokeRpcIfValidConnection(request); if (request.isError()) { log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + target.toString() + " with error: '" + request.errorMessage() + "'."); @@ -479,7 +480,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { request.parameters().add(new Int32Value(session)); request.parameters().add(new Int32Value(partId)); request.parameters().add(new DataValue(buf)); - target.invokeSync(request, 600); + invokeRpcIfValidConnection(request); if (request.isError()) { throw new IllegalArgumentException("Failed delivering reference '" + ref.value() + "' to " + target.toString() + " with error: '" + request.errorMessage() + "'."); @@ -496,7 +497,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { request.parameters().add(new Int64Value(fileData.xxhash())); request.parameters().add(new Int32Value(status.getCode())); request.parameters().add(new StringValue(status.getDescription())); - target.invokeSync(request, 600); + invokeRpcIfValidConnection(request); if (request.isError()) { throw new IllegalArgumentException("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + target.toString() + " with error: '" + request.errorMessage() + "'."); @@ -506,6 +507,14 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { } } } + + private void invokeRpcIfValidConnection(Request request) { + if (target.isValid()) { + target.invokeSync(request, 600); + } else { + throw new RuntimeException("Connection to " + target + " is invalid", target.getConnectionLostReason()); + } + } } @SuppressWarnings("UnusedDeclaration") diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java index 12dc584f055..e022b622fb0 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java @@ -82,7 +82,11 @@ public class TestWithRpc { protected void createAndStartRpcServer(boolean hostedVespa) { ConfigserverConfig configserverConfig = new ConfigserverConfig(new ConfigserverConfig.Builder()); - rpcServer = new RpcServer(new ConfigserverConfig(new ConfigserverConfig.Builder().rpcport(port).numthreads(1).maxgetconfigclients(1).hostedVespa(hostedVespa)), + rpcServer = new RpcServer(new ConfigserverConfig(new ConfigserverConfig.Builder() + .rpcport(port) + .numRpcThreads(1) + .maxgetconfigclients(1) + .hostedVespa(hostedVespa)), new SuperModelRequestHandler(new TestConfigDefinitionRepo(), configserverConfig, new SuperModelManager( diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 20ad2e48fe2..7a590f5e77d 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -33,7 +33,7 @@ import java.util.logging.Logger; public class FileReferenceDownloader { private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName()); - private final static Duration rpcTimeout = Duration.ofSeconds(10); + private final static Duration rpcTimeout = Duration.ofMinutes(10); private final ExecutorService downloadExecutor = Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader")); diff --git a/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java b/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java index 26abdd43815..65fa83598b6 100644 --- a/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java +++ b/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java @@ -3,6 +3,9 @@ package com.yahoo.logserver; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import com.yahoo.io.SelectLoopHook; @@ -20,18 +23,14 @@ import com.yahoo.logserver.handlers.LogHandler; public class LogDispatcher implements LogHandler, SelectLoopHook { private static final Logger log = Logger.getLogger(LogDispatcher.class.getName()); - private final List<LogHandler> handlers = new ArrayList<>(); - private int messageCount = 0; - private boolean hasBeenShutDown = false; - private boolean batchedMode = false; + private final List<LogHandler> handlers = new CopyOnWriteArrayList<>(); + private final AtomicInteger messageCount = new AtomicInteger(0); + private final AtomicBoolean batchedMode = new AtomicBoolean(false); private final int batchSize = 5000; - private List<LogMessage> currentBatchList; - private int roundCount = 0; - @SuppressWarnings("unused") - private int lastRoundCount = 0; + private final AtomicBoolean hasBeenShutDown = new AtomicBoolean(false); + private List<LogMessage> currentBatchList = null; - public LogDispatcher() { - } + public LogDispatcher() { } /** * Dispatches a message to all the LogHandler instances we've @@ -41,47 +40,53 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { * @param msg The LogMessage instance we wish to dispatch to the * plugins */ - public synchronized void handle(LogMessage msg) { + public void handle(LogMessage msg) { if (msg == null) { throw new NullPointerException("LogMessage was null"); } - if (batchedMode) { + if (batchedMode.get()) { addToBatch(msg); } else { - for (LogHandler h : handlers) { - h.handle(msg); - } + send(msg); } - messageCount++; + messageCount.incrementAndGet(); } private void addToBatch(LogMessage msg) { - if (currentBatchList == null) { - currentBatchList = new ArrayList<LogMessage>(batchSize); - currentBatchList.add(msg); - return; - } + List<LogMessage> toSend = null; + synchronized (this) { + if (currentBatchList == null) { + currentBatchList = new ArrayList<LogMessage>(batchSize); + currentBatchList.add(msg); + return; + } - currentBatchList.add(msg); + currentBatchList.add(msg); - if (currentBatchList.size() == batchSize) { - flushBatch(); + if (currentBatchList.size() == batchSize) { + toSend = stealBatch(); + } } + flushBatch(toSend); } - private void flushBatch() { - List<LogMessage> todo; - synchronized(this) { - todo = currentBatchList; - currentBatchList = null; + private void send(List<LogMessage> messages) { + for (LogHandler ht : handlers) { + ht.handle(messages); } - if (todo == null) return; + } + private void send(LogMessage message) { for (LogHandler ht : handlers) { - ht.handle(todo); + ht.handle(message); } } + private void flushBatch(List<LogMessage> todo) { + if (todo == null) { return; } + send(todo); + } + public void handle(List<LogMessage> messages) { throw new IllegalStateException("method not supported"); } @@ -94,12 +99,20 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { * but lists of same. */ public void setBatchedMode(boolean batchedMode) { - this.batchedMode = batchedMode; + this.batchedMode.set(batchedMode); } - public synchronized void flush() { - if (batchedMode) { - flushBatch(); + private List<LogMessage> stealBatch() { + List<LogMessage> toSend = null; + synchronized (this) { + toSend = currentBatchList; + currentBatchList = null; + } + return toSend; + } + public void flush() { + if (batchedMode.get()) { + flushBatch(stealBatch()); } for (LogHandler h : handlers) { @@ -110,15 +123,15 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { } } - public synchronized void close() { - if (hasBeenShutDown) { + public void close() { + if (hasBeenShutDown.getAndSet(true)) { throw new IllegalStateException("Shutdown already in progress"); } - hasBeenShutDown = true; for (LogHandler ht : handlers) { if (ht instanceof Thread) { log.fine("Stopping " + ht); + // Todo: Very bad, never do.... ((Thread) ht).interrupt(); } } @@ -134,17 +147,18 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { * <p> * If the thread is not alive it will be start()'ed. */ - public synchronized void registerLogHandler(LogHandler ht) { - if (hasBeenShutDown) { - throw new IllegalStateException("Tried to register LogHandler on" + - " LogDispatcher which was shut down"); + public void registerLogHandler(LogHandler ht) { + if (hasBeenShutDown.get()) { + throw new IllegalStateException("Tried to register LogHandler on LogDispatcher which was shut down"); } - if (handlers.contains(ht)) { - log.warning("LogHandler was already registered: " + ht); - return; + synchronized (this) { + if (handlers.contains(ht)) { + log.warning("LogHandler was already registered: " + ht); + return; + } + handlers.add(ht); } - handlers.add(ht); if ((ht instanceof Thread) && (! ((Thread) ht).isAlive())) { ((Thread) ht).start(); @@ -166,19 +180,16 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { * * @return Returns the number of messages that we have seen. */ - public synchronized int getMessageCount() { - return messageCount; + public int getMessageCount() { + return messageCount.get(); } /** * Hook which is called when the select loop has finished. */ public void selectLoopHook(boolean before) { - if (batchedMode) { - flushBatch(); + if (batchedMode.get()) { + flushBatch(stealBatch()); } - - lastRoundCount = messageCount - roundCount; - roundCount = messageCount; } } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/IdempotentTask.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/IdempotentTask.java index b6b64dbf5dd..a1cc192be2a 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/IdempotentTask.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/IdempotentTask.java @@ -3,8 +3,12 @@ package com.yahoo.vespa.hosted.node.admin.component; /** * This class is thread unsafe: All method calls MUST be exclusive and serialized. + * + * In a specialized environment it is possible to provide a richer context than TaskContext: + * - Define a subclass T of TaskContext with the additional functionality. + * - Define task classes that implement IdempotentTask<T>. */ -public interface IdempotentTask { +public interface IdempotentTask<T extends TaskContext> { String name(); /** @@ -17,5 +21,5 @@ public interface IdempotentTask { * @return false if the system was already converged, i.e. converge() was a no-op. * @throws RuntimeException (or a subclass) if the task is unable to converge. */ - boolean converge(TaskContext context); + boolean converge(T context); } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskComponent.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskComponent.java index c54f9ee00c8..cbe9b32cc47 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskComponent.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskComponent.java @@ -4,7 +4,7 @@ package com.yahoo.vespa.hosted.node.admin.component; import com.yahoo.component.ComponentId; import com.yahoo.component.chain.ChainedComponent; -public abstract class TaskComponent extends ChainedComponent implements IdempotentTask { +public abstract class TaskComponent extends ChainedComponent implements IdempotentTask<TaskContext> { protected TaskComponent(ComponentId id) { super(id); } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskContext.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskContext.java index d0a5570b8dc..0c49e478d6a 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskContext.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskContext.java @@ -50,10 +50,4 @@ public interface TaskContext { * or later if the task failed. Either way, it will only be called once. */ default void logOnFailure(Logger logger, Supplier<String> messageSupplier) {} - - /** - * Execute a task as a child of this task, and with its own sub-TaskContext. Please avoid - * excessive task hierarchies. - */ - boolean executeSubtask(IdempotentTask task); } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TestTaskContext.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TestTaskContext.java index 108d42114f7..bc0ec2c8700 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TestTaskContext.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TestTaskContext.java @@ -4,7 +4,6 @@ package com.yahoo.vespa.hosted.node.admin.component; import java.util.ArrayList; import java.util.List; -import java.util.function.Supplier; import java.util.logging.Logger; public class TestTaskContext implements TaskContext { @@ -18,9 +17,6 @@ public class TestTaskContext implements TaskContext { @Override public void log(Logger logger, String message) { } - @Override - public void logOnFailure(Logger logger, Supplier<String> messageSupplier) { } - public List<String> getSystemModificationLog() { return systemModifications; } @@ -28,9 +24,4 @@ public class TestTaskContext implements TaskContext { public void clearSystemModificationLog() { systemModifications.clear(); } - - @Override - public boolean executeSubtask(IdempotentTask task) { - throw new UnsupportedOperationException(); - } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java index d9d7b4a5d12..2a140945f43 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java @@ -213,23 +213,23 @@ class NodeAllocation { * @return the final list of nodes */ List<Node> finalNodes(List<Node> surplusNodes) { - long currentRetired = nodes.stream().filter(node -> node.node.allocation().get().membership().retired()).count(); - long surplus = requestedNodes.surplusGiven(nodes.size()) - currentRetired; + int currentRetiredCount = (int) nodes.stream().filter(node -> node.node.allocation().get().membership().retired()).count(); + int deltaRetiredCount = requestedNodes.idealRetiredCount(nodes.size(), currentRetiredCount) - currentRetiredCount; - if (surplus > 0) { // retire until surplus is 0, prefer to retire higher indexes to minimize redistribution + if (deltaRetiredCount > 0) { // retire until deltaRetiredCount is 0, prefer to retire higher indexes to minimize redistribution for (PrioritizableNode node : byDecreasingIndex(nodes)) { if ( ! node.node.allocation().get().membership().retired() && node.node.state().equals(Node.State.active)) { node.node = node.node.retire(Agent.application, clock.instant()); surplusNodes.add(node.node); // offer this node to other groups - if (--surplus == 0) break; + if (--deltaRetiredCount == 0) break; } } } - else if (surplus < 0) { // unretire until surplus is 0 + else if (deltaRetiredCount < 0) { // unretire until deltaRetiredCount is 0 for (PrioritizableNode node : byIncreasingIndex(nodes)) { if ( node.node.allocation().get().membership().retired() && hasCompatibleFlavor(node.node)) { node.node = node.node.unretire(); - if (++surplus == 0) break; + if (++deltaRetiredCount == 0) break; } } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeSpec.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeSpec.java index 23a6e3a8b9a..dc3f4a64421 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeSpec.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeSpec.java @@ -34,8 +34,8 @@ public interface NodeSpec { /** Returns whether the given node count is sufficient to fulfill this spec */ boolean fulfilledBy(int count); - /** Returns the amount the given count is above the minimum amount needed to fulfill this request */ - int surplusGiven(int count); + /** Returns the ideal number of nodes that should be retired to fulfill this spec */ + int idealRetiredCount(int acceptedCount, int currentRetiredCount); /** Returns a specification of a fraction of all the nodes of this. It is assumed the argument is a valid divisor. */ NodeSpec fraction(int divisor); @@ -97,7 +97,7 @@ public interface NodeSpec { public boolean saturatedBy(int count) { return fulfilledBy(count); } // min=max for count specs @Override - public int surplusGiven(int count) { return count - this.count; } + public int idealRetiredCount(int acceptedCount, int currentRetiredCount) { return acceptedCount - this.count; } @Override public NodeSpec fraction(int divisor) { return new CountNodeSpec(count/divisor, requestedFlavor); } @@ -152,7 +152,14 @@ public interface NodeSpec { public boolean saturatedBy(int count) { return false; } @Override - public int surplusGiven(int count) { return 0; } + public int idealRetiredCount(int acceptedCount, int currentRetiredCount) { + /* + * All nodes marked with wantToRetire get marked as retired just before this function is called, + * the job of this function is to throttle the retired count. If no nodes are marked as retired + * then continue this way, otherwise allow only 1 node to be retired + */ + return Math.min(1, currentRetiredCount); + } @Override public NodeSpec fraction(int divisor) { return this; } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java index 873193ac3b8..97fde2274f2 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java @@ -11,15 +11,22 @@ import com.yahoo.config.provision.NodeType; import com.yahoo.config.provision.RegionName; import com.yahoo.config.provision.Zone; import com.yahoo.vespa.hosted.provision.Node; +import com.yahoo.vespa.hosted.provision.maintenance.JobControl; +import com.yahoo.vespa.hosted.provision.maintenance.RetiredExpirer; import com.yahoo.vespa.hosted.provision.node.Agent; +import com.yahoo.vespa.hosted.provision.testutils.MockDeployer; +import org.junit.Before; import org.junit.Test; +import java.time.Duration; +import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Optional; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; /** * Tests provisioning by node type instead of by count and flavor @@ -28,28 +35,31 @@ import static org.junit.Assert.assertFalse; */ public class NodeTypeProvisioningTest { - @Test - public void proxy_deployment() { - ProvisioningTester tester = new ProvisioningTester(new Zone(Environment.prod, RegionName.from("us-east"))); + private final ProvisioningTester tester = new ProvisioningTester(new Zone(Environment.prod, RegionName.from("us-east"))); + + private final ApplicationId application = tester.makeApplicationId(); // application using proxy nodes + private final Capacity capacity = Capacity.fromRequiredNodeType(NodeType.proxy); + private final ClusterSpec clusterSpec = ClusterSpec.request( + ClusterSpec.Type.container, ClusterSpec.Id.from("test"), Version.fromString("6.42")); + @Before + public void setup() { tester.makeReadyNodes( 1, "small", NodeType.proxy); tester.makeReadyNodes( 3, "small", NodeType.host); tester.makeReadyNodes( 5, "small", NodeType.tenant); tester.makeReadyNodes(10, "large", NodeType.proxy); tester.makeReadyNodes(20, "large", NodeType.host); tester.makeReadyNodes(40, "large", NodeType.tenant); + } - ApplicationId application = tester.makeApplicationId(); // application using proxy nodes - - + @Test + public void proxy_deployment() { { // Deploy List<HostSpec> hosts = deployProxies(application, tester); assertEquals("Reserved all proxies", 11, hosts.size()); tester.activate(application, new HashSet<>(hosts)); List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active); assertEquals("Activated all proxies", 11, nodes.size()); - for (Node node : nodes) - assertEquals(NodeType.proxy, node.type()); } { // Redeploy with no changes @@ -83,14 +93,178 @@ public class NodeTypeProvisioningTest { } } + @Test + public void retire_proxy() { + MockDeployer deployer = new MockDeployer( + tester.provisioner(), + Collections.singletonMap( + application, new MockDeployer.ApplicationContext(application, clusterSpec, capacity, 1))); + RetiredExpirer retiredExpirer = new RetiredExpirer(tester.nodeRepository(), tester.orchestrator(), deployer, + tester.clock(), Duration.ofDays(30), Duration.ofMinutes(10), new JobControl(tester.nodeRepository().database())); + + { // Deploy + List<HostSpec> hosts = deployProxies(application, tester); + assertEquals("Reserved all proxies", 11, hosts.size()); + tester.activate(application, new HashSet<>(hosts)); + List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active); + assertEquals("Activated all proxies", 11, nodes.size()); + } + + Node nodeToRetire = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active).get(5); + { // Pick out a node and retire it + tester.nodeRepository().write(nodeToRetire.with(nodeToRetire.status().withWantToRetire(true))); + + List<HostSpec> hosts = deployProxies(application, tester); + assertEquals(11, hosts.size()); + tester.activate(application, new HashSet<>(hosts)); + List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active); + assertEquals(11, nodes.size()); + + // Verify that wantToRetire has been propagated + assertTrue(tester.nodeRepository().getNode(nodeToRetire.hostname()) + .flatMap(Node::allocation) + .map(allocation -> allocation.membership().retired()) + .orElseThrow(RuntimeException::new)); + } + + { // Redeploying while the node is still retiring has no effect + List<HostSpec> hosts = deployProxies(application, tester); + assertEquals(11, hosts.size()); + tester.activate(application, new HashSet<>(hosts)); + List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active); + assertEquals(11, nodes.size()); + + // Verify that the node is still marked as retired + assertTrue(tester.nodeRepository().getNode(nodeToRetire.hostname()) + .flatMap(Node::allocation) + .map(allocation -> allocation.membership().retired()) + .orElseThrow(RuntimeException::new)); + } + + { + tester.advanceTime(Duration.ofMinutes(11)); + retiredExpirer.run(); + + List<HostSpec> hosts = deployProxies(application, tester); + assertEquals(10, hosts.size()); + tester.activate(application, new HashSet<>(hosts)); + List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active); + assertEquals(10, nodes.size()); + + // Verify that the node is now inactive + assertEquals(Node.State.inactive, tester.nodeRepository().getNode(nodeToRetire.hostname()) + .orElseThrow(RuntimeException::new).state()); + } + } + + @Test + public void retire_multiple_proxy_simultaneously() { + MockDeployer deployer = new MockDeployer( + tester.provisioner(), + Collections.singletonMap( + application, new MockDeployer.ApplicationContext(application, clusterSpec, capacity, 1))); + RetiredExpirer retiredExpirer = new RetiredExpirer(tester.nodeRepository(), tester.orchestrator(), deployer, + tester.clock(), Duration.ofDays(30), Duration.ofMinutes(10), new JobControl(tester.nodeRepository().database())); + final int numNodesToRetire = 5; + + { // Deploy + List<HostSpec> hosts = deployProxies(application, tester); + assertEquals("Reserved all proxies", 11, hosts.size()); + tester.activate(application, new HashSet<>(hosts)); + List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active); + assertEquals("Activated all proxies", 11, nodes.size()); + } + + List<Node> nodesToRetire = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active) + .subList(3, 3 + numNodesToRetire); + String currentyRetiringHostname; + { + nodesToRetire.forEach(nodeToRetire -> + tester.nodeRepository().write(nodeToRetire.with(nodeToRetire.status().withWantToRetire(true)))); + + List<HostSpec> hosts = deployProxies(application, tester); + assertEquals(11, hosts.size()); + tester.activate(application, new HashSet<>(hosts)); + List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active); + assertEquals(11, nodes.size()); + + // Verify that wantToRetire has been propagated + List<Node> nodesCurrentlyRetiring = nodes.stream() + .filter(node -> node.allocation().get().membership().retired()) + .collect(Collectors.toList()); + assertEquals(1, nodesCurrentlyRetiring.size()); + + // The retiring node should be one of the nodes we marked for retirement + currentyRetiringHostname = nodesCurrentlyRetiring.get(0).hostname(); + assertTrue(nodesToRetire.stream().map(Node::hostname).filter(hostname -> hostname.equals(currentyRetiringHostname)).count() == 1); + } + + { // Redeploying while the node is still retiring has no effect + List<HostSpec> hosts = deployProxies(application, tester); + assertEquals(11, hosts.size()); + tester.activate(application, new HashSet<>(hosts)); + List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active); + assertEquals(11, nodes.size()); + + // Verify that wantToRetire has been propagated + List<Node> nodesCurrentlyRetiring = nodes.stream() + .filter(node -> node.allocation().get().membership().retired()) + .collect(Collectors.toList()); + assertEquals(1, nodesCurrentlyRetiring.size()); + + // The node that started retiring is still the only one retiring + assertEquals(currentyRetiringHostname, nodesCurrentlyRetiring.get(0).hostname()); + } + + { + tester.advanceTime(Duration.ofMinutes(11)); + retiredExpirer.run(); + + List<HostSpec> hosts = deployProxies(application, tester); + assertEquals(10, hosts.size()); + tester.activate(application, new HashSet<>(hosts)); + List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active); + assertEquals(10, nodes.size()); + + // Verify the node we previously set to retire has finished retiring + assertEquals(Node.State.inactive, tester.nodeRepository().getNode(currentyRetiringHostname) + .orElseThrow(RuntimeException::new).state()); + + // Verify that a node is currently retiring + List<Node> nodesCurrentlyRetiring = nodes.stream() + .filter(node -> node.allocation().get().membership().retired()) + .collect(Collectors.toList()); + assertEquals(1, nodesCurrentlyRetiring.size()); + + // This node is different from the one that was retiring previously + String newRetiringHostname = nodesCurrentlyRetiring.get(0).hostname(); + assertNotEquals(currentyRetiringHostname, newRetiringHostname); + // ... but is one of the nodes that were put to wantToRetire earlier + assertTrue(nodesToRetire.stream().map(Node::hostname).filter(hostname -> hostname.equals(newRetiringHostname)).count() == 1); + } + + + for (int i = 0; i < 10; i++){ + tester.advanceTime(Duration.ofMinutes(11)); + retiredExpirer.run(); + List<HostSpec> hosts = deployProxies(application, tester); + tester.activate(application, new HashSet<>(hosts)); + } + + // After a long time, all currently active proxy nodes are not marked with wantToRetire or as retired + long numRetiredActiveProxyNodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active).stream() + .filter(node -> !node.status().wantToRetire()) + .filter(node -> !node.allocation().get().membership().retired()) + .count(); + assertEquals(11 - numNodesToRetire, numRetiredActiveProxyNodes); + + // All the nodes that were marked with wantToRetire earlier are now inactive + assertEquals(nodesToRetire.stream().map(Node::hostname).collect(Collectors.toSet()), + tester.nodeRepository().getNodes(Node.State.inactive).stream().map(Node::hostname).collect(Collectors.toSet())); + } + private List<HostSpec> deployProxies(ApplicationId application, ProvisioningTester tester) { - return tester.prepare(application, - ClusterSpec.request(ClusterSpec.Type.container, - ClusterSpec.Id.from("test"), - Version.fromString("6.42")), - Capacity.fromRequiredNodeType(NodeType.proxy), - 1); - + return tester.prepare(application, clusterSpec, capacity, 1); } } |