-rw-r--r--  configdefinitions/src/vespa/configserver.def                                                             3
-rw-r--r--  configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java      2
-rw-r--r--  configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java                             17
-rw-r--r--  configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java                            6
-rw-r--r--  filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java             2
-rw-r--r--  logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java                                         117
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/IdempotentTask.java                 8
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskComponent.java                  2
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskContext.java                    6
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TestTaskContext.java                9
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java         12
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeSpec.java               15
-rw-r--r--  node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java  208
13 files changed, 301 insertions, 106 deletions
diff --git a/configdefinitions/src/vespa/configserver.def b/configdefinitions/src/vespa/configserver.def
index d2cdc30882c..494dae8b086 100644
--- a/configdefinitions/src/vespa/configserver.def
+++ b/configdefinitions/src/vespa/configserver.def
@@ -4,7 +4,8 @@ namespace=cloud.config
# Ports
rpcport int default=19070
httpport int default=19071
-numthreads int default=32
+# 0 means use the number of CPU cores available
+numRpcThreads int default=0
# ZooKeeper
zookeeperserver[].hostname string
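The new numRpcThreads default of 0 is resolved at server construction time into the
number of available CPU cores. A minimal sketch of that resolution, mirroring the
RpcServer change below (the helper name here is invented for illustration):

    // Sketch only: a configured value of 0 means "use all available CPU cores",
    // matching the ternary expression RpcServer uses below.
    static int resolveThreadCount(int configuredThreads) {
        return (configuredThreads == 0)
                ? Runtime.getRuntime().availableProcessors()
                : configuredThreads;
    }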
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java
index b7567769afd..c39f85ec87f 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java
@@ -57,7 +57,7 @@ public class FileDistributionImpl implements FileDistribution {
log.log(LogLevel.DEBUG, "Executing " + request.methodName() + " against " + target.toString());
target.invokeSync(request, timeout);
if (request.isError() && request.errorCode() != ErrorCode.CONNECTION) {
- log.log(LogLevel.INFO, request.methodName() + " failed: " + request.errorCode() + " (" + request.errorMessage() + ")");
+ log.log(LogLevel.DEBUG, request.methodName() + " failed: " + request.errorCode() + " (" + request.errorMessage() + ")");
}
target.close();
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index 08095c55373..a5f288bf254 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -116,7 +116,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
this.metrics = metrics.getOrCreateMetricUpdater(Collections.<String, String>emptyMap());
this.hostLivenessTracker = hostLivenessTracker;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(config.maxgetconfigclients());
- executorService = new ThreadPoolExecutor(config.numthreads(), config.numthreads(),
+ int numberOfRpcThreads = (config.numRpcThreads() == 0) ? Runtime.getRuntime().availableProcessors() : config.numRpcThreads();
+ executorService = new ThreadPoolExecutor(numberOfRpcThreads, numberOfRpcThreads,
0, TimeUnit.SECONDS, workQueue, ThreadFactoryFactory.getThreadFactory(THREADPOOL_NAME));
delayedConfigResponses = new DelayedConfigResponses(this, config.numDelayedResponseThreads());
spec = new Spec(null, config.rpcport());
@@ -461,7 +462,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
request.parameters().add(new StringValue(fileData.filename()));
request.parameters().add(new StringValue(fileData.type().name()));
request.parameters().add(new Int64Value(fileData.size()));
- target.invokeSync(request, 600);
+ invokeRpcIfValidConnection(request);
if (request.isError()) {
log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " +
target.toString() + " with error: '" + request.errorMessage() + "'.");
@@ -479,7 +480,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
request.parameters().add(new Int32Value(session));
request.parameters().add(new Int32Value(partId));
request.parameters().add(new DataValue(buf));
- target.invokeSync(request, 600);
+ invokeRpcIfValidConnection(request);
if (request.isError()) {
throw new IllegalArgumentException("Failed delivering reference '" + ref.value() + "' to " +
target.toString() + " with error: '" + request.errorMessage() + "'.");
@@ -496,7 +497,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
request.parameters().add(new Int64Value(fileData.xxhash()));
request.parameters().add(new Int32Value(status.getCode()));
request.parameters().add(new StringValue(status.getDescription()));
- target.invokeSync(request, 600);
+ invokeRpcIfValidConnection(request);
if (request.isError()) {
throw new IllegalArgumentException("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " +
target.toString() + " with error: '" + request.errorMessage() + "'.");
@@ -506,6 +507,14 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
}
}
}
+
+ private void invokeRpcIfValidConnection(Request request) {
+ if (target.isValid()) {
+ target.invokeSync(request, 600);
+ } else {
+ throw new RuntimeException("Connection to " + target + " is invalid", target.getConnectionLostReason());
+ }
+ }
}
@SuppressWarnings("UnusedDeclaration")
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java
index 12dc584f055..e022b622fb0 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java
@@ -82,7 +82,11 @@ public class TestWithRpc {
protected void createAndStartRpcServer(boolean hostedVespa) {
ConfigserverConfig configserverConfig = new ConfigserverConfig(new ConfigserverConfig.Builder());
- rpcServer = new RpcServer(new ConfigserverConfig(new ConfigserverConfig.Builder().rpcport(port).numthreads(1).maxgetconfigclients(1).hostedVespa(hostedVespa)),
+ rpcServer = new RpcServer(new ConfigserverConfig(new ConfigserverConfig.Builder()
+ .rpcport(port)
+ .numRpcThreads(1)
+ .maxgetconfigclients(1)
+ .hostedVespa(hostedVespa)),
new SuperModelRequestHandler(new TestConfigDefinitionRepo(),
configserverConfig,
new SuperModelManager(
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 20ad2e48fe2..7a590f5e77d 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -33,7 +33,7 @@ import java.util.logging.Logger;
public class FileReferenceDownloader {
private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
- private final static Duration rpcTimeout = Duration.ofSeconds(10);
+ private final static Duration rpcTimeout = Duration.ofMinutes(10);
private final ExecutorService downloadExecutor =
Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
diff --git a/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java b/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java
index 26abdd43815..65fa83598b6 100644
--- a/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java
+++ b/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java
@@ -3,6 +3,9 @@ package com.yahoo.logserver;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import com.yahoo.io.SelectLoopHook;
@@ -20,18 +23,14 @@ import com.yahoo.logserver.handlers.LogHandler;
public class LogDispatcher implements LogHandler, SelectLoopHook {
private static final Logger log = Logger.getLogger(LogDispatcher.class.getName());
- private final List<LogHandler> handlers = new ArrayList<>();
- private int messageCount = 0;
- private boolean hasBeenShutDown = false;
- private boolean batchedMode = false;
+ private final List<LogHandler> handlers = new CopyOnWriteArrayList<>();
+ private final AtomicInteger messageCount = new AtomicInteger(0);
+ private final AtomicBoolean batchedMode = new AtomicBoolean(false);
private final int batchSize = 5000;
- private List<LogMessage> currentBatchList;
- private int roundCount = 0;
- @SuppressWarnings("unused")
- private int lastRoundCount = 0;
+ private final AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
+ private List<LogMessage> currentBatchList = null;
- public LogDispatcher() {
- }
+ public LogDispatcher() { }
/**
* Dispatches a message to all the LogHandler instances we've
@@ -41,47 +40,53 @@ public class LogDispatcher implements LogHandler, SelectLoopHook {
* @param msg The LogMessage instance we wish to dispatch to the
* plugins
*/
- public synchronized void handle(LogMessage msg) {
+ public void handle(LogMessage msg) {
if (msg == null) {
throw new NullPointerException("LogMessage was null");
}
- if (batchedMode) {
+ if (batchedMode.get()) {
addToBatch(msg);
} else {
- for (LogHandler h : handlers) {
- h.handle(msg);
- }
+ send(msg);
}
- messageCount++;
+ messageCount.incrementAndGet();
}
private void addToBatch(LogMessage msg) {
- if (currentBatchList == null) {
- currentBatchList = new ArrayList<LogMessage>(batchSize);
- currentBatchList.add(msg);
- return;
- }
+ List<LogMessage> toSend = null;
+ synchronized (this) {
+ if (currentBatchList == null) {
+ currentBatchList = new ArrayList<LogMessage>(batchSize);
+ currentBatchList.add(msg);
+ return;
+ }
- currentBatchList.add(msg);
+ currentBatchList.add(msg);
- if (currentBatchList.size() == batchSize) {
- flushBatch();
+ if (currentBatchList.size() == batchSize) {
+ toSend = stealBatch();
+ }
}
+ flushBatch(toSend);
}
- private void flushBatch() {
- List<LogMessage> todo;
- synchronized(this) {
- todo = currentBatchList;
- currentBatchList = null;
+ private void send(List<LogMessage> messages) {
+ for (LogHandler ht : handlers) {
+ ht.handle(messages);
}
- if (todo == null) return;
+ }
+ private void send(LogMessage message) {
for (LogHandler ht : handlers) {
- ht.handle(todo);
+ ht.handle(message);
}
}
+ private void flushBatch(List<LogMessage> todo) {
+ if (todo == null) { return; }
+ send(todo);
+ }
+
public void handle(List<LogMessage> messages) {
throw new IllegalStateException("method not supported");
}
@@ -94,12 +99,20 @@ public class LogDispatcher implements LogHandler, SelectLoopHook {
* but lists of same.
*/
public void setBatchedMode(boolean batchedMode) {
- this.batchedMode = batchedMode;
+ this.batchedMode.set(batchedMode);
}
- public synchronized void flush() {
- if (batchedMode) {
- flushBatch();
+ private List<LogMessage> stealBatch() {
+ List<LogMessage> toSend = null;
+ synchronized (this) {
+ toSend = currentBatchList;
+ currentBatchList = null;
+ }
+ return toSend;
+ }
+ public void flush() {
+ if (batchedMode.get()) {
+ flushBatch(stealBatch());
}
for (LogHandler h : handlers) {
@@ -110,15 +123,15 @@ public class LogDispatcher implements LogHandler, SelectLoopHook {
}
}
- public synchronized void close() {
- if (hasBeenShutDown) {
+ public void close() {
+ if (hasBeenShutDown.getAndSet(true)) {
throw new IllegalStateException("Shutdown already in progress");
}
- hasBeenShutDown = true;
for (LogHandler ht : handlers) {
if (ht instanceof Thread) {
log.fine("Stopping " + ht);
+ // TODO: Interrupting handler threads like this is unsafe; replace with an orderly shutdown signal.
((Thread) ht).interrupt();
}
}
@@ -134,17 +147,18 @@ public class LogDispatcher implements LogHandler, SelectLoopHook {
* <p>
* If the thread is not alive it will be start()'ed.
*/
- public synchronized void registerLogHandler(LogHandler ht) {
- if (hasBeenShutDown) {
- throw new IllegalStateException("Tried to register LogHandler on" +
- " LogDispatcher which was shut down");
+ public void registerLogHandler(LogHandler ht) {
+ if (hasBeenShutDown.get()) {
+ throw new IllegalStateException("Tried to register LogHandler on LogDispatcher which was shut down");
}
- if (handlers.contains(ht)) {
- log.warning("LogHandler was already registered: " + ht);
- return;
+ synchronized (this) {
+ if (handlers.contains(ht)) {
+ log.warning("LogHandler was already registered: " + ht);
+ return;
+ }
+ handlers.add(ht);
}
- handlers.add(ht);
if ((ht instanceof Thread) && (! ((Thread) ht).isAlive())) {
((Thread) ht).start();
@@ -166,19 +180,16 @@ public class LogDispatcher implements LogHandler, SelectLoopHook {
*
* @return Returns the number of messages that we have seen.
*/
- public synchronized int getMessageCount() {
- return messageCount;
+ public int getMessageCount() {
+ return messageCount.get();
}
/**
* Hook which is called when the select loop has finished.
*/
public void selectLoopHook(boolean before) {
- if (batchedMode) {
- flushBatch();
+ if (batchedMode.get()) {
+ flushBatch(stealBatch());
}
-
- lastRoundCount = messageCount - roundCount;
- roundCount = messageCount;
}
}
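The LogDispatcher rewrite above swaps coarse synchronized methods for a
CopyOnWriteArrayList over the handlers, atomics for the counter and flags, and a
steal-the-batch pattern: mutate the shared batch inside a short lock, then deliver it
outside the lock so slow handlers never block producers. A condensed, self-contained
sketch of that pattern (the class and method names here are invented for illustration):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    class BatchedDispatcher<T> {
        private static final int BATCH_SIZE = 5000;
        private List<T> currentBatch = null;          // guarded by "this"

        void add(T msg, Consumer<List<T>> sink) {
            List<T> toSend = null;
            synchronized (this) {                     // short critical section: list ops only
                if (currentBatch == null) currentBatch = new ArrayList<>(BATCH_SIZE);
                currentBatch.add(msg);
                if (currentBatch.size() == BATCH_SIZE) {
                    toSend = currentBatch;            // steal the full batch under the lock...
                    currentBatch = null;
                }
            }
            if (toSend != null) sink.accept(toSend);  // ...and deliver it without holding the lock
        }
    }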
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/IdempotentTask.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/IdempotentTask.java
index b6b64dbf5dd..a1cc192be2a 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/IdempotentTask.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/IdempotentTask.java
@@ -3,8 +3,12 @@ package com.yahoo.vespa.hosted.node.admin.component;
/**
* This class is thread unsafe: All method calls MUST be exclusive and serialized.
+ *
+ * In a specialized environment it is possible to provide a richer context than TaskContext:
+ * - Define a subclass T of TaskContext with the additional functionality.
+ * - Define task classes that implement IdempotentTask&lt;T&gt;.
*/
-public interface IdempotentTask {
+public interface IdempotentTask<T extends TaskContext> {
String name();
/**
@@ -17,5 +21,5 @@ public interface IdempotentTask {
* @return false if the system was already converged, i.e. converge() was a no-op.
* @throws RuntimeException (or a subclass) if the task is unable to converge.
*/
- boolean converge(TaskContext context);
+ boolean converge(T context);
}
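The javadoc added above describes how to specialize the task context; a hypothetical
example of that pattern (both type names below are invented, not part of this commit):

    // A richer context for a specialized environment...
    interface DockerTaskContext extends TaskContext {
        void pullImage(String image);   // extra capability, assumed for illustration
    }

    // ...and a task bound to that context via the new type parameter.
    class PullImageTask implements IdempotentTask<DockerTaskContext> {
        private final String image;
        PullImageTask(String image) { this.image = image; }

        @Override public String name() { return "PullImage(" + image + ")"; }

        @Override public boolean converge(DockerTaskContext context) {
            context.pullImage(image);   // assumed idempotent in the underlying system
            return true;                // report that the system was modified
        }
    }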
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskComponent.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskComponent.java
index c54f9ee00c8..cbe9b32cc47 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskComponent.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskComponent.java
@@ -4,7 +4,7 @@ package com.yahoo.vespa.hosted.node.admin.component;
import com.yahoo.component.ComponentId;
import com.yahoo.component.chain.ChainedComponent;
-public abstract class TaskComponent extends ChainedComponent implements IdempotentTask {
+public abstract class TaskComponent extends ChainedComponent implements IdempotentTask<TaskContext> {
protected TaskComponent(ComponentId id) {
super(id);
}
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskContext.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskContext.java
index d0a5570b8dc..0c49e478d6a 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskContext.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TaskContext.java
@@ -50,10 +50,4 @@ public interface TaskContext {
* or later if the task failed. Either way, it will only be called once.
*/
default void logOnFailure(Logger logger, Supplier<String> messageSupplier) {}
-
- /**
- * Execute a task as a child of this task, and with its own sub-TaskContext. Please avoid
- * excessive task hierarchies.
- */
- boolean executeSubtask(IdempotentTask task);
}
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TestTaskContext.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TestTaskContext.java
index 108d42114f7..bc0ec2c8700 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TestTaskContext.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/component/TestTaskContext.java
@@ -4,7 +4,6 @@ package com.yahoo.vespa.hosted.node.admin.component;
import java.util.ArrayList;
import java.util.List;
-import java.util.function.Supplier;
import java.util.logging.Logger;
public class TestTaskContext implements TaskContext {
@@ -18,9 +17,6 @@ public class TestTaskContext implements TaskContext {
@Override
public void log(Logger logger, String message) { }
- @Override
- public void logOnFailure(Logger logger, Supplier<String> messageSupplier) { }
-
public List<String> getSystemModificationLog() {
return systemModifications;
}
@@ -28,9 +24,4 @@ public class TestTaskContext implements TaskContext {
public void clearSystemModificationLog() {
systemModifications.clear();
}
-
- @Override
- public boolean executeSubtask(IdempotentTask task) {
- throw new UnsupportedOperationException();
- }
}
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java
index d9d7b4a5d12..2a140945f43 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java
@@ -213,23 +213,23 @@ class NodeAllocation {
* @return the final list of nodes
*/
List<Node> finalNodes(List<Node> surplusNodes) {
- long currentRetired = nodes.stream().filter(node -> node.node.allocation().get().membership().retired()).count();
- long surplus = requestedNodes.surplusGiven(nodes.size()) - currentRetired;
+ int currentRetiredCount = (int) nodes.stream().filter(node -> node.node.allocation().get().membership().retired()).count();
+ int deltaRetiredCount = requestedNodes.idealRetiredCount(nodes.size(), currentRetiredCount) - currentRetiredCount;
- if (surplus > 0) { // retire until surplus is 0, prefer to retire higher indexes to minimize redistribution
+ if (deltaRetiredCount > 0) { // retire until deltaRetiredCount is 0, prefer to retire higher indexes to minimize redistribution
for (PrioritizableNode node : byDecreasingIndex(nodes)) {
if ( ! node.node.allocation().get().membership().retired() && node.node.state().equals(Node.State.active)) {
node.node = node.node.retire(Agent.application, clock.instant());
surplusNodes.add(node.node); // offer this node to other groups
- if (--surplus == 0) break;
+ if (--deltaRetiredCount == 0) break;
}
}
}
- else if (surplus < 0) { // unretire until surplus is 0
+ else if (deltaRetiredCount < 0) { // unretire until deltaRetiredCount is 0
for (PrioritizableNode node : byIncreasingIndex(nodes)) {
if ( node.node.allocation().get().membership().retired() && hasCompatibleFlavor(node.node)) {
node.node = node.node.unretire();
- if (++surplus == 0) break;
+ if (++deltaRetiredCount == 0) break;
}
}
}
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeSpec.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeSpec.java
index 23a6e3a8b9a..dc3f4a64421 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeSpec.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeSpec.java
@@ -34,8 +34,8 @@ public interface NodeSpec {
/** Returns whether the given node count is sufficient to fulfill this spec */
boolean fulfilledBy(int count);
- /** Returns the amount the given count is above the minimum amount needed to fulfill this request */
- int surplusGiven(int count);
+ /** Returns the ideal number of nodes that should be retired to fulfill this spec */
+ int idealRetiredCount(int acceptedCount, int currentRetiredCount);
/** Returns a specification of a fraction of all the nodes of this. It is assumed the argument is a valid divisor. */
NodeSpec fraction(int divisor);
@@ -97,7 +97,7 @@ public interface NodeSpec {
public boolean saturatedBy(int count) { return fulfilledBy(count); } // min=max for count specs
@Override
- public int surplusGiven(int count) { return count - this.count; }
+ public int idealRetiredCount(int acceptedCount, int currentRetiredCount) { return acceptedCount - this.count; }
@Override
public NodeSpec fraction(int divisor) { return new CountNodeSpec(count/divisor, requestedFlavor); }
@@ -152,7 +152,14 @@ public interface NodeSpec {
public boolean saturatedBy(int count) { return false; }
@Override
- public int surplusGiven(int count) { return 0; }
+ public int idealRetiredCount(int acceptedCount, int currentRetiredCount) {
+ /*
+ * All nodes marked with wantToRetire are set retired just before this method is called;
+ * the job of this method is to throttle the retired count: if no nodes are currently
+ * retired, keep it that way; otherwise allow at most one node to be retired at a time.
+ */
+ return Math.min(1, currentRetiredCount);
+ }
@Override
public NodeSpec fraction(int divisor) { return this; }
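A short worked example of the throttling above, assuming 11 accepted nodes of which 4
were just marked retired via wantToRetire:

    int currentRetiredCount = 4;
    int ideal = Math.min(1, currentRetiredCount);  // = 1: at most one node retiring at a time
    int delta = ideal - currentRetiredCount;       // = -3: NodeAllocation unretires three nodes

Only one node proceeds through retirement per deployment cycle, which is what the
retire_multiple_proxy_simultaneously test below verifies.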
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java
index 873193ac3b8..97fde2274f2 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java
@@ -11,15 +11,22 @@ import com.yahoo.config.provision.NodeType;
import com.yahoo.config.provision.RegionName;
import com.yahoo.config.provision.Zone;
import com.yahoo.vespa.hosted.provision.Node;
+import com.yahoo.vespa.hosted.provision.maintenance.JobControl;
+import com.yahoo.vespa.hosted.provision.maintenance.RetiredExpirer;
import com.yahoo.vespa.hosted.provision.node.Agent;
+import com.yahoo.vespa.hosted.provision.testutils.MockDeployer;
+import org.junit.Before;
import org.junit.Test;
+import java.time.Duration;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Optional;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
/**
* Tests provisioning by node type instead of by count and flavor
@@ -28,28 +35,31 @@ import static org.junit.Assert.assertFalse;
*/
public class NodeTypeProvisioningTest {
- @Test
- public void proxy_deployment() {
- ProvisioningTester tester = new ProvisioningTester(new Zone(Environment.prod, RegionName.from("us-east")));
+ private final ProvisioningTester tester = new ProvisioningTester(new Zone(Environment.prod, RegionName.from("us-east")));
+
+ private final ApplicationId application = tester.makeApplicationId(); // application using proxy nodes
+ private final Capacity capacity = Capacity.fromRequiredNodeType(NodeType.proxy);
+ private final ClusterSpec clusterSpec = ClusterSpec.request(
+ ClusterSpec.Type.container, ClusterSpec.Id.from("test"), Version.fromString("6.42"));
+ @Before
+ public void setup() {
tester.makeReadyNodes( 1, "small", NodeType.proxy);
tester.makeReadyNodes( 3, "small", NodeType.host);
tester.makeReadyNodes( 5, "small", NodeType.tenant);
tester.makeReadyNodes(10, "large", NodeType.proxy);
tester.makeReadyNodes(20, "large", NodeType.host);
tester.makeReadyNodes(40, "large", NodeType.tenant);
+ }
- ApplicationId application = tester.makeApplicationId(); // application using proxy nodes
-
-
+ @Test
+ public void proxy_deployment() {
{ // Deploy
List<HostSpec> hosts = deployProxies(application, tester);
assertEquals("Reserved all proxies", 11, hosts.size());
tester.activate(application, new HashSet<>(hosts));
List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active);
assertEquals("Activated all proxies", 11, nodes.size());
- for (Node node : nodes)
- assertEquals(NodeType.proxy, node.type());
}
{ // Redeploy with no changes
@@ -83,14 +93,178 @@ public class NodeTypeProvisioningTest {
}
}
+ @Test
+ public void retire_proxy() {
+ MockDeployer deployer = new MockDeployer(
+ tester.provisioner(),
+ Collections.singletonMap(
+ application, new MockDeployer.ApplicationContext(application, clusterSpec, capacity, 1)));
+ RetiredExpirer retiredExpirer = new RetiredExpirer(tester.nodeRepository(), tester.orchestrator(), deployer,
+ tester.clock(), Duration.ofDays(30), Duration.ofMinutes(10), new JobControl(tester.nodeRepository().database()));
+
+ { // Deploy
+ List<HostSpec> hosts = deployProxies(application, tester);
+ assertEquals("Reserved all proxies", 11, hosts.size());
+ tester.activate(application, new HashSet<>(hosts));
+ List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active);
+ assertEquals("Activated all proxies", 11, nodes.size());
+ }
+
+ Node nodeToRetire = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active).get(5);
+ { // Pick out a node and retire it
+ tester.nodeRepository().write(nodeToRetire.with(nodeToRetire.status().withWantToRetire(true)));
+
+ List<HostSpec> hosts = deployProxies(application, tester);
+ assertEquals(11, hosts.size());
+ tester.activate(application, new HashSet<>(hosts));
+ List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active);
+ assertEquals(11, nodes.size());
+
+ // Verify that wantToRetire has been propagated
+ assertTrue(tester.nodeRepository().getNode(nodeToRetire.hostname())
+ .flatMap(Node::allocation)
+ .map(allocation -> allocation.membership().retired())
+ .orElseThrow(RuntimeException::new));
+ }
+
+ { // Redeploying while the node is still retiring has no effect
+ List<HostSpec> hosts = deployProxies(application, tester);
+ assertEquals(11, hosts.size());
+ tester.activate(application, new HashSet<>(hosts));
+ List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active);
+ assertEquals(11, nodes.size());
+
+ // Verify that the node is still marked as retired
+ assertTrue(tester.nodeRepository().getNode(nodeToRetire.hostname())
+ .flatMap(Node::allocation)
+ .map(allocation -> allocation.membership().retired())
+ .orElseThrow(RuntimeException::new));
+ }
+
+ {
+ tester.advanceTime(Duration.ofMinutes(11));
+ retiredExpirer.run();
+
+ List<HostSpec> hosts = deployProxies(application, tester);
+ assertEquals(10, hosts.size());
+ tester.activate(application, new HashSet<>(hosts));
+ List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active);
+ assertEquals(10, nodes.size());
+
+ // Verify that the node is now inactive
+ assertEquals(Node.State.inactive, tester.nodeRepository().getNode(nodeToRetire.hostname())
+ .orElseThrow(RuntimeException::new).state());
+ }
+ }
+
+ @Test
+ public void retire_multiple_proxy_simultaneously() {
+ MockDeployer deployer = new MockDeployer(
+ tester.provisioner(),
+ Collections.singletonMap(
+ application, new MockDeployer.ApplicationContext(application, clusterSpec, capacity, 1)));
+ RetiredExpirer retiredExpirer = new RetiredExpirer(tester.nodeRepository(), tester.orchestrator(), deployer,
+ tester.clock(), Duration.ofDays(30), Duration.ofMinutes(10), new JobControl(tester.nodeRepository().database()));
+ final int numNodesToRetire = 5;
+
+ { // Deploy
+ List<HostSpec> hosts = deployProxies(application, tester);
+ assertEquals("Reserved all proxies", 11, hosts.size());
+ tester.activate(application, new HashSet<>(hosts));
+ List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active);
+ assertEquals("Activated all proxies", 11, nodes.size());
+ }
+
+ List<Node> nodesToRetire = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active)
+ .subList(3, 3 + numNodesToRetire);
+ String currentlyRetiringHostname;
+ {
+ nodesToRetire.forEach(nodeToRetire ->
+ tester.nodeRepository().write(nodeToRetire.with(nodeToRetire.status().withWantToRetire(true))));
+
+ List<HostSpec> hosts = deployProxies(application, tester);
+ assertEquals(11, hosts.size());
+ tester.activate(application, new HashSet<>(hosts));
+ List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active);
+ assertEquals(11, nodes.size());
+
+ // Verify that wantToRetire has been propagated
+ List<Node> nodesCurrentlyRetiring = nodes.stream()
+ .filter(node -> node.allocation().get().membership().retired())
+ .collect(Collectors.toList());
+ assertEquals(1, nodesCurrentlyRetiring.size());
+
+ // The retiring node should be one of the nodes we marked for retirement
+ currentlyRetiringHostname = nodesCurrentlyRetiring.get(0).hostname();
+ assertTrue(nodesToRetire.stream().map(Node::hostname).filter(hostname -> hostname.equals(currentlyRetiringHostname)).count() == 1);
+ }
+
+ { // Redeploying while the node is still retiring has no effect
+ List<HostSpec> hosts = deployProxies(application, tester);
+ assertEquals(11, hosts.size());
+ tester.activate(application, new HashSet<>(hosts));
+ List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active);
+ assertEquals(11, nodes.size());
+
+ // Verify that wantToRetire has been propagated
+ List<Node> nodesCurrentlyRetiring = nodes.stream()
+ .filter(node -> node.allocation().get().membership().retired())
+ .collect(Collectors.toList());
+ assertEquals(1, nodesCurrentlyRetiring.size());
+
+ // The node that started retiring is still the only one retiring
+ assertEquals(currentlyRetiringHostname, nodesCurrentlyRetiring.get(0).hostname());
+ }
+
+ {
+ tester.advanceTime(Duration.ofMinutes(11));
+ retiredExpirer.run();
+
+ List<HostSpec> hosts = deployProxies(application, tester);
+ assertEquals(10, hosts.size());
+ tester.activate(application, new HashSet<>(hosts));
+ List<Node> nodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active);
+ assertEquals(10, nodes.size());
+
+ // Verify the node we previously set to retire has finished retiring
+ assertEquals(Node.State.inactive, tester.nodeRepository().getNode(currentlyRetiringHostname)
+ .orElseThrow(RuntimeException::new).state());
+
+ // Verify that a node is currently retiring
+ List<Node> nodesCurrentlyRetiring = nodes.stream()
+ .filter(node -> node.allocation().get().membership().retired())
+ .collect(Collectors.toList());
+ assertEquals(1, nodesCurrentlyRetiring.size());
+
+ // This node is different from the one that was retiring previously
+ String newRetiringHostname = nodesCurrentlyRetiring.get(0).hostname();
+ assertNotEquals(currentlyRetiringHostname, newRetiringHostname);
+ // ... but is one of the nodes that were put to wantToRetire earlier
+ assertTrue(nodesToRetire.stream().map(Node::hostname).filter(hostname -> hostname.equals(newRetiringHostname)).count() == 1);
+ }
+
+
+ for (int i = 0; i < 10; i++){
+ tester.advanceTime(Duration.ofMinutes(11));
+ retiredExpirer.run();
+ List<HostSpec> hosts = deployProxies(application, tester);
+ tester.activate(application, new HashSet<>(hosts));
+ }
+
+ // After enough retirement cycles, no active proxy node is marked wantToRetire or retired
+ long numNonRetiredActiveProxyNodes = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active).stream()
+ .filter(node -> !node.status().wantToRetire())
+ .filter(node -> !node.allocation().get().membership().retired())
+ .count();
+ assertEquals(11 - numNodesToRetire, numNonRetiredActiveProxyNodes);
+
+ // All the nodes that were marked with wantToRetire earlier are now inactive
+ assertEquals(nodesToRetire.stream().map(Node::hostname).collect(Collectors.toSet()),
+ tester.nodeRepository().getNodes(Node.State.inactive).stream().map(Node::hostname).collect(Collectors.toSet()));
+ }
+
private List<HostSpec> deployProxies(ApplicationId application, ProvisioningTester tester) {
- return tester.prepare(application,
- ClusterSpec.request(ClusterSpec.Type.container,
- ClusterSpec.Id.from("test"),
- Version.fromString("6.42")),
- Capacity.fromRequiredNodeType(NodeType.proxy),
- 1);
-
+ return tester.prepare(application, clusterSpec, capacity, 1);
}
}