summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-08-28 09:41:42 +0200
committerJon Bratseth <bratseth@gmail.com>2020-08-28 09:41:42 +0200
commitfeafa659ac4ac76cd9ac3067ca394a6470b1e59e (patch)
treeff0cb300de7cccb6ca026708e81e35c285eaa300 /vespaclient-container-plugin
parentc5adf87ecf4d6de277ad233137beeec318c869c3 (diff)
Time out connections on the IOThread level
Time out connections on the IOThread level instead of leaving this to Apache. Keep old connections alive for a while after timeout and keep polling them such that, if the old connection hits a different real behind a VIP than the new connection we'll still get the replies.
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java79
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java60
2 files changed, 42 insertions, 97 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
index db1c2471752..5548f8fbc1f 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
@@ -156,45 +156,38 @@ class ClientFeederV3 {
}
private int getOverloadReturnCode(HttpRequest request) {
- if (request.getHeader(Headers.SILENTUPGRADE) != null ) {
- return 299;
- }
+ if (request.getHeader(Headers.SILENTUPGRADE) != null ) return 299;
return 429;
}
- private Optional<DocumentOperationMessageV3> pullMessageFromRequest(
- FeederSettings settings, InputStream requestInputStream, BlockingQueue<OperationStatus> repliesFromOldMessages) {
+ private Optional<DocumentOperationMessageV3> pullMessageFromRequest(FeederSettings settings,
+ InputStream requestInputStream,
+ BlockingQueue<OperationStatus> repliesFromOldMessages) {
while (true) {
Optional<String> operationId;
try {
operationId = streamReaderV3.getNextOperationId(requestInputStream);
+ if (operationId.isEmpty()) return Optional.empty();
} catch (IOException ioe) {
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, Exceptions.toMessageString(ioe), ioe);
- }
- return Optional.empty();
- }
- if (! operationId.isPresent()) {
+ log.log(Level.FINE, () -> Exceptions.toMessageString(ioe));
return Optional.empty();
}
- DocumentOperationMessageV3 message;
try {
- message = getNextMessage(operationId.get(), requestInputStream, settings);
+ DocumentOperationMessageV3 message = getNextMessage(operationId.get(), requestInputStream, settings);
+ if (message != null)
+ setRoute(message, settings);
+ return Optional.ofNullable(message);
} catch (Exception e) {
- if (log.isLoggable(Level.WARNING)) {
- log.log(Level.WARNING, Exceptions.toMessageString(e));
- }
+ log.log(Level.WARNING, () -> Exceptions.toMessageString(e));
metric.add(MetricNames.PARSE_ERROR, 1, null);
- repliesFromOldMessages.add(new OperationStatus(
- Exceptions.toMessageString(e), operationId.get(), ErrorCode.ERROR, false, ""));
-
- continue;
+ repliesFromOldMessages.add(new OperationStatus(Exceptions.toMessageString(e),
+ operationId.get(),
+ ErrorCode.ERROR,
+ false,
+ ""));
}
- if (message != null)
- setRoute(message, settings);
- return Optional.ofNullable(message);
}
}
@@ -223,47 +216,45 @@ class ClientFeederV3 {
BlockingQueue<OperationStatus> repliesFromOldMessages,
AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
while (true) {
- Optional<DocumentOperationMessageV3> msg = pullMessageFromRequest(settings, requestInputStream, repliesFromOldMessages);
+ Optional<DocumentOperationMessageV3> message = pullMessageFromRequest(settings,
+ requestInputStream,
+ repliesFromOldMessages);
- if (! msg.isPresent()) {
- break;
- }
- setMessageParameters(msg.get(), settings);
+ if (message.isEmpty()) break;
+ setMessageParameters(message.get(), settings);
Result result;
try {
- result = sendMessage(settings, msg.get(), threadsAvailableForFeeding);
+ result = sendMessage(settings, message.get(), threadsAvailableForFeeding);
} catch (RuntimeException e) {
- repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(),
+ repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
Exceptions.toMessageString(e),
ErrorCode.ERROR,
false,
- msg.get().getMessage()));
+ message.get().getMessage()));
continue;
}
if (result.isAccepted()) {
outstandingOperations.incrementAndGet();
updateOpsPerSec();
- log(Level.FINE, "Sent message successfully, document id: ", msg.get().getOperationId());
+ log(Level.FINE, "Sent message successfully, document id: ", message.get().getOperationId());
} else if (!result.getError().isFatal()) {
- repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(),
+ repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
result.getError().getMessage(),
ErrorCode.TRANSIENT_ERROR,
false,
- msg.get().getMessage()));
- continue;
+ message.get().getMessage()));
} else {
// should probably not happen, but everybody knows stuff that
// shouldn't happen, happens all the time
boolean isConditionNotMet = result.getError().getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED;
- repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(),
+ repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
result.getError().getMessage(),
ErrorCode.ERROR,
isConditionNotMet,
- msg.get().getMessage()));
- continue;
+ message.get().getMessage()));
}
}
}
@@ -326,17 +317,11 @@ class ClientFeederV3 {
}
protected final void log(Level level, Object... msgParts) {
- StringBuilder s;
+ if (!log.isLoggable(level)) return;
- if (!log.isLoggable(level)) {
- return;
- }
-
- s = new StringBuilder();
- for (Object part : msgParts) {
+ StringBuilder s = new StringBuilder();
+ for (Object part : msgParts)
s.append(part.toString());
- }
-
log.log(level, s.toString());
}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java
index e6d8a88d10b..909c643a006 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java
@@ -6,6 +6,8 @@ import com.yahoo.messagebus.routing.Route;
import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
import com.yahoo.vespa.http.client.core.Headers;
+import java.util.Optional;
+
/**
* Wrapper for the feed feederSettings read from HTTP request.
*
@@ -14,7 +16,7 @@ import com.yahoo.vespa.http.client.core.Headers;
public class FeederSettings {
private static final Route DEFAULT_ROUTE = Route.parse("default");
- public final boolean drain;
+ public final boolean drain; // TODO: Implement drain=true
public final Route route;
public final boolean denyIfBusy;
public final DataFormat dataFormat;
@@ -22,55 +24,13 @@ public class FeederSettings {
public final Integer traceLevel;
public FeederSettings(HttpRequest request) {
- {
- String tmpDrain = request.getHeader(Headers.DRAIN);
- if (tmpDrain != null) {
- drain = Boolean.parseBoolean(tmpDrain);
- } else {
- drain = false;
- }
- }
- {
- String tmpRoute = request.getHeader(Headers.ROUTE);
- if (tmpRoute != null) {
- route = Route.parse(tmpRoute);
- } else {
- route = DEFAULT_ROUTE;
- }
- }
- {
- String tmpDenyIfBusy = request.getHeader(Headers.DENY_IF_BUSY);
- if (tmpDenyIfBusy != null) {
- denyIfBusy = Boolean.parseBoolean(tmpDenyIfBusy);
- } else {
- denyIfBusy = false;
- }
- }
- {
- // TODO: Change default to JSON on Vespa 8
- String tmpDataFormat = request.getHeader(Headers.DATA_FORMAT);
- if (tmpDataFormat != null) {
- dataFormat = DataFormat.valueOf(tmpDataFormat);
- } else {
- dataFormat = DataFormat.XML_UTF8;
- }
- }
- {
- String tmpDataFormat = request.getHeader(Headers.PRIORITY);
- if (tmpDataFormat != null) {
- priority = tmpDataFormat;
- } else {
- priority = null;
- }
- }
- {
- String tmpDataFormat = request.getHeader(Headers.TRACE_LEVEL);
- if (tmpDataFormat != null) {
- traceLevel = Integer.valueOf(tmpDataFormat);
- } else {
- traceLevel = null;
- }
- }
+ this.drain = Optional.ofNullable(request.getHeader(Headers.DRAIN)).map(Boolean::parseBoolean).orElse(false);
+ this.route = Optional.ofNullable(request.getHeader(Headers.ROUTE)).map(Route::parse).orElse(DEFAULT_ROUTE);
+ this.denyIfBusy = Optional.ofNullable(request.getHeader(Headers.DENY_IF_BUSY)).map(Boolean::parseBoolean).orElse(false);
+ // TODO: Change default to JSON on Vespa 8:
+ this.dataFormat = Optional.ofNullable(request.getHeader(Headers.DATA_FORMAT)).map(DataFormat::valueOf).orElse(DataFormat.XML_UTF8);
+ this.priority = request.getHeader(Headers.PRIORITY);
+ this.traceLevel = Optional.ofNullable(request.getHeader(Headers.TRACE_LEVEL)).map(Integer::valueOf).orElse(null);
}
}