summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-05-16 12:05:44 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-05-16 12:05:44 +0200
commitba6711be92d89a0191eaa45400c75f7a2b3dfb46 (patch)
tree3de958074425cb29d243026ad77bb113d03eb1f2
parentea1409bfa056576f7afa664553e58a7062afe542 (diff)
- Use double checked locking to ensure that we do not create Policies that we forget to destroy.
- Catch exceptions and close/destroy when necessary.
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java21
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java9
-rw-r--r--vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java15
3 files changed, 33 insertions, 12 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
index 918bd193d89..552f47ec5a6 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
@@ -349,15 +349,24 @@ public class StoragePolicy extends SlobrokPolicy {
private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection
DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) {
- this.hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
- this.distribution = params.createDistribution(policy);
- persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit());
- maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState();
+ try {
+ hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
+ distribution = params.createDistribution(policy);
+ persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit());
+ maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState();
+ } catch (Throwable e) {
+ destroy();
+ throw e;
+ }
}
public void destroy() {
- hostFetcher.close();
- distribution.close();
+ if (hostFetcher != null) {
+ hostFetcher.close();
+ }
+ if (distribution != null) {
+ distribution.close();
+ }
}
String getTargetSpec(RoutingContext context, BucketId bucketId) {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java b/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java
index ad26475bf74..102e0be923d 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java
@@ -28,7 +28,7 @@ public class ProtocolRepository {
*/
public void putProtocol(Protocol protocol) {
if (protocols.put(protocol.getName(), protocol) != null) {
- routingPolicyCache.clear();
+ clearPolicyCache();
}
}
@@ -72,6 +72,10 @@ public class ProtocolRepository {
return ret;
}
synchronized (this) {
+ ret = routingPolicyCache.get(cacheKey);
+ if (ret != null) {
+ return ret;
+ }
Protocol protocol = getProtocol(protocolName);
if (protocol == null) {
log.log(LogLevel.ERROR, "Protocol '" + protocolName + "' not supported.");
@@ -81,6 +85,9 @@ public class ProtocolRepository {
ret = protocol.createPolicy(policyName, policyParam);
} catch (RuntimeException e) {
log.log(LogLevel.ERROR, "Protcol '" + protocolName + "' threw an exception: " + e.getMessage(), e);
+ if (ret != null) {
+ ret.destroy();
+ }
return null;
}
if (ret == null) {
diff --git a/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java b/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java
index 0063011e41c..404ec357e93 100644
--- a/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java
+++ b/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java
@@ -108,8 +108,13 @@ public class Distribution {
distributionBitMasks[i] = mask;
mask = (mask << 1) | 1;
}
- configSub = new ConfigSubscriber();
- configSub.subscribe(configSubscriber, StorDistributionConfig.class, configId);
+ try {
+ configSub = new ConfigSubscriber();
+ configSub.subscribe(configSubscriber, StorDistributionConfig.class, configId);
+ } catch (Throwable e) {
+ close();
+ throw e;
+ }
}
public Distribution(StorDistributionConfig config) {
@@ -146,7 +151,7 @@ public class Distribution {
return seed;
}
- private class ScoredGroup implements Comparable<ScoredGroup> {
+ private static class ScoredGroup implements Comparable<ScoredGroup> {
Group group;
double score;
@@ -158,7 +163,7 @@ public class Distribution {
return Double.valueOf(o.score).compareTo(score);
}
}
- private class ScoredNode {
+ private static class ScoredNode {
int index;
int reliability;
double score;
@@ -205,7 +210,7 @@ public class Distribution {
}
return getIdealDistributorGroup(bucket, clusterState, results.first().group, redundancyArray[0]);
}
- private class ResultGroup implements Comparable<ResultGroup> {
+ private static class ResultGroup implements Comparable<ResultGroup> {
Group group;
int redundancy;