diff options
88 files changed, 1127 insertions, 523 deletions
diff --git a/cloud-tenant-base-dependencies-enforcer/pom.xml b/cloud-tenant-base-dependencies-enforcer/pom.xml index e2cc7085353..9f4a14387f4 100644 --- a/cloud-tenant-base-dependencies-enforcer/pom.xml +++ b/cloud-tenant-base-dependencies-enforcer/pom.xml @@ -22,7 +22,7 @@ <aopalliance.version>1.0</aopalliance.version> <athenz.version>1.10.14</athenz.version> <bouncycastle.version>1.68</bouncycastle.version> - <felix.version>7.0.1</felix.version> + <felix.version>6.0.3</felix.version> <felix.log.version>1.0.1</felix.log.version> <findbugs.version>1.3.9</findbugs.version> <guava.version>20.0</guava.version> diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index ebde8f3a98c..948e416eb53 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -1004,8 +1004,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // TODO expose and use monotonic clock instead of system clock final long maxDeadlineTimePointMs = timer.getCurrentTimeInMillis() + options.getMaxDeferredTaskVersionWaitTime().toMillis(); for (RemoteClusterControllerTask task : tasksPendingStateRecompute) { - context.log(logger, Level.FINEST, () -> String.format("Adding task of type '%s' to be completed at version %d", - task.getClass().getName(), completeAtVersion)); + context.log(logger, Level.INFO, task + " will be completed at version " + completeAtVersion); taskCompletionQueue.add(new VersionDependentTaskCompletion(completeAtVersion, task, maxDeadlineTimePointMs)); } tasksPendingStateRecompute.clear(); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java 
b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java index 39ffed1051a..949ad6f56a2 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java @@ -123,4 +123,6 @@ public abstract class RemoteClusterControllerTask { } } + @Override + public String toString() { return RemoteClusterControllerTask.class.getSimpleName(); } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java index f406ec46ccc..431c207af5c 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java @@ -98,6 +98,15 @@ public class SetNodeStateRequest extends Request<SetResponse> { return super.isFailed() || (resultSet && !result.getWasModified()); } + @Override + public String toString() { + return "SetNodeStateRequest{" + + "node=" + id + "," + + "newState=" + newStates.get("user") + "," + + (probe ? 
"probe=" + probe + "," : "") + + "}"; + } + static SetResponse setWantedState( ContentCluster cluster, SetUnitStateRequest.Condition condition, diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java index 2c51aa89c66..8c2957502a8 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java @@ -103,6 +103,7 @@ public interface ModelContext { @ModelFeatureFlag(owners = {"geirst", "vekterli"}) default int distributorMergeBusyWait() { return 10; } @ModelFeatureFlag(owners = {"vekterli", "geirst"}) default boolean distributorEnhancedMaintenanceScheduling() { return false; } @ModelFeatureFlag(owners = {"arnej"}) default boolean forwardIssuesAsErrors() { return true; } + @ModelFeatureFlag(owners = {"geirst", "vekterli"}) default boolean asyncApplyBucketDiff() { return false; } } /** Warning: As elsewhere in this package, do not make backwards incompatible changes that will break old config models! 
*/ diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java index 7a09a36c1d7..b7506587102 100644 --- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java +++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java @@ -70,6 +70,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea private int maxUnCommittedMemory = 123456; private double diskBloatFactor = 0.2; private boolean distributorEnhancedMaintenanceScheduling = false; + private boolean asyncApplyBucketDiff = false; @Override public ModelContext.FeatureFlags featureFlags() { return this; } @Override public boolean multitenant() { return multitenant; } @@ -120,6 +121,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea @Override public int docstoreCompressionLevel() { return docstoreCompressionLevel; } @Override public boolean distributorEnhancedMaintenanceScheduling() { return distributorEnhancedMaintenanceScheduling; } @Override public int maxUnCommittedMemory() { return maxUnCommittedMemory; } + @Override public boolean asyncApplyBucketDiff() { return asyncApplyBucketDiff; } public TestProperties maxUnCommittedMemory(int maxUnCommittedMemory) { this.maxUnCommittedMemory = maxUnCommittedMemory; @@ -307,6 +309,11 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea return this; } + public TestProperties setAsyncApplyBucketDiff(boolean value) { + asyncApplyBucketDiff = value; + return this; + } + public static class Spec implements ConfigServerSpec { private final String hostName; diff --git a/config-model/src/main/java/com/yahoo/searchdefinition/RankProfile.java b/config-model/src/main/java/com/yahoo/searchdefinition/RankProfile.java index 7f3b018d569..85b8d7fbe79 100644 --- a/config-model/src/main/java/com/yahoo/searchdefinition/RankProfile.java +++ 
b/config-model/src/main/java/com/yahoo/searchdefinition/RankProfile.java @@ -674,25 +674,26 @@ public class RankProfile implements Cloneable { inputFeatures.put(ref, declaredType); } - public static class ExecuteOperation { - public enum Phase { onmatch, onrerank, onsummary} + public static class MutateOperation { + public enum Phase { onmatch, on_first_phase, on_second_phase, onsummary} final Phase phase; final String attribute; final String operation; - ExecuteOperation(Phase phase, String attribute, String operation) { + MutateOperation(Phase phase, String attribute, String operation) { this.phase = phase; this.attribute = attribute; this.operation = operation; } } - private final List<ExecuteOperation> executeOperations = new ArrayList<>(); + private final List<MutateOperation> mutateOperations = new ArrayList<>(); - public void addExecuteOperation(ExecuteOperation.Phase phase, String attribute, String operation) { - executeOperations.add(new ExecuteOperation(phase, attribute, operation)); - addRankProperty("vespa.execute." + phase + ".attribute", attribute); - addRankProperty("vespa.execute." + phase + ".operation", operation); + public void addMutateOperation(MutateOperation.Phase phase, String attribute, String operation) { + mutateOperations.add(new MutateOperation(phase, attribute, operation)); + String prefix = "vespa.mutate." 
+ phase.toString().replace('-', '_'); + addRankProperty(prefix + ".attribute", attribute); + addRankProperty(prefix + ".operation", operation); } - public List<ExecuteOperation> getExecuteOperations() { return executeOperations; } + public List<MutateOperation> getMutateOperations() { return mutateOperations; } public RankingExpressionFunction findFunction(String name) { RankingExpressionFunction function = functions.get(name); diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java index d8c59ebda65..a65e6fe16c0 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java @@ -371,6 +371,10 @@ public class VespaMetricSet { metrics.add(new Metric(prefix + ".queuesize.count")); metrics.add(new Metric(prefix + ".maxpending.last")); // TODO: Remove in Vespa 8 metrics.add(new Metric(prefix + ".accepted.rate")); + metrics.add(new Metric(prefix + ".wakeups.rate")); + metrics.add(new Metric(prefix + ".utilization.max")); + metrics.add(new Metric(prefix + ".utilization.sum")); + metrics.add(new Metric(prefix + ".utilization.count")); } private static Set<Metric> getSearchNodeMetrics() { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java index 5c692e8ef6b..d7f6fb6c581 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java @@ -47,6 +47,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer { private final int reponseNumThreads; private final StorFilestorConfig.Response_sequencer_type.Enum 
responseSequencerType; private final boolean useAsyncMessageHandlingOnSchedule; + private final boolean asyncApplyBucketDiff; private static StorFilestorConfig.Response_sequencer_type.Enum convertResponseSequencerType(String sequencerType) { try { @@ -62,6 +63,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer { this.reponseNumThreads = featureFlags.defaultNumResponseThreads(); this.responseSequencerType = convertResponseSequencerType(featureFlags.responseSequencerType()); useAsyncMessageHandlingOnSchedule = featureFlags.useAsyncMessageHandlingOnSchedule(); + asyncApplyBucketDiff = featureFlags.asyncApplyBucketDiff(); } @Override @@ -73,6 +75,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer { builder.num_response_threads(reponseNumThreads); builder.response_sequencer_type(responseSequencerType); builder.use_async_message_handling_on_schedule(useAsyncMessageHandlingOnSchedule); + builder.async_apply_bucket_diff(asyncApplyBucketDiff); } } diff --git a/config-model/src/main/javacc/SDParser.jj b/config-model/src/main/javacc/SDParser.jj index 0f60db40069..e6a5d1f2e4e 100644 --- a/config-model/src/main/javacc/SDParser.jj +++ b/config-model/src/main/javacc/SDParser.jj @@ -207,10 +207,10 @@ TOKEN : | < LOOSE: "loose" > | < STRICT: "strict" > | < DOCUMENT: "document" > -| < EXECUTE: "execute" > | < OPERATION: "operation" > | < ON_MATCH: "on-match" > -| < ON_RERANK: "on-rerank" > +| < ON_FIRST_PHASE: "on-first-phase" > +| < ON_SECOND_PHASE: "on-second-phase" > | < ON_SUMMARY: "on-summary" > | < STRUCT: "struct" > | < INHERITS: "inherits" > @@ -239,6 +239,7 @@ TOKEN : | < CONSTANT: "constant"> | < ONNXMODEL: "onnx-model"> | < MODEL: "model" > +| < MUTATE: "mutate" > | < RANKPROFILE: "rank-profile" > | < RANKDEGRADATIONFREQ: "rank-degradation-frequency" > | < RANKDEGRADATION: "rank-degradation" > @@ -2066,7 +2067,7 @@ Object rankProfileItem(RankProfile profile) : { } | firstPhase(profile) | matchPhase(profile) | function(profile) 
- | execute(profile) + | mutate(profile) | ignoreRankFeatures(profile) | numThreadsPerSearch(profile) | minHitsPerThread(profile) @@ -2096,39 +2097,40 @@ void inheritsRankProfile(RankProfile profile) : } /** - * This rule consumes an execute statement of a rank-profile. + * This rule consumes a mutate statement of a rank-profile. * * @param profile The profile to modify. */ -void execute(RankProfile profile) : +void mutate(RankProfile profile) : { } { - <EXECUTE> lbrace() (execute_operation(profile) <NL>)+ <RBRACE> + <MUTATE> lbrace() (mutate_operation(profile) <NL>)+ <RBRACE> { } } -void execute_operation(RankProfile profile) : +void mutate_operation(RankProfile profile) : { String attribute, operation; - RankProfile.ExecuteOperation.Phase phase; + RankProfile.MutateOperation.Phase phase; } { - ( <ON_MATCH> { phase = RankProfile.ExecuteOperation.Phase.onmatch; } - | <ON_RERANK> { phase = RankProfile.ExecuteOperation.Phase.onrerank; } - | <ON_SUMMARY> { phase = RankProfile.ExecuteOperation.Phase.onsummary; } + ( <ON_MATCH> { phase = RankProfile.MutateOperation.Phase.onmatch; } + | <ON_FIRST_PHASE> { phase = RankProfile.MutateOperation.Phase.on_first_phase; } + | <ON_SECOND_PHASE> { phase = RankProfile.MutateOperation.Phase.on_second_phase; } + | <ON_SUMMARY> { phase = RankProfile.MutateOperation.Phase.onsummary; } ) - lbrace() attribute = identifier() operation = execute_expr() (<NL>)* <RBRACE> - { profile.addExecuteOperation(phase, attribute, operation); } + lbrace() attribute = identifier() operation = mutate_expr() (<NL>)* <RBRACE> + { profile.addMutateOperation(phase, attribute, operation); } } -String execute_expr() : +String mutate_expr() : { String op; Number constant = null; } { - (("++" | "--") { op = token.image; } | ("+=" | "-=" | "*=" | "/=" | "%=" | "=") { op = token.image; } constant = consumeNumber()) + (("+=" | "-=" | "=") { op = token.image; } constant = consumeNumber()) { return constant != null ?
(op + constant) : op; } } diff --git a/config-model/src/test/java/com/yahoo/searchdefinition/RankPropertiesTestCase.java b/config-model/src/test/java/com/yahoo/searchdefinition/RankPropertiesTestCase.java index e2ca8ac4f65..637f7571a68 100644 --- a/config-model/src/test/java/com/yahoo/searchdefinition/RankPropertiesTestCase.java +++ b/config-model/src/test/java/com/yahoo/searchdefinition/RankPropertiesTestCase.java @@ -78,7 +78,7 @@ public class RankPropertiesTestCase extends AbstractSchemaTestCase { } } @Test - public void testRankProfileExecute() throws ParseException { + public void testRankProfileMutate() throws ParseException { RankProfileRegistry rankProfileRegistry = new RankProfileRegistry(); SearchBuilder builder = new SearchBuilder(rankProfileRegistry); builder.importString(joinLines( @@ -101,15 +101,18 @@ public class RankPropertiesTestCase extends AbstractSchemaTestCase { " attribute: mutable", " }", " rank-profile a {", - " execute {", + " mutate {", " on-match {", - " synthetic_attribute_a ++", + " synthetic_attribute_a += 7", " }", - " on-rerank {", + " on-first-phase {", + " synthetic_attribute_b +=1", + " }", + " on-second-phase {", " synthetic_attribute_b = 1.01", " }", " on-summary {", - " synthetic_attribute_c --", + " synthetic_attribute_c -= 1", " }", " }", " first-phase {", @@ -128,28 +131,33 @@ public class RankPropertiesTestCase extends AbstractSchemaTestCase { builder.build(); Schema schema = builder.getSearch(); RankProfile a = rankProfileRegistry.get(schema, "a"); - List<RankProfile.ExecuteOperation> operations = a.getExecuteOperations(); - assertEquals(3, operations.size()); - assertEquals(RankProfile.ExecuteOperation.Phase.onmatch, operations.get(0).phase); + List<RankProfile.MutateOperation> operations = a.getMutateOperations(); + assertEquals(4, operations.size()); + assertEquals(RankProfile.MutateOperation.Phase.onmatch, operations.get(0).phase); assertEquals("synthetic_attribute_a", operations.get(0).attribute); - 
assertEquals("++", operations.get(0).operation); - assertEquals(RankProfile.ExecuteOperation.Phase.onrerank, operations.get(1).phase); + assertEquals("+=7", operations.get(0).operation); + assertEquals(RankProfile.MutateOperation.Phase.on_first_phase, operations.get(1).phase); assertEquals("synthetic_attribute_b", operations.get(1).attribute); - assertEquals("=1.01", operations.get(1).operation); - assertEquals(RankProfile.ExecuteOperation.Phase.onsummary, operations.get(2).phase); - assertEquals("synthetic_attribute_c", operations.get(2).attribute); - assertEquals("--", operations.get(2).operation); + assertEquals("+=1", operations.get(1).operation); + assertEquals(RankProfile.MutateOperation.Phase.on_second_phase, operations.get(2).phase); + assertEquals("synthetic_attribute_b", operations.get(2).attribute); + assertEquals("=1.01", operations.get(2).operation); + assertEquals(RankProfile.MutateOperation.Phase.onsummary, operations.get(3).phase); + assertEquals("synthetic_attribute_c", operations.get(3).attribute); + assertEquals("-=1", operations.get(3).operation); AttributeFields attributeFields = new AttributeFields(schema); RawRankProfile raw = new RawRankProfile(a, new LargeRankExpressions(new MockFileRegistry()), new QueryProfileRegistry(), new ImportedMlModels(), attributeFields, new TestProperties()); - assertEquals(7, raw.configProperties().size()); - assertEquals("(vespa.execute.onmatch.attribute, synthetic_attribute_a)", raw.configProperties().get(0).toString()); - assertEquals("(vespa.execute.onmatch.operation, ++)", raw.configProperties().get(1).toString()); - assertEquals("(vespa.execute.onrerank.attribute, synthetic_attribute_b)", raw.configProperties().get(2).toString()); - assertEquals("(vespa.execute.onrerank.operation, =1.01)", raw.configProperties().get(3).toString()); - assertEquals("(vespa.execute.onsummary.attribute, synthetic_attribute_c)", raw.configProperties().get(4).toString()); - assertEquals("(vespa.execute.onsummary.operation, --)", 
raw.configProperties().get(5).toString()); - assertEquals("(vespa.rank.firstphase, a)", raw.configProperties().get(6).toString()); + assertEquals(9, raw.configProperties().size()); + assertEquals("(vespa.mutate.onmatch.attribute, synthetic_attribute_a)", raw.configProperties().get(0).toString()); + assertEquals("(vespa.mutate.onmatch.operation, +=7)", raw.configProperties().get(1).toString()); + assertEquals("(vespa.mutate.on_first_phase.attribute, synthetic_attribute_b)", raw.configProperties().get(2).toString()); + assertEquals("(vespa.mutate.on_first_phase.operation, +=1)", raw.configProperties().get(3).toString()); + assertEquals("(vespa.mutate.on_second_phase.attribute, synthetic_attribute_b)", raw.configProperties().get(4).toString()); + assertEquals("(vespa.mutate.on_second_phase.operation, =1.01)", raw.configProperties().get(5).toString()); + assertEquals("(vespa.mutate.onsummary.attribute, synthetic_attribute_c)", raw.configProperties().get(6).toString()); + assertEquals("(vespa.mutate.onsummary.operation, -=1)", raw.configProperties().get(7).toString()); + assertEquals("(vespa.rank.firstphase, a)", raw.configProperties().get(8).toString()); } } diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java index 9d8d7509966..739f8b7fff2 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java @@ -142,6 +142,16 @@ public class StorageClusterTest { return new StorServerConfig(builder); } + private StorFilestorConfig filestorConfigFromProducer(StorFilestorConfig.Producer producer) { + var builder = new StorFilestorConfig.Builder(); + producer.getConfig(builder); + return new StorFilestorConfig(builder); + } + + private StorFilestorConfig filestorConfigFromProperties(TestProperties properties) { + return 
filestorConfigFromProducer(parse(cluster("foo", ""), properties)); + } + @Test public void testMergeFeatureFlags() { var config = configFromProperties(new TestProperties().setMaxMergeQueueSize(1919).setMaxConcurrentMergesPerNode(37)); @@ -159,6 +169,15 @@ public class StorageClusterTest { } @Test + public void async_apply_bucket_diff_can_be_controlled_by_feature_flag() { + var config = filestorConfigFromProperties(new TestProperties()); + assertFalse(config.async_apply_bucket_diff()); + + config = filestorConfigFromProperties(new TestProperties().setAsyncApplyBucketDiff(true)); + assertTrue(config.async_apply_bucket_diff()); + } + + @Test public void testVisitors() { StorVisitorConfig.Builder builder = new StorVisitorConfig.Builder(); parse(cluster("bees", @@ -188,9 +207,7 @@ public class StorageClusterTest { ); { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - stc.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(stc); assertEquals(7, config.num_threads()); assertFalse(config.enable_multibit_split_optimalization()); @@ -199,9 +216,7 @@ public class StorageClusterTest { { assertEquals(1, stc.getChildren().size()); StorageNode sn = stc.getChildren().values().iterator().next(); - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - sn.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(sn); assertEquals(7, config.num_threads()); } } @@ -215,9 +230,7 @@ public class StorageClusterTest { "</tuning>")), new Flavor(new FlavorsConfig.Flavor.Builder().name("test-flavor").minCpuCores(9).build()) ); - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - stc.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(stc); assertEquals(2, config.num_response_threads()); 
assertEquals(StorFilestorConfig.Response_sequencer_type.ADAPTIVE, config.response_sequencer_type()); assertEquals(7, config.num_threads()); @@ -238,9 +251,7 @@ public class StorageClusterTest { ); { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - stc.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(stc); assertEquals(4, config.num_threads()); assertFalse(config.enable_multibit_split_optimalization()); @@ -248,9 +259,7 @@ public class StorageClusterTest { { assertEquals(1, stc.getChildren().size()); StorageNode sn = stc.getChildren().values().iterator().next(); - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - sn.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(sn); assertEquals(4, config.num_threads()); } } @@ -262,17 +271,13 @@ public class StorageClusterTest { ); { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - stc.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(stc); assertEquals(8, config.num_threads()); } { assertEquals(1, stc.getChildren().size()); StorageNode sn = stc.getChildren().values().iterator().next(); - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - sn.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(sn); assertEquals(9, config.num_threads()); } } @@ -285,17 +290,13 @@ public class StorageClusterTest { @Test public void testFeatureFlagControlOfResponseSequencer() { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - simpleCluster(new TestProperties().setResponseNumThreads(13).setResponseSequencerType("THROUGHPUT")).getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = 
filestorConfigFromProducer(simpleCluster(new TestProperties().setResponseNumThreads(13).setResponseSequencerType("THROUGHPUT"))); assertEquals(13, config.num_response_threads()); assertEquals(StorFilestorConfig.Response_sequencer_type.THROUGHPUT, config.response_sequencer_type()); } private void verifyAsyncMessageHandlingOnSchedule(boolean expected, boolean value) { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - simpleCluster(new TestProperties().setAsyncMessageHandlingOnSchedule(value)).getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(simpleCluster(new TestProperties().setAsyncMessageHandlingOnSchedule(value))); assertEquals(expected, config.use_async_message_handling_on_schedule()); } @Test diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index b2360ef262d..7fdc18827f4 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -197,6 +197,7 @@ public class ModelContextImpl implements ModelContext { private final boolean distributorEnhancedMaintenanceScheduling; private final int maxUnCommittedMemory; private final boolean forwardIssuesAsErrors; + private final boolean asyncApplyBucketDiff; public FeatureFlags(FlagSource source, ApplicationId appId) { this.defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT); @@ -232,6 +233,7 @@ public class ModelContextImpl implements ModelContext { this.distributorEnhancedMaintenanceScheduling = flagValue(source, appId, Flags.DISTRIBUTOR_ENHANCED_MAINTENANCE_SCHEDULING); this.maxUnCommittedMemory = flagValue(source, appId, Flags.MAX_UNCOMMITTED_MEMORY);; this.forwardIssuesAsErrors = flagValue(source, appId, 
PermanentFlags.FORWARD_ISSUES_AS_ERRORS); + this.asyncApplyBucketDiff = flagValue(source, appId, Flags.ASYNC_APPLY_BUCKET_DIFF); } @Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; } @@ -269,6 +271,7 @@ public class ModelContextImpl implements ModelContext { @Override public boolean distributorEnhancedMaintenanceScheduling() { return distributorEnhancedMaintenanceScheduling; } @Override public int maxUnCommittedMemory() { return maxUnCommittedMemory; } @Override public boolean forwardIssuesAsErrors() { return forwardIssuesAsErrors; } + @Override public boolean asyncApplyBucketDiff() { return asyncApplyBucketDiff; } private static <V> V flagValue(FlagSource source, ApplicationId appId, UnboundFlag<? extends V, ?, ?> flag) { return flag.bindTo(source) diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index 083cb535bfa..0aeea5ce2d5 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -433,7 +433,8 @@ public class SessionRepository { private Optional<RemoteSession> loadSessionIfActive(RemoteSession session) { for (ApplicationId applicationId : applicationRepo.activeApplications()) { - if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) { + Optional<Long> activeSession = applicationRepo.activeSessionOf(applicationId); + if (activeSession.isPresent() && activeSession.get() == session.getSessionId()) { log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it"); applicationRepo.activateApplication(ensureApplicationLoaded(session), session.getSessionId()); log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + 
session.getSessionId() + ")"); diff --git a/container-dependency-versions/pom.xml b/container-dependency-versions/pom.xml index e2b6f3b785b..385ec058424 100644 --- a/container-dependency-versions/pom.xml +++ b/container-dependency-versions/pom.xml @@ -403,8 +403,8 @@ <properties> <aopalliance.version>1.0</aopalliance.version> <bouncycastle.version>1.68</bouncycastle.version> - <felix.version>7.0.1</felix.version> - <felix.log.version>1.0.1</felix.log.version> + <felix.version>6.0.3</felix.version> + <felix.log.version>1.0.1</felix.log.version> <!-- TODO Vespa 8: upgrade! --> <findbugs.version>1.3.9</findbugs.version> <guava.version>20.0</guava.version> <guice.version>3.0</guice.version> diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java index 854780dd336..1ddb50b0a6b 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java @@ -122,12 +122,7 @@ public class ResourceMeterMaintainer extends ControllerMaintainer { private void reportResourceSnapshots(Collection<ResourceSnapshot> resourceSnapshots) { meteringClient.consume(resourceSnapshots); - metric.set(METERING_LAST_REPORTED, clock.millis() / 1000, metric.createContext(Collections.emptyMap())); - // total metered resource usage, for alerting on drastic changes - metric.set(METERING_TOTAL_REPORTED, - resourceSnapshots.stream() - .mapToDouble(r -> r.getCpuCores() + r.getMemoryGb() + r.getDiskGb()).sum(), - metric.createContext(Collections.emptyMap())); + updateMetrics(resourceSnapshots); try (var lock = curator.lockMeteringRefreshTime()) { if (needsRefresh(curator.readMeteringRefreshTime())) { @@ -194,4 +189,25 @@ public class ResourceMeterMaintainer extends 
ControllerMaintainer { double cost = new NodeResources(allocation.getCpuCores(), allocation.getMemoryGb(), allocation.getDiskGb(), 0).cost(); return Math.round(cost * 100.0 / costDivisor) / 100.0; } + + private void updateMetrics(Collection<ResourceSnapshot> resourceSnapshots) { + metric.set(METERING_LAST_REPORTED, clock.millis() / 1000, metric.createContext(Collections.emptyMap())); + // total metered resource usage, for alerting on drastic changes + metric.set(METERING_TOTAL_REPORTED, + resourceSnapshots.stream() + .mapToDouble(r -> r.getCpuCores() + r.getMemoryGb() + r.getDiskGb()).sum(), + metric.createContext(Collections.emptyMap())); + + resourceSnapshots.forEach(snapshot -> { + var context = metric.createContext(Map.of( + "tenant", snapshot.getApplicationId().tenant().value(), + "applicationId", snapshot.getApplicationId().toFullString(), + "zoneId", snapshot.getZoneId() + )); + metric.set("metering.vcpu", snapshot.getCpuCores(), context); + metric.set("metering.memoryGB", snapshot.getMemoryGb(), context); + metric.set("metering.diskGB", snapshot.getDiskGb(), context); + }); + } + } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java index a255a6c37d8..5b923c2ee59 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java @@ -98,6 +98,8 @@ public class ResourceMeterMaintainerTest { assertEquals(tester.clock().millis()/1000, metrics.getMetric("metering_last_reported")); assertEquals(2224.0d, (Double) metrics.getMetric("metering_total_reported"), Double.MIN_VALUE); + assertEquals(24d, (Double) metrics.getMetric(context -> "tenant1".equals(context.get("tenant")), "metering.vcpu").get(), 
Double.MIN_VALUE); + assertEquals(40d, (Double) metrics.getMetric(context -> "tenant2".equals(context.get("tenant")), "metering.vcpu").get(), Double.MIN_VALUE); // Metering is not refreshed assertFalse(snapshotConsumer.isRefreshed()); diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index 9cc4f60ed7e..78a58f24a65 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -799,7 +799,7 @@ "public" ], "methods": [ - "public void <init>(int, com.yahoo.documentapi.ProgressToken)", + "public void <init>(int, com.yahoo.documentapi.ProgressToken, int, int)", "protected boolean isLosslessResetPossible()", "public boolean hasNext()", "public boolean shouldYield()", @@ -851,6 +851,7 @@ "public void setDistributionBitCount(int)", "public boolean visitsAllBuckets()", "public static com.yahoo.documentapi.VisitorIterator createFromDocumentSelection(java.lang.String, com.yahoo.document.BucketIdFactory, int, com.yahoo.documentapi.ProgressToken)", + "public static com.yahoo.documentapi.VisitorIterator createFromDocumentSelection(java.lang.String, com.yahoo.document.BucketIdFactory, int, com.yahoo.documentapi.ProgressToken, int, int)", "public static com.yahoo.documentapi.VisitorIterator createFromExplicitBucketSet(java.util.Set, int, com.yahoo.documentapi.ProgressToken)" ], "fields": [] @@ -931,6 +932,9 @@ "public com.yahoo.documentapi.messagebus.loadtypes.LoadType getLoadType()", "public boolean skipBucketsOnFatalErrors()", "public void skipBucketsOnFatalErrors(boolean)", + "public void slice(int, int)", + "public int getSlices()", + "public int getSliceId()", "public void setDynamicallyIncreaseMaxBucketsPerVisitor(boolean)", "public void setDynamicMaxBucketsIncreaseFactor(float)", "public java.lang.String toString()" diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java index 9957898e459..4a77d30ec92 100644 --- 
a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java @@ -387,6 +387,18 @@ public class ProgressToken { } /** + * Marks the current bucket as finished and advances the bucket cursor; + * throws instead if the current bucket is already {@link #addBucket added}. + */ + void skipCurrentBucket() { + if (buckets.containsKey(bucketToKeyWrapper(getCurrentBucketId()))) + throw new IllegalStateException("Current bucket was already added to the explicit bucket set"); + + ++finishedBucketCount; + ++bucketCursor; + } + + /** * Directly generate a bucket Id key for the <code>n</code>th bucket in * reverse sorted order. * @@ -428,6 +440,14 @@ public class ProgressToken { return bucketCursor; } + static BucketId toBucketId(long bucketCursor, int distributionBits) { + return new BucketId(keyToBucketId(makeNthBucketKey(bucketCursor, distributionBits))); + } + + BucketId getCurrentBucketId() { + return toBucketId(getBucketCursor(), getDistributionBitCount()); + } + protected void setBucketCursor(long bucketCursor) { this.bucketCursor = bucketCursor; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java index e11bdf7f18c..e15512ca71b 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java @@ -81,12 +81,24 @@ public class VisitorIterator { protected static class DistributionRangeBucketSource implements BucketSource { private boolean flushActive = false; private int distributionBitCount; + private final int slices; + private final int sliceId; // Wouldn't need this if this were a non-static class, but do it for // the sake of keeping things identical in Java and C++ private ProgressToken progressToken; public DistributionRangeBucketSource(int distributionBitCount, - ProgressToken progress) { + 
ProgressToken progress, + int slices, int sliceId) { + if (slices < 1) { + throw new IllegalArgumentException("slices must be positive, but was " + slices); + } + if (sliceId < 0 || sliceId >= slices) { + throw new IllegalArgumentException("sliceId must be in [0, " + slices + "), but was " + sliceId); + } + + this.slices = slices; + this.sliceId = sliceId; progressToken = progress; // New progress token (could also be empty, in which this is a @@ -148,6 +160,7 @@ public class VisitorIterator { } // Should be all fixed up and good to go progressToken.setInconsistentState(false); + skipToSlice(); } protected boolean isLosslessResetPossible() { @@ -203,6 +216,7 @@ public class VisitorIterator { assert(p.getActiveBucketCount() == 0); p.clearAllBuckets(); p.setBucketCursor(0); + skipToSlice(); return; } @@ -292,7 +306,14 @@ public class VisitorIterator { } public boolean hasNext() { - return progressToken.getBucketCursor() < (1L << distributionBitCount); + // There is a next bucket iff. there is a bucket no earlier than the cursor which + // is contained in the bucket space, and is also 0 modulo our sliceId; or if we're + // not yet properly initialised, with a real distribution bit count, we ignore this. 
+ long nextBucket = progressToken.getBucketCursor(); + if (distributionBitCount != 1) { + nextBucket += Math.floorMod(sliceId - nextBucket, slices); + } + return nextBucket < (1L << distributionBitCount); } public boolean shouldYield() { @@ -311,13 +332,27 @@ public class VisitorIterator { public BucketProgress getNext() { assert(hasNext()) : "getNext() called with hasNext() == false"; - long currentPosition = progressToken.getBucketCursor(); - long key = ProgressToken.makeNthBucketKey(currentPosition, distributionBitCount); - ++currentPosition; - progressToken.setBucketCursor(currentPosition); - return new BucketProgress( - new BucketId(ProgressToken.keyToBucketId(key)), - new BucketId()); + + // Create the progress to return for creating visitors, and advance bucket cursor. + BucketProgress progress = new BucketProgress(progressToken.getCurrentBucketId(), new BucketId()); + progressToken.setBucketCursor(progressToken.getBucketCursor() + 1); + + // Skip ahead to our next next slice, to ensure we also exhaust the bucket space when + // hasNext() turns false, but there are still super buckets left after the current. + skipToSlice(); + + return progress; + } + + // Advances the wrapped progress token's bucket cursor to our next slice, marking any skipped + // buckets as complete, but only if we've been initialised with a proper distribution bit count. 
+ private void skipToSlice() { + if (distributionBitCount == 1) + return; + + while (progressToken.getBucketCursor() < getTotalBucketCount() && (progressToken.getBucketCursor() % slices) != sliceId) { + progressToken.skipCurrentBucket(); + } } public int getDistributionBitCount() { @@ -732,6 +767,13 @@ public class VisitorIterator { return bucketSource.visitsAllBuckets(); } + public static VisitorIterator createFromDocumentSelection( + String documentSelection, + BucketIdFactory idFactory, + int distributionBitCount, + ProgressToken progress) throws ParseException { + return createFromDocumentSelection(documentSelection, idFactory, distributionBitCount, progress, 1, 0); + } /** * Create a new <code>VisitorIterator</code> instance based on the given document * selection string. @@ -753,7 +795,9 @@ public class VisitorIterator { String documentSelection, BucketIdFactory idFactory, int distributionBitCount, - ProgressToken progress) throws ParseException { + ProgressToken progress, + int slices, + int sliceId) throws ParseException { BucketSelector bucketSel = new BucketSelector(idFactory); Set<BucketId> rawBuckets = bucketSel.getBucketList(documentSelection); BucketSource src; @@ -763,7 +807,7 @@ public class VisitorIterator { // bit-based range source if (rawBuckets == null) { // Range source - src = new DistributionRangeBucketSource(distributionBitCount, progress); + src = new DistributionRangeBucketSource(distributionBitCount, progress, slices, sliceId); } else { // Explicit source src = new ExplicitBucketSource(rawBuckets, distributionBitCount, progress); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java index 8b0c8538855..44675d8d2ac 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java @@ -50,6 +50,8 @@ public class VisitorParameters extends 
Parameters { private int traceLevel = 0; private ThrottlePolicy throttlePolicy = null; private boolean skipBucketsOnFatalErrors = false; + private int slices = 1; + private int sliceId = 0; // Advanced parameter, only for internal use. Set<BucketId> bucketsToVisit = null; @@ -101,6 +103,7 @@ public class VisitorParameters extends Parameters { params.getDynamicMaxBucketsIncreaseFactor()); setTraceLevel(params.getTraceLevel()); skipBucketsOnFatalErrors(params.skipBucketsOnFatalErrors()); + slice(params.getSlices(), getSliceId()); } // Get functions @@ -331,6 +334,15 @@ public class VisitorParameters extends Parameters { public void skipBucketsOnFatalErrors(boolean skipBucketsOnFatalErrors) { this.skipBucketsOnFatalErrors = skipBucketsOnFatalErrors; } + public void slice(int slices, int sliceId) { + this.slices = slices; + this.sliceId = sliceId; + } + + public int getSlices() { return slices; } + + public int getSliceId() { return sliceId; } + /** * Set whether or not max buckets per visitor value should be dynamically * increased when using orderdoc and visitors do not return at least half diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 5d07d433f18..f7242695490 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -572,7 +572,9 @@ public class MessageBusVisitorSession implements VisitorSession { params.getDocumentSelection(), bucketIdFactory, 1, - progressToken); + progressToken, + params.getSlices(), + params.getSliceId()); } else { if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "parameters specify explicit bucket set " + diff --git a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java 
b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java index 01cdad244a8..fb5f5bd2cfb 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java @@ -77,8 +77,119 @@ public class VisitorIteratorTestCase { } @Test + public void testInvalidSlicing() throws ParseException { + int distBits = 4; + BucketIdFactory idFactory = new BucketIdFactory(); + ProgressToken progress = new ProgressToken(); + + try { + VisitorIterator.createFromDocumentSelection( + "id.group != \"yahoo.com\"", idFactory, distBits, progress, 0, 0); + } + catch (IllegalArgumentException e) { + assertEquals("slices must be positive, but was 0", e.getMessage()); + } + + try { + VisitorIterator.createFromDocumentSelection( + "id.group != \"yahoo.com\"", idFactory, distBits, progress, 1, 1); + } + catch (IllegalArgumentException e) { + assertEquals("sliceId must be in [0, 1), but was 1", e.getMessage()); + } + + try { + VisitorIterator.createFromDocumentSelection( + "id.group != \"yahoo.com\"", idFactory, distBits, progress, 1, -1); + } + catch (IllegalArgumentException e) { + assertEquals("sliceId must be in [0, 1), but was -1", e.getMessage()); + } + } + + @Test + public void testIgnoredSlicing() throws ParseException { + int distBits = 1; + BucketIdFactory idFactory = new BucketIdFactory(); + ProgressToken progress = new ProgressToken(); + + VisitorIterator iter = VisitorIterator.createFromDocumentSelection( + "id.group != \"yahoo.com\"", idFactory, distBits, progress, 3, 2); + + // Iterator with a single distribution bit ignores slicing. 
+ assertTrue(iter.hasNext()); + assertEquals(ProgressToken.toBucketId(0, 1), iter.getNext().getSuperbucket()); + assertEquals(ProgressToken.toBucketId(1, 1), iter.getNext().getSuperbucket()); + assertFalse(iter.hasNext()); + } + + @Test + public void testValidSlicing() throws ParseException { + int distBits = 4; + long buckets = 1 << distBits; + BucketIdFactory idFactory = new BucketIdFactory(); + for (int slices = 1; slices <= 2 * buckets; slices++) { + long bucketsTotal = 0; + for (int sliceId = 0; sliceId < slices; sliceId++) { + ProgressToken progress = new ProgressToken(); + + // docsel will be unknown --> entire bucket range will be covered + VisitorIterator iter = VisitorIterator.createFromDocumentSelection( + "id.group != \"yahoo.com\"", idFactory, distBits, progress, slices, sliceId); + + String context = "slices: " + slices + ", sliceId: " + sliceId; + assertEquals(context, progress.getDistributionBitCount(), distBits); + assertTrue(context, iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource); + + assertEquals(context, progress.getFinishedBucketCount(), Math.min(buckets, sliceId)); + assertEquals(context, progress.getTotalBucketCount(), buckets); + + // First, get+update half of the buckets, marking them as done + long bucketCount = 0; + + // Do buckets in the first half. + while (iter.hasNext() && progress.getFinishedBucketCount() < buckets / 2) { + VisitorIterator.BucketProgress ids = iter.getNext(); + iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET); + ++bucketCount; + ++bucketsTotal; + } + + if (slices + sliceId < buckets) { // Otherwise, we're already done ... 
+ assertEquals(context, ((buckets / 2) + slices - sliceId - 1) / slices, bucketCount); + // Should be no buckets in limbo at this point + assertFalse(context, progress.hasActive()); + assertFalse(context, progress.hasPending()); + assertFalse(context, iter.isDone()); + assertTrue(context, iter.hasNext()); + assertEquals(context, progress.getFinishedBucketCount(), bucketCount * slices + sliceId); + assertFalse(context, progress.isFinished()); + } + + while (iter.hasNext()) { + VisitorIterator.BucketProgress ids = iter.getNext(); + iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET); + ++bucketCount; + ++bucketsTotal; + } + + assertEquals(context, (buckets + slices - sliceId - 1) / slices, bucketCount); + // Should be no buckets in limbo at this point + assertFalse(context, progress.hasActive()); + assertFalse(context, progress.hasPending()); + assertTrue(context, iter.isDone()); + assertFalse(context, iter.hasNext()); + assertEquals(context, progress.getFinishedBucketCount(), buckets); + assertTrue(context, progress.isFinished()); + } + assertEquals("slices: " + slices, buckets, bucketsTotal); + } + } + + @Test public void testProgressSerializationRange() throws ParseException { int distBits = 4; + int buckets = 1 << distBits; BucketIdFactory idFactory = new BucketIdFactory(); ProgressToken progress = new ProgressToken(); @@ -91,11 +202,11 @@ public class VisitorIteratorTestCase { assertTrue(iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource); assertEquals(progress.getFinishedBucketCount(), 0); - assertEquals(progress.getTotalBucketCount(), 1 << distBits); + assertEquals(progress.getTotalBucketCount(), buckets); // First, get+update half of the buckets, marking them as done long bucketCount = 0; - long bucketStop = 1 << (distBits - 1); + long bucketStop = buckets / 2; while (iter.hasNext() && bucketCount != bucketStop) { VisitorIterator.BucketProgress ids = iter.getNext(); @@ -119,7 +230,7 @@ public class 
VisitorIteratorTestCase { desired.append('\n'); desired.append(bucketCount); desired.append('\n'); - desired.append(1 << distBits); + desired.append(buckets); desired.append('\n'); assertEquals(desired.toString(), progress.toString()); @@ -132,7 +243,7 @@ public class VisitorIteratorTestCase { ProgressToken progDs = new ProgressToken(progress.toString()); assertEquals(progDs.getDistributionBitCount(), distBits); - assertEquals(progDs.getTotalBucketCount(), 1 << distBits); + assertEquals(progDs.getTotalBucketCount(), buckets); assertEquals(progDs.getFinishedBucketCount(), bucketCount); VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection( @@ -154,21 +265,21 @@ public class VisitorIteratorTestCase { // Now fetch a subset of the remaining buckets without finishing them, // keeping some in the active set and some in pending - int pendingTotal = 1 << (distBits - 3); - int activeTotal = 1 << (distBits - 3); - Vector<VisitorIterator.BucketProgress> buckets = new Vector<VisitorIterator.BucketProgress>(); + int pendingTotal = buckets / 8; + int activeTotal = buckets / 8; + Vector<VisitorIterator.BucketProgress> trackedBuckets = new Vector<VisitorIterator.BucketProgress>(); // Pre-fetch, since otherwise we'd reuse pending buckets for (int i = 0; i < pendingTotal + activeTotal; ++i) { - buckets.add(iter.getNext()); + trackedBuckets.add(iter.getNext()); } for (int i = 0; i < pendingTotal + activeTotal; ++i) { - VisitorIterator.BucketProgress idTemp = buckets.get(i); + VisitorIterator.BucketProgress idTemp = trackedBuckets.get(i); if (i < activeTotal) { // Make them 50% done iter.update(idTemp.getSuperbucket(), - new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 << distBits))); + new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 * buckets))); } // else: leave hanging as active } @@ -186,7 +297,7 @@ public class VisitorIteratorTestCase { desired.append('\n'); desired.append(bucketCount); desired.append('\n'); - desired.append(1 << 
distBits); + desired.append(buckets); desired.append('\n'); assertEquals(progress.getBuckets().entrySet().size(), pendingTotal + activeTotal); @@ -206,7 +317,7 @@ public class VisitorIteratorTestCase { ProgressToken progDs = new ProgressToken(progress.toString()); assertEquals(progDs.getDistributionBitCount(), distBits); - assertEquals(progDs.getTotalBucketCount(), 1 << distBits); + assertEquals(progDs.getTotalBucketCount(), buckets); assertEquals(progDs.getFinishedBucketCount(), bucketCount); VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection( @@ -225,7 +336,7 @@ public class VisitorIteratorTestCase { // Finish all the active buckets for (int i = activeTotal; i < activeTotal + pendingTotal; ++i) { - iter.update(buckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET); + iter.update(trackedBuckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET); ++bucketCount; } @@ -246,16 +357,16 @@ public class VisitorIteratorTestCase { assertFalse(iter.hasNext()); assertTrue(progress.isFinished()); // Cumulative number of finished buckets must match 2^distbits - assertEquals(bucketCount, 1 << distBits); + assertEquals(bucketCount, buckets); StringBuilder finished = new StringBuilder(); finished.append("VDS bucket progress file (100.0% completed)\n"); finished.append(distBits); finished.append('\n'); - finished.append(1 << distBits); // Cursor + finished.append(buckets); // Cursor finished.append('\n'); - finished.append(1 << distBits); // Finished + finished.append(buckets); // Finished finished.append('\n'); - finished.append(1 << distBits); // Total + finished.append(buckets); // Total finished.append('\n'); assertEquals(progress.toString(), finished.toString()); diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index e355bb4d6c4..e7441acc203 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -369,6 
+369,20 @@ public class Flags { "Takes effect at redeploy", ZONE_ID, APPLICATION_ID); + public static final UnboundBooleanFlag ASYNC_APPLY_BUCKET_DIFF = defineFeatureFlag( + "async-apply-bucket-diff", false, + List.of("geirst", "vekterli"), "2021-10-22", "2022-01-31", + "Whether portions of apply bucket diff handling will be performed asynchronously", + "Takes effect at redeploy", + ZONE_ID, APPLICATION_ID); + + public static final UnboundStringFlag JDK_VERSION = defineStringFlag( + "jdk-version", "11", + List.of("hmusum"), "2021-10-25", "2021-11-25", + "JDK version to use inside containers", + "Takes effect on restart of Docker container", + APPLICATION_ID); + /** WARNING: public for testing: All flags should be defined in {@link Flags}. */ public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners, String createdAt, String expiresAt, String description, diff --git a/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java b/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java index 2c0614805a9..88e2393904e 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java @@ -218,7 +218,6 @@ public class PermanentFlags { "Takes effect immediately", ZONE_ID, APPLICATION_ID); - private PermanentFlags() {} private static UnboundBooleanFlag defineFeatureFlag( diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java index 92cc35fc354..962e6b32947 100644 --- a/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java +++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. 
See LICENSE in the project root. package ai.vespa.util.http.hc5; import org.apache.hc.client5.http.HttpRoute; diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java index 50af29f92aa..91810b50778 100644 --- a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java +++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.util.http.hc5; import com.yahoo.security.tls.MixedMode; diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java index e01d278ff38..52f7ad9b56b 100644 --- a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java +++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
package ai.vespa.util.http.hc5; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; diff --git a/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java b/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java index 514eae56fe8..adbc445de1a 100644 --- a/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java +++ b/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.util.http.hc4.retry; import org.apache.http.HttpResponse; diff --git a/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java b/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java index 58dc25fdf1a..78c413fba56 100644 --- a/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java +++ b/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
package ai.vespa.util.http.hc5; import org.apache.hc.client5.http.HttpRoute; diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java index b7993de5d82..50ac90b6181 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java @@ -168,8 +168,5 @@ class OsgiLogHandler extends Handler { return new Hashtable<>(); } - @Override - public <A> A adapt(Class<A> aClass) { return null; } - } } diff --git a/parent/pom.xml b/parent/pom.xml index a3d2111062e..18fad225aad 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -849,10 +849,10 @@ <properties> <antlr.version>3.5.2</antlr.version> <antlr4.version>4.5</antlr4.version> - <apache.httpclient.version>4.5.12</apache.httpclient.version> + <apache.httpclient.version>4.5.13</apache.httpclient.version> <apache.httpcore.version>4.4.13</apache.httpcore.version> <apache.httpclient5.version>5.1</apache.httpclient5.version> - <asm.version>9.2</asm.version> + <asm.version>9.1</asm.version> <!-- Athenz dependencies. Make sure these dependencies match those in Vespa's internal repositories --> <athenz.version>1.10.14</athenz.version> <jjwt.version>0.11.2</jjwt.version> @@ -870,7 +870,7 @@ <junit.version>5.7.0</junit.version> <maven-assembly-plugin.version>3.1.1</maven-assembly-plugin.version> <!-- TODO: in order to upgrade above 4.1.0, we probably need to convert fat-model-deps to a jar artifact. 
--> - <maven-bundle-plugin.version>5.1.2</maven-bundle-plugin.version> + <maven-bundle-plugin.version>4.1.0</maven-bundle-plugin.version> <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version> <maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version> <maven-deploy-plugin.version>2.8.1</maven-deploy-plugin.version> diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 6e4f38fe564..74fef13f141 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -34,12 +34,12 @@ BucketContent::BucketContent() _inUse(false), _outdatedInfo(true), _active(false) -{ } +{} + BucketContent::~BucketContent() = default; uint32_t -BucketContent::computeEntryChecksum(const BucketEntry& e) const -{ +BucketContent::computeEntryChecksum(const BucketEntry &e) const { vespalib::crc_32_type checksummer; uint64_t ts(e.entry->getTimestamp()); @@ -49,8 +49,7 @@ BucketContent::computeEntryChecksum(const BucketEntry& e) const } BucketChecksum -BucketContent::updateRollingChecksum(uint32_t entryChecksum) -{ +BucketContent::updateRollingChecksum(uint32_t entryChecksum) { uint32_t checksum = _info.getChecksum(); checksum ^= entryChecksum; if (checksum == 0) { @@ -59,9 +58,8 @@ BucketContent::updateRollingChecksum(uint32_t entryChecksum) return BucketChecksum(checksum); } -const BucketInfo& -BucketContent::getBucketInfo() const -{ +const BucketInfo & +BucketContent::getBucketInfo() const { if (!_outdatedInfo) { return _info; } @@ -73,9 +71,9 @@ BucketContent::getBucketInfo() const uint32_t totalSize = 0; uint32_t checksum = 0; - for (const BucketEntry & bucketEntry : _entries) { - const DocEntry& entry(*bucketEntry.entry); - const GlobalId& gid(bucketEntry.gid); + for (const BucketEntry &bucketEntry: _entries) { + const DocEntry &entry(*bucketEntry.entry); + const GlobalId 
&gid(bucketEntry.gid); GidMapType::const_iterator gidIt(_gidMap.find(gid)); assert(gidIt != _gidMap.end()); @@ -114,17 +112,19 @@ BucketContent::getBucketInfo() const namespace { struct TimestampLess { - bool operator()(const BucketEntry &bucketEntry, Timestamp t) - { return bucketEntry.entry->getTimestamp() < t; } - bool operator()(Timestamp t, const BucketEntry &bucketEntry) - { return t < bucketEntry.entry->getTimestamp(); } + bool operator()(const BucketEntry &bucketEntry, Timestamp t) { + return bucketEntry.entry->getTimestamp() < t; + } + + bool operator()(Timestamp t, const BucketEntry &bucketEntry) { + return t < bucketEntry.entry->getTimestamp(); + } }; } // namespace bool -BucketContent::hasTimestamp(Timestamp t) const -{ +BucketContent::hasTimestamp(Timestamp t) const { if (!_entries.empty() && _entries.back().entry->getTimestamp() < t) { return false; } @@ -148,10 +148,9 @@ BucketContent::hasTimestamp(Timestamp t) const */ void -BucketContent::insert(DocEntry::SP e) -{ +BucketContent::insert(DocEntry::SP e) { LOG(spam, "insert(%s)", e->toString().c_str()); - const DocumentId* docId(e->getDocumentId()); + const DocumentId *docId(e->getDocumentId()); assert(docId != 0); GlobalId gid(docId->getGlobalId()); GidMapType::iterator gidIt(_gidMap.find(gid)); @@ -160,22 +159,15 @@ BucketContent::insert(DocEntry::SP e) _entries.back().entry->getTimestamp() < e->getTimestamp()) { _entries.push_back(BucketEntry(e, gid)); } else { - std::vector<BucketEntry>::iterator it = - lower_bound(_entries.begin(), - _entries.end(), - e->getTimestamp(), - TimestampLess()); + auto it = lower_bound(_entries.begin(), _entries.end(), e->getTimestamp(), TimestampLess()); if (it != _entries.end()) { if (it->entry->getTimestamp() == e->getTimestamp()) { if (*it->entry.get() == *e) { - LOG(debug, "Ignoring duplicate put entry %s", - e->toString().c_str()); + LOG(debug, "Ignoring duplicate put entry %s", e->toString().c_str()); return; } else { - LOG(error, "Entry %s was already 
present." - "Was trying to insert %s.", - it->entry->toString().c_str(), - e->toString().c_str()); + LOG(error, "Entry %s was already present. Was trying to insert %s.", + it->entry->toString().c_str(), e->toString().c_str()); LOG_ABORT("should not reach here"); } } @@ -190,11 +182,8 @@ BucketContent::insert(DocEntry::SP e) // newer versions of a document etc. by XORing away old checksum. gidIt->second = e; } else { - LOG(spam, - "Newly inserted entry %s was older than existing entry %s; " - "not updating GID mapping", - e->toString().c_str(), - gidIt->second->toString().c_str()); + LOG(spam, "Newly inserted entry %s was older than existing entry %s; not updating GID mapping", + e->toString().c_str(), gidIt->second->toString().c_str()); } _outdatedInfo = true; } else { @@ -226,10 +215,8 @@ BucketContent::insert(DocEntry::SP e) _info.getActive()); } - LOG(spam, - "After cheap bucketinfo update, state is %s (inserted %s)", - _info.toString().c_str(), - e->toString().c_str()); + LOG(spam, "After cheap bucketinfo update, state is %s (inserted %s)", + _info.toString().c_str(), e->toString().c_str()); } } @@ -237,9 +224,8 @@ BucketContent::insert(DocEntry::SP e) } DocEntry::SP -BucketContent::getEntry(const DocumentId& did) const -{ - GidMapType::const_iterator it(_gidMap.find(did.getGlobalId())); +BucketContent::getEntry(const DocumentId &did) const { + auto it(_gidMap.find(did.getGlobalId())); if (it != _gidMap.end()) { return it->second; } @@ -247,10 +233,8 @@ BucketContent::getEntry(const DocumentId& did) const } DocEntry::SP -BucketContent::getEntry(Timestamp t) const -{ - std::vector<BucketEntry>::const_iterator iter = - lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); +BucketContent::getEntry(Timestamp t) const { + auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); if (iter == _entries.end() || iter->entry->getTimestamp() != t) { return DocEntry::SP(); @@ -260,15 +244,12 @@ BucketContent::getEntry(Timestamp t) const } 
void -BucketContent::eraseEntry(Timestamp t) -{ - std::vector<BucketEntry>::iterator iter = - lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); +BucketContent::eraseEntry(Timestamp t) { + auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); if (iter != _entries.end() && iter->entry->getTimestamp() == t) { assert(iter->entry->getDocumentId() != 0); - GidMapType::iterator gidIt( - _gidMap.find(iter->entry->getDocumentId()->getGlobalId())); + GidMapType::iterator gidIt = _gidMap.find(iter->entry->getDocumentId()->getGlobalId()); assert(gidIt != _gidMap.end()); _entries.erase(iter); if (gidIt->second->getTimestamp() == t) { @@ -281,7 +262,7 @@ BucketContent::eraseEntry(Timestamp t) } } -DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo>& repo) +DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo> &repo) : _initialized(false), _repo(repo), _content(), @@ -294,13 +275,12 @@ DummyPersistence::DummyPersistence(const std::shared_ptr<const document::Documen DummyPersistence::~DummyPersistence() = default; document::select::Node::UP -DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf) -{ +DummyPersistence::parseDocumentSelection(const string &documentSelection, bool allowLeaf) { document::select::Node::UP ret; try { document::select::Parser parser(*_repo, document::BucketIdFactory()); ret = parser.parse(documentSelection); - } catch (document::select::ParsingFailedException& e) { + } catch (document::select::ParsingFailedException &e) { return document::select::Node::UP(); } if (ret->isLeafNode() && !allowLeaf) { @@ -310,18 +290,17 @@ DummyPersistence::parseDocumentSelection(const string& documentSelection, bool a } Result -DummyPersistence::initialize() -{ +DummyPersistence::initialize() { assert(!_initialized); _initialized = true; return Result(); } #define DUMMYPERSISTENCE_VERIFY_INITIALIZED \ - if (!_initialized) 
throw vespalib::IllegalStateException( \ - "initialize() must always be called first in order to " \ - "trigger lazy initialization.", VESPA_STRLOC) - + if (!_initialized) { \ + LOG(error, "initialize() must always be called first in order to trigger lazy initialization."); \ + abort(); \ + } BucketIdListResult DummyPersistence::listBuckets(BucketSpace bucketSpace) const @@ -714,8 +693,8 @@ DummyPersistence::destroyIterator(IteratorId id, Context&) return Result(); } -Result -DummyPersistence::createBucket(const Bucket& b, Context&) +void +DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "createBucket(%s)", b.toString().c_str()); @@ -727,11 +706,11 @@ DummyPersistence::createBucket(const Bucket& b, Context&) assert(!_content[b]->_inUse); LOG(debug, "%s already existed", b.toString().c_str()); } - return Result(); + onComplete->onComplete(std::make_unique<Result>()); } void -DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) +DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "deleteBucket(%s)", b.toString().c_str()); diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 99d6ba717b7..a25bf6b8a8e 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -168,8 +168,8 @@ public: IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override; Result destroyIterator(IteratorId, Context&) override; - Result createBucket(const Bucket&, Context&) override; - void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; + void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept 
override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index e287bdc5252..3b59f20ca96 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -15,7 +15,6 @@ class AbstractPersistenceProvider : public PersistenceProvider { public: Result initialize() override { return Result(); }; - Result createBucket(const Bucket&, Context&) override { return Result(); } Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); } void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override; Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); } diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h index 02c626ea23e..7b04498205d 100644 --- a/persistence/src/vespa/persistence/spi/catchresult.h +++ b/persistence/src/vespa/persistence/spi/catchresult.h @@ -19,4 +19,9 @@ private: const ResultHandler *_resulthandler; }; +class NoopOperationComplete : public OperationComplete { + void onComplete(std::unique_ptr<spi::Result>) noexcept override { } + void addResultHandler(const spi::ResultHandler *) override { } +}; + } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 3ea476c33fc..31db08a6f4f 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -17,6 +17,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat } 
Result +PersistenceProvider::createBucket(const Bucket& bucket, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + createBucketAsync(bucket, context, std::move(catcher)); + return *future.get(); +} + +Result PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) { auto catcher = std::make_unique<CatchResult>(); auto future = catcher->future_result(); diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 83eb042d855..269175f7d26 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -58,6 +58,7 @@ struct PersistenceProvider virtual ~PersistenceProvider(); // TODO Move to utility class for use in tests only + Result createBucket(const Bucket&, Context&); Result deleteBucket(const Bucket&, Context&); Result put(const Bucket&, Timestamp, DocumentSP, Context&); Result setActiveState(const Bucket&, BucketInfo::ActiveState); @@ -336,14 +337,14 @@ struct PersistenceProvider * Tells the provider that the given bucket has been created in the * service layer. There is no requirement to do anything here. */ - virtual Result createBucket(const Bucket&, Context&) = 0; + virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0; /** * Deletes the given bucket and all entries contained in that bucket. * After this operation has succeeded, a restart of the provider should * not yield the bucket in getBucketList(). */ - virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0; + virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0; /** * This function is called continuously by the service layer. 
It allows the diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp index 74e0971178c..406432ef697 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp @@ -10,6 +10,8 @@ ExecutorMetrics::update(const vespalib::ExecutorStats &stats) maxPending.set(stats.queueSize.max()); accepted.inc(stats.acceptedTasks); rejected.inc(stats.rejectedTasks); + wakeupCount.inc(stats.wakeupCount); + util.set(stats.getUtil()); const auto & qSize = stats.queueSize; queueSize.addValueBatch(qSize.average(), qSize.count(), qSize.min(), qSize.max()); } @@ -19,6 +21,8 @@ ExecutorMetrics::ExecutorMetrics(const std::string &name, metrics::MetricSet *pa maxPending("maxpending", {}, "Maximum number of pending (active + queued) tasks", this), accepted("accepted", {}, "Number of accepted tasks", this), rejected("rejected", {}, "Number of rejected tasks", this), + wakeupCount("wakeups", {}, "Number of times a worker thread has been woken up", this), + util("utilization", {}, "Ratio of time the worker threads has been active", this), queueSize("queuesize", {}, "Size of task queue", this) { } diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h index 273c4ed8979..31d959a399f 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h @@ -11,9 +11,11 @@ namespace proton { struct ExecutorMetrics : metrics::MetricSet { - metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible. - metrics::LongCountMetric accepted; - metrics::LongCountMetric rejected; + metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible. 
+ metrics::LongCountMetric accepted; + metrics::LongCountMetric rejected; + metrics::LongCountMetric wakeupCount; + metrics::DoubleValueMetric util; metrics::LongAverageMetric queueSize; void update(const vespalib::ExecutorStats &stats); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 114292d055d..2e1fc74037c 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -548,24 +548,28 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&) } -Result -PersistenceEngine::createBucket(const Bucket &b, Context &) +void +PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete) noexcept { ReadGuard rguard(_rwMutex); LOG(spam, "createBucket(%s)", b.toString().c_str()); HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace()); - TransportLatch latch(snap.size()); - for (; snap.handlers().valid(); snap.handlers().next()) { + + auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleCreateBucket(feedtoken::make(latch), b); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleCreateBucket(feedtoken::make(transportContext), b); + } else { + handler->handleCreateBucket(feedtoken::make(std::move(transportContext)), b); + } } - latch.await(); - return latch.getResult(); } void -PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) +PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept { ReadGuard rguard(_rwMutex); LOG(spam, "deleteBucket(%s)", b.toString().c_str()); diff --git 
a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 94331ac2cd6..fe564d01459 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -114,8 +114,8 @@ public: IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override; Result destroyIterator(IteratorId, Context&) override; - Result createBucket(const Bucket &bucketId, Context &) override ; - void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; + void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) noexcept override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; diff --git a/searchlib/src/tests/attribute/changevector/changevector_test.cpp b/searchlib/src/tests/attribute/changevector/changevector_test.cpp index 3e2c851c541..c37d233217b 100644 --- a/searchlib/src/tests/attribute/changevector/changevector_test.cpp +++ b/searchlib/src/tests/attribute/changevector/changevector_test.cpp @@ -123,7 +123,7 @@ TEST("require that buffer can grow some, but not unbound") { TEST("Control Change size") { EXPECT_EQUAL(32u, sizeof(ChangeTemplate<NumericChangeData<long>>)); - EXPECT_EQUAL(88u, sizeof(ChangeTemplate<StringChangeData>)); + EXPECT_EQUAL(80u, sizeof(ChangeTemplate<StringChangeData>)); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/vespa/searchlib/attribute/attributevector.h b/searchlib/src/vespa/searchlib/attribute/attributevector.h index bb88b168474..f5a79c3637e 100644 --- 
a/searchlib/src/vespa/searchlib/attribute/attributevector.h +++ b/searchlib/src/vespa/searchlib/attribute/attributevector.h @@ -357,23 +357,22 @@ protected: static double round(double v, double & r) { return r = v; } static largeint_t round(double v, largeint_t &r) { return r = static_cast<largeint_t>(::floor(v+0.5)); } - template <typename BaseType, typename ChangeData> + template <typename BaseType, typename LargeType> static BaseType - applyArithmetic(const BaseType &value, const ChangeTemplate<ChangeData> & arithmetic) + applyArithmetic(const BaseType &value, double operand, ChangeBase::Type type) { - typedef typename ChangeData::DataType LargeType; if (attribute::isUndefined(value)) { return value; - } else if (arithmetic._type == ChangeBase::ADD) { - return value + static_cast<LargeType>(arithmetic._arithOperand); - } else if (arithmetic._type == ChangeBase::SUB) { - return value - static_cast<LargeType>(arithmetic._arithOperand); - } else if (arithmetic._type == ChangeBase::MUL) { + } else if (type == ChangeBase::ADD) { + return value + static_cast<LargeType>(operand); + } else if (type == ChangeBase::SUB) { + return value - static_cast<LargeType>(operand); + } else if (type == ChangeBase::MUL) { LargeType r; - return round((static_cast<double>(value) * arithmetic._arithOperand), r); - } else if (arithmetic._type == ChangeBase::DIV) { + return round((static_cast<double>(value) * operand), r); + } else if (type == ChangeBase::DIV) { LargeType r; - return round(static_cast<double>(value) / arithmetic._arithOperand, r); + return round(static_cast<double>(value) / operand, r); } return value; } diff --git a/searchlib/src/vespa/searchlib/attribute/attributevector.hpp b/searchlib/src/vespa/searchlib/attribute/attributevector.hpp index 25658401f21..623e10ef052 100644 --- a/searchlib/src/vespa/searchlib/attribute/attributevector.hpp +++ b/searchlib/src/vespa/searchlib/attribute/attributevector.hpp @@ -116,7 +116,7 @@ AttributeVector::applyArithmetic(ChangeVectorT< 
ChangeTemplate<T> > & changes, D _status.incNonIdempotentUpdates(diff); _status.incUpdates(diff); if (diff > 0) { - changes.back()._arithOperand = aop; + changes.back()._data.setArithOperand(aop); } return true; } diff --git a/searchlib/src/vespa/searchlib/attribute/changevector.h b/searchlib/src/vespa/searchlib/attribute/changevector.h index 12ac77febb9..f1fb58eb9d0 100644 --- a/searchlib/src/vespa/searchlib/attribute/changevector.h +++ b/searchlib/src/vespa/searchlib/attribute/changevector.h @@ -31,16 +31,14 @@ struct ChangeBase { _type(NOOP), _doc(0), _weight(1), - _enumScratchPad(UNSET_ENUM), - _arithOperand(0) + _enumScratchPad(UNSET_ENUM) { } ChangeBase(Type type, uint32_t d, int32_t w = 1) : _type(type), _doc(d), _weight(w), - _enumScratchPad(UNSET_ENUM), - _arithOperand(0) + _enumScratchPad(UNSET_ENUM) { } int cmp(const ChangeBase &b) const { int diff(_doc - b._doc); return diff; } @@ -53,19 +51,21 @@ struct ChangeBase { uint32_t _doc; int32_t _weight; mutable uint32_t _enumScratchPad; - double _arithOperand; }; template <typename T> class NumericChangeData { private: - T _v; + double _arithOperand; + T _v; public: typedef T DataType; - NumericChangeData(T v) : _v(v) { } - NumericChangeData() : _v(T()) { } + NumericChangeData(T v) : _arithOperand(0), _v(v) { } + NumericChangeData() : _arithOperand(0), _v(T()) { } + double getArithOperand() const { return _arithOperand; } + void setArithOperand(double operand) { _arithOperand = operand; } T get() const { return _v; } T raw() const { return _v; } operator T() const { return _v; } diff --git a/searchlib/src/vespa/searchlib/attribute/singleboolattribute.cpp b/searchlib/src/vespa/searchlib/attribute/singleboolattribute.cpp index 9a71248a53d..3fe7d147c1d 100644 --- a/searchlib/src/vespa/searchlib/attribute/singleboolattribute.cpp +++ b/searchlib/src/vespa/searchlib/attribute/singleboolattribute.cpp @@ -66,7 +66,7 @@ SingleBoolAttribute::onCommit() { setBit(change._doc, change._data != 0); } else if ((change._type 
>= ChangeBase::ADD) && (change._type <= ChangeBase::DIV)) { std::atomic_thread_fence(std::memory_order_release); - int8_t val = applyArithmetic(getFast(change._doc), change); + int8_t val = applyArithmetic<int8_t, largeint_t>(getFast(change._doc), change._data.getArithOperand(), change._type); setBit(change._doc, val != 0); } else if (change._type == ChangeBase::CLEARDOC) { std::atomic_thread_fence(std::memory_order_release); diff --git a/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp b/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp index 4f2be68683d..cbff4d0e361 100644 --- a/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp +++ b/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp @@ -43,7 +43,7 @@ SingleValueNumericAttribute<B>::onCommit() _data[change._doc] = change._data; } else if (change._type >= ChangeBase::ADD && change._type <= ChangeBase::DIV) { std::atomic_thread_fence(std::memory_order_release); - _data[change._doc] = this->applyArithmetic(_data[change._doc], change); + _data[change._doc] = applyArithmetic<T, typename B::Change::DataType>(_data[change._doc], change._data.getArithOperand(), change._type); } else if (change._type == ChangeBase::CLEARDOC) { std::atomic_thread_fence(std::memory_order_release); _data[change._doc] = this->_defaultValue._data; diff --git a/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp b/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp index 55a82210a96..585b4514ba4 100644 --- a/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp +++ b/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp @@ -34,7 +34,7 @@ SingleValueNumericEnumAttribute<B>::considerArithmeticAttributeChange(const Chan oldValue = get(c._doc); } - T newValue = this->applyArithmetic(oldValue, c); + T newValue = applyArithmetic<T, typename Change::DataType>(oldValue, c._data.getArithOperand(), c._type); 
EnumIndex idx; if (!this->_enumStore.find_index(newValue, idx)) { @@ -52,7 +52,7 @@ SingleValueNumericEnumAttribute<B>::applyArithmeticValueChange(const Change& c, { EnumIndex oldIdx = this->_enumIndices[c._doc]; EnumIndex newIdx; - T newValue = this->applyArithmetic(get(c._doc), c); + T newValue = applyArithmetic<T, typename Change::DataType>(get(c._doc), c._data.getArithOperand(), c._type); this->_enumStore.find_index(newValue, newIdx); this->updateEnumRefCounts(c, newIdx, oldIdx, updater); diff --git a/searchlib/src/vespa/searchlib/attribute/singlenumericpostattribute.hpp b/searchlib/src/vespa/searchlib/attribute/singlenumericpostattribute.hpp index 12138b0cfbc..5a610cc3da8 100644 --- a/searchlib/src/vespa/searchlib/attribute/singlenumericpostattribute.hpp +++ b/searchlib/src/vespa/searchlib/attribute/singlenumericpostattribute.hpp @@ -102,7 +102,7 @@ SingleValueNumericPostingAttribute<B>::applyValueChanges(EnumStoreBatchUpdater& } else if (change._type >= ChangeBase::ADD && change._type <= ChangeBase::DIV) { if (oldIdx.valid()) { T oldValue = enumStore.get_value(oldIdx); - T newValue = this->applyArithmetic(oldValue, change); + T newValue = applyArithmetic<T, typename Change::DataType>(oldValue, change._data.getArithOperand(), change._type); EnumIndex newIdx; (void) dictionary.find_index(enumStore.make_comparator(newValue), newIdx); currEnumIndices[change._doc] = newIdx; diff --git a/searchlib/src/vespa/searchlib/attribute/singlesmallnumericattribute.cpp b/searchlib/src/vespa/searchlib/attribute/singlesmallnumericattribute.cpp index 9346bc43370..24dec664547 100644 --- a/searchlib/src/vespa/searchlib/attribute/singlesmallnumericattribute.cpp +++ b/searchlib/src/vespa/searchlib/attribute/singlesmallnumericattribute.cpp @@ -60,7 +60,7 @@ SingleValueSmallNumericAttribute::onCommit() set(change._doc, change._data); } else if (change._type >= ChangeBase::ADD && change._type <= ChangeBase::DIV) { std::atomic_thread_fence(std::memory_order_release); - set(change._doc, 
applyArithmetic(getFast(change._doc), change)); + set(change._doc, applyArithmetic<T, typename Change::DataType>(getFast(change._doc), change._data.getArithOperand(), change._type)); } else if (change._type == ChangeBase::CLEARDOC) { std::atomic_thread_fence(std::memory_order_release); set(change._doc, 0u); diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 869ff0456e1..099a958a00b 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -21,6 +21,7 @@ AdaptiveSequencedExecutor::Strand::~Strand() AdaptiveSequencedExecutor::Worker::Worker() : cond(), + idleTracker(), state(State::RUNNING), strand(nullptr) { @@ -151,9 +152,12 @@ AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::m } else { worker.state = Worker::State::BLOCKED; _worker_stack.push(&worker); + worker.idleTracker.set_idle(steady_clock::now()); while (worker.state == Worker::State::BLOCKED) { worker.cond.wait(lock); } + _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now())); + _stats.wakeupCount++; } return (worker.state == Worker::State::RUNNING); } @@ -233,6 +237,7 @@ AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t _worker_stack(num_threads), _self(), _stats(), + _idleTracker(steady_clock::now()), _cfg(num_threads, max_waiting, max_pending) { _stats.queueSize.add(_self.pending_tasks); @@ -329,6 +334,11 @@ AdaptiveSequencedExecutor::getStats() { auto guard = std::lock_guard(_mutex); ExecutorStats stats = _stats; + steady_time now = steady_clock::now(); + for (size_t i(0); i < _worker_stack.size(); i++) { + _idleTracker.was_idle(_worker_stack.access(i)->idleTracker.reset(now)); + } + stats.setUtil(_cfg.num_threads, _idleTracker.reset(now, _cfg.num_threads)); _stats = ExecutorStats(); 
_stats.queueSize.add(_self.pending_tasks); return stats; diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index fdcdf35fbbb..776248384a5 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -3,6 +3,7 @@ #pragma once #include "isequencedtaskexecutor.h" +#include <vespa/vespalib/util/executor_idle_tracking.h> #include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/eventbarrier.hpp> @@ -70,7 +71,7 @@ private: struct Strand { enum class State { IDLE, WAITING, ACTIVE }; State state; - vespalib::ArrayQueue<TaggedTask> queue; + ArrayQueue<TaggedTask> queue; Strand(); ~Strand(); }; @@ -81,6 +82,7 @@ private: struct Worker { enum class State { RUNNING, BLOCKED, DONE }; std::condition_variable cond; + ThreadIdleTracker idleTracker; State state; Strand *strand; Worker(); @@ -107,7 +109,7 @@ private: static constexpr size_t STACK_SIZE = (256 * 1024); AdaptiveSequencedExecutor &parent; std::unique_ptr<FastOS_ThreadPool> pool; - vespalib::Gate allow_worker_exit; + Gate allow_worker_exit; ThreadTools(AdaptiveSequencedExecutor &parent_in); ~ThreadTools(); void Run(FastOS_ThreadInterface *, void *) override; @@ -123,11 +125,12 @@ private: std::unique_ptr<ThreadTools> _thread_tools; mutable std::mutex _mutex; std::vector<Strand> _strands; - vespalib::ArrayQueue<Strand*> _wait_queue; - vespalib::ArrayQueue<Worker*> _worker_stack; + ArrayQueue<Strand*> _wait_queue; + ArrayQueue<Worker*> _worker_stack; EventBarrier<BarrierCompletion> _barrier; Self _self; ExecutorStats _stats; + ExecutorIdleTracker _idleTracker; Config _cfg; void maybe_block_self(std::unique_lock<std::mutex> &lock); diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h 
b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h index 3bd5ca3d49a..c1e56572614 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h @@ -23,7 +23,7 @@ public: } size_t getNumThreads() const override { return 0; } ExecutorStats getStats() override { - return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0); + return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0, 0); } void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } uint32_t getTaskLimit() const override { return std::numeric_limits<uint32_t>::max(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp index 0d132193af1..703256c5521 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp @@ -19,7 +19,7 @@ ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads) ForegroundTaskExecutor::~ForegroundTaskExecutor() = default; void -ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) +ForegroundTaskExecutor::executeTask(ExecutorId id, Executor::Task::UP task) { assert(id.getId() < getNumExecutors()); task->run(); @@ -35,8 +35,8 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) { } -vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { - return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0); +ExecutorStats ForegroundTaskExecutor::getStats() { + return ExecutorStats(ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0, 0); } ISequencedTaskExecutor::ExecutorId @@ -44,4 +44,4 @@ ForegroundTaskExecutor::getExecutorId(uint64_t componentId) const { return 
ExecutorId(componentId%getNumExecutors()); } -} // namespace search +} diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h index 7359de0ba66..9d351aca653 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h @@ -22,10 +22,10 @@ public: ~ForegroundTaskExecutor() override; ExecutorId getExecutorId(uint64_t componentId) const override; - void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; + void executeTask(ExecutorId id, Executor::Task::UP task) override; void sync() override; void setTaskLimit(uint32_t taskLimit) override; - vespalib::ExecutorStats getStats() override; + ExecutorStats getStats() override; private: std::atomic<uint64_t> _accepted; }; diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 727894397a7..9e95bdaa3ab 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -104,7 +104,7 @@ SequencedTaskExecutor::getStats() { ExecutorStats accumulatedStats; for (auto &executor :* _executors) { - accumulatedStats += executor->getStats(); + accumulatedStats.aggregate(executor->getStats()); } return accumulatedStats; } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 803ec4f3f7c..af95918ccab 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -19,6 +19,9 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat _consumerCondition(), _producerCondition(), _thread(*this), + _idleTracker(steady_clock::now()), + _threadIdleTracker(), + 
_wakeupCount(0), _lastAccepted(0), _queueSize(), _wakeupConsumerAt(0), @@ -115,7 +118,11 @@ SingleExecutor::run() { _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed); Lock lock(_mutex); if (numTasks() <= 0) { - _consumerCondition.wait_for(lock, _reactionTime); + steady_time now = steady_clock::now(); + _threadIdleTracker.set_idle(now); + _consumerCondition.wait_until(lock, now + _reactionTime); + _idleTracker.was_idle(_threadIdleTracker.set_active(steady_clock::now())); + _wakeupCount++; } _wakeupConsumerAt.store(0, std::memory_order_relaxed); } @@ -150,7 +157,6 @@ SingleExecutor::wait_for_room(Lock & lock) { drain(lock); _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); _taskLimit = _wantedTaskLimit.load(); - taskLimit = _taskLimit; } _queueSize.add(numTasks()); while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { @@ -162,7 +168,11 @@ ExecutorStats SingleExecutor::getStats() { Lock lock(_mutex); uint64_t accepted = _wp.load(std::memory_order_relaxed); - ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0); + steady_time now = steady_clock::now(); + _idleTracker.was_idle(_threadIdleTracker.reset(now)); + ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount); + stats.setUtil(1, _idleTracker.reset(now, 1)); + _wakeupCount = 0; _lastAccepted = accepted; _queueSize = ExecutorStats::QueueSizeT() ; return stats; diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 8e9c1ae3fa1..7d868322558 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/threadexecutor.h> #include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/executor_idle_tracking.h> #include <thread> #include <atomic> @@ -18,7 +19,7 @@ namespace 
vespalib { */ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: - explicit SingleExecutor(init_fun_t func, uint32_t taskLimit); + SingleExecutor(init_fun_t func, uint32_t taskLimit); SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; @@ -54,6 +55,9 @@ private: std::condition_variable _consumerCondition; std::condition_variable _producerCondition; vespalib::Thread _thread; + ExecutorIdleTracker _idleTracker; + ThreadIdleTracker _threadIdleTracker; + uint64_t _wakeupCount; uint64_t _lastAccepted; ExecutorStats::QueueSizeT _queueSize; std::atomic<uint64_t> _wakeupConsumerAt; diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp index fe77477fa77..701e8a80d3a 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -11,6 +11,8 @@ using document::DocumentId; using document::test::makeDocumentBucket; +using vespalib::MonitoredRefCount; +using vespalib::RetainGuard; namespace storage { @@ -41,6 +43,7 @@ public: throw std::runtime_error(_fail); } } + void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override { } void set_fail(vespalib::string fail) { _fail = std::move(fail); } }; @@ -72,6 +75,7 @@ class ApplyBucketDiffStateTestBase : public ::testing::Test public: uint32_t sync_count; DummyMergeBucketInfoSyncer syncer; + MonitoredRefCount monitored_ref_count; ApplyBucketDiffStateTestBase() : ::testing::Test(), @@ -82,8 +86,8 @@ public: ~ApplyBucketDiffStateTestBase(); - std::unique_ptr<ApplyBucketDiffState> make_state() { - return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket)); + std::shared_ptr<ApplyBucketDiffState> make_state() { + return ApplyBucketDiffState::create(syncer, 
spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); } }; diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index b3bd1c6a253..02b43a32df3 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -24,7 +24,7 @@ #define CHECK_ERROR_ASYNC(className, failType, onError) \ { \ - Guard guard(_lock); \ + Guard guard(_lock); \ if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \ onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \ return; \ @@ -80,12 +80,12 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const return _spi.listBuckets(bucketSpace); } -spi::Result -PersistenceProviderWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) +void +PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept { LOG_SPI("createBucket(" << bucket << ")"); - CHECK_ERROR(spi::Result, FAIL_CREATE_BUCKET); - return _spi.createBucket(bucket, context); + CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete); + return _spi.createBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketInfoResult @@ -177,7 +177,7 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId, void PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, - spi::OperationComplete::UP operationComplete) + spi::OperationComplete::UP operationComplete) noexcept { LOG_SPI("deleteBucket(" << bucket << ")"); CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete); diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index 
c6628814dba..cfc7002a643 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -96,7 +96,7 @@ public: void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state, spi::OperationComplete::UP up) override; - spi::Result createBucket(const spi::Bucket&, spi::Context&) override; + void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override; @@ -111,7 +111,7 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 07d2b24d536..a3f0182ba30 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -62,13 +62,13 @@ public: return PersistenceProviderWrapper::getBucketInfo(bucket); } - spi::Result createBucket(const spi::Bucket& bucket, spi::Context& ctx) override { + void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) 
noexcept override { ++_createBucketInvocations; - return PersistenceProviderWrapper::createBucket(bucket, ctx); + PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete)); } void - deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override { + deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override { ++_deleteBucketInvocations; PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete)); } diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 60030004594..ed50730d79f 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -20,7 +20,12 @@ using namespace ::testing; namespace storage { -struct MergeHandlerTest : SingleDiskPersistenceTestUtils { +/* + * Class for testing merge handler taking async_apply_bucket_diff as + * parameter for the test. 
+ */ +struct MergeHandlerTest : SingleDiskPersistenceTestUtils, + public testing::WithParamInterface<bool> { uint32_t _location; // Location used for all merge tests document::Bucket _bucket; // Bucket used for all merge tests uint64_t _maxTimestamp; @@ -149,8 +154,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { int _counter; MessageSenderStub _stub; std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd; + void convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler& handler); }; + void convert_delayed_error_to_exception(MergeHandler& handler); + std::string doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, @@ -159,11 +167,21 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam()); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam()); + } + + std::shared_ptr<api::StorageMessage> get_queued_reply() { + std::shared_ptr<api::StorageMessage> msg; + if (_replySender.queue.getNext(msg, 0s)) { + return msg; + } else { + return {}; + } + } }; @@ -209,7 +227,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) { // Test a regular merge bucket command fetching data, including // puts, removes, unrevertable removes & duplicates. 
-TEST_F(MergeHandlerTest, merge_bucket_command) { +TEST_P(MergeHandlerTest, merge_bucket_command) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -270,11 +288,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) EXPECT_EQ(17, diff.size()); } -TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) { +TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) { testGetBucketDiffChain(true); } -TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) { +TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) { testGetBucketDiffChain(false); } @@ -320,17 +338,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) EXPECT_EQ(0, diff.size()); } -TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) { +TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) { testApplyBucketDiffChain(true); } -TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) { +TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) { testApplyBucketDiffChain(false); } // Test that a simplistic merge with one thing to actually merge, // sends correct commands and finish. 
-TEST_F(MergeHandlerTest, master_message_flow) { +TEST_P(MergeHandlerTest, master_message_flow) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -424,7 +442,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff) } -TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { +TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) { uint32_t docSize = 1024; uint32_t docCount = 10; uint32_t maxChunkSize = docSize * 3; @@ -488,7 +506,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { EXPECT_TRUE(reply->getResult().success()); } -TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { +TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) { setUpChain(FRONT); uint32_t docSize = 1024; @@ -524,7 +542,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize); } -TEST_F(MergeHandlerTest, max_timestamp) { +TEST_P(MergeHandlerTest, max_timestamp) { doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024); MergeHandler handler = createHandler(); @@ -632,7 +650,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask return getBucketDiffCmd; } -TEST_F(MergeHandlerTest, spi_flush_guard) { +TEST_P(MergeHandlerTest, spi_flush_guard) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); @@ -647,13 +665,16 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { try { auto cmd = createDummyApplyDiff(6000); handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); + if (GetParam()) { + convert_delayed_error_to_exception(handler); + } FAIL() << "No exception thrown on failing in-place remove"; } catch (const std::runtime_error& e) { EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos); } } -TEST_F(MergeHandlerTest, bucket_not_found_in_db) { +TEST_P(MergeHandlerTest, bucket_not_found_in_db) { MergeHandler 
handler = createHandler(); // Send merge for unknown bucket auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); @@ -661,7 +682,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) { EXPECT_TRUE(tracker->getResult().isBucketDisappearance()); } -TEST_F(MergeHandlerTest, merge_progress_safe_guard) { +TEST_P(MergeHandlerTest, merge_progress_safe_guard) { MergeHandler handler = createHandler(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -684,7 +705,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) { EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE); } -TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { +TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { MergeHandler handler = createHandler(); _nodes.clear(); _nodes.emplace_back(0, false); @@ -716,7 +737,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask); } -TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { +TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) { MergeHandler handler = createHandler(); std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff; { @@ -741,6 +762,23 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { EXPECT_EQ(0x0, diff[0]._entry._hasMask); } +void +MergeHandlerTest::convert_delayed_error_to_exception(MergeHandler& handler) +{ + handler.drain_async_writes(); + if (getEnv()._fileStorHandler.isMerging(_bucket)) { + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + api::ReturnCode return_code; + s->check_delayed_error(return_code); + if (return_code.failed()) { + getEnv()._fileStorHandler.clearMergeStatus(_bucket, return_code); + fetchSingleMessage<api::ApplyBucketDiffReply>(); + 
fetchSingleMessage<api::ApplyBucketDiffCommand>(); + throw std::runtime_error(return_code.getMessage()); + } + } +} + std::string MergeHandlerTest::doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, @@ -755,6 +793,9 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler, providerWrapper.setFailureMask(failureMask); try { invoker.invoke(*this, handler, *_context); + if (GetParam()) { + convert_delayed_error_to_exception(handler); + } if (failureMask != 0) { return (std::string("No exception was thrown during handler " "invocation. Expected exception containing '") @@ -823,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke( handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { +TEST_P(MergeHandlerTest, merge_bucket_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -831,7 +872,6 @@ TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { setUpChain(MIDDLE); ExpectedExceptionSpec exceptions[] = { - { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" }, { PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" }, { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" }, { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, @@ -855,14 +895,13 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) { +TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); setUpChain(MIDDLE); ExpectedExceptionSpec exceptions[] = { - { 
PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" }, { PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" }, { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" }, { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, @@ -888,7 +927,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke( handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { +TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -953,7 +992,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke( api::ReturnCode::INTERNAL_FAILURE); } -TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { +TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -1006,6 +1045,18 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke( } void +MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler &handler) +{ + handler.drain_async_writes(); + if (!_stub.replies.empty() && _stub.replies.back()->getResult().failed()) { + auto chained_reply = _stub.replies.back(); + _stub.replies.pop_back(); + test.messageKeeper().sendReply(chained_reply); + throw std::runtime_error(chained_reply->getResult().getMessage()); + } +} + +void MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( MergeHandlerTest& test, MergeHandler& handler, @@ -1016,6 +1067,9 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( test.fillDummyApplyDiff(reply->getDiff()); _stub.clear(); handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket)); + if 
(test.GetParam()) { + convert_delayed_error_to_exception(test, handler); + } } std::string @@ -1039,7 +1093,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke( } } -TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { +TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); HandleApplyBucketDiffReplyInvoker invoker; for (int i = 0; i < 2; ++i) { @@ -1066,7 +1120,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { } } -TEST_F(MergeHandlerTest, remove_from_diff) { +TEST_P(MergeHandlerTest, remove_from_diff) { framework::defaultimplementation::FakeClock clock; MergeStatus status(clock, 0, 0); @@ -1132,7 +1186,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) { } } -TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { +TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) { setUpChain(BACK); document::TestDocMan docMan; @@ -1156,8 +1210,15 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); - auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); - ASSERT_TRUE(applyBucketDiffReply.get()); + if (GetParam()) { + ASSERT_FALSE(tracker); + handler.drain_async_writes(); + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply()); + ASSERT_TRUE(applyBucketDiffReply.get()); + } else { + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); + ASSERT_TRUE(applyBucketDiffReply.get()); + } auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -1246,7 +1307,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en } 
-TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) +TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { using NodeList = decltype(_nodes); // Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2 @@ -1377,9 +1438,14 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled fourth ApplyBucketDiffReply"); } + if (GetParam()) { + handler.drain_async_writes(); + } ASSERT_EQ(6u, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType()); LOG(debug, "got mergebucket reply"); } +VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true)); + } // storage diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index eb7a5ef5bc6..556760b347e 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -2,21 +2,35 @@ #include "apply_bucket_diff_state.h" #include "mergehandler.h" +#include "persistenceutil.h" #include <vespa/document/base/documentid.h> #include <vespa/persistence/spi/result.h> #include <vespa/vespalib/stllike/asciistream.h> using storage::spi::Result; +using vespalib::RetainGuard; namespace storage { -ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket) +class ApplyBucketDiffState::Deleter { +public: + void operator()(ApplyBucketDiffState *raw_state) const noexcept { + std::unique_ptr<ApplyBucketDiffState> state(raw_state); + raw_state->_merge_bucket_info_syncer.schedule_delayed_delete(std::move(state)); + } +}; + +ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& 
bucket, RetainGuard&& retain_guard) : _merge_bucket_info_syncer(merge_bucket_info_syncer), _bucket(bucket), _fail_message(), _failed_flag(), _stale_bucket_info(false), - _promise() + _promise(), + _tracker(), + _delayed_reply(), + _sender(nullptr), + _retain_guard(std::move(retain_guard)) { } @@ -32,6 +46,17 @@ ApplyBucketDiffState::~ApplyBucketDiffState() if (_promise.has_value()) { _promise.value().set_value(_fail_message); } + if (_delayed_reply) { + if (!_delayed_reply->getResult().failed() && !_fail_message.empty()) { + _delayed_reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, _fail_message)); + } + if (_sender) { + _sender->sendReply(std::move(_delayed_reply)); + } else { + // _tracker->_reply and _delayed_reply points to the same reply. + _tracker->sendReply(); + } + } } void @@ -69,4 +94,26 @@ ApplyBucketDiffState::get_future() return _promise.value().get_future(); } +void +ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply) +{ + _tracker = std::move(tracker); + _delayed_reply = std::move(delayed_reply); +} + +void +ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply) +{ + _tracker = std::move(tracker); + _sender = &sender; + _delayed_reply = std::move(delayed_reply); +} + +std::shared_ptr<ApplyBucketDiffState> +ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) +{ + std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, bucket, std::move(retain_guard))); + return std::shared_ptr<ApplyBucketDiffState>(state.release(), Deleter()); +} + } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index af4174b06d6..7157c69191b 100644 --- 
a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -3,16 +3,20 @@ #pragma once #include <vespa/persistence/spi/bucket.h> +#include <vespa/vespalib/util/retain_guard.h> #include <future> #include <memory> #include <vector> namespace document { class DocumentId; } +namespace storage::api { class StorageReply; } namespace storage::spi { class Result; } namespace storage { class ApplyBucketDiffEntryResult; +class MessageSender; +class MessageTracker; class MergeBucketInfoSyncer; /* @@ -20,15 +24,21 @@ class MergeBucketInfoSyncer; * for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply. */ class ApplyBucketDiffState { + class Deleter; const MergeBucketInfoSyncer& _merge_bucket_info_syncer; spi::Bucket _bucket; vespalib::string _fail_message; std::atomic_flag _failed_flag; bool _stale_bucket_info; std::optional<std::promise<vespalib::string>> _promise; + std::unique_ptr<MessageTracker> _tracker; + std::shared_ptr<api::StorageReply> _delayed_reply; + MessageSender* _sender; + vespalib::RetainGuard _retain_guard; + ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); public: - ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket); + static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); ~ApplyBucketDiffState(); void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op); void wait(); @@ -36,6 +46,9 @@ public: void mark_stale_bucket_info(); void sync_bucket_info(); std::future<vespalib::string> get_future(); + void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply); + void 
set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply); + const spi::Bucket& get_bucket() const noexcept { return _bucket; } }; } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 47b5e4f5f27..bc6e67578c0 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -5,6 +5,7 @@ #include "testandsethelper.h" #include "bucketownershipnotifier.h" #include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/persistence/spi/catchresult.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> @@ -154,6 +155,31 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons } MessageTracker::UP +AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.createBuckets); + LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + LOG(warning, "Bucket %s was merging at create time. 
Unexpected.", cmd.getBucketId().toString().c_str()); + } + spi::Bucket bucket(cmd.getBucket()); + auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP ignored) mutable { + // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric + (void) ignored; + tracker->sendReply(); + }); + + if (cmd.getActive()) { + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>()); + _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task))); + } else { + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task))); + } + + return tracker; +} + +MessageTracker::UP AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.deleteBuckets); diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 4f5c242570c..db5a77bfb59 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -30,6 +30,7 @@ public: MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp 
b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 2ffb827accf..c32efb6aa66 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -209,6 +209,11 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC *_filestorHandler, i % numStripes, _component)); } _bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this)); + } else { + std::lock_guard guard(_lock); + for (auto& handler : _persistenceHandlers) { + handler->configure(*config); + } } } diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp index febac5b87e5..a75eda5b1a4 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp @@ -14,6 +14,7 @@ MergeStatus::MergeStatus(const framework::Clock& clock, uint32_t traceLevel) : reply(), full_node_list(), nodeList(), maxTimestamp(0), diff(), pendingId(0), pendingGetDiff(), pendingApplyDiff(), timeout(0), startTime(clock), + delayed_error(), context(priority, traceLevel) {} @@ -122,4 +123,25 @@ MergeStatus::print(std::ostream& out, bool verbose, } } +void +MergeStatus::set_delayed_error(std::future<vespalib::string>&& delayed_error_in) +{ + delayed_error = std::move(delayed_error_in); +} + +void +MergeStatus::check_delayed_error(api::ReturnCode &return_code) +{ + if (!return_code.failed() && delayed_error.has_value()) { + // Wait for pending writes to local node to complete and check error + auto& future_error = delayed_error.value(); + future_error.wait(); + vespalib::string fail_message = future_error.get(); + delayed_error.reset(); + if (!fail_message.empty()) { + return_code = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, std::move(fail_message)); + } + } +} + }; diff --git 
a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h index b28ca4e373a..05ffd1336a2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h @@ -9,7 +9,9 @@ #include <vector> #include <deque> +#include <future> #include <memory> +#include <optional> namespace storage { @@ -25,6 +27,7 @@ public: std::shared_ptr<api::ApplyBucketDiffReply> pendingApplyDiff; vespalib::duration timeout; framework::MilliSecTimer startTime; + std::optional<std::future<vespalib::string>> delayed_error; spi::Context context; MergeStatus(const framework::Clock&, api::StorageMessage::Priority, uint32_t traceLevel); @@ -40,6 +43,8 @@ public: bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask, const std::vector<api::MergeBucketCommand::Node> &nodes); void print(std::ostream& out, bool verbose, const std::string& indent) const override; bool isFirstNode() const { return static_cast<bool>(reply); } + void set_delayed_error(std::future<vespalib::string>&& delayed_error_in); + void check_delayed_error(api::ReturnCode &return_code); }; } // storage diff --git a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h index e05991ad9e3..b3386c591e6 100644 --- a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h +++ b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h @@ -6,6 +6,8 @@ namespace storage::spi { class Bucket; } namespace storage { +class ApplyBucketDiffState; + /* * Interface class for syncing bucket info during merge. 
*/ @@ -13,6 +15,7 @@ class MergeBucketInfoSyncer { public: virtual ~MergeBucketInfoSyncer() = default; virtual void sync_bucket_info(const spi::Bucket& bucket) const = 0; + virtual void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const = 0; }; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 17a16487ac4..c9ba43458b1 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -6,21 +6,25 @@ #include "apply_bucket_diff_state.h" #include <vespa/storage/persistence/filestorage/mergestatus.h> #include <vespa/persistence/spi/persistenceprovider.h> -#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/persistence/spi/catchresult.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <algorithm> -#include <future> #include <vespa/log/log.h> LOG_SETUP(".persistence.mergehandler"); +using vespalib::MonitoredRefCount; +using vespalib::RetainGuard; + namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, + vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize, uint32_t commonMergeChainOptimalizationMinimumSize, bool async_apply_bucket_diff) @@ -28,12 +32,20 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _cluster_context(cluster_context), _env(env), _spi(spi), + _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), - _async_apply_bucket_diff(async_apply_bucket_diff) + 
_async_apply_bucket_diff(async_apply_bucket_diff), + _executor(executor) +{ +} + +MergeHandler::~MergeHandler() { + drain_async_writes(); } + namespace { constexpr int getDeleteFlag() { @@ -41,20 +53,6 @@ constexpr int getDeleteFlag() { return 2; } -/** - * Throws std::runtime_error if result has an error. - */ -void -checkResult(const spi::Result& result, const spi::Bucket& bucket, const char* op) -{ - if (result.hasError()) { - vespalib::asciistream ss; - ss << "Failed " << op << " in " << bucket << ": " << result.toString(); - throw std::runtime_error(ss.str()); - } -} - - class IteratorGuard { spi::PersistenceProvider& _spi; spi::IteratorId _iteratorId; @@ -653,28 +651,32 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const } namespace { - void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask, - uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd) - { - for (const auto& entry : status.diff) { - uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask); - if ((entry_has_mask == 0u) || - (constrictHasMask && (entry_has_mask != hasMask))) { - continue; - } - cmd.getDiff().emplace_back(entry); - if (constrictHasMask) { - cmd.getDiff().back()._entry._hasMask = newHasMask; - } else { - cmd.getDiff().back()._entry._hasMask = entry_has_mask; - } + +void +findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask, + uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd) +{ + for (const auto& entry : status.diff) { + uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask); + if ((entry_has_mask == 0u) || + (constrictHasMask && (entry_has_mask != hasMask))) { + continue; + } + cmd.getDiff().emplace_back(entry); + if (constrictHasMask) { + cmd.getDiff().back()._entry._hasMask = newHasMask; + } else { + cmd.getDiff().back()._entry._hasMask = entry_has_mask; } } } +} + api::StorageReply::SP MergeHandler::processBucketMerge(const spi::Bucket& bucket, 
MergeStatus& status, - MessageSender& sender, spi::Context& context) const + MessageSender& sender, spi::Context& context, + std::shared_ptr<ApplyBucketDiffState>& async_results) const { // If last action failed, fail the whole merge if (status.reply->getResult().failed()) { @@ -806,6 +808,10 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, } cmd->setPriority(status.context.getPriority()); cmd->setTimeout(status.timeout); + if (async_results) { + // Check currently pending writes to local node before sending new command. + check_apply_diff_sync(std::move(async_results)); + } if (applyDiffNeedLocalData(cmd->getDiff(), 0, true)) { framework::MilliSecTimer startTime(_clock); fetchLocalData(bucket, cmd->getDiff(), 0, context); @@ -883,7 +889,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker->fail(api::ReturnCode::BUSY, err); return tracker; } - checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>()); + MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel()); @@ -923,141 +930,136 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP namespace { - uint8_t findOwnIndex( - const std::vector<api::MergeBucketCommand::Node>& nodeList, - uint16_t us) - { - for (uint32_t i=0, n=nodeList.size(); i<n; ++i) { - if (nodeList[i].index == us) return i; - } - throw vespalib::IllegalStateException( - "Got GetBucketDiff cmd on node not in nodelist in command", - VESPA_STRLOC); +uint8_t findOwnIndex( + const std::vector<api::MergeBucketCommand::Node>& nodeList, + uint16_t us) +{ + for (uint32_t i=0, n=nodeList.size(); i<n; ++i) { + if (nodeList[i].index == us) return i; } + throw vespalib::IllegalStateException( + "Got GetBucketDiff cmd on node 
not in nodelist in command", + VESPA_STRLOC); +} - struct DiffEntryTimestampOrder - : public std::binary_function<api::GetBucketDiffCommand::Entry, - api::GetBucketDiffCommand::Entry, bool> - { - bool operator()(const api::GetBucketDiffCommand::Entry& x, - const api::GetBucketDiffCommand::Entry& y) const { - return (x._timestamp < y._timestamp); - } - }; - - /** - * Merges list A and list B together and puts the result in result. - * Result is swapped in as last step to keep function exception safe. Thus - * result can be listA or listB if wanted. - * - * listA and listB are assumed to be in the order found in the slotfile, or - * in the order given by a previous call to this function. (In both cases - * this will be sorted by timestamp) - * - * @return false if any suspect entries was found. - */ - bool mergeLists( - const std::vector<api::GetBucketDiffCommand::Entry>& listA, - const std::vector<api::GetBucketDiffCommand::Entry>& listB, - std::vector<api::GetBucketDiffCommand::Entry>& finalResult) - { - bool suspect = false; - std::vector<api::GetBucketDiffCommand::Entry> result; - uint32_t i = 0, j = 0; - while (i < listA.size() && j < listB.size()) { - const api::GetBucketDiffCommand::Entry& a(listA[i]); - const api::GetBucketDiffCommand::Entry& b(listB[j]); - if (a._timestamp < b._timestamp) { - result.push_back(a); - ++i; - } else if (a._timestamp > b._timestamp) { - result.push_back(b); - ++j; - } else { - // If we find equal timestamped entries that are not the - // same.. Flag an error. But there is nothing we can do - // about it. Note it as if it is the same entry so we - // dont try to merge them. - if (!(a == b)) { - if (a._gid == b._gid && a._flags == b._flags) { - if ((a._flags & getDeleteFlag()) != 0 && - (b._flags & getDeleteFlag()) != 0) - { - // Unfortunately this can happen, for instance - // if a remove comes to a bucket out of sync - // and reuses different headers in the two - // versions. 
- LOG(debug, "Found entries with equal timestamps of " - "the same gid who both are remove " - "entries: %s <-> %s.", - a.toString(true).c_str(), - b.toString(true).c_str()); - } else { - LOG(error, "Found entries with equal timestamps of " - "the same gid. This is likely same " - "document where size of document varies:" - " %s <-> %s.", - a.toString(true).c_str(), - b.toString(true).c_str()); - } - result.push_back(a); - result.back()._hasMask |= b._hasMask; - suspect = true; - } else if ((a._flags & getDeleteFlag()) - != (b._flags & getDeleteFlag())) +/** + * Merges list A and list B together and puts the result in result. + * Result is swapped in as last step to keep function exception safe. Thus + * result can be listA or listB if wanted. + * + * listA and listB are assumed to be in the order found in the slotfile, or + * in the order given by a previous call to this function. (In both cases + * this will be sorted by timestamp) + * + * @return false if any suspect entries was found. + */ +bool mergeLists( + const std::vector<api::GetBucketDiffCommand::Entry>& listA, + const std::vector<api::GetBucketDiffCommand::Entry>& listB, + std::vector<api::GetBucketDiffCommand::Entry>& finalResult) +{ + bool suspect = false; + std::vector<api::GetBucketDiffCommand::Entry> result; + uint32_t i = 0, j = 0; + while (i < listA.size() && j < listB.size()) { + const api::GetBucketDiffCommand::Entry& a(listA[i]); + const api::GetBucketDiffCommand::Entry& b(listB[j]); + if (a._timestamp < b._timestamp) { + result.push_back(a); + ++i; + } else if (a._timestamp > b._timestamp) { + result.push_back(b); + ++j; + } else { + // If we find equal timestamped entries that are not the + // same.. Flag an error. But there is nothing we can do + // about it. Note it as if it is the same entry so we + // dont try to merge them. 
+ if (!(a == b)) { + if (a._gid == b._gid && a._flags == b._flags) { + if ((a._flags & getDeleteFlag()) != 0 && + (b._flags & getDeleteFlag()) != 0) { - // If we find one remove and one put entry on the - // same timestamp we are going to keep the remove - // entry to make the copies consistent. - const api::GetBucketDiffCommand::Entry& deletedEntry( - (a._flags & getDeleteFlag()) != 0 ? a : b); - result.push_back(deletedEntry); - LOG(debug, - "Found put and remove on same timestamp. Keeping" - "remove as it is likely caused by remove with " - "copies unavailable at the time: %s, %s.", - a.toString().c_str(), b.toString().c_str()); + // Unfortunately this can happen, for instance + // if a remove comes to a bucket out of sync + // and reuses different headers in the two + // versions. + LOG(debug, "Found entries with equal timestamps of " + "the same gid who both are remove " + "entries: %s <-> %s.", + a.toString(true).c_str(), + b.toString(true).c_str()); } else { - LOG(error, "Found entries with equal timestamps that " - "weren't the same entry: %s, %s.", - a.toString().c_str(), b.toString().c_str()); - result.push_back(a); - result.back()._hasMask |= b._hasMask; - suspect = true; + LOG(error, "Found entries with equal timestamps of " + "the same gid. This is likely same " + "document where size of document varies:" + " %s <-> %s.", + a.toString(true).c_str(), + b.toString(true).c_str()); } + result.push_back(a); + result.back()._hasMask |= b._hasMask; + suspect = true; + } else if ((a._flags & getDeleteFlag()) + != (b._flags & getDeleteFlag())) + { + // If we find one remove and one put entry on the + // same timestamp we are going to keep the remove + // entry to make the copies consistent. + const api::GetBucketDiffCommand::Entry& deletedEntry( + (a._flags & getDeleteFlag()) != 0 ? a : b); + result.push_back(deletedEntry); + LOG(debug, + "Found put and remove on same timestamp. 
Keeping" + "remove as it is likely caused by remove with " + "copies unavailable at the time: %s, %s.", + a.toString().c_str(), b.toString().c_str()); } else { + LOG(error, "Found entries with equal timestamps that " + "weren't the same entry: %s, %s.", + a.toString().c_str(), b.toString().c_str()); result.push_back(a); result.back()._hasMask |= b._hasMask; + suspect = true; } - ++i; - ++j; + } else { + result.push_back(a); + result.back()._hasMask |= b._hasMask; } + ++i; + ++j; } - if (i < listA.size()) { - assert(j >= listB.size()); - for (uint32_t n = listA.size(); i<n; ++i) { - result.push_back(listA[i]); - } - } else if (j < listB.size()) { - assert(i >= listA.size()); - for (uint32_t n = listB.size(); j<n; ++j) { - result.push_back(listB[j]); - } + } + if (i < listA.size()) { + assert(j >= listB.size()); + for (uint32_t n = listA.size(); i<n; ++i) { + result.push_back(listA[i]); + } + } else if (j < listB.size()) { + assert(i >= listA.size()); + for (uint32_t n = listB.size(); j<n; ++j) { + result.push_back(listB[j]); } - result.swap(finalResult); - return !suspect; } + result.swap(finalResult); + return !suspect; +} } MessageTracker::UP -MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const -{ +MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.getBucketDiff); spi::Bucket bucket(cmd.getBucket()); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); - checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>()); + return handleGetBucketDiffStage2(cmd, std::move(tracker)); +} +MessageTracker::UP +MergeHandler::handleGetBucketDiffStage2(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const +{ + spi::Bucket bucket(cmd.getBucket()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { 
tracker->fail(api::ReturnCode::BUSY, "A merge is already running on this bucket."); return tracker; @@ -1171,7 +1173,8 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe reply.getDiff().begin(), reply.getDiff().end()); - replyToSend = processBucketMerge(bucket, *s, sender, s->context); + std::shared_ptr<ApplyBucketDiffState> async_results; + replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results); if (!replyToSend.get()) { // We have sent something on, and shouldn't reply now. @@ -1211,7 +1214,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra tracker->setMetric(_env._metrics.applyBucketDiff); spi::Bucket bucket(cmd.getBucket()); - auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket); + std::shared_ptr<ApplyBucketDiffState> async_results; LOG(debug, "%s", cmd.toString().c_str()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { @@ -1233,10 +1236,13 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_clock); + async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); + if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { + check_apply_diff_sync(std::move(async_results)); + } _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); - check_apply_diff_sync(std::move(async_results)); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).", bucket.toString().c_str(), _env._nodeIndex, index); @@ -1260,10 +1266,14 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } } - tracker->setReply(std::make_shared<api::ApplyBucketDiffReply>(cmd)); + auto reply = 
std::make_shared<api::ApplyBucketDiffReply>(cmd); + tracker->setReply(reply); static_cast<api::ApplyBucketDiffReply&>(tracker->getReply()).getDiff().swap(cmd.getDiff()); LOG(spam, "Replying to ApplyBucketDiff for %s to node %d.", bucket.toString().c_str(), cmd.getNodes()[index - 1].index); + if (async_results) { + async_results->set_delayed_reply(std::move(tracker), std::move(reply)); + } } else { // When not the last node in merge chain, we must save reply, and // send command on. @@ -1280,6 +1290,10 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra cmd2->setPriority(cmd.getPriority()); cmd2->setTimeout(cmd.getTimeout()); s->pendingId = cmd2->getMsgId(); + if (async_results) { + // Reply handler should check for delayed error. + s->set_delayed_error(async_results->get_future()); + } _env._fileStorHandler.sendCommand(cmd2); // Everything went fine. Don't delete state but wait for reply stateGuard.deactivate(); @@ -1290,12 +1304,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } void -MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const +MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, MessageSender& sender, MessageTracker::UP tracker) const { - (void) tracker; _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); - auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket); + std::shared_ptr<ApplyBucketDiffState> async_results; std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); @@ -1316,6 +1329,8 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag api::StorageReply::SP replyToSend; // Process apply bucket diff locally api::ReturnCode returnCode = reply.getResult(); + // Check for delayed error from handleApplyBucketDiff + 
s->check_delayed_error(returnCode); try { if (reply.getResult().failed()) { LOG(debug, "Got failed apply bucket diff reply %s", reply.toString().c_str()); @@ -1329,9 +1344,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); + async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, diff, index, s->context, async_results); + if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { + check_apply_diff_sync(std::move(async_results)); + } _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); - check_apply_diff_sync(std::move(async_results)); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", bucket.toString().c_str(), @@ -1370,7 +1388,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag // Should reply now, since we failed. replyToSend = s->reply; } else { - replyToSend = processBucketMerge(bucket, *s, sender, s->context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results); if (!replyToSend.get()) { // We have sent something on and shouldn't reply now. 
@@ -1392,6 +1410,10 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag throw; } + if (async_results && replyToSend) { + replyToSend->setResult(returnCode); + async_results->set_delayed_reply(std::move(tracker), sender, std::move(replyToSend)); + } if (clearState) { _env._fileStorHandler.clearMergeStatus(bucket.getBucket()); } @@ -1402,4 +1424,26 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } } +void +MergeHandler::drain_async_writes() +{ + if (_monitored_ref_count) { + // Wait for related ApplyBucketDiffState objects to be destroyed + _monitored_ref_count->waitForZeroRefCount(); + } +} + +void +MergeHandler::configure(bool async_apply_bucket_diff) noexcept +{ + _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release); +} + +void +MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const +{ + auto bucket_id = state->get_bucket().getBucketId(); + _executor.execute(bucket_id.getId(), [state = std::move(state)]() { }); +} + } // storage diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index f6e8ddcf306..4daec4c0689 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -20,6 +20,9 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storage/common/cluster_context.h> #include <vespa/storage/common/messagesender.h> +#include <vespa/vespalib/util/monitored_refcount.h> + +namespace vespalib { class ISequencedTaskExecutor; } namespace storage { @@ -44,10 +47,13 @@ public: MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, + vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize = 4190208, uint32_t commonMergeChainOptimalizationMinimumSize = 64, bool async_apply_bucket_diff = false); + 
~MergeHandler(); + bool buildBucketInfoList( const spi::Bucket& bucket, Timestamp maxTimestamp, @@ -64,27 +70,34 @@ public: spi::Context& context, std::shared_ptr<ApplyBucketDiffState> async_results) const; void sync_bucket_info(const spi::Bucket& bucket) const override; + void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override; MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const; MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const; void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const; MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const; void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; + void drain_async_writes(); + void configure(bool async_apply_bucket_diff) noexcept; private: const framework::Clock &_clock; const ClusterContext &_cluster_context; PersistenceUtil &_env; spi::PersistenceProvider &_spi; + std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; - const bool _async_apply_bucket_diff; + std::atomic<bool> _async_apply_bucket_diff; + vespalib::ISequencedTaskExecutor& _executor; + MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; /** Returns a reply if merge is complete */ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender, - spi::Context& context) const; + spi::Context& context, + std::shared_ptr<ApplyBucketDiffState>& async_results) const; /** * Invoke either put, remove or unrevertable remove on the SPI diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 1ef883fc810..d03c9a6d111 100644 --- 
a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -17,7 +17,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen : _clock(component.getClock()), _env(component, filestorHandler, metrics, provider), _processAllHandler(_env, provider), - _mergeHandler(_env, provider, component.cluster_context(), _clock, + _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize, cfg.commonMergeChainOptimalizationMinimumSize, cfg.asyncApplyBucketDiff), @@ -44,7 +44,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::REVERT_ID: return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker)); case api::MessageType::CREATEBUCKET_ID: - return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: @@ -150,4 +150,10 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co } } +void +PersistenceHandler::configure(vespa::config::content::StorFilestorConfig& config) noexcept +{ + _mergeHandler.configure(config.asyncApplyBucketDiff); +} + } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index a92c2dc78ca..c60fb05e56e 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -35,6 +35,7 @@ public: const AsyncHandler & asyncHandler() const { return _asyncHandler; } const SplitJoinHandler & splitjoinHandler() const { 
return _splitJoinHandler; } const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } + void configure(vespa::config::content::StorFilestorConfig& config) noexcept; private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index ce424f0ce83..9ccd901744b 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -99,17 +99,18 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context& return checkResult(_impl.destroyIterator(iteratorId, context)); } -spi::Result -ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) +void +ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept { - return checkResult(_impl.createBucket(bucket, context)); + onComplete->addResultHandler(this); + _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); } void -ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) +ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept { onComplete->addResultHandler(this); - _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); + _impl.createBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketIdListResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index c9d2411e372..14d20cf8a52 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ 
b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -49,7 +49,6 @@ public: spi::Context &context) override; spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - spi::Result createBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; @@ -63,7 +62,8 @@ public: void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; - void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; + void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override; private: template <typename ResultType> diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index b4fe207e2e5..9a7a451b906 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -88,22 +88,6 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& 
cmd, MessageTracker::UP t } MessageTracker::UP -SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.createBuckets); - LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); - } - spi::Bucket spiBucket(cmd.getBucket()); - _spi.createBucket(spiBucket, tracker->context()); - if (cmd.getActive()) { - _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE); - } - return tracker; -} - -MessageTracker::UP SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.visit); diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h index 2cfbc7016c0..009fd6dff52 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.h +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h @@ -22,7 +22,6 @@ public: SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&); MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const; diff --git a/storageapi/src/vespa/storageapi/message/bucket.cpp b/storageapi/src/vespa/storageapi/message/bucket.cpp index 867cfccfaf9..360db5ea3d7 100644 --- a/storageapi/src/vespa/storageapi/message/bucket.cpp +++ 
b/storageapi/src/vespa/storageapi/message/bucket.cpp @@ -156,7 +156,7 @@ MergeBucketReply::print(std::ostream& out, bool verbose, { out << "MergeBucketReply(" << getBucketId() << ", to time " << _maxTimestamp << ", cluster state version: " - << _clusterStateVersion << ", nodes: "; + << _clusterStateVersion << ", nodes: ["; for (uint32_t i=0; i<_nodes.size(); ++i) { if (i != 0) out << ", "; out << _nodes[i]; @@ -220,15 +220,16 @@ GetBucketDiffCommand::print(std::ostream& out, bool verbose, const std::string& indent) const { out << "GetBucketDiffCommand(" << getBucketId() << ", to time " - << _maxTimestamp << ", nodes: "; + << _maxTimestamp << ", nodes: ["; for (uint32_t i=0; i<_nodes.size(); ++i) { if (i != 0) out << ", "; out << _nodes[i]; } + if (_diff.empty()) { - out << ", no entries"; + out << "], no entries"; } else if (verbose) { - out << ","; + out << "],"; for (uint32_t i=0; i<_diff.size(); ++i) { out << "\n" << indent << " "; _diff[i].print(out, verbose, indent + " "); @@ -258,15 +259,15 @@ GetBucketDiffReply::print(std::ostream& out, bool verbose, const std::string& indent) const { out << "GetBucketDiffReply(" << getBucketId() << ", to time " - << _maxTimestamp << ", nodes: "; + << _maxTimestamp << ", nodes: ["; for (uint32_t i=0; i<_nodes.size(); ++i) { if (i != 0) out << ", "; out << _nodes[i]; } if (_diff.empty()) { - out << ", no entries"; + out << "], no entries"; } else if (verbose) { - out << ","; + out << "],"; for (uint32_t i=0; i<_diff.size(); ++i) { out << "\n" << indent << " "; _diff[i].print(out, verbose, indent + " "); @@ -363,12 +364,12 @@ ApplyBucketDiffCommand::print(std::ostream& out, bool verbose, totalSize += it->_bodyBlob.size(); if (it->filled()) ++filled; } - out << "ApplyBucketDiffCommand(" << getBucketId() << ", nodes: "; + out << "ApplyBucketDiffCommand(" << getBucketId() << ", nodes: ["; for (uint32_t i=0; i<_nodes.size(); ++i) { if (i != 0) out << ", "; out << _nodes[i]; } - out << _diff.size() << " entries of " << totalSize << 
" bytes, " + out << "], " << _diff.size() << " entries of " << totalSize << " bytes, " << (100.0 * filled / _diff.size()) << " \% filled)"; if (_diff.empty()) { out << ", no entries"; @@ -408,12 +409,12 @@ ApplyBucketDiffReply::print(std::ostream& out, bool verbose, totalSize += entry._bodyBlob.size(); if (entry.filled()) ++filled; } - out << "ApplyBucketDiffReply(" << getBucketId() << ", nodes: "; + out << "ApplyBucketDiffReply(" << getBucketId() << ", nodes: ["; for (uint32_t i=0; i<_nodes.size(); ++i) { if (i != 0) out << ", "; out << _nodes[i]; } - out << _diff.size() << " entries of " << totalSize << " bytes, " + out << "], " << _diff.size() << " entries of " << totalSize << " bytes, " << (100.0 * filled / _diff.size()) << " \% filled)"; if (_diff.empty()) { out << ", no entries"; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 9c423c6fc34..eb818ba1d48 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -8,7 +8,6 @@ import com.fasterxml.jackson.core.JsonToken; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.StringJoiner; diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index e5667c7b392..63fc7854a52 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -162,6 +162,8 @@ public class DocumentV1ApiHandler extends 
AbstractRequestHandler { private static final String TIMEOUT = "timeout"; private static final String TRACELEVEL = "tracelevel"; private static final String STREAM = "stream"; + private static final String SLICES = "slices"; + private static final String SLICE_ID = "sliceId"; private final Clock clock; private final Duration handlerTimeout; @@ -985,12 +987,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (cluster.isEmpty() && path.documentType().isEmpty()) throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); + Optional<Integer> slices = getProperty(request, SLICES, integerParser); + Optional<Integer> sliceId = getProperty(request, SLICE_ID, integerParser); + VisitorParameters parameters = parseCommonParameters(request, path, cluster); parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); parameters.setMaxTotalHits(wantedDocumentCount); parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency)); parameters.visitInconsistentBuckets(true); parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())); + if (slices.isPresent() && sliceId.isPresent()) + parameters.slice(slices.get(), sliceId.get()); + else if (slices.isPresent() != sliceId.isPresent()) + throw new IllegalArgumentException("None or both of '" + SLICES + "' and '" + SLICE_ID + "' must be set"); return parameters; } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index b23533a720e..1629777f837 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ 
b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -258,6 +258,8 @@ public class DocumentV1ApiTest { assertEquals("[id]", parameters.getFieldSet()); assertEquals("(all the things)", parameters.getDocumentSelection()); assertEquals(6000, parameters.getSessionTimeoutMs()); + assertEquals(4, parameters.getSlices()); + assertEquals(1, parameters.getSliceId()); // Put some documents in the response parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0)); parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1)); @@ -269,7 +271,7 @@ public class DocumentV1ApiTest { parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "timeout is OK"); }); response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" + - "&selection=all%20the%20things&fieldSet=[id]&timeout=6&stream=true"); + "&selection=all%20the%20things&fieldSet=[id]&timeout=6&stream=true&slices=4&sliceId=1"); assertSameJson("{" + " \"pathId\": \"/document/v1\"," + " \"documents\": [" + diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp index b55f54f9339..e61dc071b62 100644 --- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -4,7 +4,7 @@ #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/backtrace.h> #include <vespa/vespalib/util/size_literals.h> -#include <atomic> +#include <thread> using namespace vespalib; @@ -74,9 +74,9 @@ struct MyState { ExecutorStats stats = executor.getStats(); EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt); EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt); - EXPECT_EQUAL(expect_queue + expect_running + expect_deleted, - 
stats.acceptedTasks); + EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,stats.acceptedTasks); EXPECT_EQUAL(expect_rejected, stats.rejectedTasks); + EXPECT_TRUE(stats.wakeupCount <= (NUM_THREADS + stats.acceptedTasks)); EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0)); if (expect_deleted == 0) { EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max()); @@ -85,6 +85,7 @@ struct MyState { EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max()); EXPECT_EQUAL(0u, stats.acceptedTasks); EXPECT_EQUAL(0u, stats.rejectedTasks); + EXPECT_EQUAL(0u, stats.wakeupCount); return *this; } }; @@ -188,11 +189,16 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor( } TEST("require that stats can be accumulated") { - ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3); + EXPECT_TRUE(std::atomic<duration>::is_always_lock_free); + ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3,7); + stats.setUtil(3, 0.8); EXPECT_EQUAL(1u, stats.queueSize.max()); EXPECT_EQUAL(2u, stats.acceptedTasks); EXPECT_EQUAL(3u, stats.rejectedTasks); - stats += ExecutorStats(ExecutorStats::QueueSizeT(7),8,9); + EXPECT_EQUAL(7u, stats.wakeupCount); + EXPECT_EQUAL(3u, stats.getThreadCount()); + EXPECT_EQUAL(0.2, stats.getUtil()); + stats.aggregate(ExecutorStats(ExecutorStats::QueueSizeT(7),8,9,11).setUtil(7,0.5)); EXPECT_EQUAL(2u, stats.queueSize.count()); EXPECT_EQUAL(8u, stats.queueSize.total()); EXPECT_EQUAL(8u, stats.queueSize.max()); @@ -200,9 +206,18 @@ TEST("require that stats can be accumulated") { EXPECT_EQUAL(8u, stats.queueSize.max()); EXPECT_EQUAL(4.0, stats.queueSize.average()); + EXPECT_EQUAL(10u, stats.getThreadCount()); EXPECT_EQUAL(10u, stats.acceptedTasks); EXPECT_EQUAL(12u, stats.rejectedTasks); + EXPECT_EQUAL(18u, stats.wakeupCount); + EXPECT_EQUAL(0.41, stats.getUtil()); +} +TEST("Test that utilization is computed") { + ThreadStackExecutor executor(1, 128_Ki); + std::this_thread::sleep_for(1s); + auto stats = 
executor.getStats(); + EXPECT_GREATER(0.50, stats.getUtil()); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h index f1f58685570..577ae933ec2 100644 --- a/vespalib/src/vespa/vespalib/util/executor_stats.h +++ b/vespalib/src/vespa/vespalib/util/executor_stats.h @@ -51,25 +51,46 @@ private: /** * Struct representing stats for an executor. + * Note that aggregation requires sample interval to be the same(similar) for all samples. **/ -struct ExecutorStats { +class ExecutorStats { +private: + size_t _threadCount; + double _absUtil; +public: using QueueSizeT = AggregatedAverage<size_t>; QueueSizeT queueSize; - size_t acceptedTasks; - size_t rejectedTasks; - ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0) {} - ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected) - : queueSize(queueSize_in), acceptedTasks(accepted), rejectedTasks(rejected) + size_t acceptedTasks; + size_t rejectedTasks; + size_t wakeupCount; // Number of times a worker was woken up, + + ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0, 0) {} + ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected, size_t wakeupCount_in) + : _threadCount(1), + _absUtil(1.0), + queueSize(queueSize_in), + acceptedTasks(accepted), + rejectedTasks(rejected), + wakeupCount(wakeupCount_in) {} - ExecutorStats & operator += (const ExecutorStats & rhs) { + void aggregate(const ExecutorStats & rhs) { + _threadCount += rhs._threadCount; queueSize = QueueSizeT(queueSize.count() + rhs.queueSize.count(), queueSize.total() + rhs.queueSize.total(), queueSize.min() + rhs.queueSize.min(), queueSize.max() + rhs.queueSize.max()); acceptedTasks += rhs.acceptedTasks; rejectedTasks += rhs.rejectedTasks; + wakeupCount += rhs.wakeupCount; + _absUtil += rhs._absUtil; + } + ExecutorStats & setUtil(uint32_t threadCount, double idle) { + _threadCount = threadCount; + _absUtil = (1.0 - idle) * threadCount; 
return *this; } + double getUtil() const { return _absUtil / _threadCount; } + size_t getThreadCount() const { return _threadCount; } }; } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index f80a5b4ce32..133350f3d56 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -24,6 +24,25 @@ ThreadInit::Run(FastOS_ThreadInterface *, void *) { } +ThreadStackExecutorBase::Worker::Worker() + : lock(), + cond(), + idleTracker(), + pre_guard(0xaaaaaaaa), + idle(true), + post_guard(0x55555555), + task() +{} + +void +ThreadStackExecutorBase::Worker::verify(bool expect_idle) const { + (void) expect_idle; + assert(pre_guard == 0xaaaaaaaa); + assert(post_guard == 0x55555555); + assert(idle == expect_idle); + assert(!task.task == expect_idle); +} + void ThreadStackExecutorBase::BlockedThread::wait() const { @@ -103,6 +122,7 @@ ThreadStackExecutorBase::obtainTask(Worker &worker) return false; } _workers.push(&worker); + worker.idleTracker.set_idle(steady_clock::now()); } { unique_lock guard(worker.lock); @@ -141,6 +161,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize, _lock(), _cond(), _stats(), + _idleTracker(steady_clock::now()), _executorCompletion(), _tasks(), _workers(), @@ -164,7 +185,8 @@ ThreadStackExecutorBase::start(uint32_t threads) } } -size_t ThreadStackExecutorBase::getNumThreads() const { +size_t +ThreadStackExecutorBase::getNumThreads() const { return _pool->GetNumStartedThreads(); } @@ -208,6 +230,12 @@ ThreadStackExecutorBase::getStats() { std::unique_lock guard(_lock); ExecutorStats stats = _stats; + steady_time now = steady_clock::now(); + for (size_t i(0); i < _workers.size(); i++) { + _idleTracker.was_idle(_workers.access(i)->idleTracker.reset(now)); + } + size_t numThreads = getNumThreads(); + stats.setUtil(numThreads, _idleTracker.reset(now, numThreads)); 
_stats = ExecutorStats(); _stats.queueSize.add(_taskCount); return stats; @@ -225,6 +253,8 @@ ThreadStackExecutorBase::execute(Task::UP task) if (!_workers.empty()) { Worker *worker = _workers.back(); _workers.popBack(); + _idleTracker.was_idle(worker->idleTracker.set_active(steady_clock::now())); + _stats.wakeupCount++; guard.unlock(); // <- UNLOCK assignTask(std::move(taggedTask), *worker); } else { diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 66a34bfde95..c3552cfe579 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -7,6 +7,7 @@ #include "arrayqueue.hpp" #include "gate.h" #include "runnable.h" +#include "executor_idle_tracking.h" #include <vector> #include <functional> @@ -47,18 +48,13 @@ private: struct Worker { std::mutex lock; std::condition_variable cond; - uint32_t pre_guard; - bool idle; - uint32_t post_guard; - TaggedTask task; - Worker() : lock(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {} - void verify(bool expect_idle) const { - (void) expect_idle; - assert(pre_guard == 0xaaaaaaaa); - assert(post_guard == 0x55555555); - assert(idle == expect_idle); - assert(!task.task == expect_idle); - } + ThreadIdleTracker idleTracker; + uint32_t pre_guard; + bool idle; + uint32_t post_guard; + TaggedTask task; + Worker(); + void verify(bool expect_idle) const; }; struct BarrierCompletion { @@ -81,6 +77,7 @@ private: mutable std::mutex _lock; std::condition_variable _cond; ExecutorStats _stats; + ExecutorIdleTracker _idleTracker; Gate _executorCompletion; ArrayQueue<TaggedTask> _tasks; ArrayQueue<Worker*> _workers; |