diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-29 11:59:42 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-29 11:59:42 +0100 |
commit | c90710ca371546d34023470f08929b4cd73b5ff9 (patch) | |
tree | 649fe88d1be1d491e632d495ba97f61894baa3d5 | |
parent | 28e3545728977a0be82159b8f278be8e772cb59b (diff) | |
parent | 3ef73bbd5e2a972344f49705906f42f9a3703981 (diff) |
Merge pull request #4776 from vespa-engine/balder/handle-timeout-properly-for-batched-send
Balder/handle timeout properly for batched send
5 files changed, 123 insertions, 51 deletions
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/VespaFeedHandlerTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/VespaFeedHandlerTestCase.java index 823f4b6f568..d1ed02209b2 100755 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/VespaFeedHandlerTestCase.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/VespaFeedHandlerTestCase.java @@ -640,7 +640,7 @@ public class VespaFeedHandlerTestCase { @Test public void testOverrides() throws Exception { setup(null); - Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?timeout=2.222&route=storage&priority=HIGH_2"); + Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?timeout=2.222&route=storage&priority=HIGH_2&totaltimeout=-1"); assertEquals(2, res.messages.size()); @@ -652,6 +652,54 @@ public class VespaFeedHandlerTestCase { } @Test + public void testTimeoutWithNoUpperBound() throws Exception { + setup(null); + Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?timeout=2.222&totaltimeout=-1"); + + assertEquals(2, res.messages.size()); + + for (Message m : res.messages) { + assertEquals(2222, m.getTimeRemaining()); + } + } + + @Test + public void testTimeout() throws Exception { + setup(null); + Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?timeout=2.222"); + + assertEquals(2, res.messages.size()); + + for (Message m : res.messages) { + assertTrue(2222 >= m.getTimeRemaining()); + } + } + + @Test + public void testTotalTimeout() throws Exception { + setup(null); + Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?totaltimeout=2.222"); + + assertEquals(2, res.messages.size()); + + for (Message m : res.messages) { + assertTrue(2222 >= m.getTimeRemaining()); + } + } + + @Test + public void testTotalTimeoutAndNormalTimeout() throws Exception { + setup(null); + Result res = testFeed(xmlFilesPath + "test10b.xml", "feed?totaltimeout=1000&timeout=2.222"); + + assertEquals(2, res.messages.size()); + + for (Message m : res.messages) { + assertEquals(2222, m.getTimeRemaining()); + } + } + + @Test public void testBogusPriority() throws Exception { try { setup(null); diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/storage/searcher/GetSearcherTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/storage/searcher/GetSearcherTestCase.java index 23eddfceb14..8da09bc78ad 100755 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/storage/searcher/GetSearcherTestCase.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/storage/searcher/GetSearcherTestCase.java @@ -259,7 +259,7 @@ public class GetSearcherTestCase { public void testConfig() throws Exception { DocumentSessionFactory factory = new DocumentSessionFactory(docType); GetSearcher searcher = new GetSearcher(new FeedContext( - new MessagePropertyProcessor(new FeederConfig(new FeederConfig.Builder().timeout(458).route("route66").retryenabled(false)), defLoadTypeCfg), + new MessagePropertyProcessor(new FeederConfig(new FeederConfig.Builder().timeout(58).route("route66").retryenabled(false)), defLoadTypeCfg), factory, docMan, new ClusterList(), new NullFeedMetric())); Chain<Searcher> searchChain = new Chain<>(searcher); @@ -275,16 +275,16 @@ public class GetSearcherTestCase { assertEquals("[all]", gdm.getFieldSet()); assertEquals(Route.parse("route66"), gdm.getRoute()); assertFalse(gdm.getRetryEnabled()); - assertEquals(458000, gdm.getTimeRemaining()); + assertTrue(58000 >= gdm.getTimeRemaining()); } } @Test public void testConfigChanges() throws Exception { - String config = "raw:timeout 458\nroute \"riksveg18\"\nretryenabled true"; + String config = "raw:timeout 37\nroute \"riksveg18\"\nretryenabled true"; DocumentSessionFactory factory = new DocumentSessionFactory(docType); GetSearcher searcher = new GetSearcher(new FeedContext( - new MessagePropertyProcessor(new FeederConfig(new FeederConfig.Builder().timeout(458).route("riksveg18").retryenabled(true)), + new MessagePropertyProcessor(new FeederConfig(new FeederConfig.Builder().timeout(58).route("riksveg18").retryenabled(true)), defLoadTypeCfg), factory, docMan, new ClusterList(), new NullFeedMetric())); Chain<Searcher> searchChain = new Chain<>(searcher); @@ -302,7 +302,7 @@ public class GetSearcherTestCase { assertEquals("[all]", gdm.getFieldSet()); assertEquals(Route.parse("riksveg18"), gdm.getRoute()); assertTrue(gdm.getRetryEnabled()); - assertEquals(458000, gdm.getTimeRemaining()); + assertTrue(58000 >= gdm.getTimeRemaining()); } factory.messages.clear(); @@ -330,7 +330,7 @@ public class GetSearcherTestCase { assertEquals("[all]", gdm.getFieldSet()); assertEquals(Route.parse("e6"), gdm.getRoute()); assertFalse(gdm.getRetryEnabled()); - assertEquals(123000, gdm.getTimeRemaining()); + assertTrue(123000 >= gdm.getTimeRemaining()); } } @@ -343,9 +343,10 @@ public class GetSearcherTestCase { Chain<Searcher> searchChain = new Chain<>(searcher); Result result = new Execution(searchChain, Execution.Context.createContextStub()).search( - newQuery("?id[0]=userdoc:kittens:1:2&id[1]=userdoc:kittens:3:4&priority=LOW_2&route=highwaytohell&timeout=458")); + newQuery("?id[0]=userdoc:kittens:1:2&id[1]=userdoc:kittens:3:4&priority=LOW_2&route=highwaytohell&timeout=58")); assertEquals(2, factory.messages.size()); + long lastTimeout = 58000; { Message m = factory.messages.get(0); assertEquals(DocumentProtocol.MESSAGE_GETDOCUMENT, m.getType()); @@ -355,7 +356,8 @@ public class GetSearcherTestCase { assertEquals("[all]", gdm.getFieldSet()); assertEquals(DocumentProtocol.Priority.LOW_2, gdm.getPriority()); assertEquals(Route.parse("highwaytohell"), gdm.getRoute()); - assertEquals(458000, gdm.getTimeRemaining()); + assertTrue(lastTimeout >= gdm.getTimeRemaining()); + lastTimeout = gdm.getTimeRemaining(); } { @@ -367,7 +369,7 @@ public class GetSearcherTestCase { assertEquals("[all]", gdm.getFieldSet()); assertEquals(DocumentProtocol.Priority.LOW_2, gdm.getPriority()); assertEquals(Route.parse("highwaytohell"), gdm.getRoute()); - assertEquals(458000, gdm.getTimeRemaining()); + assertTrue(lastTimeout >= gdm.getTimeRemaining()); } } @@ -383,6 +385,7 @@ public class GetSearcherTestCase { Result result = new Execution(searchChain, Execution.Context.createContextStub()).search( newQuery("?id[0]=userdoc:kittens:1:2&id[1]=userdoc:kittens:3:4&priority=LOW_2&route=highwaytohell&timeout=123")); + long lastTimeout = 123000; assertEquals(2, factory.messages.size()); { Message m = factory.messages.get(0); @@ -393,7 +396,8 @@ public class GetSearcherTestCase { assertEquals("[all]", gdm.getFieldSet()); assertEquals(DocumentProtocol.Priority.LOW_2, gdm.getPriority()); assertEquals(Route.parse("highwaytohell"), gdm.getRoute()); - assertEquals(123000, gdm.getTimeRemaining()); + assertTrue(lastTimeout >= gdm.getTimeRemaining()); + lastTimeout = gdm.getTimeRemaining(); } { @@ -405,7 +409,7 @@ public class GetSearcherTestCase { assertEquals("[all]", gdm.getFieldSet()); assertEquals(DocumentProtocol.Priority.LOW_2, gdm.getPriority()); assertEquals(Route.parse("highwaytohell"), gdm.getRoute()); - assertEquals(123000, gdm.getTimeRemaining()); + assertTrue(lastTimeout >= gdm.getTimeRemaining()); } } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java index 771d64fd6a3..3897f1d7d2a 100644 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java @@ -2,6 +2,7 @@ package com.yahoo.feedapi; import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.concurrent.SystemTimer; import com.yahoo.config.subscription.ConfigSubscriber; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.vespa.config.content.LoadTypeConfig; @@ -106,6 +107,7 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib String loadTypeStr = null; String traceStr = null; String createIfNonExistentParam = null; + Double totalTimeoutParam = null; if (request != null) { routeParam = request.getProperty("route"); @@ -114,6 +116,10 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib if (timeoutStr != null) { timeoutParam = Double.parseDouble(timeoutStr); } + timeoutStr = request.getProperty("totaltimeout"); + if (timeoutStr != null) { + totalTimeoutParam = Double.parseDouble(timeoutStr); + } priorityParam = request.getProperty("priority"); traceStr = request.getProperty("tracelevel"); @@ -140,6 +146,8 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib abortOnFeedError = (abortOnFeedErrorParam == null ? defaultAbortOnSendError : (!"false".equals(abortOnFeedErrorParam))); createIfNonExistent = (createIfNonExistentParam == null ? defaultCreateIfNonExistent : ("true".equals(createIfNonExistentParam))); } + long totalTimeout = (totalTimeoutParam == null) ? timeout : (long)(totalTimeoutParam*1000); + DocumentProtocol.Priority priority = null; if (priorityParam != null) { priority = DocumentProtocol.getPriorityByName(priorityParam); @@ -154,7 +162,7 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib loadType = LoadType.DEFAULT; } - return new PropertySetter(route, timeout, priority, loadType, retry, abortOnDocumentError, abortOnFeedError, createIfNonExistent, traceStr != null ? Integer.parseInt(traceStr) : 0); + return new PropertySetter(route, timeout, totalTimeout, priority, loadType, retry, abortOnDocumentError, abortOnFeedError, createIfNonExistent, traceStr != null ? Integer.parseInt(traceStr) : 0); } public long getDefaultTimeoutMillis() { return defaultTimeoutMillis; } @@ -217,6 +225,8 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib private Route route; /** Timeout (in milliseconds) */ private long timeout; + private long totalTimeout; + private long startTime; /** Explicit priority set. May be null */ private DocumentProtocol.Priority priority; private boolean retryEnabled; @@ -226,11 +236,12 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib private LoadType loadType; private int traceLevel; - public PropertySetter(Route route, long timeout, DocumentProtocol.Priority priority, LoadType loadType, + public PropertySetter(Route route, long timeout, long totalTimeout, DocumentProtocol.Priority priority, LoadType loadType, boolean retryEnabled, boolean abortOnDocumentError, boolean abortOnFeedError, boolean createIfNonExistent, int traceLevel) { this.route = route; this.timeout = timeout; + this.totalTimeout = totalTimeout; this.priority = priority; this.loadType = loadType; this.retryEnabled = retryEnabled; @@ -238,6 +249,13 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib this.abortOnFeedError = abortOnFeedError; this.createIfNonExistent = createIfNonExistent; this.traceLevel = traceLevel; + this.startTime = SystemTimer.INSTANCE.milliTime(); + } + + private long getTimeRemaining() { + return (totalTimeout < 0L) + ? timeout + : Math.min(timeout, totalTimeout - (SystemTimer.INSTANCE.milliTime() - startTime)); } public Route getRoute() { @@ -289,7 +307,7 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib if (route != null) { msg.setRoute(route); } - msg.setTimeRemaining(timeout); + msg.setTimeRemaining(getTimeRemaining()); msg.setRetryEnabled(retryEnabled); msg.getTrace().setLevel(Math.max(getFeederOptions().getTraceLevel(), traceLevel)); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index 8fda21e2494..cf29814bef3 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -74,44 +74,45 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { try { int busy = busyThreads.incrementAndGet(); if (busy > maxBusyThreads) { - log.warning("too many threads ["+busy+"] busy, returning SERVICE UNAVAILABLE"); + log.warning("too many threads [" + busy + "] busy, returning SERVICE UNAVAILABLE"); return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE); } - boolean asynchronous = request.getBooleanProperty("asynchronous"); - - MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); - - String route = properties.getRoute().toString(); - FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); - - SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); - sender.addMessageProcessor(properties); - sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); - - Feeder feeder = createFeeder(sender, request); - feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); - feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); - response.setAbortOnFeedError(properties.getAbortOnFeedError()); - - List<String> errors = feeder.parse(); - for (String s : errors) { - response.addXMLParseError(s); - } - if (errors.size() > 0 && feeder instanceof XMLFeeder) { - response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); - } - - sender.done(); - - if (asynchronous) { - return response; - } - long millis = getTimeoutMillis(request); - boolean completed = sender.waitForPending(millis); - if ( ! completed) - response.addError("Timed out after "+millis+" ms waiting for responses"); - response.done(); - return response; + boolean asynchronous = request.getBooleanProperty("asynchronous"); + + MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); + + String route = properties.getRoute().toString(); + FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); + + SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); + sender.addMessageProcessor(properties); + sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); + + Feeder feeder = createFeeder(sender, request); + feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); + feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); + response.setAbortOnFeedError(properties.getAbortOnFeedError()); + + List<String> errors = feeder.parse(); + for (String s : errors) { + response.addXMLParseError(s); + } + if (errors.size() > 0 && feeder instanceof XMLFeeder) { + response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); + } + + sender.done(); + + if (asynchronous) { + return response; + } + long millis = getTimeoutMillis(request); + boolean completed = sender.waitForPending(millis); + if (!completed) { + response.addError("Timed out after " + millis + " ms waiting for responses"); + } + response.done(); + return response; } finally { busyThreads.decrementAndGet(); } diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java index 516d11524cc..0d70f03014e 100755 --- a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java @@ -113,6 +113,7 @@ public class VespaFeeder { setPriority(req); setCreateIfNonExistent(req); setJsonInput(req, input); + req.setProperty("totaltimeout", "-1"); } private void setPriority(InputStreamRequest req) { |