// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi; import com.yahoo.document.BucketId; import com.yahoo.document.BucketIdFactory; import com.yahoo.document.select.BucketSelector; import com.yahoo.document.select.parser.ParseException; import java.util.logging.Level; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.logging.Logger; /** *
Enables transparent iteration of super/sub-buckets
* <p>Thread safety: safe for threads to hold their own iterators (no shared state), * as long as they also hold the ProgressToken object associated with it. No two * VisitorIterator instances may share the same progress token instance at the * same time. * Concurrent access to a single VisitorIterator instance is not safe and must * be handled atomically by the caller.</p>
* * @author vekterli */ public class VisitorIterator { private final ProgressToken progressToken; private final BucketSource bucketSource; private int distributionBitCount; private static final Logger log = Logger.getLogger(VisitorIterator.class.getName()); public static class BucketProgress { private BucketId superbucket; private BucketId progress; public BucketProgress(BucketId superbucket, BucketId progress) { this.superbucket = superbucket; this.progress = progress; } public BucketId getProgress() { return progress; } public BucketId getSuperbucket() { return superbucket; } } /** * Provides an abstract interface toVisitorIterator
for
* how pending buckets are acquired, decoupling this from the iteration
* itself.
*
* Important: it is the responsibility of the {@link BucketSource} implementation
* to ensure that progress information is honored for (partially) finished buckets.
* From the point of view of the iterator itself, it should not have to deal with
* filtering away already finished buckets, as this is a detail best left to
* bucket sources.
*/
protected static interface BucketSource {
public boolean hasNext();
public boolean shouldYield();
public boolean visitsAllBuckets();
public BucketProgress getNext();
public long getTotalBucketCount();
public int getDistributionBitCount();
public void setDistributionBitCount(int distributionBitCount,
ProgressToken progress);
public void update(BucketId superbucket, BucketId progress,
ProgressToken token);
}
/**
* Provides a bucket source that encompasses the entire range available
* through a given value of distribution bits
*/
protected static class DistributionRangeBucketSource implements BucketSource {
// NOTE(review): presumably set while a distribution-bit change awaits a
// flush of active buckets before pending buckets can be split/merged;
// the reads/writes are outside this view — confirm.
private boolean flushActive = false;
// Distribution bit count this source currently operates with.
private int distributionBitCount;
// Counters for split/merge operations performed on pending buckets
// (both reset to 0 in the constructor).
private long totalBucketsSplit;
private long totalBucketsMerged;
// Parallel-visiting parameters: total slice count and this source's slice.
private final int slices;
private final int sliceId;
// Wouldn't need this if this were a non-static class, but do it for
// the sake of keeping things identical in Java and C++
private ProgressToken progressToken;
/**
 * Builds a bucket source spanning the full bucket range reachable with
 * <code>distributionBitCount</code> distribution bits, optionally
 * restricted to one slice out of <code>slices</code> for parallel visiting.
 *
 * If <code>progress</code> is a fresh (zero-total) token it is initialized
 * to cover the full range; otherwise existing iteration state (cursor,
 * pending/active buckets, finished count) is imported from it and fixed up
 * so iteration can resume where it left off.
 *
 * @param distributionBitCount number of distribution bits defining the bucket space
 * @param progress progress token to initialize from / write state into
 * @param slices total number of parallel slices; must be &gt;= 1
 * @param sliceId this source's slice, in [0, slices)
 */
public DistributionRangeBucketSource(int distributionBitCount,
ProgressToken progress,
int slices, int sliceId) {
if (slices < 1) {
throw new IllegalArgumentException("slices must be positive, but was " + slices);
}
if (sliceId < 0 || sliceId >= slices) {
throw new IllegalArgumentException("sliceId must be in [0, " + slices + "), but was " + sliceId);
}
this.slices = slices;
this.sliceId = sliceId;
this.totalBucketsSplit = 0;
this.totalBucketsMerged = 0;
this.progressToken = progress;
// New progress token (could also be empty, in which this is a
// no-op anyway)
if (progressToken.getTotalBucketCount() == 0) {
assert(progressToken.isEmpty()) : "inconsistent progress state";
// Fresh token: initialize it to cover the entire 2^distributionBitCount space
progressToken.setTotalBucketCount(1L << distributionBitCount);
progressToken.setDistributionBitCount(distributionBitCount);
progressToken.setBucketCursor(0);
progressToken.setFinishedBucketCount(0);
this.distributionBitCount = distributionBitCount;
}
else {
// Existing token: adopt its distribution bit count rather than the
// one passed in (they may be reconciled later via setDistributionBitCount)
this.distributionBitCount = progressToken.getDistributionBitCount();
// Quick consistency check to ensure the user isn't trying to eg.
// pass a progress token for an explicit document selection
if (progressToken.getTotalBucketCount() != (1L << progressToken.getDistributionBitCount())) {
throw new IllegalArgumentException("Total bucket count in existing progress is not "
+ "consistent with that of the current document selection");
}
}
if (!progress.isFinished()) {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "Importing unfinished progress token with " +
"bits: " + progressToken.getDistributionBitCount() +
", active: " + progressToken.getActiveBucketCount() +
", pending: " + progressToken.getPendingBucketCount() +
", cursor: " + progressToken.getBucketCursor() +
", finished: " + progressToken.getFinishedBucketCount() +
", total: " + progressToken.getTotalBucketCount());
}
if (!progress.isEmpty()) {
// Lower all active to pending
if (progressToken.getActiveBucketCount() > 0) {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "Progress token had active buckets upon range " +
"construction. Setting these as pending");
}
progressToken.setAllBucketsToState(ProgressToken.BucketState.BUCKET_PENDING);
}
// Fixup for any buckets that were active when progress was written
// but are now pending and with wrong dist bits (used-bits). Buckets
// split here may very well be split/merged again if we set a new dist
// bit count, but that is the desired process
correctInconsistentPending(progressToken.getDistributionBitCount());
// Fixup for bucket cursor in case of bucket space downscaling
correctTruncatedBucketCursor();
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "Partial bucket space progress; continuing "+
"from position " + progressToken.getBucketCursor());
}
}
// Finished count = everything the cursor has passed that is not still pending
progressToken.setFinishedBucketCount(progressToken.getBucketCursor() -
progressToken.getPendingBucketCount());
} else {
// A fully finished token must have its cursor at the end of the bucket space
assert(progressToken.getBucketCursor() == progressToken.getTotalBucketCount());
}
// Should be all fixed up and good to go
progressToken.setInconsistentState(false);
// NOTE(review): skipToSlice() is defined outside this view; presumably it
// advances the cursor to the first bucket owned by this sliceId — confirm.
skipToSlice();
}
protected boolean isLosslessResetPossible() {
// #pending must be equal to cursor, i.e. all buckets ever fetched
// must be located in the set of pending
if (progressToken.getPendingBucketCount() != progressToken.getBucketCursor()) {
return false;
}
// Check if all pending buckets have a progress of 0
for (Map.EntryProgressToken
instance only has
* buckets pending that have a used-bits count of that of the
* targetDistBits
. This is done by splitting or merging
* all inconsistent buckets until the desired state is reached.
*
* Time complexity is approx O(4bn) where b is the maximum
* delta of bits to change anywhere in the set of pending and n
* is the number of pending. This includes the time spent making shallow
* map copies.
*
* @param targetDistBits The desired distribution bit count of the buckets
*/
private void correctInconsistentPending(int targetDistBits) {
boolean maybeInconsistent = true;
long bucketsSplit = 0, bucketsMerged = 0;
long pendingBefore = progressToken.getPendingBucketCount();
ProgressToken p = progressToken;
// Optimization: before doing any splitting/merging at all, we check
// to see if we can't simply just reset the entire internal state
// with the new distribution bit count. This ensures that if we go
// from eg. 1 bit to 20 bits, we won't have to perform a grueling
// half a million splits to cover the same bucket space as that 1
// single-bit bucket once did
if (isLosslessResetPossible()) {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "At start of bucket space and all " +
"buckets have no progress; doing a lossless reset " +
"instead of splitting/merging");
}
assert(p.getActiveBucketCount() == 0);
p.clearAllBuckets();
p.setBucketCursor(0);
return;
}
while (maybeInconsistent) {
BucketId lastMergedBucket = null;
maybeInconsistent = false;
// Make a shallow working copy of the bucket map. BucketKeyWrapper
// keys are considered immutable, and should thus not be at risk
// for being changed during the inner loop
// Do separate passes for splitting and merging just to make
// absolutely sure that the two ops won't step on each others'
// toes. This isn't wildly efficient, but the data sets in question
// are presumed to be low in size and this is presumed to be a very
// infrequent operation
TreeMaphasNext() == true
*/
public BucketProgress getNext() {
assert(progressToken.getDistributionBitCount() == bucketSource.getDistributionBitCount())
: "inconsistent distribution bit counts for progress and source";
assert(hasNext());
// We prioritize returning buckets in the pending map over those
// that may be in the bucket source, since we want to avoid growing
// the map too much
if (progressToken.hasPending()) {
// Find first pending bucket in token
TreeMapCheck whether or not it is valid to call {@link #getNext()} with the current * iterator state.
* *There exists a case wherein hasNext
may return false before {@link #update} is
* called, but true afterwards. This happens when the set of pending buckets is
* empty, the bucket source is empty but the set of active buckets is
* not. A future progress update on any of the buckets in the active set may
* or may not make that bucket available to the pending set again.
* This must be handled explicitly by the caller by checking {@link #isDone()}
* and ensuring that {@link #update} is called before retrying hasNext
.
This method will also return false if the number of distribution bits have * changed and there are active buckets needing to be flushed before the * iterator will allow new buckets to be handed out.
* * @return Whether or not it is valid to call {@link #getNext()} with the current * iterator state. */ public boolean hasNext() { return (progressToken.hasPending() || bucketSource.hasNext()) && !bucketSource.shouldYield(); } /** * Check if the iterator is actually done * * @see #hasNext() * * @returntrue
iff the bucket source is empty and
* there are no pending or active buckets in the progress token.
*/
/**
 * Checks whether iteration has fully completed: nothing more can be handed
 * out and no buckets remain active in the progress token.
 *
 * @see #hasNext()
 *
 * @return <code>true</code> iff the bucket source is empty and there are
 *         no pending or active buckets in the progress token.
 */
public boolean isDone() {
    // De Morgan rewrite of !(hasNext() || hasActive())
    return !hasNext() && !progressToken.hasActive();
}
/**
* Tell the iterator that we've finished processing up to and
* including progress
. progress
may be a sub-bucket or
* the invalid 0-bucket (in case the caller fails to process the bucket and
* must return it to the set of pending) or the special case BucketId(Integer.MAX_VALUE)
,
* the latter indicating to the iterator that traversal is complete for
* superbucket
's tree. The null bucket should only be used if no
* non-null updates have yet been given for the superbucket.
It is a requirement that each superbucket returned by {@link #getNext()} must * eventually result in 1-n update operations, where the last update operation * has the special progress==super case.
* *If the document selection used to create the iterator is unknown and there
* were active buckets at the time of a distribution bit state change, such
* a bucket passed to update()
will be in an inconsistent state
* with regards to the number of bits it uses. For unfinished buckets, this
* is handled by splitting or merging it until it's consistent, depending on
* whether or not it had a lower or higher distribution bit count than that of
* the current system state. For finished buckets of a lower dist bit count,
* the amount of finished buckets in the ProgressToken is adjusted upwards
* to compensate for the fact that a bucket using fewer distribution bits
* actually covers more of the bucket space than the ones that are currently
* in use. For finished buckets of a higher dist bit count, the number of
* finished buckets is not increased at that point in time, since
* such a bucket doesn't actually cover an entire bucket with the current state.
All this is done automatically and transparently to the caller once all * active buckets have been updated.
* * @param superbucket A valid bucket ID that has been retrieved earlier through * {@link #getNext()} * @param progress A bucket logically contained withinsuper
. Subsequent
* updates for the same superbucket must have progress
be in an increasing
* order, where order is defined as the in-order traversal of the bucket split
* tree. May also be the null bucket if the superbucket has not seen any "proper"
* progress updates yet or the special case Integer.MAX_VALUE. Note that inconsistent
* splitting might actually see progress
as containing super
* rather than vice versa, so this is explicitly allowed to pass by the code.
*/
// Record progress for a bucket handed out by getNext(); see the javadoc
// block above for the full contract of superbucket/progress.
public void update(BucketId superbucket, BucketId progress) {
// Delegate to bucket source, as it knows how to deal with buckets
// that are in an inconsistent state wrt distribution bit count
bucketSource.update(superbucket, progress, progressToken);
}
/**
 * Returns the number of iterable buckets that have not yet been finished.
 *
 * Note: currently includes all non-finished buckets, i.e. active and
 * pending buckets are counted as remaining.
 *
 * @return The total number of iterable buckets that remain to be processed
 */
public long getRemainingBucketCount() {
    long total = progressToken.getTotalBucketCount();
    long finished = progressToken.getFinishedBucketCount();
    return total - finished;
}
/**
 * Returns the iterator's internal bucket source; exposed for subclass and
 * testing use.
 *
 * @return Internal bucket source instance. Do NOT modify!
 */
protected BucketSource getBucketSource() {
return bucketSource;
}
/**
 * @return The progress token bound to this iterator. Must not be shared
 *         with another iterator instance while this one is in use.
 */
public ProgressToken getProgressToken() {
return progressToken;
}
/**
 * @return The distribution bit count this iterator currently operates with.
 */
public int getDistributionBitCount() {
return distributionBitCount;
}
/**
 * Set the distribution bit count for the iterator and the buckets it
 * currently maintains and will return in the future.
 *
 * <p>For document selections that result in an explicit set of buckets,
 * this is essentially a no-op, so in such a case, disregard the rest of
 * this text.</p>
 *
 * <p>Changing the number of distribution bits for an unknown document
 * selection will effectively scale the bucket space that will be visited;
 * each bit increase or decrease doubling or halving its size, respectively.
 * When increasing, any pending buckets will be split to ensure the total
 * bucket space covered remains the same. Correspondingly, when decreasing,
 * any pending buckets will be merged appropriately.</p>
 *
 * <p>If there are buckets active at the time of the change, the actual
 * bucket splitting/merging operations are kept on hold until all active
 * buckets have been updated, at which point they will be automatically
 * performed. The iterator will force such an update by not giving out
 * any new or pending buckets until that happens.</p>
 *
 * <p><em>Note</em>: when decreasing the number of distribution bits,
 * there is a chance of losing superbucket progress in a bucket that
 * is merged with another bucket, leading to potential duplicate
 * results.</p>
 *
 * @param distBits New system state distribution bit count
 */
public void setDistributionBitCount(int distBits) {
    if (distributionBitCount != distBits) {
        // The bucket source performs (or defers) the split/merge work
        bucketSource.setDistributionBitCount(distBits, progressToken);
        distributionBitCount = distBits;
        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, "Set visitor iterator distribution bit count to " + distBits);
        }
    }
}

/** @return Whether the underlying bucket source covers all buckets in the system. */
public boolean visitsAllBuckets() {
    return bucketSource.visitsAllBuckets();
}

/**
 * Convenience overload of
 * {@link #createFromDocumentSelection(String, BucketIdFactory, int, ProgressToken, int, int)}
 * using a single slice (no parallel slicing).
 */
public static VisitorIterator createFromDocumentSelection(
        String documentSelection,
        BucketIdFactory idFactory,
        int distributionBitCount,
        ProgressToken progress) throws ParseException {
    return createFromDocumentSelection(documentSelection, idFactory, distributionBitCount, progress, 1, 0);
}

/**
 * Create a new <code>VisitorIterator</code> instance based on the given
 * document selection string.
 *
 * @param documentSelection Document selection string used to create the
 * <code>VisitorIterator</code> instance. Depending on the characteristics of the
 * selection, the iterator may iterate over only a small subset of the buckets or
 * every bucket in the system. Both cases will be handled efficiently.
 * @param idFactory {@link BucketId} factory specifying the number of distribution bits
 * to use et al.
 * @param progress A unique {@link ProgressToken} instance which is used for maintaining the state
 * of the iterator. Can not be shared with other iterator instances at the same time.
 * If <code>progress</code> contains work done in an earlier iteration run, the iterator will pick
 * up from where it left off
 * @return A new <code>VisitorIterator</code> instance
 * @throws ParseException if <code>documentSelection</code> fails to properly parse
 */
public static VisitorIterator createFromDocumentSelection(
String documentSelection,
BucketIdFactory idFactory,
int distributionBitCount,
ProgressToken progress,
int slices,
int sliceId) throws ParseException {
BucketSelector bucketSel = new BucketSelector(idFactory);
SetVisitorIterator
instance based on the given
* set of buckets. This is supported for internal use only, and is required
* by Synchronization. Use {@link #createFromDocumentSelection} instead for
* all normal purposes.
*
* @param bucketsToVisit The set of buckets that will be visited
* @param distributionBitCount Number of distribution bits to use
* @param progress A unique ProgressToken instance which is used for maintaining the state
* of the iterator. Can not be shared with other iterator instances at the same time.
* If progress
contains work done in an earlier iteration run, the iterator will pick
* up from where it left off
* @return A new VisitorIterator
instance
*/
public static VisitorIterator createFromExplicitBucketSet(
Set