summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java48
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java11
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java33
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java6
4 files changed, 50 insertions, 48 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
index 119cafe3d1b..ec5fc0cad07 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
@@ -145,12 +145,11 @@ class ClientFeederV3 {
} catch (InterruptedException e) {
// NOP, just terminate
} catch (Throwable e) {
- log.log(LogLevel.WARNING, "Unhandled exception while feeding: "
- + Exceptions.toMessageString(e), e);
+ log.log(LogLevel.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e);
} finally {
replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, false, null));
}
- return new FeedResponse(200, replies, 3 /* protocol version */, clientId, outstandingOperations.get(), hostName);
+ return new FeedResponse(200, replies, 3, clientId, outstandingOperations.get(), hostName);
} finally {
ongoingRequests.decrementAndGet();
threadsAvailableForFeeding.incrementAndGet();
@@ -200,10 +199,9 @@ class ClientFeederV3 {
}
}
- private Result sendMessage(
- FeederSettings settings,
- DocumentOperationMessageV3 msg,
- AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
+ private Result sendMessage(FeederSettings settings,
+ DocumentOperationMessageV3 msg,
+ AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
Result result = null;
while (result == null || result.getError().getCode() == SEND_QUEUE_FULL) {
msg.getMessage().pushHandler(feedReplyHandler);
@@ -221,13 +219,11 @@ class ClientFeederV3 {
return result;
}
- private void feed(
- FeederSettings settings,
- InputStream requestInputStream,
- BlockingQueue<OperationStatus> repliesFromOldMessages,
- AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
+ private void feed(FeederSettings settings,
+ InputStream requestInputStream,
+ BlockingQueue<OperationStatus> repliesFromOldMessages,
+ AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
while (true) {
-
Optional<DocumentOperationMessageV3> msg = pullMessageFromRequest(settings, requestInputStream, repliesFromOldMessages);
if (! msg.isPresent()) {
@@ -240,8 +236,11 @@ class ClientFeederV3 {
result = sendMessage(settings, msg.get(), threadsAvailableForFeeding);
} catch (RuntimeException e) {
- repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), Exceptions.toMessageString(e),
- ErrorCode.ERROR, false, msg.get().getMessage()));
+ repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(),
+ Exceptions.toMessageString(e),
+ ErrorCode.ERROR,
+ false,
+ msg.get().getMessage()));
continue;
}
@@ -250,15 +249,21 @@ class ClientFeederV3 {
updateOpsPerSec();
log(LogLevel.DEBUG, "Sent message successfully, document id: ", msg.get().getOperationId());
} else if (!result.getError().isFatal()) {
- repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), result.getError().getMessage(),
- ErrorCode.TRANSIENT_ERROR, false, msg.get().getMessage()));
+ repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(),
+ result.getError().getMessage(),
+ ErrorCode.TRANSIENT_ERROR,
+ false,
+ msg.get().getMessage()));
continue;
} else {
// should probably not happen, but everybody knows stuff that
// shouldn't happen, happens all the time
boolean isConditionNotMet = result.getError().getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED;
- repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(), result.getError().getMessage(),
- ErrorCode.ERROR, isConditionNotMet, msg.get().getMessage()));
+ repliesFromOldMessages.add(createOperationStatus(msg.get().getOperationId(),
+ result.getError().getMessage(),
+ ErrorCode.ERROR,
+ isConditionNotMet,
+ msg.get().getMessage()));
continue;
}
}
@@ -274,8 +279,9 @@ class ClientFeederV3 {
// protected for mocking
/** Returns the next message in the stream, or null if none */
- protected DocumentOperationMessageV3 getNextMessage(
- String operationId, InputStream requestInputStream, FeederSettings settings) throws Exception {
+ protected DocumentOperationMessageV3 getNextMessage(String operationId,
+ InputStream requestInputStream,
+ FeederSettings settings) throws Exception {
FeedOperation operation = streamReaderV3.getNextOperation(requestInputStream, settings);
// This is a bit hard to set up while testing, so we accept that things are not perfect.
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
index 5052692a379..1df0ce3594b 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
@@ -39,12 +39,11 @@ public class FeedHandler extends LoggingRequestHandler {
private final DocumentApiMetrics metricsHelper;
@Inject
- public FeedHandler(
- LoggingRequestHandler.Context parentCtx,
- DocumentmanagerConfig documentManagerConfig,
- SessionCache sessionCache,
- ThreadpoolConfig threadpoolConfig,
- MetricReceiver metricReceiver) throws Exception {
+ public FeedHandler(LoggingRequestHandler.Context parentCtx,
+ DocumentmanagerConfig documentManagerConfig,
+ SessionCache sessionCache,
+ ThreadpoolConfig threadpoolConfig,
+ MetricReceiver metricReceiver) throws Exception {
super(parentCtx);
metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server");
feedHandlerV3 = new FeedHandlerV3(parentCtx, documentManagerConfig, sessionCache, threadpoolConfig, metricsHelper);
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
index 0ca608baa87..37803d96714 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
@@ -46,12 +46,11 @@ public class FeedHandlerV3 extends LoggingRequestHandler {
private final AtomicInteger threadsAvailableForFeeding;
private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName());
- public FeedHandlerV3(
- LoggingRequestHandler.Context parentCtx,
- DocumentmanagerConfig documentManagerConfig,
- SessionCache sessionCache,
- ThreadpoolConfig threadpoolConfig,
- DocumentApiMetrics metricsHelper) {
+ public FeedHandlerV3(LoggingRequestHandler.Context parentCtx,
+ DocumentmanagerConfig documentManagerConfig,
+ SessionCache sessionCache,
+ ThreadpoolConfig threadpoolConfig,
+ DocumentApiMetrics metricsHelper) {
super(parentCtx);
docTypeManager = new DocumentTypeManager(documentManagerConfig);
this.sessionCache = sessionCache;
@@ -76,21 +75,19 @@ public class FeedHandlerV3 extends LoggingRequestHandler {
// verify the version header first. This is done in the old code.
@Override
public HttpResponse handle(HttpRequest request) {
- final String clientId = clientId(request);
- final ClientFeederV3 clientFeederV3;
+ String clientId = clientId(request);
+ ClientFeederV3 clientFeederV3;
synchronized (monitor) {
if (! clientFeederByClientId.containsKey(clientId)) {
SourceSessionParams sourceSessionParams = sourceSessionParams(request);
- clientFeederByClientId.put(
- clientId,
- new ClientFeederV3(
- retainSource(sessionCache, sourceSessionParams),
- new FeedReaderFactory(),
- docTypeManager,
- clientId,
- metric,
- feedReplyHandler,
- threadsAvailableForFeeding));
+ clientFeederByClientId.put(clientId,
+ new ClientFeederV3(retainSource(sessionCache, sourceSessionParams),
+ new FeedReaderFactory(),
+ docTypeManager,
+ clientId,
+ metric,
+ feedReplyHandler,
+ threadsAvailableForFeeding));
}
clientFeederV3 = clientFeederByClientId.get(clientId);
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
index 25ef7651e28..addc6d41645 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
@@ -9,13 +9,13 @@ import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.vespaclient.config.FeederConfig;
-
/**
- * Just a wrapper for feeder options, from config or HTTP parameters.
+ * A wrapper for feeder options, from config or HTTP parameters.
*
- * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @author Einar M R Rosenvinge
*/
public class FeederOptions {
+
// These default values are here basically just for convenience in test cases,
// they are overridden by real config values in all other cases.
private boolean abortOnDocumentError = true;