diff options
Diffstat (limited to 'documentapi/src/main')
3 files changed, 79 insertions, 12 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java index e11bdf7f18c..0ccc64bb8f3 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java @@ -81,12 +81,24 @@ public class VisitorIterator { protected static class DistributionRangeBucketSource implements BucketSource { private boolean flushActive = false; private int distributionBitCount; + private final int slices; + private final int sliceId; // Wouldn't need this if this were a non-static class, but do it for // the sake of keeping things identical in Java and C++ private ProgressToken progressToken; public DistributionRangeBucketSource(int distributionBitCount, - ProgressToken progress) { + ProgressToken progress, + int slices, int sliceId) { + if (slices < 1) { + throw new IllegalArgumentException("slices must be positive, but was " + slices); + } + if (sliceId < 0 || sliceId >= slices) { + throw new IllegalArgumentException("sliceId must be in [0, " + slices + "), but was " + sliceId); + } + + this.slices = slices; + this.sliceId = sliceId; progressToken = progress; // New progress token (could also be empty, in which this is a @@ -148,6 +160,7 @@ public class VisitorIterator { } // Should be all fixed up and good to go progressToken.setInconsistentState(false); + skipToSlice(); } protected boolean isLosslessResetPossible() { @@ -203,6 +216,7 @@ public class VisitorIterator { assert(p.getActiveBucketCount() == 0); p.clearAllBuckets(); p.setBucketCursor(0); + skipToSlice(); return; } @@ -292,7 +306,14 @@ public class VisitorIterator { } public boolean hasNext() { - return progressToken.getBucketCursor() < (1L << distributionBitCount); + // There is a next bucket iff. there is a bucket no earlier than the cursor which + // is contained in the bucket space, and is also 0 modulo our sliceId; or if we're + // not yet properly initialised, with a real distribution bit count, we ignore this. + long nextBucket = progressToken.getBucketCursor(); + if (distributionBitCount != 1) { + nextBucket += Math.floorMod(sliceId - nextBucket, slices); + } + return nextBucket < (1L << distributionBitCount); } public boolean shouldYield() { @@ -311,13 +332,36 @@ public class VisitorIterator { public BucketProgress getNext() { assert(hasNext()) : "getNext() called with hasNext() == false"; - long currentPosition = progressToken.getBucketCursor(); - long key = ProgressToken.makeNthBucketKey(currentPosition, distributionBitCount); - ++currentPosition; - progressToken.setBucketCursor(currentPosition); - return new BucketProgress( - new BucketId(ProgressToken.keyToBucketId(key)), - new BucketId()); + + // Create the progress to return for creating visitors, and advance bucket cursor. + BucketProgress progress = new BucketProgress(toBucketId(progressToken.getBucketCursor(), distributionBitCount), + new BucketId()); + progressToken.setBucketCursor(progressToken.getBucketCursor() + 1); + + // Skip ahead to our next next slice, to ensure we also exhaust the bucket space when + // hasNext() turns false, but there are still super buckets left after the current. + skipToSlice(); + + return progress; + } + + // Advances the wrapped progress token's bucket cursor to our next slice, marking any skipped + // buckets as complete, but only if we've been initialised with a proper distribution bit count. + private void skipToSlice() { + if (distributionBitCount == 1) + return; + + while (progressToken.getBucketCursor() < (1L << distributionBitCount) && (progressToken.getBucketCursor() - sliceId) % slices != 0) { + BucketId bucketId = toBucketId(progressToken.getBucketCursor(), distributionBitCount); + progressToken.addBucket(bucketId, ProgressToken.NULL_BUCKET, ProgressToken.BucketState.BUCKET_ACTIVE); + progressToken.updateProgress(toBucketId(progressToken.getBucketCursor(), distributionBitCount), + ProgressToken.FINISHED_BUCKET); + progressToken.setBucketCursor(progressToken.getBucketCursor() + 1); + } + } + + private static BucketId toBucketId(long bucketCursor, int distributionBitCount) { + return new BucketId(ProgressToken.keyToBucketId(ProgressToken.makeNthBucketKey(bucketCursor, distributionBitCount))); } public int getDistributionBitCount() { @@ -732,6 +776,13 @@ public class VisitorIterator { return bucketSource.visitsAllBuckets(); } + public static VisitorIterator createFromDocumentSelection( + String documentSelection, + BucketIdFactory idFactory, + int distributionBitCount, + ProgressToken progress) throws ParseException { + return createFromDocumentSelection(documentSelection, idFactory, distributionBitCount, progress, 1, 0); + } /** * Create a new <code>VisitorIterator</code> instance based on the given document * selection string. @@ -753,7 +804,9 @@ public class VisitorIterator { String documentSelection, BucketIdFactory idFactory, int distributionBitCount, - ProgressToken progress) throws ParseException { + ProgressToken progress, + int slices, + int sliceId) throws ParseException { BucketSelector bucketSel = new BucketSelector(idFactory); Set<BucketId> rawBuckets = bucketSel.getBucketList(documentSelection); BucketSource src; @@ -763,7 +816,7 @@ public class VisitorIterator { // bit-based range source if (rawBuckets == null) { // Range source - src = new DistributionRangeBucketSource(distributionBitCount, progress); + src = new DistributionRangeBucketSource(distributionBitCount, progress, slices, sliceId); } else { // Explicit source src = new ExplicitBucketSource(rawBuckets, distributionBitCount, progress); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java index 8b0c8538855..44675d8d2ac 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java @@ -50,6 +50,8 @@ public class VisitorParameters extends Parameters { private int traceLevel = 0; private ThrottlePolicy throttlePolicy = null; private boolean skipBucketsOnFatalErrors = false; + private int slices = 1; + private int sliceId = 0; // Advanced parameter, only for internal use. Set<BucketId> bucketsToVisit = null; @@ -101,6 +103,7 @@ public class VisitorParameters extends Parameters { params.getDynamicMaxBucketsIncreaseFactor()); setTraceLevel(params.getTraceLevel()); skipBucketsOnFatalErrors(params.skipBucketsOnFatalErrors()); + slice(params.getSlices(), getSliceId()); } // Get functions @@ -331,6 +334,15 @@ public class VisitorParameters extends Parameters { public void skipBucketsOnFatalErrors(boolean skipBucketsOnFatalErrors) { this.skipBucketsOnFatalErrors = skipBucketsOnFatalErrors; } + public void slice(int slices, int sliceId) { + this.slices = slices; + this.sliceId = sliceId; + } + + public int getSlices() { return slices; } + + public int getSliceId() { return sliceId; } + /** * Set whether or not max buckets per visitor value should be dynamically * increased when using orderdoc and visitors do not return at least half diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 5d07d433f18..f7242695490 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -572,7 +572,9 @@ public class MessageBusVisitorSession implements VisitorSession { params.getDocumentSelection(), bucketIdFactory, 1, - progressToken); + progressToken, + params.getSlices(), + params.getSliceId()); } else { if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "parameters specify explicit bucket set " + |