diff options
35 files changed, 694 insertions, 369 deletions
diff --git a/application/pom.xml b/application/pom.xml index 3fd9e8054e7..2d9096e49f1 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -75,6 +75,18 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + + <!-- All dependencies that should be visible in test classpath, but not compile classpath, + for user projects must be added in compile scope here. + These dependencies are explicitly excluded (or set to non-compile scope) in the container-dev module. --> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr-runtime</artifactId> + </dependency> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + </dependency> </dependencies> <build> diff --git a/container-dev/pom.xml b/container-dev/pom.xml index 7ac2eb03016..e858ba1c50b 100644 --- a/container-dev/pom.xml +++ b/container-dev/pom.xml @@ -112,11 +112,33 @@ <groupId>com.yahoo.vespa</groupId> <artifactId>container-search-and-docproc</artifactId> <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>com.yahoo.vespa</groupId> <artifactId>config-bundle</artifactId> <version>${project.version}</version> </dependency> + + <!-- Dependencies below are added explicitly to exclude transitive deps that are not provided runtime by the container, + and hence make them invisible to user projects' build classpath. + Excluded artifacts should be added explicitly to the application module to make then visible in users' test classpath. --> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>predicate-search-core</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.antlr</groupId> + <artifactId>antlr-runtime</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> </project> diff --git a/container-search/pom.xml b/container-search/pom.xml index 837629c44b9..f622567acde 100644 --- a/container-search/pom.xml +++ b/container-search/pom.xml @@ -143,7 +143,6 @@ <dependency> <groupId>org.antlr</groupId> <artifactId>antlr4-runtime</artifactId> - <version>4.5</version> </dependency> <dependency> <groupId>org.mockito</groupId> @@ -211,7 +210,7 @@ <plugin> <!-- For the YQL query language --> <groupId>org.antlr</groupId> <artifactId>antlr4-maven-plugin</artifactId> - <version>4.5</version> + <version>${antlr4.version}</version> <executions> <execution> <configuration> diff --git a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Docker.java b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Docker.java index b512b7b5f90..865908bdc8e 100644 --- a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Docker.java +++ b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Docker.java @@ -6,7 +6,6 @@ import java.net.InetAddress; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; /** * API to simplify the com.github.dockerjava API for clients, @@ -60,9 +59,14 @@ public interface Docker { Optional<Container> getContainer(ContainerName containerName); - CompletableFuture<DockerImage> pullImageAsync(DockerImage image); - - boolean imageIsDownloaded(DockerImage image); + /** + * Checks if the image is currently being pulled or is already pulled, if not, starts an async + * pull of the image + * + * @param image Docker image to pull + * @return true iff image being pulled, false otherwise + */ + boolean pullImageAsyncIfNeeded(DockerImage image); void deleteImage(DockerImage dockerImage); diff --git a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/DockerImpl.java b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/DockerImpl.java index 938b277d90b..18c1ad1dc93 100644 --- a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/DockerImpl.java +++ b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/DockerImpl.java @@ -37,10 +37,11 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -63,7 +64,7 @@ public class DockerImpl implements Docker { private final Object monitor = new Object(); @GuardedBy("monitor") - private final Map<DockerImage, CompletableFuture<DockerImage>> scheduledPulls = new HashMap<>(); + private final Set<DockerImage> scheduledPulls = new HashSet<>(); // Exposed for testing. final DockerClient dockerClient; @@ -159,37 +160,30 @@ public class DockerImpl implements Docker { } @Override - public CompletableFuture<DockerImage> pullImageAsync(final DockerImage image) { - final CompletableFuture<DockerImage> completionListener; - synchronized (monitor) { - if (scheduledPulls.containsKey(image)) { - return scheduledPulls.get(image); - } - completionListener = new CompletableFuture<>(); - scheduledPulls.put(image, completionListener); - } - + public boolean pullImageAsyncIfNeeded(final DockerImage image) { try { - dockerClient.pullImageCmd(image.asString()).exec(new ImagePullCallback(image)); + synchronized (monitor) { + if (scheduledPulls.contains(image)) return true; + if (imageIsDownloaded(image)) return false; + dockerClient.pullImageCmd(image.asString()).exec(new ImagePullCallback(image)); + return false; + } } catch (RuntimeException e) { numberOfDockerDaemonFails.add(); throw new DockerException("Failed to pull image '" + image.asString() + "'", e); } - - return completionListener; } - private CompletableFuture<DockerImage> removeScheduledPoll(final DockerImage image) { + private void removeScheduledPoll(final DockerImage image) { synchronized (monitor) { - return scheduledPulls.remove(image); + scheduledPulls.remove(image); } } /** * Check if a given image is already in the local registry */ - @Override - public boolean imageIsDownloaded(final DockerImage dockerImage) { + private boolean imageIsDownloaded(final DockerImage dockerImage) { return inspectImage(dockerImage).isPresent(); } @@ -448,7 +442,8 @@ public class DockerImpl implements Docker { @Override public void onError(Throwable throwable) { - removeScheduledPoll(dockerImage).completeExceptionally(throwable); + removeScheduledPoll(dockerImage); + throw new DockerClientException("Could not download image: " + dockerImage); } @@ -457,10 +452,9 @@ public class DockerImpl implements Docker { Optional<Image> image = inspectImage(dockerImage); if (image.isPresent()) { // Download successful, update image GC with the newly downloaded image dockerImageGC.ifPresent(imageGC -> imageGC.updateLastUsedTimeFor(image.get().getId())); - removeScheduledPoll(dockerImage).complete(dockerImage); + removeScheduledPoll(dockerImage); } else { - removeScheduledPoll(dockerImage).completeExceptionally( - new DockerClientException("Could not download image: " + dockerImage)); + throw new DockerClientException("Could not download image: " + dockerImage); } } } diff --git a/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/DockerTest.java b/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/DockerTest.java index 4e2567b9fa8..310fb4ffdd3 100644 --- a/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/DockerTest.java +++ b/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/DockerTest.java @@ -14,7 +14,6 @@ import java.util.concurrent.ExecutionException; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; @@ -30,21 +29,6 @@ public class DockerTest { private static final DockerImage dockerImage = new DockerImage("simple-ipv6-server:Dockerfile"); private static final String MANAGER_NAME = "docker-test"; - // It is ignored since it is a bit slow and unstable, at least on Mac. - @Ignore - @Test - public void testDockerImagePullDelete() throws ExecutionException, InterruptedException { - DockerImage dockerImage = new DockerImage("busybox:1.24.0"); - - // Pull the image and wait for the pull to complete - docker.pullImageAsync(dockerImage).get(); - assertTrue("Failed to download " + dockerImage.asString() + " image", docker.imageIsDownloaded(dockerImage)); - - // Remove the image - docker.deleteImage(dockerImage); - assertFalse("Failed to delete " + dockerImage.asString() + " image", docker.imageIsDownloaded(dockerImage)); - } - // Ignored because the test is very slow (several minutes) when swap is enabled, to disable: (Linux) // $ sudo swapoff -a @Ignore diff --git a/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/RunSystemTests.java b/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/RunSystemTests.java index 5f090abec71..915b3b53867 100644 --- a/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/RunSystemTests.java +++ b/docker-api/src/test/java/com/yahoo/vespa/hosted/dockerapi/RunSystemTests.java @@ -151,9 +151,11 @@ public class RunSystemTests { } private void buildVespaSystestDockerImage(Docker docker, DockerImage vespaBaseImage) throws IOException, ExecutionException, InterruptedException { - if (!docker.imageIsDownloaded(vespaBaseImage)) { + if (docker.pullImageAsyncIfNeeded(vespaBaseImage)) { logger.info("Pulling " + vespaBaseImage.asString() + " (This may take a while)"); - docker.pullImageAsync(vespaBaseImage).get(); + while (docker.pullImageAsyncIfNeeded(vespaBaseImage)) { + Thread.sleep(5000); + }; } Path systestBuildDirectory = pathToVespaRepoInHost.resolve("docker-api/src/test/resources/systest/"); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperations.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperations.java index 61f31ce9d8d..1d11e221a9b 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperations.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperations.java @@ -17,12 +17,9 @@ public interface DockerOperations { void removeContainer(Container existingContainer); - // Returns false if image is already downloaded - boolean shouldScheduleDownloadOfImage(DockerImage dockerImage); - Optional<Container> getContainer(ContainerName containerName); - void scheduleDownloadOfImage(ContainerName containerName, DockerImage dockerImage, Runnable callback); + boolean pullImageAsyncIfNeeded(DockerImage dockerImage); ProcessResult executeCommandInContainerAsRoot(ContainerName containerName, String... command); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperationsImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperationsImpl.java index dbf75d0ee03..b21fc573c0f 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperationsImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/DockerOperationsImpl.java @@ -22,7 +22,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -169,12 +168,6 @@ public class DockerOperationsImpl implements DockerOperations { docker.deleteContainer(containerName); } - // Returns true if scheduling download - @Override - public boolean shouldScheduleDownloadOfImage(final DockerImage dockerImage) { - return !docker.imageIsDownloaded(dockerImage); - } - @Override public Optional<Container> getContainer(ContainerName containerName) { return docker.getContainer(containerName); @@ -212,18 +205,8 @@ public class DockerOperationsImpl implements DockerOperations { } @Override - public void scheduleDownloadOfImage(ContainerName containerName, DockerImage dockerImage, Runnable callback) { - PrefixLogger logger = PrefixLogger.getNodeAgentLogger(DockerOperationsImpl.class, containerName); - - logger.info("Schedule async download of " + dockerImage); - final CompletableFuture<DockerImage> asyncPullResult = docker.pullImageAsync(dockerImage); - asyncPullResult.whenComplete((image, throwable) -> { - if (throwable != null) { - logger.warning("Failed to pull " + dockerImage, throwable); - return; - } - callback.run(); - }); + public boolean pullImageAsyncIfNeeded(DockerImage dockerImage) { + return docker.pullImageAsyncIfNeeded(dockerImage); } ProcessResult executeCommandInContainer(ContainerName containerName, String... command) { diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java index 9d2198cedcc..0d110adf5a4 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java @@ -352,14 +352,8 @@ public class NodeAgentImpl implements NodeAgent { private void scheduleDownLoadIfNeeded(ContainerNodeSpec nodeSpec) { if (nodeSpec.currentDockerImage.equals(nodeSpec.wantedDockerImage)) return; - if (dockerOperations.shouldScheduleDownloadOfImage(nodeSpec.wantedDockerImage.get())) { - if (nodeSpec.wantedDockerImage.get().equals(imageBeingDownloaded)) { - // Downloading already scheduled, but not done. - return; - } + if (dockerOperations.pullImageAsyncIfNeeded(nodeSpec.wantedDockerImage.get())) { imageBeingDownloaded = nodeSpec.wantedDockerImage.get(); - // Create a signalWorkToBeDone when download is finished. - dockerOperations.scheduleDownloadOfImage(containerName, imageBeingDownloaded, this::signalWorkToBeDone); } else if (imageBeingDownloaded != null) { // Image was downloading, but now it's ready imageBeingDownloaded = null; } diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integrationTests/DockerMock.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integrationTests/DockerMock.java index be7fa4e49c6..dc111251af7 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integrationTests/DockerMock.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integrationTests/DockerMock.java @@ -15,7 +15,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** @@ -111,31 +110,14 @@ public class DockerMock implements Docker { } @Override - public CompletableFuture<DockerImage> pullImageAsync(DockerImage image) { + public boolean pullImageAsyncIfNeeded(DockerImage image) { synchronized (monitor) { - callOrderVerifier.add("pullImageAsync with " + image); - final CompletableFuture<DockerImage> completableFuture = new CompletableFuture<>(); - new Thread() { - public void run() { - try { - Thread.sleep(500); - completableFuture.complete(image); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - } - }.start(); - return completableFuture; + callOrderVerifier.add("pullImageAsyncIfNeeded with " + image); + return false; } } @Override - public boolean imageIsDownloaded(DockerImage image) { - return true; - } - - @Override public void deleteImage(DockerImage dockerImage) { } diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java index 3ddb28eed77..1c63c70453e 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java @@ -114,7 +114,7 @@ public class NodeAgentImplTest { verify(dockerOperations, never()).removeContainer(any()); verify(orchestrator, never()).suspend(any(String.class)); - verify(dockerOperations, never()).scheduleDownloadOfImage(eq(containerName), any(), any()); + verify(dockerOperations, never()).pullImageAsyncIfNeeded(any()); final InOrder inOrder = inOrder(dockerOperations, orchestrator, nodeRepository); // TODO: Verify this isn't run unless 1st time @@ -147,14 +147,15 @@ public class NodeAgentImplTest { when(nodeRepository.getContainerNodeSpec(hostName)).thenReturn(Optional.of(nodeSpec)); when(pathResolver.getApplicationStoragePathForNodeAdmin()).thenReturn(Files.createTempDirectory("foo")); + when(dockerOperations.pullImageAsyncIfNeeded(eq(dockerImage))).thenReturn(false); nodeAgent.converge(); verify(dockerOperations, never()).removeContainer(any()); verify(orchestrator, never()).suspend(any(String.class)); - verify(dockerOperations, never()).scheduleDownloadOfImage(eq(containerName), any(), any()); final InOrder inOrder = inOrder(dockerOperations, orchestrator, nodeRepository, aclMaintainer); + inOrder.verify(dockerOperations, times(1)).pullImageAsyncIfNeeded(eq(dockerImage)); inOrder.verify(aclMaintainer, times(1)).run(); inOrder.verify(dockerOperations, times(1)).startContainer(eq(containerName), eq(nodeSpec)); inOrder.verify(dockerOperations, times(1)).resumeNode(eq(containerName)); @@ -185,7 +186,7 @@ public class NodeAgentImplTest { NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getContainerNodeSpec(hostName)).thenReturn(Optional.of(nodeSpec)); - when(dockerOperations.shouldScheduleDownloadOfImage(any())).thenReturn(true); + when(dockerOperations.pullImageAsyncIfNeeded(any())).thenReturn(true); nodeAgent.converge(); @@ -194,8 +195,7 @@ public class NodeAgentImplTest { verify(dockerOperations, never()).removeContainer(any()); final InOrder inOrder = inOrder(dockerOperations); - inOrder.verify(dockerOperations, times(1)).shouldScheduleDownloadOfImage(eq(newDockerImage)); - inOrder.verify(dockerOperations, times(1)).scheduleDownloadOfImage(eq(containerName), eq(newDockerImage), any()); + inOrder.verify(dockerOperations, times(1)).pullImageAsyncIfNeeded(eq(newDockerImage)); } @Test @@ -56,7 +56,7 @@ <plugin> <groupId>org.antlr</groupId> <artifactId>antlr3-maven-plugin</artifactId> - <version>3.5.2</version> + <version>${antlr.version}</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -569,7 +569,12 @@ <dependency> <groupId>org.antlr</groupId> <artifactId>antlr-runtime</artifactId> - <version>3.5.2</version> + <version>${antlr.version}</version> + </dependency> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + <version>${antlr4.version}</version> </dependency> <dependency> <groupId>org.apache.aries.spifly</groupId> @@ -884,6 +889,8 @@ <properties> <javax.ws.rs-api.version>2.0.1</javax.ws.rs-api.version> <!-- must be kept in sync with version used by current jersey2.version --> + <antlr.version>3.5.2</antlr.version> + <antlr4.version>4.5</antlr4.version> <aries.spifly.version>1.0.8</aries.spifly.version> <aries.util.version>1.0.0</aries.util.version> <asm-debug-all.version>5.0.3</asm-debug-all.version> diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp index fa85f4c6a45..296a9a3cc0f 100644 --- a/storage/src/tests/common/testhelper.cpp +++ b/storage/src/tests/common/testhelper.cpp @@ -150,6 +150,7 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & ro config->set("enable_dead_lock_detector_warnings", "false"); config->set("max_merges_per_node", "25"); config->set("max_merge_queue_size", "20"); + config->set("resource_exhaustion_merge_back_pressure_duration_secs", "15.0"); vespalib::string rootFolder = rootOfRoot + "_"; rootFolder += (storagenode ? "vdsroot" : "vdsroot.distributor"); config->set("root_folder", rootFolder); diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt index 44aed95ba77..f5715ca3531 100644 --- a/storage/src/tests/persistence/CMakeLists.txt +++ b/storage/src/tests/persistence/CMakeLists.txt @@ -6,7 +6,7 @@ vespa_add_library(storage_testpersistence TEST splitbitdetectortest.cpp legacyoperationhandlertest.cpp diskmoveoperationhandlertest.cpp - providershutdownwrappertest.cpp + provider_error_wrapper_test.cpp mergehandlertest.cpp persistencethread_splittest.cpp bucketownershipnotifiertest.cpp diff --git a/storage/src/tests/persistence/provider_error_wrapper_test.cpp b/storage/src/tests/persistence/provider_error_wrapper_test.cpp new file mode 100644 index 00000000000..7a8f26cbe93 --- /dev/null +++ b/storage/src/tests/persistence/provider_error_wrapper_test.cpp @@ -0,0 +1,140 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vdstestlib/cppunit/macros.h> +#include <tests/persistence/persistencetestutils.h> +#include <tests/persistence/common/persistenceproviderwrapper.h> + +namespace storage { + +class ProviderErrorWrapperTest : public SingleDiskPersistenceTestUtils { +public: + CPPUNIT_TEST_SUITE(ProviderErrorWrapperTest); + CPPUNIT_TEST(fatal_error_invokes_listener); + CPPUNIT_TEST(resource_exhaustion_error_invokes_listener); + CPPUNIT_TEST(listener_not_invoked_on_success); + CPPUNIT_TEST(listener_not_invoked_on_regular_errors); + CPPUNIT_TEST(multiple_listeners_can_be_registered); + CPPUNIT_TEST_SUITE_END(); + + void fatal_error_invokes_listener(); + void resource_exhaustion_error_invokes_listener(); + void listener_not_invoked_on_success(); + void listener_not_invoked_on_regular_errors(); + void multiple_listeners_can_be_registered(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(ProviderErrorWrapperTest); + +namespace { + +struct MockErrorListener : ProviderErrorListener { + void on_fatal_error(vespalib::stringref message) override { + _seen_fatal_error = true; + _fatal_error = message; + } + void on_resource_exhaustion_error(vespalib::stringref message) override { + _seen_resource_exhaustion_error = true; + _resource_exhaustion_error = message; + } + + vespalib::string _fatal_error; + vespalib::string _resource_exhaustion_error; + bool _seen_fatal_error{false}; + bool _seen_resource_exhaustion_error{false}; +}; + +struct Fixture { + // We wrap the wrapper. It's turtles all the way down! + PersistenceProviderWrapper providerWrapper; + TestServiceLayerApp app; + ServiceLayerComponent component; + ProviderErrorWrapper errorWrapper; + + Fixture(spi::PersistenceProvider& provider) + : providerWrapper(provider), + app(), + component(app.getComponentRegister(), "dummy"), + errorWrapper(providerWrapper) + { + providerWrapper.setFailureMask(PersistenceProviderWrapper::FAIL_ALL_OPERATIONS); + } + ~Fixture() {} + + void perform_spi_operation() { + errorWrapper.getBucketInfo(spi::Bucket(document::BucketId(16, 1234), spi::PartitionId(0))); + } + + void check_no_listener_invoked_for_error(MockErrorListener& listener, spi::Result::ErrorType error) { + providerWrapper.setResult(spi::Result(error, "beep boop")); + perform_spi_operation(); + CPPUNIT_ASSERT(!listener._seen_fatal_error); + CPPUNIT_ASSERT(!listener._seen_resource_exhaustion_error); + } +}; + +} + +void ProviderErrorWrapperTest::fatal_error_invokes_listener() { + Fixture f(getPersistenceProvider()); + auto listener = std::make_shared<MockErrorListener>(); + f.errorWrapper.register_error_listener(listener); + f.providerWrapper.setResult(spi::Result(spi::Result::FATAL_ERROR, "eject! eject!")); + + CPPUNIT_ASSERT(!listener->_seen_fatal_error); + f.perform_spi_operation(); + + CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error); + CPPUNIT_ASSERT(listener->_seen_fatal_error); + CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), listener->_fatal_error); +} + +void ProviderErrorWrapperTest::resource_exhaustion_error_invokes_listener() { + Fixture f(getPersistenceProvider()); + auto listener = std::make_shared<MockErrorListener>(); + f.errorWrapper.register_error_listener(listener); + f.providerWrapper.setResult(spi::Result(spi::Result::RESOURCE_EXHAUSTED, "out of juice")); + + CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error); + f.perform_spi_operation(); + + CPPUNIT_ASSERT(!listener->_seen_fatal_error); + CPPUNIT_ASSERT(listener->_seen_resource_exhaustion_error); + CPPUNIT_ASSERT_EQUAL(vespalib::string("out of juice"), listener->_resource_exhaustion_error); +} + +void ProviderErrorWrapperTest::listener_not_invoked_on_success() { + Fixture f(getPersistenceProvider()); + auto listener = std::make_shared<MockErrorListener>(); + f.errorWrapper.register_error_listener(listener); + f.perform_spi_operation(); + + CPPUNIT_ASSERT(!listener->_seen_fatal_error); + CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error); +} + +void ProviderErrorWrapperTest::listener_not_invoked_on_regular_errors() { + Fixture f(getPersistenceProvider()); + auto listener = std::make_shared<MockErrorListener>(); + f.errorWrapper.register_error_listener(listener); + + f.check_no_listener_invoked_for_error(*listener, spi::Result::TRANSIENT_ERROR); + f.check_no_listener_invoked_for_error(*listener, spi::Result::PERMANENT_ERROR); +} + +void ProviderErrorWrapperTest::multiple_listeners_can_be_registered() { + Fixture f(getPersistenceProvider()); + auto listener1 = std::make_shared<MockErrorListener>(); + auto listener2 = std::make_shared<MockErrorListener>(); + f.errorWrapper.register_error_listener(listener1); + f.errorWrapper.register_error_listener(listener2); + + f.providerWrapper.setResult(spi::Result(spi::Result::RESOURCE_EXHAUSTED, "out of juice")); + f.perform_spi_operation(); + + CPPUNIT_ASSERT(listener1->_seen_resource_exhaustion_error); + CPPUNIT_ASSERT(listener2->_seen_resource_exhaustion_error); +} + +} // ns storage + + diff --git a/storage/src/tests/persistence/providershutdownwrappertest.cpp b/storage/src/tests/persistence/providershutdownwrappertest.cpp deleted file mode 100644 index 3475aa58dfd..00000000000 --- a/storage/src/tests/persistence/providershutdownwrappertest.cpp +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/vdstestlib/cppunit/macros.h> -#include <tests/persistence/persistencetestutils.h> -#include <tests/persistence/common/persistenceproviderwrapper.h> - -namespace storage { - -class ProviderShutdownWrapperTest : public SingleDiskPersistenceTestUtils -{ -public: - CPPUNIT_TEST_SUITE(ProviderShutdownWrapperTest); - CPPUNIT_TEST(testShutdownOnFatalError); - CPPUNIT_TEST_SUITE_END(); - - void testShutdownOnFatalError(); -}; - -CPPUNIT_TEST_SUITE_REGISTRATION(ProviderShutdownWrapperTest); - -namespace { - -class TestShutdownListener - : public framework::defaultimplementation::ShutdownListener -{ -public: - TestShutdownListener() : _reason() {} - - void requestShutdown(vespalib::stringref reason) override { - _reason = reason; - } - - bool shutdownRequested() const { return !_reason.empty(); } - const vespalib::string& getReason() const { return _reason; } -private: - vespalib::string _reason; -}; - -} - -void -ProviderShutdownWrapperTest::testShutdownOnFatalError() -{ - // We wrap the wrapper. It's turtles all the way down! - PersistenceProviderWrapper providerWrapper( - getPersistenceProvider()); - TestServiceLayerApp app; - ServiceLayerComponent component(app.getComponentRegister(), "dummy"); - - ProviderShutdownWrapper shutdownWrapper(providerWrapper, component); - - TestShutdownListener shutdownListener; - - app.getComponentRegister().registerShutdownListener(shutdownListener); - - providerWrapper.setResult( - spi::Result(spi::Result::FATAL_ERROR, "eject! eject!")); - providerWrapper.setFailureMask( - PersistenceProviderWrapper::FAIL_ALL_OPERATIONS); - - CPPUNIT_ASSERT(!shutdownListener.shutdownRequested()); - // This should cause the node to implicitly be shut down - shutdownWrapper.getBucketInfo( - spi::Bucket(document::BucketId(16, 1234), - spi::PartitionId(0))); - - CPPUNIT_ASSERT(shutdownListener.shutdownRequested()); - CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), - shutdownListener.getReason()); - - // Triggering a new error should not cause shutdown to be requested twice. - providerWrapper.setResult( - spi::Result(spi::Result::FATAL_ERROR, "boom!")); - - shutdownWrapper.getBucketInfo( - spi::Bucket(document::BucketId(16, 1234), - spi::PartitionId(0))); - - CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), - shutdownListener.getReason()); -} - -} // ns storage - - diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt index eedc29989eb..959261dc8ff 100644 --- a/storage/src/tests/storageserver/CMakeLists.txt +++ b/storage/src/tests/storageserver/CMakeLists.txt @@ -11,6 +11,7 @@ vespa_add_library(storage_teststorageserver TEST priorityconvertertest.cpp statereportertest.cpp changedbucketownershiphandlertest.cpp + service_layer_error_listener_test.cpp DEPENDS storage_storageserver storage_testcommon diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 559d54d3620..29ff780807f 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1,10 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <cppunit/extensions/HelperMacros.h> -#include <memory> -#include <iterator> -#include <vector> -#include <algorithm> -#include <ctime> #include <vespa/vespalib/util/document_runnable.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <tests/common/testhelper.h> @@ -16,6 +11,13 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> #include <vespa/vespalib/util/exceptions.h> +#include <unordered_set> +#include <memory> +#include <iterator> +#include <vector> +#include <algorithm> +#include <chrono> +#include <thread> using namespace document; using namespace storage::api; @@ -24,12 +26,12 @@ namespace storage { namespace { -struct MergeBuilder -{ +struct MergeBuilder { document::BucketId _bucket; api::Timestamp _maxTimestamp; std::vector<uint16_t> _nodes; std::vector<uint16_t> _chain; + std::unordered_set<uint16_t> _source_only; uint64_t _clusterStateVersion; MergeBuilder(const document::BucketId& bucket) @@ -41,6 +43,8 @@ struct MergeBuilder nodes(0, 1, 2); } + ~MergeBuilder(); + MergeBuilder& nodes(uint16_t n0) { _nodes.push_back(n0); return *this; @@ -82,11 +86,17 @@ struct MergeBuilder _chain.push_back(n2); return *this; } + MergeBuilder& source_only(uint16_t node) { + _source_only.insert(node); + return *this; + } api::MergeBucketCommand::SP create() const { std::vector<api::MergeBucketCommand::Node> n; for (uint32_t i = 0; i < _nodes.size(); ++i) { - n.push_back(_nodes[i]); + uint16_t node = _nodes[i]; + bool source_only = (_source_only.find(node) != _source_only.end()); + n.emplace_back(node, source_only); } std::shared_ptr<MergeBucketCommand> cmd( new MergeBucketCommand(_bucket, n, _maxTimestamp, @@ -97,6 +107,8 @@ struct MergeBuilder } }; +MergeBuilder::~MergeBuilder() {} + std::shared_ptr<api::SetSystemStateCommand> makeSystemStateCmd(const std::string& state) { @@ -106,8 +118,7 @@ makeSystemStateCmd(const std::string& state) } // anon ns -class MergeThrottlerTest : public CppUnit::TestFixture -{ +class MergeThrottlerTest : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(MergeThrottlerTest); CPPUNIT_TEST(testMergesConfig); CPPUNIT_TEST(testChain); @@ -133,6 +144,8 @@ class MergeThrottlerTest : public CppUnit::TestFixture CPPUNIT_TEST(testGetBucketDiffCommandNotInActiveSetIsRejected); CPPUNIT_TEST(testApplyBucketDiffCommandNotInActiveSetIsRejected); CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges); + CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration); + CPPUNIT_TEST(source_only_merges_are_not_affected_by_backpressure); CPPUNIT_TEST_SUITE_END(); public: void setUp() override; @@ -162,6 +175,8 @@ public: void testGetBucketDiffCommandNotInActiveSetIsRejected(); void testApplyBucketDiffCommandNotInActiveSetIsRejected(); void testNewClusterStateAbortsAllOutdatedActiveMerges(); + void backpressure_busy_bounces_merges_for_configured_duration(); + void source_only_merges_are_not_affected_by_backpressure(); private: static const int _storageNodeCount = 3; static const int _messageWaitTime = 100; @@ -247,7 +262,7 @@ checkChain(const StorageMessage::SP& msg, void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout) { - std::time_t start = std::time(0); + const auto start = std::chrono::steady_clock::now(); while (true) { std::size_t count; { @@ -257,14 +272,14 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeou if (count == sz) { break; } - std::time_t now = std::time(0); - if (now - start > timeout) { + auto now = std::chrono::steady_clock::now(); + if (now - start > std::chrono::seconds(timeout)) { std::ostringstream os; os << "Timeout while waiting for merge queue with " << sz << " items. Had " << count << " at timeout."; throw vespalib::IllegalStateException(os.str(), VESPA_STRLOC); } - FastOS_Thread::Sleep(1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } @@ -1491,8 +1506,7 @@ MergeThrottlerTest::sendAndExpectReply( _topLinks[0]->waitForMessage(expectedReplyType, _messageWaitTime); StorageMessage::SP reply(_topLinks[0]->getAndRemoveMessage( expectedReplyType)); - api::StorageReply& storageReply( - dynamic_cast<api::StorageReply&>(*reply)); + auto& storageReply = dynamic_cast<api::StorageReply&>(*reply); CPPUNIT_ASSERT_EQUAL(expectedResultCode, storageReply.getResult().getResult()); } @@ -1561,6 +1575,45 @@ MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges() } } +void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duration() { + _servers[0]->getClock().setAbsoluteTimeInSeconds(1000); + + CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active()); + _throttlers[0]->apply_timed_backpressure(); + CPPUNIT_ASSERT(_throttlers[0]->backpressure_mode_active()); + document::BucketId bucket(16, 6789); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue()); + + sendAndExpectReply(MergeBuilder(bucket).clusterStateVersion(10).create(), + api::MessageType::MERGEBUCKET_REPLY, + api::ReturnCode::BUSY); + + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().local.failures.busy.getValue()); + + _servers[0]->getClock().addSecondsToTime(15); // Test-config has duration set to 15 seconds + // Backpressure has now been lifted. New merges should be forwarded + // to next node in chain as expected instead of being bounced with a reply. + sendMerge(MergeBuilder(bucket).clusterStateVersion(10)); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); +} + +void MergeThrottlerTest::source_only_merges_are_not_affected_by_backpressure() { + _servers[2]->getClock().setAbsoluteTimeInSeconds(1000); + _throttlers[2]->apply_timed_backpressure(); + document::BucketId bucket(16, 6789); + + _topLinks[2]->sendDown(MergeBuilder(bucket).clusterStateVersion(10).chain(0, 1).source_only(2).create()); + _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); +} + // TODO test message queue aborting (use rendezvous functionality--make guard) } // namespace storage diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp new file mode 100644 index 00000000000..b726a24b6b6 --- /dev/null +++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp @@ -0,0 +1,82 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storage/storageserver/service_layer_error_listener.h> +#include <vespa/storage/storageserver/mergethrottler.h> +#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/vdstestlib/cppunit/dirconfig.h> +#include <tests/common/testhelper.h> +#include <tests/common/teststorageapp.h> + +namespace storage { + +class ServiceLayerErrorListenerTest : public CppUnit::TestFixture { +public: + CPPUNIT_TEST_SUITE(ServiceLayerErrorListenerTest); + CPPUNIT_TEST(shutdown_invoked_on_fatal_error); + CPPUNIT_TEST(merge_throttle_backpressure_invoked_on_resource_exhaustion_error); + CPPUNIT_TEST_SUITE_END(); + + void shutdown_invoked_on_fatal_error(); + void merge_throttle_backpressure_invoked_on_resource_exhaustion_error(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(ServiceLayerErrorListenerTest); + +namespace { + +class TestShutdownListener + : public framework::defaultimplementation::ShutdownListener +{ +public: + TestShutdownListener() : _reason() {} + + void requestShutdown(vespalib::stringref reason) override { + _reason = reason; + } + + bool shutdown_requested() const { return !_reason.empty(); } + const vespalib::string& reason() const { return _reason; } +private: + vespalib::string _reason; +}; + +struct Fixture { + vdstestlib::DirConfig config{getStandardConfig(true)}; + TestServiceLayerApp app; + ServiceLayerComponent component{app.getComponentRegister(), "dummy"}; + MergeThrottler merge_throttler{config.getConfigId(), app.getComponentRegister()}; + TestShutdownListener shutdown_listener; + ServiceLayerErrorListener error_listener{component, merge_throttler}; + + ~Fixture(); +}; + +Fixture::~Fixture() {} + +} + +void ServiceLayerErrorListenerTest::shutdown_invoked_on_fatal_error() { + Fixture f; + + f.app.getComponentRegister().registerShutdownListener(f.shutdown_listener); + CPPUNIT_ASSERT(!f.shutdown_listener.shutdown_requested()); + + f.error_listener.on_fatal_error("eject! eject!"); + CPPUNIT_ASSERT(f.shutdown_listener.shutdown_requested()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), f.shutdown_listener.reason()); + + // Should only be invoked once + f.error_listener.on_fatal_error("here be dragons"); + CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), f.shutdown_listener.reason()); +} + +void ServiceLayerErrorListenerTest::merge_throttle_backpressure_invoked_on_resource_exhaustion_error() { + Fixture f; + + CPPUNIT_ASSERT(!f.merge_throttler.backpressure_mode_active()); + f.error_listener.on_resource_exhaustion_error("buy more RAM!"); + CPPUNIT_ASSERT(f.merge_throttler.backpressure_mode_active()); +} + +} diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index cf2226b3a3b..fbc29e0234b 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -41,6 +41,15 @@ node_reliability int default=1 restart max_merges_per_node int default=16 max_merge_queue_size int default=1024 +## If the persistence provider indicates that it has exhausted one or more +## of its internal resources during a mutating operation, new merges will +## be bounced for this duration. Not allowing further merges helps take +## load off the node while it e.g. compacts its data stores or memory in +## the background. +## Note: this does not affect merges where the current node is marked as +## "source only", as merges do not cause mutations on such nodes. +resource_exhaustion_merge_back_pressure_duration_secs double default=30.0 + ## Whether the deadlock detector should be enabled or not. If disabled, it will ## still run, but it will never actually abort the process it is running in. enable_dead_lock_detector bool default=false restart diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index 42ee585988f..1f2299bf46d 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -9,7 +9,7 @@ vespa_add_library(storage_spersistence OBJECT types.cpp mergehandler.cpp bucketprocessor.cpp - providershutdownwrapper.cpp + provider_error_wrapper.cpp bucketownershipnotifier.cpp fieldvisitor.cpp testandsethelper.cpp diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 1e74c957b01..b46217f6443 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -37,9 +37,9 @@ FileStorManager(const config::ConfigUri & configUri, _component(compReg, "filestormanager"), _partitions(partitions), _providerCore(provider), - _providerShutdown(_providerCore, _component), + _providerErrorWrapper(_providerCore), _nodeUpInLastNodeStateSeenByProvider(false), - _providerMetric(new spi::MetricPersistenceProvider(_providerShutdown)), + _providerMetric(new spi::MetricPersistenceProvider(_providerErrorWrapper)), _provider(_providerMetric.get()), _bucketIdFactory(_component.getBucketIdFactory()), _configUri(configUri), diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 82ea5ecba7f..05b9b9bc430 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -23,7 +23,7 @@ #include <vespa/config-stor-filestor.h> #include <vespa/storage/persistence/diskthread.h> -#include <vespa/storage/persistence/providershutdownwrapper.h> +#include <vespa/storage/persistence/provider_error_wrapper.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storageframework/generic/status/htmlstatusreporter.h> @@ -55,7 +55,7 @@ class FileStorManager : public StorageLinkQueued, ServiceLayerComponent _component; const spi::PartitionStateList& _partitions; spi::PersistenceProvider& _providerCore; - ProviderShutdownWrapper _providerShutdown; + ProviderErrorWrapper _providerErrorWrapper; bool _nodeUpInLastNodeStateSeenByProvider; spi::MetricPersistenceProvider::UP _providerMetric; spi::PersistenceProvider* _provider; @@ -118,6 +118,9 @@ public: spi::PersistenceProvider& getPersistenceProvider() { return *_provider; } + ProviderErrorWrapper& error_wrapper() noexcept { + return _providerErrorWrapper; + } void handleNewState() override; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h index fe09758e842..d4dadf94184 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h @@ -95,10 +95,10 @@ struct FileStorThreadMetrics : public metrics::MetricSet metrics::LongCountMetric bytesMerged; metrics::LongCountMetric getBucketDiffReply; metrics::LongCountMetric applyBucketDiffReply; - metrics::LongAverageMetric mergeLatencyTotal; - metrics::LongAverageMetric mergeMetadataReadLatency; - metrics::LongAverageMetric mergeDataReadLatency; - metrics::LongAverageMetric mergeDataWriteLatency; + metrics::DoubleAverageMetric mergeLatencyTotal; + metrics::DoubleAverageMetric mergeMetadataReadLatency; + metrics::DoubleAverageMetric mergeDataReadLatency; + metrics::DoubleAverageMetric mergeDataWriteLatency; metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded; metrics::LongAverageMetric batchingSize; diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index f545d673032..218d2f7dd23 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -7,7 +7,7 @@ #include "mergehandler.h" #include "diskmoveoperationhandler.h" #include "persistenceutil.h" -#include "providershutdownwrapper.h" +#include "provider_error_wrapper.h" #include <vespa/storage/common/storagecomponent.h> #include <vespa/storage/common/statusmessages.h> diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 1a2f9620e65..80873829064 100644 --- a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -1,77 +1,80 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "providershutdownwrapper.h" +#include "provider_error_wrapper.h" #include "persistenceutil.h" -#include <vespa/log/log.h> - -LOG_SETUP(".persistence.shutdownwrapper"); namespace storage { template <typename ResultType> ResultType -ProviderShutdownWrapper::checkResult(ResultType&& result) const +ProviderErrorWrapper::checkResult(ResultType&& result) const { if (result.getErrorCode() == spi::Result::FATAL_ERROR) { - vespalib::LockGuard guard(_shutdownLock); - if (_shutdownTriggered) { - LOG(debug, - "Received FATAL_ERROR from persistence provider: %s. " - "Node has already been instructed to shut down so " - "not doing anything now.", - result.getErrorMessage().c_str()); - } else { - LOG(info, - "Received FATAL_ERROR from persistence provider, " - "shutting down node: %s", - result.getErrorMessage().c_str()); - const_cast<ProviderShutdownWrapper*>(this)-> - _component.requestShutdown(result.getErrorMessage()); - _shutdownTriggered = true; - } + trigger_shutdown_listeners(result.getErrorMessage()); + } else if (result.getErrorCode() == spi::Result::RESOURCE_EXHAUSTED) { + trigger_resource_exhaustion_listeners(result.getErrorMessage()); } - return std::move(result); + return std::forward<ResultType>(result); +} + +void ProviderErrorWrapper::trigger_shutdown_listeners(vespalib::stringref reason) const { + std::lock_guard<std::mutex> guard(_mutex); + for (auto& listener : _listeners) { + listener->on_fatal_error(reason); + } +} + +void ProviderErrorWrapper::trigger_resource_exhaustion_listeners(vespalib::stringref reason) const { + std::lock_guard<std::mutex> guard(_mutex); + for (auto& listener : _listeners) { + listener->on_resource_exhaustion_error(reason); + } +} + +void ProviderErrorWrapper::register_error_listener(std::shared_ptr<ProviderErrorListener> listener) { + std::lock_guard<std::mutex> guard(_mutex); + _listeners.emplace_back(std::move(listener)); } spi::Result -ProviderShutdownWrapper::initialize() +ProviderErrorWrapper::initialize() { return checkResult(_impl.initialize()); } spi::PartitionStateListResult -ProviderShutdownWrapper::getPartitionStates() const +ProviderErrorWrapper::getPartitionStates() const { return checkResult(_impl.getPartitionStates()); } spi::BucketIdListResult -ProviderShutdownWrapper::listBuckets(spi::PartitionId partitionId) const +ProviderErrorWrapper::listBuckets(spi::PartitionId partitionId) const { return checkResult(_impl.listBuckets(partitionId)); } spi::Result -ProviderShutdownWrapper::setClusterState(const spi::ClusterState& state) +ProviderErrorWrapper::setClusterState(const spi::ClusterState& state) { return checkResult(_impl.setClusterState(state)); } spi::Result -ProviderShutdownWrapper::setActiveState(const spi::Bucket& bucket, +ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) { return checkResult(_impl.setActiveState(bucket, newState)); } spi::BucketInfoResult -ProviderShutdownWrapper::getBucketInfo(const spi::Bucket& bucket) const +ProviderErrorWrapper::getBucketInfo(const spi::Bucket& bucket) const { return checkResult(_impl.getBucketInfo(bucket)); } spi::Result -ProviderShutdownWrapper::put(const spi::Bucket& bucket, +ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, const spi::DocumentSP& doc, spi::Context& context) @@ -80,7 +83,7 @@ ProviderShutdownWrapper::put(const spi::Bucket& bucket, } spi::RemoveResult -ProviderShutdownWrapper::remove(const spi::Bucket& bucket, +ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context) @@ -89,7 +92,7 @@ ProviderShutdownWrapper::remove(const spi::Bucket& bucket, } spi::RemoveResult -ProviderShutdownWrapper::removeIfFound(const spi::Bucket& bucket, +ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context) @@ -98,7 +101,7 @@ ProviderShutdownWrapper::removeIfFound(const spi::Bucket& bucket, } spi::UpdateResult -ProviderShutdownWrapper::update(const spi::Bucket& bucket, +ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts, const spi::DocumentUpdateSP& docUpdate, spi::Context& context) @@ -107,7 +110,7 @@ ProviderShutdownWrapper::update(const spi::Bucket& bucket, } spi::GetResult -ProviderShutdownWrapper::get(const spi::Bucket& bucket, +ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet, const document::DocumentId& docId, spi::Context& context) const @@ -116,13 +119,13 @@ ProviderShutdownWrapper::get(const spi::Bucket& bucket, } spi::Result -ProviderShutdownWrapper::flush(const spi::Bucket& bucket, spi::Context& context) +ProviderErrorWrapper::flush(const spi::Bucket& bucket, spi::Context& context) { return checkResult(_impl.flush(bucket, context)); } spi::CreateIteratorResult -ProviderShutdownWrapper::createIterator(const spi::Bucket& bucket, +ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, const document::FieldSet& fieldSet, const spi::Selection& selection, spi::IncludedVersions versions, @@ -132,7 +135,7 @@ ProviderShutdownWrapper::createIterator(const spi::Bucket& bucket, } spi::IterateResult -ProviderShutdownWrapper::iterate(spi::IteratorId iteratorId, +ProviderErrorWrapper::iterate(spi::IteratorId iteratorId, uint64_t maxByteSize, spi::Context& context) const { @@ -140,41 +143,41 @@ ProviderShutdownWrapper::iterate(spi::IteratorId iteratorId, } spi::Result -ProviderShutdownWrapper::destroyIterator(spi::IteratorId iteratorId, +ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context& context) { return checkResult(_impl.destroyIterator(iteratorId, context)); } spi::Result -ProviderShutdownWrapper::createBucket(const spi::Bucket& bucket, +ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) { return checkResult(_impl.createBucket(bucket, context)); } spi::Result -ProviderShutdownWrapper::deleteBucket(const spi::Bucket& bucket, +ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context) { return checkResult(_impl.deleteBucket(bucket, context)); } spi::BucketIdListResult -ProviderShutdownWrapper::getModifiedBuckets() const +ProviderErrorWrapper::getModifiedBuckets() const { return checkResult(_impl.getModifiedBuckets()); } spi::Result -ProviderShutdownWrapper::maintain(const spi::Bucket& bucket, +ProviderErrorWrapper::maintain(const spi::Bucket& bucket, spi::MaintenanceLevel level) { return checkResult(_impl.maintain(bucket, level)); } spi::Result -ProviderShutdownWrapper::split(const spi::Bucket& source, +ProviderErrorWrapper::split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context& context) @@ -183,7 +186,7 @@ ProviderShutdownWrapper::split(const spi::Bucket& source, } spi::Result -ProviderShutdownWrapper::join(const spi::Bucket& source1, +ProviderErrorWrapper::join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context& context) { @@ -191,14 +194,14 @@ ProviderShutdownWrapper::join(const spi::Bucket& source1, } spi::Result -ProviderShutdownWrapper::move(const spi::Bucket& source, +ProviderErrorWrapper::move(const spi::Bucket& source, spi::PartitionId target, spi::Context& context) { return checkResult(_impl.move(source, target, context)); } spi::Result -ProviderShutdownWrapper::removeEntry(const spi::Bucket& bucket, +ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts, spi::Context& context) { return checkResult(_impl.removeEntry(bucket, ts, context)); diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 0700bcdcda2..84adf37cbc3 100644 --- a/storage/src/vespa/storage/persistence/providershutdownwrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -1,34 +1,43 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * \class storage::ProviderShutdownWrapper + * \class storage::ProviderErrorWrapper * * \brief Utility class which forwards all calls to the real persistence * provider implementation, transparently checking the result of each - * operation to see if the result is FATAL_ERROR. If so, it initiates a - * shutdown of the process (but still returns the response up to the caller - * as if it were just a non-wrapped call). + * operation to see if the result is FATAL_ERROR or RESOURCE_EXHAUSTED. * + * If FATAL_ERROR or RESOURCE_EXHAUSTED is observed, the wrapper will invoke any + * and all resource exhaustion listeners synchronously, before returning the response + * to the caller as usual. */ #pragma once +#include <vespa/persistence/spi/persistenceprovider.h> #include <vector> #include <string> -#include <vespa/persistence/spi/persistenceprovider.h> -#include <vespa/vespalib/util/sync.h> +#include <memory> +#include <mutex> namespace storage { class ServiceLayerComponent; -class ProviderShutdownWrapper : public spi::PersistenceProvider -{ +class ProviderErrorListener { public: - ProviderShutdownWrapper(spi::PersistenceProvider& impl, - ServiceLayerComponent& component) + virtual ~ProviderErrorListener() = default; + virtual void on_fatal_error(vespalib::stringref message) { + (void)message; + } + virtual void on_resource_exhaustion_error(vespalib::stringref message) { + (void)message; + } +}; + +class ProviderErrorWrapper : public spi::PersistenceProvider { +public: + explicit ProviderErrorWrapper(spi::PersistenceProvider& impl) : _impl(impl), - _component(component), - _shutdownLock(), - _shutdownTriggered(false) + _mutex() { } @@ -63,19 +72,18 @@ public: const spi::PersistenceProvider& getProviderImplementation() const { return _impl; } + + void register_error_listener(std::shared_ptr<ProviderErrorListener> listener); private: - /** - * Check whether result has a FATAL_ERROR return code and invoke - * requestShutdown with its error string if so. Will const_cast - * internally since it calls non-const on _component. - */ template <typename ResultType> ResultType checkResult(ResultType&& result) const; + void trigger_shutdown_listeners(vespalib::stringref reason) const; + void trigger_resource_exhaustion_listeners(vespalib::stringref reason) const; + spi::PersistenceProvider& _impl; - ServiceLayerComponent& _component; - vespalib::Lock _shutdownLock; - mutable bool _shutdownTriggered; + std::vector<std::shared_ptr<ProviderErrorListener>> _listeners; + mutable std::mutex _mutex; }; } // storage diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt index 4b8905eb6d7..2fd3fe43a57 100644 --- a/storage/src/vespa/storage/storageserver/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt @@ -23,6 +23,7 @@ vespa_add_library(storage_storageserver statereporter.cpp storagemetricsset.cpp changedbucketownershiphandler.cpp + service_layer_error_listener.cpp INSTALL lib64 DEPENDS storage diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index d9a14fc7bb0..483992559e7 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -16,8 +16,7 @@ namespace storage { namespace { -struct NodeComparator -{ +struct NodeComparator { bool operator()(const api::MergeBucketCommand::Node& a, const api::MergeBucketCommand::Node& b) const { @@ -28,8 +27,7 @@ struct NodeComparator // Class used to sneakily get around IThrottlePolicy only accepting // messagebus objects template <typename Base> -class DummyMbusMessage : public Base -{ +class DummyMbusMessage : public Base { private: static const mbus::string NAME; public: @@ -70,6 +68,7 @@ MergeThrottler::ChainedMergeState::~ChainedMergeState() {} MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner) : metrics::MetricSet("mergethrottler", "", "", owner), averageQueueWaitingTime("averagequeuewaitingtime", "", "Average time a merge spends in the throttler queue", this), + bounced_due_to_back_pressure("bounced_due_to_back_pressure", "", "Number of merges bounced due to resource exhaustion back-pressure", this), chaining("mergechains", this), local("locallyexecutedmerges", this) { } @@ -200,6 +199,8 @@ MergeThrottler::MergeThrottler( _component(compReg, "mergethrottler"), _thread(), _rendezvous(RENDEZVOUS_NONE), + _throttle_until_time(), + _backpressure_duration(std::chrono::seconds(30)), _closing(false) { _throttlePolicy->setMaxPendingCount(20); @@ -215,12 +216,13 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ vespalib::LockGuard lock(_stateLock); if (newConfig->maxMergesPerNode < 1) { - throw config::InvalidConfigException( - "Cannot have a max merge count of less than 1"); + throw config::InvalidConfigException("Cannot have a max merge count of less than 1"); } if (newConfig->maxMergeQueueSize < 0) { - throw config::InvalidConfigException( - "Max merge queue size cannot be less than 0"); + throw config::InvalidConfigException("Max merge queue size cannot be less than 0"); + } + if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) { + throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0"); } if (static_cast<double>(newConfig->maxMergesPerNode) != _throttlePolicy->getMaxPendingCount()) @@ -232,6 +234,8 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ LOG(debug, "Setting new max queue size to %d", newConfig->maxMergeQueueSize); _maxQueueSize = newConfig->maxMergeQueueSize; + _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( + std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs)); } MergeThrottler::~MergeThrottler() @@ -276,9 +280,9 @@ MergeThrottler::onClose() LOG(debug, "onClose; active: %" PRIu64 ", queued: %" PRIu64, _merges.size(), _queue.size()); } - if (_thread.get() != 0) { + if (_thread) { _thread->interruptAndJoin(&_messageLock); - _thread.reset(0); + _thread.reset(); } } @@ -408,8 +412,7 @@ MergeThrottler::enqueueMerge( MessageGuard& msgGuard) { LOG(spam, "Enqueuing %s", msg->toString().c_str()); - const api::MergeBucketCommand& mergeCmd - = static_cast<const api::MergeBucketCommand&>(*msg); + auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg); MergeNodeSequence nodeSeq(mergeCmd, _component.getIndex()); if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) { return; @@ -427,8 +430,7 @@ MergeThrottler::canProcessNewMerge() const bool MergeThrottler::isMergeAlreadyKnown(const api::StorageMessage::SP& msg) const { - const api::MergeBucketCommand& mergeCmd - = static_cast<const api::MergeBucketCommand&>(*msg); + auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg); return _merges.find(mergeCmd.getBucketId()) != _merges.end(); } @@ -441,8 +443,7 @@ MergeThrottler::rejectMergeIfOutdated( // Only reject merge commands! never reject replies (for obvious reasons..) assert(msg->getType() == api::MessageType::MERGEBUCKET); - const api::MergeBucketCommand& cmd( - static_cast<const api::MergeBucketCommand&>(*msg)); + auto& cmd = static_cast<const api::MergeBucketCommand&>(*msg); if (cmd.getClusterStateVersion() == 0 || cmd.getClusterStateVersion() >= rejectLessThanVersion) @@ -455,9 +456,7 @@ MergeThrottler::rejectMergeIfOutdated( << ", storage node has version " << rejectLessThanVersion; sendReply(cmd, - api::ReturnCode( - api::ReturnCode::WRONG_DISTRIBUTION, - oss.str()), + api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, oss.str()), msgGuard, _metrics->chaining); LOG(debug, "Immediately rejected %s, due to it having state version < %u", cmd.toString().c_str(), rejectLessThanVersion); @@ -654,6 +653,49 @@ MergeThrottler::run(framework::ThreadHandle& thread) LOG(debug, "Returning from MergeThrottler working thread"); } +bool MergeThrottler::merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const { + if (_throttle_until_time.time_since_epoch().count() == 0) { + return false; + } + if (merge_has_this_node_as_source_only_node(cmd)) { + return false; + } + if (backpressure_mode_active_no_lock()) { + return true; + } + // Avoid sampling the clock when it can't do anything useful. + _throttle_until_time = std::chrono::steady_clock::time_point{}; + return false; +} + +bool MergeThrottler::merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const { + auto self_is_source_only = [self = _component.getIndex()](auto& node) { + return (node.index == self) && node.sourceOnly; + }; + return std::any_of(cmd.getNodes().begin(), cmd.getNodes().end(), self_is_source_only); +} + +bool MergeThrottler::backpressure_mode_active_no_lock() const { + return (_component.getClock().getMonotonicTime() < _throttle_until_time); +} + +void MergeThrottler::bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard) { + sendReply(cmd, api::ReturnCode(api::ReturnCode::BUSY, + "Node is throttling merges due to resource exhaustion"), + guard, _metrics->local); + _metrics->bounced_due_to_back_pressure.inc(); +} + +void MergeThrottler::apply_timed_backpressure() { + vespalib::LockGuard lock(_stateLock); + _throttle_until_time = _component.getClock().getMonotonicTime() + _backpressure_duration; +} + +bool MergeThrottler::backpressure_mode_active() const { + vespalib::LockGuard lock(_stateLock); + return backpressure_mode_active_no_lock(); +} + // Must be run from worker thread void MergeThrottler::handleMessageDown( @@ -661,22 +703,24 @@ MergeThrottler::handleMessageDown( MessageGuard& msgGuard) { if (msg->getType() == api::MessageType::MERGEBUCKET) { - const api::MergeBucketCommand& mergeCmd - = static_cast<const api::MergeBucketCommand&>(*msg); + auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg); - uint32_t ourVersion( - _component.getStateUpdater().getSystemState()->getVersion()); + uint32_t ourVersion = _component.getStateUpdater().getSystemState()->getVersion(); if (mergeCmd.getClusterStateVersion() > ourVersion) { LOG(debug, "Merge %s with newer cluster state than us arrived", mergeCmd.toString().c_str()); - rejectOutdatedQueuedMerges( - msgGuard, mergeCmd.getClusterStateVersion()); + rejectOutdatedQueuedMerges(msgGuard, mergeCmd.getClusterStateVersion()); } else if (rejectMergeIfOutdated(msg, ourVersion, msgGuard)) { // Skip merge entirely return; } + if (merge_is_backpressure_throttled(mergeCmd)) { + bounce_backpressure_throttled_merge(mergeCmd, msgGuard); + return; + } + if (isMergeAlreadyKnown(msg)) { processCycledMergeCommand(msg, msgGuard); } else if (canProcessNewMerge()) { @@ -686,13 +730,9 @@ MergeThrottler::handleMessageDown( } else { // No more room at the inn. Return BUSY so that the // distributor will wait a bit before retrying - LOG(debug, "Queue is full; busy-returning %s", - mergeCmd.toString().c_str()); - sendReply(mergeCmd, - api::ReturnCode(api::ReturnCode::BUSY, - "Merge queue is full"), - msgGuard, - _metrics->local); + LOG(debug, "Queue is full; busy-returning %s", mergeCmd.toString().c_str()); + sendReply(mergeCmd, api::ReturnCode(api::ReturnCode::BUSY, "Merge queue is full"), + msgGuard, _metrics->local); } } else { assert(msg->getType() == api::MessageType::MERGEBUCKET_REPLY); @@ -707,8 +747,7 @@ MergeThrottler::handleMessageUp( MessageGuard& msgGuard) { assert(msg->getType() == api::MessageType::MERGEBUCKET_REPLY); - const api::MergeBucketReply& mergeReply - = static_cast<const api::MergeBucketReply&>(*msg); + auto& mergeReply = static_cast<const api::MergeBucketReply&>(*msg); LOG(debug, "Processing %s from persistence layer", mergeReply.toString().c_str()); @@ -741,9 +780,7 @@ MergeThrottler::validateNewMerge( << _component.getIndex() << ", which is not in its forwarding chain"; LOG(error, "%s", oss.str().c_str()); - } else if (mergeCmd.getChain().size() - >= nodeSeq.getSortedNodes().size()) - { + } else if (mergeCmd.getChain().size() >= nodeSeq.getSortedNodes().size()) { // Chain is full but we haven't seen the merge! This means // the node has probably gone down with a merge it previously // forwarded only now coming back to haunt it. @@ -774,8 +811,7 @@ MergeThrottler::processNewMergeCommand( const api::StorageMessage::SP& msg, MessageGuard& msgGuard) { - const api::MergeBucketCommand& mergeCmd - = static_cast<const api::MergeBucketCommand&>(*msg); + auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg); MergeNodeSequence nodeSeq(mergeCmd, _component.getIndex()); @@ -788,7 +824,7 @@ MergeThrottler::processNewMergeCommand( // Register the merge now so that it will contribute to filling up our // merge throttling window. assert(_merges.find(mergeCmd.getBucketId()) == _merges.end()); - ActiveMergeMap::iterator state = _merges.insert( + auto state = _merges.insert( std::make_pair(mergeCmd.getBucketId(), ChainedMergeState(msg))).first; @@ -864,13 +900,11 @@ MergeThrottler::processCycledMergeCommand( // aborted, in which case we have to immediately send an abortion reply // so the cycle can be unwound. - const api::MergeBucketCommand& mergeCmd - = static_cast<const api::MergeBucketCommand&>(*msg); + auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg); MergeNodeSequence nodeSeq(mergeCmd, _component.getIndex()); - ActiveMergeMap::iterator mergeIter( - _merges.find(mergeCmd.getBucketId())); + auto mergeIter = _merges.find(mergeCmd.getBucketId()); assert(mergeIter != _merges.end()); if (mergeIter->second.isAborted()) { @@ -921,11 +955,9 @@ MergeThrottler::processMergeReply( bool fromPersistenceLayer, MessageGuard& msgGuard) { - const api::MergeBucketReply& mergeReply - = dynamic_cast<const api::MergeBucketReply&>(*msg); + auto& mergeReply = dynamic_cast<const api::MergeBucketReply&>(*msg); - ActiveMergeMap::iterator mergeIter( - _merges.find(mergeReply.getBucketId())); + auto mergeIter = _merges.find(mergeReply.getBucketId()); if (mergeIter == _merges.end()) { LOG(warning, "Received %s, which has no command mapped " "for it. Cannot send chained reply!", @@ -1035,7 +1067,7 @@ MergeThrottler::onDown(const std::shared_ptr<api::StorageMessage>& msg) return true; } else if (isDiffCommand(*msg)) { vespalib::LockGuard lock(_stateLock); - api::StorageCommand& cmd(static_cast<api::StorageCommand&>(*msg)); + auto& cmd = static_cast<api::StorageCommand&>(*msg); if (bucketIsUnknownOrAborted(cmd.getBucketId())) { sendUp(makeAbortReply(cmd, "no state recorded for bucket in merge " "throttler, source merge probably aborted earlier")); @@ -1067,7 +1099,7 @@ MergeThrottler::isMergeReply(const api::StorageMessage& msg) const bool MergeThrottler::bucketIsUnknownOrAborted(const document::BucketId& bucket) const { - ActiveMergeMap::const_iterator it(_merges.find(bucket)); + auto it = _merges.find(bucket); if (it == _merges.end()) { return true; } @@ -1089,8 +1121,7 @@ bool MergeThrottler::onUp(const std::shared_ptr<api::StorageMessage>& msg) { if (isMergeReply(*msg)) { - const api::MergeBucketReply& mergeReply - = dynamic_cast<const api::MergeBucketReply&>(*msg); + auto& mergeReply = dynamic_cast<const api::MergeBucketReply&>(*msg); LOG(spam, "Received %s from persistence layer", mergeReply.toString().c_str()); @@ -1126,8 +1157,7 @@ MergeThrottler::releaseWorkerThreadRendezvous(vespalib::MonitorGuard& guard) } } -class ThreadRendezvousGuard -{ +class ThreadRendezvousGuard { MergeThrottler& _throttler; vespalib::MonitorGuard& _guard; public: @@ -1237,23 +1267,22 @@ MergeThrottler::reportHtmlStatus(std::ostream& out, out << "<h3>Active merges (" << _merges.size() << ")</h3>\n"; - ActiveMergeMap::const_iterator end = _merges.end(); if (!_merges.empty()) { out << "<ul>\n"; - for (ActiveMergeMap::const_iterator i = _merges.begin(); i != end; ++i) { - out << "<li>" << i->second.getMergeCmdString(); - if (i->second.isExecutingLocally()) { + for (auto& m : _merges) { + out << "<li>" << m.second.getMergeCmdString(); + if (m.second.isExecutingLocally()) { out << " <strong>("; - if (i->second.isInCycle()) { + if (m.second.isInCycle()) { out << "cycled - "; - } else if (i->second.isCycleBroken()) { + } else if (m.second.isCycleBroken()) { out << "broken cycle (another node in the chain likely went down) - "; } out << "executing on this node)</strong>"; - } else if (i->second.isUnwinding()) { + } else if (m.second.isUnwinding()) { out << " <strong>(was executed here, now unwinding)</strong>"; } - if (i->second.isAborted()) { + if (m.second.isAborted()) { out << " <strong>aborted</strong>"; } out << "</li>\n"; @@ -1268,14 +1297,13 @@ MergeThrottler::reportHtmlStatus(std::ostream& out, out << "<h3>Queued merges (in priority order) (" << _queue.size() << ")</h3>\n"; - MergePriorityQueue::const_iterator end = _queue.end(); if (!_queue.empty()) { out << "<ol>\n"; - for (MergePriorityQueue::const_iterator i = _queue.begin(); i != end; ++i) { + for (auto& qm : _queue) { // The queue always owns its messages, thus this is safe out << "<li>Pri " - << static_cast<unsigned int>(i->_msg->getPriority()) - << ": " << *i->_msg; + << static_cast<unsigned int>(qm._msg->getPriority()) + << ": " << *qm._msg; out << "</li>\n"; } out << "</ol>\n"; diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index a93dab4e24e..070c2ef07c4 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -18,6 +18,7 @@ #include <vespa/messagebus/staticthrottlepolicy.h> #include <vespa/metrics/metrics.h> #include <vespa/config/config.h> +#include <chrono> namespace storage { @@ -29,8 +30,7 @@ class MergeThrottler : public framework::Runnable, private config::IFetcherCallback<vespa::config::content::core::StorServerConfig> { public: - class MergeFailureMetrics : public metrics::MetricSet - { + class MergeFailureMetrics : public metrics::MetricSet { public: metrics::SumMetric<metrics::LongCountMetric> sum; metrics::LongCountMetric notready; @@ -47,8 +47,7 @@ public: ~MergeFailureMetrics(); }; - class MergeOperationMetrics : public metrics::MetricSet - { + class MergeOperationMetrics : public metrics::MetricSet { public: metrics::LongCountMetric ok; MergeFailureMetrics failures; @@ -57,10 +56,10 @@ public: ~MergeOperationMetrics(); }; - class Metrics : public metrics::MetricSet - { + class Metrics : public metrics::MetricSet { public: metrics::DoubleAverageMetric averageQueueWaitingTime; + metrics::LongCountMetric bounced_due_to_back_pressure; MergeOperationMetrics chaining; MergeOperationMetrics local; @@ -71,8 +70,7 @@ public: private: // TODO: make PQ with stable ordering into own, generic class template <class MessageType> - struct StablePriorityOrderingWrapper - { + struct StablePriorityOrderingWrapper { MessageType _msg; metrics::MetricTimer _startTimer; uint64_t _sequence; @@ -95,8 +93,7 @@ private: } }; - struct ChainedMergeState - { + struct ChainedMergeState { api::StorageMessage::SP _cmd; std::string _cmdString; // For being able to print message even when we don't own it uint64_t _clusterStateVersion; @@ -167,6 +164,8 @@ private: StorageComponent _component; framework::Thread::UP _thread; RendezvousState _rendezvous; + mutable std::chrono::steady_clock::time_point _throttle_until_time; + std::chrono::steady_clock::duration _backpressure_duration; bool _closing; public: /** @@ -174,7 +173,7 @@ public: * than 1 as their window size. */ MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&); - ~MergeThrottler(); + ~MergeThrottler() override; /** Implements document::Runnable::run */ void run(framework::ThreadHandle&) override; @@ -187,6 +186,16 @@ public: bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) override; + /* + * When invoked, merges to the node will be BUSY-bounced by the throttler + * for a configurable period of time instead of being processed. + * + * Thread safe, but must not be called if _stateLock is already held, or + * deadlock will occur. + */ + void apply_timed_backpressure(); + bool backpressure_mode_active() const; + // For unit testing only const ActiveMergeMap& getActiveMerges() const { return _merges; } // For unit testing only @@ -206,8 +215,7 @@ private: friend class ThreadRendezvousGuard; // impl in .cpp file // Simple helper class for centralizing chaining logic - struct MergeNodeSequence - { + struct MergeNodeSequence { const api::MergeBucketCommand& _cmd; std::vector<api::MergeBucketCommand::Node> _sortedNodes; std::size_t _sortedIndex; // Index of current storage node in the sorted node sequence @@ -328,6 +336,11 @@ private: */ bool canProcessNewMerge() const; + bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const; + void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard); + bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const; + bool backpressure_mode_active_no_lock() const; + void sendReply(const api::MergeBucketCommand& cmd, const api::ReturnCode& result, MessageGuard& msgGuard, diff --git a/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp b/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp new file mode 100644 index 00000000000..41177fe46b8 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp @@ -0,0 +1,36 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "service_layer_error_listener.h" +#include <vespa/storage/common/storagecomponent.h> +#include <vespa/storage/storageserver/mergethrottler.h> + +#include <vespa/log/log.h> +LOG_SETUP(".node.errorlistener"); + +namespace storage { + +void ServiceLayerErrorListener::on_fatal_error(vespalib::stringref message) { + bool expected = false; + if (_shutdown_initiated.compare_exchange_strong(expected, true)) { + LOG(info, + "Received FATAL_ERROR from persistence provider, " + "shutting down node: %s", + message.c_str()); + _component.requestShutdown(message); // Thread safe + } else { + LOG(debug, + "Received FATAL_ERROR from persistence provider: %s. " + "Node has already been instructed to shut down so " + "not doing anything now.", + message.c_str()); + } +} + +void ServiceLayerErrorListener::on_resource_exhaustion_error(vespalib::stringref message) { + LOG(debug, "SPI reports resource exhaustion ('%s'). " + "Applying back-pressure to merge throttler", + message.c_str()); + _merge_throttler.apply_timed_backpressure(); // Thread safe +} + +} diff --git a/storage/src/vespa/storage/storageserver/service_layer_error_listener.h b/storage/src/vespa/storage/storageserver/service_layer_error_listener.h new file mode 100644 index 00000000000..6995459e333 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/service_layer_error_listener.h @@ -0,0 +1,36 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/storage/persistence/provider_error_wrapper.h> +#include <atomic> + +namespace storage { + +class StorageComponent; +class MergeThrottler; + +/* + * Listener implementation for SPI errors that require action beyond simply + * responding to the command that generated them. + * + * - Fatal errors will trigger a process shutdown. + * - Resource exhaustion errors will trigger merge back-pressure. + */ +class ServiceLayerErrorListener : public ProviderErrorListener { + StorageComponent& _component; + MergeThrottler& _merge_throttler; + std::atomic<bool> _shutdown_initiated; +public: + ServiceLayerErrorListener(StorageComponent& component, + MergeThrottler& merge_throttler) + : _component(component), + _merge_throttler(merge_throttler), + _shutdown_initiated(false) + {} + + void on_fatal_error(vespalib::stringref message) override; + void on_resource_exhaustion_error(vespalib::stringref message) override; +}; + +} diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 4f87b9af6f2..79d57a8f7e4 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -9,6 +9,7 @@ #include "opslogger.h" #include "statemanager.h" #include "priorityconverter.h" +#include "service_layer_error_listener.h" #include <vespa/storage/visiting/messagebusvisitormessagesession.h> #include <vespa/storage/visiting/visitormanager.h> #include <vespa/storage/bucketdb/bucketmanager.h> @@ -16,6 +17,7 @@ #include <vespa/storage/bucketmover/bucketmover.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> +#include <vespa/storage/persistence/provider_error_wrapper.h> #include <vespa/persistence/spi/exceptions.h> #include <vespa/messagebus/rpcmessagebus.h> @@ -35,7 +37,7 @@ ServiceLayerNode::ServiceLayerNode( _persistenceProvider(persistenceProvider), _partitions(0), _externalVisitors(externalVisitors), - _fileStorManager(0), + _fileStorManager(nullptr), _init_has_been_called(false), _noUsablePartitionMode(false) { @@ -112,7 +114,7 @@ void ServiceLayerNode::removeConfigSubscriptions() { StorageNode::removeConfigSubscriptions(); - _configFetcher.reset(0); + _configFetcher.reset(); } void @@ -166,7 +168,7 @@ ServiceLayerNode::initializeNodeSpecific() void ServiceLayerNode::handleLiveConfigUpdate() { - if (_newServerConfig.get() != 0) { + if (_newServerConfig) { bool updated = false; vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig); vespa::config::content::core::StorServerConfig& newC(*_newServerConfig); @@ -218,9 +220,11 @@ ServiceLayerNode::configure( // updates { vespalib::LockGuard configLockGuard(_configLock); - _newDevicesConfig.reset(config.release()); + _newDevicesConfig = std::move(config); + } + if (_distributionConfig) { + handleLiveConfigUpdate(); } - if (_distributionConfig.get() != 0) handleLiveConfigUpdate(); } VisitorMessageSession::UP @@ -259,7 +263,8 @@ ServiceLayerNode::createChain() return chain; } chain->push_back(StorageLink::UP(new OpsLogger(compReg, _configUri))); - chain->push_back(StorageLink::UP(new MergeThrottler(_configUri, compReg))); + auto* merge_throttler = new MergeThrottler(_configUri, compReg); + chain->push_back(StorageLink::UP(merge_throttler)); chain->push_back(StorageLink::UP(new ChangedBucketOwnershipHandler(_configUri, compReg))); chain->push_back(StorageLink::UP(new BucketIntegrityChecker(_configUri, compReg))); chain->push_back(StorageLink::UP(new bucketmover::BucketMover(_configUri, compReg))); @@ -277,6 +282,12 @@ ServiceLayerNode::createChain() _configUri, _partitions, _persistenceProvider, _context.getComponentRegister()))); chain->push_back(StorageLink::UP(releaseStateManager().release())); + + // Lifetimes of all referenced components shall outlive the last call going + // through the SPI, as queues are flushed and worker threads joined when + // the storage link chain is closed prior to destruction. + auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler); + _fileStorManager->error_wrapper().register_error_listener(std::move(error_listener)); return chain; } diff --git a/vespaclient-java/pom.xml b/vespaclient-java/pom.xml index 074b787025a..bb8ed9387fc 100644 --- a/vespaclient-java/pom.xml +++ b/vespaclient-java/pom.xml @@ -34,12 +34,17 @@ </dependency> <dependency> <groupId>com.yahoo.vespa</groupId> + <artifactId>container-dev</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> <artifactId>documentapi</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>com.yahoo.vespa</groupId> - <artifactId>container-dev</artifactId> + <artifactId>predicate-search-core</artifactId> <version>${project.version}</version> </dependency> <dependency> |