diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-04-11 13:35:35 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-11 13:35:35 +0200 |
commit | 30f1c233e6fd26c89f3b4f194478b66cbf1152cb (patch) | |
tree | 0b3dc3952e6ac7a3c27b0db4a8596ccab6774886 | |
parent | dc1cde6b396c53a26af59c653409d2da49c12128 (diff) | |
parent | aa229a5230ab629788bf4c3ddfe947aa2f3db01b (diff) |
Merge pull request #22081 from vespa-engine/balder/allow-control-of-throttlepolicy
- Allow control of throttlepolicy per session.
13 files changed, 91 insertions, 43 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java b/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java index 1775dbe53c1..7c5bebc47e8 100644 --- a/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java +++ b/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java @@ -17,11 +17,9 @@ import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; import com.yahoo.documentapi.messagebus.MessageBusParams; -import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig; import com.yahoo.messagebus.MessagebusConfig; import com.yahoo.vespa.config.content.DistributionConfig; -import com.yahoo.vespa.config.content.LoadTypeConfig; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index 1a2afa3621f..20043450501 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -271,7 +271,9 @@ "public" ], "methods": [ - "public void <init>()" + "public void <init>()", + "public void setThrottlePolicy(com.yahoo.messagebus.ThrottlePolicy)", + "public com.yahoo.messagebus.ThrottlePolicy getThrottlePolicy()" ], "fields": [] }, @@ -926,8 +928,6 @@ "public void setTraceLevel(int)", "public int getTraceLevel()", "public void setPriority(com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", - "public com.yahoo.messagebus.ThrottlePolicy getThrottlePolicy()", - "public void setThrottlePolicy(com.yahoo.messagebus.ThrottlePolicy)", "public void setLoadType(com.yahoo.documentapi.messagebus.loadtypes.LoadType)", "public com.yahoo.documentapi.messagebus.loadtypes.LoadType getLoadType()", "public boolean skipBucketsOnFatalErrors()", diff --git a/documentapi/src/main/java/com/yahoo/documentapi/Parameters.java b/documentapi/src/main/java/com/yahoo/documentapi/Parameters.java index b0f411402e8..082f47f45d4 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/Parameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/Parameters.java @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi; +import com.yahoo.messagebus.ThrottlePolicy; + /** * Superclass of the classes which contains the parameters for creating or opening a session. This is currently empty, * but keeping this parameter hierarchy in place means that we can later add parameters with default values that all @@ -9,5 +11,12 @@ package com.yahoo.documentapi; * @author bratseth */ public class Parameters { - // empty + ThrottlePolicy throttlePolicy; + public void setThrottlePolicy(ThrottlePolicy throttlePolicy) { + this.throttlePolicy = throttlePolicy; + } + + public ThrottlePolicy getThrottlePolicy() { + return throttlePolicy; + } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java index 7446c681dec..d030c8d0f04 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java @@ -4,7 +4,6 @@ package com.yahoo.documentapi; import com.yahoo.document.BucketId; import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.fieldset.AllFields; -import com.yahoo.document.fieldset.DocumentOnly; import com.yahoo.documentapi.messagebus.loadtypes.LoadType; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.messagebus.ThrottlePolicy; @@ -51,7 +50,6 @@ public class VisitorParameters extends Parameters { private LoadType loadType = LoadType.DEFAULT; // TODO: Remove on Vespa 8 private DocumentProtocol.Priority priority = null; private int traceLevel = 0; - private ThrottlePolicy throttlePolicy = null; private boolean skipBucketsOnFatalErrors = false; private int slices = 1; private int sliceId = 0; @@ -325,14 +323,6 @@ public class VisitorParameters extends Parameters { this.priority = priority; } - public ThrottlePolicy getThrottlePolicy() { - return throttlePolicy; - } - - public void setThrottlePolicy(ThrottlePolicy policy) { - throttlePolicy = policy; - } - /** * @deprecated load types are deprecated */ diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java index fb682ef6cbb..279e04c43b4 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java @@ -32,6 +32,7 @@ import com.yahoo.messagebus.MessageBus; import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.ReplyHandler; import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.ThrottlePolicy; @@ -65,7 +66,6 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { private static final Logger log = Logger.getLogger(MessageBusAsyncSession.class.getName()); private final AtomicLong requestId = new AtomicLong(0); private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>(); - private final ThrottlePolicy throttlePolicy; private final SourceSession session; private final String routeForGet; private String route; @@ -95,11 +95,12 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { route = mbusParams.getRoute(); routeForGet = mbusParams.getRouteForGet(); traceLevel = mbusParams.getTraceLevel(); - throttlePolicy = mbusParams.getSourceSessionParams().getThrottlePolicy(); - if (handler == null) { - handler = new MyReplyHandler(asyncParams.getResponseHandler(), responses); + SourceSessionParams sourceSessionParams = new SourceSessionParams(mbusParams.getSourceSessionParams()); + if (asyncParams.getThrottlePolicy() != null) { + sourceSessionParams.setThrottlePolicy(asyncParams.getThrottlePolicy()); } - session = bus.createSourceSession(handler, mbusParams.getSourceSessionParams()); + sourceSessionParams.setReplyHandler((handler != null) ? handler : new MyReplyHandler(asyncParams.getResponseHandler(), responses)); + session = bus.createSourceSession(sourceSessionParams); } @Override @@ -247,12 +248,14 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { @Override public double getCurrentWindowSize() { - if (throttlePolicy instanceof StaticThrottlePolicy) { - return ((StaticThrottlePolicy)throttlePolicy).getMaxPendingCount(); + if (getThrottlePolicy() instanceof StaticThrottlePolicy) { + return ((StaticThrottlePolicy)getThrottlePolicy()).getMaxPendingCount(); } return 0; } + ThrottlePolicy getThrottlePolicy() { return session.getThrottlePolicy(); } + /** * Returns a concatenated error string from the errors contained in a reply. * diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java index 2ef3bffdb53..318b518f44e 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java @@ -7,7 +7,6 @@ import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentRemove; import com.yahoo.document.DocumentUpdate; import com.yahoo.document.fieldset.AllFields; -import com.yahoo.document.fieldset.DocumentOnly; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.DocumentAccessException; import com.yahoo.documentapi.DocumentOperationParameters; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/Destination.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/Destination.java index 06f82168447..e08d9b14ee5 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/Destination.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/Destination.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; +package com.yahoo.documentapi.messagebus; import com.yahoo.document.DocumentRemove; import com.yahoo.documentapi.DocumentAccess; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusDocumentApiTestCase.java index 2d9cc47ee2f..db7ab0ea238 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusDocumentApiTestCase.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; +package com.yahoo.documentapi.messagebus; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; @@ -14,17 +14,17 @@ import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.Response; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; -import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; -import com.yahoo.documentapi.messagebus.MessageBusParams; import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply; import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.test.AbstractDocumentApiTestCase; import com.yahoo.jrt.ListenFailedException; import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.AllPassThrottlePolicy; +import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.ThrottlePolicy; import com.yahoo.messagebus.network.Identity; import org.junit.After; import org.junit.Before; @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; /** @@ -67,7 +68,6 @@ public class MessageBusDocumentApiTestCase extends AbstractDocumentApiTestCase { params.setRouteNameForGet("Route"); params.setRoutingConfigId("file:src/test/cfg/messagebus.cfg"); params.setTraceLevel(9); - params.setSourceSessionParams(new SourceSessionParams().setThrottlePolicy(null)); access = new MessageBusDocumentAccess(params); destination = new VisitableDestination(slobrokConfigId, params.getDocumentManagerConfigId()); @@ -133,4 +133,24 @@ public class MessageBusDocumentApiTestCase extends AbstractDocumentApiTestCase { session.destroy(); } + @Test + public void requireThatDefaultThrottlePolicyIsDynamicAndShared() { + MessageBusAsyncSession mbusSessionA = (MessageBusAsyncSession) access().createAsyncSession(new AsyncParameters()); + assertTrue(mbusSessionA.getThrottlePolicy() instanceof DynamicThrottlePolicy); + MessageBusAsyncSession mbusSessionB = (MessageBusAsyncSession) access().createAsyncSession(new AsyncParameters()); + assertSame(mbusSessionA.getThrottlePolicy(), mbusSessionB.getThrottlePolicy()); + mbusSessionB.destroy(); + mbusSessionA.destroy(); + } + + @Test + public void requireThatThrottlePolicyCanBeConfigured() { + var asyncParams = new AsyncParameters(); + ThrottlePolicy allPass = new AllPassThrottlePolicy(); + asyncParams.setThrottlePolicy(allPass); + MessageBusAsyncSession mbusSession = (MessageBusAsyncSession) access().createAsyncSession(asyncParams); + assertSame(allPass, mbusSession.getThrottlePolicy()); + mbusSession.destroy(); + } + } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSessionTestCase.java index ab881e143b7..53545510ae7 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSessionTestCase.java @@ -1,18 +1,36 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; +package com.yahoo.documentapi.messagebus; import com.yahoo.document.BucketId; import com.yahoo.document.DocumentId; import com.yahoo.document.fieldset.AllFields; import com.yahoo.document.fieldset.DocIdOnly; import com.yahoo.document.select.parser.ParseException; -import com.yahoo.documentapi.*; -import com.yahoo.documentapi.messagebus.MessageBusVisitorSession; +import com.yahoo.documentapi.AckToken; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorControlSession; +import com.yahoo.documentapi.VisitorDataHandler; +import com.yahoo.documentapi.VisitorDataQueue; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorResponse; import com.yahoo.documentapi.messagebus.loadtypes.LoadType; -import com.yahoo.documentapi.messagebus.protocol.*; -import com.yahoo.messagebus.*; +import com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage; +import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.DocumentReply; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.VisitorInfoMessage; +import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply; import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; import com.yahoo.messagebus.Result; +import com.yahoo.messagebus.Trace; +import com.yahoo.messagebus.TraceNode; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.routing.RouteSpec; import com.yahoo.messagebus.routing.RoutingTable; @@ -21,21 +39,28 @@ import com.yahoo.vdslib.VisitorStatistics; import org.junit.Test; import java.nio.charset.Charset; -import java.util.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; // TODO replace explicit pre-mockito mock classes with proper mockito mocks wherever possible public class MessageBusVisitorSessionTestCase { private class MockSender implements MessageBusVisitorSession.Sender { private int maxPending = 1000; private int pendingCount = 0; - private ArrayList<Message> messages = new ArrayList<Message>(); + private ArrayList<Message> messages = new ArrayList<>(); private ReplyHandler replyHandler = null; private boolean destroyed = false; private RuntimeException exceptionOnSend = null; @@ -313,7 +338,7 @@ public class MessageBusVisitorSessionTestCase { public class MockAsyncTaskExecutor implements MessageBusVisitorSession.AsyncTaskExecutor { private long sequenceCounter = 0; private long timeMs = 0; - private Set<TaskDescriptor> tasks = new TreeSet<TaskDescriptor>(); + private Set<TaskDescriptor> tasks = new TreeSet<>(); private int rejectTasksAfter = -1; public void setRejectTasksAfter(int rejectTasksAfter) { diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ScheduledEventQueueTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/ScheduledEventQueueTestCase.java index 4abd1cc9fea..4f937aa26ba 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ScheduledEventQueueTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/ScheduledEventQueueTestCase.java @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; +package com.yahoo.documentapi.messagebus; -import com.yahoo.documentapi.messagebus.ScheduledEventQueue; import com.yahoo.concurrent.Timer; import org.junit.Test; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/VisitorControlHandlerTest.java index 7b9172a1a3d..b1ebcb20fd2 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/VisitorControlHandlerTest.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; +package com.yahoo.documentapi.messagebus; import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.vdslib.VisitorStatistics; diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json index 039f22c7525..83bab8e4734 100644 --- a/messagebus/abi-spec.json +++ b/messagebus/abi-spec.json @@ -827,7 +827,8 @@ "public com.yahoo.messagebus.Result send(com.yahoo.messagebus.Message, java.lang.String, boolean)", "public com.yahoo.messagebus.ReplyHandler getReplyHandler()", "public int getPendingCount()", - "public com.yahoo.messagebus.SourceSession setTimeout(double)" + "public com.yahoo.messagebus.SourceSession setTimeout(double)", + "public com.yahoo.messagebus.ThrottlePolicy getThrottlePolicy()" ], "fields": [] }, diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 85892b4cb87..ad06aed43ec 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -393,4 +393,8 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked return this; } + public ThrottlePolicy getThrottlePolicy() { + return throttlePolicy; + } + } |