-rw-r--r--  cloud-tenant-base-dependencies-enforcer/pom.xml  2
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java  3
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java  2
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java  9
-rw-r--r--  config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java  1
-rw-r--r--  config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java  7
-rw-r--r--  config-model/src/main/java/com/yahoo/searchdefinition/RankProfile.java  19
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java  4
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java  3
-rw-r--r--  config-model/src/main/javacc/SDParser.jj  32
-rw-r--r--  config-model/src/test/java/com/yahoo/searchdefinition/RankPropertiesTestCase.java  52
-rw-r--r--  config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java  55
-rw-r--r--  configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java  3
-rw-r--r--  configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java  3
-rw-r--r--  container-dependency-versions/pom.xml  4
-rw-r--r--  controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java  28
-rw-r--r--  controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java  2
-rw-r--r--  documentapi/abi-spec.json  6
-rw-r--r--  documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java  20
-rwxr-xr-x  documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java  66
-rw-r--r--  documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java  12
-rwxr-xr-x  documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java  4
-rwxr-xr-x  documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java  145
-rw-r--r--  flags/src/main/java/com/yahoo/vespa/flags/Flags.java  14
-rw-r--r--  flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java  1
-rw-r--r--  http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java  2
-rw-r--r--  http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java  2
-rw-r--r--  http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java  2
-rw-r--r--  http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java  2
-rw-r--r--  http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java  2
-rw-r--r--  jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java  3
-rw-r--r--  parent/pom.xml  6
-rw-r--r--  persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp  113
-rw-r--r--  persistence/src/vespa/persistence/dummyimpl/dummypersistence.h  4
-rw-r--r--  persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h  1
-rw-r--r--  persistence/src/vespa/persistence/spi/catchresult.h  5
-rw-r--r--  persistence/src/vespa/persistence/spi/persistenceprovider.cpp  8
-rw-r--r--  persistence/src/vespa/persistence/spi/persistenceprovider.h  5
-rw-r--r--  searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp  4
-rw-r--r--  searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h  8
-rw-r--r--  searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp  20
-rw-r--r--  searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h  4
-rw-r--r--  searchlib/src/tests/attribute/changevector/changevector_test.cpp  2
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/attributevector.h  21
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/attributevector.hpp  2
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/changevector.h  16
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/singleboolattribute.cpp  2
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp  2
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp  4
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/singlenumericpostattribute.hpp  2
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/singlesmallnumericattribute.cpp  2
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp  10
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h  11
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h  2
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp  8
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h  4
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp  2
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp  16
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/singleexecutor.h  6
-rw-r--r--  storage/src/tests/persistence/apply_bucket_diff_state_test.cpp  8
-rw-r--r--  storage/src/tests/persistence/common/persistenceproviderwrapper.cpp  12
-rw-r--r--  storage/src/tests/persistence/common/persistenceproviderwrapper.h  4
-rw-r--r--  storage/src/tests/persistence/filestorage/operationabortingtest.cpp  6
-rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp  124
-rw-r--r--  storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp  51
-rw-r--r--  storage/src/vespa/storage/persistence/apply_bucket_diff_state.h  15
-rw-r--r--  storage/src/vespa/storage/persistence/asynchandler.cpp  26
-rw-r--r--  storage/src/vespa/storage/persistence/asynchandler.h  1
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp  5
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp  22
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/mergestatus.h  5
-rw-r--r--  storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h  3
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp  358
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.h  17
-rw-r--r--  storage/src/vespa/storage/persistence/persistencehandler.cpp  10
-rw-r--r--  storage/src/vespa/storage/persistence/persistencehandler.h  1
-rw-r--r--  storage/src/vespa/storage/persistence/provider_error_wrapper.cpp  11
-rw-r--r--  storage/src/vespa/storage/persistence/provider_error_wrapper.h  4
-rw-r--r--  storage/src/vespa/storage/persistence/simplemessagehandler.cpp  16
-rw-r--r--  storage/src/vespa/storage/persistence/simplemessagehandler.h  1
-rw-r--r--  storageapi/src/vespa/storageapi/message/bucket.cpp  23
-rw-r--r--  vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java  1
-rw-r--r--  vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java  9
-rw-r--r--  vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java  4
-rw-r--r--  vespalib/src/tests/executor/threadstackexecutor_test.cpp  25
-rw-r--r--  vespalib/src/vespa/vespalib/util/executor_stats.h  35
-rw-r--r--  vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp  32
-rw-r--r--  vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h  21
88 files changed, 1127 insertions, 523 deletions
diff --git a/cloud-tenant-base-dependencies-enforcer/pom.xml b/cloud-tenant-base-dependencies-enforcer/pom.xml
index e2cc7085353..9f4a14387f4 100644
--- a/cloud-tenant-base-dependencies-enforcer/pom.xml
+++ b/cloud-tenant-base-dependencies-enforcer/pom.xml
@@ -22,7 +22,7 @@
<aopalliance.version>1.0</aopalliance.version>
<athenz.version>1.10.14</athenz.version>
<bouncycastle.version>1.68</bouncycastle.version>
- <felix.version>7.0.1</felix.version>
+ <felix.version>6.0.3</felix.version>
<felix.log.version>1.0.1</felix.log.version>
<findbugs.version>1.3.9</findbugs.version>
<guava.version>20.0</guava.version>
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index ebde8f3a98c..948e416eb53 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -1004,8 +1004,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// TODO expose and use monotonic clock instead of system clock
final long maxDeadlineTimePointMs = timer.getCurrentTimeInMillis() + options.getMaxDeferredTaskVersionWaitTime().toMillis();
for (RemoteClusterControllerTask task : tasksPendingStateRecompute) {
- context.log(logger, Level.FINEST, () -> String.format("Adding task of type '%s' to be completed at version %d",
- task.getClass().getName(), completeAtVersion));
+ context.log(logger, Level.INFO, task + " will be completed at version " + completeAtVersion);
taskCompletionQueue.add(new VersionDependentTaskCompletion(completeAtVersion, task, maxDeadlineTimePointMs));
}
tasksPendingStateRecompute.clear();
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
index 39ffed1051a..949ad6f56a2 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
@@ -123,4 +123,6 @@ public abstract class RemoteClusterControllerTask {
}
}
+ @Override
+ public String toString() { return RemoteClusterControllerTask.class.getSimpleName(); }
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
index f406ec46ccc..431c207af5c 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
@@ -98,6 +98,15 @@ public class SetNodeStateRequest extends Request<SetResponse> {
return super.isFailed() || (resultSet && !result.getWasModified());
}
+ @Override
+ public String toString() {
+ return "SetNodeStateRequest{" +
+ "node=" + id + "," +
+ "newState=" + newStates.get("user") + "," +
+ (probe ? "probe=" + probe + "," : "") +
+ "}";
+ }
+
static SetResponse setWantedState(
ContentCluster cluster,
SetUnitStateRequest.Condition condition,
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
index 2c51aa89c66..8c2957502a8 100644
--- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
+++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
@@ -103,6 +103,7 @@ public interface ModelContext {
@ModelFeatureFlag(owners = {"geirst", "vekterli"}) default int distributorMergeBusyWait() { return 10; }
@ModelFeatureFlag(owners = {"vekterli", "geirst"}) default boolean distributorEnhancedMaintenanceScheduling() { return false; }
@ModelFeatureFlag(owners = {"arnej"}) default boolean forwardIssuesAsErrors() { return true; }
+ @ModelFeatureFlag(owners = {"geirst", "vekterli"}) default boolean asyncApplyBucketDiff() { return false; }
}
/** Warning: As elsewhere in this package, do not make backwards incompatible changes that will break old config models! */
diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
index 7a09a36c1d7..b7506587102 100644
--- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
+++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
@@ -70,6 +70,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
private int maxUnCommittedMemory = 123456;
private double diskBloatFactor = 0.2;
private boolean distributorEnhancedMaintenanceScheduling = false;
+ private boolean asyncApplyBucketDiff = false;
@Override public ModelContext.FeatureFlags featureFlags() { return this; }
@Override public boolean multitenant() { return multitenant; }
@@ -120,6 +121,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
@Override public int docstoreCompressionLevel() { return docstoreCompressionLevel; }
@Override public boolean distributorEnhancedMaintenanceScheduling() { return distributorEnhancedMaintenanceScheduling; }
@Override public int maxUnCommittedMemory() { return maxUnCommittedMemory; }
+ @Override public boolean asyncApplyBucketDiff() { return asyncApplyBucketDiff; }
public TestProperties maxUnCommittedMemory(int maxUnCommittedMemory) {
this.maxUnCommittedMemory = maxUnCommittedMemory;
@@ -307,6 +309,11 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
return this;
}
+ public TestProperties setAsyncApplyBucketDiff(boolean value) {
+ asyncApplyBucketDiff = value;
+ return this;
+ }
+
public static class Spec implements ConfigServerSpec {
private final String hostName;
diff --git a/config-model/src/main/java/com/yahoo/searchdefinition/RankProfile.java b/config-model/src/main/java/com/yahoo/searchdefinition/RankProfile.java
index 7f3b018d569..85b8d7fbe79 100644
--- a/config-model/src/main/java/com/yahoo/searchdefinition/RankProfile.java
+++ b/config-model/src/main/java/com/yahoo/searchdefinition/RankProfile.java
@@ -674,25 +674,26 @@ public class RankProfile implements Cloneable {
inputFeatures.put(ref, declaredType);
}
- public static class ExecuteOperation {
- public enum Phase { onmatch, onrerank, onsummary}
+ public static class MutateOperation {
+ public enum Phase { onmatch, on_first_phase, on_second_phase, onsummary}
final Phase phase;
final String attribute;
final String operation;
- ExecuteOperation(Phase phase, String attribute, String operation) {
+ MutateOperation(Phase phase, String attribute, String operation) {
this.phase = phase;
this.attribute = attribute;
this.operation = operation;
}
}
- private final List<ExecuteOperation> executeOperations = new ArrayList<>();
+ private final List<MutateOperation> mutateOperations = new ArrayList<>();
- public void addExecuteOperation(ExecuteOperation.Phase phase, String attribute, String operation) {
- executeOperations.add(new ExecuteOperation(phase, attribute, operation));
- addRankProperty("vespa.execute." + phase + ".attribute", attribute);
- addRankProperty("vespa.execute." + phase + ".operation", operation);
+ public void addMutateOperation(MutateOperation.Phase phase, String attribute, String operation) {
+ mutateOperations.add(new MutateOperation(phase, attribute, operation));
+ String prefix = "vespa.mutate." + phase.toString().replace('-', '_');
+ addRankProperty(prefix + ".attribute", attribute);
+ addRankProperty(prefix + ".operation", operation);
}
- public List<ExecuteOperation> getExecuteOperations() { return executeOperations; }
+ public List<MutateOperation> getMutateOperations() { return mutateOperations; }
public RankingExpressionFunction findFunction(String name) {
RankingExpressionFunction function = functions.get(name);
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java
index d8c59ebda65..a65e6fe16c0 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java
@@ -371,6 +371,10 @@ public class VespaMetricSet {
metrics.add(new Metric(prefix + ".queuesize.count"));
metrics.add(new Metric(prefix + ".maxpending.last")); // TODO: Remove in Vespa 8
metrics.add(new Metric(prefix + ".accepted.rate"));
+ metrics.add(new Metric(prefix + ".wakeups.rate"));
+ metrics.add(new Metric(prefix + ".utilization.max"));
+ metrics.add(new Metric(prefix + ".utilization.sum"));
+ metrics.add(new Metric(prefix + ".utilization.count"));
}
private static Set<Metric> getSearchNodeMetrics() {
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java
index 5c692e8ef6b..d7f6fb6c581 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java
@@ -47,6 +47,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer {
private final int reponseNumThreads;
private final StorFilestorConfig.Response_sequencer_type.Enum responseSequencerType;
private final boolean useAsyncMessageHandlingOnSchedule;
+ private final boolean asyncApplyBucketDiff;
private static StorFilestorConfig.Response_sequencer_type.Enum convertResponseSequencerType(String sequencerType) {
try {
@@ -62,6 +63,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer {
this.reponseNumThreads = featureFlags.defaultNumResponseThreads();
this.responseSequencerType = convertResponseSequencerType(featureFlags.responseSequencerType());
useAsyncMessageHandlingOnSchedule = featureFlags.useAsyncMessageHandlingOnSchedule();
+ asyncApplyBucketDiff = featureFlags.asyncApplyBucketDiff();
}
@Override
@@ -73,6 +75,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer {
builder.num_response_threads(reponseNumThreads);
builder.response_sequencer_type(responseSequencerType);
builder.use_async_message_handling_on_schedule(useAsyncMessageHandlingOnSchedule);
+ builder.async_apply_bucket_diff(asyncApplyBucketDiff);
}
}
diff --git a/config-model/src/main/javacc/SDParser.jj b/config-model/src/main/javacc/SDParser.jj
index 0f60db40069..e6a5d1f2e4e 100644
--- a/config-model/src/main/javacc/SDParser.jj
+++ b/config-model/src/main/javacc/SDParser.jj
@@ -207,10 +207,10 @@ TOKEN :
| < LOOSE: "loose" >
| < STRICT: "strict" >
| < DOCUMENT: "document" >
-| < EXECUTE: "execute" >
| < OPERATION: "operation" >
| < ON_MATCH: "on-match" >
-| < ON_RERANK: "on-rerank" >
+| < ON_FIRST_PHASE: "on-first-phase" >
+| < ON_SECOND_PHASE: "on-second-phase" >
| < ON_SUMMARY: "on-summary" >
| < STRUCT: "struct" >
| < INHERITS: "inherits" >
@@ -239,6 +239,7 @@ TOKEN :
| < CONSTANT: "constant">
| < ONNXMODEL: "onnx-model">
| < MODEL: "model" >
+| < MUTATE: "mutate" >
| < RANKPROFILE: "rank-profile" >
| < RANKDEGRADATIONFREQ: "rank-degradation-frequency" >
| < RANKDEGRADATION: "rank-degradation" >
@@ -2066,7 +2067,7 @@ Object rankProfileItem(RankProfile profile) : { }
| firstPhase(profile)
| matchPhase(profile)
| function(profile)
- | execute(profile)
+ | mutate(profile)
| ignoreRankFeatures(profile)
| numThreadsPerSearch(profile)
| minHitsPerThread(profile)
@@ -2096,39 +2097,40 @@ void inheritsRankProfile(RankProfile profile) :
}
/**
- * This rule consumes an execute statement of a rank-profile.
+ * This rule consumes a mutate statement of a rank-profile.
*
* @param profile The profile to modify.
*/
-void execute(RankProfile profile) :
+void mutate(RankProfile profile) :
{
}
{
- <EXECUTE> lbrace() (execute_operation(profile) <NL>)+ <RBRACE>
+ <MUTATE> lbrace() (mutate_operation(profile) <NL>)+ <RBRACE>
{ }
}
-void execute_operation(RankProfile profile) :
+void mutate_operation(RankProfile profile) :
{
String attribute, operation;
- RankProfile.ExecuteOperation.Phase phase;
+ RankProfile.MutateOperation.Phase phase;
}
{
- ( <ON_MATCH> { phase = RankProfile.ExecuteOperation.Phase.onmatch; }
- | <ON_RERANK> { phase = RankProfile.ExecuteOperation.Phase.onrerank; }
- | <ON_SUMMARY> { phase = RankProfile.ExecuteOperation.Phase.onsummary; }
+ ( <ON_MATCH> { phase = RankProfile.MutateOperation.Phase.onmatch; }
+ | <ON_FIRST_PHASE> { phase = RankProfile.MutateOperation.Phase.on_first_phase; }
+ | <ON_SECOND_PHASE> { phase = RankProfile.MutateOperation.Phase.on_second_phase; }
+ | <ON_SUMMARY> { phase = RankProfile.MutateOperation.Phase.onsummary; }
)
- lbrace() attribute = identifier() operation = execute_expr() (<NL>)* <RBRACE>
- { profile.addExecuteOperation(phase, attribute, operation); }
+ lbrace() attribute = identifier() operation = mutate_expr() (<NL>)* <RBRACE>
+ { profile.addMutateOperation(phase, attribute, operation); }
}
-String execute_expr() :
+String mutate_expr() :
{
String op;
Number constant = null;
}
{
- (("++" | "--") { op = token.image; } | ("+=" | "-=" | "*=" | "/=" | "%=" | "=") { op = token.image; } constant = consumeNumber())
+ (("+=" | "-=" | "=") { op = token.image; } constant = consumeNumber())
{ return constant != null ? (op + constant) : op; }
}
diff --git a/config-model/src/test/java/com/yahoo/searchdefinition/RankPropertiesTestCase.java b/config-model/src/test/java/com/yahoo/searchdefinition/RankPropertiesTestCase.java
index e2ca8ac4f65..637f7571a68 100644
--- a/config-model/src/test/java/com/yahoo/searchdefinition/RankPropertiesTestCase.java
+++ b/config-model/src/test/java/com/yahoo/searchdefinition/RankPropertiesTestCase.java
@@ -78,7 +78,7 @@ public class RankPropertiesTestCase extends AbstractSchemaTestCase {
}
}
@Test
- public void testRankProfileExecute() throws ParseException {
+ public void testRankProfileMutate() throws ParseException {
RankProfileRegistry rankProfileRegistry = new RankProfileRegistry();
SearchBuilder builder = new SearchBuilder(rankProfileRegistry);
builder.importString(joinLines(
@@ -101,15 +101,18 @@ public class RankPropertiesTestCase extends AbstractSchemaTestCase {
" attribute: mutable",
" }",
" rank-profile a {",
- " execute {",
+ " mutate {",
" on-match {",
- " synthetic_attribute_a ++",
+ " synthetic_attribute_a += 7",
" }",
- " on-rerank {",
+ " on-first-phase {",
+ " synthetic_attribute_b +=1",
+ " }",
+ " on-second-phase {",
" synthetic_attribute_b = 1.01",
" }",
" on-summary {",
- " synthetic_attribute_c --",
+ " synthetic_attribute_c -= 1",
" }",
" }",
" first-phase {",
@@ -128,28 +131,33 @@ public class RankPropertiesTestCase extends AbstractSchemaTestCase {
builder.build();
Schema schema = builder.getSearch();
RankProfile a = rankProfileRegistry.get(schema, "a");
- List<RankProfile.ExecuteOperation> operations = a.getExecuteOperations();
- assertEquals(3, operations.size());
- assertEquals(RankProfile.ExecuteOperation.Phase.onmatch, operations.get(0).phase);
+ List<RankProfile.MutateOperation> operations = a.getMutateOperations();
+ assertEquals(4, operations.size());
+ assertEquals(RankProfile.MutateOperation.Phase.onmatch, operations.get(0).phase);
assertEquals("synthetic_attribute_a", operations.get(0).attribute);
- assertEquals("++", operations.get(0).operation);
- assertEquals(RankProfile.ExecuteOperation.Phase.onrerank, operations.get(1).phase);
+ assertEquals("+=7", operations.get(0).operation);
+ assertEquals(RankProfile.MutateOperation.Phase.on_first_phase, operations.get(1).phase);
assertEquals("synthetic_attribute_b", operations.get(1).attribute);
- assertEquals("=1.01", operations.get(1).operation);
- assertEquals(RankProfile.ExecuteOperation.Phase.onsummary, operations.get(2).phase);
- assertEquals("synthetic_attribute_c", operations.get(2).attribute);
- assertEquals("--", operations.get(2).operation);
+ assertEquals("+=1", operations.get(1).operation);
+ assertEquals(RankProfile.MutateOperation.Phase.on_second_phase, operations.get(2).phase);
+ assertEquals("synthetic_attribute_b", operations.get(2).attribute);
+ assertEquals("=1.01", operations.get(2).operation);
+ assertEquals(RankProfile.MutateOperation.Phase.onsummary, operations.get(3).phase);
+ assertEquals("synthetic_attribute_c", operations.get(3).attribute);
+ assertEquals("-=1", operations.get(3).operation);
AttributeFields attributeFields = new AttributeFields(schema);
RawRankProfile raw = new RawRankProfile(a, new LargeRankExpressions(new MockFileRegistry()), new QueryProfileRegistry(), new ImportedMlModels(), attributeFields, new TestProperties());
- assertEquals(7, raw.configProperties().size());
- assertEquals("(vespa.execute.onmatch.attribute, synthetic_attribute_a)", raw.configProperties().get(0).toString());
- assertEquals("(vespa.execute.onmatch.operation, ++)", raw.configProperties().get(1).toString());
- assertEquals("(vespa.execute.onrerank.attribute, synthetic_attribute_b)", raw.configProperties().get(2).toString());
- assertEquals("(vespa.execute.onrerank.operation, =1.01)", raw.configProperties().get(3).toString());
- assertEquals("(vespa.execute.onsummary.attribute, synthetic_attribute_c)", raw.configProperties().get(4).toString());
- assertEquals("(vespa.execute.onsummary.operation, --)", raw.configProperties().get(5).toString());
- assertEquals("(vespa.rank.firstphase, a)", raw.configProperties().get(6).toString());
+ assertEquals(9, raw.configProperties().size());
+ assertEquals("(vespa.mutate.onmatch.attribute, synthetic_attribute_a)", raw.configProperties().get(0).toString());
+ assertEquals("(vespa.mutate.onmatch.operation, +=7)", raw.configProperties().get(1).toString());
+ assertEquals("(vespa.mutate.on_first_phase.attribute, synthetic_attribute_b)", raw.configProperties().get(2).toString());
+ assertEquals("(vespa.mutate.on_first_phase.operation, +=1)", raw.configProperties().get(3).toString());
+ assertEquals("(vespa.mutate.on_second_phase.attribute, synthetic_attribute_b)", raw.configProperties().get(4).toString());
+ assertEquals("(vespa.mutate.on_second_phase.operation, =1.01)", raw.configProperties().get(5).toString());
+ assertEquals("(vespa.mutate.onsummary.attribute, synthetic_attribute_c)", raw.configProperties().get(6).toString());
+ assertEquals("(vespa.mutate.onsummary.operation, -=1)", raw.configProperties().get(7).toString());
+ assertEquals("(vespa.rank.firstphase, a)", raw.configProperties().get(8).toString());
}
}
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
index 9d8d7509966..739f8b7fff2 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
@@ -142,6 +142,16 @@ public class StorageClusterTest {
return new StorServerConfig(builder);
}
+ private StorFilestorConfig filestorConfigFromProducer(StorFilestorConfig.Producer producer) {
+ var builder = new StorFilestorConfig.Builder();
+ producer.getConfig(builder);
+ return new StorFilestorConfig(builder);
+ }
+
+ private StorFilestorConfig filestorConfigFromProperties(TestProperties properties) {
+ return filestorConfigFromProducer(parse(cluster("foo", ""), properties));
+ }
+
@Test
public void testMergeFeatureFlags() {
var config = configFromProperties(new TestProperties().setMaxMergeQueueSize(1919).setMaxConcurrentMergesPerNode(37));
@@ -159,6 +169,15 @@ public class StorageClusterTest {
}
@Test
+ public void async_apply_bucket_diff_can_be_controlled_by_feature_flag() {
+ var config = filestorConfigFromProperties(new TestProperties());
+ assertFalse(config.async_apply_bucket_diff());
+
+ config = filestorConfigFromProperties(new TestProperties().setAsyncApplyBucketDiff(true));
+ assertTrue(config.async_apply_bucket_diff());
+ }
+
+ @Test
public void testVisitors() {
StorVisitorConfig.Builder builder = new StorVisitorConfig.Builder();
parse(cluster("bees",
@@ -188,9 +207,7 @@ public class StorageClusterTest {
);
{
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- stc.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(stc);
assertEquals(7, config.num_threads());
assertFalse(config.enable_multibit_split_optimalization());
@@ -199,9 +216,7 @@ public class StorageClusterTest {
{
assertEquals(1, stc.getChildren().size());
StorageNode sn = stc.getChildren().values().iterator().next();
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- sn.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(sn);
assertEquals(7, config.num_threads());
}
}
@@ -215,9 +230,7 @@ public class StorageClusterTest {
"</tuning>")),
new Flavor(new FlavorsConfig.Flavor.Builder().name("test-flavor").minCpuCores(9).build())
);
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- stc.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(stc);
assertEquals(2, config.num_response_threads());
assertEquals(StorFilestorConfig.Response_sequencer_type.ADAPTIVE, config.response_sequencer_type());
assertEquals(7, config.num_threads());
@@ -238,9 +251,7 @@ public class StorageClusterTest {
);
{
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- stc.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(stc);
assertEquals(4, config.num_threads());
assertFalse(config.enable_multibit_split_optimalization());
@@ -248,9 +259,7 @@ public class StorageClusterTest {
{
assertEquals(1, stc.getChildren().size());
StorageNode sn = stc.getChildren().values().iterator().next();
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- sn.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(sn);
assertEquals(4, config.num_threads());
}
}
@@ -262,17 +271,13 @@ public class StorageClusterTest {
);
{
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- stc.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(stc);
assertEquals(8, config.num_threads());
}
{
assertEquals(1, stc.getChildren().size());
StorageNode sn = stc.getChildren().values().iterator().next();
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- sn.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(sn);
assertEquals(9, config.num_threads());
}
}
@@ -285,17 +290,13 @@ public class StorageClusterTest {
@Test
public void testFeatureFlagControlOfResponseSequencer() {
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- simpleCluster(new TestProperties().setResponseNumThreads(13).setResponseSequencerType("THROUGHPUT")).getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(simpleCluster(new TestProperties().setResponseNumThreads(13).setResponseSequencerType("THROUGHPUT")));
assertEquals(13, config.num_response_threads());
assertEquals(StorFilestorConfig.Response_sequencer_type.THROUGHPUT, config.response_sequencer_type());
}
private void verifyAsyncMessageHandlingOnSchedule(boolean expected, boolean value) {
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- simpleCluster(new TestProperties().setAsyncMessageHandlingOnSchedule(value)).getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(simpleCluster(new TestProperties().setAsyncMessageHandlingOnSchedule(value)));
assertEquals(expected, config.use_async_message_handling_on_schedule());
}
@Test
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
index b2360ef262d..7fdc18827f4 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
@@ -197,6 +197,7 @@ public class ModelContextImpl implements ModelContext {
private final boolean distributorEnhancedMaintenanceScheduling;
private final int maxUnCommittedMemory;
private final boolean forwardIssuesAsErrors;
+ private final boolean asyncApplyBucketDiff;
public FeatureFlags(FlagSource source, ApplicationId appId) {
this.defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT);
@@ -232,6 +233,7 @@ public class ModelContextImpl implements ModelContext {
this.distributorEnhancedMaintenanceScheduling = flagValue(source, appId, Flags.DISTRIBUTOR_ENHANCED_MAINTENANCE_SCHEDULING);
this.maxUnCommittedMemory = flagValue(source, appId, Flags.MAX_UNCOMMITTED_MEMORY);;
this.forwardIssuesAsErrors = flagValue(source, appId, PermanentFlags.FORWARD_ISSUES_AS_ERRORS);
+ this.asyncApplyBucketDiff = flagValue(source, appId, Flags.ASYNC_APPLY_BUCKET_DIFF);
}
@Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; }
@@ -269,6 +271,7 @@ public class ModelContextImpl implements ModelContext {
@Override public boolean distributorEnhancedMaintenanceScheduling() { return distributorEnhancedMaintenanceScheduling; }
@Override public int maxUnCommittedMemory() { return maxUnCommittedMemory; }
@Override public boolean forwardIssuesAsErrors() { return forwardIssuesAsErrors; }
+ @Override public boolean asyncApplyBucketDiff() { return asyncApplyBucketDiff; }
private static <V> V flagValue(FlagSource source, ApplicationId appId, UnboundFlag<? extends V, ?, ?> flag) {
return flag.bindTo(source)
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index 083cb535bfa..0aeea5ce2d5 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -433,7 +433,8 @@ public class SessionRepository {
private Optional<RemoteSession> loadSessionIfActive(RemoteSession session) {
for (ApplicationId applicationId : applicationRepo.activeApplications()) {
- if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) {
+ Optional<Long> activeSession = applicationRepo.activeSessionOf(applicationId);
+ if (activeSession.isPresent() && activeSession.get() == session.getSessionId()) {
log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it");
applicationRepo.activateApplication(ensureApplicationLoaded(session), session.getSessionId());
log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")");
diff --git a/container-dependency-versions/pom.xml b/container-dependency-versions/pom.xml
index e2b6f3b785b..385ec058424 100644
--- a/container-dependency-versions/pom.xml
+++ b/container-dependency-versions/pom.xml
@@ -403,8 +403,8 @@
<properties>
<aopalliance.version>1.0</aopalliance.version>
<bouncycastle.version>1.68</bouncycastle.version>
- <felix.version>7.0.1</felix.version>
- <felix.log.version>1.0.1</felix.log.version>
+ <felix.version>6.0.3</felix.version>
+ <felix.log.version>1.0.1</felix.log.version> <!-- TODO Vespa 8: upgrade! -->
<findbugs.version>1.3.9</findbugs.version>
<guava.version>20.0</guava.version>
<guice.version>3.0</guice.version>
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java
index 854780dd336..1ddb50b0a6b 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java
@@ -122,12 +122,7 @@ public class ResourceMeterMaintainer extends ControllerMaintainer {
private void reportResourceSnapshots(Collection<ResourceSnapshot> resourceSnapshots) {
meteringClient.consume(resourceSnapshots);
- metric.set(METERING_LAST_REPORTED, clock.millis() / 1000, metric.createContext(Collections.emptyMap()));
- // total metered resource usage, for alerting on drastic changes
- metric.set(METERING_TOTAL_REPORTED,
- resourceSnapshots.stream()
- .mapToDouble(r -> r.getCpuCores() + r.getMemoryGb() + r.getDiskGb()).sum(),
- metric.createContext(Collections.emptyMap()));
+ updateMetrics(resourceSnapshots);
try (var lock = curator.lockMeteringRefreshTime()) {
if (needsRefresh(curator.readMeteringRefreshTime())) {
@@ -194,4 +189,25 @@ public class ResourceMeterMaintainer extends ControllerMaintainer {
double cost = new NodeResources(allocation.getCpuCores(), allocation.getMemoryGb(), allocation.getDiskGb(), 0).cost();
return Math.round(cost * 100.0 / costDivisor) / 100.0;
}
+
+ private void updateMetrics(Collection<ResourceSnapshot> resourceSnapshots) {
+ metric.set(METERING_LAST_REPORTED, clock.millis() / 1000, metric.createContext(Collections.emptyMap()));
+ // total metered resource usage, for alerting on drastic changes
+ metric.set(METERING_TOTAL_REPORTED,
+ resourceSnapshots.stream()
+ .mapToDouble(r -> r.getCpuCores() + r.getMemoryGb() + r.getDiskGb()).sum(),
+ metric.createContext(Collections.emptyMap()));
+
+ resourceSnapshots.forEach(snapshot -> {
+ var context = metric.createContext(Map.of(
+ "tenant", snapshot.getApplicationId().tenant().value(),
+ "applicationId", snapshot.getApplicationId().toFullString(),
+ "zoneId", snapshot.getZoneId()
+ ));
+ metric.set("metering.vcpu", snapshot.getCpuCores(), context);
+ metric.set("metering.memoryGB", snapshot.getMemoryGb(), context);
+ metric.set("metering.diskGB", snapshot.getDiskGb(), context);
+ });
+ }
+
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java
index a255a6c37d8..5b923c2ee59 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java
@@ -98,6 +98,8 @@ public class ResourceMeterMaintainerTest {
assertEquals(tester.clock().millis()/1000, metrics.getMetric("metering_last_reported"));
assertEquals(2224.0d, (Double) metrics.getMetric("metering_total_reported"), Double.MIN_VALUE);
+ assertEquals(24d, (Double) metrics.getMetric(context -> "tenant1".equals(context.get("tenant")), "metering.vcpu").get(), Double.MIN_VALUE);
+ assertEquals(40d, (Double) metrics.getMetric(context -> "tenant2".equals(context.get("tenant")), "metering.vcpu").get(), Double.MIN_VALUE);
// Metering is not refreshed
assertFalse(snapshotConsumer.isRefreshed());
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index 9cc4f60ed7e..78a58f24a65 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -799,7 +799,7 @@
"public"
],
"methods": [
- "public void <init>(int, com.yahoo.documentapi.ProgressToken)",
+ "public void <init>(int, com.yahoo.documentapi.ProgressToken, int, int)",
"protected boolean isLosslessResetPossible()",
"public boolean hasNext()",
"public boolean shouldYield()",
@@ -851,6 +851,7 @@
"public void setDistributionBitCount(int)",
"public boolean visitsAllBuckets()",
"public static com.yahoo.documentapi.VisitorIterator createFromDocumentSelection(java.lang.String, com.yahoo.document.BucketIdFactory, int, com.yahoo.documentapi.ProgressToken)",
+ "public static com.yahoo.documentapi.VisitorIterator createFromDocumentSelection(java.lang.String, com.yahoo.document.BucketIdFactory, int, com.yahoo.documentapi.ProgressToken, int, int)",
"public static com.yahoo.documentapi.VisitorIterator createFromExplicitBucketSet(java.util.Set, int, com.yahoo.documentapi.ProgressToken)"
],
"fields": []
@@ -931,6 +932,9 @@
"public com.yahoo.documentapi.messagebus.loadtypes.LoadType getLoadType()",
"public boolean skipBucketsOnFatalErrors()",
"public void skipBucketsOnFatalErrors(boolean)",
+ "public void slice(int, int)",
+ "public int getSlices()",
+ "public int getSliceId()",
"public void setDynamicallyIncreaseMaxBucketsPerVisitor(boolean)",
"public void setDynamicMaxBucketsIncreaseFactor(float)",
"public java.lang.String toString()"
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
index 9957898e459..4a77d30ec92 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
@@ -387,6 +387,18 @@ public class ProgressToken {
}
/**
+ * Marks the current bucket as finished and advances the bucket cursor;
+ * throws instead if the current bucket is already {@link #addBucket added}.
+ */
+ void skipCurrentBucket() {
+ if (buckets.containsKey(bucketToKeyWrapper(getCurrentBucketId())))
+ throw new IllegalStateException("Current bucket was already added to the explicit bucket set");
+
+ ++finishedBucketCount;
+ ++bucketCursor;
+ }
+
+ /**
* Directly generate a bucket Id key for the <code>n</code>th bucket in
* reverse sorted order.
*
@@ -428,6 +440,14 @@ public class ProgressToken {
return bucketCursor;
}
+ static BucketId toBucketId(long bucketCursor, int distributionBits) {
+ return new BucketId(keyToBucketId(makeNthBucketKey(bucketCursor, distributionBits)));
+ }
+
+ BucketId getCurrentBucketId() {
+ return toBucketId(getBucketCursor(), getDistributionBitCount());
+ }
+
protected void setBucketCursor(long bucketCursor) {
this.bucketCursor = bucketCursor;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
index e11bdf7f18c..e15512ca71b 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
@@ -81,12 +81,24 @@ public class VisitorIterator {
protected static class DistributionRangeBucketSource implements BucketSource {
private boolean flushActive = false;
private int distributionBitCount;
+ private final int slices;
+ private final int sliceId;
// Wouldn't need this if this were a non-static class, but do it for
// the sake of keeping things identical in Java and C++
private ProgressToken progressToken;
public DistributionRangeBucketSource(int distributionBitCount,
- ProgressToken progress) {
+ ProgressToken progress,
+ int slices, int sliceId) {
+ if (slices < 1) {
+ throw new IllegalArgumentException("slices must be positive, but was " + slices);
+ }
+ if (sliceId < 0 || sliceId >= slices) {
+ throw new IllegalArgumentException("sliceId must be in [0, " + slices + "), but was " + sliceId);
+ }
+
+ this.slices = slices;
+ this.sliceId = sliceId;
progressToken = progress;
// New progress token (could also be empty, in which this is a
@@ -148,6 +160,7 @@ public class VisitorIterator {
}
// Should be all fixed up and good to go
progressToken.setInconsistentState(false);
+ skipToSlice();
}
protected boolean isLosslessResetPossible() {
@@ -203,6 +216,7 @@ public class VisitorIterator {
assert(p.getActiveBucketCount() == 0);
p.clearAllBuckets();
p.setBucketCursor(0);
+ skipToSlice();
return;
}
@@ -292,7 +306,14 @@ public class VisitorIterator {
}
public boolean hasNext() {
- return progressToken.getBucketCursor() < (1L << distributionBitCount);
+ // There is a next bucket iff. there is a bucket no earlier than the cursor which
+ // is contained in the bucket space and is congruent to our sliceId modulo the number
+ // of slices; or, if we're not yet properly initialised with a real distribution bit
+ // count, we ignore the slicing altogether.
+ long nextBucket = progressToken.getBucketCursor();
+ if (distributionBitCount != 1) {
+ nextBucket += Math.floorMod(sliceId - nextBucket, slices);
+ }
+ return nextBucket < (1L << distributionBitCount);
}
public boolean shouldYield() {
@@ -311,13 +332,27 @@ public class VisitorIterator {
public BucketProgress getNext() {
assert(hasNext()) : "getNext() called with hasNext() == false";
- long currentPosition = progressToken.getBucketCursor();
- long key = ProgressToken.makeNthBucketKey(currentPosition, distributionBitCount);
- ++currentPosition;
- progressToken.setBucketCursor(currentPosition);
- return new BucketProgress(
- new BucketId(ProgressToken.keyToBucketId(key)),
- new BucketId());
+
+ // Create the progress to return for creating visitors, and advance bucket cursor.
+ BucketProgress progress = new BucketProgress(progressToken.getCurrentBucketId(), new BucketId());
+ progressToken.setBucketCursor(progressToken.getBucketCursor() + 1);
+
+ // Skip ahead to our next slice, to ensure we also exhaust the bucket space when
+ // hasNext() turns false, but there are still super buckets left after the current.
+ skipToSlice();
+
+ return progress;
+ }
+
+ // Advances the wrapped progress token's bucket cursor to our next slice, marking any skipped
+ // buckets as complete, but only if we've been initialised with a proper distribution bit count.
+ private void skipToSlice() {
+ if (distributionBitCount == 1)
+ return;
+
+ while (progressToken.getBucketCursor() < getTotalBucketCount() && (progressToken.getBucketCursor() % slices) != sliceId) {
+ progressToken.skipCurrentBucket();
+ }
}
public int getDistributionBitCount() {
@@ -732,6 +767,13 @@ public class VisitorIterator {
return bucketSource.visitsAllBuckets();
}
+ public static VisitorIterator createFromDocumentSelection(
+ String documentSelection,
+ BucketIdFactory idFactory,
+ int distributionBitCount,
+ ProgressToken progress) throws ParseException {
+ return createFromDocumentSelection(documentSelection, idFactory, distributionBitCount, progress, 1, 0);
+ }
/**
* Create a new <code>VisitorIterator</code> instance based on the given document
* selection string.
@@ -753,7 +795,9 @@ public class VisitorIterator {
String documentSelection,
BucketIdFactory idFactory,
int distributionBitCount,
- ProgressToken progress) throws ParseException {
+ ProgressToken progress,
+ int slices,
+ int sliceId) throws ParseException {
BucketSelector bucketSel = new BucketSelector(idFactory);
Set<BucketId> rawBuckets = bucketSel.getBucketList(documentSelection);
BucketSource src;
@@ -763,7 +807,7 @@ public class VisitorIterator {
// bit-based range source
if (rawBuckets == null) {
// Range source
- src = new DistributionRangeBucketSource(distributionBitCount, progress);
+ src = new DistributionRangeBucketSource(distributionBitCount, progress, slices, sliceId);
} else {
// Explicit source
src = new ExplicitBucketSource(rawBuckets, distributionBitCount, progress);
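For context (not part of the diff): a minimal sketch of how the new slicing overload of createFromDocumentSelection could be used to split the superbucket space between cooperating workers. The selection string, distribution bit count and loop body are illustrative assumptions mirroring VisitorIteratorTestCase further down; only the six-argument factory method, hasNext()/getNext()/update() and ProgressToken.FINISHED_BUCKET come from this change.

    // Illustrative sketch; each worker runs this with its own sliceId in [0, slices).
    static void visitSlice(int slices, int sliceId) throws Exception {
        BucketIdFactory idFactory = new BucketIdFactory();
        ProgressToken progress = new ProgressToken();
        VisitorIterator iter = VisitorIterator.createFromDocumentSelection(
                "id.group != \"yahoo.com\"", idFactory, 4, progress, slices, sliceId);
        while (iter.hasNext()) {
            VisitorIterator.BucketProgress bucket = iter.getNext();
            // ... visit bucket.getSuperbucket() here ...
            iter.update(bucket.getSuperbucket(), ProgressToken.FINISHED_BUCKET);
        }
    }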
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
index 8b0c8538855..44675d8d2ac 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
@@ -50,6 +50,8 @@ public class VisitorParameters extends Parameters {
private int traceLevel = 0;
private ThrottlePolicy throttlePolicy = null;
private boolean skipBucketsOnFatalErrors = false;
+ private int slices = 1;
+ private int sliceId = 0;
// Advanced parameter, only for internal use.
Set<BucketId> bucketsToVisit = null;
@@ -101,6 +103,7 @@ public class VisitorParameters extends Parameters {
params.getDynamicMaxBucketsIncreaseFactor());
setTraceLevel(params.getTraceLevel());
skipBucketsOnFatalErrors(params.skipBucketsOnFatalErrors());
+ slice(params.getSlices(), params.getSliceId());
}
// Get functions
@@ -331,6 +334,15 @@ public class VisitorParameters extends Parameters {
public void skipBucketsOnFatalErrors(boolean skipBucketsOnFatalErrors) { this.skipBucketsOnFatalErrors = skipBucketsOnFatalErrors; }
+ public void slice(int slices, int sliceId) {
+ this.slices = slices;
+ this.sliceId = sliceId;
+ }
+
+ public int getSlices() { return slices; }
+
+ public int getSliceId() { return sliceId; }
+
/**
* Set whether or not max buckets per visitor value should be dynamically
* increased when using orderdoc and visitors do not return at least half
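Likewise not part of the diff: a hedged sketch of the new VisitorParameters slicing knobs. Only slice(int, int), getSlices() and getSliceId() come from this change; the document-selection constructor is an assumption based on existing documentapi usage.

    // Sketch: ask the visitor session to cover only slice 1 out of 4 of the bucket space.
    VisitorParameters params = new VisitorParameters("id.group != \"yahoo.com\"");
    params.slice(4, 1);
    assert params.getSlices() == 4;
    assert params.getSliceId() == 1;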
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 5d07d433f18..f7242695490 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -572,7 +572,9 @@ public class MessageBusVisitorSession implements VisitorSession {
params.getDocumentSelection(),
bucketIdFactory,
1,
- progressToken);
+ progressToken,
+ params.getSlices(),
+ params.getSliceId());
} else {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "parameters specify explicit bucket set " +
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
index 01cdad244a8..fb5f5bd2cfb 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
@@ -77,8 +77,119 @@ public class VisitorIteratorTestCase {
}
@Test
+ public void testInvalidSlicing() throws ParseException {
+ int distBits = 4;
+ BucketIdFactory idFactory = new BucketIdFactory();
+ ProgressToken progress = new ProgressToken();
+
+ try {
+ VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 0, 0);
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("slices must be positive, but was 0", e.getMessage());
+ }
+
+ try {
+ VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 1, 1);
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("sliceId must be in [0, 1), but was 1", e.getMessage());
+ }
+
+ try {
+ VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 1, -1);
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("sliceId must be in [0, 1), but was -1", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testIgnoredSlicing() throws ParseException {
+ int distBits = 1;
+ BucketIdFactory idFactory = new BucketIdFactory();
+ ProgressToken progress = new ProgressToken();
+
+ VisitorIterator iter = VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 3, 2);
+
+ // Iterator with a single distribution bit ignores slicing.
+ assertTrue(iter.hasNext());
+ assertEquals(ProgressToken.toBucketId(0, 1), iter.getNext().getSuperbucket());
+ assertEquals(ProgressToken.toBucketId(1, 1), iter.getNext().getSuperbucket());
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testValidSlicing() throws ParseException {
+ int distBits = 4;
+ long buckets = 1 << distBits;
+ BucketIdFactory idFactory = new BucketIdFactory();
+ for (int slices = 1; slices <= 2 * buckets; slices++) {
+ long bucketsTotal = 0;
+ for (int sliceId = 0; sliceId < slices; sliceId++) {
+ ProgressToken progress = new ProgressToken();
+
+ // docsel will be unknown --> entire bucket range will be covered
+ VisitorIterator iter = VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, slices, sliceId);
+
+ String context = "slices: " + slices + ", sliceId: " + sliceId;
+ assertEquals(context, progress.getDistributionBitCount(), distBits);
+ assertTrue(context, iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource);
+
+ assertEquals(context, progress.getFinishedBucketCount(), Math.min(buckets, sliceId));
+ assertEquals(context, progress.getTotalBucketCount(), buckets);
+
+ // First, get+update half of the buckets, marking them as done
+ long bucketCount = 0;
+
+ // Do buckets in the first half.
+ while (iter.hasNext() && progress.getFinishedBucketCount() < buckets / 2) {
+ VisitorIterator.BucketProgress ids = iter.getNext();
+ iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET);
+ ++bucketCount;
+ ++bucketsTotal;
+ }
+
+ if (slices + sliceId < buckets) { // Otherwise, we're already done ...
+ assertEquals(context, ((buckets / 2) + slices - sliceId - 1) / slices, bucketCount);
+ // Should be no buckets in limbo at this point
+ assertFalse(context, progress.hasActive());
+ assertFalse(context, progress.hasPending());
+ assertFalse(context, iter.isDone());
+ assertTrue(context, iter.hasNext());
+ assertEquals(context, progress.getFinishedBucketCount(), bucketCount * slices + sliceId);
+ assertFalse(context, progress.isFinished());
+ }
+
+ while (iter.hasNext()) {
+ VisitorIterator.BucketProgress ids = iter.getNext();
+ iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET);
+ ++bucketCount;
+ ++bucketsTotal;
+ }
+
+ assertEquals(context, (buckets + slices - sliceId - 1) / slices, bucketCount);
+ // Should be no buckets in limbo at this point
+ assertFalse(context, progress.hasActive());
+ assertFalse(context, progress.hasPending());
+ assertTrue(context, iter.isDone());
+ assertFalse(context, iter.hasNext());
+ assertEquals(context, progress.getFinishedBucketCount(), buckets);
+ assertTrue(context, progress.isFinished());
+ }
+ assertEquals("slices: " + slices, buckets, bucketsTotal);
+ }
+ }
+
+ @Test
public void testProgressSerializationRange() throws ParseException {
int distBits = 4;
+ int buckets = 1 << distBits;
BucketIdFactory idFactory = new BucketIdFactory();
ProgressToken progress = new ProgressToken();
@@ -91,11 +202,11 @@ public class VisitorIteratorTestCase {
assertTrue(iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource);
assertEquals(progress.getFinishedBucketCount(), 0);
- assertEquals(progress.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progress.getTotalBucketCount(), buckets);
// First, get+update half of the buckets, marking them as done
long bucketCount = 0;
- long bucketStop = 1 << (distBits - 1);
+ long bucketStop = buckets / 2;
while (iter.hasNext() && bucketCount != bucketStop) {
VisitorIterator.BucketProgress ids = iter.getNext();
@@ -119,7 +230,7 @@ public class VisitorIteratorTestCase {
desired.append('\n');
desired.append(bucketCount);
desired.append('\n');
- desired.append(1 << distBits);
+ desired.append(buckets);
desired.append('\n');
assertEquals(desired.toString(), progress.toString());
@@ -132,7 +243,7 @@ public class VisitorIteratorTestCase {
ProgressToken progDs = new ProgressToken(progress.toString());
assertEquals(progDs.getDistributionBitCount(), distBits);
- assertEquals(progDs.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progDs.getTotalBucketCount(), buckets);
assertEquals(progDs.getFinishedBucketCount(), bucketCount);
VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection(
@@ -154,21 +265,21 @@ public class VisitorIteratorTestCase {
// Now fetch a subset of the remaining buckets without finishing them,
// keeping some in the active set and some in pending
- int pendingTotal = 1 << (distBits - 3);
- int activeTotal = 1 << (distBits - 3);
- Vector<VisitorIterator.BucketProgress> buckets = new Vector<VisitorIterator.BucketProgress>();
+ int pendingTotal = buckets / 8;
+ int activeTotal = buckets / 8;
+ Vector<VisitorIterator.BucketProgress> trackedBuckets = new Vector<VisitorIterator.BucketProgress>();
// Pre-fetch, since otherwise we'd reuse pending buckets
for (int i = 0; i < pendingTotal + activeTotal; ++i) {
- buckets.add(iter.getNext());
+ trackedBuckets.add(iter.getNext());
}
for (int i = 0; i < pendingTotal + activeTotal; ++i) {
- VisitorIterator.BucketProgress idTemp = buckets.get(i);
+ VisitorIterator.BucketProgress idTemp = trackedBuckets.get(i);
if (i < activeTotal) {
// Make them 50% done
iter.update(idTemp.getSuperbucket(),
- new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 << distBits)));
+ new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 * buckets)));
}
// else: leave hanging as active
}
@@ -186,7 +297,7 @@ public class VisitorIteratorTestCase {
desired.append('\n');
desired.append(bucketCount);
desired.append('\n');
- desired.append(1 << distBits);
+ desired.append(buckets);
desired.append('\n');
assertEquals(progress.getBuckets().entrySet().size(), pendingTotal + activeTotal);
@@ -206,7 +317,7 @@ public class VisitorIteratorTestCase {
ProgressToken progDs = new ProgressToken(progress.toString());
assertEquals(progDs.getDistributionBitCount(), distBits);
- assertEquals(progDs.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progDs.getTotalBucketCount(), buckets);
assertEquals(progDs.getFinishedBucketCount(), bucketCount);
VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection(
@@ -225,7 +336,7 @@ public class VisitorIteratorTestCase {
// Finish all the active buckets
for (int i = activeTotal; i < activeTotal + pendingTotal; ++i) {
- iter.update(buckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET);
+ iter.update(trackedBuckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET);
++bucketCount;
}
@@ -246,16 +357,16 @@ public class VisitorIteratorTestCase {
assertFalse(iter.hasNext());
assertTrue(progress.isFinished());
// Cumulative number of finished buckets must match 2^distbits
- assertEquals(bucketCount, 1 << distBits);
+ assertEquals(bucketCount, buckets);
StringBuilder finished = new StringBuilder();
finished.append("VDS bucket progress file (100.0% completed)\n");
finished.append(distBits);
finished.append('\n');
- finished.append(1 << distBits); // Cursor
+ finished.append(buckets); // Cursor
finished.append('\n');
- finished.append(1 << distBits); // Finished
+ finished.append(buckets); // Finished
finished.append('\n');
- finished.append(1 << distBits); // Total
+ finished.append(buckets); // Total
finished.append('\n');
assertEquals(progress.toString(), finished.toString());
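For reference, the fully completed progress token that the assertions above build is a small line-oriented text format: a header with the completion percentage, then the distribution bit count, the cursor, the number of finished buckets, and the total bucket count, one value per line. With distBits = 4 (16 superbuckets), the serialization the test expects works out to:

    VDS bucket progress file (100.0% completed)
    4
    16
    16
    16

This is only an illustration derived from the StringBuilder assertions in the test; the authoritative format lives in ProgressToken itself.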
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index e355bb4d6c4..e7441acc203 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -369,6 +369,20 @@ public class Flags {
"Takes effect at redeploy",
ZONE_ID, APPLICATION_ID);
+ public static final UnboundBooleanFlag ASYNC_APPLY_BUCKET_DIFF = defineFeatureFlag(
+ "async-apply-bucket-diff", false,
+ List.of("geirst", "vekterli"), "2021-10-22", "2022-01-31",
+ "Whether portions of apply bucket diff handling will be performed asynchronously",
+ "Takes effect at redeploy",
+ ZONE_ID, APPLICATION_ID);
+
+ public static final UnboundStringFlag JDK_VERSION = defineStringFlag(
+ "jdk-version", "11",
+ List.of("hmusum"), "2021-10-25", "2021-11-25",
+ "JDK version to use inside containers",
+ "Takes effect on restart of Docker container",
+ APPLICATION_ID);
+
/** WARNING: public for testing: All flags should be defined in {@link Flags}. */
public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners,
String createdAt, String expiresAt, String description,
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java b/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java
index 2c0614805a9..88e2393904e 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java
@@ -218,7 +218,6 @@ public class PermanentFlags {
"Takes effect immediately",
ZONE_ID, APPLICATION_ID);
-
private PermanentFlags() {}
private static UnboundBooleanFlag defineFeatureFlag(
diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java
index 92cc35fc354..962e6b32947 100644
--- a/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java
+++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java
@@ -1,4 +1,4 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.util.http.hc5;
import org.apache.hc.client5.http.HttpRoute;
diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java
index 50af29f92aa..91810b50778 100644
--- a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java
+++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java
@@ -1,4 +1,4 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.util.http.hc5;
import com.yahoo.security.tls.MixedMode;
diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java
index e01d278ff38..52f7ad9b56b 100644
--- a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java
+++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java
@@ -1,4 +1,4 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.util.http.hc5;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
diff --git a/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java b/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java
index 514eae56fe8..adbc445de1a 100644
--- a/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java
+++ b/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java
@@ -1,4 +1,4 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.util.http.hc4.retry;
import org.apache.http.HttpResponse;
diff --git a/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java b/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java
index 58dc25fdf1a..78c413fba56 100644
--- a/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java
+++ b/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java
@@ -1,4 +1,4 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.util.http.hc5;
import org.apache.hc.client5.http.HttpRoute;
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java
index b7993de5d82..50ac90b6181 100644
--- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java
+++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java
@@ -168,8 +168,5 @@ class OsgiLogHandler extends Handler {
return new Hashtable<>();
}
- @Override
- public <A> A adapt(Class<A> aClass) { return null; }
-
}
}
diff --git a/parent/pom.xml b/parent/pom.xml
index a3d2111062e..18fad225aad 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -849,10 +849,10 @@
<properties>
<antlr.version>3.5.2</antlr.version>
<antlr4.version>4.5</antlr4.version>
- <apache.httpclient.version>4.5.12</apache.httpclient.version>
+ <apache.httpclient.version>4.5.13</apache.httpclient.version>
<apache.httpcore.version>4.4.13</apache.httpcore.version>
<apache.httpclient5.version>5.1</apache.httpclient5.version>
- <asm.version>9.2</asm.version>
+ <asm.version>9.1</asm.version>
<!-- Athenz dependencies. Make sure these dependencies match those in Vespa's internal repositories -->
<athenz.version>1.10.14</athenz.version>
<jjwt.version>0.11.2</jjwt.version>
@@ -870,7 +870,7 @@
<junit.version>5.7.0</junit.version>
<maven-assembly-plugin.version>3.1.1</maven-assembly-plugin.version>
<!-- TODO: in order to upgrade above 4.1.0, we probably need to convert fat-model-deps to a jar artifact. -->
- <maven-bundle-plugin.version>5.1.2</maven-bundle-plugin.version>
+ <maven-bundle-plugin.version>4.1.0</maven-bundle-plugin.version>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-deploy-plugin.version>2.8.1</maven-deploy-plugin.version>
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 6e4f38fe564..74fef13f141 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -34,12 +34,12 @@ BucketContent::BucketContent()
_inUse(false),
_outdatedInfo(true),
_active(false)
-{ }
+{}
+
BucketContent::~BucketContent() = default;
uint32_t
-BucketContent::computeEntryChecksum(const BucketEntry& e) const
-{
+BucketContent::computeEntryChecksum(const BucketEntry &e) const {
vespalib::crc_32_type checksummer;
uint64_t ts(e.entry->getTimestamp());
@@ -49,8 +49,7 @@ BucketContent::computeEntryChecksum(const BucketEntry& e) const
}
BucketChecksum
-BucketContent::updateRollingChecksum(uint32_t entryChecksum)
-{
+BucketContent::updateRollingChecksum(uint32_t entryChecksum) {
uint32_t checksum = _info.getChecksum();
checksum ^= entryChecksum;
if (checksum == 0) {
@@ -59,9 +58,8 @@ BucketContent::updateRollingChecksum(uint32_t entryChecksum)
return BucketChecksum(checksum);
}
-const BucketInfo&
-BucketContent::getBucketInfo() const
-{
+const BucketInfo &
+BucketContent::getBucketInfo() const {
if (!_outdatedInfo) {
return _info;
}
@@ -73,9 +71,9 @@ BucketContent::getBucketInfo() const
uint32_t totalSize = 0;
uint32_t checksum = 0;
- for (const BucketEntry & bucketEntry : _entries) {
- const DocEntry& entry(*bucketEntry.entry);
- const GlobalId& gid(bucketEntry.gid);
+ for (const BucketEntry &bucketEntry: _entries) {
+ const DocEntry &entry(*bucketEntry.entry);
+ const GlobalId &gid(bucketEntry.gid);
GidMapType::const_iterator gidIt(_gidMap.find(gid));
assert(gidIt != _gidMap.end());
@@ -114,17 +112,19 @@ BucketContent::getBucketInfo() const
namespace {
struct TimestampLess {
- bool operator()(const BucketEntry &bucketEntry, Timestamp t)
- { return bucketEntry.entry->getTimestamp() < t; }
- bool operator()(Timestamp t, const BucketEntry &bucketEntry)
- { return t < bucketEntry.entry->getTimestamp(); }
+ bool operator()(const BucketEntry &bucketEntry, Timestamp t) {
+ return bucketEntry.entry->getTimestamp() < t;
+ }
+
+ bool operator()(Timestamp t, const BucketEntry &bucketEntry) {
+ return t < bucketEntry.entry->getTimestamp();
+ }
};
} // namespace
bool
-BucketContent::hasTimestamp(Timestamp t) const
-{
+BucketContent::hasTimestamp(Timestamp t) const {
if (!_entries.empty() && _entries.back().entry->getTimestamp() < t) {
return false;
}
@@ -148,10 +148,9 @@ BucketContent::hasTimestamp(Timestamp t) const
*/
void
-BucketContent::insert(DocEntry::SP e)
-{
+BucketContent::insert(DocEntry::SP e) {
LOG(spam, "insert(%s)", e->toString().c_str());
- const DocumentId* docId(e->getDocumentId());
+ const DocumentId *docId(e->getDocumentId());
assert(docId != 0);
GlobalId gid(docId->getGlobalId());
GidMapType::iterator gidIt(_gidMap.find(gid));
@@ -160,22 +159,15 @@ BucketContent::insert(DocEntry::SP e)
_entries.back().entry->getTimestamp() < e->getTimestamp()) {
_entries.push_back(BucketEntry(e, gid));
} else {
- std::vector<BucketEntry>::iterator it =
- lower_bound(_entries.begin(),
- _entries.end(),
- e->getTimestamp(),
- TimestampLess());
+ auto it = lower_bound(_entries.begin(), _entries.end(), e->getTimestamp(), TimestampLess());
if (it != _entries.end()) {
if (it->entry->getTimestamp() == e->getTimestamp()) {
if (*it->entry.get() == *e) {
- LOG(debug, "Ignoring duplicate put entry %s",
- e->toString().c_str());
+ LOG(debug, "Ignoring duplicate put entry %s", e->toString().c_str());
return;
} else {
- LOG(error, "Entry %s was already present."
- "Was trying to insert %s.",
- it->entry->toString().c_str(),
- e->toString().c_str());
+ LOG(error, "Entry %s was already present. Was trying to insert %s.",
+ it->entry->toString().c_str(), e->toString().c_str());
LOG_ABORT("should not reach here");
}
}
@@ -190,11 +182,8 @@ BucketContent::insert(DocEntry::SP e)
// newer versions of a document etc. by XORing away old checksum.
gidIt->second = e;
} else {
- LOG(spam,
- "Newly inserted entry %s was older than existing entry %s; "
- "not updating GID mapping",
- e->toString().c_str(),
- gidIt->second->toString().c_str());
+ LOG(spam, "Newly inserted entry %s was older than existing entry %s; not updating GID mapping",
+ e->toString().c_str(), gidIt->second->toString().c_str());
}
_outdatedInfo = true;
} else {
@@ -226,10 +215,8 @@ BucketContent::insert(DocEntry::SP e)
_info.getActive());
}
- LOG(spam,
- "After cheap bucketinfo update, state is %s (inserted %s)",
- _info.toString().c_str(),
- e->toString().c_str());
+ LOG(spam, "After cheap bucketinfo update, state is %s (inserted %s)",
+ _info.toString().c_str(), e->toString().c_str());
}
}
@@ -237,9 +224,8 @@ BucketContent::insert(DocEntry::SP e)
}
DocEntry::SP
-BucketContent::getEntry(const DocumentId& did) const
-{
- GidMapType::const_iterator it(_gidMap.find(did.getGlobalId()));
+BucketContent::getEntry(const DocumentId &did) const {
+ auto it(_gidMap.find(did.getGlobalId()));
if (it != _gidMap.end()) {
return it->second;
}
@@ -247,10 +233,8 @@ BucketContent::getEntry(const DocumentId& did) const
}
DocEntry::SP
-BucketContent::getEntry(Timestamp t) const
-{
- std::vector<BucketEntry>::const_iterator iter =
- lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
+BucketContent::getEntry(Timestamp t) const {
+ auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
if (iter == _entries.end() || iter->entry->getTimestamp() != t) {
return DocEntry::SP();
@@ -260,15 +244,12 @@ BucketContent::getEntry(Timestamp t) const
}
void
-BucketContent::eraseEntry(Timestamp t)
-{
- std::vector<BucketEntry>::iterator iter =
- lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
+BucketContent::eraseEntry(Timestamp t) {
+ auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
if (iter != _entries.end() && iter->entry->getTimestamp() == t) {
assert(iter->entry->getDocumentId() != 0);
- GidMapType::iterator gidIt(
- _gidMap.find(iter->entry->getDocumentId()->getGlobalId()));
+ GidMapType::iterator gidIt = _gidMap.find(iter->entry->getDocumentId()->getGlobalId());
assert(gidIt != _gidMap.end());
_entries.erase(iter);
if (gidIt->second->getTimestamp() == t) {
@@ -281,7 +262,7 @@ BucketContent::eraseEntry(Timestamp t)
}
}
-DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
+DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo> &repo)
: _initialized(false),
_repo(repo),
_content(),
@@ -294,13 +275,12 @@ DummyPersistence::DummyPersistence(const std::shared_ptr<const document::Documen
DummyPersistence::~DummyPersistence() = default;
document::select::Node::UP
-DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf)
-{
+DummyPersistence::parseDocumentSelection(const string &documentSelection, bool allowLeaf) {
document::select::Node::UP ret;
try {
document::select::Parser parser(*_repo, document::BucketIdFactory());
ret = parser.parse(documentSelection);
- } catch (document::select::ParsingFailedException& e) {
+ } catch (document::select::ParsingFailedException &e) {
return document::select::Node::UP();
}
if (ret->isLeafNode() && !allowLeaf) {
@@ -310,18 +290,17 @@ DummyPersistence::parseDocumentSelection(const string& documentSelection, bool a
}
Result
-DummyPersistence::initialize()
-{
+DummyPersistence::initialize() {
assert(!_initialized);
_initialized = true;
return Result();
}
#define DUMMYPERSISTENCE_VERIFY_INITIALIZED \
- if (!_initialized) throw vespalib::IllegalStateException( \
- "initialize() must always be called first in order to " \
- "trigger lazy initialization.", VESPA_STRLOC)
-
+ if (!_initialized) { \
+ LOG(error, "initialize() must always be called first in order to trigger lazy initialization."); \
+ abort(); \
+ }
BucketIdListResult
DummyPersistence::listBuckets(BucketSpace bucketSpace) const
@@ -714,8 +693,8 @@ DummyPersistence::destroyIterator(IteratorId id, Context&)
return Result();
}
-Result
-DummyPersistence::createBucket(const Bucket& b, Context&)
+void
+DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "createBucket(%s)", b.toString().c_str());
@@ -727,11 +706,11 @@ DummyPersistence::createBucket(const Bucket& b, Context&)
assert(!_content[b]->_inUse);
LOG(debug, "%s already existed", b.toString().c_str());
}
- return Result();
+ onComplete->onComplete(std::make_unique<Result>());
}
void
-DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
+DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "deleteBucket(%s)", b.toString().c_str());
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 99d6ba717b7..a25bf6b8a8e 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -168,8 +168,8 @@ public:
IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override;
Result destroyIterator(IteratorId, Context&) override;
- Result createBucket(const Bucket&, Context&) override;
- void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
+ void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
+ void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index e287bdc5252..3b59f20ca96 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -15,7 +15,6 @@ class AbstractPersistenceProvider : public PersistenceProvider
{
public:
Result initialize() override { return Result(); };
- Result createBucket(const Bucket&, Context&) override { return Result(); }
Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); }
void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override;
Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); }
diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h
index 02c626ea23e..7b04498205d 100644
--- a/persistence/src/vespa/persistence/spi/catchresult.h
+++ b/persistence/src/vespa/persistence/spi/catchresult.h
@@ -19,4 +19,9 @@ private:
const ResultHandler *_resulthandler;
};
+class NoopOperationComplete : public OperationComplete {
+ void onComplete(std::unique_ptr<spi::Result>) noexcept override { }
+ void addResultHandler(const spi::ResultHandler *) override { }
+};
+
}
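The new NoopOperationComplete helper gives callers of the async SPI methods a completion sink for cases where the result is deliberately ignored. A minimal, illustrative sketch (the provider, bucket and context variables are assumed to exist and are not taken from this patch):

    // Fire-and-forget create: the Result is dropped by the no-op completion object.
    provider.createBucketAsync(bucket, context, std::make_unique<spi::NoopOperationComplete>());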
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index 3ea476c33fc..31db08a6f4f 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -17,6 +17,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat
}
Result
+PersistenceProvider::createBucket(const Bucket& bucket, Context& context) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->future_result();
+ createBucketAsync(bucket, context, std::move(catcher));
+ return *future.get();
+}
+
+Result
PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) {
auto catcher = std::make_unique<CatchResult>();
auto future = catcher->future_result();
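The synchronous createBucket wrapper added above uses the same bridge as deleteBucket: allocate a CatchResult, keep its future, hand the catcher to the async call, and block on the future. Callers that still want blocking semantics (intended mainly for tests, per the TODO in persistenceprovider.h below) can keep using the wrapper; a hedged caller-side sketch with assumed provider, bucket and context objects:

    // Blocking call; internally runs createBucketAsync and waits on CatchResult's future.
    spi::Result result = provider.createBucket(bucket, context);
    bool ok = (result.getErrorCode() == spi::Result::ErrorType::NONE);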
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 83eb042d855..269175f7d26 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -58,6 +58,7 @@ struct PersistenceProvider
virtual ~PersistenceProvider();
// TODO Move to utility class for use in tests only
+ Result createBucket(const Bucket&, Context&);
Result deleteBucket(const Bucket&, Context&);
Result put(const Bucket&, Timestamp, DocumentSP, Context&);
Result setActiveState(const Bucket&, BucketInfo::ActiveState);
@@ -336,14 +337,14 @@ struct PersistenceProvider
* Tells the provider that the given bucket has been created in the
* service layer. There is no requirement to do anything here.
*/
- virtual Result createBucket(const Bucket&, Context&) = 0;
+ virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0;
/**
* Deletes the given bucket and all entries contained in that bucket.
* After this operation has succeeded, a restart of the provider should
* not yield the bucket in getBucketList().
*/
- virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0;
+ virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0;
/**
* This function is called continuously by the service layer. It allows the
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
index 74e0971178c..406432ef697 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
@@ -10,6 +10,8 @@ ExecutorMetrics::update(const vespalib::ExecutorStats &stats)
maxPending.set(stats.queueSize.max());
accepted.inc(stats.acceptedTasks);
rejected.inc(stats.rejectedTasks);
+ wakeupCount.inc(stats.wakeupCount);
+ util.set(stats.getUtil());
const auto & qSize = stats.queueSize;
queueSize.addValueBatch(qSize.average(), qSize.count(), qSize.min(), qSize.max());
}
@@ -19,6 +21,8 @@ ExecutorMetrics::ExecutorMetrics(const std::string &name, metrics::MetricSet *pa
maxPending("maxpending", {}, "Maximum number of pending (active + queued) tasks", this),
accepted("accepted", {}, "Number of accepted tasks", this),
rejected("rejected", {}, "Number of rejected tasks", this),
+ wakeupCount("wakeups", {}, "Number of times a worker thread has been woken up", this),
+ util("utilization", {}, "Ratio of time the worker threads has been active", this),
queueSize("queuesize", {}, "Size of task queue", this)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
index 273c4ed8979..31d959a399f 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
@@ -11,9 +11,11 @@ namespace proton {
struct ExecutorMetrics : metrics::MetricSet
{
- metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible.
- metrics::LongCountMetric accepted;
- metrics::LongCountMetric rejected;
+ metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible.
+ metrics::LongCountMetric accepted;
+ metrics::LongCountMetric rejected;
+ metrics::LongCountMetric wakeupCount;
+ metrics::DoubleValueMetric util;
metrics::LongAverageMetric queueSize;
void update(const vespalib::ExecutorStats &stats);
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 114292d055d..2e1fc74037c 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -548,24 +548,28 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&)
}
-Result
-PersistenceEngine::createBucket(const Bucket &b, Context &)
+void
+PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete) noexcept
{
ReadGuard rguard(_rwMutex);
LOG(spam, "createBucket(%s)", b.toString().c_str());
HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace());
- TransportLatch latch(snap.size());
- for (; snap.handlers().valid(); snap.handlers().next()) {
+
+ auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete));
+ while (snap.handlers().valid()) {
IPersistenceHandler *handler = snap.handlers().get();
- handler->handleCreateBucket(feedtoken::make(latch), b);
+ snap.handlers().next();
+ if (snap.handlers().valid()) {
+ handler->handleCreateBucket(feedtoken::make(transportContext), b);
+ } else {
+ handler->handleCreateBucket(feedtoken::make(std::move(transportContext)), b);
+ }
}
- latch.await();
- return latch.getResult();
}
void
-PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
+PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept
{
ReadGuard rguard(_rwMutex);
LOG(spam, "deleteBucket(%s)", b.toString().c_str());
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 94331ac2cd6..fe564d01459 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -114,8 +114,8 @@ public:
IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override;
Result destroyIterator(IteratorId, Context&) override;
- Result createBucket(const Bucket &bucketId, Context &) override ;
- void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
+ void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) noexcept override;
+ void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
diff --git a/searchlib/src/tests/attribute/changevector/changevector_test.cpp b/searchlib/src/tests/attribute/changevector/changevector_test.cpp
index 3e2c851c541..c37d233217b 100644
--- a/searchlib/src/tests/attribute/changevector/changevector_test.cpp
+++ b/searchlib/src/tests/attribute/changevector/changevector_test.cpp
@@ -123,7 +123,7 @@ TEST("require that buffer can grow some, but not unbound") {
TEST("Control Change size") {
EXPECT_EQUAL(32u, sizeof(ChangeTemplate<NumericChangeData<long>>));
- EXPECT_EQUAL(88u, sizeof(ChangeTemplate<StringChangeData>));
+ EXPECT_EQUAL(80u, sizeof(ChangeTemplate<StringChangeData>));
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchlib/src/vespa/searchlib/attribute/attributevector.h b/searchlib/src/vespa/searchlib/attribute/attributevector.h
index bb88b168474..f5a79c3637e 100644
--- a/searchlib/src/vespa/searchlib/attribute/attributevector.h
+++ b/searchlib/src/vespa/searchlib/attribute/attributevector.h
@@ -357,23 +357,22 @@ protected:
static double round(double v, double & r) { return r = v; }
static largeint_t round(double v, largeint_t &r) { return r = static_cast<largeint_t>(::floor(v+0.5)); }
- template <typename BaseType, typename ChangeData>
+ template <typename BaseType, typename LargeType>
static BaseType
- applyArithmetic(const BaseType &value, const ChangeTemplate<ChangeData> & arithmetic)
+ applyArithmetic(const BaseType &value, double operand, ChangeBase::Type type)
{
- typedef typename ChangeData::DataType LargeType;
if (attribute::isUndefined(value)) {
return value;
- } else if (arithmetic._type == ChangeBase::ADD) {
- return value + static_cast<LargeType>(arithmetic._arithOperand);
- } else if (arithmetic._type == ChangeBase::SUB) {
- return value - static_cast<LargeType>(arithmetic._arithOperand);
- } else if (arithmetic._type == ChangeBase::MUL) {
+ } else if (type == ChangeBase::ADD) {
+ return value + static_cast<LargeType>(operand);
+ } else if (type == ChangeBase::SUB) {
+ return value - static_cast<LargeType>(operand);
+ } else if (type == ChangeBase::MUL) {
LargeType r;
- return round((static_cast<double>(value) * arithmetic._arithOperand), r);
- } else if (arithmetic._type == ChangeBase::DIV) {
+ return round((static_cast<double>(value) * operand), r);
+ } else if (type == ChangeBase::DIV) {
LargeType r;
- return round(static_cast<double>(value) / arithmetic._arithOperand, r);
+ return round(static_cast<double>(value) / operand, r);
}
return value;
}
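The refactored applyArithmetic now takes the raw operand and change type instead of the whole ChangeTemplate, matching the updated call sites later in this patch. A worked example of the branches above, as it would be invoked from inside an attribute implementation (values chosen purely for illustration):

    // value = 10, operand = 4.0
    //   applyArithmetic<int64_t, largeint_t>(10, 4.0, ChangeBase::ADD) -> 14
    //   applyArithmetic<int64_t, largeint_t>(10, 4.0, ChangeBase::SUB) -> 6
    //   applyArithmetic<int64_t, largeint_t>(10, 4.0, ChangeBase::MUL) -> 40
    //   applyArithmetic<int64_t, largeint_t>(10, 4.0, ChangeBase::DIV) -> 3   (2.5 rounded via floor(v + 0.5))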
diff --git a/searchlib/src/vespa/searchlib/attribute/attributevector.hpp b/searchlib/src/vespa/searchlib/attribute/attributevector.hpp
index 25658401f21..623e10ef052 100644
--- a/searchlib/src/vespa/searchlib/attribute/attributevector.hpp
+++ b/searchlib/src/vespa/searchlib/attribute/attributevector.hpp
@@ -116,7 +116,7 @@ AttributeVector::applyArithmetic(ChangeVectorT< ChangeTemplate<T> > & changes, D
_status.incNonIdempotentUpdates(diff);
_status.incUpdates(diff);
if (diff > 0) {
- changes.back()._arithOperand = aop;
+ changes.back()._data.setArithOperand(aop);
}
return true;
}
diff --git a/searchlib/src/vespa/searchlib/attribute/changevector.h b/searchlib/src/vespa/searchlib/attribute/changevector.h
index 12ac77febb9..f1fb58eb9d0 100644
--- a/searchlib/src/vespa/searchlib/attribute/changevector.h
+++ b/searchlib/src/vespa/searchlib/attribute/changevector.h
@@ -31,16 +31,14 @@ struct ChangeBase {
_type(NOOP),
_doc(0),
_weight(1),
- _enumScratchPad(UNSET_ENUM),
- _arithOperand(0)
+ _enumScratchPad(UNSET_ENUM)
{ }
ChangeBase(Type type, uint32_t d, int32_t w = 1) :
_type(type),
_doc(d),
_weight(w),
- _enumScratchPad(UNSET_ENUM),
- _arithOperand(0)
+ _enumScratchPad(UNSET_ENUM)
{ }
int cmp(const ChangeBase &b) const { int diff(_doc - b._doc); return diff; }
@@ -53,19 +51,21 @@ struct ChangeBase {
uint32_t _doc;
int32_t _weight;
mutable uint32_t _enumScratchPad;
- double _arithOperand;
};
template <typename T>
class NumericChangeData {
private:
- T _v;
+ double _arithOperand;
+ T _v;
public:
typedef T DataType;
- NumericChangeData(T v) : _v(v) { }
- NumericChangeData() : _v(T()) { }
+ NumericChangeData(T v) : _arithOperand(0), _v(v) { }
+ NumericChangeData() : _arithOperand(0), _v(T()) { }
+ double getArithOperand() const { return _arithOperand; }
+ void setArithOperand(double operand) { _arithOperand = operand; }
T get() const { return _v; }
T raw() const { return _v; }
operator T() const { return _v; }
diff --git a/searchlib/src/vespa/searchlib/attribute/singleboolattribute.cpp b/searchlib/src/vespa/searchlib/attribute/singleboolattribute.cpp
index 9a71248a53d..3fe7d147c1d 100644
--- a/searchlib/src/vespa/searchlib/attribute/singleboolattribute.cpp
+++ b/searchlib/src/vespa/searchlib/attribute/singleboolattribute.cpp
@@ -66,7 +66,7 @@ SingleBoolAttribute::onCommit() {
setBit(change._doc, change._data != 0);
} else if ((change._type >= ChangeBase::ADD) && (change._type <= ChangeBase::DIV)) {
std::atomic_thread_fence(std::memory_order_release);
- int8_t val = applyArithmetic(getFast(change._doc), change);
+ int8_t val = applyArithmetic<int8_t, largeint_t>(getFast(change._doc), change._data.getArithOperand(), change._type);
setBit(change._doc, val != 0);
} else if (change._type == ChangeBase::CLEARDOC) {
std::atomic_thread_fence(std::memory_order_release);
diff --git a/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp b/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp
index 4f2be68683d..cbff4d0e361 100644
--- a/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp
+++ b/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp
@@ -43,7 +43,7 @@ SingleValueNumericAttribute<B>::onCommit()
_data[change._doc] = change._data;
} else if (change._type >= ChangeBase::ADD && change._type <= ChangeBase::DIV) {
std::atomic_thread_fence(std::memory_order_release);
- _data[change._doc] = this->applyArithmetic(_data[change._doc], change);
+ _data[change._doc] = applyArithmetic<T, typename B::Change::DataType>(_data[change._doc], change._data.getArithOperand(), change._type);
} else if (change._type == ChangeBase::CLEARDOC) {
std::atomic_thread_fence(std::memory_order_release);
_data[change._doc] = this->_defaultValue._data;
diff --git a/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp b/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp
index 55a82210a96..585b4514ba4 100644
--- a/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp
+++ b/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp
@@ -34,7 +34,7 @@ SingleValueNumericEnumAttribute<B>::considerArithmeticAttributeChange(const Chan
oldValue = get(c._doc);
}
- T newValue = this->applyArithmetic(oldValue, c);
+ T newValue = applyArithmetic<T, typename Change::DataType>(oldValue, c._data.getArithOperand(), c._type);
EnumIndex idx;
if (!this->_enumStore.find_index(newValue, idx)) {
@@ -52,7 +52,7 @@ SingleValueNumericEnumAttribute<B>::applyArithmeticValueChange(const Change& c,
{
EnumIndex oldIdx = this->_enumIndices[c._doc];
EnumIndex newIdx;
- T newValue = this->applyArithmetic(get(c._doc), c);
+ T newValue = applyArithmetic<T, typename Change::DataType>(get(c._doc), c._data.getArithOperand(), c._type);
this->_enumStore.find_index(newValue, newIdx);
this->updateEnumRefCounts(c, newIdx, oldIdx, updater);
diff --git a/searchlib/src/vespa/searchlib/attribute/singlenumericpostattribute.hpp b/searchlib/src/vespa/searchlib/attribute/singlenumericpostattribute.hpp
index 12138b0cfbc..5a610cc3da8 100644
--- a/searchlib/src/vespa/searchlib/attribute/singlenumericpostattribute.hpp
+++ b/searchlib/src/vespa/searchlib/attribute/singlenumericpostattribute.hpp
@@ -102,7 +102,7 @@ SingleValueNumericPostingAttribute<B>::applyValueChanges(EnumStoreBatchUpdater&
} else if (change._type >= ChangeBase::ADD && change._type <= ChangeBase::DIV) {
if (oldIdx.valid()) {
T oldValue = enumStore.get_value(oldIdx);
- T newValue = this->applyArithmetic(oldValue, change);
+ T newValue = applyArithmetic<T, typename Change::DataType>(oldValue, change._data.getArithOperand(), change._type);
EnumIndex newIdx;
(void) dictionary.find_index(enumStore.make_comparator(newValue), newIdx);
currEnumIndices[change._doc] = newIdx;
diff --git a/searchlib/src/vespa/searchlib/attribute/singlesmallnumericattribute.cpp b/searchlib/src/vespa/searchlib/attribute/singlesmallnumericattribute.cpp
index 9346bc43370..24dec664547 100644
--- a/searchlib/src/vespa/searchlib/attribute/singlesmallnumericattribute.cpp
+++ b/searchlib/src/vespa/searchlib/attribute/singlesmallnumericattribute.cpp
@@ -60,7 +60,7 @@ SingleValueSmallNumericAttribute::onCommit()
set(change._doc, change._data);
} else if (change._type >= ChangeBase::ADD && change._type <= ChangeBase::DIV) {
std::atomic_thread_fence(std::memory_order_release);
- set(change._doc, applyArithmetic(getFast(change._doc), change));
+ set(change._doc, applyArithmetic<T, typename Change::DataType>(getFast(change._doc), change._data.getArithOperand(), change._type));
} else if (change._type == ChangeBase::CLEARDOC) {
std::atomic_thread_fence(std::memory_order_release);
set(change._doc, 0u);
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 869ff0456e1..099a958a00b 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -21,6 +21,7 @@ AdaptiveSequencedExecutor::Strand::~Strand()
AdaptiveSequencedExecutor::Worker::Worker()
: cond(),
+ idleTracker(),
state(State::RUNNING),
strand(nullptr)
{
@@ -151,9 +152,12 @@ AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::m
} else {
worker.state = Worker::State::BLOCKED;
_worker_stack.push(&worker);
+ worker.idleTracker.set_idle(steady_clock::now());
while (worker.state == Worker::State::BLOCKED) {
worker.cond.wait(lock);
}
+ _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now()));
+ _stats.wakeupCount++;
}
return (worker.state == Worker::State::RUNNING);
}
@@ -233,6 +237,7 @@ AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t
_worker_stack(num_threads),
_self(),
_stats(),
+ _idleTracker(steady_clock::now()),
_cfg(num_threads, max_waiting, max_pending)
{
_stats.queueSize.add(_self.pending_tasks);
@@ -329,6 +334,11 @@ AdaptiveSequencedExecutor::getStats()
{
auto guard = std::lock_guard(_mutex);
ExecutorStats stats = _stats;
+ steady_time now = steady_clock::now();
+ for (size_t i(0); i < _worker_stack.size(); i++) {
+ _idleTracker.was_idle(_worker_stack.access(i)->idleTracker.reset(now));
+ }
+ stats.setUtil(_cfg.num_threads, _idleTracker.reset(now, _cfg.num_threads));
_stats = ExecutorStats();
_stats.queueSize.add(_self.pending_tasks);
return stats;
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index fdcdf35fbbb..776248384a5 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -3,6 +3,7 @@
#pragma once
#include "isequencedtaskexecutor.h"
+#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/eventbarrier.hpp>
@@ -70,7 +71,7 @@ private:
struct Strand {
enum class State { IDLE, WAITING, ACTIVE };
State state;
- vespalib::ArrayQueue<TaggedTask> queue;
+ ArrayQueue<TaggedTask> queue;
Strand();
~Strand();
};
@@ -81,6 +82,7 @@ private:
struct Worker {
enum class State { RUNNING, BLOCKED, DONE };
std::condition_variable cond;
+ ThreadIdleTracker idleTracker;
State state;
Strand *strand;
Worker();
@@ -107,7 +109,7 @@ private:
static constexpr size_t STACK_SIZE = (256 * 1024);
AdaptiveSequencedExecutor &parent;
std::unique_ptr<FastOS_ThreadPool> pool;
- vespalib::Gate allow_worker_exit;
+ Gate allow_worker_exit;
ThreadTools(AdaptiveSequencedExecutor &parent_in);
~ThreadTools();
void Run(FastOS_ThreadInterface *, void *) override;
@@ -123,11 +125,12 @@ private:
std::unique_ptr<ThreadTools> _thread_tools;
mutable std::mutex _mutex;
std::vector<Strand> _strands;
- vespalib::ArrayQueue<Strand*> _wait_queue;
- vespalib::ArrayQueue<Worker*> _worker_stack;
+ ArrayQueue<Strand*> _wait_queue;
+ ArrayQueue<Worker*> _worker_stack;
EventBarrier<BarrierCompletion> _barrier;
Self _self;
ExecutorStats _stats;
+ ExecutorIdleTracker _idleTracker;
Config _cfg;
void maybe_block_self(std::unique_lock<std::mutex> &lock);
diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
index 3bd5ca3d49a..c1e56572614 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
@@ -23,7 +23,7 @@ public:
}
size_t getNumThreads() const override { return 0; }
ExecutorStats getStats() override {
- return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0);
+ return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0, 0);
}
void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; }
uint32_t getTaskLimit() const override { return std::numeric_limits<uint32_t>::max(); }
diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
index 0d132193af1..703256c5521 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
@@ -19,7 +19,7 @@ ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads)
ForegroundTaskExecutor::~ForegroundTaskExecutor() = default;
void
-ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
+ForegroundTaskExecutor::executeTask(ExecutorId id, Executor::Task::UP task)
{
assert(id.getId() < getNumExecutors());
task->run();
@@ -35,8 +35,8 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) {
}
-vespalib::ExecutorStats ForegroundTaskExecutor::getStats() {
- return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0);
+ExecutorStats ForegroundTaskExecutor::getStats() {
+ return ExecutorStats(ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0, 0);
}
ISequencedTaskExecutor::ExecutorId
@@ -44,4 +44,4 @@ ForegroundTaskExecutor::getExecutorId(uint64_t componentId) const {
return ExecutorId(componentId%getNumExecutors());
}
-} // namespace search
+}
diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
index 7359de0ba66..9d351aca653 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
@@ -22,10 +22,10 @@ public:
~ForegroundTaskExecutor() override;
ExecutorId getExecutorId(uint64_t componentId) const override;
- void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
+ void executeTask(ExecutorId id, Executor::Task::UP task) override;
void sync() override;
void setTaskLimit(uint32_t taskLimit) override;
- vespalib::ExecutorStats getStats() override;
+ ExecutorStats getStats() override;
private:
std::atomic<uint64_t> _accepted;
};
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 727894397a7..9e95bdaa3ab 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -104,7 +104,7 @@ SequencedTaskExecutor::getStats()
{
ExecutorStats accumulatedStats;
for (auto &executor :* _executors) {
- accumulatedStats += executor->getStats();
+ accumulatedStats.aggregate(executor->getStats());
}
return accumulatedStats;
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 803ec4f3f7c..af95918ccab 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -19,6 +19,9 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat
_consumerCondition(),
_producerCondition(),
_thread(*this),
+ _idleTracker(steady_clock::now()),
+ _threadIdleTracker(),
+ _wakeupCount(0),
_lastAccepted(0),
_queueSize(),
_wakeupConsumerAt(0),
@@ -115,7 +118,11 @@ SingleExecutor::run() {
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed);
Lock lock(_mutex);
if (numTasks() <= 0) {
- _consumerCondition.wait_for(lock, _reactionTime);
+ steady_time now = steady_clock::now();
+ _threadIdleTracker.set_idle(now);
+ _consumerCondition.wait_until(lock, now + _reactionTime);
+ _idleTracker.was_idle(_threadIdleTracker.set_active(steady_clock::now()));
+ _wakeupCount++;
}
_wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
@@ -150,7 +157,6 @@ SingleExecutor::wait_for_room(Lock & lock) {
drain(lock);
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
_taskLimit = _wantedTaskLimit.load();
- taskLimit = _taskLimit;
}
_queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
@@ -162,7 +168,11 @@ ExecutorStats
SingleExecutor::getStats() {
Lock lock(_mutex);
uint64_t accepted = _wp.load(std::memory_order_relaxed);
- ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0);
+ steady_time now = steady_clock::now();
+ _idleTracker.was_idle(_threadIdleTracker.reset(now));
+ ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount);
+ stats.setUtil(1, _idleTracker.reset(now, 1));
+ _wakeupCount = 0;
_lastAccepted = accepted;
_queueSize = ExecutorStats::QueueSizeT() ;
return stats;
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 8e9c1ae3fa1..7d868322558 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <thread>
#include <atomic>
@@ -18,7 +19,7 @@ namespace vespalib {
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
- explicit SingleExecutor(init_fun_t func, uint32_t taskLimit);
+ SingleExecutor(init_fun_t func, uint32_t taskLimit);
SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
@@ -54,6 +55,9 @@ private:
std::condition_variable _consumerCondition;
std::condition_variable _producerCondition;
vespalib::Thread _thread;
+ ExecutorIdleTracker _idleTracker;
+ ThreadIdleTracker _threadIdleTracker;
+ uint64_t _wakeupCount;
uint64_t _lastAccepted;
ExecutorStats::QueueSizeT _queueSize;
std::atomic<uint64_t> _wakeupConsumerAt;
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
index fe77477fa77..701e8a80d3a 100644
--- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
+++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
@@ -11,6 +11,8 @@
using document::DocumentId;
using document::test::makeDocumentBucket;
+using vespalib::MonitoredRefCount;
+using vespalib::RetainGuard;
namespace storage {
@@ -41,6 +43,7 @@ public:
throw std::runtime_error(_fail);
}
}
+ void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override { }
void set_fail(vespalib::string fail) { _fail = std::move(fail); }
};
@@ -72,6 +75,7 @@ class ApplyBucketDiffStateTestBase : public ::testing::Test
public:
uint32_t sync_count;
DummyMergeBucketInfoSyncer syncer;
+ MonitoredRefCount monitored_ref_count;
ApplyBucketDiffStateTestBase()
: ::testing::Test(),
@@ -82,8 +86,8 @@ public:
~ApplyBucketDiffStateTestBase();
- std::unique_ptr<ApplyBucketDiffState> make_state() {
- return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket));
+ std::shared_ptr<ApplyBucketDiffState> make_state() {
+ return ApplyBucketDiffState::create(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
}
};
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index b3bd1c6a253..02b43a32df3 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -24,7 +24,7 @@
#define CHECK_ERROR_ASYNC(className, failType, onError) \
{ \
- Guard guard(_lock); \
+ Guard guard(_lock); \
if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \
onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \
return; \
@@ -80,12 +80,12 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const
return _spi.listBuckets(bucketSpace);
}
-spi::Result
-PersistenceProviderWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
LOG_SPI("createBucket(" << bucket << ")");
- CHECK_ERROR(spi::Result, FAIL_CREATE_BUCKET);
- return _spi.createBucket(bucket, context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete);
+ return _spi.createBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketInfoResult
@@ -177,7 +177,7 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId,
void
PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context,
- spi::OperationComplete::UP operationComplete)
+ spi::OperationComplete::UP operationComplete) noexcept
{
LOG_SPI("deleteBucket(" << bucket << ")");
CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete);
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index c6628814dba..cfc7002a643 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -96,7 +96,7 @@ public:
void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
spi::OperationComplete::UP up) override;
- spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
+ void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
@@ -111,7 +111,7 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1,
const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2,
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 07d2b24d536..a3f0182ba30 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -62,13 +62,13 @@ public:
return PersistenceProviderWrapper::getBucketInfo(bucket);
}
- spi::Result createBucket(const spi::Bucket& bucket, spi::Context& ctx) override {
+ void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override {
++_createBucketInvocations;
- return PersistenceProviderWrapper::createBucket(bucket, ctx);
+ PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete));
}
void
- deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
+ deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override {
++_deleteBucketInvocations;
PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete));
}
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 60030004594..ed50730d79f 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -20,7 +20,12 @@ using namespace ::testing;
namespace storage {
-struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
+/*
+ * Class for testing the merge handler, taking async_apply_bucket_diff as
+ * a parameter for the test.
+ */
+struct MergeHandlerTest : SingleDiskPersistenceTestUtils,
+ public testing::WithParamInterface<bool> {
uint32_t _location; // Location used for all merge tests
document::Bucket _bucket; // Bucket used for all merge tests
uint64_t _maxTimestamp;
@@ -149,8 +154,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
int _counter;
MessageSenderStub _stub;
std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd;
+ void convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler& handler);
};
+ void convert_delayed_error_to_exception(MergeHandler& handler);
+
std::string
doTestSPIException(MergeHandler& handler,
PersistenceProviderWrapper& providerWrapper,
@@ -159,11 +167,21 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize);
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam());
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam());
+ }
+
+ std::shared_ptr<api::StorageMessage> get_queued_reply() {
+ std::shared_ptr<api::StorageMessage> msg;
+ if (_replySender.queue.getNext(msg, 0s)) {
+ return msg;
+ } else {
+ return {};
+ }
+
}
};
@@ -209,7 +227,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) {
// Test a regular merge bucket command fetching data, including
// puts, removes, unrevertable removes & duplicates.
-TEST_F(MergeHandlerTest, merge_bucket_command) {
+TEST_P(MergeHandlerTest, merge_bucket_command) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -270,11 +288,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain)
EXPECT_EQ(17, diff.size());
}
-TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) {
+TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) {
testGetBucketDiffChain(true);
}
-TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) {
+TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) {
testGetBucketDiffChain(false);
}
@@ -320,17 +338,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
EXPECT_EQ(0, diff.size());
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) {
testApplyBucketDiffChain(true);
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
testApplyBucketDiffChain(false);
}
// Test that a simplistic merge with one thing to actually merge,
// sends correct commands and finish.
-TEST_F(MergeHandlerTest, master_message_flow) {
+TEST_P(MergeHandlerTest, master_message_flow) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -424,7 +442,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
}
-TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
+TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) {
uint32_t docSize = 1024;
uint32_t docCount = 10;
uint32_t maxChunkSize = docSize * 3;
@@ -488,7 +506,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
EXPECT_TRUE(reply->getResult().success());
}
-TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
+TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) {
setUpChain(FRONT);
uint32_t docSize = 1024;
@@ -524,7 +542,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize);
}
-TEST_F(MergeHandlerTest, max_timestamp) {
+TEST_P(MergeHandlerTest, max_timestamp) {
doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024);
MergeHandler handler = createHandler();
@@ -632,7 +650,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask
return getBucketDiffCmd;
}
-TEST_F(MergeHandlerTest, spi_flush_guard) {
+TEST_P(MergeHandlerTest, spi_flush_guard) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
@@ -647,13 +665,16 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
try {
auto cmd = createDummyApplyDiff(6000);
handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
+ if (GetParam()) {
+ convert_delayed_error_to_exception(handler);
+ }
FAIL() << "No exception thrown on failing in-place remove";
} catch (const std::runtime_error& e) {
EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos);
}
}
-TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
+TEST_P(MergeHandlerTest, bucket_not_found_in_db) {
MergeHandler handler = createHandler();
// Send merge for unknown bucket
auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
@@ -661,7 +682,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
EXPECT_TRUE(tracker->getResult().isBucketDisappearance());
}
-TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
+TEST_P(MergeHandlerTest, merge_progress_safe_guard) {
MergeHandler handler = createHandler();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -684,7 +705,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
+TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
MergeHandler handler = createHandler();
_nodes.clear();
_nodes.emplace_back(0, false);
@@ -716,7 +737,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask);
}
-TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
+TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
MergeHandler handler = createHandler();
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
@@ -741,6 +762,23 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
EXPECT_EQ(0x0, diff[0]._entry._hasMask);
}
+void
+MergeHandlerTest::convert_delayed_error_to_exception(MergeHandler& handler)
+{
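+ // Drain pending async writes, then surface any delayed merge error as the exception the synchronous code path used to throw.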
+ handler.drain_async_writes();
+ if (getEnv()._fileStorHandler.isMerging(_bucket)) {
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ api::ReturnCode return_code;
+ s->check_delayed_error(return_code);
+ if (return_code.failed()) {
+ getEnv()._fileStorHandler.clearMergeStatus(_bucket, return_code);
+ fetchSingleMessage<api::ApplyBucketDiffReply>();
+ fetchSingleMessage<api::ApplyBucketDiffCommand>();
+ throw std::runtime_error(return_code.getMessage());
+ }
+ }
+}
+
std::string
MergeHandlerTest::doTestSPIException(MergeHandler& handler,
PersistenceProviderWrapper& providerWrapper,
@@ -755,6 +793,9 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler,
providerWrapper.setFailureMask(failureMask);
try {
invoker.invoke(*this, handler, *_context);
+ if (GetParam()) {
+ convert_delayed_error_to_exception(handler);
+ }
if (failureMask != 0) {
return (std::string("No exception was thrown during handler "
"invocation. Expected exception containing '")
@@ -823,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
+TEST_P(MergeHandlerTest, merge_bucket_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -831,7 +872,6 @@ TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
setUpChain(MIDDLE);
ExpectedExceptionSpec exceptions[] = {
- { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
{ PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" },
{ PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
@@ -855,14 +895,13 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
+TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
ExpectedExceptionSpec exceptions[] = {
- { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
{ PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" },
{ PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
@@ -888,7 +927,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -953,7 +992,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke(
api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
+TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -1006,6 +1045,18 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
}
void
+MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler &handler)
+{
+ handler.drain_async_writes();
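+ // A failed chained reply means an async write failed; forward it through the test message keeper and rethrow so callers see the same exception as in sync mode.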
+ if (!_stub.replies.empty() && _stub.replies.back()->getResult().failed()) {
+ auto chained_reply = _stub.replies.back();
+ _stub.replies.pop_back();
+ test.messageKeeper().sendReply(chained_reply);
+ throw std::runtime_error(chained_reply->getResult().getMessage());
+ }
+}
+
+void
MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
MergeHandlerTest& test,
MergeHandler& handler,
@@ -1016,6 +1067,9 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
test.fillDummyApplyDiff(reply->getDiff());
_stub.clear();
handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket));
+ if (test.GetParam()) {
+ convert_delayed_error_to_exception(test, handler);
+ }
}
std::string
@@ -1039,7 +1093,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke(
}
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
HandleApplyBucketDiffReplyInvoker invoker;
for (int i = 0; i < 2; ++i) {
@@ -1066,7 +1120,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
}
}
-TEST_F(MergeHandlerTest, remove_from_diff) {
+TEST_P(MergeHandlerTest, remove_from_diff) {
framework::defaultimplementation::FakeClock clock;
MergeStatus status(clock, 0, 0);
@@ -1132,7 +1186,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) {
}
}
-TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
+TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) {
setUpChain(BACK);
document::TestDocMan docMan;
@@ -1156,8 +1210,15 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
- auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
- ASSERT_TRUE(applyBucketDiffReply.get());
+ if (GetParam()) {
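+ // Async apply: the tracker has been consumed, so drain the writes and pick the reply up from the reply queue instead.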
+ ASSERT_FALSE(tracker);
+ handler.drain_async_writes();
+ auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply());
+ ASSERT_TRUE(applyBucketDiffReply.get());
+ } else {
+ auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
+ ASSERT_TRUE(applyBucketDiffReply.get());
+ }
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -1246,7 +1307,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en
}
-TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
+TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
using NodeList = decltype(_nodes);
// Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2
@@ -1377,9 +1438,14 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
+ if (GetParam()) {
+ handler.drain_async_writes();
+ }
ASSERT_EQ(6u, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType());
LOG(debug, "got mergebucket reply");
}
+VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true));
+
} // storage
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
index eb7a5ef5bc6..556760b347e 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -2,21 +2,35 @@
#include "apply_bucket_diff_state.h"
#include "mergehandler.h"
+#include "persistenceutil.h"
#include <vespa/document/base/documentid.h>
#include <vespa/persistence/spi/result.h>
#include <vespa/vespalib/stllike/asciistream.h>
using storage::spi::Result;
+using vespalib::RetainGuard;
namespace storage {
-ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket)
+class ApplyBucketDiffState::Deleter {
+public:
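+ // shared_ptr deleter: instead of deleting inline, hand the state back to the syncer, which schedules destruction (and the delayed reply it triggers) on the bucket's executor.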
+ void operator()(ApplyBucketDiffState *raw_state) const noexcept {
+ std::unique_ptr<ApplyBucketDiffState> state(raw_state);
+ raw_state->_merge_bucket_info_syncer.schedule_delayed_delete(std::move(state));
+ }
+};
+
+ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
: _merge_bucket_info_syncer(merge_bucket_info_syncer),
_bucket(bucket),
_fail_message(),
_failed_flag(),
_stale_bucket_info(false),
- _promise()
+ _promise(),
+ _tracker(),
+ _delayed_reply(),
+ _sender(nullptr),
+ _retain_guard(std::move(retain_guard))
{
}
@@ -32,6 +46,17 @@ ApplyBucketDiffState::~ApplyBucketDiffState()
if (_promise.has_value()) {
_promise.value().set_value(_fail_message);
}
+ if (_delayed_reply) {
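+ // A reply was held back until all async writes for this diff completed; tag it with any recorded failure and send it now.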
+ if (!_delayed_reply->getResult().failed() && !_fail_message.empty()) {
+ _delayed_reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, _fail_message));
+ }
+ if (_sender) {
+ _sender->sendReply(std::move(_delayed_reply));
+ } else {
+ // _tracker->_reply and _delayed_reply point to the same reply.
+ _tracker->sendReply();
+ }
+ }
}
void
@@ -69,4 +94,26 @@ ApplyBucketDiffState::get_future()
return _promise.value().get_future();
}
+void
+ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply)
+{
+ _tracker = std::move(tracker);
+ _delayed_reply = std::move(delayed_reply);
+}
+
+void
+ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply)
+{
+ _tracker = std::move(tracker);
+ _sender = &sender;
+ _delayed_reply = std::move(delayed_reply);
+}
+
+std::shared_ptr<ApplyBucketDiffState>
+ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
+{
+ std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, bucket, std::move(retain_guard)));
+ return std::shared_ptr<ApplyBucketDiffState>(state.release(), Deleter());
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
index af4174b06d6..7157c69191b 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -3,16 +3,20 @@
#pragma once
#include <vespa/persistence/spi/bucket.h>
+#include <vespa/vespalib/util/retain_guard.h>
#include <future>
#include <memory>
#include <vector>
namespace document { class DocumentId; }
+namespace storage::api { class StorageReply; }
namespace storage::spi { class Result; }
namespace storage {
class ApplyBucketDiffEntryResult;
+class MessageSender;
+class MessageTracker;
class MergeBucketInfoSyncer;
/*
@@ -20,15 +24,21 @@ class MergeBucketInfoSyncer;
* for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply.
*/
class ApplyBucketDiffState {
+ class Deleter;
const MergeBucketInfoSyncer& _merge_bucket_info_syncer;
spi::Bucket _bucket;
vespalib::string _fail_message;
std::atomic_flag _failed_flag;
bool _stale_bucket_info;
std::optional<std::promise<vespalib::string>> _promise;
+ std::unique_ptr<MessageTracker> _tracker;
+ std::shared_ptr<api::StorageReply> _delayed_reply;
+ MessageSender* _sender;
+ vespalib::RetainGuard _retain_guard;
+ ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
public:
- ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket);
+ static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
~ApplyBucketDiffState();
void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op);
void wait();
@@ -36,6 +46,9 @@ public:
void mark_stale_bucket_info();
void sync_bucket_info();
std::future<vespalib::string> get_future();
+ void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply);
+ void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply);
+ const spi::Bucket& get_bucket() const noexcept { return _bucket; }
};
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 47b5e4f5f27..bc6e67578c0 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -5,6 +5,7 @@
#include "testandsethelper.h"
#include "bucketownershipnotifier.h"
#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/persistence/spi/catchresult.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
@@ -154,6 +155,31 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.createBuckets);
+ LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
+ }
+ spi::Bucket bucket(cmd.getBucket());
+ auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP ignored) mutable {
+ // TODO: Even if a non-OK response cannot be handled sanely, we should probably log a message or increment a metric.
+ (void) ignored;
+ tracker->sendReply();
+ });
+
+ if (cmd.getActive()) {
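+ // Activation requested: issue the create with a no-op completion and attach the reply task to the subsequent activation.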
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+ _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+ } else {
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+ }
+
+ return tracker;
+}
+
+MessageTracker::UP
AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.deleteBuckets);
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 4f5c242570c..db5a77bfb59 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -30,6 +30,7 @@ public:
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 2ffb827accf..c32efb6aa66 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -209,6 +209,11 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
*_filestorHandler, i % numStripes, _component));
}
_bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this));
+ } else {
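+ // Reconfiguration after startup: forward the updated config to the already-created persistence handlers.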
+ std::lock_guard guard(_lock);
+ for (auto& handler : _persistenceHandlers) {
+ handler->configure(*config);
+ }
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
index febac5b87e5..a75eda5b1a4 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
@@ -14,6 +14,7 @@ MergeStatus::MergeStatus(const framework::Clock& clock,
uint32_t traceLevel)
: reply(), full_node_list(), nodeList(), maxTimestamp(0), diff(), pendingId(0),
pendingGetDiff(), pendingApplyDiff(), timeout(0), startTime(clock),
+ delayed_error(),
context(priority, traceLevel)
{}
@@ -122,4 +123,25 @@ MergeStatus::print(std::ostream& out, bool verbose,
}
}
+void
+MergeStatus::set_delayed_error(std::future<vespalib::string>&& delayed_error_in)
+{
+ delayed_error = std::move(delayed_error_in);
+}
+
+void
+MergeStatus::check_delayed_error(api::ReturnCode &return_code)
+{
+ if (!return_code.failed() && delayed_error.has_value()) {
+ // Wait for pending writes to local node to complete and check error
+ auto& future_error = delayed_error.value();
+ future_error.wait();
+ vespalib::string fail_message = future_error.get();
+ delayed_error.reset();
+ if (!fail_message.empty()) {
+ return_code = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, std::move(fail_message));
+ }
+ }
+}
+
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
index b28ca4e373a..05ffd1336a2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
@@ -9,7 +9,9 @@
#include <vector>
#include <deque>
+#include <future>
#include <memory>
+#include <optional>
namespace storage {
@@ -25,6 +27,7 @@ public:
std::shared_ptr<api::ApplyBucketDiffReply> pendingApplyDiff;
vespalib::duration timeout;
framework::MilliSecTimer startTime;
+ std::optional<std::future<vespalib::string>> delayed_error;
spi::Context context;
MergeStatus(const framework::Clock&, api::StorageMessage::Priority, uint32_t traceLevel);
@@ -40,6 +43,8 @@ public:
bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask, const std::vector<api::MergeBucketCommand::Node> &nodes);
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
bool isFirstNode() const { return static_cast<bool>(reply); }
+ void set_delayed_error(std::future<vespalib::string>&& delayed_error_in);
+ void check_delayed_error(api::ReturnCode &return_code);
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
index e05991ad9e3..b3386c591e6 100644
--- a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
+++ b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
@@ -6,6 +6,8 @@ namespace storage::spi { class Bucket; }
namespace storage {
+class ApplyBucketDiffState;
+
/*
* Interface class for syncing bucket info during merge.
*/
@@ -13,6 +15,7 @@ class MergeBucketInfoSyncer {
public:
virtual ~MergeBucketInfoSyncer() = default;
virtual void sync_bucket_info(const spi::Bucket& bucket) const = 0;
+ virtual void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const = 0;
};
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 17a16487ac4..c9ba43458b1 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -6,21 +6,25 @@
#include "apply_bucket_diff_state.h"
#include <vespa/storage/persistence/filestorage/mergestatus.h>
#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/persistence/spi/catchresult.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <algorithm>
-#include <future>
#include <vespa/log/log.h>
LOG_SETUP(".persistence.mergehandler");
+using vespalib::MonitoredRefCount;
+using vespalib::RetainGuard;
+
namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
+ vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize,
uint32_t commonMergeChainOptimalizationMinimumSize,
bool async_apply_bucket_diff)
@@ -28,12 +32,20 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_cluster_context(cluster_context),
_env(env),
_spi(spi),
+ _monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
- _async_apply_bucket_diff(async_apply_bucket_diff)
+ _async_apply_bucket_diff(async_apply_bucket_diff),
+ _executor(executor)
+{
+}
+
+MergeHandler::~MergeHandler()
{
+ drain_async_writes();
}
+
namespace {
constexpr int getDeleteFlag() {
@@ -41,20 +53,6 @@ constexpr int getDeleteFlag() {
return 2;
}
-/**
- * Throws std::runtime_error if result has an error.
- */
-void
-checkResult(const spi::Result& result, const spi::Bucket& bucket, const char* op)
-{
- if (result.hasError()) {
- vespalib::asciistream ss;
- ss << "Failed " << op << " in " << bucket << ": " << result.toString();
- throw std::runtime_error(ss.str());
- }
-}
-
-
class IteratorGuard {
spi::PersistenceProvider& _spi;
spi::IteratorId _iteratorId;
@@ -653,28 +651,32 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
}
namespace {
- void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
- uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
- {
- for (const auto& entry : status.diff) {
- uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
- if ((entry_has_mask == 0u) ||
- (constrictHasMask && (entry_has_mask != hasMask))) {
- continue;
- }
- cmd.getDiff().emplace_back(entry);
- if (constrictHasMask) {
- cmd.getDiff().back()._entry._hasMask = newHasMask;
- } else {
- cmd.getDiff().back()._entry._hasMask = entry_has_mask;
- }
+
+void
+findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
+ uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
+{
+ for (const auto& entry : status.diff) {
+ uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
+ if ((entry_has_mask == 0u) ||
+ (constrictHasMask && (entry_has_mask != hasMask))) {
+ continue;
+ }
+ cmd.getDiff().emplace_back(entry);
+ if (constrictHasMask) {
+ cmd.getDiff().back()._entry._hasMask = newHasMask;
+ } else {
+ cmd.getDiff().back()._entry._hasMask = entry_has_mask;
}
}
}
+}
+
api::StorageReply::SP
MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
- MessageSender& sender, spi::Context& context) const
+ MessageSender& sender, spi::Context& context,
+ std::shared_ptr<ApplyBucketDiffState>& async_results) const
{
// If last action failed, fail the whole merge
if (status.reply->getResult().failed()) {
@@ -806,6 +808,10 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
}
cmd->setPriority(status.context.getPriority());
cmd->setTimeout(status.timeout);
+ if (async_results) {
+ // Check currently pending writes to the local node before sending a new command.
+ check_apply_diff_sync(std::move(async_results));
+ }
if (applyDiffNeedLocalData(cmd->getDiff(), 0, true)) {
framework::MilliSecTimer startTime(_clock);
fetchLocalData(bucket, cmd->getDiff(), 0, context);
@@ -883,7 +889,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
tracker->fail(api::ReturnCode::BUSY, err);
return tracker;
}
- checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+
MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel());
@@ -923,141 +930,136 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
namespace {
- uint8_t findOwnIndex(
- const std::vector<api::MergeBucketCommand::Node>& nodeList,
- uint16_t us)
- {
- for (uint32_t i=0, n=nodeList.size(); i<n; ++i) {
- if (nodeList[i].index == us) return i;
- }
- throw vespalib::IllegalStateException(
- "Got GetBucketDiff cmd on node not in nodelist in command",
- VESPA_STRLOC);
+uint8_t findOwnIndex(
+ const std::vector<api::MergeBucketCommand::Node>& nodeList,
+ uint16_t us)
+{
+ for (uint32_t i=0, n=nodeList.size(); i<n; ++i) {
+ if (nodeList[i].index == us) return i;
}
+ throw vespalib::IllegalStateException(
+ "Got GetBucketDiff cmd on node not in nodelist in command",
+ VESPA_STRLOC);
+}
- struct DiffEntryTimestampOrder
- : public std::binary_function<api::GetBucketDiffCommand::Entry,
- api::GetBucketDiffCommand::Entry, bool>
- {
- bool operator()(const api::GetBucketDiffCommand::Entry& x,
- const api::GetBucketDiffCommand::Entry& y) const {
- return (x._timestamp < y._timestamp);
- }
- };
-
- /**
- * Merges list A and list B together and puts the result in result.
- * Result is swapped in as last step to keep function exception safe. Thus
- * result can be listA or listB if wanted.
- *
- * listA and listB are assumed to be in the order found in the slotfile, or
- * in the order given by a previous call to this function. (In both cases
- * this will be sorted by timestamp)
- *
- * @return false if any suspect entries was found.
- */
- bool mergeLists(
- const std::vector<api::GetBucketDiffCommand::Entry>& listA,
- const std::vector<api::GetBucketDiffCommand::Entry>& listB,
- std::vector<api::GetBucketDiffCommand::Entry>& finalResult)
- {
- bool suspect = false;
- std::vector<api::GetBucketDiffCommand::Entry> result;
- uint32_t i = 0, j = 0;
- while (i < listA.size() && j < listB.size()) {
- const api::GetBucketDiffCommand::Entry& a(listA[i]);
- const api::GetBucketDiffCommand::Entry& b(listB[j]);
- if (a._timestamp < b._timestamp) {
- result.push_back(a);
- ++i;
- } else if (a._timestamp > b._timestamp) {
- result.push_back(b);
- ++j;
- } else {
- // If we find equal timestamped entries that are not the
- // same.. Flag an error. But there is nothing we can do
- // about it. Note it as if it is the same entry so we
- // dont try to merge them.
- if (!(a == b)) {
- if (a._gid == b._gid && a._flags == b._flags) {
- if ((a._flags & getDeleteFlag()) != 0 &&
- (b._flags & getDeleteFlag()) != 0)
- {
- // Unfortunately this can happen, for instance
- // if a remove comes to a bucket out of sync
- // and reuses different headers in the two
- // versions.
- LOG(debug, "Found entries with equal timestamps of "
- "the same gid who both are remove "
- "entries: %s <-> %s.",
- a.toString(true).c_str(),
- b.toString(true).c_str());
- } else {
- LOG(error, "Found entries with equal timestamps of "
- "the same gid. This is likely same "
- "document where size of document varies:"
- " %s <-> %s.",
- a.toString(true).c_str(),
- b.toString(true).c_str());
- }
- result.push_back(a);
- result.back()._hasMask |= b._hasMask;
- suspect = true;
- } else if ((a._flags & getDeleteFlag())
- != (b._flags & getDeleteFlag()))
+/**
+ * Merges listA and listB and puts the merged entries in finalResult.
+ * The result is swapped in as the last step to keep the function exception
+ * safe; thus finalResult may be listA or listB if wanted.
+ *
+ * listA and listB are assumed to be in the order found in the slotfile, or
+ * in the order given by a previous call to this function. (In both cases
+ * this will be sorted by timestamp)
+ *
+ * @return false if any suspect entries were found.
+ */
+bool mergeLists(
+ const std::vector<api::GetBucketDiffCommand::Entry>& listA,
+ const std::vector<api::GetBucketDiffCommand::Entry>& listB,
+ std::vector<api::GetBucketDiffCommand::Entry>& finalResult)
+{
+ bool suspect = false;
+ std::vector<api::GetBucketDiffCommand::Entry> result;
+ uint32_t i = 0, j = 0;
+ while (i < listA.size() && j < listB.size()) {
+ const api::GetBucketDiffCommand::Entry& a(listA[i]);
+ const api::GetBucketDiffCommand::Entry& b(listB[j]);
+ if (a._timestamp < b._timestamp) {
+ result.push_back(a);
+ ++i;
+ } else if (a._timestamp > b._timestamp) {
+ result.push_back(b);
+ ++j;
+ } else {
+ // If we find entries with equal timestamps that are not the
+ // same, flag an error. There is nothing we can do about it,
+ // so note it as if it were the same entry so we don't try to
+ // merge them.
+ if (!(a == b)) {
+ if (a._gid == b._gid && a._flags == b._flags) {
+ if ((a._flags & getDeleteFlag()) != 0 &&
+ (b._flags & getDeleteFlag()) != 0)
{
- // If we find one remove and one put entry on the
- // same timestamp we are going to keep the remove
- // entry to make the copies consistent.
- const api::GetBucketDiffCommand::Entry& deletedEntry(
- (a._flags & getDeleteFlag()) != 0 ? a : b);
- result.push_back(deletedEntry);
- LOG(debug,
- "Found put and remove on same timestamp. Keeping"
- "remove as it is likely caused by remove with "
- "copies unavailable at the time: %s, %s.",
- a.toString().c_str(), b.toString().c_str());
+ // Unfortunately this can happen, for instance
+ // if a remove comes to a bucket out of sync
+ // and reuses different headers in the two
+ // versions.
+ LOG(debug, "Found entries with equal timestamps of "
+ "the same gid who both are remove "
+ "entries: %s <-> %s.",
+ a.toString(true).c_str(),
+ b.toString(true).c_str());
} else {
- LOG(error, "Found entries with equal timestamps that "
- "weren't the same entry: %s, %s.",
- a.toString().c_str(), b.toString().c_str());
- result.push_back(a);
- result.back()._hasMask |= b._hasMask;
- suspect = true;
+ LOG(error, "Found entries with equal timestamps of "
+ "the same gid. This is likely same "
+ "document where size of document varies:"
+ " %s <-> %s.",
+ a.toString(true).c_str(),
+ b.toString(true).c_str());
}
+ result.push_back(a);
+ result.back()._hasMask |= b._hasMask;
+ suspect = true;
+ } else if ((a._flags & getDeleteFlag())
+ != (b._flags & getDeleteFlag()))
+ {
+ // If we find one remove and one put entry on the
+ // same timestamp we are going to keep the remove
+ // entry to make the copies consistent.
+ const api::GetBucketDiffCommand::Entry& deletedEntry(
+ (a._flags & getDeleteFlag()) != 0 ? a : b);
+ result.push_back(deletedEntry);
+ LOG(debug,
+ "Found put and remove on same timestamp. Keeping"
+ "remove as it is likely caused by remove with "
+ "copies unavailable at the time: %s, %s.",
+ a.toString().c_str(), b.toString().c_str());
} else {
+ LOG(error, "Found entries with equal timestamps that "
+ "weren't the same entry: %s, %s.",
+ a.toString().c_str(), b.toString().c_str());
result.push_back(a);
result.back()._hasMask |= b._hasMask;
+ suspect = true;
}
- ++i;
- ++j;
+ } else {
+ result.push_back(a);
+ result.back()._hasMask |= b._hasMask;
}
+ ++i;
+ ++j;
}
- if (i < listA.size()) {
- assert(j >= listB.size());
- for (uint32_t n = listA.size(); i<n; ++i) {
- result.push_back(listA[i]);
- }
- } else if (j < listB.size()) {
- assert(i >= listA.size());
- for (uint32_t n = listB.size(); j<n; ++j) {
- result.push_back(listB[j]);
- }
+ }
+ if (i < listA.size()) {
+ assert(j >= listB.size());
+ for (uint32_t n = listA.size(); i<n; ++i) {
+ result.push_back(listA[i]);
+ }
+ } else if (j < listB.size()) {
+ assert(i >= listA.size());
+ for (uint32_t n = listB.size(); j<n; ++j) {
+ result.push_back(listB[j]);
}
- result.swap(finalResult);
- return !suspect;
}
+ result.swap(finalResult);
+ return !suspect;
+}
}
MessageTracker::UP
-MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
-{
+MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const {
tracker->setMetric(_env._metrics.getBucketDiff);
spi::Bucket bucket(cmd.getBucket());
LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str());
- checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+ return handleGetBucketDiffStage2(cmd, std::move(tracker));
+}
+MessageTracker::UP
+MergeHandler::handleGetBucketDiffStage2(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
+{
+ spi::Bucket bucket(cmd.getBucket());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(api::ReturnCode::BUSY, "A merge is already running on this bucket.");
return tracker;
@@ -1171,7 +1173,8 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
reply.getDiff().begin(),
reply.getDiff().end());
- replyToSend = processBucketMerge(bucket, *s, sender, s->context);
+ std::shared_ptr<ApplyBucketDiffState> async_results;
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results);
if (!replyToSend.get()) {
// We have sent something on, and shouldn't reply now.
@@ -1211,7 +1214,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
tracker->setMetric(_env._metrics.applyBucketDiff);
spi::Bucket bucket(cmd.getBucket());
- auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket);
+ std::shared_ptr<ApplyBucketDiffState> async_results;
LOG(debug, "%s", cmd.toString().c_str());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
@@ -1233,10 +1236,13 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
framework::MilliSecTimer startTime(_clock);
+ async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
+ if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
+ check_apply_diff_sync(std::move(async_results));
+ }
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(
startTime.getElapsedTimeAsDouble());
- check_apply_diff_sync(std::move(async_results));
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).",
bucket.toString().c_str(), _env._nodeIndex, index);
@@ -1260,10 +1266,14 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
}
- tracker->setReply(std::make_shared<api::ApplyBucketDiffReply>(cmd));
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd);
+ tracker->setReply(reply);
static_cast<api::ApplyBucketDiffReply&>(tracker->getReply()).getDiff().swap(cmd.getDiff());
LOG(spam, "Replying to ApplyBucketDiff for %s to node %d.",
bucket.toString().c_str(), cmd.getNodes()[index - 1].index);
+ if (async_results) {
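+ // Last node in the merge chain: hold the reply back and let ApplyBucketDiffState send it once the async writes have completed.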
+ async_results->set_delayed_reply(std::move(tracker), std::move(reply));
+ }
} else {
// When not the last node in merge chain, we must save reply, and
// send command on.
@@ -1280,6 +1290,10 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
cmd2->setPriority(cmd.getPriority());
cmd2->setTimeout(cmd.getTimeout());
s->pendingId = cmd2->getMsgId();
+ if (async_results) {
+ // Reply handler should check for delayed error.
+ s->set_delayed_error(async_results->get_future());
+ }
_env._fileStorHandler.sendCommand(cmd2);
// Everything went fine. Don't delete state but wait for reply
stateGuard.deactivate();
@@ -1290,12 +1304,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
void
-MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const
+MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, MessageSender& sender, MessageTracker::UP tracker) const
{
- (void) tracker;
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
- auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket);
+ std::shared_ptr<ApplyBucketDiffState> async_results;
std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff());
LOG(debug, "%s", reply.toString().c_str());
@@ -1316,6 +1329,8 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
api::StorageReply::SP replyToSend;
// Process apply bucket diff locally
api::ReturnCode returnCode = reply.getResult();
+ // Check for delayed error from handleApplyBucketDiff
+ s->check_delayed_error(returnCode);
try {
if (reply.getResult().failed()) {
LOG(debug, "Got failed apply bucket diff reply %s", reply.toString().c_str());
@@ -1329,9 +1344,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
if (applyDiffHasLocallyNeededData(diff, index)) {
framework::MilliSecTimer startTime(_clock);
+ async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, diff, index, s->context, async_results);
+ if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
+ check_apply_diff_sync(std::move(async_results));
+ }
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble());
- check_apply_diff_sync(std::move(async_results));
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)",
bucket.toString().c_str(),
@@ -1370,7 +1388,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
// Should reply now, since we failed.
replyToSend = s->reply;
} else {
- replyToSend = processBucketMerge(bucket, *s, sender, s->context);
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results);
if (!replyToSend.get()) {
// We have sent something on and shouldn't reply now.
@@ -1392,6 +1410,10 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
throw;
}
+ if (async_results && replyToSend) {
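+ // Async writes still pending: record the result and let ApplyBucketDiffState forward the reply via the sender when they complete.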
+ replyToSend->setResult(returnCode);
+ async_results->set_delayed_reply(std::move(tracker), sender, std::move(replyToSend));
+ }
if (clearState) {
_env._fileStorHandler.clearMergeStatus(bucket.getBucket());
}
@@ -1402,4 +1424,26 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
}
+void
+MergeHandler::drain_async_writes()
+{
+ if (_monitored_ref_count) {
+ // Wait for related ApplyBucketDiffState objects to be destroyed
+ _monitored_ref_count->waitForZeroRefCount();
+ }
+}
+
+void
+MergeHandler::configure(bool async_apply_bucket_diff) noexcept
+{
+ _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release);
+}
+
+void
+MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const
+{
+ auto bucket_id = state->get_bucket().getBucketId();
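+ // Execute an empty task that owns the state, so destruction (and the delayed reply it sends) happens on the bucket's executor stripe, sequenced with other writes for that bucket.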
+ _executor.execute(bucket_id.getId(), [state = std::move(state)]() { });
+}
+
} // storage
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index f6e8ddcf306..4daec4c0689 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -20,6 +20,9 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storage/common/cluster_context.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/vespalib/util/monitored_refcount.h>
+
+namespace vespalib { class ISequencedTaskExecutor; }
namespace storage {
@@ -44,10 +47,13 @@ public:
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
+ vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize = 4190208,
uint32_t commonMergeChainOptimalizationMinimumSize = 64,
bool async_apply_bucket_diff = false);
+ ~MergeHandler();
+
bool buildBucketInfoList(
const spi::Bucket& bucket,
Timestamp maxTimestamp,
@@ -64,27 +70,34 @@ public:
spi::Context& context,
std::shared_ptr<ApplyBucketDiffState> async_results) const;
void sync_bucket_info(const spi::Bucket& bucket) const override;
+ void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override;
MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const;
MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;
void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const;
MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
+ void drain_async_writes();
+ void configure(bool async_apply_bucket_diff) noexcept;
private:
const framework::Clock &_clock;
const ClusterContext &_cluster_context;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
+ std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
- const bool _async_apply_bucket_diff;
+ std::atomic<bool> _async_apply_bucket_diff;
+ vespalib::ISequencedTaskExecutor& _executor;
+ MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
/** Returns a reply if merge is complete */
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
MergeStatus& status,
MessageSender& sender,
- spi::Context& context) const;
+ spi::Context& context,
+ std::shared_ptr<ApplyBucketDiffState>& async_results) const;
/**
* Invoke either put, remove or unrevertable remove on the SPI
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 1ef883fc810..d03c9a6d111 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -17,7 +17,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
: _clock(component.getClock()),
_env(component, filestorHandler, metrics, provider),
_processAllHandler(_env, provider),
- _mergeHandler(_env, provider, component.cluster_context(), _clock,
+ _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor,
cfg.bucketMergeChunkSize,
cfg.commonMergeChainOptimalizationMinimumSize,
cfg.asyncApplyBucketDiff),
@@ -44,7 +44,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::REVERT_ID:
return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
case api::MessageType::CREATEBUCKET_ID:
- return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
@@ -150,4 +150,10 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co
}
}
+void
+PersistenceHandler::configure(vespa::config::content::StorFilestorConfig& config) noexcept
+{
+ _mergeHandler.configure(config.asyncApplyBucketDiff);
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index a92c2dc78ca..c60fb05e56e 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -35,6 +35,7 @@ public:
const AsyncHandler & asyncHandler() const { return _asyncHandler; }
const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
+ void configure(vespa::config::content::StorFilestorConfig& config) noexcept;
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index ce424f0ce83..9ccd901744b 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -99,17 +99,18 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context&
return checkResult(_impl.destroyIterator(iteratorId, context));
}
-spi::Result
-ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
- return checkResult(_impl.createBucket(bucket, context));
+ onComplete->addResultHandler(this);
+ _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
}
void
-ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
+ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
onComplete->addResultHandler(this);
- _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
+ _impl.createBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketIdListResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index c9d2411e372..14d20cf8a52 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -49,7 +49,6 @@ public:
spi::Context &context) override;
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override;
@@ -63,7 +62,8 @@ public:
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
- void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+ void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override;
private:
template <typename ResultType>
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
index b4fe207e2e5..9a7a451b906 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
@@ -88,22 +88,6 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP t
}
MessageTracker::UP
-SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.createBuckets);
- LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
- }
- spi::Bucket spiBucket(cmd.getBucket());
- _spi.createBucket(spiBucket, tracker->context());
- if (cmd.getActive()) {
- _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
- }
- return tracker;
-}
-
-MessageTracker::UP
SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.visit);
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index 2cfbc7016c0..009fd6dff52 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -22,7 +22,6 @@ public:
SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&);
MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;
diff --git a/storageapi/src/vespa/storageapi/message/bucket.cpp b/storageapi/src/vespa/storageapi/message/bucket.cpp
index 867cfccfaf9..360db5ea3d7 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.cpp
+++ b/storageapi/src/vespa/storageapi/message/bucket.cpp
@@ -156,7 +156,7 @@ MergeBucketReply::print(std::ostream& out, bool verbose,
{
out << "MergeBucketReply(" << getBucketId() << ", to time "
<< _maxTimestamp << ", cluster state version: "
- << _clusterStateVersion << ", nodes: ";
+ << _clusterStateVersion << ", nodes: [";
for (uint32_t i=0; i<_nodes.size(); ++i) {
if (i != 0) out << ", ";
out << _nodes[i];
@@ -220,15 +220,16 @@ GetBucketDiffCommand::print(std::ostream& out, bool verbose,
const std::string& indent) const
{
out << "GetBucketDiffCommand(" << getBucketId() << ", to time "
- << _maxTimestamp << ", nodes: ";
+ << _maxTimestamp << ", nodes: [";
for (uint32_t i=0; i<_nodes.size(); ++i) {
if (i != 0) out << ", ";
out << _nodes[i];
}
+
if (_diff.empty()) {
- out << ", no entries";
+ out << "], no entries";
} else if (verbose) {
- out << ",";
+ out << "],";
for (uint32_t i=0; i<_diff.size(); ++i) {
out << "\n" << indent << " ";
_diff[i].print(out, verbose, indent + " ");
@@ -258,15 +259,15 @@ GetBucketDiffReply::print(std::ostream& out, bool verbose,
const std::string& indent) const
{
out << "GetBucketDiffReply(" << getBucketId() << ", to time "
- << _maxTimestamp << ", nodes: ";
+ << _maxTimestamp << ", nodes: [";
for (uint32_t i=0; i<_nodes.size(); ++i) {
if (i != 0) out << ", ";
out << _nodes[i];
}
if (_diff.empty()) {
- out << ", no entries";
+ out << "], no entries";
} else if (verbose) {
- out << ",";
+ out << "],";
for (uint32_t i=0; i<_diff.size(); ++i) {
out << "\n" << indent << " ";
_diff[i].print(out, verbose, indent + " ");
@@ -363,12 +364,12 @@ ApplyBucketDiffCommand::print(std::ostream& out, bool verbose,
totalSize += it->_bodyBlob.size();
if (it->filled()) ++filled;
}
- out << "ApplyBucketDiffCommand(" << getBucketId() << ", nodes: ";
+ out << "ApplyBucketDiffCommand(" << getBucketId() << ", nodes: [";
for (uint32_t i=0; i<_nodes.size(); ++i) {
if (i != 0) out << ", ";
out << _nodes[i];
}
- out << _diff.size() << " entries of " << totalSize << " bytes, "
+ out << "], " << _diff.size() << " entries of " << totalSize << " bytes, "
<< (100.0 * filled / _diff.size()) << " \% filled)";
if (_diff.empty()) {
out << ", no entries";
@@ -408,12 +409,12 @@ ApplyBucketDiffReply::print(std::ostream& out, bool verbose,
totalSize += entry._bodyBlob.size();
if (entry.filled()) ++filled;
}
- out << "ApplyBucketDiffReply(" << getBucketId() << ", nodes: ";
+ out << "ApplyBucketDiffReply(" << getBucketId() << ", nodes: [";
for (uint32_t i=0; i<_nodes.size(); ++i) {
if (i != 0) out << ", ";
out << _nodes[i];
}
- out << _diff.size() << " entries of " << totalSize << " bytes, "
+ out << "], " << _diff.size() << " entries of " << totalSize << " bytes, "
<< (100.0 * filled / _diff.size()) << " \% filled)";
if (_diff.empty()) {
out << ", no entries";
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 9c423c6fc34..eb818ba1d48 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -8,7 +8,6 @@ import com.fasterxml.jackson.core.JsonToken;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index e5667c7b392..63fc7854a52 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -162,6 +162,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String TIMEOUT = "timeout";
private static final String TRACELEVEL = "tracelevel";
private static final String STREAM = "stream";
+ private static final String SLICES = "slices";
+ private static final String SLICE_ID = "sliceId";
private final Clock clock;
private final Duration handlerTimeout;
@@ -985,12 +987,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (cluster.isEmpty() && path.documentType().isEmpty())
throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
+ Optional<Integer> slices = getProperty(request, SLICES, integerParser);
+ Optional<Integer> sliceId = getProperty(request, SLICE_ID, integerParser);
+
VisitorParameters parameters = parseCommonParameters(request, path, cluster);
parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
parameters.setMaxTotalHits(wantedDocumentCount);
parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency));
parameters.visitInconsistentBuckets(true);
parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()));
+ if (slices.isPresent() && sliceId.isPresent())
+ parameters.slice(slices.get(), sliceId.get());
+ else if (slices.isPresent() != sliceId.isPresent())
+ throw new IllegalArgumentException("None or both of '" + SLICES + "' and '" + SLICE_ID + "' must be set");
return parameters;
}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index b23533a720e..1629777f837 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -258,6 +258,8 @@ public class DocumentV1ApiTest {
assertEquals("[id]", parameters.getFieldSet());
assertEquals("(all the things)", parameters.getDocumentSelection());
assertEquals(6000, parameters.getSessionTimeoutMs());
+ assertEquals(4, parameters.getSlices());
+ assertEquals(1, parameters.getSliceId());
// Put some documents in the response
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0));
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1));
@@ -269,7 +271,7 @@ public class DocumentV1ApiTest {
parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "timeout is OK");
});
response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" +
- "&selection=all%20the%20things&fieldSet=[id]&timeout=6&stream=true");
+ "&selection=all%20the%20things&fieldSet=[id]&timeout=6&stream=true&slices=4&sliceId=1");
assertSameJson("{" +
" \"pathId\": \"/document/v1\"," +
" \"documents\": [" +
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
index b55f54f9339..e61dc071b62 100644
--- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -4,7 +4,7 @@
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/backtrace.h>
#include <vespa/vespalib/util/size_literals.h>
-#include <atomic>
+#include <thread>
using namespace vespalib;
@@ -74,9 +74,9 @@ struct MyState {
ExecutorStats stats = executor.getStats();
EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt);
EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt);
- EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,
- stats.acceptedTasks);
+ EXPECT_EQUAL(expect_queue + expect_running + expect_deleted, stats.acceptedTasks);
EXPECT_EQUAL(expect_rejected, stats.rejectedTasks);
+ EXPECT_TRUE(stats.wakeupCount <= (NUM_THREADS + stats.acceptedTasks));
EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0));
if (expect_deleted == 0) {
EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max());
@@ -85,6 +85,7 @@ struct MyState {
EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max());
EXPECT_EQUAL(0u, stats.acceptedTasks);
EXPECT_EQUAL(0u, stats.rejectedTasks);
+ EXPECT_EQUAL(0u, stats.wakeupCount);
return *this;
}
};
@@ -188,11 +189,16 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor(
}
TEST("require that stats can be accumulated") {
- ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3);
+ EXPECT_TRUE(std::atomic<duration>::is_always_lock_free);
+ ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3,7);
+ stats.setUtil(3, 0.8);
EXPECT_EQUAL(1u, stats.queueSize.max());
EXPECT_EQUAL(2u, stats.acceptedTasks);
EXPECT_EQUAL(3u, stats.rejectedTasks);
- stats += ExecutorStats(ExecutorStats::QueueSizeT(7),8,9);
+ EXPECT_EQUAL(7u, stats.wakeupCount);
+ EXPECT_EQUAL(3u, stats.getThreadCount());
+ EXPECT_EQUAL(0.2, stats.getUtil());
+ stats.aggregate(ExecutorStats(ExecutorStats::QueueSizeT(7),8,9,11).setUtil(7,0.5));
EXPECT_EQUAL(2u, stats.queueSize.count());
EXPECT_EQUAL(8u, stats.queueSize.total());
EXPECT_EQUAL(8u, stats.queueSize.max());
@@ -200,9 +206,18 @@ TEST("require that stats can be accumulated") {
EXPECT_EQUAL(8u, stats.queueSize.max());
EXPECT_EQUAL(4.0, stats.queueSize.average());
+ EXPECT_EQUAL(10u, stats.getThreadCount());
EXPECT_EQUAL(10u, stats.acceptedTasks);
EXPECT_EQUAL(12u, stats.rejectedTasks);
+ EXPECT_EQUAL(18u, stats.wakeupCount);
+ EXPECT_EQUAL(0.41, stats.getUtil());
+}
+TEST("Test that utilization is computed") {
+ ThreadStackExecutor executor(1, 128_Ki);
+ std::this_thread::sleep_for(1s);
+ auto stats = executor.getStats();
+ EXPECT_GREATER(0.50, stats.getUtil());
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h
index f1f58685570..577ae933ec2 100644
--- a/vespalib/src/vespa/vespalib/util/executor_stats.h
+++ b/vespalib/src/vespa/vespalib/util/executor_stats.h
@@ -51,25 +51,46 @@ private:
/**
* Struct representing stats for an executor.
+ * Note that aggregation requires the sample interval to be the same (or similar) for all samples.
**/
-struct ExecutorStats {
+class ExecutorStats {
+private:
+ size_t _threadCount;
+ double _absUtil;
+public:
using QueueSizeT = AggregatedAverage<size_t>;
QueueSizeT queueSize;
- size_t acceptedTasks;
- size_t rejectedTasks;
- ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0) {}
- ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected)
- : queueSize(queueSize_in), acceptedTasks(accepted), rejectedTasks(rejected)
+ size_t acceptedTasks;
+ size_t rejectedTasks;
+ size_t wakeupCount; // Number of times a worker was woken up to pick up a task.
+
+ ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0, 0) {}
+ ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected, size_t wakeupCount_in)
+ : _threadCount(1),
+ _absUtil(1.0),
+ queueSize(queueSize_in),
+ acceptedTasks(accepted),
+ rejectedTasks(rejected),
+ wakeupCount(wakeupCount_in)
{}
- ExecutorStats & operator += (const ExecutorStats & rhs) {
+ void aggregate(const ExecutorStats & rhs) {
+ _threadCount += rhs._threadCount;
queueSize = QueueSizeT(queueSize.count() + rhs.queueSize.count(),
queueSize.total() + rhs.queueSize.total(),
queueSize.min() + rhs.queueSize.min(),
queueSize.max() + rhs.queueSize.max());
acceptedTasks += rhs.acceptedTasks;
rejectedTasks += rhs.rejectedTasks;
+ wakeupCount += rhs.wakeupCount;
+ _absUtil += rhs._absUtil;
+ }
+ ExecutorStats & setUtil(uint32_t threadCount, double idle) {
+ _threadCount = threadCount;
+ _absUtil = (1.0 - idle) * threadCount;
return *this;
}
+ double getUtil() const { return _absUtil / _threadCount; }
+ size_t getThreadCount() const { return _threadCount; }
};
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index f80a5b4ce32..133350f3d56 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -24,6 +24,25 @@ ThreadInit::Run(FastOS_ThreadInterface *, void *) {
}
+ThreadStackExecutorBase::Worker::Worker()
+ : lock(),
+ cond(),
+ idleTracker(),
+ pre_guard(0xaaaaaaaa),
+ idle(true),
+ post_guard(0x55555555),
+ task()
+{}
+
+void
+ThreadStackExecutorBase::Worker::verify(bool expect_idle) const {
+ (void) expect_idle;
+ assert(pre_guard == 0xaaaaaaaa);
+ assert(post_guard == 0x55555555);
+ assert(idle == expect_idle);
+ assert(!task.task == expect_idle);
+}
+
void
ThreadStackExecutorBase::BlockedThread::wait() const
{
@@ -103,6 +122,7 @@ ThreadStackExecutorBase::obtainTask(Worker &worker)
return false;
}
_workers.push(&worker);
+ worker.idleTracker.set_idle(steady_clock::now());
}
{
unique_lock guard(worker.lock);
@@ -141,6 +161,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize,
_lock(),
_cond(),
_stats(),
+ _idleTracker(steady_clock::now()),
_executorCompletion(),
_tasks(),
_workers(),
@@ -164,7 +185,8 @@ ThreadStackExecutorBase::start(uint32_t threads)
}
}
-size_t ThreadStackExecutorBase::getNumThreads() const {
+size_t
+ThreadStackExecutorBase::getNumThreads() const {
return _pool->GetNumStartedThreads();
}
@@ -208,6 +230,12 @@ ThreadStackExecutorBase::getStats()
{
std::unique_lock guard(_lock);
ExecutorStats stats = _stats;
+ steady_time now = steady_clock::now();
+ for (size_t i(0); i < _workers.size(); i++) {
+ _idleTracker.was_idle(_workers.access(i)->idleTracker.reset(now));
+ }
+ size_t numThreads = getNumThreads();
+ stats.setUtil(numThreads, _idleTracker.reset(now, numThreads));
_stats = ExecutorStats();
_stats.queueSize.add(_taskCount);
return stats;
@@ -225,6 +253,8 @@ ThreadStackExecutorBase::execute(Task::UP task)
if (!_workers.empty()) {
Worker *worker = _workers.back();
_workers.popBack();
+ _idleTracker.was_idle(worker->idleTracker.set_active(steady_clock::now()));
+ _stats.wakeupCount++;
guard.unlock(); // <- UNLOCK
assignTask(std::move(taggedTask), *worker);
} else {
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 66a34bfde95..c3552cfe579 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -7,6 +7,7 @@
#include "arrayqueue.hpp"
#include "gate.h"
#include "runnable.h"
+#include "executor_idle_tracking.h"
#include <vector>
#include <functional>
@@ -47,18 +48,13 @@ private:
struct Worker {
std::mutex lock;
std::condition_variable cond;
- uint32_t pre_guard;
- bool idle;
- uint32_t post_guard;
- TaggedTask task;
- Worker() : lock(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {}
- void verify(bool expect_idle) const {
- (void) expect_idle;
- assert(pre_guard == 0xaaaaaaaa);
- assert(post_guard == 0x55555555);
- assert(idle == expect_idle);
- assert(!task.task == expect_idle);
- }
+ ThreadIdleTracker idleTracker;
+ uint32_t pre_guard;
+ bool idle;
+ uint32_t post_guard;
+ TaggedTask task;
+ Worker();
+ void verify(bool expect_idle) const;
};
struct BarrierCompletion {
@@ -81,6 +77,7 @@ private:
mutable std::mutex _lock;
std::condition_variable _cond;
ExecutorStats _stats;
+ ExecutorIdleTracker _idleTracker;
Gate _executorCompletion;
ArrayQueue<TaggedTask> _tasks;
ArrayQueue<Worker*> _workers;