diff options
Diffstat (limited to 'documentapi/src/main')
38 files changed, 3920 insertions, 3920 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java index 183e4ea63d3..f759068cce2 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java @@ -10,7 +10,7 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; * <p>A session for asynchronous access to a document repository. * This class provides document repository writes and random access with high * throughput.</p> - * + * * <p>All operations which are <i>accepted</i> by an async session will cause one or more * {@link Response responses} to be returned within the timeout limit. When an operation fails, * the response will contain the argument which was submitted to the operation.</p> diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccessParams.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccessParams.java index 701fafbab06..b5caa2f3812 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccessParams.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccessParams.java @@ -1,38 +1,38 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi;
-
-import com.yahoo.document.config.DocumentmanagerConfig;
-
-import java.util.Optional;
-
-/**
- * Superclass of the classes which contains the parameters for creating or opening a document access.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class DocumentAccessParams {
-
- /** The id to resolve to document manager config. Not needed if the config is passed here */
- private String documentManagerConfigId = "client";
-
- /** The document manager config, or empty if not provided (in which case a subscription must be created) */
- private Optional<DocumentmanagerConfig> documentmanagerConfig = Optional.empty();
-
- /** Returns the config id that the document manager should subscribe to. */
- public String getDocumentManagerConfigId() { return documentManagerConfigId; }
-
- /** Returns the document manager config to use, or empty if it it necessary to subscribe to get it */
- public Optional<DocumentmanagerConfig> documentmanagerConfig() { return documentmanagerConfig; }
-
- /** Sets the config id that the document manager should subscribe to. */
- public DocumentAccessParams setDocumentManagerConfigId(String configId) {
- documentManagerConfigId = configId;
- return this;
- }
-
- public DocumentAccessParams setDocumentmanagerConfig(DocumentmanagerConfig documentmanagerConfig) {
- this.documentmanagerConfig = Optional.of(documentmanagerConfig);
- return this;
- }
-
-}
\ No newline at end of file +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi; + +import com.yahoo.document.config.DocumentmanagerConfig; + +import java.util.Optional; + +/** + * Superclass of the classes which contains the parameters for creating or opening a document access. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class DocumentAccessParams { + + /** The id to resolve to document manager config. Not needed if the config is passed here */ + private String documentManagerConfigId = "client"; + + /** The document manager config, or empty if not provided (in which case a subscription must be created) */ + private Optional<DocumentmanagerConfig> documentmanagerConfig = Optional.empty(); + + /** Returns the config id that the document manager should subscribe to. */ + public String getDocumentManagerConfigId() { return documentManagerConfigId; } + + /** Returns the document manager config to use, or empty if it it necessary to subscribe to get it */ + public Optional<DocumentmanagerConfig> documentmanagerConfig() { return documentmanagerConfig; } + + /** Sets the config id that the document manager should subscribe to. */ + public DocumentAccessParams setDocumentManagerConfigId(String configId) { + documentManagerConfigId = configId; + return this; + } + + public DocumentAccessParams setDocumentmanagerConfig(DocumentmanagerConfig documentmanagerConfig) { + this.documentmanagerConfig = Optional.of(documentmanagerConfig); + return this; + } + +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/SyncParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/SyncParameters.java index 24b68613208..2db5542de23 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/SyncParameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/SyncParameters.java @@ -1,11 +1,11 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi;
-
-/**
- * Parameters for creating a synchronous session
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SyncParameters extends Parameters {
- // empty
-}
+package com.yahoo.documentapi; + +/** + * Parameters for creating a synchronous session + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SyncParameters extends Parameters { + // empty +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java index f864898fb5b..252ac35b8cd 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java @@ -1,101 +1,101 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi;
-
-import com.yahoo.document.Document;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentRemove;
-import com.yahoo.document.DocumentType;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.document.TestAndSetCondition;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-
-/**
- * <p>A session for synchronous access to a document repository. This class
- * provides simple document access where throughput is not a concern.</p>
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public interface SyncSession extends Session {
-
- /**
- * <p>Puts a document. When this method returns, the document is safely
- * received. This enables setting condition compared to using Document.</p>
- *
- * @param documentPut The DocumentPut operation
- */
- void put(DocumentPut documentPut);
-
- /**
- * <p>Puts a document. When this method returns, the document is safely
- * received.</p>
- *
- * @param documentPut The DocumentPut operation
- * @param priority The priority with which to perform this operation.
- */
- void put(DocumentPut documentPut, DocumentProtocol.Priority priority);
-
- /**
- * <p>Gets a document.</p>
- *
- * @param id The id of the document to get.
- * @return The known document having this id, or null if there is no
- * document having this id.
- * @throws UnsupportedOperationException Thrown if this access does not
- * support retrieving.
- */
- Document get(DocumentId id);
-
- /**
- * <p>Gets a document.</p>
- *
- * @param id The id of the document to get.
- * @param fieldSet A comma-separated list of fields to retrieve
- * @param priority The priority with which to perform this operation.
- * @return The known document having this id, or null if there is no
- * document having this id.
- * @throws UnsupportedOperationException Thrown if this access does not
- * support retrieving.
- */
- Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority priority);
-
- /**
- * <p>Removes a document if it is present and condition is fulfilled.</p>
- * @param documentRemove document to delete
- * @return true If the document with this id was removed, false otherwise.
- */
- boolean remove(DocumentRemove documentRemove);
-
- /**
- * <p>Removes a document if it is present.</p>
- *
- * @param documentRemove Document remove operation
- * @param priority The priority with which to perform this operation.
- * @return true If the document with this id was removed, false otherwise.
- * @throws UnsupportedOperationException Thrown if this access does not
- * support removal.
- */
- boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority priority);
-
- /**
- * <p>Updates a document.</p>
- *
- * @param update The updates to perform.
- * @return True, if the document was found and updated.
- * @throws UnsupportedOperationException Thrown if this access does not
- * support update.
- */
- boolean update(DocumentUpdate update);
-
- /**
- * <p>Updates a document.</p>
- *
- * @param update The updates to perform.
- * @param priority The priority with which to perform this operation.
- * @return True, if the document was found and updated.
- * @throws UnsupportedOperationException Thrown if this access does not
- * support update.
- */
- boolean update(DocumentUpdate update, DocumentProtocol.Priority priority);
-
-}
+package com.yahoo.documentapi; + +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; + +/** + * <p>A session for synchronous access to a document repository. This class + * provides simple document access where throughput is not a concern.</p> + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public interface SyncSession extends Session { + + /** + * <p>Puts a document. When this method returns, the document is safely + * received. This enables setting condition compared to using Document.</p> + * + * @param documentPut The DocumentPut operation + */ + void put(DocumentPut documentPut); + + /** + * <p>Puts a document. When this method returns, the document is safely + * received.</p> + * + * @param documentPut The DocumentPut operation + * @param priority The priority with which to perform this operation. + */ + void put(DocumentPut documentPut, DocumentProtocol.Priority priority); + + /** + * <p>Gets a document.</p> + * + * @param id The id of the document to get. + * @return The known document having this id, or null if there is no + * document having this id. + * @throws UnsupportedOperationException Thrown if this access does not + * support retrieving. + */ + Document get(DocumentId id); + + /** + * <p>Gets a document.</p> + * + * @param id The id of the document to get. + * @param fieldSet A comma-separated list of fields to retrieve + * @param priority The priority with which to perform this operation. + * @return The known document having this id, or null if there is no + * document having this id. + * @throws UnsupportedOperationException Thrown if this access does not + * support retrieving. + */ + Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority priority); + + /** + * <p>Removes a document if it is present and condition is fulfilled.</p> + * @param documentRemove document to delete + * @return true If the document with this id was removed, false otherwise. + */ + boolean remove(DocumentRemove documentRemove); + + /** + * <p>Removes a document if it is present.</p> + * + * @param documentRemove Document remove operation + * @param priority The priority with which to perform this operation. + * @return true If the document with this id was removed, false otherwise. + * @throws UnsupportedOperationException Thrown if this access does not + * support removal. + */ + boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority priority); + + /** + * <p>Updates a document.</p> + * + * @param update The updates to perform. + * @return True, if the document was found and updated. + * @throws UnsupportedOperationException Thrown if this access does not + * support update. + */ + boolean update(DocumentUpdate update); + + /** + * <p>Updates a document.</p> + * + * @param update The updates to perform. + * @param priority The priority with which to perform this operation. + * @return True, if the document was found and updated. + * @throws UnsupportedOperationException Thrown if this access does not + * support update. + */ + boolean update(DocumentUpdate update, DocumentProtocol.Priority priority); + +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java index cde434df141..30bed329918 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java @@ -1,797 +1,797 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi;
-
-import com.yahoo.document.BucketId;
-import com.yahoo.document.BucketIdFactory;
-import com.yahoo.document.select.BucketSelector;
-import com.yahoo.document.select.parser.ParseException;
-import com.yahoo.log.LogLevel;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.logging.Logger;
-
-/**
- * <p>Enables transparent iteration of super/sub-buckets</p>
- *
- * <p>Thread safety: safe for threads to hold their own iterators (no shared state),
- * as long as they also hold the ProgressToken object associated with it. No two
- * VisitorIterator instances may share the same progress token instance at the
- * same time.
- * Concurrent access to a single VisitorIterator instance is not safe and must
- * be handled atomically by the caller.</p>
- *
- * @author <a href="mailto:vekterli@yahoo-inc.com">Tor Brede Vekterli</a>
- */
-public class VisitorIterator {
- private ProgressToken progressToken;
- private BucketSource bucketSource;
- private int distributionBitCount;
-
- private static final Logger log = Logger.getLogger(VisitorIterator.class.getName());
-
- public static class BucketProgress {
- private BucketId superbucket;
- private BucketId progress;
-
- public BucketProgress(BucketId superbucket, BucketId progress) {
- this.superbucket = superbucket;
- this.progress = progress;
- }
-
- public BucketId getProgress() {
- return progress;
- }
-
- public BucketId getSuperbucket() {
- return superbucket;
- }
- }
-
- /**
- * Provides an abstract interface to <code>VisitorIterator</code> for
- * how pending buckets are acquired, decoupling this from the iteration
- * itself.
- *
- * <em>Important</em>: it is the responsibility of the {@link BucketSource} implementation
- * to ensure that progress information is honored for (partially) finished buckets.
- * From the point of view of the iterator itself, it should not have to deal with
- * filtering away already finished buckets, as this is a detail best left to
- * bucket sources.
- */
- protected static interface BucketSource {
- public boolean hasNext();
- public boolean shouldYield();
- public boolean visitsAllBuckets();
- public BucketProgress getNext();
- public long getTotalBucketCount();
- public int getDistributionBitCount();
- public void setDistributionBitCount(int distributionBitCount,
- ProgressToken progress);
- public void update(BucketId superbucket, BucketId progress,
- ProgressToken token);
- }
-
- /**
- * Provides a bucket source that encompasses the entire range available
- * through a given value of distribution bits
- */
- protected static class DistributionRangeBucketSource implements BucketSource {
- private boolean flushActive = false;
- private int distributionBitCount;
- // Wouldn't need this if this were a non-static class, but do it for
- // the sake of keeping things identical in Java and C++
- private ProgressToken progressToken;
-
- public DistributionRangeBucketSource(int distributionBitCount,
- ProgressToken progress) {
- progressToken = progress;
-
- // New progress token (could also be empty, in which this is a
- // no-op anyway)
- if (progressToken.getTotalBucketCount() == 0) {
- assert(progressToken.isEmpty()) : "inconsistent progress state";
- progressToken.setTotalBucketCount(1L << distributionBitCount);
- progressToken.setDistributionBitCount(distributionBitCount);
- progressToken.setBucketCursor(0);
- progressToken.setFinishedBucketCount(0);
- this.distributionBitCount = distributionBitCount;
- }
- else {
- this.distributionBitCount = progressToken.getDistributionBitCount();
- // Quick consistency check to ensure the user isn't trying to eg.
- // pass a progress token for an explicit document selection
- if (progressToken.getTotalBucketCount() != (1L << progressToken.getDistributionBitCount())) {
- throw new IllegalArgumentException("Total bucket count in existing progress is not "
- + "consistent with that of the current document selection");
- }
- }
-
- if (!progress.isFinished()) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Importing unfinished progress token with " +
- "bits: " + progressToken.getDistributionBitCount() +
- ", active: " + progressToken.getActiveBucketCount() +
- ", pending: " + progressToken.getPendingBucketCount() +
- ", cursor: " + progressToken.getBucketCursor() +
- ", finished: " + progressToken.getFinishedBucketCount() +
- ", total: " + progressToken.getTotalBucketCount());
- }
- if (!progress.isEmpty()) {
- // Lower all active to pending
- if (progressToken.getActiveBucketCount() > 0) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Progress token had active buckets upon range " +
- "construction. Setting these as pending");
- }
- progressToken.setAllBucketsToState(ProgressToken.BucketState.BUCKET_PENDING);
- }
- // Fixup for any buckets that were active when progress was written
- // but are now pending and with wrong dist bits (used-bits). Buckets
- // split here may very well be split/merged again if we set a new dist
- // bit count, but that is the desired process
- correctInconsistentPending(progressToken.getDistributionBitCount());
- // Fixup for bucket cursor in case of bucket space downscaling
- correctTruncatedBucketCursor();
-
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Partial bucket space progress; continuing "+
- "from position " + progressToken.getBucketCursor());
- }
- }
- progressToken.setFinishedBucketCount(progressToken.getBucketCursor() -
- progressToken.getPendingBucketCount());
- } else {
- assert(progressToken.getBucketCursor() == progressToken.getTotalBucketCount());
- }
- // Should be all fixed up and good to go
- progressToken.setInconsistentState(false);
- }
-
- protected boolean isLosslessResetPossible() {
- // #pending must be equal to cursor, i.e. all buckets ever fetched
- // must be located in the set of pending
- if (progressToken.getPendingBucketCount() != progressToken.getBucketCursor()) {
- return false;
- }
- // Check if all pending buckets have a progress of 0
- for (Map.Entry<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> entry
- : progressToken.getBuckets().entrySet()) {
- if (entry.getValue().getState() != ProgressToken.BucketState.BUCKET_PENDING) {
- return false;
- }
- if (entry.getValue().getProgress().getId() != 0) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Ensure that a given <code>ProgressToken</code> instance only has
- * buckets pending that have a used-bits count of that of the
- * <code>targetDistCits</code>. This is done by splitting or merging
- * all inconsistent buckets until the desired state is reached.
- *
- * Time complexity is approx <i>O(4bn)</i> where <i>b</i> is the maximum
- * delta of bits to change anywhere in the set of pending and <i>n</i>
- * is the number of pending. This includes the time spent making shallow
- * map copies.
- *
- * @param targetDistBits The desired distribution bit count of the buckets
- */
- private void correctInconsistentPending(int targetDistBits) {
- boolean maybeInconsistent = true;
- long bucketsSplit = 0, bucketsMerged = 0;
- long pendingBefore = progressToken.getPendingBucketCount();
- ProgressToken p = progressToken;
-
- // Optimization: before doing any splitting/merging at all, we check
- // to see if we can't simply just reset the entire internal state
- // with the new distribution bit count. This ensures that if we go
- // from eg. 1 bit to 20 bits, we won't have to perform a grueling
- // half a million splits to cover the same bucket space as that 1
- // single-bit bucket once did
- if (isLosslessResetPossible()) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "At start of bucket space and all " +
- "buckets have no progress; doing a lossless reset " +
- "instead of splitting/merging");
- }
- assert(p.getActiveBucketCount() == 0);
- p.clearAllBuckets();
- p.setBucketCursor(0);
- return;
- }
-
- while (maybeInconsistent) {
- BucketId lastMergedBucket = null;
- maybeInconsistent = false;
- // Make a shallow working copy of the bucket map. BucketKeyWrapper
- // keys are considered immutable, and should thus not be at risk
- // for being changed during the inner loop
- // Do separate passes for splitting and merging just to make
- // absolutely sure that the two ops won't step on each others'
- // toes. This isn't wildly efficient, but the data sets in question
- // are presumed to be low in size and this is presumed to be a very
- // infrequent operation
- TreeMap<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> buckets
- = new TreeMap<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry>(p.getBuckets());
- for (Map.Entry<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> entry
- : buckets.entrySet()) {
- assert(entry.getValue().getState() == ProgressToken.BucketState.BUCKET_PENDING);
- BucketId pending = new BucketId(ProgressToken.keyToBucketId(entry.getKey().getKey()));
- if (pending.getUsedBits() < targetDistBits) {
- if (pending.getUsedBits() + 1 < targetDistBits) {
- maybeInconsistent = true; // Do another pass
- }
- p.splitPendingBucket(pending);
- ++bucketsSplit;
- }
- }
-
- // Make new map copy with potentially split buckets
- buckets = new TreeMap<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry>(p.getBuckets());
- for (Map.Entry<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> entry
- : buckets.entrySet()) {
- assert(entry.getValue().getState() == ProgressToken.BucketState.BUCKET_PENDING);
- BucketId pending = new BucketId(ProgressToken.keyToBucketId(entry.getKey().getKey()));
- if (pending.getUsedBits() > targetDistBits) {
- // If this is the right sibling of an already merged left sibling,
- // it's already been merged away, so we should skip it
- if (lastMergedBucket != null) {
- BucketId rightCheck = new BucketId(lastMergedBucket.getUsedBits(),
- lastMergedBucket.getId() | (1L << (lastMergedBucket.getUsedBits() - 1)));
- if (pending.equals(rightCheck)) {
- if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Skipped " + pending +
- ", as it was right sibling of " + lastMergedBucket);
- }
- continue;
- }
- }
- if (pending.getUsedBits() - 1 > targetDistBits) {
- maybeInconsistent = true; // Do another pass
- }
- p.mergePendingBucket(pending);
- ++bucketsMerged;
-
- lastMergedBucket = pending;
- }
- }
- }
- if ((bucketsSplit > 0 || bucketsMerged > 0) && log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Existing progress' pending buckets had inconsistent " +
- "distribution bits; performed " + bucketsSplit + " split ops and " +
- bucketsMerged + " merge ops. Pending: " + pendingBefore + " -> " +
- p.getPendingBucketCount());
- }
- }
-
- private void correctTruncatedBucketCursor() {
- // We've truncated the bucket cursor, but in doing so we might
- // have moved back beyond where there are pending buckets. Consider
- // having a cursor value of 3 at 31 bits and then moving to 11 bits.
- // With 1 pending we'll normally reach a cursor of 0, even though it
- // should be 1
- for (ProgressToken.BucketKeyWrapper bucketKey
- : progressToken.getBuckets().keySet()) {
- BucketId bid = bucketKey.toBucketId();
- long idx = bucketKey.getKey() >>> (64 - bid.getUsedBits());
- if (bid.getUsedBits() == distributionBitCount
- && idx >= progressToken.getBucketCursor()) {
- progressToken.setBucketCursor(idx + 1);
- }
- }
- if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "New range bucket cursor is " +
- progressToken.getBucketCursor());
- }
- }
-
- public boolean hasNext() {
- return progressToken.getBucketCursor() < (1L << distributionBitCount);
- }
-
- public boolean shouldYield() {
- // If we need to flush all active buckets, stall the iteration until
- // this has been done
- return flushActive;
- }
-
- public boolean visitsAllBuckets() {
- return true;
- }
-
- public long getTotalBucketCount() {
- return 1L << distributionBitCount;
- }
-
- public BucketProgress getNext() {
- assert(hasNext()) : "getNext() called with hasNext() == false";
- long currentPosition = progressToken.getBucketCursor();
- long key = ProgressToken.makeNthBucketKey(currentPosition, distributionBitCount);
- ++currentPosition;
- progressToken.setBucketCursor(currentPosition);
- return new BucketProgress(
- new BucketId(ProgressToken.keyToBucketId(key)),
- new BucketId());
- }
-
- public int getDistributionBitCount() {
- return distributionBitCount;
- }
-
- public void setDistributionBitCount(int distributionBitCount,
- ProgressToken progress)
- {
- this.distributionBitCount = distributionBitCount;
-
- // There might be a case where we're waiting for active buckets
- // already when a new distribution bit change comes in. If so,
- // don't do anything at all yet with the set of pending
- if (progressToken.getActiveBucketCount() > 0) {
- flushActive = true;
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Holding off new/pending buckets and consistency " +
- "correction until all " + progress.getActiveBucketCount() +
- " active buckets have been updated");
- }
- progressToken.setInconsistentState(true);
- } else {
- // Only perform the actual distribution bit bucket ops if we've
- // got no pending buckets
- int delta = distributionBitCount - progressToken.getDistributionBitCount();
-
- // Must do this before setting the bucket cursor to allow
- // reset-checking to be performed
- correctInconsistentPending(distributionBitCount);
- if (delta > 0) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Increasing distribution bits for full bucket " +
- "space range source from " + progressToken.getDistributionBitCount() + " to " +
- distributionBitCount);
- }
- progressToken.setFinishedBucketCount(progressToken.getFinishedBucketCount() << delta);
- // By n-doubling the position, the bucket key ordering ensures
- // we go from eg. 3:0x02 to 4:0x02 to 5:02 etc.
- progressToken.setBucketCursor(progressToken.getBucketCursor() << delta);
- } else if (delta < 0) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Decreasing distribution bits for full bucket " +
- "space range source from " + progressToken.getDistributionBitCount() +
- " to " + distributionBitCount + " bits");
- }
- // Scale down bucket space and cursor
- progressToken.setBucketCursor(progressToken.getBucketCursor() >>> -delta);
- progressToken.setFinishedBucketCount(progressToken.getFinishedBucketCount() >>> -delta);
- }
-
- progressToken.setTotalBucketCount(1L << distributionBitCount);
- progressToken.setDistributionBitCount(distributionBitCount);
-
- correctTruncatedBucketCursor();
- progressToken.setInconsistentState(false);
- }
- }
-
- public void update(BucketId superbucket, BucketId progress,
- ProgressToken token) {
- progressToken.updateProgress(superbucket, progress);
-
- if (superbucket.getUsedBits() != distributionBitCount) {
- if (!progress.equals(ProgressToken.FINISHED_BUCKET)) {
- // We should now always flush active buckets before doing a
- // consistency fix. This simplifies things greatly
- assert(flushActive);
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Received non-finished bucket " +
- superbucket + " with wrong distribution bit count (" +
- superbucket.getUsedBits() + "). Waiting to correct " +
- "until all active are done");
- }
- } else {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Received finished bucket " +
- superbucket + " with wrong distribution bit count (" +
- superbucket.getUsedBits() + "). Waiting to correct " +
- "until all active are done");
- }
- }
- }
-
- if (progressToken.getActiveBucketCount() == 0) {
- if (flushActive) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "All active buckets flushed, " +
- "correcting progress token and continuing normal operation");
- }
- // Trigger the actual bucket state change this time
- setDistributionBitCount(distributionBitCount, progressToken);
- assert(progressToken.getDistributionBitCount() == distributionBitCount);
- }
- flushActive = false;
- // Update #finished since we might have had inconsistent active
- // buckets that have prevent us from getting a correct value. At
- // this point, however, all pending buckets should presumably be
- // at the same, correct dist bit count, so we can safely compute
- // a new count
- // TODO: ensure this is consistent
- if (progressToken.getPendingBucketCount() <= progressToken.getBucketCursor()) {
- progressToken.setFinishedBucketCount(progressToken.getBucketCursor() -
- progressToken.getPendingBucketCount());
- }
- }
- }
- }
-
- /**
- * Provides an explicit set of bucket IDs to iterate over. Will immediately
- * set these as pending in the {@link ProgressToken}, as it is presumed this set is
- * rather small. Changing the distribution bit count for this source is
- * effectively a no-op, as explicit bucket IDs should not be implicitly
- * changed.
- */
- protected static class ExplicitBucketSource implements BucketSource {
- private int distributionBitCount;
- private long totalBucketCount = 0;
-
- public ExplicitBucketSource(Set<BucketId> superbuckets,
- int distributionBitCount,
- ProgressToken progress) {
- this.distributionBitCount = progress.getDistributionBitCount();
- this.totalBucketCount = superbuckets.size();
-
- // New progress token?
- if (progress.getTotalBucketCount() == 0) {
- progress.setTotalBucketCount(this.totalBucketCount);
- progress.setDistributionBitCount(distributionBitCount);
- this.distributionBitCount = distributionBitCount;
- }
- else {
- // Quick consistency check to ensure the user isn't trying to eg.
- // pass a progress token for another document selection
- if (progress.getTotalBucketCount() != totalBucketCount
- || (progress.getFinishedBucketCount() + progress.getPendingBucketCount()
- + progress.getActiveBucketCount() != totalBucketCount)) {
- throw new IllegalArgumentException("Total bucket count in existing progress is not " +
- "consistent with that of the current document selection");
- }
- if (progress.getBucketCursor() != 0) {
- // Trying to use a range source progress file
- throw new IllegalArgumentException("Cannot use given progress file with the "+
- "current document selection");
- }
- this.distributionBitCount = progress.getDistributionBitCount();
- }
-
- if (progress.isFinished() || !progress.isEmpty()) return;
-
- for (BucketId id : superbuckets) {
- // Add all superbuckets with zero sub-bucket progress and pending
- progress.addBucket(id, new BucketId(), ProgressToken.BucketState.BUCKET_PENDING);
- }
- }
-
- public boolean hasNext() {
- return false;
- }
-
- public boolean shouldYield() {
- return false;
- }
-
- public boolean visitsAllBuckets() {
- return false;
- }
-
- public long getTotalBucketCount() {
- return totalBucketCount;
- }
-
- // All explicit buckets should have been placed in the progress
- // token during construction, so this method should never be called
- public BucketProgress getNext() {
- throw new IllegalStateException("getNext() called on ExplicitBucketSource");
- }
-
- public int getDistributionBitCount() {
- return distributionBitCount;
- }
-
- public void setDistributionBitCount(int distributionBitCount,
- ProgressToken progress)
- {
- // Setting distribution bits for explicit bucket source is essentially
- // a no-op, since its buckets already are fixed at 32 used bits.
- progress.setDistributionBitCount(distributionBitCount);
- this.distributionBitCount = distributionBitCount;
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Set distribution bit count to "
- + distributionBitCount + " for explicit bucket source (no-op)");
- }
- }
-
- public void update(BucketId superbucket, BucketId progress,
- ProgressToken token) {
- // Simply delegate to ProgressToken, as it maintains all progress state
- token.updateProgress(superbucket, progress);
- }
- }
-
- /**
- * @param bucketSource An instance of {@link BucketSource}, providing the working set for
- * the iterator
- * @param progressToken A {@link ProgressToken} instance, allowing the progress of
- * finished or partially finished buckets to be tracked
- *
- * @see BucketSource
- * @see ProgressToken
- */
- private VisitorIterator(ProgressToken progressToken,
- BucketSource bucketSource) {
- assert(progressToken.getDistributionBitCount() == bucketSource.getDistributionBitCount())
- : "inconsistent distribution bit counts";
- this.distributionBitCount = progressToken.getDistributionBitCount();
- this.progressToken = progressToken;
- this.bucketSource = bucketSource;
- }
-
-
- /**
- * @return The pair [superbucket, progress] that specifies the next iterable
- * bucket. When a superbucket is initially returned, the pair is equal to
- * that of [superbucket, 0], as there has been no progress into its sub-buckets
- * yet (if they exist).
- *
- * Precondition: <code>hasNext() == true</code>
- */
- public BucketProgress getNext() {
- assert(progressToken.getDistributionBitCount() == bucketSource.getDistributionBitCount())
- : "inconsistent distribution bit counts for progress and source";
- assert(hasNext());
- // We prioritize returning buckets in the pending map over those
- // that may be in the bucket source, since we want to avoid growing
- // the map too much
- if (progressToken.hasPending()) {
- // Find first pending bucket in token
- TreeMap<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> buckets = progressToken.getBuckets();
- ProgressToken.BucketEntry pending = null;
- BucketId superbucket = null;
- for (Map.Entry<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> entry : buckets.entrySet()) {
- if (entry.getValue().getState() == ProgressToken.BucketState.BUCKET_PENDING) {
- pending = entry.getValue();
- superbucket = new BucketId(ProgressToken.keyToBucketId(entry.getKey().getKey()));
- break;
- }
- }
- assert(pending != null) : "getNext() called with inconsistent state";
-
- // Set bucket to active, since it's not awaiting an update
- pending.setState(ProgressToken.BucketState.BUCKET_ACTIVE);
-
- progressToken.setActiveBucketCount(progressToken.getActiveBucketCount() + 1);
- progressToken.setPendingBucketCount(progressToken.getPendingBucketCount() - 1);
-
- return new BucketProgress(superbucket, pending.getProgress());
- } else {
- BucketProgress ret = bucketSource.getNext();
- progressToken.addBucket(ret.getSuperbucket(), ret.getProgress(),
- ProgressToken.BucketState.BUCKET_ACTIVE);
- return ret;
- }
- }
-
- /**
- * <p>Check whether or not it is valid to call {@link #getNext()} with the current
- * iterator state.</p>
- *
- * <p>There exists a case wherein <code>hasNext</code> may return false before {@link #update} is
- * called, but true afterwards. This happens when the set of pending buckets is
- * empty, the bucket source is empty <em>but</em> the set of active buckets is
- * not. A future progress update on any of the buckets in the active set may
- * or may not make that bucket available to the pending set again.
- * This must be handled explicitly by the caller by checking {@link #isDone()}
- * and ensuring that {@link #update} is called before retrying <code>hasNext</code>.</p>
- *
- * <p>This method will also return false if the number of distribution bits have
- * changed and there are active buckets needing to be flushed before the
- * iterator will allow new buckets to be handed out.</p>
- *
- * @return Whether or not it is valid to call {@link #getNext()} with the current
- * iterator state.
- */
- public boolean hasNext() {
- return (progressToken.hasPending() || bucketSource.hasNext()) && !bucketSource.shouldYield();
- }
-
- /**
- * Check if the iterator is actually done
- *
- * @see #hasNext()
- *
- * @return <code>true</code> <em>iff</em> the bucket source is empty and
- * there are no pending or active buckets in the progress token.
- */
- public boolean isDone() {
- return !(hasNext() || progressToken.hasActive());
- }
-
- /**
- * <p>Tell the iterator that we've finished processing up to <i>and
- * including</i> <code>progress</code>. <code>progress</code> may be a sub-bucket <i>or</i>
- * the invalid 0-bucket (in case the caller fails to process the bucket and
- * must return it to the set of pending) <em>or</em> the special case <code>BucketId(Integer.MAX_VALUE)</code>,
- * the latter indicating to the iterator that traversal is complete for
- * <code>superbucket</code>'s tree. The null bucket should only be used if no
- * non-null updates have yet been given for the superbucket.</p>
- *
- * <p>It is a requirement that each superbucket returned by {@link #getNext()} must
- * eventually result in 1-n update operations, where the last update operation
- * has the special progress==super case.</p>
- *
- * <p>If the document selection used to create the iterator is unknown and there
- * were active buckets at the time of a distribution bit state change, such
- * a bucket passed to <code>update()</code> will be in an inconsistent state
- * with regards to the number of bits it uses. For unfinished buckets, this
- * is handled by splitting or merging it until it's consistent, depending on
- * whether or not it had a lower or higher distribution bit count than that of
- * the current system state. For finished buckets of a lower dist bit count,
- * the amount of finished buckets in the ProgressToken is adjusted upwards
- * to compensate for the fact that a bucket using fewer distribution bits
- * actually covers more of the bucket space than the ones that are currently
- * in use. For finished buckets of a higher dist bit count, the number of
- * finished buckets is <em>not</em> increased at that point in time, since
- * such a bucket doesn't actually cover an entire bucket with the current state.</p>
- *
- * <p>All this is done automatically and transparently to the caller once all
- * active buckets have been updated.</p>
- *
- * @param superbucket A valid bucket ID that has been retrieved earlier through
- * {@link #getNext()}
- * @param progress A bucket logically contained within <code>super</code>. Subsequent
- * updates for the same superbucket must have <code>progress</code> be in an increasing
- * order, where order is defined as the in-order traversal of the bucket split
- * tree. May also be the null bucket if the superbucket has not seen any "proper"
- * progress updates yet or the special case Integer.MAX_VALUE. Note that inconsistent
- * splitting might actually see <code>progress</code> as containing <code>super</code>
- * rather than vice versa, so this is explicitly allowed to pass by the code.
- */
- public void update(BucketId superbucket, BucketId progress) {
- // Delegate to bucket source, as it knows how to deal with buckets
- // that are in an inconsistent state wrt distribution bit count
- bucketSource.update(superbucket, progress, progressToken);
- }
-
- /**
- * @return The total number of iterable buckets that remain to be processed
- *
- * Note: currently includes all non-finished (i.e. active and pending
- * buckets) as well
- */
- public long getRemainingBucketCount() {
- return progressToken.getTotalBucketCount() - progressToken.getFinishedBucketCount();
- }
-
- /**
- * @return Internal bucket source instance. Do <i>NOT</i> modify!
- */
- protected BucketSource getBucketSource() {
- return bucketSource;
- }
-
- public ProgressToken getProgressToken() {
- return progressToken;
- }
-
- public int getDistributionBitCount() {
- return distributionBitCount;
- }
-
- /**
- * <p>Set the distribution bit count for the iterator and the buckets it
- * currently maintains and will return in the future.</p>
- *
- * <p>For document selections that result in a explicit set of buckets, this
- * is essentially a no-op, so in such a case, disregard the rest of this text.</p>
- *
- * <p>Changing the number of distribution bits for an unknown document
- * selection will effectively scale the bucket space that will be visited;
- * each bit increase or decrease doubling or halving its size, respectively.
- * When increasing, any pending buckets will be split to ensure the total
- * bucket space covered remains the same. Correspondingly, when decreasing,
- * any pending buckets will be merged appropriately.</p>
- *
- * <p>If there are buckets active at the time of the change, the actual
- * bucket splitting/merging operations are kept on hold until all active
- * buckets have been updated, at which point they will be automatically
- * performed. The iterator will force such an update by not giving out
- * any new or pending buckets until that happens.</p>
- *
- * <p><em>Note:</em> when decreasing the number of distribution bits,
- * there is a chance of losing superbucket progress in a bucket that
- * is merged with another bucket, leading to potential duplicate
- * results.</p>
- *
- * @param distBits New system state distribution bit count
- */
- public void setDistributionBitCount(int distBits) {
- if (distributionBitCount != distBits) {
- bucketSource.setDistributionBitCount(distBits, progressToken);
- distributionBitCount = distBits;
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Set visitor iterator distribution bit count to "
- + distBits);
- }
- }
- }
-
- public boolean visitsAllBuckets() {
- return bucketSource.visitsAllBuckets();
- }
-
- /**
- * Create a new <code>VisitorIterator</code> instance based on the given document
- * selection string.
- *
- * @param documentSelection Document selection string used to create the
- * <code>VisitorIterator</code> instance. Depending on the characteristics of the
- * selection, the iterator may iterate over only a small subset of the buckets or
- * every bucket in the system. Both cases will be handled efficiently.
- * @param idFactory {@link BucketId} factory specifying the number of distribution bits
- * to use et al.
- * @param progress A unique {@link ProgressToken} instance which is used for maintaining the state
- * of the iterator. Can <em>not</em> be shared with other iterator instances at the same time.
- * If <code>progress</code> contains work done in an earlier iteration run, the iterator will pick
- * up from where it left off
- * @return A new <code>VisitorIterator</code> instance
- * @throws ParseException if <code>documentSelection</code> fails to properly parse
- */
- public static VisitorIterator createFromDocumentSelection(
- String documentSelection,
- BucketIdFactory idFactory,
- int distributionBitCount,
- ProgressToken progress) throws ParseException {
- BucketSelector bucketSel = new BucketSelector(idFactory);
- Set<BucketId> rawBuckets = bucketSel.getBucketList(documentSelection);
- BucketSource src;
-
- // Depending on whether the expression yielded an unknown number of
- // buckets, we create either an explicit bucket source or a distribution
- // bit-based range source
- if (rawBuckets == null) {
- // Range source
- src = new DistributionRangeBucketSource(distributionBitCount, progress);
- } else {
- // Explicit source
- src = new ExplicitBucketSource(rawBuckets, distributionBitCount, progress);
- }
-
- return new VisitorIterator(progress, src);
- }
-
- /**
- * Create a new <code>VisitorIterator</code> instance based on the given
- * set of buckets. This is supported for internal use only, and is required
- * by Synchronization. Use {@link #createFromDocumentSelection} instead for
- * all normal purposes.
- *
- * @param bucketsToVisit The set of buckets that will be visited
- * @param distributionBitCount Number of distribution bits to use
- * @param progress A unique ProgressToken instance which is used for maintaining the state
- * of the iterator. Can <em>not</em> be shared with other iterator instances at the same time.
- * If <code>progress</code> contains work done in an earlier iteration run, the iterator will pick
- * up from where it left off
- * @return A new <code>VisitorIterator</code> instance
- */
- public static VisitorIterator createFromExplicitBucketSet(
- Set<BucketId> bucketsToVisit,
- int distributionBitCount,
- ProgressToken progress) {
- // For obvious reasons, always create an explicit source here
- BucketSource src = new ExplicitBucketSource(bucketsToVisit,
- distributionBitCount, progress);
- return new VisitorIterator(progress, src);
- }
-}
+package com.yahoo.documentapi; + +import com.yahoo.document.BucketId; +import com.yahoo.document.BucketIdFactory; +import com.yahoo.document.select.BucketSelector; +import com.yahoo.document.select.parser.ParseException; +import com.yahoo.log.LogLevel; + +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.logging.Logger; + +/** + * <p>Enables transparent iteration of super/sub-buckets</p> + * + * <p>Thread safety: safe for threads to hold their own iterators (no shared state), + * as long as they also hold the ProgressToken object associated with it. No two + * VisitorIterator instances may share the same progress token instance at the + * same time. + * Concurrent access to a single VisitorIterator instance is not safe and must + * be handled atomically by the caller.</p> + * + * @author <a href="mailto:vekterli@yahoo-inc.com">Tor Brede Vekterli</a> + */ +public class VisitorIterator { + private ProgressToken progressToken; + private BucketSource bucketSource; + private int distributionBitCount; + + private static final Logger log = Logger.getLogger(VisitorIterator.class.getName()); + + public static class BucketProgress { + private BucketId superbucket; + private BucketId progress; + + public BucketProgress(BucketId superbucket, BucketId progress) { + this.superbucket = superbucket; + this.progress = progress; + } + + public BucketId getProgress() { + return progress; + } + + public BucketId getSuperbucket() { + return superbucket; + } + } + + /** + * Provides an abstract interface to <code>VisitorIterator</code> for + * how pending buckets are acquired, decoupling this from the iteration + * itself. + * + * <em>Important</em>: it is the responsibility of the {@link BucketSource} implementation + * to ensure that progress information is honored for (partially) finished buckets. + * From the point of view of the iterator itself, it should not have to deal with + * filtering away already finished buckets, as this is a detail best left to + * bucket sources. + */ + protected static interface BucketSource { + public boolean hasNext(); + public boolean shouldYield(); + public boolean visitsAllBuckets(); + public BucketProgress getNext(); + public long getTotalBucketCount(); + public int getDistributionBitCount(); + public void setDistributionBitCount(int distributionBitCount, + ProgressToken progress); + public void update(BucketId superbucket, BucketId progress, + ProgressToken token); + } + + /** + * Provides a bucket source that encompasses the entire range available + * through a given value of distribution bits + */ + protected static class DistributionRangeBucketSource implements BucketSource { + private boolean flushActive = false; + private int distributionBitCount; + // Wouldn't need this if this were a non-static class, but do it for + // the sake of keeping things identical in Java and C++ + private ProgressToken progressToken; + + public DistributionRangeBucketSource(int distributionBitCount, + ProgressToken progress) { + progressToken = progress; + + // New progress token (could also be empty, in which this is a + // no-op anyway) + if (progressToken.getTotalBucketCount() == 0) { + assert(progressToken.isEmpty()) : "inconsistent progress state"; + progressToken.setTotalBucketCount(1L << distributionBitCount); + progressToken.setDistributionBitCount(distributionBitCount); + progressToken.setBucketCursor(0); + progressToken.setFinishedBucketCount(0); + this.distributionBitCount = distributionBitCount; + } + else { + this.distributionBitCount = progressToken.getDistributionBitCount(); + // Quick consistency check to ensure the user isn't trying to eg. + // pass a progress token for an explicit document selection + if (progressToken.getTotalBucketCount() != (1L << progressToken.getDistributionBitCount())) { + throw new IllegalArgumentException("Total bucket count in existing progress is not " + + "consistent with that of the current document selection"); + } + } + + if (!progress.isFinished()) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Importing unfinished progress token with " + + "bits: " + progressToken.getDistributionBitCount() + + ", active: " + progressToken.getActiveBucketCount() + + ", pending: " + progressToken.getPendingBucketCount() + + ", cursor: " + progressToken.getBucketCursor() + + ", finished: " + progressToken.getFinishedBucketCount() + + ", total: " + progressToken.getTotalBucketCount()); + } + if (!progress.isEmpty()) { + // Lower all active to pending + if (progressToken.getActiveBucketCount() > 0) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Progress token had active buckets upon range " + + "construction. Setting these as pending"); + } + progressToken.setAllBucketsToState(ProgressToken.BucketState.BUCKET_PENDING); + } + // Fixup for any buckets that were active when progress was written + // but are now pending and with wrong dist bits (used-bits). Buckets + // split here may very well be split/merged again if we set a new dist + // bit count, but that is the desired process + correctInconsistentPending(progressToken.getDistributionBitCount()); + // Fixup for bucket cursor in case of bucket space downscaling + correctTruncatedBucketCursor(); + + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Partial bucket space progress; continuing "+ + "from position " + progressToken.getBucketCursor()); + } + } + progressToken.setFinishedBucketCount(progressToken.getBucketCursor() - + progressToken.getPendingBucketCount()); + } else { + assert(progressToken.getBucketCursor() == progressToken.getTotalBucketCount()); + } + // Should be all fixed up and good to go + progressToken.setInconsistentState(false); + } + + protected boolean isLosslessResetPossible() { + // #pending must be equal to cursor, i.e. all buckets ever fetched + // must be located in the set of pending + if (progressToken.getPendingBucketCount() != progressToken.getBucketCursor()) { + return false; + } + // Check if all pending buckets have a progress of 0 + for (Map.Entry<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> entry + : progressToken.getBuckets().entrySet()) { + if (entry.getValue().getState() != ProgressToken.BucketState.BUCKET_PENDING) { + return false; + } + if (entry.getValue().getProgress().getId() != 0) { + return false; + } + } + return true; + } + + /** + * Ensure that a given <code>ProgressToken</code> instance only has + * buckets pending that have a used-bits count of that of the + * <code>targetDistCits</code>. This is done by splitting or merging + * all inconsistent buckets until the desired state is reached. + * + * Time complexity is approx <i>O(4bn)</i> where <i>b</i> is the maximum + * delta of bits to change anywhere in the set of pending and <i>n</i> + * is the number of pending. This includes the time spent making shallow + * map copies. + * + * @param targetDistBits The desired distribution bit count of the buckets + */ + private void correctInconsistentPending(int targetDistBits) { + boolean maybeInconsistent = true; + long bucketsSplit = 0, bucketsMerged = 0; + long pendingBefore = progressToken.getPendingBucketCount(); + ProgressToken p = progressToken; + + // Optimization: before doing any splitting/merging at all, we check + // to see if we can't simply just reset the entire internal state + // with the new distribution bit count. This ensures that if we go + // from eg. 1 bit to 20 bits, we won't have to perform a grueling + // half a million splits to cover the same bucket space as that 1 + // single-bit bucket once did + if (isLosslessResetPossible()) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "At start of bucket space and all " + + "buckets have no progress; doing a lossless reset " + + "instead of splitting/merging"); + } + assert(p.getActiveBucketCount() == 0); + p.clearAllBuckets(); + p.setBucketCursor(0); + return; + } + + while (maybeInconsistent) { + BucketId lastMergedBucket = null; + maybeInconsistent = false; + // Make a shallow working copy of the bucket map. BucketKeyWrapper + // keys are considered immutable, and should thus not be at risk + // for being changed during the inner loop + // Do separate passes for splitting and merging just to make + // absolutely sure that the two ops won't step on each others' + // toes. This isn't wildly efficient, but the data sets in question + // are presumed to be low in size and this is presumed to be a very + // infrequent operation + TreeMap<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> buckets + = new TreeMap<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry>(p.getBuckets()); + for (Map.Entry<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> entry + : buckets.entrySet()) { + assert(entry.getValue().getState() == ProgressToken.BucketState.BUCKET_PENDING); + BucketId pending = new BucketId(ProgressToken.keyToBucketId(entry.getKey().getKey())); + if (pending.getUsedBits() < targetDistBits) { + if (pending.getUsedBits() + 1 < targetDistBits) { + maybeInconsistent = true; // Do another pass + } + p.splitPendingBucket(pending); + ++bucketsSplit; + } + } + + // Make new map copy with potentially split buckets + buckets = new TreeMap<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry>(p.getBuckets()); + for (Map.Entry<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> entry + : buckets.entrySet()) { + assert(entry.getValue().getState() == ProgressToken.BucketState.BUCKET_PENDING); + BucketId pending = new BucketId(ProgressToken.keyToBucketId(entry.getKey().getKey())); + if (pending.getUsedBits() > targetDistBits) { + // If this is the right sibling of an already merged left sibling, + // it's already been merged away, so we should skip it + if (lastMergedBucket != null) { + BucketId rightCheck = new BucketId(lastMergedBucket.getUsedBits(), + lastMergedBucket.getId() | (1L << (lastMergedBucket.getUsedBits() - 1))); + if (pending.equals(rightCheck)) { + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Skipped " + pending + + ", as it was right sibling of " + lastMergedBucket); + } + continue; + } + } + if (pending.getUsedBits() - 1 > targetDistBits) { + maybeInconsistent = true; // Do another pass + } + p.mergePendingBucket(pending); + ++bucketsMerged; + + lastMergedBucket = pending; + } + } + } + if ((bucketsSplit > 0 || bucketsMerged > 0) && log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Existing progress' pending buckets had inconsistent " + + "distribution bits; performed " + bucketsSplit + " split ops and " + + bucketsMerged + " merge ops. Pending: " + pendingBefore + " -> " + + p.getPendingBucketCount()); + } + } + + private void correctTruncatedBucketCursor() { + // We've truncated the bucket cursor, but in doing so we might + // have moved back beyond where there are pending buckets. Consider + // having a cursor value of 3 at 31 bits and then moving to 11 bits. + // With 1 pending we'll normally reach a cursor of 0, even though it + // should be 1 + for (ProgressToken.BucketKeyWrapper bucketKey + : progressToken.getBuckets().keySet()) { + BucketId bid = bucketKey.toBucketId(); + long idx = bucketKey.getKey() >>> (64 - bid.getUsedBits()); + if (bid.getUsedBits() == distributionBitCount + && idx >= progressToken.getBucketCursor()) { + progressToken.setBucketCursor(idx + 1); + } + } + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "New range bucket cursor is " + + progressToken.getBucketCursor()); + } + } + + public boolean hasNext() { + return progressToken.getBucketCursor() < (1L << distributionBitCount); + } + + public boolean shouldYield() { + // If we need to flush all active buckets, stall the iteration until + // this has been done + return flushActive; + } + + public boolean visitsAllBuckets() { + return true; + } + + public long getTotalBucketCount() { + return 1L << distributionBitCount; + } + + public BucketProgress getNext() { + assert(hasNext()) : "getNext() called with hasNext() == false"; + long currentPosition = progressToken.getBucketCursor(); + long key = ProgressToken.makeNthBucketKey(currentPosition, distributionBitCount); + ++currentPosition; + progressToken.setBucketCursor(currentPosition); + return new BucketProgress( + new BucketId(ProgressToken.keyToBucketId(key)), + new BucketId()); + } + + public int getDistributionBitCount() { + return distributionBitCount; + } + + public void setDistributionBitCount(int distributionBitCount, + ProgressToken progress) + { + this.distributionBitCount = distributionBitCount; + + // There might be a case where we're waiting for active buckets + // already when a new distribution bit change comes in. If so, + // don't do anything at all yet with the set of pending + if (progressToken.getActiveBucketCount() > 0) { + flushActive = true; + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Holding off new/pending buckets and consistency " + + "correction until all " + progress.getActiveBucketCount() + + " active buckets have been updated"); + } + progressToken.setInconsistentState(true); + } else { + // Only perform the actual distribution bit bucket ops if we've + // got no pending buckets + int delta = distributionBitCount - progressToken.getDistributionBitCount(); + + // Must do this before setting the bucket cursor to allow + // reset-checking to be performed + correctInconsistentPending(distributionBitCount); + if (delta > 0) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Increasing distribution bits for full bucket " + + "space range source from " + progressToken.getDistributionBitCount() + " to " + + distributionBitCount); + } + progressToken.setFinishedBucketCount(progressToken.getFinishedBucketCount() << delta); + // By n-doubling the position, the bucket key ordering ensures + // we go from eg. 3:0x02 to 4:0x02 to 5:02 etc. + progressToken.setBucketCursor(progressToken.getBucketCursor() << delta); + } else if (delta < 0) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Decreasing distribution bits for full bucket " + + "space range source from " + progressToken.getDistributionBitCount() + + " to " + distributionBitCount + " bits"); + } + // Scale down bucket space and cursor + progressToken.setBucketCursor(progressToken.getBucketCursor() >>> -delta); + progressToken.setFinishedBucketCount(progressToken.getFinishedBucketCount() >>> -delta); + } + + progressToken.setTotalBucketCount(1L << distributionBitCount); + progressToken.setDistributionBitCount(distributionBitCount); + + correctTruncatedBucketCursor(); + progressToken.setInconsistentState(false); + } + } + + public void update(BucketId superbucket, BucketId progress, + ProgressToken token) { + progressToken.updateProgress(superbucket, progress); + + if (superbucket.getUsedBits() != distributionBitCount) { + if (!progress.equals(ProgressToken.FINISHED_BUCKET)) { + // We should now always flush active buckets before doing a + // consistency fix. This simplifies things greatly + assert(flushActive); + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Received non-finished bucket " + + superbucket + " with wrong distribution bit count (" + + superbucket.getUsedBits() + "). Waiting to correct " + + "until all active are done"); + } + } else { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Received finished bucket " + + superbucket + " with wrong distribution bit count (" + + superbucket.getUsedBits() + "). Waiting to correct " + + "until all active are done"); + } + } + } + + if (progressToken.getActiveBucketCount() == 0) { + if (flushActive) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "All active buckets flushed, " + + "correcting progress token and continuing normal operation"); + } + // Trigger the actual bucket state change this time + setDistributionBitCount(distributionBitCount, progressToken); + assert(progressToken.getDistributionBitCount() == distributionBitCount); + } + flushActive = false; + // Update #finished since we might have had inconsistent active + // buckets that have prevent us from getting a correct value. At + // this point, however, all pending buckets should presumably be + // at the same, correct dist bit count, so we can safely compute + // a new count + // TODO: ensure this is consistent + if (progressToken.getPendingBucketCount() <= progressToken.getBucketCursor()) { + progressToken.setFinishedBucketCount(progressToken.getBucketCursor() - + progressToken.getPendingBucketCount()); + } + } + } + } + + /** + * Provides an explicit set of bucket IDs to iterate over. Will immediately + * set these as pending in the {@link ProgressToken}, as it is presumed this set is + * rather small. Changing the distribution bit count for this source is + * effectively a no-op, as explicit bucket IDs should not be implicitly + * changed. + */ + protected static class ExplicitBucketSource implements BucketSource { + private int distributionBitCount; + private long totalBucketCount = 0; + + public ExplicitBucketSource(Set<BucketId> superbuckets, + int distributionBitCount, + ProgressToken progress) { + this.distributionBitCount = progress.getDistributionBitCount(); + this.totalBucketCount = superbuckets.size(); + + // New progress token? + if (progress.getTotalBucketCount() == 0) { + progress.setTotalBucketCount(this.totalBucketCount); + progress.setDistributionBitCount(distributionBitCount); + this.distributionBitCount = distributionBitCount; + } + else { + // Quick consistency check to ensure the user isn't trying to eg. + // pass a progress token for another document selection + if (progress.getTotalBucketCount() != totalBucketCount + || (progress.getFinishedBucketCount() + progress.getPendingBucketCount() + + progress.getActiveBucketCount() != totalBucketCount)) { + throw new IllegalArgumentException("Total bucket count in existing progress is not " + + "consistent with that of the current document selection"); + } + if (progress.getBucketCursor() != 0) { + // Trying to use a range source progress file + throw new IllegalArgumentException("Cannot use given progress file with the "+ + "current document selection"); + } + this.distributionBitCount = progress.getDistributionBitCount(); + } + + if (progress.isFinished() || !progress.isEmpty()) return; + + for (BucketId id : superbuckets) { + // Add all superbuckets with zero sub-bucket progress and pending + progress.addBucket(id, new BucketId(), ProgressToken.BucketState.BUCKET_PENDING); + } + } + + public boolean hasNext() { + return false; + } + + public boolean shouldYield() { + return false; + } + + public boolean visitsAllBuckets() { + return false; + } + + public long getTotalBucketCount() { + return totalBucketCount; + } + + // All explicit buckets should have been placed in the progress + // token during construction, so this method should never be called + public BucketProgress getNext() { + throw new IllegalStateException("getNext() called on ExplicitBucketSource"); + } + + public int getDistributionBitCount() { + return distributionBitCount; + } + + public void setDistributionBitCount(int distributionBitCount, + ProgressToken progress) + { + // Setting distribution bits for explicit bucket source is essentially + // a no-op, since its buckets already are fixed at 32 used bits. + progress.setDistributionBitCount(distributionBitCount); + this.distributionBitCount = distributionBitCount; + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Set distribution bit count to " + + distributionBitCount + " for explicit bucket source (no-op)"); + } + } + + public void update(BucketId superbucket, BucketId progress, + ProgressToken token) { + // Simply delegate to ProgressToken, as it maintains all progress state + token.updateProgress(superbucket, progress); + } + } + + /** + * @param bucketSource An instance of {@link BucketSource}, providing the working set for + * the iterator + * @param progressToken A {@link ProgressToken} instance, allowing the progress of + * finished or partially finished buckets to be tracked + * + * @see BucketSource + * @see ProgressToken + */ + private VisitorIterator(ProgressToken progressToken, + BucketSource bucketSource) { + assert(progressToken.getDistributionBitCount() == bucketSource.getDistributionBitCount()) + : "inconsistent distribution bit counts"; + this.distributionBitCount = progressToken.getDistributionBitCount(); + this.progressToken = progressToken; + this.bucketSource = bucketSource; + } + + + /** + * @return The pair [superbucket, progress] that specifies the next iterable + * bucket. When a superbucket is initially returned, the pair is equal to + * that of [superbucket, 0], as there has been no progress into its sub-buckets + * yet (if they exist). + * + * Precondition: <code>hasNext() == true</code> + */ + public BucketProgress getNext() { + assert(progressToken.getDistributionBitCount() == bucketSource.getDistributionBitCount()) + : "inconsistent distribution bit counts for progress and source"; + assert(hasNext()); + // We prioritize returning buckets in the pending map over those + // that may be in the bucket source, since we want to avoid growing + // the map too much + if (progressToken.hasPending()) { + // Find first pending bucket in token + TreeMap<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> buckets = progressToken.getBuckets(); + ProgressToken.BucketEntry pending = null; + BucketId superbucket = null; + for (Map.Entry<ProgressToken.BucketKeyWrapper, ProgressToken.BucketEntry> entry : buckets.entrySet()) { + if (entry.getValue().getState() == ProgressToken.BucketState.BUCKET_PENDING) { + pending = entry.getValue(); + superbucket = new BucketId(ProgressToken.keyToBucketId(entry.getKey().getKey())); + break; + } + } + assert(pending != null) : "getNext() called with inconsistent state"; + + // Set bucket to active, since it's not awaiting an update + pending.setState(ProgressToken.BucketState.BUCKET_ACTIVE); + + progressToken.setActiveBucketCount(progressToken.getActiveBucketCount() + 1); + progressToken.setPendingBucketCount(progressToken.getPendingBucketCount() - 1); + + return new BucketProgress(superbucket, pending.getProgress()); + } else { + BucketProgress ret = bucketSource.getNext(); + progressToken.addBucket(ret.getSuperbucket(), ret.getProgress(), + ProgressToken.BucketState.BUCKET_ACTIVE); + return ret; + } + } + + /** + * <p>Check whether or not it is valid to call {@link #getNext()} with the current + * iterator state.</p> + * + * <p>There exists a case wherein <code>hasNext</code> may return false before {@link #update} is + * called, but true afterwards. This happens when the set of pending buckets is + * empty, the bucket source is empty <em>but</em> the set of active buckets is + * not. A future progress update on any of the buckets in the active set may + * or may not make that bucket available to the pending set again. + * This must be handled explicitly by the caller by checking {@link #isDone()} + * and ensuring that {@link #update} is called before retrying <code>hasNext</code>.</p> + * + * <p>This method will also return false if the number of distribution bits have + * changed and there are active buckets needing to be flushed before the + * iterator will allow new buckets to be handed out.</p> + * + * @return Whether or not it is valid to call {@link #getNext()} with the current + * iterator state. + */ + public boolean hasNext() { + return (progressToken.hasPending() || bucketSource.hasNext()) && !bucketSource.shouldYield(); + } + + /** + * Check if the iterator is actually done + * + * @see #hasNext() + * + * @return <code>true</code> <em>iff</em> the bucket source is empty and + * there are no pending or active buckets in the progress token. + */ + public boolean isDone() { + return !(hasNext() || progressToken.hasActive()); + } + + /** + * <p>Tell the iterator that we've finished processing up to <i>and + * including</i> <code>progress</code>. <code>progress</code> may be a sub-bucket <i>or</i> + * the invalid 0-bucket (in case the caller fails to process the bucket and + * must return it to the set of pending) <em>or</em> the special case <code>BucketId(Integer.MAX_VALUE)</code>, + * the latter indicating to the iterator that traversal is complete for + * <code>superbucket</code>'s tree. The null bucket should only be used if no + * non-null updates have yet been given for the superbucket.</p> + * + * <p>It is a requirement that each superbucket returned by {@link #getNext()} must + * eventually result in 1-n update operations, where the last update operation + * has the special progress==super case.</p> + * + * <p>If the document selection used to create the iterator is unknown and there + * were active buckets at the time of a distribution bit state change, such + * a bucket passed to <code>update()</code> will be in an inconsistent state + * with regards to the number of bits it uses. For unfinished buckets, this + * is handled by splitting or merging it until it's consistent, depending on + * whether or not it had a lower or higher distribution bit count than that of + * the current system state. For finished buckets of a lower dist bit count, + * the amount of finished buckets in the ProgressToken is adjusted upwards + * to compensate for the fact that a bucket using fewer distribution bits + * actually covers more of the bucket space than the ones that are currently + * in use. For finished buckets of a higher dist bit count, the number of + * finished buckets is <em>not</em> increased at that point in time, since + * such a bucket doesn't actually cover an entire bucket with the current state.</p> + * + * <p>All this is done automatically and transparently to the caller once all + * active buckets have been updated.</p> + * + * @param superbucket A valid bucket ID that has been retrieved earlier through + * {@link #getNext()} + * @param progress A bucket logically contained within <code>super</code>. Subsequent + * updates for the same superbucket must have <code>progress</code> be in an increasing + * order, where order is defined as the in-order traversal of the bucket split + * tree. May also be the null bucket if the superbucket has not seen any "proper" + * progress updates yet or the special case Integer.MAX_VALUE. Note that inconsistent + * splitting might actually see <code>progress</code> as containing <code>super</code> + * rather than vice versa, so this is explicitly allowed to pass by the code. + */ + public void update(BucketId superbucket, BucketId progress) { + // Delegate to bucket source, as it knows how to deal with buckets + // that are in an inconsistent state wrt distribution bit count + bucketSource.update(superbucket, progress, progressToken); + } + + /** + * @return The total number of iterable buckets that remain to be processed + * + * Note: currently includes all non-finished (i.e. active and pending + * buckets) as well + */ + public long getRemainingBucketCount() { + return progressToken.getTotalBucketCount() - progressToken.getFinishedBucketCount(); + } + + /** + * @return Internal bucket source instance. Do <i>NOT</i> modify! + */ + protected BucketSource getBucketSource() { + return bucketSource; + } + + public ProgressToken getProgressToken() { + return progressToken; + } + + public int getDistributionBitCount() { + return distributionBitCount; + } + + /** + * <p>Set the distribution bit count for the iterator and the buckets it + * currently maintains and will return in the future.</p> + * + * <p>For document selections that result in a explicit set of buckets, this + * is essentially a no-op, so in such a case, disregard the rest of this text.</p> + * + * <p>Changing the number of distribution bits for an unknown document + * selection will effectively scale the bucket space that will be visited; + * each bit increase or decrease doubling or halving its size, respectively. + * When increasing, any pending buckets will be split to ensure the total + * bucket space covered remains the same. Correspondingly, when decreasing, + * any pending buckets will be merged appropriately.</p> + * + * <p>If there are buckets active at the time of the change, the actual + * bucket splitting/merging operations are kept on hold until all active + * buckets have been updated, at which point they will be automatically + * performed. The iterator will force such an update by not giving out + * any new or pending buckets until that happens.</p> + * + * <p><em>Note:</em> when decreasing the number of distribution bits, + * there is a chance of losing superbucket progress in a bucket that + * is merged with another bucket, leading to potential duplicate + * results.</p> + * + * @param distBits New system state distribution bit count + */ + public void setDistributionBitCount(int distBits) { + if (distributionBitCount != distBits) { + bucketSource.setDistributionBitCount(distBits, progressToken); + distributionBitCount = distBits; + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Set visitor iterator distribution bit count to " + + distBits); + } + } + } + + public boolean visitsAllBuckets() { + return bucketSource.visitsAllBuckets(); + } + + /** + * Create a new <code>VisitorIterator</code> instance based on the given document + * selection string. + * + * @param documentSelection Document selection string used to create the + * <code>VisitorIterator</code> instance. Depending on the characteristics of the + * selection, the iterator may iterate over only a small subset of the buckets or + * every bucket in the system. Both cases will be handled efficiently. + * @param idFactory {@link BucketId} factory specifying the number of distribution bits + * to use et al. + * @param progress A unique {@link ProgressToken} instance which is used for maintaining the state + * of the iterator. Can <em>not</em> be shared with other iterator instances at the same time. + * If <code>progress</code> contains work done in an earlier iteration run, the iterator will pick + * up from where it left off + * @return A new <code>VisitorIterator</code> instance + * @throws ParseException if <code>documentSelection</code> fails to properly parse + */ + public static VisitorIterator createFromDocumentSelection( + String documentSelection, + BucketIdFactory idFactory, + int distributionBitCount, + ProgressToken progress) throws ParseException { + BucketSelector bucketSel = new BucketSelector(idFactory); + Set<BucketId> rawBuckets = bucketSel.getBucketList(documentSelection); + BucketSource src; + + // Depending on whether the expression yielded an unknown number of + // buckets, we create either an explicit bucket source or a distribution + // bit-based range source + if (rawBuckets == null) { + // Range source + src = new DistributionRangeBucketSource(distributionBitCount, progress); + } else { + // Explicit source + src = new ExplicitBucketSource(rawBuckets, distributionBitCount, progress); + } + + return new VisitorIterator(progress, src); + } + + /** + * Create a new <code>VisitorIterator</code> instance based on the given + * set of buckets. This is supported for internal use only, and is required + * by Synchronization. Use {@link #createFromDocumentSelection} instead for + * all normal purposes. + * + * @param bucketsToVisit The set of buckets that will be visited + * @param distributionBitCount Number of distribution bits to use + * @param progress A unique ProgressToken instance which is used for maintaining the state + * of the iterator. Can <em>not</em> be shared with other iterator instances at the same time. + * If <code>progress</code> contains work done in an earlier iteration run, the iterator will pick + * up from where it left off + * @return A new <code>VisitorIterator</code> instance + */ + public static VisitorIterator createFromExplicitBucketSet( + Set<BucketId> bucketsToVisit, + int distributionBitCount, + ProgressToken progress) { + // For obvious reasons, always create an explicit source here + BucketSource src = new ExplicitBucketSource(bucketsToVisit, + distributionBitCount, progress); + return new VisitorIterator(progress, src); + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalSyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalSyncSession.java index 966caa46969..aa56f69b966 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalSyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalSyncSession.java @@ -1,103 +1,103 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.local;
-
-import com.yahoo.document.Document;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentRemove;
-import com.yahoo.document.DocumentType;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.document.TestAndSetCondition;
-import com.yahoo.documentapi.Response;
-import com.yahoo.documentapi.Result;
-import com.yahoo.documentapi.SyncSession;
-import com.yahoo.documentapi.messagebus.protocol.*;
-
-/**
- * @author bratseth
- */
-public class LocalSyncSession implements SyncSession {
-
- private LocalDocumentAccess access;
-
- public LocalSyncSession(LocalDocumentAccess access) {
- this.access = access;
- }
-
- @Override
- public void put(DocumentPut documentPut) {
- if (documentPut.getCondition().isPresent()) {
- throw new UnsupportedOperationException("test-and-set is not supported.");
- }
-
- access.documents.put(documentPut.getId(), documentPut.getDocument());
- }
-
- @Override
- public void put(DocumentPut documentPut, DocumentProtocol.Priority priority) {
- access.documents.put(documentPut.getId(), documentPut.getDocument());
- }
-
- @Override
- public Document get(DocumentId id) {
- return access.documents.get(id);
- }
-
- @Override
- public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri) {
- // FIXME: More than half the get() methods are deprecated, but they all
- // call exactly the same method, including this one, throwing away most
- // of the parameters
- return access.documents.get(id);
- }
-
- @Override
- public boolean remove(DocumentRemove documentRemove) {
- if (documentRemove.getCondition().isPresent()) {
- throw new UnsupportedOperationException("test-and-set is not supported.");
- }
- access.documents.remove(documentRemove.getId());
- return true;
- }
-
- @Override
- public boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority priority) {
- return remove(documentRemove);
- }
-
- @Override
- public boolean update(DocumentUpdate update) {
- Document document = access.documents.get(update.getId());
- if (document == null) {
- return false;
- }
- update.applyTo(document);
- return true;
- }
-
- @Override
- public boolean update(DocumentUpdate update, DocumentProtocol.Priority pri) {
- Document document = access.documents.get(update.getId());
- if (document == null) {
- return false;
- }
- update.applyTo(document);
- return true;
- }
-
- @Override
- public Response getNext() {
- throw new UnsupportedOperationException("Queue not supported.");
- }
-
- @Override
- public Response getNext(int timeout) {
- throw new UnsupportedOperationException("Queue not supported.");
- }
-
- @Override
- public void destroy() {
- access = null;
- }
-
-}
+package com.yahoo.documentapi.local; + +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.documentapi.Response; +import com.yahoo.documentapi.Result; +import com.yahoo.documentapi.SyncSession; +import com.yahoo.documentapi.messagebus.protocol.*; + +/** + * @author bratseth + */ +public class LocalSyncSession implements SyncSession { + + private LocalDocumentAccess access; + + public LocalSyncSession(LocalDocumentAccess access) { + this.access = access; + } + + @Override + public void put(DocumentPut documentPut) { + if (documentPut.getCondition().isPresent()) { + throw new UnsupportedOperationException("test-and-set is not supported."); + } + + access.documents.put(documentPut.getId(), documentPut.getDocument()); + } + + @Override + public void put(DocumentPut documentPut, DocumentProtocol.Priority priority) { + access.documents.put(documentPut.getId(), documentPut.getDocument()); + } + + @Override + public Document get(DocumentId id) { + return access.documents.get(id); + } + + @Override + public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri) { + // FIXME: More than half the get() methods are deprecated, but they all + // call exactly the same method, including this one, throwing away most + // of the parameters + return access.documents.get(id); + } + + @Override + public boolean remove(DocumentRemove documentRemove) { + if (documentRemove.getCondition().isPresent()) { + throw new UnsupportedOperationException("test-and-set is not supported."); + } + access.documents.remove(documentRemove.getId()); + return true; + } + + @Override + public boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority priority) { + return remove(documentRemove); + } + + @Override + public boolean update(DocumentUpdate update) { + Document document = access.documents.get(update.getId()); + if (document == null) { + return false; + } + update.applyTo(document); + return true; + } + + @Override + public boolean update(DocumentUpdate update, DocumentProtocol.Priority pri) { + Document document = access.documents.get(update.getId()); + if (document == null) { + return false; + } + update.applyTo(document); + return true; + } + + @Override + public Response getNext() { + throw new UnsupportedOperationException("Queue not supported."); + } + + @Override + public Response getNext(int timeout) { + throw new UnsupportedOperationException("Queue not supported."); + } + + @Override + public void destroy() { + access = null; + } + +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java index 392913e8b36..0d4061c5f25 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java @@ -26,11 +26,11 @@ import java.util.concurrent.ScheduledExecutorService; public class MessageBusDocumentAccess extends DocumentAccess { private final NetworkMessageBus bus; - + private final MessageBusParams params; // TODO: make pool size configurable? ScheduledExecutorService is not dynamic - private final ScheduledExecutorService scheduledExecutorService = - Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), + private final ScheduledExecutorService scheduledExecutorService = + Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), ThreadFactoryFactory.getDaemonThreadFactory("mbus.access.scheduler")); /** @@ -63,7 +63,7 @@ public class MessageBusDocumentAccess extends DocumentAccess { throw new DocumentAccessException(e); } } - + private MessageBus messageBus() { return bus.getMessageBus(); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSession.java index 7f5544a93ce..92af9a3758b 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSession.java @@ -1,38 +1,38 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus;
-
-/**
- * This class defines a common interface for message bus sessions.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public interface MessageBusSession {
-
- /**
- * Returns the route to send all messages to when sending through this session.
- *
- * @return The route string.
- */
- public String getRoute();
-
- /**
- * Sets the route to send all messages to when sending through this session.
- *
- * @param route The route string.
- */
- public void setRoute(String route);
-
- /**
- * Returns the trace level used when sending messages through this session.
- *
- * @return The trace level.
- */
- public int getTraceLevel();
-
- /**
- * Sets the trace level used when sending messages through this session.
- *
- * @param traceLevel The trace level to set.
- */
- public void setTraceLevel(int traceLevel);
-}
+package com.yahoo.documentapi.messagebus; + +/** + * This class defines a common interface for message bus sessions. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public interface MessageBusSession { + + /** + * Returns the route to send all messages to when sending through this session. + * + * @return The route string. + */ + public String getRoute(); + + /** + * Sets the route to send all messages to when sending through this session. + * + * @param route The route string. + */ + public void setRoute(String route); + + /** + * Returns the trace level used when sending messages through this session. + * + * @return The trace level. + */ + public int getTraceLevel(); + + /** + * Sets the trace level used when sending messages through this session. + * + * @param traceLevel The trace level to set. + */ + public void setTraceLevel(int traceLevel); +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java index 5be94564556..095d0c14a49 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java @@ -1,225 +1,225 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus;
-
-import com.yahoo.document.Document;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentRemove;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.documentapi.AsyncParameters;
-import com.yahoo.documentapi.DocumentAccessException;
-import com.yahoo.documentapi.Response;
-import com.yahoo.documentapi.Result;
-import com.yahoo.documentapi.SyncParameters;
-import com.yahoo.documentapi.SyncSession;
-import com.yahoo.documentapi.messagebus.protocol.*;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-
-/**
- * An implementation of the SyncSession interface running over message bus.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class MessageBusSyncSession implements MessageBusSession, SyncSession, ReplyHandler {
-
- private MessageBusAsyncSession session;
-
- /**
- * Creates a new sync session running on message bus logic.
- *
- * @param syncParams Common syncsession parameters, not used.
- * @param bus The message bus on which to run.
- * @param mbusParams Parameters concerning message bus configuration.
- */
- MessageBusSyncSession(SyncParameters syncParams, MessageBus bus, MessageBusParams mbusParams) {
- session = new MessageBusAsyncSession(new AsyncParameters(), bus, mbusParams, this);
- }
-
- @Override
- public void handleReply(Reply reply) {
- if (reply.getContext() instanceof RequestMonitor) {
- ((RequestMonitor)reply.getContext()).replied(reply);
- } else {
- ReplyHandler handler = reply.getCallStack().pop(reply);
- handler.handleReply(reply); // not there yet
- }
- }
-
- @Override
- public Response getNext() {
- throw new UnsupportedOperationException("Queue not supported.");
- }
-
- @Override
- public Response getNext(int timeout) {
- throw new UnsupportedOperationException("Queue not supported.");
- }
-
- @Override
- public void destroy() {
- session.destroy();
- }
-
- /**
- * Perform a synchronous sending of a message. This method block until the message is successfuly sent and a
- * corresponding reply has been received.
- *
- * @param msg The message to send.
- * @return The reply received.
- */
- public Reply syncSend(Message msg) {
- try {
- RequestMonitor monitor = new RequestMonitor();
- msg.setContext(monitor);
- msg.pushHandler(this); // store monitor
- Result result = null;
- while (result == null || result.getType() == Result.ResultType.TRANSIENT_ERROR) {
- result = session.send(msg);
- if (result != null && result.isSuccess()) {
- break;
- }
- Thread.sleep(100);
- }
- if (!result.isSuccess()) {
- throw new DocumentAccessException(result.getError().toString());
- }
- return monitor.waitForReply();
- } catch (InterruptedException e) {
- throw new DocumentAccessException(e);
- }
- }
-
- @Override
- public void put(DocumentPut documentPut) {
- put(documentPut, DocumentProtocol.Priority.NORMAL_3);
- }
-
- @Override
- public void put(DocumentPut documentPut, DocumentProtocol.Priority priority) {
- PutDocumentMessage msg = new PutDocumentMessage(documentPut);
- msg.setPriority(priority);
- syncSendPutDocumentMessage(msg);
- }
-
- @Override
- public Document get(DocumentId id) {
- return get(id, "[all]", DocumentProtocol.Priority.NORMAL_1);
- }
-
- @Override
- public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri) {
- GetDocumentMessage msg = new GetDocumentMessage(id, fieldSet);
- msg.setPriority(pri);
-
- Reply reply = syncSend(msg);
- if (reply.hasErrors()) {
- throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply));
- }
- if (reply.getType() != DocumentProtocol.REPLY_GETDOCUMENT) {
- throw new DocumentAccessException("Received unknown response: " + reply);
- }
- GetDocumentReply docReply = ((GetDocumentReply)reply);
- Document doc = docReply.getDocument();
- if (doc != null) {
- doc.setLastModified(docReply.getLastModified());
- }
- return doc;
- }
-
- @Override
- public boolean remove(DocumentRemove documentRemove) {
- RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId());
- msg.setCondition(documentRemove.getCondition());
- return remove(msg);
- }
-
- @Override
- public boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority pri) {
- RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId());
- msg.setPriority(pri);
- msg.setCondition(documentRemove.getCondition());
- return remove(msg);
- }
-
- private boolean remove(RemoveDocumentMessage msg) {
- Reply reply = syncSend(msg);
- if (reply.hasErrors()) {
- throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply));
- }
- if (reply.getType() != DocumentProtocol.REPLY_REMOVEDOCUMENT) {
- throw new DocumentAccessException("Received unknown response: " + reply);
- }
- return ((RemoveDocumentReply)reply).wasFound();
- }
-
- @Override
- public boolean update(DocumentUpdate update) {
- return update(update, DocumentProtocol.Priority.NORMAL_2);
- }
-
- @Override
- public boolean update(DocumentUpdate update, DocumentProtocol.Priority pri) {
- UpdateDocumentMessage msg = new UpdateDocumentMessage(update);
- msg.setPriority(pri);
- Reply reply = syncSend(msg);
- if (reply.hasErrors()) {
- throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply),
- MessageBusAsyncSession.getErrorCodes(reply));
- }
- if (reply.getType() != DocumentProtocol.REPLY_UPDATEDOCUMENT) {
- throw new DocumentAccessException("Received unknown response: " + reply);
- }
- return ((UpdateDocumentReply)reply).wasFound();
- }
-
- @Override
- public String getRoute() {
- return session.getRoute();
- }
-
- @Override
- public void setRoute(String route) {
- session.setRoute(route);
- }
-
- @Override
- public int getTraceLevel() {
- return session.getTraceLevel();
- }
-
- @Override
- public void setTraceLevel(int traceLevel) {
- session.setTraceLevel(traceLevel);
- }
-
- /**
- * This class implements a monitor for waiting for a reply to arrive.
- */
- static class RequestMonitor {
- private Reply reply = null;
-
- synchronized Reply waitForReply() throws InterruptedException {
- while (reply == null) {
- wait();
- }
- return reply;
- }
-
- synchronized void replied(Reply reply) {
- this.reply = reply;
- notify();
- }
- }
-
- private void syncSendPutDocumentMessage(PutDocumentMessage putDocumentMessage) {
- Reply reply = syncSend(putDocumentMessage);
- if (reply.hasErrors()) {
- throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply),
- MessageBusAsyncSession.getErrorCodes(reply));
- }
- }
-}
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi.messagebus; + +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.documentapi.AsyncParameters; +import com.yahoo.documentapi.DocumentAccessException; +import com.yahoo.documentapi.Response; +import com.yahoo.documentapi.Result; +import com.yahoo.documentapi.SyncParameters; +import com.yahoo.documentapi.SyncSession; +import com.yahoo.documentapi.messagebus.protocol.*; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; + +/** + * An implementation of the SyncSession interface running over message bus. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class MessageBusSyncSession implements MessageBusSession, SyncSession, ReplyHandler { + + private MessageBusAsyncSession session; + + /** + * Creates a new sync session running on message bus logic. + * + * @param syncParams Common syncsession parameters, not used. + * @param bus The message bus on which to run. + * @param mbusParams Parameters concerning message bus configuration. + */ + MessageBusSyncSession(SyncParameters syncParams, MessageBus bus, MessageBusParams mbusParams) { + session = new MessageBusAsyncSession(new AsyncParameters(), bus, mbusParams, this); + } + + @Override + public void handleReply(Reply reply) { + if (reply.getContext() instanceof RequestMonitor) { + ((RequestMonitor)reply.getContext()).replied(reply); + } else { + ReplyHandler handler = reply.getCallStack().pop(reply); + handler.handleReply(reply); // not there yet + } + } + + @Override + public Response getNext() { + throw new UnsupportedOperationException("Queue not supported."); + } + + @Override + public Response getNext(int timeout) { + throw new UnsupportedOperationException("Queue not supported."); + } + + @Override + public void destroy() { + session.destroy(); + } + + /** + * Perform a synchronous sending of a message. This method block until the message is successfuly sent and a + * corresponding reply has been received. + * + * @param msg The message to send. + * @return The reply received. + */ + public Reply syncSend(Message msg) { + try { + RequestMonitor monitor = new RequestMonitor(); + msg.setContext(monitor); + msg.pushHandler(this); // store monitor + Result result = null; + while (result == null || result.getType() == Result.ResultType.TRANSIENT_ERROR) { + result = session.send(msg); + if (result != null && result.isSuccess()) { + break; + } + Thread.sleep(100); + } + if (!result.isSuccess()) { + throw new DocumentAccessException(result.getError().toString()); + } + return monitor.waitForReply(); + } catch (InterruptedException e) { + throw new DocumentAccessException(e); + } + } + + @Override + public void put(DocumentPut documentPut) { + put(documentPut, DocumentProtocol.Priority.NORMAL_3); + } + + @Override + public void put(DocumentPut documentPut, DocumentProtocol.Priority priority) { + PutDocumentMessage msg = new PutDocumentMessage(documentPut); + msg.setPriority(priority); + syncSendPutDocumentMessage(msg); + } + + @Override + public Document get(DocumentId id) { + return get(id, "[all]", DocumentProtocol.Priority.NORMAL_1); + } + + @Override + public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri) { + GetDocumentMessage msg = new GetDocumentMessage(id, fieldSet); + msg.setPriority(pri); + + Reply reply = syncSend(msg); + if (reply.hasErrors()) { + throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply)); + } + if (reply.getType() != DocumentProtocol.REPLY_GETDOCUMENT) { + throw new DocumentAccessException("Received unknown response: " + reply); + } + GetDocumentReply docReply = ((GetDocumentReply)reply); + Document doc = docReply.getDocument(); + if (doc != null) { + doc.setLastModified(docReply.getLastModified()); + } + return doc; + } + + @Override + public boolean remove(DocumentRemove documentRemove) { + RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId()); + msg.setCondition(documentRemove.getCondition()); + return remove(msg); + } + + @Override + public boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority pri) { + RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId()); + msg.setPriority(pri); + msg.setCondition(documentRemove.getCondition()); + return remove(msg); + } + + private boolean remove(RemoveDocumentMessage msg) { + Reply reply = syncSend(msg); + if (reply.hasErrors()) { + throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply)); + } + if (reply.getType() != DocumentProtocol.REPLY_REMOVEDOCUMENT) { + throw new DocumentAccessException("Received unknown response: " + reply); + } + return ((RemoveDocumentReply)reply).wasFound(); + } + + @Override + public boolean update(DocumentUpdate update) { + return update(update, DocumentProtocol.Priority.NORMAL_2); + } + + @Override + public boolean update(DocumentUpdate update, DocumentProtocol.Priority pri) { + UpdateDocumentMessage msg = new UpdateDocumentMessage(update); + msg.setPriority(pri); + Reply reply = syncSend(msg); + if (reply.hasErrors()) { + throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply), + MessageBusAsyncSession.getErrorCodes(reply)); + } + if (reply.getType() != DocumentProtocol.REPLY_UPDATEDOCUMENT) { + throw new DocumentAccessException("Received unknown response: " + reply); + } + return ((UpdateDocumentReply)reply).wasFound(); + } + + @Override + public String getRoute() { + return session.getRoute(); + } + + @Override + public void setRoute(String route) { + session.setRoute(route); + } + + @Override + public int getTraceLevel() { + return session.getTraceLevel(); + } + + @Override + public void setTraceLevel(int traceLevel) { + session.setTraceLevel(traceLevel); + } + + /** + * This class implements a monitor for waiting for a reply to arrive. + */ + static class RequestMonitor { + private Reply reply = null; + + synchronized Reply waitForReply() throws InterruptedException { + while (reply == null) { + wait(); + } + return reply; + } + + synchronized void replied(Reply reply) { + this.reply = reply; + notify(); + } + } + + private void syncSendPutDocumentMessage(PutDocumentMessage putDocumentMessage) { + Reply reply = syncSend(putDocumentMessage); + if (reply.hasErrors()) { + throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply), + MessageBusAsyncSession.getErrorCodes(reply)); + } + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ANDPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ANDPolicy.java index 04818f80672..a8b190108fa 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ANDPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ANDPolicy.java @@ -1,67 +1,67 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.Hop;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingContext;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An AND policy is a routing policy that can be used to write simple routes that split a message between multiple other
- * destinations. It can either be configured in a routing config, which will then produce a policy that always selects
- * all configured recipients, or it can be configured using the policy parameter (i.e. a string following the name of
- * the policy). Note that configured recipients take precedence over recipients configured in the parameter string.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ANDPolicy implements DocumentProtocolRoutingPolicy {
-
- // A list of hops that are to always be selected when select() is invoked.
- private final List<Hop> hops = new ArrayList<Hop>();
-
- /**
- * Constructs a new AND policy that requires all recipients to be ok for it to merge their replies to an ok reply.
- * I.e. all errors in all child replies are copied into the merged reply.
- *
- * @param param A string of recipients to select unless recipients have been configured.
- */
- public ANDPolicy(String param) {
- if (param == null || param.isEmpty()) {
- return;
- }
- Route route = Route.parse(param);
- for (int i = 0; i < route.getNumHops(); ++i) {
- hops.add(route.getHop(i));
- }
- }
-
- // Inherit doc from RoutingPolicy.
- public void select(RoutingContext context) {
- if (hops.isEmpty()) {
- context.addChildren(context.getAllRecipients());
- } else {
- for (Hop hop : hops) {
- Route route = new Route(context.getRoute());
- route.setHop(0, hop);
- context.addChild(route);
- }
- }
- context.setSelectOnRetry(false);
- context.addConsumableError(DocumentProtocol.ERROR_MESSAGE_IGNORED);
- }
-
- // Inherit doc from RoutingPolicy.
- public void merge(RoutingContext context) {
- DocumentProtocol.merge(context);
- }
-
- public void destroy() {
- }
-
- public MetricSet getMetrics() {
- return null;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.messagebus.metrics.MetricSet; +import com.yahoo.messagebus.routing.Hop; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * An AND policy is a routing policy that can be used to write simple routes that split a message between multiple other + * destinations. It can either be configured in a routing config, which will then produce a policy that always selects + * all configured recipients, or it can be configured using the policy parameter (i.e. a string following the name of + * the policy). Note that configured recipients take precedence over recipients configured in the parameter string. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ANDPolicy implements DocumentProtocolRoutingPolicy { + + // A list of hops that are to always be selected when select() is invoked. + private final List<Hop> hops = new ArrayList<Hop>(); + + /** + * Constructs a new AND policy that requires all recipients to be ok for it to merge their replies to an ok reply. + * I.e. all errors in all child replies are copied into the merged reply. + * + * @param param A string of recipients to select unless recipients have been configured. + */ + public ANDPolicy(String param) { + if (param == null || param.isEmpty()) { + return; + } + Route route = Route.parse(param); + for (int i = 0; i < route.getNumHops(); ++i) { + hops.add(route.getHop(i)); + } + } + + // Inherit doc from RoutingPolicy. + public void select(RoutingContext context) { + if (hops.isEmpty()) { + context.addChildren(context.getAllRecipients()); + } else { + for (Hop hop : hops) { + Route route = new Route(context.getRoute()); + route.setHop(0, hop); + context.addChild(route); + } + } + context.setSelectOnRetry(false); + context.addConsumableError(DocumentProtocol.ERROR_MESSAGE_IGNORED); + } + + // Inherit doc from RoutingPolicy. + public void merge(RoutingContext context) { + DocumentProtocol.merge(context); + } + + public void destroy() { + } + + public MetricSet getMetrics() { + return null; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/BatchDocumentUpdateReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/BatchDocumentUpdateReply.java index 48eb41fdb5c..11155dd76be 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/BatchDocumentUpdateReply.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/BatchDocumentUpdateReply.java @@ -1,29 +1,29 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import java.util.ArrayList;
-
-public class BatchDocumentUpdateReply extends WriteDocumentReply {
-
- private ArrayList<Boolean> documentsNotFound = new ArrayList<Boolean>();
-
- /**
- * Constructs a new reply with no content.
- */
- public BatchDocumentUpdateReply() {
- super(DocumentProtocol.REPLY_BATCHDOCUMENTUPDATE);
- }
-
- /**
- * If all documents to update are found, this vector will be empty. If
- * one or more documents are not found, this vector will have the size of
- * the initial number of updates, with entries set to true where the
- * corresponding update was not found.
- *
- * @return Vector containing indices of not found documents, or empty
- * if all documents were found
- */
- public ArrayList<Boolean> getDocumentsNotFound() {
- return documentsNotFound;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import java.util.ArrayList; + +public class BatchDocumentUpdateReply extends WriteDocumentReply { + + private ArrayList<Boolean> documentsNotFound = new ArrayList<Boolean>(); + + /** + * Constructs a new reply with no content. + */ + public BatchDocumentUpdateReply() { + super(DocumentProtocol.REPLY_BATCHDOCUMENTUPDATE); + } + + /** + * If all documents to update are found, this vector will be empty. If + * one or more documents are not found, this vector will have the size of + * the initial number of updates, with entries set to true where the + * corresponding update was not found. + * + * @return Vector containing indices of not found documents, or empty + * if all documents were found + */ + public ArrayList<Boolean> getDocumentsNotFound() { + return documentsNotFound; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentListEntry.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentListEntry.java index 8de0cfd204c..90615bdf9ad 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentListEntry.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentListEntry.java @@ -1,47 +1,47 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.document.Document;
-import com.yahoo.document.serialization.DocumentDeserializer;
-import com.yahoo.vespa.objects.Serializer;
-
-public class DocumentListEntry {
-
- private Document doc;
- private long timestamp;
- private boolean removeEntry;
-
- public DocumentListEntry(Document doc, long timestamp, boolean removeEntry) {
- this.doc = doc;
- this.timestamp = timestamp;
- this.removeEntry = removeEntry;
- }
-
- public void serialize(Serializer buf) {
- buf.putLong(null, timestamp);
- doc.serialize(buf);
- buf.putByte(null, (byte)(removeEntry ? 1 : 0));
- }
-
- public static int getApproxSize() {
- return 60; // optimzation. approximation is sufficient
- }
-
- public DocumentListEntry(DocumentDeserializer buf) {
- timestamp = buf.getLong(null);
- doc = new Document(buf);
- removeEntry = buf.getByte(null) > 0;
- }
-
- public Document getDocument() {
- return doc;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public boolean isRemoveEntry() {
- return removeEntry;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.document.Document; +import com.yahoo.document.serialization.DocumentDeserializer; +import com.yahoo.vespa.objects.Serializer; + +public class DocumentListEntry { + + private Document doc; + private long timestamp; + private boolean removeEntry; + + public DocumentListEntry(Document doc, long timestamp, boolean removeEntry) { + this.doc = doc; + this.timestamp = timestamp; + this.removeEntry = removeEntry; + } + + public void serialize(Serializer buf) { + buf.putLong(null, timestamp); + doc.serialize(buf); + buf.putByte(null, (byte)(removeEntry ? 1 : 0)); + } + + public static int getApproxSize() { + return 60; // optimzation. approximation is sufficient + } + + public DocumentListEntry(DocumentDeserializer buf) { + timestamp = buf.getLong(null); + doc = new Document(buf); + removeEntry = buf.getByte(null) > 0; + } + + public Document getDocument() { + return doc; + } + + public long getTimestamp() { + return timestamp; + } + + public boolean isRemoveEntry() { + return removeEntry; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentListMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentListMessage.java index 448c2820ec3..95b51f20877 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentListMessage.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentListMessage.java @@ -1,54 +1,54 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.document.BucketId;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class DocumentListMessage extends VisitorMessage {
-
- private BucketId bucket = new BucketId(16, 0);
- private final List<DocumentListEntry> entries = new ArrayList<DocumentListEntry>();
-
- public DocumentListMessage() {
- // empty
- }
-
- public DocumentListMessage(DocumentListMessage cmd) {
- bucket = cmd.bucket;
- entries.addAll(cmd.entries);
- }
-
- public BucketId getBucketId() {
- return bucket;
- }
-
- public void setBucketId(BucketId id) {
- bucket = id;
- }
-
- public List<DocumentListEntry> getDocuments() {
- return entries;
- }
-
- @Override
- public DocumentReply createReply() {
- return new VisitorReply(DocumentProtocol.REPLY_DOCUMENTLIST);
- }
-
- @Override
- public int getType() {
- return DocumentProtocol.MESSAGE_DOCUMENTLIST;
- }
-
- @Override
- public int getApproxSize() {
- return DocumentListEntry.getApproxSize() * entries.size();
- }
-
- @Override
- public String toString() {
- return "DocumentListMessage(" + entries.toString() + ")";
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.document.BucketId; + +import java.util.ArrayList; +import java.util.List; + +public class DocumentListMessage extends VisitorMessage { + + private BucketId bucket = new BucketId(16, 0); + private final List<DocumentListEntry> entries = new ArrayList<DocumentListEntry>(); + + public DocumentListMessage() { + // empty + } + + public DocumentListMessage(DocumentListMessage cmd) { + bucket = cmd.bucket; + entries.addAll(cmd.entries); + } + + public BucketId getBucketId() { + return bucket; + } + + public void setBucketId(BucketId id) { + bucket = id; + } + + public List<DocumentListEntry> getDocuments() { + return entries; + } + + @Override + public DocumentReply createReply() { + return new VisitorReply(DocumentProtocol.REPLY_DOCUMENTLIST); + } + + @Override + public int getType() { + return DocumentProtocol.MESSAGE_DOCUMENTLIST; + } + + @Override + public int getApproxSize() { + return DocumentListEntry.getApproxSize() * entries.size(); + } + + @Override + public String toString() { + return "DocumentListMessage(" + entries.toString() + ")"; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentMessage.java index c4839c87f69..7949b15d955 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentMessage.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentMessage.java @@ -1,85 +1,85 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.documentapi.messagebus.loadtypes.LoadType;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Routable;
-import com.yahoo.text.Utf8String;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public abstract class DocumentMessage extends Message {
-
- private DocumentProtocol.Priority priority = DocumentProtocol.Priority.NORMAL_3;
- private LoadType loadType = LoadType.DEFAULT;
-
- /**
- * Constructs a new message with no content.
- */
- public DocumentMessage() {
- // empty
- }
-
- /**
- * Creates and returns a reply to this message.
- *
- * @return The created reply.
- */
- public abstract DocumentReply createReply();
-
- @Override
- public void swapState(Routable rhs) {
- super.swapState(rhs);
- if (rhs instanceof DocumentMessage) {
- DocumentMessage msg = (DocumentMessage)rhs;
-
- DocumentProtocol.Priority pri = this.priority;
- this.priority = msg.priority;
- msg.priority = pri;
-
- LoadType lt = this.loadType;
- this.loadType = msg.loadType;
- msg.loadType = lt;
- }
- }
-
- /**
- * Returns the priority tag for this message. This is an optional tag added for VDS that is not interpreted by the
- * document protocol.
- *
- * @return The priority.
- */
- public DocumentProtocol.Priority getPriority() { return priority; }
-
- /**
- * Sets the priority tag for this message.
- *
- * @param priority The priority to set.
- */
- public void setPriority(DocumentProtocol.Priority priority) {
- this.priority = priority;
- }
-
- public LoadType getLoadType() {
- return loadType;
- }
-
- public void setLoadType(LoadType loadType) {
- if (loadType != null) {
- this.loadType = loadType;
- } else {
- this.loadType = LoadType.DEFAULT;
- }
- }
-
- @Override
- public int getApproxSize() {
- return 4 + 1; // type + priority
- }
-
- @Override
- public Utf8String getProtocol() {
- return DocumentProtocol.NAME;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.documentapi.messagebus.loadtypes.LoadType; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Routable; +import com.yahoo.text.Utf8String; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public abstract class DocumentMessage extends Message { + + private DocumentProtocol.Priority priority = DocumentProtocol.Priority.NORMAL_3; + private LoadType loadType = LoadType.DEFAULT; + + /** + * Constructs a new message with no content. + */ + public DocumentMessage() { + // empty + } + + /** + * Creates and returns a reply to this message. + * + * @return The created reply. + */ + public abstract DocumentReply createReply(); + + @Override + public void swapState(Routable rhs) { + super.swapState(rhs); + if (rhs instanceof DocumentMessage) { + DocumentMessage msg = (DocumentMessage)rhs; + + DocumentProtocol.Priority pri = this.priority; + this.priority = msg.priority; + msg.priority = pri; + + LoadType lt = this.loadType; + this.loadType = msg.loadType; + msg.loadType = lt; + } + } + + /** + * Returns the priority tag for this message. This is an optional tag added for VDS that is not interpreted by the + * document protocol. + * + * @return The priority. + */ + public DocumentProtocol.Priority getPriority() { return priority; } + + /** + * Sets the priority tag for this message. + * + * @param priority The priority to set. + */ + public void setPriority(DocumentProtocol.Priority priority) { + this.priority = priority; + } + + public LoadType getLoadType() { + return loadType; + } + + public void setLoadType(LoadType loadType) { + if (loadType != null) { + this.loadType = loadType; + } else { + this.loadType = LoadType.DEFAULT; + } + } + + @Override + public int getApproxSize() { + return 4 + 1; // type + priority + } + + @Override + public Utf8String getProtocol() { + return DocumentProtocol.NAME; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentReply.java index 126d85c5703..e28b11646ab 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentReply.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentReply.java @@ -1,53 +1,53 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.messagebus.Reply;
-import com.yahoo.text.Utf8String;
-
-/**
- * This class implements a generic document protocol reply that can be reused by document messages that require no
- * special reply implementation while still allowing applications to distinguish between types.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class DocumentReply extends Reply {
-
- private final int type;
- private DocumentProtocol.Priority priority = DocumentProtocol.Priority.NORMAL_3;
-
- /**
- * Constructs a new reply of given type.
- *
- * @param type The type code to assign to this.
- */
- public DocumentReply(int type) {
- this.type = type;
- }
-
- /**
- * Returns the priority tag for this message.
- * @return The priority.
- */
- public DocumentProtocol.Priority getPriority() {
- return priority;
- }
-
- /**
- * Sets the priority tag for this message.
- *
- * @param priority The priority to set.
- */
- public void setPriority(DocumentProtocol.Priority priority) {
- this.priority = priority;
- }
-
- @Override
- public Utf8String getProtocol() {
- return DocumentProtocol.NAME;
- }
-
- @Override
- public final int getType() {
- return type;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.messagebus.Reply; +import com.yahoo.text.Utf8String; + +/** + * This class implements a generic document protocol reply that can be reused by document messages that require no + * special reply implementation while still allowing applications to distinguish between types. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class DocumentReply extends Reply { + + private final int type; + private DocumentProtocol.Priority priority = DocumentProtocol.Priority.NORMAL_3; + + /** + * Constructs a new reply of given type. + * + * @param type The type code to assign to this. + */ + public DocumentReply(int type) { + this.type = type; + } + + /** + * Returns the priority tag for this message. + * @return The priority. + */ + public DocumentProtocol.Priority getPriority() { + return priority; + } + + /** + * Sets the priority tag for this message. + * + * @param priority The priority to set. + */ + public void setPriority(DocumentProtocol.Priority priority) { + this.priority = priority; + } + + @Override + public Utf8String getProtocol() { + return DocumentProtocol.NAME; + } + + @Override + public final int getType() { + return type; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ErrorPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ErrorPolicy.java index 0b7310ad2fb..1246f1e309a 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ErrorPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ErrorPolicy.java @@ -1,44 +1,44 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.RoutingContext;
-
-/**
- * This policy assigns an error supplied at constructor time to the routing context when {@link #select(RoutingContext)}
- * is invoked. This is useful for returning error states to the client instead of those auto-generated by mbus when a
- * routing policy can not be created.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ErrorPolicy implements DocumentProtocolRoutingPolicy {
-
- private final String msg;
-
- /**
- * Creates a new policy that will assign an {@link EmptyReply} with the given error to all routing contexts that
- * invoke {@link #select(RoutingContext)}.
- *
- * @param msg The message of the error to assign.
- */
- public ErrorPolicy(String msg) {
- this.msg = msg;
- }
-
- public void select(RoutingContext ctx) {
- ctx.setError(DocumentProtocol.ERROR_POLICY_FAILURE, msg);
- }
-
- public void merge(RoutingContext ctx) {
- throw new AssertionError("Routing should not pass terminated selection.");
- }
-
- public void destroy() {
- }
-
-
- public MetricSet getMetrics() {
- return null;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.metrics.MetricSet; +import com.yahoo.messagebus.routing.RoutingContext; + +/** + * This policy assigns an error supplied at constructor time to the routing context when {@link #select(RoutingContext)} + * is invoked. This is useful for returning error states to the client instead of those auto-generated by mbus when a + * routing policy can not be created. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ErrorPolicy implements DocumentProtocolRoutingPolicy { + + private final String msg; + + /** + * Creates a new policy that will assign an {@link EmptyReply} with the given error to all routing contexts that + * invoke {@link #select(RoutingContext)}. + * + * @param msg The message of the error to assign. + */ + public ErrorPolicy(String msg) { + this.msg = msg; + } + + public void select(RoutingContext ctx) { + ctx.setError(DocumentProtocol.ERROR_POLICY_FAILURE, msg); + } + + public void merge(RoutingContext ctx) { + throw new AssertionError("Routing should not pass terminated selection."); + } + + public void destroy() { + } + + + public MetricSet getMetrics() { + return null; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternPolicy.java index a843102f466..90140f22d90 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternPolicy.java @@ -1,147 +1,147 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.jrt.Supervisor;
-import com.yahoo.jrt.Transport;
-import com.yahoo.jrt.slobrok.api.Mirror;
-import com.yahoo.jrt.slobrok.api.SlobrokList;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.Hop;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingContext;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This policy implements the necessary logic to communicate with an external Vespa application and resolve its list of
- * recipients using that other application's slobrok servers.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ExternPolicy implements DocumentProtocolRoutingPolicy {
-
- private Supervisor orb = null;
- private Mirror mirror = null;
- private String pattern = null;
- private String session = null;
- private final String error;
- private int offset = 0;
- private int generation = 0;
- private final List<Hop> recipients = new ArrayList<>();
-
- /**
- * Constructs a new instance of this policy. The argument given is the connection spec to the slobrok to use for
- * resolving recipients, as well as the pattern to use when querying. This constructor does _not_ wait for the
- * mirror to become ready.
- *
- * @param arg The slobrok connection spec.
- */
- public ExternPolicy(String arg) {
- if (arg == null || arg.length() == 0) {
- error = "Expected parameter, got empty string.";
- return;
- }
- String[] args = arg.split(";", 2);
- if (args.length != 2 || args[0].length() == 0 || args[1].length() == 0) {
- error = "Expected parameter on the form '<spec>;<pattern>', got '" + arg + "'.";
- return;
- }
- int pos = args[1].lastIndexOf('/');
- if (pos < 0) {
- error = "Expected pattern on the form '<service>/<session>', got '" + args[1] + "'.";
- return;
- }
- SlobrokList slobroks = new SlobrokList();
- slobroks.setup(args[0].split(","));
- pattern = args[1];
- session = pattern.substring(pos);
- orb = new Supervisor(new Transport());
- mirror = new Mirror(orb, slobroks);
- error = null;
- }
-
- /**
- * This is a safety mechanism to allow the constructor to fail and signal that it can not be used.
- *
- * @return The error string, or null if no error.
- */
- public String getError() {
- return error;
- }
-
- /**
- * Returns the slobrok mirror used by this policy to resolve external recipients.
- *
- * @return The external mirror.
- */
- public Mirror getMirror() {
- return mirror;
- }
-
- /**
- * Returns the appropriate recipient hop. This method provides synchronized access to the internal mirror.
- *
- * @return The recipient hop to use.
- */
- private synchronized Hop getRecipient() {
- update();
- if (recipients.isEmpty()) {
- return null;
- }
- int offset = ++this.offset & Integer.MAX_VALUE; // mask signed bit because of modulo
- return new Hop(recipients.get(offset % recipients.size()));
- }
-
- /**
- * Updates the list of matching recipients by querying the extern slobrok.
- */
- private void update() {
- int upd = mirror.updates();
- if (generation != upd) {
- generation = upd;
- recipients.clear();
- Mirror.Entry[] arr = mirror.lookup(pattern);
- for (Mirror.Entry entry : arr) {
- recipients.add(Hop.parse(entry.getSpec() + session));
- }
- }
- }
-
- @Override
- public void finalize() throws Throwable {
- super.finalize();
- mirror.shutdown();
- orb.transport().shutdown().join();
- }
-
- public void select(RoutingContext ctx) {
- if (error != null) {
- ctx.setError(DocumentProtocol.ERROR_POLICY_FAILURE, error);
- } else if (mirror.ready()) {
- Hop hop = getRecipient();
- if (hop != null) {
- Route route = new Route(ctx.getRoute());
- route.setHop(0, hop);
- ctx.addChild(route);
- } else {
- ctx.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE,
- "Could not resolve any recipients from '" + pattern + "'.");
- }
- } else {
- ctx.setError(ErrorCode.APP_TRANSIENT_ERROR, "Extern slobrok not ready.");
- }
- }
-
- public void merge(RoutingContext ctx) {
- DocumentProtocol.merge(ctx);
- }
-
- public void destroy() {
- }
-
- public MetricSet getMetrics() {
- return null;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.jrt.slobrok.api.SlobrokList; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.metrics.MetricSet; +import com.yahoo.messagebus.routing.Hop; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * This policy implements the necessary logic to communicate with an external Vespa application and resolve its list of + * recipients using that other application's slobrok servers. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ExternPolicy implements DocumentProtocolRoutingPolicy { + + private Supervisor orb = null; + private Mirror mirror = null; + private String pattern = null; + private String session = null; + private final String error; + private int offset = 0; + private int generation = 0; + private final List<Hop> recipients = new ArrayList<>(); + + /** + * Constructs a new instance of this policy. The argument given is the connection spec to the slobrok to use for + * resolving recipients, as well as the pattern to use when querying. This constructor does _not_ wait for the + * mirror to become ready. + * + * @param arg The slobrok connection spec. + */ + public ExternPolicy(String arg) { + if (arg == null || arg.length() == 0) { + error = "Expected parameter, got empty string."; + return; + } + String[] args = arg.split(";", 2); + if (args.length != 2 || args[0].length() == 0 || args[1].length() == 0) { + error = "Expected parameter on the form '<spec>;<pattern>', got '" + arg + "'."; + return; + } + int pos = args[1].lastIndexOf('/'); + if (pos < 0) { + error = "Expected pattern on the form '<service>/<session>', got '" + args[1] + "'."; + return; + } + SlobrokList slobroks = new SlobrokList(); + slobroks.setup(args[0].split(",")); + pattern = args[1]; + session = pattern.substring(pos); + orb = new Supervisor(new Transport()); + mirror = new Mirror(orb, slobroks); + error = null; + } + + /** + * This is a safety mechanism to allow the constructor to fail and signal that it can not be used. + * + * @return The error string, or null if no error. + */ + public String getError() { + return error; + } + + /** + * Returns the slobrok mirror used by this policy to resolve external recipients. + * + * @return The external mirror. + */ + public Mirror getMirror() { + return mirror; + } + + /** + * Returns the appropriate recipient hop. This method provides synchronized access to the internal mirror. + * + * @return The recipient hop to use. + */ + private synchronized Hop getRecipient() { + update(); + if (recipients.isEmpty()) { + return null; + } + int offset = ++this.offset & Integer.MAX_VALUE; // mask signed bit because of modulo + return new Hop(recipients.get(offset % recipients.size())); + } + + /** + * Updates the list of matching recipients by querying the extern slobrok. + */ + private void update() { + int upd = mirror.updates(); + if (generation != upd) { + generation = upd; + recipients.clear(); + Mirror.Entry[] arr = mirror.lookup(pattern); + for (Mirror.Entry entry : arr) { + recipients.add(Hop.parse(entry.getSpec() + session)); + } + } + } + + @Override + public void finalize() throws Throwable { + super.finalize(); + mirror.shutdown(); + orb.transport().shutdown().join(); + } + + public void select(RoutingContext ctx) { + if (error != null) { + ctx.setError(DocumentProtocol.ERROR_POLICY_FAILURE, error); + } else if (mirror.ready()) { + Hop hop = getRecipient(); + if (hop != null) { + Route route = new Route(ctx.getRoute()); + route.setHop(0, hop); + ctx.addChild(route); + } else { + ctx.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, + "Could not resolve any recipients from '" + pattern + "'."); + } + } else { + ctx.setError(ErrorCode.APP_TRANSIENT_ERROR, "Extern slobrok not ready."); + } + } + + public void merge(RoutingContext ctx) { + DocumentProtocol.merge(ctx); + } + + public void destroy() { + } + + public MetricSet getMetrics() { + return null; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java index d38aa2e94f2..e7d623d86e2 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java @@ -1,50 +1,50 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.document.BucketId;
-
-public class GetBucketListMessage extends DocumentMessage {
-
- private BucketId bucketId;
-
- GetBucketListMessage() {
- // must be deserialized into
- }
-
- public GetBucketListMessage(BucketId bucketId) {
- this.bucketId = bucketId;
- }
-
- public BucketId getBucketId() {
- return bucketId;
- }
-
- void setBucketId(BucketId id) {
- bucketId = id;
- }
-
- @Override
- public DocumentReply createReply() {
- return new StatBucketReply();
- }
-
- @Override
- public boolean hasSequenceId() {
- return true;
- }
-
- @Override
- public long getSequenceId() {
- return bucketId.getRawId();
- }
-
- @Override
- public int getApproxSize() {
- return super.getApproxSize() + 8;
- }
-
- @Override
- public int getType() {
- return DocumentProtocol.MESSAGE_GETBUCKETLIST;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.document.BucketId; + +public class GetBucketListMessage extends DocumentMessage { + + private BucketId bucketId; + + GetBucketListMessage() { + // must be deserialized into + } + + public GetBucketListMessage(BucketId bucketId) { + this.bucketId = bucketId; + } + + public BucketId getBucketId() { + return bucketId; + } + + void setBucketId(BucketId id) { + bucketId = id; + } + + @Override + public DocumentReply createReply() { + return new StatBucketReply(); + } + + @Override + public boolean hasSequenceId() { + return true; + } + + @Override + public long getSequenceId() { + return bucketId.getRawId(); + } + + @Override + public int getApproxSize() { + return super.getApproxSize() + 8; + } + + @Override + public int getType() { + return DocumentProtocol.MESSAGE_GETBUCKETLIST; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListReply.java index 07013507d91..51a5289c46f 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListReply.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListReply.java @@ -1,70 +1,70 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.document.BucketId;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class GetBucketListReply extends DocumentReply {
-
- public static class BucketInfo {
- BucketId bucket;
- String bucketInformation;
-
- BucketInfo() {
- // must be deserialized into
- }
-
- public BucketInfo(BucketId bucket, String bucketInformation) {
- this.bucket = bucket;
- this.bucketInformation = bucketInformation;
- }
-
- public BucketId getBucketId() {
- return bucket;
- }
-
- public String getBucketInformation() {
- return bucketInformation;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof BucketInfo)) {
- return false;
- }
- BucketInfo rhs = (BucketInfo)obj;
- if (bucket == null) {
- if (rhs.bucket != null) {
- return false;
- }
- } else if (!bucket.equals(rhs.bucket)) {
- return false;
- }
- if (bucketInformation == null) {
- if (rhs.bucketInformation != null) {
- return false;
- }
- } else if (!bucketInformation.equals(rhs.bucketInformation)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return String.format("BucketInfo(%s: %s)", bucket, bucketInformation);
- }
- }
-
- private final List<BucketInfo> buckets = new ArrayList<BucketInfo>();
-
- public GetBucketListReply() {
- super(DocumentProtocol.REPLY_GETBUCKETLIST);
- }
-
- public List<BucketInfo> getBuckets() {
- return buckets;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.document.BucketId; + +import java.util.ArrayList; +import java.util.List; + +public class GetBucketListReply extends DocumentReply { + + public static class BucketInfo { + BucketId bucket; + String bucketInformation; + + BucketInfo() { + // must be deserialized into + } + + public BucketInfo(BucketId bucket, String bucketInformation) { + this.bucket = bucket; + this.bucketInformation = bucketInformation; + } + + public BucketId getBucketId() { + return bucket; + } + + public String getBucketInformation() { + return bucketInformation; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof BucketInfo)) { + return false; + } + BucketInfo rhs = (BucketInfo)obj; + if (bucket == null) { + if (rhs.bucket != null) { + return false; + } + } else if (!bucket.equals(rhs.bucket)) { + return false; + } + if (bucketInformation == null) { + if (rhs.bucketInformation != null) { + return false; + } + } else if (!bucketInformation.equals(rhs.bucketInformation)) { + return false; + } + return true; + } + + @Override + public String toString() { + return String.format("BucketInfo(%s: %s)", bucket, bucketInformation); + } + } + + private final List<BucketInfo> buckets = new ArrayList<BucketInfo>(); + + public GetBucketListReply() { + super(DocumentProtocol.REPLY_GETBUCKETLIST); + } + + public List<BucketInfo> getBuckets() { + return buckets; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java index cf66704d21f..f0a94fa4f7b 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java @@ -1,94 +1,94 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.document.DocumentId;
-
-import java.util.Arrays;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class GetDocumentMessage extends DocumentMessage {
-
- final static String DEFAULT_FIELD_SET = "[all]";
- private DocumentId documentId = null;
- private String fieldSet = DEFAULT_FIELD_SET;
-
- /**
- * Constructs a new message for deserialization.
- */
- GetDocumentMessage() {
- // empty
- }
-
- /**
- * Constructs a new document get message.
- *
- * @param documentId The identifier of the document to get.
- */
- public GetDocumentMessage(DocumentId documentId) {
- setDocumentId(documentId);
- }
-
- /**
- * Constructs a new document get message.
- *
- * @param documentId The identifier of the document to get.
- * @param fieldSet Which fields to retrieve from the document
- */
- public GetDocumentMessage(DocumentId documentId, String fieldSet) {
- setDocumentId(documentId);
- this.fieldSet = fieldSet;
- }
-
- /**
- * Returns the identifier of the document to retrieve.
- *
- * @return The document id.
- */
- public DocumentId getDocumentId() {
- return documentId;
- }
-
- /**
- * Sets the identifier of the document to retrieve.
- *
- * @param documentId The document id to set.
- */
- public void setDocumentId(DocumentId documentId) {
- if (documentId == null) {
- throw new IllegalArgumentException("Document id can not be null.");
- }
- this.documentId = documentId;
- }
-
- public String getFieldSet() {
- return fieldSet;
- }
-
- @Override
- public DocumentReply createReply() {
- return new GetDocumentReply();
- }
-
- @Override
- public int getApproxSize() {
- return super.getApproxSize() + 4 + documentId.toString().length();
- }
-
- @Override
- public boolean hasSequenceId() {
- return true;
- }
-
- @Override
- public long getSequenceId() {
- return Arrays.hashCode(documentId.getGlobalId());
- }
-
- @Override
- public int getType() {
- return DocumentProtocol.MESSAGE_GETDOCUMENT;
- }
-
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.document.DocumentId; + +import java.util.Arrays; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class GetDocumentMessage extends DocumentMessage { + + final static String DEFAULT_FIELD_SET = "[all]"; + private DocumentId documentId = null; + private String fieldSet = DEFAULT_FIELD_SET; + + /** + * Constructs a new message for deserialization. + */ + GetDocumentMessage() { + // empty + } + + /** + * Constructs a new document get message. + * + * @param documentId The identifier of the document to get. + */ + public GetDocumentMessage(DocumentId documentId) { + setDocumentId(documentId); + } + + /** + * Constructs a new document get message. + * + * @param documentId The identifier of the document to get. + * @param fieldSet Which fields to retrieve from the document + */ + public GetDocumentMessage(DocumentId documentId, String fieldSet) { + setDocumentId(documentId); + this.fieldSet = fieldSet; + } + + /** + * Returns the identifier of the document to retrieve. + * + * @return The document id. + */ + public DocumentId getDocumentId() { + return documentId; + } + + /** + * Sets the identifier of the document to retrieve. + * + * @param documentId The document id to set. + */ + public void setDocumentId(DocumentId documentId) { + if (documentId == null) { + throw new IllegalArgumentException("Document id can not be null."); + } + this.documentId = documentId; + } + + public String getFieldSet() { + return fieldSet; + } + + @Override + public DocumentReply createReply() { + return new GetDocumentReply(); + } + + @Override + public int getApproxSize() { + return super.getApproxSize() + 4 + documentId.toString().length(); + } + + @Override + public boolean hasSequenceId() { + return true; + } + + @Override + public long getSequenceId() { + return Arrays.hashCode(documentId.getGlobalId()); + } + + @Override + public int getType() { + return DocumentProtocol.MESSAGE_GETDOCUMENT; + } + +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentReply.java index f5f687bb18b..0d1413c3bfe 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentReply.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentReply.java @@ -1,108 +1,108 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.document.Document;
-import com.yahoo.document.serialization.DocumentDeserializer;
-
-import java.nio.ByteBuffer;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class GetDocumentReply extends DocumentAcceptedReply {
-
- private DocumentDeserializer buffer = null;
- private Document document = null;
- private long lastModified = 0;
- private LazyDecoder decoder = null;
-
- /**
- * Constructs a new reply for deserialization.
- */
- GetDocumentReply() {
- super(DocumentProtocol.REPLY_GETDOCUMENT);
- }
-
- /**
- * Constructs a new reply to lazily deserialize from a byte buffer.
- * @param decoder The decoder to use for deserialization.
- * @param buf A byte buffer that contains a serialized reply.
- */
- GetDocumentReply(LazyDecoder decoder, DocumentDeserializer buf) {
- super(DocumentProtocol.REPLY_GETDOCUMENT);
- this.decoder = decoder;
- buffer = buf;
- }
-
- /**
- * Constructs a new document get reply.
- *
- * @param doc The document requested.
- */
- public GetDocumentReply(Document doc) {
- super(DocumentProtocol.REPLY_GETDOCUMENT);
- document = doc;
- }
-
- /**
- * This method will make sure that any serialized content is deserialized into proper message content on first
- * entry. Any subsequent entry into this function will do nothing.
- */
- private void deserialize() {
- if (decoder != null && buffer != null) {
- decoder.decode(this, buffer);
- decoder = null;
- buffer = null;
- }
- }
-
- /**
- * Returns the document retrieved.
- *
- * @return The document.
- */
- public Document getDocument() {
- deserialize();
- return document;
- }
-
- /**
- * Sets the document of this reply.
- *
- * @param doc The document to set.
- */
- public void setDocument(Document doc) {
- buffer = null;
- decoder = null;
- document = doc;
- lastModified = document != null && document.getLastModified() != null ? document.getLastModified() : 0;
- }
-
- /**
- * Returns the date the document was last modified.
- *
- * @return The date.
- */
- public long getLastModified() {
- deserialize();
- return lastModified;
- }
-
- /**
- * Set the date the document was last modified.
- *
- * @param modified The date.
- */
- void setLastModified(long modified) {
- lastModified = modified;
- }
-
- /**
- * Returns the internal buffer to deserialize from, may be null.
- *
- * @return The buffer.
- */
- public ByteBuffer getSerializedBuffer() {
- return buffer != null ? buffer.getBuf().getByteBuffer() : null;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.document.Document; +import com.yahoo.document.serialization.DocumentDeserializer; + +import java.nio.ByteBuffer; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class GetDocumentReply extends DocumentAcceptedReply { + + private DocumentDeserializer buffer = null; + private Document document = null; + private long lastModified = 0; + private LazyDecoder decoder = null; + + /** + * Constructs a new reply for deserialization. + */ + GetDocumentReply() { + super(DocumentProtocol.REPLY_GETDOCUMENT); + } + + /** + * Constructs a new reply to lazily deserialize from a byte buffer. + * @param decoder The decoder to use for deserialization. + * @param buf A byte buffer that contains a serialized reply. + */ + GetDocumentReply(LazyDecoder decoder, DocumentDeserializer buf) { + super(DocumentProtocol.REPLY_GETDOCUMENT); + this.decoder = decoder; + buffer = buf; + } + + /** + * Constructs a new document get reply. + * + * @param doc The document requested. + */ + public GetDocumentReply(Document doc) { + super(DocumentProtocol.REPLY_GETDOCUMENT); + document = doc; + } + + /** + * This method will make sure that any serialized content is deserialized into proper message content on first + * entry. Any subsequent entry into this function will do nothing. + */ + private void deserialize() { + if (decoder != null && buffer != null) { + decoder.decode(this, buffer); + decoder = null; + buffer = null; + } + } + + /** + * Returns the document retrieved. + * + * @return The document. + */ + public Document getDocument() { + deserialize(); + return document; + } + + /** + * Sets the document of this reply. + * + * @param doc The document to set. + */ + public void setDocument(Document doc) { + buffer = null; + decoder = null; + document = doc; + lastModified = document != null && document.getLastModified() != null ? document.getLastModified() : 0; + } + + /** + * Returns the date the document was last modified. + * + * @return The date. + */ + public long getLastModified() { + deserialize(); + return lastModified; + } + + /** + * Set the date the document was last modified. + * + * @param modified The date. + */ + void setLastModified(long modified) { + lastModified = modified; + } + + /** + * Returns the internal buffer to deserialize from, may be null. + * + * @return The buffer. + */ + public ByteBuffer getSerializedBuffer() { + return buffer != null ? buffer.getBuf().getByteBuffer() : null; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LazyDecoder.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LazyDecoder.java index 2ac7f716850..325b8ab44af 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LazyDecoder.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LazyDecoder.java @@ -8,4 +8,4 @@ public interface LazyDecoder { public void decode(Routable obj, DocumentDeserializer buf); -}
\ No newline at end of file +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LocalServicePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LocalServicePolicy.java index 74ca65df547..c4f190559d3 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LocalServicePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LocalServicePolicy.java @@ -1,138 +1,138 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.jrt.slobrok.api.Mirror;
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.*;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This policy implements the logic to prefer local services that matches a slobrok pattern.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class LocalServicePolicy implements DocumentProtocolRoutingPolicy {
-
- private final String localAddress;
- private Map<String, CacheEntry> cache = new HashMap<String, CacheEntry>();
-
- /**
- * Constructs a policy that will choose local services that match the slobrok pattern in which this policy occured.
- * If no local service can be found, this policy simply returns the asterisk to allow the network to choose any.
- *
- * @param param The address to use for this, if empty this will resolve to hostname.
- */
- public LocalServicePolicy(String param) {
- localAddress = (param != null && param.length() > 0) ? param : null;
- }
-
- // Inherit doc from RoutingPolicy.
- public void select(RoutingContext ctx) {
- Route route = new Route(ctx.getRoute());
- route.setHop(0, getRecipient(ctx));
- ctx.addChild(route);
- }
-
- // Inherit doc from RoutingPolicy.
- public void merge(RoutingContext ctx) {
- DocumentProtocol.merge(ctx);
- }
-
- /**
- * Returns the appropriate recipient hop for the given routing context. This method provides synchronized access to
- * the internal cache.
- *
- * @param ctx The routing context.
- * @return The recipient hop to use.
- */
- private synchronized Hop getRecipient(RoutingContext ctx) {
- CacheEntry entry = update(ctx);
- if (entry.recipients.isEmpty()) {
- Hop hop = new Hop(ctx.getRoute().getHop(0));
- hop.setDirective(ctx.getDirectiveIndex(), new VerbatimDirective("*"));
- return hop;
- }
- if (++entry.offset >= entry.recipients.size()) {
- entry.offset = 0;
- }
- return new Hop(entry.recipients.get(entry.offset));
- }
-
- /**
- * Updates and returns the cache entry for the given routing context. This method assumes that synchronization is
- * handled outside of it.
- *
- * @param ctx The routing context.
- * @return The updated cache entry.
- */
- private CacheEntry update(RoutingContext ctx) {
- String key = getCacheKey(ctx);
- CacheEntry entry = cache.get(key);
- if (entry == null) {
- entry = new CacheEntry();
- cache.put(key, entry);
- }
- int upd = ctx.getMirror().updates();
- if (entry.generation != upd) {
- entry.generation = upd;
- entry.recipients.clear();
-
- Mirror.Entry[] arr = ctx.getMirror().lookup(ctx.getHopPrefix() + "*" + ctx.getHopSuffix());
- String self = localAddress != null ? localAddress : toAddress(ctx.getMessageBus().getConnectionSpec());
- for (Mirror.Entry item : arr) {
- if (self.equals(toAddress(item.getSpec()))) {
- entry.recipients.add(Hop.parse(item.getName()));
- }
- }
- }
- return entry;
- }
-
- /**
- * Returns a cache key for this instance of the policy. Because behaviour is based on the hop in which the policy
- * occurs, the cache key is the hop string itself.
- *
- * @param ctx The routing context.
- * @return The cache key.
- */
- private String getCacheKey(RoutingContext ctx) {
- return ctx.getRoute().getHop(0).toString();
- }
-
- /**
- * Defines the necessary cache data.
- */
- private class CacheEntry {
- private final List<Hop> recipients = new ArrayList<Hop>();
- private int generation = 0;
- private int offset = 0;
- }
-
- /**
- * Searches the given connection spec for a hostname or IP address. If an address is not found, this method returns
- * null.
- *
- * @param connection The connection spec to search.
- * @return The address, may be null.
- */
- private static String toAddress(String connection) {
- if (connection.startsWith("tcp/")) {
- int pos = connection.indexOf(':');
- if (pos > 4) {
- return connection.substring(4, pos);
- }
- }
- return null;
- }
-
- public void destroy() {
- }
-
- public MetricSet getMetrics() {
- return null;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.messagebus.metrics.MetricSet; +import com.yahoo.messagebus.routing.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This policy implements the logic to prefer local services that matches a slobrok pattern. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class LocalServicePolicy implements DocumentProtocolRoutingPolicy { + + private final String localAddress; + private Map<String, CacheEntry> cache = new HashMap<String, CacheEntry>(); + + /** + * Constructs a policy that will choose local services that match the slobrok pattern in which this policy occured. + * If no local service can be found, this policy simply returns the asterisk to allow the network to choose any. + * + * @param param The address to use for this, if empty this will resolve to hostname. + */ + public LocalServicePolicy(String param) { + localAddress = (param != null && param.length() > 0) ? param : null; + } + + // Inherit doc from RoutingPolicy. + public void select(RoutingContext ctx) { + Route route = new Route(ctx.getRoute()); + route.setHop(0, getRecipient(ctx)); + ctx.addChild(route); + } + + // Inherit doc from RoutingPolicy. + public void merge(RoutingContext ctx) { + DocumentProtocol.merge(ctx); + } + + /** + * Returns the appropriate recipient hop for the given routing context. This method provides synchronized access to + * the internal cache. + * + * @param ctx The routing context. + * @return The recipient hop to use. + */ + private synchronized Hop getRecipient(RoutingContext ctx) { + CacheEntry entry = update(ctx); + if (entry.recipients.isEmpty()) { + Hop hop = new Hop(ctx.getRoute().getHop(0)); + hop.setDirective(ctx.getDirectiveIndex(), new VerbatimDirective("*")); + return hop; + } + if (++entry.offset >= entry.recipients.size()) { + entry.offset = 0; + } + return new Hop(entry.recipients.get(entry.offset)); + } + + /** + * Updates and returns the cache entry for the given routing context. This method assumes that synchronization is + * handled outside of it. + * + * @param ctx The routing context. + * @return The updated cache entry. + */ + private CacheEntry update(RoutingContext ctx) { + String key = getCacheKey(ctx); + CacheEntry entry = cache.get(key); + if (entry == null) { + entry = new CacheEntry(); + cache.put(key, entry); + } + int upd = ctx.getMirror().updates(); + if (entry.generation != upd) { + entry.generation = upd; + entry.recipients.clear(); + + Mirror.Entry[] arr = ctx.getMirror().lookup(ctx.getHopPrefix() + "*" + ctx.getHopSuffix()); + String self = localAddress != null ? localAddress : toAddress(ctx.getMessageBus().getConnectionSpec()); + for (Mirror.Entry item : arr) { + if (self.equals(toAddress(item.getSpec()))) { + entry.recipients.add(Hop.parse(item.getName())); + } + } + } + return entry; + } + + /** + * Returns a cache key for this instance of the policy. Because behaviour is based on the hop in which the policy + * occurs, the cache key is the hop string itself. + * + * @param ctx The routing context. + * @return The cache key. + */ + private String getCacheKey(RoutingContext ctx) { + return ctx.getRoute().getHop(0).toString(); + } + + /** + * Defines the necessary cache data. + */ + private class CacheEntry { + private final List<Hop> recipients = new ArrayList<Hop>(); + private int generation = 0; + private int offset = 0; + } + + /** + * Searches the given connection spec for a hostname or IP address. If an address is not found, this method returns + * null. + * + * @param connection The connection spec to search. + * @return The address, may be null. + */ + private static String toAddress(String connection) { + if (connection.startsWith("tcp/")) { + int pos = connection.indexOf(':'); + if (pos > 4) { + return connection.substring(4, pos); + } + } + return null; + } + + public void destroy() { + } + + public MetricSet getMetrics() { + return null; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveDocumentMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveDocumentMessage.java index f6fcb5965ad..e184de7d4a4 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveDocumentMessage.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveDocumentMessage.java @@ -1,101 +1,101 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentRemove;
-import com.yahoo.document.TestAndSetCondition;
-
-import java.util.Arrays;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RemoveDocumentMessage extends TestAndSetMessage {
- private DocumentRemove remove = null;
-
- /**
- * Constructs a new message for deserialization.
- */
- RemoveDocumentMessage() {
- // empty
- }
-
- /**
- * Constructs a new document remove message.
- *
- * @param documentId The identifier of the document to remove.
- */
- public RemoveDocumentMessage(DocumentId documentId) {
- remove = new DocumentRemove(documentId);
- }
-
- /**
- * Constructs a new document remove message.
- *
- * @param remove The DocumentRemove operation to perform
- */
- public RemoveDocumentMessage(DocumentRemove remove) {
- this.remove = remove;
- }
-
- /**
- * Returns the identifier of the document to remove.
- *
- * @return The document id.
- */
- public DocumentId getDocumentId() {
- return remove.getId();
- }
-
- /**
- * Sets the identifier of the document to remove.
- *
- * @param documentId The document id to set.
- */
- public void setDocumentId(DocumentId documentId) {
- if (documentId == null) {
- throw new IllegalArgumentException("Document id can not be null.");
- }
-
- remove = new DocumentRemove(documentId);
- }
-
- @Override
- public DocumentReply createReply() {
- return new RemoveDocumentReply();
- }
-
- @Override
- public int getApproxSize() {
- return super.getApproxSize() + 4 + remove.getId().toString().length();
- }
-
- @Override
- public boolean hasSequenceId() {
- return true;
- }
-
- @Override
- public long getSequenceId() {
- return Arrays.hashCode(remove.getId().getGlobalId());
- }
-
- @Override
- public int getType() {
- return DocumentProtocol.MESSAGE_REMOVEDOCUMENT;
- }
-
- @Override
- public void setCondition(TestAndSetCondition condition) {
- remove.setCondition(condition);
- }
-
- @Override
- public TestAndSetCondition getCondition() {
- return remove.getCondition();
- }
-
- public DocumentRemove getDocumentRemove() {
- return remove;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.TestAndSetCondition; + +import java.util.Arrays; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RemoveDocumentMessage extends TestAndSetMessage { + private DocumentRemove remove = null; + + /** + * Constructs a new message for deserialization. + */ + RemoveDocumentMessage() { + // empty + } + + /** + * Constructs a new document remove message. + * + * @param documentId The identifier of the document to remove. + */ + public RemoveDocumentMessage(DocumentId documentId) { + remove = new DocumentRemove(documentId); + } + + /** + * Constructs a new document remove message. + * + * @param remove The DocumentRemove operation to perform + */ + public RemoveDocumentMessage(DocumentRemove remove) { + this.remove = remove; + } + + /** + * Returns the identifier of the document to remove. + * + * @return The document id. + */ + public DocumentId getDocumentId() { + return remove.getId(); + } + + /** + * Sets the identifier of the document to remove. + * + * @param documentId The document id to set. + */ + public void setDocumentId(DocumentId documentId) { + if (documentId == null) { + throw new IllegalArgumentException("Document id can not be null."); + } + + remove = new DocumentRemove(documentId); + } + + @Override + public DocumentReply createReply() { + return new RemoveDocumentReply(); + } + + @Override + public int getApproxSize() { + return super.getApproxSize() + 4 + remove.getId().toString().length(); + } + + @Override + public boolean hasSequenceId() { + return true; + } + + @Override + public long getSequenceId() { + return Arrays.hashCode(remove.getId().getGlobalId()); + } + + @Override + public int getType() { + return DocumentProtocol.MESSAGE_REMOVEDOCUMENT; + } + + @Override + public void setCondition(TestAndSetCondition condition) { + remove.setCondition(condition); + } + + @Override + public TestAndSetCondition getCondition() { + return remove.getCondition(); + } + + public DocumentRemove getDocumentRemove() { + return remove; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveDocumentReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveDocumentReply.java index c259aaa5731..c60bf65fed4 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveDocumentReply.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveDocumentReply.java @@ -1,35 +1,35 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RemoveDocumentReply extends WriteDocumentReply {
-
- private boolean found = true;
-
- /**
- * Constructs a new reply with no content.
- */
- public RemoveDocumentReply() {
- super(DocumentProtocol.REPLY_REMOVEDOCUMENT);
- }
-
- /**
- * Returns whether or not the document was found and removed.
- *
- * @return True if document was found.
- */
- public boolean wasFound() {
- return found;
- }
-
- /**
- * Set whether or not the document was found and removed.
- *
- * @param found True if the document was found.
- */
- public void setWasFound(boolean found) {
- this.found = found;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RemoveDocumentReply extends WriteDocumentReply { + + private boolean found = true; + + /** + * Constructs a new reply with no content. + */ + public RemoveDocumentReply() { + super(DocumentProtocol.REPLY_REMOVEDOCUMENT); + } + + /** + * Returns whether or not the document was found and removed. + * + * @return True if document was found. + */ + public boolean wasFound() { + return found; + } + + /** + * Set whether or not the document was found and removed. + * + * @param found True if the document was found. + */ + public void setWasFound(boolean found) { + this.found = found; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ReplyMerger.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ReplyMerger.java index e0873a840d0..97ecc20dd3d 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ReplyMerger.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ReplyMerger.java @@ -1,111 +1,111 @@ -// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.collections.Tuple2;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Reply;
-
-/**
- * Encapsulated logic for merging replies from 1-n related DocumentProtocol messages.
- * For internal use only. Not multithread safe.
- */
-final class ReplyMerger {
-
- private Reply successReply = null;
- private int successIndex = -1;
- private Reply error = null;
- private Reply ignore = null;
-
- public void merge(int i, Reply r) {
- if (r.hasErrors()) {
- mergeAllReplyErrors(r);
- } else {
- updateStateWithSuccessfulReply(i, r);
- }
- }
-
- private boolean resourceWasFound(Reply r) {
- if (r instanceof RemoveDocumentReply) {
- return ((RemoveDocumentReply) r).wasFound();
- }
- if (r instanceof UpdateDocumentReply) {
- return ((UpdateDocumentReply) r).wasFound();
- }
- if (r instanceof GetDocumentReply) {
- return ((GetDocumentReply) r).getLastModified() > 0;
- }
- return false;
- }
-
- private boolean replyIsBetterThanCurrent(Reply r) {
- return resourceWasFound(r) && !resourceWasFound(successReply);
- }
-
- private void updateStateWithSuccessfulReply(int i, Reply r) {
- if (successReply == null || replyIsBetterThanCurrent(r)) {
- setCurrentBestReply(i, r);
- }
- }
-
- private void setCurrentBestReply(int i, Reply r) {
- successReply = r;
- successIndex = i;
- }
-
- private void mergeAllReplyErrors(Reply r) {
- if (handleReplyWithOnlyIgnoredErrors(r)) {
- return;
- }
- if (error == null) {
- error = new EmptyReply();
- r.swapState(error);
- return;
- }
- for (int j = 0; j < r.getNumErrors(); ++j) {
- error.addError(r.getError(j));
- }
- }
-
- private boolean handleReplyWithOnlyIgnoredErrors(Reply r) {
- if (DocumentProtocol.hasOnlyErrorsOfType(r, DocumentProtocol.ERROR_MESSAGE_IGNORED)) {
- if (ignore == null) {
- ignore = new EmptyReply();
- }
- ignore.addError(r.getError(0));
- return true;
- }
- return false;
- }
-
- private boolean shouldReturnErrorReply() {
- return (error != null || (ignore != null && successReply == null));
- }
-
- private Tuple2<Integer, Reply> createMergedErrorReplyResult() {
- if (error != null) {
- return new Tuple2<>(null, error);
- }
- if (ignore != null && successReply == null) {
- return new Tuple2<>(null, ignore);
- }
- throw new IllegalStateException("createMergedErrorReplyResult called without error");
- }
-
- private boolean successfullyMergedAtLeastOneReply() {
- return successReply != null;
- }
-
- private Tuple2<Integer, Reply> createEmptyReplyResult() {
- return new Tuple2<>(null, (Reply)new EmptyReply());
- }
-
- public Tuple2<Integer, Reply> mergedReply() {
- if (shouldReturnErrorReply()) {
- return createMergedErrorReplyResult();
- } else if (!successfullyMergedAtLeastOneReply()) {
- return createEmptyReplyResult();
- }
- return new Tuple2<>(successIndex, successReply);
- }
-
-}
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.collections.Tuple2; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Reply; + +/** + * Encapsulated logic for merging replies from 1-n related DocumentProtocol messages. + * For internal use only. Not multithread safe. + */ +final class ReplyMerger { + + private Reply successReply = null; + private int successIndex = -1; + private Reply error = null; + private Reply ignore = null; + + public void merge(int i, Reply r) { + if (r.hasErrors()) { + mergeAllReplyErrors(r); + } else { + updateStateWithSuccessfulReply(i, r); + } + } + + private boolean resourceWasFound(Reply r) { + if (r instanceof RemoveDocumentReply) { + return ((RemoveDocumentReply) r).wasFound(); + } + if (r instanceof UpdateDocumentReply) { + return ((UpdateDocumentReply) r).wasFound(); + } + if (r instanceof GetDocumentReply) { + return ((GetDocumentReply) r).getLastModified() > 0; + } + return false; + } + + private boolean replyIsBetterThanCurrent(Reply r) { + return resourceWasFound(r) && !resourceWasFound(successReply); + } + + private void updateStateWithSuccessfulReply(int i, Reply r) { + if (successReply == null || replyIsBetterThanCurrent(r)) { + setCurrentBestReply(i, r); + } + } + + private void setCurrentBestReply(int i, Reply r) { + successReply = r; + successIndex = i; + } + + private void mergeAllReplyErrors(Reply r) { + if (handleReplyWithOnlyIgnoredErrors(r)) { + return; + } + if (error == null) { + error = new EmptyReply(); + r.swapState(error); + return; + } + for (int j = 0; j < r.getNumErrors(); ++j) { + error.addError(r.getError(j)); + } + } + + private boolean handleReplyWithOnlyIgnoredErrors(Reply r) { + if (DocumentProtocol.hasOnlyErrorsOfType(r, DocumentProtocol.ERROR_MESSAGE_IGNORED)) { + if (ignore == null) { + ignore = new EmptyReply(); + } + ignore.addError(r.getError(0)); + return true; + } + return false; + } + + private boolean shouldReturnErrorReply() { + return (error != null || (ignore != null && successReply == null)); + } + + private Tuple2<Integer, Reply> createMergedErrorReplyResult() { + if (error != null) { + return new Tuple2<>(null, error); + } + if (ignore != null && successReply == null) { + return new Tuple2<>(null, ignore); + } + throw new IllegalStateException("createMergedErrorReplyResult called without error"); + } + + private boolean successfullyMergedAtLeastOneReply() { + return successReply != null; + } + + private Tuple2<Integer, Reply> createEmptyReplyResult() { + return new Tuple2<>(null, (Reply)new EmptyReply()); + } + + public Tuple2<Integer, Reply> mergedReply() { + if (shouldReturnErrorReply()) { + return createMergedErrorReplyResult(); + } else if (!successfullyMergedAtLeastOneReply()) { + return createEmptyReplyResult(); + } + return new Tuple2<>(successIndex, successReply); + } + +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoundRobinPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoundRobinPolicy.java index f0e49146851..bbacefdc80b 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoundRobinPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoundRobinPolicy.java @@ -1,125 +1,125 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.jrt.slobrok.api.Mirror;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.Hop;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingContext;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This policy implements round-robin selection of the configured recipients that are currently registered in slobrok.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RoundRobinPolicy implements DocumentProtocolRoutingPolicy {
-
- private final Map<String, CacheEntry> cache = new HashMap<String, CacheEntry>();
-
- // Inherit doc from RoutingPolicy.
- public void select(RoutingContext ctx) {
- Hop hop = getRecipient(ctx);
- if (hop != null) {
- Route route = new Route(ctx.getRoute());
- route.setHop(0, hop);
- ctx.addChild(route);
- } else {
- Reply reply = new EmptyReply();
- reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE,
- "None of the configured recipients are currently available."));
- ctx.setReply(reply);
- }
- }
-
- // Inherit doc from RoutingPolicy.
- public void merge(RoutingContext ctx) {
- DocumentProtocol.merge(ctx);
- }
-
- /**
- * Returns the appropriate recipient hop for the given routing context. This method provides synchronized access to
- * the internal cache.
- *
- * @param ctx The routing context.
- * @return The recipient hop to use.
- */
- private synchronized Hop getRecipient(RoutingContext ctx) {
- CacheEntry entry = update(ctx);
- if (entry.recipients.isEmpty()) {
- return null;
- }
- if (++entry.offset >= entry.recipients.size()) {
- entry.offset = 0;
- }
- return new Hop(entry.recipients.get(entry.offset));
- }
-
- /**
- * Updates and returns the cache entry for the given routing context. This method assumes that synchronization is
- * handled outside of it.
- *
- * @param ctx The routing context.
- * @return The updated cache entry.
- */
- private CacheEntry update(RoutingContext ctx) {
- String key = getCacheKey(ctx);
- CacheEntry entry = cache.get(key);
- if (entry == null) {
- entry = new CacheEntry();
- cache.put(key, entry);
- }
-
- int upd = ctx.getMirror().updates();
- if (entry.generation != upd) {
- entry.generation = upd;
- entry.recipients.clear();
- for (int i = 0; i < ctx.getNumRecipients(); ++i) {
- Mirror.Entry[] arr = ctx.getMirror().lookup(ctx.getRecipient(i).getHop(0).toString());
- for (Mirror.Entry item : arr) {
- entry.recipients.add(Hop.parse(item.getName()));
- }
- }
- }
- return entry;
- }
-
- /**
- * Returns a cache key for this instance of the policy. Because behaviour is based on the recipient list of this
- * policy, the cache key is the concatenated string of recipient routes.
- *
- * @param ctx The routing context.
- * @return The cache key.
- */
- private String getCacheKey(RoutingContext ctx) {
- StringBuilder ret = new StringBuilder();
- for (int i = 0; i < ctx.getNumRecipients(); ++i) {
- ret.append(ctx.getRecipient(i).getHop(0).toString()).append(" ");
- }
- return ret.toString();
- }
-
- /**
- * Defines the necessary cache data.
- */
- private class CacheEntry {
- private final List<Hop> recipients = new ArrayList<Hop>();
- private int generation = 0;
- private int offset = 0;
- }
-
- public void destroy() {
- }
-
- public MetricSet getMetrics() {
- return null;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.metrics.MetricSet; +import com.yahoo.messagebus.routing.Hop; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingContext; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This policy implements round-robin selection of the configured recipients that are currently registered in slobrok. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RoundRobinPolicy implements DocumentProtocolRoutingPolicy { + + private final Map<String, CacheEntry> cache = new HashMap<String, CacheEntry>(); + + // Inherit doc from RoutingPolicy. + public void select(RoutingContext ctx) { + Hop hop = getRecipient(ctx); + if (hop != null) { + Route route = new Route(ctx.getRoute()); + route.setHop(0, hop); + ctx.addChild(route); + } else { + Reply reply = new EmptyReply(); + reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE, + "None of the configured recipients are currently available.")); + ctx.setReply(reply); + } + } + + // Inherit doc from RoutingPolicy. + public void merge(RoutingContext ctx) { + DocumentProtocol.merge(ctx); + } + + /** + * Returns the appropriate recipient hop for the given routing context. This method provides synchronized access to + * the internal cache. + * + * @param ctx The routing context. + * @return The recipient hop to use. + */ + private synchronized Hop getRecipient(RoutingContext ctx) { + CacheEntry entry = update(ctx); + if (entry.recipients.isEmpty()) { + return null; + } + if (++entry.offset >= entry.recipients.size()) { + entry.offset = 0; + } + return new Hop(entry.recipients.get(entry.offset)); + } + + /** + * Updates and returns the cache entry for the given routing context. This method assumes that synchronization is + * handled outside of it. + * + * @param ctx The routing context. + * @return The updated cache entry. + */ + private CacheEntry update(RoutingContext ctx) { + String key = getCacheKey(ctx); + CacheEntry entry = cache.get(key); + if (entry == null) { + entry = new CacheEntry(); + cache.put(key, entry); + } + + int upd = ctx.getMirror().updates(); + if (entry.generation != upd) { + entry.generation = upd; + entry.recipients.clear(); + for (int i = 0; i < ctx.getNumRecipients(); ++i) { + Mirror.Entry[] arr = ctx.getMirror().lookup(ctx.getRecipient(i).getHop(0).toString()); + for (Mirror.Entry item : arr) { + entry.recipients.add(Hop.parse(item.getName())); + } + } + } + return entry; + } + + /** + * Returns a cache key for this instance of the policy. Because behaviour is based on the recipient list of this + * policy, the cache key is the concatenated string of recipient routes. + * + * @param ctx The routing context. + * @return The cache key. + */ + private String getCacheKey(RoutingContext ctx) { + StringBuilder ret = new StringBuilder(); + for (int i = 0; i < ctx.getNumRecipients(); ++i) { + ret.append(ctx.getRecipient(i).getHop(0).toString()).append(" "); + } + return ret.toString(); + } + + /** + * Defines the necessary cache data. + */ + private class CacheEntry { + private final List<Hop> recipients = new ArrayList<Hop>(); + private int generation = 0; + private int offset = 0; + } + + public void destroy() { + } + + public MetricSet getMetrics() { + return null; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableRepository.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableRepository.java index 6f044a1951f..5f29f3600e4 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableRepository.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableRepository.java @@ -1,237 +1,237 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.component.Version;
-import com.yahoo.component.VersionSpecification;
-import com.yahoo.concurrent.CopyOnWriteHashMap;
-import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.document.serialization.*;
-import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
-import com.yahoo.io.GrowableByteBuffer;
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.Routable;
-
-import java.util.*;
-import java.util.logging.Logger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * This class encapsulates the logic required to map routable type and version to a corresponding {@link
- * RoutableFactory}. It is owned and accessed through a {@link DocumentProtocol} instance. This class uses a factory
- * cache to reduce the latency of matching version specifications to actual versions when resolving factories.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-final class RoutableRepository {
-
- private static final Logger log = Logger.getLogger(RoutableRepository.class.getName());
- private final CopyOnWriteHashMap<Integer, VersionMap> factoryTypes = new CopyOnWriteHashMap<>();
- private final CopyOnWriteHashMap<CacheKey, RoutableFactory> cache = new CopyOnWriteHashMap<>();
- private LoadTypeSet loadTypes;
-
- public RoutableRepository(LoadTypeSet set) {
- loadTypes = set;
- }
-
- /**
- * Decodes a {@link Routable} from the given byte array. This uses the content of the byte array to dispatch the
- * decode request to the appropriate {@link RoutableFactory} that was previously registered.
- *
- * If a routable can not be decoded, this method returns null.
- *
- * @param version The version of the encoded routable.
- * @param data The byte array containing the encoded routable.
- * @return The decoded routable.
- */
- Routable decode(DocumentTypeManager docMan, Version version, byte[] data) {
- if (data == null || data.length == 0) {
- log.log(LogLevel.ERROR, "Received empty byte array for deserialization.");
- return null;
- }
- DocumentDeserializer in;
-
- if (version.getMajor() >= 5) {
- in = DocumentDeserializerFactory.createHead(docMan, GrowableByteBuffer.wrap(data));
- } else {
- in = DocumentDeserializerFactory.create42(docMan, GrowableByteBuffer.wrap(data));
- }
-
- int type = in.getInt(null);
- RoutableFactory factory = getFactory(version, type);
- if (factory == null) {
- log.log(LogLevel.ERROR, "No routable factory found for routable type " + type +
- " (version " + version + ").");
- return null;
- }
- Routable ret = factory.decode(in, loadTypes);
- if (ret == null) {
- log.log(LogLevel.ERROR, "Routable factory " + factory.getClass().getName() + " failed to deserialize " +
- "routable of type " + type + " (version " + version + ").");
- log.log(LogLevel.ERROR, Arrays.toString(data));
- return null;
- }
- return ret;
- }
-
- /**
- * Encodes a {@link Routable} into a byte array. This dispatches the encode request to the appropriate {@link
- * RoutableFactory} that was previously registered.
- *
- * If a routable can not be encoded, this method returns an empty byte array.
- *
- * @param version The version to encode the routable as.
- * @param obj The routable to encode.
- * @return The byte array containing the encoded routable.
- */
- byte[] encode(Version version, Routable obj) {
- int type = obj.getType();
- RoutableFactory factory = getFactory(version, type);
- if (factory == null) {
- log.log(LogLevel.ERROR, "No routable factory found for routable type " + type +
- " (version " + version + ").");
- return new byte[0];
- }
- DocumentSerializer out;
-
- if (version.getMajor() >= 5) {
- out = DocumentSerializerFactory.createHead(new GrowableByteBuffer(8192));
- } else {
- out = DocumentSerializerFactory.create42(new GrowableByteBuffer(8192));
- }
-
- out.putInt(null, type);
- if (!factory.encode(obj, out)) {
- log.log(LogLevel.ERROR, "Routable factory " + factory.getClass().getName() + " failed to serialize " +
- "routable of type " + type + " (version " + version + ").");
- return new byte[0];
- }
- byte[] ret = new byte[out.getBuf().position()];
- out.getBuf().rewind();
- out.getBuf().get(ret);
- return ret;
- }
-
- /**
- * Registers a routable factory for a given version and routable type.
- *
- * @param version The version specification that the given factory supports.
- * @param type The routable type that the given factory supports.
- * @param factory The routable factory to register.
- */
- void putFactory(VersionSpecification version, int type, RoutableFactory factory) {
- VersionMap versionMap = factoryTypes.get(type);
- if (versionMap == null) {
- versionMap = new VersionMap();
-
- factoryTypes.put(type, versionMap);
- }
- if (versionMap.putFactory(version, factory)) {
- cache.clear();
- }
- }
-
- /**
- * Returns the routable factory for a given version and routable type.
- *
- * @param version The version that the factory must support.
- * @param type The routable type that the factory must support.
- * @return The routable factory matching the criteria, or null.
- */
- RoutableFactory getFactory(Version version, int type) {
- CacheKey cacheKey = new CacheKey(version, type);
- RoutableFactory factory = cache.get(cacheKey);
- if (factory != null) {
- return factory;
- }
- VersionMap versionMap = factoryTypes.get(type);
- if (versionMap == null) {
- return null;
- }
- factory = versionMap.getFactory(version);
- if (factory == null) {
- return null;
- }
- cache.put(cacheKey, factory);
- return factory;
- }
-
- /**
- * Returns a list of routable types that support the given version.
- *
- * @param version The version to return types for.
- * @return The list of supported types.
- */
- List<Integer> getRoutableTypes(Version version) {
- List<Integer> ret = new ArrayList<>();
- for (Map.Entry<Integer, VersionMap> entry : factoryTypes.entrySet()) {
- if (entry.getValue().getFactory(version) != null) {
- ret.add(entry.getKey());
- }
- }
- return ret;
- }
-
- /**
- * Internal helper class that implements a map from {@link VersionSpecification} to {@link RoutableFactory}.
- */
- private static class VersionMap {
-
- final Map<VersionSpecification, RoutableFactory> factoryVersions = new HashMap<>();
-
- boolean putFactory(VersionSpecification version, RoutableFactory factory) {
- return factoryVersions.put(version, factory) == null;
- }
-
- RoutableFactory getFactory(Version version) {
- VersionSpecification versionSpec = version.toSpecification();
-
- // Retrieve the factory with the highest version lower than or equal to actual version
- return factoryVersions.entrySet().stream()
- // Drop factories that have a higher version than actual version
- .filter(entry -> entry.getKey().compareTo(versionSpec) <= 0)
-
- // Get the factory with the highest version
- .max((entry1, entry2) -> entry1.getKey().compareTo(entry2.getKey()))
- .map(Map.Entry::getValue)
-
- // Return factory or null if no suitable factory found
- .orElse(null);
- }
- }
-
- /**
- * Internal helper class that implements a cache key for mapping a {@link Version} and routable type to a {@link
- * RoutableFactory}.
- */
- private static class CacheKey {
-
- final Version version;
- final int type;
-
- public CacheKey(Version version, int type) {
- this.version = version;
- this.type = type;
- }
-
- @Override
- public int hashCode() {
- return version.hashCode() + type;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof CacheKey)) {
- return false;
- }
- CacheKey rhs = (CacheKey)obj;
- if (!version.equals(rhs.version)) {
- return false;
- }
- if (type != rhs.type) {
- return false;
- }
- return true;
- }
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.component.Version; +import com.yahoo.component.VersionSpecification; +import com.yahoo.concurrent.CopyOnWriteHashMap; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.serialization.*; +import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet; +import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.Routable; + +import java.util.*; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class encapsulates the logic required to map routable type and version to a corresponding {@link + * RoutableFactory}. It is owned and accessed through a {@link DocumentProtocol} instance. This class uses a factory + * cache to reduce the latency of matching version specifications to actual versions when resolving factories. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +final class RoutableRepository { + + private static final Logger log = Logger.getLogger(RoutableRepository.class.getName()); + private final CopyOnWriteHashMap<Integer, VersionMap> factoryTypes = new CopyOnWriteHashMap<>(); + private final CopyOnWriteHashMap<CacheKey, RoutableFactory> cache = new CopyOnWriteHashMap<>(); + private LoadTypeSet loadTypes; + + public RoutableRepository(LoadTypeSet set) { + loadTypes = set; + } + + /** + * Decodes a {@link Routable} from the given byte array. This uses the content of the byte array to dispatch the + * decode request to the appropriate {@link RoutableFactory} that was previously registered. + * + * If a routable can not be decoded, this method returns null. + * + * @param version The version of the encoded routable. + * @param data The byte array containing the encoded routable. + * @return The decoded routable. + */ + Routable decode(DocumentTypeManager docMan, Version version, byte[] data) { + if (data == null || data.length == 0) { + log.log(LogLevel.ERROR, "Received empty byte array for deserialization."); + return null; + } + DocumentDeserializer in; + + if (version.getMajor() >= 5) { + in = DocumentDeserializerFactory.createHead(docMan, GrowableByteBuffer.wrap(data)); + } else { + in = DocumentDeserializerFactory.create42(docMan, GrowableByteBuffer.wrap(data)); + } + + int type = in.getInt(null); + RoutableFactory factory = getFactory(version, type); + if (factory == null) { + log.log(LogLevel.ERROR, "No routable factory found for routable type " + type + + " (version " + version + ")."); + return null; + } + Routable ret = factory.decode(in, loadTypes); + if (ret == null) { + log.log(LogLevel.ERROR, "Routable factory " + factory.getClass().getName() + " failed to deserialize " + + "routable of type " + type + " (version " + version + ")."); + log.log(LogLevel.ERROR, Arrays.toString(data)); + return null; + } + return ret; + } + + /** + * Encodes a {@link Routable} into a byte array. This dispatches the encode request to the appropriate {@link + * RoutableFactory} that was previously registered. + * + * If a routable can not be encoded, this method returns an empty byte array. + * + * @param version The version to encode the routable as. + * @param obj The routable to encode. + * @return The byte array containing the encoded routable. + */ + byte[] encode(Version version, Routable obj) { + int type = obj.getType(); + RoutableFactory factory = getFactory(version, type); + if (factory == null) { + log.log(LogLevel.ERROR, "No routable factory found for routable type " + type + + " (version " + version + ")."); + return new byte[0]; + } + DocumentSerializer out; + + if (version.getMajor() >= 5) { + out = DocumentSerializerFactory.createHead(new GrowableByteBuffer(8192)); + } else { + out = DocumentSerializerFactory.create42(new GrowableByteBuffer(8192)); + } + + out.putInt(null, type); + if (!factory.encode(obj, out)) { + log.log(LogLevel.ERROR, "Routable factory " + factory.getClass().getName() + " failed to serialize " + + "routable of type " + type + " (version " + version + ")."); + return new byte[0]; + } + byte[] ret = new byte[out.getBuf().position()]; + out.getBuf().rewind(); + out.getBuf().get(ret); + return ret; + } + + /** + * Registers a routable factory for a given version and routable type. + * + * @param version The version specification that the given factory supports. + * @param type The routable type that the given factory supports. + * @param factory The routable factory to register. + */ + void putFactory(VersionSpecification version, int type, RoutableFactory factory) { + VersionMap versionMap = factoryTypes.get(type); + if (versionMap == null) { + versionMap = new VersionMap(); + + factoryTypes.put(type, versionMap); + } + if (versionMap.putFactory(version, factory)) { + cache.clear(); + } + } + + /** + * Returns the routable factory for a given version and routable type. + * + * @param version The version that the factory must support. + * @param type The routable type that the factory must support. + * @return The routable factory matching the criteria, or null. + */ + RoutableFactory getFactory(Version version, int type) { + CacheKey cacheKey = new CacheKey(version, type); + RoutableFactory factory = cache.get(cacheKey); + if (factory != null) { + return factory; + } + VersionMap versionMap = factoryTypes.get(type); + if (versionMap == null) { + return null; + } + factory = versionMap.getFactory(version); + if (factory == null) { + return null; + } + cache.put(cacheKey, factory); + return factory; + } + + /** + * Returns a list of routable types that support the given version. + * + * @param version The version to return types for. + * @return The list of supported types. + */ + List<Integer> getRoutableTypes(Version version) { + List<Integer> ret = new ArrayList<>(); + for (Map.Entry<Integer, VersionMap> entry : factoryTypes.entrySet()) { + if (entry.getValue().getFactory(version) != null) { + ret.add(entry.getKey()); + } + } + return ret; + } + + /** + * Internal helper class that implements a map from {@link VersionSpecification} to {@link RoutableFactory}. + */ + private static class VersionMap { + + final Map<VersionSpecification, RoutableFactory> factoryVersions = new HashMap<>(); + + boolean putFactory(VersionSpecification version, RoutableFactory factory) { + return factoryVersions.put(version, factory) == null; + } + + RoutableFactory getFactory(Version version) { + VersionSpecification versionSpec = version.toSpecification(); + + // Retrieve the factory with the highest version lower than or equal to actual version + return factoryVersions.entrySet().stream() + // Drop factories that have a higher version than actual version + .filter(entry -> entry.getKey().compareTo(versionSpec) <= 0) + + // Get the factory with the highest version + .max((entry1, entry2) -> entry1.getKey().compareTo(entry2.getKey())) + .map(Map.Entry::getValue) + + // Return factory or null if no suitable factory found + .orElse(null); + } + } + + /** + * Internal helper class that implements a cache key for mapping a {@link Version} and routable type to a {@link + * RoutableFactory}. + */ + private static class CacheKey { + + final Version version; + final int type; + + public CacheKey(Version version, int type) { + this.version = version; + this.type = type; + } + + @Override + public int hashCode() { + return version.hashCode() + type; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof CacheKey)) { + return false; + } + CacheKey rhs = (CacheKey)obj; + if (!version.equals(rhs.version)) { + return false; + } + if (type != rhs.type) { + return false; + } + return true; + } + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java index 05e39503308..beb295509d1 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java @@ -1,148 +1,148 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public abstract class RoutingPolicyFactories {
-
- static class AndPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new ANDPolicy(param);
- }
-
-
- public void destroy() {
- }
- }
-
- static class StoragePolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new StoragePolicy(param);
- }
-
- public void destroy() {
- }
- }
-
- static class ContentPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new ContentPolicy(param);
- }
-
- public void destroy() {
- }
- }
-
- static class MessageTypePolicyFactory implements RoutingPolicyFactory {
- private final String configId;
-
- public MessageTypePolicyFactory(String configId) {
- this.configId = configId;
- }
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new MessageTypePolicy((param == null || param.isEmpty()) ? configId : param);
- }
-
- public void destroy() {
- }
- }
-
- static class DocumentRouteSelectorPolicyFactory implements RoutingPolicyFactory {
-
- private final String configId;
-
- public DocumentRouteSelectorPolicyFactory(String configId) {
- this.configId = configId;
- }
-
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- DocumentRouteSelectorPolicy ret = new DocumentRouteSelectorPolicy((param == null || param.isEmpty()) ?
- configId : param);
- String error = ret.getError();
- if (error != null) {
- return new ErrorPolicy(error);
- }
- return ret;
- }
-
-
- public void destroy() {
- }
- }
-
- static class ExternPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- ExternPolicy ret = new ExternPolicy(param);
- String error = ret.getError();
- if (error != null) {
- return new ErrorPolicy(error);
- }
- return ret;
- }
-
-
- public void destroy() {
- }
- }
-
- static class LocalServicePolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new LocalServicePolicy(param);
- }
-
-
- public void destroy() {
- }
- }
-
- static class RoundRobinPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new RoundRobinPolicy();
- }
-
-
- public void destroy() {
- }
- }
-
- static class LoadBalancerPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new LoadBalancerPolicy(param);
- }
-
-
- public void destroy() {
- }
- }
-
- static class SearchColumnPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new SearchColumnPolicy(param);
- }
-
-
- public void destroy() {
- }
- }
-
- static class SearchRowPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new SearchRowPolicy(param);
- }
-
-
- public void destroy() {
- }
- }
-
- static class SubsetServicePolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new SubsetServicePolicy(param);
- }
-
-
- public void destroy() {
- }
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public abstract class RoutingPolicyFactories { + + static class AndPolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new ANDPolicy(param); + } + + + public void destroy() { + } + } + + static class StoragePolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new StoragePolicy(param); + } + + public void destroy() { + } + } + + static class ContentPolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new ContentPolicy(param); + } + + public void destroy() { + } + } + + static class MessageTypePolicyFactory implements RoutingPolicyFactory { + private final String configId; + + public MessageTypePolicyFactory(String configId) { + this.configId = configId; + } + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new MessageTypePolicy((param == null || param.isEmpty()) ? configId : param); + } + + public void destroy() { + } + } + + static class DocumentRouteSelectorPolicyFactory implements RoutingPolicyFactory { + + private final String configId; + + public DocumentRouteSelectorPolicyFactory(String configId) { + this.configId = configId; + } + + public DocumentProtocolRoutingPolicy createPolicy(String param) { + DocumentRouteSelectorPolicy ret = new DocumentRouteSelectorPolicy((param == null || param.isEmpty()) ? + configId : param); + String error = ret.getError(); + if (error != null) { + return new ErrorPolicy(error); + } + return ret; + } + + + public void destroy() { + } + } + + static class ExternPolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + ExternPolicy ret = new ExternPolicy(param); + String error = ret.getError(); + if (error != null) { + return new ErrorPolicy(error); + } + return ret; + } + + + public void destroy() { + } + } + + static class LocalServicePolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new LocalServicePolicy(param); + } + + + public void destroy() { + } + } + + static class RoundRobinPolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new RoundRobinPolicy(); + } + + + public void destroy() { + } + } + + static class LoadBalancerPolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new LoadBalancerPolicy(param); + } + + + public void destroy() { + } + } + + static class SearchColumnPolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new SearchColumnPolicy(param); + } + + + public void destroy() { + } + } + + static class SearchRowPolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new SearchRowPolicy(param); + } + + + public void destroy() { + } + } + + static class SubsetServicePolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new SubsetServicePolicy(param); + } + + + public void destroy() { + } + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyRepository.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyRepository.java index 3bfa85ac4d5..26b0eec8cf4 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyRepository.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyRepository.java @@ -1,76 +1,76 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.documentapi.metrics.DocumentProtocolMetricSet;
-import com.yahoo.messagebus.routing.RoutingPolicy;
-import com.yahoo.log.LogLevel;
-
-import java.util.Map;
-import java.util.logging.Logger;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-class RoutingPolicyRepository {
-
- private static final Logger log = Logger.getLogger(RoutingPolicyRepository.class.getName());
- private final Map<String, RoutingPolicyFactory> factories = new ConcurrentHashMap<String, RoutingPolicyFactory>();
- private final DocumentProtocolMetricSet metrics;
-
- RoutingPolicyRepository(DocumentProtocolMetricSet metrics) {
- this.metrics = metrics;
- }
-
- /**
- * Registers a routing policy factory for a given name.
- *
- * @param name The name of the factory to register.
- * @param factory The factory to register.
- */
- void putFactory(String name, RoutingPolicyFactory factory) {
- factories.put(name, factory);
- }
-
- /**
- * Returns the routing policy factory for a given name.
- *
- * @param name The name of the factory to return.
- * @return The routing policy factory matching the criteria, or null.
- */
- RoutingPolicyFactory getFactory(String name) {
- return factories.get(name);
- }
-
- /**
- * Creates and returns a routing policy using the named factory and the given parameter.
- *
- * @param name The name of the factory to use.
- * @param param The parameter to pass to the factory.
- * @return The created policy.
- */
- RoutingPolicy createPolicy(String name, String param) {
- RoutingPolicyFactory factory = getFactory(name);
- if (factory == null) {
- log.log(LogLevel.ERROR, "No routing policy factory found for name '" + name + "'.");
- return null;
- }
- DocumentProtocolRoutingPolicy ret;
- try {
- ret = factory.createPolicy(param);
- } catch (Exception e) {
- ret = new ErrorPolicy(e.getMessage());
- }
-
- if (ret.getMetrics() != null) {
- metrics.routingPolicyMetrics.addMetric(ret.getMetrics());
- }
-
- if (ret == null) {
- log.log(LogLevel.ERROR, "Routing policy factory " + factory.getClass().getName() + " failed to create a " +
- "routing policy for parameter '" + name + "'.");
- return null;
- }
- return ret;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.documentapi.metrics.DocumentProtocolMetricSet; +import com.yahoo.messagebus.routing.RoutingPolicy; +import com.yahoo.log.LogLevel; + +import java.util.Map; +import java.util.logging.Logger; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +class RoutingPolicyRepository { + + private static final Logger log = Logger.getLogger(RoutingPolicyRepository.class.getName()); + private final Map<String, RoutingPolicyFactory> factories = new ConcurrentHashMap<String, RoutingPolicyFactory>(); + private final DocumentProtocolMetricSet metrics; + + RoutingPolicyRepository(DocumentProtocolMetricSet metrics) { + this.metrics = metrics; + } + + /** + * Registers a routing policy factory for a given name. + * + * @param name The name of the factory to register. + * @param factory The factory to register. + */ + void putFactory(String name, RoutingPolicyFactory factory) { + factories.put(name, factory); + } + + /** + * Returns the routing policy factory for a given name. + * + * @param name The name of the factory to return. + * @return The routing policy factory matching the criteria, or null. + */ + RoutingPolicyFactory getFactory(String name) { + return factories.get(name); + } + + /** + * Creates and returns a routing policy using the named factory and the given parameter. + * + * @param name The name of the factory to use. + * @param param The parameter to pass to the factory. + * @return The created policy. + */ + RoutingPolicy createPolicy(String name, String param) { + RoutingPolicyFactory factory = getFactory(name); + if (factory == null) { + log.log(LogLevel.ERROR, "No routing policy factory found for name '" + name + "'."); + return null; + } + DocumentProtocolRoutingPolicy ret; + try { + ret = factory.createPolicy(param); + } catch (Exception e) { + ret = new ErrorPolicy(e.getMessage()); + } + + if (ret.getMetrics() != null) { + metrics.routingPolicyMetrics.addMetric(ret.getMetrics()); + } + + if (ret == null) { + log.log(LogLevel.ERROR, "Routing policy factory " + factory.getClass().getName() + " failed to create a " + + "routing policy for parameter '" + name + "'."); + return null; + } + return ret; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java index d36d3ee1e4c..d6c644b238c 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java @@ -1,85 +1,85 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.RoutingContext;
-import com.yahoo.messagebus.routing.RoutingNodeIterator;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.logging.Logger;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SearchRowPolicy implements DocumentProtocolRoutingPolicy {
-
- private static Logger log = Logger.getLogger(SearchRowPolicy.class.getName());
- private int minOk = 0; // Hide OUT_OF_SERVICE as long as this number of replies are something else.
-
- /**
- * Creates a search row policy that wraps the underlying search group policy in case the parameter is something
- * other than an empty string.
- *
- * @param param The number of minimum non-OOS replies that this policy requires.
- */
- public SearchRowPolicy(String param) {
- if (param != null && param.length() > 0) {
- try {
- minOk = Integer.parseInt(param);
- }
- catch (NumberFormatException e) {
- log.log(LogLevel.WARNING, "Parameter '" + param + "' could not be parsed as an integer.", e);
- }
- if (minOk <= 0) {
- log.log(LogLevel.WARNING, "Ignoring a request to set the minimum number of OK replies to " + minOk + " " +
- "because it makes no sense. This routing policy will not allow any recipient " +
- "to be out of service.");
- }
- }
- }
-
- @Override
- public void select(RoutingContext context) {
- context.addChildren(context.getMatchedRecipients());
- context.setSelectOnRetry(false);
- if (minOk > 0) {
- context.addConsumableError(ErrorCode.SERVICE_OOS);
- }
- }
-
- @Override
- public void merge(RoutingContext context) {
- if (minOk > 0) {
- Set<Integer> oosReplies = new HashSet<Integer>();
- int idx = 0;
- for (RoutingNodeIterator it = context.getChildIterator();
- it.isValid(); it.next())
- {
- Reply ref = it.getReplyRef();
- if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) {
- oosReplies.add(idx);
- }
- ++idx;
- }
- if (context.getNumChildren() - oosReplies.size() >= minOk) {
- DocumentProtocol.merge(context, oosReplies);
- return;
- }
- }
- DocumentProtocol.merge(context);
- }
-
- @Override
- public void destroy() {
- // empty
- }
-
- @Override
- public MetricSet getMetrics() {
- return null;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.metrics.MetricSet; +import com.yahoo.messagebus.routing.RoutingContext; +import com.yahoo.messagebus.routing.RoutingNodeIterator; + +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SearchRowPolicy implements DocumentProtocolRoutingPolicy { + + private static Logger log = Logger.getLogger(SearchRowPolicy.class.getName()); + private int minOk = 0; // Hide OUT_OF_SERVICE as long as this number of replies are something else. + + /** + * Creates a search row policy that wraps the underlying search group policy in case the parameter is something + * other than an empty string. + * + * @param param The number of minimum non-OOS replies that this policy requires. + */ + public SearchRowPolicy(String param) { + if (param != null && param.length() > 0) { + try { + minOk = Integer.parseInt(param); + } + catch (NumberFormatException e) { + log.log(LogLevel.WARNING, "Parameter '" + param + "' could not be parsed as an integer.", e); + } + if (minOk <= 0) { + log.log(LogLevel.WARNING, "Ignoring a request to set the minimum number of OK replies to " + minOk + " " + + "because it makes no sense. This routing policy will not allow any recipient " + + "to be out of service."); + } + } + } + + @Override + public void select(RoutingContext context) { + context.addChildren(context.getMatchedRecipients()); + context.setSelectOnRetry(false); + if (minOk > 0) { + context.addConsumableError(ErrorCode.SERVICE_OOS); + } + } + + @Override + public void merge(RoutingContext context) { + if (minOk > 0) { + Set<Integer> oosReplies = new HashSet<Integer>(); + int idx = 0; + for (RoutingNodeIterator it = context.getChildIterator(); + it.isValid(); it.next()) + { + Reply ref = it.getReplyRef(); + if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) { + oosReplies.add(idx); + } + ++idx; + } + if (context.getNumChildren() - oosReplies.size() >= minOk) { + DocumentProtocol.merge(context, oosReplies); + return; + } + } + DocumentProtocol.merge(context); + } + + @Override + public void destroy() { + // empty + } + + @Override + public MetricSet getMetrics() { + return null; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java index 3854637ba5f..615699c674e 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java @@ -1,60 +1,60 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.document.BucketId;
-
-public class StatBucketMessage extends DocumentMessage {
-
- private BucketId bucketId;
- private String documentSelection;
-
- StatBucketMessage() {
- // need to deserialize into
- }
-
- public StatBucketMessage(BucketId bucket, String documentSelection) {
- this.bucketId = bucket;
- this.documentSelection = documentSelection;
- }
-
- public BucketId getBucketId() {
- return bucketId;
- }
-
- void setBucketId(BucketId bucket) {
- bucketId = bucket;
- }
-
- public String getDocumentSelection() {
- return documentSelection;
- }
-
- void setDocumentSelection(String documentSelection) {
- this.documentSelection = documentSelection;
- }
-
- @Override
- public DocumentReply createReply() {
- return new StatBucketReply();
- }
-
- @Override
- public int getApproxSize() {
- return super.getApproxSize() + 8 + documentSelection.length();
- }
-
- @Override
- public boolean hasSequenceId() {
- return true;
- }
-
- @Override
- public long getSequenceId() {
- return bucketId.getRawId();
- }
-
- @Override
- public int getType() {
- return DocumentProtocol.MESSAGE_STATBUCKET;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.document.BucketId; + +public class StatBucketMessage extends DocumentMessage { + + private BucketId bucketId; + private String documentSelection; + + StatBucketMessage() { + // need to deserialize into + } + + public StatBucketMessage(BucketId bucket, String documentSelection) { + this.bucketId = bucket; + this.documentSelection = documentSelection; + } + + public BucketId getBucketId() { + return bucketId; + } + + void setBucketId(BucketId bucket) { + bucketId = bucket; + } + + public String getDocumentSelection() { + return documentSelection; + } + + void setDocumentSelection(String documentSelection) { + this.documentSelection = documentSelection; + } + + @Override + public DocumentReply createReply() { + return new StatBucketReply(); + } + + @Override + public int getApproxSize() { + return super.getApproxSize() + 8 + documentSelection.length(); + } + + @Override + public boolean hasSequenceId() { + return true; + } + + @Override + public long getSequenceId() { + return bucketId.getRawId(); + } + + @Override + public int getType() { + return DocumentProtocol.MESSAGE_STATBUCKET; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java index 43c369106d1..a1439ef845f 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java @@ -1,19 +1,19 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-public class StatBucketReply extends DocumentReply {
-
- private String results = "";
-
- public StatBucketReply() {
- super(DocumentProtocol.REPLY_STATBUCKET);
- }
-
- public String getResults() {
- return results;
- }
-
- public void setResults(String result) {
- results = result;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +public class StatBucketReply extends DocumentReply { + + private String results = ""; + + public StatBucketReply() { + super(DocumentProtocol.REPLY_STATBUCKET); + } + + public String getResults() { + return results; + } + + public void setResults(String result) { + results = result; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SubsetServicePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SubsetServicePolicy.java index dc06fe7042d..76e74b98f86 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SubsetServicePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SubsetServicePolicy.java @@ -1,145 +1,145 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.jrt.slobrok.api.Mirror;
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.*;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
-
-/**
- * This policy implements the logic to select a subset of services that matches a slobrok pattern.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SubsetServicePolicy implements DocumentProtocolRoutingPolicy {
-
- private static Logger log = Logger.getLogger(SubsetServicePolicy.class.getName());
- private final int subsetSize;
- private final Map<String, CacheEntry> cache = new HashMap<String, CacheEntry>();
-
- /**
- * Creates an instance of a subset service policy. The parameter string is parsed as an integer number that is the
- * number of services to include in the set to choose from.
- *
- * @param param The number of services to include in the set.
- */
- public SubsetServicePolicy(String param) {
- int subsetSize = 5;
- if (param != null && param.length() > 0) {
- try {
- subsetSize = Integer.parseInt(param);
- }
- catch (NumberFormatException e) {
- log.log(LogLevel.WARNING, "Parameter '" + param + "' could not be parsed as an integer.", e);
- }
- if (subsetSize <= 0) {
- log.warning("Ignoring a request to set the subset size to " + subsetSize + " because it makes no " +
- "sense. This routing policy will choose any one matching service.");
- }
- } else {
- log.warning("No parameter given to SubsetService policy, using default value " + subsetSize + ".");
- }
- this.subsetSize = subsetSize;
- }
-
- // Inherit doc from RoutingPolicy.
- public void select(RoutingContext ctx) {
- Route route = new Route(ctx.getRoute());
- route.setHop(0, getRecipient(ctx));
- ctx.addChild(route);
- }
-
- // Inherit doc from RoutingPolicy.
- public void merge(RoutingContext ctx) {
- DocumentProtocol.merge(ctx);
- }
-
- /**
- * Returns the appropriate recipient hop for the given routing context. This method provides synchronized access to
- * the internal cache.
- *
- * @param ctx The routing context.
- * @return The recipient hop to use.
- */
- private Hop getRecipient(RoutingContext ctx) {
- Hop hop = null;
- if (subsetSize > 0) {
- synchronized (this) {
- CacheEntry entry = update(ctx);
- if (!entry.recipients.isEmpty()) {
- if (++entry.offset >= entry.recipients.size()) {
- entry.offset = 0;
- }
- hop = new Hop(entry.recipients.get(entry.offset));
- }
- }
- }
- if (hop == null) {
- hop = new Hop(ctx.getRoute().getHop(0));
- hop.setDirective(ctx.getDirectiveIndex(), new VerbatimDirective("*"));
- }
- return hop;
- }
-
- /**
- * Updates and returns the cache entry for the given routing context. This method assumes that synchronization is
- * handled outside of it.
- *
- * @param ctx The routing context.
- * @return The updated cache entry.
- */
- private CacheEntry update(RoutingContext ctx) {
- String key = getCacheKey(ctx);
- CacheEntry entry = cache.get(key);
- if (entry == null) {
- entry = new CacheEntry();
- cache.put(key, entry);
- }
-
- int upd = ctx.getMirror().updates();
- if (entry.generation != upd) {
- entry.generation = upd;
- entry.recipients.clear();
-
- Mirror.Entry[] arr = ctx.getMirror().lookup(ctx.getHopPrefix() + "*" + ctx.getHopSuffix());
- int pos = ctx.getMessageBus().getConnectionSpec().hashCode();
- for (int i = 0; i < subsetSize && i < arr.length; ++i) {
- entry.recipients.add(Hop.parse(arr[((pos + i) & Integer.MAX_VALUE) % arr.length].getName()));
- }
- }
- return entry;
- }
-
- /**
- * Returns a cache key for this instance of the policy. Because behaviour is based on the hop in which the policy
- * occurs, the cache key is the hop string itself.
- *
- * @param ctx The routing context.
- * @return The cache key.
- */
- private String getCacheKey(RoutingContext ctx) {
- return ctx.getRoute().getHop(0).toString();
- }
-
- /**
- * Defines the necessary cache data.
- */
- private class CacheEntry {
- private final List<Hop> recipients = new ArrayList<Hop>();
- private int generation = 0;
- private int offset = 0;
- }
-
- public void destroy() {
- }
-
- public MetricSet getMetrics() {
- return null;
- }
-}
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.metrics.MetricSet; +import com.yahoo.messagebus.routing.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; + +/** + * This policy implements the logic to select a subset of services that matches a slobrok pattern. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SubsetServicePolicy implements DocumentProtocolRoutingPolicy { + + private static Logger log = Logger.getLogger(SubsetServicePolicy.class.getName()); + private final int subsetSize; + private final Map<String, CacheEntry> cache = new HashMap<String, CacheEntry>(); + + /** + * Creates an instance of a subset service policy. The parameter string is parsed as an integer number that is the + * number of services to include in the set to choose from. + * + * @param param The number of services to include in the set. + */ + public SubsetServicePolicy(String param) { + int subsetSize = 5; + if (param != null && param.length() > 0) { + try { + subsetSize = Integer.parseInt(param); + } + catch (NumberFormatException e) { + log.log(LogLevel.WARNING, "Parameter '" + param + "' could not be parsed as an integer.", e); + } + if (subsetSize <= 0) { + log.warning("Ignoring a request to set the subset size to " + subsetSize + " because it makes no " + + "sense. This routing policy will choose any one matching service."); + } + } else { + log.warning("No parameter given to SubsetService policy, using default value " + subsetSize + "."); + } + this.subsetSize = subsetSize; + } + + // Inherit doc from RoutingPolicy. + public void select(RoutingContext ctx) { + Route route = new Route(ctx.getRoute()); + route.setHop(0, getRecipient(ctx)); + ctx.addChild(route); + } + + // Inherit doc from RoutingPolicy. + public void merge(RoutingContext ctx) { + DocumentProtocol.merge(ctx); + } + + /** + * Returns the appropriate recipient hop for the given routing context. This method provides synchronized access to + * the internal cache. + * + * @param ctx The routing context. + * @return The recipient hop to use. + */ + private Hop getRecipient(RoutingContext ctx) { + Hop hop = null; + if (subsetSize > 0) { + synchronized (this) { + CacheEntry entry = update(ctx); + if (!entry.recipients.isEmpty()) { + if (++entry.offset >= entry.recipients.size()) { + entry.offset = 0; + } + hop = new Hop(entry.recipients.get(entry.offset)); + } + } + } + if (hop == null) { + hop = new Hop(ctx.getRoute().getHop(0)); + hop.setDirective(ctx.getDirectiveIndex(), new VerbatimDirective("*")); + } + return hop; + } + + /** + * Updates and returns the cache entry for the given routing context. This method assumes that synchronization is + * handled outside of it. + * + * @param ctx The routing context. + * @return The updated cache entry. + */ + private CacheEntry update(RoutingContext ctx) { + String key = getCacheKey(ctx); + CacheEntry entry = cache.get(key); + if (entry == null) { + entry = new CacheEntry(); + cache.put(key, entry); + } + + int upd = ctx.getMirror().updates(); + if (entry.generation != upd) { + entry.generation = upd; + entry.recipients.clear(); + + Mirror.Entry[] arr = ctx.getMirror().lookup(ctx.getHopPrefix() + "*" + ctx.getHopSuffix()); + int pos = ctx.getMessageBus().getConnectionSpec().hashCode(); + for (int i = 0; i < subsetSize && i < arr.length; ++i) { + entry.recipients.add(Hop.parse(arr[((pos + i) & Integer.MAX_VALUE) % arr.length].getName())); + } + } + return entry; + } + + /** + * Returns a cache key for this instance of the policy. Because behaviour is based on the hop in which the policy + * occurs, the cache key is the hop string itself. + * + * @param ctx The routing context. + * @return The cache key. + */ + private String getCacheKey(RoutingContext ctx) { + return ctx.getRoute().getHop(0).toString(); + } + + /** + * Defines the necessary cache data. + */ + private class CacheEntry { + private final List<Hop> recipients = new ArrayList<Hop>(); + private int generation = 0; + private int offset = 0; + } + + public void destroy() { + } + + public MetricSet getMetrics() { + return null; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/UpdateDocumentReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/UpdateDocumentReply.java index 5f091101554..30801cbbac0 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/UpdateDocumentReply.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/UpdateDocumentReply.java @@ -1,35 +1,35 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class UpdateDocumentReply extends WriteDocumentReply {
-
- private boolean found = true;
-
- /**
- * Constructs a new reply with no content.
- */
- public UpdateDocumentReply() {
- super(DocumentProtocol.REPLY_UPDATEDOCUMENT);
- }
-
- /**
- * Returns whether or not the document was found and updated.
- *
- * @return true if document was found
- */
- public boolean wasFound() {
- return found;
- }
-
- /**
- * Sets whether or not the document was found and updated.
- *
- * @param found True if the document was found
- */
- public void setWasFound(boolean found) {
- this.found = found;
- }
-}
\ No newline at end of file +package com.yahoo.documentapi.messagebus.protocol; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class UpdateDocumentReply extends WriteDocumentReply { + + private boolean found = true; + + /** + * Constructs a new reply with no content. + */ + public UpdateDocumentReply() { + super(DocumentProtocol.REPLY_UPDATEDOCUMENT); + } + + /** + * Returns whether or not the document was found and updated. + * + * @return true if document was found + */ + public boolean wasFound() { + return found; + } + + /** + * Sets whether or not the document was found and updated. + * + * @param found True if the document was found + */ + public void setWasFound(boolean found) { + this.found = found; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/Argument.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/Argument.java index 3a434eab101..e60ff35bdcf 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/Argument.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/Argument.java @@ -1,40 +1,40 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.systemstate.rule;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class Argument {
-
- private final String name;
- private final String value;
-
- /**
- * Constructs a new argument.
- *
- * @param name The name of this argument.
- * @param value The value of this argument.
- */
- public Argument(String name, String value) {
- this.name = name;
- this.value = value;
- }
-
- /**
- * Returns the name of this argument.
- *
- * @return The name.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Returns the value of this argument.
- *
- * @return The value.
- */
- public String getValue() {
- return value;
- }
-}
+package com.yahoo.documentapi.messagebus.systemstate.rule; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class Argument { + + private final String name; + private final String value; + + /** + * Constructs a new argument. + * + * @param name The name of this argument. + * @param value The value of this argument. + */ + public Argument(String name, String value) { + this.name = name; + this.value = value; + } + + /** + * Returns the name of this argument. + * + * @return The name. + */ + public String getName() { + return name; + } + + /** + * Returns the value of this argument. + * + * @return The value. + */ + public String getValue() { + return value; + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/Location.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/Location.java index 870a39c0122..3a8fe5c6229 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/Location.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/Location.java @@ -1,120 +1,120 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.systemstate.rule;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Arrays;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class Location {
-
- private List<String> items = new ArrayList<String>();
-
- /**
- * Constructs a new location with no items.
- */
- public Location() {
- // empty
- }
-
- /**
- * Constructs a new location based on a location string.
- *
- * @param loc The location string to parse.
- */
- public Location(String loc) {
- items.addAll(Arrays.asList(loc.split("/")));
- normalize();
- }
-
- /**
- * Constructs a new location based on a list of items.
- *
- * @param items The components that make up this location.
- */
- public Location(List<String> items) {
- this.items.addAll(items);
- normalize();
- }
-
- /**
- * Constructs a new location as a copy of another.
- *
- * @param loc The location to copy.
- */
- public Location(Location loc) {
- items.addAll(loc.items);
- }
-
- /**
- * Constructs a new location based on a working directory and a list of items.
- *
- * @param pwd The path of the working directory.
- * @param items The components that make up this location.
- */
- public Location(Location pwd, List<String> items) {
- this.items.addAll(pwd.getItems());
- this.items.addAll(items);
- normalize();
- }
-
- /**
- * Returns a location object that represents the "next" step along this location path. This means removing the first
- * elements of this location's items and returning a new location for this sublist.
- *
- * @return The next location along this path.
- */
- public Location getNext() {
- List<String> next = new ArrayList<String>(items);
- next.remove(0);
- return new Location(next);
- }
-
- /**
- * Returns the components of this location.
- *
- * @return The component array.
- */
- public List<String> getItems() {
- return items;
- }
-
- /**
- * Normalizes the items list of this location so that all PREV or THIS locations are replaced by their actual
- * meaning. This carries some overhead since it is not done in place.
- *
- * @return This, to allow chaining.
- */
- private Location normalize() {
- List<String> norm = new ArrayList<String>();
- for (String item : items) {
- if (item.equals(NodeState.NODE_PARENT)) {
- if (norm.size() == 0) {
- // ignore
- }
- else {
- norm.remove(norm.size() - 1);
- }
- }
- else if (!item.equals(NodeState.NODE_CURRENT)) {
- norm.add(item);
- }
- }
- items = norm;
- return this;
- }
-
- @Override
- public String toString() {
- StringBuffer ret = new StringBuffer();
- for (int i = 0; i < items.size(); ++i) {
- ret.append(items.get(i));
- if (i < items.size() - 1) {
- ret.append("/");
- }
- }
- return ret.toString();
- }
-}
+package com.yahoo.documentapi.messagebus.systemstate.rule; + +import java.util.ArrayList; +import java.util.List; +import java.util.Arrays; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class Location { + + private List<String> items = new ArrayList<String>(); + + /** + * Constructs a new location with no items. + */ + public Location() { + // empty + } + + /** + * Constructs a new location based on a location string. + * + * @param loc The location string to parse. + */ + public Location(String loc) { + items.addAll(Arrays.asList(loc.split("/"))); + normalize(); + } + + /** + * Constructs a new location based on a list of items. + * + * @param items The components that make up this location. + */ + public Location(List<String> items) { + this.items.addAll(items); + normalize(); + } + + /** + * Constructs a new location as a copy of another. + * + * @param loc The location to copy. + */ + public Location(Location loc) { + items.addAll(loc.items); + } + + /** + * Constructs a new location based on a working directory and a list of items. + * + * @param pwd The path of the working directory. + * @param items The components that make up this location. + */ + public Location(Location pwd, List<String> items) { + this.items.addAll(pwd.getItems()); + this.items.addAll(items); + normalize(); + } + + /** + * Returns a location object that represents the "next" step along this location path. This means removing the first + * elements of this location's items and returning a new location for this sublist. + * + * @return The next location along this path. + */ + public Location getNext() { + List<String> next = new ArrayList<String>(items); + next.remove(0); + return new Location(next); + } + + /** + * Returns the components of this location. + * + * @return The component array. + */ + public List<String> getItems() { + return items; + } + + /** + * Normalizes the items list of this location so that all PREV or THIS locations are replaced by their actual + * meaning. This carries some overhead since it is not done in place. + * + * @return This, to allow chaining. + */ + private Location normalize() { + List<String> norm = new ArrayList<String>(); + for (String item : items) { + if (item.equals(NodeState.NODE_PARENT)) { + if (norm.size() == 0) { + // ignore + } + else { + norm.remove(norm.size() - 1); + } + } + else if (!item.equals(NodeState.NODE_CURRENT)) { + norm.add(item); + } + } + items = norm; + return this; + } + + @Override + public String toString() { + StringBuffer ret = new StringBuffer(); + for (int i = 0; i < items.size(); ++i) { + ret.append(items.get(i)); + if (i < items.size() - 1) { + ret.append("/"); + } + } + return ret.toString(); + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/NodeState.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/NodeState.java index f5920f32119..ccd5c0c811d 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/NodeState.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/systemstate/rule/NodeState.java @@ -1,310 +1,310 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.systemstate.rule;
-
-import com.yahoo.log.LogLevel;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class NodeState {
-
- /** A location string that expresses the use of the PARENT node. */
- public static final String NODE_PARENT = "..";
-
- /** A location string that expresses the use of THIS node. */
- public static final String NODE_CURRENT = ".";
-
- private static Logger log = Logger.getLogger(NodeState.class.getName());
- private final Map<String, NodeState> children = new LinkedHashMap<String, NodeState>();
- private final Map<String, String> state = new LinkedHashMap<String, String>();
- private NodeState parent = null;
- private String id = null;
-
- /**
- * Creates a node state that no internal content.
- */
- public NodeState() {
- // empty
- }
-
- /**
- * Creates a node state based on a list of argument objects. These arguments are iterated and added to this node's
- * internal state map.
- *
- * @param args The arguments to use as state.
- */
- public NodeState(List<Argument> args) {
- for (Argument arg : args) {
- setState(arg.getName(), arg.getValue());
- }
- }
-
- /**
- * Adds a child to this node at the given location. The key can be a location string, in which case the necessary
- * intermediate node states are created.
- *
- * @param key The location at which to add the child.
- * @param child The child node to add.
- * @return This, to allow chaining.
- */
- public NodeState addChild(String key, NodeState child) {
- getChild(key, true).copy(child);
- return this;
- }
-
- /**
- * Returns the child at the given location relative to this.
- *
- * @param key The location of the child to return.
- * @return The child object, null if not found.
- */
- public NodeState getChild(String key) {
- return getChild(key, false);
- }
-
- /**
- * Returns the child at the given location relative to this. This method can be forced to return a child node even
- * if it does not exist, by adding all intermediate nodes and the target node itself.
- *
- * @param key The location of the child to return.
- * @param force Whether or not to force a return value by creating missing nodes.
- * @return The child object, null if not found.
- */
- public NodeState getChild(String key, boolean force) {
- if (key == null || key.length() == 0) {
- return this;
- }
- String arr[] = key.split("/", 2);
- while (arr.length == 2 && arr[0].equals(NODE_CURRENT)) {
- arr = arr[1].split("/", 2);
- }
- if (arr[0].equals(NODE_CURRENT)) {
- return this;
- }
- if (arr[0].equals(NODE_PARENT)) {
- if (parent == null) {
- log.log(LogLevel.ERROR, "Location string '" + key + "' requests a parent above the top-most node, " +
- "returning self to avoid crash.");
- }
- return parent.getChild(arr[1], force);
- }
- if (!children.containsKey(arr[0])) {
- if (!force) {
- return null;
- }
- children.put(arr[0], new NodeState());
- children.get(arr[0]).setParent(this, arr[0]);
- }
- if (arr.length == 2) {
- return children.get(arr[0]).getChild(arr[1], force);
- }
- return children.get(arr[0]);
- }
-
- /**
- * Returns the map of child nodes for iteration.
- *
- * @return The internal child map.
- */
- public Map<String, NodeState> getChildren() {
- return children;
- }
-
- /**
- * Removes the named child node from this node, and attempts to compact the system state from this node upwards by
- * removing empty nodes.
- *
- * @param key The child to remove.
- * @return The result of invoking {@link #compact} after the remove.
- */
- public NodeState removeChild(String key) {
- if (key == null || key.length() == 0) {
- return this;
- }
- int pos = key.lastIndexOf("/");
- if (pos > -1) {
- NodeState parent = getChild(key.substring(0, pos), false);
- if (parent != null) {
- return parent.removeChild(key.substring(pos + 1));
- }
- }
- else {
- children.remove(key);
- }
- return compact();
- }
-
- /**
- * Retrieves some arbitrary state information for a given key. The key can be a location string, in which case the
- * necessary intermediate nodes are traversed. If the key is not found, this method returns null.
- *
- * @param key The name of the state information to return.
- * @return The value of the state key.
- */
- public String getState(String key) {
- if (key == null || key.length() == 0) {
- return null;
- }
- int pos = key.lastIndexOf("/");
- if (pos > -1) {
- NodeState parent = getChild(key.substring(0, pos), false);
- return parent != null ? parent.getState(key.substring(pos + 1)) : null;
- }
- return state.get(key);
- }
-
- /**
- * Sets some arbitrary state data in this node. The key can be a location string, in which case the necessary
- * intermediate nodes are traversed and even created if missing.
- *
- * @param key The key to set.
- * @param value The value to assign to the key.
- * @return This, to allow chaining.
- */
- public NodeState setState(String key, String value) {
- if (key == null || key.length() == 0) {
- return this;
- }
- int pos = key.lastIndexOf("/");
- if (pos > -1) {
- getChild(key.substring(0, pos), true).setState(key.substring(pos + 1), value);
- }
- else {
- if (value == null || value.length() == 0) {
- return removeState(key);
- }
- else {
- state.put(key, value);
- }
- }
- return this;
- }
-
- /**
- * Removes the named (key, value) state pair from this node, and attempts to compact the system state from this node
- * upwards by removing empty nodes.
- *
- * @param key The state variable to clear.
- * @return The result of invoking {@link #compact} after the remove.
- */
- public NodeState removeState(String key) {
- if (key == null || key.length() == 0) {
- return this;
- }
- int pos = key.lastIndexOf("/");
- if (pos > -1) {
- NodeState parent = getChild(key.substring(0, pos), false);
- if (parent != null) {
- return parent.removeState(key.substring(pos + 1));
- }
- }
- else {
- state.remove(key);
- }
- return compact();
- }
-
- /**
- * Compacts the system state tree from this node upwards. This will delete itself if it has a parent, but no
- * internal state and no children.
- *
- * @return This or the first non-null ancestor, to allow chaining.
- */
- private NodeState compact() {
- if (state.isEmpty() && children.isEmpty()) {
- if (parent != null) {
- return parent.removeChild(id);
- }
- }
- return this;
- }
-
- /**
- * Copies the state content of another node state object into this.
- *
- * @param node The node state to copy into this.
- * @return This, to allow chaining.
- */
- public NodeState copy(NodeState node) {
- for (String key : node.state.keySet()) {
- state.put(key, node.state.get(key));
- }
- for (String key : node.children.keySet()) {
- getChild(key, true).copy(node.children.get(key));
- }
- return this;
- }
-
- /**
- * Clears both the internal state and child list, then compacts the tree from this node upwards.
- *
- * @return The result of invoking {@link #compact} after the remove.
- */
- public NodeState clear() {
- state.clear();
- children.clear();
- return compact();
- }
-
- /**
- * Sets the parent of this node.
- *
- * @param parent The parent node.
- * @param id The identifier of this node as seen in the parent.
- * @return This, to allow chaining.
- */
- public NodeState setParent(NodeState parent, String id) {
- this.parent = parent;
- this.id = id;
- return this;
- }
-
- /**
- * Returns a string representation of this node state.
- *
- * @param prefix The prefix to use for this string.
- * @return A string representation of this.
- * @throws UnsupportedEncodingException Thrown if the host system does not support UTF-8 encoding.
- */
- private String toString(String prefix) throws UnsupportedEncodingException {
- StringBuffer buf = new StringBuffer();
- if (!state.isEmpty()) {
- buf.append(prefix.length() == 0 ? "." : prefix).append("?");
- String[] arr = state.keySet().toArray(new String[state.keySet().size()]);
- for (int i = 0; i < arr.length; ++i) {
- buf.append(arr[i]).append("=").append(URLEncoder.encode(state.get(arr[i]), "UTF-8"));
- if (i < arr.length - 1) {
- buf.append("&");
- }
- }
- buf.append(" ");
- }
- if (prefix.length() > 0) {
- prefix += "/";
- }
- String[] keys = children.keySet().toArray(new String[children.keySet().size()]);
- Arrays.sort(keys);
- for (String loc : keys) {
- buf.append(children.get(loc).toString(prefix + URLEncoder.encode(loc, "UTF-8")));
- }
- return buf.toString();
- }
-
- @Override
- public String toString() {
- try {
- return toString("").trim();
- }
- catch (UnsupportedEncodingException e) {
- return e.toString();
- }
- }
-}
+package com.yahoo.documentapi.messagebus.systemstate.rule; + +import com.yahoo.log.LogLevel; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class NodeState { + + /** A location string that expresses the use of the PARENT node. */ + public static final String NODE_PARENT = ".."; + + /** A location string that expresses the use of THIS node. */ + public static final String NODE_CURRENT = "."; + + private static Logger log = Logger.getLogger(NodeState.class.getName()); + private final Map<String, NodeState> children = new LinkedHashMap<String, NodeState>(); + private final Map<String, String> state = new LinkedHashMap<String, String>(); + private NodeState parent = null; + private String id = null; + + /** + * Creates a node state that no internal content. + */ + public NodeState() { + // empty + } + + /** + * Creates a node state based on a list of argument objects. These arguments are iterated and added to this node's + * internal state map. + * + * @param args The arguments to use as state. + */ + public NodeState(List<Argument> args) { + for (Argument arg : args) { + setState(arg.getName(), arg.getValue()); + } + } + + /** + * Adds a child to this node at the given location. The key can be a location string, in which case the necessary + * intermediate node states are created. + * + * @param key The location at which to add the child. + * @param child The child node to add. + * @return This, to allow chaining. + */ + public NodeState addChild(String key, NodeState child) { + getChild(key, true).copy(child); + return this; + } + + /** + * Returns the child at the given location relative to this. + * + * @param key The location of the child to return. + * @return The child object, null if not found. + */ + public NodeState getChild(String key) { + return getChild(key, false); + } + + /** + * Returns the child at the given location relative to this. This method can be forced to return a child node even + * if it does not exist, by adding all intermediate nodes and the target node itself. + * + * @param key The location of the child to return. + * @param force Whether or not to force a return value by creating missing nodes. + * @return The child object, null if not found. + */ + public NodeState getChild(String key, boolean force) { + if (key == null || key.length() == 0) { + return this; + } + String arr[] = key.split("/", 2); + while (arr.length == 2 && arr[0].equals(NODE_CURRENT)) { + arr = arr[1].split("/", 2); + } + if (arr[0].equals(NODE_CURRENT)) { + return this; + } + if (arr[0].equals(NODE_PARENT)) { + if (parent == null) { + log.log(LogLevel.ERROR, "Location string '" + key + "' requests a parent above the top-most node, " + + "returning self to avoid crash."); + } + return parent.getChild(arr[1], force); + } + if (!children.containsKey(arr[0])) { + if (!force) { + return null; + } + children.put(arr[0], new NodeState()); + children.get(arr[0]).setParent(this, arr[0]); + } + if (arr.length == 2) { + return children.get(arr[0]).getChild(arr[1], force); + } + return children.get(arr[0]); + } + + /** + * Returns the map of child nodes for iteration. + * + * @return The internal child map. + */ + public Map<String, NodeState> getChildren() { + return children; + } + + /** + * Removes the named child node from this node, and attempts to compact the system state from this node upwards by + * removing empty nodes. + * + * @param key The child to remove. + * @return The result of invoking {@link #compact} after the remove. + */ + public NodeState removeChild(String key) { + if (key == null || key.length() == 0) { + return this; + } + int pos = key.lastIndexOf("/"); + if (pos > -1) { + NodeState parent = getChild(key.substring(0, pos), false); + if (parent != null) { + return parent.removeChild(key.substring(pos + 1)); + } + } + else { + children.remove(key); + } + return compact(); + } + + /** + * Retrieves some arbitrary state information for a given key. The key can be a location string, in which case the + * necessary intermediate nodes are traversed. If the key is not found, this method returns null. + * + * @param key The name of the state information to return. + * @return The value of the state key. + */ + public String getState(String key) { + if (key == null || key.length() == 0) { + return null; + } + int pos = key.lastIndexOf("/"); + if (pos > -1) { + NodeState parent = getChild(key.substring(0, pos), false); + return parent != null ? parent.getState(key.substring(pos + 1)) : null; + } + return state.get(key); + } + + /** + * Sets some arbitrary state data in this node. The key can be a location string, in which case the necessary + * intermediate nodes are traversed and even created if missing. + * + * @param key The key to set. + * @param value The value to assign to the key. + * @return This, to allow chaining. + */ + public NodeState setState(String key, String value) { + if (key == null || key.length() == 0) { + return this; + } + int pos = key.lastIndexOf("/"); + if (pos > -1) { + getChild(key.substring(0, pos), true).setState(key.substring(pos + 1), value); + } + else { + if (value == null || value.length() == 0) { + return removeState(key); + } + else { + state.put(key, value); + } + } + return this; + } + + /** + * Removes the named (key, value) state pair from this node, and attempts to compact the system state from this node + * upwards by removing empty nodes. + * + * @param key The state variable to clear. + * @return The result of invoking {@link #compact} after the remove. + */ + public NodeState removeState(String key) { + if (key == null || key.length() == 0) { + return this; + } + int pos = key.lastIndexOf("/"); + if (pos > -1) { + NodeState parent = getChild(key.substring(0, pos), false); + if (parent != null) { + return parent.removeState(key.substring(pos + 1)); + } + } + else { + state.remove(key); + } + return compact(); + } + + /** + * Compacts the system state tree from this node upwards. This will delete itself if it has a parent, but no + * internal state and no children. + * + * @return This or the first non-null ancestor, to allow chaining. + */ + private NodeState compact() { + if (state.isEmpty() && children.isEmpty()) { + if (parent != null) { + return parent.removeChild(id); + } + } + return this; + } + + /** + * Copies the state content of another node state object into this. + * + * @param node The node state to copy into this. + * @return This, to allow chaining. + */ + public NodeState copy(NodeState node) { + for (String key : node.state.keySet()) { + state.put(key, node.state.get(key)); + } + for (String key : node.children.keySet()) { + getChild(key, true).copy(node.children.get(key)); + } + return this; + } + + /** + * Clears both the internal state and child list, then compacts the tree from this node upwards. + * + * @return The result of invoking {@link #compact} after the remove. + */ + public NodeState clear() { + state.clear(); + children.clear(); + return compact(); + } + + /** + * Sets the parent of this node. + * + * @param parent The parent node. + * @param id The identifier of this node as seen in the parent. + * @return This, to allow chaining. + */ + public NodeState setParent(NodeState parent, String id) { + this.parent = parent; + this.id = id; + return this; + } + + /** + * Returns a string representation of this node state. + * + * @param prefix The prefix to use for this string. + * @return A string representation of this. + * @throws UnsupportedEncodingException Thrown if the host system does not support UTF-8 encoding. + */ + private String toString(String prefix) throws UnsupportedEncodingException { + StringBuffer buf = new StringBuffer(); + if (!state.isEmpty()) { + buf.append(prefix.length() == 0 ? "." : prefix).append("?"); + String[] arr = state.keySet().toArray(new String[state.keySet().size()]); + for (int i = 0; i < arr.length; ++i) { + buf.append(arr[i]).append("=").append(URLEncoder.encode(state.get(arr[i]), "UTF-8")); + if (i < arr.length - 1) { + buf.append("&"); + } + } + buf.append(" "); + } + if (prefix.length() > 0) { + prefix += "/"; + } + String[] keys = children.keySet().toArray(new String[children.keySet().size()]); + Arrays.sort(keys); + for (String loc : keys) { + buf.append(children.get(loc).toString(prefix + URLEncoder.encode(loc, "UTF-8"))); + } + return buf.toString(); + } + + @Override + public String toString() { + try { + return toString("").trim(); + } + catch (UnsupportedEncodingException e) { + return e.toString(); + } + } +} |