-rw-r--r--  application/pom.xml | 12
-rw-r--r--  container-dev/pom.xml | 22
-rw-r--r--  container-search/pom.xml | 3
-rw-r--r--  docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Docker.java | 12
-rw-r--r--  docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/DockerImpl.java | 40
-rw-r--r--  docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/DockerTest.java | 16
-rw-r--r--  docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/RunSystemTests.java | 6
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperations.java | 5
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperationsImpl.java | 21
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java | 8
-rw-r--r--  node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integrationTests/DockerMock.java | 24
-rw-r--r--  node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java | 10
-rw-r--r--  pom.xml | 11
-rw-r--r--  storage/src/tests/common/testhelper.cpp | 1
-rw-r--r--  storage/src/tests/persistence/CMakeLists.txt | 2
-rw-r--r--  storage/src/tests/persistence/provider_error_wrapper_test.cpp | 140
-rw-r--r--  storage/src/tests/persistence/providershutdownwrappertest.cpp | 85
-rw-r--r--  storage/src/tests/storageserver/CMakeLists.txt | 1
-rw-r--r--  storage/src/tests/storageserver/mergethrottlertest.cpp | 85
-rw-r--r--  storage/src/tests/storageserver/service_layer_error_listener_test.cpp | 82
-rw-r--r--  storage/src/vespa/storage/config/stor-server.def | 9
-rw-r--r--  storage/src/vespa/storage/persistence/CMakeLists.txt | 2
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp | 4
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.h | 7
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormetrics.h | 8
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.h | 2
-rw-r--r--  storage/src/vespa/storage/persistence/provider_error_wrapper.cpp (renamed from storage/src/vespa/storage/persistence/providershutdownwrapper.cpp) | 93
-rw-r--r--  storage/src/vespa/storage/persistence/provider_error_wrapper.h (renamed from storage/src/vespa/storage/persistence/providershutdownwrapper.h) | 50
-rw-r--r--  storage/src/vespa/storage/storageserver/CMakeLists.txt | 1
-rw-r--r--  storage/src/vespa/storage/storageserver/mergethrottler.cpp | 160
-rw-r--r--  storage/src/vespa/storage/storageserver/mergethrottler.h | 39
-rw-r--r--  storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp | 36
-rw-r--r--  storage/src/vespa/storage/storageserver/service_layer_error_listener.h | 36
-rw-r--r--  storage/src/vespa/storage/storageserver/servicelayernode.cpp | 23
-rw-r--r--  vespaclient-java/pom.xml | 7
35 files changed, 694 insertions, 369 deletions
diff --git a/application/pom.xml b/application/pom.xml
index 3fd9e8054e7..2d9096e49f1 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -75,6 +75,18 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <!-- All dependencies that should be visible in the test classpath, but not the compile classpath,
+ of user projects must be added in compile scope here.
+ These dependencies are explicitly excluded (or set to non-compile scope) in the container-dev module. -->
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr-runtime</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/container-dev/pom.xml b/container-dev/pom.xml
index 7ac2eb03016..e858ba1c50b 100644
--- a/container-dev/pom.xml
+++ b/container-dev/pom.xml
@@ -112,11 +112,33 @@
<groupId>com.yahoo.vespa</groupId>
<artifactId>container-search-and-docproc</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>config-bundle</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!-- Dependencies below are added explicitly to exclude transitive deps that are not provided at runtime by the container,
+ and hence make them invisible to user projects' build classpath.
+ Excluded artifacts should be added explicitly to the application module to make them visible in users' test classpath. -->
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>predicate-search-core</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr-runtime</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
</dependencies>
</project>
diff --git a/container-search/pom.xml b/container-search/pom.xml
index 837629c44b9..f622567acde 100644
--- a/container-search/pom.xml
+++ b/container-search/pom.xml
@@ -143,7 +143,6 @@
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
- <version>4.5</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
@@ -211,7 +210,7 @@
<plugin> <!-- For the YQL query language -->
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
- <version>4.5</version>
+ <version>${antlr4.version}</version>
<executions>
<execution>
<configuration>
diff --git a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Docker.java b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Docker.java
index b512b7b5f90..865908bdc8e 100644
--- a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Docker.java
+++ b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Docker.java
@@ -6,7 +6,6 @@ import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
/**
* API to simplify the com.github.dockerjava API for clients,
@@ -60,9 +59,14 @@ public interface Docker {
Optional<Container> getContainer(ContainerName containerName);
- CompletableFuture<DockerImage> pullImageAsync(DockerImage image);
-
- boolean imageIsDownloaded(DockerImage image);
+ /**
+ * Checks if the image is currently being pulled or is already pulled; if not, starts an async
+ * pull of the image.
+ *
+ * @param image Docker image to pull
+ * @return true iff the image is being pulled, false otherwise
+ */
+ boolean pullImageAsyncIfNeeded(DockerImage image);
void deleteImage(DockerImage dockerImage);
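
The interface change above replaces the future-based pullImageAsync/imageIsDownloaded pair with a single polling call. As a minimal sketch of how a caller can block until an image is available using only the interface shown above (the waitForImagePull helper and its one-second poll interval are illustrative, not part of the API):

    import com.yahoo.vespa.hosted.dockerapi.Docker;
    import com.yahoo.vespa.hosted.dockerapi.DockerImage;

    class ImagePullWaiter {
        // Hypothetical helper: keeps asking whether the image is still being pulled
        // and returns once pullImageAsyncIfNeeded reports false, i.e. the image is
        // available locally and no pull is in flight.
        static void waitForImagePull(Docker docker, DockerImage image) throws InterruptedException {
            while (docker.pullImageAsyncIfNeeded(image)) { // true while a pull is in progress
                Thread.sleep(1000);                        // arbitrary poll interval
            }
        }
    }

The RunSystemTests change further down in this diff uses the same polling pattern with a five-second sleep.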
diff --git a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/DockerImpl.java b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/DockerImpl.java
index 938b277d90b..18c1ad1dc93 100644
--- a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/DockerImpl.java
+++ b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/DockerImpl.java
@@ -37,10 +37,11 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@@ -63,7 +64,7 @@ public class DockerImpl implements Docker {
private final Object monitor = new Object();
@GuardedBy("monitor")
- private final Map<DockerImage, CompletableFuture<DockerImage>> scheduledPulls = new HashMap<>();
+ private final Set<DockerImage> scheduledPulls = new HashSet<>();
// Exposed for testing.
final DockerClient dockerClient;
@@ -159,37 +160,30 @@ public class DockerImpl implements Docker {
}
@Override
- public CompletableFuture<DockerImage> pullImageAsync(final DockerImage image) {
- final CompletableFuture<DockerImage> completionListener;
- synchronized (monitor) {
- if (scheduledPulls.containsKey(image)) {
- return scheduledPulls.get(image);
- }
- completionListener = new CompletableFuture<>();
- scheduledPulls.put(image, completionListener);
- }
-
+ public boolean pullImageAsyncIfNeeded(final DockerImage image) {
try {
- dockerClient.pullImageCmd(image.asString()).exec(new ImagePullCallback(image));
+ synchronized (monitor) {
+ if (scheduledPulls.contains(image)) return true;
+ if (imageIsDownloaded(image)) return false;
+ scheduledPulls.add(image);
+ dockerClient.pullImageCmd(image.asString()).exec(new ImagePullCallback(image));
+ return true;
+ }
} catch (RuntimeException e) {
numberOfDockerDaemonFails.add();
throw new DockerException("Failed to pull image '" + image.asString() + "'", e);
}
-
- return completionListener;
}
- private CompletableFuture<DockerImage> removeScheduledPoll(final DockerImage image) {
+ private void removeScheduledPoll(final DockerImage image) {
synchronized (monitor) {
- return scheduledPulls.remove(image);
+ scheduledPulls.remove(image);
}
}
/**
* Check if a given image is already in the local registry
*/
- @Override
- public boolean imageIsDownloaded(final DockerImage dockerImage) {
+ private boolean imageIsDownloaded(final DockerImage dockerImage) {
return inspectImage(dockerImage).isPresent();
}
@@ -448,7 +442,8 @@ public class DockerImpl implements Docker {
@Override
public void onError(Throwable throwable) {
- removeScheduledPoll(dockerImage).completeExceptionally(throwable);
+ removeScheduledPoll(dockerImage);
+ throw new DockerClientException("Could not download image: " + dockerImage);
}
@@ -457,10 +452,9 @@ public class DockerImpl implements Docker {
Optional<Image> image = inspectImage(dockerImage);
if (image.isPresent()) { // Download successful, update image GC with the newly downloaded image
dockerImageGC.ifPresent(imageGC -> imageGC.updateLastUsedTimeFor(image.get().getId()));
- removeScheduledPoll(dockerImage).complete(dockerImage);
+ removeScheduledPoll(dockerImage);
} else {
- removeScheduledPoll(dockerImage).completeExceptionally(
- new DockerClientException("Could not download image: " + dockerImage));
+ throw new DockerClientException("Could not download image: " + dockerImage);
}
}
}
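
The CompletableFuture bookkeeping above is replaced by a plain set of images with pulls in flight, guarded by a monitor; the pull callback clears the entry again when the pull ends. A minimal sketch of that schedule-at-most-once idiom in isolation, with generic, illustrative names (not the DockerImpl code itself):

    import java.util.HashSet;
    import java.util.Set;

    class OncePerKeyScheduler<K> {
        private final Object monitor = new Object();
        private final Set<K> inFlight = new HashSet<>();

        // Starts the async work at most once per key.
        // Returns true iff work for this key is in flight after the call.
        boolean scheduleIfAbsent(K key, Runnable startAsyncWork) {
            synchronized (monitor) {
                if (inFlight.contains(key)) return true; // already scheduled, nothing to do
                inFlight.add(key);
                startAsyncWork.run();                    // kick off the asynchronous work
                return true;
            }
        }

        // Called from the async completion (or error) callback,
        // mirroring removeScheduledPoll in the change above.
        void markCompleted(K key) {
            synchronized (monitor) {
                inFlight.remove(key);
            }
        }
    }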
diff --git a/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/DockerTest.java b/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/DockerTest.java
index 4e2567b9fa8..310fb4ffdd3 100644
--- a/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/DockerTest.java
+++ b/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/DockerTest.java
@@ -14,7 +14,6 @@ import java.util.concurrent.ExecutionException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
@@ -30,21 +29,6 @@ public class DockerTest {
private static final DockerImage dockerImage = new DockerImage("simple-ipv6-server:Dockerfile");
private static final String MANAGER_NAME = "docker-test";
- // It is ignored since it is a bit slow and unstable, at least on Mac.
- @Ignore
- @Test
- public void testDockerImagePullDelete() throws ExecutionException, InterruptedException {
- DockerImage dockerImage = new DockerImage("busybox:1.24.0");
-
- // Pull the image and wait for the pull to complete
- docker.pullImageAsync(dockerImage).get();
- assertTrue("Failed to download " + dockerImage.asString() + " image", docker.imageIsDownloaded(dockerImage));
-
- // Remove the image
- docker.deleteImage(dockerImage);
- assertFalse("Failed to delete " + dockerImage.asString() + " image", docker.imageIsDownloaded(dockerImage));
- }
-
// Ignored because the test is very slow (several minutes) when swap is enabled, to disable: (Linux)
// $ sudo swapoff -a
@Ignore
diff --git a/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/RunSystemTests.java b/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/RunSystemTests.java
index 5f090abec71..915b3b53867 100644
--- a/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/RunSystemTests.java
+++ b/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/RunSystemTests.java
@@ -151,9 +151,11 @@ public class RunSystemTests {
}
private void buildVespaSystestDockerImage(Docker docker, DockerImage vespaBaseImage) throws IOException, ExecutionException, InterruptedException {
- if (!docker.imageIsDownloaded(vespaBaseImage)) {
+ if (docker.pullImageAsyncIfNeeded(vespaBaseImage)) {
logger.info("Pulling " + vespaBaseImage.asString() + " (This may take a while)");
- docker.pullImageAsync(vespaBaseImage).get();
+ while (docker.pullImageAsyncIfNeeded(vespaBaseImage)) {
+ Thread.sleep(5000);
+ };
}
Path systestBuildDirectory = pathToVespaRepoInHost.resolve("docker-api/src/test/resources/systest/");
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperations.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperations.java
index 61f31ce9d8d..1d11e221a9b 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperations.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperations.java
@@ -17,12 +17,9 @@ public interface DockerOperations {
void removeContainer(Container existingContainer);
- // Returns false if image is already downloaded
- boolean shouldScheduleDownloadOfImage(DockerImage dockerImage);
-
Optional<Container> getContainer(ContainerName containerName);
- void scheduleDownloadOfImage(ContainerName containerName, DockerImage dockerImage, Runnable callback);
+ boolean pullImageAsyncIfNeeded(DockerImage dockerImage);
ProcessResult executeCommandInContainerAsRoot(ContainerName containerName, String... command);
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperationsImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperationsImpl.java
index dbf75d0ee03..b21fc573c0f 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperationsImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperationsImpl.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -169,12 +168,6 @@ public class DockerOperationsImpl implements DockerOperations {
docker.deleteContainer(containerName);
}
- // Returns true if scheduling download
- @Override
- public boolean shouldScheduleDownloadOfImage(final DockerImage dockerImage) {
- return !docker.imageIsDownloaded(dockerImage);
- }
-
@Override
public Optional<Container> getContainer(ContainerName containerName) {
return docker.getContainer(containerName);
@@ -212,18 +205,8 @@ public class DockerOperationsImpl implements DockerOperations {
}
@Override
- public void scheduleDownloadOfImage(ContainerName containerName, DockerImage dockerImage, Runnable callback) {
- PrefixLogger logger = PrefixLogger.getNodeAgentLogger(DockerOperationsImpl.class, containerName);
-
- logger.info("Schedule async download of " + dockerImage);
- final CompletableFuture<DockerImage> asyncPullResult = docker.pullImageAsync(dockerImage);
- asyncPullResult.whenComplete((image, throwable) -> {
- if (throwable != null) {
- logger.warning("Failed to pull " + dockerImage, throwable);
- return;
- }
- callback.run();
- });
+ public boolean pullImageAsyncIfNeeded(DockerImage dockerImage) {
+ return docker.pullImageAsyncIfNeeded(dockerImage);
}
ProcessResult executeCommandInContainer(ContainerName containerName, String... command) {
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
index 9d2198cedcc..0d110adf5a4 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
@@ -352,14 +352,8 @@ public class NodeAgentImpl implements NodeAgent {
private void scheduleDownLoadIfNeeded(ContainerNodeSpec nodeSpec) {
if (nodeSpec.currentDockerImage.equals(nodeSpec.wantedDockerImage)) return;
- if (dockerOperations.shouldScheduleDownloadOfImage(nodeSpec.wantedDockerImage.get())) {
- if (nodeSpec.wantedDockerImage.get().equals(imageBeingDownloaded)) {
- // Downloading already scheduled, but not done.
- return;
- }
+ if (dockerOperations.pullImageAsyncIfNeeded(nodeSpec.wantedDockerImage.get())) {
imageBeingDownloaded = nodeSpec.wantedDockerImage.get();
- // Create a signalWorkToBeDone when download is finished.
- dockerOperations.scheduleDownloadOfImage(containerName, imageBeingDownloaded, this::signalWorkToBeDone);
} else if (imageBeingDownloaded != null) { // Image was downloading, but now it's ready
imageBeingDownloaded = null;
}
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integrationTests/DockerMock.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integrationTests/DockerMock.java
index be7fa4e49c6..dc111251af7 100644
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integrationTests/DockerMock.java
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integrationTests/DockerMock.java
@@ -15,7 +15,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
@@ -111,31 +110,14 @@ public class DockerMock implements Docker {
}
@Override
- public CompletableFuture<DockerImage> pullImageAsync(DockerImage image) {
+ public boolean pullImageAsyncIfNeeded(DockerImage image) {
synchronized (monitor) {
- callOrderVerifier.add("pullImageAsync with " + image);
- final CompletableFuture<DockerImage> completableFuture = new CompletableFuture<>();
- new Thread() {
- public void run() {
- try {
- Thread.sleep(500);
- completableFuture.complete(image);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
- }.start();
- return completableFuture;
+ callOrderVerifier.add("pullImageAsyncIfNeeded with " + image);
+ return false;
}
}
@Override
- public boolean imageIsDownloaded(DockerImage image) {
- return true;
- }
-
- @Override
public void deleteImage(DockerImage dockerImage) {
}
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java
index 3ddb28eed77..1c63c70453e 100644
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java
@@ -114,7 +114,7 @@ public class NodeAgentImplTest {
verify(dockerOperations, never()).removeContainer(any());
verify(orchestrator, never()).suspend(any(String.class));
- verify(dockerOperations, never()).scheduleDownloadOfImage(eq(containerName), any(), any());
+ verify(dockerOperations, never()).pullImageAsyncIfNeeded(any());
final InOrder inOrder = inOrder(dockerOperations, orchestrator, nodeRepository);
// TODO: Verify this isn't run unless 1st time
@@ -147,14 +147,15 @@ public class NodeAgentImplTest {
when(nodeRepository.getContainerNodeSpec(hostName)).thenReturn(Optional.of(nodeSpec));
when(pathResolver.getApplicationStoragePathForNodeAdmin()).thenReturn(Files.createTempDirectory("foo"));
+ when(dockerOperations.pullImageAsyncIfNeeded(eq(dockerImage))).thenReturn(false);
nodeAgent.converge();
verify(dockerOperations, never()).removeContainer(any());
verify(orchestrator, never()).suspend(any(String.class));
- verify(dockerOperations, never()).scheduleDownloadOfImage(eq(containerName), any(), any());
final InOrder inOrder = inOrder(dockerOperations, orchestrator, nodeRepository, aclMaintainer);
+ inOrder.verify(dockerOperations, times(1)).pullImageAsyncIfNeeded(eq(dockerImage));
inOrder.verify(aclMaintainer, times(1)).run();
inOrder.verify(dockerOperations, times(1)).startContainer(eq(containerName), eq(nodeSpec));
inOrder.verify(dockerOperations, times(1)).resumeNode(eq(containerName));
@@ -185,7 +186,7 @@ public class NodeAgentImplTest {
NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true);
when(nodeRepository.getContainerNodeSpec(hostName)).thenReturn(Optional.of(nodeSpec));
- when(dockerOperations.shouldScheduleDownloadOfImage(any())).thenReturn(true);
+ when(dockerOperations.pullImageAsyncIfNeeded(any())).thenReturn(true);
nodeAgent.converge();
@@ -194,8 +195,7 @@ public class NodeAgentImplTest {
verify(dockerOperations, never()).removeContainer(any());
final InOrder inOrder = inOrder(dockerOperations);
- inOrder.verify(dockerOperations, times(1)).shouldScheduleDownloadOfImage(eq(newDockerImage));
- inOrder.verify(dockerOperations, times(1)).scheduleDownloadOfImage(eq(containerName), eq(newDockerImage), any());
+ inOrder.verify(dockerOperations, times(1)).pullImageAsyncIfNeeded(eq(newDockerImage));
}
@Test
diff --git a/pom.xml b/pom.xml
index 63783722af9..a5101861f58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr3-maven-plugin</artifactId>
- <version>3.5.2</version>
+ <version>${antlr.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -569,7 +569,12 @@
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
- <version>3.5.2</version>
+ <version>${antlr.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ <version>${antlr4.version}</version>
</dependency>
<dependency>
<groupId>org.apache.aries.spifly</groupId>
@@ -884,6 +889,8 @@
<properties>
<javax.ws.rs-api.version>2.0.1</javax.ws.rs-api.version> <!-- must be kept in sync with version used by current jersey2.version -->
+ <antlr.version>3.5.2</antlr.version>
+ <antlr4.version>4.5</antlr4.version>
<aries.spifly.version>1.0.8</aries.spifly.version>
<aries.util.version>1.0.0</aries.util.version>
<asm-debug-all.version>5.0.3</asm-debug-all.version>
diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp
index fa85f4c6a45..296a9a3cc0f 100644
--- a/storage/src/tests/common/testhelper.cpp
+++ b/storage/src/tests/common/testhelper.cpp
@@ -150,6 +150,7 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & ro
config->set("enable_dead_lock_detector_warnings", "false");
config->set("max_merges_per_node", "25");
config->set("max_merge_queue_size", "20");
+ config->set("resource_exhaustion_merge_back_pressure_duration_secs", "15.0");
vespalib::string rootFolder = rootOfRoot + "_";
rootFolder += (storagenode ? "vdsroot" : "vdsroot.distributor");
config->set("root_folder", rootFolder);
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index 44aed95ba77..f5715ca3531 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -6,7 +6,7 @@ vespa_add_library(storage_testpersistence TEST
splitbitdetectortest.cpp
legacyoperationhandlertest.cpp
diskmoveoperationhandlertest.cpp
- providershutdownwrappertest.cpp
+ provider_error_wrapper_test.cpp
mergehandlertest.cpp
persistencethread_splittest.cpp
bucketownershipnotifiertest.cpp
diff --git a/storage/src/tests/persistence/provider_error_wrapper_test.cpp b/storage/src/tests/persistence/provider_error_wrapper_test.cpp
new file mode 100644
index 00000000000..7a8f26cbe93
--- /dev/null
+++ b/storage/src/tests/persistence/provider_error_wrapper_test.cpp
@@ -0,0 +1,140 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <tests/persistence/persistencetestutils.h>
+#include <tests/persistence/common/persistenceproviderwrapper.h>
+
+namespace storage {
+
+class ProviderErrorWrapperTest : public SingleDiskPersistenceTestUtils {
+public:
+ CPPUNIT_TEST_SUITE(ProviderErrorWrapperTest);
+ CPPUNIT_TEST(fatal_error_invokes_listener);
+ CPPUNIT_TEST(resource_exhaustion_error_invokes_listener);
+ CPPUNIT_TEST(listener_not_invoked_on_success);
+ CPPUNIT_TEST(listener_not_invoked_on_regular_errors);
+ CPPUNIT_TEST(multiple_listeners_can_be_registered);
+ CPPUNIT_TEST_SUITE_END();
+
+ void fatal_error_invokes_listener();
+ void resource_exhaustion_error_invokes_listener();
+ void listener_not_invoked_on_success();
+ void listener_not_invoked_on_regular_errors();
+ void multiple_listeners_can_be_registered();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(ProviderErrorWrapperTest);
+
+namespace {
+
+struct MockErrorListener : ProviderErrorListener {
+ void on_fatal_error(vespalib::stringref message) override {
+ _seen_fatal_error = true;
+ _fatal_error = message;
+ }
+ void on_resource_exhaustion_error(vespalib::stringref message) override {
+ _seen_resource_exhaustion_error = true;
+ _resource_exhaustion_error = message;
+ }
+
+ vespalib::string _fatal_error;
+ vespalib::string _resource_exhaustion_error;
+ bool _seen_fatal_error{false};
+ bool _seen_resource_exhaustion_error{false};
+};
+
+struct Fixture {
+ // We wrap the wrapper. It's turtles all the way down!
+ PersistenceProviderWrapper providerWrapper;
+ TestServiceLayerApp app;
+ ServiceLayerComponent component;
+ ProviderErrorWrapper errorWrapper;
+
+ Fixture(spi::PersistenceProvider& provider)
+ : providerWrapper(provider),
+ app(),
+ component(app.getComponentRegister(), "dummy"),
+ errorWrapper(providerWrapper)
+ {
+ providerWrapper.setFailureMask(PersistenceProviderWrapper::FAIL_ALL_OPERATIONS);
+ }
+ ~Fixture() {}
+
+ void perform_spi_operation() {
+ errorWrapper.getBucketInfo(spi::Bucket(document::BucketId(16, 1234), spi::PartitionId(0)));
+ }
+
+ void check_no_listener_invoked_for_error(MockErrorListener& listener, spi::Result::ErrorType error) {
+ providerWrapper.setResult(spi::Result(error, "beep boop"));
+ perform_spi_operation();
+ CPPUNIT_ASSERT(!listener._seen_fatal_error);
+ CPPUNIT_ASSERT(!listener._seen_resource_exhaustion_error);
+ }
+};
+
+}
+
+void ProviderErrorWrapperTest::fatal_error_invokes_listener() {
+ Fixture f(getPersistenceProvider());
+ auto listener = std::make_shared<MockErrorListener>();
+ f.errorWrapper.register_error_listener(listener);
+ f.providerWrapper.setResult(spi::Result(spi::Result::FATAL_ERROR, "eject! eject!"));
+
+ CPPUNIT_ASSERT(!listener->_seen_fatal_error);
+ f.perform_spi_operation();
+
+ CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error);
+ CPPUNIT_ASSERT(listener->_seen_fatal_error);
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), listener->_fatal_error);
+}
+
+void ProviderErrorWrapperTest::resource_exhaustion_error_invokes_listener() {
+ Fixture f(getPersistenceProvider());
+ auto listener = std::make_shared<MockErrorListener>();
+ f.errorWrapper.register_error_listener(listener);
+ f.providerWrapper.setResult(spi::Result(spi::Result::RESOURCE_EXHAUSTED, "out of juice"));
+
+ CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error);
+ f.perform_spi_operation();
+
+ CPPUNIT_ASSERT(!listener->_seen_fatal_error);
+ CPPUNIT_ASSERT(listener->_seen_resource_exhaustion_error);
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("out of juice"), listener->_resource_exhaustion_error);
+}
+
+void ProviderErrorWrapperTest::listener_not_invoked_on_success() {
+ Fixture f(getPersistenceProvider());
+ auto listener = std::make_shared<MockErrorListener>();
+ f.errorWrapper.register_error_listener(listener);
+ f.perform_spi_operation();
+
+ CPPUNIT_ASSERT(!listener->_seen_fatal_error);
+ CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error);
+}
+
+void ProviderErrorWrapperTest::listener_not_invoked_on_regular_errors() {
+ Fixture f(getPersistenceProvider());
+ auto listener = std::make_shared<MockErrorListener>();
+ f.errorWrapper.register_error_listener(listener);
+
+ f.check_no_listener_invoked_for_error(*listener, spi::Result::TRANSIENT_ERROR);
+ f.check_no_listener_invoked_for_error(*listener, spi::Result::PERMANENT_ERROR);
+}
+
+void ProviderErrorWrapperTest::multiple_listeners_can_be_registered() {
+ Fixture f(getPersistenceProvider());
+ auto listener1 = std::make_shared<MockErrorListener>();
+ auto listener2 = std::make_shared<MockErrorListener>();
+ f.errorWrapper.register_error_listener(listener1);
+ f.errorWrapper.register_error_listener(listener2);
+
+ f.providerWrapper.setResult(spi::Result(spi::Result::RESOURCE_EXHAUSTED, "out of juice"));
+ f.perform_spi_operation();
+
+ CPPUNIT_ASSERT(listener1->_seen_resource_exhaustion_error);
+ CPPUNIT_ASSERT(listener2->_seen_resource_exhaustion_error);
+}
+
+} // ns storage
+
+
diff --git a/storage/src/tests/persistence/providershutdownwrappertest.cpp b/storage/src/tests/persistence/providershutdownwrappertest.cpp
deleted file mode 100644
index 3475aa58dfd..00000000000
--- a/storage/src/tests/persistence/providershutdownwrappertest.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/vdstestlib/cppunit/macros.h>
-#include <tests/persistence/persistencetestutils.h>
-#include <tests/persistence/common/persistenceproviderwrapper.h>
-
-namespace storage {
-
-class ProviderShutdownWrapperTest : public SingleDiskPersistenceTestUtils
-{
-public:
- CPPUNIT_TEST_SUITE(ProviderShutdownWrapperTest);
- CPPUNIT_TEST(testShutdownOnFatalError);
- CPPUNIT_TEST_SUITE_END();
-
- void testShutdownOnFatalError();
-};
-
-CPPUNIT_TEST_SUITE_REGISTRATION(ProviderShutdownWrapperTest);
-
-namespace {
-
-class TestShutdownListener
- : public framework::defaultimplementation::ShutdownListener
-{
-public:
- TestShutdownListener() : _reason() {}
-
- void requestShutdown(vespalib::stringref reason) override {
- _reason = reason;
- }
-
- bool shutdownRequested() const { return !_reason.empty(); }
- const vespalib::string& getReason() const { return _reason; }
-private:
- vespalib::string _reason;
-};
-
-}
-
-void
-ProviderShutdownWrapperTest::testShutdownOnFatalError()
-{
- // We wrap the wrapper. It's turtles all the way down!
- PersistenceProviderWrapper providerWrapper(
- getPersistenceProvider());
- TestServiceLayerApp app;
- ServiceLayerComponent component(app.getComponentRegister(), "dummy");
-
- ProviderShutdownWrapper shutdownWrapper(providerWrapper, component);
-
- TestShutdownListener shutdownListener;
-
- app.getComponentRegister().registerShutdownListener(shutdownListener);
-
- providerWrapper.setResult(
- spi::Result(spi::Result::FATAL_ERROR, "eject! eject!"));
- providerWrapper.setFailureMask(
- PersistenceProviderWrapper::FAIL_ALL_OPERATIONS);
-
- CPPUNIT_ASSERT(!shutdownListener.shutdownRequested());
- // This should cause the node to implicitly be shut down
- shutdownWrapper.getBucketInfo(
- spi::Bucket(document::BucketId(16, 1234),
- spi::PartitionId(0)));
-
- CPPUNIT_ASSERT(shutdownListener.shutdownRequested());
- CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"),
- shutdownListener.getReason());
-
- // Triggering a new error should not cause shutdown to be requested twice.
- providerWrapper.setResult(
- spi::Result(spi::Result::FATAL_ERROR, "boom!"));
-
- shutdownWrapper.getBucketInfo(
- spi::Bucket(document::BucketId(16, 1234),
- spi::PartitionId(0)));
-
- CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"),
- shutdownListener.getReason());
-}
-
-} // ns storage
-
-
diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt
index eedc29989eb..959261dc8ff 100644
--- a/storage/src/tests/storageserver/CMakeLists.txt
+++ b/storage/src/tests/storageserver/CMakeLists.txt
@@ -11,6 +11,7 @@ vespa_add_library(storage_teststorageserver TEST
priorityconvertertest.cpp
statereportertest.cpp
changedbucketownershiphandlertest.cpp
+ service_layer_error_listener_test.cpp
DEPENDS
storage_storageserver
storage_testcommon
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 559d54d3620..29ff780807f 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,10 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <cppunit/extensions/HelperMacros.h>
-#include <memory>
-#include <iterator>
-#include <vector>
-#include <algorithm>
-#include <ctime>
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <tests/common/testhelper.h>
@@ -16,6 +11,13 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <unordered_set>
+#include <memory>
+#include <iterator>
+#include <vector>
+#include <algorithm>
+#include <chrono>
+#include <thread>
using namespace document;
using namespace storage::api;
@@ -24,12 +26,12 @@ namespace storage {
namespace {
-struct MergeBuilder
-{
+struct MergeBuilder {
document::BucketId _bucket;
api::Timestamp _maxTimestamp;
std::vector<uint16_t> _nodes;
std::vector<uint16_t> _chain;
+ std::unordered_set<uint16_t> _source_only;
uint64_t _clusterStateVersion;
MergeBuilder(const document::BucketId& bucket)
@@ -41,6 +43,8 @@ struct MergeBuilder
nodes(0, 1, 2);
}
+ ~MergeBuilder();
+
MergeBuilder& nodes(uint16_t n0) {
_nodes.push_back(n0);
return *this;
@@ -82,11 +86,17 @@ struct MergeBuilder
_chain.push_back(n2);
return *this;
}
+ MergeBuilder& source_only(uint16_t node) {
+ _source_only.insert(node);
+ return *this;
+ }
api::MergeBucketCommand::SP create() const {
std::vector<api::MergeBucketCommand::Node> n;
for (uint32_t i = 0; i < _nodes.size(); ++i) {
- n.push_back(_nodes[i]);
+ uint16_t node = _nodes[i];
+ bool source_only = (_source_only.find(node) != _source_only.end());
+ n.emplace_back(node, source_only);
}
std::shared_ptr<MergeBucketCommand> cmd(
new MergeBucketCommand(_bucket, n, _maxTimestamp,
@@ -97,6 +107,8 @@ struct MergeBuilder
}
};
+MergeBuilder::~MergeBuilder() {}
+
std::shared_ptr<api::SetSystemStateCommand>
makeSystemStateCmd(const std::string& state)
{
@@ -106,8 +118,7 @@ makeSystemStateCmd(const std::string& state)
} // anon ns
-class MergeThrottlerTest : public CppUnit::TestFixture
-{
+class MergeThrottlerTest : public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(MergeThrottlerTest);
CPPUNIT_TEST(testMergesConfig);
CPPUNIT_TEST(testChain);
@@ -133,6 +144,8 @@ class MergeThrottlerTest : public CppUnit::TestFixture
CPPUNIT_TEST(testGetBucketDiffCommandNotInActiveSetIsRejected);
CPPUNIT_TEST(testApplyBucketDiffCommandNotInActiveSetIsRejected);
CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges);
+ CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration);
+ CPPUNIT_TEST(source_only_merges_are_not_affected_by_backpressure);
CPPUNIT_TEST_SUITE_END();
public:
void setUp() override;
@@ -162,6 +175,8 @@ public:
void testGetBucketDiffCommandNotInActiveSetIsRejected();
void testApplyBucketDiffCommandNotInActiveSetIsRejected();
void testNewClusterStateAbortsAllOutdatedActiveMerges();
+ void backpressure_busy_bounces_merges_for_configured_duration();
+ void source_only_merges_are_not_affected_by_backpressure();
private:
static const int _storageNodeCount = 3;
static const int _messageWaitTime = 100;
@@ -247,7 +262,7 @@ checkChain(const StorageMessage::SP& msg,
void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout)
{
- std::time_t start = std::time(0);
+ const auto start = std::chrono::steady_clock::now();
while (true) {
std::size_t count;
{
@@ -257,14 +272,14 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeou
if (count == sz) {
break;
}
- std::time_t now = std::time(0);
- if (now - start > timeout) {
+ auto now = std::chrono::steady_clock::now();
+ if (now - start > std::chrono::seconds(timeout)) {
std::ostringstream os;
os << "Timeout while waiting for merge queue with " << sz << " items. Had "
<< count << " at timeout.";
throw vespalib::IllegalStateException(os.str(), VESPA_STRLOC);
}
- FastOS_Thread::Sleep(1);
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
@@ -1491,8 +1506,7 @@ MergeThrottlerTest::sendAndExpectReply(
_topLinks[0]->waitForMessage(expectedReplyType, _messageWaitTime);
StorageMessage::SP reply(_topLinks[0]->getAndRemoveMessage(
expectedReplyType));
- api::StorageReply& storageReply(
- dynamic_cast<api::StorageReply&>(*reply));
+ auto& storageReply = dynamic_cast<api::StorageReply&>(*reply);
CPPUNIT_ASSERT_EQUAL(expectedResultCode,
storageReply.getResult().getResult());
}
@@ -1561,6 +1575,45 @@ MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges()
}
}
+void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duration() {
+ _servers[0]->getClock().setAbsoluteTimeInSeconds(1000);
+
+ CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active());
+ _throttlers[0]->apply_timed_backpressure();
+ CPPUNIT_ASSERT(_throttlers[0]->backpressure_mode_active());
+ document::BucketId bucket(16, 6789);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue());
+
+ sendAndExpectReply(MergeBuilder(bucket).clusterStateVersion(10).create(),
+ api::MessageType::MERGEBUCKET_REPLY,
+ api::ReturnCode::BUSY);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().local.failures.busy.getValue());
+
+ _servers[0]->getClock().addSecondsToTime(15); // Test-config has duration set to 15 seconds
+ // Backpressure has now been lifted. New merges should be forwarded
+ // to next node in chain as expected instead of being bounced with a reply.
+ sendMerge(MergeBuilder(bucket).clusterStateVersion(10));
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+ CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+}
+
+void MergeThrottlerTest::source_only_merges_are_not_affected_by_backpressure() {
+ _servers[2]->getClock().setAbsoluteTimeInSeconds(1000);
+ _throttlers[2]->apply_timed_backpressure();
+ document::BucketId bucket(16, 6789);
+
+ _topLinks[2]->sendDown(MergeBuilder(bucket).clusterStateVersion(10).chain(0, 1).source_only(2).create());
+ _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+}
+
// TODO test message queue aborting (use rendezvous functionality--make guard)
} // namespace storage
diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
new file mode 100644
index 00000000000..b726a24b6b6
--- /dev/null
+++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
@@ -0,0 +1,82 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/storage/storageserver/service_layer_error_listener.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
+#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/vdstestlib/cppunit/dirconfig.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/teststorageapp.h>
+
+namespace storage {
+
+class ServiceLayerErrorListenerTest : public CppUnit::TestFixture {
+public:
+ CPPUNIT_TEST_SUITE(ServiceLayerErrorListenerTest);
+ CPPUNIT_TEST(shutdown_invoked_on_fatal_error);
+ CPPUNIT_TEST(merge_throttle_backpressure_invoked_on_resource_exhaustion_error);
+ CPPUNIT_TEST_SUITE_END();
+
+ void shutdown_invoked_on_fatal_error();
+ void merge_throttle_backpressure_invoked_on_resource_exhaustion_error();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(ServiceLayerErrorListenerTest);
+
+namespace {
+
+class TestShutdownListener
+ : public framework::defaultimplementation::ShutdownListener
+{
+public:
+ TestShutdownListener() : _reason() {}
+
+ void requestShutdown(vespalib::stringref reason) override {
+ _reason = reason;
+ }
+
+ bool shutdown_requested() const { return !_reason.empty(); }
+ const vespalib::string& reason() const { return _reason; }
+private:
+ vespalib::string _reason;
+};
+
+struct Fixture {
+ vdstestlib::DirConfig config{getStandardConfig(true)};
+ TestServiceLayerApp app;
+ ServiceLayerComponent component{app.getComponentRegister(), "dummy"};
+ MergeThrottler merge_throttler{config.getConfigId(), app.getComponentRegister()};
+ TestShutdownListener shutdown_listener;
+ ServiceLayerErrorListener error_listener{component, merge_throttler};
+
+ ~Fixture();
+};
+
+Fixture::~Fixture() {}
+
+}
+
+void ServiceLayerErrorListenerTest::shutdown_invoked_on_fatal_error() {
+ Fixture f;
+
+ f.app.getComponentRegister().registerShutdownListener(f.shutdown_listener);
+ CPPUNIT_ASSERT(!f.shutdown_listener.shutdown_requested());
+
+ f.error_listener.on_fatal_error("eject! eject!");
+ CPPUNIT_ASSERT(f.shutdown_listener.shutdown_requested());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), f.shutdown_listener.reason());
+
+ // Should only be invoked once
+ f.error_listener.on_fatal_error("here be dragons");
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), f.shutdown_listener.reason());
+}
+
+void ServiceLayerErrorListenerTest::merge_throttle_backpressure_invoked_on_resource_exhaustion_error() {
+ Fixture f;
+
+ CPPUNIT_ASSERT(!f.merge_throttler.backpressure_mode_active());
+ f.error_listener.on_resource_exhaustion_error("buy more RAM!");
+ CPPUNIT_ASSERT(f.merge_throttler.backpressure_mode_active());
+}
+
+}
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index cf2226b3a3b..fbc29e0234b 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -41,6 +41,15 @@ node_reliability int default=1 restart
max_merges_per_node int default=16
max_merge_queue_size int default=1024
+## If the persistence provider indicates that it has exhausted one or more
+## of its internal resources during a mutating operation, new merges will
+## be bounced for this duration. Not allowing further merges helps take
+## load off the node while it e.g. compacts its data stores or memory in
+## the background.
+## Note: this does not affect merges where the current node is marked as
+## "source only", as merges do not cause mutations on such nodes.
+resource_exhaustion_merge_back_pressure_duration_secs double default=30.0
+
## Whether the deadlock detector should be enabled or not. If disabled, it will
## still run, but it will never actually abort the process it is running in.
enable_dead_lock_detector bool default=false restart
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index 42ee585988f..1f2299bf46d 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -9,7 +9,7 @@ vespa_add_library(storage_spersistence OBJECT
types.cpp
mergehandler.cpp
bucketprocessor.cpp
- providershutdownwrapper.cpp
+ provider_error_wrapper.cpp
bucketownershipnotifier.cpp
fieldvisitor.cpp
testandsethelper.cpp
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 1e74c957b01..b46217f6443 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -37,9 +37,9 @@ FileStorManager(const config::ConfigUri & configUri,
_component(compReg, "filestormanager"),
_partitions(partitions),
_providerCore(provider),
- _providerShutdown(_providerCore, _component),
+ _providerErrorWrapper(_providerCore),
_nodeUpInLastNodeStateSeenByProvider(false),
- _providerMetric(new spi::MetricPersistenceProvider(_providerShutdown)),
+ _providerMetric(new spi::MetricPersistenceProvider(_providerErrorWrapper)),
_provider(_providerMetric.get()),
_bucketIdFactory(_component.getBucketIdFactory()),
_configUri(configUri),
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 82ea5ecba7f..05b9b9bc430 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -23,7 +23,7 @@
#include <vespa/config-stor-filestor.h>
#include <vespa/storage/persistence/diskthread.h>
-#include <vespa/storage/persistence/providershutdownwrapper.h>
+#include <vespa/storage/persistence/provider_error_wrapper.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
@@ -55,7 +55,7 @@ class FileStorManager : public StorageLinkQueued,
ServiceLayerComponent _component;
const spi::PartitionStateList& _partitions;
spi::PersistenceProvider& _providerCore;
- ProviderShutdownWrapper _providerShutdown;
+ ProviderErrorWrapper _providerErrorWrapper;
bool _nodeUpInLastNodeStateSeenByProvider;
spi::MetricPersistenceProvider::UP _providerMetric;
spi::PersistenceProvider* _provider;
@@ -118,6 +118,9 @@ public:
spi::PersistenceProvider& getPersistenceProvider() {
return *_provider;
}
+ ProviderErrorWrapper& error_wrapper() noexcept {
+ return _providerErrorWrapper;
+ }
void handleNewState() override;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
index fe09758e842..d4dadf94184 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
@@ -95,10 +95,10 @@ struct FileStorThreadMetrics : public metrics::MetricSet
metrics::LongCountMetric bytesMerged;
metrics::LongCountMetric getBucketDiffReply;
metrics::LongCountMetric applyBucketDiffReply;
- metrics::LongAverageMetric mergeLatencyTotal;
- metrics::LongAverageMetric mergeMetadataReadLatency;
- metrics::LongAverageMetric mergeDataReadLatency;
- metrics::LongAverageMetric mergeDataWriteLatency;
+ metrics::DoubleAverageMetric mergeLatencyTotal;
+ metrics::DoubleAverageMetric mergeMetadataReadLatency;
+ metrics::DoubleAverageMetric mergeDataReadLatency;
+ metrics::DoubleAverageMetric mergeDataWriteLatency;
metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded;
metrics::LongAverageMetric batchingSize;
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index f545d673032..218d2f7dd23 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -7,7 +7,7 @@
#include "mergehandler.h"
#include "diskmoveoperationhandler.h"
#include "persistenceutil.h"
-#include "providershutdownwrapper.h"
+#include "provider_error_wrapper.h"
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/common/statusmessages.h>
diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 1a2f9620e65..80873829064 100644
--- a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -1,77 +1,80 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "providershutdownwrapper.h"
+#include "provider_error_wrapper.h"
#include "persistenceutil.h"
-#include <vespa/log/log.h>
-
-LOG_SETUP(".persistence.shutdownwrapper");
namespace storage {
template <typename ResultType>
ResultType
-ProviderShutdownWrapper::checkResult(ResultType&& result) const
+ProviderErrorWrapper::checkResult(ResultType&& result) const
{
if (result.getErrorCode() == spi::Result::FATAL_ERROR) {
- vespalib::LockGuard guard(_shutdownLock);
- if (_shutdownTriggered) {
- LOG(debug,
- "Received FATAL_ERROR from persistence provider: %s. "
- "Node has already been instructed to shut down so "
- "not doing anything now.",
- result.getErrorMessage().c_str());
- } else {
- LOG(info,
- "Received FATAL_ERROR from persistence provider, "
- "shutting down node: %s",
- result.getErrorMessage().c_str());
- const_cast<ProviderShutdownWrapper*>(this)->
- _component.requestShutdown(result.getErrorMessage());
- _shutdownTriggered = true;
- }
+ trigger_shutdown_listeners(result.getErrorMessage());
+ } else if (result.getErrorCode() == spi::Result::RESOURCE_EXHAUSTED) {
+ trigger_resource_exhaustion_listeners(result.getErrorMessage());
}
- return std::move(result);
+ return std::forward<ResultType>(result);
+}
+
+void ProviderErrorWrapper::trigger_shutdown_listeners(vespalib::stringref reason) const {
+ std::lock_guard<std::mutex> guard(_mutex);
+ for (auto& listener : _listeners) {
+ listener->on_fatal_error(reason);
+ }
+}
+
+void ProviderErrorWrapper::trigger_resource_exhaustion_listeners(vespalib::stringref reason) const {
+ std::lock_guard<std::mutex> guard(_mutex);
+ for (auto& listener : _listeners) {
+ listener->on_resource_exhaustion_error(reason);
+ }
+}
+
+void ProviderErrorWrapper::register_error_listener(std::shared_ptr<ProviderErrorListener> listener) {
+ std::lock_guard<std::mutex> guard(_mutex);
+ _listeners.emplace_back(std::move(listener));
}
spi::Result
-ProviderShutdownWrapper::initialize()
+ProviderErrorWrapper::initialize()
{
return checkResult(_impl.initialize());
}
spi::PartitionStateListResult
-ProviderShutdownWrapper::getPartitionStates() const
+ProviderErrorWrapper::getPartitionStates() const
{
return checkResult(_impl.getPartitionStates());
}
spi::BucketIdListResult
-ProviderShutdownWrapper::listBuckets(spi::PartitionId partitionId) const
+ProviderErrorWrapper::listBuckets(spi::PartitionId partitionId) const
{
return checkResult(_impl.listBuckets(partitionId));
}
spi::Result
-ProviderShutdownWrapper::setClusterState(const spi::ClusterState& state)
+ProviderErrorWrapper::setClusterState(const spi::ClusterState& state)
{
return checkResult(_impl.setClusterState(state));
}
spi::Result
-ProviderShutdownWrapper::setActiveState(const spi::Bucket& bucket,
+ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket,
spi::BucketInfo::ActiveState newState)
{
return checkResult(_impl.setActiveState(bucket, newState));
}
spi::BucketInfoResult
-ProviderShutdownWrapper::getBucketInfo(const spi::Bucket& bucket) const
+ProviderErrorWrapper::getBucketInfo(const spi::Bucket& bucket) const
{
return checkResult(_impl.getBucketInfo(bucket));
}
spi::Result
-ProviderShutdownWrapper::put(const spi::Bucket& bucket,
+ProviderErrorWrapper::put(const spi::Bucket& bucket,
spi::Timestamp ts,
const spi::DocumentSP& doc,
spi::Context& context)
@@ -80,7 +83,7 @@ ProviderShutdownWrapper::put(const spi::Bucket& bucket,
}
spi::RemoveResult
-ProviderShutdownWrapper::remove(const spi::Bucket& bucket,
+ProviderErrorWrapper::remove(const spi::Bucket& bucket,
spi::Timestamp ts,
const document::DocumentId& docId,
spi::Context& context)
@@ -89,7 +92,7 @@ ProviderShutdownWrapper::remove(const spi::Bucket& bucket,
}
spi::RemoveResult
-ProviderShutdownWrapper::removeIfFound(const spi::Bucket& bucket,
+ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket,
spi::Timestamp ts,
const document::DocumentId& docId,
spi::Context& context)
@@ -98,7 +101,7 @@ ProviderShutdownWrapper::removeIfFound(const spi::Bucket& bucket,
}
spi::UpdateResult
-ProviderShutdownWrapper::update(const spi::Bucket& bucket,
+ProviderErrorWrapper::update(const spi::Bucket& bucket,
spi::Timestamp ts,
const spi::DocumentUpdateSP& docUpdate,
spi::Context& context)
@@ -107,7 +110,7 @@ ProviderShutdownWrapper::update(const spi::Bucket& bucket,
}
spi::GetResult
-ProviderShutdownWrapper::get(const spi::Bucket& bucket,
+ProviderErrorWrapper::get(const spi::Bucket& bucket,
const document::FieldSet& fieldSet,
const document::DocumentId& docId,
spi::Context& context) const
@@ -116,13 +119,13 @@ ProviderShutdownWrapper::get(const spi::Bucket& bucket,
}
spi::Result
-ProviderShutdownWrapper::flush(const spi::Bucket& bucket, spi::Context& context)
+ProviderErrorWrapper::flush(const spi::Bucket& bucket, spi::Context& context)
{
return checkResult(_impl.flush(bucket, context));
}
spi::CreateIteratorResult
-ProviderShutdownWrapper::createIterator(const spi::Bucket& bucket,
+ProviderErrorWrapper::createIterator(const spi::Bucket& bucket,
const document::FieldSet& fieldSet,
const spi::Selection& selection,
spi::IncludedVersions versions,
@@ -132,7 +135,7 @@ ProviderShutdownWrapper::createIterator(const spi::Bucket& bucket,
}
spi::IterateResult
-ProviderShutdownWrapper::iterate(spi::IteratorId iteratorId,
+ProviderErrorWrapper::iterate(spi::IteratorId iteratorId,
uint64_t maxByteSize,
spi::Context& context) const
{
@@ -140,41 +143,41 @@ ProviderShutdownWrapper::iterate(spi::IteratorId iteratorId,
}
spi::Result
-ProviderShutdownWrapper::destroyIterator(spi::IteratorId iteratorId,
+ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId,
spi::Context& context)
{
return checkResult(_impl.destroyIterator(iteratorId, context));
}
spi::Result
-ProviderShutdownWrapper::createBucket(const spi::Bucket& bucket,
+ProviderErrorWrapper::createBucket(const spi::Bucket& bucket,
spi::Context& context)
{
return checkResult(_impl.createBucket(bucket, context));
}
spi::Result
-ProviderShutdownWrapper::deleteBucket(const spi::Bucket& bucket,
+ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket,
spi::Context& context)
{
return checkResult(_impl.deleteBucket(bucket, context));
}
spi::BucketIdListResult
-ProviderShutdownWrapper::getModifiedBuckets() const
+ProviderErrorWrapper::getModifiedBuckets() const
{
return checkResult(_impl.getModifiedBuckets());
}
spi::Result
-ProviderShutdownWrapper::maintain(const spi::Bucket& bucket,
+ProviderErrorWrapper::maintain(const spi::Bucket& bucket,
spi::MaintenanceLevel level)
{
return checkResult(_impl.maintain(bucket, level));
}
spi::Result
-ProviderShutdownWrapper::split(const spi::Bucket& source,
+ProviderErrorWrapper::split(const spi::Bucket& source,
const spi::Bucket& target1,
const spi::Bucket& target2,
spi::Context& context)
@@ -183,7 +186,7 @@ ProviderShutdownWrapper::split(const spi::Bucket& source,
}
spi::Result
-ProviderShutdownWrapper::join(const spi::Bucket& source1,
+ProviderErrorWrapper::join(const spi::Bucket& source1,
const spi::Bucket& source2,
const spi::Bucket& target, spi::Context& context)
{
@@ -191,14 +194,14 @@ ProviderShutdownWrapper::join(const spi::Bucket& source1,
}
spi::Result
-ProviderShutdownWrapper::move(const spi::Bucket& source,
+ProviderErrorWrapper::move(const spi::Bucket& source,
spi::PartitionId target, spi::Context& context)
{
return checkResult(_impl.move(source, target, context));
}
spi::Result
-ProviderShutdownWrapper::removeEntry(const spi::Bucket& bucket,
+ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket,
spi::Timestamp ts, spi::Context& context)
{
return checkResult(_impl.removeEntry(bucket, ts, context));
diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 0700bcdcda2..84adf37cbc3 100644
--- a/storage/src/vespa/storage/persistence/providershutdownwrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -1,34 +1,43 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * \class storage::ProviderShutdownWrapper
+ * \class storage::ProviderErrorWrapper
*
* \brief Utility class which forwards all calls to the real persistence
* provider implementation, transparently checking the result of each
- * operation to see if the result is FATAL_ERROR. If so, it initiates a
- * shutdown of the process (but still returns the response up to the caller
- * as if it were just a non-wrapped call).
+ * operation to see if the result is FATAL_ERROR or RESOURCE_EXHAUSTED.
*
+ * If FATAL_ERROR or RESOURCE_EXHAUSTED is observed, the wrapper will invoke any
+ * and all registered error listeners synchronously, before returning the response
+ * to the caller as usual.
*/
#pragma once
+#include <vespa/persistence/spi/persistenceprovider.h>
#include <vector>
#include <string>
-#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/vespalib/util/sync.h>
+#include <memory>
+#include <mutex>
namespace storage {
class ServiceLayerComponent;
-class ProviderShutdownWrapper : public spi::PersistenceProvider
-{
+class ProviderErrorListener {
public:
- ProviderShutdownWrapper(spi::PersistenceProvider& impl,
- ServiceLayerComponent& component)
+ virtual ~ProviderErrorListener() = default;
+ virtual void on_fatal_error(vespalib::stringref message) {
+ (void)message;
+ }
+ virtual void on_resource_exhaustion_error(vespalib::stringref message) {
+ (void)message;
+ }
+};
+
+class ProviderErrorWrapper : public spi::PersistenceProvider {
+public:
+ explicit ProviderErrorWrapper(spi::PersistenceProvider& impl)
: _impl(impl),
- _component(component),
- _shutdownLock(),
- _shutdownTriggered(false)
+ _mutex()
{
}
@@ -63,19 +72,18 @@ public:
const spi::PersistenceProvider& getProviderImplementation() const {
return _impl;
}
+
+ void register_error_listener(std::shared_ptr<ProviderErrorListener> listener);
private:
- /**
- * Check whether result has a FATAL_ERROR return code and invoke
- * requestShutdown with its error string if so. Will const_cast
- * internally since it calls non-const on _component.
- */
template <typename ResultType>
ResultType checkResult(ResultType&& result) const;
+ void trigger_shutdown_listeners(vespalib::stringref reason) const;
+ void trigger_resource_exhaustion_listeners(vespalib::stringref reason) const;
+
spi::PersistenceProvider& _impl;
- ServiceLayerComponent& _component;
- vespalib::Lock _shutdownLock;
- mutable bool _shutdownTriggered;
+ std::vector<std::shared_ptr<ProviderErrorListener>> _listeners;
+ mutable std::mutex _mutex;
};
} // storage
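For readers skimming the header changes above: the core of the new wrapper is that every forwarded result funnels through checkResult(), which fires listener callbacks before the result is handed back to the caller. The following is a minimal standalone sketch of that dispatch pattern using only the standard library; Result, ErrorListener and ErrorWrapper are simplified illustrative stand-ins, not the actual spi::Result / ProviderErrorListener / ProviderErrorWrapper types.

#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

enum class ErrorCode { NONE, FATAL_ERROR, RESOURCE_EXHAUSTED };

struct Result {               // stand-in for spi::Result
    ErrorCode code;
    std::string message;
};

struct ErrorListener {        // mirrors the ProviderErrorListener interface
    virtual ~ErrorListener() = default;
    virtual void on_fatal_error(const std::string&) {}
    virtual void on_resource_exhaustion_error(const std::string&) {}
};

class ErrorWrapper {          // mirrors ProviderErrorWrapper's checkResult flow
public:
    void register_error_listener(std::shared_ptr<ErrorListener> l) {
        std::lock_guard<std::mutex> guard(_mutex);
        _listeners.push_back(std::move(l));
    }
    // Every forwarded result passes through here; listeners fire before the
    // result is returned, and the caller still sees the original result.
    Result check(Result r) const {
        if (r.code == ErrorCode::FATAL_ERROR) {
            for_each_listener([&](ErrorListener& l) { l.on_fatal_error(r.message); });
        } else if (r.code == ErrorCode::RESOURCE_EXHAUSTED) {
            for_each_listener([&](ErrorListener& l) { l.on_resource_exhaustion_error(r.message); });
        }
        return r;
    }
private:
    template <typename F>
    void for_each_listener(F f) const {
        std::lock_guard<std::mutex> guard(_mutex);
        for (auto& l : _listeners) f(*l);
    }
    std::vector<std::shared_ptr<ErrorListener>> _listeners;
    mutable std::mutex _mutex;
};

int main() {
    struct PrintingListener : ErrorListener {
        void on_resource_exhaustion_error(const std::string& m) override {
            std::cout << "back-pressure requested: " << m << "\n";
        }
    };
    ErrorWrapper wrapper;
    wrapper.register_error_listener(std::make_shared<PrintingListener>());
    wrapper.check({ErrorCode::RESOURCE_EXHAUSTED, "disk full"});
}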
diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt
index 4b8905eb6d7..2fd3fe43a57 100644
--- a/storage/src/vespa/storage/storageserver/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt
@@ -23,6 +23,7 @@ vespa_add_library(storage_storageserver
statereporter.cpp
storagemetricsset.cpp
changedbucketownershiphandler.cpp
+ service_layer_error_listener.cpp
INSTALL lib64
DEPENDS
storage
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index d9a14fc7bb0..483992559e7 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -16,8 +16,7 @@ namespace storage {
namespace {
-struct NodeComparator
-{
+struct NodeComparator {
bool operator()(const api::MergeBucketCommand::Node& a,
const api::MergeBucketCommand::Node& b) const
{
@@ -28,8 +27,7 @@ struct NodeComparator
// Class used to sneakily get around IThrottlePolicy only accepting
// messagebus objects
template <typename Base>
-class DummyMbusMessage : public Base
-{
+class DummyMbusMessage : public Base {
private:
static const mbus::string NAME;
public:
@@ -70,6 +68,7 @@ MergeThrottler::ChainedMergeState::~ChainedMergeState() {}
MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner)
: metrics::MetricSet("mergethrottler", "", "", owner),
averageQueueWaitingTime("averagequeuewaitingtime", "", "Average time a merge spends in the throttler queue", this),
+ bounced_due_to_back_pressure("bounced_due_to_back_pressure", "", "Number of merges bounced due to resource exhaustion back-pressure", this),
chaining("mergechains", this),
local("locallyexecutedmerges", this)
{ }
@@ -200,6 +199,8 @@ MergeThrottler::MergeThrottler(
_component(compReg, "mergethrottler"),
_thread(),
_rendezvous(RENDEZVOUS_NONE),
+ _throttle_until_time(),
+ _backpressure_duration(std::chrono::seconds(30)),
_closing(false)
{
_throttlePolicy->setMaxPendingCount(20);
@@ -215,12 +216,13 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ
vespalib::LockGuard lock(_stateLock);
if (newConfig->maxMergesPerNode < 1) {
- throw config::InvalidConfigException(
- "Cannot have a max merge count of less than 1");
+ throw config::InvalidConfigException("Cannot have a max merge count of less than 1");
}
if (newConfig->maxMergeQueueSize < 0) {
- throw config::InvalidConfigException(
- "Max merge queue size cannot be less than 0");
+ throw config::InvalidConfigException("Max merge queue size cannot be less than 0");
+ }
+ if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) {
+ throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0");
}
if (static_cast<double>(newConfig->maxMergesPerNode)
!= _throttlePolicy->getMaxPendingCount())
@@ -232,6 +234,8 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ
LOG(debug, "Setting new max queue size to %d",
newConfig->maxMergeQueueSize);
_maxQueueSize = newConfig->maxMergeQueueSize;
+ _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
+ std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs));
}
MergeThrottler::~MergeThrottler()
@@ -276,9 +280,9 @@ MergeThrottler::onClose()
LOG(debug, "onClose; active: %" PRIu64 ", queued: %" PRIu64,
_merges.size(), _queue.size());
}
- if (_thread.get() != 0) {
+ if (_thread) {
_thread->interruptAndJoin(&_messageLock);
- _thread.reset(0);
+ _thread.reset();
}
}
@@ -408,8 +412,7 @@ MergeThrottler::enqueueMerge(
MessageGuard& msgGuard)
{
LOG(spam, "Enqueuing %s", msg->toString().c_str());
- const api::MergeBucketCommand& mergeCmd
- = static_cast<const api::MergeBucketCommand&>(*msg);
+ auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg);
MergeNodeSequence nodeSeq(mergeCmd, _component.getIndex());
if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) {
return;
@@ -427,8 +430,7 @@ MergeThrottler::canProcessNewMerge() const
bool
MergeThrottler::isMergeAlreadyKnown(const api::StorageMessage::SP& msg) const
{
- const api::MergeBucketCommand& mergeCmd
- = static_cast<const api::MergeBucketCommand&>(*msg);
+ auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg);
return _merges.find(mergeCmd.getBucketId()) != _merges.end();
}
@@ -441,8 +443,7 @@ MergeThrottler::rejectMergeIfOutdated(
// Only reject merge commands! never reject replies (for obvious reasons..)
assert(msg->getType() == api::MessageType::MERGEBUCKET);
- const api::MergeBucketCommand& cmd(
- static_cast<const api::MergeBucketCommand&>(*msg));
+ auto& cmd = static_cast<const api::MergeBucketCommand&>(*msg);
if (cmd.getClusterStateVersion() == 0
|| cmd.getClusterStateVersion() >= rejectLessThanVersion)
@@ -455,9 +456,7 @@ MergeThrottler::rejectMergeIfOutdated(
<< ", storage node has version "
<< rejectLessThanVersion;
sendReply(cmd,
- api::ReturnCode(
- api::ReturnCode::WRONG_DISTRIBUTION,
- oss.str()),
+ api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, oss.str()),
msgGuard, _metrics->chaining);
LOG(debug, "Immediately rejected %s, due to it having state version < %u",
cmd.toString().c_str(), rejectLessThanVersion);
@@ -654,6 +653,49 @@ MergeThrottler::run(framework::ThreadHandle& thread)
LOG(debug, "Returning from MergeThrottler working thread");
}
+bool MergeThrottler::merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const {
+ if (_throttle_until_time.time_since_epoch().count() == 0) {
+ return false;
+ }
+ if (merge_has_this_node_as_source_only_node(cmd)) {
+ return false;
+ }
+ if (backpressure_mode_active_no_lock()) {
+ return true;
+ }
+    // Back-pressure window has expired; reset it so future calls can return early without sampling the clock.
+ _throttle_until_time = std::chrono::steady_clock::time_point{};
+ return false;
+}
+
+bool MergeThrottler::merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const {
+ auto self_is_source_only = [self = _component.getIndex()](auto& node) {
+ return (node.index == self) && node.sourceOnly;
+ };
+ return std::any_of(cmd.getNodes().begin(), cmd.getNodes().end(), self_is_source_only);
+}
+
+bool MergeThrottler::backpressure_mode_active_no_lock() const {
+ return (_component.getClock().getMonotonicTime() < _throttle_until_time);
+}
+
+void MergeThrottler::bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard) {
+ sendReply(cmd, api::ReturnCode(api::ReturnCode::BUSY,
+ "Node is throttling merges due to resource exhaustion"),
+ guard, _metrics->local);
+ _metrics->bounced_due_to_back_pressure.inc();
+}
+
+void MergeThrottler::apply_timed_backpressure() {
+ vespalib::LockGuard lock(_stateLock);
+ _throttle_until_time = _component.getClock().getMonotonicTime() + _backpressure_duration;
+}
+
+bool MergeThrottler::backpressure_mode_active() const {
+ vespalib::LockGuard lock(_stateLock);
+ return backpressure_mode_active_no_lock();
+}
+
// Must be run from worker thread
void
MergeThrottler::handleMessageDown(
@@ -661,22 +703,24 @@ MergeThrottler::handleMessageDown(
MessageGuard& msgGuard)
{
if (msg->getType() == api::MessageType::MERGEBUCKET) {
- const api::MergeBucketCommand& mergeCmd
- = static_cast<const api::MergeBucketCommand&>(*msg);
+ auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg);
- uint32_t ourVersion(
- _component.getStateUpdater().getSystemState()->getVersion());
+ uint32_t ourVersion = _component.getStateUpdater().getSystemState()->getVersion();
if (mergeCmd.getClusterStateVersion() > ourVersion) {
LOG(debug, "Merge %s with newer cluster state than us arrived",
mergeCmd.toString().c_str());
- rejectOutdatedQueuedMerges(
- msgGuard, mergeCmd.getClusterStateVersion());
+ rejectOutdatedQueuedMerges(msgGuard, mergeCmd.getClusterStateVersion());
} else if (rejectMergeIfOutdated(msg, ourVersion, msgGuard)) {
// Skip merge entirely
return;
}
+ if (merge_is_backpressure_throttled(mergeCmd)) {
+ bounce_backpressure_throttled_merge(mergeCmd, msgGuard);
+ return;
+ }
+
if (isMergeAlreadyKnown(msg)) {
processCycledMergeCommand(msg, msgGuard);
} else if (canProcessNewMerge()) {
@@ -686,13 +730,9 @@ MergeThrottler::handleMessageDown(
} else {
// No more room at the inn. Return BUSY so that the
// distributor will wait a bit before retrying
- LOG(debug, "Queue is full; busy-returning %s",
- mergeCmd.toString().c_str());
- sendReply(mergeCmd,
- api::ReturnCode(api::ReturnCode::BUSY,
- "Merge queue is full"),
- msgGuard,
- _metrics->local);
+ LOG(debug, "Queue is full; busy-returning %s", mergeCmd.toString().c_str());
+ sendReply(mergeCmd, api::ReturnCode(api::ReturnCode::BUSY, "Merge queue is full"),
+ msgGuard, _metrics->local);
}
} else {
assert(msg->getType() == api::MessageType::MERGEBUCKET_REPLY);
@@ -707,8 +747,7 @@ MergeThrottler::handleMessageUp(
MessageGuard& msgGuard)
{
assert(msg->getType() == api::MessageType::MERGEBUCKET_REPLY);
- const api::MergeBucketReply& mergeReply
- = static_cast<const api::MergeBucketReply&>(*msg);
+ auto& mergeReply = static_cast<const api::MergeBucketReply&>(*msg);
LOG(debug, "Processing %s from persistence layer",
mergeReply.toString().c_str());
@@ -741,9 +780,7 @@ MergeThrottler::validateNewMerge(
<< _component.getIndex()
<< ", which is not in its forwarding chain";
LOG(error, "%s", oss.str().c_str());
- } else if (mergeCmd.getChain().size()
- >= nodeSeq.getSortedNodes().size())
- {
+ } else if (mergeCmd.getChain().size() >= nodeSeq.getSortedNodes().size()) {
// Chain is full but we haven't seen the merge! This means
// the node has probably gone down with a merge it previously
// forwarded only now coming back to haunt it.
@@ -774,8 +811,7 @@ MergeThrottler::processNewMergeCommand(
const api::StorageMessage::SP& msg,
MessageGuard& msgGuard)
{
- const api::MergeBucketCommand& mergeCmd
- = static_cast<const api::MergeBucketCommand&>(*msg);
+ auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg);
MergeNodeSequence nodeSeq(mergeCmd, _component.getIndex());
@@ -788,7 +824,7 @@ MergeThrottler::processNewMergeCommand(
// Register the merge now so that it will contribute to filling up our
// merge throttling window.
assert(_merges.find(mergeCmd.getBucketId()) == _merges.end());
- ActiveMergeMap::iterator state = _merges.insert(
+ auto state = _merges.insert(
std::make_pair(mergeCmd.getBucketId(),
ChainedMergeState(msg))).first;
@@ -864,13 +900,11 @@ MergeThrottler::processCycledMergeCommand(
// aborted, in which case we have to immediately send an abortion reply
// so the cycle can be unwound.
- const api::MergeBucketCommand& mergeCmd
- = static_cast<const api::MergeBucketCommand&>(*msg);
+ auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg);
MergeNodeSequence nodeSeq(mergeCmd, _component.getIndex());
- ActiveMergeMap::iterator mergeIter(
- _merges.find(mergeCmd.getBucketId()));
+ auto mergeIter = _merges.find(mergeCmd.getBucketId());
assert(mergeIter != _merges.end());
if (mergeIter->second.isAborted()) {
@@ -921,11 +955,9 @@ MergeThrottler::processMergeReply(
bool fromPersistenceLayer,
MessageGuard& msgGuard)
{
- const api::MergeBucketReply& mergeReply
- = dynamic_cast<const api::MergeBucketReply&>(*msg);
+ auto& mergeReply = dynamic_cast<const api::MergeBucketReply&>(*msg);
- ActiveMergeMap::iterator mergeIter(
- _merges.find(mergeReply.getBucketId()));
+ auto mergeIter = _merges.find(mergeReply.getBucketId());
if (mergeIter == _merges.end()) {
LOG(warning, "Received %s, which has no command mapped "
"for it. Cannot send chained reply!",
@@ -1035,7 +1067,7 @@ MergeThrottler::onDown(const std::shared_ptr<api::StorageMessage>& msg)
return true;
} else if (isDiffCommand(*msg)) {
vespalib::LockGuard lock(_stateLock);
- api::StorageCommand& cmd(static_cast<api::StorageCommand&>(*msg));
+ auto& cmd = static_cast<api::StorageCommand&>(*msg);
if (bucketIsUnknownOrAborted(cmd.getBucketId())) {
sendUp(makeAbortReply(cmd, "no state recorded for bucket in merge "
"throttler, source merge probably aborted earlier"));
@@ -1067,7 +1099,7 @@ MergeThrottler::isMergeReply(const api::StorageMessage& msg) const
bool
MergeThrottler::bucketIsUnknownOrAborted(const document::BucketId& bucket) const
{
- ActiveMergeMap::const_iterator it(_merges.find(bucket));
+ auto it = _merges.find(bucket);
if (it == _merges.end()) {
return true;
}
@@ -1089,8 +1121,7 @@ bool
MergeThrottler::onUp(const std::shared_ptr<api::StorageMessage>& msg)
{
if (isMergeReply(*msg)) {
- const api::MergeBucketReply& mergeReply
- = dynamic_cast<const api::MergeBucketReply&>(*msg);
+ auto& mergeReply = dynamic_cast<const api::MergeBucketReply&>(*msg);
LOG(spam, "Received %s from persistence layer",
mergeReply.toString().c_str());
@@ -1126,8 +1157,7 @@ MergeThrottler::releaseWorkerThreadRendezvous(vespalib::MonitorGuard& guard)
}
}
-class ThreadRendezvousGuard
-{
+class ThreadRendezvousGuard {
MergeThrottler& _throttler;
vespalib::MonitorGuard& _guard;
public:
@@ -1237,23 +1267,22 @@ MergeThrottler::reportHtmlStatus(std::ostream& out,
out << "<h3>Active merges ("
<< _merges.size()
<< ")</h3>\n";
- ActiveMergeMap::const_iterator end = _merges.end();
if (!_merges.empty()) {
out << "<ul>\n";
- for (ActiveMergeMap::const_iterator i = _merges.begin(); i != end; ++i) {
- out << "<li>" << i->second.getMergeCmdString();
- if (i->second.isExecutingLocally()) {
+ for (auto& m : _merges) {
+ out << "<li>" << m.second.getMergeCmdString();
+ if (m.second.isExecutingLocally()) {
out << " <strong>(";
- if (i->second.isInCycle()) {
+ if (m.second.isInCycle()) {
out << "cycled - ";
- } else if (i->second.isCycleBroken()) {
+ } else if (m.second.isCycleBroken()) {
out << "broken cycle (another node in the chain likely went down) - ";
}
out << "executing on this node)</strong>";
- } else if (i->second.isUnwinding()) {
+ } else if (m.second.isUnwinding()) {
out << " <strong>(was executed here, now unwinding)</strong>";
}
- if (i->second.isAborted()) {
+ if (m.second.isAborted()) {
out << " <strong>aborted</strong>";
}
out << "</li>\n";
@@ -1268,14 +1297,13 @@ MergeThrottler::reportHtmlStatus(std::ostream& out,
out << "<h3>Queued merges (in priority order) ("
<< _queue.size()
<< ")</h3>\n";
- MergePriorityQueue::const_iterator end = _queue.end();
if (!_queue.empty()) {
out << "<ol>\n";
- for (MergePriorityQueue::const_iterator i = _queue.begin(); i != end; ++i) {
+ for (auto& qm : _queue) {
// The queue always owns its messages, thus this is safe
out << "<li>Pri "
- << static_cast<unsigned int>(i->_msg->getPriority())
- << ": " << *i->_msg;
+ << static_cast<unsigned int>(qm._msg->getPriority())
+ << ": " << *qm._msg;
out << "</li>\n";
}
out << "</ol>\n";
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index a93dab4e24e..070c2ef07c4 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -18,6 +18,7 @@
#include <vespa/messagebus/staticthrottlepolicy.h>
#include <vespa/metrics/metrics.h>
#include <vespa/config/config.h>
+#include <chrono>
namespace storage {
@@ -29,8 +30,7 @@ class MergeThrottler : public framework::Runnable,
private config::IFetcherCallback<vespa::config::content::core::StorServerConfig>
{
public:
- class MergeFailureMetrics : public metrics::MetricSet
- {
+ class MergeFailureMetrics : public metrics::MetricSet {
public:
metrics::SumMetric<metrics::LongCountMetric> sum;
metrics::LongCountMetric notready;
@@ -47,8 +47,7 @@ public:
~MergeFailureMetrics();
};
- class MergeOperationMetrics : public metrics::MetricSet
- {
+ class MergeOperationMetrics : public metrics::MetricSet {
public:
metrics::LongCountMetric ok;
MergeFailureMetrics failures;
@@ -57,10 +56,10 @@ public:
~MergeOperationMetrics();
};
- class Metrics : public metrics::MetricSet
- {
+ class Metrics : public metrics::MetricSet {
public:
metrics::DoubleAverageMetric averageQueueWaitingTime;
+ metrics::LongCountMetric bounced_due_to_back_pressure;
MergeOperationMetrics chaining;
MergeOperationMetrics local;
@@ -71,8 +70,7 @@ public:
private:
// TODO: make PQ with stable ordering into own, generic class
template <class MessageType>
- struct StablePriorityOrderingWrapper
- {
+ struct StablePriorityOrderingWrapper {
MessageType _msg;
metrics::MetricTimer _startTimer;
uint64_t _sequence;
@@ -95,8 +93,7 @@ private:
}
};
- struct ChainedMergeState
- {
+ struct ChainedMergeState {
api::StorageMessage::SP _cmd;
std::string _cmdString; // For being able to print message even when we don't own it
uint64_t _clusterStateVersion;
@@ -167,6 +164,8 @@ private:
StorageComponent _component;
framework::Thread::UP _thread;
RendezvousState _rendezvous;
+ mutable std::chrono::steady_clock::time_point _throttle_until_time;
+ std::chrono::steady_clock::duration _backpressure_duration;
bool _closing;
public:
/**
@@ -174,7 +173,7 @@ public:
* than 1 as their window size.
*/
MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&);
- ~MergeThrottler();
+ ~MergeThrottler() override;
/** Implements document::Runnable::run */
void run(framework::ThreadHandle&) override;
@@ -187,6 +186,16 @@ public:
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) override;
+ /*
+     * When invoked, merges sent to this node will be BUSY-bounced by the throttler
+ * for a configurable period of time instead of being processed.
+ *
+ * Thread safe, but must not be called if _stateLock is already held, or
+ * deadlock will occur.
+ */
+ void apply_timed_backpressure();
+ bool backpressure_mode_active() const;
+
// For unit testing only
const ActiveMergeMap& getActiveMerges() const { return _merges; }
// For unit testing only
@@ -206,8 +215,7 @@ private:
friend class ThreadRendezvousGuard; // impl in .cpp file
// Simple helper class for centralizing chaining logic
- struct MergeNodeSequence
- {
+ struct MergeNodeSequence {
const api::MergeBucketCommand& _cmd;
std::vector<api::MergeBucketCommand::Node> _sortedNodes;
std::size_t _sortedIndex; // Index of current storage node in the sorted node sequence
@@ -328,6 +336,11 @@ private:
*/
bool canProcessNewMerge() const;
+ bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const;
+ void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard);
+ bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
+ bool backpressure_mode_active_no_lock() const;
+
void sendReply(const api::MergeBucketCommand& cmd,
const api::ReturnCode& result,
MessageGuard& msgGuard,
diff --git a/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp b/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp
new file mode 100644
index 00000000000..41177fe46b8
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp
@@ -0,0 +1,36 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "service_layer_error_listener.h"
+#include <vespa/storage/common/storagecomponent.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".node.errorlistener");
+
+namespace storage {
+
+void ServiceLayerErrorListener::on_fatal_error(vespalib::stringref message) {
+ bool expected = false;
+ if (_shutdown_initiated.compare_exchange_strong(expected, true)) {
+ LOG(info,
+ "Received FATAL_ERROR from persistence provider, "
+ "shutting down node: %s",
+ message.c_str());
+ _component.requestShutdown(message); // Thread safe
+ } else {
+ LOG(debug,
+ "Received FATAL_ERROR from persistence provider: %s. "
+ "Node has already been instructed to shut down so "
+ "not doing anything now.",
+ message.c_str());
+ }
+}
+
+void ServiceLayerErrorListener::on_resource_exhaustion_error(vespalib::stringref message) {
+ LOG(debug, "SPI reports resource exhaustion ('%s'). "
+ "Applying back-pressure to merge throttler",
+ message.c_str());
+ _merge_throttler.apply_timed_backpressure(); // Thread safe
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/service_layer_error_listener.h b/storage/src/vespa/storage/storageserver/service_layer_error_listener.h
new file mode 100644
index 00000000000..6995459e333
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/service_layer_error_listener.h
@@ -0,0 +1,36 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/storage/persistence/provider_error_wrapper.h>
+#include <atomic>
+
+namespace storage {
+
+class StorageComponent;
+class MergeThrottler;
+
+/*
+ * Listener implementation for SPI errors that require action beyond simply
+ * responding to the command that generated them.
+ *
+ * - Fatal errors will trigger a process shutdown.
+ * - Resource exhaustion errors will trigger merge back-pressure.
+ */
+class ServiceLayerErrorListener : public ProviderErrorListener {
+ StorageComponent& _component;
+ MergeThrottler& _merge_throttler;
+ std::atomic<bool> _shutdown_initiated;
+public:
+ ServiceLayerErrorListener(StorageComponent& component,
+ MergeThrottler& merge_throttler)
+ : _component(component),
+ _merge_throttler(merge_throttler),
+ _shutdown_initiated(false)
+ {}
+
+ void on_fatal_error(vespalib::stringref message) override;
+ void on_resource_exhaustion_error(vespalib::stringref message) override;
+};
+
+}
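The fatal-error path in the new listener relies on an atomic compare_exchange_strong so that only the first FATAL_ERROR actually requests a shutdown; later errors are logged and otherwise ignored. A minimal sketch of that once-only trigger follows, with OneShotShutdown as a hypothetical stand-in for the real listener and a print in place of requestShutdown().

#include <atomic>
#include <iostream>
#include <string>

// Once-only trigger mirroring how the listener guards requestShutdown():
// only the first fatal error flips the flag and performs the action.
class OneShotShutdown {
public:
    void on_fatal_error(const std::string& message) {
        bool expected = false;
        if (_initiated.compare_exchange_strong(expected, true)) {
            std::cout << "shutting down node: " << message << "\n";
        } else {
            std::cout << "shutdown already requested, ignoring: " << message << "\n";
        }
    }
private:
    std::atomic<bool> _initiated{false};
};

int main() {
    OneShotShutdown handler;
    handler.on_fatal_error("disk I/O failure");   // triggers the shutdown path
    handler.on_fatal_error("disk I/O failure");   // no-op beyond logging
}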
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 4f87b9af6f2..79d57a8f7e4 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -9,6 +9,7 @@
#include "opslogger.h"
#include "statemanager.h"
#include "priorityconverter.h"
+#include "service_layer_error_listener.h"
#include <vespa/storage/visiting/messagebusvisitormessagesession.h>
#include <vespa/storage/visiting/visitormanager.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
@@ -16,6 +17,7 @@
#include <vespa/storage/bucketmover/bucketmover.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
+#include <vespa/storage/persistence/provider_error_wrapper.h>
#include <vespa/persistence/spi/exceptions.h>
#include <vespa/messagebus/rpcmessagebus.h>
@@ -35,7 +37,7 @@ ServiceLayerNode::ServiceLayerNode(
_persistenceProvider(persistenceProvider),
_partitions(0),
_externalVisitors(externalVisitors),
- _fileStorManager(0),
+ _fileStorManager(nullptr),
_init_has_been_called(false),
_noUsablePartitionMode(false)
{
@@ -112,7 +114,7 @@ void
ServiceLayerNode::removeConfigSubscriptions()
{
StorageNode::removeConfigSubscriptions();
- _configFetcher.reset(0);
+ _configFetcher.reset();
}
void
@@ -166,7 +168,7 @@ ServiceLayerNode::initializeNodeSpecific()
void
ServiceLayerNode::handleLiveConfigUpdate()
{
- if (_newServerConfig.get() != 0) {
+ if (_newServerConfig) {
bool updated = false;
vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig);
vespa::config::content::core::StorServerConfig& newC(*_newServerConfig);
@@ -218,9 +220,11 @@ ServiceLayerNode::configure(
// updates
{
vespalib::LockGuard configLockGuard(_configLock);
- _newDevicesConfig.reset(config.release());
+ _newDevicesConfig = std::move(config);
+ }
+ if (_distributionConfig) {
+ handleLiveConfigUpdate();
}
- if (_distributionConfig.get() != 0) handleLiveConfigUpdate();
}
VisitorMessageSession::UP
@@ -259,7 +263,8 @@ ServiceLayerNode::createChain()
return chain;
}
chain->push_back(StorageLink::UP(new OpsLogger(compReg, _configUri)));
- chain->push_back(StorageLink::UP(new MergeThrottler(_configUri, compReg)));
+ auto* merge_throttler = new MergeThrottler(_configUri, compReg);
+ chain->push_back(StorageLink::UP(merge_throttler));
chain->push_back(StorageLink::UP(new ChangedBucketOwnershipHandler(_configUri, compReg)));
chain->push_back(StorageLink::UP(new BucketIntegrityChecker(_configUri, compReg)));
chain->push_back(StorageLink::UP(new bucketmover::BucketMover(_configUri, compReg)));
@@ -277,6 +282,12 @@ ServiceLayerNode::createChain()
_configUri, _partitions, _persistenceProvider,
_context.getComponentRegister())));
chain->push_back(StorageLink::UP(releaseStateManager().release()));
+
+    // All components referenced by the listener must outlive the last call going
+    // through the SPI; this holds because queues are flushed and worker threads are
+    // joined when the storage link chain is closed prior to destruction.
+ auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler);
+ _fileStorManager->error_wrapper().register_error_listener(std::move(error_listener));
return chain;
}
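Putting the pieces together, the wiring in createChain() registers a listener on the file stor manager's error wrapper so that RESOURCE_EXHAUSTED results from the SPI end up calling apply_timed_backpressure() on the merge throttler. The sketch below shows that flow end to end with simplified stand-in types; Throttler, Listener, ErrorWrapper and ServiceLayerListener are illustrative names, not the Vespa classes.

#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Throttler {
    void apply_timed_backpressure() { std::cout << "merge back-pressure engaged\n"; }
};

struct Listener {
    virtual ~Listener() = default;
    virtual void on_resource_exhaustion_error(const std::string&) {}
};

// Forwards resource exhaustion from the provider side to the throttler.
struct ServiceLayerListener : Listener {
    explicit ServiceLayerListener(Throttler& t) : _throttler(t) {}
    void on_resource_exhaustion_error(const std::string& msg) override {
        std::cout << "SPI reports exhaustion: " << msg << "\n";
        _throttler.apply_timed_backpressure();
    }
    Throttler& _throttler;
};

struct ErrorWrapper {
    void register_error_listener(std::shared_ptr<Listener> l) { _listeners.push_back(std::move(l)); }
    void report_exhaustion(const std::string& msg) {
        for (auto& l : _listeners) l->on_resource_exhaustion_error(msg);
    }
    std::vector<std::shared_ptr<Listener>> _listeners;
};

int main() {
    Throttler throttler;                       // outlives the wrapper, as the comment above requires
    ErrorWrapper wrapper;
    wrapper.register_error_listener(std::make_shared<ServiceLayerListener>(throttler));
    wrapper.report_exhaustion("attribute enum store full");
}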
diff --git a/vespaclient-java/pom.xml b/vespaclient-java/pom.xml
index 074b787025a..bb8ed9387fc 100644
--- a/vespaclient-java/pom.xml
+++ b/vespaclient-java/pom.xml
@@ -34,12 +34,17 @@
</dependency>
<dependency>
<groupId>com.yahoo.vespa</groupId>
+ <artifactId>container-dev</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
<artifactId>documentapi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.vespa</groupId>
- <artifactId>container-dev</artifactId>
+ <artifactId>predicate-search-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>