diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /vespaclient-core/src/main/java |
Publish
Diffstat (limited to 'vespaclient-core/src/main/java')
27 files changed, 2527 insertions, 0 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/clientmetrics/ClientMetrics.java b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/ClientMetrics.java new file mode 100755 index 00000000000..87c1dfc7030 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/ClientMetrics.java @@ -0,0 +1,39 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.clientmetrics; + +import com.yahoo.messagebus.Reply; +import com.yahoo.metrics.*; +import com.yahoo.text.XMLWriter; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.List; + +/** + * @author thomasg + */ +public class ClientMetrics { + + MetricSet topSet; + SumMetric sum; + List<String> routes = new ArrayList<String>(); + + public ClientMetrics() { + topSet = new SimpleMetricSet("routes", "", "", null); + sum = new SumMetric("total", "", "Messages sent to all routes", topSet); + } + + public MetricSet getMetricSet() { + return topSet; + } + + public void addRouteMetricSet(RouteMetricSet metric) { + topSet.registerMetric(metric); + sum.addMetricToSum(metric); + routes.add(metric.getRoute()); + } +} diff --git a/vespaclient-core/src/main/java/com/yahoo/clientmetrics/MessageTypeMetricSet.java b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/MessageTypeMetricSet.java new file mode 100644 index 00000000000..b55aa82b6ca --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/MessageTypeMetricSet.java @@ -0,0 +1,105 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.clientmetrics; + +import com.yahoo.documentapi.messagebus.protocol.DocumentIgnoredReply; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.messagebus.Reply; +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.metrics.*; +import com.yahoo.messagebus.Error; + +import java.util.List; +import java.util.stream.Stream; + +/** +* @author thomasg +*/ +public class MessageTypeMetricSet extends MetricSet { + ValueMetric<Long> latency; + CountMetric count; + CountMetric ignored; + + SumMetric errorSum; + MetricSet errors; + String msgName; + + class ErrorMetric extends CountMetric { + ErrorMetric(String name, MetricSet owner) { + super(name, "", "Number of errors of type " + name, owner); + } + + ErrorMetric(ErrorMetric other, CopyType copyType, MetricSet owner) { + super(other, copyType, owner); + } + + @Override + public String getXMLTag() { + return "error"; + } + + @Override + public Metric clone(CopyType type, MetricSet owner, boolean includeUnused) { + return new ErrorMetric(this, type, owner); + } + + } + + public MessageTypeMetricSet(String msgName, MetricSet owner) { + super(msgName.toLowerCase(), "", "", owner); + this.msgName = msgName; + latency = new ValueMetric<Long>("latency", "", "Latency (in ms)", this).averageMetric(); + count = new CountMetric("count", "", "Number received", this); + ignored = new CountMetric("ignored", "", "Number ignored due to no matching document routing selectors", this); + errors = new SimpleMetricSet("errors", "", "The errors returned", this); + errorSum = new SumMetric("total", "", "Total number of errors", errors); + } + + public MessageTypeMetricSet(MessageTypeMetricSet source, CopyType copyType, MetricSet owner, boolean includeUnused) { + super(source, copyType, owner, includeUnused); + msgName = source.msgName; + } + + public String getMessageName() { + return msgName; + } + + public void addReply(Reply r) { + if (!r.hasErrors() || onlyTestAndSetConditionFailed(r.getErrors())) { + updateSuccessMetrics(r); + } else { + updateFailureMetrics(r); + } + } + + private void updateFailureMetrics(Reply r) { + String error = DocumentProtocol.getErrorName(r.getError(0).getCode()); + CountMetric s = (CountMetric)errors.getMetric(error); + if (s == null) { + s = new ErrorMetric(error, errors); + errorSum.addMetricToSum(s); + } + s.inc(); + } + + private void updateSuccessMetrics(Reply r) { + if (!(r instanceof DocumentIgnoredReply)) { + if (r.getMessage().getTimeReceived() != 0) { + latency.addValue(SystemTimer.INSTANCE.milliTime() - r.getMessage().getTimeReceived()); + } + count.inc(); + } else { + ignored.inc(); + } + } + + @Override + public Metric clone(CopyType type, MetricSet owner, boolean includeUnused) + { return new MessageTypeMetricSet(this, type, owner, includeUnused); } + + /** + * Returns true if every error in a stream is a test and set condition failed + */ + private static boolean onlyTestAndSetConditionFailed(Stream<Error> errors) { + return errors.allMatch(e -> e.getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED); + } +} diff --git a/vespaclient-core/src/main/java/com/yahoo/clientmetrics/RouteMetricSet.java b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/RouteMetricSet.java new file mode 100644 index 00000000000..f6cb5a872e6 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/RouteMetricSet.java @@ -0,0 +1,67 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.clientmetrics; + +import com.yahoo.messagebus.Reply; +import com.yahoo.metrics.*; + +import java.util.HashMap; +import java.util.Map; + +/** +* @author thomasg +*/ +public class RouteMetricSet extends MetricSet { + + SumMetric sum; + ProgressCallback callback; + Map<Integer,MessageTypeMetricSet> typeMap = new HashMap<Integer,MessageTypeMetricSet>(); + + public interface ProgressCallback { + void onProgress(RouteMetricSet route); + void done(RouteMetricSet route); + } + + public RouteMetricSet(String route, ProgressCallback callback) { + super(route, "", "Messages sent to the named route", null); + sum = new SumMetric("total", "", "All kinds of messages sent to the given route", this); + this.callback = callback; + } + + @Override + public String getXMLTag() { + return "route"; + } + + public RouteMetricSet(RouteMetricSet source, CopyType copyType, MetricSet owner, boolean includeUnused) { + super(source, copyType, owner, includeUnused); + } + + public void addReply(Reply r) { + MessageTypeMetricSet type = typeMap.get(r.getMessage().getType()); + if (type == null) { + String msgName = r.getMessage().getClass().getSimpleName().replace("Message", ""); + type = new MessageTypeMetricSet(msgName, this); + sum.addMetricToSum(type); + typeMap.put(r.getMessage().getType(), type); + } + + type.addReply(r); + if (callback != null) { + callback.onProgress(this); + } + } + + public void done() { + if (callback != null) { + callback.done(this); + } + } + + @Override + public Metric clone(CopyType type, MetricSet owner, boolean includeUnused) + { return new RouteMetricSet(this, type, owner, includeUnused); } + + String getRoute() { + return getName(); + } +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java new file mode 100755 index 00000000000..80825810837 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java @@ -0,0 +1,79 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.DocprocService; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.Processing; +import com.yahoo.document.*; +import com.yahoo.documentapi.messagebus.protocol.BatchDocumentUpdateMessage; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.vdslib.Entry; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class DocprocMessageProcessor implements MessageProcessor { + private final DocprocService docproc; + private final ComponentRegistry<DocprocService> docprocServiceRegistry; + + public DocprocMessageProcessor(DocprocService docproc, ComponentRegistry<DocprocService> docprocServiceRegistry) { + this.docproc = docproc; + this.docprocServiceRegistry = docprocServiceRegistry; + } + + @Override + public void process(Message m) { + try { + List<DocumentOperation> documentBases = new ArrayList<DocumentOperation>(); + + if (m.getType() == DocumentProtocol.MESSAGE_PUTDOCUMENT) { + documentBases.add(((PutDocumentMessage) m).getDocumentPut()); + } else if (m.getType() == DocumentProtocol.MESSAGE_UPDATEDOCUMENT) { + documentBases.add(((UpdateDocumentMessage) m).getDocumentUpdate()); + } else if (m.getType() == DocumentProtocol.MESSAGE_REMOVEDOCUMENT) { + documentBases.add(((RemoveDocumentMessage) m).getDocumentRemove()); + } else if (m.getType() == DocumentProtocol.MESSAGE_BATCHDOCUMENTUPDATE) { + for (DocumentUpdate update : ((BatchDocumentUpdateMessage) m).getUpdates()) { + documentBases.add(update); + } + } + + if (docproc != null) { + processDocumentOperations(documentBases, m); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void processDocumentOperations(List<DocumentOperation> documentOperations, Message m) throws Exception { + Processing processing = Processing.createProcessingFromDocumentOperations(docproc.getName(), documentOperations, new CallStack(docproc.getCallStack())); + processing.setServiceName(docproc.getName()); + processing.setDocprocServiceRegistry(docprocServiceRegistry); + processing.setVariable("route", m.getRoute()); + processing.setVariable("timeout", m.getTimeRemaining()); + + DocumentProcessor.Progress progress = docproc.getExecutor().process(processing); + while (DocumentProcessor.Progress.LATER.equals(progress)) { + Thread.sleep(50); + progress = docproc.getExecutor().process(processing); + } + + if (progress == DocumentProcessor.Progress.FAILED + || progress == DocumentProcessor.Progress.PERMANENT_FAILURE) { + throw new RuntimeException("Processing of " + documentOperations + " failed: " + progress + "."); + } + + m.setRoute((Route) processing.getVariable("route")); + m.setTimeRemaining((Long) processing.getVariable("timeout")); + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java new file mode 100755 index 00000000000..7b22abc8a76 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java @@ -0,0 +1,211 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.document.Document; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.jdisc.Metric; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.Result; + +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +public class DummySessionFactory implements SessionFactory { + + public interface ReplyFactory { + Reply createReply(Message m); + } + + public final List<Message> messages; + private boolean autoReply = false; + private ReplyFactory autoReplyFactory = null; + private Error autoError; + private int sessionsCreated = 0; + OutputStream output = null; + + protected DummySessionFactory() { + messages = new ArrayList<>(); + } + + public static DummySessionFactory createDefault() { + return new DummySessionFactory(); + } + + protected DummySessionFactory(boolean autoReply) { + this.autoReply = autoReply; + messages = new ArrayList<>(); + } + + protected DummySessionFactory(ReplyFactory autoReplyFactory) { + this.autoReply = true; + this.autoReplyFactory = autoReplyFactory; + messages = new ArrayList<>(); + } + + public static DummySessionFactory createWithAutoReplyFactory(ReplyFactory autoReplyFactory) { + return new DummySessionFactory(autoReplyFactory); + } + + protected DummySessionFactory(Error e) { + autoReply = true; + this.autoError = e; + messages = new ArrayList<>(); + } + + public static DummySessionFactory createWithErrorAutoReply(Error e) { + return new DummySessionFactory(e); + } + + public static DummySessionFactory createWithAutoReply() { + return new DummySessionFactory(true); + } + + public DummySessionFactory(Error e, OutputStream out) { + messages = null; + autoReply = true; + output = out; + } + + public int sessionsCreated() { + return sessionsCreated; + } + void add(Message m) { + if (messages != null) { + messages.add(m); + } + + } + + @Override + public SendSession createSendSession(ReplyHandler r, Metric metric) { + ++sessionsCreated; + + if (output != null) { + return new DumpDocuments(output, r, this); + } + if (autoReply) { + return new AutoReplySession(r, autoReplyFactory, autoError, this); + } + return new DummySession(r, this); + } + + @Override + public VisitorSession createVisitorSession(VisitorParameters p) { + return null; + } + + public void sendReply(Message m, Error error) { + MyContext ctxt = (MyContext) m.getContext(); + + Reply r = new EmptyReply(); + r.setMessage(m); + r.setContext(ctxt.oldContext); + + if (error != null) { + r.addError(error); + } + + ctxt.handler.handleReply(r); + } + + private class MyContext { + MyContext(ReplyHandler handler, Object ctxt) { + this.handler = handler; + this.oldContext = ctxt; + } + + ReplyHandler handler; + Object oldContext; + } + + private class AutoReplySession extends SendSession { + + ReplyHandler handler; + ReplyFactory replyFactory; + Error e; + DummySessionFactory owner; + + public AutoReplySession(ReplyHandler handler, ReplyFactory replyFactory, + Error e, DummySessionFactory owner) { + this.handler = handler; + this.replyFactory = replyFactory; + this.e = e; + this.owner = owner; + } + + protected void handleMessage(Message m) { + + } + + @Override + protected Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException { + owner.add(m); + handleMessage(m); + Reply r; + if (replyFactory == null) { + r = new EmptyReply(); + } else { + r = replyFactory.createReply(m); + } + + m.setTimeReceivedNow(); + r.setMessage(m); + r.setContext(m.getContext()); + + if (e != null) { + r.addError(e); + } + handler.handleReply(r); + return Result.ACCEPTED; + } + + @Override + public void close() { + } + + } + + private class DumpDocuments extends AutoReplySession { + final OutputStream out; + public DumpDocuments(OutputStream out, ReplyHandler r, DummySessionFactory factory) { + super(r, null, null, factory); + this.out = out; + } + protected void handleMessage(Message m) { + if (m instanceof PutDocumentMessage) { + PutDocumentMessage p = (PutDocumentMessage) m; + Document d = p.getDocumentPut().getDocument(); + d.serialize(out); + } + } + } + + private class DummySession extends SendSession { + ReplyHandler handler; + DummySessionFactory owner; + + public DummySession(ReplyHandler handler, DummySessionFactory owner) { + this.handler = handler; + this.owner = owner; + } + + @Override + protected Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException { + m.setContext(new MyContext(handler, m.getContext())); + owner.add(m); + return Result.ACCEPTED; + } + + @Override + public void close() { + } + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java new file mode 100755 index 00000000000..a26064cd98b --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java @@ -0,0 +1,111 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.jdisc.Metric; +import com.yahoo.vespa.config.content.LoadTypeConfig; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.clientmetrics.ClientMetrics; +import com.yahoo.vespaclient.ClusterList; +import com.yahoo.vespaclient.config.FeederConfig; + +import java.util.Map; +import java.util.TreeMap; + +public class FeedContext { + + private final SessionFactory factory; + private final MessagePropertyProcessor propertyProcessor; + private final DocumentTypeManager docTypeManager; + private final ClusterList clusterList; + private final ClientMetrics metrics; + private final Metric metric; + private Map<String, SharedSender> senders = new TreeMap<>(); + + public static final Object sync = new Object(); + public static FeedContext instance = null; + + public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager, ClusterList clusterList, Metric metric) { + this.propertyProcessor = propertyProcessor; + this.factory = factory; + docTypeManager = manager; + this.clusterList = clusterList; + metrics = new ClientMetrics(); + this.metric = metric; + } + + public ClientMetrics getMetrics() { + return metrics; + } + + public ClusterList getClusterList() { + return clusterList; + } + + public SessionFactory getSessionFactory() { + return factory; + } + + public void shutdownSenders() { + for (SharedSender s : senders.values()) { + s.shutdown(); + } + } + + public synchronized SharedSender getSharedSender(String route) { + if (propertyProcessor.configChanged()) { + Map<String, SharedSender> newSenders = new TreeMap<>(); + + for (Map.Entry<String, SharedSender> sender : senders.entrySet()) { + newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue(), metric)); + } + + shutdownSenders(); + senders = newSenders; + propertyProcessor.setConfigChanged(false); + } + + if (route == null) { + route = propertyProcessor.getFeederOptions().getRoute(); + } + + SharedSender sender = senders.get(route); + + if (sender == null) { + sender = new SharedSender(route, factory, sender, metric); + senders.put(route, sender); + metrics.addRouteMetricSet(sender.getMetrics()); + } + + return sender; + } + + public MessagePropertyProcessor getPropertyProcessor() { + return propertyProcessor; + } + + public DocumentTypeManager getDocumentTypeManager() { + return docTypeManager; + } + + public static FeedContext getInstance(FeederConfig feederConfig, LoadTypeConfig loadTypeConfig, Metric metric) { + synchronized (sync) { + try { + if (instance == null) { + MessagePropertyProcessor proc = new MessagePropertyProcessor(feederConfig, loadTypeConfig); + MessageBusSessionFactory mbusFactory = new MessageBusSessionFactory(proc); + instance = new FeedContext(proc, + mbusFactory, + mbusFactory.getAccess().getDocumentTypeManager(), + new ClusterList("client"), metric); + } else { + instance.getPropertyProcessor().configure(feederConfig, loadTypeConfig); + } + + return instance; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java new file mode 100644 index 00000000000..6fc7b31f648 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java @@ -0,0 +1,109 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import java.io.InputStream; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.LinkedList; +import java.util.List; + +import javax.xml.stream.XMLStreamException; + +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.vespaxmlparser.FeedReader; +import com.yahoo.vespaxmlparser.VespaXMLFeedReader; + +/** + * Base class for unpacking document operation streams and pushing to feed + * access points. + * + * @author Thomas Gundersen + * @author steinar + */ +public abstract class Feeder { + + protected final InputStream stream; + protected final DocumentTypeManager docMan; + protected List<String> errors = new LinkedList<String>(); + protected boolean doAbort = true; + protected boolean createIfNonExistent = false; + protected final VespaFeedSender sender; + private final int MAX_ERRORS = 10; + + protected Feeder(DocumentTypeManager docMan, VespaFeedSender sender, InputStream stream) { + this.docMan = docMan; + this.sender = sender; + this.stream = stream; + } + + public void setAbortOnDocumentError(boolean doAbort) { + this.doAbort = doAbort; + } + + public void setCreateIfNonExistent(boolean value) { + this.createIfNonExistent = value; + } + + public void addException(Exception e) { + String message; + if (e.getMessage() != null) { + message = e.getMessage().replaceAll("\"", "'"); + } else { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + message = "(no message) " + sw.toString(); + } + + addError("ERROR: " + message); + } + + private void addError(String error) { + if (errors.size() < MAX_ERRORS) { + errors.add(error); + } else if (errors.size() == MAX_ERRORS) { + errors.add("Reached maximum limit of errors (" + MAX_ERRORS + "). Not collecting any more."); + } + } + + protected abstract FeedReader createReader() throws Exception; + + public List<String> parse() { + FeedReader reader = null; + + try { + reader = createReader(); + } catch (Exception e) { + addError("ERROR: " + e.getClass().toString() + ": " + e.getMessage().replaceAll("\"", "'")); + return errors; + } + + while (!sender.isAborted()) { + try { + VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); + reader.read(op); + if (createIfNonExistent && op.getDocumentUpdate() != null) { + op.getDocumentUpdate().setCreateIfNonExistent(true); + } + + // Done feeding. + if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) { + break; + } else { + sender.sendOperation(op); + } + } catch (XMLStreamException e) { + addException(e); + break; + } catch (Exception e) { + addException(e); + if (doAbort) { + break; + } + } + } + + return errors; + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java new file mode 100755 index 00000000000..2894993b983 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java @@ -0,0 +1,364 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.RetryTransientErrorsPolicy; +import com.yahoo.vespaclient.config.FeederConfig; + + +/** + * Just a wrapper for feeder options, from config or HTTP parameters. + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class FeederOptions { + // These default values are here basically just for convenience in test cases, + // they are overridden by real config values in all other cases. + private boolean abortOnDocumentError = true; + private boolean abortOnSendError = true; + private boolean retryEnabled = true; + private double retryDelay = 1; + private double timeout = 60; + private int maxPendingBytes = 0; + private int maxPendingDocs = 0; + private double maxFeedRate = 0.0; + private String documentManagerConfigId = "client"; + private String idPrefix = ""; + private String route = "default"; + private String routingConfigId; + private String slobrokConfigId; + private int traceLevel; + private int mbusPort; + private DocumentProtocol.Priority priority = DocumentProtocol.Priority.NORMAL_3; + private boolean priorityExplicitlySet = false; + private String docprocChain = ""; + + /** Constructs an options object with all default values. */ + public FeederOptions() { + // empty + } + + /** + * Implements the copy constructor. + * + * @param src The options to copy. + */ + public FeederOptions(FeederOptions src) { + abortOnDocumentError = src.abortOnDocumentError; + abortOnSendError = src.abortOnSendError; + retryEnabled = src.retryEnabled; + retryDelay = src.retryDelay; + timeout = src.timeout; + maxPendingBytes = src.maxPendingBytes; + maxPendingDocs = src.maxPendingDocs; + maxFeedRate = src.maxFeedRate; + documentManagerConfigId = src.documentManagerConfigId; + idPrefix = src.idPrefix; + route = src.route; + routingConfigId = src.routingConfigId; + slobrokConfigId = src.slobrokConfigId; + traceLevel = src.traceLevel; + mbusPort = src.mbusPort; + priority = src.priority; + docprocChain = src.docprocChain; + } + + /** Constructor that sets values from config. */ + public FeederOptions(FeederConfig config) { + setAbortOnDocumentError(config.abortondocumenterror()); + setAbortOnSendError(config.abortonsenderror()); + setIdPrefix(config.idprefix()); + setMaxPendingBytes(config.maxpendingbytes()); + setMaxPendingDocs(config.maxpendingdocs()); + setRetryEnabled(config.retryenabled()); + setRetryDelay(config.retrydelay()); + setRoute(config.route()); + setTimeout(config.timeout()); + setTraceLevel(config.tracelevel()); + setMessageBusPort(config.mbusport()); + setDocprocChain(config.docprocchain()); + setMaxFeedRate(config.maxfeedrate()); + } + + public void setMaxFeedRate(double feedRate) { + maxFeedRate = feedRate; + } + + + public double getMaxFeedRate() { + return maxFeedRate; + } + + public boolean getRetryEnabled() { + return retryEnabled; + } + + public void setRetryEnabled(boolean retryEnabled) { + this.retryEnabled = retryEnabled; + } + + public double getRetryDelay() { + return retryDelay; + } + + public void setRetryDelay(double retryDelay) { + this.retryDelay = retryDelay; + } + + public double getTimeout() { + return timeout; + } + + public void setTimeout(double timeout) { + this.timeout = timeout; + } + + public int getMaxPendingBytes() { + return maxPendingBytes; + } + + public void setMaxPendingBytes(int maxPendingBytes) { + this.maxPendingBytes = maxPendingBytes; + } + + public int getMaxPendingDocs() { + return maxPendingDocs; + } + + public void setMaxPendingDocs(int maxPendingDocs) { + this.maxPendingDocs = maxPendingDocs; + } + + public boolean abortOnDocumentError() { + return abortOnDocumentError; + } + + public void setAbortOnDocumentError(boolean abortOnDocumentError) { + this.abortOnDocumentError = abortOnDocumentError; + } + + public boolean abortOnSendError() { + return abortOnSendError; + } + + public void setAbortOnSendError(boolean abortOnSendError) { + this.abortOnSendError = abortOnSendError; + } + + public String getIdPrefix() { + return idPrefix; + } + + public void setIdPrefix(String idPrefix) { + this.idPrefix = idPrefix; + } + + public void setRoute(String route) { + this.route = route; + } + + public String getRoute() { + return route; + } + + public DocumentProtocol.Priority getPriority() { + return priority; + } + + public boolean isPriorityExplicitlySet() { + return priorityExplicitlySet; + } + + public String getSlobrokConfigId() { + return slobrokConfigId; + } + + public void setSlobrokConfigId(String slobrokConfigId) { + this.slobrokConfigId = slobrokConfigId; + } + + public String getRoutingConfigId() { + return routingConfigId; + } + + public void setRoutingConfigId(String routingConfigId) { + this.routingConfigId = routingConfigId; + } + + public String getDocumentManagerConfigId() { + return documentManagerConfigId; + } + + public void setDocumentManagerConfigId(String documentManagerConfigId) { + this.documentManagerConfigId = documentManagerConfigId; + } + + public int getTraceLevel() { + return traceLevel; + } + + public void setTraceLevel(int traceLevel) { + this.traceLevel = traceLevel; + } + + public int getMessageBusPort() { + return mbusPort; + } + + public void setMessageBusPort(int mbusPort) { + this.mbusPort = mbusPort; + } + + public void setPriority(DocumentProtocol.Priority priority) { + this.priority = priority; + this.priorityExplicitlySet = true; + } + + public String getDocprocChain() { + return docprocChain; + } + + public void setDocprocChain(String chain) { + docprocChain = chain; + } + + /** + * Creates a source session params object with parameters set as these options + * dictate. + */ + public SourceSessionParams toSourceSessionParams() { + SourceSessionParams params = new SourceSessionParams(); + + StaticThrottlePolicy policy; + if (maxFeedRate > 0.0) { + policy = new RateThrottlingPolicy(maxFeedRate); + } else if ((maxPendingDocs == 0) && (maxPendingBytes == 0)) { + policy = new DynamicThrottlePolicy(); + } else { + policy = new StaticThrottlePolicy(); + } + if (maxPendingDocs > 0) { + policy.setMaxPendingCount(maxPendingDocs); + } + if (maxPendingBytes > 0) { + policy.setMaxPendingSize(maxPendingBytes); + } + + params.setThrottlePolicy(policy); + + params.setTimeout(getTimeout()); + return params; + } + + public MessageBusParams toMessageBusParams() { + MessageBusParams mbusParams = new MessageBusParams(); + if (retryEnabled) { + RetryTransientErrorsPolicy retryPolicy = new RetryTransientErrorsPolicy(); + retryPolicy.setBaseDelay(retryDelay); + mbusParams.setRetryPolicy(retryPolicy); + } else { + mbusParams.setRetryPolicy(null); + } + return mbusParams; + } + + public RPCNetworkParams getNetworkParams() { + try { + RPCNetworkParams networkParams = new RPCNetworkParams(); + if (mbusPort != -1) { + networkParams.setListenPort(mbusPort); + } + return networkParams; + } catch (Exception e) { + } + + return null; + } + + @Override + public String toString() { + return "FeederOptions{" + + "abortOnDocumentError=" + abortOnDocumentError + + ", abortOnSendError=" + abortOnSendError + + ", retryEnabled=" + retryEnabled + + ", retryDelay=" + retryDelay + + ", timeout=" + timeout + + ", maxPendingBytes=" + maxPendingBytes + + ", maxPendingDocs=" + maxPendingDocs + + ", documentManagerConfigId='" + documentManagerConfigId + '\'' + + ", idPrefix='" + idPrefix + '\'' + + ", route='" + route + '\'' + + ", routingConfigId='" + routingConfigId + '\'' + + ", slobrokConfigId='" + slobrokConfigId + '\'' + + ", traceLevel=" + traceLevel + + ", mbusPort=" + mbusPort + + ", priority=" + priority.name() + + ", priorityExplicitlySet=" + priorityExplicitlySet + + ", docprocChain='" + docprocChain + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof FeederOptions)) return false; + + FeederOptions that = (FeederOptions) o; + + if (abortOnDocumentError != that.abortOnDocumentError) return false; + if (abortOnSendError != that.abortOnSendError) return false; + if (maxPendingBytes != that.maxPendingBytes) return false; + if (maxPendingDocs != that.maxPendingDocs) return false; + if (maxFeedRate != that.maxFeedRate) return false; + if (mbusPort != that.mbusPort) return false; + if (priorityExplicitlySet != that.priorityExplicitlySet) return false; + if (Double.compare(that.retryDelay, retryDelay) != 0) return false; + if (retryEnabled != that.retryEnabled) return false; + if (Double.compare(that.timeout, timeout) != 0) return false; + if (traceLevel != that.traceLevel) return false; + if (docprocChain != null ? !docprocChain.equals(that.docprocChain) : that.docprocChain != null) return false; + if (documentManagerConfigId != null ? !documentManagerConfigId.equals(that.documentManagerConfigId) : that.documentManagerConfigId != null) { + return false; + } + if (idPrefix != null ? !idPrefix.equals(that.idPrefix) : that.idPrefix != null) return false; + if (priority != that.priority) return false; + if (route != null ? !route.equals(that.route) : that.route != null) return false; + if (routingConfigId != null ? !routingConfigId.equals(that.routingConfigId) : that.routingConfigId != null) { + return false; + } + if (slobrokConfigId != null ? !slobrokConfigId.equals(that.slobrokConfigId) : that.slobrokConfigId != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result; + long temp; + result = (abortOnDocumentError ? 1 : 0); + result = 31 * result + (abortOnSendError ? 1 : 0); + result = 31 * result + (retryEnabled ? 1 : 0); + temp = retryDelay != +0.0d ? Double.doubleToLongBits(retryDelay) : 0L; + result = 31 * result + (int) (temp ^ (temp >>> 32)); + temp = timeout != +0.0d ? Double.doubleToLongBits(timeout) : 0L; + result = 31 * result + (int) (temp ^ (temp >>> 32)); + result = 31 * result + maxPendingBytes; + result = 31 * result + maxPendingDocs; + result = 31 * result + ((int)(maxFeedRate * 1000)); + result = 31 * result + (documentManagerConfigId != null ? documentManagerConfigId.hashCode() : 0); + result = 31 * result + (idPrefix != null ? idPrefix.hashCode() : 0); + result = 31 * result + (route != null ? route.hashCode() : 0); + result = 31 * result + (routingConfigId != null ? routingConfigId.hashCode() : 0); + result = 31 * result + (slobrokConfigId != null ? slobrokConfigId.hashCode() : 0); + result = 31 * result + traceLevel; + result = 31 * result + mbusPort; + result = 31 * result + (priority != null ? priority.hashCode() : 0); + result = 31 * result + (priorityExplicitlySet ? 1 : 0); + result = 31 * result + (docprocChain != null ? docprocChain.hashCode() : 0); + return result; + } +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/JsonFeeder.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/JsonFeeder.java new file mode 100644 index 00000000000..06ba1ac8c10 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/JsonFeeder.java @@ -0,0 +1,24 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import java.io.InputStream; + +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.json.JsonFeedReader; +import com.yahoo.vespaxmlparser.FeedReader; + +/** + * Unpack JSON document operations and push to a feed access point. + * + * @author steinar + */ +public class JsonFeeder extends Feeder { + public JsonFeeder(DocumentTypeManager docMan, SimpleFeedAccess sender, InputStream stream) { + super(docMan, new VespaFeedSender(sender), stream); + } + + @Override + protected FeedReader createReader() throws Exception { + return new JsonFeedReader(stream, docMan); + } +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java new file mode 100755 index 00000000000..8021ea86783 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java @@ -0,0 +1,102 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; +import com.yahoo.documentapi.messagebus.MessageBusParams; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.jdisc.Metric; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; + +import java.util.Collections; + +public class MessageBusSessionFactory implements SessionFactory { + + private final MessageBusDocumentAccess access; + private final MessagePropertyProcessor processor; + + private interface Metrics { + String NUM_OPERATIONS = "num_operations"; + String NUM_PUTS = "num_puts"; + String NUM_REMOVES = "num_removes"; + String NUM_UPDATES = "num_updates"; + } + + public MessageBusSessionFactory(MessagePropertyProcessor processor) { + this.processor = processor; + MessageBusParams params = new MessageBusParams(processor.getLoadTypes()); + params.setTraceLevel(processor.getFeederOptions().getTraceLevel()); + params.setRPCNetworkParams(processor.getFeederOptions().getNetworkParams()); + params.setDocumentManagerConfigId("client"); + access = new MessageBusDocumentAccess(params); + } + + public MessageBusDocumentAccess getAccess() { + return access; + } + + @Override + public synchronized SendSession createSendSession(ReplyHandler handler, Metric metric) { + return new SourceSessionWrapper( + access.getMessageBus().createSourceSession(handler, processor.getFeederOptions().toSourceSessionParams()), + metric); + } + + public void shutDown() { + access.shutdown(); + } + + @Override + public synchronized VisitorSession createVisitorSession(VisitorParameters params) { + try { + return access.createVisitorSession(params); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private class SourceSessionWrapper extends SendSession { + private final SourceSession session; + private final Metric metric; + private final Metric.Context context; + + private SourceSessionWrapper(SourceSession session, Metric metric) { + this.session = session; + this.metric = metric; + this.context = metric.createContext(Collections.<String, String>emptyMap()); + } + + @Override + protected com.yahoo.messagebus.Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException { + updateCounters(m); + if (blockIfQueueFull) { + return session.sendBlocking(m); + } else { + return session.send(m); + } + } + + private void updateCounters(Message m) { + metric.add(Metrics.NUM_OPERATIONS, 1, context); + + if (m instanceof PutDocumentMessage) { + metric.add(Metrics.NUM_PUTS, 1, context); + } else if (m instanceof RemoveDocumentMessage) { + metric.add(Metrics.NUM_REMOVES, 1, context); + } else if (m instanceof UpdateDocumentMessage) { + metric.add(Metrics.NUM_UPDATES, 1, context); + } + } + + @Override + public void close() { + session.close(); + } + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageProcessor.java new file mode 100755 index 00000000000..5ade035476d --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageProcessor.java @@ -0,0 +1,8 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.messagebus.Message; + +public interface MessageProcessor { + public void process(Message m); +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java new file mode 100644 index 00000000000..f99a73ebd55 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java @@ -0,0 +1,324 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.container.jdisc.HttpRequest; +import com.yahoo.vespa.config.content.LoadTypeConfig; +import com.yahoo.container.Container; +import com.yahoo.docproc.DocprocService; +import com.yahoo.docproc.jdisc.DocumentProcessingHandler; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.messagebus.loadtypes.LoadType; +import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet; +import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jdisc.handler.RequestHandler; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.vespaclient.config.FeederConfig; + +import java.util.logging.Logger; + +/** + * Utility class for assigning properties to messages, either from implicit + * config values or from explicit values in requests. + */ +public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscriber<FeederConfig> { + + private static final Logger log = Logger.getLogger(MessagePropertyProcessor.class.getName()); + private FeederOptions feederOptions = null; + private Route defaultRoute = null; + private long defaultTimeoutMillis = 0; + private boolean retryEnabled = true; + private String defaultDocprocChain = null; + private boolean defaultAbortOnDocumentError = true; + private boolean defaultAbortOnSendError = true; + private boolean defaultCreateIfNonExistent = false; + private LoadTypeSet loadTypes = null; + private boolean configChanged = false; + + public MessagePropertyProcessor(String configId, String loadTypeConfig) { + new ConfigSubscriber().subscribe(this, FeederConfig.class, configId); + loadTypes = new LoadTypeSet(loadTypeConfig); + } + + public MessagePropertyProcessor(FeederConfig config, LoadTypeConfig loadTypeCfg) { + loadTypes = new LoadTypeSet(); + configure(config, loadTypeCfg); + } + + public void setRoute(String routeOverride) { + defaultRoute = Route.parse(routeOverride); + } + + private synchronized String getDocprocChainParameter(HttpRequest request) { + String docprocChainParam = request.getProperty("docprocchain"); + return (docprocChainParam == null ? defaultDocprocChain : docprocChainParam); + } + + public synchronized DocprocService getDocprocChain(HttpRequest request) { + ComponentRegistry<DocprocService> services = getDocprocServiceRegistry(request); + + String docprocChain = getDocprocChainParameter(request); + if (docprocChain == null) { + return null; + } + + return services.getComponent(docprocChain); + } + + public synchronized ComponentRegistry<DocprocService> getDocprocServiceRegistry(HttpRequest request) { + String docprocChain = getDocprocChainParameter(request); + if (docprocChain == null) { + return null; + } + + Container container = Container.get(); + if (container == null) { + throw new IllegalStateException("Could not get Container instance."); + } + + ComponentRegistry<RequestHandler> requestHandlerRegistry = container.getRequestHandlerRegistry(); + if (requestHandlerRegistry == null) { + throw new IllegalStateException("Could not get requesthandlerregistry."); + } + + DocumentProcessingHandler handler = (DocumentProcessingHandler) requestHandlerRegistry + .getComponent(DocumentProcessingHandler.class.getName()); + if (handler == null) { + return null; + } + ComponentRegistry<DocprocService> services = handler.getDocprocServiceRegistry(); + if (services == null) { + throw new IllegalStateException("Could not get DocprocServiceRegistry."); + } + return services; + } + + public PropertySetter buildPropertySetter(HttpRequest request) { + String routeParam = null; + double timeoutParam = -1; + String priorityParam = null; + String abortOnDocErrorParam = null; + String abortOnFeedErrorParam = null; + String loadTypeStr = null; + String traceStr = null; + String createIfNonExistentParam = null; + + if (request != null) { + routeParam = request.getProperty("route"); + + String timeoutStr = request.getProperty("timeout"); + if (timeoutStr != null) { + timeoutParam = Double.parseDouble(timeoutStr); + } + + priorityParam = request.getProperty("priority"); + traceStr = request.getProperty("tracelevel"); + abortOnDocErrorParam = request.getProperty("abortondocumenterror"); + abortOnFeedErrorParam = request.getProperty("abortonfeederror"); + loadTypeStr = request.getProperty("loadtype"); + createIfNonExistentParam = request.getProperty("createifnonexistent"); + } + + Route route = (routeParam != null ? Route.parse(routeParam) : null); + long timeout; + boolean retry; + boolean abortOnDocumentError; + boolean abortOnFeedError; + boolean createIfNonExistent; + + synchronized (this) { + if (route == null) { + route = defaultRoute; + } + timeout = (timeoutParam < 0 ? defaultTimeoutMillis : (long)(timeoutParam * 1000)); + retry = retryEnabled; + abortOnDocumentError = (abortOnDocErrorParam == null ? defaultAbortOnDocumentError : (!"false".equals(abortOnDocErrorParam))); + abortOnFeedError = (abortOnFeedErrorParam == null ? defaultAbortOnSendError : (!"false".equals(abortOnFeedErrorParam))); + createIfNonExistent = (createIfNonExistentParam == null ? defaultCreateIfNonExistent : ("true".equals(createIfNonExistentParam))); + } + DocumentProtocol.Priority priority = null; + if (priorityParam != null) { + priority = DocumentProtocol.getPriorityByName(priorityParam); + } + + LoadType loadType = null; + if (loadTypes != null && loadTypeStr != null) { + loadType = loadTypes.getNameMap().get(loadTypeStr); + } + + if (loadType == null) { + loadType = LoadType.DEFAULT; + } + + return new PropertySetter(route, timeout, priority, loadType, retry, abortOnDocumentError, abortOnFeedError, createIfNonExistent, traceStr != null ? Integer.parseInt(traceStr) : 0); + } + + public long getDefaultTimeoutMillis() { return defaultTimeoutMillis; } + + public synchronized boolean configChanged() { + return configChanged; + } + + public synchronized void setConfigChanged(boolean configChanged) { + this.configChanged = configChanged; + } + + public synchronized FeederOptions getFeederOptions() { + return feederOptions; + } + + public synchronized void configure(FeederConfig config, LoadTypeConfig loadTypeConfig) { + loadTypes.configure(loadTypeConfig); + configure(config); + } + + public LoadTypeSet getLoadTypes() { + return loadTypes; + } + + public synchronized void configure(FeederConfig config) { + if (feederOptions != null) { + setConfigChanged(true); + } + + feederOptions = new FeederOptions(config); + if (feederOptions.getRoute() != null) { + defaultRoute = Route.parse(feederOptions.getRoute()); + } else { + defaultRoute = null; + } + defaultTimeoutMillis = (long) (feederOptions.getTimeout() * 1000); + retryEnabled = feederOptions.getRetryEnabled(); + defaultAbortOnDocumentError = feederOptions.abortOnDocumentError(); + defaultAbortOnSendError = feederOptions.abortOnSendError(); + + if (!"".equals(feederOptions.getDocprocChain())) { + defaultDocprocChain = feederOptions.getDocprocChain(); + } else { + defaultDocprocChain = null; + } + + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Received new config (" + + "route: " + (defaultRoute != null ? defaultRoute : "<none>") + + ", timeout: " + defaultTimeoutMillis + " ms, retry enabled: " + retryEnabled + + ", docproc chain: " + (defaultDocprocChain != null ? defaultDocprocChain : "<none>") + + ", abort on doc error: " + defaultAbortOnDocumentError + + ", abort on feed error: " + defaultAbortOnSendError + ")"); + } + } + + public class PropertySetter implements MessageProcessor { + /** Route either set by configuration or by explicit request override. May be null */ + private Route route; + /** Timeout (in milliseconds) */ + private long timeout; + /** Explicit priority set. May be null */ + private DocumentProtocol.Priority priority; + private boolean retryEnabled; + private boolean abortOnDocumentError; + private boolean abortOnFeedError; + private boolean createIfNonExistent; + private LoadType loadType; + private int traceLevel; + + public PropertySetter(Route route, long timeout, DocumentProtocol.Priority priority, LoadType loadType, + boolean retryEnabled, boolean abortOnDocumentError, boolean abortOnFeedError, + boolean createIfNonExistent, int traceLevel) { + this.route = route; + this.timeout = timeout; + this.priority = priority; + this.loadType = loadType; + this.retryEnabled = retryEnabled; + this.abortOnDocumentError = abortOnDocumentError; + this.abortOnFeedError = abortOnFeedError; + this.createIfNonExistent = createIfNonExistent; + this.traceLevel = traceLevel; + } + + public Route getRoute() { + return route; + } + + public void setRoute(Route route) { + this.route = route; + } + + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + public DocumentProtocol.Priority getPriority() { + return priority; + } + + public void setPriority(DocumentProtocol.Priority priority) { + this.priority = priority; + } + + public LoadType getLoadType() { + return loadType; + } + + public void setLoadType(LoadType loadType) { + this.loadType = loadType; + } + + public boolean getAbortOnDocumentError() { + return abortOnDocumentError; + } + + public boolean getAbortOnFeedError() { + return abortOnFeedError; + } + + public boolean getCreateIfNonExistent() { + return createIfNonExistent; + } + + @Override + public void process(Message msg) { + if (route != null) { + msg.setRoute(route); + } + msg.setTimeRemaining(timeout); + msg.setRetryEnabled(retryEnabled); + msg.getTrace().setLevel(Math.max(getFeederOptions().getTraceLevel(), traceLevel)); + + if (loadType != null) { + ((DocumentMessage) msg).setLoadType(loadType); + ((DocumentMessage) msg).setPriority(loadType.getPriority()); + } + + if (priority != null) { + ((DocumentMessage) msg).setPriority(priority); + } + } + + public void process(VisitorParameters params) { + if (route != null) { + params.setRoute(route); + } + params.setTimeoutMs(timeout); + + params.setTraceLevel(Math.max(getFeederOptions().getTraceLevel(), traceLevel)); + + if (loadType != null) { + params.setLoadType(loadType); + params.setPriority(loadType.getPriority()); + } + + if (priority != null) { + params.setPriority(priority); + } + } + } +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SendSession.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SendSession.java new file mode 100755 index 00000000000..dc2294832b1 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SendSession.java @@ -0,0 +1,21 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Result; + +/** + * Wrapper class to send Messages. Used instead of using a MessageBus session directly + * so that unit tests can be more easily made. + */ +public abstract class SendSession { + + protected abstract Result onSend(Message m, boolean blockIfQueueIsFull) throws InterruptedException; + + public Result send(Message m, boolean blockIfQueueIsFull) throws InterruptedException { + return onSend(m, blockIfQueueIsFull); + } + + public abstract void close(); + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java new file mode 100755 index 00000000000..307d8773049 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java @@ -0,0 +1,32 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.jdisc.Metric; +import com.yahoo.messagebus.ReplyHandler; + +/** + * Wraps the creation of messagebus source sessions to allow + * for unit testing of the components without involving messagebus itself. + */ +public interface SessionFactory { + + /** + * Creates a messagebus session for sending regular messages. + * + * + * @param handler A replyhandler to callback when receiving replies from messagebus + * @return The session to use for sending messages. + */ + SendSession createSendSession(ReplyHandler handler, Metric metric); + + /** + * Creates a messagebus session for visiting data. + * + * @param params Parameters to the visitor + * @return A visitor session. + */ + VisitorSession createVisitorSession(VisitorParameters params); + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java new file mode 100755 index 00000000000..48ff0eae36b --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -0,0 +1,246 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.jdisc.Metric; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.*; +import com.yahoo.clientmetrics.RouteMetricSet; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Logger; + +/** + * This class allows multiple clients to use one shared messagebus session. + * The user should create a ResultCallback, which acts as a "session" for that + * client, and send one or more messages using the send() methods. + * When done sending messages, the client can wait for all messages to be replied to + * using the waitForPending method. + */ +public class SharedSender implements ReplyHandler { + + public static final Logger log = Logger.getLogger(SharedSender.class.getName()); + + private SendSession sender; + private final Object monitor = new Object(); + private RouteMetricSet metrics; + + // Maps from filename to number of pending requests + private Map<ResultCallback, Integer> activeOwners = new HashMap<>(); + + /** + * Creates a new shared sender. + * If oldsender != null, we copy that status information from that sender. + */ + public SharedSender(String route, SessionFactory factory, SharedSender oldSender, Metric metric) { + if (factory != null) { + sender = factory.createSendSession(this, metric); + } + + if (oldSender != null) { + this.metrics = oldSender.metrics; + } else { + metrics = new RouteMetricSet(route, null); + } + } + + public RouteMetricSet getMetrics() { + return metrics; + } + + public void remove(ResultCallback owner) { + synchronized (monitor) { + activeOwners.remove(owner); + } + } + + public void shutdown() { + try { + synchronized (monitor) { + while ( ! activeOwners.isEmpty()) { + monitor.wait(180 * 1000); + } + } + } catch (InterruptedException e) { + } + sender.close(); + } + + /** + * Waits until there are no more pending documents + * for the given callback, or the timeout expires. + * + * @param owner The callback to check for. + * @param timeoutMs The number of milliseconds to wait, or -1 to wait indefinitely. + * @return true if there were no more pending, or false if the timeout expired. + */ + public boolean waitForPending(ResultCallback owner, long timeoutMs) { + long timeStart = SystemTimer.INSTANCE.milliTime(); + long timeLeft = timeoutMs; + + try { + while (timeoutMs == -1 || timeLeft > 0) { + synchronized (monitor) { + Integer count = activeOwners.get(owner); + if (count == null || count == 0) { + return true; + } else if (timeLeft > 0) { + monitor.wait(timeLeft); + } else { + monitor.wait(); + } + } + + timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart); + } + } catch (InterruptedException e) { + } + + return false; + } + + public int getPendingCount(ResultCallback owner) { + Integer count = activeOwners.get(owner); + if (count == null) { + return 0; + } + return count; + } + + /** + * Returns true if the given result callback has any pending messages with this + * sender. + * + * @param owner The callback to check + * @return True if there are any pending, false if not. + */ + public boolean hasPending(ResultCallback owner) { + return getPendingCount(owner) > 0; + } + + /** + * Waits until the given file has no pending documents. + * + * @param owner the file to check for pending documents + */ + public void waitForPending(ResultCallback owner) { + waitForPending(owner, -1); + } + + /** + * Sends a message + * + * @param msg The message to send. + * @param owner A callback to send replies to when received from messagebus + */ + public void send(Message msg, ResultCallback owner) { + send(msg, owner, -1, true); + } + + /** + * Sends a message. Waits until the number of pending messages for this owner has + * become lower than the specified limit if necessary. + * + * @param msg The message to send + * @param owner The callback to send replies to when received from messagebus + * @param maxPendingPerOwner The maximum number of pending messages the callback + * @param blockingQueue If true, block until the message bus queue is available. + */ + public void send(Message msg, ResultCallback owner, int maxPendingPerOwner, boolean blockingQueue) { + // Silently fail messages that are attempted sent after the callback aborted. + if (owner.isAborted()) { + return; + } + + try { + synchronized (monitor) { + if (maxPendingPerOwner != -1 && blockingQueue) { + while (true) { + Integer count = activeOwners.get(owner); + + if (count != null && count >= maxPendingPerOwner) { + log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies"); + monitor.wait(10000); + } else { + break; + } + } + } + + Integer count = activeOwners.get(owner); + + if (count == null) { + activeOwners.put(owner, 1); + } else { + activeOwners.put(owner, count + 1); + } + } + } catch (InterruptedException e) { + return; + } + + msg.setContext(owner); + + try { + com.yahoo.messagebus.Result r = sender.send(msg, blockingQueue); + if (!r.isAccepted()) { + EmptyReply reply = new EmptyReply(); + msg.swapState(reply); + reply.setMessage(msg); + reply.addError(r.getError()); + handleReply(reply); + } + } catch (InterruptedException e) { + } + } + + /** + * Implement replyHandler from messagebus. Called when a reply is received from messagebus. + * Tries to find the callback from the reply context and updates the pending state for the callback. + * + * @param r the reply to process. + */ + @Override + public void handleReply(Reply r) { + synchronized (monitor) { + ResultCallback owner = (ResultCallback) r.getContext(); + + if (owner != null) { + metrics.addReply(r); + + Integer count = activeOwners.get(owner); + + if (count != null) { + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count); + } + + if ( ! owner.handleReply(r, count - 1)) { + activeOwners.remove(owner); + } else { + activeOwners.put(owner, count - 1); + } + } + } else { + log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); + } + + monitor.notifyAll(); + } + } + + public interface ResultCallback { + + /** Return true if we should continue waiting for replies for this sender. */ + boolean handleReply(Reply r, int numPending); + + /** + * Returns true if feeding has been aborted. No more feeding is allowed with this + * callback after that. + */ + boolean isAborted(); + + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java new file mode 100755 index 00000000000..52f5add9f44 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java @@ -0,0 +1,19 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; + +public interface SimpleFeedAccess { + + void put(Document doc); + void remove(DocumentId docId); + void update(DocumentUpdate update); + void put(Document doc, TestAndSetCondition condition); + void remove(DocumentId docId, TestAndSetCondition condition); + void update(DocumentUpdate update, TestAndSetCondition condition); + boolean isAborted(); + +}
\ No newline at end of file diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java new file mode 100755 index 00000000000..a9a08562c9d --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java @@ -0,0 +1,114 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.messagebus.Message; +import java.util.ArrayList; +import java.util.List; + +/** Simplifies sending messages belonging to a single result callback. */ +public class SingleSender implements SimpleFeedAccess { + + private final SharedSender.ResultCallback owner; + private final SharedSender sender; + private final List<MessageProcessor> messageProcessors = new ArrayList<>(); + private boolean blockingQueue; + + public SingleSender(SharedSender.ResultCallback owner, SharedSender sender, boolean blockingQueue) { + this.owner = owner; + this.sender = sender; + this.blockingQueue = blockingQueue; + } + + public SingleSender(SharedSender.ResultCallback owner, SharedSender sender) { + this(owner, sender, true); + } + + @Override + public void put(Document doc) { + send(new PutDocumentMessage(new DocumentPut(doc))); + } + + @Override + public void remove(DocumentId docId) { + send(new RemoveDocumentMessage(docId)); + } + + @Override + public void update(DocumentUpdate update) { + send(new UpdateDocumentMessage(update)); + } + + @Override + public void put(Document doc, TestAndSetCondition condition) { + PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(doc)); + message.setCondition(condition); + send(message); + } + + @Override + public void remove(DocumentId docId, TestAndSetCondition condition) { + RemoveDocumentMessage message = new RemoveDocumentMessage(docId); + message.setCondition(condition); + send(message); + } + + @Override + public void update(DocumentUpdate update, TestAndSetCondition condition) { + UpdateDocumentMessage message = new UpdateDocumentMessage(update); + message.setCondition(condition); + send(message); + } + + @Override + public boolean isAborted() { + return owner.isAborted(); + } + + public void addMessageProcessor(MessageProcessor processor) { + messageProcessors.add(processor); + } + + // Runs all message processors on the message and returns it. + private Message processMessage(Message m) { + for (MessageProcessor processor : messageProcessors) { + processor.process(m); + } + return m; + } + + public void send(Message m) { + send(m, -1); + } + + /** + * Sends the given message, allowing a maximum of maxPending messages to be + * sent for this sender. + * + * @param m The message to send + * @param maxPending The number of pending messages to block on for this sender. + */ + public void send(Message m, int maxPending) { + sender.send(processMessage(m), owner, maxPending, blockingQueue); + } + + public void done() { + // empty + } + + public void waitForPending() { + waitForPending(-1); + } + + public boolean waitForPending(long timeoutMs) { + return sender.waitForPending(owner, timeoutMs); + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java new file mode 100755 index 00000000000..7838f00642a --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java @@ -0,0 +1,39 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.vespaxmlparser.VespaXMLFeedReader; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +/** + * Wrapper class for SimpleFeedAccess to send various XML operations. + */ +public class VespaFeedSender { + + private final SimpleFeedAccess sender; + + public VespaFeedSender(SimpleFeedAccess sender) { + this.sender = sender; + } + + public boolean isAborted() { + return sender.isAborted(); + } + + public void sendOperation(VespaXMLFeedReader.Operation op) { + switch (op.getType()) { + case DOCUMENT: + sender.put(op.getDocument(), op.getCondition()); + break; + case REMOVE: + sender.remove(op.getRemove(), op.getCondition()); + break; + case UPDATE: + sender.update(op.getDocumentUpdate(), op.getCondition()); + break; + } + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java new file mode 100755 index 00000000000..3c989e6d42d --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java @@ -0,0 +1,26 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedapi; + +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.vespaxmlparser.FeedReader; +import com.yahoo.vespaxmlparser.VespaXMLFeedReader; + +import java.io.InputStream; + +/** + * Unpack a stream of document operations represented as XML and push to a feed + * access point. + * + * @author Thomas Gundersen + * @author steinar + */ +public class XMLFeeder extends Feeder { + public XMLFeeder(DocumentTypeManager docMan, SimpleFeedAccess sender, InputStream stream) { + super(docMan, new VespaFeedSender(sender), stream); + } + + @Override + protected FeedReader createReader() throws Exception { + return new VespaXMLFeedReader(stream, docMan); + } +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java new file mode 100755 index 00000000000..24d30c4aa0a --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java @@ -0,0 +1,163 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedhandler; + +import com.yahoo.clientmetrics.RouteMetricSet; +import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.container.jdisc.VespaHeaders; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.feedapi.SharedSender; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.text.Utf8String; +import com.yahoo.text.XMLWriter; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; +import java.util.stream.Stream; + +@SuppressWarnings("deprecation") +public final class FeedResponse extends HttpResponse implements SharedSender.ResultCallback { + + private final static Logger log = Logger.getLogger(FeedResponse.class.getName()); + private final List<ErrorMessage> errorMessages = new ArrayList<>(); + private final List<String> errors = new ArrayList<>(); + private final StringBuilder traces = new StringBuilder(); + private final RouteMetricSet metrics; + private boolean abortOnError = false; + private boolean isAborted = false; + + public FeedResponse(RouteMetricSet metrics) { + super(com.yahoo.jdisc.http.HttpResponse.Status.OK); + this.metrics = metrics; + } + + public boolean isAborted() { + return isAborted; + } + + public void setAbortOnFeedError(boolean abort) { + abortOnError = abort; + } + + @Override + public void render(OutputStream outputStream) throws IOException { + if ( ! errorMessages.isEmpty()) + setStatus(VespaHeaders.getStatus(false, errorMessages.get(0), errorMessages.iterator())); + + XMLWriter writer = new XMLWriter(new OutputStreamWriter(outputStream)); + writer.openTag("result"); + + if (metrics != null) { + metrics.printXml(writer, 0, 0); + } + if (traces.length() > 0) { + writer.openTag("trace"); + writer.append(traces); + writer.closeTag(); + } + if (!errors.isEmpty()) { + writer.openTag("errors"); + writer.attribute(new Utf8String("count"), errors.size()); + + for (int i = 0; i < errors.size() && i < 10; ++i) { + writer.openTag("error"); + writer.attribute(new Utf8String("message"), errors.get(i)); + writer.closeTag(); + } + writer.closeTag(); + } + + writer.closeTag(); + writer.flush(); + outputStream.close(); + } + + @Override + public java.lang.String getContentType() { + return "application/xml"; + } + + public String prettyPrint(Message m) { + if (m instanceof PutDocumentMessage) { + return "PUT[" + ((PutDocumentMessage)m).getDocumentPut().getDocument().getId() + "] "; + } + if (m instanceof RemoveDocumentMessage) { + return "REMOVE[" + ((RemoveDocumentMessage)m).getDocumentId() + "] "; + } + if (m instanceof UpdateDocumentMessage) { + return "UPDATE[" + ((UpdateDocumentMessage)m).getDocumentUpdate().getId() + "] "; + } + + return ""; + } + + public boolean handleReply(Reply reply, int numPending) { + metrics.addReply(reply); + if (reply.getTrace().getLevel() > 0) { + String str = reply.getTrace().toString(); + traces.append(str); + System.out.println(str); + } + + if (containsFatalErrors(reply.getErrors())) { + for (int i = 0; i < reply.getNumErrors(); ++i) { + Error err = reply.getError(i); + StringBuilder out = new StringBuilder(prettyPrint(reply.getMessage())); + out.append("[").append(DocumentProtocol.getErrorName(err.getCode())).append("] "); + if (err.getService() != null) { + out.append("(").append(err.getService()).append(") "); + } + out.append(err.getMessage()); + + String str = out.toString(); + log.finest(str); + addError(str); + } + isAborted = abortOnError; + return !abortOnError; + } + return numPending > 0; + } + + public void done() { + metrics.done(); + } + + public FeedResponse addXMLParseError(String error) { + errorMessages.add(ErrorMessage.createBadRequest(error)); + errors.add(error); + return this; + } + + public FeedResponse addError(String error) { + errorMessages.add(ErrorMessage.createBadRequest(error)); + errors.add(error); + return this; + } + + public List<String> getErrorList() { + return errors; + } + + public List<ErrorMessage> getErrorMessageList() { + return errorMessages; + } + + private static boolean containsFatalErrors(Stream<Error> errors) { + return errors.anyMatch(e -> e.getCode() != DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED); + } + + public boolean isSuccess() { + return errors.isEmpty(); + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/MetricResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/MetricResponse.java new file mode 100644 index 00000000000..4e0896b6da4 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/MetricResponse.java @@ -0,0 +1,38 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedhandler; + +import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.metrics.MetricSet; +import com.yahoo.text.XMLWriter; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; + +/** + * Response that generates metric output like a status page. + */ +public final class MetricResponse extends HttpResponse { + + MetricSet set; + + MetricResponse(MetricSet set) { + super(com.yahoo.jdisc.http.HttpResponse.Status.OK); + this.set = set; + } + + @Override + public void render(OutputStream stream) throws IOException { + XMLWriter writer = new XMLWriter(new OutputStreamWriter(stream)); + writer.openTag("status"); + set.printXml(writer, 0, 2); + writer.closeTag(); + writer.flush(); + } + + @Override + public java.lang.String getContentType() { + return "application/xml"; + } + +}
\ No newline at end of file diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java new file mode 100644 index 00000000000..22f75705334 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java @@ -0,0 +1,28 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedhandler; + +import com.yahoo.jdisc.Metric; +import java.util.Map; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + * @since 5.1.20 + */ +public final class NullFeedMetric implements Metric { + @Override + public void set(String key, Number val, Context ctx) { + } + + @Override + public void add(String key, Number val, Context ctx) { + } + + @Override + public Context createContext(Map<String, ?> properties) { + return NullFeedContext.INSTANCE; + } + + private static class NullFeedContext implements Context { + private static final NullFeedContext INSTANCE = new NullFeedContext(); + } +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java new file mode 100755 index 00000000000..6e3facbdc98 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -0,0 +1,102 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedhandler; + +import com.google.inject.Inject; +import com.yahoo.clientmetrics.RouteMetricSet; +import com.yahoo.container.jdisc.HttpRequest; +import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.feedapi.DocprocMessageProcessor; +import com.yahoo.feedapi.FeedContext; +import com.yahoo.feedapi.Feeder; +import com.yahoo.feedapi.JsonFeeder; +import com.yahoo.feedapi.MessagePropertyProcessor; +import com.yahoo.feedapi.SingleSender; +import com.yahoo.feedapi.XMLFeeder; +import com.yahoo.jdisc.Metric; +import com.yahoo.vespa.config.content.LoadTypeConfig; +import com.yahoo.vespaclient.config.FeederConfig; + +import java.util.List; +import java.util.concurrent.Executor; + +/** + * Feed documents from a com.yahoo.container.handler.Request. + * + * @author Thomas Gundersen + * @author steinar + */ +public final class VespaFeedHandler extends VespaFeedHandlerBase { + + public static final String JSON_INPUT = "jsonInput"; + + @Inject + public VespaFeedHandler(FeederConfig feederConfig, LoadTypeConfig loadTypeConfig, Executor executor, + Metric metric) throws Exception { + super(feederConfig, loadTypeConfig, executor, metric); + } + + VespaFeedHandler(FeedContext context, Executor executor) throws Exception { + super(context, executor); + } + + public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) throws Exception { + return new VespaFeedHandler(context, executor); + } + + @Override + public HttpResponse handle(HttpRequest request) { + return handle(request, (RouteMetricSet.ProgressCallback)null); + } + + public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback) { + if (request.getProperty("status") != null) { + return new MetricResponse(context.getMetrics().getMetricSet()); + } + + boolean asynchronous = request.getBooleanProperty("asynchronous"); + + MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); + + String route = properties.getRoute().toString(); + FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); + + SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); + sender.addMessageProcessor(properties); + sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); + + Feeder feeder = createFeeder(sender, request); + feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); + feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); + response.setAbortOnFeedError(properties.getAbortOnFeedError()); + + List<String> errors = feeder.parse(); + for (String s : errors) { + response.addXMLParseError(s); + } + if (errors.size() > 0 && feeder instanceof XMLFeeder) { + response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); + } + + sender.done(); + + if (asynchronous) { + return response; + } + long millis = getTimeoutMillis(request); + boolean completed = sender.waitForPending(millis); + if ( ! completed) + response.addError("Timed out after "+millis+" ms waiting for responses"); + response.done(); + return response; + } + + private Feeder createFeeder(SingleSender sender, HttpRequest request) { + String contentType = request.getHeader("Content-Type"); + if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) { + return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); + } else { + return new XMLFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); + } + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java new file mode 100755 index 00000000000..fa1e6854593 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java @@ -0,0 +1,93 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.feedhandler; + +import com.google.inject.Inject; +import com.yahoo.clientmetrics.ClientMetrics; +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.container.jdisc.HttpRequest; +import com.yahoo.container.jdisc.ThreadedHttpRequestHandler; +import com.yahoo.docproc.DocprocService; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.feedapi.FeedContext; +import com.yahoo.feedapi.MessagePropertyProcessor; +import com.yahoo.feedapi.SharedSender; +import com.yahoo.jdisc.Metric; +import com.yahoo.search.query.ParameterParser; +import com.yahoo.vespa.config.content.LoadTypeConfig; +import com.yahoo.vespaclient.config.FeederConfig; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.Executor; +import java.util.zip.GZIPInputStream; + +public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { + + protected FeedContext context; + private final long defaultTimeoutMillis; + + @Inject + public VespaFeedHandlerBase(FeederConfig feederConfig, + LoadTypeConfig loadTypeConfig, + Executor executor, + Metric metric) throws Exception { + this(FeedContext.getInstance(feederConfig, loadTypeConfig, metric), executor, (long)feederConfig.timeout() * 1000); + } + + public VespaFeedHandlerBase(FeedContext context, Executor executor) throws Exception { + this(context, executor, context.getPropertyProcessor().getDefaultTimeoutMillis()); + } + + public VespaFeedHandlerBase(FeedContext context, Executor executor, long defaultTimeoutMillis) throws Exception { + super(executor); + this.context = context; + this.defaultTimeoutMillis = defaultTimeoutMillis; + } + + public SharedSender getSharedSender(String route) { + return context.getSharedSender(route); + } + + public DocprocService getDocprocChain(HttpRequest request) { + return context.getPropertyProcessor().getDocprocChain(request); + } + + public ComponentRegistry<DocprocService> getDocprocServiceRegistry(HttpRequest request) { + return context.getPropertyProcessor().getDocprocServiceRegistry(request); + } + + public MessagePropertyProcessor getPropertyProcessor() { + return context.getPropertyProcessor(); + } + + /** + * @param request Request object to get the POST data stream from + * @return An InputStream that either is a GZIP wrapper or simply the + * original data stream. + * @throws IllegalArgumentException if GZIP stream creation failed + */ + public InputStream getRequestInputStream(HttpRequest request) { + if ("gzip".equals(request.getHeader("Content-Encoding"))) { + try { + return new GZIPInputStream(request.getData()); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to create GZIP input stream from content", e); + } + } else { + return request.getData(); + } + } + + protected DocumentTypeManager getDocumentTypeManager() { + return context.getDocumentTypeManager(); + } + + public ClientMetrics getMetrics() { + return context.getMetrics(); + } + + protected long getTimeoutMillis(HttpRequest request) { + return ParameterParser.asMilliSeconds(request.getProperty("timeout"), defaultTimeoutMillis); + } + +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/package-info.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/package-info.java new file mode 100644 index 00000000000..309adee673b --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/package-info.java @@ -0,0 +1,8 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Exported not as an API, but specificalle for SSBE. + */ +@ExportPackage +package com.yahoo.feedhandler; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java new file mode 100644 index 00000000000..ab404893c08 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespaclient; + +public class ClusterDef { + public ClusterDef(String name, String configId) { + this.name = name; + this.configId = configId; + } + + String name; + String configId; + + public String getName() { return name; } + public String getConfigId() { return configId; } +}
\ No newline at end of file diff --git a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java new file mode 100644 index 00000000000..3ea3bb5cb9d --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java @@ -0,0 +1,40 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespaclient; + +import com.yahoo.cloud.config.ClusterListConfig; +import com.yahoo.config.subscription.ConfigGetter; + +import java.util.ArrayList; +import java.util.List; + +public class ClusterList { + List<ClusterDef> storageClusters = new ArrayList<ClusterDef>(); + + public ClusterList() { + this(null); + } + + public ClusterList(String configId) { + if (configId != null) { + configure(new ConfigGetter<>(ClusterListConfig.class).getConfig(configId)); + } + } + + public List<ClusterDef> getStorageClusters() { + return storageClusters; + } + + public void configure(ClusterListConfig cfg) { + storageClusters.clear(); + for (int i = 0; i < cfg.storage().size(); i++) { + storageClusters.add(new ClusterDef(cfg.storage(i).name(), + cfg.storage(i).configid())); + } + } + + public static ClusterList createMockedList(List<ClusterDef> clusters) { + ClusterList list = new ClusterList(null); + list.storageClusters = clusters; + return list; + } +} |