From fc7f77270eaca41d85d7d2499b5ff10f6afaa68e Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 23 Apr 2019 13:56:54 +0200 Subject: Some more cleanup. --- .../com/yahoo/feedapi/DummySessionFactory.java | 3 +- .../main/java/com/yahoo/feedapi/FeedContext.java | 13 +- .../main/java/com/yahoo/feedapi/FeederOptions.java | 160 +++------------------ .../yahoo/feedapi/MessageBusSessionFactory.java | 39 +---- .../java/com/yahoo/feedapi/SessionFactory.java | 3 +- .../main/java/com/yahoo/feedapi/SharedSender.java | 5 +- .../java/com/yahoo/feedhandler/FeedResponse.java | 45 +----- .../com/yahoo/feedhandler/VespaFeedHandler.java | 87 +++++------ .../yahoo/feedhandler/VespaFeedHandlerBase.java | 33 +---- .../com/yahoo/feedapi/FeederOptionsTestCase.java | 24 +--- .../java/com/yahoo/vespafeeder/VespaFeeder.java | 21 ++- 11 files changed, 81 insertions(+), 352 deletions(-) diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java index c644b551a79..4e9e17d0b5f 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java @@ -3,7 +3,6 @@ package com.yahoo.feedapi; import com.yahoo.document.Document; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; -import com.yahoo.jdisc.Metric; import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.Message; @@ -47,7 +46,7 @@ public class DummySessionFactory implements SessionFactory { } @Override - public SendSession createSendSession(ReplyHandler r, Metric metric) { + public SendSession createSendSession(ReplyHandler r) { if (output != null) { return new DumpDocuments(output, r, this); } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java index 3a435f7cda2..2d35adfab75 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.feedapi; -import com.yahoo.jdisc.Metric; import com.yahoo.document.DocumentTypeManager; import com.yahoo.clientmetrics.ClientMetrics; @@ -14,22 +13,16 @@ public class FeedContext { private final MessagePropertyProcessor propertyProcessor; private final DocumentTypeManager docTypeManager; private final ClientMetrics metrics; - private final Metric metric; private Map senders = new TreeMap<>(); public static final Object sync = new Object(); public static FeedContext instance = null; - public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager, Metric metric) { + public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager) { this.propertyProcessor = propertyProcessor; this.factory = factory; docTypeManager = manager; metrics = new ClientMetrics(); - this.metric = metric; - } - - public Metric getMetricAPI() { - return metric; } private void shutdownSenders() { @@ -43,7 +36,7 @@ public class FeedContext { Map newSenders = new TreeMap<>(); for (Map.Entry sender : senders.entrySet()) { - newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue(), metric)); + newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue())); } shutdownSenders(); @@ -58,7 +51,7 @@ public class FeedContext { SharedSender sender = senders.get(route); if (sender == null) { - sender = new SharedSender(route, factory, null, metric); + sender = new SharedSender(route, factory, null); senders.put(route, sender); metrics.addRouteMetricSet(sender.getMetrics()); } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java index dc78f037534..ecfd3c9eded 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java @@ -2,9 +2,11 @@ package com.yahoo.feedapi; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.messagebus.*; +import com.yahoo.messagebus.DynamicThrottlePolicy; +import com.yahoo.messagebus.RateThrottlingPolicy; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.messagebus.routing.RetryTransientErrorsPolicy; import com.yahoo.vespaclient.config.FeederConfig; @@ -19,61 +21,28 @@ public class FeederOptions { private boolean abortOnDocumentError = true; private boolean abortOnSendError = true; private boolean retryEnabled = true; - private double retryDelay = 1; private double timeout = 60; private int maxPendingBytes = 0; private int maxPendingDocs = 0; private double maxFeedRate = 0.0; - private String documentManagerConfigId = "client"; - private String idPrefix = ""; private String route = "default"; - private String routingConfigId; - private String slobrokConfigId; private int traceLevel; private int mbusPort; private DocumentProtocol.Priority priority = DocumentProtocol.Priority.NORMAL_3; - private boolean priorityExplicitlySet = false; private String docprocChain = ""; /** Constructs an options object with all default values. */ - public FeederOptions() { + FeederOptions() { // empty } - /** - * Implements the copy constructor. - * - * @param src The options to copy. - */ - public FeederOptions(FeederOptions src) { - abortOnDocumentError = src.abortOnDocumentError; - abortOnSendError = src.abortOnSendError; - retryEnabled = src.retryEnabled; - retryDelay = src.retryDelay; - timeout = src.timeout; - maxPendingBytes = src.maxPendingBytes; - maxPendingDocs = src.maxPendingDocs; - maxFeedRate = src.maxFeedRate; - documentManagerConfigId = src.documentManagerConfigId; - idPrefix = src.idPrefix; - route = src.route; - routingConfigId = src.routingConfigId; - slobrokConfigId = src.slobrokConfigId; - traceLevel = src.traceLevel; - mbusPort = src.mbusPort; - priority = src.priority; - docprocChain = src.docprocChain; - } - /** Constructor that sets values from config. */ - public FeederOptions(FeederConfig config) { + FeederOptions(FeederConfig config) { setAbortOnDocumentError(config.abortondocumenterror()); setAbortOnSendError(config.abortonsenderror()); - setIdPrefix(config.idprefix()); setMaxPendingBytes(config.maxpendingbytes()); setMaxPendingDocs(config.maxpendingdocs()); setRetryEnabled(config.retryenabled()); - setRetryDelay(config.retrydelay()); setRoute(config.route()); setTimeout(config.timeout()); setTraceLevel(config.tracelevel()); @@ -82,31 +51,18 @@ public class FeederOptions { setMaxFeedRate(config.maxfeedrate()); } - public void setMaxFeedRate(double feedRate) { + void setMaxFeedRate(double feedRate) { maxFeedRate = feedRate; } - - public double getMaxFeedRate() { - return maxFeedRate; - } - - public boolean getRetryEnabled() { + boolean getRetryEnabled() { return retryEnabled; } - public void setRetryEnabled(boolean retryEnabled) { + private void setRetryEnabled(boolean retryEnabled) { this.retryEnabled = retryEnabled; } - public double getRetryDelay() { - return retryDelay; - } - - public void setRetryDelay(double retryDelay) { - this.retryDelay = retryDelay; - } - public double getTimeout() { return timeout; } @@ -115,46 +71,30 @@ public class FeederOptions { this.timeout = timeout; } - public int getMaxPendingBytes() { - return maxPendingBytes; - } - - public void setMaxPendingBytes(int maxPendingBytes) { + private void setMaxPendingBytes(int maxPendingBytes) { this.maxPendingBytes = maxPendingBytes; } - public int getMaxPendingDocs() { - return maxPendingDocs; - } - - public void setMaxPendingDocs(int maxPendingDocs) { + private void setMaxPendingDocs(int maxPendingDocs) { this.maxPendingDocs = maxPendingDocs; } - public boolean abortOnDocumentError() { + boolean abortOnDocumentError() { return abortOnDocumentError; } - public void setAbortOnDocumentError(boolean abortOnDocumentError) { + void setAbortOnDocumentError(boolean abortOnDocumentError) { this.abortOnDocumentError = abortOnDocumentError; } - public boolean abortOnSendError() { + boolean abortOnSendError() { return abortOnSendError; } - public void setAbortOnSendError(boolean abortOnSendError) { + private void setAbortOnSendError(boolean abortOnSendError) { this.abortOnSendError = abortOnSendError; } - public String getIdPrefix() { - return idPrefix; - } - - public void setIdPrefix(String idPrefix) { - this.idPrefix = idPrefix; - } - public void setRoute(String route) { this.route = route; } @@ -167,35 +107,7 @@ public class FeederOptions { return priority; } - public boolean isPriorityExplicitlySet() { - return priorityExplicitlySet; - } - - public String getSlobrokConfigId() { - return slobrokConfigId; - } - - public void setSlobrokConfigId(String slobrokConfigId) { - this.slobrokConfigId = slobrokConfigId; - } - - public String getRoutingConfigId() { - return routingConfigId; - } - - public void setRoutingConfigId(String routingConfigId) { - this.routingConfigId = routingConfigId; - } - - public String getDocumentManagerConfigId() { - return documentManagerConfigId; - } - - public void setDocumentManagerConfigId(String documentManagerConfigId) { - this.documentManagerConfigId = documentManagerConfigId; - } - - public int getTraceLevel() { + int getTraceLevel() { return traceLevel; } @@ -203,24 +115,19 @@ public class FeederOptions { this.traceLevel = traceLevel; } - public int getMessageBusPort() { - return mbusPort; - } - - public void setMessageBusPort(int mbusPort) { + private void setMessageBusPort(int mbusPort) { this.mbusPort = mbusPort; } public void setPriority(DocumentProtocol.Priority priority) { this.priority = priority; - this.priorityExplicitlySet = true; } - public String getDocprocChain() { + String getDocprocChain() { return docprocChain; } - public void setDocprocChain(String chain) { + private void setDocprocChain(String chain) { docprocChain = chain; } @@ -228,7 +135,7 @@ public class FeederOptions { * Creates a source session params object with parameters set as these options * dictate. */ - public SourceSessionParams toSourceSessionParams() { + SourceSessionParams toSourceSessionParams() { SourceSessionParams params = new SourceSessionParams(); StaticThrottlePolicy policy; @@ -252,7 +159,7 @@ public class FeederOptions { return params; } - public RPCNetworkParams getNetworkParams() { + RPCNetworkParams getNetworkParams() { try { RPCNetworkParams networkParams = new RPCNetworkParams(); if (mbusPort != -1) { @@ -271,19 +178,13 @@ public class FeederOptions { "abortOnDocumentError=" + abortOnDocumentError + ", abortOnSendError=" + abortOnSendError + ", retryEnabled=" + retryEnabled + - ", retryDelay=" + retryDelay + ", timeout=" + timeout + ", maxPendingBytes=" + maxPendingBytes + ", maxPendingDocs=" + maxPendingDocs + - ", documentManagerConfigId='" + documentManagerConfigId + '\'' + - ", idPrefix='" + idPrefix + '\'' + ", route='" + route + '\'' + - ", routingConfigId='" + routingConfigId + '\'' + - ", slobrokConfigId='" + slobrokConfigId + '\'' + ", traceLevel=" + traceLevel + ", mbusPort=" + mbusPort + ", priority=" + priority.name() + - ", priorityExplicitlySet=" + priorityExplicitlySet + ", docprocChain='" + docprocChain + '\'' + '}'; } @@ -301,24 +202,12 @@ public class FeederOptions { if (maxPendingDocs != that.maxPendingDocs) return false; if (maxFeedRate != that.maxFeedRate) return false; if (mbusPort != that.mbusPort) return false; - if (priorityExplicitlySet != that.priorityExplicitlySet) return false; - if (Double.compare(that.retryDelay, retryDelay) != 0) return false; if (retryEnabled != that.retryEnabled) return false; if (Double.compare(that.timeout, timeout) != 0) return false; if (traceLevel != that.traceLevel) return false; if (docprocChain != null ? !docprocChain.equals(that.docprocChain) : that.docprocChain != null) return false; - if (documentManagerConfigId != null ? !documentManagerConfigId.equals(that.documentManagerConfigId) : that.documentManagerConfigId != null) { - return false; - } - if (idPrefix != null ? !idPrefix.equals(that.idPrefix) : that.idPrefix != null) return false; if (priority != that.priority) return false; if (route != null ? !route.equals(that.route) : that.route != null) return false; - if (routingConfigId != null ? !routingConfigId.equals(that.routingConfigId) : that.routingConfigId != null) { - return false; - } - if (slobrokConfigId != null ? !slobrokConfigId.equals(that.slobrokConfigId) : that.slobrokConfigId != null) { - return false; - } return true; } @@ -330,22 +219,15 @@ public class FeederOptions { result = (abortOnDocumentError ? 1 : 0); result = 31 * result + (abortOnSendError ? 1 : 0); result = 31 * result + (retryEnabled ? 1 : 0); - temp = retryDelay != +0.0d ? Double.doubleToLongBits(retryDelay) : 0L; - result = 31 * result + (int) (temp ^ (temp >>> 32)); temp = timeout != +0.0d ? Double.doubleToLongBits(timeout) : 0L; result = 31 * result + (int) (temp ^ (temp >>> 32)); result = 31 * result + maxPendingBytes; result = 31 * result + maxPendingDocs; result = 31 * result + ((int)(maxFeedRate * 1000)); - result = 31 * result + (documentManagerConfigId != null ? documentManagerConfigId.hashCode() : 0); - result = 31 * result + (idPrefix != null ? idPrefix.hashCode() : 0); result = 31 * result + (route != null ? route.hashCode() : 0); - result = 31 * result + (routingConfigId != null ? routingConfigId.hashCode() : 0); - result = 31 * result + (slobrokConfigId != null ? slobrokConfigId.hashCode() : 0); result = 31 * result + traceLevel; result = 31 * result + mbusPort; result = 31 * result + (priority != null ? priority.hashCode() : 0); - result = 31 * result + (priorityExplicitlySet ? 1 : 0); result = 31 * result + (docprocChain != null ? docprocChain.hashCode() : 0); return result; } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java index 12a4ecde493..5e52da23c12 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java @@ -5,34 +5,21 @@ import com.yahoo.cloud.config.SlobroksConfig; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; import com.yahoo.documentapi.messagebus.MessageBusParams; -import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; -import com.yahoo.jdisc.Metric; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.ReplyHandler; import com.yahoo.messagebus.SourceSession; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import java.util.Collections; - public class MessageBusSessionFactory implements SessionFactory { private final MessageBusDocumentAccess access; private final MessagePropertyProcessor processor; - private interface Metrics { - String NUM_OPERATIONS = "num_operations"; - String NUM_PUTS = "num_puts"; - String NUM_REMOVES = "num_removes"; - String NUM_UPDATES = "num_updates"; - } - public MessageBusSessionFactory(MessagePropertyProcessor processor) { this(processor, null, null); } - public MessageBusSessionFactory(MessagePropertyProcessor processor, + private MessageBusSessionFactory(MessagePropertyProcessor processor, DocumentmanagerConfig documentmanagerConfig, SlobroksConfig slobroksConfig) { this.processor = processor; @@ -53,10 +40,9 @@ public class MessageBusSessionFactory implements SessionFactory { } @Override - public synchronized SendSession createSendSession(ReplyHandler handler, Metric metric) { + public synchronized SendSession createSendSession(ReplyHandler handler) { return new SourceSessionWrapper( - access.getMessageBus().createSourceSession(handler, processor.getFeederOptions().toSourceSessionParams()), - metric); + access.getMessageBus().createSourceSession(handler, processor.getFeederOptions().toSourceSessionParams())); } public void shutDown() { @@ -66,18 +52,13 @@ public class MessageBusSessionFactory implements SessionFactory { private class SourceSessionWrapper extends SendSession { private final SourceSession session; - private final Metric metric; - private final Metric.Context context; - private SourceSessionWrapper(SourceSession session, Metric metric) { + private SourceSessionWrapper(SourceSession session) { this.session = session; - this.metric = metric; - this.context = metric.createContext(Collections.emptyMap()); } @Override protected com.yahoo.messagebus.Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException { - updateCounters(m); if (blockIfQueueFull) { return session.sendBlocking(m); } else { @@ -85,18 +66,6 @@ public class MessageBusSessionFactory implements SessionFactory { } } - private void updateCounters(Message m) { - metric.add(Metrics.NUM_OPERATIONS, 1, context); - - if (m instanceof PutDocumentMessage) { - metric.add(Metrics.NUM_PUTS, 1, context); - } else if (m instanceof RemoveDocumentMessage) { - metric.add(Metrics.NUM_REMOVES, 1, context); - } else if (m instanceof UpdateDocumentMessage) { - metric.add(Metrics.NUM_UPDATES, 1, context); - } - } - @Override public void close() { session.close(); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java index 6dce2b6f315..52583052ddf 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.feedapi; -import com.yahoo.jdisc.Metric; import com.yahoo.messagebus.ReplyHandler; /** @@ -17,5 +16,5 @@ public interface SessionFactory { * @param handler A replyhandler to callback when receiving replies from messagebus * @return The session to use for sending messages. */ - SendSession createSendSession(ReplyHandler handler, Metric metric); + SendSession createSendSession(ReplyHandler handler); } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index 52d21897a15..4fcbcf4d634 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -2,7 +2,6 @@ package com.yahoo.feedapi; import com.yahoo.concurrent.SystemTimer; -import com.yahoo.jdisc.Metric; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Message; @@ -31,8 +30,8 @@ public class SharedSender implements ReplyHandler { * Creates a new shared sender. * If oldsender != null, we copy that status information from that sender. */ - SharedSender(String route, SessionFactory factory, SharedSender oldSender, Metric metric) { - sender = (factory != null) ? factory.createSendSession(this, metric) : null; + SharedSender(String route, SessionFactory factory, SharedSender oldSender) { + sender = (factory != null) ? factory.createSendSession(this) : null; metrics = (oldSender != null) ? oldSender.metrics : new RouteMetricSet(route, null); } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java index da226e9ece6..58cceb2d8ee 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java @@ -2,8 +2,6 @@ package com.yahoo.feedhandler; import com.yahoo.clientmetrics.RouteMetricSet; -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.container.jdisc.VespaHeaders; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; @@ -14,18 +12,13 @@ import com.yahoo.messagebus.ErrorCode; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.Reply; import com.yahoo.search.result.ErrorMessage; -import com.yahoo.text.Utf8String; -import com.yahoo.text.XMLWriter; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; import java.util.logging.Logger; import java.util.stream.Stream; -public final class FeedResponse extends HttpResponse implements SharedSender.ResultCallback { +public final class FeedResponse implements SharedSender.ResultCallback { private final static Logger log = Logger.getLogger(FeedResponse.class.getName()); private final List errorMessages = new ArrayList<>(); @@ -37,7 +30,6 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res private final SharedSender.Pending pendingNumber = new SharedSender.Pending(); FeedResponse(RouteMetricSet metrics) { - super(com.yahoo.jdisc.http.HttpResponse.Status.OK); this.metrics = metrics; } @@ -49,41 +41,6 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res abortOnError = abort; } - @Override - public void render(OutputStream outputStream) throws IOException { - if ( ! errorMessages.isEmpty()) - setStatus(VespaHeaders.getStatus(false, errorMessages.get(0), errorMessages.iterator())); - - XMLWriter writer = new XMLWriter(new OutputStreamWriter(outputStream)); - writer.openTag("result"); - - if (traces.length() > 0) { - writer.openTag("trace"); - writer.append(traces); - writer.closeTag(); - } - if (!errors.isEmpty()) { - writer.openTag("errors"); - writer.attribute(new Utf8String("count"), errors.size()); - - for (int i = 0; i < errors.size() && i < 10; ++i) { - writer.openTag("error"); - writer.attribute(new Utf8String("message"), errors.get(i)); - writer.closeTag(); - } - writer.closeTag(); - } - - writer.closeTag(); - writer.flush(); - outputStream.close(); - } - - @Override - public java.lang.String getContentType() { - return "application/xml"; - } - private String prettyPrint(Message m) { if (m instanceof PutDocumentMessage) { return "PUT[" + ((PutDocumentMessage)m).getDocumentPut().getDocument().getId() + "] "; diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index 59cd86670e1..ca2da28d0dd 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -2,11 +2,8 @@ package com.yahoo.feedhandler; import com.yahoo.clientmetrics.RouteMetricSet; -import com.yahoo.container.jdisc.EmptyResponse; import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.protect.Error; -import com.yahoo.feedapi.DocprocMessageProcessor; import com.yahoo.feedapi.FeedContext; import com.yahoo.feedapi.Feeder; import com.yahoo.feedapi.JsonFeeder; @@ -16,8 +13,6 @@ import com.yahoo.feedapi.SingleSender; import com.yahoo.feedapi.XMLFeeder; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; /** @@ -30,68 +25,50 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { public static final String JSON_INPUT = "jsonInput"; - private final AtomicInteger busyThreads = new AtomicInteger(0); - private final int maxBusyThreads; - - private VespaFeedHandler(FeedContext context, Executor executor) { - super(context, executor); - this.maxBusyThreads = 32; - } - - public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) { - return new VespaFeedHandler(context, executor); + private VespaFeedHandler(FeedContext context) { + super(context); } - @Override - public HttpResponse handle(HttpRequest request) { - return handle(request, null, 1); + public static VespaFeedHandler createFromContext(FeedContext context) { + return new VespaFeedHandler(context); } - public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { - try { - int busy = busyThreads.incrementAndGet(); - if (busy > maxBusyThreads) - return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE); - - MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); + public FeedResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { + MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); - String route = properties.getRoute().toString(); - FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); + String route = properties.getRoute().toString(); + FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); - SingleSender sender = new SingleSender(response, getSharedSender(route)); - sender.addMessageProcessor(properties); - sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); - ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender); - Feeder feeder = createFeeder(feedAccess, request); - feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); - feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); - response.setAbortOnFeedError(properties.getAbortOnFeedError()); + SingleSender sender = new SingleSender(response, getSharedSender(route)); + sender.addMessageProcessor(properties); + ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender); + Feeder feeder = createFeeder(feedAccess, request); + feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); + feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); + response.setAbortOnFeedError(properties.getAbortOnFeedError()); - List errors = feeder.parse(); - for (String s : errors) { - response.addXMLParseError(s); - } - if (errors.size() > 0 && feeder instanceof XMLFeeder) { - response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); - } + List errors = feeder.parse(); + for (String s : errors) { + response.addXMLParseError(s); + } + if (errors.size() > 0 && feeder instanceof XMLFeeder) { + response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); + } - sender.done(); - feedAccess.close(); - long millis = getTimeoutMillis(request); - boolean completed = sender.waitForPending(millis); - if (!completed) { - response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses"); - } - response.done(); - return response; - } finally { - busyThreads.decrementAndGet(); + sender.done(); + feedAccess.close(); + long millis = getTimeoutMillis(request); + boolean completed = sender.waitForPending(millis); + if (!completed) { + response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses"); } + response.done(); + return response; + } private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) { - String contentType = request.getHeader("Content-Type"); - if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) { + if (Boolean.valueOf(request.getProperty(JSON_INPUT))) { return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); } else { return new XMLFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java index ec2b7202f09..532f10663b9 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java @@ -1,10 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.feedhandler; -import com.yahoo.component.provider.ComponentRegistry; import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.ThreadedHttpRequestHandler; -import com.yahoo.docproc.DocprocService; import com.yahoo.document.DocumentTypeManager; import com.yahoo.feedapi.FeedContext; import com.yahoo.feedapi.MessagePropertyProcessor; @@ -12,22 +9,18 @@ import com.yahoo.feedapi.SharedSender; import com.yahoo.search.query.ParameterParser; -import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.Executor; -import java.util.zip.GZIPInputStream; -public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { +public abstract class VespaFeedHandlerBase { protected FeedContext context; private final long defaultTimeoutMillis; - VespaFeedHandlerBase(FeedContext context, Executor executor) { - this(context, executor, context.getPropertyProcessor().getDefaultTimeoutMillis()); + VespaFeedHandlerBase(FeedContext context) { + this(context, context.getPropertyProcessor().getDefaultTimeoutMillis()); } - private VespaFeedHandlerBase(FeedContext context, Executor executor, long defaultTimeoutMillis) { - super(executor, context.getMetricAPI()); + private VespaFeedHandlerBase(FeedContext context, long defaultTimeoutMillis) { this.context = context; this.defaultTimeoutMillis = defaultTimeoutMillis; } @@ -36,14 +29,6 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { return context.getSharedSender(route); } - DocprocService getDocprocChain(HttpRequest request) { - return context.getPropertyProcessor().getDocprocChain(request); - } - - ComponentRegistry getDocprocServiceRegistry(HttpRequest request) { - return context.getPropertyProcessor().getDocprocServiceRegistry(request); - } - MessagePropertyProcessor getPropertyProcessor() { return context.getPropertyProcessor(); } @@ -55,15 +40,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { * @throws IllegalArgumentException if GZIP stream creation failed */ InputStream getRequestInputStream(HttpRequest request) { - if ("gzip".equals(request.getHeader("Content-Encoding"))) { - try { - return new GZIPInputStream(request.getData()); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to create GZIP input stream from content", e); - } - } else { - return request.getData(); - } + return request.getData(); } protected DocumentTypeManager getDocumentTypeManager() { diff --git a/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java b/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java index 4dfde6f1d41..cb33b71424d 100644 --- a/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java +++ b/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java @@ -1,9 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.feedapi; -import static org.junit.Assert.*; import org.junit.Test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * @author Einar M R Rosenvinge */ @@ -27,26 +29,6 @@ public class FeederOptionsTestCase { assertTrue(f2.equals(f1)); assertTrue(f1.hashCode() == f2.hashCode()); - f1.setRoutingConfigId("blabla"); - assertFalse(f1.equals(f2)); - assertFalse(f2.equals(f1)); - assertFalse(f1.hashCode() == f2.hashCode()); - - f2.setRoutingConfigId("blabla"); - assertTrue(f1.equals(f2)); - assertTrue(f2.equals(f1)); - assertTrue(f1.hashCode() == f2.hashCode()); - - f1.setRetryDelay(5000); - assertFalse(f1.equals(f2)); - assertFalse(f2.equals(f1)); - assertFalse(f1.hashCode() == f2.hashCode()); - - f2.setRetryDelay(5000); - assertTrue(f1.equals(f2)); - assertTrue(f2.equals(f1)); - assertTrue(f1.hashCode() == f2.hashCode()); - f1.setRoute("all roads lead to rome"); assertFalse(f1.equals(f2)); assertFalse(f2.equals(f1)); diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java index 557caf21a89..100aba3a917 100755 --- a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java @@ -2,12 +2,10 @@ package com.yahoo.vespafeeder; import com.yahoo.clientmetrics.RouteMetricSet; -import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.DocumentTypeManagerConfigurer; import com.yahoo.feedapi.FeedContext; import com.yahoo.feedhandler.FeedResponse; -import com.yahoo.feedhandler.NullFeedMetric; import com.yahoo.feedhandler.VespaFeedHandler; import com.yahoo.log.LogSetup; import com.yahoo.concurrent.SystemTimer; @@ -20,14 +18,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; public class VespaFeeder { - Arguments args; - DocumentTypeManager manager; - Executor threadPool = Executors.newCachedThreadPool(ThreadFactoryFactory.getThreadFactory("vespa-feeder")); + private final Arguments args; + private final DocumentTypeManager manager; public VespaFeeder(Arguments args, DocumentTypeManager manager) { this.args = args; @@ -37,7 +32,7 @@ public class VespaFeeder { public static class FeedErrorException extends Exception { String message; - public FeedErrorException(String message) { + FeedErrorException(String message) { this.message = message; } @@ -66,7 +61,7 @@ public class VespaFeeder { return new FeedErrorException(buffer.toString()); } - public RouteMetricSet.ProgressCallback createProgressCallback(PrintStream output) { + RouteMetricSet.ProgressCallback createProgressCallback(PrintStream output) { if ("benchmark".equals(args.getMode())) { return new BenchmarkProgressPrinter(SystemTimer.INSTANCE, output); } else { @@ -75,15 +70,15 @@ public class VespaFeeder { } void parseFiles(InputStream stdin, PrintStream output) throws Exception { - FeedContext context = new FeedContext(args.getPropertyProcessor(), args.getSessionFactory(), manager, new NullFeedMetric(true)); + FeedContext context = new FeedContext(args.getPropertyProcessor(), args.getSessionFactory(), manager); final BufferedInputStream input = new BufferedInputStream(stdin); - VespaFeedHandler handler = VespaFeedHandler.createFromContext(context, threadPool); + VespaFeedHandler handler = VespaFeedHandler.createFromContext(context); if (args.getFiles().isEmpty()) { InputStreamRequest req = new InputStreamRequest(input); setProperties(req, input); - FeedResponse response = (FeedResponse)handler.handle(req.toRequest(), createProgressCallback(output), args.getNumThreads()); + FeedResponse response = handler.handle(req.toRequest(), createProgressCallback(output), args.getNumThreads()); if ( ! response.isSuccess()) { throw renderErrors(response.getErrorList()); } @@ -101,7 +96,7 @@ public class VespaFeeder { final BufferedInputStream inputSnooper = new BufferedInputStream(new FileInputStream(fileName)); setProperties(req, inputSnooper); inputSnooper.close(); - FeedResponse response = (FeedResponse)handler.handle(req.toRequest(), createProgressCallback(output), args.getNumThreads()); + FeedResponse response = handler.handle(req.toRequest(), createProgressCallback(output), args.getNumThreads()); if (!response.isSuccess()) { throw renderErrors(response.getErrorList()); } -- cgit v1.2.3 From 8bdf825e164469f4a3f22a2d6e92e799c02b9f6e Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 23 Apr 2019 14:35:44 +0200 Subject: Move the NullFeedMetric too where it is used. --- .../com/yahoo/feedhandler/v3/FeedTesterV3.java | 1 - .../com/yahoo/feedhandler/v3/NullFeedMetric.java | 35 ++++++++++++++++++++++ .../java/com/yahoo/feedhandler/NullFeedMetric.java | 35 ---------------------- 3 files changed, 35 insertions(+), 36 deletions(-) create mode 100644 vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/v3/NullFeedMetric.java delete mode 100644 vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/v3/FeedTesterV3.java b/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/v3/FeedTesterV3.java index 3c758114ecf..fd9655fb838 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/v3/FeedTesterV3.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/v3/FeedTesterV3.java @@ -11,7 +11,6 @@ import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.metrics.DocumentApiMetrics; -import com.yahoo.feedhandler.NullFeedMetric; import com.yahoo.jdisc.ReferencedResource; import com.yahoo.messagebus.SourceSessionParams; import com.yahoo.messagebus.shared.SharedSourceSession; diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/v3/NullFeedMetric.java b/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/v3/NullFeedMetric.java new file mode 100644 index 00000000000..4777c6c7b99 --- /dev/null +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/v3/NullFeedMetric.java @@ -0,0 +1,35 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedhandler.v3; + +import com.yahoo.jdisc.Metric; +import java.util.Map; + +/** + * @author Einar M R Rosenvinge + * @since 5.1.20 + */ +public final class NullFeedMetric implements Metric { + + public NullFeedMetric(boolean flag) { + if (!flag) { + throw new IllegalArgumentException("must set flag allowing to throw away metrics"); + } + } + + @Override + public void set(String key, Number val, Context ctx) { + } + + @Override + public void add(String key, Number val, Context ctx) { + } + + @Override + public Context createContext(Map properties) { + return NullFeedContext.INSTANCE; + } + + private static class NullFeedContext implements Context { + private static final NullFeedContext INSTANCE = new NullFeedContext(); + } +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java deleted file mode 100644 index 2940c0fcc44..00000000000 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.feedhandler; - -import com.yahoo.jdisc.Metric; -import java.util.Map; - -/** - * @author Einar M R Rosenvinge - * @since 5.1.20 - */ -public final class NullFeedMetric implements Metric { - - public NullFeedMetric(boolean flag) { - if (!flag) { - throw new IllegalArgumentException("must set flag allowing to throw away metrics"); - } - } - - @Override - public void set(String key, Number val, Context ctx) { - } - - @Override - public void add(String key, Number val, Context ctx) { - } - - @Override - public Context createContext(Map properties) { - return NullFeedContext.INSTANCE; - } - - private static class NullFeedContext implements Context { - private static final NullFeedContext INSTANCE = new NullFeedContext(); - } -} -- cgit v1.2.3 From 45a01db7791bd53b7d2a762719767258d4fc8859 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 23 Apr 2019 15:33:31 +0200 Subject: Remove the tea spoon. --- .../src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index ca2da28d0dd..892f3763805 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -51,9 +51,6 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { for (String s : errors) { response.addXMLParseError(s); } - if (errors.size() > 0 && feeder instanceof XMLFeeder) { - response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); - } sender.done(); feedAccess.close(); -- cgit v1.2.3