diff options
Diffstat (limited to 'vespaclient-container-plugin')
3 files changed, 47 insertions, 45 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java index 119cafe3d1b..ec5fc0cad07 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java @@ -145,12 +145,11 @@ class ClientFeederV3 { } catch (InterruptedException e) { // NOP, just terminate } catch (Throwable e) { - log.log(LogLevel.WARNING, "Unhandled exception while feeding: " - + Exceptions.toMessageString(e), e); + log.log(LogLevel.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e); } finally { replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, false, null)); } - return new FeedResponse(200, replies, 3 /* protocol version */, clientId, outstandingOperations.get(), hostName); + return new FeedResponse(200, replies, 3, clientId, outstandingOperations.get(), hostName); } finally { ongoingRequests.decrementAndGet(); threadsAvailableForFeeding.incrementAndGet(); @@ -200,10 +199,9 @@ class ClientFeederV3 { } } - private Result sendMessage( - FeederSettings settings, - DocumentOperationMessageV3 msg, - AtomicInteger threadsAvailableForFeeding) throws InterruptedException { + private Result sendMessage(FeederSettings settings, + DocumentOperationMessageV3 msg, + AtomicInteger threadsAvailableForFeeding) throws InterruptedException { Result result = null; while (result == null || result.getError().getCode() == SEND_QUEUE_FULL) { msg.getMessage().pushHandler(feedReplyHandler); @@ -221,13 +219,11 @@ class ClientFeederV3 { return result; } - private void feed( - FeederSettings settings, - InputStream requestInputStream, - BlockingQueue<OperationStatus> repliesFromOldMessages, - AtomicInteger threadsAvailableForFeeding) throws InterruptedException { + private void feed(FeederSettings settings, + InputStream requestInputStream, + BlockingQueue<OperationStatus> repliesFromOldMessages, + AtomicInteger threadsAvailableForFeeding) throws InterruptedException { while (true) { - Optional<DocumentOperationMessageV3> msg = pullMessageFromRequest(settings, requestInputStream, repliesFromOldMessages); if (! msg.isPresent()) { @@ -240,8 +236,11 @@ class ClientFeederV3 { result = sendMessage(settings, msg.get(), threadsAvailableForFeeding); } catch (RuntimeException e) { - repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), Exceptions.toMessageString(e), - ErrorCode.ERROR, false, msg.get().getMessage())); + repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), + Exceptions.toMessageString(e), + ErrorCode.ERROR, + false, + msg.get().getMessage())); continue; } @@ -250,15 +249,21 @@ class ClientFeederV3 { updateOpsPerSec(); log(LogLevel.DEBUG, "Sent message successfully, document id: ", msg.get().getOperationId()); } else if (!result.getError().isFatal()) { - repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), result.getError().getMessage(), - ErrorCode.TRANSIENT_ERROR, false, msg.get().getMessage())); + repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), + result.getError().getMessage(), + ErrorCode.TRANSIENT_ERROR, + false, + msg.get().getMessage())); continue; } else { // should probably not happen, but everybody knows stuff that // shouldn't happen, happens all the time boolean isConditionNotMet = result.getError().getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED; - repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), result.getError().getMessage(), - ErrorCode.ERROR, isConditionNotMet, msg.get().getMessage())); + repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), + result.getError().getMessage(), + ErrorCode.ERROR, + isConditionNotMet, + msg.get().getMessage())); continue; } } @@ -274,8 +279,9 @@ class ClientFeederV3 { // protected for mocking /** Returns the next message in the stream, or null if none */ - protected DocumentOperationMessageV3 getNextMessage( - String operationId, InputStream requestInputStream, FeederSettings settings) throws Exception { + protected DocumentOperationMessageV3 getNextMessage(String operationId, + InputStream requestInputStream, + FeederSettings settings) throws Exception { FeedOperation operation = streamReaderV3.getNextOperation(requestInputStream, settings); // This is a bit hard to set up while testing, so we accept that things are not perfect. diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java index 5052692a379..1df0ce3594b 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java @@ -39,12 +39,11 @@ public class FeedHandler extends LoggingRequestHandler { private final DocumentApiMetrics metricsHelper; @Inject - public FeedHandler( - LoggingRequestHandler.Context parentCtx, - DocumentmanagerConfig documentManagerConfig, - SessionCache sessionCache, - ThreadpoolConfig threadpoolConfig, - MetricReceiver metricReceiver) throws Exception { + public FeedHandler(LoggingRequestHandler.Context parentCtx, + DocumentmanagerConfig documentManagerConfig, + SessionCache sessionCache, + ThreadpoolConfig threadpoolConfig, + MetricReceiver metricReceiver) throws Exception { super(parentCtx); metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server"); feedHandlerV3 = new FeedHandlerV3(parentCtx, documentManagerConfig, sessionCache, threadpoolConfig, metricsHelper); diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java index 0ca608baa87..37803d96714 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java @@ -46,12 +46,11 @@ public class FeedHandlerV3 extends LoggingRequestHandler { private final AtomicInteger threadsAvailableForFeeding; private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName()); - public FeedHandlerV3( - LoggingRequestHandler.Context parentCtx, - DocumentmanagerConfig documentManagerConfig, - SessionCache sessionCache, - ThreadpoolConfig threadpoolConfig, - DocumentApiMetrics metricsHelper) { + public FeedHandlerV3(LoggingRequestHandler.Context parentCtx, + DocumentmanagerConfig documentManagerConfig, + SessionCache sessionCache, + ThreadpoolConfig threadpoolConfig, + DocumentApiMetrics metricsHelper) { super(parentCtx); docTypeManager = new DocumentTypeManager(documentManagerConfig); this.sessionCache = sessionCache; @@ -76,21 +75,19 @@ public class FeedHandlerV3 extends LoggingRequestHandler { // verify the version header first. This is done in the old code. @Override public HttpResponse handle(HttpRequest request) { - final String clientId = clientId(request); - final ClientFeederV3 clientFeederV3; + String clientId = clientId(request); + ClientFeederV3 clientFeederV3; synchronized (monitor) { if (! clientFeederByClientId.containsKey(clientId)) { SourceSessionParams sourceSessionParams = sourceSessionParams(request); - clientFeederByClientId.put( - clientId, - new ClientFeederV3( - retainSource(sessionCache, sourceSessionParams), - new FeedReaderFactory(), - docTypeManager, - clientId, - metric, - feedReplyHandler, - threadsAvailableForFeeding)); + clientFeederByClientId.put(clientId, + new ClientFeederV3(retainSource(sessionCache, sourceSessionParams), + new FeedReaderFactory(), + docTypeManager, + clientId, + metric, + feedReplyHandler, + threadsAvailableForFeeding)); } clientFeederV3 = clientFeederByClientId.get(clientId); } |