diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-08-28 09:41:42 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-08-28 09:41:42 +0200 |
commit | feafa659ac4ac76cd9ac3067ca394a6470b1e59e (patch) | |
tree | ff0cb300de7cccb6ca026708e81e35c285eaa300 /vespaclient-container-plugin | |
parent | c5adf87ecf4d6de277ad233137beeec318c869c3 (diff) |
Time out connections on the IOThread level
Time out connections on the IOThread level instead of leaving this to Apache.
Keep old connections alive for a while after timeout and keep polling them
such that, if the old connection hits a different real behind a VIP than the
new connection we'll still get the replies.
Diffstat (limited to 'vespaclient-container-plugin')
2 files changed, 42 insertions, 97 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java index db1c2471752..5548f8fbc1f 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java @@ -156,45 +156,38 @@ class ClientFeederV3 { } private int getOverloadReturnCode(HttpRequest request) { - if (request.getHeader(Headers.SILENTUPGRADE) != null ) { - return 299; - } + if (request.getHeader(Headers.SILENTUPGRADE) != null ) return 299; return 429; } - private Optional<DocumentOperationMessageV3> pullMessageFromRequest( - FeederSettings settings, InputStream requestInputStream, BlockingQueue<OperationStatus> repliesFromOldMessages) { + private Optional<DocumentOperationMessageV3> pullMessageFromRequest(FeederSettings settings, + InputStream requestInputStream, + BlockingQueue<OperationStatus> repliesFromOldMessages) { while (true) { Optional<String> operationId; try { operationId = streamReaderV3.getNextOperationId(requestInputStream); + if (operationId.isEmpty()) return Optional.empty(); } catch (IOException ioe) { - if (log.isLoggable(Level.FINE)) { - log.log(Level.FINE, Exceptions.toMessageString(ioe), ioe); - } - return Optional.empty(); - } - if (! operationId.isPresent()) { + log.log(Level.FINE, () -> Exceptions.toMessageString(ioe)); return Optional.empty(); } - DocumentOperationMessageV3 message; try { - message = getNextMessage(operationId.get(), requestInputStream, settings); + DocumentOperationMessageV3 message = getNextMessage(operationId.get(), requestInputStream, settings); + if (message != null) + setRoute(message, settings); + return Optional.ofNullable(message); } catch (Exception e) { - if (log.isLoggable(Level.WARNING)) { - log.log(Level.WARNING, Exceptions.toMessageString(e)); - } + log.log(Level.WARNING, () -> Exceptions.toMessageString(e)); metric.add(MetricNames.PARSE_ERROR, 1, null); - repliesFromOldMessages.add(new OperationStatus( - Exceptions.toMessageString(e), operationId.get(), ErrorCode.ERROR, false, "")); - - continue; + repliesFromOldMessages.add(new OperationStatus(Exceptions.toMessageString(e), + operationId.get(), + ErrorCode.ERROR, + false, + "")); } - if (message != null) - setRoute(message, settings); - return Optional.ofNullable(message); } } @@ -223,47 +216,45 @@ class ClientFeederV3 { BlockingQueue<OperationStatus> repliesFromOldMessages, AtomicInteger threadsAvailableForFeeding) throws InterruptedException { while (true) { - Optional<DocumentOperationMessageV3> msg = pullMessageFromRequest(settings, requestInputStream, repliesFromOldMessages); + Optional<DocumentOperationMessageV3> message = pullMessageFromRequest(settings, + requestInputStream, + repliesFromOldMessages); - if (! msg.isPresent()) { - break; - } - setMessageParameters(msg.get(), settings); + if (message.isEmpty()) break; + setMessageParameters(message.get(), settings); Result result; try { - result = sendMessage(settings, msg.get(), threadsAvailableForFeeding); + result = sendMessage(settings, message.get(), threadsAvailableForFeeding); } catch (RuntimeException e) { - repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), + repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), Exceptions.toMessageString(e), ErrorCode.ERROR, false, - msg.get().getMessage())); + message.get().getMessage())); continue; } if (result.isAccepted()) { outstandingOperations.incrementAndGet(); updateOpsPerSec(); - log(Level.FINE, "Sent message successfully, document id: ", msg.get().getOperationId()); + log(Level.FINE, "Sent message successfully, document id: ", message.get().getOperationId()); } else if (!result.getError().isFatal()) { - repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), + repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), result.getError().getMessage(), ErrorCode.TRANSIENT_ERROR, false, - msg.get().getMessage())); - continue; + message.get().getMessage())); } else { // should probably not happen, but everybody knows stuff that // shouldn't happen, happens all the time boolean isConditionNotMet = result.getError().getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED; - repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), + repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), result.getError().getMessage(), ErrorCode.ERROR, isConditionNotMet, - msg.get().getMessage())); - continue; + message.get().getMessage())); } } } @@ -326,17 +317,11 @@ class ClientFeederV3 { } protected final void log(Level level, Object... msgParts) { - StringBuilder s; + if (!log.isLoggable(level)) return; - if (!log.isLoggable(level)) { - return; - } - - s = new StringBuilder(); - for (Object part : msgParts) { + StringBuilder s = new StringBuilder(); + for (Object part : msgParts) s.append(part.toString()); - } - log.log(level, s.toString()); } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java index e6d8a88d10b..909c643a006 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java @@ -6,6 +6,8 @@ import com.yahoo.messagebus.routing.Route; import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; import com.yahoo.vespa.http.client.core.Headers; +import java.util.Optional; + /** * Wrapper for the feed feederSettings read from HTTP request. * @@ -14,7 +16,7 @@ import com.yahoo.vespa.http.client.core.Headers; public class FeederSettings { private static final Route DEFAULT_ROUTE = Route.parse("default"); - public final boolean drain; + public final boolean drain; // TODO: Implement drain=true public final Route route; public final boolean denyIfBusy; public final DataFormat dataFormat; @@ -22,55 +24,13 @@ public class FeederSettings { public final Integer traceLevel; public FeederSettings(HttpRequest request) { - { - String tmpDrain = request.getHeader(Headers.DRAIN); - if (tmpDrain != null) { - drain = Boolean.parseBoolean(tmpDrain); - } else { - drain = false; - } - } - { - String tmpRoute = request.getHeader(Headers.ROUTE); - if (tmpRoute != null) { - route = Route.parse(tmpRoute); - } else { - route = DEFAULT_ROUTE; - } - } - { - String tmpDenyIfBusy = request.getHeader(Headers.DENY_IF_BUSY); - if (tmpDenyIfBusy != null) { - denyIfBusy = Boolean.parseBoolean(tmpDenyIfBusy); - } else { - denyIfBusy = false; - } - } - { - // TODO: Change default to JSON on Vespa 8 - String tmpDataFormat = request.getHeader(Headers.DATA_FORMAT); - if (tmpDataFormat != null) { - dataFormat = DataFormat.valueOf(tmpDataFormat); - } else { - dataFormat = DataFormat.XML_UTF8; - } - } - { - String tmpDataFormat = request.getHeader(Headers.PRIORITY); - if (tmpDataFormat != null) { - priority = tmpDataFormat; - } else { - priority = null; - } - } - { - String tmpDataFormat = request.getHeader(Headers.TRACE_LEVEL); - if (tmpDataFormat != null) { - traceLevel = Integer.valueOf(tmpDataFormat); - } else { - traceLevel = null; - } - } + this.drain = Optional.ofNullable(request.getHeader(Headers.DRAIN)).map(Boolean::parseBoolean).orElse(false); + this.route = Optional.ofNullable(request.getHeader(Headers.ROUTE)).map(Route::parse).orElse(DEFAULT_ROUTE); + this.denyIfBusy = Optional.ofNullable(request.getHeader(Headers.DENY_IF_BUSY)).map(Boolean::parseBoolean).orElse(false); + // TODO: Change default to JSON on Vespa 8: + this.dataFormat = Optional.ofNullable(request.getHeader(Headers.DATA_FORMAT)).map(DataFormat::valueOf).orElse(DataFormat.XML_UTF8); + this.priority = request.getHeader(Headers.PRIORITY); + this.traceLevel = Optional.ofNullable(request.getHeader(Headers.TRACE_LEVEL)).map(Integer::valueOf).orElse(null); } } |