diff options
Diffstat (limited to 'documentapi/src/main/java')
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java | 20 | ||||
-rwxr-xr-x | documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java | 15 |
2 files changed, 23 insertions, 12 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java index 9957898e459..4a77d30ec92 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/ProgressToken.java @@ -387,6 +387,18 @@ public class ProgressToken { } /** + * Marks the current bucket as finished and advances the bucket cursor; + * throws instead if the current bucket is already {@link #addBucket added}. + */ + void skipCurrentBucket() { + if (buckets.containsKey(bucketToKeyWrapper(getCurrentBucketId()))) + throw new IllegalStateException("Current bucket was already added to the explicit bucket set"); + + ++finishedBucketCount; + ++bucketCursor; + } + + /** * Directly generate a bucket Id key for the <code>n</code>th bucket in * reverse sorted order. * @@ -428,6 +440,14 @@ public class ProgressToken { return bucketCursor; } + static BucketId toBucketId(long bucketCursor, int distributionBits) { + return new BucketId(keyToBucketId(makeNthBucketKey(bucketCursor, distributionBits))); + } + + BucketId getCurrentBucketId() { + return toBucketId(getBucketCursor(), getDistributionBitCount()); + } + protected void setBucketCursor(long bucketCursor) { this.bucketCursor = bucketCursor; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java index 0ccc64bb8f3..e15512ca71b 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java @@ -334,8 +334,7 @@ public class VisitorIterator { assert(hasNext()) : "getNext() called with hasNext() == false"; // Create the progress to return for creating visitors, and advance bucket cursor. - BucketProgress progress = new BucketProgress(toBucketId(progressToken.getBucketCursor(), distributionBitCount), - new BucketId()); + BucketProgress progress = new BucketProgress(progressToken.getCurrentBucketId(), new BucketId()); progressToken.setBucketCursor(progressToken.getBucketCursor() + 1); // Skip ahead to our next next slice, to ensure we also exhaust the bucket space when @@ -351,19 +350,11 @@ public class VisitorIterator { if (distributionBitCount == 1) return; - while (progressToken.getBucketCursor() < (1L << distributionBitCount) && (progressToken.getBucketCursor() - sliceId) % slices != 0) { - BucketId bucketId = toBucketId(progressToken.getBucketCursor(), distributionBitCount); - progressToken.addBucket(bucketId, ProgressToken.NULL_BUCKET, ProgressToken.BucketState.BUCKET_ACTIVE); - progressToken.updateProgress(toBucketId(progressToken.getBucketCursor(), distributionBitCount), - ProgressToken.FINISHED_BUCKET); - progressToken.setBucketCursor(progressToken.getBucketCursor() + 1); + while (progressToken.getBucketCursor() < getTotalBucketCount() && (progressToken.getBucketCursor() % slices) != sliceId) { + progressToken.skipCurrentBucket(); } } - private static BucketId toBucketId(long bucketCursor, int distributionBitCount) { - return new BucketId(ProgressToken.keyToBucketId(ProgressToken.makeNthBucketKey(bucketCursor, distributionBitCount))); - } - public int getDistributionBitCount() { return distributionBitCount; } |