about | summary | refs | log | tree | commit | diff | stats
path: root/documentapi
diff options
context:
space:
mode:
author Jon Marius Venstad <venstad@gmail.com> 2021-10-25 16:22:24 +0200
committer Jon Marius Venstad <venstad@gmail.com> 2021-10-25 16:22:24 +0200
commit 31752f1e94aaccac0631b9cf908f747ddc7afb9b (patch)
tree 19b73a7230b624b3975b7a029cd1518d70df10f5 /documentapi
parent b4c9b37f5c89097d17e2630f8a25199a5d1c8e6b (diff)
Address review
Diffstat (limited to 'documentapi')
-rw-r--r-- documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java | 20
-rwxr-xr-x documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java | 15
-rwxr-xr-x documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java | 64
3 files changed, 54 insertions, 45 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
index 9957898e459..4a77d30ec92 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java
@@ -387,6 +387,18 @@ public class ProgressToken {
}
/**
+ * Marks the current bucket as finished and advances the bucket cursor;
+ * throws instead if the current bucket is already {@link #addBucket added}.
+ */
+ void skipCurrentBucket() {
+ if (buckets.containsKey(bucketToKeyWrapper(getCurrentBucketId())))
+ throw new IllegalStateException("Current bucket was already added to the explicit bucket set");
+
+ ++finishedBucketCount;
+ ++bucketCursor;
+ }
+
+ /**
* Directly generate a bucket Id key for the <code>n</code>th bucket in
* reverse sorted order.
*
@@ -428,6 +440,14 @@ public class ProgressToken {
return bucketCursor;
}
+ static BucketId toBucketId(long bucketCursor, int distributionBits) {
+ return new BucketId(keyToBucketId(makeNthBucketKey(bucketCursor, distributionBits)));
+ }
+
+ BucketId getCurrentBucketId() {
+ return toBucketId(getBucketCursor(), getDistributionBitCount());
+ }
+
protected void setBucketCursor(long bucketCursor) {
this.bucketCursor = bucketCursor;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
index 0ccc64bb8f3..e15512ca71b 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
@@ -334,8 +334,7 @@ public class VisitorIterator {
assert(hasNext()) : "getNext() called with hasNext() == false";
// Create the progress to return for creating visitors, and advance bucket cursor.
- BucketProgress progress = new BucketProgress(toBucketId(progressToken.getBucketCursor(), distributionBitCount),
- new BucketId());
+ BucketProgress progress = new BucketProgress(progressToken.getCurrentBucketId(), new BucketId());
progressToken.setBucketCursor(progressToken.getBucketCursor() + 1);
// Skip ahead to our next next slice, to ensure we also exhaust the bucket space when
@@ -351,19 +350,11 @@ public class VisitorIterator {
if (distributionBitCount == 1)
return;
- while (progressToken.getBucketCursor() < (1L << distributionBitCount) && (progressToken.getBucketCursor() - sliceId) % slices != 0) {
- BucketId bucketId = toBucketId(progressToken.getBucketCursor(), distributionBitCount);
- progressToken.addBucket(bucketId, ProgressToken.NULL_BUCKET, ProgressToken.BucketState.BUCKET_ACTIVE);
- progressToken.updateProgress(toBucketId(progressToken.getBucketCursor(), distributionBitCount),
- ProgressToken.FINISHED_BUCKET);
- progressToken.setBucketCursor(progressToken.getBucketCursor() + 1);
+ while (progressToken.getBucketCursor() < getTotalBucketCount() && (progressToken.getBucketCursor() % slices) != sliceId) {
+ progressToken.skipCurrentBucket();
}
}
- private static BucketId toBucketId(long bucketCursor, int distributionBitCount) {
- return new BucketId(ProgressToken.keyToBucketId(ProgressToken.makeNthBucketKey(bucketCursor, distributionBitCount)));
- }
-
public int getDistributionBitCount() {
return distributionBitCount;
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
index 89a1125880d..fb5f5bd2cfb 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java
@@ -118,20 +118,17 @@ public class VisitorIteratorTestCase {
// Iterator with a single distribution bit ignores slicing.
assertTrue(iter.hasNext());
- assertEquals(new BucketId(ProgressToken.keyToBucketId(ProgressToken.makeNthBucketKey(0, 1))),
- iter.getNext().getSuperbucket());
-
- assertEquals(new BucketId(ProgressToken.keyToBucketId(ProgressToken.makeNthBucketKey(1, 1))),
- iter.getNext().getSuperbucket());
-
+ assertEquals(ProgressToken.toBucketId(0, 1), iter.getNext().getSuperbucket());
+ assertEquals(ProgressToken.toBucketId(1, 1), iter.getNext().getSuperbucket());
assertFalse(iter.hasNext());
}
@Test
public void testValidSlicing() throws ParseException {
int distBits = 4;
+ long buckets = 1 << distBits;
BucketIdFactory idFactory = new BucketIdFactory();
- for (int slices = 1; slices <= 1 << distBits + 1; slices++) {
+ for (int slices = 1; slices <= 2 * buckets; slices++) {
long bucketsTotal = 0;
for (int sliceId = 0; sliceId < slices; sliceId++) {
ProgressToken progress = new ProgressToken();
@@ -144,22 +141,22 @@ public class VisitorIteratorTestCase {
assertEquals(context, progress.getDistributionBitCount(), distBits);
assertTrue(context, iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource);
- assertEquals(context, progress.getFinishedBucketCount(), Math.min(1 << distBits, sliceId));
- assertEquals(context, progress.getTotalBucketCount(), 1 << distBits);
+ assertEquals(context, progress.getFinishedBucketCount(), Math.min(buckets, sliceId));
+ assertEquals(context, progress.getTotalBucketCount(), buckets);
// First, get+update half of the buckets, marking them as done
long bucketCount = 0;
- // Do buckets in th first half.
- while (iter.hasNext() && progress.getFinishedBucketCount() < 1 << distBits - 1) {
+ // Do buckets in the first half.
+ while (iter.hasNext() && progress.getFinishedBucketCount() < buckets / 2) {
VisitorIterator.BucketProgress ids = iter.getNext();
iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET);
++bucketCount;
++bucketsTotal;
}
- if (slices + sliceId < 1 << distBits) { // Otherwise, we're already done ...
- assertEquals(context, ((1L << distBits - 1) + slices - sliceId - 1) / slices, bucketCount);
+ if (slices + sliceId < buckets) { // Otherwise, we're already done ...
+ assertEquals(context, ((buckets / 2) + slices - sliceId - 1) / slices, bucketCount);
// Should be no buckets in limbo at this point
assertFalse(context, progress.hasActive());
assertFalse(context, progress.hasPending());
@@ -176,22 +173,23 @@ public class VisitorIteratorTestCase {
++bucketsTotal;
}
- assertEquals(context, ((1L << distBits) + slices - sliceId - 1) / slices, bucketCount);
+ assertEquals(context, (buckets + slices - sliceId - 1) / slices, bucketCount);
// Should be no buckets in limbo at this point
assertFalse(context, progress.hasActive());
assertFalse(context, progress.hasPending());
assertTrue(context, iter.isDone());
assertFalse(context, iter.hasNext());
- assertEquals(context, progress.getFinishedBucketCount(), 1 << distBits);
+ assertEquals(context, progress.getFinishedBucketCount(), buckets);
assertTrue(context, progress.isFinished());
}
- assertEquals("slices: " + slices, 1 << distBits, bucketsTotal);
+ assertEquals("slices: " + slices, buckets, bucketsTotal);
}
}
@Test
public void testProgressSerializationRange() throws ParseException {
int distBits = 4;
+ int buckets = 1 << distBits;
BucketIdFactory idFactory = new BucketIdFactory();
ProgressToken progress = new ProgressToken();
@@ -204,11 +202,11 @@ public class VisitorIteratorTestCase {
assertTrue(iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource);
assertEquals(progress.getFinishedBucketCount(), 0);
- assertEquals(progress.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progress.getTotalBucketCount(), buckets);
// First, get+update half of the buckets, marking them as done
long bucketCount = 0;
- long bucketStop = 1 << (distBits - 1);
+ long bucketStop = buckets / 2;
while (iter.hasNext() && bucketCount != bucketStop) {
VisitorIterator.BucketProgress ids = iter.getNext();
@@ -232,7 +230,7 @@ public class VisitorIteratorTestCase {
desired.append('\n');
desired.append(bucketCount);
desired.append('\n');
- desired.append(1 << distBits);
+ desired.append(buckets);
desired.append('\n');
assertEquals(desired.toString(), progress.toString());
@@ -245,7 +243,7 @@ public class VisitorIteratorTestCase {
ProgressToken progDs = new ProgressToken(progress.toString());
assertEquals(progDs.getDistributionBitCount(), distBits);
- assertEquals(progDs.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progDs.getTotalBucketCount(), buckets);
assertEquals(progDs.getFinishedBucketCount(), bucketCount);
VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection(
@@ -267,21 +265,21 @@ public class VisitorIteratorTestCase {
// Now fetch a subset of the remaining buckets without finishing them,
// keeping some in the active set and some in pending
- int pendingTotal = 1 << (distBits - 3);
- int activeTotal = 1 << (distBits - 3);
- Vector<VisitorIterator.BucketProgress> buckets = new Vector<VisitorIterator.BucketProgress>();
+ int pendingTotal = buckets / 8;
+ int activeTotal = buckets / 8;
+ Vector<VisitorIterator.BucketProgress> trackedBuckets = new Vector<VisitorIterator.BucketProgress>();
// Pre-fetch, since otherwise we'd reuse pending buckets
for (int i = 0; i < pendingTotal + activeTotal; ++i) {
- buckets.add(iter.getNext());
+ trackedBuckets.add(iter.getNext());
}
for (int i = 0; i < pendingTotal + activeTotal; ++i) {
- VisitorIterator.BucketProgress idTemp = buckets.get(i);
+ VisitorIterator.BucketProgress idTemp = trackedBuckets.get(i);
if (i < activeTotal) {
// Make them 50% done
iter.update(idTemp.getSuperbucket(),
- new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 << distBits)));
+ new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 * buckets)));
}
// else: leave hanging as active
}
@@ -299,7 +297,7 @@ public class VisitorIteratorTestCase {
desired.append('\n');
desired.append(bucketCount);
desired.append('\n');
- desired.append(1 << distBits);
+ desired.append(buckets);
desired.append('\n');
assertEquals(progress.getBuckets().entrySet().size(), pendingTotal + activeTotal);
@@ -319,7 +317,7 @@ public class VisitorIteratorTestCase {
ProgressToken progDs = new ProgressToken(progress.toString());
assertEquals(progDs.getDistributionBitCount(), distBits);
- assertEquals(progDs.getTotalBucketCount(), 1 << distBits);
+ assertEquals(progDs.getTotalBucketCount(), buckets);
assertEquals(progDs.getFinishedBucketCount(), bucketCount);
VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection(
@@ -338,7 +336,7 @@ public class VisitorIteratorTestCase {
// Finish all the active buckets
for (int i = activeTotal; i < activeTotal + pendingTotal; ++i) {
- iter.update(buckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET);
+ iter.update(trackedBuckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET);
++bucketCount;
}
@@ -359,16 +357,16 @@ public class VisitorIteratorTestCase {
assertFalse(iter.hasNext());
assertTrue(progress.isFinished());
// Cumulative number of finished buckets must match 2^distbits
- assertEquals(bucketCount, 1 << distBits);
+ assertEquals(bucketCount, buckets);
StringBuilder finished = new StringBuilder();
finished.append("VDS bucket progress file (100.0% completed)\n");
finished.append(distBits);
finished.append('\n');
- finished.append(1 << distBits); // Cursor
+ finished.append(buckets); // Cursor
finished.append('\n');
- finished.append(1 << distBits); // Finished
+ finished.append(buckets); // Finished
finished.append('\n');
- finished.append(1 << distBits); // Total
+ finished.append(buckets); // Total
finished.append('\n');
assertEquals(progress.toString(), finished.toString());