diff options
37 files changed, 1034 insertions, 258 deletions
diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java index 3cd7a4634f0..2ff9b889a9e 100644 --- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java +++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java @@ -77,7 +77,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea private List<String> zoneDnsSuffixes = List.of(); private int maxCompactBuffers = 1; private boolean failDeploymentWithInvalidJvmOptions = false; - private double tlsSizeFraction = 0.04; + private double tlsSizeFraction = 0.02; @Override public ModelContext.FeatureFlags featureFlags() { return this; } @Override public boolean multitenant() { return multitenant; } diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/ContentClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/ContentClusterTest.java index 4c0c5ebc5b3..4295f0aa6ec 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/content/ContentClusterTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/content/ContentClusterTest.java @@ -1062,7 +1062,7 @@ public class ContentClusterTest extends ContentBaseTest { var flavor = new Flavor(new FlavorsConfig.Flavor(new FlavorsConfig.Flavor.Builder().name("test").minDiskAvailableGb(100))); assertEquals(21474836480L, resolveMaxTLSSize(OptionalDouble.empty(), Optional.empty())); assertEquals(21474836480L, resolveMaxTLSSize(OptionalDouble.of(0.02), Optional.empty())); - assertEquals(4294967296L, resolveMaxTLSSize(OptionalDouble.empty(), Optional.of(flavor))); + assertEquals(2147483648L, resolveMaxTLSSize(OptionalDouble.empty(), Optional.of(flavor))); assertEquals(2147483648L, resolveMaxTLSSize(OptionalDouble.of(0.02), Optional.of(flavor))); assertEquals(3221225472L, resolveMaxTLSSize(OptionalDouble.of(0.03), Optional.of(flavor))); } diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index 66700eff3e6..c351e52b557 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -75,3 +75,22 @@ use_async_message_handling_on_schedule bool default=false restart ## the entire resource usage sample is immediately reported to the cluster controller (via host info). ## This config can be live updated (doesn't require restart). resource_usage_reporter_noise_level double default=0.001 + +## Specify throttling used for async persistence operations. This throttling takes place +## before operations are dispatched to Proton and serves as a limiter for how many +## operations may be in flight in Proton's internal queues. +## +## - UNLIMITED is, as it says on the tin, unlimited. Offers no actual throttling, but +## has near zero overhead and never blocks. +## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window +## is full (if a blocking throttler API call is invoked). +## +async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart + +## Specifies the extent the throttling window is increased by when the async throttle +## policy has decided that more concurrent operations are desirable. Also affects the +## _minimum_ size of the throttling window; its size is implicitly set to max(this config +## value, number of threads). +## +## Only applies if async_operation_throttler_type == DYNAMIC. +async_operation_dynamic_throttling_window_increment int default=20 restart diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java index 02ca4ce14c4..80194337daa 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java @@ -111,6 +111,8 @@ import java.util.stream.Collectors; import static com.yahoo.config.model.api.container.ContainerServiceType.CONTAINER; import static com.yahoo.config.model.api.container.ContainerServiceType.LOGSERVER_CONTAINER; +import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceResponse; +import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceListResponse; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getFileReferencesOnDisk; import static com.yahoo.vespa.config.server.tenant.TenantRepository.HOSTED_VESPA_TENANT; @@ -737,16 +739,22 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye // ---------------- Convergence ---------------------------------------------------------------- - public HttpResponse checkServiceForConfigConvergence(ApplicationId applicationId, String hostAndPort, URI uri, - Duration timeout, Optional<Version> vespaVersion) { - return convergeChecker.getServiceConfigGenerationResponse(getApplication(applicationId, vespaVersion), hostAndPort, uri, timeout); + public ServiceResponse checkServiceForConfigConvergence(ApplicationId applicationId, + String hostAndPort, + Duration timeout, + Optional<Version> vespaVersion) { + return convergeChecker.getServiceConfigGeneration(getApplication(applicationId, vespaVersion), hostAndPort, timeout); } - public HttpResponse servicesToCheckForConfigConvergence(ApplicationId applicationId, URI uri, - Duration timeoutPerService, Optional<Version> vespaVersion) { - return convergeChecker.getServiceConfigGenerationsResponse(getApplication(applicationId, vespaVersion), uri, timeoutPerService); + public ServiceListResponse servicesToCheckForConfigConvergence(ApplicationId applicationId, + URI uri, + Duration timeoutPerService, + Optional<Version> vespaVersion) { + return convergeChecker.getServiceConfigGenerations(getApplication(applicationId, vespaVersion), uri, timeoutPerService); } + public ConfigConvergenceChecker configConvergenceChecker() { return convergeChecker; } + // ---------------- Logs ---------------------------------------------------------------- public HttpResponse getLogs(ApplicationId applicationId, Optional<String> hostname, String apiParams) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java index 24744a1b3b2..ad14cf4aab6 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java @@ -10,8 +10,6 @@ import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.model.api.HostInfo; import com.yahoo.config.model.api.PortInfo; import com.yahoo.config.model.api.ServiceInfo; -import com.yahoo.slime.Cursor; -import com.yahoo.vespa.config.server.http.JSONResponse; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; @@ -92,27 +90,26 @@ public class ConfigConvergenceChecker extends AbstractComponent { } /** Check all services in given application. Returns the minimum current generation of all services */ - public JSONResponse getServiceConfigGenerationsResponse(Application application, URI requestUrl, Duration timeoutPerService) { + public ServiceListResponse getServiceConfigGenerations(Application application, URI uri, Duration timeoutPerService) { Map<ServiceInfo, Long> currentGenerations = getServiceConfigGenerations(application, timeoutPerService); long currentGeneration = currentGenerations.values().stream().mapToLong(Long::longValue).min().orElse(-1); - return new ServiceListResponse(200, currentGenerations, requestUrl, application.getApplicationGeneration(), - currentGeneration); + return new ServiceListResponse(currentGenerations, uri, application.getApplicationGeneration(), currentGeneration); } /** Check service identified by host and port in given application */ - public JSONResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) { + public ServiceResponse getServiceConfigGeneration(Application application, String hostAndPortToCheck, Duration timeout) { Long wantedGeneration = application.getApplicationGeneration(); try (CloseableHttpAsyncClient client = createHttpClient()) { client.start(); if ( ! hostInApplication(application, hostAndPortToCheck)) - return ServiceResponse.createHostNotFoundInAppResponse(requestUrl, hostAndPortToCheck, wantedGeneration); + return new ServiceResponse(ServiceResponse.Status.hostNotFound, wantedGeneration); long currentGeneration = getServiceGeneration(client, URI.create("http://" + hostAndPortToCheck), timeout).get(); boolean converged = currentGeneration >= wantedGeneration; - return ServiceResponse.createOkResponse(requestUrl, hostAndPortToCheck, wantedGeneration, currentGeneration, converged); + return new ServiceResponse(ServiceResponse.Status.ok, wantedGeneration, currentGeneration, converged); } catch (InterruptedException | ExecutionException | CancellationException e) { // e.g. if we cannot connect to the service to find generation - return ServiceResponse.createNotFoundResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage()); + return new ServiceResponse(ServiceResponse.Status.notFound, wantedGeneration, e.getMessage()); } catch (Exception e) { - return ServiceResponse.createErrorResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage()); + return new ServiceResponse(ServiceResponse.Status.error, wantedGeneration, e.getMessage()); } } @@ -192,7 +189,7 @@ public class ConfigConvergenceChecker extends AbstractComponent { return false; } - private static Optional<Integer> getStatePort(ServiceInfo service) { + public static Optional<Integer> getStatePort(ServiceInfo service) { return service.getPorts().stream() .filter(port -> port.getTags().contains("state")) .map(PortInfo::getPort) @@ -249,63 +246,70 @@ public class ConfigConvergenceChecker extends AbstractComponent { .build(); } - private static class ServiceListResponse extends JSONResponse { - - // Pre-condition: servicesToCheck has a state port - private ServiceListResponse(int status, Map<ServiceInfo, Long> servicesToCheck, URI uri, long wantedGeneration, - long currentGeneration) { - super(status); - Cursor serviceArray = object.setArray("services"); - servicesToCheck.forEach((service, generation) -> { - Cursor serviceObject = serviceArray.addObject(); - String hostName = service.getHostName(); - int statePort = getStatePort(service).get(); - serviceObject.setString("host", hostName); - serviceObject.setLong("port", statePort); - serviceObject.setString("type", service.getServiceType()); - serviceObject.setString("url", uri.toString() + "/" + hostName + ":" + statePort); - serviceObject.setLong("currentGeneration", generation); - }); - object.setString("url", uri.toString()); - object.setLong("currentGeneration", currentGeneration); - object.setLong("wantedGeneration", wantedGeneration); - object.setBool("converged", currentGeneration >= wantedGeneration); + public static class ServiceResponse { + + public enum Status { ok, notFound, hostNotFound, error } + + public final Status status; + public final Long wantedGeneration; + public final Long currentGeneration; + public final boolean converged; + public final Optional<String> errorMessage; + + public ServiceResponse(Status status, Long wantedGeneration) { + this(status, wantedGeneration, 0L); } - } - private static class ServiceResponse extends JSONResponse { + public ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration) { + this(status, wantedGeneration, currentGeneration, false); + } - private ServiceResponse(int status, URI uri, String hostname, Long wantedGeneration) { - super(status); - object.setString("url", uri.toString()); - object.setString("host", hostname); - object.setLong("wantedGeneration", wantedGeneration); + public ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration, boolean converged) { + this(status, wantedGeneration, currentGeneration, converged, Optional.empty()); } - static ServiceResponse createOkResponse(URI uri, String hostname, Long wantedGeneration, Long currentGeneration, boolean converged) { - ServiceResponse serviceResponse = new ServiceResponse(200, uri, hostname, wantedGeneration); - serviceResponse.object.setBool("converged", converged); - serviceResponse.object.setLong("currentGeneration", currentGeneration); - return serviceResponse; + public ServiceResponse(Status status, Long wantedGeneration, String errorMessage) { + this(status, wantedGeneration, 0L, false, Optional.ofNullable(errorMessage)); } - static ServiceResponse createHostNotFoundInAppResponse(URI uri, String hostname, Long wantedGeneration) { - ServiceResponse serviceResponse = new ServiceResponse(410, uri, hostname, wantedGeneration); - serviceResponse.object.setString("problem", "Host:port (service) no longer part of application, refetch list of services."); - return serviceResponse; + private ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration, boolean converged, Optional<String> errorMessage) { + this.status = status; + this.wantedGeneration = wantedGeneration; + this.currentGeneration = currentGeneration; + this.converged = converged; + this.errorMessage = errorMessage; } - static ServiceResponse createErrorResponse(URI uri, String hostname, Long wantedGeneration, String error) { - ServiceResponse serviceResponse = new ServiceResponse(500, uri, hostname, wantedGeneration); - serviceResponse.object.setString("error", error); - return serviceResponse; + } + + public static class ServiceListResponse { + + public final List<Service> services = new ArrayList<>(); + public final URI uri; + public final long wantedGeneration; + public final long currentGeneration; + + public ServiceListResponse(Map<ServiceInfo, Long> services, URI uri, long wantedGeneration, long currentGeneration) { + services.forEach((key, value) -> this.services.add(new Service(key, value))); + this.uri = uri; + this.wantedGeneration = wantedGeneration; + this.currentGeneration = currentGeneration; } - static ServiceResponse createNotFoundResponse(URI uri, String hostname, Long wantedGeneration, String error) { - ServiceResponse serviceResponse = new ServiceResponse(404, uri, hostname, wantedGeneration); - serviceResponse.object.setString("error", error); - return serviceResponse; + public List<Service> services() { return services; } + + public static class Service { + + public final ServiceInfo serviceInfo; + public final Long currentGeneration; + + public Service(ServiceInfo serviceInfo, Long currentGeneration) { + this.serviceInfo = serviceInfo; + this.currentGeneration = currentGeneration; + } + } + } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java index 4dda141491c..0131517818d 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java @@ -5,6 +5,7 @@ import com.google.inject.Inject; import com.yahoo.component.Version; import com.yahoo.config.application.api.ApplicationFile; import com.yahoo.config.model.api.Model; +import com.yahoo.config.model.api.ServiceInfo; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.HostFilter; import com.yahoo.config.provision.InstanceName; @@ -16,13 +17,15 @@ import com.yahoo.jdisc.Response; import com.yahoo.restapi.ErrorResponse; import com.yahoo.restapi.MessageResponse; import com.yahoo.restapi.Path; +import com.yahoo.slime.Cursor; import com.yahoo.slime.SlimeUtils; import com.yahoo.text.StringUtilities; import com.yahoo.vespa.config.server.ApplicationRepository; -import com.yahoo.vespa.config.server.application.ApplicationReindexing; +import com.yahoo.vespa.config.server.application.ConfigConvergenceChecker; import com.yahoo.vespa.config.server.http.ContentHandler; import com.yahoo.vespa.config.server.http.ContentRequest; import com.yahoo.vespa.config.server.http.HttpHandler; +import com.yahoo.vespa.config.server.http.JSONResponse; import com.yahoo.vespa.config.server.http.NotFoundException; import com.yahoo.vespa.config.server.http.v2.request.ApplicationContentRequest; import com.yahoo.vespa.config.server.http.v2.response.ApplicationSuspendedResponse; @@ -33,6 +36,7 @@ import com.yahoo.vespa.config.server.http.v2.response.ReindexingResponse; import com.yahoo.vespa.config.server.tenant.Tenant; import java.io.IOException; +import java.net.URI; import java.time.Duration; import java.time.Instant; import java.util.Map; @@ -43,8 +47,11 @@ import java.util.StringJoiner; import java.util.TreeMap; import java.util.TreeSet; +import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceListResponse; +import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceResponse; import static com.yahoo.yolean.Exceptions.uncheck; + /** * Operations on applications (delete, wait for config convergence, restart, application content etc.) * @@ -108,13 +115,21 @@ public class ApplicationHandler extends HttpHandler { } private HttpResponse listServiceConverge(ApplicationId applicationId, HttpRequest request) { - return applicationRepository.servicesToCheckForConfigConvergence(applicationId, request.getUri(), - getTimeoutFromRequest(request), getVespaVersionFromRequest(request)); + ServiceListResponse response = + applicationRepository.servicesToCheckForConfigConvergence(applicationId, + request.getUri(), + getTimeoutFromRequest(request), + getVespaVersionFromRequest(request)); + return new HttpServiceListResponse(response); } private HttpResponse checkServiceConverge(ApplicationId applicationId, String hostAndPort, HttpRequest request) { - return applicationRepository.checkServiceForConfigConvergence(applicationId, hostAndPort, request.getUri(), - getTimeoutFromRequest(request), getVespaVersionFromRequest(request)); + ServiceResponse response = + applicationRepository.checkServiceForConfigConvergence(applicationId, + hostAndPort, + getTimeoutFromRequest(request), + getVespaVersionFromRequest(request)); + return HttpServiceResponse.createResponse(response, hostAndPort, request.getUri()); } private HttpResponse serviceStatusPage(ApplicationId applicationId, String service, String hostname, String pathSuffix) { @@ -301,4 +316,79 @@ public class ApplicationHandler extends HttpHandler { .map(Version::fromString); } + static class HttpServiceResponse extends JSONResponse { + + public static HttpServiceResponse createResponse(ConfigConvergenceChecker.ServiceResponse serviceResponse, String hostAndPort, URI uri) { + switch (serviceResponse.status) { + case ok: + return createOkResponse(uri, hostAndPort, serviceResponse.wantedGeneration, serviceResponse.currentGeneration, serviceResponse.converged); + case hostNotFound: + return createHostNotFoundInAppResponse(uri, hostAndPort, serviceResponse.wantedGeneration); + case notFound: + return createNotFoundResponse(uri, hostAndPort, serviceResponse.wantedGeneration, serviceResponse.errorMessage.orElse("")); + case error: + return createErrorResponse(uri, hostAndPort, serviceResponse.wantedGeneration, serviceResponse.errorMessage.orElse("")); + default: + throw new IllegalArgumentException("Unknown status " + serviceResponse.status); + } + } + + private HttpServiceResponse(int status, URI uri, String hostname, Long wantedGeneration) { + super(status); + object.setString("url", uri.toString()); + object.setString("host", hostname); + object.setLong("wantedGeneration", wantedGeneration); + } + + private static HttpServiceResponse createOkResponse(URI uri, String hostname, Long wantedGeneration, Long currentGeneration, boolean converged) { + HttpServiceResponse serviceResponse = new HttpServiceResponse(200, uri, hostname, wantedGeneration); + serviceResponse.object.setBool("converged", converged); + serviceResponse.object.setLong("currentGeneration", currentGeneration); + return serviceResponse; + } + + private static HttpServiceResponse createHostNotFoundInAppResponse(URI uri, String hostname, Long wantedGeneration) { + HttpServiceResponse serviceResponse = new HttpServiceResponse(410, uri, hostname, wantedGeneration); + serviceResponse.object.setString("problem", "Host:port (service) no longer part of application, refetch list of services."); + return serviceResponse; + } + + private static HttpServiceResponse createErrorResponse(URI uri, String hostname, Long wantedGeneration, String error) { + HttpServiceResponse serviceResponse = new HttpServiceResponse(500, uri, hostname, wantedGeneration); + serviceResponse.object.setString("error", error); + return serviceResponse; + } + + private static HttpServiceResponse createNotFoundResponse(URI uri, String hostname, Long wantedGeneration, String error) { + HttpServiceResponse serviceResponse = new HttpServiceResponse(404, uri, hostname, wantedGeneration); + serviceResponse.object.setString("error", error); + return serviceResponse; + } + + } + + static class HttpServiceListResponse extends JSONResponse { + + // Pre-condition: servicesToCheck has a state port + public HttpServiceListResponse(ConfigConvergenceChecker.ServiceListResponse response) { + super(200); + Cursor serviceArray = object.setArray("services"); + response.services().forEach((service) -> { + ServiceInfo serviceInfo = service.serviceInfo; + Cursor serviceObject = serviceArray.addObject(); + String hostName = serviceInfo.getHostName(); + int statePort = ConfigConvergenceChecker.getStatePort(serviceInfo).get(); + serviceObject.setString("host", hostName); + serviceObject.setLong("port", statePort); + serviceObject.setString("type", serviceInfo.getServiceType()); + serviceObject.setString("url", response.uri.toString() + "/" + hostName + ":" + statePort); + serviceObject.setLong("currentGeneration", service.currentGeneration); + }); + object.setString("url", response.uri.toString()); + object.setLong("currentGeneration", response.currentGeneration); + object.setLong("wantedGeneration", response.wantedGeneration); + object.setBool("converged", response.currentGeneration >= response.wantedGeneration); + } + } + } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java index 8b21e9c3916..6afb9ef086d 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java @@ -8,7 +8,6 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ApplicationName; import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.TenantName; -import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.vespa.config.server.ServerCache; import com.yahoo.vespa.config.server.monitoring.MetricUpdater; import org.junit.Before; @@ -16,22 +15,20 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.UncheckedIOException; import java.net.URI; import java.time.Duration; import java.util.Arrays; -import java.util.function.Consumer; +import java.util.List; import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.okJson; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; -import static com.yahoo.test.json.JsonTestHelper.assertJsonEquals; -import static org.assertj.core.api.Assertions.assertThat; +import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceListResponse; +import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceResponse; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * @author Ulf Lilleengen @@ -72,63 +69,35 @@ public class ConfigConvergenceCheckerTest { @Test public void service_convergence() { { // Known service - String serviceName = hostAndPort(this.service); - URI requestUrl = testServer().resolve("/serviceconverge/" + serviceName); wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(okJson("{\"config\":{\"generation\":3}}"))); - HttpResponse serviceResponse = checker.getServiceConfigGenerationResponse(application, hostAndPort(this.service), requestUrl, clientTimeout); - assertResponse("{\n" + - " \"url\": \"" + requestUrl.toString() + "\",\n" + - " \"host\": \"" + hostAndPort(this.service) + "\",\n" + - " \"wantedGeneration\": 3,\n" + - " \"converged\": true,\n" + - " \"currentGeneration\": 3\n" + - "}", - 200, - serviceResponse); + + ServiceResponse response = checker.getServiceConfigGeneration(application, hostAndPort(this.service), clientTimeout); + assertEquals(3, response.wantedGeneration.longValue()); + assertEquals(3, response.currentGeneration.longValue()); + assertTrue(response.converged); + assertEquals(ServiceResponse.Status.ok, response.status); } { // Missing service - String serviceName = "notPresent:1337"; - URI requestUrl = testServer().resolve("/serviceconverge/" + serviceName); - HttpResponse response = checker.getServiceConfigGenerationResponse(application, "notPresent:1337", requestUrl, clientTimeout); - assertResponse("{\n" + - " \"url\": \"" + requestUrl.toString() + "\",\n" + - " \"host\": \"" + serviceName + "\",\n" + - " \"wantedGeneration\": 3,\n" + - " \"problem\": \"Host:port (service) no longer part of application, refetch list of services.\"\n" + - "}", - 410, - response); + ServiceResponse response = checker.getServiceConfigGeneration(application, "notPresent:1337", clientTimeout); + assertEquals(3, response.wantedGeneration.longValue()); + assertEquals(ServiceResponse.Status.hostNotFound, response.status); } } @Test public void service_list_convergence() { { - String serviceName = hostAndPort(this.service); URI requestUrl = testServer().resolve("/serviceconverge"); - URI serviceUrl = testServer().resolve("/serviceconverge/" + serviceName); wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(okJson("{\"config\":{\"generation\":3}}"))); - HttpResponse response = checker.getServiceConfigGenerationsResponse(application, requestUrl, clientTimeout); - assertResponse("{\n" + - " \"services\": [\n" + - " {\n" + - " \"host\": \"" + serviceUrl.getHost() + "\",\n" + - " \"port\": " + serviceUrl.getPort() + ",\n" + - " \"type\": \"container\",\n" + - " \"url\": \"" + serviceUrl.toString() + "\",\n" + - " \"currentGeneration\":" + 3 + "\n" + - " }\n" + - " ],\n" + - " \"url\": \"" + requestUrl.toString() + "\",\n" + - " \"currentGeneration\": 3,\n" + - " \"wantedGeneration\": 3,\n" + - " \"converged\": true\n" + - "}", - 200, - response); - } + ServiceListResponse response = checker.getServiceConfigGenerations(application, requestUrl, clientTimeout); + assertEquals(3, response.wantedGeneration); + assertEquals(3, response.currentGeneration); + List<ServiceListResponse.Service> services = response.services; + assertEquals(1, services.size()); + assertService(this.service, services.get(0), 3); + } { // Model with two hosts on different generations MockModel model = new MockModel(Arrays.asList( @@ -143,53 +112,29 @@ public class ConfigConvergenceCheckerTest { wireMock2.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(okJson("{\"config\":{\"generation\":3}}"))); URI requestUrl = testServer().resolve("/serviceconverge"); - URI serviceUrl = testServer().resolve("/serviceconverge/" + hostAndPort(service)); - URI serviceUrl2 = testServer().resolve("/serviceconverge/" + hostAndPort(service2)); - HttpResponse response = checker.getServiceConfigGenerationsResponse(application, requestUrl, clientTimeout); - assertResponse("{\n" + - " \"services\": [\n" + - " {\n" + - " \"host\": \"" + service.getHost() + "\",\n" + - " \"port\": " + service.getPort() + ",\n" + - " \"type\": \"container\",\n" + - " \"url\": \"" + serviceUrl.toString() + "\",\n" + - " \"currentGeneration\":" + 4 + "\n" + - " },\n" + - " {\n" + - " \"host\": \"" + service2.getHost() + "\",\n" + - " \"port\": " + service2.getPort() + ",\n" + - " \"type\": \"container\",\n" + - " \"url\": \"" + serviceUrl2.toString() + "\",\n" + - " \"currentGeneration\":" + 3 + "\n" + - " }\n" + - " ],\n" + - " \"url\": \"" + requestUrl.toString() + "\",\n" + - " \"currentGeneration\": 3,\n" + - " \"wantedGeneration\": 4,\n" + - " \"converged\": false\n" + - "}", - 200, - response); + + ServiceListResponse response = checker.getServiceConfigGenerations(application, requestUrl, clientTimeout); + assertEquals(4, response.wantedGeneration); + assertEquals(3, response.currentGeneration); + + List<ServiceListResponse.Service> services = response.services; + assertEquals(2, services.size()); + assertService(this.service, services.get(0), 4); + assertService(this.service2, services.get(1), 3); } } + @Test public void service_convergence_timeout() { - URI requestUrl = testServer().resolve("/serviceconverge"); wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(aResponse() .withFixedDelay((int) clientTimeout.plus(Duration.ofSeconds(1)).toMillis()) .withBody("response too slow"))); - HttpResponse response = checker.getServiceConfigGenerationResponse(application, hostAndPort(service), requestUrl, Duration.ofMillis(1)); - // Message contained in a SocketTimeoutException may differ across platforms, so we do a partial match of the response here - assertResponse( - responseBody -> - assertThat(responseBody) - .startsWith("{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) + - "\",\"wantedGeneration\":3,\"error\":\"") - .contains("java.net.SocketTimeoutException: 1 MILLISECONDS") - .endsWith("\"}"), - 404, - response); + ServiceResponse response = checker.getServiceConfigGeneration(application, hostAndPort(service), Duration.ofMillis(1)); + + assertEquals(3, response.wantedGeneration.longValue()); + assertEquals(ServiceResponse.Status.notFound, response.status); + assertTrue(response.errorMessage.get().contains("java.net.SocketTimeoutException: 1 MILLISECONDS")); } private URI testServer() { @@ -204,19 +149,11 @@ public class ConfigConvergenceCheckerTest { return uri.getHost() + ":" + uri.getPort(); } - private static void assertResponse(String expectedJson, int status, HttpResponse response) { - assertResponse((responseBody) -> assertJsonEquals(new String(responseBody.getBytes()), expectedJson), status, response); - } - - private static void assertResponse(Consumer<String> assertFunc, int status, HttpResponse response) { - ByteArrayOutputStream responseBody = new ByteArrayOutputStream(); - try { - response.render(responseBody); - assertFunc.accept(responseBody.toString()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - assertEquals(status, response.getStatus()); + private void assertService(URI uri, ServiceListResponse.Service service1, long expectedGeneration) { + assertEquals(expectedGeneration, service1.currentGeneration.longValue()); + assertEquals(uri.getHost(), service1.serviceInfo.getHostName()); + assertEquals(uri.getPort(), ConfigConvergenceChecker.getStatePort(service1.serviceInfo).get().intValue()); + assertEquals("container", service1.serviceInfo.getServiceType()); } } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java index 0b8bf8e84bd..04483e0191d 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java @@ -4,6 +4,8 @@ package com.yahoo.vespa.config.server.http.v2; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.component.Version; import com.yahoo.config.model.api.ModelFactory; +import com.yahoo.config.model.api.PortInfo; +import com.yahoo.config.model.api.ServiceInfo; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ApplicationName; import com.yahoo.config.provision.InstanceName; @@ -50,14 +52,18 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.function.Consumer; import java.util.stream.Stream; import static com.yahoo.container.jdisc.HttpRequest.createTestRequest; @@ -65,8 +71,12 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE; import static com.yahoo.jdisc.http.HttpRequest.Method.GET; import static com.yahoo.jdisc.http.HttpRequest.Method.POST; import static com.yahoo.test.json.JsonTestHelper.assertJsonEquals; +import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceListResponse; +import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceResponse; import static com.yahoo.vespa.config.server.http.HandlerTest.assertHttpStatusCodeAndMessage; import static com.yahoo.vespa.config.server.http.SessionHandlerTest.getRenderedString; +import static com.yahoo.vespa.config.server.http.v2.ApplicationHandler.HttpServiceListResponse; +import static com.yahoo.vespa.config.server.http.v2.ApplicationHandler.HttpServiceResponse.createResponse; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -146,7 +156,7 @@ public class ApplicationHandlerTest { Tenant mytenant = applicationRepository.getTenant(applicationId); deleteAndAssertOKResponse(mytenant, applicationId); } - + { applicationRepository.deploy(testApp, prepareParams(applicationId)); deleteAndAssertOKResponseMocked(applicationId, true); @@ -539,6 +549,143 @@ public class ApplicationHandlerTest { "}\n"); } + @Test + public void service_convergence() { + String hostAndPort = "localhost:1234"; + URI uri = URI.create("https://" + hostAndPort + "/serviceconvergence/container"); + + { // Known service + HttpResponse response = createResponse(new ServiceResponse(ServiceResponse.Status.ok, + 3L, + 3L, + true), + hostAndPort, + uri); + assertResponse("{\n" + + " \"url\": \"" + uri.toString() + "\",\n" + + " \"host\": \"" + hostAndPort + "\",\n" + + " \"wantedGeneration\": 3,\n" + + " \"converged\": true,\n" + + " \"currentGeneration\": 3\n" + + "}", + 200, + response); + } + + { // Missing service + HttpResponse response = createResponse(new ServiceResponse(ServiceResponse.Status.hostNotFound, + 3L), + hostAndPort, + uri); + + assertResponse("{\n" + + " \"url\": \"" + uri.toString() + "\",\n" + + " \"host\": \"" + hostAndPort + "\",\n" + + " \"wantedGeneration\": 3,\n" + + " \"problem\": \"Host:port (service) no longer part of application, refetch list of services.\"\n" + + "}", + 410, + response); + } + } + + @Test + public void service_list_convergence() { + URI requestUrl = URI.create("https://configserver/serviceconvergence"); + + String hostname = "localhost"; + int port = 1234; + String hostAndPort = hostname + ":" + port; + URI serviceUrl = URI.create("https://configserver/serviceconvergence/" + hostAndPort); + + { + HttpServiceListResponse response = + new HttpServiceListResponse(new ServiceListResponse(Map.of(createServiceInfo(hostname, port), 3L), + requestUrl, + 3L, + 3L)); + assertResponse("{\n" + + " \"services\": [\n" + + " {\n" + + " \"host\": \"" + hostname + "\",\n" + + " \"port\": " + port + ",\n" + + " \"type\": \"container\",\n" + + " \"url\": \"" + serviceUrl.toString() + "\",\n" + + " \"currentGeneration\":" + 3 + "\n" + + " }\n" + + " ],\n" + + " \"url\": \"" + requestUrl.toString() + "\",\n" + + " \"currentGeneration\": 3,\n" + + " \"wantedGeneration\": 3,\n" + + " \"converged\": true\n" + + "}", + 200, + response); + } + + { // Two hosts on different generations + String hostname2 = "localhost2"; + int port2 = 5678; + String hostAndPort2 = hostname2 + ":" + port2; + URI serviceUrl2 = URI.create("https://configserver/serviceconvergence/" + hostAndPort2); + + Map<ServiceInfo, Long> serviceInfos = new HashMap<>(); + serviceInfos.put(createServiceInfo(hostname, port), 4L); + serviceInfos.put(createServiceInfo(hostname2, port2), 3L); + + HttpServiceListResponse response = + new HttpServiceListResponse(new ServiceListResponse(serviceInfos, + requestUrl, + 4L, + 3L)); + assertResponse("{\n" + + " \"services\": [\n" + + " {\n" + + " \"host\": \"" + hostname + "\",\n" + + " \"port\": " + port + ",\n" + + " \"type\": \"container\",\n" + + " \"url\": \"" + serviceUrl.toString() + "\",\n" + + " \"currentGeneration\":" + 4 + "\n" + + " },\n" + + " {\n" + + " \"host\": \"" + hostname2 + "\",\n" + + " \"port\": " + port2 + ",\n" + + " \"type\": \"container\",\n" + + " \"url\": \"" + serviceUrl2.toString() + "\",\n" + + " \"currentGeneration\":" + 3 + "\n" + + " }\n" + + " ],\n" + + " \"url\": \"" + requestUrl.toString() + "\",\n" + + " \"currentGeneration\": 3,\n" + + " \"wantedGeneration\": 4,\n" + + " \"converged\": false\n" + + "}", + 200, + response); + } + } + + @Test + public void service_convergence_timeout() { + String hostAndPort = "localhost:1234"; + URI uri = URI.create("https://" + hostAndPort + "/serviceconvergence/container"); + + HttpResponse response = createResponse(new ServiceResponse(ServiceResponse.Status.notFound, + 3L, + "some error message"), + hostAndPort, + uri); + + assertResponse("{\n" + + " \"url\": \"" + uri.toString() + "\",\n" + + " \"host\": \"" + hostAndPort + "\",\n" + + " \"wantedGeneration\": 3,\n" + + " \"error\": \"some error message\"" + + "}", + 404, + response); + } + private void assertNotAllowed(Method method) throws IOException { String url = "http://myhost:14000/application/v2/tenant/" + mytenantName + "/application/default"; deleteAndAssertResponse(url, Response.Status.METHOD_NOT_ALLOWED, HttpErrorResponse.ErrorCode.METHOD_NOT_ALLOWED, "{\"error-code\":\"METHOD_NOT_ALLOWED\",\"message\":\"Method '" + method + "' is not supported\"}", @@ -668,4 +815,29 @@ public class ApplicationHandlerTest { return new PrepareParams.Builder().applicationId(applicationId).build(); } + private static void assertResponse(String expectedJson, int status, HttpResponse response) { + assertResponse((responseBody) -> assertJsonEquals(new String(responseBody + .getBytes()), expectedJson), status, response); + } + + private static void assertResponse(Consumer<String> assertFunc, int status, HttpResponse response) { + ByteArrayOutputStream responseBody = new ByteArrayOutputStream(); + try { + response.render(responseBody); + assertFunc.accept(responseBody.toString()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + assertEquals(status, response.getStatus()); + } + + private ServiceInfo createServiceInfo(String hostname, int port) { + return new ServiceInfo("container", + "container", + List.of(new PortInfo(port, List.of("state"))), + Map.of(), + "configId", + hostname); + } + } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/SystemUpgrader.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/SystemUpgrader.java index 99ab6d420cb..8d5851be62f 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/SystemUpgrader.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/SystemUpgrader.java @@ -34,7 +34,7 @@ public class SystemUpgrader extends InfrastructureUpgrader<VespaVersionTarget> { @Override protected void upgrade(VespaVersionTarget target, SystemApplication application, ZoneApi zone) { - log.info(Text.format("Deploying %s version %s in %s", application.id(), target, zone.getId())); + log.info(Text.format("Deploying %s on %s in %s", application.id(), target, zone.getId())); controller().applications().deploy(application, zone.getId(), target.version(), target.downgrade()); } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersionTarget.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersionTarget.java index fd5603b96b8..78d6d0ebf29 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersionTarget.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersionTarget.java @@ -30,4 +30,9 @@ public class VespaVersionTarget implements VersionTarget { return downgrade; } + @Override + public String toString() { + return "vespa version target " + version.toFullString() + (downgrade ? " (downgrade)" : ""); + } + } diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index be245788ac1..32b65ac8efb 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -53,7 +53,7 @@ public class Flags { ZONE_ID, APPLICATION_ID); public static final UnboundDoubleFlag TLS_SIZE_FRACTION = defineDoubleFlag( - "tls-size-fraction", 0.04, + "tls-size-fraction", 0.02, List.of("baldersheim"), "2021-12-20", "2022-02-01", "Fraction of disk available for transaction log", "Takes effect at redeployment", @@ -414,6 +414,13 @@ public class Flags { "Takes effect on restart of Docker container", ZONE_ID, APPLICATION_ID); + public static final UnboundStringFlag ZOOKEEPER_SNAPSHOT_METHOD = defineStringFlag( + "zookeeper-snapshot-method", "", + List.of("hmusum"), "2022-01-11", "2022-02-11", + "ZooKeeper snapshot method. Valid values are '', 'gz' and 'snappy'", + "Takes effect on Docker container restart", + ZONE_ID, APPLICATION_ID, NODE_TYPE); + /** WARNING: public for testing: All flags should be defined in {@link Flags}. */ public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners, String createdAt, String expiresAt, String description, diff --git a/parent/pom.xml b/parent/pom.xml index 4558267fe9f..7ab4e2ba57e 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -796,7 +796,7 @@ <dependency> <groupId>org.hdrhistogram</groupId> <artifactId>HdrHistogram</artifactId> - <version>2.1.8</version> + <version>${hdrhistogram.version}</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> @@ -894,6 +894,7 @@ <commons.codec.version>1.15</commons.codec.version> <commons.math3.version>3.6.1</commons.math3.version> <gson.version>2.8.9</gson.version> + <hdrhistogram.version>2.1.12</hdrhistogram.version> <jna.version>5.9.0</jna.version> <junit.version>5.8.1</junit.version> <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version> diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 8a3550ac00b..62b9f6416af 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -203,7 +203,7 @@ distribution.searchablecopies long default=1 ## Control cache size in bytes. ## Postive numbers are absolute in bytes. ## Negative numbers are a percentage of memory. -summary.cache.maxbytes long default=-5 +summary.cache.maxbytes long default=-4 ## Include visits in the cache, if the visitoperation allows it. ## This will enable another separate cache of summary.cache.maxbytes size. diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt index 7b165e11b66..fb8120210c1 100644 --- a/storage/src/tests/persistence/CMakeLists.txt +++ b/storage/src/tests/persistence/CMakeLists.txt @@ -12,6 +12,7 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST persistencethread_splittest.cpp processalltest.cpp provider_error_wrapper_test.cpp + shared_operation_throttler_test.cpp splitbitdetectortest.cpp testandsettest.cpp gtest_runner.cpp diff --git a/storage/src/tests/persistence/active_operations_stats_test.cpp b/storage/src/tests/persistence/active_operations_stats_test.cpp index a5dd3d929db..8caa84977ce 100644 --- a/storage/src/tests/persistence/active_operations_stats_test.cpp +++ b/storage/src/tests/persistence/active_operations_stats_test.cpp @@ -96,17 +96,17 @@ ActiveOperationsStatsTest::test_active_operations_stats() auto lock0 = filestorHandler->getNextMessage(stripeId); auto lock1 = filestorHandler->getNextMessage(stripeId); auto lock2 = filestorHandler->getNextMessage(stripeId); - ASSERT_TRUE(lock0.first); - ASSERT_TRUE(lock1.first); - ASSERT_FALSE(lock2.first); + ASSERT_TRUE(lock0.lock); + ASSERT_TRUE(lock1.lock); + ASSERT_FALSE(lock2.lock); auto stats = filestorHandler->get_active_operations_stats(false); { SCOPED_TRACE("during"); assert_active_operations_stats(stats, 2, 2, 0); } EXPECT_EQ(3, stats.get_total_size()); - lock0.first.reset(); - lock1.first.reset(); + lock0.lock.reset(); + lock1.lock.reset(); stats = filestorHandler->get_active_operations_stats(false); { SCOPED_TRACE("after"); diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index cd496605a6c..939e7ae7b6a 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -516,11 +516,11 @@ TEST_F(FileStorManagerTest, handler_priority) { filestorHandler.schedule(cmd); } - ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority()); } class MessagePusherThread : public document::Runnable { @@ -570,7 +570,7 @@ public: void run() override { while (!_done) { FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId); - if (msg.second.get()) { + if (msg.msg.get()) { uint32_t originalConfig = _config.load(); _fetchedCount++; std::this_thread::sleep_for(5ms); @@ -641,15 +641,15 @@ TEST_F(FileStorManagerTest, handler_pause) { filestorHandler.schedule(cmd); } - ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority()); { ResumeGuard guard = filestorHandler.pause(); (void)guard; - ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr); + ASSERT_EQ(filestorHandler.getNextMessage(stripeId).msg.get(), nullptr); } - ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority()); } TEST_F(FileStorManagerTest, remap_split) { @@ -729,8 +729,8 @@ TEST_F(FileStorManagerTest, handler_timeout) { std::this_thread::sleep_for(51ms); for (;;) { auto lock = filestorHandler.getNextMessage(stripeId); - if (lock.first.get()) { - ASSERT_EQ(200, lock.second->getPriority()); + if (lock.lock.get()) { + ASSERT_EQ(200, lock.msg->getPriority()); break; } } @@ -2013,7 +2013,7 @@ expect_async_message(StorageMessage::Priority exp_pri, { EXPECT_TRUE(result.was_scheduled()); ASSERT_TRUE(result.has_async_message()); - EXPECT_EQ(exp_pri, result.async_message().second->getPriority()); + EXPECT_EQ(exp_pri, result.async_message().msg->getPriority()); } void @@ -2045,8 +2045,8 @@ TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule) auto result = handler->schedule_and_get_next_async_message(make_put_command(30)); expect_async_message(20, result); } - EXPECT_EQ(30, get_next_message().second->getPriority()); - EXPECT_EQ(40, get_next_message().second->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); + EXPECT_EQ(40, get_next_message().msg->getPriority()); } TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async) @@ -2057,8 +2057,8 @@ TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_n auto result = handler->schedule_and_get_next_async_message(make_put_command(30)); expect_empty_async_message(result); - EXPECT_EQ(20, get_next_message().second->getPriority()); - EXPECT_EQ(30, get_next_message().second->getPriority()); + EXPECT_EQ(20, get_next_message().msg->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); } TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped) @@ -2079,7 +2079,7 @@ TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped) expect_async_message(40, result); } } - EXPECT_EQ(30, get_next_message().second->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); } } // storage diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 2557fa537f5..5f3836727dd 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -91,14 +91,14 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) { f.filestorHandler->schedule(createPut(5432, 0)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); + ASSERT_TRUE(lock0.lock.get()); EXPECT_EQ(document::BucketId(16, 1234), - dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); + dynamic_cast<api::PutCommand&>(*lock0.msg).getBucketId()); auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock1.first.get()); + ASSERT_TRUE(lock1.lock.get()); EXPECT_EQ(document::BucketId(16, 5432), - dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); + dynamic_cast<api::PutCommand&>(*lock1.msg).getBucketId()); } TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_access) { @@ -108,14 +108,14 @@ TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_ac f.filestorHandler->schedule(createGet(1234)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements()); // Even though we already have a lock on the bucket, Gets allow shared locking and we // should therefore be able to get another lock. auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock1.first.get()); - EXPECT_EQ(api::LockingRequirements::Shared, lock1.first->lockingRequirements()); + ASSERT_TRUE(lock1.lock.get()); + EXPECT_EQ(api::LockingRequirements::Shared, lock1.lock->lockingRequirements()); } TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op_active) { @@ -125,12 +125,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op f.filestorHandler->schedule(createPut(1234, 0)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements()); // Expected to time out auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_FALSE(lock1.first.get()); + ASSERT_FALSE(lock1.lock.get()); } TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op_active) { @@ -140,12 +140,12 @@ TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op f.filestorHandler->schedule(createGet(1234)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements()); // Expected to time out auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_FALSE(lock1.first.get()); + ASSERT_FALSE(lock1.lock.get()); } TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive_op_active) { @@ -155,12 +155,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive f.filestorHandler->schedule(createPut(1234, 0)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements()); // Expected to time out auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_FALSE(lock1.first.get()); + ASSERT_FALSE(lock1.lock.get()); } } // namespace storage diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp new file mode 100644 index 00000000000..0ad380937c7 --- /dev/null +++ b/storage/src/tests/persistence/shared_operation_throttler_test.cpp @@ -0,0 +1,116 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/storage/persistence/shared_operation_throttler.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/barrier.h> +#include <chrono> +#include <thread> + +using namespace ::testing; + +namespace storage { + +using ThrottleToken = SharedOperationThrottler::Token; + +TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) { + // We technically can't test that the unlimited throttler _never_ throttles, but at + // least check that it doesn't throttle _twice_, and then induce from this ;) + auto throttler = SharedOperationThrottler::make_unlimited_throttler(); + auto token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); + auto token2 = throttler->blocking_acquire_one(); + EXPECT_TRUE(token2.valid()); + // Window size should be zero (i.e. unlimited) for unlimited throttler + EXPECT_EQ(throttler->current_window_size(), 0); +} + +TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); + auto token2 = throttler->try_acquire_one(); + EXPECT_FALSE(token2.valid()); + + EXPECT_EQ(throttler->current_window_size(), 1); +} + +TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto token = throttler->blocking_acquire_one(); + EXPECT_TRUE(token.valid()); + token.reset(); + token = throttler->blocking_acquire_one(600s); // Should never block. + EXPECT_TRUE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + vespalib::Barrier barrier(2); + std::thread t([&] { + auto token = throttler->try_acquire_one(); + assert(token.valid()); + barrier.await(); + while (throttler->waiting_threads() != 1) { + std::this_thread::sleep_for(100us); + } + // Implicit token release at thread scope exit + }); + barrier.await(); + auto token = throttler->blocking_acquire_one(); + EXPECT_TRUE(token.valid()); + t.join(); +} + +TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto window_filling_token = throttler->try_acquire_one(); + auto before = std::chrono::steady_clock::now(); + // Will block for at least 1ms. Since no window slot will be available by that time, + // an invalid token should be returned. + auto token = throttler->blocking_acquire_one(1ms); + auto after = std::chrono::steady_clock::now(); + EXPECT_TRUE((after - before) >= 1ms); + EXPECT_FALSE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) { + ThrottleToken token; + EXPECT_FALSE(token.valid()); + token.reset(); // no-op + EXPECT_FALSE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + { + auto token = throttler->try_acquire_one(); + EXPECT_TRUE(token.valid()); + } + auto token = throttler->try_acquire_one(); + EXPECT_TRUE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto token1 = throttler->try_acquire_one(); + auto token2 = std::move(token1); // move ctor + EXPECT_TRUE(token2.valid()); + EXPECT_FALSE(token1.valid()); + ThrottleToken token3; + token3 = std::move(token2); // move assignment op + EXPECT_TRUE(token3.valid()); + EXPECT_FALSE(token2.valid()); + + // Trying to fetch new token should not succeed due to active token and win size of 1 + token1 = throttler->try_acquire_one(); + EXPECT_FALSE(token1.valid()); + // Resetting the token should free up the slot in the window + token3.reset(); + token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); +} + +// TODO ideally we'd test that the dynamic throttler has a window size that is actually +// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so +// it's not trivial to know how to do this reliably. + +} diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index c737d2bed28..5e068236026 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -14,6 +14,7 @@ vespa_add_library(storage_spersistence OBJECT persistenceutil.cpp processallhandler.cpp provider_error_wrapper.cpp + shared_operation_throttler.cpp simplemessagehandler.cpp splitbitdetector.cpp splitjoinhandler.cpp diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp index 2dc5989e857..73ccc7f6085 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp @@ -7,10 +7,16 @@ namespace storage { -ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric) +ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, + document::DocumentId doc_id, + SharedOperationThrottler::Token throttle_token, + const char *op, + const framework::Clock& clock, + metrics::DoubleAverageMetric& latency_metric) : _result_handler(nullptr), _state(std::move(state)), _doc_id(std::move(doc_id)), + _throttle_token(std::move(throttle_token)), _op(op), _start_time(clock), _latency_metric(latency_metric) @@ -27,6 +33,7 @@ ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) no } double elapsed = _start_time.getElapsedTimeAsDouble(); _latency_metric.addValue(elapsed); + _throttle_token.reset(); _state->on_entry_complete(std::move(result), _doc_id, _op); } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h index 1037318aec6..8478cab4c17 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h @@ -2,6 +2,7 @@ #pragma once +#include "shared_operation_throttler.h" #include <vespa/document/base/documentid.h> #include <vespa/metrics/valuemetric.h> #include <vespa/persistence/spi/operationcomplete.h> @@ -21,12 +22,17 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete const spi::ResultHandler* _result_handler; std::shared_ptr<ApplyBucketDiffState> _state; document::DocumentId _doc_id; + SharedOperationThrottler::Token _throttle_token; const char* _op; framework::MilliSecTimer _start_time; metrics::DoubleAverageMetric& _latency_metric; public: - ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric); - ~ApplyBucketDiffEntryComplete(); + ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, + document::DocumentId doc_id, + SharedOperationThrottler::Token throttle_token, + const char *op, const framework::Clock& clock, + metrics::DoubleAverageMetric& latency_metric); + ~ApplyBucketDiffEntryComplete() override; void onComplete(std::unique_ptr<spi::Result> result) noexcept override; void addResultHandler(const spi::ResultHandler* resultHandler) override; }; diff --git a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt index 62d1a80501a..2d137f87118 100644 --- a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_library(storage_filestorpersistence OBJECT SOURCES active_operations_metrics.cpp active_operations_stats.cpp + filestorhandler.cpp filestorhandlerimpl.cpp filestormanager.cpp filestormetrics.cpp diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp new file mode 100644 index 00000000000..c066277ec71 --- /dev/null +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -0,0 +1,8 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "filestorhandler.h" + +namespace storage { + +FileStorHandler::LockedMessage::~LockedMessage() = default; + +} diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index a980b5aa2e1..6f740ce2c28 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -16,6 +16,7 @@ #include <vespa/document/bucket/bucket.h> #include <vespa/storage/storageutil/resumeguard.h> #include <vespa/storage/common/messagesender.h> +#include <vespa/storage/persistence/shared_operation_throttler.h> #include <vespa/storageapi/messageapi/storagemessage.h> namespace storage { @@ -74,7 +75,29 @@ public: [[nodiscard]] virtual api::LockingRequirements lockingRequirements() const noexcept = 0; }; - using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>; + struct LockedMessage { + std::shared_ptr<BucketLockInterface> lock; + std::shared_ptr<api::StorageMessage> msg; + SharedOperationThrottler::Token throttle_token; + + LockedMessage() noexcept = default; + LockedMessage(std::shared_ptr<BucketLockInterface> lock_, + std::shared_ptr<api::StorageMessage> msg_) noexcept + : lock(std::move(lock_)), + msg(std::move(msg_)), + throttle_token() + {} + LockedMessage(std::shared_ptr<BucketLockInterface> lock_, + std::shared_ptr<api::StorageMessage> msg_, + SharedOperationThrottler::Token token) noexcept + : lock(std::move(lock_)), + msg(std::move(msg_)), + throttle_token(std::move(token)) + {} + LockedMessage(LockedMessage&&) noexcept = default; + ~LockedMessage(); + }; + class ScheduleAsyncResult { private: bool _was_scheduled; @@ -90,7 +113,7 @@ public: return _was_scheduled; } bool has_async_message() const { - return _async_message.first.get() != nullptr; + return _async_message.lock.get() != nullptr; } const LockedMessage& async_message() const { return _async_message; @@ -250,6 +273,8 @@ public: virtual std::string dumpQueue() const = 0; virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0; + + virtual SharedOperationThrottler& operation_throttler() const noexcept = 0; }; } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index c6991803b4d..5e0ea0359dc 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -40,16 +40,18 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg) - : FileStorHandlerImpl(1, 1, sender, metrics, compReg) + : FileStorHandlerImpl(1, 1, sender, metrics, compReg, SharedOperationThrottler::make_unlimited_throttler()) { } FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics, - ServiceLayerComponentRegister& compReg) + ServiceLayerComponentRegister& compReg, + std::unique_ptr<SharedOperationThrottler> operation_throttler) : _component(compReg, "filestorhandlerimpl"), _state(FileStorHandler::AVAILABLE), _metrics(nullptr), + _operation_throttler(std::move(operation_throttler)), _stripes(), _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), @@ -330,6 +332,7 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) std::lock_guard lockGuard(_mergeStatesLock); _metrics->pendingMerges.addValue(_mergeStates.size()); _metrics->queueSize.addValue(getQueueSize()); + _metrics->throttle_window_size.addValue(_operation_throttler->current_window_size()); for (const auto & stripe : _metrics->stripes) { const auto & m = stripe->averageQueueWaitingTime; @@ -885,10 +888,39 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe _active_operations_stats() {} +namespace { + +bool +operation_type_should_be_throttled(api::MessageType::Id type_id) noexcept +{ + // Note: SetBucketState is intentionally _not_ included in this set, even though it's + // dispatched async. The rationale behind this is that SetBucketState is very cheap + // to execute, usually comes in large waves (up to #buckets count) and processing all + // requests should complete as quickly as possible. We also don't want such waves to + // artificially boost the dynamic throttle window size due to a sudden throughput spike. + // + // Merge-related operations are transitively throttled by using the operation throttler + // directly for all async ops within the MergeHandler. + switch (type_id) { + case api::MessageType::PUT_ID: + case api::MessageType::REMOVE_ID: + case api::MessageType::UPDATE_ID: + case api::MessageType::REMOVELOCATION_ID: + case api::MessageType::CREATEBUCKET_ID: + case api::MessageType::DELETEBUCKET_ID: + return true; + default: + return false; + } +} + +} + FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) { std::unique_lock guard(*_lock); + SharedOperationThrottler::Token throttle_token; // Try to grab a message+lock, immediately retrying once after a wait // if none can be found and then exiting if the same is the case on the // second attempt. This is key to allowing the run loop to register @@ -896,15 +928,39 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) for (int attempt = 0; (attempt < 2) && !_owner.isPaused(); ++attempt) { PriorityIdx& idx(bmi::get<1>(*_queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); + bool was_throttled = false; - while (iter != end && operationIsInhibited(guard, iter->_bucket, *iter->_command)) { + while ((iter != end) && operationIsInhibited(guard, iter->_bucket, *iter->_command)) { iter++; } if (iter != end) { - return getMessage(guard, idx, iter); + const bool should_throttle_op = operation_type_should_be_throttled(iter->_command->getType().getId()); + if (!should_throttle_op && throttle_token.valid()) { + throttle_token.reset(); // Let someone else play with it. + } else if (should_throttle_op && !throttle_token.valid()) { + // Important: _non-blocking_ attempt at getting a throttle token. + throttle_token = _owner.operation_throttler().try_acquire_one(); + was_throttled = !throttle_token.valid(); + } + if (!should_throttle_op || throttle_token.valid()) { + return getMessage(guard, idx, iter, std::move(throttle_token)); + } } if (attempt == 0) { - _cond->wait_for(guard, timeout); + // Depending on whether we were blocked due to no usable ops in queue or throttling, + // wait for either the queue or throttler to (hopefully) have some fresh stuff for us. + if (!was_throttled) { + _cond->wait_for(guard, timeout); + } else { + // Have to release lock before doing a blocking throttle token fetch, since it + // prevents RPC threads from pushing onto the queue. + guard.unlock(); + throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout); + if (!throttle_token.valid()) { + return {}; // Already exhausted our timeout window. + } + guard.lock(); + } } } return {}; // No message fetched. @@ -923,14 +979,20 @@ FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard) ++iter; } if ((iter != end) && AsyncHandler::is_async_message(iter->_command->getType().getId())) { - return getMessage(guard, idx, iter); + // This is executed in the context of an RPC thread, so only do a _non-blocking_ + // poll of the throttle policy. + auto throttle_token = _owner.operation_throttler().try_acquire_one(); + if (throttle_token.valid()) { + return getMessage(guard, idx, iter, std::move(throttle_token)); + } } return {}; } FileStorHandler::LockedMessage -FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) { - +FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter, + SharedOperationThrottler::Token throttle_token) +{ std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime))); std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command); @@ -942,7 +1004,7 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx msg->getType().getId(), msg->getMsgId(), msg->lockingRequirements()); guard.unlock(); - return FileStorHandler::LockedMessage(std::move(locker), std::move(msg)); + return {std::move(locker), std::move(msg), std::move(throttle_token)}; } else { std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg)); guard.unlock(); @@ -1014,7 +1076,7 @@ FileStorHandlerImpl::Stripe::schedule_and_get_next_async_message(MessageEntry en std::unique_lock guard(*_lock); _queue->emplace_back(std::move(entry)); auto lockedMessage = get_next_async_message(guard); - if ( ! lockedMessage.second) { + if ( ! lockedMessage.msg) { if (guard.owns_lock()) { guard.unlock(); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 5d68be8a800..c4b85ac596c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -42,11 +42,12 @@ class AbortBucketOperationsCommand; namespace bmi = boost::multi_index; -class FileStorHandlerImpl : private framework::MetricUpdateHook, - private ResumeGuard::Callback, - public FileStorHandler { +class FileStorHandlerImpl final + : private framework::MetricUpdateHook, + private ResumeGuard::Callback, + public FileStorHandler +{ public: - struct MessageEntry { std::shared_ptr<api::StorageMessage> _command; metrics::MetricTimer _timer; @@ -147,7 +148,8 @@ public: // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts // with its locking requirements. FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx, - PriorityIdx::iterator iter); + PriorityIdx::iterator iter, + SharedOperationThrottler::Token throttle_token); using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>; const FileStorHandlerImpl &_owner; MessageSender &_messageSender; @@ -163,7 +165,9 @@ public: class BucketLock : public FileStorHandler::BucketLockInterface { public: // TODO refactor, too many params - BucketLock(const monitor_guard & guard, Stripe& disk, const document::Bucket &bucket, + BucketLock(const monitor_guard & guard, + Stripe& disk, + const document::Bucket &bucket, uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id, api::LockingRequirements lockReq); ~BucketLock() override; @@ -187,9 +191,9 @@ public: FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg); FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&, - ServiceLayerComponentRegister&); + ServiceLayerComponentRegister&, std::unique_ptr<SharedOperationThrottler>); - ~FileStorHandlerImpl(); + ~FileStorHandlerImpl() override; void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; } void flush(bool killPendingMerges) override; @@ -239,6 +243,10 @@ public: ResumeGuard pause() override; void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override; + SharedOperationThrottler& operation_throttler() const noexcept override { + return *_operation_throttler; + } + // Implements ResumeGuard::Callback void resume() override; @@ -249,6 +257,7 @@ private: ServiceLayerComponent _component; std::atomic<DiskState> _state; FileStorDiskMetrics * _metrics; + std::unique_ptr<SharedOperationThrottler> _operation_throttler; std::vector<Stripe> _stripes; MessageSender& _messageSender; const document::BucketIdFactory& _bucketIdFactory; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 2cfb3a2cffe..f5b9da0e1f5 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -32,6 +32,7 @@ LOG_SETUP(".persistence.filestor.manager"); using std::shared_ptr; using document::BucketSpace; using vespalib::make_string_short::fmt; +using vespa::config::content::StorFilestorConfig; namespace { @@ -130,18 +131,31 @@ uint32_t computeNumResponseThreads(int configured) { } vespalib::Executor::OptimizeFor -selectSequencer(vespa::config::content::StorFilestorConfig::ResponseSequencerType sequencerType) { +selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) { switch (sequencerType) { - case vespa::config::content::StorFilestorConfig::ResponseSequencerType::THROUGHPUT: + case StorFilestorConfig::ResponseSequencerType::THROUGHPUT: return vespalib::Executor::OptimizeFor::THROUGHPUT; - case vespa::config::content::StorFilestorConfig::ResponseSequencerType::LATENCY: + case StorFilestorConfig::ResponseSequencerType::LATENCY: return vespalib::Executor::OptimizeFor::LATENCY; - case vespa::config::content::StorFilestorConfig::ResponseSequencerType::ADAPTIVE: + case StorFilestorConfig::ResponseSequencerType::ADAPTIVE: default: return vespalib::Executor::OptimizeFor::ADAPTIVE; } } +std::unique_ptr<SharedOperationThrottler> +make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads) +{ + const bool use_dynamic_throttling = (config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC); + if (use_dynamic_throttling) { + auto config_win_size_incr = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1); + auto win_size_increment = std::max(static_cast<size_t>(config_win_size_incr), num_threads); + return SharedOperationThrottler::make_dynamic_throttler(win_size_increment); + } else { + return SharedOperationThrottler::make_unlimited_throttler(); + } +} + #ifdef __PIC__ #define TLS_LINKAGE __attribute__((visibility("hidden"), tls_model("initial-exec"))) #else @@ -185,7 +199,7 @@ FileStorManager::getThreadLocalHandler() { * incoming during reconfiguration */ void -FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config) +FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) { // If true, this is not the first configure. bool liveUpdate = ! _threads.empty(); @@ -198,8 +212,10 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC size_t numThreads = _config->numThreads; size_t numStripes = std::max(size_t(1u), numThreads / 2); _metrics->initDiskMetrics(numStripes, computeAllPossibleHandlerThreads(*_config)); + auto operation_throttler = make_operation_throttler_from_config(*_config, numThreads); - _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, _compReg); + _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, + _compReg, std::move(operation_throttler)); uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads); _sequencedExecutor = vespalib::SequencedTaskExecutor::create(response_executor, numResponseThreads, 10000, selectSequencer(_config->responseSequencerType)); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp index c119fdc4f69..a98077da57a 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp @@ -203,6 +203,7 @@ FileStorDiskMetrics::FileStorDiskMetrics(const std::string& name, const std::str averageQueueWaitingTime("averagequeuewait.sum", {}, "Average time an operation spends in input queue.", this), queueSize("queuesize", {}, "Size of input message queue.", this), pendingMerges("pendingmerge", {}, "Number of buckets currently being merged.", this), + throttle_window_size("throttlewindowsize", {}, "Current size of async operation throttler window size", this), waitingForLockHitRate("waitingforlockrate", {}, "Amount of times a filestor thread has needed to wait for " "lock to take next message in queue.", this), diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h index 7543e6e0771..d8135c9aeca 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h @@ -147,6 +147,7 @@ public: metrics::DoubleAverageMetric averageQueueWaitingTime; metrics::LongAverageMetric queueSize; metrics::LongAverageMetric pendingMerges; + metrics::LongAverageMetric throttle_window_size; metrics::DoubleAverageMetric waitingForLockHitRate; metrics::DoubleAverageMetric lockWaitTime; // unused ActiveOperationsMetrics active_operations; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index b9739fcf734..7dcf4bcbee2 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -2,6 +2,7 @@ #include "mergehandler.h" #include "persistenceutil.h" +#include "shared_operation_throttler.h" #include "apply_bucket_diff_entry_complete.h" #include "apply_bucket_diff_state.h" #include <vespa/storage/persistence/filestorage/mergestatus.h> @@ -32,6 +33,7 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _cluster_context(cluster_context), _env(env), _spi(spi), + _operation_throttler(_env._fileStorHandler.operation_throttler()), _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), @@ -514,17 +516,22 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results spi::Context& context, const document::DocumentTypeRepo& repo) const { + auto throttle_token = _operation_throttler.blocking_acquire_one(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry Document::SP doc(deserializeDiffDocument(e, repo)); DocumentId docId = doc->getId(); - auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), + std::move(throttle_token), "put", + _clock, _env._metrics.merge_handler_metrics.put_latency); _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete)); } else { std::vector<spi::PersistenceProvider::TimeStampAndDocumentId> ids; ids.emplace_back(timestamp, e._docName); - auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, + std::move(throttle_token), "remove", + _clock, _env._metrics.merge_handler_metrics.remove_latency); _spi.removeAsync(bucket, std::move(ids), context, std::move(complete)); } } diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index f52fe63bc2b..1007f35c241 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -34,6 +34,7 @@ namespace spi { class PersistenceUtil; class ApplyBucketDiffState; class MergeStatus; +class SharedOperationThrottler; class MergeHandler : public Types, public MergeBucketInfoSyncer { @@ -52,7 +53,7 @@ public: uint32_t commonMergeChainOptimalizationMinimumSize = 64, bool async_apply_bucket_diff = false); - ~MergeHandler(); + ~MergeHandler() override; bool buildBucketInfoList( const spi::Bucket& bucket, @@ -86,6 +87,7 @@ private: const ClusterContext &_cluster_context; PersistenceUtil &_env; spi::PersistenceProvider &_spi; + SharedOperationThrottler& _operation_throttler; std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 8b546771b71..c0c95ffd7af 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -158,12 +158,13 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP void PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const { - LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get()); - api::StorageMessage & msg(*lock.second); + LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.msg.get()); + api::StorageMessage & msg(*lock.msg); // Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains // valid even if the tracker is destroyed by an exception in processMessage(). - auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler, std::move(lock.first), lock.second); + auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler, + std::move(lock.lock), lock.msg, std::move(lock.throttle_token)); tracker = processMessage(msg, std::move(tracker)); if (tracker) { tracker->sendReply(); diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index f9da4d63d7f..499e9807cbf 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -38,7 +38,7 @@ PersistenceThread::run(framework::ThreadHandle& thread) FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId)); - if (lock.first) { + if (lock.lock) { _persistenceHandler.processLockedMessage(std::move(lock)); } } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index cbfc9463a8c..65eab99b8fb 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -31,19 +31,22 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, - api::StorageMessage::SP msg) - : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg)) + api::StorageMessage::SP msg, + SharedOperationThrottler::Token throttle_token) + : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token)) {} MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, FileStorHandler::BucketLockInterface::SP bucketLock, - api::StorageMessage::SP msg) + api::StorageMessage::SP msg, + SharedOperationThrottler::Token throttle_token) : _sendReply(true), _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())), _bucketLock(std::move(bucketLock)), _msg(std::move(msg)), + _throttle_token(std::move(throttle_token)), _context(_msg->getPriority(), _msg->getTrace().getLevel()), _env(env), _replySender(replySender), @@ -56,7 +59,8 @@ MessageTracker::UP MessageTracker::createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil &env, MessageSender &replySender, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) { - return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), std::move(msg))); + return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), + std::move(msg), SharedOperationThrottler::Token())); } void diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 4fd0e60c730..588cbef2170 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -30,7 +30,8 @@ public: using UP = std::unique_ptr<MessageTracker>; MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, - FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg); + FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg, + SharedOperationThrottler::Token throttle_token); ~MessageTracker(); @@ -91,7 +92,8 @@ public: private: MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, - FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg); + FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg, + SharedOperationThrottler::Token throttle_token); [[nodiscard]] bool count_result_as_failure() const noexcept; @@ -99,6 +101,7 @@ private: bool _updateBucketInfo; FileStorHandler::BucketLockInterface::SP _bucketLock; std::shared_ptr<api::StorageMessage> _msg; + SharedOperationThrottler::Token _throttle_token; spi::Context _context; const PersistenceUtil &_env; MessageSender &_replySender; diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp new file mode 100644 index 00000000000..b72b1a8ba28 --- /dev/null +++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp @@ -0,0 +1,191 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "shared_operation_throttler.h" +#include <vespa/messagebus/dynamicthrottlepolicy.h> +#include <vespa/messagebus/message.h> +#include <condition_variable> +#include <cassert> +#include <mutex> + +namespace storage { + +namespace { + +class NoLimitsOperationThrottler final : public SharedOperationThrottler { +public: + ~NoLimitsOperationThrottler() override = default; + Token blocking_acquire_one() noexcept override { + return Token(this, TokenCtorTag{}); + } + Token blocking_acquire_one(vespalib::duration) noexcept override { + return Token(this, TokenCtorTag{}); + } + Token try_acquire_one() noexcept override { + return Token(this, TokenCtorTag{}); + } + uint32_t current_window_size() const noexcept override { return 0; } + uint32_t waiting_threads() const noexcept override { return 0; } +private: + void release_one() noexcept override { /* no-op */ } +}; + +// Class used to sneakily get around IThrottlePolicy only accepting MBus objects +template <typename Base> +class DummyMbusMessage final : public Base { + static const mbus::string NAME; +public: + const mbus::string& getProtocol() const override { return NAME; } + uint32_t getType() const override { return 0x1badb007; } + uint8_t priority() const override { return 255; } +}; + +template <typename Base> +const mbus::string DummyMbusMessage<Base>::NAME = "FooBar"; + +class DynamicOperationThrottler final : public SharedOperationThrottler { + mutable std::mutex _mutex; + std::condition_variable _cond; + mbus::DynamicThrottlePolicy _throttle_policy; + uint32_t _pending_ops; + uint32_t _waiting_threads; +public: + explicit DynamicOperationThrottler(uint32_t min_size_and_window_increment); + ~DynamicOperationThrottler() override; + + Token blocking_acquire_one() noexcept override; + Token blocking_acquire_one(vespalib::duration timeout) noexcept override; + Token try_acquire_one() noexcept override; + uint32_t current_window_size() const noexcept override; + uint32_t waiting_threads() const noexcept override; +private: + void release_one() noexcept override; +}; + +DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment) + : _mutex(), + _cond(), + _throttle_policy(static_cast<double>(min_size_and_window_increment)), + _pending_ops(0), + _waiting_threads(0) +{ +} + +DynamicOperationThrottler::~DynamicOperationThrottler() = default; + +DynamicOperationThrottler::Token +DynamicOperationThrottler::blocking_acquire_one() noexcept +{ + std::unique_lock lock(_mutex); + DummyMbusMessage<mbus::Message> dummy_msg; + if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) { + ++_waiting_threads; + _cond.wait(lock, [&] { + return _throttle_policy.canSend(dummy_msg, _pending_ops); + }); + --_waiting_threads; + } + _throttle_policy.processMessage(dummy_msg); + ++_pending_ops; + return Token(this, TokenCtorTag{}); +} + +DynamicOperationThrottler::Token +DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept +{ + std::unique_lock lock(_mutex); + DummyMbusMessage<mbus::Message> dummy_msg; + if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) { + ++_waiting_threads; + const bool accepted = _cond.wait_for(lock, timeout, [&] { + return _throttle_policy.canSend(dummy_msg, _pending_ops); + }); + --_waiting_threads; + if (!accepted) { + return Token(); + } + } + _throttle_policy.processMessage(dummy_msg); + ++_pending_ops; + return Token(this, TokenCtorTag{}); +} + +DynamicOperationThrottler::Token +DynamicOperationThrottler::try_acquire_one() noexcept +{ + std::unique_lock lock(_mutex); + DummyMbusMessage<mbus::Message> dummy_msg; + if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) { + return Token(); + } + _throttle_policy.processMessage(dummy_msg); + ++_pending_ops; + return Token(this, TokenCtorTag{}); +} + +void +DynamicOperationThrottler::release_one() noexcept +{ + std::unique_lock lock(_mutex); + DummyMbusMessage<mbus::Reply> dummy_reply; + _throttle_policy.processReply(dummy_reply); + assert(_pending_ops > 0); + --_pending_ops; + if (_waiting_threads > 0) { + lock.unlock(); + _cond.notify_one(); + } +} + +uint32_t +DynamicOperationThrottler::current_window_size() const noexcept +{ + std::unique_lock lock(_mutex); + return _throttle_policy.getMaxPendingCount(); // Actually returns current window size +} + +uint32_t +DynamicOperationThrottler::waiting_threads() const noexcept +{ + std::unique_lock lock(_mutex); + return _waiting_threads; +} + +} + +std::unique_ptr<SharedOperationThrottler> +SharedOperationThrottler::make_unlimited_throttler() +{ + return std::make_unique<NoLimitsOperationThrottler>(); +} + +std::unique_ptr<SharedOperationThrottler> +SharedOperationThrottler::make_dynamic_throttler(uint32_t min_size_and_window_increment) +{ + return std::make_unique<DynamicOperationThrottler>(min_size_and_window_increment); +} + +DynamicOperationThrottler::Token::~Token() +{ + if (_throttler) { + _throttler->release_one(); + } +} + +void +DynamicOperationThrottler::Token::reset() noexcept +{ + if (_throttler) { + _throttler->release_one(); + _throttler = nullptr; + } +} + +DynamicOperationThrottler::Token& +DynamicOperationThrottler::Token::operator=(Token&& rhs) noexcept +{ + reset(); + _throttler = rhs._throttler; + rhs._throttler = nullptr; + return *this; +} + +} diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h new file mode 100644 index 00000000000..2e1de86c4b8 --- /dev/null +++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h @@ -0,0 +1,71 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/util/time.h> +#include <memory> +#include <optional> + +namespace storage { + +/** + * Operation throttler that is intended to provide global throttling of + * async operations across all persistence stripe threads. A throttler + * wraps a logical max pending window size of in-flight operations. Depending + * on the throttler implementation, the window size may expand and shrink + * dynamically. Exactly how and when this happens is unspecified. + * + * Offers both polling and (timed, non-timed) blocking calls for acquiring + * a throttle token. If the returned token is valid, the caller may proceed + * to invoke the asynchronous operation. + * + * The window slot taken up by a valid throttle token is implicitly freed up + * when the token is destroyed. + * + * All operations on the throttler are thread safe. + */ +class SharedOperationThrottler { +protected: + struct TokenCtorTag {}; // Make available to subclasses for token construction. +public: + class Token { + SharedOperationThrottler* _throttler; + public: + constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {} + constexpr Token() noexcept : _throttler(nullptr) {} + constexpr Token(Token&& rhs) noexcept + : _throttler(rhs._throttler) + { + rhs._throttler = nullptr; + } + Token& operator=(Token&& rhs) noexcept; + ~Token(); + + Token(const Token&) = delete; + Token& operator=(const Token&) = delete; + + [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); } + void reset() noexcept; + }; + + virtual ~SharedOperationThrottler() = default; + + // All methods are thread safe + [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0; + [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0; + [[nodiscard]] virtual Token try_acquire_one() noexcept = 0; + + // May return 0, in which case the window size is unlimited. + [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0; + + // Exposed for unit testing only. + [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0; + + static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler(); + + static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment); +private: + // Exclusively called from a valid Token. Thread safe. + virtual void release_one() noexcept = 0; +}; + +} |