diff options
Diffstat (limited to 'documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java')
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java | 58 |
1 files changed, 32 insertions, 26 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java index 22f7a491056..69bd6dcca00 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java @@ -8,10 +8,17 @@ import com.yahoo.document.BucketIdFactory; import com.yahoo.jrt.slobrok.api.IMirror; import com.yahoo.jrt.slobrok.api.Mirror; import com.yahoo.log.LogLevel; -import com.yahoo.messagebus.*; +import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.metrics.MetricSet; -import com.yahoo.messagebus.routing.*; +import com.yahoo.messagebus.routing.Hop; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingContext; +import com.yahoo.messagebus.routing.RoutingNodeIterator; +import com.yahoo.messagebus.routing.VerbatimDirective; import com.yahoo.vdslib.distribution.Distribution; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; @@ -39,13 +46,13 @@ import java.util.logging.Logger; public class StoragePolicy extends ExternalSlobrokPolicy { private static final Logger log = Logger.getLogger(StoragePolicy.class.getName()); - public static final String owningBucketStates = "uim"; - public static final String upStates = "ui"; + private static final String owningBucketStates = "uim"; + private static final String upStates = "ui"; /** This class merely generates slobrok a host pattern for a given distributor. */ public static class SlobrokHostPatternGenerator { private final String clusterName; - public SlobrokHostPatternGenerator(String clusterName) { this.clusterName = clusterName; } + SlobrokHostPatternGenerator(String clusterName) { this.clusterName = clusterName; } /** * Find host pattern of the hosts that are valid targets for this request. @@ -63,9 +70,9 @@ public class StoragePolicy extends ExternalSlobrokPolicy { private int totalTargets = 1; protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy. - public void setRequiredUpPercentageToSendToKnownGoodNodes(int percent) { this.requiredUpPercentageToSendToKnownGoodNodes = percent; } + void setRequiredUpPercentageToSendToKnownGoodNodes(int percent) { this.requiredUpPercentageToSendToKnownGoodNodes = percent; } - public void updateValidTargets(ClusterState state) { + void updateValidTargets(ClusterState state) { List<Integer> validRandomTargets = new ArrayList<>(); for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) { if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i); @@ -74,7 +81,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { this.totalTargets = state.getNodeCount(NodeType.DISTRIBUTOR); } public abstract String getTargetSpec(Integer distributor, RoutingContext context); - public String getRandomTargetSpec(RoutingContext context) { + String getRandomTargetSpec(RoutingContext context) { // Try to use list of random targets, if at least X % of the nodes are up while (100 * validRandomTargets.size() / totalTargets >= requiredUpPercentageToSendToKnownGoodNodes) { int randIndex = randomizer.nextInt(validRandomTargets.size()); @@ -96,7 +103,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { private final SlobrokHostPatternGenerator patternGenerator; ExternalSlobrokPolicy policy; - public SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy) { + SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy) { this.patternGenerator = patternGenerator; this.policy = policy; } @@ -189,9 +196,9 @@ public class StoragePolicy extends ExternalSlobrokPolicy { /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */ public static class Parameters { - protected String clusterName = null; - protected String distributionConfigId = null; - protected SlobrokHostPatternGenerator slobrokHostPatternGenerator = null; + protected final String clusterName; + protected final String distributionConfigId; + protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator; public Parameters(Map<String, String> params) { clusterName = params.get("cluster"); @@ -222,26 +229,25 @@ public class StoragePolicy extends ExternalSlobrokPolicy { * When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the * failure was related to node being bad. (Hard to detect from failure) */ - public int getAttemptRandomOnFailuresLimit() { return 5; } + int getAttemptRandomOnFailuresLimit() { return 5; } /** * If we receive more than this number of wrong distribution replies with old cluster states, we throw the current cached state and takes the * old one. This guards us against version resets. */ - public int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; } + int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; } /** * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random. * (To avoid hitting trashing bad nodes still in slobrok) */ - public int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; } + int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; } } /** Helper class to get the bucket identifier of a message. */ public static class BucketIdCalculator { private static final BucketIdFactory factory = new BucketIdFactory(); - @SuppressWarnings("deprecation") private BucketId getBucketId(Message msg) { switch (msg.getType()) { case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId()); @@ -258,7 +264,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } } - public BucketId handleBucketIdCalculation(RoutingContext context) { + BucketId handleBucketIdCalculation(RoutingContext context) { BucketId id = getBucketId(context.getMessage()); if (id == null || id.getRawId() == 0) { Reply reply = new EmptyReply(); @@ -272,13 +278,13 @@ public class StoragePolicy extends ExternalSlobrokPolicy { /** Class handling the logic of picking a distributor */ public static class DistributorSelectionLogic { /** Class that tracks a failure of a given type per node. */ - public static class InstabilityChecker { + static class InstabilityChecker { private List<Integer> nodeFailures = new ArrayList<>(); private int failureLimit; - public InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; } + InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; } - public boolean tooManyFailures(int nodeIndex) { + boolean tooManyFailures(int nodeIndex) { if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) { nodeFailures.set(nodeIndex, 0); return true; @@ -287,7 +293,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } } - public void addFailure(Integer calculatedDistributor) { + void addFailure(Integer calculatedDistributor) { while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0); nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1); } @@ -297,7 +303,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { Integer calculatedDistributor; ClusterState usedState; - public MessageContext(ClusterState usedState) { this.usedState = usedState; } + MessageContext(ClusterState usedState) { this.usedState = usedState; } public String toString() { return "Context(Distributor " + calculatedDistributor + @@ -312,7 +318,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { private int oldClusterVersionGottenCount = 0; private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection - public DistributorSelectionLogic(Parameters params, ExternalSlobrokPolicy policy) { + DistributorSelectionLogic(Parameters params, ExternalSlobrokPolicy policy) { this.hostFetcher = params.createHostFetcher(policy); this.hostFetcher.setRequiredUpPercentageToSendToKnownGoodNodes(params.getRequiredUpPercentageToSendToKnownGoodNodes()); this.distribution = params.createDistribution(policy); @@ -325,7 +331,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { distribution.close(); } - public String getTargetSpec(RoutingContext context, BucketId bucketId) { + String getTargetSpec(RoutingContext context, BucketId bucketId) { String sendRandomReason = null; MessageContext messageContext = new MessageContext(cachedClusterState); context.setContext(messageContext); @@ -382,7 +388,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } } - public void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) { + void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) { final MessageContext context = (MessageContext) routingContext.getContext(); final Optional<ClusterState> replyState = clusterStateFromReply(reply); if (!replyState.isPresent()) { @@ -479,7 +485,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } } - public void handleErrorReply(Reply reply, Object untypedContext) { + void handleErrorReply(Reply reply, Object untypedContext) { MessageContext messageContext = (MessageContext) untypedContext; if (messageContext.calculatedDistributor != null) { persistentFailureChecker.addFailure(messageContext.calculatedDistributor); |