diff options
46 files changed, 637 insertions, 330 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java index 854780dd336..1ddb50b0a6b 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java @@ -122,12 +122,7 @@ public class ResourceMeterMaintainer extends ControllerMaintainer { private void reportResourceSnapshots(Collection<ResourceSnapshot> resourceSnapshots) { meteringClient.consume(resourceSnapshots); - metric.set(METERING_LAST_REPORTED, clock.millis() / 1000, metric.createContext(Collections.emptyMap())); - // total metered resource usage, for alerting on drastic changes - metric.set(METERING_TOTAL_REPORTED, - resourceSnapshots.stream() - .mapToDouble(r -> r.getCpuCores() + r.getMemoryGb() + r.getDiskGb()).sum(), - metric.createContext(Collections.emptyMap())); + updateMetrics(resourceSnapshots); try (var lock = curator.lockMeteringRefreshTime()) { if (needsRefresh(curator.readMeteringRefreshTime())) { @@ -194,4 +189,25 @@ public class ResourceMeterMaintainer extends ControllerMaintainer { double cost = new NodeResources(allocation.getCpuCores(), allocation.getMemoryGb(), allocation.getDiskGb(), 0).cost(); return Math.round(cost * 100.0 / costDivisor) / 100.0; } + + private void updateMetrics(Collection<ResourceSnapshot> resourceSnapshots) { + metric.set(METERING_LAST_REPORTED, clock.millis() / 1000, metric.createContext(Collections.emptyMap())); + // total metered resource usage, for alerting on drastic changes + metric.set(METERING_TOTAL_REPORTED, + resourceSnapshots.stream() + .mapToDouble(r -> r.getCpuCores() + r.getMemoryGb() + r.getDiskGb()).sum(), + metric.createContext(Collections.emptyMap())); + + resourceSnapshots.forEach(snapshot -> { + var context = metric.createContext(Map.of( + "tenant", snapshot.getApplicationId().tenant().value(), + "applicationId", snapshot.getApplicationId().toFullString(), + "zoneId", snapshot.getZoneId() + )); + metric.set("metering.vcpu", snapshot.getCpuCores(), context); + metric.set("metering.memoryGB", snapshot.getMemoryGb(), context); + metric.set("metering.diskGB", snapshot.getDiskGb(), context); + }); + } + } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java index a255a6c37d8..5b923c2ee59 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java @@ -98,6 +98,8 @@ public class ResourceMeterMaintainerTest { assertEquals(tester.clock().millis()/1000, metrics.getMetric("metering_last_reported")); assertEquals(2224.0d, (Double) metrics.getMetric("metering_total_reported"), Double.MIN_VALUE); + assertEquals(24d, (Double) metrics.getMetric(context -> "tenant1".equals(context.get("tenant")), "metering.vcpu").get(), Double.MIN_VALUE); + assertEquals(40d, (Double) metrics.getMetric(context -> "tenant2".equals(context.get("tenant")), "metering.vcpu").get(), Double.MIN_VALUE); // Metering is not refreshed assertFalse(snapshotConsumer.isRefreshed()); diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index 9cc4f60ed7e..78a58f24a65 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -799,7 +799,7 @@ "public" ], "methods": [ - "public void <init>(int, com.yahoo.documentapi.ProgressToken)", + "public void <init>(int, com.yahoo.documentapi.ProgressToken, int, int)", "protected boolean isLosslessResetPossible()", "public boolean hasNext()", "public boolean shouldYield()", @@ -851,6 +851,7 @@ "public void setDistributionBitCount(int)", "public boolean visitsAllBuckets()", "public static com.yahoo.documentapi.VisitorIterator createFromDocumentSelection(java.lang.String, com.yahoo.document.BucketIdFactory, int, com.yahoo.documentapi.ProgressToken)", + "public static com.yahoo.documentapi.VisitorIterator createFromDocumentSelection(java.lang.String, com.yahoo.document.BucketIdFactory, int, com.yahoo.documentapi.ProgressToken, int, int)", "public static com.yahoo.documentapi.VisitorIterator createFromExplicitBucketSet(java.util.Set, int, com.yahoo.documentapi.ProgressToken)" ], "fields": [] @@ -931,6 +932,9 @@ "public com.yahoo.documentapi.messagebus.loadtypes.LoadType getLoadType()", "public boolean skipBucketsOnFatalErrors()", "public void skipBucketsOnFatalErrors(boolean)", + "public void slice(int, int)", + "public int getSlices()", + "public int getSliceId()", "public void setDynamicallyIncreaseMaxBucketsPerVisitor(boolean)", "public void setDynamicMaxBucketsIncreaseFactor(float)", "public java.lang.String toString()" diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java index 9957898e459..4a77d30ec92 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java @@ -387,6 +387,18 @@ public class ProgressToken { } /** + * Marks the current bucket as finished and advances the bucket cursor; + * throws instead if the current bucket is already {@link #addBucket added}. + */ + void skipCurrentBucket() { + if (buckets.containsKey(bucketToKeyWrapper(getCurrentBucketId()))) + throw new IllegalStateException("Current bucket was already added to the explicit bucket set"); + + ++finishedBucketCount; + ++bucketCursor; + } + + /** * Directly generate a bucket Id key for the <code>n</code>th bucket in * reverse sorted order. * @@ -428,6 +440,14 @@ public class ProgressToken { return bucketCursor; } + static BucketId toBucketId(long bucketCursor, int distributionBits) { + return new BucketId(keyToBucketId(makeNthBucketKey(bucketCursor, distributionBits))); + } + + BucketId getCurrentBucketId() { + return toBucketId(getBucketCursor(), getDistributionBitCount()); + } + protected void setBucketCursor(long bucketCursor) { this.bucketCursor = bucketCursor; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java index e11bdf7f18c..e15512ca71b 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java @@ -81,12 +81,24 @@ public class VisitorIterator { protected static class DistributionRangeBucketSource implements BucketSource { private boolean flushActive = false; private int distributionBitCount; + private final int slices; + private final int sliceId; // Wouldn't need this if this were a non-static class, but do it for // the sake of keeping things identical in Java and C++ private ProgressToken progressToken; public DistributionRangeBucketSource(int distributionBitCount, - ProgressToken progress) { + ProgressToken progress, + int slices, int sliceId) { + if (slices < 1) { + throw new IllegalArgumentException("slices must be positive, but was " + slices); + } + if (sliceId < 0 || sliceId >= slices) { + throw new IllegalArgumentException("sliceId must be in [0, " + slices + "), but was " + sliceId); + } + + this.slices = slices; + this.sliceId = sliceId; progressToken = progress; // New progress token (could also be empty, in which this is a @@ -148,6 +160,7 @@ public class VisitorIterator { } // Should be all fixed up and good to go progressToken.setInconsistentState(false); + skipToSlice(); } protected boolean isLosslessResetPossible() { @@ -203,6 +216,7 @@ public class VisitorIterator { assert(p.getActiveBucketCount() == 0); p.clearAllBuckets(); p.setBucketCursor(0); + skipToSlice(); return; } @@ -292,7 +306,14 @@ public class VisitorIterator { } public boolean hasNext() { - return progressToken.getBucketCursor() < (1L << distributionBitCount); + // There is a next bucket iff. there is a bucket no earlier than the cursor which + // is contained in the bucket space, and is also 0 modulo our sliceId; or if we're + // not yet properly initialised, with a real distribution bit count, we ignore this. + long nextBucket = progressToken.getBucketCursor(); + if (distributionBitCount != 1) { + nextBucket += Math.floorMod(sliceId - nextBucket, slices); + } + return nextBucket < (1L << distributionBitCount); } public boolean shouldYield() { @@ -311,13 +332,27 @@ public class VisitorIterator { public BucketProgress getNext() { assert(hasNext()) : "getNext() called with hasNext() == false"; - long currentPosition = progressToken.getBucketCursor(); - long key = ProgressToken.makeNthBucketKey(currentPosition, distributionBitCount); - ++currentPosition; - progressToken.setBucketCursor(currentPosition); - return new BucketProgress( - new BucketId(ProgressToken.keyToBucketId(key)), - new BucketId()); + + // Create the progress to return for creating visitors, and advance bucket cursor. + BucketProgress progress = new BucketProgress(progressToken.getCurrentBucketId(), new BucketId()); + progressToken.setBucketCursor(progressToken.getBucketCursor() + 1); + + // Skip ahead to our next next slice, to ensure we also exhaust the bucket space when + // hasNext() turns false, but there are still super buckets left after the current. + skipToSlice(); + + return progress; + } + + // Advances the wrapped progress token's bucket cursor to our next slice, marking any skipped + // buckets as complete, but only if we've been initialised with a proper distribution bit count. + private void skipToSlice() { + if (distributionBitCount == 1) + return; + + while (progressToken.getBucketCursor() < getTotalBucketCount() && (progressToken.getBucketCursor() % slices) != sliceId) { + progressToken.skipCurrentBucket(); + } } public int getDistributionBitCount() { @@ -732,6 +767,13 @@ public class VisitorIterator { return bucketSource.visitsAllBuckets(); } + public static VisitorIterator createFromDocumentSelection( + String documentSelection, + BucketIdFactory idFactory, + int distributionBitCount, + ProgressToken progress) throws ParseException { + return createFromDocumentSelection(documentSelection, idFactory, distributionBitCount, progress, 1, 0); + } /** * Create a new <code>VisitorIterator</code> instance based on the given document * selection string. @@ -753,7 +795,9 @@ public class VisitorIterator { String documentSelection, BucketIdFactory idFactory, int distributionBitCount, - ProgressToken progress) throws ParseException { + ProgressToken progress, + int slices, + int sliceId) throws ParseException { BucketSelector bucketSel = new BucketSelector(idFactory); Set<BucketId> rawBuckets = bucketSel.getBucketList(documentSelection); BucketSource src; @@ -763,7 +807,7 @@ public class VisitorIterator { // bit-based range source if (rawBuckets == null) { // Range source - src = new DistributionRangeBucketSource(distributionBitCount, progress); + src = new DistributionRangeBucketSource(distributionBitCount, progress, slices, sliceId); } else { // Explicit source src = new ExplicitBucketSource(rawBuckets, distributionBitCount, progress); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java index 8b0c8538855..44675d8d2ac 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java @@ -50,6 +50,8 @@ public class VisitorParameters extends Parameters { private int traceLevel = 0; private ThrottlePolicy throttlePolicy = null; private boolean skipBucketsOnFatalErrors = false; + private int slices = 1; + private int sliceId = 0; // Advanced parameter, only for internal use. Set<BucketId> bucketsToVisit = null; @@ -101,6 +103,7 @@ public class VisitorParameters extends Parameters { params.getDynamicMaxBucketsIncreaseFactor()); setTraceLevel(params.getTraceLevel()); skipBucketsOnFatalErrors(params.skipBucketsOnFatalErrors()); + slice(params.getSlices(), getSliceId()); } // Get functions @@ -331,6 +334,15 @@ public class VisitorParameters extends Parameters { public void skipBucketsOnFatalErrors(boolean skipBucketsOnFatalErrors) { this.skipBucketsOnFatalErrors = skipBucketsOnFatalErrors; } + public void slice(int slices, int sliceId) { + this.slices = slices; + this.sliceId = sliceId; + } + + public int getSlices() { return slices; } + + public int getSliceId() { return sliceId; } + /** * Set whether or not max buckets per visitor value should be dynamically * increased when using orderdoc and visitors do not return at least half diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 5d07d433f18..f7242695490 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -572,7 +572,9 @@ public class MessageBusVisitorSession implements VisitorSession { params.getDocumentSelection(), bucketIdFactory, 1, - progressToken); + progressToken, + params.getSlices(), + params.getSliceId()); } else { if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "parameters specify explicit bucket set " + diff --git a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java index 01cdad244a8..fb5f5bd2cfb 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java @@ -77,8 +77,119 @@ public class VisitorIteratorTestCase { } @Test + public void testInvalidSlicing() throws ParseException { + int distBits = 4; + BucketIdFactory idFactory = new BucketIdFactory(); + ProgressToken progress = new ProgressToken(); + + try { + VisitorIterator.createFromDocumentSelection( + "id.group != \"yahoo.com\"", idFactory, distBits, progress, 0, 0); + } + catch (IllegalArgumentException e) { + assertEquals("slices must be positive, but was 0", e.getMessage()); + } + + try { + VisitorIterator.createFromDocumentSelection( + "id.group != \"yahoo.com\"", idFactory, distBits, progress, 1, 1); + } + catch (IllegalArgumentException e) { + assertEquals("sliceId must be in [0, 1), but was 1", e.getMessage()); + } + + try { + VisitorIterator.createFromDocumentSelection( + "id.group != \"yahoo.com\"", idFactory, distBits, progress, 1, -1); + } + catch (IllegalArgumentException e) { + assertEquals("sliceId must be in [0, 1), but was -1", e.getMessage()); + } + } + + @Test + public void testIgnoredSlicing() throws ParseException { + int distBits = 1; + BucketIdFactory idFactory = new BucketIdFactory(); + ProgressToken progress = new ProgressToken(); + + VisitorIterator iter = VisitorIterator.createFromDocumentSelection( + "id.group != \"yahoo.com\"", idFactory, distBits, progress, 3, 2); + + // Iterator with a single distribution bit ignores slicing. + assertTrue(iter.hasNext()); + assertEquals(ProgressToken.toBucketId(0, 1), iter.getNext().getSuperbucket()); + assertEquals(ProgressToken.toBucketId(1, 1), iter.getNext().getSuperbucket()); + assertFalse(iter.hasNext()); + } + + @Test + public void testValidSlicing() throws ParseException { + int distBits = 4; + long buckets = 1 << distBits; + BucketIdFactory idFactory = new BucketIdFactory(); + for (int slices = 1; slices <= 2 * buckets; slices++) { + long bucketsTotal = 0; + for (int sliceId = 0; sliceId < slices; sliceId++) { + ProgressToken progress = new ProgressToken(); + + // docsel will be unknown --> entire bucket range will be covered + VisitorIterator iter = VisitorIterator.createFromDocumentSelection( + "id.group != \"yahoo.com\"", idFactory, distBits, progress, slices, sliceId); + + String context = "slices: " + slices + ", sliceId: " + sliceId; + assertEquals(context, progress.getDistributionBitCount(), distBits); + assertTrue(context, iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource); + + assertEquals(context, progress.getFinishedBucketCount(), Math.min(buckets, sliceId)); + assertEquals(context, progress.getTotalBucketCount(), buckets); + + // First, get+update half of the buckets, marking them as done + long bucketCount = 0; + + // Do buckets in the first half. + while (iter.hasNext() && progress.getFinishedBucketCount() < buckets / 2) { + VisitorIterator.BucketProgress ids = iter.getNext(); + iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET); + ++bucketCount; + ++bucketsTotal; + } + + if (slices + sliceId < buckets) { // Otherwise, we're already done ... + assertEquals(context, ((buckets / 2) + slices - sliceId - 1) / slices, bucketCount); + // Should be no buckets in limbo at this point + assertFalse(context, progress.hasActive()); + assertFalse(context, progress.hasPending()); + assertFalse(context, iter.isDone()); + assertTrue(context, iter.hasNext()); + assertEquals(context, progress.getFinishedBucketCount(), bucketCount * slices + sliceId); + assertFalse(context, progress.isFinished()); + } + + while (iter.hasNext()) { + VisitorIterator.BucketProgress ids = iter.getNext(); + iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET); + ++bucketCount; + ++bucketsTotal; + } + + assertEquals(context, (buckets + slices - sliceId - 1) / slices, bucketCount); + // Should be no buckets in limbo at this point + assertFalse(context, progress.hasActive()); + assertFalse(context, progress.hasPending()); + assertTrue(context, iter.isDone()); + assertFalse(context, iter.hasNext()); + assertEquals(context, progress.getFinishedBucketCount(), buckets); + assertTrue(context, progress.isFinished()); + } + assertEquals("slices: " + slices, buckets, bucketsTotal); + } + } + + @Test public void testProgressSerializationRange() throws ParseException { int distBits = 4; + int buckets = 1 << distBits; BucketIdFactory idFactory = new BucketIdFactory(); ProgressToken progress = new ProgressToken(); @@ -91,11 +202,11 @@ public class VisitorIteratorTestCase { assertTrue(iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource); assertEquals(progress.getFinishedBucketCount(), 0); - assertEquals(progress.getTotalBucketCount(), 1 << distBits); + assertEquals(progress.getTotalBucketCount(), buckets); // First, get+update half of the buckets, marking them as done long bucketCount = 0; - long bucketStop = 1 << (distBits - 1); + long bucketStop = buckets / 2; while (iter.hasNext() && bucketCount != bucketStop) { VisitorIterator.BucketProgress ids = iter.getNext(); @@ -119,7 +230,7 @@ public class VisitorIteratorTestCase { desired.append('\n'); desired.append(bucketCount); desired.append('\n'); - desired.append(1 << distBits); + desired.append(buckets); desired.append('\n'); assertEquals(desired.toString(), progress.toString()); @@ -132,7 +243,7 @@ public class VisitorIteratorTestCase { ProgressToken progDs = new ProgressToken(progress.toString()); assertEquals(progDs.getDistributionBitCount(), distBits); - assertEquals(progDs.getTotalBucketCount(), 1 << distBits); + assertEquals(progDs.getTotalBucketCount(), buckets); assertEquals(progDs.getFinishedBucketCount(), bucketCount); VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection( @@ -154,21 +265,21 @@ public class VisitorIteratorTestCase { // Now fetch a subset of the remaining buckets without finishing them, // keeping some in the active set and some in pending - int pendingTotal = 1 << (distBits - 3); - int activeTotal = 1 << (distBits - 3); - Vector<VisitorIterator.BucketProgress> buckets = new Vector<VisitorIterator.BucketProgress>(); + int pendingTotal = buckets / 8; + int activeTotal = buckets / 8; + Vector<VisitorIterator.BucketProgress> trackedBuckets = new Vector<VisitorIterator.BucketProgress>(); // Pre-fetch, since otherwise we'd reuse pending buckets for (int i = 0; i < pendingTotal + activeTotal; ++i) { - buckets.add(iter.getNext()); + trackedBuckets.add(iter.getNext()); } for (int i = 0; i < pendingTotal + activeTotal; ++i) { - VisitorIterator.BucketProgress idTemp = buckets.get(i); + VisitorIterator.BucketProgress idTemp = trackedBuckets.get(i); if (i < activeTotal) { // Make them 50% done iter.update(idTemp.getSuperbucket(), - new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 << distBits))); + new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 * buckets))); } // else: leave hanging as active } @@ -186,7 +297,7 @@ public class VisitorIteratorTestCase { desired.append('\n'); desired.append(bucketCount); desired.append('\n'); - desired.append(1 << distBits); + desired.append(buckets); desired.append('\n'); assertEquals(progress.getBuckets().entrySet().size(), pendingTotal + activeTotal); @@ -206,7 +317,7 @@ public class VisitorIteratorTestCase { ProgressToken progDs = new ProgressToken(progress.toString()); assertEquals(progDs.getDistributionBitCount(), distBits); - assertEquals(progDs.getTotalBucketCount(), 1 << distBits); + assertEquals(progDs.getTotalBucketCount(), buckets); assertEquals(progDs.getFinishedBucketCount(), bucketCount); VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection( @@ -225,7 +336,7 @@ public class VisitorIteratorTestCase { // Finish all the active buckets for (int i = activeTotal; i < activeTotal + pendingTotal; ++i) { - iter.update(buckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET); + iter.update(trackedBuckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET); ++bucketCount; } @@ -246,16 +357,16 @@ public class VisitorIteratorTestCase { assertFalse(iter.hasNext()); assertTrue(progress.isFinished()); // Cumulative number of finished buckets must match 2^distbits - assertEquals(bucketCount, 1 << distBits); + assertEquals(bucketCount, buckets); StringBuilder finished = new StringBuilder(); finished.append("VDS bucket progress file (100.0% completed)\n"); finished.append(distBits); finished.append('\n'); - finished.append(1 << distBits); // Cursor + finished.append(buckets); // Cursor finished.append('\n'); - finished.append(1 << distBits); // Finished + finished.append(buckets); // Finished finished.append('\n'); - finished.append(1 << distBits); // Total + finished.append(buckets); // Total finished.append('\n'); assertEquals(progress.toString(), finished.toString()); diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index 4b7d05928c0..e7441acc203 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -376,6 +376,13 @@ public class Flags { "Takes effect at redeploy", ZONE_ID, APPLICATION_ID); + public static final UnboundStringFlag JDK_VERSION = defineStringFlag( + "jdk-version", "11", + List.of("hmusum"), "2021-10-25", "2021-11-25", + "JDK version to use inside containers", + "Takes effect on restart of Docker container", + APPLICATION_ID); + /** WARNING: public for testing: All flags should be defined in {@link Flags}. */ public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners, String createdAt, String expiresAt, String description, diff --git a/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java b/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java index 2c0614805a9..88e2393904e 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java @@ -218,7 +218,6 @@ public class PermanentFlags { "Takes effect immediately", ZONE_ID, APPLICATION_ID); - private PermanentFlags() {} private static UnboundBooleanFlag defineFeatureFlag( diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java index 92cc35fc354..962e6b32947 100644 --- a/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java +++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.util.http.hc5; import org.apache.hc.client5.http.HttpRoute; diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java index 50af29f92aa..91810b50778 100644 --- a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java +++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.util.http.hc5; import com.yahoo.security.tls.MixedMode; diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java index e01d278ff38..52f7ad9b56b 100644 --- a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java +++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.util.http.hc5; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; diff --git a/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java b/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java index 514eae56fe8..adbc445de1a 100644 --- a/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java +++ b/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.util.http.hc4.retry; import org.apache.http.HttpResponse; diff --git a/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java b/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java index 58dc25fdf1a..78c413fba56 100644 --- a/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java +++ b/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java @@ -1,4 +1,4 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.util.http.hc5; import org.apache.hc.client5.http.HttpRoute; diff --git a/parent/pom.xml b/parent/pom.xml index 01c3e335693..18fad225aad 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -849,7 +849,7 @@ <properties> <antlr.version>3.5.2</antlr.version> <antlr4.version>4.5</antlr4.version> - <apache.httpclient.version>4.5.12</apache.httpclient.version> + <apache.httpclient.version>4.5.13</apache.httpclient.version> <apache.httpcore.version>4.4.13</apache.httpcore.version> <apache.httpclient5.version>5.1</apache.httpclient5.version> <asm.version>9.1</asm.version> diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp index c5f0e60a43a..d2e6ec2716e 100644 --- a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp +++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp @@ -1444,14 +1444,17 @@ TEST_F(ConformanceTest, testIterateAlreadyCompleted) spi->destroyIterator(iter.getIteratorId(), context); } -TEST_F(ConformanceTest, testIterateEmptyBucket) +void +ConformanceTest::test_iterate_empty_or_missing_bucket(bool bucket_exists) { document::TestDocMan testDocMan; _factory->clear(); PersistenceProviderUP spi(getSpi(*_factory, testDocMan)); Context context(Priority(0), Trace::TraceLevel(0)); Bucket b(makeSpiBucket(BucketId(8, 0x1))); - spi->createBucket(b, context); + if (bucket_exists) { + spi->createBucket(b, context); + } Selection sel(createSelection("")); CreateIteratorResult iter(createIterator(*spi, b, sel)); @@ -1464,6 +1467,16 @@ TEST_F(ConformanceTest, testIterateEmptyBucket) spi->destroyIterator(iter.getIteratorId(), context); } +TEST_F(ConformanceTest, test_iterate_empty_bucket) +{ + test_iterate_empty_or_missing_bucket(true); +} + +TEST_F(ConformanceTest, test_iterate_missing_bucket) +{ + test_iterate_empty_or_missing_bucket(false); +} + TEST_F(ConformanceTest, testDeleteBucket) { document::TestDocMan testDocMan; @@ -2269,6 +2282,36 @@ TEST_F(ConformanceTest, resource_usage) EXPECT_EQ(0.4, resource_usage_listener.get_usage().get_memory_usage()); } +void +ConformanceTest::test_empty_bucket_info(bool bucket_exists) +{ + document::TestDocMan testDocMan; + _factory->clear(); + PersistenceProviderUP spi(getSpi(*_factory, testDocMan)); + Context context(Priority(0), Trace::TraceLevel(0)); + Bucket bucket(makeSpiBucket(BucketId(8, 0x01))); + if (bucket_exists) { + spi->createBucket(bucket, context); + } + auto info_result = spi->getBucketInfo(bucket); + EXPECT_TRUE(!info_result.hasError()); + EXPECT_EQ(0u, info_result.getBucketInfo().getChecksum().getValue()); + EXPECT_EQ(0u, info_result.getBucketInfo().getEntryCount()); + EXPECT_EQ(0u, info_result.getBucketInfo().getDocumentCount()); + EXPECT_TRUE(info_result.getBucketInfo().isReady()); + EXPECT_FALSE(info_result.getBucketInfo().isActive()); +} + +TEST_F(ConformanceTest, test_empty_bucket_gives_empty_bucket_info) +{ + test_empty_bucket_info(true); +} + +TEST_F(ConformanceTest, test_missing_bucket_gives_empty_bucket_info) +{ + test_empty_bucket_info(false); +} + TEST_F(ConformanceTest, detectAndTestOptionalBehavior) { // Report if implementation supports setting bucket size info. diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.h b/persistence/src/vespa/persistence/conformancetest/conformancetest.h index a55461bca80..2ee2526c2dd 100644 --- a/persistence/src/vespa/persistence/conformancetest/conformancetest.h +++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.h @@ -151,6 +151,10 @@ protected: const Bucket& target, document::TestDocMan& testDocMan); + void test_iterate_empty_or_missing_bucket(bool bucket_exists); + + void test_empty_bucket_info(bool bucket_exists); + ConformanceTest(); ConformanceTest(const std::string &docType); }; diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 6e4f38fe564..71c12ab0b2f 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -34,12 +34,12 @@ BucketContent::BucketContent() _inUse(false), _outdatedInfo(true), _active(false) -{ } +{} + BucketContent::~BucketContent() = default; uint32_t -BucketContent::computeEntryChecksum(const BucketEntry& e) const -{ +BucketContent::computeEntryChecksum(const BucketEntry &e) const { vespalib::crc_32_type checksummer; uint64_t ts(e.entry->getTimestamp()); @@ -49,8 +49,7 @@ BucketContent::computeEntryChecksum(const BucketEntry& e) const } BucketChecksum -BucketContent::updateRollingChecksum(uint32_t entryChecksum) -{ +BucketContent::updateRollingChecksum(uint32_t entryChecksum) { uint32_t checksum = _info.getChecksum(); checksum ^= entryChecksum; if (checksum == 0) { @@ -59,9 +58,8 @@ BucketContent::updateRollingChecksum(uint32_t entryChecksum) return BucketChecksum(checksum); } -const BucketInfo& -BucketContent::getBucketInfo() const -{ +const BucketInfo & +BucketContent::getBucketInfo() const { if (!_outdatedInfo) { return _info; } @@ -73,9 +71,9 @@ BucketContent::getBucketInfo() const uint32_t totalSize = 0; uint32_t checksum = 0; - for (const BucketEntry & bucketEntry : _entries) { - const DocEntry& entry(*bucketEntry.entry); - const GlobalId& gid(bucketEntry.gid); + for (const BucketEntry &bucketEntry: _entries) { + const DocEntry &entry(*bucketEntry.entry); + const GlobalId &gid(bucketEntry.gid); GidMapType::const_iterator gidIt(_gidMap.find(gid)); assert(gidIt != _gidMap.end()); @@ -114,17 +112,19 @@ BucketContent::getBucketInfo() const namespace { struct TimestampLess { - bool operator()(const BucketEntry &bucketEntry, Timestamp t) - { return bucketEntry.entry->getTimestamp() < t; } - bool operator()(Timestamp t, const BucketEntry &bucketEntry) - { return t < bucketEntry.entry->getTimestamp(); } + bool operator()(const BucketEntry &bucketEntry, Timestamp t) { + return bucketEntry.entry->getTimestamp() < t; + } + + bool operator()(Timestamp t, const BucketEntry &bucketEntry) { + return t < bucketEntry.entry->getTimestamp(); + } }; } // namespace bool -BucketContent::hasTimestamp(Timestamp t) const -{ +BucketContent::hasTimestamp(Timestamp t) const { if (!_entries.empty() && _entries.back().entry->getTimestamp() < t) { return false; } @@ -148,10 +148,9 @@ BucketContent::hasTimestamp(Timestamp t) const */ void -BucketContent::insert(DocEntry::SP e) -{ +BucketContent::insert(DocEntry::SP e) { LOG(spam, "insert(%s)", e->toString().c_str()); - const DocumentId* docId(e->getDocumentId()); + const DocumentId *docId(e->getDocumentId()); assert(docId != 0); GlobalId gid(docId->getGlobalId()); GidMapType::iterator gidIt(_gidMap.find(gid)); @@ -160,22 +159,15 @@ BucketContent::insert(DocEntry::SP e) _entries.back().entry->getTimestamp() < e->getTimestamp()) { _entries.push_back(BucketEntry(e, gid)); } else { - std::vector<BucketEntry>::iterator it = - lower_bound(_entries.begin(), - _entries.end(), - e->getTimestamp(), - TimestampLess()); + auto it = lower_bound(_entries.begin(), _entries.end(), e->getTimestamp(), TimestampLess()); if (it != _entries.end()) { if (it->entry->getTimestamp() == e->getTimestamp()) { if (*it->entry.get() == *e) { - LOG(debug, "Ignoring duplicate put entry %s", - e->toString().c_str()); + LOG(debug, "Ignoring duplicate put entry %s", e->toString().c_str()); return; } else { - LOG(error, "Entry %s was already present." - "Was trying to insert %s.", - it->entry->toString().c_str(), - e->toString().c_str()); + LOG(error, "Entry %s was already present. Was trying to insert %s.", + it->entry->toString().c_str(), e->toString().c_str()); LOG_ABORT("should not reach here"); } } @@ -190,11 +182,8 @@ BucketContent::insert(DocEntry::SP e) // newer versions of a document etc. by XORing away old checksum. gidIt->second = e; } else { - LOG(spam, - "Newly inserted entry %s was older than existing entry %s; " - "not updating GID mapping", - e->toString().c_str(), - gidIt->second->toString().c_str()); + LOG(spam, "Newly inserted entry %s was older than existing entry %s; not updating GID mapping", + e->toString().c_str(), gidIt->second->toString().c_str()); } _outdatedInfo = true; } else { @@ -226,10 +215,8 @@ BucketContent::insert(DocEntry::SP e) _info.getActive()); } - LOG(spam, - "After cheap bucketinfo update, state is %s (inserted %s)", - _info.toString().c_str(), - e->toString().c_str()); + LOG(spam, "After cheap bucketinfo update, state is %s (inserted %s)", + _info.toString().c_str(), e->toString().c_str()); } } @@ -237,9 +224,8 @@ BucketContent::insert(DocEntry::SP e) } DocEntry::SP -BucketContent::getEntry(const DocumentId& did) const -{ - GidMapType::const_iterator it(_gidMap.find(did.getGlobalId())); +BucketContent::getEntry(const DocumentId &did) const { + auto it(_gidMap.find(did.getGlobalId())); if (it != _gidMap.end()) { return it->second; } @@ -247,10 +233,8 @@ BucketContent::getEntry(const DocumentId& did) const } DocEntry::SP -BucketContent::getEntry(Timestamp t) const -{ - std::vector<BucketEntry>::const_iterator iter = - lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); +BucketContent::getEntry(Timestamp t) const { + auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); if (iter == _entries.end() || iter->entry->getTimestamp() != t) { return DocEntry::SP(); @@ -260,15 +244,12 @@ BucketContent::getEntry(Timestamp t) const } void -BucketContent::eraseEntry(Timestamp t) -{ - std::vector<BucketEntry>::iterator iter = - lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); +BucketContent::eraseEntry(Timestamp t) { + auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); if (iter != _entries.end() && iter->entry->getTimestamp() == t) { assert(iter->entry->getDocumentId() != 0); - GidMapType::iterator gidIt( - _gidMap.find(iter->entry->getDocumentId()->getGlobalId())); + GidMapType::iterator gidIt = _gidMap.find(iter->entry->getDocumentId()->getGlobalId()); assert(gidIt != _gidMap.end()); _entries.erase(iter); if (gidIt->second->getTimestamp() == t) { @@ -281,7 +262,7 @@ BucketContent::eraseEntry(Timestamp t) } } -DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo>& repo) +DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo> &repo) : _initialized(false), _repo(repo), _content(), @@ -294,13 +275,12 @@ DummyPersistence::DummyPersistence(const std::shared_ptr<const document::Documen DummyPersistence::~DummyPersistence() = default; document::select::Node::UP -DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf) -{ +DummyPersistence::parseDocumentSelection(const string &documentSelection, bool allowLeaf) { document::select::Node::UP ret; try { document::select::Parser parser(*_repo, document::BucketIdFactory()); ret = parser.parse(documentSelection); - } catch (document::select::ParsingFailedException& e) { + } catch (document::select::ParsingFailedException &e) { return document::select::Node::UP(); } if (ret->isLeafNode() && !allowLeaf) { @@ -310,18 +290,17 @@ DummyPersistence::parseDocumentSelection(const string& documentSelection, bool a } Result -DummyPersistence::initialize() -{ +DummyPersistence::initialize() { assert(!_initialized); _initialized = true; return Result(); } #define DUMMYPERSISTENCE_VERIFY_INITIALIZED \ - if (!_initialized) throw vespalib::IllegalStateException( \ - "initialize() must always be called first in order to " \ - "trigger lazy initialization.", VESPA_STRLOC) - + if (!_initialized) { \ + LOG(error, "initialize() must always be called first in order to trigger lazy initialization."); \ + abort(); \ + } BucketIdListResult DummyPersistence::listBuckets(BucketSpace bucketSpace) const @@ -414,7 +393,8 @@ DummyPersistence::getBucketInfo(const Bucket& b) const if (!bc.get()) { LOG(debug, "getBucketInfo(%s) : (bucket not found)", b.toString().c_str()); - return BucketInfoResult(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found"); + BucketInfo info(BucketChecksum(0), 0, 0, 0, 0); + return BucketInfoResult(info); } BucketInfo info((*bc)->getBucketInfo()); @@ -559,9 +539,6 @@ DummyPersistence::createIterator(const Bucket &b, FieldSetSP fs, const Selection } } BucketContentGuard::UP bc(acquireBucketWithLock(b, LockMode::Shared)); - if (!bc.get()) { - return CreateIteratorResult(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found"); - } Iterator* it; IteratorId id; @@ -577,6 +554,10 @@ DummyPersistence::createIterator(const Bucket &b, FieldSetSP fs, const Selection } // Memory pointed to by 'it' should now be valid from here on out + if (!bc.get()) { + // Bucket not found. + return CreateIteratorResult(id); + } it->_fieldSet = std::move(fs); const BucketContent::GidMapType& gidMap((*bc)->_gidMap); @@ -648,7 +629,7 @@ DummyPersistence::iterate(IteratorId id, uint64_t maxByteSize, Context& ctx) con BucketContentGuard::UP bc(acquireBucketWithLock(it->_bucket, LockMode::Shared)); if (!bc.get()) { ctx.trace(9, "finished iterate(); bucket not found"); - return IterateResult(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found"); + return IterateResult(std::vector<DocEntry::UP>(), true); } LOG(debug, "Iterator %" PRIu64 " acquired bucket lock", uint64_t(id)); @@ -714,8 +695,8 @@ DummyPersistence::destroyIterator(IteratorId id, Context&) return Result(); } -Result -DummyPersistence::createBucket(const Bucket& b, Context&) +void +DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "createBucket(%s)", b.toString().c_str()); @@ -727,11 +708,11 @@ DummyPersistence::createBucket(const Bucket& b, Context&) assert(!_content[b]->_inUse); LOG(debug, "%s already existed", b.toString().c_str()); } - return Result(); + onComplete->onComplete(std::make_unique<Result>()); } void -DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) +DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "deleteBucket(%s)", b.toString().c_str()); diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 99d6ba717b7..a25bf6b8a8e 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -168,8 +168,8 @@ public: IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override; Result destroyIterator(IteratorId, Context&) override; - Result createBucket(const Bucket&, Context&) override; - void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; + void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index e287bdc5252..3b59f20ca96 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -15,7 +15,6 @@ class AbstractPersistenceProvider : public PersistenceProvider { public: Result initialize() override { return Result(); }; - Result createBucket(const Bucket&, Context&) override { return Result(); } Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); } void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override; Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); } diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h index 02c626ea23e..7b04498205d 100644 --- a/persistence/src/vespa/persistence/spi/catchresult.h +++ b/persistence/src/vespa/persistence/spi/catchresult.h @@ -19,4 +19,9 @@ private: const ResultHandler *_resulthandler; }; +class NoopOperationComplete : public OperationComplete { + void onComplete(std::unique_ptr<spi::Result>) noexcept override { } + void addResultHandler(const spi::ResultHandler *) override { } +}; + } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 3ea476c33fc..31db08a6f4f 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -17,6 +17,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat } Result +PersistenceProvider::createBucket(const Bucket& bucket, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + createBucketAsync(bucket, context, std::move(catcher)); + return *future.get(); +} + +Result PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) { auto catcher = std::make_unique<CatchResult>(); auto future = catcher->future_result(); diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 83eb042d855..269175f7d26 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -58,6 +58,7 @@ struct PersistenceProvider virtual ~PersistenceProvider(); // TODO Move to utility class for use in tests only + Result createBucket(const Bucket&, Context&); Result deleteBucket(const Bucket&, Context&); Result put(const Bucket&, Timestamp, DocumentSP, Context&); Result setActiveState(const Bucket&, BucketInfo::ActiveState); @@ -336,14 +337,14 @@ struct PersistenceProvider * Tells the provider that the given bucket has been created in the * service layer. There is no requirement to do anything here. */ - virtual Result createBucket(const Bucket&, Context&) = 0; + virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0; /** * Deletes the given bucket and all entries contained in that bucket. * After this operation has succeeded, a restart of the provider should * not yield the bucket in getBucketList(). */ - virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0; + virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0; /** * This function is called continuously by the service layer. It allows the diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 114292d055d..2e1fc74037c 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -548,24 +548,28 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&) } -Result -PersistenceEngine::createBucket(const Bucket &b, Context &) +void +PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete) noexcept { ReadGuard rguard(_rwMutex); LOG(spam, "createBucket(%s)", b.toString().c_str()); HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace()); - TransportLatch latch(snap.size()); - for (; snap.handlers().valid(); snap.handlers().next()) { + + auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleCreateBucket(feedtoken::make(latch), b); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleCreateBucket(feedtoken::make(transportContext), b); + } else { + handler->handleCreateBucket(feedtoken::make(std::move(transportContext)), b); + } } - latch.await(); - return latch.getResult(); } void -PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) +PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept { ReadGuard rguard(_rwMutex); LOG(spam, "deleteBucket(%s)", b.toString().c_str()); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 94331ac2cd6..fe564d01459 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -114,8 +114,8 @@ public: IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override; Result destroyIterator(IteratorId, Context&) override; - Result createBucket(const Bucket &bucketId, Context &) override ; - void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; + void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) noexcept override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp index d51485df38d..701e8a80d3a 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -43,6 +43,7 @@ public: throw std::runtime_error(_fail); } } + void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override { } void set_fail(vespalib::string fail) { _fail = std::move(fail); } }; @@ -85,8 +86,8 @@ public: ~ApplyBucketDiffStateTestBase(); - std::unique_ptr<ApplyBucketDiffState> make_state() { - return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); + std::shared_ptr<ApplyBucketDiffState> make_state() { + return ApplyBucketDiffState::create(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); } }; diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index b3bd1c6a253..02b43a32df3 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -24,7 +24,7 @@ #define CHECK_ERROR_ASYNC(className, failType, onError) \ { \ - Guard guard(_lock); \ + Guard guard(_lock); \ if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \ onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \ return; \ @@ -80,12 +80,12 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const return _spi.listBuckets(bucketSpace); } -spi::Result -PersistenceProviderWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) +void +PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept { LOG_SPI("createBucket(" << bucket << ")"); - CHECK_ERROR(spi::Result, FAIL_CREATE_BUCKET); - return _spi.createBucket(bucket, context); + CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete); + return _spi.createBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketInfoResult @@ -177,7 +177,7 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId, void PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, - spi::OperationComplete::UP operationComplete) + spi::OperationComplete::UP operationComplete) noexcept { LOG_SPI("deleteBucket(" << bucket << ")"); CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete); diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index c6628814dba..cfc7002a643 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -96,7 +96,7 @@ public: void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state, spi::OperationComplete::UP up) override; - spi::Result createBucket(const spi::Bucket&, spi::Context&) override; + void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override; @@ -111,7 +111,7 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 07d2b24d536..a3f0182ba30 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -62,13 +62,13 @@ public: return PersistenceProviderWrapper::getBucketInfo(bucket); } - spi::Result createBucket(const spi::Bucket& bucket, spi::Context& ctx) override { + void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override { ++_createBucketInvocations; - return PersistenceProviderWrapper::createBucket(bucket, ctx); + PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete)); } void - deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override { + deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override { ++_deleteBucketInvocations; PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete)); } diff --git a/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp b/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp index ef71f0ae5f0..588b390cd5f 100644 --- a/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp +++ b/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp @@ -84,10 +84,10 @@ TEST_F(SanityCheckedDeleteTest, differing_document_sizes_not_considered_out_of_s c.top.sendDown(delete_cmd); c.top.waitForMessages(1, MSG_WAIT_TIME); - // Bucket should now well and truly be gone. Will trigger a getBucketInfo error response. - spi::BucketInfoResult info_post_delete( - _node->getPersistenceProvider().getBucketInfo(spiBucket)); - ASSERT_TRUE(info_post_delete.hasError()) << info_post_delete.getErrorMessage(); + auto reply = c.top.getAndRemoveMessage(api::MessageType::DELETEBUCKET_REPLY); + auto delete_reply = std::dynamic_pointer_cast<api::DeleteBucketReply>(reply); + ASSERT_TRUE(delete_reply); + ASSERT_TRUE(delete_reply->getResult().success()); } } // namespace storage diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 017b8ce2b92..ed50730d79f 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -167,11 +167,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils, MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam()); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock(), 4190208, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam()); } std::shared_ptr<api::StorageMessage> get_queued_reply() { @@ -872,7 +872,6 @@ TEST_P(MergeHandlerTest, merge_bucket_spi_failures) { setUpChain(MIDDLE); ExpectedExceptionSpec exceptions[] = { - { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" }, { PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" }, { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" }, { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, @@ -903,7 +902,6 @@ TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) { setUpChain(MIDDLE); ExpectedExceptionSpec exceptions[] = { - { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" }, { PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" }, { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" }, { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, @@ -1440,6 +1438,9 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled fourth ApplyBucketDiffReply"); } + if (GetParam()) { + handler.drain_async_writes(); + } ASSERT_EQ(6u, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType()); LOG(debug, "got mergebucket reply"); diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index ad153c41aef..556760b347e 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -12,6 +12,14 @@ using vespalib::RetainGuard; namespace storage { +class ApplyBucketDiffState::Deleter { +public: + void operator()(ApplyBucketDiffState *raw_state) const noexcept { + std::unique_ptr<ApplyBucketDiffState> state(raw_state); + raw_state->_merge_bucket_info_syncer.schedule_delayed_delete(std::move(state)); + } +}; + ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) : _merge_bucket_info_syncer(merge_bucket_info_syncer), _bucket(bucket), @@ -101,4 +109,11 @@ ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracke _delayed_reply = std::move(delayed_reply); } +std::shared_ptr<ApplyBucketDiffState> +ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) +{ + std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, bucket, std::move(retain_guard))); + return std::shared_ptr<ApplyBucketDiffState>(state.release(), Deleter()); +} + } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index 39f60156e66..7157c69191b 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -24,6 +24,7 @@ class MergeBucketInfoSyncer; * for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply. */ class ApplyBucketDiffState { + class Deleter; const MergeBucketInfoSyncer& _merge_bucket_info_syncer; spi::Bucket _bucket; vespalib::string _fail_message; @@ -35,8 +36,9 @@ class ApplyBucketDiffState { MessageSender* _sender; vespalib::RetainGuard _retain_guard; -public: ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); +public: + static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); ~ApplyBucketDiffState(); void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op); void wait(); @@ -46,6 +48,7 @@ public: std::future<vespalib::string> get_future(); void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply); void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply); + const spi::Bucket& get_bucket() const noexcept { return _bucket; } }; } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 47b5e4f5f27..bc6e67578c0 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -5,6 +5,7 @@ #include "testandsethelper.h" #include "bucketownershipnotifier.h" #include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/persistence/spi/catchresult.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> @@ -154,6 +155,31 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons } MessageTracker::UP +AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.createBuckets); + LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); + } + spi::Bucket bucket(cmd.getBucket()); + auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP ignored) mutable { + // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric + (void) ignored; + tracker->sendReply(); + }); + + if (cmd.getActive()) { + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>()); + _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task))); + } else { + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task))); + } + + return tracker; +} + +MessageTracker::UP AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.deleteBuckets); diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 4f5c242570c..db5a77bfb59 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -30,6 +30,7 @@ public: MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; diff --git a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h index e05991ad9e3..b3386c591e6 100644 --- a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h +++ b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h @@ -6,6 +6,8 @@ namespace storage::spi { class Bucket; } namespace storage { +class ApplyBucketDiffState; + /* * Interface class for syncing bucket info during merge. */ @@ -13,6 +15,7 @@ class MergeBucketInfoSyncer { public: virtual ~MergeBucketInfoSyncer() = default; virtual void sync_bucket_info(const spi::Bucket& bucket) const = 0; + virtual void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const = 0; }; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 77e7762ec9a..c9ba43458b1 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -6,13 +6,13 @@ #include "apply_bucket_diff_state.h" #include <vespa/storage/persistence/filestorage/mergestatus.h> #include <vespa/persistence/spi/persistenceprovider.h> -#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/persistence/spi/catchresult.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <algorithm> -#include <future> #include <vespa/log/log.h> LOG_SETUP(".persistence.mergehandler"); @@ -24,6 +24,7 @@ namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, + vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize, uint32_t commonMergeChainOptimalizationMinimumSize, bool async_apply_bucket_diff) @@ -34,7 +35,8 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), - _async_apply_bucket_diff(async_apply_bucket_diff) + _async_apply_bucket_diff(async_apply_bucket_diff), + _executor(executor) { } @@ -51,20 +53,6 @@ constexpr int getDeleteFlag() { return 2; } -/** - * Throws std::runtime_error if result has an error. - */ -void -checkResult(const spi::Result& result, const spi::Bucket& bucket, const char* op) -{ - if (result.hasError()) { - vespalib::asciistream ss; - ss << "Failed " << op << " in " << bucket << ": " << result.toString(); - throw std::runtime_error(ss.str()); - } -} - - class IteratorGuard { spi::PersistenceProvider& _spi; spi::IteratorId _iteratorId; @@ -663,25 +651,28 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const } namespace { - void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask, - uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd) - { - for (const auto& entry : status.diff) { - uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask); - if ((entry_has_mask == 0u) || - (constrictHasMask && (entry_has_mask != hasMask))) { - continue; - } - cmd.getDiff().emplace_back(entry); - if (constrictHasMask) { - cmd.getDiff().back()._entry._hasMask = newHasMask; - } else { - cmd.getDiff().back()._entry._hasMask = entry_has_mask; - } + +void +findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask, + uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd) +{ + for (const auto& entry : status.diff) { + uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask); + if ((entry_has_mask == 0u) || + (constrictHasMask && (entry_has_mask != hasMask))) { + continue; + } + cmd.getDiff().emplace_back(entry); + if (constrictHasMask) { + cmd.getDiff().back()._entry._hasMask = newHasMask; + } else { + cmd.getDiff().back()._entry._hasMask = entry_has_mask; } } } +} + api::StorageReply::SP MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender, spi::Context& context, @@ -898,7 +889,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker->fail(api::ReturnCode::BUSY, err); return tracker; } - checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>()); + MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel()); @@ -938,141 +930,136 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP namespace { - uint8_t findOwnIndex( - const std::vector<api::MergeBucketCommand::Node>& nodeList, - uint16_t us) - { - for (uint32_t i=0, n=nodeList.size(); i<n; ++i) { - if (nodeList[i].index == us) return i; - } - throw vespalib::IllegalStateException( - "Got GetBucketDiff cmd on node not in nodelist in command", - VESPA_STRLOC); +uint8_t findOwnIndex( + const std::vector<api::MergeBucketCommand::Node>& nodeList, + uint16_t us) +{ + for (uint32_t i=0, n=nodeList.size(); i<n; ++i) { + if (nodeList[i].index == us) return i; } + throw vespalib::IllegalStateException( + "Got GetBucketDiff cmd on node not in nodelist in command", + VESPA_STRLOC); +} - struct DiffEntryTimestampOrder - : public std::binary_function<api::GetBucketDiffCommand::Entry, - api::GetBucketDiffCommand::Entry, bool> - { - bool operator()(const api::GetBucketDiffCommand::Entry& x, - const api::GetBucketDiffCommand::Entry& y) const { - return (x._timestamp < y._timestamp); - } - }; - - /** - * Merges list A and list B together and puts the result in result. - * Result is swapped in as last step to keep function exception safe. Thus - * result can be listA or listB if wanted. - * - * listA and listB are assumed to be in the order found in the slotfile, or - * in the order given by a previous call to this function. (In both cases - * this will be sorted by timestamp) - * - * @return false if any suspect entries was found. - */ - bool mergeLists( - const std::vector<api::GetBucketDiffCommand::Entry>& listA, - const std::vector<api::GetBucketDiffCommand::Entry>& listB, - std::vector<api::GetBucketDiffCommand::Entry>& finalResult) - { - bool suspect = false; - std::vector<api::GetBucketDiffCommand::Entry> result; - uint32_t i = 0, j = 0; - while (i < listA.size() && j < listB.size()) { - const api::GetBucketDiffCommand::Entry& a(listA[i]); - const api::GetBucketDiffCommand::Entry& b(listB[j]); - if (a._timestamp < b._timestamp) { - result.push_back(a); - ++i; - } else if (a._timestamp > b._timestamp) { - result.push_back(b); - ++j; - } else { - // If we find equal timestamped entries that are not the - // same.. Flag an error. But there is nothing we can do - // about it. Note it as if it is the same entry so we - // dont try to merge them. - if (!(a == b)) { - if (a._gid == b._gid && a._flags == b._flags) { - if ((a._flags & getDeleteFlag()) != 0 && - (b._flags & getDeleteFlag()) != 0) - { - // Unfortunately this can happen, for instance - // if a remove comes to a bucket out of sync - // and reuses different headers in the two - // versions. - LOG(debug, "Found entries with equal timestamps of " - "the same gid who both are remove " - "entries: %s <-> %s.", - a.toString(true).c_str(), - b.toString(true).c_str()); - } else { - LOG(error, "Found entries with equal timestamps of " - "the same gid. This is likely same " - "document where size of document varies:" - " %s <-> %s.", - a.toString(true).c_str(), - b.toString(true).c_str()); - } - result.push_back(a); - result.back()._hasMask |= b._hasMask; - suspect = true; - } else if ((a._flags & getDeleteFlag()) - != (b._flags & getDeleteFlag())) +/** + * Merges list A and list B together and puts the result in result. + * Result is swapped in as last step to keep function exception safe. Thus + * result can be listA or listB if wanted. + * + * listA and listB are assumed to be in the order found in the slotfile, or + * in the order given by a previous call to this function. (In both cases + * this will be sorted by timestamp) + * + * @return false if any suspect entries was found. + */ +bool mergeLists( + const std::vector<api::GetBucketDiffCommand::Entry>& listA, + const std::vector<api::GetBucketDiffCommand::Entry>& listB, + std::vector<api::GetBucketDiffCommand::Entry>& finalResult) +{ + bool suspect = false; + std::vector<api::GetBucketDiffCommand::Entry> result; + uint32_t i = 0, j = 0; + while (i < listA.size() && j < listB.size()) { + const api::GetBucketDiffCommand::Entry& a(listA[i]); + const api::GetBucketDiffCommand::Entry& b(listB[j]); + if (a._timestamp < b._timestamp) { + result.push_back(a); + ++i; + } else if (a._timestamp > b._timestamp) { + result.push_back(b); + ++j; + } else { + // If we find equal timestamped entries that are not the + // same.. Flag an error. But there is nothing we can do + // about it. Note it as if it is the same entry so we + // dont try to merge them. + if (!(a == b)) { + if (a._gid == b._gid && a._flags == b._flags) { + if ((a._flags & getDeleteFlag()) != 0 && + (b._flags & getDeleteFlag()) != 0) { - // If we find one remove and one put entry on the - // same timestamp we are going to keep the remove - // entry to make the copies consistent. - const api::GetBucketDiffCommand::Entry& deletedEntry( - (a._flags & getDeleteFlag()) != 0 ? a : b); - result.push_back(deletedEntry); - LOG(debug, - "Found put and remove on same timestamp. Keeping" - "remove as it is likely caused by remove with " - "copies unavailable at the time: %s, %s.", - a.toString().c_str(), b.toString().c_str()); + // Unfortunately this can happen, for instance + // if a remove comes to a bucket out of sync + // and reuses different headers in the two + // versions. + LOG(debug, "Found entries with equal timestamps of " + "the same gid who both are remove " + "entries: %s <-> %s.", + a.toString(true).c_str(), + b.toString(true).c_str()); } else { - LOG(error, "Found entries with equal timestamps that " - "weren't the same entry: %s, %s.", - a.toString().c_str(), b.toString().c_str()); - result.push_back(a); - result.back()._hasMask |= b._hasMask; - suspect = true; + LOG(error, "Found entries with equal timestamps of " + "the same gid. This is likely same " + "document where size of document varies:" + " %s <-> %s.", + a.toString(true).c_str(), + b.toString(true).c_str()); } + result.push_back(a); + result.back()._hasMask |= b._hasMask; + suspect = true; + } else if ((a._flags & getDeleteFlag()) + != (b._flags & getDeleteFlag())) + { + // If we find one remove and one put entry on the + // same timestamp we are going to keep the remove + // entry to make the copies consistent. + const api::GetBucketDiffCommand::Entry& deletedEntry( + (a._flags & getDeleteFlag()) != 0 ? a : b); + result.push_back(deletedEntry); + LOG(debug, + "Found put and remove on same timestamp. Keeping" + "remove as it is likely caused by remove with " + "copies unavailable at the time: %s, %s.", + a.toString().c_str(), b.toString().c_str()); } else { + LOG(error, "Found entries with equal timestamps that " + "weren't the same entry: %s, %s.", + a.toString().c_str(), b.toString().c_str()); result.push_back(a); result.back()._hasMask |= b._hasMask; + suspect = true; } - ++i; - ++j; + } else { + result.push_back(a); + result.back()._hasMask |= b._hasMask; } + ++i; + ++j; } - if (i < listA.size()) { - assert(j >= listB.size()); - for (uint32_t n = listA.size(); i<n; ++i) { - result.push_back(listA[i]); - } - } else if (j < listB.size()) { - assert(i >= listA.size()); - for (uint32_t n = listB.size(); j<n; ++j) { - result.push_back(listB[j]); - } + } + if (i < listA.size()) { + assert(j >= listB.size()); + for (uint32_t n = listA.size(); i<n; ++i) { + result.push_back(listA[i]); + } + } else if (j < listB.size()) { + assert(i >= listA.size()); + for (uint32_t n = listB.size(); j<n; ++j) { + result.push_back(listB[j]); } - result.swap(finalResult); - return !suspect; } + result.swap(finalResult); + return !suspect; +} } MessageTracker::UP -MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const -{ +MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.getBucketDiff); spi::Bucket bucket(cmd.getBucket()); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); - checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>()); + return handleGetBucketDiffStage2(cmd, std::move(tracker)); +} +MessageTracker::UP +MergeHandler::handleGetBucketDiffStage2(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const +{ + spi::Bucket bucket(cmd.getBucket()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { tracker->fail(api::ReturnCode::BUSY, "A merge is already running on this bucket."); return tracker; @@ -1249,7 +1236,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_clock); - async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count)); + async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { check_apply_diff_sync(std::move(async_results)); @@ -1357,7 +1344,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); - async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count)); + async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, diff, index, s->context, async_results); if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { check_apply_diff_sync(std::move(async_results)); @@ -1452,4 +1439,11 @@ MergeHandler::configure(bool async_apply_bucket_diff) noexcept _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release); } +void +MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const +{ + auto bucket_id = state->get_bucket().getBucketId(); + _executor.execute(bucket_id.getId(), [state = std::move(state)]() { }); +} + } // storage diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 17cfb847d2c..4daec4c0689 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -22,6 +22,8 @@ #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/util/monitored_refcount.h> +namespace vespalib { class ISequencedTaskExecutor; } + namespace storage { namespace spi { @@ -45,6 +47,7 @@ public: MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, + vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize = 4190208, uint32_t commonMergeChainOptimalizationMinimumSize = 64, bool async_apply_bucket_diff = false); @@ -67,6 +70,7 @@ public: spi::Context& context, std::shared_ptr<ApplyBucketDiffState> async_results) const; void sync_bucket_info(const spi::Bucket& bucket) const override; + void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override; MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const; MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const; @@ -85,7 +89,9 @@ private: const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; std::atomic<bool> _async_apply_bucket_diff; + vespalib::ISequencedTaskExecutor& _executor; + MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; /** Returns a reply if merge is complete */ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index aa1a9c136fd..d03c9a6d111 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -17,7 +17,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen : _clock(component.getClock()), _env(component, filestorHandler, metrics, provider), _processAllHandler(_env, provider), - _mergeHandler(_env, provider, component.cluster_context(), _clock, + _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize, cfg.commonMergeChainOptimalizationMinimumSize, cfg.asyncApplyBucketDiff), @@ -44,7 +44,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::REVERT_ID: return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker)); case api::MessageType::CREATEBUCKET_ID: - return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index ce424f0ce83..9ccd901744b 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -99,17 +99,18 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context& return checkResult(_impl.destroyIterator(iteratorId, context)); } -spi::Result -ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) +void +ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept { - return checkResult(_impl.createBucket(bucket, context)); + onComplete->addResultHandler(this); + _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); } void -ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) +ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept { onComplete->addResultHandler(this); - _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); + _impl.createBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketIdListResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index c9d2411e372..14d20cf8a52 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -49,7 +49,6 @@ public: spi::Context &context) override; spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - spi::Result createBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; @@ -63,7 +62,8 @@ public: void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; - void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; + void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override; private: template <typename ResultType> diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index b4fe207e2e5..9a7a451b906 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -88,22 +88,6 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP t } MessageTracker::UP -SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.createBuckets); - LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); - } - spi::Bucket spiBucket(cmd.getBucket()); - _spi.createBucket(spiBucket, tracker->context()); - if (cmd.getActive()) { - _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE); - } - return tracker; -} - -MessageTracker::UP SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.visit); diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h index 2cfbc7016c0..009fd6dff52 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.h +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h @@ -22,7 +22,6 @@ public: SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&); MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const; diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index e5667c7b392..63fc7854a52 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -162,6 +162,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String TIMEOUT = "timeout"; private static final String TRACELEVEL = "tracelevel"; private static final String STREAM = "stream"; + private static final String SLICES = "slices"; + private static final String SLICE_ID = "sliceId"; private final Clock clock; private final Duration handlerTimeout; @@ -985,12 +987,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (cluster.isEmpty() && path.documentType().isEmpty()) throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); + Optional<Integer> slices = getProperty(request, SLICES, integerParser); + Optional<Integer> sliceId = getProperty(request, SLICE_ID, integerParser); + VisitorParameters parameters = parseCommonParameters(request, path, cluster); parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); parameters.setMaxTotalHits(wantedDocumentCount); parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency)); parameters.visitInconsistentBuckets(true); parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())); + if (slices.isPresent() && sliceId.isPresent()) + parameters.slice(slices.get(), sliceId.get()); + else if (slices.isPresent() != sliceId.isPresent()) + throw new IllegalArgumentException("None or both of '" + SLICES + "' and '" + SLICE_ID + "' must be set"); return parameters; } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index b23533a720e..1629777f837 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -258,6 +258,8 @@ public class DocumentV1ApiTest { assertEquals("[id]", parameters.getFieldSet()); assertEquals("(all the things)", parameters.getDocumentSelection()); assertEquals(6000, parameters.getSessionTimeoutMs()); + assertEquals(4, parameters.getSlices()); + assertEquals(1, parameters.getSliceId()); // Put some documents in the response parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0)); parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1)); @@ -269,7 +271,7 @@ public class DocumentV1ApiTest { parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "timeout is OK"); }); response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" + - "&selection=all%20the%20things&fieldSet=[id]&timeout=6&stream=true"); + "&selection=all%20the%20things&fieldSet=[id]&timeout=6&stream=true&slices=4&sliceId=1"); assertSameJson("{" + " \"pathId\": \"/document/v1\"," + " \"documents\": [" + |