summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
Diffstat (limited to 'documentapi')
-rw-r--r-- documentapi/abi-spec.json | 6
-rw-r--r-- documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java | 20
-rwxr-xr-x documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java | 66
-rw-r--r-- documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java | 12
-rwxr-xr-x documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java | 4
-rwxr-xr-x documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java | 145
6 files changed, 223 insertions, 30 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index 9cc4f60ed7e..78a58f24a65 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -799,7 +799,7 @@
"public"
],
"methods": [
- "public void <init>(int, com.yahoo.documentapi.ProgressToken)",
+ "public void <init>(int, com.yahoo.documentapi.ProgressToken, int, int)",
"protected boolean isLosslessResetPossible()",
"public boolean hasNext()",
"public boolean shouldYield()",
@@ -851,6 +851,7 @@
"public void setDistributionBitCount(int)",
"public boolean visitsAllBuckets()",
"public static com.yahoo.documentapi.VisitorIterator createFromDocumentSelection(java.lang.String, com.yahoo.document.BucketIdFactory, int, com.yahoo.documentapi.ProgressToken)",
+ "public static com.yahoo.documentapi.VisitorIterator createFromDocumentSelection(java.lang.String, com.yahoo.document.BucketIdFactory, int, com.yahoo.documentapi.ProgressToken, int, int)",
"public static com.yahoo.documentapi.VisitorIterator createFromExplicitBucketSet(java.util.Set, int, com.yahoo.documentapi.ProgressToken)"
],
"fields": []
@@ -931,6 +932,9 @@
"public com.yahoo.documentapi.messagebus.loadtypes.LoadType getLoadType()",
"public boolean skipBucketsOnFatalErrors()",
"public void skipBucketsOnFatalErrors(boolean)",
+ "public void slice(int, int)",
+ "public int getSlices()",
+ "public int getSliceId()",
"public void setDynamicallyIncreaseMaxBucketsPerVisitor(boolean)",
"public void setDynamicMaxBucketsIncreaseFactor(float)",
"public java.lang.String toString()"
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
index 9957898e459..4a77d30ec92 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
@@ -387,6 +387,18 @@ public class ProgressToken {
}
/**
+ * Counts the bucket at the current cursor position as finished and advances the
+ * bucket cursor by one, without adding the bucket to this token.
+ *
+ * @throws IllegalStateException if the current bucket is already tracked by this
+ * token, i.e., it was previously {@link #addBucket added}; nothing is modified then
+ */
+ void skipCurrentBucket() {
+ if (buckets.containsKey(bucketToKeyWrapper(getCurrentBucketId())))
+ throw new IllegalStateException("Current bucket was already added to the explicit bucket set");
+
+ ++finishedBucketCount;
+ ++bucketCursor;
+ }
+
+ /**
* Directly generate a bucket Id key for the <code>n</code>th bucket in
* reverse sorted order.
*
@@ -428,6 +440,14 @@ public class ProgressToken {
return bucketCursor;
}
+ static BucketId toBucketId(long bucketCursor, int distributionBits) { // Bucket id for cursor position bucketCursor at the given number of distribution bits.
+ return new BucketId(keyToBucketId(makeNthBucketKey(bucketCursor, distributionBits))); // n'th bucket key -> raw bucket id -> BucketId
+ }
+
+ BucketId getCurrentBucketId() { // Bucket id at this token's current cursor position and distribution bit count.
+ return toBucketId(getBucketCursor(), getDistributionBitCount());
+ }
+
protected void setBucketCursor(long bucketCursor) {
this.bucketCursor = bucketCursor;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
index e11bdf7f18c..e15512ca71b 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
@@ -81,12 +81,24 @@ public class VisitorIterator {
protected static class DistributionRangeBucketSource implements BucketSource {
private boolean flushActive = false;
private int distributionBitCount;
+ private final int slices;
+ private final int sliceId;
// Wouldn't need this if this were a non-static class, but do it for
// the sake of keeping things identical in Java and C++
private ProgressToken progressToken;
public DistributionRangeBucketSource(int distributionBitCount,
- ProgressToken progress) {
+ ProgressToken progress,
+ int slices, int sliceId) {
+ if (slices < 1) {
+ throw new IllegalArgumentException("slices must be positive, but was " + slices);
+ }
+ if (sliceId < 0 || sliceId >= slices) {
+ throw new IllegalArgumentException("sliceId must be in [0, " + slices + "), but was " + sliceId);
+ }
+
+ this.slices = slices;
+ this.sliceId = sliceId;
progressToken = progress;
// New progress token (could also be empty, in which this is a
@@ -148,6 +160,7 @@ public class VisitorIterator {
}
// Should be all fixed up and good to go
progressToken.setInconsistentState(false);
+ skipToSlice();
}
protected boolean isLosslessResetPossible() {
@@ -203,6 +216,7 @@ public class VisitorIterator {
assert(p.getActiveBucketCount() == 0);
p.clearAllBuckets();
p.setBucketCursor(0);
+ skipToSlice();
return;
}
@@ -292,7 +306,14 @@ public class VisitorIterator {
}
public boolean hasNext() {
- return progressToken.getBucketCursor() < (1L << distributionBitCount);
+ // There is a next bucket iff some bucket at or after the cursor lies within the bucket
+ // space and is congruent to sliceId modulo slices. While the distribution bit count is
+ // still 1 (i.e., we're not yet initialised with a real bit count), slicing is ignored.
+ long nextBucket = progressToken.getBucketCursor();
+ if (distributionBitCount != 1) {
+ nextBucket += Math.floorMod(sliceId - nextBucket, slices);
+ }
+ return nextBucket < (1L << distributionBitCount);
}
public boolean shouldYield() {
@@ -311,13 +332,27 @@ public class VisitorIterator {
public BucketProgress getNext() {
assert(hasNext()) : "getNext() called with hasNext() == false";
- long currentPosition = progressToken.getBucketCursor();
- long key = ProgressToken.makeNthBucketKey(currentPosition, distributionBitCount);
- ++currentPosition;
- progressToken.setBucketCursor(currentPosition);
- return new BucketProgress(
- new BucketId(ProgressToken.keyToBucketId(key)),
- new BucketId());
+
+ // Capture the progress for the bucket at the cursor (used for creating visitors), then advance the cursor past it.
+ BucketProgress progress = new BucketProgress(progressToken.getCurrentBucketId(), new BucketId());
+ progressToken.setBucketCursor(progressToken.getBucketCursor() + 1);
+
+ // Skip ahead to the next bucket in our slice, so the bucket space is also exhausted when
+ // hasNext() turns false while super buckets outside our slice remain after the current one.
+ skipToSlice();
+
+ return progress;
+ }
+
+ // Advances the wrapped progress token's bucket cursor to the next bucket in our slice (cursor % slices == sliceId),
+ // marking each skipped bucket as finished; does nothing while the distribution bit count is still 1 (not yet initialised).
+ private void skipToSlice() {
+ if (distributionBitCount == 1)
+ return;
+
+ while (progressToken.getBucketCursor() < getTotalBucketCount() && (progressToken.getBucketCursor() % slices) != sliceId) {
+ progressToken.skipCurrentBucket();
+ }
}
public int getDistributionBitCount() {
@@ -732,6 +767,13 @@ public class VisitorIterator {
return bucketSource.visitsAllBuckets();
}
+ public static VisitorIterator createFromDocumentSelection(
+ String documentSelection,
+ BucketIdFactory idFactory,
+ int distributionBitCount,
+ ProgressToken progress) throws ParseException {
+ return createFromDocumentSelection(documentSelection, idFactory, distributionBitCount, progress, 1, 0); // Unsliced: a single slice with id 0, so no buckets are skipped.
+ }
/**
* Create a new <code>VisitorIterator</code> instance based on the given document
* selection string.
@@ -753,7 +795,9 @@ public class VisitorIterator {
String documentSelection,
BucketIdFactory idFactory,
int distributionBitCount,
- ProgressToken progress) throws ParseException {
+ ProgressToken progress,
+ int slices,
+ int sliceId) throws ParseException {
BucketSelector bucketSel = new BucketSelector(idFactory);
Set<BucketId> rawBuckets = bucketSel.getBucketList(documentSelection);
BucketSource src;
@@ -763,7 +807,7 @@ public class VisitorIterator {
// bit-based range source
if (rawBuckets == null) {
// Range source
- src = new DistributionRangeBucketSource(distributionBitCount, progress);
+ src = new DistributionRangeBucketSource(distributionBitCount, progress, slices, sliceId);
} else {
// Explicit source
src = new ExplicitBucketSource(rawBuckets, distributionBitCount, progress);
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
index 8b0c8538855..44675d8d2ac 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
@@ -50,6 +50,8 @@ public class VisitorParameters extends Parameters {
private int traceLevel = 0;
private ThrottlePolicy throttlePolicy = null;
private boolean skipBucketsOnFatalErrors = false;
+ private int slices = 1;
+ private int sliceId = 0;
// Advanced parameter, only for internal use.
Set<BucketId> bucketsToVisit = null;
@@ -101,6 +103,7 @@ public class VisitorParameters extends Parameters {
params.getDynamicMaxBucketsIncreaseFactor());
setTraceLevel(params.getTraceLevel());
skipBucketsOnFatalErrors(params.skipBucketsOnFatalErrors());
+ slice(params.getSlices(), params.getSliceId()); // Take the slice id from the copied parameters; unqualified getSliceId() would read this new object's own default.
}
// Get functions
@@ -331,6 +334,15 @@ public class VisitorParameters extends Parameters {
public void skipBucketsOnFatalErrors(boolean skipBucketsOnFatalErrors) { this.skipBucketsOnFatalErrors = skipBucketsOnFatalErrors; }
+ public void slice(int slices, int sliceId) { // Stores both values as given; no range checks are performed here.
+ this.slices = slices;
+ this.sliceId = sliceId;
+ }
+
+ public int getSlices() { return slices; } // 1 unless slice(int, int) has been called
+
+ public int getSliceId() { return sliceId; } // 0 unless slice(int, int) has been called
+
/**
* Set whether or not max buckets per visitor value should be dynamically
* increased when using orderdoc and visitors do not return at least half
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 5d07d433f18..f7242695490 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -572,7 +572,9 @@ public class MessageBusVisitorSession implements VisitorSession {
params.getDocumentSelection(),
bucketIdFactory,
1,
- progressToken);
+ progressToken,
+ params.getSlices(),
+ params.getSliceId());
} else {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "parameters specify explicit bucket set " +
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
index 01cdad244a8..fb5f5bd2cfb 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
@@ -77,8 +77,119 @@ public class VisitorIteratorTestCase {
}
@Test
+ public void testInvalidSlicing() throws ParseException {
+ int distBits = 4;
+ BucketIdFactory idFactory = new BucketIdFactory();
+ ProgressToken progress = new ProgressToken();
+
+ // Every call below must be rejected. The explicit fail() after each call makes the
+ // test fail if no exception is thrown, instead of passing vacuously.
+ try {
+ VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 0, 0);
+ org.junit.Assert.fail("Expected IllegalArgumentException for slices = 0");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("slices must be positive, but was 0", e.getMessage());
+ }
+
+ try {
+ VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 1, 1);
+ org.junit.Assert.fail("Expected IllegalArgumentException for sliceId = 1 with slices = 1");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("sliceId must be in [0, 1), but was 1", e.getMessage());
+ }
+
+ try {
+ VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 1, -1);
+ org.junit.Assert.fail("Expected IllegalArgumentException for sliceId = -1");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("sliceId must be in [0, 1), but was -1", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testIgnoredSlicing() throws ParseException {
+ int distBits = 1;
+ BucketIdFactory idFactory = new BucketIdFactory();
+ ProgressToken progress = new ProgressToken();
+
+ VisitorIterator iter = VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, 3, 2);
+
+ // With a distribution bit count of 1 the range source skips slicing, so both buckets are returned despite slices = 3, sliceId = 2.
+ assertTrue(iter.hasNext());
+ assertEquals(ProgressToken.toBucketId(0, 1), iter.getNext().getSuperbucket());
+ assertEquals(ProgressToken.toBucketId(1, 1), iter.getNext().getSuperbucket());
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testValidSlicing() throws ParseException {
+ int distBits = 4;
+ long buckets = 1 << distBits;
+ BucketIdFactory idFactory = new BucketIdFactory();
+ // For every slice count, the slices together must visit each bucket exactly once,
+ // and each individual slice must end up with a fully finished progress token.
+ for (int slices = 1; slices <= 2 * buckets; slices++) {
+ long bucketsTotal = 0;
+ for (int sliceId = 0; sliceId < slices; sliceId++) {
+ ProgressToken progress = new ProgressToken();
+
+ // docsel will be unknown --> entire bucket range will be covered
+ VisitorIterator iter = VisitorIterator.createFromDocumentSelection(
+ "id.group != \"yahoo.com\"", idFactory, distBits, progress, slices, sliceId);
+
+ String context = "slices: " + slices + ", sliceId: " + sliceId;
+ assertEquals(context, distBits, progress.getDistributionBitCount());
+ assertTrue(context, iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource);
+
+ // Buckets before the first one in this slice are skipped, and counted as finished, up front.
+ assertEquals(context, Math.min(buckets, sliceId), progress.getFinishedBucketCount());
+ assertEquals(context, buckets, progress.getTotalBucketCount());
+
+ long bucketCount = 0;
+
+ // First, get+update this slice's buckets in the first half of the bucket space, marking them as done.
+ while (iter.hasNext() && progress.getFinishedBucketCount() < buckets / 2) {
+ VisitorIterator.BucketProgress ids = iter.getNext();
+ iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET);
+ ++bucketCount;
+ ++bucketsTotal;
+ }
+
+ if (slices + sliceId < buckets) { // Otherwise, we're already done ...
+ assertEquals(context, ((buckets / 2) + slices - sliceId - 1) / slices, bucketCount);
+ // Should be no buckets in limbo at this point
+ assertFalse(context, progress.hasActive());
+ assertFalse(context, progress.hasPending());
+ assertFalse(context, iter.isDone());
+ assertTrue(context, iter.hasNext());
+ assertEquals(context, bucketCount * slices + sliceId, progress.getFinishedBucketCount());
+ assertFalse(context, progress.isFinished());
+ }
+
+ // Then finish whatever remains of this slice.
+ while (iter.hasNext()) {
+ VisitorIterator.BucketProgress ids = iter.getNext();
+ iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET);
+ ++bucketCount;
+ ++bucketsTotal;
+ }
+
+ assertEquals(context, (buckets + slices - sliceId - 1) / slices, bucketCount);
+ // Should be no buckets in limbo at this point
+ assertFalse(context, progress.hasActive());
+ assertFalse(context, progress.hasPending());
+ assertTrue(context, iter.isDone());
+ assertFalse(context, iter.hasNext());
+ assertEquals(context, buckets, progress.getFinishedBucketCount());
+ assertTrue(context, progress.isFinished());
+ }
+ assertEquals("slices: " + slices, buckets, bucketsTotal);
+ }
+ }
+
+ @Test
public void testProgressSerializationRange() throws ParseException {
int distBits = 4;
+ int buckets = 1 << distBits;
BucketIdFactory idFactory = new BucketIdFactory();
ProgressToken progress = new ProgressToken();
@@ -91,11 +202,11 @@ public class VisitorIteratorTestCase {
assertTrue(iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource);
assertEquals(progress.getFinishedBucketCount(), 0);
- assertEquals(progress.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progress.getTotalBucketCount(), buckets);
// First, get+update half of the buckets, marking them as done
long bucketCount = 0;
- long bucketStop = 1 << (distBits - 1);
+ long bucketStop = buckets / 2;
while (iter.hasNext() && bucketCount != bucketStop) {
VisitorIterator.BucketProgress ids = iter.getNext();
@@ -119,7 +230,7 @@ public class VisitorIteratorTestCase {
desired.append('\n');
desired.append(bucketCount);
desired.append('\n');
- desired.append(1 << distBits);
+ desired.append(buckets);
desired.append('\n');
assertEquals(desired.toString(), progress.toString());
@@ -132,7 +243,7 @@ public class VisitorIteratorTestCase {
ProgressToken progDs = new ProgressToken(progress.toString());
assertEquals(progDs.getDistributionBitCount(), distBits);
- assertEquals(progDs.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progDs.getTotalBucketCount(), buckets);
assertEquals(progDs.getFinishedBucketCount(), bucketCount);
VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection(
@@ -154,21 +265,21 @@ public class VisitorIteratorTestCase {
// Now fetch a subset of the remaining buckets without finishing them,
// keeping some in the active set and some in pending
- int pendingTotal = 1 << (distBits - 3);
- int activeTotal = 1 << (distBits - 3);
- Vector<VisitorIterator.BucketProgress> buckets = new Vector<VisitorIterator.BucketProgress>();
+ int pendingTotal = buckets / 8;
+ int activeTotal = buckets / 8;
+ Vector<VisitorIterator.BucketProgress> trackedBuckets = new Vector<VisitorIterator.BucketProgress>();
// Pre-fetch, since otherwise we'd reuse pending buckets
for (int i = 0; i < pendingTotal + activeTotal; ++i) {
- buckets.add(iter.getNext());
+ trackedBuckets.add(iter.getNext());
}
for (int i = 0; i < pendingTotal + activeTotal; ++i) {
- VisitorIterator.BucketProgress idTemp = buckets.get(i);
+ VisitorIterator.BucketProgress idTemp = trackedBuckets.get(i);
if (i < activeTotal) {
// Make them 50% done
iter.update(idTemp.getSuperbucket(),
- new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 << distBits)));
+ new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 * buckets)));
}
// else: leave hanging as active
}
@@ -186,7 +297,7 @@ public class VisitorIteratorTestCase {
desired.append('\n');
desired.append(bucketCount);
desired.append('\n');
- desired.append(1 << distBits);
+ desired.append(buckets);
desired.append('\n');
assertEquals(progress.getBuckets().entrySet().size(), pendingTotal + activeTotal);
@@ -206,7 +317,7 @@ public class VisitorIteratorTestCase {
ProgressToken progDs = new ProgressToken(progress.toString());
assertEquals(progDs.getDistributionBitCount(), distBits);
- assertEquals(progDs.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progDs.getTotalBucketCount(), buckets);
assertEquals(progDs.getFinishedBucketCount(), bucketCount);
VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection(
@@ -225,7 +336,7 @@ public class VisitorIteratorTestCase {
// Finish all the active buckets
for (int i = activeTotal; i < activeTotal + pendingTotal; ++i) {
- iter.update(buckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET);
+ iter.update(trackedBuckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET);
++bucketCount;
}
@@ -246,16 +357,16 @@ public class VisitorIteratorTestCase {
assertFalse(iter.hasNext());
assertTrue(progress.isFinished());
// Cumulative number of finished buckets must match 2^distbits
- assertEquals(bucketCount, 1 << distBits);
+ assertEquals(bucketCount, buckets);
StringBuilder finished = new StringBuilder();
finished.append("VDS bucket progress file (100.0% completed)\n");
finished.append(distBits);
finished.append('\n');
- finished.append(1 << distBits); // Cursor
+ finished.append(buckets); // Cursor
finished.append('\n');
- finished.append(1 << distBits); // Finished
+ finished.append(buckets); // Finished
finished.append('\n');
- finished.append(1 << distBits); // Total
+ finished.append(buckets); // Total
finished.append('\n');
assertEquals(progress.toString(), finished.toString());