diff options
11 files changed, 57 insertions, 20 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java b/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java index 1775dbe53c1..7c5bebc47e8 100644 --- a/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java +++ b/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java @@ -17,11 +17,9 @@ import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; import com.yahoo.documentapi.messagebus.MessageBusParams; -import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig; import com.yahoo.messagebus.MessagebusConfig; import com.yahoo.vespa.config.content.DistributionConfig; -import com.yahoo.vespa.config.content.LoadTypeConfig; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; diff --git a/documentapi/src/main/java/com/yahoo/documentapi/Parameters.java b/documentapi/src/main/java/com/yahoo/documentapi/Parameters.java index b0f411402e8..35a62650037 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/Parameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/Parameters.java @@ -1,6 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi; +import com.yahoo.messagebus.ThrottlePolicy; + +import java.util.Optional; + /** * Superclass of the classes which contains the parameters for creating or opening a session. This is currently empty, * but keeping this parameter hierarchy in place means that we can later add parameters with default values that all @@ -9,5 +13,12 @@ package com.yahoo.documentapi; * @author bratseth */ public class Parameters { - // empty + ThrottlePolicy throttlePolicyOverride; + public void setThrottlePolicyOverride(ThrottlePolicy throttlePolicyOverride) { + this.throttlePolicyOverride = throttlePolicyOverride; + } + + public Optional<ThrottlePolicy> getThrottlePolicyOverride() { + return Optional.ofNullable(throttlePolicyOverride); + } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java index fb682ef6cbb..d50d38e7e8b 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java @@ -32,6 +32,7 @@ import com.yahoo.messagebus.MessageBus; import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.ReplyHandler; import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.ThrottlePolicy; @@ -65,7 +66,6 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { private static final Logger log = Logger.getLogger(MessageBusAsyncSession.class.getName()); private final AtomicLong requestId = new AtomicLong(0); private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>(); - private final ThrottlePolicy throttlePolicy; private final SourceSession session; private final String routeForGet; private String route; @@ -95,11 +95,10 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { route = mbusParams.getRoute(); routeForGet = mbusParams.getRouteForGet(); traceLevel = mbusParams.getTraceLevel(); - throttlePolicy = mbusParams.getSourceSessionParams().getThrottlePolicy(); - if (handler == null) { - handler = new MyReplyHandler(asyncParams.getResponseHandler(), responses); - } - session = bus.createSourceSession(handler, mbusParams.getSourceSessionParams()); + SourceSessionParams sourceSessionParams = new SourceSessionParams(mbusParams.getSourceSessionParams()); + asyncParams.getThrottlePolicyOverride().ifPresent(policy -> sourceSessionParams.setThrottlePolicy(policy)); + sourceSessionParams.setReplyHandler((handler != null) ? handler : new MyReplyHandler(asyncParams.getResponseHandler(), responses)); + session = bus.createSourceSession(sourceSessionParams); } @Override @@ -247,12 +246,14 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { @Override public double getCurrentWindowSize() { - if (throttlePolicy instanceof StaticThrottlePolicy) { - return ((StaticThrottlePolicy)throttlePolicy).getMaxPendingCount(); + if (getThrottlePolicy() instanceof StaticThrottlePolicy) { + return ((StaticThrottlePolicy)getThrottlePolicy()).getMaxPendingCount(); } return 0; } + ThrottlePolicy getThrottlePolicy() { return session.getThrottlePolicy(); } + /** * Returns a concatenated error string from the errors contained in a reply. * diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java index 2ef3bffdb53..318b518f44e 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java @@ -7,7 +7,6 @@ import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentRemove; import com.yahoo.document.DocumentUpdate; import com.yahoo.document.fieldset.AllFields; -import com.yahoo.document.fieldset.DocumentOnly; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.DocumentAccessException; import com.yahoo.documentapi.DocumentOperationParameters; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/Destination.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/Destination.java index 06f82168447..e08d9b14ee5 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/Destination.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/Destination.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; +package com.yahoo.documentapi.messagebus; import com.yahoo.document.DocumentRemove; import com.yahoo.documentapi.DocumentAccess; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusDocumentApiTestCase.java index 2d9cc47ee2f..103a77732fb 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusDocumentApiTestCase.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; +package com.yahoo.documentapi.messagebus; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; @@ -14,6 +14,7 @@ import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.Response; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.MessageBusAsyncSession; import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; import com.yahoo.documentapi.messagebus.MessageBusParams; import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply; @@ -22,9 +23,11 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.test.AbstractDocumentApiTestCase; import com.yahoo.jrt.ListenFailedException; import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.AllPassThrottlePolicy; +import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.ThrottlePolicy; import com.yahoo.messagebus.network.Identity; import org.junit.After; import org.junit.Before; @@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; /** @@ -67,7 +71,6 @@ public class MessageBusDocumentApiTestCase extends AbstractDocumentApiTestCase { params.setRouteNameForGet("Route"); params.setRoutingConfigId("file:src/test/cfg/messagebus.cfg"); params.setTraceLevel(9); - params.setSourceSessionParams(new SourceSessionParams().setThrottlePolicy(null)); access = new MessageBusDocumentAccess(params); destination = new VisitableDestination(slobrokConfigId, params.getDocumentManagerConfigId()); @@ -133,4 +136,24 @@ public class MessageBusDocumentApiTestCase extends AbstractDocumentApiTestCase { session.destroy(); } + @Test + public void requireThatDefaultThrottlePolicyIsDynamicAndShared() { + MessageBusAsyncSession mbusSessionA = (MessageBusAsyncSession) access().createAsyncSession(new AsyncParameters()); + assertTrue(mbusSessionA.getThrottlePolicy() instanceof DynamicThrottlePolicy); + MessageBusAsyncSession mbusSessionB = (MessageBusAsyncSession) access().createAsyncSession(new AsyncParameters()); + assertSame(mbusSessionA.getThrottlePolicy(), mbusSessionB.getThrottlePolicy()); + mbusSessionB.destroy(); + mbusSessionA.destroy(); + } + + @Test + public void requireThatThrottlePolicyCanBeConfigured() { + var asyncParams = new AsyncParameters(); + ThrottlePolicy allPass = new AllPassThrottlePolicy(); + asyncParams.setThrottlePolicyOverride(allPass); + MessageBusAsyncSession mbusSession = (MessageBusAsyncSession) access().createAsyncSession(asyncParams); + assertSame(allPass, mbusSession.getThrottlePolicy()); + mbusSession.destroy(); + } + } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSessionTestCase.java index ab881e143b7..8f299f1b679 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSessionTestCase.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; +package com.yahoo.documentapi.messagebus; import com.yahoo.document.BucketId; import com.yahoo.document.DocumentId; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ScheduledEventQueueTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/ScheduledEventQueueTestCase.java index 4abd1cc9fea..a5404b3d7d8 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ScheduledEventQueueTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/ScheduledEventQueueTestCase.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; +package com.yahoo.documentapi.messagebus; import com.yahoo.documentapi.messagebus.ScheduledEventQueue; import com.yahoo.concurrent.Timer; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/VisitorControlHandlerTest.java index 7b9172a1a3d..b1ebcb20fd2 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/VisitorControlHandlerTest.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; +package com.yahoo.documentapi.messagebus; import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.vdslib.VisitorStatistics; diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json index 039f22c7525..83bab8e4734 100644 --- a/messagebus/abi-spec.json +++ b/messagebus/abi-spec.json @@ -827,7 +827,8 @@ "public com.yahoo.messagebus.Result send(com.yahoo.messagebus.Message, java.lang.String, boolean)", "public com.yahoo.messagebus.ReplyHandler getReplyHandler()", "public int getPendingCount()", - "public com.yahoo.messagebus.SourceSession setTimeout(double)" + "public com.yahoo.messagebus.SourceSession setTimeout(double)", + "public com.yahoo.messagebus.ThrottlePolicy getThrottlePolicy()" ], "fields": [] }, diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 85892b4cb87..ad06aed43ec 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -393,4 +393,8 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked return this; } + public ThrottlePolicy getThrottlePolicy() { + return throttlePolicy; + } + } |