diff options
Diffstat (limited to 'vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java')
-rw-r--r-- | vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index 556d9bd60c7..18c0de339e6 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -20,6 +20,7 @@ import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.MessageBusParams; @@ -346,7 +347,7 @@ public class SimpleFeeder implements ReplyHandler { numThreads = params.getNumDispatchThreads(); numMessagesToSend = params.getNumMessagesToSend(); mbus = newMessageBus(docTypeMgr, params); - session = newSession(mbus, this, params.getMaxPending()); + session = newSession(mbus, this, params); docTypeMgr.configure(params.getConfigId()); benchmarkMode = params.isBenchmarkMode(); destination = (params.getDumpStream() != null) @@ -473,11 +474,18 @@ public class SimpleFeeder implements ReplyHandler { params.getConfigId()); } - private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, int maxPending) { + private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, FeederParams feederParams ) { SourceSessionParams params = new SourceSessionParams(); params.setReplyHandler(replyHandler); - if (maxPending > 0) { - params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(maxPending)); + if (feederParams.getMaxPending() > 0) { + params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(feederParams.getMaxPending())); + } else { + DynamicThrottlePolicy throttlePolicy = new DynamicThrottlePolicy() + .setWindowSizeIncrement(feederParams.getWindowIncrementSize()) + .setResizeRate(feederParams.getWindowResizeRate()) + .setWindowSizeDecrementFactor(feederParams.getWindowDecrementFactor()) + .setWindowSizeBackOff(feederParams.getWindowSizeBackOff()); + params.setThrottlePolicy(throttlePolicy); } return mbus.getMessageBus().createSourceSession(params); } |