getBuckets() {
return buckets;
}
/** Sets the count of buckets that currently have an active visitor. */
protected void setActiveBucketCount(long activeBucketCount) {
this.activeBucketCount = activeBucketCount;
}
/** Sets the count of buckets that are pending visiting. */
protected void setPendingBucketCount(long pendingBucketCount) {
this.pendingBucketCount = pendingBucketCount;
}
/**
 * The format of the bucket progress output is as follows:
 *
 *   VDS bucket progress file (n% completed)\n
 *   distribution bit count\n
 *   current bucket cursor\n
 *   number of finished buckets\n
 *   total number of buckets\n
 *   hex-of-superbucket:hex-of-progress\n
 *   ... repeat above line for each pending bucket ...
 *
 * Note that unlike earlier versions of ProgressToken, the bucket IDs are
 * not prefixed with '0x'.
 *
 * @return the progress-file representation of this token
 */
public synchronized String toString() {
    StringBuilder sb = new StringBuilder();
    // Header: completion percentage, distribution bits and bucket cursor
    sb.append("VDS bucket progress file (")
      .append(percentFinished())
      .append("% completed)\n")
      .append(distributionBits).append('\n')
      .append(bucketCursor).append('\n');
    // Failed buckets are subtracted from the finished count; clamp at zero.
    // (0L, not the error-prone lowercase 0l suffix.)
    long doneBucketCount = Math.max(0L, finishedBucketCount - failedBuckets.size());
    sb.append(doneBucketCount).append('\n')
      .append(totalBucketCount).append('\n');
    // One "superbucket:progress" line per active/pending bucket
    for (Map.Entry<BucketKeyWrapper, BucketEntry> entry : buckets.entrySet()) {
        sb.append(Long.toHexString(keyToBucketId(entry.getKey().getKey())))
          .append(':')
          .append(Long.toHexString(entry.getValue().getProgress().getRawId()))
          .append('\n');
    }
    // Failed buckets are serialized alongside their recorded progress
    for (Map.Entry<BucketId, BucketId> entry : failedBuckets.entrySet()) {
        sb.append(Long.toHexString(entry.getKey().getRawId()))
          .append(':')
          .append(Long.toHexString(entry.getValue().getRawId()))
          .append('\n');
    }
    return sb.toString();
}
/**
 * Calculate an estimate on how far we've managed to iterate over both the
 * superbuckets and the sub-buckets.
 *
 * Runs in O(n+m) time, where n is the number of active buckets
 * and m is the number of pending buckets. Both these values should
 * be fairly small in practice, however.
 *
 * Method is synchronized, as legacy code treats this as an atomic read.
 *
 * @return A value in the range [0, 100] estimating the progress.
 */
public synchronized double percentFinished() {
    long superTotal = totalBucketCount;
    long superFinished = finishedBucketCount;
    // Nothing tracked, or everything already finished: report completion
    if (superTotal == 0 || superTotal == superFinished) return 100;
    // Each superbucket contributes an equal slice of the full 100% range
    double superDelta = 100.0 / superTotal;
    double cumulativeSubProgress = 0;
    // Calculate cumulative for all non-finished buckets. 0 means the
    // bucket has yet to see any progress.
    // There are numerical precision issues here, but this hardly requires
    // aerospace engineering result-accuracy.
    for (Map.Entry<BucketKeyWrapper, BucketEntry> entry : buckets.entrySet()) {
        BucketId superbucket = new BucketId(keyToBucketId(entry.getKey().getKey()));
        BucketId progress = entry.getValue().getProgress();
        // Prevent calculation of bucket progress on inconsistent buckets
        if (progress.getId() != 0 && superbucket.contains(progress)) {
            cumulativeSubProgress += superDelta * progressFraction(superbucket, progress);
        }
    }
    return (((double)superFinished / (double)superTotal) * 100.0)
            + cumulativeSubProgress;
}
/*
* Based on the following C++ code from document/bucket/bucketid.cpp:
*
* BucketId::Type
* BucketId::bucketIdToKey(Type id)
* {
* Type retVal = reverse(id);
*
* Type usedCountLSB = id >> maxNumBits();
* retVal >>= CountBits;
* retVal <<= CountBits;
* retVal |= usedCountLSB;
*
* return retVal;
* }
*
* static uint32_t maxNumBits() { return (8 * sizeof(Type) - CountBits);}
*/
// TODO: this should probably be moved to BucketId at some point?
/**
 * Converts a bucket ID into its key form: the ID's bits reversed, with the
 * used-count bits relocated to the least significant bit positions. Mirrors
 * the C++ BucketId::bucketIdToKey implementation.
 *
 * @param id raw bucket ID to convert
 * @return the key corresponding to the given bucket ID
 */
