summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java2
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/content/ContentClusterTest.java2
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def19
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java20
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java116
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java100
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java143
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java174
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/SystemUpgrader.java2
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersionTarget.java5
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java9
-rw-r--r--parent/pom.xml3
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def2
-rw-r--r--storage/src/tests/persistence/CMakeLists.txt1
-rw-r--r--storage/src/tests/persistence/active_operations_stats_test.cpp10
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp34
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp34
-rw-r--r--storage/src/tests/persistence/shared_operation_throttler_test.cpp116
-rw-r--r--storage/src/vespa/storage/persistence/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h10
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h29
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp82
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h25
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp28
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormetrics.h1
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp11
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h4
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp12
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h7
-rw-r--r--storage/src/vespa/storage/persistence/shared_operation_throttler.cpp191
-rw-r--r--storage/src/vespa/storage/persistence/shared_operation_throttler.h71
37 files changed, 1034 insertions, 258 deletions
diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
index 3cd7a4634f0..2ff9b889a9e 100644
--- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
+++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
@@ -77,7 +77,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
private List<String> zoneDnsSuffixes = List.of();
private int maxCompactBuffers = 1;
private boolean failDeploymentWithInvalidJvmOptions = false;
- private double tlsSizeFraction = 0.04;
+ private double tlsSizeFraction = 0.02;
@Override public ModelContext.FeatureFlags featureFlags() { return this; }
@Override public boolean multitenant() { return multitenant; }
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/ContentClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/ContentClusterTest.java
index 4c0c5ebc5b3..4295f0aa6ec 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/content/ContentClusterTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/content/ContentClusterTest.java
@@ -1062,7 +1062,7 @@ public class ContentClusterTest extends ContentBaseTest {
var flavor = new Flavor(new FlavorsConfig.Flavor(new FlavorsConfig.Flavor.Builder().name("test").minDiskAvailableGb(100)));
assertEquals(21474836480L, resolveMaxTLSSize(OptionalDouble.empty(), Optional.empty()));
assertEquals(21474836480L, resolveMaxTLSSize(OptionalDouble.of(0.02), Optional.empty()));
- assertEquals(4294967296L, resolveMaxTLSSize(OptionalDouble.empty(), Optional.of(flavor)));
+ assertEquals(2147483648L, resolveMaxTLSSize(OptionalDouble.empty(), Optional.of(flavor)));
assertEquals(2147483648L, resolveMaxTLSSize(OptionalDouble.of(0.02), Optional.of(flavor)));
assertEquals(3221225472L, resolveMaxTLSSize(OptionalDouble.of(0.03), Optional.of(flavor)));
}
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index 66700eff3e6..c351e52b557 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -75,3 +75,22 @@ use_async_message_handling_on_schedule bool default=false restart
## the entire resource usage sample is immediately reported to the cluster controller (via host info).
## This config can be live updated (doesn't require restart).
resource_usage_reporter_noise_level double default=0.001
+
+## Specify throttling used for async persistence operations. This throttling takes place
+## before operations are dispatched to Proton and serves as a limiter for how many
+## operations may be in flight in Proton's internal queues.
+##
+## - UNLIMITED is, as it says on the tin, unlimited. Offers no actual throttling, but
+## has near zero overhead and never blocks.
+## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window
+## is full (if a blocking throttler API call is invoked).
+##
+async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart
+
+## Specifies the extent the throttling window is increased by when the async throttle
+## policy has decided that more concurrent operations are desirable. Also affects the
+## _minimum_ size of the throttling window; its size is implicitly set to max(this config
+## value, number of threads).
+##
+## Only applies if async_operation_throttler_type == DYNAMIC.
+async_operation_dynamic_throttling_window_increment int default=20 restart
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
index 02ca4ce14c4..80194337daa 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
@@ -111,6 +111,8 @@ import java.util.stream.Collectors;
import static com.yahoo.config.model.api.container.ContainerServiceType.CONTAINER;
import static com.yahoo.config.model.api.container.ContainerServiceType.LOGSERVER_CONTAINER;
+import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceResponse;
+import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceListResponse;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getFileReferencesOnDisk;
import static com.yahoo.vespa.config.server.tenant.TenantRepository.HOSTED_VESPA_TENANT;
@@ -737,16 +739,22 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
// ---------------- Convergence ----------------------------------------------------------------
- public HttpResponse checkServiceForConfigConvergence(ApplicationId applicationId, String hostAndPort, URI uri,
- Duration timeout, Optional<Version> vespaVersion) {
- return convergeChecker.getServiceConfigGenerationResponse(getApplication(applicationId, vespaVersion), hostAndPort, uri, timeout);
+ public ServiceResponse checkServiceForConfigConvergence(ApplicationId applicationId,
+ String hostAndPort,
+ Duration timeout,
+ Optional<Version> vespaVersion) {
+ return convergeChecker.getServiceConfigGeneration(getApplication(applicationId, vespaVersion), hostAndPort, timeout);
}
- public HttpResponse servicesToCheckForConfigConvergence(ApplicationId applicationId, URI uri,
- Duration timeoutPerService, Optional<Version> vespaVersion) {
- return convergeChecker.getServiceConfigGenerationsResponse(getApplication(applicationId, vespaVersion), uri, timeoutPerService);
+ public ServiceListResponse servicesToCheckForConfigConvergence(ApplicationId applicationId,
+ URI uri,
+ Duration timeoutPerService,
+ Optional<Version> vespaVersion) {
+ return convergeChecker.getServiceConfigGenerations(getApplication(applicationId, vespaVersion), uri, timeoutPerService);
}
+ public ConfigConvergenceChecker configConvergenceChecker() { return convergeChecker; }
+
// ---------------- Logs ----------------------------------------------------------------
public HttpResponse getLogs(ApplicationId applicationId, Optional<String> hostname, String apiParams) {
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
index 24744a1b3b2..ad14cf4aab6 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
@@ -10,8 +10,6 @@ import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.model.api.HostInfo;
import com.yahoo.config.model.api.PortInfo;
import com.yahoo.config.model.api.ServiceInfo;
-import com.yahoo.slime.Cursor;
-import com.yahoo.vespa.config.server.http.JSONResponse;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
@@ -92,27 +90,26 @@ public class ConfigConvergenceChecker extends AbstractComponent {
}
/** Check all services in given application. Returns the minimum current generation of all services */
- public JSONResponse getServiceConfigGenerationsResponse(Application application, URI requestUrl, Duration timeoutPerService) {
+ public ServiceListResponse getServiceConfigGenerations(Application application, URI uri, Duration timeoutPerService) {
Map<ServiceInfo, Long> currentGenerations = getServiceConfigGenerations(application, timeoutPerService);
long currentGeneration = currentGenerations.values().stream().mapToLong(Long::longValue).min().orElse(-1);
- return new ServiceListResponse(200, currentGenerations, requestUrl, application.getApplicationGeneration(),
- currentGeneration);
+ return new ServiceListResponse(currentGenerations, uri, application.getApplicationGeneration(), currentGeneration);
}
/** Check service identified by host and port in given application */
- public JSONResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) {
+ public ServiceResponse getServiceConfigGeneration(Application application, String hostAndPortToCheck, Duration timeout) {
Long wantedGeneration = application.getApplicationGeneration();
try (CloseableHttpAsyncClient client = createHttpClient()) {
client.start();
if ( ! hostInApplication(application, hostAndPortToCheck))
- return ServiceResponse.createHostNotFoundInAppResponse(requestUrl, hostAndPortToCheck, wantedGeneration);
+ return new ServiceResponse(ServiceResponse.Status.hostNotFound, wantedGeneration);
long currentGeneration = getServiceGeneration(client, URI.create("http://" + hostAndPortToCheck), timeout).get();
boolean converged = currentGeneration >= wantedGeneration;
- return ServiceResponse.createOkResponse(requestUrl, hostAndPortToCheck, wantedGeneration, currentGeneration, converged);
+ return new ServiceResponse(ServiceResponse.Status.ok, wantedGeneration, currentGeneration, converged);
} catch (InterruptedException | ExecutionException | CancellationException e) { // e.g. if we cannot connect to the service to find generation
- return ServiceResponse.createNotFoundResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage());
+ return new ServiceResponse(ServiceResponse.Status.notFound, wantedGeneration, e.getMessage());
} catch (Exception e) {
- return ServiceResponse.createErrorResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage());
+ return new ServiceResponse(ServiceResponse.Status.error, wantedGeneration, e.getMessage());
}
}
@@ -192,7 +189,7 @@ public class ConfigConvergenceChecker extends AbstractComponent {
return false;
}
- private static Optional<Integer> getStatePort(ServiceInfo service) {
+ public static Optional<Integer> getStatePort(ServiceInfo service) {
return service.getPorts().stream()
.filter(port -> port.getTags().contains("state"))
.map(PortInfo::getPort)
@@ -249,63 +246,70 @@ public class ConfigConvergenceChecker extends AbstractComponent {
.build();
}
- private static class ServiceListResponse extends JSONResponse {
-
- // Pre-condition: servicesToCheck has a state port
- private ServiceListResponse(int status, Map<ServiceInfo, Long> servicesToCheck, URI uri, long wantedGeneration,
- long currentGeneration) {
- super(status);
- Cursor serviceArray = object.setArray("services");
- servicesToCheck.forEach((service, generation) -> {
- Cursor serviceObject = serviceArray.addObject();
- String hostName = service.getHostName();
- int statePort = getStatePort(service).get();
- serviceObject.setString("host", hostName);
- serviceObject.setLong("port", statePort);
- serviceObject.setString("type", service.getServiceType());
- serviceObject.setString("url", uri.toString() + "/" + hostName + ":" + statePort);
- serviceObject.setLong("currentGeneration", generation);
- });
- object.setString("url", uri.toString());
- object.setLong("currentGeneration", currentGeneration);
- object.setLong("wantedGeneration", wantedGeneration);
- object.setBool("converged", currentGeneration >= wantedGeneration);
+ public static class ServiceResponse {
+
+ public enum Status { ok, notFound, hostNotFound, error }
+
+ public final Status status;
+ public final Long wantedGeneration;
+ public final Long currentGeneration;
+ public final boolean converged;
+ public final Optional<String> errorMessage;
+
+ public ServiceResponse(Status status, Long wantedGeneration) {
+ this(status, wantedGeneration, 0L);
}
- }
- private static class ServiceResponse extends JSONResponse {
+ public ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration) {
+ this(status, wantedGeneration, currentGeneration, false);
+ }
- private ServiceResponse(int status, URI uri, String hostname, Long wantedGeneration) {
- super(status);
- object.setString("url", uri.toString());
- object.setString("host", hostname);
- object.setLong("wantedGeneration", wantedGeneration);
+ public ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration, boolean converged) {
+ this(status, wantedGeneration, currentGeneration, converged, Optional.empty());
}
- static ServiceResponse createOkResponse(URI uri, String hostname, Long wantedGeneration, Long currentGeneration, boolean converged) {
- ServiceResponse serviceResponse = new ServiceResponse(200, uri, hostname, wantedGeneration);
- serviceResponse.object.setBool("converged", converged);
- serviceResponse.object.setLong("currentGeneration", currentGeneration);
- return serviceResponse;
+ public ServiceResponse(Status status, Long wantedGeneration, String errorMessage) {
+ this(status, wantedGeneration, 0L, false, Optional.ofNullable(errorMessage));
}
- static ServiceResponse createHostNotFoundInAppResponse(URI uri, String hostname, Long wantedGeneration) {
- ServiceResponse serviceResponse = new ServiceResponse(410, uri, hostname, wantedGeneration);
- serviceResponse.object.setString("problem", "Host:port (service) no longer part of application, refetch list of services.");
- return serviceResponse;
+ private ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration, boolean converged, Optional<String> errorMessage) {
+ this.status = status;
+ this.wantedGeneration = wantedGeneration;
+ this.currentGeneration = currentGeneration;
+ this.converged = converged;
+ this.errorMessage = errorMessage;
}
- static ServiceResponse createErrorResponse(URI uri, String hostname, Long wantedGeneration, String error) {
- ServiceResponse serviceResponse = new ServiceResponse(500, uri, hostname, wantedGeneration);
- serviceResponse.object.setString("error", error);
- return serviceResponse;
+ }
+
+ public static class ServiceListResponse {
+
+ public final List<Service> services = new ArrayList<>();
+ public final URI uri;
+ public final long wantedGeneration;
+ public final long currentGeneration;
+
+ public ServiceListResponse(Map<ServiceInfo, Long> services, URI uri, long wantedGeneration, long currentGeneration) {
+ services.forEach((key, value) -> this.services.add(new Service(key, value)));
+ this.uri = uri;
+ this.wantedGeneration = wantedGeneration;
+ this.currentGeneration = currentGeneration;
}
- static ServiceResponse createNotFoundResponse(URI uri, String hostname, Long wantedGeneration, String error) {
- ServiceResponse serviceResponse = new ServiceResponse(404, uri, hostname, wantedGeneration);
- serviceResponse.object.setString("error", error);
- return serviceResponse;
+ public List<Service> services() { return services; }
+
+ public static class Service {
+
+ public final ServiceInfo serviceInfo;
+ public final Long currentGeneration;
+
+ public Service(ServiceInfo serviceInfo, Long currentGeneration) {
+ this.serviceInfo = serviceInfo;
+ this.currentGeneration = currentGeneration;
+ }
+
}
+
}
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
index 4dda141491c..0131517818d 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
@@ -5,6 +5,7 @@ import com.google.inject.Inject;
import com.yahoo.component.Version;
import com.yahoo.config.application.api.ApplicationFile;
import com.yahoo.config.model.api.Model;
+import com.yahoo.config.model.api.ServiceInfo;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.HostFilter;
import com.yahoo.config.provision.InstanceName;
@@ -16,13 +17,15 @@ import com.yahoo.jdisc.Response;
import com.yahoo.restapi.ErrorResponse;
import com.yahoo.restapi.MessageResponse;
import com.yahoo.restapi.Path;
+import com.yahoo.slime.Cursor;
import com.yahoo.slime.SlimeUtils;
import com.yahoo.text.StringUtilities;
import com.yahoo.vespa.config.server.ApplicationRepository;
-import com.yahoo.vespa.config.server.application.ApplicationReindexing;
+import com.yahoo.vespa.config.server.application.ConfigConvergenceChecker;
import com.yahoo.vespa.config.server.http.ContentHandler;
import com.yahoo.vespa.config.server.http.ContentRequest;
import com.yahoo.vespa.config.server.http.HttpHandler;
+import com.yahoo.vespa.config.server.http.JSONResponse;
import com.yahoo.vespa.config.server.http.NotFoundException;
import com.yahoo.vespa.config.server.http.v2.request.ApplicationContentRequest;
import com.yahoo.vespa.config.server.http.v2.response.ApplicationSuspendedResponse;
@@ -33,6 +36,7 @@ import com.yahoo.vespa.config.server.http.v2.response.ReindexingResponse;
import com.yahoo.vespa.config.server.tenant.Tenant;
import java.io.IOException;
+import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
@@ -43,8 +47,11 @@ import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.TreeSet;
+import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceListResponse;
+import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceResponse;
import static com.yahoo.yolean.Exceptions.uncheck;
+
/**
* Operations on applications (delete, wait for config convergence, restart, application content etc.)
*
@@ -108,13 +115,21 @@ public class ApplicationHandler extends HttpHandler {
}
private HttpResponse listServiceConverge(ApplicationId applicationId, HttpRequest request) {
- return applicationRepository.servicesToCheckForConfigConvergence(applicationId, request.getUri(),
- getTimeoutFromRequest(request), getVespaVersionFromRequest(request));
+ ServiceListResponse response =
+ applicationRepository.servicesToCheckForConfigConvergence(applicationId,
+ request.getUri(),
+ getTimeoutFromRequest(request),
+ getVespaVersionFromRequest(request));
+ return new HttpServiceListResponse(response);
}
private HttpResponse checkServiceConverge(ApplicationId applicationId, String hostAndPort, HttpRequest request) {
- return applicationRepository.checkServiceForConfigConvergence(applicationId, hostAndPort, request.getUri(),
- getTimeoutFromRequest(request), getVespaVersionFromRequest(request));
+ ServiceResponse response =
+ applicationRepository.checkServiceForConfigConvergence(applicationId,
+ hostAndPort,
+ getTimeoutFromRequest(request),
+ getVespaVersionFromRequest(request));
+ return HttpServiceResponse.createResponse(response, hostAndPort, request.getUri());
}
private HttpResponse serviceStatusPage(ApplicationId applicationId, String service, String hostname, String pathSuffix) {
@@ -301,4 +316,79 @@ public class ApplicationHandler extends HttpHandler {
.map(Version::fromString);
}
+ static class HttpServiceResponse extends JSONResponse {
+
+ public static HttpServiceResponse createResponse(ConfigConvergenceChecker.ServiceResponse serviceResponse, String hostAndPort, URI uri) {
+ switch (serviceResponse.status) {
+ case ok:
+ return createOkResponse(uri, hostAndPort, serviceResponse.wantedGeneration, serviceResponse.currentGeneration, serviceResponse.converged);
+ case hostNotFound:
+ return createHostNotFoundInAppResponse(uri, hostAndPort, serviceResponse.wantedGeneration);
+ case notFound:
+ return createNotFoundResponse(uri, hostAndPort, serviceResponse.wantedGeneration, serviceResponse.errorMessage.orElse(""));
+ case error:
+ return createErrorResponse(uri, hostAndPort, serviceResponse.wantedGeneration, serviceResponse.errorMessage.orElse(""));
+ default:
+ throw new IllegalArgumentException("Unknown status " + serviceResponse.status);
+ }
+ }
+
+ private HttpServiceResponse(int status, URI uri, String hostname, Long wantedGeneration) {
+ super(status);
+ object.setString("url", uri.toString());
+ object.setString("host", hostname);
+ object.setLong("wantedGeneration", wantedGeneration);
+ }
+
+ private static HttpServiceResponse createOkResponse(URI uri, String hostname, Long wantedGeneration, Long currentGeneration, boolean converged) {
+ HttpServiceResponse serviceResponse = new HttpServiceResponse(200, uri, hostname, wantedGeneration);
+ serviceResponse.object.setBool("converged", converged);
+ serviceResponse.object.setLong("currentGeneration", currentGeneration);
+ return serviceResponse;
+ }
+
+ private static HttpServiceResponse createHostNotFoundInAppResponse(URI uri, String hostname, Long wantedGeneration) {
+ HttpServiceResponse serviceResponse = new HttpServiceResponse(410, uri, hostname, wantedGeneration);
+ serviceResponse.object.setString("problem", "Host:port (service) no longer part of application, refetch list of services.");
+ return serviceResponse;
+ }
+
+ private static HttpServiceResponse createErrorResponse(URI uri, String hostname, Long wantedGeneration, String error) {
+ HttpServiceResponse serviceResponse = new HttpServiceResponse(500, uri, hostname, wantedGeneration);
+ serviceResponse.object.setString("error", error);
+ return serviceResponse;
+ }
+
+ private static HttpServiceResponse createNotFoundResponse(URI uri, String hostname, Long wantedGeneration, String error) {
+ HttpServiceResponse serviceResponse = new HttpServiceResponse(404, uri, hostname, wantedGeneration);
+ serviceResponse.object.setString("error", error);
+ return serviceResponse;
+ }
+
+ }
+
+ static class HttpServiceListResponse extends JSONResponse {
+
+ // Pre-condition: servicesToCheck has a state port
+ public HttpServiceListResponse(ConfigConvergenceChecker.ServiceListResponse response) {
+ super(200);
+ Cursor serviceArray = object.setArray("services");
+ response.services().forEach((service) -> {
+ ServiceInfo serviceInfo = service.serviceInfo;
+ Cursor serviceObject = serviceArray.addObject();
+ String hostName = serviceInfo.getHostName();
+ int statePort = ConfigConvergenceChecker.getStatePort(serviceInfo).get();
+ serviceObject.setString("host", hostName);
+ serviceObject.setLong("port", statePort);
+ serviceObject.setString("type", serviceInfo.getServiceType());
+ serviceObject.setString("url", response.uri.toString() + "/" + hostName + ":" + statePort);
+ serviceObject.setLong("currentGeneration", service.currentGeneration);
+ });
+ object.setString("url", response.uri.toString());
+ object.setLong("currentGeneration", response.currentGeneration);
+ object.setLong("wantedGeneration", response.wantedGeneration);
+ object.setBool("converged", response.currentGeneration >= response.wantedGeneration);
+ }
+ }
+
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
index 8b21e9c3916..6afb9ef086d 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
@@ -8,7 +8,6 @@ import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.ApplicationName;
import com.yahoo.config.provision.InstanceName;
import com.yahoo.config.provision.TenantName;
-import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.vespa.config.server.ServerCache;
import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
import org.junit.Before;
@@ -16,22 +15,20 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UncheckedIOException;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
-import java.util.function.Consumer;
+import java.util.List;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
-import static com.yahoo.test.json.JsonTestHelper.assertJsonEquals;
-import static org.assertj.core.api.Assertions.assertThat;
+import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceListResponse;
+import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceResponse;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* @author Ulf Lilleengen
@@ -72,63 +69,35 @@ public class ConfigConvergenceCheckerTest {
@Test
public void service_convergence() {
{ // Known service
- String serviceName = hostAndPort(this.service);
- URI requestUrl = testServer().resolve("/serviceconverge/" + serviceName);
wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(okJson("{\"config\":{\"generation\":3}}")));
- HttpResponse serviceResponse = checker.getServiceConfigGenerationResponse(application, hostAndPort(this.service), requestUrl, clientTimeout);
- assertResponse("{\n" +
- " \"url\": \"" + requestUrl.toString() + "\",\n" +
- " \"host\": \"" + hostAndPort(this.service) + "\",\n" +
- " \"wantedGeneration\": 3,\n" +
- " \"converged\": true,\n" +
- " \"currentGeneration\": 3\n" +
- "}",
- 200,
- serviceResponse);
+
+ ServiceResponse response = checker.getServiceConfigGeneration(application, hostAndPort(this.service), clientTimeout);
+ assertEquals(3, response.wantedGeneration.longValue());
+ assertEquals(3, response.currentGeneration.longValue());
+ assertTrue(response.converged);
+ assertEquals(ServiceResponse.Status.ok, response.status);
}
{ // Missing service
- String serviceName = "notPresent:1337";
- URI requestUrl = testServer().resolve("/serviceconverge/" + serviceName);
- HttpResponse response = checker.getServiceConfigGenerationResponse(application, "notPresent:1337", requestUrl, clientTimeout);
- assertResponse("{\n" +
- " \"url\": \"" + requestUrl.toString() + "\",\n" +
- " \"host\": \"" + serviceName + "\",\n" +
- " \"wantedGeneration\": 3,\n" +
- " \"problem\": \"Host:port (service) no longer part of application, refetch list of services.\"\n" +
- "}",
- 410,
- response);
+ ServiceResponse response = checker.getServiceConfigGeneration(application, "notPresent:1337", clientTimeout);
+ assertEquals(3, response.wantedGeneration.longValue());
+ assertEquals(ServiceResponse.Status.hostNotFound, response.status);
}
}
@Test
public void service_list_convergence() {
{
- String serviceName = hostAndPort(this.service);
URI requestUrl = testServer().resolve("/serviceconverge");
- URI serviceUrl = testServer().resolve("/serviceconverge/" + serviceName);
wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(okJson("{\"config\":{\"generation\":3}}")));
- HttpResponse response = checker.getServiceConfigGenerationsResponse(application, requestUrl, clientTimeout);
- assertResponse("{\n" +
- " \"services\": [\n" +
- " {\n" +
- " \"host\": \"" + serviceUrl.getHost() + "\",\n" +
- " \"port\": " + serviceUrl.getPort() + ",\n" +
- " \"type\": \"container\",\n" +
- " \"url\": \"" + serviceUrl.toString() + "\",\n" +
- " \"currentGeneration\":" + 3 + "\n" +
- " }\n" +
- " ],\n" +
- " \"url\": \"" + requestUrl.toString() + "\",\n" +
- " \"currentGeneration\": 3,\n" +
- " \"wantedGeneration\": 3,\n" +
- " \"converged\": true\n" +
- "}",
- 200,
- response);
- }
+ ServiceListResponse response = checker.getServiceConfigGenerations(application, requestUrl, clientTimeout);
+ assertEquals(3, response.wantedGeneration);
+ assertEquals(3, response.currentGeneration);
+ List<ServiceListResponse.Service> services = response.services;
+ assertEquals(1, services.size());
+ assertService(this.service, services.get(0), 3);
+ }
{ // Model with two hosts on different generations
MockModel model = new MockModel(Arrays.asList(
@@ -143,53 +112,29 @@ public class ConfigConvergenceCheckerTest {
wireMock2.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(okJson("{\"config\":{\"generation\":3}}")));
URI requestUrl = testServer().resolve("/serviceconverge");
- URI serviceUrl = testServer().resolve("/serviceconverge/" + hostAndPort(service));
- URI serviceUrl2 = testServer().resolve("/serviceconverge/" + hostAndPort(service2));
- HttpResponse response = checker.getServiceConfigGenerationsResponse(application, requestUrl, clientTimeout);
- assertResponse("{\n" +
- " \"services\": [\n" +
- " {\n" +
- " \"host\": \"" + service.getHost() + "\",\n" +
- " \"port\": " + service.getPort() + ",\n" +
- " \"type\": \"container\",\n" +
- " \"url\": \"" + serviceUrl.toString() + "\",\n" +
- " \"currentGeneration\":" + 4 + "\n" +
- " },\n" +
- " {\n" +
- " \"host\": \"" + service2.getHost() + "\",\n" +
- " \"port\": " + service2.getPort() + ",\n" +
- " \"type\": \"container\",\n" +
- " \"url\": \"" + serviceUrl2.toString() + "\",\n" +
- " \"currentGeneration\":" + 3 + "\n" +
- " }\n" +
- " ],\n" +
- " \"url\": \"" + requestUrl.toString() + "\",\n" +
- " \"currentGeneration\": 3,\n" +
- " \"wantedGeneration\": 4,\n" +
- " \"converged\": false\n" +
- "}",
- 200,
- response);
+
+ ServiceListResponse response = checker.getServiceConfigGenerations(application, requestUrl, clientTimeout);
+ assertEquals(4, response.wantedGeneration);
+ assertEquals(3, response.currentGeneration);
+
+ List<ServiceListResponse.Service> services = response.services;
+ assertEquals(2, services.size());
+ assertService(this.service, services.get(0), 4);
+ assertService(this.service2, services.get(1), 3);
}
}
+
@Test
public void service_convergence_timeout() {
- URI requestUrl = testServer().resolve("/serviceconverge");
wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(aResponse()
.withFixedDelay((int) clientTimeout.plus(Duration.ofSeconds(1)).toMillis())
.withBody("response too slow")));
- HttpResponse response = checker.getServiceConfigGenerationResponse(application, hostAndPort(service), requestUrl, Duration.ofMillis(1));
- // Message contained in a SocketTimeoutException may differ across platforms, so we do a partial match of the response here
- assertResponse(
- responseBody ->
- assertThat(responseBody)
- .startsWith("{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) +
- "\",\"wantedGeneration\":3,\"error\":\"")
- .contains("java.net.SocketTimeoutException: 1 MILLISECONDS")
- .endsWith("\"}"),
- 404,
- response);
+ ServiceResponse response = checker.getServiceConfigGeneration(application, hostAndPort(service), Duration.ofMillis(1));
+
+ assertEquals(3, response.wantedGeneration.longValue());
+ assertEquals(ServiceResponse.Status.notFound, response.status);
+ assertTrue(response.errorMessage.get().contains("java.net.SocketTimeoutException: 1 MILLISECONDS"));
}
private URI testServer() {
@@ -204,19 +149,11 @@ public class ConfigConvergenceCheckerTest {
return uri.getHost() + ":" + uri.getPort();
}
- private static void assertResponse(String expectedJson, int status, HttpResponse response) {
- assertResponse((responseBody) -> assertJsonEquals(new String(responseBody.getBytes()), expectedJson), status, response);
- }
-
- private static void assertResponse(Consumer<String> assertFunc, int status, HttpResponse response) {
- ByteArrayOutputStream responseBody = new ByteArrayOutputStream();
- try {
- response.render(responseBody);
- assertFunc.accept(responseBody.toString());
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- assertEquals(status, response.getStatus());
+ private void assertService(URI uri, ServiceListResponse.Service service1, long expectedGeneration) {
+ assertEquals(expectedGeneration, service1.currentGeneration.longValue());
+ assertEquals(uri.getHost(), service1.serviceInfo.getHostName());
+ assertEquals(uri.getPort(), ConfigConvergenceChecker.getStatePort(service1.serviceInfo).get().intValue());
+ assertEquals("container", service1.serviceInfo.getServiceType());
}
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
index 0b8bf8e84bd..04483e0191d 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
@@ -4,6 +4,8 @@ package com.yahoo.vespa.config.server.http.v2;
import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.component.Version;
import com.yahoo.config.model.api.ModelFactory;
+import com.yahoo.config.model.api.PortInfo;
+import com.yahoo.config.model.api.ServiceInfo;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.ApplicationName;
import com.yahoo.config.provision.InstanceName;
@@ -50,14 +52,18 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.function.Consumer;
import java.util.stream.Stream;
import static com.yahoo.container.jdisc.HttpRequest.createTestRequest;
@@ -65,8 +71,12 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
import static com.yahoo.test.json.JsonTestHelper.assertJsonEquals;
+import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceListResponse;
+import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceResponse;
import static com.yahoo.vespa.config.server.http.HandlerTest.assertHttpStatusCodeAndMessage;
import static com.yahoo.vespa.config.server.http.SessionHandlerTest.getRenderedString;
+import static com.yahoo.vespa.config.server.http.v2.ApplicationHandler.HttpServiceListResponse;
+import static com.yahoo.vespa.config.server.http.v2.ApplicationHandler.HttpServiceResponse.createResponse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -146,7 +156,7 @@ public class ApplicationHandlerTest {
Tenant mytenant = applicationRepository.getTenant(applicationId);
deleteAndAssertOKResponse(mytenant, applicationId);
}
-
+
{
applicationRepository.deploy(testApp, prepareParams(applicationId));
deleteAndAssertOKResponseMocked(applicationId, true);
@@ -539,6 +549,143 @@ public class ApplicationHandlerTest {
"}\n");
}
+ @Test
+ public void service_convergence() {
+ String hostAndPort = "localhost:1234";
+ URI uri = URI.create("https://" + hostAndPort + "/serviceconvergence/container");
+
+ { // Known service
+ HttpResponse response = createResponse(new ServiceResponse(ServiceResponse.Status.ok,
+ 3L,
+ 3L,
+ true),
+ hostAndPort,
+ uri);
+ assertResponse("{\n" +
+ " \"url\": \"" + uri.toString() + "\",\n" +
+ " \"host\": \"" + hostAndPort + "\",\n" +
+ " \"wantedGeneration\": 3,\n" +
+ " \"converged\": true,\n" +
+ " \"currentGeneration\": 3\n" +
+ "}",
+ 200,
+ response);
+ }
+
+ { // Missing service
+ HttpResponse response = createResponse(new ServiceResponse(ServiceResponse.Status.hostNotFound,
+ 3L),
+ hostAndPort,
+ uri);
+
+ assertResponse("{\n" +
+ " \"url\": \"" + uri.toString() + "\",\n" +
+ " \"host\": \"" + hostAndPort + "\",\n" +
+ " \"wantedGeneration\": 3,\n" +
+ " \"problem\": \"Host:port (service) no longer part of application, refetch list of services.\"\n" +
+ "}",
+ 410,
+ response);
+ }
+ }
+
+ @Test
+ public void service_list_convergence() {
+ URI requestUrl = URI.create("https://configserver/serviceconvergence");
+
+ String hostname = "localhost";
+ int port = 1234;
+ String hostAndPort = hostname + ":" + port;
+ URI serviceUrl = URI.create("https://configserver/serviceconvergence/" + hostAndPort);
+
+ {
+ HttpServiceListResponse response =
+ new HttpServiceListResponse(new ServiceListResponse(Map.of(createServiceInfo(hostname, port), 3L),
+ requestUrl,
+ 3L,
+ 3L));
+ assertResponse("{\n" +
+ " \"services\": [\n" +
+ " {\n" +
+ " \"host\": \"" + hostname + "\",\n" +
+ " \"port\": " + port + ",\n" +
+ " \"type\": \"container\",\n" +
+ " \"url\": \"" + serviceUrl.toString() + "\",\n" +
+ " \"currentGeneration\":" + 3 + "\n" +
+ " }\n" +
+ " ],\n" +
+ " \"url\": \"" + requestUrl.toString() + "\",\n" +
+ " \"currentGeneration\": 3,\n" +
+ " \"wantedGeneration\": 3,\n" +
+ " \"converged\": true\n" +
+ "}",
+ 200,
+ response);
+ }
+
+ { // Two hosts on different generations
+ String hostname2 = "localhost2";
+ int port2 = 5678;
+ String hostAndPort2 = hostname2 + ":" + port2;
+ URI serviceUrl2 = URI.create("https://configserver/serviceconvergence/" + hostAndPort2);
+
+ Map<ServiceInfo, Long> serviceInfos = new HashMap<>();
+ serviceInfos.put(createServiceInfo(hostname, port), 4L);
+ serviceInfos.put(createServiceInfo(hostname2, port2), 3L);
+
+ HttpServiceListResponse response =
+ new HttpServiceListResponse(new ServiceListResponse(serviceInfos,
+ requestUrl,
+ 4L,
+ 3L));
+ assertResponse("{\n" +
+ " \"services\": [\n" +
+ " {\n" +
+ " \"host\": \"" + hostname + "\",\n" +
+ " \"port\": " + port + ",\n" +
+ " \"type\": \"container\",\n" +
+ " \"url\": \"" + serviceUrl.toString() + "\",\n" +
+ " \"currentGeneration\":" + 4 + "\n" +
+ " },\n" +
+ " {\n" +
+ " \"host\": \"" + hostname2 + "\",\n" +
+ " \"port\": " + port2 + ",\n" +
+ " \"type\": \"container\",\n" +
+ " \"url\": \"" + serviceUrl2.toString() + "\",\n" +
+ " \"currentGeneration\":" + 3 + "\n" +
+ " }\n" +
+ " ],\n" +
+ " \"url\": \"" + requestUrl.toString() + "\",\n" +
+ " \"currentGeneration\": 3,\n" +
+ " \"wantedGeneration\": 4,\n" +
+ " \"converged\": false\n" +
+ "}",
+ 200,
+ response);
+ }
+ }
+
+ @Test
+ public void service_convergence_timeout() {
+ String hostAndPort = "localhost:1234";
+ URI uri = URI.create("https://" + hostAndPort + "/serviceconvergence/container");
+
+ HttpResponse response = createResponse(new ServiceResponse(ServiceResponse.Status.notFound,
+ 3L,
+ "some error message"),
+ hostAndPort,
+ uri);
+
+ assertResponse("{\n" +
+ " \"url\": \"" + uri.toString() + "\",\n" +
+ " \"host\": \"" + hostAndPort + "\",\n" +
+ " \"wantedGeneration\": 3,\n" +
+ " \"error\": \"some error message\"" +
+ "}",
+ 404,
+ response);
+ }
+
private void assertNotAllowed(Method method) throws IOException {
String url = "http://myhost:14000/application/v2/tenant/" + mytenantName + "/application/default";
deleteAndAssertResponse(url, Response.Status.METHOD_NOT_ALLOWED, HttpErrorResponse.ErrorCode.METHOD_NOT_ALLOWED, "{\"error-code\":\"METHOD_NOT_ALLOWED\",\"message\":\"Method '" + method + "' is not supported\"}",
@@ -668,4 +815,29 @@ public class ApplicationHandlerTest {
return new PrepareParams.Builder().applicationId(applicationId).build();
}
+ private static void assertResponse(String expectedJson, int status, HttpResponse response) {
+ assertResponse((responseBody) -> assertJsonEquals(new String(responseBody
+ .getBytes()), expectedJson), status, response);
+ }
+
+ private static void assertResponse(Consumer<String> assertFunc, int status, HttpResponse response) {
+ ByteArrayOutputStream responseBody = new ByteArrayOutputStream();
+ try {
+ response.render(responseBody);
+ assertFunc.accept(responseBody.toString());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ assertEquals(status, response.getStatus());
+ }
+
+ private ServiceInfo createServiceInfo(String hostname, int port) {
+ return new ServiceInfo("container",
+ "container",
+ List.of(new PortInfo(port, List.of("state"))),
+ Map.of(),
+ "configId",
+ hostname);
+ }
+
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/SystemUpgrader.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/SystemUpgrader.java
index 99ab6d420cb..8d5851be62f 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/SystemUpgrader.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/SystemUpgrader.java
@@ -34,7 +34,7 @@ public class SystemUpgrader extends InfrastructureUpgrader<VespaVersionTarget> {
@Override
protected void upgrade(VespaVersionTarget target, SystemApplication application, ZoneApi zone) {
- log.info(Text.format("Deploying %s version %s in %s", application.id(), target, zone.getId()));
+ log.info(Text.format("Deploying %s on %s in %s", application.id(), target, zone.getId()));
controller().applications().deploy(application, zone.getId(), target.version(), target.downgrade());
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersionTarget.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersionTarget.java
index fd5603b96b8..78d6d0ebf29 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersionTarget.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersionTarget.java
@@ -30,4 +30,9 @@ public class VespaVersionTarget implements VersionTarget {
return downgrade;
}
+ @Override
+ public String toString() {
+ return "vespa version target " + version.toFullString() + (downgrade ? " (downgrade)" : "");
+ }
+
}
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index be245788ac1..32b65ac8efb 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -53,7 +53,7 @@ public class Flags {
ZONE_ID, APPLICATION_ID);
public static final UnboundDoubleFlag TLS_SIZE_FRACTION = defineDoubleFlag(
- "tls-size-fraction", 0.04,
+ "tls-size-fraction", 0.02,
List.of("baldersheim"), "2021-12-20", "2022-02-01",
"Fraction of disk available for transaction log",
"Takes effect at redeployment",
@@ -414,6 +414,13 @@ public class Flags {
"Takes effect on restart of Docker container",
ZONE_ID, APPLICATION_ID);
+ public static final UnboundStringFlag ZOOKEEPER_SNAPSHOT_METHOD = defineStringFlag(
+ "zookeeper-snapshot-method", "",
+ List.of("hmusum"), "2022-01-11", "2022-02-11",
+ "ZooKeeper snapshot method. Valid values are '', 'gz' and 'snappy'",
+ "Takes effect on Docker container restart",
+ ZONE_ID, APPLICATION_ID, NODE_TYPE);
+
/** WARNING: public for testing: All flags should be defined in {@link Flags}. */
public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners,
String createdAt, String expiresAt, String description,
diff --git a/parent/pom.xml b/parent/pom.xml
index 4558267fe9f..7ab4e2ba57e 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -796,7 +796,7 @@
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
- <version>2.1.8</version>
+ <version>${hdrhistogram.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
@@ -894,6 +894,7 @@
<commons.codec.version>1.15</commons.codec.version>
<commons.math3.version>3.6.1</commons.math3.version>
<gson.version>2.8.9</gson.version>
+ <hdrhistogram.version>2.1.12</hdrhistogram.version>
<jna.version>5.9.0</jna.version>
<junit.version>5.8.1</junit.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 8a3550ac00b..62b9f6416af 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -203,7 +203,7 @@ distribution.searchablecopies long default=1
## Control cache size in bytes.
## Postive numbers are absolute in bytes.
## Negative numbers are a percentage of memory.
-summary.cache.maxbytes long default=-5
+summary.cache.maxbytes long default=-4
## Include visits in the cache, if the visitoperation allows it.
## This will enable another separate cache of summary.cache.maxbytes size.
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index 7b165e11b66..fb8120210c1 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -12,6 +12,7 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST
persistencethread_splittest.cpp
processalltest.cpp
provider_error_wrapper_test.cpp
+ shared_operation_throttler_test.cpp
splitbitdetectortest.cpp
testandsettest.cpp
gtest_runner.cpp
diff --git a/storage/src/tests/persistence/active_operations_stats_test.cpp b/storage/src/tests/persistence/active_operations_stats_test.cpp
index a5dd3d929db..8caa84977ce 100644
--- a/storage/src/tests/persistence/active_operations_stats_test.cpp
+++ b/storage/src/tests/persistence/active_operations_stats_test.cpp
@@ -96,17 +96,17 @@ ActiveOperationsStatsTest::test_active_operations_stats()
auto lock0 = filestorHandler->getNextMessage(stripeId);
auto lock1 = filestorHandler->getNextMessage(stripeId);
auto lock2 = filestorHandler->getNextMessage(stripeId);
- ASSERT_TRUE(lock0.first);
- ASSERT_TRUE(lock1.first);
- ASSERT_FALSE(lock2.first);
+ ASSERT_TRUE(lock0.lock);
+ ASSERT_TRUE(lock1.lock);
+ ASSERT_FALSE(lock2.lock);
auto stats = filestorHandler->get_active_operations_stats(false);
{
SCOPED_TRACE("during");
assert_active_operations_stats(stats, 2, 2, 0);
}
EXPECT_EQ(3, stats.get_total_size());
- lock0.first.reset();
- lock1.first.reset();
+ lock0.lock.reset();
+ lock1.lock.reset();
stats = filestorHandler->get_active_operations_stats(false);
{
SCOPED_TRACE("after");
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index cd496605a6c..939e7ae7b6a 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -516,11 +516,11 @@ TEST_F(FileStorManagerTest, handler_priority) {
filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
class MessagePusherThread : public document::Runnable {
@@ -570,7 +570,7 @@ public:
void run() override {
while (!_done) {
FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId);
- if (msg.second.get()) {
+ if (msg.msg.get()) {
uint32_t originalConfig = _config.load();
_fetchedCount++;
std::this_thread::sleep_for(5ms);
@@ -641,15 +641,15 @@ TEST_F(FileStorManagerTest, handler_pause) {
filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority());
{
ResumeGuard guard = filestorHandler.pause();
(void)guard;
- ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr);
+ ASSERT_EQ(filestorHandler.getNextMessage(stripeId).msg.get(), nullptr);
}
- ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
TEST_F(FileStorManagerTest, remap_split) {
@@ -729,8 +729,8 @@ TEST_F(FileStorManagerTest, handler_timeout) {
std::this_thread::sleep_for(51ms);
for (;;) {
auto lock = filestorHandler.getNextMessage(stripeId);
- if (lock.first.get()) {
- ASSERT_EQ(200, lock.second->getPriority());
+ if (lock.lock.get()) {
+ ASSERT_EQ(200, lock.msg->getPriority());
break;
}
}
@@ -2013,7 +2013,7 @@ expect_async_message(StorageMessage::Priority exp_pri,
{
EXPECT_TRUE(result.was_scheduled());
ASSERT_TRUE(result.has_async_message());
- EXPECT_EQ(exp_pri, result.async_message().second->getPriority());
+ EXPECT_EQ(exp_pri, result.async_message().msg->getPriority());
}
void
@@ -2045,8 +2045,8 @@ TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule)
auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
expect_async_message(20, result);
}
- EXPECT_EQ(30, get_next_message().second->getPriority());
- EXPECT_EQ(40, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
+ EXPECT_EQ(40, get_next_message().msg->getPriority());
}
TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async)
@@ -2057,8 +2057,8 @@ TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_n
auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
expect_empty_async_message(result);
- EXPECT_EQ(20, get_next_message().second->getPriority());
- EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(20, get_next_message().msg->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
}
TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
@@ -2079,7 +2079,7 @@ TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
expect_async_message(40, result);
}
}
- EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
}
} // storage
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 2557fa537f5..5f3836727dd 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -91,14 +91,14 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) {
f.filestorHandler->schedule(createPut(5432, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
+ ASSERT_TRUE(lock0.lock.get());
EXPECT_EQ(document::BucketId(16, 1234),
- dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
+ dynamic_cast<api::PutCommand&>(*lock0.msg).getBucketId());
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock1.first.get());
+ ASSERT_TRUE(lock1.lock.get());
EXPECT_EQ(document::BucketId(16, 5432),
- dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
+ dynamic_cast<api::PutCommand&>(*lock1.msg).getBucketId());
}
TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_access) {
@@ -108,14 +108,14 @@ TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_ac
f.filestorHandler->schedule(createGet(1234));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements());
// Even though we already have a lock on the bucket, Gets allow shared locking and we
// should therefore be able to get another lock.
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock1.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock1.first->lockingRequirements());
+ ASSERT_TRUE(lock1.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock1.lock->lockingRequirements());
}
TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op_active) {
@@ -125,12 +125,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op
f.filestorHandler->schedule(createPut(1234, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op_active) {
@@ -140,12 +140,12 @@ TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op
f.filestorHandler->schedule(createGet(1234));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive_op_active) {
@@ -155,12 +155,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive
f.filestorHandler->schedule(createPut(1234, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
} // namespace storage
diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
new file mode 100644
index 00000000000..0ad380937c7
--- /dev/null
+++ b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
@@ -0,0 +1,116 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/storage/persistence/shared_operation_throttler.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/barrier.h>
+#include <chrono>
+#include <thread>
+
+using namespace ::testing;
+
+namespace storage {
+
+using ThrottleToken = SharedOperationThrottler::Token;
+
+TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) {
+ // We technically can't test that the unlimited throttler _never_ throttles, but at
+ // least check that it doesn't throttle _twice_, and then induce from this ;)
+ auto throttler = SharedOperationThrottler::make_unlimited_throttler();
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token2.valid());
+ // Window size should be zero (i.e. unlimited) for unlimited throttler
+ EXPECT_EQ(throttler->current_window_size(), 0);
+}
+
+TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->try_acquire_one();
+ EXPECT_FALSE(token2.valid());
+
+ EXPECT_EQ(throttler->current_window_size(), 1);
+}
+
+TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ token.reset();
+ token = throttler->blocking_acquire_one(600s); // Should never block.
+ EXPECT_TRUE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ vespalib::Barrier barrier(2);
+ std::thread t([&] {
+ auto token = throttler->try_acquire_one();
+ assert(token.valid());
+ barrier.await();
+ while (throttler->waiting_threads() != 1) {
+ std::this_thread::sleep_for(100us);
+ }
+ // Implicit token release at thread scope exit
+ });
+ barrier.await();
+ auto token = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ t.join();
+}
+
+TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto window_filling_token = throttler->try_acquire_one();
+ auto before = std::chrono::steady_clock::now();
+ // Will block for at least 1ms. Since no window slot will be available by that time,
+ // an invalid token should be returned.
+ auto token = throttler->blocking_acquire_one(1ms);
+ auto after = std::chrono::steady_clock::now();
+ EXPECT_TRUE((after - before) >= 1ms);
+ EXPECT_FALSE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) {
+ ThrottleToken token;
+ EXPECT_FALSE(token.valid());
+ token.reset(); // no-op
+ EXPECT_FALSE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ {
+ auto token = throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+ }
+ auto token = throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token1 = throttler->try_acquire_one();
+ auto token2 = std::move(token1); // move ctor
+ EXPECT_TRUE(token2.valid());
+ EXPECT_FALSE(token1.valid());
+ ThrottleToken token3;
+ token3 = std::move(token2); // move assignment op
+ EXPECT_TRUE(token3.valid());
+ EXPECT_FALSE(token2.valid());
+
+ // Trying to fetch new token should not succeed due to active token and win size of 1
+ token1 = throttler->try_acquire_one();
+ EXPECT_FALSE(token1.valid());
+ // Resetting the token should free up the slot in the window
+ token3.reset();
+ token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+}
+
+// TODO ideally we'd test that the dynamic throttler has a window size that is actually
+// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so
+// it's not trivial to know how to do this reliably.
+
+}
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index c737d2bed28..5e068236026 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -14,6 +14,7 @@ vespa_add_library(storage_spersistence OBJECT
persistenceutil.cpp
processallhandler.cpp
provider_error_wrapper.cpp
+ shared_operation_throttler.cpp
simplemessagehandler.cpp
splitbitdetector.cpp
splitjoinhandler.cpp
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
index 2dc5989e857..73ccc7f6085 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
@@ -7,10 +7,16 @@
namespace storage {
-ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric)
+ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state,
+ document::DocumentId doc_id,
+ SharedOperationThrottler::Token throttle_token,
+ const char *op,
+ const framework::Clock& clock,
+ metrics::DoubleAverageMetric& latency_metric)
: _result_handler(nullptr),
_state(std::move(state)),
_doc_id(std::move(doc_id)),
+ _throttle_token(std::move(throttle_token)),
_op(op),
_start_time(clock),
_latency_metric(latency_metric)
@@ -27,6 +33,7 @@ ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) no
}
double elapsed = _start_time.getElapsedTimeAsDouble();
_latency_metric.addValue(elapsed);
+ _throttle_token.reset();
_state->on_entry_complete(std::move(result), _doc_id, _op);
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
index 1037318aec6..8478cab4c17 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
@@ -2,6 +2,7 @@
#pragma once
+#include "shared_operation_throttler.h"
#include <vespa/document/base/documentid.h>
#include <vespa/metrics/valuemetric.h>
#include <vespa/persistence/spi/operationcomplete.h>
@@ -21,12 +22,17 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete
const spi::ResultHandler* _result_handler;
std::shared_ptr<ApplyBucketDiffState> _state;
document::DocumentId _doc_id;
+ SharedOperationThrottler::Token _throttle_token;
const char* _op;
framework::MilliSecTimer _start_time;
metrics::DoubleAverageMetric& _latency_metric;
public:
- ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric);
- ~ApplyBucketDiffEntryComplete();
+ ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state,
+ document::DocumentId doc_id,
+ SharedOperationThrottler::Token throttle_token,
+ const char *op, const framework::Clock& clock,
+ metrics::DoubleAverageMetric& latency_metric);
+ ~ApplyBucketDiffEntryComplete() override;
void onComplete(std::unique_ptr<spi::Result> result) noexcept override;
void addResultHandler(const spi::ResultHandler* resultHandler) override;
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
index 62d1a80501a..2d137f87118 100644
--- a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
@@ -3,6 +3,7 @@ vespa_add_library(storage_filestorpersistence OBJECT
SOURCES
active_operations_metrics.cpp
active_operations_stats.cpp
+ filestorhandler.cpp
filestorhandlerimpl.cpp
filestormanager.cpp
filestormetrics.cpp
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
new file mode 100644
index 00000000000..c066277ec71
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -0,0 +1,8 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "filestorhandler.h"
+
+namespace storage {
+
+FileStorHandler::LockedMessage::~LockedMessage() = default;
+
+}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index a980b5aa2e1..6f740ce2c28 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -16,6 +16,7 @@
#include <vespa/document/bucket/bucket.h>
#include <vespa/storage/storageutil/resumeguard.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/storage/persistence/shared_operation_throttler.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
namespace storage {
@@ -74,7 +75,29 @@ public:
[[nodiscard]] virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
};
- using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>;
+ struct LockedMessage {
+ std::shared_ptr<BucketLockInterface> lock;
+ std::shared_ptr<api::StorageMessage> msg;
+ SharedOperationThrottler::Token throttle_token;
+
+ LockedMessage() noexcept = default;
+ LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
+ std::shared_ptr<api::StorageMessage> msg_) noexcept
+ : lock(std::move(lock_)),
+ msg(std::move(msg_)),
+ throttle_token()
+ {}
+ LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
+ std::shared_ptr<api::StorageMessage> msg_,
+ SharedOperationThrottler::Token token) noexcept
+ : lock(std::move(lock_)),
+ msg(std::move(msg_)),
+ throttle_token(std::move(token))
+ {}
+ LockedMessage(LockedMessage&&) noexcept = default;
+ ~LockedMessage();
+ };
+
class ScheduleAsyncResult {
private:
bool _was_scheduled;
@@ -90,7 +113,7 @@ public:
return _was_scheduled;
}
bool has_async_message() const {
- return _async_message.first.get() != nullptr;
+ return _async_message.lock.get() != nullptr;
}
const LockedMessage& async_message() const {
return _async_message;
@@ -250,6 +273,8 @@ public:
virtual std::string dumpQueue() const = 0;
virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0;
+
+ virtual SharedOperationThrottler& operation_throttler() const noexcept = 0;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index c6991803b4d..5e0ea0359dc 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -40,16 +40,18 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex
FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg)
- : FileStorHandlerImpl(1, 1, sender, metrics, compReg)
+ : FileStorHandlerImpl(1, 1, sender, metrics, compReg, SharedOperationThrottler::make_unlimited_throttler())
{
}
FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender,
FileStorMetrics& metrics,
- ServiceLayerComponentRegister& compReg)
+ ServiceLayerComponentRegister& compReg,
+ std::unique_ptr<SharedOperationThrottler> operation_throttler)
: _component(compReg, "filestorhandlerimpl"),
_state(FileStorHandler::AVAILABLE),
_metrics(nullptr),
+ _operation_throttler(std::move(operation_throttler)),
_stripes(),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
@@ -330,6 +332,7 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
std::lock_guard lockGuard(_mergeStatesLock);
_metrics->pendingMerges.addValue(_mergeStates.size());
_metrics->queueSize.addValue(getQueueSize());
+ _metrics->throttle_window_size.addValue(_operation_throttler->current_window_size());
for (const auto & stripe : _metrics->stripes) {
const auto & m = stripe->averageQueueWaitingTime;
@@ -885,10 +888,39 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe
_active_operations_stats()
{}
+namespace {
+
+bool
+operation_type_should_be_throttled(api::MessageType::Id type_id) noexcept
+{
+ // Note: SetBucketState is intentionally _not_ included in this set, even though it's
+ // dispatched async. The rationale behind this is that SetBucketState is very cheap
+ // to execute, usually comes in large waves (up to #buckets count) and processing all
+ // requests should complete as quickly as possible. We also don't want such waves to
+ // artificially boost the dynamic throttle window size due to a sudden throughput spike.
+ //
+ // Merge-related operations are transitively throttled by using the operation throttler
+ // directly for all async ops within the MergeHandler.
+ switch (type_id) {
+ case api::MessageType::PUT_ID:
+ case api::MessageType::REMOVE_ID:
+ case api::MessageType::UPDATE_ID:
+ case api::MessageType::REMOVELOCATION_ID:
+ case api::MessageType::CREATEBUCKET_ID:
+ case api::MessageType::DELETEBUCKET_ID:
+ return true;
+ default:
+ return false;
+ }
+}
+
+}
+
FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
{
std::unique_lock guard(*_lock);
+ SharedOperationThrottler::Token throttle_token;
// Try to grab a message+lock, immediately retrying once after a wait
// if none can be found and then exiting if the same is the case on the
// second attempt. This is key to allowing the run loop to register
@@ -896,15 +928,39 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
for (int attempt = 0; (attempt < 2) && !_owner.isPaused(); ++attempt) {
PriorityIdx& idx(bmi::get<1>(*_queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
+ bool was_throttled = false;
- while (iter != end && operationIsInhibited(guard, iter->_bucket, *iter->_command)) {
+ while ((iter != end) && operationIsInhibited(guard, iter->_bucket, *iter->_command)) {
iter++;
}
if (iter != end) {
- return getMessage(guard, idx, iter);
+ const bool should_throttle_op = operation_type_should_be_throttled(iter->_command->getType().getId());
+ if (!should_throttle_op && throttle_token.valid()) {
+ throttle_token.reset(); // Let someone else play with it.
+ } else if (should_throttle_op && !throttle_token.valid()) {
+ // Important: _non-blocking_ attempt at getting a throttle token.
+ throttle_token = _owner.operation_throttler().try_acquire_one();
+ was_throttled = !throttle_token.valid();
+ }
+ if (!should_throttle_op || throttle_token.valid()) {
+ return getMessage(guard, idx, iter, std::move(throttle_token));
+ }
}
if (attempt == 0) {
- _cond->wait_for(guard, timeout);
+ // Depending on whether we were blocked due to no usable ops in queue or throttling,
+ // wait for either the queue or throttler to (hopefully) have some fresh stuff for us.
+ if (!was_throttled) {
+ _cond->wait_for(guard, timeout);
+ } else {
+ // Have to release lock before doing a blocking throttle token fetch, since it
+ // prevents RPC threads from pushing onto the queue.
+ guard.unlock();
+ throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout);
+ if (!throttle_token.valid()) {
+ return {}; // Already exhausted our timeout window.
+ }
+ guard.lock();
+ }
}
}
return {}; // No message fetched.
@@ -923,14 +979,20 @@ FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard)
++iter;
}
if ((iter != end) && AsyncHandler::is_async_message(iter->_command->getType().getId())) {
- return getMessage(guard, idx, iter);
+ // This is executed in the context of an RPC thread, so only do a _non-blocking_
+ // poll of the throttle policy.
+ auto throttle_token = _owner.operation_throttler().try_acquire_one();
+ if (throttle_token.valid()) {
+ return getMessage(guard, idx, iter, std::move(throttle_token));
+ }
}
return {};
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) {
-
+FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter,
+ SharedOperationThrottler::Token throttle_token)
+{
std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime)));
std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
@@ -942,7 +1004,7 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx
msg->getType().getId(), msg->getMsgId(),
msg->lockingRequirements());
guard.unlock();
- return FileStorHandler::LockedMessage(std::move(locker), std::move(msg));
+ return {std::move(locker), std::move(msg), std::move(throttle_token)};
} else {
std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg));
guard.unlock();
@@ -1014,7 +1076,7 @@ FileStorHandlerImpl::Stripe::schedule_and_get_next_async_message(MessageEntry en
std::unique_lock guard(*_lock);
_queue->emplace_back(std::move(entry));
auto lockedMessage = get_next_async_message(guard);
- if ( ! lockedMessage.second) {
+ if ( ! lockedMessage.msg) {
if (guard.owns_lock()) {
guard.unlock();
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 5d68be8a800..c4b85ac596c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -42,11 +42,12 @@ class AbortBucketOperationsCommand;
namespace bmi = boost::multi_index;
-class FileStorHandlerImpl : private framework::MetricUpdateHook,
- private ResumeGuard::Callback,
- public FileStorHandler {
+class FileStorHandlerImpl final
+ : private framework::MetricUpdateHook,
+ private ResumeGuard::Callback,
+ public FileStorHandler
+{
public:
-
struct MessageEntry {
std::shared_ptr<api::StorageMessage> _command;
metrics::MetricTimer _timer;
@@ -147,7 +148,8 @@ public:
// Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts
// with its locking requirements.
FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx,
- PriorityIdx::iterator iter);
+ PriorityIdx::iterator iter,
+ SharedOperationThrottler::Token throttle_token);
using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
const FileStorHandlerImpl &_owner;
MessageSender &_messageSender;
@@ -163,7 +165,9 @@ public:
class BucketLock : public FileStorHandler::BucketLockInterface {
public:
// TODO refactor, too many params
- BucketLock(const monitor_guard & guard, Stripe& disk, const document::Bucket &bucket,
+ BucketLock(const monitor_guard & guard,
+ Stripe& disk,
+ const document::Bucket &bucket,
uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id,
api::LockingRequirements lockReq);
~BucketLock() override;
@@ -187,9 +191,9 @@ public:
FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg);
FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
- ServiceLayerComponentRegister&);
+ ServiceLayerComponentRegister&, std::unique_ptr<SharedOperationThrottler>);
- ~FileStorHandlerImpl();
+ ~FileStorHandlerImpl() override;
void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; }
void flush(bool killPendingMerges) override;
@@ -239,6 +243,10 @@ public:
ResumeGuard pause() override;
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override;
+ SharedOperationThrottler& operation_throttler() const noexcept override {
+ return *_operation_throttler;
+ }
+
// Implements ResumeGuard::Callback
void resume() override;
@@ -249,6 +257,7 @@ private:
ServiceLayerComponent _component;
std::atomic<DiskState> _state;
FileStorDiskMetrics * _metrics;
+ std::unique_ptr<SharedOperationThrottler> _operation_throttler;
std::vector<Stripe> _stripes;
MessageSender& _messageSender;
const document::BucketIdFactory& _bucketIdFactory;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 2cfb3a2cffe..f5b9da0e1f5 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -32,6 +32,7 @@ LOG_SETUP(".persistence.filestor.manager");
using std::shared_ptr;
using document::BucketSpace;
using vespalib::make_string_short::fmt;
+using vespa::config::content::StorFilestorConfig;
namespace {
@@ -130,18 +131,31 @@ uint32_t computeNumResponseThreads(int configured) {
}
vespalib::Executor::OptimizeFor
-selectSequencer(vespa::config::content::StorFilestorConfig::ResponseSequencerType sequencerType) {
+selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) {
switch (sequencerType) {
- case vespa::config::content::StorFilestorConfig::ResponseSequencerType::THROUGHPUT:
+ case StorFilestorConfig::ResponseSequencerType::THROUGHPUT:
return vespalib::Executor::OptimizeFor::THROUGHPUT;
- case vespa::config::content::StorFilestorConfig::ResponseSequencerType::LATENCY:
+ case StorFilestorConfig::ResponseSequencerType::LATENCY:
return vespalib::Executor::OptimizeFor::LATENCY;
- case vespa::config::content::StorFilestorConfig::ResponseSequencerType::ADAPTIVE:
+ case StorFilestorConfig::ResponseSequencerType::ADAPTIVE:
default:
return vespalib::Executor::OptimizeFor::ADAPTIVE;
}
}
+std::unique_ptr<SharedOperationThrottler>
+make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads)
+{
+ const bool use_dynamic_throttling = (config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC);
+ if (use_dynamic_throttling) {
+ auto config_win_size_incr = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1);
+ auto win_size_increment = std::max(static_cast<size_t>(config_win_size_incr), num_threads);
+ return SharedOperationThrottler::make_dynamic_throttler(win_size_increment);
+ } else {
+ return SharedOperationThrottler::make_unlimited_throttler();
+ }
+}
+
#ifdef __PIC__
#define TLS_LINKAGE __attribute__((visibility("hidden"), tls_model("initial-exec")))
#else
@@ -185,7 +199,7 @@ FileStorManager::getThreadLocalHandler() {
* incoming during reconfiguration
*/
void
-FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config)
+FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
{
// If true, this is not the first configure.
bool liveUpdate = ! _threads.empty();
@@ -198,8 +212,10 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
size_t numThreads = _config->numThreads;
size_t numStripes = std::max(size_t(1u), numThreads / 2);
_metrics->initDiskMetrics(numStripes, computeAllPossibleHandlerThreads(*_config));
+ auto operation_throttler = make_operation_throttler_from_config(*_config, numThreads);
- _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, _compReg);
+ _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics,
+ _compReg, std::move(operation_throttler));
uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads);
_sequencedExecutor = vespalib::SequencedTaskExecutor::create(response_executor, numResponseThreads, 10000,
selectSequencer(_config->responseSequencerType));
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
index c119fdc4f69..a98077da57a 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
@@ -203,6 +203,7 @@ FileStorDiskMetrics::FileStorDiskMetrics(const std::string& name, const std::str
averageQueueWaitingTime("averagequeuewait.sum", {}, "Average time an operation spends in input queue.", this),
queueSize("queuesize", {}, "Size of input message queue.", this),
pendingMerges("pendingmerge", {}, "Number of buckets currently being merged.", this),
+ throttle_window_size("throttlewindowsize", {}, "Current size of async operation throttler window size", this),
waitingForLockHitRate("waitingforlockrate", {},
"Amount of times a filestor thread has needed to wait for "
"lock to take next message in queue.", this),
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
index 7543e6e0771..d8135c9aeca 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
@@ -147,6 +147,7 @@ public:
metrics::DoubleAverageMetric averageQueueWaitingTime;
metrics::LongAverageMetric queueSize;
metrics::LongAverageMetric pendingMerges;
+ metrics::LongAverageMetric throttle_window_size;
metrics::DoubleAverageMetric waitingForLockHitRate;
metrics::DoubleAverageMetric lockWaitTime; // unused
ActiveOperationsMetrics active_operations;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index b9739fcf734..7dcf4bcbee2 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -2,6 +2,7 @@
#include "mergehandler.h"
#include "persistenceutil.h"
+#include "shared_operation_throttler.h"
#include "apply_bucket_diff_entry_complete.h"
#include "apply_bucket_diff_state.h"
#include <vespa/storage/persistence/filestorage/mergestatus.h>
@@ -32,6 +33,7 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_cluster_context(cluster_context),
_env(env),
_spi(spi),
+ _operation_throttler(_env._fileStorHandler.operation_throttler()),
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
@@ -514,17 +516,22 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
spi::Context& context,
const document::DocumentTypeRepo& repo) const
{
+ auto throttle_token = _operation_throttler.blocking_acquire_one();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
// Regular put entry
Document::SP doc(deserializeDiffDocument(e, repo));
DocumentId docId = doc->getId();
- auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency);
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId),
+ std::move(throttle_token), "put",
+ _clock, _env._metrics.merge_handler_metrics.put_latency);
_spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete));
} else {
std::vector<spi::PersistenceProvider::TimeStampAndDocumentId> ids;
ids.emplace_back(timestamp, e._docName);
- auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency);
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second,
+ std::move(throttle_token), "remove",
+ _clock, _env._metrics.merge_handler_metrics.remove_latency);
_spi.removeAsync(bucket, std::move(ids), context, std::move(complete));
}
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index f52fe63bc2b..1007f35c241 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -34,6 +34,7 @@ namespace spi {
class PersistenceUtil;
class ApplyBucketDiffState;
class MergeStatus;
+class SharedOperationThrottler;
class MergeHandler : public Types,
public MergeBucketInfoSyncer {
@@ -52,7 +53,7 @@ public:
uint32_t commonMergeChainOptimalizationMinimumSize = 64,
bool async_apply_bucket_diff = false);
- ~MergeHandler();
+ ~MergeHandler() override;
bool buildBucketInfoList(
const spi::Bucket& bucket,
@@ -86,6 +87,7 @@ private:
const ClusterContext &_cluster_context;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
+ SharedOperationThrottler& _operation_throttler;
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 8b546771b71..c0c95ffd7af 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -158,12 +158,13 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
void
PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const {
- LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get());
- api::StorageMessage & msg(*lock.second);
+ LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.msg.get());
+ api::StorageMessage & msg(*lock.msg);
// Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains
// valid even if the tracker is destroyed by an exception in processMessage().
- auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler, std::move(lock.first), lock.second);
+ auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler,
+ std::move(lock.lock), lock.msg, std::move(lock.throttle_token));
tracker = processMessage(msg, std::move(tracker));
if (tracker) {
tracker->sendReply();
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index f9da4d63d7f..499e9807cbf 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -38,7 +38,7 @@ PersistenceThread::run(framework::ThreadHandle& thread)
FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId));
- if (lock.first) {
+ if (lock.lock) {
_persistenceHandler.processLockedMessage(std::move(lock));
}
}
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index cbfc9463a8c..65eab99b8fb 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -31,19 +31,22 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock,
- api::StorageMessage::SP msg)
- : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg))
+ api::StorageMessage::SP msg,
+ SharedOperationThrottler::Token throttle_token)
+ : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token))
{}
MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock,
- api::StorageMessage::SP msg)
+ api::StorageMessage::SP msg,
+ SharedOperationThrottler::Token throttle_token)
: _sendReply(true),
_updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())),
_bucketLock(std::move(bucketLock)),
_msg(std::move(msg)),
+ _throttle_token(std::move(throttle_token)),
_context(_msg->getPriority(), _msg->getTrace().getLevel()),
_env(env),
_replySender(replySender),
@@ -56,7 +59,8 @@ MessageTracker::UP
MessageTracker::createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil &env, MessageSender &replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg)
{
- return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), std::move(msg)));
+ return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock),
+ std::move(msg), SharedOperationThrottler::Token()));
}
void
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 4fd0e60c730..588cbef2170 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -30,7 +30,8 @@ public:
using UP = std::unique_ptr<MessageTracker>;
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
- FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);
+ FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
+ SharedOperationThrottler::Token throttle_token);
~MessageTracker();
@@ -91,7 +92,8 @@ public:
private:
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
- FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);
+ FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
+ SharedOperationThrottler::Token throttle_token);
[[nodiscard]] bool count_result_as_failure() const noexcept;
@@ -99,6 +101,7 @@ private:
bool _updateBucketInfo;
FileStorHandler::BucketLockInterface::SP _bucketLock;
std::shared_ptr<api::StorageMessage> _msg;
+ SharedOperationThrottler::Token _throttle_token;
spi::Context _context;
const PersistenceUtil &_env;
MessageSender &_replySender;
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
new file mode 100644
index 00000000000..b72b1a8ba28
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
@@ -0,0 +1,191 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "shared_operation_throttler.h"
+#include <vespa/messagebus/dynamicthrottlepolicy.h>
+#include <vespa/messagebus/message.h>
+#include <condition_variable>
+#include <cassert>
+#include <mutex>
+
+namespace storage {
+
+namespace {
+
+class NoLimitsOperationThrottler final : public SharedOperationThrottler {
+public:
+ ~NoLimitsOperationThrottler() override = default;
+ Token blocking_acquire_one() noexcept override {
+ return Token(this, TokenCtorTag{});
+ }
+ Token blocking_acquire_one(vespalib::duration) noexcept override {
+ return Token(this, TokenCtorTag{});
+ }
+ Token try_acquire_one() noexcept override {
+ return Token(this, TokenCtorTag{});
+ }
+ uint32_t current_window_size() const noexcept override { return 0; }
+ uint32_t waiting_threads() const noexcept override { return 0; }
+private:
+ void release_one() noexcept override { /* no-op */ }
+};
+
+// Class used to sneakily get around IThrottlePolicy only accepting MBus objects
+template <typename Base>
+class DummyMbusMessage final : public Base {
+ static const mbus::string NAME;
+public:
+ const mbus::string& getProtocol() const override { return NAME; }
+ uint32_t getType() const override { return 0x1badb007; }
+ uint8_t priority() const override { return 255; }
+};
+
+template <typename Base>
+const mbus::string DummyMbusMessage<Base>::NAME = "FooBar";
+
+class DynamicOperationThrottler final : public SharedOperationThrottler {
+ mutable std::mutex _mutex;
+ std::condition_variable _cond;
+ mbus::DynamicThrottlePolicy _throttle_policy;
+ uint32_t _pending_ops;
+ uint32_t _waiting_threads;
+public:
+ explicit DynamicOperationThrottler(uint32_t min_size_and_window_increment);
+ ~DynamicOperationThrottler() override;
+
+ Token blocking_acquire_one() noexcept override;
+ Token blocking_acquire_one(vespalib::duration timeout) noexcept override;
+ Token try_acquire_one() noexcept override;
+ uint32_t current_window_size() const noexcept override;
+ uint32_t waiting_threads() const noexcept override;
+private:
+ void release_one() noexcept override;
+};
+
+DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment)
+ : _mutex(),
+ _cond(),
+ _throttle_policy(static_cast<double>(min_size_and_window_increment)),
+ _pending_ops(0),
+ _waiting_threads(0)
+{
+}
+
+DynamicOperationThrottler::~DynamicOperationThrottler() = default;
+
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::blocking_acquire_one() noexcept
+{
+ std::unique_lock lock(_mutex);
+ DummyMbusMessage<mbus::Message> dummy_msg;
+ if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) {
+ ++_waiting_threads;
+ _cond.wait(lock, [&] {
+ return _throttle_policy.canSend(dummy_msg, _pending_ops);
+ });
+ --_waiting_threads;
+ }
+ _throttle_policy.processMessage(dummy_msg);
+ ++_pending_ops;
+ return Token(this, TokenCtorTag{});
+}
+
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept
+{
+ std::unique_lock lock(_mutex);
+ DummyMbusMessage<mbus::Message> dummy_msg;
+ if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) {
+ ++_waiting_threads;
+ const bool accepted = _cond.wait_for(lock, timeout, [&] {
+ return _throttle_policy.canSend(dummy_msg, _pending_ops);
+ });
+ --_waiting_threads;
+ if (!accepted) {
+ return Token();
+ }
+ }
+ _throttle_policy.processMessage(dummy_msg);
+ ++_pending_ops;
+ return Token(this, TokenCtorTag{});
+}
+
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::try_acquire_one() noexcept
+{
+ std::unique_lock lock(_mutex);
+ DummyMbusMessage<mbus::Message> dummy_msg;
+ if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) {
+ return Token();
+ }
+ _throttle_policy.processMessage(dummy_msg);
+ ++_pending_ops;
+ return Token(this, TokenCtorTag{});
+}
+
+void
+DynamicOperationThrottler::release_one() noexcept
+{
+ std::unique_lock lock(_mutex);
+ DummyMbusMessage<mbus::Reply> dummy_reply;
+ _throttle_policy.processReply(dummy_reply);
+ assert(_pending_ops > 0);
+ --_pending_ops;
+ if (_waiting_threads > 0) {
+ lock.unlock();
+ _cond.notify_one();
+ }
+}
+
+uint32_t
+DynamicOperationThrottler::current_window_size() const noexcept
+{
+ std::unique_lock lock(_mutex);
+ return _throttle_policy.getMaxPendingCount(); // Actually returns current window size
+}
+
+uint32_t
+DynamicOperationThrottler::waiting_threads() const noexcept
+{
+ std::unique_lock lock(_mutex);
+ return _waiting_threads;
+}
+
+}
+
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_unlimited_throttler()
+{
+ return std::make_unique<NoLimitsOperationThrottler>();
+}
+
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_dynamic_throttler(uint32_t min_size_and_window_increment)
+{
+ return std::make_unique<DynamicOperationThrottler>(min_size_and_window_increment);
+}
+
+DynamicOperationThrottler::Token::~Token()
+{
+ if (_throttler) {
+ _throttler->release_one();
+ }
+}
+
+void
+DynamicOperationThrottler::Token::reset() noexcept
+{
+ if (_throttler) {
+ _throttler->release_one();
+ _throttler = nullptr;
+ }
+}
+
+DynamicOperationThrottler::Token&
+DynamicOperationThrottler::Token::operator=(Token&& rhs) noexcept
+{
+ reset();
+ _throttler = rhs._throttler;
+ rhs._throttler = nullptr;
+ return *this;
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
new file mode 100644
index 00000000000..2e1de86c4b8
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
@@ -0,0 +1,71 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/util/time.h>
+#include <memory>
+#include <optional>
+
+namespace storage {
+
+/**
+ * Operation throttler that is intended to provide global throttling of
+ * async operations across all persistence stripe threads. A throttler
+ * wraps a logical max pending window size of in-flight operations. Depending
+ * on the throttler implementation, the window size may expand and shrink
+ * dynamically. Exactly how and when this happens is unspecified.
+ *
+ * Offers both polling and (timed, non-timed) blocking calls for acquiring
+ * a throttle token. If the returned token is valid, the caller may proceed
+ * to invoke the asynchronous operation.
+ *
+ * The window slot taken up by a valid throttle token is implicitly freed up
+ * when the token is destroyed.
+ *
+ * All operations on the throttler are thread safe.
+ */
+class SharedOperationThrottler {
+protected:
+ struct TokenCtorTag {}; // Make available to subclasses for token construction.
+public:
+ class Token {
+ SharedOperationThrottler* _throttler;
+ public:
+ constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {}
+ constexpr Token() noexcept : _throttler(nullptr) {}
+ constexpr Token(Token&& rhs) noexcept
+ : _throttler(rhs._throttler)
+ {
+ rhs._throttler = nullptr;
+ }
+ Token& operator=(Token&& rhs) noexcept;
+ ~Token();
+
+ Token(const Token&) = delete;
+ Token& operator=(const Token&) = delete;
+
+ [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); }
+ void reset() noexcept;
+ };
+
+ virtual ~SharedOperationThrottler() = default;
+
+ // All methods are thread safe
+ [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
+ [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0;
+ [[nodiscard]] virtual Token try_acquire_one() noexcept = 0;
+
+ // May return 0, in which case the window size is unlimited.
+ [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0;
+
+ // Exposed for unit testing only.
+ [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
+
+ static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
+
+ static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment);
+private:
+ // Exclusively called from a valid Token. Thread safe.
+ virtual void release_one() noexcept = 0;
+};
+
+}