summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java28
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java2
-rw-r--r--documentapi/abi-spec.json6
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java20
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java66
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java12
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java4
-rwxr-xr-xdocumentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java145
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java7
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java1
-rw-r--r--http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java2
-rw-r--r--http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java2
-rw-r--r--http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java2
-rw-r--r--http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java2
-rw-r--r--http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java2
-rw-r--r--parent/pom.xml2
-rw-r--r--persistence/src/vespa/persistence/conformancetest/conformancetest.cpp47
-rw-r--r--persistence/src/vespa/persistence/conformancetest/conformancetest.h4
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp125
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h4
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h1
-rw-r--r--persistence/src/vespa/persistence/spi/catchresult.h5
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.cpp8
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h4
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_state_test.cpp5
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp12
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h4
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp6
-rw-r--r--storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp8
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp15
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.h5
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp26
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h1
-rw-r--r--storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h3
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp292
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h6
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp11
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h4
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.h1
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java9
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java4
46 files changed, 637 insertions, 330 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java
index 854780dd336..1ddb50b0a6b 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainer.java
@@ -122,12 +122,7 @@ public class ResourceMeterMaintainer extends ControllerMaintainer {
private void reportResourceSnapshots(Collection<ResourceSnapshot> resourceSnapshots) {
meteringClient.consume(resourceSnapshots);
- metric.set(METERING_LAST_REPORTED, clock.millis() / 1000, metric.createContext(Collections.emptyMap()));
- // total metered resource usage, for alerting on drastic changes
- metric.set(METERING_TOTAL_REPORTED,
- resourceSnapshots.stream()
- .mapToDouble(r -> r.getCpuCores() + r.getMemoryGb() + r.getDiskGb()).sum(),
- metric.createContext(Collections.emptyMap()));
+ updateMetrics(resourceSnapshots);
try (var lock = curator.lockMeteringRefreshTime()) {
if (needsRefresh(curator.readMeteringRefreshTime())) {
@@ -194,4 +189,25 @@ public class ResourceMeterMaintainer extends ControllerMaintainer {
double cost = new NodeResources(allocation.getCpuCores(), allocation.getMemoryGb(), allocation.getDiskGb(), 0).cost();
return Math.round(cost * 100.0 / costDivisor) / 100.0;
}
+
+ private void updateMetrics(Collection<ResourceSnapshot> resourceSnapshots) {
+ metric.set(METERING_LAST_REPORTED, clock.millis() / 1000, metric.createContext(Collections.emptyMap()));
+ // total metered resource usage, for alerting on drastic changes
+ metric.set(METERING_TOTAL_REPORTED,
+ resourceSnapshots.stream()
+ .mapToDouble(r -> r.getCpuCores() + r.getMemoryGb() + r.getDiskGb()).sum(),
+ metric.createContext(Collections.emptyMap()));
+
+ resourceSnapshots.forEach(snapshot -> {
+ var context = metric.createContext(Map.of(
+ "tenant", snapshot.getApplicationId().tenant().value(),
+ "applicationId", snapshot.getApplicationId().toFullString(),
+ "zoneId", snapshot.getZoneId()
+ ));
+ metric.set("metering.vcpu", snapshot.getCpuCores(), context);
+ metric.set("metering.memoryGB", snapshot.getMemoryGb(), context);
+ metric.set("metering.diskGB", snapshot.getDiskGb(), context);
+ });
+ }
+
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java
index a255a6c37d8..5b923c2ee59 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ResourceMeterMaintainerTest.java
@@ -98,6 +98,8 @@ public class ResourceMeterMaintainerTest {
assertEquals(tester.clock().millis()/1000, metrics.getMetric("metering_last_reported"));
assertEquals(2224.0d, (Double) metrics.getMetric("metering_total_reported"), Double.MIN_VALUE);
+ assertEquals(24d, (Double) metrics.getMetric(context -> "tenant1".equals(context.get("tenant")), "metering.vcpu").get(), Double.MIN_VALUE);
+ assertEquals(40d, (Double) metrics.getMetric(context -> "tenant2".equals(context.get("tenant")), "metering.vcpu").get(), Double.MIN_VALUE);
// Metering is not refreshed
assertFalse(snapshotConsumer.isRefreshed());
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index 9cc4f60ed7e..78a58f24a65 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -799,7 +799,7 @@
"public"
],
"methods": [
- "public void <init>(int, com.yahoo.documentapi.ProgressToken)",
+ "public void <init>(int, com.yahoo.documentapi.ProgressToken, int, int)",
"protected boolean isLosslessResetPossible()",
"public boolean hasNext()",
"public boolean shouldYield()",
@@ -851,6 +851,7 @@
"public void setDistributionBitCount(int)",
"public boolean visitsAllBuckets()",
"public static com.yahoo.documentapi.VisitorIterator createFromDocumentSelection(java.lang.String, com.yahoo.document.BucketIdFactory, int, com.yahoo.documentapi.ProgressToken)",
+ "public static com.yahoo.documentapi.VisitorIterator createFromDocumentSelection(java.lang.String, com.yahoo.document.BucketIdFactory, int, com.yahoo.documentapi.ProgressToken, int, int)",
"public static com.yahoo.documentapi.VisitorIterator createFromExplicitBucketSet(java.util.Set, int, com.yahoo.documentapi.ProgressToken)"
],
"fields": []
@@ -931,6 +932,9 @@
"public com.yahoo.documentapi.messagebus.loadtypes.LoadType getLoadType()",
"public boolean skipBucketsOnFatalErrors()",
"public void skipBucketsOnFatalErrors(boolean)",
+ "public void slice(int, int)",
+ "public int getSlices()",
+ "public int getSliceId()",
"public void setDynamicallyIncreaseMaxBucketsPerVisitor(boolean)",
"public void setDynamicMaxBucketsIncreaseFactor(float)",
"public java.lang.String toString()"
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
index 9957898e459..4a77d30ec92 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
@@ -387,6 +387,18 @@ public class ProgressToken {
}
/**
+ * Marks the current bucket as finished and advances the bucket cursor;
+ * throws instead if the current bucket is already {@link #addBucket added}.
+ */
+ void skipCurrentBucket() {
+ if (buckets.containsKey(bucketToKeyWrapper(getCurrentBucketId())))
+ throw new IllegalStateException("Current bucket was already added to the explicit bucket set");
+
+ ++finishedBucketCount;
+ ++bucketCursor;
+ }
+
+ /**
* Directly generate a bucket Id key for the <code>n</code>th bucket in
* reverse sorted order.
*
@@ -428,6 +440,14 @@ public class ProgressToken {
return bucketCursor;
}
+ static BucketId toBucketId(long bucketCursor, int distributionBits) {
+ return new BucketId(keyToBucketId(makeNthBucketKey(bucketCursor, distributionBits)));
+ }
+
+ BucketId getCurrentBucketId() {
+ return toBucketId(getBucketCursor(), getDistributionBitCount());
+ }
+
protected void setBucketCursor(long bucketCursor) {
this.bucketCursor = bucketCursor;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
index e11bdf7f18c..e15512ca71b 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
@@ -81,12 +81,24 @@ public class VisitorIterator {
protected static class DistributionRangeBucketSource implements BucketSource {
private boolean flushActive = false;
private int distributionBitCount;
+ private final int slices;
+ private final int sliceId;
// Wouldn't need this if this were a non-static class, but do it for
// the sake of keeping things identical in Java and C++
private ProgressToken progressToken;
public DistributionRangeBucketSource(int distributionBitCount,
- ProgressToken progress) {
+ ProgressToken progress,
+ int slices, int sliceId) {
+ if (slices < 1) {
+ throw new IllegalArgumentException("slices must be positive, but was " + slices);
+ }
+ if (sliceId < 0 || sliceId >= slices) {
+ throw new IllegalArgumentException("sliceId must be in [0, " + slices + "), but was " + sliceId);
+ }
+
+ this.slices = slices;
+ this.sliceId = sliceId;
progressToken = progress;
// New progress token (could also be empty, in which this is a
@@ -148,6 +160,7 @@ public class VisitorIterator {
}
// Should be all fixed up and good to go
progressToken.setInconsistentState(false);
+ skipToSlice();
}
protected boolean isLosslessResetPossible() {
@@ -203,6 +216,7 @@ public class VisitorIterator {
assert(p.getActiveBucketCount() == 0);
p.clearAllBuckets();
p.setBucketCursor(0);
+ skipToSlice();
return;
}
@@ -292,7 +306,14 @@ public class VisitorIterator {
}
public boolean hasNext() {
- return progressToken.getBucketCursor() < (1L << distributionBitCount);
+ // There is a next bucket iff. there is a bucket no earlier than the cursor which
+ // is contained in the bucket space, and is also 0 modulo our sliceId; or if we're
+ // not yet properly initialised, with a real distribution bit count, we ignore this.
+ long nextBucket = progressToken.getBucketCursor();
+ if (distributionBitCount != 1) {
+ nextBucket += Math.floorMod(sliceId - nextBucket, slices);
+ }
+ return nextBucket < (1L << distributionBitCount);
}
public boolean shouldYield() {
@@ -311,13 +332,27 @@ public class VisitorIterator {
public BucketProgress getNext() {
assert(hasNext()) : "getNext() called with hasNext() == false";
- long currentPosition = progressToken.getBucketCursor();
- long key = ProgressToken.makeNthBucketKey(currentPosition, distributionBitCount);
- ++currentPosition;
- progressToken.setBucketCursor(currentPosition);
- return new BucketProgress(
- new BucketId(ProgressToken.keyToBucketId(key)),
- new BucketId());
+
+ // Create the progress to return for creating visitors, and advance bucket cursor.
+ BucketProgress progress = new BucketProgress(progressToken.getCurrentBucketId(), new BucketId());
+ progressToken.setBucketCursor(progressToken.getBucketCursor() + 1);
+
+ // Skip ahead to our next next slice, to ensure we also exhaust the bucket space when
+ // hasNext() turns false, but there are still super buckets left after the current.
+ skipToSlice();
+
+ return progress;
+ }
+
+ // Advances the wrapped progress token's bucket cursor to our next slice, marking any skipped
+ // buckets as complete, but only if we've been initialised with a proper distribution bit count.
+ private void skipToSlice() {
+ if (distributionBitCount == 1)
+ return;
+
+ while (progressToken.getBucketCursor() < getTotalBucketCount() && (progressToken.getBucketCursor() % slices) != sliceId) {
+ progressToken.skipCurrentBucket();
+ }
}
public int getDistributionBitCount() {
@@ -732,6 +767,13 @@ public class VisitorIterator {
return bucketSource.visitsAllBuckets();
}
+ public static VisitorIterator createFromDocumentSelection(
+ String documentSelection,
+ BucketIdFactory idFactory,
+ int distributionBitCount,
+ ProgressToken progress) throws ParseException {
+ return createFromDocumentSelection(documentSelection, idFactory, distributionBitCount, progress, 1, 0);
+ }
/**
* Create a new <code>VisitorIterator</code> instance based on the given document
* selection string.
@@ -753,7 +795,9 @@ public class VisitorIterator {
String documentSelection,
BucketIdFactory idFactory,
int distributionBitCount,
- ProgressToken progress) throws ParseException {
+ ProgressToken progress,
+ int slices,
+ int sliceId) throws ParseException {
BucketSelector bucketSel = new BucketSelector(idFactory);
Set<BucketId> rawBuckets = bucketSel.getBucketList(documentSelection);
BucketSource src;
@@ -763,7 +807,7 @@ public class VisitorIterator {
// bit-based range source
if (rawBuckets == null) {
// Range source
- src = new DistributionRangeBucketSource(distributionBitCount, progress);
+ src = new DistributionRangeBucketSource(distributionBitCount, progress, slices, sliceId);
} else {
// Explicit source
src = new ExplicitBucketSource(rawBuckets, distributionBitCount, progress);
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
index 8b0c8538855..44675d8d2ac 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
@@ -50,6 +50,8 @@ public class VisitorParameters extends Parameters {
private int traceLevel = 0;
private ThrottlePolicy throttlePolicy = null;
private boolean skipBucketsOnFatalErrors = false;
+ private int slices = 1;
+ private int sliceId = 0;
// Advanced parameter, only for internal use.
Set<BucketId> bucketsToVisit = null;
@@ -101,6 +103,7 @@ public class VisitorParameters extends Parameters {
params.getDynamicMaxBucketsIncreaseFactor());
setTraceLevel(params.getTraceLevel());
skipBucketsOnFatalErrors(params.skipBucketsOnFatalErrors());
+ slice(params.getSlices(), getSliceId());
}
// Get functions
@@ -331,6 +334,15 @@ public class VisitorParameters extends Parameters {
public void skipBucketsOnFatalErrors(boolean skipBucketsOnFatalErrors) { this.skipBucketsOnFatalErrors = skipBucketsOnFatalErrors; }
+ public void slice(int slices, int sliceId) {
+ this.slices = slices;
+ this.sliceId = sliceId;
+ }
+
+ public int getSlices() { return slices; }
+
+ public int getSliceId() { return sliceId; }
+
/**
* Set whether or not max buckets per visitor value should be dynamically
* increased when using orderdoc and visitors do not return at least half
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 5d07d433f18..f7242695490 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -572,7 +572,9 @@ public class MessageBusVisitorSession implements VisitorSession {
params.getDocumentSelection(),
bucketIdFactory,
1,
- progressToken);
+ progressToken,
+ params.getSlices(),
+ params.getSliceId());
} else {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "parameters specify explicit bucket set " +
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
index 01cdad244a8..fb5f5bd2cfb 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
@@ -77,8 +77,119 @@ public class VisitorIteratorTestCase {
}
@Test
+ public void testInvalidSlicing() throws ParseException {
+ int distBits = 4;
+ BucketIdFactory idFactory = new BucketIdFactory();
+ ProgressToken progress = new ProgressToken();
+
+ try {
+ VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 0, 0);
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("slices must be positive, but was 0", e.getMessage());
+ }
+
+ try {
+ VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 1, 1);
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("sliceId must be in [0, 1), but was 1", e.getMessage());
+ }
+
+ try {
+ VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 1, -1);
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("sliceId must be in [0, 1), but was -1", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testIgnoredSlicing() throws ParseException {
+ int distBits = 1;
+ BucketIdFactory idFactory = new BucketIdFactory();
+ ProgressToken progress = new ProgressToken();
+
+ VisitorIterator iter = VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 3, 2);
+
+ // Iterator with a single distribution bit ignores slicing.
+ assertTrue(iter.hasNext());
+ assertEquals(ProgressToken.toBucketId(0, 1), iter.getNext().getSuperbucket());
+ assertEquals(ProgressToken.toBucketId(1, 1), iter.getNext().getSuperbucket());
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testValidSlicing() throws ParseException {
+ int distBits = 4;
+ long buckets = 1 << distBits;
+ BucketIdFactory idFactory = new BucketIdFactory();
+ for (int slices = 1; slices <= 2 * buckets; slices++) {
+ long bucketsTotal = 0;
+ for (int sliceId = 0; sliceId < slices; sliceId++) {
+ ProgressToken progress = new ProgressToken();
+
+ // docsel will be unknown --> entire bucket range will be covered
+ VisitorIterator iter = VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, slices, sliceId);
+
+ String context = "slices: " + slices + ", sliceId: " + sliceId;
+ assertEquals(context, progress.getDistributionBitCount(), distBits);
+ assertTrue(context, iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource);
+
+ assertEquals(context, progress.getFinishedBucketCount(), Math.min(buckets, sliceId));
+ assertEquals(context, progress.getTotalBucketCount(), buckets);
+
+ // First, get+update half of the buckets, marking them as done
+ long bucketCount = 0;
+
+ // Do buckets in the first half.
+ while (iter.hasNext() && progress.getFinishedBucketCount() < buckets / 2) {
+ VisitorIterator.BucketProgress ids = iter.getNext();
+ iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET);
+ ++bucketCount;
+ ++bucketsTotal;
+ }
+
+ if (slices + sliceId < buckets) { // Otherwise, we're already done ...
+ assertEquals(context, ((buckets / 2) + slices - sliceId - 1) / slices, bucketCount);
+ // Should be no buckets in limbo at this point
+ assertFalse(context, progress.hasActive());
+ assertFalse(context, progress.hasPending());
+ assertFalse(context, iter.isDone());
+ assertTrue(context, iter.hasNext());
+ assertEquals(context, progress.getFinishedBucketCount(), bucketCount * slices + sliceId);
+ assertFalse(context, progress.isFinished());
+ }
+
+ while (iter.hasNext()) {
+ VisitorIterator.BucketProgress ids = iter.getNext();
+ iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET);
+ ++bucketCount;
+ ++bucketsTotal;
+ }
+
+ assertEquals(context, (buckets + slices - sliceId - 1) / slices, bucketCount);
+ // Should be no buckets in limbo at this point
+ assertFalse(context, progress.hasActive());
+ assertFalse(context, progress.hasPending());
+ assertTrue(context, iter.isDone());
+ assertFalse(context, iter.hasNext());
+ assertEquals(context, progress.getFinishedBucketCount(), buckets);
+ assertTrue(context, progress.isFinished());
+ }
+ assertEquals("slices: " + slices, buckets, bucketsTotal);
+ }
+ }
+
+ @Test
public void testProgressSerializationRange() throws ParseException {
int distBits = 4;
+ int buckets = 1 << distBits;
BucketIdFactory idFactory = new BucketIdFactory();
ProgressToken progress = new ProgressToken();
@@ -91,11 +202,11 @@ public class VisitorIteratorTestCase {
assertTrue(iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource);
assertEquals(progress.getFinishedBucketCount(), 0);
- assertEquals(progress.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progress.getTotalBucketCount(), buckets);
// First, get+update half of the buckets, marking them as done
long bucketCount = 0;
- long bucketStop = 1 << (distBits - 1);
+ long bucketStop = buckets / 2;
while (iter.hasNext() && bucketCount != bucketStop) {
VisitorIterator.BucketProgress ids = iter.getNext();
@@ -119,7 +230,7 @@ public class VisitorIteratorTestCase {
desired.append('\n');
desired.append(bucketCount);
desired.append('\n');
- desired.append(1 << distBits);
+ desired.append(buckets);
desired.append('\n');
assertEquals(desired.toString(), progress.toString());
@@ -132,7 +243,7 @@ public class VisitorIteratorTestCase {
ProgressToken progDs = new ProgressToken(progress.toString());
assertEquals(progDs.getDistributionBitCount(), distBits);
- assertEquals(progDs.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progDs.getTotalBucketCount(), buckets);
assertEquals(progDs.getFinishedBucketCount(), bucketCount);
VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection(
@@ -154,21 +265,21 @@ public class VisitorIteratorTestCase {
// Now fetch a subset of the remaining buckets without finishing them,
// keeping some in the active set and some in pending
- int pendingTotal = 1 << (distBits - 3);
- int activeTotal = 1 << (distBits - 3);
- Vector<VisitorIterator.BucketProgress> buckets = new Vector<VisitorIterator.BucketProgress>();
+ int pendingTotal = buckets / 8;
+ int activeTotal = buckets / 8;
+ Vector<VisitorIterator.BucketProgress> trackedBuckets = new Vector<VisitorIterator.BucketProgress>();
// Pre-fetch, since otherwise we'd reuse pending buckets
for (int i = 0; i < pendingTotal + activeTotal; ++i) {
- buckets.add(iter.getNext());
+ trackedBuckets.add(iter.getNext());
}
for (int i = 0; i < pendingTotal + activeTotal; ++i) {
- VisitorIterator.BucketProgress idTemp = buckets.get(i);
+ VisitorIterator.BucketProgress idTemp = trackedBuckets.get(i);
if (i < activeTotal) {
// Make them 50% done
iter.update(idTemp.getSuperbucket(),
- new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 << distBits)));
+ new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 * buckets)));
}
// else: leave hanging as active
}
@@ -186,7 +297,7 @@ public class VisitorIteratorTestCase {
desired.append('\n');
desired.append(bucketCount);
desired.append('\n');
- desired.append(1 << distBits);
+ desired.append(buckets);
desired.append('\n');
assertEquals(progress.getBuckets().entrySet().size(), pendingTotal + activeTotal);
@@ -206,7 +317,7 @@ public class VisitorIteratorTestCase {
ProgressToken progDs = new ProgressToken(progress.toString());
assertEquals(progDs.getDistributionBitCount(), distBits);
- assertEquals(progDs.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progDs.getTotalBucketCount(), buckets);
assertEquals(progDs.getFinishedBucketCount(), bucketCount);
VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection(
@@ -225,7 +336,7 @@ public class VisitorIteratorTestCase {
// Finish all the active buckets
for (int i = activeTotal; i < activeTotal + pendingTotal; ++i) {
- iter.update(buckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET);
+ iter.update(trackedBuckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET);
++bucketCount;
}
@@ -246,16 +357,16 @@ public class VisitorIteratorTestCase {
assertFalse(iter.hasNext());
assertTrue(progress.isFinished());
// Cumulative number of finished buckets must match 2^distbits
- assertEquals(bucketCount, 1 << distBits);
+ assertEquals(bucketCount, buckets);
StringBuilder finished = new StringBuilder();
finished.append("VDS bucket progress file (100.0% completed)\n");
finished.append(distBits);
finished.append('\n');
- finished.append(1 << distBits); // Cursor
+ finished.append(buckets); // Cursor
finished.append('\n');
- finished.append(1 << distBits); // Finished
+ finished.append(buckets); // Finished
finished.append('\n');
- finished.append(1 << distBits); // Total
+ finished.append(buckets); // Total
finished.append('\n');
assertEquals(progress.toString(), finished.toString());
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index 4b7d05928c0..e7441acc203 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -376,6 +376,13 @@ public class Flags {
"Takes effect at redeploy",
ZONE_ID, APPLICATION_ID);
+ public static final UnboundStringFlag JDK_VERSION = defineStringFlag(
+ "jdk-version", "11",
+ List.of("hmusum"), "2021-10-25", "2021-11-25",
+ "JDK version to use inside containers",
+ "Takes effect on restart of Docker container",
+ APPLICATION_ID);
+
/** WARNING: public for testing: All flags should be defined in {@link Flags}. */
public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners,
String createdAt, String expiresAt, String description,
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java b/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java
index 2c0614805a9..88e2393904e 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/PermanentFlags.java
@@ -218,7 +218,6 @@ public class PermanentFlags {
"Takes effect immediately",
ZONE_ID, APPLICATION_ID);
-
private PermanentFlags() {}
private static UnboundBooleanFlag defineFeatureFlag(
diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java
index 92cc35fc354..962e6b32947 100644
--- a/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java
+++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlanner.java
@@ -1,4 +1,4 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.util.http.hc5;
import org.apache.hc.client5.http.HttpRoute;
diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java
index 50af29f92aa..91810b50778 100644
--- a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java
+++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaAsyncHttpClientBuilder.java
@@ -1,4 +1,4 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.util.http.hc5;
import com.yahoo.security.tls.MixedMode;
diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java
index e01d278ff38..52f7ad9b56b 100644
--- a/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java
+++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/VespaHttpClientBuilder.java
@@ -1,4 +1,4 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.util.http.hc5;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
diff --git a/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java b/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java
index 514eae56fe8..adbc445de1a 100644
--- a/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java
+++ b/http-utils/src/test/java/ai/vespa/util/http/hc4/retry/DelayedResponseLevelRetryHandlerTest.java
@@ -1,4 +1,4 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.util.http.hc4.retry;
import org.apache.http.HttpResponse;
diff --git a/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java b/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java
index 58dc25fdf1a..78c413fba56 100644
--- a/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java
+++ b/http-utils/src/test/java/ai/vespa/util/http/hc5/HttpToHttpsRoutePlannerTest.java
@@ -1,4 +1,4 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.util.http.hc5;
import org.apache.hc.client5.http.HttpRoute;
diff --git a/parent/pom.xml b/parent/pom.xml
index 01c3e335693..18fad225aad 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -849,7 +849,7 @@
<properties>
<antlr.version>3.5.2</antlr.version>
<antlr4.version>4.5</antlr4.version>
- <apache.httpclient.version>4.5.12</apache.httpclient.version>
+ <apache.httpclient.version>4.5.13</apache.httpclient.version>
<apache.httpcore.version>4.4.13</apache.httpcore.version>
<apache.httpclient5.version>5.1</apache.httpclient5.version>
<asm.version>9.1</asm.version>
diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
index c5f0e60a43a..d2e6ec2716e 100644
--- a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
+++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
@@ -1444,14 +1444,17 @@ TEST_F(ConformanceTest, testIterateAlreadyCompleted)
spi->destroyIterator(iter.getIteratorId(), context);
}
-TEST_F(ConformanceTest, testIterateEmptyBucket)
+void
+ConformanceTest::test_iterate_empty_or_missing_bucket(bool bucket_exists)
{
document::TestDocMan testDocMan;
_factory->clear();
PersistenceProviderUP spi(getSpi(*_factory, testDocMan));
Context context(Priority(0), Trace::TraceLevel(0));
Bucket b(makeSpiBucket(BucketId(8, 0x1)));
- spi->createBucket(b, context);
+ if (bucket_exists) {
+ spi->createBucket(b, context);
+ }
Selection sel(createSelection(""));
CreateIteratorResult iter(createIterator(*spi, b, sel));
@@ -1464,6 +1467,16 @@ TEST_F(ConformanceTest, testIterateEmptyBucket)
spi->destroyIterator(iter.getIteratorId(), context);
}
+TEST_F(ConformanceTest, test_iterate_empty_bucket)
+{
+ test_iterate_empty_or_missing_bucket(true);
+}
+
+TEST_F(ConformanceTest, test_iterate_missing_bucket)
+{
+ test_iterate_empty_or_missing_bucket(false);
+}
+
TEST_F(ConformanceTest, testDeleteBucket)
{
document::TestDocMan testDocMan;
@@ -2269,6 +2282,36 @@ TEST_F(ConformanceTest, resource_usage)
EXPECT_EQ(0.4, resource_usage_listener.get_usage().get_memory_usage());
}
+void
+ConformanceTest::test_empty_bucket_info(bool bucket_exists)
+{
+ document::TestDocMan testDocMan;
+ _factory->clear();
+ PersistenceProviderUP spi(getSpi(*_factory, testDocMan));
+ Context context(Priority(0), Trace::TraceLevel(0));
+ Bucket bucket(makeSpiBucket(BucketId(8, 0x01)));
+ if (bucket_exists) {
+ spi->createBucket(bucket, context);
+ }
+ auto info_result = spi->getBucketInfo(bucket);
+ EXPECT_TRUE(!info_result.hasError());
+ EXPECT_EQ(0u, info_result.getBucketInfo().getChecksum().getValue());
+ EXPECT_EQ(0u, info_result.getBucketInfo().getEntryCount());
+ EXPECT_EQ(0u, info_result.getBucketInfo().getDocumentCount());
+ EXPECT_TRUE(info_result.getBucketInfo().isReady());
+ EXPECT_FALSE(info_result.getBucketInfo().isActive());
+}
+
+TEST_F(ConformanceTest, test_empty_bucket_gives_empty_bucket_info)
+{
+ test_empty_bucket_info(true);
+}
+
+TEST_F(ConformanceTest, test_missing_bucket_gives_empty_bucket_info)
+{
+ test_empty_bucket_info(false);
+}
+
TEST_F(ConformanceTest, detectAndTestOptionalBehavior)
{
// Report if implementation supports setting bucket size info.
diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.h b/persistence/src/vespa/persistence/conformancetest/conformancetest.h
index a55461bca80..2ee2526c2dd 100644
--- a/persistence/src/vespa/persistence/conformancetest/conformancetest.h
+++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.h
@@ -151,6 +151,10 @@ protected:
const Bucket& target,
document::TestDocMan& testDocMan);
+ void test_iterate_empty_or_missing_bucket(bool bucket_exists);
+
+ void test_empty_bucket_info(bool bucket_exists);
+
ConformanceTest();
ConformanceTest(const std::string &docType);
};
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 6e4f38fe564..71c12ab0b2f 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -34,12 +34,12 @@ BucketContent::BucketContent()
_inUse(false),
_outdatedInfo(true),
_active(false)
-{ }
+{}
+
BucketContent::~BucketContent() = default;
uint32_t
-BucketContent::computeEntryChecksum(const BucketEntry& e) const
-{
+BucketContent::computeEntryChecksum(const BucketEntry &e) const {
vespalib::crc_32_type checksummer;
uint64_t ts(e.entry->getTimestamp());
@@ -49,8 +49,7 @@ BucketContent::computeEntryChecksum(const BucketEntry& e) const
}
BucketChecksum
-BucketContent::updateRollingChecksum(uint32_t entryChecksum)
-{
+BucketContent::updateRollingChecksum(uint32_t entryChecksum) {
uint32_t checksum = _info.getChecksum();
checksum ^= entryChecksum;
if (checksum == 0) {
@@ -59,9 +58,8 @@ BucketContent::updateRollingChecksum(uint32_t entryChecksum)
return BucketChecksum(checksum);
}
-const BucketInfo&
-BucketContent::getBucketInfo() const
-{
+const BucketInfo &
+BucketContent::getBucketInfo() const {
if (!_outdatedInfo) {
return _info;
}
@@ -73,9 +71,9 @@ BucketContent::getBucketInfo() const
uint32_t totalSize = 0;
uint32_t checksum = 0;
- for (const BucketEntry & bucketEntry : _entries) {
- const DocEntry& entry(*bucketEntry.entry);
- const GlobalId& gid(bucketEntry.gid);
+ for (const BucketEntry &bucketEntry: _entries) {
+ const DocEntry &entry(*bucketEntry.entry);
+ const GlobalId &gid(bucketEntry.gid);
GidMapType::const_iterator gidIt(_gidMap.find(gid));
assert(gidIt != _gidMap.end());
@@ -114,17 +112,19 @@ BucketContent::getBucketInfo() const
namespace {
struct TimestampLess {
- bool operator()(const BucketEntry &bucketEntry, Timestamp t)
- { return bucketEntry.entry->getTimestamp() < t; }
- bool operator()(Timestamp t, const BucketEntry &bucketEntry)
- { return t < bucketEntry.entry->getTimestamp(); }
+ bool operator()(const BucketEntry &bucketEntry, Timestamp t) {
+ return bucketEntry.entry->getTimestamp() < t;
+ }
+
+ bool operator()(Timestamp t, const BucketEntry &bucketEntry) {
+ return t < bucketEntry.entry->getTimestamp();
+ }
};
} // namespace
bool
-BucketContent::hasTimestamp(Timestamp t) const
-{
+BucketContent::hasTimestamp(Timestamp t) const {
if (!_entries.empty() && _entries.back().entry->getTimestamp() < t) {
return false;
}
@@ -148,10 +148,9 @@ BucketContent::hasTimestamp(Timestamp t) const
*/
void
-BucketContent::insert(DocEntry::SP e)
-{
+BucketContent::insert(DocEntry::SP e) {
LOG(spam, "insert(%s)", e->toString().c_str());
- const DocumentId* docId(e->getDocumentId());
+ const DocumentId *docId(e->getDocumentId());
assert(docId != 0);
GlobalId gid(docId->getGlobalId());
GidMapType::iterator gidIt(_gidMap.find(gid));
@@ -160,22 +159,15 @@ BucketContent::insert(DocEntry::SP e)
_entries.back().entry->getTimestamp() < e->getTimestamp()) {
_entries.push_back(BucketEntry(e, gid));
} else {
- std::vector<BucketEntry>::iterator it =
- lower_bound(_entries.begin(),
- _entries.end(),
- e->getTimestamp(),
- TimestampLess());
+ auto it = lower_bound(_entries.begin(), _entries.end(), e->getTimestamp(), TimestampLess());
if (it != _entries.end()) {
if (it->entry->getTimestamp() == e->getTimestamp()) {
if (*it->entry.get() == *e) {
- LOG(debug, "Ignoring duplicate put entry %s",
- e->toString().c_str());
+ LOG(debug, "Ignoring duplicate put entry %s", e->toString().c_str());
return;
} else {
- LOG(error, "Entry %s was already present."
- "Was trying to insert %s.",
- it->entry->toString().c_str(),
- e->toString().c_str());
+ LOG(error, "Entry %s was already present. Was trying to insert %s.",
+ it->entry->toString().c_str(), e->toString().c_str());
LOG_ABORT("should not reach here");
}
}
@@ -190,11 +182,8 @@ BucketContent::insert(DocEntry::SP e)
// newer versions of a document etc. by XORing away old checksum.
gidIt->second = e;
} else {
- LOG(spam,
- "Newly inserted entry %s was older than existing entry %s; "
- "not updating GID mapping",
- e->toString().c_str(),
- gidIt->second->toString().c_str());
+ LOG(spam, "Newly inserted entry %s was older than existing entry %s; not updating GID mapping",
+ e->toString().c_str(), gidIt->second->toString().c_str());
}
_outdatedInfo = true;
} else {
@@ -226,10 +215,8 @@ BucketContent::insert(DocEntry::SP e)
_info.getActive());
}
- LOG(spam,
- "After cheap bucketinfo update, state is %s (inserted %s)",
- _info.toString().c_str(),
- e->toString().c_str());
+ LOG(spam, "After cheap bucketinfo update, state is %s (inserted %s)",
+ _info.toString().c_str(), e->toString().c_str());
}
}
@@ -237,9 +224,8 @@ BucketContent::insert(DocEntry::SP e)
}
DocEntry::SP
-BucketContent::getEntry(const DocumentId& did) const
-{
- GidMapType::const_iterator it(_gidMap.find(did.getGlobalId()));
+BucketContent::getEntry(const DocumentId &did) const {
+ auto it(_gidMap.find(did.getGlobalId()));
if (it != _gidMap.end()) {
return it->second;
}
@@ -247,10 +233,8 @@ BucketContent::getEntry(const DocumentId& did) const
}
DocEntry::SP
-BucketContent::getEntry(Timestamp t) const
-{
- std::vector<BucketEntry>::const_iterator iter =
- lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
+BucketContent::getEntry(Timestamp t) const {
+ auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
if (iter == _entries.end() || iter->entry->getTimestamp() != t) {
return DocEntry::SP();
@@ -260,15 +244,12 @@ BucketContent::getEntry(Timestamp t) const
}
void
-BucketContent::eraseEntry(Timestamp t)
-{
- std::vector<BucketEntry>::iterator iter =
- lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
+BucketContent::eraseEntry(Timestamp t) {
+ auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
if (iter != _entries.end() && iter->entry->getTimestamp() == t) {
assert(iter->entry->getDocumentId() != 0);
- GidMapType::iterator gidIt(
- _gidMap.find(iter->entry->getDocumentId()->getGlobalId()));
+ GidMapType::iterator gidIt = _gidMap.find(iter->entry->getDocumentId()->getGlobalId());
assert(gidIt != _gidMap.end());
_entries.erase(iter);
if (gidIt->second->getTimestamp() == t) {
@@ -281,7 +262,7 @@ BucketContent::eraseEntry(Timestamp t)
}
}
-DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
+DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo> &repo)
: _initialized(false),
_repo(repo),
_content(),
@@ -294,13 +275,12 @@ DummyPersistence::DummyPersistence(const std::shared_ptr<const document::Documen
DummyPersistence::~DummyPersistence() = default;
document::select::Node::UP
-DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf)
-{
+DummyPersistence::parseDocumentSelection(const string &documentSelection, bool allowLeaf) {
document::select::Node::UP ret;
try {
document::select::Parser parser(*_repo, document::BucketIdFactory());
ret = parser.parse(documentSelection);
- } catch (document::select::ParsingFailedException& e) {
+ } catch (document::select::ParsingFailedException &e) {
return document::select::Node::UP();
}
if (ret->isLeafNode() && !allowLeaf) {
@@ -310,18 +290,17 @@ DummyPersistence::parseDocumentSelection(const string& documentSelection, bool a
}
Result
-DummyPersistence::initialize()
-{
+DummyPersistence::initialize() {
assert(!_initialized);
_initialized = true;
return Result();
}
#define DUMMYPERSISTENCE_VERIFY_INITIALIZED \
- if (!_initialized) throw vespalib::IllegalStateException( \
- "initialize() must always be called first in order to " \
- "trigger lazy initialization.", VESPA_STRLOC)
-
+ if (!_initialized) { \
+ LOG(error, "initialize() must always be called first in order to trigger lazy initialization."); \
+ abort(); \
+ }
BucketIdListResult
DummyPersistence::listBuckets(BucketSpace bucketSpace) const
@@ -414,7 +393,8 @@ DummyPersistence::getBucketInfo(const Bucket& b) const
if (!bc.get()) {
LOG(debug, "getBucketInfo(%s) : (bucket not found)",
b.toString().c_str());
- return BucketInfoResult(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found");
+ BucketInfo info(BucketChecksum(0), 0, 0, 0, 0);
+ return BucketInfoResult(info);
}
BucketInfo info((*bc)->getBucketInfo());
@@ -559,9 +539,6 @@ DummyPersistence::createIterator(const Bucket &b, FieldSetSP fs, const Selection
}
}
BucketContentGuard::UP bc(acquireBucketWithLock(b, LockMode::Shared));
- if (!bc.get()) {
- return CreateIteratorResult(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found");
- }
Iterator* it;
IteratorId id;
@@ -577,6 +554,10 @@ DummyPersistence::createIterator(const Bucket &b, FieldSetSP fs, const Selection
}
// Memory pointed to by 'it' should now be valid from here on out
+ if (!bc.get()) {
+ // Bucket not found.
+ return CreateIteratorResult(id);
+ }
it->_fieldSet = std::move(fs);
const BucketContent::GidMapType& gidMap((*bc)->_gidMap);
@@ -648,7 +629,7 @@ DummyPersistence::iterate(IteratorId id, uint64_t maxByteSize, Context& ctx) con
BucketContentGuard::UP bc(acquireBucketWithLock(it->_bucket, LockMode::Shared));
if (!bc.get()) {
ctx.trace(9, "finished iterate(); bucket not found");
- return IterateResult(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found");
+ return IterateResult(std::vector<DocEntry::UP>(), true);
}
LOG(debug, "Iterator %" PRIu64 " acquired bucket lock", uint64_t(id));
@@ -714,8 +695,8 @@ DummyPersistence::destroyIterator(IteratorId id, Context&)
return Result();
}
-Result
-DummyPersistence::createBucket(const Bucket& b, Context&)
+void
+DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "createBucket(%s)", b.toString().c_str());
@@ -727,11 +708,11 @@ DummyPersistence::createBucket(const Bucket& b, Context&)
assert(!_content[b]->_inUse);
LOG(debug, "%s already existed", b.toString().c_str());
}
- return Result();
+ onComplete->onComplete(std::make_unique<Result>());
}
void
-DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
+DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "deleteBucket(%s)", b.toString().c_str());
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 99d6ba717b7..a25bf6b8a8e 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -168,8 +168,8 @@ public:
IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override;
Result destroyIterator(IteratorId, Context&) override;
- Result createBucket(const Bucket&, Context&) override;
- void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
+ void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
+ void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index e287bdc5252..3b59f20ca96 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -15,7 +15,6 @@ class AbstractPersistenceProvider : public PersistenceProvider
{
public:
Result initialize() override { return Result(); };
- Result createBucket(const Bucket&, Context&) override { return Result(); }
Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); }
void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override;
Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); }
diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h
index 02c626ea23e..7b04498205d 100644
--- a/persistence/src/vespa/persistence/spi/catchresult.h
+++ b/persistence/src/vespa/persistence/spi/catchresult.h
@@ -19,4 +19,9 @@ private:
const ResultHandler *_resulthandler;
};
+class NoopOperationComplete : public OperationComplete {
+ void onComplete(std::unique_ptr<spi::Result>) noexcept override { }
+ void addResultHandler(const spi::ResultHandler *) override { }
+};
+
}
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index 3ea476c33fc..31db08a6f4f 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -17,6 +17,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat
}
Result
+PersistenceProvider::createBucket(const Bucket& bucket, Context& context) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->future_result();
+ createBucketAsync(bucket, context, std::move(catcher));
+ return *future.get();
+}
+
+Result
PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) {
auto catcher = std::make_unique<CatchResult>();
auto future = catcher->future_result();
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 83eb042d855..269175f7d26 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -58,6 +58,7 @@ struct PersistenceProvider
virtual ~PersistenceProvider();
// TODO Move to utility class for use in tests only
+ Result createBucket(const Bucket&, Context&);
Result deleteBucket(const Bucket&, Context&);
Result put(const Bucket&, Timestamp, DocumentSP, Context&);
Result setActiveState(const Bucket&, BucketInfo::ActiveState);
@@ -336,14 +337,14 @@ struct PersistenceProvider
* Tells the provider that the given bucket has been created in the
* service layer. There is no requirement to do anything here.
*/
- virtual Result createBucket(const Bucket&, Context&) = 0;
+ virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0;
/**
* Deletes the given bucket and all entries contained in that bucket.
* After this operation has succeeded, a restart of the provider should
* not yield the bucket in getBucketList().
*/
- virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0;
+ virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0;
/**
* This function is called continuously by the service layer. It allows the
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 114292d055d..2e1fc74037c 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -548,24 +548,28 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&)
}
-Result
-PersistenceEngine::createBucket(const Bucket &b, Context &)
+void
+PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete) noexcept
{
ReadGuard rguard(_rwMutex);
LOG(spam, "createBucket(%s)", b.toString().c_str());
HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace());
- TransportLatch latch(snap.size());
- for (; snap.handlers().valid(); snap.handlers().next()) {
+
+ auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete));
+ while (snap.handlers().valid()) {
IPersistenceHandler *handler = snap.handlers().get();
- handler->handleCreateBucket(feedtoken::make(latch), b);
+ snap.handlers().next();
+ if (snap.handlers().valid()) {
+ handler->handleCreateBucket(feedtoken::make(transportContext), b);
+ } else {
+ handler->handleCreateBucket(feedtoken::make(std::move(transportContext)), b);
+ }
}
- latch.await();
- return latch.getResult();
}
void
-PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
+PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept
{
ReadGuard rguard(_rwMutex);
LOG(spam, "deleteBucket(%s)", b.toString().c_str());
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 94331ac2cd6..fe564d01459 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -114,8 +114,8 @@ public:
IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override;
Result destroyIterator(IteratorId, Context&) override;
- Result createBucket(const Bucket &bucketId, Context &) override ;
- void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
+ void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) noexcept override;
+ void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
index d51485df38d..701e8a80d3a 100644
--- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
+++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
@@ -43,6 +43,7 @@ public:
throw std::runtime_error(_fail);
}
}
+ void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override { }
void set_fail(vespalib::string fail) { _fail = std::move(fail); }
};
@@ -85,8 +86,8 @@ public:
~ApplyBucketDiffStateTestBase();
- std::unique_ptr<ApplyBucketDiffState> make_state() {
- return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
+ std::shared_ptr<ApplyBucketDiffState> make_state() {
+ return ApplyBucketDiffState::create(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
}
};
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index b3bd1c6a253..02b43a32df3 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -24,7 +24,7 @@
#define CHECK_ERROR_ASYNC(className, failType, onError) \
{ \
- Guard guard(_lock); \
+ Guard guard(_lock); \
if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \
onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \
return; \
@@ -80,12 +80,12 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const
return _spi.listBuckets(bucketSpace);
}
-spi::Result
-PersistenceProviderWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
LOG_SPI("createBucket(" << bucket << ")");
- CHECK_ERROR(spi::Result, FAIL_CREATE_BUCKET);
- return _spi.createBucket(bucket, context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete);
+ return _spi.createBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketInfoResult
@@ -177,7 +177,7 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId,
void
PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context,
- spi::OperationComplete::UP operationComplete)
+ spi::OperationComplete::UP operationComplete) noexcept
{
LOG_SPI("deleteBucket(" << bucket << ")");
CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete);
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index c6628814dba..cfc7002a643 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -96,7 +96,7 @@ public:
void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
spi::OperationComplete::UP up) override;
- spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
+ void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
@@ -111,7 +111,7 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1,
const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2,
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 07d2b24d536..a3f0182ba30 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -62,13 +62,13 @@ public:
return PersistenceProviderWrapper::getBucketInfo(bucket);
}
- spi::Result createBucket(const spi::Bucket& bucket, spi::Context& ctx) override {
+ void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override {
++_createBucketInvocations;
- return PersistenceProviderWrapper::createBucket(bucket, ctx);
+ PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete));
}
void
- deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
+ deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override {
++_deleteBucketInvocations;
PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete));
}
diff --git a/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp b/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp
index ef71f0ae5f0..588b390cd5f 100644
--- a/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp
+++ b/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp
@@ -84,10 +84,10 @@ TEST_F(SanityCheckedDeleteTest, differing_document_sizes_not_considered_out_of_s
c.top.sendDown(delete_cmd);
c.top.waitForMessages(1, MSG_WAIT_TIME);
- // Bucket should now well and truly be gone. Will trigger a getBucketInfo error response.
- spi::BucketInfoResult info_post_delete(
- _node->getPersistenceProvider().getBucketInfo(spiBucket));
- ASSERT_TRUE(info_post_delete.hasError()) << info_post_delete.getErrorMessage();
+ auto reply = c.top.getAndRemoveMessage(api::MessageType::DELETEBUCKET_REPLY);
+ auto delete_reply = std::dynamic_pointer_cast<api::DeleteBucketReply>(reply);
+ ASSERT_TRUE(delete_reply);
+ ASSERT_TRUE(delete_reply->getResult().success());
}
} // namespace storage
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 017b8ce2b92..ed50730d79f 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -167,11 +167,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils,
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam());
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), 4190208, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam());
}
std::shared_ptr<api::StorageMessage> get_queued_reply() {
@@ -872,7 +872,6 @@ TEST_P(MergeHandlerTest, merge_bucket_spi_failures) {
setUpChain(MIDDLE);
ExpectedExceptionSpec exceptions[] = {
- { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
{ PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" },
{ PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
@@ -903,7 +902,6 @@ TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) {
setUpChain(MIDDLE);
ExpectedExceptionSpec exceptions[] = {
- { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
{ PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" },
{ PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
@@ -1440,6 +1438,9 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
+ if (GetParam()) {
+ handler.drain_async_writes();
+ }
ASSERT_EQ(6u, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType());
LOG(debug, "got mergebucket reply");
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
index ad153c41aef..556760b347e 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -12,6 +12,14 @@ using vespalib::RetainGuard;
namespace storage {
+class ApplyBucketDiffState::Deleter {
+public:
+ void operator()(ApplyBucketDiffState *raw_state) const noexcept {
+ std::unique_ptr<ApplyBucketDiffState> state(raw_state);
+ raw_state->_merge_bucket_info_syncer.schedule_delayed_delete(std::move(state));
+ }
+};
+
ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
: _merge_bucket_info_syncer(merge_bucket_info_syncer),
_bucket(bucket),
@@ -101,4 +109,11 @@ ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracke
_delayed_reply = std::move(delayed_reply);
}
+std::shared_ptr<ApplyBucketDiffState>
+ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
+{
+ std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, bucket, std::move(retain_guard)));
+ return std::shared_ptr<ApplyBucketDiffState>(state.release(), Deleter());
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
index 39f60156e66..7157c69191b 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -24,6 +24,7 @@ class MergeBucketInfoSyncer;
* for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply.
*/
class ApplyBucketDiffState {
+ class Deleter;
const MergeBucketInfoSyncer& _merge_bucket_info_syncer;
spi::Bucket _bucket;
vespalib::string _fail_message;
@@ -35,8 +36,9 @@ class ApplyBucketDiffState {
MessageSender* _sender;
vespalib::RetainGuard _retain_guard;
-public:
ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
+public:
+ static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
~ApplyBucketDiffState();
void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op);
void wait();
@@ -46,6 +48,7 @@ public:
std::future<vespalib::string> get_future();
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply);
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply);
+ const spi::Bucket& get_bucket() const noexcept { return _bucket; }
};
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 47b5e4f5f27..bc6e67578c0 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -5,6 +5,7 @@
#include "testandsethelper.h"
#include "bucketownershipnotifier.h"
#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/persistence/spi/catchresult.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
@@ -154,6 +155,31 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.createBuckets);
+ LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
+ }
+ spi::Bucket bucket(cmd.getBucket());
+ auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP ignored) mutable {
+ // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
+ (void) ignored;
+ tracker->sendReply();
+ });
+
+ if (cmd.getActive()) {
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+ _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+ } else {
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+ }
+
+ return tracker;
+}
+
+MessageTracker::UP
AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.deleteBuckets);
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 4f5c242570c..db5a77bfb59 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -30,6 +30,7 @@ public:
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
diff --git a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
index e05991ad9e3..b3386c591e6 100644
--- a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
+++ b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
@@ -6,6 +6,8 @@ namespace storage::spi { class Bucket; }
namespace storage {
+class ApplyBucketDiffState;
+
/*
* Interface class for syncing bucket info during merge.
*/
@@ -13,6 +15,7 @@ class MergeBucketInfoSyncer {
public:
virtual ~MergeBucketInfoSyncer() = default;
virtual void sync_bucket_info(const spi::Bucket& bucket) const = 0;
+ virtual void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const = 0;
};
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 77e7762ec9a..c9ba43458b1 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -6,13 +6,13 @@
#include "apply_bucket_diff_state.h"
#include <vespa/storage/persistence/filestorage/mergestatus.h>
#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/persistence/spi/catchresult.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <algorithm>
-#include <future>
#include <vespa/log/log.h>
LOG_SETUP(".persistence.mergehandler");
@@ -24,6 +24,7 @@ namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
+ vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize,
uint32_t commonMergeChainOptimalizationMinimumSize,
bool async_apply_bucket_diff)
@@ -34,7 +35,8 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
- _async_apply_bucket_diff(async_apply_bucket_diff)
+ _async_apply_bucket_diff(async_apply_bucket_diff),
+ _executor(executor)
{
}
@@ -51,20 +53,6 @@ constexpr int getDeleteFlag() {
return 2;
}
-/**
- * Throws std::runtime_error if result has an error.
- */
-void
-checkResult(const spi::Result& result, const spi::Bucket& bucket, const char* op)
-{
- if (result.hasError()) {
- vespalib::asciistream ss;
- ss << "Failed " << op << " in " << bucket << ": " << result.toString();
- throw std::runtime_error(ss.str());
- }
-}
-
-
class IteratorGuard {
spi::PersistenceProvider& _spi;
spi::IteratorId _iteratorId;
@@ -663,25 +651,28 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
}
namespace {
- void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
- uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
- {
- for (const auto& entry : status.diff) {
- uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
- if ((entry_has_mask == 0u) ||
- (constrictHasMask && (entry_has_mask != hasMask))) {
- continue;
- }
- cmd.getDiff().emplace_back(entry);
- if (constrictHasMask) {
- cmd.getDiff().back()._entry._hasMask = newHasMask;
- } else {
- cmd.getDiff().back()._entry._hasMask = entry_has_mask;
- }
+
+void
+findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
+ uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
+{
+ for (const auto& entry : status.diff) {
+ uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
+ if ((entry_has_mask == 0u) ||
+ (constrictHasMask && (entry_has_mask != hasMask))) {
+ continue;
+ }
+ cmd.getDiff().emplace_back(entry);
+ if (constrictHasMask) {
+ cmd.getDiff().back()._entry._hasMask = newHasMask;
+ } else {
+ cmd.getDiff().back()._entry._hasMask = entry_has_mask;
}
}
}
+}
+
api::StorageReply::SP
MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
MessageSender& sender, spi::Context& context,
@@ -898,7 +889,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
tracker->fail(api::ReturnCode::BUSY, err);
return tracker;
}
- checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+
MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel());
@@ -938,141 +930,136 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
namespace {
- uint8_t findOwnIndex(
- const std::vector<api::MergeBucketCommand::Node>& nodeList,
- uint16_t us)
- {
- for (uint32_t i=0, n=nodeList.size(); i<n; ++i) {
- if (nodeList[i].index == us) return i;
- }
- throw vespalib::IllegalStateException(
- "Got GetBucketDiff cmd on node not in nodelist in command",
- VESPA_STRLOC);
+uint8_t findOwnIndex(
+ const std::vector<api::MergeBucketCommand::Node>& nodeList,
+ uint16_t us)
+{
+ for (uint32_t i=0, n=nodeList.size(); i<n; ++i) {
+ if (nodeList[i].index == us) return i;
}
+ throw vespalib::IllegalStateException(
+ "Got GetBucketDiff cmd on node not in nodelist in command",
+ VESPA_STRLOC);
+}
- struct DiffEntryTimestampOrder
- : public std::binary_function<api::GetBucketDiffCommand::Entry,
- api::GetBucketDiffCommand::Entry, bool>
- {
- bool operator()(const api::GetBucketDiffCommand::Entry& x,
- const api::GetBucketDiffCommand::Entry& y) const {
- return (x._timestamp < y._timestamp);
- }
- };
-
- /**
- * Merges list A and list B together and puts the result in result.
- * Result is swapped in as last step to keep function exception safe. Thus
- * result can be listA or listB if wanted.
- *
- * listA and listB are assumed to be in the order found in the slotfile, or
- * in the order given by a previous call to this function. (In both cases
- * this will be sorted by timestamp)
- *
- * @return false if any suspect entries was found.
- */
- bool mergeLists(
- const std::vector<api::GetBucketDiffCommand::Entry>& listA,
- const std::vector<api::GetBucketDiffCommand::Entry>& listB,
- std::vector<api::GetBucketDiffCommand::Entry>& finalResult)
- {
- bool suspect = false;
- std::vector<api::GetBucketDiffCommand::Entry> result;
- uint32_t i = 0, j = 0;
- while (i < listA.size() && j < listB.size()) {
- const api::GetBucketDiffCommand::Entry& a(listA[i]);
- const api::GetBucketDiffCommand::Entry& b(listB[j]);
- if (a._timestamp < b._timestamp) {
- result.push_back(a);
- ++i;
- } else if (a._timestamp > b._timestamp) {
- result.push_back(b);
- ++j;
- } else {
- // If we find equal timestamped entries that are not the
- // same.. Flag an error. But there is nothing we can do
- // about it. Note it as if it is the same entry so we
- // dont try to merge them.
- if (!(a == b)) {
- if (a._gid == b._gid && a._flags == b._flags) {
- if ((a._flags & getDeleteFlag()) != 0 &&
- (b._flags & getDeleteFlag()) != 0)
- {
- // Unfortunately this can happen, for instance
- // if a remove comes to a bucket out of sync
- // and reuses different headers in the two
- // versions.
- LOG(debug, "Found entries with equal timestamps of "
- "the same gid who both are remove "
- "entries: %s <-> %s.",
- a.toString(true).c_str(),
- b.toString(true).c_str());
- } else {
- LOG(error, "Found entries with equal timestamps of "
- "the same gid. This is likely same "
- "document where size of document varies:"
- " %s <-> %s.",
- a.toString(true).c_str(),
- b.toString(true).c_str());
- }
- result.push_back(a);
- result.back()._hasMask |= b._hasMask;
- suspect = true;
- } else if ((a._flags & getDeleteFlag())
- != (b._flags & getDeleteFlag()))
+/**
+ * Merges list A and list B together and puts the result in result.
+ * Result is swapped in as last step to keep function exception safe. Thus
+ * result can be listA or listB if wanted.
+ *
+ * listA and listB are assumed to be in the order found in the slotfile, or
+ * in the order given by a previous call to this function. (In both cases
+ * this will be sorted by timestamp)
+ *
+ * @return false if any suspect entries was found.
+ */
+bool mergeLists(
+ const std::vector<api::GetBucketDiffCommand::Entry>& listA,
+ const std::vector<api::GetBucketDiffCommand::Entry>& listB,
+ std::vector<api::GetBucketDiffCommand::Entry>& finalResult)
+{
+ bool suspect = false;
+ std::vector<api::GetBucketDiffCommand::Entry> result;
+ uint32_t i = 0, j = 0;
+ while (i < listA.size() && j < listB.size()) {
+ const api::GetBucketDiffCommand::Entry& a(listA[i]);
+ const api::GetBucketDiffCommand::Entry& b(listB[j]);
+ if (a._timestamp < b._timestamp) {
+ result.push_back(a);
+ ++i;
+ } else if (a._timestamp > b._timestamp) {
+ result.push_back(b);
+ ++j;
+ } else {
+ // If we find equal timestamped entries that are not the
+ // same.. Flag an error. But there is nothing we can do
+ // about it. Note it as if it is the same entry so we
+ // dont try to merge them.
+ if (!(a == b)) {
+ if (a._gid == b._gid && a._flags == b._flags) {
+ if ((a._flags & getDeleteFlag()) != 0 &&
+ (b._flags & getDeleteFlag()) != 0)
{
- // If we find one remove and one put entry on the
- // same timestamp we are going to keep the remove
- // entry to make the copies consistent.
- const api::GetBucketDiffCommand::Entry& deletedEntry(
- (a._flags & getDeleteFlag()) != 0 ? a : b);
- result.push_back(deletedEntry);
- LOG(debug,
- "Found put and remove on same timestamp. Keeping"
- "remove as it is likely caused by remove with "
- "copies unavailable at the time: %s, %s.",
- a.toString().c_str(), b.toString().c_str());
+ // Unfortunately this can happen, for instance
+ // if a remove comes to a bucket out of sync
+ // and reuses different headers in the two
+ // versions.
+ LOG(debug, "Found entries with equal timestamps of "
+ "the same gid who both are remove "
+ "entries: %s <-> %s.",
+ a.toString(true).c_str(),
+ b.toString(true).c_str());
} else {
- LOG(error, "Found entries with equal timestamps that "
- "weren't the same entry: %s, %s.",
- a.toString().c_str(), b.toString().c_str());
- result.push_back(a);
- result.back()._hasMask |= b._hasMask;
- suspect = true;
+ LOG(error, "Found entries with equal timestamps of "
+ "the same gid. This is likely same "
+ "document where size of document varies:"
+ " %s <-> %s.",
+ a.toString(true).c_str(),
+ b.toString(true).c_str());
}
+ result.push_back(a);
+ result.back()._hasMask |= b._hasMask;
+ suspect = true;
+ } else if ((a._flags & getDeleteFlag())
+ != (b._flags & getDeleteFlag()))
+ {
+ // If we find one remove and one put entry on the
+ // same timestamp we are going to keep the remove
+ // entry to make the copies consistent.
+ const api::GetBucketDiffCommand::Entry& deletedEntry(
+ (a._flags & getDeleteFlag()) != 0 ? a : b);
+ result.push_back(deletedEntry);
+ LOG(debug,
+ "Found put and remove on same timestamp. Keeping"
+ "remove as it is likely caused by remove with "
+ "copies unavailable at the time: %s, %s.",
+ a.toString().c_str(), b.toString().c_str());
} else {
+ LOG(error, "Found entries with equal timestamps that "
+ "weren't the same entry: %s, %s.",
+ a.toString().c_str(), b.toString().c_str());
result.push_back(a);
result.back()._hasMask |= b._hasMask;
+ suspect = true;
}
- ++i;
- ++j;
+ } else {
+ result.push_back(a);
+ result.back()._hasMask |= b._hasMask;
}
+ ++i;
+ ++j;
}
- if (i < listA.size()) {
- assert(j >= listB.size());
- for (uint32_t n = listA.size(); i<n; ++i) {
- result.push_back(listA[i]);
- }
- } else if (j < listB.size()) {
- assert(i >= listA.size());
- for (uint32_t n = listB.size(); j<n; ++j) {
- result.push_back(listB[j]);
- }
+ }
+ if (i < listA.size()) {
+ assert(j >= listB.size());
+ for (uint32_t n = listA.size(); i<n; ++i) {
+ result.push_back(listA[i]);
+ }
+ } else if (j < listB.size()) {
+ assert(i >= listA.size());
+ for (uint32_t n = listB.size(); j<n; ++j) {
+ result.push_back(listB[j]);
}
- result.swap(finalResult);
- return !suspect;
}
+ result.swap(finalResult);
+ return !suspect;
+}
}
MessageTracker::UP
-MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
-{
+MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const {
tracker->setMetric(_env._metrics.getBucketDiff);
spi::Bucket bucket(cmd.getBucket());
LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str());
- checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+ return handleGetBucketDiffStage2(cmd, std::move(tracker));
+}
+MessageTracker::UP
+MergeHandler::handleGetBucketDiffStage2(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
+{
+ spi::Bucket bucket(cmd.getBucket());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(api::ReturnCode::BUSY, "A merge is already running on this bucket.");
return tracker;
@@ -1249,7 +1236,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
framework::MilliSecTimer startTime(_clock);
- async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count));
+ async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
check_apply_diff_sync(std::move(async_results));
@@ -1357,7 +1344,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa
}
if (applyDiffHasLocallyNeededData(diff, index)) {
framework::MilliSecTimer startTime(_clock);
- async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count));
+ async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, diff, index, s->context, async_results);
if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
check_apply_diff_sync(std::move(async_results));
@@ -1452,4 +1439,11 @@ MergeHandler::configure(bool async_apply_bucket_diff) noexcept
_async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release);
}
+void
+MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const
+{
+ auto bucket_id = state->get_bucket().getBucketId();
+ _executor.execute(bucket_id.getId(), [state = std::move(state)]() { });
+}
+
} // storage
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 17cfb847d2c..4daec4c0689 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -22,6 +22,8 @@
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/util/monitored_refcount.h>
+namespace vespalib { class ISequencedTaskExecutor; }
+
namespace storage {
namespace spi {
@@ -45,6 +47,7 @@ public:
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
+ vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize = 4190208,
uint32_t commonMergeChainOptimalizationMinimumSize = 64,
bool async_apply_bucket_diff = false);
@@ -67,6 +70,7 @@ public:
spi::Context& context,
std::shared_ptr<ApplyBucketDiffState> async_results) const;
void sync_bucket_info(const spi::Bucket& bucket) const override;
+ void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override;
MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const;
MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;
@@ -85,7 +89,9 @@ private:
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
std::atomic<bool> _async_apply_bucket_diff;
+ vespalib::ISequencedTaskExecutor& _executor;
+ MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
/** Returns a reply if merge is complete */
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
MergeStatus& status,
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index aa1a9c136fd..d03c9a6d111 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -17,7 +17,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
: _clock(component.getClock()),
_env(component, filestorHandler, metrics, provider),
_processAllHandler(_env, provider),
- _mergeHandler(_env, provider, component.cluster_context(), _clock,
+ _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor,
cfg.bucketMergeChunkSize,
cfg.commonMergeChainOptimalizationMinimumSize,
cfg.asyncApplyBucketDiff),
@@ -44,7 +44,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::REVERT_ID:
return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
case api::MessageType::CREATEBUCKET_ID:
- return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index ce424f0ce83..9ccd901744b 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -99,17 +99,18 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context&
return checkResult(_impl.destroyIterator(iteratorId, context));
}
-spi::Result
-ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
- return checkResult(_impl.createBucket(bucket, context));
+ onComplete->addResultHandler(this);
+ _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
}
void
-ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
+ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
onComplete->addResultHandler(this);
- _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
+ _impl.createBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketIdListResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index c9d2411e372..14d20cf8a52 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -49,7 +49,6 @@ public:
spi::Context &context) override;
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override;
@@ -63,7 +62,8 @@ public:
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
- void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+ void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override;
private:
template <typename ResultType>
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
index b4fe207e2e5..9a7a451b906 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
@@ -88,22 +88,6 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP t
}
MessageTracker::UP
-SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.createBuckets);
- LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
- }
- spi::Bucket spiBucket(cmd.getBucket());
- _spi.createBucket(spiBucket, tracker->context());
- if (cmd.getActive()) {
- _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
- }
- return tracker;
-}
-
-MessageTracker::UP
SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.visit);
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index 2cfbc7016c0..009fd6dff52 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -22,7 +22,6 @@ public:
SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&);
MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index e5667c7b392..63fc7854a52 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -162,6 +162,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String TIMEOUT = "timeout";
private static final String TRACELEVEL = "tracelevel";
private static final String STREAM = "stream";
+ private static final String SLICES = "slices";
+ private static final String SLICE_ID = "sliceId";
private final Clock clock;
private final Duration handlerTimeout;
@@ -985,12 +987,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (cluster.isEmpty() && path.documentType().isEmpty())
throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
+ Optional<Integer> slices = getProperty(request, SLICES, integerParser);
+ Optional<Integer> sliceId = getProperty(request, SLICE_ID, integerParser);
+
VisitorParameters parameters = parseCommonParameters(request, path, cluster);
parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
parameters.setMaxTotalHits(wantedDocumentCount);
parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency));
parameters.visitInconsistentBuckets(true);
parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()));
+ if (slices.isPresent() && sliceId.isPresent())
+ parameters.slice(slices.get(), sliceId.get());
+ else if (slices.isPresent() != sliceId.isPresent())
+ throw new IllegalArgumentException("None or both of '" + SLICES + "' and '" + SLICE_ID + "' must be set");
return parameters;
}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index b23533a720e..1629777f837 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -258,6 +258,8 @@ public class DocumentV1ApiTest {
assertEquals("[id]", parameters.getFieldSet());
assertEquals("(all the things)", parameters.getDocumentSelection());
assertEquals(6000, parameters.getSessionTimeoutMs());
+ assertEquals(4, parameters.getSlices());
+ assertEquals(1, parameters.getSliceId());
// Put some documents in the response
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0));
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1));
@@ -269,7 +271,7 @@ public class DocumentV1ApiTest {
parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "timeout is OK");
});
response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" +
- "&selection=all%20the%20things&fieldSet=[id]&timeout=6&stream=true");
+ "&selection=all%20the%20things&fieldSet=[id]&timeout=6&stream=true&slices=4&sliceId=1");
assertSameJson("{" +
" \"pathId\": \"/document/v1\"," +
" \"documents\": [" +