summaryrefslogtreecommitdiffstats
path: root/vespaclient-core/src/main/java/com/yahoo/feedapi
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedapi')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java3
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java13
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java160
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java39
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java3
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java5
6 files changed, 32 insertions, 191 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
index c644b551a79..4e9e17d0b5f 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
@@ -3,7 +3,6 @@ package com.yahoo.feedapi;
import com.yahoo.document.Document;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
-import com.yahoo.jdisc.Metric;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
@@ -47,7 +46,7 @@ public class DummySessionFactory implements SessionFactory {
}
@Override
- public SendSession createSendSession(ReplyHandler r, Metric metric) {
+ public SendSession createSendSession(ReplyHandler r) {
if (output != null) {
return new DumpDocuments(output, r, this);
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java
index 3a435f7cda2..2d35adfab75 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.feedapi;
-import com.yahoo.jdisc.Metric;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.clientmetrics.ClientMetrics;
@@ -14,22 +13,16 @@ public class FeedContext {
private final MessagePropertyProcessor propertyProcessor;
private final DocumentTypeManager docTypeManager;
private final ClientMetrics metrics;
- private final Metric metric;
private Map<String, SharedSender> senders = new TreeMap<>();
public static final Object sync = new Object();
public static FeedContext instance = null;
- public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager, Metric metric) {
+ public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager) {
this.propertyProcessor = propertyProcessor;
this.factory = factory;
docTypeManager = manager;
metrics = new ClientMetrics();
- this.metric = metric;
- }
-
- public Metric getMetricAPI() {
- return metric;
}
private void shutdownSenders() {
@@ -43,7 +36,7 @@ public class FeedContext {
Map<String, SharedSender> newSenders = new TreeMap<>();
for (Map.Entry<String, SharedSender> sender : senders.entrySet()) {
- newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue(), metric));
+ newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue()));
}
shutdownSenders();
@@ -58,7 +51,7 @@ public class FeedContext {
SharedSender sender = senders.get(route);
if (sender == null) {
- sender = new SharedSender(route, factory, null, metric);
+ sender = new SharedSender(route, factory, null);
senders.put(route, sender);
metrics.addRouteMetricSet(sender.getMetrics());
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
index dc78f037534..ecfd3c9eded 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
@@ -2,9 +2,11 @@
package com.yahoo.feedapi;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.DynamicThrottlePolicy;
+import com.yahoo.messagebus.RateThrottlingPolicy;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.routing.RetryTransientErrorsPolicy;
import com.yahoo.vespaclient.config.FeederConfig;
@@ -19,61 +21,28 @@ public class FeederOptions {
private boolean abortOnDocumentError = true;
private boolean abortOnSendError = true;
private boolean retryEnabled = true;
- private double retryDelay = 1;
private double timeout = 60;
private int maxPendingBytes = 0;
private int maxPendingDocs = 0;
private double maxFeedRate = 0.0;
- private String documentManagerConfigId = "client";
- private String idPrefix = "";
private String route = "default";
- private String routingConfigId;
- private String slobrokConfigId;
private int traceLevel;
private int mbusPort;
private DocumentProtocol.Priority priority = DocumentProtocol.Priority.NORMAL_3;
- private boolean priorityExplicitlySet = false;
private String docprocChain = "";
/** Constructs an options object with all default values. */
- public FeederOptions() {
+ FeederOptions() {
// empty
}
- /**
- * Implements the copy constructor.
- *
- * @param src The options to copy.
- */
- public FeederOptions(FeederOptions src) {
- abortOnDocumentError = src.abortOnDocumentError;
- abortOnSendError = src.abortOnSendError;
- retryEnabled = src.retryEnabled;
- retryDelay = src.retryDelay;
- timeout = src.timeout;
- maxPendingBytes = src.maxPendingBytes;
- maxPendingDocs = src.maxPendingDocs;
- maxFeedRate = src.maxFeedRate;
- documentManagerConfigId = src.documentManagerConfigId;
- idPrefix = src.idPrefix;
- route = src.route;
- routingConfigId = src.routingConfigId;
- slobrokConfigId = src.slobrokConfigId;
- traceLevel = src.traceLevel;
- mbusPort = src.mbusPort;
- priority = src.priority;
- docprocChain = src.docprocChain;
- }
-
/** Constructor that sets values from config. */
- public FeederOptions(FeederConfig config) {
+ FeederOptions(FeederConfig config) {
setAbortOnDocumentError(config.abortondocumenterror());
setAbortOnSendError(config.abortonsenderror());
- setIdPrefix(config.idprefix());
setMaxPendingBytes(config.maxpendingbytes());
setMaxPendingDocs(config.maxpendingdocs());
setRetryEnabled(config.retryenabled());
- setRetryDelay(config.retrydelay());
setRoute(config.route());
setTimeout(config.timeout());
setTraceLevel(config.tracelevel());
@@ -82,31 +51,18 @@ public class FeederOptions {
setMaxFeedRate(config.maxfeedrate());
}
- public void setMaxFeedRate(double feedRate) {
+ void setMaxFeedRate(double feedRate) {
maxFeedRate = feedRate;
}
-
- public double getMaxFeedRate() {
- return maxFeedRate;
- }
-
- public boolean getRetryEnabled() {
+ boolean getRetryEnabled() {
return retryEnabled;
}
- public void setRetryEnabled(boolean retryEnabled) {
+ private void setRetryEnabled(boolean retryEnabled) {
this.retryEnabled = retryEnabled;
}
- public double getRetryDelay() {
- return retryDelay;
- }
-
- public void setRetryDelay(double retryDelay) {
- this.retryDelay = retryDelay;
- }
-
public double getTimeout() {
return timeout;
}
@@ -115,46 +71,30 @@ public class FeederOptions {
this.timeout = timeout;
}
- public int getMaxPendingBytes() {
- return maxPendingBytes;
- }
-
- public void setMaxPendingBytes(int maxPendingBytes) {
+ private void setMaxPendingBytes(int maxPendingBytes) {
this.maxPendingBytes = maxPendingBytes;
}
- public int getMaxPendingDocs() {
- return maxPendingDocs;
- }
-
- public void setMaxPendingDocs(int maxPendingDocs) {
+ private void setMaxPendingDocs(int maxPendingDocs) {
this.maxPendingDocs = maxPendingDocs;
}
- public boolean abortOnDocumentError() {
+ boolean abortOnDocumentError() {
return abortOnDocumentError;
}
- public void setAbortOnDocumentError(boolean abortOnDocumentError) {
+ void setAbortOnDocumentError(boolean abortOnDocumentError) {
this.abortOnDocumentError = abortOnDocumentError;
}
- public boolean abortOnSendError() {
+ boolean abortOnSendError() {
return abortOnSendError;
}
- public void setAbortOnSendError(boolean abortOnSendError) {
+ private void setAbortOnSendError(boolean abortOnSendError) {
this.abortOnSendError = abortOnSendError;
}
- public String getIdPrefix() {
- return idPrefix;
- }
-
- public void setIdPrefix(String idPrefix) {
- this.idPrefix = idPrefix;
- }
-
public void setRoute(String route) {
this.route = route;
}
@@ -167,35 +107,7 @@ public class FeederOptions {
return priority;
}
- public boolean isPriorityExplicitlySet() {
- return priorityExplicitlySet;
- }
-
- public String getSlobrokConfigId() {
- return slobrokConfigId;
- }
-
- public void setSlobrokConfigId(String slobrokConfigId) {
- this.slobrokConfigId = slobrokConfigId;
- }
-
- public String getRoutingConfigId() {
- return routingConfigId;
- }
-
- public void setRoutingConfigId(String routingConfigId) {
- this.routingConfigId = routingConfigId;
- }
-
- public String getDocumentManagerConfigId() {
- return documentManagerConfigId;
- }
-
- public void setDocumentManagerConfigId(String documentManagerConfigId) {
- this.documentManagerConfigId = documentManagerConfigId;
- }
-
- public int getTraceLevel() {
+ int getTraceLevel() {
return traceLevel;
}
@@ -203,24 +115,19 @@ public class FeederOptions {
this.traceLevel = traceLevel;
}
- public int getMessageBusPort() {
- return mbusPort;
- }
-
- public void setMessageBusPort(int mbusPort) {
+ private void setMessageBusPort(int mbusPort) {
this.mbusPort = mbusPort;
}
public void setPriority(DocumentProtocol.Priority priority) {
this.priority = priority;
- this.priorityExplicitlySet = true;
}
- public String getDocprocChain() {
+ String getDocprocChain() {
return docprocChain;
}
- public void setDocprocChain(String chain) {
+ private void setDocprocChain(String chain) {
docprocChain = chain;
}
@@ -228,7 +135,7 @@ public class FeederOptions {
* Creates a source session params object with parameters set as these options
* dictate.
*/
- public SourceSessionParams toSourceSessionParams() {
+ SourceSessionParams toSourceSessionParams() {
SourceSessionParams params = new SourceSessionParams();
StaticThrottlePolicy policy;
@@ -252,7 +159,7 @@ public class FeederOptions {
return params;
}
- public RPCNetworkParams getNetworkParams() {
+ RPCNetworkParams getNetworkParams() {
try {
RPCNetworkParams networkParams = new RPCNetworkParams();
if (mbusPort != -1) {
@@ -271,19 +178,13 @@ public class FeederOptions {
"abortOnDocumentError=" + abortOnDocumentError +
", abortOnSendError=" + abortOnSendError +
", retryEnabled=" + retryEnabled +
- ", retryDelay=" + retryDelay +
", timeout=" + timeout +
", maxPendingBytes=" + maxPendingBytes +
", maxPendingDocs=" + maxPendingDocs +
- ", documentManagerConfigId='" + documentManagerConfigId + '\'' +
- ", idPrefix='" + idPrefix + '\'' +
", route='" + route + '\'' +
- ", routingConfigId='" + routingConfigId + '\'' +
- ", slobrokConfigId='" + slobrokConfigId + '\'' +
", traceLevel=" + traceLevel +
", mbusPort=" + mbusPort +
", priority=" + priority.name() +
- ", priorityExplicitlySet=" + priorityExplicitlySet +
", docprocChain='" + docprocChain + '\'' +
'}';
}
@@ -301,24 +202,12 @@ public class FeederOptions {
if (maxPendingDocs != that.maxPendingDocs) return false;
if (maxFeedRate != that.maxFeedRate) return false;
if (mbusPort != that.mbusPort) return false;
- if (priorityExplicitlySet != that.priorityExplicitlySet) return false;
- if (Double.compare(that.retryDelay, retryDelay) != 0) return false;
if (retryEnabled != that.retryEnabled) return false;
if (Double.compare(that.timeout, timeout) != 0) return false;
if (traceLevel != that.traceLevel) return false;
if (docprocChain != null ? !docprocChain.equals(that.docprocChain) : that.docprocChain != null) return false;
- if (documentManagerConfigId != null ? !documentManagerConfigId.equals(that.documentManagerConfigId) : that.documentManagerConfigId != null) {
- return false;
- }
- if (idPrefix != null ? !idPrefix.equals(that.idPrefix) : that.idPrefix != null) return false;
if (priority != that.priority) return false;
if (route != null ? !route.equals(that.route) : that.route != null) return false;
- if (routingConfigId != null ? !routingConfigId.equals(that.routingConfigId) : that.routingConfigId != null) {
- return false;
- }
- if (slobrokConfigId != null ? !slobrokConfigId.equals(that.slobrokConfigId) : that.slobrokConfigId != null) {
- return false;
- }
return true;
}
@@ -330,22 +219,15 @@ public class FeederOptions {
result = (abortOnDocumentError ? 1 : 0);
result = 31 * result + (abortOnSendError ? 1 : 0);
result = 31 * result + (retryEnabled ? 1 : 0);
- temp = retryDelay != +0.0d ? Double.doubleToLongBits(retryDelay) : 0L;
- result = 31 * result + (int) (temp ^ (temp >>> 32));
temp = timeout != +0.0d ? Double.doubleToLongBits(timeout) : 0L;
result = 31 * result + (int) (temp ^ (temp >>> 32));
result = 31 * result + maxPendingBytes;
result = 31 * result + maxPendingDocs;
result = 31 * result + ((int)(maxFeedRate * 1000));
- result = 31 * result + (documentManagerConfigId != null ? documentManagerConfigId.hashCode() : 0);
- result = 31 * result + (idPrefix != null ? idPrefix.hashCode() : 0);
result = 31 * result + (route != null ? route.hashCode() : 0);
- result = 31 * result + (routingConfigId != null ? routingConfigId.hashCode() : 0);
- result = 31 * result + (slobrokConfigId != null ? slobrokConfigId.hashCode() : 0);
result = 31 * result + traceLevel;
result = 31 * result + mbusPort;
result = 31 * result + (priority != null ? priority.hashCode() : 0);
- result = 31 * result + (priorityExplicitlySet ? 1 : 0);
result = 31 * result + (docprocChain != null ? docprocChain.hashCode() : 0);
return result;
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
index 12a4ecde493..5e52da23c12 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
@@ -5,34 +5,21 @@ import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
import com.yahoo.documentapi.messagebus.MessageBusParams;
-import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
-import com.yahoo.jdisc.Metric;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.SourceSession;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import java.util.Collections;
-
public class MessageBusSessionFactory implements SessionFactory {
private final MessageBusDocumentAccess access;
private final MessagePropertyProcessor processor;
- private interface Metrics {
- String NUM_OPERATIONS = "num_operations";
- String NUM_PUTS = "num_puts";
- String NUM_REMOVES = "num_removes";
- String NUM_UPDATES = "num_updates";
- }
-
public MessageBusSessionFactory(MessagePropertyProcessor processor) {
this(processor, null, null);
}
- public MessageBusSessionFactory(MessagePropertyProcessor processor,
+ private MessageBusSessionFactory(MessagePropertyProcessor processor,
DocumentmanagerConfig documentmanagerConfig,
SlobroksConfig slobroksConfig) {
this.processor = processor;
@@ -53,10 +40,9 @@ public class MessageBusSessionFactory implements SessionFactory {
}
@Override
- public synchronized SendSession createSendSession(ReplyHandler handler, Metric metric) {
+ public synchronized SendSession createSendSession(ReplyHandler handler) {
return new SourceSessionWrapper(
- access.getMessageBus().createSourceSession(handler, processor.getFeederOptions().toSourceSessionParams()),
- metric);
+ access.getMessageBus().createSourceSession(handler, processor.getFeederOptions().toSourceSessionParams()));
}
public void shutDown() {
@@ -66,18 +52,13 @@ public class MessageBusSessionFactory implements SessionFactory {
private class SourceSessionWrapper extends SendSession {
private final SourceSession session;
- private final Metric metric;
- private final Metric.Context context;
- private SourceSessionWrapper(SourceSession session, Metric metric) {
+ private SourceSessionWrapper(SourceSession session) {
this.session = session;
- this.metric = metric;
- this.context = metric.createContext(Collections.<String, String>emptyMap());
}
@Override
protected com.yahoo.messagebus.Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException {
- updateCounters(m);
if (blockIfQueueFull) {
return session.sendBlocking(m);
} else {
@@ -85,18 +66,6 @@ public class MessageBusSessionFactory implements SessionFactory {
}
}
- private void updateCounters(Message m) {
- metric.add(Metrics.NUM_OPERATIONS, 1, context);
-
- if (m instanceof PutDocumentMessage) {
- metric.add(Metrics.NUM_PUTS, 1, context);
- } else if (m instanceof RemoveDocumentMessage) {
- metric.add(Metrics.NUM_REMOVES, 1, context);
- } else if (m instanceof UpdateDocumentMessage) {
- metric.add(Metrics.NUM_UPDATES, 1, context);
- }
- }
-
@Override
public void close() {
session.close();
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
index 6dce2b6f315..52583052ddf 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.feedapi;
-import com.yahoo.jdisc.Metric;
import com.yahoo.messagebus.ReplyHandler;
/**
@@ -17,5 +16,5 @@ public interface SessionFactory {
* @param handler A replyhandler to callback when receiving replies from messagebus
* @return The session to use for sending messages.
*/
- SendSession createSendSession(ReplyHandler handler, Metric metric);
+ SendSession createSendSession(ReplyHandler handler);
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index 52d21897a15..4fcbcf4d634 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -2,7 +2,6 @@
package com.yahoo.feedapi;
import com.yahoo.concurrent.SystemTimer;
-import com.yahoo.jdisc.Metric;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Message;
@@ -31,8 +30,8 @@ public class SharedSender implements ReplyHandler {
* Creates a new shared sender.
* If oldsender != null, we copy that status information from that sender.
*/
- SharedSender(String route, SessionFactory factory, SharedSender oldSender, Metric metric) {
- sender = (factory != null) ? factory.createSendSession(this, metric) : null;
+ SharedSender(String route, SessionFactory factory, SharedSender oldSender) {
+ sender = (factory != null) ? factory.createSendSession(this) : null;
metrics = (oldSender != null) ? oldSender.metrics : new RouteMetricSet(route, null);
}