aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java107
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/hitfield/XmlRenderer.java3
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java1
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java6
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java48
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java4
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java24
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java21
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java19
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java17
10 files changed, 126 insertions, 124 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 8dc7c2685bf..1e60050375e 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -36,8 +36,8 @@ import java.util.logging.Logger;
/**
* Class to encapsulate access to slobrok sessions.
*
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
- * @author <a href="mailto:einarmr@yahoo-inc.com">Einar Rosenvinge</a>
+ * @author Steinar Knutsen
+ * @author Einar Rosenvinge
*/
public final class SessionCache extends AbstractComponent {
@@ -68,10 +68,10 @@ public final class SessionCache extends AbstractComponent {
private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
- public SessionCache(final String messagebusConfigId, final String slobrokConfigId, final String identity,
- final String containerMbusConfigId, final String documentManagerConfigId,
- final String loadTypeConfigId,
- final DocumentTypeManager documentTypeManager) {
+ public SessionCache(String messagebusConfigId, String slobrokConfigId, String identity,
+ String containerMbusConfigId, String documentManagerConfigId,
+ String loadTypeConfigId,
+ DocumentTypeManager documentTypeManager) {
this.messagebusConfigId = messagebusConfigId;
this.slobrokConfigId = slobrokConfigId;
this.identity = identity;
@@ -102,12 +102,12 @@ public final class SessionCache extends AbstractComponent {
return messageBus != null;
}
- private static SharedMessageBus createSharedMessageBus(final ContainerMbusConfig mbusConfig,
- final String slobrokConfigId, final String identity,
+ private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig,
+ String slobrokConfigId, String identity,
Protocol protocol) {
- final MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
- final int maxPendingSize = DocumentUtil
+ int maxPendingSize = DocumentUtil
.calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(),
mbusConfig.containerCoreMemory());
logSystemInfo(mbusConfig, maxPendingSize);
@@ -115,7 +115,7 @@ public final class SessionCache extends AbstractComponent {
mbusParams.setMaxPendingCount(mbusConfig.maxpendingcount());
mbusParams.setMaxPendingSize(maxPendingSize);
- final RPCNetworkParams netParams = new RPCNetworkParams()
+ RPCNetworkParams netParams = new RPCNetworkParams()
.setSlobrokConfigId(slobrokConfigId)
.setIdentity(new Identity(identity))
.setListenPort(mbusConfig.port());
@@ -164,16 +164,17 @@ public final class SessionCache extends AbstractComponent {
}
private abstract class SessionCreator<PARAMS, KEY, SESSION extends SharedResource> {
+
abstract SESSION create(PARAMS p);
abstract KEY buildKey(PARAMS p);
abstract void logReuse(SESSION session);
- ReferencedResource<SESSION> retain(final Object lock, final Map<KEY, SESSION> registry, final PARAMS p) {
+ ReferencedResource<SESSION> retain(Object lock, Map<KEY, SESSION> registry, PARAMS p) {
SESSION session;
ResourceReference sessionReference;
- final KEY key = buildKey(p);
+ KEY key = buildKey(p);
// this lock is held for a horribly long time, but I see no way of
// making it slimmer
synchronized (lock) {
@@ -194,7 +195,7 @@ public final class SessionCache extends AbstractComponent {
return new ReferencedResource<>(session, sessionReference);
}
- SESSION createAndStore(final Map<KEY, SESSION> registry, final PARAMS p, final KEY key) {
+ SESSION createAndStore(Map<KEY, SESSION> registry, PARAMS p, KEY key) {
SESSION session = create(p);
registry.put(key, session);
return session;
@@ -204,34 +205,36 @@ public final class SessionCache extends AbstractComponent {
private class DestinationSessionCreator
extends SessionCreator<DestinationSessionParams, String, SharedDestinationSession> {
+
@Override
- SharedDestinationSession create(final DestinationSessionParams p) {
+ SharedDestinationSession create(DestinationSessionParams p) {
log.log(LogLevel.DEBUG, "Creating new destination session " + p.getName() + "");
return messageBus.newDestinationSession(p);
}
@Override
- String buildKey(final DestinationSessionParams p) {
+ String buildKey(DestinationSessionParams p) {
return p.getName();
}
@Override
- void logReuse(final SharedDestinationSession session) {
+ void logReuse(SharedDestinationSession session) {
log.log(LogLevel.DEBUG, "Reusing destination session " + session.name() + "");
}
+
}
private class SourceSessionCreator
extends SessionCreator<SourceSessionParams, SourceSessionKey, SharedSourceSession> {
@Override
- SharedSourceSession create(final SourceSessionParams p) {
+ SharedSourceSession create(SourceSessionParams p) {
log.log(LogLevel.DEBUG, "Creating new source session.");
return messageBus.newSourceSession(p);
}
@Override
- SourceSessionKey buildKey(final SourceSessionParams p) {
+ SourceSessionKey buildKey(SourceSessionParams p) {
return new SourceSessionKey(p);
}
@@ -245,31 +248,33 @@ public final class SessionCache extends AbstractComponent {
extends SessionCreator<IntermediateSessionParams, String, SharedIntermediateSession> {
@Override
- SharedIntermediateSession create(final IntermediateSessionParams p) {
+ SharedIntermediateSession create(IntermediateSessionParams p) {
log.log(LogLevel.DEBUG, "Creating new intermediate session " + p.getName() + "");
return messageBus.newIntermediateSession(p);
}
@Override
- String buildKey(final IntermediateSessionParams p) {
+ String buildKey(IntermediateSessionParams p) {
return p.getName();
}
@Override
- void logReuse(final SharedIntermediateSession session) {
+ void logReuse(SharedIntermediateSession session) {
log.log(LogLevel.DEBUG, "Reusing intermediate session " + session.name() + "");
}
}
static class ThrottlePolicySignature {
+
@Override
public int hashCode() {
return getClass().hashCode();
}
+
}
- static class StaticThrottlePolicySignature extends
- ThrottlePolicySignature {
+ static class StaticThrottlePolicySignature extends ThrottlePolicySignature {
+
private final int maxPendingCount;
private final long maxPendingSize;
@@ -280,7 +285,7 @@ public final class SessionCache extends AbstractComponent {
@Override
public int hashCode() {
- final int prime = 31;
+ int prime = 31;
int result = super.hashCode();
result = prime * result + maxPendingCount;
result = prime * result
@@ -308,8 +313,8 @@ public final class SessionCache extends AbstractComponent {
}
- static class DynamicThrottlePolicySignature extends
- ThrottlePolicySignature {
+ static class DynamicThrottlePolicySignature extends ThrottlePolicySignature {
+
private final int maxPending;
private final double maxWindowSize;
private final double minWindowSize;
@@ -326,7 +331,7 @@ public final class SessionCache extends AbstractComponent {
@Override
public int hashCode() {
- final int prime = 31;
+ int prime = 31;
int result = super.hashCode();
result = prime * result + maxPending;
long temp;
@@ -342,31 +347,28 @@ public final class SessionCache extends AbstractComponent {
}
@Override
- public boolean equals(final Object obj) {
+ public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (getClass() != obj.getClass()) {
return false;
}
- final DynamicThrottlePolicySignature other = (DynamicThrottlePolicySignature) obj;
+ DynamicThrottlePolicySignature other = (DynamicThrottlePolicySignature) obj;
if (maxPending != other.maxPending) {
return false;
}
- if (Double.doubleToLongBits(maxWindowSize) != Double
- .doubleToLongBits(other.maxWindowSize)) {
+ if (Double.doubleToLongBits(maxWindowSize) != Double.doubleToLongBits(other.maxWindowSize)) {
return false;
}
if (Double.doubleToLongBits(minWindowSize) != Double
.doubleToLongBits(other.minWindowSize)) {
return false;
}
- if (Double.doubleToLongBits(windowSizeBackoff) != Double
- .doubleToLongBits(other.windowSizeBackoff)) {
+ if (Double.doubleToLongBits(windowSizeBackoff) != Double.doubleToLongBits(other.windowSizeBackoff)) {
return false;
}
- if (Double.doubleToLongBits(windowSizeIncrement) != Double
- .doubleToLongBits(other.windowSizeIncrement)) {
+ if (Double.doubleToLongBits(windowSizeIncrement) != Double.doubleToLongBits(other.windowSizeIncrement)) {
return false;
}
return true;
@@ -374,8 +376,8 @@ public final class SessionCache extends AbstractComponent {
}
- static class UnknownThrottlePolicySignature extends
- ThrottlePolicySignature {
+ static class UnknownThrottlePolicySignature extends ThrottlePolicySignature {
+
private final ThrottlePolicy policy;
UnknownThrottlePolicySignature(final ThrottlePolicy policy) {
@@ -383,7 +385,7 @@ public final class SessionCache extends AbstractComponent {
}
@Override
- public boolean equals(final Object other) {
+ public boolean equals(Object other) {
if (other == null) {
return false;
}
@@ -395,23 +397,21 @@ public final class SessionCache extends AbstractComponent {
}
static class SourceSessionKey {
+
private final double timeout;
private final ThrottlePolicySignature policy;
- SourceSessionKey(final SourceSessionParams p) {
+ SourceSessionKey(SourceSessionParams p) {
timeout = p.getTimeout();
policy = createSignature(p.getThrottlePolicy());
}
- private static ThrottlePolicySignature createSignature(
- final ThrottlePolicy policy) {
- final Class<?> policyClass = policy.getClass();
+ private static ThrottlePolicySignature createSignature(ThrottlePolicy policy) {
+ Class<?> policyClass = policy.getClass();
if (policyClass == DynamicThrottlePolicy.class) {
- return new DynamicThrottlePolicySignature(
- (DynamicThrottlePolicy) policy);
+ return new DynamicThrottlePolicySignature((DynamicThrottlePolicy) policy);
} else if (policyClass == StaticThrottlePolicy.class) {
- return new StaticThrottlePolicySignature(
- (StaticThrottlePolicy) policy);
+ return new StaticThrottlePolicySignature((StaticThrottlePolicy) policy);
} else {
return new UnknownThrottlePolicySignature(policy);
}
@@ -427,10 +427,9 @@ public final class SessionCache extends AbstractComponent {
@Override
public int hashCode() {
- final int prime = 31;
+ int prime = 31;
int result = 1;
- result = prime * result
- + ((policy == null) ? 0 : policy.hashCode());
+ result = prime * result + ((policy == null) ? 0 : policy.hashCode());
long temp;
temp = Double.doubleToLongBits(timeout);
result = prime * result + (int) (temp ^ (temp >>> 32));
@@ -438,7 +437,7 @@ public final class SessionCache extends AbstractComponent {
}
@Override
- public boolean equals(final Object obj) {
+ public boolean equals(Object obj) {
if (this == obj) {
return true;
}
@@ -448,7 +447,7 @@ public final class SessionCache extends AbstractComponent {
if (getClass() != obj.getClass()) {
return false;
}
- final SourceSessionKey other = (SourceSessionKey) obj;
+ SourceSessionKey other = (SourceSessionKey) obj;
if (policy == null) {
if (other.policy != null) {
return false;
@@ -456,11 +455,11 @@ public final class SessionCache extends AbstractComponent {
} else if (!policy.equals(other.policy)) {
return false;
}
- if (Double.doubleToLongBits(timeout) != Double
- .doubleToLongBits(other.timeout)) {
+ if (Double.doubleToLongBits(timeout) != Double.doubleToLongBits(other.timeout)) {
return false;
}
return true;
}
}
+
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/hitfield/XmlRenderer.java b/container-search/src/main/java/com/yahoo/prelude/hitfield/XmlRenderer.java
index 99c5daa05b8..5b7960bdea9 100644
--- a/container-search/src/main/java/com/yahoo/prelude/hitfield/XmlRenderer.java
+++ b/container-search/src/main/java/com/yahoo/prelude/hitfield/XmlRenderer.java
@@ -146,8 +146,7 @@ public class XmlRenderer {
indent(nestingLevel);
}
- private void renderWeightedSet(Inspector seq, int nestingLevel, boolean nestedarray)
- {
+ private void renderWeightedSet(Inspector seq, int nestingLevel, boolean nestedarray) {
int limit = seq.entryCount();
renderTarget.append('\n');
for (int i = 0; i < limit; ++i) {
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index a146caa972e..8d989f51de6 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -186,6 +186,7 @@ public class MessageBusVisitorSession implements VisitorSession {
}
public static class MessageBusSenderFactory implements SenderFactory {
+
private final MessageBus messageBus;
public MessageBusSenderFactory(MessageBus messageBus) {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java
index 8cf53e47717..a7753a7401e 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java
@@ -3,10 +3,11 @@ package com.yahoo.messagebus;
/**
* This is an implementation of the {@link ThrottlePolicy} that passes all requests (no real throttling).
+ *
* @author dybis
*/
-public class AllPassThrottlePolicy implements ThrottlePolicy
-{
+public class AllPassThrottlePolicy implements ThrottlePolicy {
+
@Override
public boolean canSend(Message msg, int pendingCount) {
return true;
@@ -19,4 +20,5 @@ public class AllPassThrottlePolicy implements ThrottlePolicy
@Override
public void processReply(Reply reply) {
}
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
index 41a192a24be..dae20543b34 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
@@ -7,7 +7,7 @@ import com.yahoo.log.LogLevel;
import java.util.logging.Logger;
/**
- * This is an implementatin of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a
+ * This is an implementation of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a
* {@link SourceSession} is allowed to have.
*
* <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet.
@@ -44,7 +44,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Constructs a new instance of this class using the given clock to calculate efficiency.
*
- * @param timer The timer to use.
+ * @param timer the timer to use
*/
public DynamicThrottlePolicy(Timer timer) {
this.timer = timer;
@@ -64,8 +64,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
@Override
- public boolean canSend(Message msg, int pendingCount) {
- if (!super.canSend(msg, pendingCount)) {
+ public boolean canSend(Message message, int pendingCount) {
+ if ( ! super.canSend(message, pendingCount)) {
return false;
}
long time = timer.milliTime();
@@ -78,8 +78,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
@Override
- public void processMessage(Message msg) {
- super.processMessage(msg);
+ public void processMessage(Message message) {
+ super.processMessage(message);
if (++numSent < windowSize * resizeRate) {
return;
}
@@ -126,7 +126,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
@Override
public void processReply(Reply reply) {
super.processReply(reply);
- if (!reply.hasErrors()) {
+ if ( ! reply.hasErrors()) {
++numOk;
}
}
@@ -136,8 +136,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* the correlation between throughput and window size. The algorithm will increase the window size until efficiency
* drops below the efficiency of the local maxima times this value.
*
- * @param efficiencyThreshold The limit to set.
- * @return This, to allow chaining.
+ * @param efficiencyThreshold the limit to set
+ * @return this, to allow chaining
* @see #setWindowSizeBackOff(double)
*/
public DynamicThrottlePolicy setEfficiencyThreshold(double efficiencyThreshold) {
@@ -148,8 +148,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Sets the step size used when increasing window size.
*
- * @param windowSizeIncrement The step size to set.
- * @return This, to allow chaining.
+ * @param windowSizeIncrement the step size to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setWindowSizeIncrement(double windowSizeIncrement) {
this.windowSizeIncrement = windowSizeIncrement;
@@ -161,8 +161,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to
* reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range.
*
- * @param windowSizeBackOff The back off to set.
- * @return This, to allow chaining.
+ * @param windowSizeBackOff the back off to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setWindowSizeBackOff(double windowSizeBackOff) {
this.windowSizeBackOff = Math.max(0, Math.min(1, windowSizeBackOff));
@@ -173,8 +173,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* Sets the rate at which the window size is updated. The larger the value, the less responsive the resizing
* becomes. However, the smaller the value, the less accurate the measurements become.
*
- * @param resizeRate The rate to set.
- * @return This, to allow chaining.
+ * @param resizeRate the rate to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setResizeRate(double resizeRate) {
this.resizeRate = resizeRate;
@@ -186,8 +186,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* will be allocated to this clients. Resources are shared between clients
* proportiannally to their weights.
*
- * @param weight The weight to set.
- * @return This, to allow chaining.
+ * @param weight the weight to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setWeight(double weight) {
this.weight = weight;
@@ -198,8 +198,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* Sets the maximium number of pending operations allowed at any time, in
* order to avoid using too much resources.
*
- * @param max The max to set.
- * @return This, to allow chaining.
+ * @param max the max to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setMaxWindowSize(double max) {
this.maxWindowSize = max;
@@ -209,7 +209,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Get the maximum number of pending operations allowed at any time.
*
- * @return The maximum number of operations.
+ * @return the maximum number of operations
*/
public double getMaxWindowSize() {
return maxWindowSize;
@@ -220,8 +220,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* Sets the minimium number of pending operations allowed at any time, in
* order to keep a level of performance.
*
- * @param min The min to set.
- * @return This, to allow chaining.
+ * @param min the min to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setMinWindowSize(double min) {
this.minWindowSize = min;
@@ -231,7 +231,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Get the minimum number of pending operations allowed at any time.
*
- * @return The minimum number of operations.
+ * @return the minimum number of operations
*/
public double getMinWindowSize() {
return minWindowSize;
@@ -247,7 +247,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Returns the maximum number of pending messages allowed.
*
- * @return The max limit.
+ * @return the max limit
*/
public int getMaxPendingCount() {
return (int)windowSize;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java
index 6b073b737db..029c6a8074c 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java
@@ -35,8 +35,8 @@ public class RateThrottlingPolicy extends StaticThrottlePolicy {
currentPeriod = timer.milliTime() / PERIOD;
}
- public boolean canSend(Message msg, int pendingCount) {
- if (!super.canSend(msg, pendingCount)) {
+ public boolean canSend(Message message, int pendingCount) {
+ if (!super.canSend(message, pendingCount)) {
return false;
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 4fdfa341e3b..2a4a123a60b 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -112,11 +112,11 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
* } while (!result.isAccepted());
* </code>
*
- * @param msg the message to send
+ * @param message the message to send
* @return the result of <i>initiating</i> sending of this message
*/
- public Result send(Message msg) {
- return sendInternal(updateTiming(msg));
+ public Result send(Message message) {
+ return sendInternal(updateTiming(message));
}
private Message updateTiming(Message msg) {
@@ -127,29 +127,29 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
return msg;
}
- private Result sendInternal(Message msg) {
+ private Result sendInternal(Message message) {
synchronized (lock) {
if (closed) {
return new Result(ErrorCode.SEND_QUEUE_CLOSED,
"Source session is closed.");
}
- if (throttlePolicy != null && ! throttlePolicy.canSend(msg, pendingCount)) {
+ if (throttlePolicy != null && ! throttlePolicy.canSend(message, pendingCount)) {
return new Result(ErrorCode.SEND_QUEUE_FULL,
"Too much pending data (" + pendingCount + " messages).");
}
- msg.pushHandler(replyHandler);
+ message.pushHandler(replyHandler);
if (throttlePolicy != null) {
- throttlePolicy.processMessage(msg);
+ throttlePolicy.processMessage(message);
}
++pendingCount;
}
- if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
- msg.getTrace().trace(TraceLevel.COMPONENT,
- "Source session accepted a " + msg.getApproxSize() + " byte message. " +
+ if (message.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
+ message.getTrace().trace(TraceLevel.COMPONENT,
+ "Source session accepted a " + message.getApproxSize() + " byte message. " +
pendingCount + " message(s) now pending.");
}
- msg.pushHandler(this);
- sequencer.handleMessage(msg);
+ message.pushHandler(this);
+ sequencer.handleMessage(message);
return Result.ACCEPTED;
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java
index e671a36fa26..e9796e6c4d2 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java
@@ -24,7 +24,7 @@ public class SourceSessionParams {
/**
* Implements the copy constructor.
*
- * @param params The object to copy.
+ * @param params the object to copy
*/
public SourceSessionParams(SourceSessionParams params) {
throttlePolicy = params.throttlePolicy;
@@ -32,11 +32,7 @@ public class SourceSessionParams {
replyHandler = params.replyHandler;
}
- /**
- * Returns the policy to use for throttling output.
- *
- * @return The policy.
- */
+ /** Returns the policy to use for throttling output. */
public ThrottlePolicy getThrottlePolicy() {
return throttlePolicy;
}
@@ -45,7 +41,7 @@ public class SourceSessionParams {
* Sets the policy to use for throttling output.
*
* @param throttlePolicy The policy to set.
- * @return This, to allow chaining.
+ * @return this, to allow chaining
*/
public SourceSessionParams setThrottlePolicy(ThrottlePolicy throttlePolicy) {
this.throttlePolicy = throttlePolicy;
@@ -55,7 +51,7 @@ public class SourceSessionParams {
/**
* Returns the number of seconds a message can spend trying to succeed.
*
- * @return The timeout in seconds.
+ * @return the timeout in seconds
*/
public double getTimeout() {
return timeout;
@@ -66,7 +62,7 @@ public class SourceSessionParams {
* for any message bus operation.
*
* @param timeout The numer of seconds allowed.
- * @return This, to allow chaining.
+ * @return this, to allow chaining
*/
public SourceSessionParams setTimeout(double timeout) {
this.timeout = timeout;
@@ -76,7 +72,7 @@ public class SourceSessionParams {
/**
* Returns whether or not a reply handler has been assigned to this.
*
- * @return True if a handler is set.
+ * @return true if a handler is set
*/
boolean hasReplyHandler() {
return replyHandler != null;
@@ -85,7 +81,7 @@ public class SourceSessionParams {
/**
* Returns the handler to receive incoming replies.
*
- * @return The handler.
+ * @return the handler
*/
public ReplyHandler getReplyHandler() {
return replyHandler;
@@ -95,10 +91,11 @@ public class SourceSessionParams {
* Sets the handler to recive incoming replies.
*
* @param handler The handler to set.
- * @return This, to allow chaining.
+ * @return this, to allow chaining
*/
public SourceSessionParams setReplyHandler(ReplyHandler handler) {
replyHandler = handler;
return this;
}
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
index 7d73cd4bf3f..aa1bc1ce624 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
@@ -2,7 +2,7 @@
package com.yahoo.messagebus;
/**
- * This is an implementatin of the {@link ThrottlePolicy} that offers static limits to the amount of pending data a
+ * This is an implementation of the {@link ThrottlePolicy} that offers static limits to the amount of pending data a
* {@link SourceSession} is allowed to have. You may choose to set a limit to the total number of pending messages (by
* way of {@link #setMaxPendingCount(int)}), the total size of pending messages (by way of {@link
* #setMaxPendingSize(long)}), or some combination thereof.
@@ -17,7 +17,8 @@ public class StaticThrottlePolicy implements ThrottlePolicy {
private long maxPendingSize = 0;
private long pendingSize = 0;
- public boolean canSend(Message msg, int pendingCount) {
+ @Override
+ public boolean canSend(Message message, int pendingCount) {
if (maxPendingCount > 0 && pendingCount >= maxPendingCount) {
return false;
}
@@ -27,12 +28,14 @@ public class StaticThrottlePolicy implements ThrottlePolicy {
return true;
}
- public void processMessage(Message msg) {
- int size = msg.getApproxSize();
- msg.setContext(size);
+ @Override
+ public void processMessage(Message message) {
+ int size = message.getApproxSize();
+ message.setContext(size);
pendingSize += size;
}
+ @Override
public void processReply(Reply reply) {
int size = (Integer)reply.getContext();
pendingSize -= size;
@@ -41,7 +44,7 @@ public class StaticThrottlePolicy implements ThrottlePolicy {
/**
* Returns the maximum number of pending messages allowed.
*
- * @return The max limit.
+ * @return the max limit
*/
public int getMaxPendingCount() {
return maxPendingCount;
@@ -50,8 +53,8 @@ public class StaticThrottlePolicy implements ThrottlePolicy {
/**
* Sets the maximum number of pending messages allowed.
*
- * @param maxCount The max count.
- * @return This, to allow chaining.
+ * @param maxCount The max count
+ * @return this, to allow chaining
*/
public StaticThrottlePolicy setMaxPendingCount(int maxCount) {
maxPendingCount = maxCount;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java
index 30a0b82f2cd..1c7595f2b5c 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java
@@ -14,23 +14,24 @@ public interface ThrottlePolicy {
/**
* Returns whether or not the given message can be sent according to the current state of this policy.
*
- * @param msg The message to evaluate.
- * @param pendingCount The current number of pending messages.
- * @return True to send the message.
+ * @param message the message to evaluate
+ * @param pendingCount the current number of pending messages
+ * @return true to send the message
*/
- public boolean canSend(Message msg, int pendingCount);
+ boolean canSend(Message message, int pendingCount);
/**
* This method is called once for every message that was accepted by {@link #canSend(Message, int)} and sent.
*
- * @param msg The message beint sent.
+ * @param message the message being sent
*/
- public void processMessage(Message msg);
+ void processMessage(Message message);
/**
* This method is called once for every reply that is received.
*
- * @param reply The reply received.
+ * @param reply the reply received
*/
- public void processReply(Reply reply);
+ void processReply(Reply reply);
+
}