summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@verizonmedia.com>2019-09-16 16:56:51 +0200
committerJon Bratseth <bratseth@verizonmedia.com>2019-09-16 16:56:51 +0200
commit2e8b813dc0d1533c952840e4e44d99f3b3f896f5 (patch)
tree9d1eaaba2459451b71d6b0a797e2d2aeb5b35d7c /vespaclient-container-plugin
parentc9babe83512093d584ae73b98c2df91a28d13343 (diff)
Nonfunctional changes only#
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java48
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java11
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java33
3 files changed, 47 insertions, 45 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
index 119cafe3d1b..ec5fc0cad07 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
@@ -145,12 +145,11 @@ class ClientFeederV3 {
} catch (InterruptedException e) {
// NOP, just terminate
} catch (Throwable e) {
- log.log(LogLevel.WARNING, "Unhandled exception while feeding: "
- + Exceptions.toMessageString(e), e);
+ log.log(LogLevel.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e);
} finally {
replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, false, null));
}
- return new FeedResponse(200, replies, 3 /* protocol version */, clientId, outstandingOperations.get(), hostName);
+ return new FeedResponse(200, replies, 3, clientId, outstandingOperations.get(), hostName);
} finally {
ongoingRequests.decrementAndGet();
threadsAvailableForFeeding.incrementAndGet();
@@ -200,10 +199,9 @@ class ClientFeederV3 {
}
}
- private Result sendMessage(
- FeederSettings settings,
- DocumentOperationMessageV3 msg,
- AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
+ private Result sendMessage(FeederSettings settings,
+ DocumentOperationMessageV3 msg,
+ AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
Result result = null;
while (result == null || result.getError().getCode() == SEND_QUEUE_FULL) {
msg.getMessage().pushHandler(feedReplyHandler);
@@ -221,13 +219,11 @@ class ClientFeederV3 {
return result;
}
- private void feed(
- FeederSettings settings,
- InputStream requestInputStream,
- BlockingQueue<OperationStatus> repliesFromOldMessages,
- AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
+ private void feed(FeederSettings settings,
+ InputStream requestInputStream,
+ BlockingQueue<OperationStatus> repliesFromOldMessages,
+ AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
while (true) {
-
Optional<DocumentOperationMessageV3> msg = pullMessageFromRequest(settings, requestInputStream, repliesFromOldMessages);
if (! msg.isPresent()) {
@@ -240,8 +236,11 @@ class ClientFeederV3 {
result = sendMessage(settings, msg.get(), threadsAvailableForFeeding);
} catch (RuntimeException e) {
- repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), Exceptions.toMessageString(e),
- ErrorCode.ERROR, false, msg.get().getMessage()));
+ repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(),
+ Exceptions.toMessageString(e),
+ ErrorCode.ERROR,
+ false,
+ msg.get().getMessage()));
continue;
}
@@ -250,15 +249,21 @@ class ClientFeederV3 {
updateOpsPerSec();
log(LogLevel.DEBUG, "Sent message successfully, document id: ", msg.get().getOperationId());
} else if (!result.getError().isFatal()) {
- repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), result.getError().getMessage(),
- ErrorCode.TRANSIENT_ERROR, false, msg.get().getMessage()));
+ repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(),
+ result.getError().getMessage(),
+ ErrorCode.TRANSIENT_ERROR,
+ false,
+ msg.get().getMessage()));
continue;
} else {
// should probably not happen, but everybody knows stuff that
// shouldn't happen, happens all the time
boolean isConditionNotMet = result.getError().getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED;
- repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), result.getError().getMessage(),
- ErrorCode.ERROR, isConditionNotMet, msg.get().getMessage()));
+ repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(),
+ result.getError().getMessage(),
+ ErrorCode.ERROR,
+ isConditionNotMet,
+ msg.get().getMessage()));
continue;
}
}
@@ -274,8 +279,9 @@ class ClientFeederV3 {
// protected for mocking
/** Returns the next message in the stream, or null if none */
- protected DocumentOperationMessageV3 getNextMessage(
- String operationId, InputStream requestInputStream, FeederSettings settings) throws Exception {
+ protected DocumentOperationMessageV3 getNextMessage(String operationId,
+ InputStream requestInputStream,
+ FeederSettings settings) throws Exception {
FeedOperation operation = streamReaderV3.getNextOperation(requestInputStream, settings);
// This is a bit hard to set up while testing, so we accept that things are not perfect.
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
index 5052692a379..1df0ce3594b 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
@@ -39,12 +39,11 @@ public class FeedHandler extends LoggingRequestHandler {
private final DocumentApiMetrics metricsHelper;
@Inject
- public FeedHandler(
- LoggingRequestHandler.Context parentCtx,
- DocumentmanagerConfig documentManagerConfig,
- SessionCache sessionCache,
- ThreadpoolConfig threadpoolConfig,
- MetricReceiver metricReceiver) throws Exception {
+ public FeedHandler(LoggingRequestHandler.Context parentCtx,
+ DocumentmanagerConfig documentManagerConfig,
+ SessionCache sessionCache,
+ ThreadpoolConfig threadpoolConfig,
+ MetricReceiver metricReceiver) throws Exception {
super(parentCtx);
metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server");
feedHandlerV3 = new FeedHandlerV3(parentCtx, documentManagerConfig, sessionCache, threadpoolConfig, metricsHelper);
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
index 0ca608baa87..37803d96714 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
@@ -46,12 +46,11 @@ public class FeedHandlerV3 extends LoggingRequestHandler {
private final AtomicInteger threadsAvailableForFeeding;
private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName());
- public FeedHandlerV3(
- LoggingRequestHandler.Context parentCtx,
- DocumentmanagerConfig documentManagerConfig,
- SessionCache sessionCache,
- ThreadpoolConfig threadpoolConfig,
- DocumentApiMetrics metricsHelper) {
+ public FeedHandlerV3(LoggingRequestHandler.Context parentCtx,
+ DocumentmanagerConfig documentManagerConfig,
+ SessionCache sessionCache,
+ ThreadpoolConfig threadpoolConfig,
+ DocumentApiMetrics metricsHelper) {
super(parentCtx);
docTypeManager = new DocumentTypeManager(documentManagerConfig);
this.sessionCache = sessionCache;
@@ -76,21 +75,19 @@ public class FeedHandlerV3 extends LoggingRequestHandler {
// verify the version header first. This is done in the old code.
@Override
public HttpResponse handle(HttpRequest request) {
- final String clientId = clientId(request);
- final ClientFeederV3 clientFeederV3;
+ String clientId = clientId(request);
+ ClientFeederV3 clientFeederV3;
synchronized (monitor) {
if (! clientFeederByClientId.containsKey(clientId)) {
SourceSessionParams sourceSessionParams = sourceSessionParams(request);
- clientFeederByClientId.put(
- clientId,
- new ClientFeederV3(
- retainSource(sessionCache, sourceSessionParams),
- new FeedReaderFactory(),
- docTypeManager,
- clientId,
- metric,
- feedReplyHandler,
- threadsAvailableForFeeding));
+ clientFeederByClientId.put(clientId,
+ new ClientFeederV3(retainSource(sessionCache, sourceSessionParams),
+ new FeedReaderFactory(),
+ docTypeManager,
+ clientId,
+ metric,
+ feedReplyHandler,
+ threadsAvailableForFeeding));
}
clientFeederV3 = clientFeederByClientId.get(clientId);
}