diff options
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedapi')
6 files changed, 32 insertions, 191 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java index c644b551a79..4e9e17d0b5f 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java @@ -3,7 +3,6 @@ package com.yahoo.feedapi; import com.yahoo.document.Document; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; -import com.yahoo.jdisc.Metric; import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.Message; @@ -47,7 +46,7 @@ public class DummySessionFactory implements SessionFactory { } @Override - public SendSession createSendSession(ReplyHandler r, Metric metric) { + public SendSession createSendSession(ReplyHandler r) { if (output != null) { return new DumpDocuments(output, r, this); } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java index 3a435f7cda2..2d35adfab75 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.feedapi; -import com.yahoo.jdisc.Metric; import com.yahoo.document.DocumentTypeManager; import com.yahoo.clientmetrics.ClientMetrics; @@ -14,22 +13,16 @@ public class FeedContext { private final MessagePropertyProcessor propertyProcessor; private final DocumentTypeManager docTypeManager; private final ClientMetrics metrics; - private final Metric metric; private Map<String, SharedSender> senders = new TreeMap<>(); public static final Object sync = new Object(); public static FeedContext instance = null; - public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager, Metric metric) { + public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager) { this.propertyProcessor = propertyProcessor; this.factory = factory; docTypeManager = manager; metrics = new ClientMetrics(); - this.metric = metric; - } - - public Metric getMetricAPI() { - return metric; } private void shutdownSenders() { @@ -43,7 +36,7 @@ public class FeedContext { Map<String, SharedSender> newSenders = new TreeMap<>(); for (Map.Entry<String, SharedSender> sender : senders.entrySet()) { - newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue(), metric)); + newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue())); } shutdownSenders(); @@ -58,7 +51,7 @@ public class FeedContext { SharedSender sender = senders.get(route); if (sender == null) { - sender = new SharedSender(route, factory, null, metric); + sender = new SharedSender(route, factory, null); senders.put(route, sender); metrics.addRouteMetricSet(sender.getMetrics()); } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java index dc78f037534..ecfd3c9eded 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java @@ -2,9 +2,11 @@ package com.yahoo.feedapi; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.messagebus.*; +import com.yahoo.messagebus.DynamicThrottlePolicy; +import com.yahoo.messagebus.RateThrottlingPolicy; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.messagebus.routing.RetryTransientErrorsPolicy; import com.yahoo.vespaclient.config.FeederConfig; @@ -19,61 +21,28 @@ public class FeederOptions { private boolean abortOnDocumentError = true; private boolean abortOnSendError = true; private boolean retryEnabled = true; - private double retryDelay = 1; private double timeout = 60; private int maxPendingBytes = 0; private int maxPendingDocs = 0; private double maxFeedRate = 0.0; - private String documentManagerConfigId = "client"; - private String idPrefix = ""; private String route = "default"; - private String routingConfigId; - private String slobrokConfigId; private int traceLevel; private int mbusPort; private DocumentProtocol.Priority priority = DocumentProtocol.Priority.NORMAL_3; - private boolean priorityExplicitlySet = false; private String docprocChain = ""; /** Constructs an options object with all default values. */ - public FeederOptions() { + FeederOptions() { // empty } - /** - * Implements the copy constructor. - * - * @param src The options to copy. - */ - public FeederOptions(FeederOptions src) { - abortOnDocumentError = src.abortOnDocumentError; - abortOnSendError = src.abortOnSendError; - retryEnabled = src.retryEnabled; - retryDelay = src.retryDelay; - timeout = src.timeout; - maxPendingBytes = src.maxPendingBytes; - maxPendingDocs = src.maxPendingDocs; - maxFeedRate = src.maxFeedRate; - documentManagerConfigId = src.documentManagerConfigId; - idPrefix = src.idPrefix; - route = src.route; - routingConfigId = src.routingConfigId; - slobrokConfigId = src.slobrokConfigId; - traceLevel = src.traceLevel; - mbusPort = src.mbusPort; - priority = src.priority; - docprocChain = src.docprocChain; - } - /** Constructor that sets values from config. */ - public FeederOptions(FeederConfig config) { + FeederOptions(FeederConfig config) { setAbortOnDocumentError(config.abortondocumenterror()); setAbortOnSendError(config.abortonsenderror()); - setIdPrefix(config.idprefix()); setMaxPendingBytes(config.maxpendingbytes()); setMaxPendingDocs(config.maxpendingdocs()); setRetryEnabled(config.retryenabled()); - setRetryDelay(config.retrydelay()); setRoute(config.route()); setTimeout(config.timeout()); setTraceLevel(config.tracelevel()); @@ -82,31 +51,18 @@ public class FeederOptions { setMaxFeedRate(config.maxfeedrate()); } - public void setMaxFeedRate(double feedRate) { + void setMaxFeedRate(double feedRate) { maxFeedRate = feedRate; } - - public double getMaxFeedRate() { - return maxFeedRate; - } - - public boolean getRetryEnabled() { + boolean getRetryEnabled() { return retryEnabled; } - public void setRetryEnabled(boolean retryEnabled) { + private void setRetryEnabled(boolean retryEnabled) { this.retryEnabled = retryEnabled; } - public double getRetryDelay() { - return retryDelay; - } - - public void setRetryDelay(double retryDelay) { - this.retryDelay = retryDelay; - } - public double getTimeout() { return timeout; } @@ -115,46 +71,30 @@ public class FeederOptions { this.timeout = timeout; } - public int getMaxPendingBytes() { - return maxPendingBytes; - } - - public void setMaxPendingBytes(int maxPendingBytes) { + private void setMaxPendingBytes(int maxPendingBytes) { this.maxPendingBytes = maxPendingBytes; } - public int getMaxPendingDocs() { - return maxPendingDocs; - } - - public void setMaxPendingDocs(int maxPendingDocs) { + private void setMaxPendingDocs(int maxPendingDocs) { this.maxPendingDocs = maxPendingDocs; } - public boolean abortOnDocumentError() { + boolean abortOnDocumentError() { return abortOnDocumentError; } - public void setAbortOnDocumentError(boolean abortOnDocumentError) { + void setAbortOnDocumentError(boolean abortOnDocumentError) { this.abortOnDocumentError = abortOnDocumentError; } - public boolean abortOnSendError() { + boolean abortOnSendError() { return abortOnSendError; } - public void setAbortOnSendError(boolean abortOnSendError) { + private void setAbortOnSendError(boolean abortOnSendError) { this.abortOnSendError = abortOnSendError; } - public String getIdPrefix() { - return idPrefix; - } - - public void setIdPrefix(String idPrefix) { - this.idPrefix = idPrefix; - } - public void setRoute(String route) { this.route = route; } @@ -167,35 +107,7 @@ public class FeederOptions { return priority; } - public boolean isPriorityExplicitlySet() { - return priorityExplicitlySet; - } - - public String getSlobrokConfigId() { - return slobrokConfigId; - } - - public void setSlobrokConfigId(String slobrokConfigId) { - this.slobrokConfigId = slobrokConfigId; - } - - public String getRoutingConfigId() { - return routingConfigId; - } - - public void setRoutingConfigId(String routingConfigId) { - this.routingConfigId = routingConfigId; - } - - public String getDocumentManagerConfigId() { - return documentManagerConfigId; - } - - public void setDocumentManagerConfigId(String documentManagerConfigId) { - this.documentManagerConfigId = documentManagerConfigId; - } - - public int getTraceLevel() { + int getTraceLevel() { return traceLevel; } @@ -203,24 +115,19 @@ public class FeederOptions { this.traceLevel = traceLevel; } - public int getMessageBusPort() { - return mbusPort; - } - - public void setMessageBusPort(int mbusPort) { + private void setMessageBusPort(int mbusPort) { this.mbusPort = mbusPort; } public void setPriority(DocumentProtocol.Priority priority) { this.priority = priority; - this.priorityExplicitlySet = true; } - public String getDocprocChain() { + String getDocprocChain() { return docprocChain; } - public void setDocprocChain(String chain) { + private void setDocprocChain(String chain) { docprocChain = chain; } @@ -228,7 +135,7 @@ public class FeederOptions { * Creates a source session params object with parameters set as these options * dictate. */ - public SourceSessionParams toSourceSessionParams() { + SourceSessionParams toSourceSessionParams() { SourceSessionParams params = new SourceSessionParams(); StaticThrottlePolicy policy; @@ -252,7 +159,7 @@ public class FeederOptions { return params; } - public RPCNetworkParams getNetworkParams() { + RPCNetworkParams getNetworkParams() { try { RPCNetworkParams networkParams = new RPCNetworkParams(); if (mbusPort != -1) { @@ -271,19 +178,13 @@ public class FeederOptions { "abortOnDocumentError=" + abortOnDocumentError + ", abortOnSendError=" + abortOnSendError + ", retryEnabled=" + retryEnabled + - ", retryDelay=" + retryDelay + ", timeout=" + timeout + ", maxPendingBytes=" + maxPendingBytes + ", maxPendingDocs=" + maxPendingDocs + - ", documentManagerConfigId='" + documentManagerConfigId + '\'' + - ", idPrefix='" + idPrefix + '\'' + ", route='" + route + '\'' + - ", routingConfigId='" + routingConfigId + '\'' + - ", slobrokConfigId='" + slobrokConfigId + '\'' + ", traceLevel=" + traceLevel + ", mbusPort=" + mbusPort + ", priority=" + priority.name() + - ", priorityExplicitlySet=" + priorityExplicitlySet + ", docprocChain='" + docprocChain + '\'' + '}'; } @@ -301,24 +202,12 @@ public class FeederOptions { if (maxPendingDocs != that.maxPendingDocs) return false; if (maxFeedRate != that.maxFeedRate) return false; if (mbusPort != that.mbusPort) return false; - if (priorityExplicitlySet != that.priorityExplicitlySet) return false; - if (Double.compare(that.retryDelay, retryDelay) != 0) return false; if (retryEnabled != that.retryEnabled) return false; if (Double.compare(that.timeout, timeout) != 0) return false; if (traceLevel != that.traceLevel) return false; if (docprocChain != null ? !docprocChain.equals(that.docprocChain) : that.docprocChain != null) return false; - if (documentManagerConfigId != null ? !documentManagerConfigId.equals(that.documentManagerConfigId) : that.documentManagerConfigId != null) { - return false; - } - if (idPrefix != null ? !idPrefix.equals(that.idPrefix) : that.idPrefix != null) return false; if (priority != that.priority) return false; if (route != null ? !route.equals(that.route) : that.route != null) return false; - if (routingConfigId != null ? !routingConfigId.equals(that.routingConfigId) : that.routingConfigId != null) { - return false; - } - if (slobrokConfigId != null ? !slobrokConfigId.equals(that.slobrokConfigId) : that.slobrokConfigId != null) { - return false; - } return true; } @@ -330,22 +219,15 @@ public class FeederOptions { result = (abortOnDocumentError ? 1 : 0); result = 31 * result + (abortOnSendError ? 1 : 0); result = 31 * result + (retryEnabled ? 1 : 0); - temp = retryDelay != +0.0d ? Double.doubleToLongBits(retryDelay) : 0L; - result = 31 * result + (int) (temp ^ (temp >>> 32)); temp = timeout != +0.0d ? Double.doubleToLongBits(timeout) : 0L; result = 31 * result + (int) (temp ^ (temp >>> 32)); result = 31 * result + maxPendingBytes; result = 31 * result + maxPendingDocs; result = 31 * result + ((int)(maxFeedRate * 1000)); - result = 31 * result + (documentManagerConfigId != null ? documentManagerConfigId.hashCode() : 0); - result = 31 * result + (idPrefix != null ? idPrefix.hashCode() : 0); result = 31 * result + (route != null ? route.hashCode() : 0); - result = 31 * result + (routingConfigId != null ? routingConfigId.hashCode() : 0); - result = 31 * result + (slobrokConfigId != null ? slobrokConfigId.hashCode() : 0); result = 31 * result + traceLevel; result = 31 * result + mbusPort; result = 31 * result + (priority != null ? priority.hashCode() : 0); - result = 31 * result + (priorityExplicitlySet ? 1 : 0); result = 31 * result + (docprocChain != null ? docprocChain.hashCode() : 0); return result; } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java index 12a4ecde493..5e52da23c12 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java @@ -5,34 +5,21 @@ import com.yahoo.cloud.config.SlobroksConfig; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; import com.yahoo.documentapi.messagebus.MessageBusParams; -import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; -import com.yahoo.jdisc.Metric; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.ReplyHandler; import com.yahoo.messagebus.SourceSession; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import java.util.Collections; - public class MessageBusSessionFactory implements SessionFactory { private final MessageBusDocumentAccess access; private final MessagePropertyProcessor processor; - private interface Metrics { - String NUM_OPERATIONS = "num_operations"; - String NUM_PUTS = "num_puts"; - String NUM_REMOVES = "num_removes"; - String NUM_UPDATES = "num_updates"; - } - public MessageBusSessionFactory(MessagePropertyProcessor processor) { this(processor, null, null); } - public MessageBusSessionFactory(MessagePropertyProcessor processor, + private MessageBusSessionFactory(MessagePropertyProcessor processor, DocumentmanagerConfig documentmanagerConfig, SlobroksConfig slobroksConfig) { this.processor = processor; @@ -53,10 +40,9 @@ public class MessageBusSessionFactory implements SessionFactory { } @Override - public synchronized SendSession createSendSession(ReplyHandler handler, Metric metric) { + public synchronized SendSession createSendSession(ReplyHandler handler) { return new SourceSessionWrapper( - access.getMessageBus().createSourceSession(handler, processor.getFeederOptions().toSourceSessionParams()), - metric); + access.getMessageBus().createSourceSession(handler, processor.getFeederOptions().toSourceSessionParams())); } public void shutDown() { @@ -66,18 +52,13 @@ public class MessageBusSessionFactory implements SessionFactory { private class SourceSessionWrapper extends SendSession { private final SourceSession session; - private final Metric metric; - private final Metric.Context context; - private SourceSessionWrapper(SourceSession session, Metric metric) { + private SourceSessionWrapper(SourceSession session) { this.session = session; - this.metric = metric; - this.context = metric.createContext(Collections.<String, String>emptyMap()); } @Override protected com.yahoo.messagebus.Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException { - updateCounters(m); if (blockIfQueueFull) { return session.sendBlocking(m); } else { @@ -85,18 +66,6 @@ public class MessageBusSessionFactory implements SessionFactory { } } - private void updateCounters(Message m) { - metric.add(Metrics.NUM_OPERATIONS, 1, context); - - if (m instanceof PutDocumentMessage) { - metric.add(Metrics.NUM_PUTS, 1, context); - } else if (m instanceof RemoveDocumentMessage) { - metric.add(Metrics.NUM_REMOVES, 1, context); - } else if (m instanceof UpdateDocumentMessage) { - metric.add(Metrics.NUM_UPDATES, 1, context); - } - } - @Override public void close() { session.close(); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java index 6dce2b6f315..52583052ddf 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.feedapi; -import com.yahoo.jdisc.Metric; import com.yahoo.messagebus.ReplyHandler; /** @@ -17,5 +16,5 @@ public interface SessionFactory { * @param handler A replyhandler to callback when receiving replies from messagebus * @return The session to use for sending messages. */ - SendSession createSendSession(ReplyHandler handler, Metric metric); + SendSession createSendSession(ReplyHandler handler); } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index 52d21897a15..4fcbcf4d634 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -2,7 +2,6 @@ package com.yahoo.feedapi; import com.yahoo.concurrent.SystemTimer; -import com.yahoo.jdisc.Metric; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Message; @@ -31,8 +30,8 @@ public class SharedSender implements ReplyHandler { * Creates a new shared sender. * If oldsender != null, we copy that status information from that sender. */ - SharedSender(String route, SessionFactory factory, SharedSender oldSender, Metric metric) { - sender = (factory != null) ? factory.createSendSession(this, metric) : null; + SharedSender(String route, SessionFactory factory, SharedSender oldSender) { + sender = (factory != null) ? factory.createSendSession(this) : null; metrics = (oldSender != null) ? oldSender.metrics : new RouteMetricSet(route, null); } |