summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-01-29 11:59:42 +0100
committerGitHub <noreply@github.com>2018-01-29 11:59:42 +0100
commitc90710ca371546d34023470f08929b4cd73b5ff9 (patch)
tree649fe88d1be1d491e632d495ba97f61894baa3d5
parent28e3545728977a0be82159b8f278be8e772cb59b (diff)
parent3ef73bbd5e2a972344f49705906f42f9a3703981 (diff)
Merge pull request #4776 from vespa-engine/balder/handle-timeout-properly-for-batched-send
Balder/handle timeout properly for batched send
-rwxr-xr-xvespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/VespaFeedHandlerTestCase.java50
-rwxr-xr-xvespaclient-container-plugin/src/test/java/com/yahoo/storage/searcher/GetSearcherTestCase.java26
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java24
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java73
-rwxr-xr-xvespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java1
5 files changed, 123 insertions, 51 deletions
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/VespaFeedHandlerTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/VespaFeedHandlerTestCase.java
index 823f4b6f568..d1ed02209b2 100755
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/VespaFeedHandlerTestCase.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/VespaFeedHandlerTestCase.java
@@ -640,7 +640,7 @@ public class VespaFeedHandlerTestCase {
@Test
public void testOverrides() throws Exception {
setup(null);
- Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?timeout=2.222&route=storage&priority=HIGH_2");
+ Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?timeout=2.222&route=storage&priority=HIGH_2&totaltimeout=-1");
assertEquals(2, res.messages.size());
@@ -652,6 +652,54 @@ public class VespaFeedHandlerTestCase {
}
@Test
+ public void testTimeoutWithNoUpperBound() throws Exception {
+ setup(null);
+ Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?timeout=2.222&totaltimeout=-1");
+
+ assertEquals(2, res.messages.size());
+
+ for (Message m : res.messages) {
+ assertEquals(2222, m.getTimeRemaining());
+ }
+ }
+
+ @Test
+ public void testTimeout() throws Exception {
+ setup(null);
+ Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?timeout=2.222");
+
+ assertEquals(2, res.messages.size());
+
+ for (Message m : res.messages) {
+ assertTrue(2222 >= m.getTimeRemaining());
+ }
+ }
+
+ @Test
+ public void testTotalTimeout() throws Exception {
+ setup(null);
+ Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?totaltimeout=2.222");
+
+ assertEquals(2, res.messages.size());
+
+ for (Message m : res.messages) {
+ assertTrue(2222 >= m.getTimeRemaining());
+ }
+ }
+
+ @Test
+ public void testTotalTimeoutAndNormalTimeout() throws Exception {
+ setup(null);
+ Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?totaltimeout=1000&timeout=2.222");
+
+ assertEquals(2, res.messages.size());
+
+ for (Message m : res.messages) {
+ assertEquals(2222, m.getTimeRemaining());
+ }
+ }
+
+ @Test
public void testBogusPriority() throws Exception {
try {
setup(null);
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/storage/searcher/GetSearcherTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/storage/searcher/GetSearcherTestCase.java
index 23eddfceb14..8da09bc78ad 100755
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/storage/searcher/GetSearcherTestCase.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/storage/searcher/GetSearcherTestCase.java
@@ -259,7 +259,7 @@ public class GetSearcherTestCase {
public void testConfig() throws Exception {
DocumentSessionFactory factory = new DocumentSessionFactory(docType);
GetSearcher searcher = new GetSearcher(new FeedContext(
- new MessagePropertyProcessor(new FeederConfig(new FeederConfig.Builder().timeout(458).route("route66").retryenabled(false)), defLoadTypeCfg),
+ new MessagePropertyProcessor(new FeederConfig(new FeederConfig.Builder().timeout(58).route("route66").retryenabled(false)), defLoadTypeCfg),
factory, docMan, new ClusterList(), new NullFeedMetric()));
Chain<Searcher> searchChain = new Chain<>(searcher);
@@ -275,16 +275,16 @@ public class GetSearcherTestCase {
assertEquals("[all]", gdm.getFieldSet());
assertEquals(Route.parse("route66"), gdm.getRoute());
assertFalse(gdm.getRetryEnabled());
- assertEquals(458000, gdm.getTimeRemaining());
+ assertTrue(58000 >= gdm.getTimeRemaining());
}
}
@Test
public void testConfigChanges() throws Exception {
- String config = "raw:timeout 458\nroute \"riksveg18\"\nretryenabled true";
+ String config = "raw:timeout 37\nroute \"riksveg18\"\nretryenabled true";
DocumentSessionFactory factory = new DocumentSessionFactory(docType);
GetSearcher searcher = new GetSearcher(new FeedContext(
- new MessagePropertyProcessor(new FeederConfig(new FeederConfig.Builder().timeout(458).route("riksveg18").retryenabled(true)),
+ new MessagePropertyProcessor(new FeederConfig(new FeederConfig.Builder().timeout(58).route("riksveg18").retryenabled(true)),
defLoadTypeCfg),
factory, docMan, new ClusterList(), new NullFeedMetric()));
Chain<Searcher> searchChain = new Chain<>(searcher);
@@ -302,7 +302,7 @@ public class GetSearcherTestCase {
assertEquals("[all]", gdm.getFieldSet());
assertEquals(Route.parse("riksveg18"), gdm.getRoute());
assertTrue(gdm.getRetryEnabled());
- assertEquals(458000, gdm.getTimeRemaining());
+ assertTrue(58000 >= gdm.getTimeRemaining());
}
factory.messages.clear();
@@ -330,7 +330,7 @@ public class GetSearcherTestCase {
assertEquals("[all]", gdm.getFieldSet());
assertEquals(Route.parse("e6"), gdm.getRoute());
assertFalse(gdm.getRetryEnabled());
- assertEquals(123000, gdm.getTimeRemaining());
+ assertTrue(123000 >= gdm.getTimeRemaining());
}
}
@@ -343,9 +343,10 @@ public class GetSearcherTestCase {
Chain<Searcher> searchChain = new Chain<>(searcher);
Result result = new Execution(searchChain, Execution.Context.createContextStub()).search(
- newQuery("?id[0]=userdoc:kittens:1:2&id[1]=userdoc:kittens:3:4&priority=LOW_2&route=highwaytohell&timeout=458"));
+ newQuery("?id[0]=userdoc:kittens:1:2&id[1]=userdoc:kittens:3:4&priority=LOW_2&route=highwaytohell&timeout=58"));
assertEquals(2, factory.messages.size());
+ long lastTimeout = 58000;
{
Message m = factory.messages.get(0);
assertEquals(DocumentProtocol.MESSAGE_GETDOCUMENT, m.getType());
@@ -355,7 +356,8 @@ public class GetSearcherTestCase {
assertEquals("[all]", gdm.getFieldSet());
assertEquals(DocumentProtocol.Priority.LOW_2, gdm.getPriority());
assertEquals(Route.parse("highwaytohell"), gdm.getRoute());
- assertEquals(458000, gdm.getTimeRemaining());
+ assertTrue(lastTimeout >= gdm.getTimeRemaining());
+ lastTimeout = gdm.getTimeRemaining();
}
{
@@ -367,7 +369,7 @@ public class GetSearcherTestCase {
assertEquals("[all]", gdm.getFieldSet());
assertEquals(DocumentProtocol.Priority.LOW_2, gdm.getPriority());
assertEquals(Route.parse("highwaytohell"), gdm.getRoute());
- assertEquals(458000, gdm.getTimeRemaining());
+ assertTrue(lastTimeout >= gdm.getTimeRemaining());
}
}
@@ -383,6 +385,7 @@ public class GetSearcherTestCase {
Result result = new Execution(searchChain, Execution.Context.createContextStub()).search(
newQuery("?id[0]=userdoc:kittens:1:2&id[1]=userdoc:kittens:3:4&priority=LOW_2&route=highwaytohell&timeout=123"));
+ long lastTimeout = 123000;
assertEquals(2, factory.messages.size());
{
Message m = factory.messages.get(0);
@@ -393,7 +396,8 @@ public class GetSearcherTestCase {
assertEquals("[all]", gdm.getFieldSet());
assertEquals(DocumentProtocol.Priority.LOW_2, gdm.getPriority());
assertEquals(Route.parse("highwaytohell"), gdm.getRoute());
- assertEquals(123000, gdm.getTimeRemaining());
+ assertTrue(lastTimeout >= gdm.getTimeRemaining());
+ lastTimeout = gdm.getTimeRemaining();
}
{
@@ -405,7 +409,7 @@ public class GetSearcherTestCase {
assertEquals("[all]", gdm.getFieldSet());
assertEquals(DocumentProtocol.Priority.LOW_2, gdm.getPriority());
assertEquals(Route.parse("highwaytohell"), gdm.getRoute());
- assertEquals(123000, gdm.getTimeRemaining());
+ assertTrue(lastTimeout >= gdm.getTimeRemaining());
}
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
index 771d64fd6a3..3897f1d7d2a 100644
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
@@ -2,6 +2,7 @@
package com.yahoo.feedapi;
import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.concurrent.SystemTimer;
import com.yahoo.config.subscription.ConfigSubscriber;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.vespa.config.content.LoadTypeConfig;
@@ -106,6 +107,7 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
String loadTypeStr = null;
String traceStr = null;
String createIfNonExistentParam = null;
+ Double totalTimeoutParam = null;
if (request != null) {
routeParam = request.getProperty("route");
@@ -114,6 +116,10 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
if (timeoutStr != null) {
timeoutParam = Double.parseDouble(timeoutStr);
}
+ timeoutStr = request.getProperty("totaltimeout");
+ if (timeoutStr != null) {
+ totalTimeoutParam = Double.parseDouble(timeoutStr);
+ }
priorityParam = request.getProperty("priority");
traceStr = request.getProperty("tracelevel");
@@ -140,6 +146,8 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
abortOnFeedError = (abortOnFeedErrorParam == null ? defaultAbortOnSendError : (!"false".equals(abortOnFeedErrorParam)));
createIfNonExistent = (createIfNonExistentParam == null ? defaultCreateIfNonExistent : ("true".equals(createIfNonExistentParam)));
}
+ long totalTimeout = (totalTimeoutParam == null) ? timeout : (long)(totalTimeoutParam*1000);
+
DocumentProtocol.Priority priority = null;
if (priorityParam != null) {
priority = DocumentProtocol.getPriorityByName(priorityParam);
@@ -154,7 +162,7 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
loadType = LoadType.DEFAULT;
}
- return new PropertySetter(route, timeout, priority, loadType, retry, abortOnDocumentError, abortOnFeedError, createIfNonExistent, traceStr != null ? Integer.parseInt(traceStr) : 0);
+ return new PropertySetter(route, timeout, totalTimeout, priority, loadType, retry, abortOnDocumentError, abortOnFeedError, createIfNonExistent, traceStr != null ? Integer.parseInt(traceStr) : 0);
}
public long getDefaultTimeoutMillis() { return defaultTimeoutMillis; }
@@ -217,6 +225,8 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
private Route route;
/** Timeout (in milliseconds) */
private long timeout;
+ private long totalTimeout;
+ private long startTime;
/** Explicit priority set. May be null */
private DocumentProtocol.Priority priority;
private boolean retryEnabled;
@@ -226,11 +236,12 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
private LoadType loadType;
private int traceLevel;
- public PropertySetter(Route route, long timeout, DocumentProtocol.Priority priority, LoadType loadType,
+ public PropertySetter(Route route, long timeout, long totalTimeout, DocumentProtocol.Priority priority, LoadType loadType,
boolean retryEnabled, boolean abortOnDocumentError, boolean abortOnFeedError,
boolean createIfNonExistent, int traceLevel) {
this.route = route;
this.timeout = timeout;
+ this.totalTimeout = totalTimeout;
this.priority = priority;
this.loadType = loadType;
this.retryEnabled = retryEnabled;
@@ -238,6 +249,13 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
this.abortOnFeedError = abortOnFeedError;
this.createIfNonExistent = createIfNonExistent;
this.traceLevel = traceLevel;
+ this.startTime = SystemTimer.INSTANCE.milliTime();
+ }
+
+ private long getTimeRemaining() {
+ return (totalTimeout < 0L)
+ ? timeout
+ : Math.min(timeout, totalTimeout - (SystemTimer.INSTANCE.milliTime() - startTime));
}
public Route getRoute() {
@@ -289,7 +307,7 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
if (route != null) {
msg.setRoute(route);
}
- msg.setTimeRemaining(timeout);
+ msg.setTimeRemaining(getTimeRemaining());
msg.setRetryEnabled(retryEnabled);
msg.getTrace().setLevel(Math.max(getFeederOptions().getTraceLevel(), traceLevel));
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
index 8fda21e2494..cf29814bef3 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
@@ -74,44 +74,45 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
try {
int busy = busyThreads.incrementAndGet();
if (busy > maxBusyThreads) {
- log.warning("too many threads ["+busy+"] busy, returning SERVICE UNAVAILABLE");
+ log.warning("too many threads [" + busy + "] busy, returning SERVICE UNAVAILABLE");
return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE);
}
- boolean asynchronous = request.getBooleanProperty("asynchronous");
-
- MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request);
-
- String route = properties.getRoute().toString();
- FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback));
-
- SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous);
- sender.addMessageProcessor(properties);
- sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));
-
- Feeder feeder = createFeeder(sender, request);
- feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
- feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
- response.setAbortOnFeedError(properties.getAbortOnFeedError());
-
- List<String> errors = feeder.parse();
- for (String s : errors) {
- response.addXMLParseError(s);
- }
- if (errors.size() > 0 && feeder instanceof XMLFeeder) {
- response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json.");
- }
-
- sender.done();
-
- if (asynchronous) {
- return response;
- }
- long millis = getTimeoutMillis(request);
- boolean completed = sender.waitForPending(millis);
- if ( ! completed)
- response.addError("Timed out after "+millis+" ms waiting for responses");
- response.done();
- return response;
+ boolean asynchronous = request.getBooleanProperty("asynchronous");
+
+ MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request);
+
+ String route = properties.getRoute().toString();
+ FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback));
+
+ SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous);
+ sender.addMessageProcessor(properties);
+ sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));
+
+ Feeder feeder = createFeeder(sender, request);
+ feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
+ feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
+ response.setAbortOnFeedError(properties.getAbortOnFeedError());
+
+ List<String> errors = feeder.parse();
+ for (String s : errors) {
+ response.addXMLParseError(s);
+ }
+ if (errors.size() > 0 && feeder instanceof XMLFeeder) {
+ response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json.");
+ }
+
+ sender.done();
+
+ if (asynchronous) {
+ return response;
+ }
+ long millis = getTimeoutMillis(request);
+ boolean completed = sender.waitForPending(millis);
+ if (!completed) {
+ response.addError("Timed out after " + millis + " ms waiting for responses");
+ }
+ response.done();
+ return response;
} finally {
busyThreads.decrementAndGet();
}
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java
index 516d11524cc..0d70f03014e 100755
--- a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java
+++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java
@@ -113,6 +113,7 @@ public class VespaFeeder {
setPriority(req);
setCreateIfNonExistent(req);
setJsonInput(req, input);
+ req.setProperty("totaltimeout", "-1");
}
private void setPriority(InputStreamRequest req) {