summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-23 13:56:54 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-23 14:28:53 +0200
commitfc7f77270eaca41d85d7d2499b5ff10f6afaa68e (patch)
treead7bc70e8975ccb5d478a8de2e3d5f1ef8d4f97f /vespaclient-core
parent979a2980aeaf89cc111f9dec74fa46cf191a8d8f (diff)
Some more cleanup.
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java3
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java13
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java160
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java39
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java3
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java5
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java45
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java87
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java33
-rw-r--r--vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java24
10 files changed, 73 insertions, 339 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
index c644b551a79..4e9e17d0b5f 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
@@ -3,7 +3,6 @@ package com.yahoo.feedapi;
import com.yahoo.document.Document;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
-import com.yahoo.jdisc.Metric;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
@@ -47,7 +46,7 @@ public class DummySessionFactory implements SessionFactory {
}
@Override
- public SendSession createSendSession(ReplyHandler r, Metric metric) {
+ public SendSession createSendSession(ReplyHandler r) {
if (output != null) {
return new DumpDocuments(output, r, this);
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java
index 3a435f7cda2..2d35adfab75 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.feedapi;
-import com.yahoo.jdisc.Metric;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.clientmetrics.ClientMetrics;
@@ -14,22 +13,16 @@ public class FeedContext {
private final MessagePropertyProcessor propertyProcessor;
private final DocumentTypeManager docTypeManager;
private final ClientMetrics metrics;
- private final Metric metric;
private Map<String, SharedSender> senders = new TreeMap<>();
public static final Object sync = new Object();
public static FeedContext instance = null;
- public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager, Metric metric) {
+ public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager) {
this.propertyProcessor = propertyProcessor;
this.factory = factory;
docTypeManager = manager;
metrics = new ClientMetrics();
- this.metric = metric;
- }
-
- public Metric getMetricAPI() {
- return metric;
}
private void shutdownSenders() {
@@ -43,7 +36,7 @@ public class FeedContext {
Map<String, SharedSender> newSenders = new TreeMap<>();
for (Map.Entry<String, SharedSender> sender : senders.entrySet()) {
- newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue(), metric));
+ newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue()));
}
shutdownSenders();
@@ -58,7 +51,7 @@ public class FeedContext {
SharedSender sender = senders.get(route);
if (sender == null) {
- sender = new SharedSender(route, factory, null, metric);
+ sender = new SharedSender(route, factory, null);
senders.put(route, sender);
metrics.addRouteMetricSet(sender.getMetrics());
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
index dc78f037534..ecfd3c9eded 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
@@ -2,9 +2,11 @@
package com.yahoo.feedapi;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.DynamicThrottlePolicy;
+import com.yahoo.messagebus.RateThrottlingPolicy;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.routing.RetryTransientErrorsPolicy;
import com.yahoo.vespaclient.config.FeederConfig;
@@ -19,61 +21,28 @@ public class FeederOptions {
private boolean abortOnDocumentError = true;
private boolean abortOnSendError = true;
private boolean retryEnabled = true;
- private double retryDelay = 1;
private double timeout = 60;
private int maxPendingBytes = 0;
private int maxPendingDocs = 0;
private double maxFeedRate = 0.0;
- private String documentManagerConfigId = "client";
- private String idPrefix = "";
private String route = "default";
- private String routingConfigId;
- private String slobrokConfigId;
private int traceLevel;
private int mbusPort;
private DocumentProtocol.Priority priority = DocumentProtocol.Priority.NORMAL_3;
- private boolean priorityExplicitlySet = false;
private String docprocChain = "";
/** Constructs an options object with all default values. */
- public FeederOptions() {
+ FeederOptions() {
// empty
}
- /**
- * Implements the copy constructor.
- *
- * @param src The options to copy.
- */
- public FeederOptions(FeederOptions src) {
- abortOnDocumentError = src.abortOnDocumentError;
- abortOnSendError = src.abortOnSendError;
- retryEnabled = src.retryEnabled;
- retryDelay = src.retryDelay;
- timeout = src.timeout;
- maxPendingBytes = src.maxPendingBytes;
- maxPendingDocs = src.maxPendingDocs;
- maxFeedRate = src.maxFeedRate;
- documentManagerConfigId = src.documentManagerConfigId;
- idPrefix = src.idPrefix;
- route = src.route;
- routingConfigId = src.routingConfigId;
- slobrokConfigId = src.slobrokConfigId;
- traceLevel = src.traceLevel;
- mbusPort = src.mbusPort;
- priority = src.priority;
- docprocChain = src.docprocChain;
- }
-
/** Constructor that sets values from config. */
- public FeederOptions(FeederConfig config) {
+ FeederOptions(FeederConfig config) {
setAbortOnDocumentError(config.abortondocumenterror());
setAbortOnSendError(config.abortonsenderror());
- setIdPrefix(config.idprefix());
setMaxPendingBytes(config.maxpendingbytes());
setMaxPendingDocs(config.maxpendingdocs());
setRetryEnabled(config.retryenabled());
- setRetryDelay(config.retrydelay());
setRoute(config.route());
setTimeout(config.timeout());
setTraceLevel(config.tracelevel());
@@ -82,31 +51,18 @@ public class FeederOptions {
setMaxFeedRate(config.maxfeedrate());
}
- public void setMaxFeedRate(double feedRate) {
+ void setMaxFeedRate(double feedRate) {
maxFeedRate = feedRate;
}
-
- public double getMaxFeedRate() {
- return maxFeedRate;
- }
-
- public boolean getRetryEnabled() {
+ boolean getRetryEnabled() {
return retryEnabled;
}
- public void setRetryEnabled(boolean retryEnabled) {
+ private void setRetryEnabled(boolean retryEnabled) {
this.retryEnabled = retryEnabled;
}
- public double getRetryDelay() {
- return retryDelay;
- }
-
- public void setRetryDelay(double retryDelay) {
- this.retryDelay = retryDelay;
- }
-
public double getTimeout() {
return timeout;
}
@@ -115,46 +71,30 @@ public class FeederOptions {
this.timeout = timeout;
}
- public int getMaxPendingBytes() {
- return maxPendingBytes;
- }
-
- public void setMaxPendingBytes(int maxPendingBytes) {
+ private void setMaxPendingBytes(int maxPendingBytes) {
this.maxPendingBytes = maxPendingBytes;
}
- public int getMaxPendingDocs() {
- return maxPendingDocs;
- }
-
- public void setMaxPendingDocs(int maxPendingDocs) {
+ private void setMaxPendingDocs(int maxPendingDocs) {
this.maxPendingDocs = maxPendingDocs;
}
- public boolean abortOnDocumentError() {
+ boolean abortOnDocumentError() {
return abortOnDocumentError;
}
- public void setAbortOnDocumentError(boolean abortOnDocumentError) {
+ void setAbortOnDocumentError(boolean abortOnDocumentError) {
this.abortOnDocumentError = abortOnDocumentError;
}
- public boolean abortOnSendError() {
+ boolean abortOnSendError() {
return abortOnSendError;
}
- public void setAbortOnSendError(boolean abortOnSendError) {
+ private void setAbortOnSendError(boolean abortOnSendError) {
this.abortOnSendError = abortOnSendError;
}
- public String getIdPrefix() {
- return idPrefix;
- }
-
- public void setIdPrefix(String idPrefix) {
- this.idPrefix = idPrefix;
- }
-
public void setRoute(String route) {
this.route = route;
}
@@ -167,35 +107,7 @@ public class FeederOptions {
return priority;
}
- public boolean isPriorityExplicitlySet() {
- return priorityExplicitlySet;
- }
-
- public String getSlobrokConfigId() {
- return slobrokConfigId;
- }
-
- public void setSlobrokConfigId(String slobrokConfigId) {
- this.slobrokConfigId = slobrokConfigId;
- }
-
- public String getRoutingConfigId() {
- return routingConfigId;
- }
-
- public void setRoutingConfigId(String routingConfigId) {
- this.routingConfigId = routingConfigId;
- }
-
- public String getDocumentManagerConfigId() {
- return documentManagerConfigId;
- }
-
- public void setDocumentManagerConfigId(String documentManagerConfigId) {
- this.documentManagerConfigId = documentManagerConfigId;
- }
-
- public int getTraceLevel() {
+ int getTraceLevel() {
return traceLevel;
}
@@ -203,24 +115,19 @@ public class FeederOptions {
this.traceLevel = traceLevel;
}
- public int getMessageBusPort() {
- return mbusPort;
- }
-
- public void setMessageBusPort(int mbusPort) {
+ private void setMessageBusPort(int mbusPort) {
this.mbusPort = mbusPort;
}
public void setPriority(DocumentProtocol.Priority priority) {
this.priority = priority;
- this.priorityExplicitlySet = true;
}
- public String getDocprocChain() {
+ String getDocprocChain() {
return docprocChain;
}
- public void setDocprocChain(String chain) {
+ private void setDocprocChain(String chain) {
docprocChain = chain;
}
@@ -228,7 +135,7 @@ public class FeederOptions {
* Creates a source session params object with parameters set as these options
* dictate.
*/
- public SourceSessionParams toSourceSessionParams() {
+ SourceSessionParams toSourceSessionParams() {
SourceSessionParams params = new SourceSessionParams();
StaticThrottlePolicy policy;
@@ -252,7 +159,7 @@ public class FeederOptions {
return params;
}
- public RPCNetworkParams getNetworkParams() {
+ RPCNetworkParams getNetworkParams() {
try {
RPCNetworkParams networkParams = new RPCNetworkParams();
if (mbusPort != -1) {
@@ -271,19 +178,13 @@ public class FeederOptions {
"abortOnDocumentError=" + abortOnDocumentError +
", abortOnSendError=" + abortOnSendError +
", retryEnabled=" + retryEnabled +
- ", retryDelay=" + retryDelay +
", timeout=" + timeout +
", maxPendingBytes=" + maxPendingBytes +
", maxPendingDocs=" + maxPendingDocs +
- ", documentManagerConfigId='" + documentManagerConfigId + '\'' +
- ", idPrefix='" + idPrefix + '\'' +
", route='" + route + '\'' +
- ", routingConfigId='" + routingConfigId + '\'' +
- ", slobrokConfigId='" + slobrokConfigId + '\'' +
", traceLevel=" + traceLevel +
", mbusPort=" + mbusPort +
", priority=" + priority.name() +
- ", priorityExplicitlySet=" + priorityExplicitlySet +
", docprocChain='" + docprocChain + '\'' +
'}';
}
@@ -301,24 +202,12 @@ public class FeederOptions {
if (maxPendingDocs != that.maxPendingDocs) return false;
if (maxFeedRate != that.maxFeedRate) return false;
if (mbusPort != that.mbusPort) return false;
- if (priorityExplicitlySet != that.priorityExplicitlySet) return false;
- if (Double.compare(that.retryDelay, retryDelay) != 0) return false;
if (retryEnabled != that.retryEnabled) return false;
if (Double.compare(that.timeout, timeout) != 0) return false;
if (traceLevel != that.traceLevel) return false;
if (docprocChain != null ? !docprocChain.equals(that.docprocChain) : that.docprocChain != null) return false;
- if (documentManagerConfigId != null ? !documentManagerConfigId.equals(that.documentManagerConfigId) : that.documentManagerConfigId != null) {
- return false;
- }
- if (idPrefix != null ? !idPrefix.equals(that.idPrefix) : that.idPrefix != null) return false;
if (priority != that.priority) return false;
if (route != null ? !route.equals(that.route) : that.route != null) return false;
- if (routingConfigId != null ? !routingConfigId.equals(that.routingConfigId) : that.routingConfigId != null) {
- return false;
- }
- if (slobrokConfigId != null ? !slobrokConfigId.equals(that.slobrokConfigId) : that.slobrokConfigId != null) {
- return false;
- }
return true;
}
@@ -330,22 +219,15 @@ public class FeederOptions {
result = (abortOnDocumentError ? 1 : 0);
result = 31 * result + (abortOnSendError ? 1 : 0);
result = 31 * result + (retryEnabled ? 1 : 0);
- temp = retryDelay != +0.0d ? Double.doubleToLongBits(retryDelay) : 0L;
- result = 31 * result + (int) (temp ^ (temp >>> 32));
temp = timeout != +0.0d ? Double.doubleToLongBits(timeout) : 0L;
result = 31 * result + (int) (temp ^ (temp >>> 32));
result = 31 * result + maxPendingBytes;
result = 31 * result + maxPendingDocs;
result = 31 * result + ((int)(maxFeedRate * 1000));
- result = 31 * result + (documentManagerConfigId != null ? documentManagerConfigId.hashCode() : 0);
- result = 31 * result + (idPrefix != null ? idPrefix.hashCode() : 0);
result = 31 * result + (route != null ? route.hashCode() : 0);
- result = 31 * result + (routingConfigId != null ? routingConfigId.hashCode() : 0);
- result = 31 * result + (slobrokConfigId != null ? slobrokConfigId.hashCode() : 0);
result = 31 * result + traceLevel;
result = 31 * result + mbusPort;
result = 31 * result + (priority != null ? priority.hashCode() : 0);
- result = 31 * result + (priorityExplicitlySet ? 1 : 0);
result = 31 * result + (docprocChain != null ? docprocChain.hashCode() : 0);
return result;
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
index 12a4ecde493..5e52da23c12 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
@@ -5,34 +5,21 @@ import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
import com.yahoo.documentapi.messagebus.MessageBusParams;
-import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
-import com.yahoo.jdisc.Metric;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.SourceSession;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import java.util.Collections;
-
public class MessageBusSessionFactory implements SessionFactory {
private final MessageBusDocumentAccess access;
private final MessagePropertyProcessor processor;
- private interface Metrics {
- String NUM_OPERATIONS = "num_operations";
- String NUM_PUTS = "num_puts";
- String NUM_REMOVES = "num_removes";
- String NUM_UPDATES = "num_updates";
- }
-
public MessageBusSessionFactory(MessagePropertyProcessor processor) {
this(processor, null, null);
}
- public MessageBusSessionFactory(MessagePropertyProcessor processor,
+ private MessageBusSessionFactory(MessagePropertyProcessor processor,
DocumentmanagerConfig documentmanagerConfig,
SlobroksConfig slobroksConfig) {
this.processor = processor;
@@ -53,10 +40,9 @@ public class MessageBusSessionFactory implements SessionFactory {
}
@Override
- public synchronized SendSession createSendSession(ReplyHandler handler, Metric metric) {
+ public synchronized SendSession createSendSession(ReplyHandler handler) {
return new SourceSessionWrapper(
- access.getMessageBus().createSourceSession(handler, processor.getFeederOptions().toSourceSessionParams()),
- metric);
+ access.getMessageBus().createSourceSession(handler, processor.getFeederOptions().toSourceSessionParams()));
}
public void shutDown() {
@@ -66,18 +52,13 @@ public class MessageBusSessionFactory implements SessionFactory {
private class SourceSessionWrapper extends SendSession {
private final SourceSession session;
- private final Metric metric;
- private final Metric.Context context;
- private SourceSessionWrapper(SourceSession session, Metric metric) {
+ private SourceSessionWrapper(SourceSession session) {
this.session = session;
- this.metric = metric;
- this.context = metric.createContext(Collections.<String, String>emptyMap());
}
@Override
protected com.yahoo.messagebus.Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException {
- updateCounters(m);
if (blockIfQueueFull) {
return session.sendBlocking(m);
} else {
@@ -85,18 +66,6 @@ public class MessageBusSessionFactory implements SessionFactory {
}
}
- private void updateCounters(Message m) {
- metric.add(Metrics.NUM_OPERATIONS, 1, context);
-
- if (m instanceof PutDocumentMessage) {
- metric.add(Metrics.NUM_PUTS, 1, context);
- } else if (m instanceof RemoveDocumentMessage) {
- metric.add(Metrics.NUM_REMOVES, 1, context);
- } else if (m instanceof UpdateDocumentMessage) {
- metric.add(Metrics.NUM_UPDATES, 1, context);
- }
- }
-
@Override
public void close() {
session.close();
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
index 6dce2b6f315..52583052ddf 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.feedapi;
-import com.yahoo.jdisc.Metric;
import com.yahoo.messagebus.ReplyHandler;
/**
@@ -17,5 +16,5 @@ public interface SessionFactory {
* @param handler A replyhandler to callback when receiving replies from messagebus
* @return The session to use for sending messages.
*/
- SendSession createSendSession(ReplyHandler handler, Metric metric);
+ SendSession createSendSession(ReplyHandler handler);
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index 52d21897a15..4fcbcf4d634 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -2,7 +2,6 @@
package com.yahoo.feedapi;
import com.yahoo.concurrent.SystemTimer;
-import com.yahoo.jdisc.Metric;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Message;
@@ -31,8 +30,8 @@ public class SharedSender implements ReplyHandler {
* Creates a new shared sender.
* If oldsender != null, we copy that status information from that sender.
*/
- SharedSender(String route, SessionFactory factory, SharedSender oldSender, Metric metric) {
- sender = (factory != null) ? factory.createSendSession(this, metric) : null;
+ SharedSender(String route, SessionFactory factory, SharedSender oldSender) {
+ sender = (factory != null) ? factory.createSendSession(this) : null;
metrics = (oldSender != null) ? oldSender.metrics : new RouteMetricSet(route, null);
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
index da226e9ece6..58cceb2d8ee 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
@@ -2,8 +2,6 @@
package com.yahoo.feedhandler;
import com.yahoo.clientmetrics.RouteMetricSet;
-import com.yahoo.container.jdisc.HttpResponse;
-import com.yahoo.container.jdisc.VespaHeaders;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
@@ -14,18 +12,13 @@ import com.yahoo.messagebus.ErrorCode;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.Reply;
import com.yahoo.search.result.ErrorMessage;
-import com.yahoo.text.Utf8String;
-import com.yahoo.text.XMLWriter;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import java.util.stream.Stream;
-public final class FeedResponse extends HttpResponse implements SharedSender.ResultCallback {
+public final class FeedResponse implements SharedSender.ResultCallback {
private final static Logger log = Logger.getLogger(FeedResponse.class.getName());
private final List<ErrorMessage> errorMessages = new ArrayList<>();
@@ -37,7 +30,6 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
private final SharedSender.Pending pendingNumber = new SharedSender.Pending();
FeedResponse(RouteMetricSet metrics) {
- super(com.yahoo.jdisc.http.HttpResponse.Status.OK);
this.metrics = metrics;
}
@@ -49,41 +41,6 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
abortOnError = abort;
}
- @Override
- public void render(OutputStream outputStream) throws IOException {
- if ( ! errorMessages.isEmpty())
- setStatus(VespaHeaders.getStatus(false, errorMessages.get(0), errorMessages.iterator()));
-
- XMLWriter writer = new XMLWriter(new OutputStreamWriter(outputStream));
- writer.openTag("result");
-
- if (traces.length() > 0) {
- writer.openTag("trace");
- writer.append(traces);
- writer.closeTag();
- }
- if (!errors.isEmpty()) {
- writer.openTag("errors");
- writer.attribute(new Utf8String("count"), errors.size());
-
- for (int i = 0; i < errors.size() && i < 10; ++i) {
- writer.openTag("error");
- writer.attribute(new Utf8String("message"), errors.get(i));
- writer.closeTag();
- }
- writer.closeTag();
- }
-
- writer.closeTag();
- writer.flush();
- outputStream.close();
- }
-
- @Override
- public java.lang.String getContentType() {
- return "application/xml";
- }
-
private String prettyPrint(Message m) {
if (m instanceof PutDocumentMessage) {
return "PUT[" + ((PutDocumentMessage)m).getDocumentPut().getDocument().getId() + "] ";
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
index 59cd86670e1..ca2da28d0dd 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
@@ -2,11 +2,8 @@
package com.yahoo.feedhandler;
import com.yahoo.clientmetrics.RouteMetricSet;
-import com.yahoo.container.jdisc.EmptyResponse;
import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.protect.Error;
-import com.yahoo.feedapi.DocprocMessageProcessor;
import com.yahoo.feedapi.FeedContext;
import com.yahoo.feedapi.Feeder;
import com.yahoo.feedapi.JsonFeeder;
@@ -16,8 +13,6 @@ import com.yahoo.feedapi.SingleSender;
import com.yahoo.feedapi.XMLFeeder;
import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -30,68 +25,50 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
public static final String JSON_INPUT = "jsonInput";
- private final AtomicInteger busyThreads = new AtomicInteger(0);
- private final int maxBusyThreads;
-
- private VespaFeedHandler(FeedContext context, Executor executor) {
- super(context, executor);
- this.maxBusyThreads = 32;
- }
-
- public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) {
- return new VespaFeedHandler(context, executor);
+ private VespaFeedHandler(FeedContext context) {
+ super(context);
}
- @Override
- public HttpResponse handle(HttpRequest request) {
- return handle(request, null, 1);
+ public static VespaFeedHandler createFromContext(FeedContext context) {
+ return new VespaFeedHandler(context);
}
- public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) {
- try {
- int busy = busyThreads.incrementAndGet();
- if (busy > maxBusyThreads)
- return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE);
-
- MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request);
+ public FeedResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) {
+ MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request);
- String route = properties.getRoute().toString();
- FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback));
+ String route = properties.getRoute().toString();
+ FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback));
- SingleSender sender = new SingleSender(response, getSharedSender(route));
- sender.addMessageProcessor(properties);
- sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));
- ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender);
- Feeder feeder = createFeeder(feedAccess, request);
- feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
- feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
- response.setAbortOnFeedError(properties.getAbortOnFeedError());
+ SingleSender sender = new SingleSender(response, getSharedSender(route));
+ sender.addMessageProcessor(properties);
+ ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender);
+ Feeder feeder = createFeeder(feedAccess, request);
+ feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
+ feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
+ response.setAbortOnFeedError(properties.getAbortOnFeedError());
- List<String> errors = feeder.parse();
- for (String s : errors) {
- response.addXMLParseError(s);
- }
- if (errors.size() > 0 && feeder instanceof XMLFeeder) {
- response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json.");
- }
+ List<String> errors = feeder.parse();
+ for (String s : errors) {
+ response.addXMLParseError(s);
+ }
+ if (errors.size() > 0 && feeder instanceof XMLFeeder) {
+ response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json.");
+ }
- sender.done();
- feedAccess.close();
- long millis = getTimeoutMillis(request);
- boolean completed = sender.waitForPending(millis);
- if (!completed) {
- response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses");
- }
- response.done();
- return response;
- } finally {
- busyThreads.decrementAndGet();
+ sender.done();
+ feedAccess.close();
+ long millis = getTimeoutMillis(request);
+ boolean completed = sender.waitForPending(millis);
+ if (!completed) {
+ response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses");
}
+ response.done();
+ return response;
+
}
private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) {
- String contentType = request.getHeader("Content-Type");
- if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) {
+ if (Boolean.valueOf(request.getProperty(JSON_INPUT))) {
return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request));
} else {
return new XMLFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request));
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java
index ec2b7202f09..532f10663b9 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java
@@ -1,10 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.feedhandler;
-import com.yahoo.component.provider.ComponentRegistry;
import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
-import com.yahoo.docproc.DocprocService;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.feedapi.FeedContext;
import com.yahoo.feedapi.MessagePropertyProcessor;
@@ -12,22 +9,18 @@ import com.yahoo.feedapi.SharedSender;
import com.yahoo.search.query.ParameterParser;
-import java.io.IOException;
import java.io.InputStream;
-import java.util.concurrent.Executor;
-import java.util.zip.GZIPInputStream;
-public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler {
+public abstract class VespaFeedHandlerBase {
protected FeedContext context;
private final long defaultTimeoutMillis;
- VespaFeedHandlerBase(FeedContext context, Executor executor) {
- this(context, executor, context.getPropertyProcessor().getDefaultTimeoutMillis());
+ VespaFeedHandlerBase(FeedContext context) {
+ this(context, context.getPropertyProcessor().getDefaultTimeoutMillis());
}
- private VespaFeedHandlerBase(FeedContext context, Executor executor, long defaultTimeoutMillis) {
- super(executor, context.getMetricAPI());
+ private VespaFeedHandlerBase(FeedContext context, long defaultTimeoutMillis) {
this.context = context;
this.defaultTimeoutMillis = defaultTimeoutMillis;
}
@@ -36,14 +29,6 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler {
return context.getSharedSender(route);
}
- DocprocService getDocprocChain(HttpRequest request) {
- return context.getPropertyProcessor().getDocprocChain(request);
- }
-
- ComponentRegistry<DocprocService> getDocprocServiceRegistry(HttpRequest request) {
- return context.getPropertyProcessor().getDocprocServiceRegistry(request);
- }
-
MessagePropertyProcessor getPropertyProcessor() {
return context.getPropertyProcessor();
}
@@ -55,15 +40,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler {
* @throws IllegalArgumentException if GZIP stream creation failed
*/
InputStream getRequestInputStream(HttpRequest request) {
- if ("gzip".equals(request.getHeader("Content-Encoding"))) {
- try {
- return new GZIPInputStream(request.getData());
- } catch (IOException e) {
- throw new IllegalArgumentException("Failed to create GZIP input stream from content", e);
- }
- } else {
- return request.getData();
- }
+ return request.getData();
}
protected DocumentTypeManager getDocumentTypeManager() {
diff --git a/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java b/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java
index 4dfde6f1d41..cb33b71424d 100644
--- a/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java
+++ b/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java
@@ -1,9 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.feedapi;
-import static org.junit.Assert.*;
import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
/**
* @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
*/
@@ -27,26 +29,6 @@ public class FeederOptionsTestCase {
assertTrue(f2.equals(f1));
assertTrue(f1.hashCode() == f2.hashCode());
- f1.setRoutingConfigId("blabla");
- assertFalse(f1.equals(f2));
- assertFalse(f2.equals(f1));
- assertFalse(f1.hashCode() == f2.hashCode());
-
- f2.setRoutingConfigId("blabla");
- assertTrue(f1.equals(f2));
- assertTrue(f2.equals(f1));
- assertTrue(f1.hashCode() == f2.hashCode());
-
- f1.setRetryDelay(5000);
- assertFalse(f1.equals(f2));
- assertFalse(f2.equals(f1));
- assertFalse(f1.hashCode() == f2.hashCode());
-
- f2.setRetryDelay(5000);
- assertTrue(f1.equals(f2));
- assertTrue(f2.equals(f1));
- assertTrue(f1.hashCode() == f2.hashCode());
-
f1.setRoute("all roads lead to rome");
assertFalse(f1.equals(f2));
assertFalse(f2.equals(f1));