diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-10-25 16:22:24 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-10-25 16:22:24 +0200 |
commit | 31752f1e94aaccac0631b9cf908f747ddc7afb9b (patch) | |
tree | 19b73a7230b624b3975b7a029cd1518d70df10f5 /documentapi | |
parent | b4c9b37f5c89097d17e2630f8a25199a5d1c8e6b (diff) |
Address review
Diffstat (limited to 'documentapi')
3 files changed, 54 insertions, 45 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java index 9957898e459..4a77d30ec92 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java @@ -387,6 +387,18 @@ public class ProgressToken { } /** + * Marks the current bucket as finished and advances the bucket cursor; + * throws instead if the current bucket is already {@link #addBucket added}. + */ + void skipCurrentBucket() { + if (buckets.containsKey(bucketToKeyWrapper(getCurrentBucketId()))) + throw new IllegalStateException("Current bucket was already added to the explicit bucket set"); + + ++finishedBucketCount; + ++bucketCursor; + } + + /** * Directly generate a bucket Id key for the <code>n</code>th bucket in * reverse sorted order. * @@ -428,6 +440,14 @@ public class ProgressToken { return bucketCursor; } + static BucketId toBucketId(long bucketCursor, int distributionBits) { + return new BucketId(keyToBucketId(makeNthBucketKey(bucketCursor, distributionBits))); + } + + BucketId getCurrentBucketId() { + return toBucketId(getBucketCursor(), getDistributionBitCount()); + } + protected void setBucketCursor(long bucketCursor) { this.bucketCursor = bucketCursor; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java index 0ccc64bb8f3..e15512ca71b 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java @@ -334,8 +334,7 @@ public class VisitorIterator { assert(hasNext()) : "getNext() called with hasNext() == false"; // Create the progress to return for creating visitors, and advance bucket cursor. - BucketProgress progress = new BucketProgress(toBucketId(progressToken.getBucketCursor(), distributionBitCount), - new BucketId()); + BucketProgress progress = new BucketProgress(progressToken.getCurrentBucketId(), new BucketId()); progressToken.setBucketCursor(progressToken.getBucketCursor() + 1); // Skip ahead to our next next slice, to ensure we also exhaust the bucket space when @@ -351,19 +350,11 @@ public class VisitorIterator { if (distributionBitCount == 1) return; - while (progressToken.getBucketCursor() < (1L << distributionBitCount) && (progressToken.getBucketCursor() - sliceId) % slices != 0) { - BucketId bucketId = toBucketId(progressToken.getBucketCursor(), distributionBitCount); - progressToken.addBucket(bucketId, ProgressToken.NULL_BUCKET, ProgressToken.BucketState.BUCKET_ACTIVE); - progressToken.updateProgress(toBucketId(progressToken.getBucketCursor(), distributionBitCount), - ProgressToken.FINISHED_BUCKET); - progressToken.setBucketCursor(progressToken.getBucketCursor() + 1); + while (progressToken.getBucketCursor() < getTotalBucketCount() && (progressToken.getBucketCursor() % slices) != sliceId) { + progressToken.skipCurrentBucket(); } } - private static BucketId toBucketId(long bucketCursor, int distributionBitCount) { - return new BucketId(ProgressToken.keyToBucketId(ProgressToken.makeNthBucketKey(bucketCursor, distributionBitCount))); - } - public int getDistributionBitCount() { return distributionBitCount; } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java index 89a1125880d..fb5f5bd2cfb 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/VisitorIteratorTestCase.java @@ -118,20 +118,17 @@ public class VisitorIteratorTestCase { // Iterator with a single distribution bit ignores slicing. assertTrue(iter.hasNext()); - assertEquals(new BucketId(ProgressToken.keyToBucketId(ProgressToken.makeNthBucketKey(0, 1))), - iter.getNext().getSuperbucket()); - - assertEquals(new BucketId(ProgressToken.keyToBucketId(ProgressToken.makeNthBucketKey(1, 1))), - iter.getNext().getSuperbucket()); - + assertEquals(ProgressToken.toBucketId(0, 1), iter.getNext().getSuperbucket()); + assertEquals(ProgressToken.toBucketId(1, 1), iter.getNext().getSuperbucket()); assertFalse(iter.hasNext()); } @Test public void testValidSlicing() throws ParseException { int distBits = 4; + long buckets = 1 << distBits; BucketIdFactory idFactory = new BucketIdFactory(); - for (int slices = 1; slices <= 1 << distBits + 1; slices++) { + for (int slices = 1; slices <= 2 * buckets; slices++) { long bucketsTotal = 0; for (int sliceId = 0; sliceId < slices; sliceId++) { ProgressToken progress = new ProgressToken(); @@ -144,22 +141,22 @@ public class VisitorIteratorTestCase { assertEquals(context, progress.getDistributionBitCount(), distBits); assertTrue(context, iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource); - assertEquals(context, progress.getFinishedBucketCount(), Math.min(1 << distBits, sliceId)); - assertEquals(context, progress.getTotalBucketCount(), 1 << distBits); + assertEquals(context, progress.getFinishedBucketCount(), Math.min(buckets, sliceId)); + assertEquals(context, progress.getTotalBucketCount(), buckets); // First, get+update half of the buckets, marking them as done long bucketCount = 0; - // Do buckets in th first half. - while (iter.hasNext() && progress.getFinishedBucketCount() < 1 << distBits - 1) { + // Do buckets in the first half. + while (iter.hasNext() && progress.getFinishedBucketCount() < buckets / 2) { VisitorIterator.BucketProgress ids = iter.getNext(); iter.update(ids.getSuperbucket(), ProgressToken.FINISHED_BUCKET); ++bucketCount; ++bucketsTotal; } - if (slices + sliceId < 1 << distBits) { // Otherwise, we're already done ... - assertEquals(context, ((1L << distBits - 1) + slices - sliceId - 1) / slices, bucketCount); + if (slices + sliceId < buckets) { // Otherwise, we're already done ... + assertEquals(context, ((buckets / 2) + slices - sliceId - 1) / slices, bucketCount); // Should be no buckets in limbo at this point assertFalse(context, progress.hasActive()); assertFalse(context, progress.hasPending()); @@ -176,22 +173,23 @@ public class VisitorIteratorTestCase { ++bucketsTotal; } - assertEquals(context, ((1L << distBits) + slices - sliceId - 1) / slices, bucketCount); + assertEquals(context, (buckets + slices - sliceId - 1) / slices, bucketCount); // Should be no buckets in limbo at this point assertFalse(context, progress.hasActive()); assertFalse(context, progress.hasPending()); assertTrue(context, iter.isDone()); assertFalse(context, iter.hasNext()); - assertEquals(context, progress.getFinishedBucketCount(), 1 << distBits); + assertEquals(context, progress.getFinishedBucketCount(), buckets); assertTrue(context, progress.isFinished()); } - assertEquals("slices: " + slices, 1 << distBits, bucketsTotal); + assertEquals("slices: " + slices, buckets, bucketsTotal); } } @Test public void testProgressSerializationRange() throws ParseException { int distBits = 4; + int buckets = 1 << distBits; BucketIdFactory idFactory = new BucketIdFactory(); ProgressToken progress = new ProgressToken(); @@ -204,11 +202,11 @@ public class VisitorIteratorTestCase { assertTrue(iter.getBucketSource() instanceof VisitorIterator.DistributionRangeBucketSource); assertEquals(progress.getFinishedBucketCount(), 0); - assertEquals(progress.getTotalBucketCount(), 1 << distBits); + assertEquals(progress.getTotalBucketCount(), buckets); // First, get+update half of the buckets, marking them as done long bucketCount = 0; - long bucketStop = 1 << (distBits - 1); + long bucketStop = buckets / 2; while (iter.hasNext() && bucketCount != bucketStop) { VisitorIterator.BucketProgress ids = iter.getNext(); @@ -232,7 +230,7 @@ public class VisitorIteratorTestCase { desired.append('\n'); desired.append(bucketCount); desired.append('\n'); - desired.append(1 << distBits); + desired.append(buckets); desired.append('\n'); assertEquals(desired.toString(), progress.toString()); @@ -245,7 +243,7 @@ public class VisitorIteratorTestCase { ProgressToken progDs = new ProgressToken(progress.toString()); assertEquals(progDs.getDistributionBitCount(), distBits); - assertEquals(progDs.getTotalBucketCount(), 1 << distBits); + assertEquals(progDs.getTotalBucketCount(), buckets); assertEquals(progDs.getFinishedBucketCount(), bucketCount); VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection( @@ -267,21 +265,21 @@ public class VisitorIteratorTestCase { // Now fetch a subset of the remaining buckets without finishing them, // keeping some in the active set and some in pending - int pendingTotal = 1 << (distBits - 3); - int activeTotal = 1 << (distBits - 3); - Vector<VisitorIterator.BucketProgress> buckets = new Vector<VisitorIterator.BucketProgress>(); + int pendingTotal = buckets / 8; + int activeTotal = buckets / 8; + Vector<VisitorIterator.BucketProgress> trackedBuckets = new Vector<VisitorIterator.BucketProgress>(); // Pre-fetch, since otherwise we'd reuse pending buckets for (int i = 0; i < pendingTotal + activeTotal; ++i) { - buckets.add(iter.getNext()); + trackedBuckets.add(iter.getNext()); } for (int i = 0; i < pendingTotal + activeTotal; ++i) { - VisitorIterator.BucketProgress idTemp = buckets.get(i); + VisitorIterator.BucketProgress idTemp = trackedBuckets.get(i); if (i < activeTotal) { // Make them 50% done iter.update(idTemp.getSuperbucket(), - new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 << distBits))); + new BucketId(distBits + 2, idTemp.getSuperbucket().getId() | (2 * buckets))); } // else: leave hanging as active } @@ -299,7 +297,7 @@ public class VisitorIteratorTestCase { desired.append('\n'); desired.append(bucketCount); desired.append('\n'); - desired.append(1 << distBits); + desired.append(buckets); desired.append('\n'); assertEquals(progress.getBuckets().entrySet().size(), pendingTotal + activeTotal); @@ -319,7 +317,7 @@ public class VisitorIteratorTestCase { ProgressToken progDs = new ProgressToken(progress.toString()); assertEquals(progDs.getDistributionBitCount(), distBits); - assertEquals(progDs.getTotalBucketCount(), 1 << distBits); + assertEquals(progDs.getTotalBucketCount(), buckets); assertEquals(progDs.getFinishedBucketCount(), bucketCount); VisitorIterator iterDs = VisitorIterator.createFromDocumentSelection( @@ -338,7 +336,7 @@ public class VisitorIteratorTestCase { // Finish all the active buckets for (int i = activeTotal; i < activeTotal + pendingTotal; ++i) { - iter.update(buckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET); + iter.update(trackedBuckets.get(i).getSuperbucket(), ProgressToken.FINISHED_BUCKET); ++bucketCount; } @@ -359,16 +357,16 @@ public class VisitorIteratorTestCase { assertFalse(iter.hasNext()); assertTrue(progress.isFinished()); // Cumulative number of finished buckets must match 2^distbits - assertEquals(bucketCount, 1 << distBits); + assertEquals(bucketCount, buckets); StringBuilder finished = new StringBuilder(); finished.append("VDS bucket progress file (100.0% completed)\n"); finished.append(distBits); finished.append('\n'); - finished.append(1 << distBits); // Cursor + finished.append(buckets); // Cursor finished.append('\n'); - finished.append(1 << distBits); // Finished + finished.append(buckets); // Finished finished.append('\n'); - finished.append(1 << distBits); // Total + finished.append(buckets); // Total finished.append('\n'); assertEquals(progress.toString(), finished.toString()); |