diff options
25 files changed, 280 insertions, 432 deletions
diff --git a/config-model-api/abi-spec.json b/config-model-api/abi-spec.json index cac9d21ee1f..d946dd972f4 100644 --- a/config-model-api/abi-spec.json +++ b/config-model-api/abi-spec.json @@ -194,9 +194,10 @@ "public" ], "methods": [ - "public void <init>(com.yahoo.config.provision.InstanceName, java.util.List, com.yahoo.config.application.api.DeploymentSpec$UpgradePolicy, com.yahoo.config.application.api.DeploymentSpec$UpgradeRollout, java.util.List, java.util.Optional, java.util.Optional, com.yahoo.config.application.api.Notifications, java.util.List, java.time.Instant)", + "public void <init>(com.yahoo.config.provision.InstanceName, java.util.List, com.yahoo.config.application.api.DeploymentSpec$UpgradePolicy, com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision, com.yahoo.config.application.api.DeploymentSpec$UpgradeRollout, java.util.List, java.util.Optional, java.util.Optional, com.yahoo.config.application.api.Notifications, java.util.List, java.time.Instant)", "public com.yahoo.config.provision.InstanceName name()", "public com.yahoo.config.application.api.DeploymentSpec$UpgradePolicy upgradePolicy()", + "public com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision upgradeRevision()", "public com.yahoo.config.application.api.DeploymentSpec$UpgradeRollout upgradeRollout()", "public java.util.List changeBlocker()", "public java.util.Optional globalServiceId()", @@ -365,6 +366,23 @@ "public static final enum com.yahoo.config.application.api.DeploymentSpec$UpgradePolicy conservative" ] }, + "com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision": { + "superClass": "java.lang.Enum", + "interfaces": [], + "attributes": [ + "public", + "final", + "enum" + ], + "methods": [ + "public static com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision[] values()", + "public static com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision valueOf(java.lang.String)" + ], + "fields": [ + "public static final enum com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision separate", + "public static final enum com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision latest" + ] + }, "com.yahoo.config.application.api.DeploymentSpec$UpgradeRollout": { "superClass": "java.lang.Enum", "interfaces": [], diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java index 67ddb9ef83c..ea38860c29b 100644 --- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java @@ -31,6 +31,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { private final InstanceName name; private final DeploymentSpec.UpgradePolicy upgradePolicy; + private final DeploymentSpec.UpgradeRevision upgradeRevision; private final DeploymentSpec.UpgradeRollout upgradeRollout; private final List<DeploymentSpec.ChangeBlocker> changeBlockers; private final Optional<String> globalServiceId; @@ -41,6 +42,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { public DeploymentInstanceSpec(InstanceName name, List<DeploymentSpec.Step> steps, DeploymentSpec.UpgradePolicy upgradePolicy, + DeploymentSpec.UpgradeRevision upgradeRevision, DeploymentSpec.UpgradeRollout upgradeRollout, List<DeploymentSpec.ChangeBlocker> changeBlockers, Optional<String> globalServiceId, @@ -51,6 +53,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { super(steps); this.name = name; this.upgradePolicy = upgradePolicy; + this.upgradeRevision = upgradeRevision; this.upgradeRollout = upgradeRollout; this.changeBlockers = changeBlockers; this.globalServiceId = globalServiceId; @@ -150,6 +153,9 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { /** Returns the upgrade policy of this, which is defaultPolicy if none is specified */ public DeploymentSpec.UpgradePolicy upgradePolicy() { return upgradePolicy; } + /** Returns the upgrade revision strategy of this, which is separate if none is specified */ + public DeploymentSpec.UpgradeRevision upgradeRevision() { return upgradeRevision; } + /** Returns the upgrade rollout strategy of this, which is separate if none is specified */ public DeploymentSpec.UpgradeRollout upgradeRollout() { return upgradeRollout; } @@ -198,6 +204,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { DeploymentInstanceSpec other = (DeploymentInstanceSpec) o; return globalServiceId.equals(other.globalServiceId) && upgradePolicy == other.upgradePolicy && + upgradeRevision == other.upgradeRevision && upgradeRollout == other.upgradeRollout && changeBlockers.equals(other.changeBlockers) && steps().equals(other.steps()) && @@ -208,7 +215,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps { @Override public int hashCode() { - return Objects.hash(globalServiceId, upgradePolicy, upgradeRollout, changeBlockers, steps(), athenzService, notifications, endpoints); + return Objects.hash(globalServiceId, upgradePolicy, upgradeRevision, upgradeRollout, changeBlockers, steps(), athenzService, notifications, endpoints); } @Override diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java index 88363db6e49..4b019bd9f7a 100644 --- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java @@ -550,6 +550,15 @@ public class DeploymentSpec { } + /** Determines when application changes deploy, when an older revision is already rolling out. */ + public enum UpgradeRevision { + /** Separate: Application changes wait for previous application changes to complete, unless they fail. */ + separate, + /** Latest: Application changes immediately supersede previous application changes, unless currently blocked. */ + latest + } + + /** Determines when application changes deploy, when there is already an ongoing platform upgrade. */ public enum UpgradeRollout { /** Separate: Application changes wait for upgrade to complete, unless upgrade fails. */ diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java b/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java index 8f866654d56..b031af9faf2 100644 --- a/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java @@ -165,6 +165,7 @@ public class DeploymentSpecXmlReader { // Values where the parent may provide a default DeploymentSpec.UpgradePolicy upgradePolicy = readUpgradePolicy(instanceTag, parentTag); + DeploymentSpec.UpgradeRevision upgradeRevision = readUpgradeRevision(instanceTag, parentTag); DeploymentSpec.UpgradeRollout upgradeRollout = readUpgradeRollout(instanceTag, parentTag); List<DeploymentSpec.ChangeBlocker> changeBlockers = readChangeBlockers(instanceTag, parentTag); Optional<AthenzService> athenzService = mostSpecificAttribute(instanceTag, athenzServiceAttribute).map(AthenzService::from); @@ -183,6 +184,7 @@ public class DeploymentSpecXmlReader { .map(name -> new DeploymentInstanceSpec(InstanceName.from(name), steps, upgradePolicy, + upgradeRevision, upgradeRollout, changeBlockers, globalServiceId.asOptional(), @@ -472,6 +474,25 @@ public class DeploymentSpecXmlReader { } } + private DeploymentSpec.UpgradeRevision readUpgradeRevision(Element parent, Element fallbackParent) { + Element upgradeElement = XML.getChild(parent, upgradeTag); + if (upgradeElement == null) + upgradeElement = XML.getChild(fallbackParent, upgradeTag); + if (upgradeElement == null) + return DeploymentSpec.UpgradeRevision.separate; + + String revision = upgradeElement.getAttribute("revision"); + if (revision.isEmpty()) + return DeploymentSpec.UpgradeRevision.separate; + + switch (revision) { + case "separate": return DeploymentSpec.UpgradeRevision.separate; + case "latest": return DeploymentSpec.UpgradeRevision.latest; + default: throw new IllegalArgumentException("Illegal upgrade revision '" + revision + "': " + + "Must be one of " + Arrays.toString(DeploymentSpec.UpgradeRevision.values())); + } + } + private DeploymentSpec.UpgradeRollout readUpgradeRollout(Element parent, Element fallbackParent) { Element upgradeElement = XML.getChild(parent, upgradeTag); if (upgradeElement == null) @@ -487,8 +508,8 @@ public class DeploymentSpecXmlReader { case "separate": return DeploymentSpec.UpgradeRollout.separate; case "leading": return DeploymentSpec.UpgradeRollout.leading; // case "simultaneous": return DeploymentSpec.UpgradePolicy.conservative; - default: throw new IllegalArgumentException("Illegal upgrade policy '" + rollout + "': " + - "Must be one of " + Arrays.toString(DeploymentSpec.UpgradePolicy.values())); + default: throw new IllegalArgumentException("Illegal upgrade rollout '" + rollout + "': " + + "Must be one of " + Arrays.toString(DeploymentSpec.UpgradeRollout.values())); } } diff --git a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java index 2fa2ba83291..a97faf5995d 100644 --- a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java +++ b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java @@ -121,6 +121,7 @@ public class DeploymentSpecTest { assertFalse(spec.requireInstance("default").globalServiceId().isPresent()); assertEquals(DeploymentSpec.UpgradePolicy.defaultPolicy, spec.requireInstance("default").upgradePolicy()); + assertEquals(DeploymentSpec.UpgradeRevision.separate, spec.requireInstance("default").upgradeRevision()); assertEquals(DeploymentSpec.UpgradeRollout.separate, spec.requireInstance("default").upgradeRollout()); } @@ -364,6 +365,21 @@ public class DeploymentSpecTest { } @Test + public void productionSpecWithUpgradeRevision() { + StringReader r = new StringReader( + "<deployment>" + + " <instance id='default'>" + + " <upgrade revision='latest' />" + + " </instance>" + + " <instance id='custom'/>" + + "</deployment>" + ); + DeploymentSpec spec = DeploymentSpec.fromXml(r); + assertEquals("latest", spec.requireInstance("default").upgradeRevision().toString()); + assertEquals("separate", spec.requireInstance("custom").upgradeRevision().toString()); + } + + @Test public void productionSpecWithUpgradeRollout() { StringReader r = new StringReader( "<deployment>" + @@ -397,10 +413,10 @@ public class DeploymentSpecTest { public void upgradePolicyDefault() { StringReader r = new StringReader( "<deployment version='1.0'>" + - " <upgrade policy='canary' rollout='leading'/>" + + " <upgrade policy='canary' rollout='leading' revision='latest' />" + " <instance id='instance1'/>" + " <instance id='instance2'>" + - " <upgrade policy='conservative' rollout='separate'/>" + + " <upgrade policy='conservative' rollout='separate' revision='separate'/>" + " </instance>" + "</deployment>" ); @@ -408,6 +424,8 @@ public class DeploymentSpecTest { DeploymentSpec spec = DeploymentSpec.fromXml(r); assertEquals("canary", spec.requireInstance("instance1").upgradePolicy().toString()); assertEquals("conservative", spec.requireInstance("instance2").upgradePolicy().toString()); + assertEquals("latest", spec.requireInstance("instance1").upgradeRevision().toString()); + assertEquals("separate", spec.requireInstance("instance2").upgradeRevision().toString()); assertEquals("leading", spec.requireInstance("instance1").upgradeRollout().toString()); assertEquals("separate", spec.requireInstance("instance2").upgradeRollout().toString()); } diff --git a/config-model/src/main/resources/schema/deployment.rnc b/config-model/src/main/resources/schema/deployment.rnc index 819e6b79fbb..3e751a379d4 100644 --- a/config-model/src/main/resources/schema/deployment.rnc +++ b/config-model/src/main/resources/schema/deployment.rnc @@ -52,6 +52,7 @@ ParallelInstances = element parallel { Upgrade = element upgrade { attribute policy { xsd:string }? & + attribute revision { xsd:string }? & attribute rollout { xsd:string }? } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Instance.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Instance.java index 3fe5240ce34..cc68d47666d 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Instance.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Instance.java @@ -188,7 +188,7 @@ public class Instance { return change; } - /** Returns the application version that last completedd roll-out to this instance. */ + /** Returns the application version that last rolled out to this instance. */ public Optional<ApplicationVersion> latestDeployed() { return latestDeployed; } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java index d3f2d4e4501..9790ece421e 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java @@ -364,11 +364,13 @@ public class DeploymentTrigger { // ---------- Change management o_O ---------- private boolean acceptNewApplicationVersion(DeploymentStatus status, InstanceName instance) { - if (status.application().require(instance).change().application().isPresent()) return true; // Replacing a previous application change is ok. - if (status.hasFailures()) return true; // Allow changes to fix upgrade problems. - if (status.application().deploymentSpec().instance(instance) // Leading upgrade allows app change to join in. - .map(spec -> spec.upgradeRollout() == DeploymentSpec.UpgradeRollout.leading).orElse(false)) return true; - return status.application().require(instance).change().platform().isEmpty(); + if (status.application().deploymentSpec().instance(instance).isEmpty()) return false; // Unknown instance. + if (status.hasFailures()) return true; // Allow changes to fix upgrade or previous revision problems. + DeploymentInstanceSpec spec = status.application().deploymentSpec().requireInstance(instance); + Change change = status.application().require(instance).change(); + if (change.platform().isPresent() && spec.upgradeRollout() == DeploymentSpec.UpgradeRollout.separate) return false; + if (change.application().isPresent() && spec.upgradeRevision() == DeploymentSpec.UpgradeRevision.separate) return false; + return true; } private Instance withRemainingChange(Instance instance, Change change, DeploymentStatus status) { diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ApplicationPackageBuilder.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ApplicationPackageBuilder.java index a338efd856c..bc5a70b1fa0 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ApplicationPackageBuilder.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ApplicationPackageBuilder.java @@ -57,6 +57,7 @@ public class ApplicationPackageBuilder { private OptionalInt majorVersion = OptionalInt.empty(); private String instances = "default"; private String upgradePolicy = null; + private String upgradeRevision = "latest"; private String upgradeRollout = null; private String globalServiceId = null; private String athenzIdentityAttributes = "athenz-domain='domain' athenz-service='service'"; @@ -80,6 +81,11 @@ public class ApplicationPackageBuilder { return this; } + public ApplicationPackageBuilder upgradeRevision(String upgradeRevision) { + this.upgradeRevision = upgradeRevision; + return this; + } + public ApplicationPackageBuilder upgradeRollout(String upgradeRollout) { this.upgradeRollout = upgradeRollout; return this; @@ -253,9 +259,10 @@ public class ApplicationPackageBuilder { } xml.append(">\n"); xml.append(" <instance id='").append(instances).append("'>\n"); - if (upgradePolicy != null || upgradeRollout != null) { + if (upgradePolicy != null || upgradeRevision != null || upgradeRollout != null) { xml.append(" <upgrade "); if (upgradePolicy != null) xml.append("policy='").append(upgradePolicy).append("' "); + if (upgradeRevision != null) xml.append("revision='").append(upgradeRevision).append("' "); if (upgradeRollout != null) xml.append("rollout='").append(upgradeRollout).append("' "); xml.append("/>\n"); } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTriggerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTriggerTest.java index 52521775ddd..cd1e12312a8 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTriggerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTriggerTest.java @@ -107,6 +107,40 @@ public class DeploymentTriggerTest { } @Test + public void separateRevisionMakesApplicationChangeWaitForPreviousToComplete() { + DeploymentContext app = tester.newDeploymentContext(); + ApplicationPackage applicationPackage = new ApplicationPackageBuilder() + .upgradeRevision(null) // separate by default, but we override this in test builder + .region("us-east-3") + .test("us-east-3") + .build(); + + app.submit(applicationPackage).runJob(systemTest).runJob(stagingTest).runJob(productionUsEast3); + Optional<ApplicationVersion> v0 = app.lastSubmission(); + + app.submit(applicationPackage); + Optional<ApplicationVersion> v1 = app.lastSubmission(); + assertEquals(v0, app.instance().change().application()); + + // Eager tests still run before new revision rolls out. + app.runJob(systemTest).runJob(stagingTest); + + // v0 rolls out completely. + app.runJob(testUsEast3); + assertEquals(Optional.empty(), app.instance().change().application()); + + // v1 starts rolling when v0 is done. + tester.outstandingChangeDeployer().run(); + assertEquals(v1, app.instance().change().application()); + + // v1 fails, so v2 starts immediately. + app.runJob(productionUsEast3).failDeployment(testUsEast3); + app.submit(applicationPackage); + Optional<ApplicationVersion> v2 = app.lastSubmission(); + assertEquals(v2, app.instance().change().application()); + } + + @Test public void leadingUpgradeAllowsApplicationChangeWhileUpgrading() { var applicationPackage = new ApplicationPackageBuilder().region("us-east-3") .upgradeRollout("leading") @@ -826,6 +860,7 @@ public class DeploymentTriggerTest { DeploymentContext i4 = tester.newDeploymentContext("t", "a", "i4"); ApplicationPackage applicationPackage = ApplicationPackageBuilder .fromDeploymentXml("<deployment version='1'>\n" + + " <upgrade revision='separate' />\n" + " <parallel>\n" + " <instance id='i1'>\n" + " <prod>\n" + @@ -910,8 +945,7 @@ public class DeploymentTriggerTest { tester.clock().advance(Duration.ofHours(3)); // v1 is all done in i1 and i2, but does not yet roll out in i3; v2 is not completely rolled out there yet. - // TODO jonmv: thie belowh new revision policy, but must be faked for now, as v1 would not wait for v0 to complete. - //tester.outstandingChangeDeployer().run(); + tester.outstandingChangeDeployer().run(); assertEquals(v0, i3.instance().change().application()); // i3 completes v0, which rolls out to i4; v1 is ready for i3, but v2 is not. diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt index fb8120210c1..7b165e11b66 100644 --- a/storage/src/tests/persistence/CMakeLists.txt +++ b/storage/src/tests/persistence/CMakeLists.txt @@ -12,7 +12,6 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST persistencethread_splittest.cpp processalltest.cpp provider_error_wrapper_test.cpp - shared_operation_throttler_test.cpp splitbitdetectortest.cpp testandsettest.cpp gtest_runner.cpp diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp deleted file mode 100644 index 0ad380937c7..00000000000 --- a/storage/src/tests/persistence/shared_operation_throttler_test.cpp +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/storage/persistence/shared_operation_throttler.h> -#include <vespa/vespalib/gtest/gtest.h> -#include <vespa/vespalib/util/barrier.h> -#include <chrono> -#include <thread> - -using namespace ::testing; - -namespace storage { - -using ThrottleToken = SharedOperationThrottler::Token; - -TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) { - // We technically can't test that the unlimited throttler _never_ throttles, but at - // least check that it doesn't throttle _twice_, and then induce from this ;) - auto throttler = SharedOperationThrottler::make_unlimited_throttler(); - auto token1 = throttler->try_acquire_one(); - EXPECT_TRUE(token1.valid()); - auto token2 = throttler->blocking_acquire_one(); - EXPECT_TRUE(token2.valid()); - // Window size should be zero (i.e. unlimited) for unlimited throttler - EXPECT_EQ(throttler->current_window_size(), 0); -} - -TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto token1 = throttler->try_acquire_one(); - EXPECT_TRUE(token1.valid()); - auto token2 = throttler->try_acquire_one(); - EXPECT_FALSE(token2.valid()); - - EXPECT_EQ(throttler->current_window_size(), 1); -} - -TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto token = throttler->blocking_acquire_one(); - EXPECT_TRUE(token.valid()); - token.reset(); - token = throttler->blocking_acquire_one(600s); // Should never block. - EXPECT_TRUE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - vespalib::Barrier barrier(2); - std::thread t([&] { - auto token = throttler->try_acquire_one(); - assert(token.valid()); - barrier.await(); - while (throttler->waiting_threads() != 1) { - std::this_thread::sleep_for(100us); - } - // Implicit token release at thread scope exit - }); - barrier.await(); - auto token = throttler->blocking_acquire_one(); - EXPECT_TRUE(token.valid()); - t.join(); -} - -TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto window_filling_token = throttler->try_acquire_one(); - auto before = std::chrono::steady_clock::now(); - // Will block for at least 1ms. Since no window slot will be available by that time, - // an invalid token should be returned. - auto token = throttler->blocking_acquire_one(1ms); - auto after = std::chrono::steady_clock::now(); - EXPECT_TRUE((after - before) >= 1ms); - EXPECT_FALSE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) { - ThrottleToken token; - EXPECT_FALSE(token.valid()); - token.reset(); // no-op - EXPECT_FALSE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - { - auto token = throttler->try_acquire_one(); - EXPECT_TRUE(token.valid()); - } - auto token = throttler->try_acquire_one(); - EXPECT_TRUE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto token1 = throttler->try_acquire_one(); - auto token2 = std::move(token1); // move ctor - EXPECT_TRUE(token2.valid()); - EXPECT_FALSE(token1.valid()); - ThrottleToken token3; - token3 = std::move(token2); // move assignment op - EXPECT_TRUE(token3.valid()); - EXPECT_FALSE(token2.valid()); - - // Trying to fetch new token should not succeed due to active token and win size of 1 - token1 = throttler->try_acquire_one(); - EXPECT_FALSE(token1.valid()); - // Resetting the token should free up the slot in the window - token3.reset(); - token1 = throttler->try_acquire_one(); - EXPECT_TRUE(token1.valid()); -} - -// TODO ideally we'd test that the dynamic throttler has a window size that is actually -// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so -// it's not trivial to know how to do this reliably. - -} diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index 5e068236026..c737d2bed28 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -14,7 +14,6 @@ vespa_add_library(storage_spersistence OBJECT persistenceutil.cpp processallhandler.cpp provider_error_wrapper.cpp - shared_operation_throttler.cpp simplemessagehandler.cpp splitbitdetector.cpp splitjoinhandler.cpp diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp index 73ccc7f6085..69d35253aa6 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp @@ -9,7 +9,7 @@ namespace storage { ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, - SharedOperationThrottler::Token throttle_token, + ThrottleToken throttle_token, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric) diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h index 8478cab4c17..a78fbe38ae5 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h @@ -22,14 +22,14 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete const spi::ResultHandler* _result_handler; std::shared_ptr<ApplyBucketDiffState> _state; document::DocumentId _doc_id; - SharedOperationThrottler::Token _throttle_token; + ThrottleToken _throttle_token; const char* _op; framework::MilliSecTimer _start_time; metrics::DoubleAverageMetric& _latency_metric; public: ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, - SharedOperationThrottler::Token throttle_token, + ThrottleToken throttle_token, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric); ~ApplyBucketDiffEntryComplete() override; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 6f740ce2c28..66dc7126058 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -78,7 +78,7 @@ public: struct LockedMessage { std::shared_ptr<BucketLockInterface> lock; std::shared_ptr<api::StorageMessage> msg; - SharedOperationThrottler::Token throttle_token; + ThrottleToken throttle_token; LockedMessage() noexcept = default; LockedMessage(std::shared_ptr<BucketLockInterface> lock_, @@ -89,7 +89,7 @@ public: {} LockedMessage(std::shared_ptr<BucketLockInterface> lock_, std::shared_ptr<api::StorageMessage> msg_, - SharedOperationThrottler::Token token) noexcept + ThrottleToken token) noexcept : lock(std::move(lock_)), msg(std::move(msg_)), throttle_token(std::move(token)) @@ -274,7 +274,7 @@ public: virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0; - virtual SharedOperationThrottler& operation_throttler() const noexcept = 0; + virtual vespalib::SharedOperationThrottler& operation_throttler() const noexcept = 0; }; } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 2ccbc7a85ef..f7d4f750884 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -40,14 +40,14 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg) - : FileStorHandlerImpl(1, 1, sender, metrics, compReg, SharedOperationThrottler::make_unlimited_throttler()) + : FileStorHandlerImpl(1, 1, sender, metrics, compReg, vespalib::SharedOperationThrottler::make_unlimited_throttler()) { } FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg, - std::unique_ptr<SharedOperationThrottler> operation_throttler) + std::unique_ptr<vespalib::SharedOperationThrottler> operation_throttler) : _component(compReg, "filestorhandlerimpl"), _state(FileStorHandler::AVAILABLE), _metrics(nullptr), @@ -920,7 +920,7 @@ FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) { std::unique_lock guard(*_lock); - SharedOperationThrottler::Token throttle_token; + ThrottleToken throttle_token; // Try to grab a message+lock, immediately retrying once after a wait // if none can be found and then exiting if the same is the case on the // second attempt. This is key to allowing the run loop to register @@ -997,7 +997,7 @@ FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard) FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter, - SharedOperationThrottler::Token throttle_token) + ThrottleToken throttle_token) { std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime))); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index c4b85ac596c..698f52359f5 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -149,7 +149,7 @@ public: // with its locking requirements. FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter, - SharedOperationThrottler::Token throttle_token); + ThrottleToken throttle_token); using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>; const FileStorHandlerImpl &_owner; MessageSender &_messageSender; @@ -191,7 +191,7 @@ public: FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg); FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&, - ServiceLayerComponentRegister&, std::unique_ptr<SharedOperationThrottler>); + ServiceLayerComponentRegister&, std::unique_ptr<vespalib::SharedOperationThrottler>); ~FileStorHandlerImpl() override; void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; } @@ -243,7 +243,7 @@ public: ResumeGuard pause() override; void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override; - SharedOperationThrottler& operation_throttler() const noexcept override { + vespalib::SharedOperationThrottler& operation_throttler() const noexcept override { return *_operation_throttler; } @@ -257,7 +257,7 @@ private: ServiceLayerComponent _component; std::atomic<DiskState> _state; FileStorDiskMetrics * _metrics; - std::unique_ptr<SharedOperationThrottler> _operation_throttler; + std::unique_ptr<vespalib::SharedOperationThrottler> _operation_throttler; std::vector<Stripe> _stripes; MessageSender& _messageSender; const document::BucketIdFactory& _bucketIdFactory; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 530d96dfe3f..03b16e75297 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -143,16 +143,19 @@ selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) { } } -std::unique_ptr<SharedOperationThrottler> +std::unique_ptr<vespalib::SharedOperationThrottler> make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads) { const bool use_dynamic_throttling = (config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC); if (use_dynamic_throttling) { auto config_win_size_incr = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1); auto win_size_increment = std::max(static_cast<size_t>(config_win_size_incr), num_threads); - return SharedOperationThrottler::make_dynamic_throttler(win_size_increment); + vespalib::SharedOperationThrottler::DynamicThrottleParams params; + params.window_size_increment = win_size_increment; + params.min_window_size = win_size_increment; + return vespalib::SharedOperationThrottler::make_dynamic_throttler(params); } else { - return SharedOperationThrottler::make_unlimited_throttler(); + return vespalib::SharedOperationThrottler::make_unlimited_throttler(); } } diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index e1c821aab48..ee6eed63eb5 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -22,7 +22,10 @@ #include <vespa/vespalib/util/monitored_refcount.h> #include <atomic> -namespace vespalib { class ISequencedTaskExecutor; } +namespace vespalib { +class ISequencedTaskExecutor; +class SharedOperationThrottler; +} namespace storage { @@ -34,7 +37,6 @@ namespace spi { class PersistenceUtil; class ApplyBucketDiffState; class MergeStatus; -class SharedOperationThrottler; class MergeHandler : public Types, public MergeBucketInfoSyncer { @@ -85,7 +87,7 @@ private: const ClusterContext &_cluster_context; PersistenceUtil &_env; spi::PersistenceProvider &_spi; - SharedOperationThrottler& _operation_throttler; + vespalib::SharedOperationThrottler& _operation_throttler; std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 65eab99b8fb..2781cc61b83 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -32,7 +32,7 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg, - SharedOperationThrottler::Token throttle_token) + ThrottleToken throttle_token) : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token)) {} MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, @@ -41,7 +41,7 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, bool updateBucketInfo, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg, - SharedOperationThrottler::Token throttle_token) + ThrottleToken throttle_token) : _sendReply(true), _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())), _bucketLock(std::move(bucketLock)), @@ -60,7 +60,7 @@ MessageTracker::createForTesting(const framework::MilliSecTimer & timer, Persist FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) { return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), - std::move(msg), SharedOperationThrottler::Token())); + std::move(msg), ThrottleToken())); } void diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 588cbef2170..4130a276239 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -31,7 +31,7 @@ public: MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg, - SharedOperationThrottler::Token throttle_token); + ThrottleToken throttle_token); ~MessageTracker(); @@ -93,7 +93,7 @@ public: private: MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg, - SharedOperationThrottler::Token throttle_token); + ThrottleToken throttle_token); [[nodiscard]] bool count_result_as_failure() const noexcept; @@ -101,7 +101,7 @@ private: bool _updateBucketInfo; FileStorHandler::BucketLockInterface::SP _bucketLock; std::shared_ptr<api::StorageMessage> _msg; - SharedOperationThrottler::Token _throttle_token; + ThrottleToken _throttle_token; spi::Context _context; const PersistenceUtil &_env; MessageSender &_replySender; diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp deleted file mode 100644 index 7db1a0ccdbb..00000000000 --- a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp +++ /dev/null @@ -1,201 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "shared_operation_throttler.h" -#include <vespa/messagebus/dynamicthrottlepolicy.h> -#include <vespa/storage/common/dummy_mbus_messages.h> -#include <condition_variable> -#include <cassert> -#include <mutex> - -namespace storage { - -namespace { - -class NoLimitsOperationThrottler final : public SharedOperationThrottler { -public: - ~NoLimitsOperationThrottler() override = default; - Token blocking_acquire_one() noexcept override { - return Token(this, TokenCtorTag{}); - } - Token blocking_acquire_one(vespalib::duration) noexcept override { - return Token(this, TokenCtorTag{}); - } - Token try_acquire_one() noexcept override { - return Token(this, TokenCtorTag{}); - } - uint32_t current_window_size() const noexcept override { return 0; } - uint32_t waiting_threads() const noexcept override { return 0; } -private: - void release_one() noexcept override { /* no-op */ } -}; - -class DynamicOperationThrottler final : public SharedOperationThrottler { - mutable std::mutex _mutex; - std::condition_variable _cond; - mbus::DynamicThrottlePolicy _throttle_policy; - uint32_t _pending_ops; - uint32_t _waiting_threads; -public: - explicit DynamicOperationThrottler(uint32_t min_size_and_window_increment); - ~DynamicOperationThrottler() override; - - Token blocking_acquire_one() noexcept override; - Token blocking_acquire_one(vespalib::duration timeout) noexcept override; - Token try_acquire_one() noexcept override; - uint32_t current_window_size() const noexcept override; - uint32_t waiting_threads() const noexcept override; -private: - void release_one() noexcept override; - // Non-const since actually checking the send window of a dynamic throttler might change - // it if enough time has passed. - [[nodiscard]] bool has_spare_capacity_in_active_window() noexcept; - void add_one_to_active_window_size(); - void subtract_one_from_active_window_size(); -}; - -DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment) - : _mutex(), - _cond(), - _throttle_policy(static_cast<double>(min_size_and_window_increment)), - _pending_ops(0), - _waiting_threads(0) -{ - _throttle_policy.setWindowSizeDecrementFactor(1.2); - _throttle_policy.setWindowSizeBackOff(0.95); -} - -DynamicOperationThrottler::~DynamicOperationThrottler() = default; - -bool -DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept -{ - DummyMbusRequest dummy_request; - return _throttle_policy.canSend(dummy_request, _pending_ops); -} - -void -DynamicOperationThrottler::add_one_to_active_window_size() -{ - DummyMbusRequest dummy_request; - _throttle_policy.processMessage(dummy_request); - ++_pending_ops; -} - -void -DynamicOperationThrottler::subtract_one_from_active_window_size() -{ - DummyMbusReply dummy_reply; - _throttle_policy.processReply(dummy_reply); - assert(_pending_ops > 0); - --_pending_ops; -} - -DynamicOperationThrottler::Token -DynamicOperationThrottler::blocking_acquire_one() noexcept -{ - std::unique_lock lock(_mutex); - if (!has_spare_capacity_in_active_window()) { - ++_waiting_threads; - _cond.wait(lock, [&] { - return has_spare_capacity_in_active_window(); - }); - --_waiting_threads; - } - add_one_to_active_window_size(); - return Token(this, TokenCtorTag{}); -} - -DynamicOperationThrottler::Token -DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept -{ - std::unique_lock lock(_mutex); - if (!has_spare_capacity_in_active_window()) { - ++_waiting_threads; - const bool accepted = _cond.wait_for(lock, timeout, [&] { - return has_spare_capacity_in_active_window(); - }); - --_waiting_threads; - if (!accepted) { - return Token(); - } - } - add_one_to_active_window_size(); - return Token(this, TokenCtorTag{}); -} - -DynamicOperationThrottler::Token -DynamicOperationThrottler::try_acquire_one() noexcept -{ - std::unique_lock lock(_mutex); - if (!has_spare_capacity_in_active_window()) { - return Token(); - } - add_one_to_active_window_size(); - return Token(this, TokenCtorTag{}); -} - -void -DynamicOperationThrottler::release_one() noexcept -{ - std::unique_lock lock(_mutex); - subtract_one_from_active_window_size(); - // Only wake up a waiting thread if doing so would possibly result in success. - if ((_waiting_threads > 0) && has_spare_capacity_in_active_window()) { - lock.unlock(); - _cond.notify_one(); - } -} - -uint32_t -DynamicOperationThrottler::current_window_size() const noexcept -{ - std::unique_lock lock(_mutex); - return _throttle_policy.getMaxPendingCount(); // Actually returns current window size -} - -uint32_t -DynamicOperationThrottler::waiting_threads() const noexcept -{ - std::unique_lock lock(_mutex); - return _waiting_threads; -} - -} - -std::unique_ptr<SharedOperationThrottler> -SharedOperationThrottler::make_unlimited_throttler() -{ - return std::make_unique<NoLimitsOperationThrottler>(); -} - -std::unique_ptr<SharedOperationThrottler> -SharedOperationThrottler::make_dynamic_throttler(uint32_t min_size_and_window_increment) -{ - return std::make_unique<DynamicOperationThrottler>(min_size_and_window_increment); -} - -DynamicOperationThrottler::Token::~Token() -{ - if (_throttler) { - _throttler->release_one(); - } -} - -void -DynamicOperationThrottler::Token::reset() noexcept -{ - if (_throttler) { - _throttler->release_one(); - _throttler = nullptr; - } -} - -DynamicOperationThrottler::Token& -DynamicOperationThrottler::Token::operator=(Token&& rhs) noexcept -{ - reset(); - _throttler = rhs._throttler; - rhs._throttler = nullptr; - return *this; -} - -} diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h index 4ee8d017c05..b829f077bcb 100644 --- a/storage/src/vespa/storage/persistence/shared_operation_throttler.h +++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h @@ -1,72 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/vespalib/util/time.h> -#include <memory> -#include <optional> +#include <vespa/vespalib/util/shared_operation_throttler.h> namespace storage { -/** - * Operation throttler that is intended to provide global throttling of - * async operations across all persistence stripe threads. A throttler - * wraps a logical max pending window size of in-flight operations. Depending - * on the throttler implementation, the window size may expand and shrink - * dynamically. Exactly how and when this happens is unspecified. - * - * Offers both polling and (timed, non-timed) blocking calls for acquiring - * a throttle token. If the returned token is valid, the caller may proceed - * to invoke the asynchronous operation. - * - * The window slot taken up by a valid throttle token is implicitly freed up - * when the token is destroyed. - * - * All operations on the throttler are thread safe. - */ -class SharedOperationThrottler { -protected: - struct TokenCtorTag {}; // Make available to subclasses for token construction. -public: - class Token { - SharedOperationThrottler* _throttler; - public: - constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {} - constexpr Token() noexcept : _throttler(nullptr) {} - constexpr Token(Token&& rhs) noexcept - : _throttler(rhs._throttler) - { - rhs._throttler = nullptr; - } - Token& operator=(Token&& rhs) noexcept; - ~Token(); - - Token(const Token&) = delete; - Token& operator=(const Token&) = delete; - - [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); } - void reset() noexcept; - }; - - virtual ~SharedOperationThrottler() = default; - - // All methods are thread safe - [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0; - [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0; - [[nodiscard]] virtual Token try_acquire_one() noexcept = 0; - - // May return 0, in which case the window size is unlimited. - [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0; - - // Exposed for unit testing only. - [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0; - - // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking) - static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler(); - // Creates a throttler that uses a MessageBus DynamicThrottlePolicy under the hood - static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment); -private: - // Exclusively called from a valid Token. Thread safe. - virtual void release_one() noexcept = 0; -}; +using ThrottleToken = vespalib::SharedOperationThrottler::Token; } diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp index 61cd2b5ef44..d9b6ae7f908 100644 --- a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp +++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp @@ -1,20 +1,127 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/util/shared_operation_throttler.h> #include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/util/barrier.h> +#include <thread> namespace vespalib { +using ThrottleToken = SharedOperationThrottler::Token; + +struct DynamicThrottleFixture { + std::unique_ptr<SharedOperationThrottler> _throttler; + + DynamicThrottleFixture() { + SharedOperationThrottler::DynamicThrottleParams params; + params.window_size_increment = 1; + params.min_window_size = 1; + _throttler = SharedOperationThrottler::make_dynamic_throttler(params); + } +}; + +TEST("unlimited throttler does not throttle") { + // We technically can't test that the unlimited throttler _never_ throttles, but at + // least check that it doesn't throttle _twice_, and then induce from this ;) + auto throttler = SharedOperationThrottler::make_unlimited_throttler(); + auto token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); + auto token2 = throttler->blocking_acquire_one(); + EXPECT_TRUE(token2.valid()); + // Window size should be zero (i.e. unlimited) for unlimited throttler + EXPECT_EQUAL(throttler->current_window_size(), 0u); +} + +TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture()) { + auto token1 = f1._throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); + auto token2 = f1._throttler->try_acquire_one(); + EXPECT_FALSE(token2.valid()); + + EXPECT_EQUAL(f1._throttler->current_window_size(), 1u); +} + +TEST_F("blocking acquire returns immediately if slot available", DynamicThrottleFixture()) { + auto token = f1._throttler->blocking_acquire_one(); + EXPECT_TRUE(token.valid()); + token.reset(); + token = f1._throttler->blocking_acquire_one(600s); // Should never block. + EXPECT_TRUE(token.valid()); +} + +TEST_F("blocking call woken up if throttle slot available", DynamicThrottleFixture()) { + vespalib::Barrier barrier(2); + std::thread t([&] { + auto token = f1._throttler->try_acquire_one(); + assert(token.valid()); + barrier.await(); + while (f1._throttler->waiting_threads() != 1) { + std::this_thread::sleep_for(100us); + } + // Implicit token release at thread scope exit + }); + barrier.await(); + auto token = f1._throttler->blocking_acquire_one(); + EXPECT_TRUE(token.valid()); + t.join(); +} + +TEST_F("time-bounded blocking acquire waits for timeout", DynamicThrottleFixture()) { + auto window_filling_token = f1._throttler->try_acquire_one(); + auto before = std::chrono::steady_clock::now(); + // Will block for at least 1ms. Since no window slot will be available by that time, + // an invalid token should be returned. + auto token = f1._throttler->blocking_acquire_one(1ms); + auto after = std::chrono::steady_clock::now(); + EXPECT_TRUE((after - before) >= 1ms); + EXPECT_FALSE(token.valid()); +} + +TEST("default constructed token is invalid") { + ThrottleToken token; + EXPECT_FALSE(token.valid()); + token.reset(); // no-op + EXPECT_FALSE(token.valid()); +} + +TEST_F("token destruction frees up throttle window slot", DynamicThrottleFixture()) { + { + auto token = f1._throttler->try_acquire_one(); + EXPECT_TRUE(token.valid()); + } + auto token = f1._throttler->try_acquire_one(); + EXPECT_TRUE(token.valid()); +} + +TEST_F("token can be moved and reset", DynamicThrottleFixture()) { + auto token1 = f1._throttler->try_acquire_one(); + auto token2 = std::move(token1); // move ctor + EXPECT_TRUE(token2.valid()); + EXPECT_FALSE(token1.valid()); + ThrottleToken token3; + token3 = std::move(token2); // move assignment op + EXPECT_TRUE(token3.valid()); + EXPECT_FALSE(token2.valid()); + + // Trying to fetch new token should not succeed due to active token and win size of 1 + token1 = f1._throttler->try_acquire_one(); + EXPECT_FALSE(token1.valid()); + // Resetting the token should free up the slot in the window + token3.reset(); + token1 = f1._throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); +} + // Note on test semantics: these tests are adapted from a subset of the MessageBus // throttling tests. Some tests have been simplified due to no longer having access // to the low-level DynamicThrottlePolicy API. -struct Fixture { +struct WindowFixture { uint64_t _milli_time; std::unique_ptr<SharedOperationThrottler> _throttler; - Fixture(uint32_t window_size_increment = 5, - uint32_t min_window_size = 20, - uint32_t max_window_size = INT_MAX) + WindowFixture(uint32_t window_size_increment = 5, + uint32_t min_window_size = 20, + uint32_t max_window_size = INT_MAX) : _milli_time(0), _throttler() { @@ -57,7 +164,7 @@ struct Fixture { } }; -TEST_F("window size changes dynamically based on throughput", Fixture()) { +TEST_F("window size changes dynamically based on throughput", WindowFixture()) { uint32_t window_size = f1.attempt_converge_on_stable_window_size(100); ASSERT_TRUE(window_size >= 90 && window_size <= 105); @@ -74,7 +181,7 @@ TEST_F("window size changes dynamically based on throughput", Fixture()) { ASSERT_TRUE(window_size >= 90 && window_size <= 115); } -TEST_F("window size is reset after idle time period", Fixture(5, 1)) { +TEST_F("window size is reset after idle time period", WindowFixture(5, 1)) { double window_size = f1.attempt_converge_on_stable_window_size(100); ASSERT_TRUE(window_size >= 90 && window_size <= 110); @@ -88,12 +195,12 @@ TEST_F("window size is reset after idle time period", Fixture(5, 1)) { EXPECT_EQUAL(tokens.size(), 1u); // Reduced to minimum window size } -TEST_F("minimum window size is respected", Fixture(5, 150, INT_MAX)) { +TEST_F("minimum window size is respected", WindowFixture(5, 150, INT_MAX)) { double window_size = f1.attempt_converge_on_stable_window_size(200); ASSERT_TRUE(window_size >= 150 && window_size <= 210); } -TEST_F("maximum window size is respected", Fixture(5, 1, 50)) { +TEST_F("maximum window size is respected", WindowFixture(5, 1, 50)) { double window_size = f1.attempt_converge_on_stable_window_size(100); ASSERT_TRUE(window_size >= 40 && window_size <= 50); } |