summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config-model-api/abi-spec.json20
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java9
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java9
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java25
-rw-r--r--config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java22
-rw-r--r--config-model/src/main/resources/schema/deployment.rnc1
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Instance.java2
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java12
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ApplicationPackageBuilder.java9
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTriggerTest.java38
-rw-r--r--storage/src/tests/persistence/CMakeLists.txt1
-rw-r--r--storage/src/tests/persistence/shared_operation_throttler_test.cpp116
-rw-r--r--storage/src/vespa/storage/persistence/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h6
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h8
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h8
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h6
-rw-r--r--storage/src/vespa/storage/persistence/shared_operation_throttler.cpp201
-rw-r--r--storage/src/vespa/storage/persistence/shared_operation_throttler.h66
-rw-r--r--vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp123
25 files changed, 280 insertions, 432 deletions
diff --git a/config-model-api/abi-spec.json b/config-model-api/abi-spec.json
index cac9d21ee1f..d946dd972f4 100644
--- a/config-model-api/abi-spec.json
+++ b/config-model-api/abi-spec.json
@@ -194,9 +194,10 @@
"public"
],
"methods": [
- "public void <init>(com.yahoo.config.provision.InstanceName, java.util.List, com.yahoo.config.application.api.DeploymentSpec$UpgradePolicy, com.yahoo.config.application.api.DeploymentSpec$UpgradeRollout, java.util.List, java.util.Optional, java.util.Optional, com.yahoo.config.application.api.Notifications, java.util.List, java.time.Instant)",
+ "public void <init>(com.yahoo.config.provision.InstanceName, java.util.List, com.yahoo.config.application.api.DeploymentSpec$UpgradePolicy, com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision, com.yahoo.config.application.api.DeploymentSpec$UpgradeRollout, java.util.List, java.util.Optional, java.util.Optional, com.yahoo.config.application.api.Notifications, java.util.List, java.time.Instant)",
"public com.yahoo.config.provision.InstanceName name()",
"public com.yahoo.config.application.api.DeploymentSpec$UpgradePolicy upgradePolicy()",
+ "public com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision upgradeRevision()",
"public com.yahoo.config.application.api.DeploymentSpec$UpgradeRollout upgradeRollout()",
"public java.util.List changeBlocker()",
"public java.util.Optional globalServiceId()",
@@ -365,6 +366,23 @@
"public static final enum com.yahoo.config.application.api.DeploymentSpec$UpgradePolicy conservative"
]
},
+ "com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision": {
+ "superClass": "java.lang.Enum",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "final",
+ "enum"
+ ],
+ "methods": [
+ "public static com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision[] values()",
+ "public static com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision valueOf(java.lang.String)"
+ ],
+ "fields": [
+ "public static final enum com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision separate",
+ "public static final enum com.yahoo.config.application.api.DeploymentSpec$UpgradeRevision latest"
+ ]
+ },
"com.yahoo.config.application.api.DeploymentSpec$UpgradeRollout": {
"superClass": "java.lang.Enum",
"interfaces": [],
diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java
index 67ddb9ef83c..ea38860c29b 100644
--- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java
+++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentInstanceSpec.java
@@ -31,6 +31,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
private final InstanceName name;
private final DeploymentSpec.UpgradePolicy upgradePolicy;
+ private final DeploymentSpec.UpgradeRevision upgradeRevision;
private final DeploymentSpec.UpgradeRollout upgradeRollout;
private final List<DeploymentSpec.ChangeBlocker> changeBlockers;
private final Optional<String> globalServiceId;
@@ -41,6 +42,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
public DeploymentInstanceSpec(InstanceName name,
List<DeploymentSpec.Step> steps,
DeploymentSpec.UpgradePolicy upgradePolicy,
+ DeploymentSpec.UpgradeRevision upgradeRevision,
DeploymentSpec.UpgradeRollout upgradeRollout,
List<DeploymentSpec.ChangeBlocker> changeBlockers,
Optional<String> globalServiceId,
@@ -51,6 +53,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
super(steps);
this.name = name;
this.upgradePolicy = upgradePolicy;
+ this.upgradeRevision = upgradeRevision;
this.upgradeRollout = upgradeRollout;
this.changeBlockers = changeBlockers;
this.globalServiceId = globalServiceId;
@@ -150,6 +153,9 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
/** Returns the upgrade policy of this, which is defaultPolicy if none is specified */
public DeploymentSpec.UpgradePolicy upgradePolicy() { return upgradePolicy; }
+ /** Returns the upgrade revision strategy of this, which is separate if none is specified */
+ public DeploymentSpec.UpgradeRevision upgradeRevision() { return upgradeRevision; }
+
/** Returns the upgrade rollout strategy of this, which is separate if none is specified */
public DeploymentSpec.UpgradeRollout upgradeRollout() { return upgradeRollout; }
@@ -198,6 +204,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
DeploymentInstanceSpec other = (DeploymentInstanceSpec) o;
return globalServiceId.equals(other.globalServiceId) &&
upgradePolicy == other.upgradePolicy &&
+ upgradeRevision == other.upgradeRevision &&
upgradeRollout == other.upgradeRollout &&
changeBlockers.equals(other.changeBlockers) &&
steps().equals(other.steps()) &&
@@ -208,7 +215,7 @@ public class DeploymentInstanceSpec extends DeploymentSpec.Steps {
@Override
public int hashCode() {
- return Objects.hash(globalServiceId, upgradePolicy, upgradeRollout, changeBlockers, steps(), athenzService, notifications, endpoints);
+ return Objects.hash(globalServiceId, upgradePolicy, upgradeRevision, upgradeRollout, changeBlockers, steps(), athenzService, notifications, endpoints);
}
@Override
diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
index 88363db6e49..4b019bd9f7a 100644
--- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
+++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
@@ -550,6 +550,15 @@ public class DeploymentSpec {
}
+ /** Determines when application changes deploy, when an older revision is already rolling out. */
+ public enum UpgradeRevision {
+ /** Separate: Application changes wait for previous application changes to complete, unless they fail. */
+ separate,
+ /** Latest: Application changes immediately supersede previous application changes, unless currently blocked. */
+ latest
+ }
+
+
/** Determines when application changes deploy, when there is already an ongoing platform upgrade. */
public enum UpgradeRollout {
/** Separate: Application changes wait for upgrade to complete, unless upgrade fails. */
diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java b/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java
index 8f866654d56..b031af9faf2 100644
--- a/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java
+++ b/config-model-api/src/main/java/com/yahoo/config/application/api/xml/DeploymentSpecXmlReader.java
@@ -165,6 +165,7 @@ public class DeploymentSpecXmlReader {
// Values where the parent may provide a default
DeploymentSpec.UpgradePolicy upgradePolicy = readUpgradePolicy(instanceTag, parentTag);
+ DeploymentSpec.UpgradeRevision upgradeRevision = readUpgradeRevision(instanceTag, parentTag);
DeploymentSpec.UpgradeRollout upgradeRollout = readUpgradeRollout(instanceTag, parentTag);
List<DeploymentSpec.ChangeBlocker> changeBlockers = readChangeBlockers(instanceTag, parentTag);
Optional<AthenzService> athenzService = mostSpecificAttribute(instanceTag, athenzServiceAttribute).map(AthenzService::from);
@@ -183,6 +184,7 @@ public class DeploymentSpecXmlReader {
.map(name -> new DeploymentInstanceSpec(InstanceName.from(name),
steps,
upgradePolicy,
+ upgradeRevision,
upgradeRollout,
changeBlockers,
globalServiceId.asOptional(),
@@ -472,6 +474,25 @@ public class DeploymentSpecXmlReader {
}
}
+ private DeploymentSpec.UpgradeRevision readUpgradeRevision(Element parent, Element fallbackParent) {
+ Element upgradeElement = XML.getChild(parent, upgradeTag);
+ if (upgradeElement == null)
+ upgradeElement = XML.getChild(fallbackParent, upgradeTag);
+ if (upgradeElement == null)
+ return DeploymentSpec.UpgradeRevision.separate;
+
+ String revision = upgradeElement.getAttribute("revision");
+ if (revision.isEmpty())
+ return DeploymentSpec.UpgradeRevision.separate;
+
+ switch (revision) {
+ case "separate": return DeploymentSpec.UpgradeRevision.separate;
+ case "latest": return DeploymentSpec.UpgradeRevision.latest;
+ default: throw new IllegalArgumentException("Illegal upgrade revision '" + revision + "': " +
+ "Must be one of " + Arrays.toString(DeploymentSpec.UpgradeRevision.values()));
+ }
+ }
+
private DeploymentSpec.UpgradeRollout readUpgradeRollout(Element parent, Element fallbackParent) {
Element upgradeElement = XML.getChild(parent, upgradeTag);
if (upgradeElement == null)
@@ -487,8 +508,8 @@ public class DeploymentSpecXmlReader {
case "separate": return DeploymentSpec.UpgradeRollout.separate;
case "leading": return DeploymentSpec.UpgradeRollout.leading;
// case "simultaneous": return DeploymentSpec.UpgradePolicy.conservative;
- default: throw new IllegalArgumentException("Illegal upgrade policy '" + rollout + "': " +
- "Must be one of " + Arrays.toString(DeploymentSpec.UpgradePolicy.values()));
+ default: throw new IllegalArgumentException("Illegal upgrade rollout '" + rollout + "': " +
+ "Must be one of " + Arrays.toString(DeploymentSpec.UpgradeRollout.values()));
}
}
diff --git a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java
index 2fa2ba83291..a97faf5995d 100644
--- a/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java
+++ b/config-model-api/src/test/java/com/yahoo/config/application/api/DeploymentSpecTest.java
@@ -121,6 +121,7 @@ public class DeploymentSpecTest {
assertFalse(spec.requireInstance("default").globalServiceId().isPresent());
assertEquals(DeploymentSpec.UpgradePolicy.defaultPolicy, spec.requireInstance("default").upgradePolicy());
+ assertEquals(DeploymentSpec.UpgradeRevision.separate, spec.requireInstance("default").upgradeRevision());
assertEquals(DeploymentSpec.UpgradeRollout.separate, spec.requireInstance("default").upgradeRollout());
}
@@ -364,6 +365,21 @@ public class DeploymentSpecTest {
}
@Test
+ public void productionSpecWithUpgradeRevision() {
+ StringReader r = new StringReader(
+ "<deployment>" +
+ " <instance id='default'>" +
+ " <upgrade revision='latest' />" +
+ " </instance>" +
+ " <instance id='custom'/>" +
+ "</deployment>"
+ );
+ DeploymentSpec spec = DeploymentSpec.fromXml(r);
+ assertEquals("latest", spec.requireInstance("default").upgradeRevision().toString());
+ assertEquals("separate", spec.requireInstance("custom").upgradeRevision().toString());
+ }
+
+ @Test
public void productionSpecWithUpgradeRollout() {
StringReader r = new StringReader(
"<deployment>" +
@@ -397,10 +413,10 @@ public class DeploymentSpecTest {
public void upgradePolicyDefault() {
StringReader r = new StringReader(
"<deployment version='1.0'>" +
- " <upgrade policy='canary' rollout='leading'/>" +
+ " <upgrade policy='canary' rollout='leading' revision='latest' />" +
" <instance id='instance1'/>" +
" <instance id='instance2'>" +
- " <upgrade policy='conservative' rollout='separate'/>" +
+ " <upgrade policy='conservative' rollout='separate' revision='separate'/>" +
" </instance>" +
"</deployment>"
);
@@ -408,6 +424,8 @@ public class DeploymentSpecTest {
DeploymentSpec spec = DeploymentSpec.fromXml(r);
assertEquals("canary", spec.requireInstance("instance1").upgradePolicy().toString());
assertEquals("conservative", spec.requireInstance("instance2").upgradePolicy().toString());
+ assertEquals("latest", spec.requireInstance("instance1").upgradeRevision().toString());
+ assertEquals("separate", spec.requireInstance("instance2").upgradeRevision().toString());
assertEquals("leading", spec.requireInstance("instance1").upgradeRollout().toString());
assertEquals("separate", spec.requireInstance("instance2").upgradeRollout().toString());
}
diff --git a/config-model/src/main/resources/schema/deployment.rnc b/config-model/src/main/resources/schema/deployment.rnc
index 819e6b79fbb..3e751a379d4 100644
--- a/config-model/src/main/resources/schema/deployment.rnc
+++ b/config-model/src/main/resources/schema/deployment.rnc
@@ -52,6 +52,7 @@ ParallelInstances = element parallel {
Upgrade = element upgrade {
attribute policy { xsd:string }? &
+ attribute revision { xsd:string }? &
attribute rollout { xsd:string }?
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Instance.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Instance.java
index 3fe5240ce34..cc68d47666d 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Instance.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Instance.java
@@ -188,7 +188,7 @@ public class Instance {
return change;
}
- /** Returns the application version that last completedd roll-out to this instance. */
+ /** Returns the application version that last rolled out to this instance. */
public Optional<ApplicationVersion> latestDeployed() {
return latestDeployed;
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java
index d3f2d4e4501..9790ece421e 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java
@@ -364,11 +364,13 @@ public class DeploymentTrigger {
// ---------- Change management o_O ----------
private boolean acceptNewApplicationVersion(DeploymentStatus status, InstanceName instance) {
- if (status.application().require(instance).change().application().isPresent()) return true; // Replacing a previous application change is ok.
- if (status.hasFailures()) return true; // Allow changes to fix upgrade problems.
- if (status.application().deploymentSpec().instance(instance) // Leading upgrade allows app change to join in.
- .map(spec -> spec.upgradeRollout() == DeploymentSpec.UpgradeRollout.leading).orElse(false)) return true;
- return status.application().require(instance).change().platform().isEmpty();
+ if (status.application().deploymentSpec().instance(instance).isEmpty()) return false; // Unknown instance.
+ if (status.hasFailures()) return true; // Allow changes to fix upgrade or previous revision problems.
+ DeploymentInstanceSpec spec = status.application().deploymentSpec().requireInstance(instance);
+ Change change = status.application().require(instance).change();
+ if (change.platform().isPresent() && spec.upgradeRollout() == DeploymentSpec.UpgradeRollout.separate) return false;
+ if (change.application().isPresent() && spec.upgradeRevision() == DeploymentSpec.UpgradeRevision.separate) return false;
+ return true;
}
private Instance withRemainingChange(Instance instance, Change change, DeploymentStatus status) {
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ApplicationPackageBuilder.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ApplicationPackageBuilder.java
index a338efd856c..bc5a70b1fa0 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ApplicationPackageBuilder.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ApplicationPackageBuilder.java
@@ -57,6 +57,7 @@ public class ApplicationPackageBuilder {
private OptionalInt majorVersion = OptionalInt.empty();
private String instances = "default";
private String upgradePolicy = null;
+ private String upgradeRevision = "latest";
private String upgradeRollout = null;
private String globalServiceId = null;
private String athenzIdentityAttributes = "athenz-domain='domain' athenz-service='service'";
@@ -80,6 +81,11 @@ public class ApplicationPackageBuilder {
return this;
}
+ public ApplicationPackageBuilder upgradeRevision(String upgradeRevision) {
+ this.upgradeRevision = upgradeRevision;
+ return this;
+ }
+
public ApplicationPackageBuilder upgradeRollout(String upgradeRollout) {
this.upgradeRollout = upgradeRollout;
return this;
@@ -253,9 +259,10 @@ public class ApplicationPackageBuilder {
}
xml.append(">\n");
xml.append(" <instance id='").append(instances).append("'>\n");
- if (upgradePolicy != null || upgradeRollout != null) {
+ if (upgradePolicy != null || upgradeRevision != null || upgradeRollout != null) {
xml.append(" <upgrade ");
if (upgradePolicy != null) xml.append("policy='").append(upgradePolicy).append("' ");
+ if (upgradeRevision != null) xml.append("revision='").append(upgradeRevision).append("' ");
if (upgradeRollout != null) xml.append("rollout='").append(upgradeRollout).append("' ");
xml.append("/>\n");
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTriggerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTriggerTest.java
index 52521775ddd..cd1e12312a8 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTriggerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTriggerTest.java
@@ -107,6 +107,40 @@ public class DeploymentTriggerTest {
}
@Test
+ public void separateRevisionMakesApplicationChangeWaitForPreviousToComplete() {
+ DeploymentContext app = tester.newDeploymentContext();
+ ApplicationPackage applicationPackage = new ApplicationPackageBuilder()
+ .upgradeRevision(null) // separate by default, but we override this in test builder
+ .region("us-east-3")
+ .test("us-east-3")
+ .build();
+
+ app.submit(applicationPackage).runJob(systemTest).runJob(stagingTest).runJob(productionUsEast3);
+ Optional<ApplicationVersion> v0 = app.lastSubmission();
+
+ app.submit(applicationPackage);
+ Optional<ApplicationVersion> v1 = app.lastSubmission();
+ assertEquals(v0, app.instance().change().application());
+
+ // Eager tests still run before new revision rolls out.
+ app.runJob(systemTest).runJob(stagingTest);
+
+ // v0 rolls out completely.
+ app.runJob(testUsEast3);
+ assertEquals(Optional.empty(), app.instance().change().application());
+
+ // v1 starts rolling when v0 is done.
+ tester.outstandingChangeDeployer().run();
+ assertEquals(v1, app.instance().change().application());
+
+ // v1 fails, so v2 starts immediately.
+ app.runJob(productionUsEast3).failDeployment(testUsEast3);
+ app.submit(applicationPackage);
+ Optional<ApplicationVersion> v2 = app.lastSubmission();
+ assertEquals(v2, app.instance().change().application());
+ }
+
+ @Test
public void leadingUpgradeAllowsApplicationChangeWhileUpgrading() {
var applicationPackage = new ApplicationPackageBuilder().region("us-east-3")
.upgradeRollout("leading")
@@ -826,6 +860,7 @@ public class DeploymentTriggerTest {
DeploymentContext i4 = tester.newDeploymentContext("t", "a", "i4");
ApplicationPackage applicationPackage = ApplicationPackageBuilder
.fromDeploymentXml("<deployment version='1'>\n" +
+ " <upgrade revision='separate' />\n" +
" <parallel>\n" +
" <instance id='i1'>\n" +
" <prod>\n" +
@@ -910,8 +945,7 @@ public class DeploymentTriggerTest {
tester.clock().advance(Duration.ofHours(3));
// v1 is all done in i1 and i2, but does not yet roll out in i3; v2 is not completely rolled out there yet.
- // TODO jonmv: thie belowh new revision policy, but must be faked for now, as v1 would not wait for v0 to complete.
- //tester.outstandingChangeDeployer().run();
+ tester.outstandingChangeDeployer().run();
assertEquals(v0, i3.instance().change().application());
// i3 completes v0, which rolls out to i4; v1 is ready for i3, but v2 is not.
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index fb8120210c1..7b165e11b66 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -12,7 +12,6 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST
persistencethread_splittest.cpp
processalltest.cpp
provider_error_wrapper_test.cpp
- shared_operation_throttler_test.cpp
splitbitdetectortest.cpp
testandsettest.cpp
gtest_runner.cpp
diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
deleted file mode 100644
index 0ad380937c7..00000000000
--- a/storage/src/tests/persistence/shared_operation_throttler_test.cpp
+++ /dev/null
@@ -1,116 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/storage/persistence/shared_operation_throttler.h>
-#include <vespa/vespalib/gtest/gtest.h>
-#include <vespa/vespalib/util/barrier.h>
-#include <chrono>
-#include <thread>
-
-using namespace ::testing;
-
-namespace storage {
-
-using ThrottleToken = SharedOperationThrottler::Token;
-
-TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) {
- // We technically can't test that the unlimited throttler _never_ throttles, but at
- // least check that it doesn't throttle _twice_, and then induce from this ;)
- auto throttler = SharedOperationThrottler::make_unlimited_throttler();
- auto token1 = throttler->try_acquire_one();
- EXPECT_TRUE(token1.valid());
- auto token2 = throttler->blocking_acquire_one();
- EXPECT_TRUE(token2.valid());
- // Window size should be zero (i.e. unlimited) for unlimited throttler
- EXPECT_EQ(throttler->current_window_size(), 0);
-}
-
-TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto token1 = throttler->try_acquire_one();
- EXPECT_TRUE(token1.valid());
- auto token2 = throttler->try_acquire_one();
- EXPECT_FALSE(token2.valid());
-
- EXPECT_EQ(throttler->current_window_size(), 1);
-}
-
-TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto token = throttler->blocking_acquire_one();
- EXPECT_TRUE(token.valid());
- token.reset();
- token = throttler->blocking_acquire_one(600s); // Should never block.
- EXPECT_TRUE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- vespalib::Barrier barrier(2);
- std::thread t([&] {
- auto token = throttler->try_acquire_one();
- assert(token.valid());
- barrier.await();
- while (throttler->waiting_threads() != 1) {
- std::this_thread::sleep_for(100us);
- }
- // Implicit token release at thread scope exit
- });
- barrier.await();
- auto token = throttler->blocking_acquire_one();
- EXPECT_TRUE(token.valid());
- t.join();
-}
-
-TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto window_filling_token = throttler->try_acquire_one();
- auto before = std::chrono::steady_clock::now();
- // Will block for at least 1ms. Since no window slot will be available by that time,
- // an invalid token should be returned.
- auto token = throttler->blocking_acquire_one(1ms);
- auto after = std::chrono::steady_clock::now();
- EXPECT_TRUE((after - before) >= 1ms);
- EXPECT_FALSE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) {
- ThrottleToken token;
- EXPECT_FALSE(token.valid());
- token.reset(); // no-op
- EXPECT_FALSE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- {
- auto token = throttler->try_acquire_one();
- EXPECT_TRUE(token.valid());
- }
- auto token = throttler->try_acquire_one();
- EXPECT_TRUE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto token1 = throttler->try_acquire_one();
- auto token2 = std::move(token1); // move ctor
- EXPECT_TRUE(token2.valid());
- EXPECT_FALSE(token1.valid());
- ThrottleToken token3;
- token3 = std::move(token2); // move assignment op
- EXPECT_TRUE(token3.valid());
- EXPECT_FALSE(token2.valid());
-
- // Trying to fetch new token should not succeed due to active token and win size of 1
- token1 = throttler->try_acquire_one();
- EXPECT_FALSE(token1.valid());
- // Resetting the token should free up the slot in the window
- token3.reset();
- token1 = throttler->try_acquire_one();
- EXPECT_TRUE(token1.valid());
-}
-
-// TODO ideally we'd test that the dynamic throttler has a window size that is actually
-// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so
-// it's not trivial to know how to do this reliably.
-
-}
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index 5e068236026..c737d2bed28 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -14,7 +14,6 @@ vespa_add_library(storage_spersistence OBJECT
persistenceutil.cpp
processallhandler.cpp
provider_error_wrapper.cpp
- shared_operation_throttler.cpp
simplemessagehandler.cpp
splitbitdetector.cpp
splitjoinhandler.cpp
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
index 73ccc7f6085..69d35253aa6 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
@@ -9,7 +9,7 @@ namespace storage {
ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state,
document::DocumentId doc_id,
- SharedOperationThrottler::Token throttle_token,
+ ThrottleToken throttle_token,
const char *op,
const framework::Clock& clock,
metrics::DoubleAverageMetric& latency_metric)
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
index 8478cab4c17..a78fbe38ae5 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
@@ -22,14 +22,14 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete
const spi::ResultHandler* _result_handler;
std::shared_ptr<ApplyBucketDiffState> _state;
document::DocumentId _doc_id;
- SharedOperationThrottler::Token _throttle_token;
+ ThrottleToken _throttle_token;
const char* _op;
framework::MilliSecTimer _start_time;
metrics::DoubleAverageMetric& _latency_metric;
public:
ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state,
document::DocumentId doc_id,
- SharedOperationThrottler::Token throttle_token,
+ ThrottleToken throttle_token,
const char *op, const framework::Clock& clock,
metrics::DoubleAverageMetric& latency_metric);
~ApplyBucketDiffEntryComplete() override;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 6f740ce2c28..66dc7126058 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -78,7 +78,7 @@ public:
struct LockedMessage {
std::shared_ptr<BucketLockInterface> lock;
std::shared_ptr<api::StorageMessage> msg;
- SharedOperationThrottler::Token throttle_token;
+ ThrottleToken throttle_token;
LockedMessage() noexcept = default;
LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
@@ -89,7 +89,7 @@ public:
{}
LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
std::shared_ptr<api::StorageMessage> msg_,
- SharedOperationThrottler::Token token) noexcept
+ ThrottleToken token) noexcept
: lock(std::move(lock_)),
msg(std::move(msg_)),
throttle_token(std::move(token))
@@ -274,7 +274,7 @@ public:
virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0;
- virtual SharedOperationThrottler& operation_throttler() const noexcept = 0;
+ virtual vespalib::SharedOperationThrottler& operation_throttler() const noexcept = 0;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 2ccbc7a85ef..f7d4f750884 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -40,14 +40,14 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex
FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg)
- : FileStorHandlerImpl(1, 1, sender, metrics, compReg, SharedOperationThrottler::make_unlimited_throttler())
+ : FileStorHandlerImpl(1, 1, sender, metrics, compReg, vespalib::SharedOperationThrottler::make_unlimited_throttler())
{
}
FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender,
FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg,
- std::unique_ptr<SharedOperationThrottler> operation_throttler)
+ std::unique_ptr<vespalib::SharedOperationThrottler> operation_throttler)
: _component(compReg, "filestorhandlerimpl"),
_state(FileStorHandler::AVAILABLE),
_metrics(nullptr),
@@ -920,7 +920,7 @@ FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
{
std::unique_lock guard(*_lock);
- SharedOperationThrottler::Token throttle_token;
+ ThrottleToken throttle_token;
// Try to grab a message+lock, immediately retrying once after a wait
// if none can be found and then exiting if the same is the case on the
// second attempt. This is key to allowing the run loop to register
@@ -997,7 +997,7 @@ FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard)
FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter,
- SharedOperationThrottler::Token throttle_token)
+ ThrottleToken throttle_token)
{
std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime)));
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index c4b85ac596c..698f52359f5 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -149,7 +149,7 @@ public:
// with its locking requirements.
FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx,
PriorityIdx::iterator iter,
- SharedOperationThrottler::Token throttle_token);
+ ThrottleToken throttle_token);
using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
const FileStorHandlerImpl &_owner;
MessageSender &_messageSender;
@@ -191,7 +191,7 @@ public:
FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg);
FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
- ServiceLayerComponentRegister&, std::unique_ptr<SharedOperationThrottler>);
+ ServiceLayerComponentRegister&, std::unique_ptr<vespalib::SharedOperationThrottler>);
~FileStorHandlerImpl() override;
void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; }
@@ -243,7 +243,7 @@ public:
ResumeGuard pause() override;
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override;
- SharedOperationThrottler& operation_throttler() const noexcept override {
+ vespalib::SharedOperationThrottler& operation_throttler() const noexcept override {
return *_operation_throttler;
}
@@ -257,7 +257,7 @@ private:
ServiceLayerComponent _component;
std::atomic<DiskState> _state;
FileStorDiskMetrics * _metrics;
- std::unique_ptr<SharedOperationThrottler> _operation_throttler;
+ std::unique_ptr<vespalib::SharedOperationThrottler> _operation_throttler;
std::vector<Stripe> _stripes;
MessageSender& _messageSender;
const document::BucketIdFactory& _bucketIdFactory;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 530d96dfe3f..03b16e75297 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -143,16 +143,19 @@ selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) {
}
}
-std::unique_ptr<SharedOperationThrottler>
+std::unique_ptr<vespalib::SharedOperationThrottler>
make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads)
{
const bool use_dynamic_throttling = (config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC);
if (use_dynamic_throttling) {
auto config_win_size_incr = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1);
auto win_size_increment = std::max(static_cast<size_t>(config_win_size_incr), num_threads);
- return SharedOperationThrottler::make_dynamic_throttler(win_size_increment);
+ vespalib::SharedOperationThrottler::DynamicThrottleParams params;
+ params.window_size_increment = win_size_increment;
+ params.min_window_size = win_size_increment;
+ return vespalib::SharedOperationThrottler::make_dynamic_throttler(params);
} else {
- return SharedOperationThrottler::make_unlimited_throttler();
+ return vespalib::SharedOperationThrottler::make_unlimited_throttler();
}
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index e1c821aab48..ee6eed63eb5 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -22,7 +22,10 @@
#include <vespa/vespalib/util/monitored_refcount.h>
#include <atomic>
-namespace vespalib { class ISequencedTaskExecutor; }
+namespace vespalib {
+class ISequencedTaskExecutor;
+class SharedOperationThrottler;
+}
namespace storage {
@@ -34,7 +37,6 @@ namespace spi {
class PersistenceUtil;
class ApplyBucketDiffState;
class MergeStatus;
-class SharedOperationThrottler;
class MergeHandler : public Types,
public MergeBucketInfoSyncer {
@@ -85,7 +87,7 @@ private:
const ClusterContext &_cluster_context;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
- SharedOperationThrottler& _operation_throttler;
+ vespalib::SharedOperationThrottler& _operation_throttler;
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 65eab99b8fb..2781cc61b83 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -32,7 +32,7 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock,
api::StorageMessage::SP msg,
- SharedOperationThrottler::Token throttle_token)
+ ThrottleToken throttle_token)
: MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token))
{}
MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
@@ -41,7 +41,7 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock,
api::StorageMessage::SP msg,
- SharedOperationThrottler::Token throttle_token)
+ ThrottleToken throttle_token)
: _sendReply(true),
_updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())),
_bucketLock(std::move(bucketLock)),
@@ -60,7 +60,7 @@ MessageTracker::createForTesting(const framework::MilliSecTimer & timer, Persist
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg)
{
return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock),
- std::move(msg), SharedOperationThrottler::Token()));
+ std::move(msg), ThrottleToken()));
}
void
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 588cbef2170..4130a276239 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -31,7 +31,7 @@ public:
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
- SharedOperationThrottler::Token throttle_token);
+ ThrottleToken throttle_token);
~MessageTracker();
@@ -93,7 +93,7 @@ public:
private:
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
- SharedOperationThrottler::Token throttle_token);
+ ThrottleToken throttle_token);
[[nodiscard]] bool count_result_as_failure() const noexcept;
@@ -101,7 +101,7 @@ private:
bool _updateBucketInfo;
FileStorHandler::BucketLockInterface::SP _bucketLock;
std::shared_ptr<api::StorageMessage> _msg;
- SharedOperationThrottler::Token _throttle_token;
+ ThrottleToken _throttle_token;
spi::Context _context;
const PersistenceUtil &_env;
MessageSender &_replySender;
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
deleted file mode 100644
index 7db1a0ccdbb..00000000000
--- a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "shared_operation_throttler.h"
-#include <vespa/messagebus/dynamicthrottlepolicy.h>
-#include <vespa/storage/common/dummy_mbus_messages.h>
-#include <condition_variable>
-#include <cassert>
-#include <mutex>
-
-namespace storage {
-
-namespace {
-
-class NoLimitsOperationThrottler final : public SharedOperationThrottler {
-public:
- ~NoLimitsOperationThrottler() override = default;
- Token blocking_acquire_one() noexcept override {
- return Token(this, TokenCtorTag{});
- }
- Token blocking_acquire_one(vespalib::duration) noexcept override {
- return Token(this, TokenCtorTag{});
- }
- Token try_acquire_one() noexcept override {
- return Token(this, TokenCtorTag{});
- }
- uint32_t current_window_size() const noexcept override { return 0; }
- uint32_t waiting_threads() const noexcept override { return 0; }
-private:
- void release_one() noexcept override { /* no-op */ }
-};
-
-class DynamicOperationThrottler final : public SharedOperationThrottler {
- mutable std::mutex _mutex;
- std::condition_variable _cond;
- mbus::DynamicThrottlePolicy _throttle_policy;
- uint32_t _pending_ops;
- uint32_t _waiting_threads;
-public:
- explicit DynamicOperationThrottler(uint32_t min_size_and_window_increment);
- ~DynamicOperationThrottler() override;
-
- Token blocking_acquire_one() noexcept override;
- Token blocking_acquire_one(vespalib::duration timeout) noexcept override;
- Token try_acquire_one() noexcept override;
- uint32_t current_window_size() const noexcept override;
- uint32_t waiting_threads() const noexcept override;
-private:
- void release_one() noexcept override;
- // Non-const since actually checking the send window of a dynamic throttler might change
- // it if enough time has passed.
- [[nodiscard]] bool has_spare_capacity_in_active_window() noexcept;
- void add_one_to_active_window_size();
- void subtract_one_from_active_window_size();
-};
-
-DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment)
- : _mutex(),
- _cond(),
- _throttle_policy(static_cast<double>(min_size_and_window_increment)),
- _pending_ops(0),
- _waiting_threads(0)
-{
- _throttle_policy.setWindowSizeDecrementFactor(1.2);
- _throttle_policy.setWindowSizeBackOff(0.95);
-}
-
-DynamicOperationThrottler::~DynamicOperationThrottler() = default;
-
-bool
-DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept
-{
- DummyMbusRequest dummy_request;
- return _throttle_policy.canSend(dummy_request, _pending_ops);
-}
-
-void
-DynamicOperationThrottler::add_one_to_active_window_size()
-{
- DummyMbusRequest dummy_request;
- _throttle_policy.processMessage(dummy_request);
- ++_pending_ops;
-}
-
-void
-DynamicOperationThrottler::subtract_one_from_active_window_size()
-{
- DummyMbusReply dummy_reply;
- _throttle_policy.processReply(dummy_reply);
- assert(_pending_ops > 0);
- --_pending_ops;
-}
-
-DynamicOperationThrottler::Token
-DynamicOperationThrottler::blocking_acquire_one() noexcept
-{
- std::unique_lock lock(_mutex);
- if (!has_spare_capacity_in_active_window()) {
- ++_waiting_threads;
- _cond.wait(lock, [&] {
- return has_spare_capacity_in_active_window();
- });
- --_waiting_threads;
- }
- add_one_to_active_window_size();
- return Token(this, TokenCtorTag{});
-}
-
-DynamicOperationThrottler::Token
-DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept
-{
- std::unique_lock lock(_mutex);
- if (!has_spare_capacity_in_active_window()) {
- ++_waiting_threads;
- const bool accepted = _cond.wait_for(lock, timeout, [&] {
- return has_spare_capacity_in_active_window();
- });
- --_waiting_threads;
- if (!accepted) {
- return Token();
- }
- }
- add_one_to_active_window_size();
- return Token(this, TokenCtorTag{});
-}
-
-DynamicOperationThrottler::Token
-DynamicOperationThrottler::try_acquire_one() noexcept
-{
- std::unique_lock lock(_mutex);
- if (!has_spare_capacity_in_active_window()) {
- return Token();
- }
- add_one_to_active_window_size();
- return Token(this, TokenCtorTag{});
-}
-
-void
-DynamicOperationThrottler::release_one() noexcept
-{
- std::unique_lock lock(_mutex);
- subtract_one_from_active_window_size();
- // Only wake up a waiting thread if doing so would possibly result in success.
- if ((_waiting_threads > 0) && has_spare_capacity_in_active_window()) {
- lock.unlock();
- _cond.notify_one();
- }
-}
-
-uint32_t
-DynamicOperationThrottler::current_window_size() const noexcept
-{
- std::unique_lock lock(_mutex);
- return _throttle_policy.getMaxPendingCount(); // Actually returns current window size
-}
-
-uint32_t
-DynamicOperationThrottler::waiting_threads() const noexcept
-{
- std::unique_lock lock(_mutex);
- return _waiting_threads;
-}
-
-}
-
-std::unique_ptr<SharedOperationThrottler>
-SharedOperationThrottler::make_unlimited_throttler()
-{
- return std::make_unique<NoLimitsOperationThrottler>();
-}
-
-std::unique_ptr<SharedOperationThrottler>
-SharedOperationThrottler::make_dynamic_throttler(uint32_t min_size_and_window_increment)
-{
- return std::make_unique<DynamicOperationThrottler>(min_size_and_window_increment);
-}
-
-DynamicOperationThrottler::Token::~Token()
-{
- if (_throttler) {
- _throttler->release_one();
- }
-}
-
-void
-DynamicOperationThrottler::Token::reset() noexcept
-{
- if (_throttler) {
- _throttler->release_one();
- _throttler = nullptr;
- }
-}
-
-DynamicOperationThrottler::Token&
-DynamicOperationThrottler::Token::operator=(Token&& rhs) noexcept
-{
- reset();
- _throttler = rhs._throttler;
- rhs._throttler = nullptr;
- return *this;
-}
-
-}
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
index 4ee8d017c05..b829f077bcb 100644
--- a/storage/src/vespa/storage/persistence/shared_operation_throttler.h
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
@@ -1,72 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/vespalib/util/time.h>
-#include <memory>
-#include <optional>
+#include <vespa/vespalib/util/shared_operation_throttler.h>
namespace storage {
-/**
- * Operation throttler that is intended to provide global throttling of
- * async operations across all persistence stripe threads. A throttler
- * wraps a logical max pending window size of in-flight operations. Depending
- * on the throttler implementation, the window size may expand and shrink
- * dynamically. Exactly how and when this happens is unspecified.
- *
- * Offers both polling and (timed, non-timed) blocking calls for acquiring
- * a throttle token. If the returned token is valid, the caller may proceed
- * to invoke the asynchronous operation.
- *
- * The window slot taken up by a valid throttle token is implicitly freed up
- * when the token is destroyed.
- *
- * All operations on the throttler are thread safe.
- */
-class SharedOperationThrottler {
-protected:
- struct TokenCtorTag {}; // Make available to subclasses for token construction.
-public:
- class Token {
- SharedOperationThrottler* _throttler;
- public:
- constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {}
- constexpr Token() noexcept : _throttler(nullptr) {}
- constexpr Token(Token&& rhs) noexcept
- : _throttler(rhs._throttler)
- {
- rhs._throttler = nullptr;
- }
- Token& operator=(Token&& rhs) noexcept;
- ~Token();
-
- Token(const Token&) = delete;
- Token& operator=(const Token&) = delete;
-
- [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); }
- void reset() noexcept;
- };
-
- virtual ~SharedOperationThrottler() = default;
-
- // All methods are thread safe
- [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
- [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0;
- [[nodiscard]] virtual Token try_acquire_one() noexcept = 0;
-
- // May return 0, in which case the window size is unlimited.
- [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0;
-
- // Exposed for unit testing only.
- [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
-
- // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
- static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
- // Creates a throttler that uses a MessageBus DynamicThrottlePolicy under the hood
- static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment);
-private:
- // Exclusively called from a valid Token. Thread safe.
- virtual void release_one() noexcept = 0;
-};
+using ThrottleToken = vespalib::SharedOperationThrottler::Token;
}
diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
index 61cd2b5ef44..d9b6ae7f908 100644
--- a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
+++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
@@ -1,20 +1,127 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/util/shared_operation_throttler.h>
#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/util/barrier.h>
+#include <thread>
namespace vespalib {
+using ThrottleToken = SharedOperationThrottler::Token;
+
+struct DynamicThrottleFixture {
+ std::unique_ptr<SharedOperationThrottler> _throttler;
+
+ DynamicThrottleFixture() {
+ SharedOperationThrottler::DynamicThrottleParams params;
+ params.window_size_increment = 1;
+ params.min_window_size = 1;
+ _throttler = SharedOperationThrottler::make_dynamic_throttler(params);
+ }
+};
+
+TEST("unlimited throttler does not throttle") {
+ // We technically can't test that the unlimited throttler _never_ throttles, but at
+ // least check that it doesn't throttle _twice_, and then induce from this ;)
+ auto throttler = SharedOperationThrottler::make_unlimited_throttler();
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token2.valid());
+ // Window size should be zero (i.e. unlimited) for unlimited throttler
+ EXPECT_EQUAL(throttler->current_window_size(), 0u);
+}
+
+TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture()) {
+ auto token1 = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = f1._throttler->try_acquire_one();
+ EXPECT_FALSE(token2.valid());
+
+ EXPECT_EQUAL(f1._throttler->current_window_size(), 1u);
+}
+
+TEST_F("blocking acquire returns immediately if slot available", DynamicThrottleFixture()) {
+ auto token = f1._throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ token.reset();
+ token = f1._throttler->blocking_acquire_one(600s); // Should never block.
+ EXPECT_TRUE(token.valid());
+}
+
+TEST_F("blocking call woken up if throttle slot available", DynamicThrottleFixture()) {
+ vespalib::Barrier barrier(2);
+ std::thread t([&] {
+ auto token = f1._throttler->try_acquire_one();
+ assert(token.valid());
+ barrier.await();
+ while (f1._throttler->waiting_threads() != 1) {
+ std::this_thread::sleep_for(100us);
+ }
+ // Implicit token release at thread scope exit
+ });
+ barrier.await();
+ auto token = f1._throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ t.join();
+}
+
+TEST_F("time-bounded blocking acquire waits for timeout", DynamicThrottleFixture()) {
+ auto window_filling_token = f1._throttler->try_acquire_one();
+ auto before = std::chrono::steady_clock::now();
+ // Will block for at least 1ms. Since no window slot will be available by that time,
+ // an invalid token should be returned.
+ auto token = f1._throttler->blocking_acquire_one(1ms);
+ auto after = std::chrono::steady_clock::now();
+ EXPECT_TRUE((after - before) >= 1ms);
+ EXPECT_FALSE(token.valid());
+}
+
+TEST("default constructed token is invalid") {
+ ThrottleToken token;
+ EXPECT_FALSE(token.valid());
+ token.reset(); // no-op
+ EXPECT_FALSE(token.valid());
+}
+
+TEST_F("token destruction frees up throttle window slot", DynamicThrottleFixture()) {
+ {
+ auto token = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+ }
+ auto token = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+}
+
+TEST_F("token can be moved and reset", DynamicThrottleFixture()) {
+ auto token1 = f1._throttler->try_acquire_one();
+ auto token2 = std::move(token1); // move ctor
+ EXPECT_TRUE(token2.valid());
+ EXPECT_FALSE(token1.valid());
+ ThrottleToken token3;
+ token3 = std::move(token2); // move assignment op
+ EXPECT_TRUE(token3.valid());
+ EXPECT_FALSE(token2.valid());
+
+ // Trying to fetch new token should not succeed due to active token and win size of 1
+ token1 = f1._throttler->try_acquire_one();
+ EXPECT_FALSE(token1.valid());
+ // Resetting the token should free up the slot in the window
+ token3.reset();
+ token1 = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+}
+
// Note on test semantics: these tests are adapted from a subset of the MessageBus
// throttling tests. Some tests have been simplified due to no longer having access
// to the low-level DynamicThrottlePolicy API.
-struct Fixture {
+struct WindowFixture {
uint64_t _milli_time;
std::unique_ptr<SharedOperationThrottler> _throttler;
- Fixture(uint32_t window_size_increment = 5,
- uint32_t min_window_size = 20,
- uint32_t max_window_size = INT_MAX)
+ WindowFixture(uint32_t window_size_increment = 5,
+ uint32_t min_window_size = 20,
+ uint32_t max_window_size = INT_MAX)
: _milli_time(0),
_throttler()
{
@@ -57,7 +164,7 @@ struct Fixture {
}
};
-TEST_F("window size changes dynamically based on throughput", Fixture()) {
+TEST_F("window size changes dynamically based on throughput", WindowFixture()) {
uint32_t window_size = f1.attempt_converge_on_stable_window_size(100);
ASSERT_TRUE(window_size >= 90 && window_size <= 105);
@@ -74,7 +181,7 @@ TEST_F("window size changes dynamically based on throughput", Fixture()) {
ASSERT_TRUE(window_size >= 90 && window_size <= 115);
}
-TEST_F("window size is reset after idle time period", Fixture(5, 1)) {
+TEST_F("window size is reset after idle time period", WindowFixture(5, 1)) {
double window_size = f1.attempt_converge_on_stable_window_size(100);
ASSERT_TRUE(window_size >= 90 && window_size <= 110);
@@ -88,12 +195,12 @@ TEST_F("window size is reset after idle time period", Fixture(5, 1)) {
EXPECT_EQUAL(tokens.size(), 1u); // Reduced to minimum window size
}
-TEST_F("minimum window size is respected", Fixture(5, 150, INT_MAX)) {
+TEST_F("minimum window size is respected", WindowFixture(5, 150, INT_MAX)) {
double window_size = f1.attempt_converge_on_stable_window_size(200);
ASSERT_TRUE(window_size >= 150 && window_size <= 210);
}
-TEST_F("maximum window size is respected", Fixture(5, 1, 50)) {
+TEST_F("maximum window size is respected", WindowFixture(5, 1, 50)) {
double window_size = f1.attempt_converge_on_stable_window_size(100);
ASSERT_TRUE(window_size >= 40 && window_size <= 50);
}