aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'documentapi/src/main')
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java75
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java12
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java4
3 files changed, 79 insertions, 12 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
index e11bdf7f18c..0ccc64bb8f3 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
@@ -81,12 +81,24 @@ public class VisitorIterator {
protected static class DistributionRangeBucketSource implements BucketSource {
private boolean flushActive = false;
private int distributionBitCount;
+ private final int slices;
+ private final int sliceId;
// Wouldn't need this if this were a non-static class, but do it for
// the sake of keeping things identical in Java and C++
private ProgressToken progressToken;
public DistributionRangeBucketSource(int distributionBitCount,
- ProgressToken progress) {
+ ProgressToken progress,
+ int slices, int sliceId) {
+ if (slices < 1) {
+ throw new IllegalArgumentException("slices must be positive, but was " + slices);
+ }
+ if (sliceId < 0 || sliceId >= slices) {
+ throw new IllegalArgumentException("sliceId must be in [0, " + slices + "), but was " + sliceId);
+ }
+
+ this.slices = slices;
+ this.sliceId = sliceId;
progressToken = progress;
// New progress token (could also be empty, in which this is a
@@ -148,6 +160,7 @@ public class VisitorIterator {
}
// Should be all fixed up and good to go
progressToken.setInconsistentState(false);
+ skipToSlice();
}
protected boolean isLosslessResetPossible() {
@@ -203,6 +216,7 @@ public class VisitorIterator {
assert(p.getActiveBucketCount() == 0);
p.clearAllBuckets();
p.setBucketCursor(0);
+ skipToSlice();
return;
}
@@ -292,7 +306,14 @@ public class VisitorIterator {
}
public boolean hasNext() {
- return progressToken.getBucketCursor() < (1L << distributionBitCount);
+ // There is a next bucket iff. there is a bucket no earlier than the cursor which
+ // is contained in the bucket space, and is also 0 modulo our sliceId; or if we're
+ // not yet properly initialised, with a real distribution bit count, we ignore this.
+ long nextBucket = progressToken.getBucketCursor();
+ if (distributionBitCount != 1) {
+ nextBucket += Math.floorMod(sliceId - nextBucket, slices);
+ }
+ return nextBucket < (1L << distributionBitCount);
}
public boolean shouldYield() {
@@ -311,13 +332,36 @@ public class VisitorIterator {
public BucketProgress getNext() {
assert(hasNext()) : "getNext() called with hasNext() == false";
- long currentPosition = progressToken.getBucketCursor();
- long key = ProgressToken.makeNthBucketKey(currentPosition, distributionBitCount);
- ++currentPosition;
- progressToken.setBucketCursor(currentPosition);
- return new BucketProgress(
- new BucketId(ProgressToken.keyToBucketId(key)),
- new BucketId());
+
+ // Create the progress to return for creating visitors, and advance bucket cursor.
+ BucketProgress progress = new BucketProgress(toBucketId(progressToken.getBucketCursor(), distributionBitCount),
+ new BucketId());
+ progressToken.setBucketCursor(progressToken.getBucketCursor() + 1);
+
+ // Skip ahead to our next next slice, to ensure we also exhaust the bucket space when
+ // hasNext() turns false, but there are still super buckets left after the current.
+ skipToSlice();
+
+ return progress;
+ }
+
+ // Advances the wrapped progress token's bucket cursor to our next slice, marking any skipped
+ // buckets as complete, but only if we've been initialised with a proper distribution bit count.
+ private void skipToSlice() {
+ if (distributionBitCount == 1)
+ return;
+
+ while (progressToken.getBucketCursor() < (1L << distributionBitCount) && (progressToken.getBucketCursor() - sliceId) % slices != 0) {
+ BucketId bucketId = toBucketId(progressToken.getBucketCursor(), distributionBitCount);
+ progressToken.addBucket(bucketId, ProgressToken.NULL_BUCKET, ProgressToken.BucketState.BUCKET_ACTIVE);
+ progressToken.updateProgress(toBucketId(progressToken.getBucketCursor(), distributionBitCount),
+ ProgressToken.FINISHED_BUCKET);
+ progressToken.setBucketCursor(progressToken.getBucketCursor() + 1);
+ }
+ }
+
+ private static BucketId toBucketId(long bucketCursor, int distributionBitCount) {
+ return new BucketId(ProgressToken.keyToBucketId(ProgressToken.makeNthBucketKey(bucketCursor, distributionBitCount)));
}
public int getDistributionBitCount() {
@@ -732,6 +776,13 @@ public class VisitorIterator {
return bucketSource.visitsAllBuckets();
}
+ public static VisitorIterator createFromDocumentSelection(
+ String documentSelection,
+ BucketIdFactory idFactory,
+ int distributionBitCount,
+ ProgressToken progress) throws ParseException {
+ return createFromDocumentSelection(documentSelection, idFactory, distributionBitCount, progress, 1, 0);
+ }
/**
* Create a new <code>VisitorIterator</code> instance based on the given document
* selection string.
@@ -753,7 +804,9 @@ public class VisitorIterator {
String documentSelection,
BucketIdFactory idFactory,
int distributionBitCount,
- ProgressToken progress) throws ParseException {
+ ProgressToken progress,
+ int slices,
+ int sliceId) throws ParseException {
BucketSelector bucketSel = new BucketSelector(idFactory);
Set<BucketId> rawBuckets = bucketSel.getBucketList(documentSelection);
BucketSource src;
@@ -763,7 +816,7 @@ public class VisitorIterator {
// bit-based range source
if (rawBuckets == null) {
// Range source
- src = new DistributionRangeBucketSource(distributionBitCount, progress);
+ src = new DistributionRangeBucketSource(distributionBitCount, progress, slices, sliceId);
} else {
// Explicit source
src = new ExplicitBucketSource(rawBuckets, distributionBitCount, progress);
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
index 8b0c8538855..44675d8d2ac 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
@@ -50,6 +50,8 @@ public class VisitorParameters extends Parameters {
private int traceLevel = 0;
private ThrottlePolicy throttlePolicy = null;
private boolean skipBucketsOnFatalErrors = false;
+ private int slices = 1;
+ private int sliceId = 0;
// Advanced parameter, only for internal use.
Set<BucketId> bucketsToVisit = null;
@@ -101,6 +103,7 @@ public class VisitorParameters extends Parameters {
params.getDynamicMaxBucketsIncreaseFactor());
setTraceLevel(params.getTraceLevel());
skipBucketsOnFatalErrors(params.skipBucketsOnFatalErrors());
+ slice(params.getSlices(), getSliceId());
}
// Get functions
@@ -331,6 +334,15 @@ public class VisitorParameters extends Parameters {
public void skipBucketsOnFatalErrors(boolean skipBucketsOnFatalErrors) { this.skipBucketsOnFatalErrors = skipBucketsOnFatalErrors; }
+ public void slice(int slices, int sliceId) {
+ this.slices = slices;
+ this.sliceId = sliceId;
+ }
+
+ public int getSlices() { return slices; }
+
+ public int getSliceId() { return sliceId; }
+
/**
* Set whether or not max buckets per visitor value should be dynamically
* increased when using orderdoc and visitors do not return at least half
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 5d07d433f18..f7242695490 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -572,7 +572,9 @@ public class MessageBusVisitorSession implements VisitorSession {
params.getDocumentSelection(),
bucketIdFactory,
1,
- progressToken);
+ progressToken,
+ params.getSlices(),
+ params.getSliceId());
} else {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "parameters specify explicit bucket set " +