public static long bucketToKey(long id) {
    final int countBits = BucketId.COUNT_BITS;
    // The used-count occupies the topmost countBits of the ID; extract it
    // so it can be placed at the bottom of the key after reversal
    final long usedCount = id >>> (64 - countBits);
    // Clear the low countBits of the reversed ID to make room for the count
    final long reversedBody = (Long.reverse(id) >>> countBits) << countBits;
    return reversedBody | usedCount;
}
/** Wraps the given bucket's ID, converted to key form, for use as a map key. */
private static BucketKeyWrapper bucketToKeyWrapper(BucketId bucket) {
    final long key = bucketToKey(bucket.getId());
    return new BucketKeyWrapper(key);
}
/*
* BucketId::Type
* BucketId::keyToBucketId(Type key)
* {
* Type retVal = reverse(key);
*
* Type usedCountMSB = key << maxNumBits();
* retVal <<= CountBits;
* retVal >>= CountBits;
* retVal |= usedCountMSB;
*
* return retVal;
* }
*/
/**
 * Inverse of {@link #bucketToKey(long)}: converts a key back into its raw
 * bucket ID form. Mirrors the C++ BucketId::keyToBucketId implementation.
 *
 * @param key bucket key to convert
 * @return the bucket ID corresponding to the given key
 */
public static long keyToBucketId(long key) {
    final int countBits = BucketId.COUNT_BITS;
    // The used-count sits in the low bits of the key; move it to the top
    final long usedCount = key << (64 - countBits);
    // Mask away the top countBits of the reversed key before OR-ing in the count
    final long reversedBody = Long.reverse(key) & (~0L >>> countBits);
    return reversedBody | usedCount;
}
/**
 * @param superbucket The superbucket of which <code>progress</code> is
 *  a sub-bucket
 * @param progress The sub-bucket for which a fractional progress should
 *  be calculated
 * @return a value in [0, 1] specifying how far the (sub-bucket) has
 *  reached in its superbucket. This is calculated by looking at the
 *  bucket's split factor.
 */
public synchronized double progressFraction(BucketId superbucket, BucketId progress) {
    final int superUsed = superbucket.getUsedBits();
    final int progressUsed = progress.getUsedBits();
    // No recorded progress, or fewer used-bits than the superbucket itself:
    // cannot be a sub-bucket with measurable progress
    if (progressUsed == 0 || progressUsed < superUsed) {
        return 0;
    }
    final int splitCount = progressUsed - superUsed;
    if (splitCount == 0) return 1; // Superbucket or inconsistent used-bits
    // Extract the reversed split-bits: shift away the superbucket prefix,
    // then right-align the remaining splitCount bits
    long splitBits = bucketToKey(progress.getId());
    splitBits <<= superUsed;
    splitBits >>>= (64 - splitCount);
    // Fraction of the 2^splitCount possible sub-buckets covered so far
    return (double)(splitBits + 1) / (double)(1L << splitCount);
}
/**
 * Checks whether or not a given bucket is certain to be finished. Only
 * looks at the super-bucket part of the given bucket ID, so it's possible
 * that the bucket has in fact finished on a sub-bucket progress level.
 * This does not affect the correctness of the result, however.
 *
 * During a distribution bit change, the token's buckets may be inconsistent.
 * In this scenario, false is always returned since we can't tell for
 * sure if the bucket is still active until the buckets have been made
 * consistent.
 *
 * @param bucket Bucket to check whether or not is finished.
 * @return <code>true</code> if <code>bucket</code>'s super-bucket is
 *  finished, <code>false</code> otherwise.
 */
protected synchronized boolean isBucketFinished(BucketId bucket) {
    // While inconsistent, the active/pending check below cannot be trusted,
    // as it compares exact key values that require a uniform distribution
    // bit count.
    if (inconsistentState) {
        return false;
    }
    // Token only knows of super-buckets, not sub buckets
    BucketId superbucket = new BucketId(distributionBits, bucket.getId());
    // A bucket is done if the current cursor location implies a visitor for
    // the associated superbucket has already been sent off at some point AND
    // there is no pending visitor for it. The cursor is used to directly
    // generate bucket keys, so we can compare against it directly.
    // Example: given db=3 and cursor=2, the buckets 000 and 100 will have
    // been returned by the iterator. Reversing the ID and right-aligning it
    // yields the cursor location that would have been required to generate it.
    long reverseId = Long.reverse(superbucket.getId()) >>> (64 - distributionBits); // No count bits
    if (reverseId >= bucketCursor) {
        return false; // Not yet generated by the iterator
    }
    // Bucket has been generated; if it's not listed as active/pending it
    // must have finished, since finished buckets are always removed
    BucketEntry entry = buckets.get(bucketToKeyWrapper(superbucket));
    if (entry == null) {
        return true;
    }
    // Still tracked: finished iff the key of its recorded progress has moved
    // past the key of the requested bucket
    // TODO: verify correctness for all bucket orderings!
    return bucketToKey(entry.getProgress().getId()) > bucketToKey(bucket.getId());
}
/**
 * Replaces a pending bucket with its two split children, each inheriting
 * the original bucket's recorded progress and pending state.
 *
 * @param bucket BucketId to be split into two buckets. Bucket's used-bits
 *  do not need to match the ProgressToken's current distribution bit count,
 *  as it is assumed the client knows what it's doing and will bring the
 *  token into a consistent state eventually.
 * @throws IllegalArgumentException if the bucket is not tracked by this
 *  token, or is tracked but not in the pending state
 */
protected void splitPendingBucket(BucketId bucket) {
    final BucketKeyWrapper bucketKey = bucketToKeyWrapper(bucket);
    final BucketEntry entry = buckets.get(bucketKey);
    if (entry == null) {
        throw new IllegalArgumentException(
                "Attempting to split unknown bucket: " + bucket);
    }
    if (entry.getState() != BucketState.BUCKET_PENDING) {
        throw new IllegalArgumentException(
                "Attempting to split non-pending bucket: " + bucket);
    }
    final int splitDistBits = bucket.getUsedBits() + 1;
    // Left child keeps the original bucket's bits; the right sibling sets
    // the new MSB and thus logically lands at location original_bucket*2 in
    // the bucket space due to the key ordering
    final BucketId splitLeft = new BucketId(splitDistBits, bucket.getId());
    final BucketId splitRight = new BucketId(splitDistBits,
            bucket.getId() | (1L << bucket.getUsedBits()));
    addBucket(splitLeft, entry.getProgress(), BucketState.BUCKET_PENDING);
    addBucket(splitRight, entry.getProgress(), BucketState.BUCKET_PENDING);
    // Remove the original bucket, leaving only the two children
    buckets.remove(bucketKey);
    --pendingBucketCount;
}
/**
 * Merges a pending bucket with its pending sibling (when one is tracked),
 * replacing the pair with a single pending bucket having one fewer used-bit.
 * Any progress recorded on a discarded right sibling is lost, which may lead
 * to duplicates in the result-set (logged at FINE level when it happens).
 *
 * @param bucket BucketId to merge. Must be tracked by this token and be in
 * the BUCKET_PENDING state.
 * @throws IllegalArgumentException if the bucket is unknown to this token
 * or is not in the BUCKET_PENDING state.
 */
protected void mergePendingBucket(BucketId bucket) {
BucketKeyWrapper bucketKey = bucketToKeyWrapper(bucket);
BucketEntry entry = buckets.get(bucketKey);
if (entry == null) {
throw new IllegalArgumentException(
"Attempting to join unknown bucket: " + bucket);
}
if (entry.getState() != BucketState.BUCKET_PENDING) {
throw new IllegalArgumentException(
"Attempting to join non-pending bucket: " + bucket);
}
int usedBits = bucket.getUsedBits();
// If MSB is 0, we should look for the bucket's right sibling. If not,
// we know that there's no left sibling, as it should otherwise have been
// merged already by the caller, due to it being ordered before the
// right sibling in the pending mapping
if ((bucket.getId() & (1L << (usedBits - 1))) == 0) {
BucketId rightCheck = new BucketId(usedBits, bucket.getId() | (1L << (usedBits - 1)));
BucketEntry rightSibling = buckets.get(bucketToKeyWrapper(rightCheck));
// Must not merge if sibling isn't pending
if (rightSibling != null) {
assert(rightSibling.getState() == BucketState.BUCKET_PENDING);
if (log.isLoggable(Level.FINEST)) {
log.log(Level.FINEST, "Merging " + bucket + " with rhs " + rightCheck);
}
// If right sibling has progress, it will unfortunately have to
// be discarded
if (rightSibling.getProgress().getUsedBits() != 0
&& log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "Bucket progress for " + rightCheck +
" will be lost due to merging; potential for duplicates in result-set");
}
buckets.remove(bucketToKeyWrapper(rightCheck));
--pendingBucketCount;
}
} else {
// MSB set: the left sibling must already have been consumed by a prior
// merge, so it is a logic error for it to still be tracked
BucketId leftSanityCheck = new BucketId(usedBits, bucket.getId() & ~(1L << (usedBits - 1)));
BucketEntry leftSibling = buckets.get(bucketToKeyWrapper(leftSanityCheck));
assert(leftSibling == null) : "bucket merge sanity checking failed";
}
// The merged parent inherits the original bucket's progress and state
BucketId newMerged = new BucketId(usedBits - 1, bucket.getId());
addBucket(newMerged, entry.getProgress(), BucketState.BUCKET_PENDING);
// Remove original bucket, leaving only the merged bucket
buckets.remove(bucketKey);
--pendingBucketCount;
// The merged bucket was just added as pending, so the count stays positive
assert(pendingBucketCount > 0);
}
/**
 * Sets the state of every bucket tracked by this token to the given state.
 * Does not touch the active/pending counters.
 *
 * @param state state to assign to all tracked bucket entries
 */
protected void setAllBucketsToState(BucketState state) {
    // Only the entry values are needed, so iterate values() directly
    // rather than a raw entrySet()
    for (BucketEntry entry : buckets.values()) {
        entry.setState(state);
    }
}
/** Discards all tracked buckets and zeroes the active/pending counters. */
protected void clearAllBuckets() {
    pendingBucketCount = 0;
    activeBucketCount = 0;
    buckets.clear();
}
}