diff options
author | Bjørn Christian Seime <bjorncs@yahoo-inc.com> | 2017-06-06 14:13:55 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@yahoo-inc.com> | 2017-06-06 14:13:55 +0200 |
commit | 8f2d9c36cccb34a1594d5f1cf82ac852ad5e712d (patch) | |
tree | 15addc0903bb361f6df8c5833877844d068687ab /vespaclient-java/src/main/java/com/yahoo/vespafeeder | |
parent | ef89ead652b55d7742767aea7e9c3d9243f19336 (diff) |
Move vespaclient-java to Vespa open-source
Diffstat (limited to 'vespaclient-java/src/main/java/com/yahoo/vespafeeder')
6 files changed, 639 insertions, 0 deletions
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java new file mode 100644 index 00000000000..249ed1fd70c --- /dev/null +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java @@ -0,0 +1,191 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespafeeder; + +import com.yahoo.vespa.config.content.LoadTypeConfig; +import com.yahoo.feedapi.DummySessionFactory; +import com.yahoo.feedapi.MessageBusSessionFactory; +import com.yahoo.feedapi.MessagePropertyProcessor; +import com.yahoo.feedapi.SessionFactory; +import com.yahoo.vespaclient.config.FeederConfig; + +import java.io.BufferedOutputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import static java.lang.System.out; + +/** + * Argument parsing class for the vespa feeder. 
+ */ +public class Arguments { + public FeederConfig getFeederConfig() { + return new FeederConfig(feederConfigBuilder); + } + + public List<String> getFiles() { + return files; + } + + public String getMode() { + return mode; + } + + public boolean isVerbose() { + return verbose; + } + + private FeederConfig.Builder feederConfigBuilder = new FeederConfig.Builder(); + private List<String> files = new ArrayList<String>(); + private String dumpDocumentsFile = null; + private String mode = "standard"; + private boolean validateOnly = false; + private boolean verbose = false; + SessionFactory sessionFactory = null; + MessagePropertyProcessor propertyProcessor = null; + private String priority = null; + + public MessagePropertyProcessor getPropertyProcessor() { + return propertyProcessor; + } + + public void help() { + out.println("This is a tool for feeding xml (deprecated) or json data to a Vespa application.\n" + + "\n" + + "The options are:\n" + + " --abortondataerror arg (true) Whether or not to abort if the xml input has \n" + + " errors (true|false).\n" + + " --abortonsenderror arg (true) Whether or not to abort if an error occured while\n" + + " sending operations to Vespa (true|false).\n" + + " --file arg The name of the input files to read. These can \n" + + " also be passed as arguments without the option \n" + + " prefix. If none is given, this tool parses \n" + + " identifiers from standard in.\n" + + " -h [ --help ] Shows this help page.\n" + + " --maxpending arg The maximum number of operations that are allowed\n" + + " to be pending at any given time. NOTE: This disables dynamic throttling. Use with care.\n" + + " --maxpendingsize arg The maximum size (in bytes) of operations that \n" + + " are allowed to be pending at any given time. \n" + + " --maxfeedrate arg Limits the feed rate to the given number (operations/second). 
You may still want to increase\n" + + " the max pending size if your feed rate doesn't reach the desired number.\n" + + " --mode arg (=standard) The mode to run vespafeeder in (standard | benchmark).\n" + + " --noretry Turns off retries of recoverable failures.\n" + + " --retrydelay arg (=1) The time (in seconds) to wait between retries of \n" + + " a failed operation.\n" + + " --route arg (=default) The route to send the data to.\n" + + " --timeout arg (=180) The time (in seconds) allowed for sending \n" + + " operations.\n" + + " --trace arg (=0) The trace level of network traffic.\n" + + " --validate Run validation tool on input files instead of \n" + + " feeding them.\n" + + " --dumpDocuments <filename> Specify a file where documents in the put are serialized.\n" + + " --priority arg Specify priority of sent messages (see documentation for priority values)\n" + + " --create-if-non-existent Enable setting of create-if-non-existent to true on all document updates in the given xml feed.\n" + + " -v [ --verbose ] Enable verbose output of progress.\n"); + } + + public class HelpShownException extends Exception { + + } + + public Arguments(String[] argList, SessionFactory factory) throws HelpShownException, FileNotFoundException { + parse(argList); + + if (factory != null) { + sessionFactory = factory; + } else if (validateOnly) { + if (dumpDocumentsFile != null) { + BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(dumpDocumentsFile)); + sessionFactory = new DummySessionFactory(null, out); + } else { + sessionFactory = new DummySessionFactory(null, null); + } + } else { + sessionFactory = new MessageBusSessionFactory(propertyProcessor); + } + } + + void parse(String[] argList) throws HelpShownException { + List<String> args = new LinkedList<String>(); + args.addAll(Arrays.asList(argList)); + + while (!args.isEmpty()) { + String arg = args.remove(0); + + if (arg.equals("-h") || arg.equals("--help")) { + help(); + throw new HelpShownException(); 
+ } else if ("--abortondataerror".equals(arg)) { + feederConfigBuilder.abortondocumenterror(getBoolean(getParam(args, arg))); + } else if ("--abortonsenderror".equals(arg)) { + feederConfigBuilder.abortonsenderror(getBoolean(getParam(args, arg))); + } else if ("--file".equals(arg)) { + files.add(getParam(args, arg)); + } else if ("--maxpending".equals(arg)) { + feederConfigBuilder.maxpendingdocs(Integer.parseInt(getParam(args, arg))); + } else if ("--maxpendingsize".equals(arg)) { + feederConfigBuilder.maxpendingbytes(Integer.parseInt(getParam(args, arg))); + } else if ("--mode".equals(arg)) { + mode = getParam(args, arg); + } else if ("--noretry".equals(arg)) { + feederConfigBuilder.retryenabled(false); + } else if ("--retrydelay".equals(arg)) { + feederConfigBuilder.retrydelay(Integer.parseInt(getParam(args, arg))); + } else if ("--route".equals(arg)) { + feederConfigBuilder.route(getParam(args, arg)); + } else if ("--timeout".equals(arg)) { + feederConfigBuilder.timeout(Double.parseDouble(getParam(args, arg))); + } else if ("--trace".equals(arg)) { + feederConfigBuilder.tracelevel(Integer.parseInt(getParam(args, arg))); + } else if ("--validate".equals(arg)) { + validateOnly = true; + } else if ("--dumpDocuments".equals(arg)) { + dumpDocumentsFile = getParam(args, arg); + } else if ("--maxfeedrate".equals(arg)) { + feederConfigBuilder.maxfeedrate(Double.parseDouble(getParam(args, arg))); + } else if ("--create-if-non-existent".equals(arg)) { + feederConfigBuilder.createifnonexistent(true); + } else if ("-v".equals(arg) || "--verbose".equals(arg)) { + verbose = true; + } else if ("--priority".equals(arg)) { + priority = getParam(args, arg); + } else { + files.add(arg); + } + } + + propertyProcessor = new MessagePropertyProcessor(getFeederConfig(), new LoadTypeConfig(new LoadTypeConfig.Builder())); + } + + private String getParam(List<String> args, String arg) throws IllegalArgumentException { + try { + return args.remove(0); + } catch (Exception e) { + 
System.err.println("--" + arg + " requires an argument"); + throw new IllegalArgumentException(arg); + } + } + + private Boolean getBoolean(String arg) { + if (arg.equalsIgnoreCase("yes")) { + return true; + } else if (arg.equalsIgnoreCase("no")) { + return false; + } else { + return Boolean.parseBoolean(arg); + } + } + + public String getPriority() { + return priority; + } + + public SessionFactory getSessionFactory() { + return sessionFactory; + } + + +} diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/BenchmarkProgressPrinter.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/BenchmarkProgressPrinter.java new file mode 100644 index 00000000000..cc0d8f8c780 --- /dev/null +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/BenchmarkProgressPrinter.java @@ -0,0 +1,76 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespafeeder; + +import com.yahoo.clientmetrics.MessageTypeMetricSet; +import com.yahoo.clientmetrics.RouteMetricSet; +import com.yahoo.concurrent.Timer; +import com.yahoo.metrics.Metric; +import com.yahoo.metrics.MetricSet; +import com.yahoo.metrics.MetricVisitor; + +import java.io.PrintStream; + +/** + * Class that takes progress from the feed and prints to a stream. 
+ */ +public class BenchmarkProgressPrinter implements RouteMetricSet.ProgressCallback { + private final long startTime; + private final Timer timer; + private final PrintStream output; + + public BenchmarkProgressPrinter(Timer timer, PrintStream output) { + this.timer = timer; + this.output = output; + this.startTime = timer.milliTime(); + } + + class PrintVisitor extends MetricVisitor { + private final PrintStream out; + + PrintVisitor(PrintStream out) { + this.out = out; + } + + @Override + public boolean visitMetricSet(MetricSet set, boolean autoGenerated) { + if (set instanceof MessageTypeMetricSet && set.getName().equals("total")) { + Metric m = set.getMetric("latency"); + Metric count = set.getMetric("count"); + Metric err = set.getMetric("errors.total"); + + long okCount = 0, errCount = 0, minLatency = 0, maxLatency = 0, avgLatency = 0; + + if (m != null) { + minLatency = m.getLongValue("min"); + maxLatency = m.getLongValue("max"); + avgLatency = m.getLongValue("average"); + } + if (count != null) { + okCount = count.getLongValue("count"); + } + + if (err != null) { + errCount = err.getLongValue("count"); + } + long timeUsed = timer.milliTime() - startTime; + out.println(timeUsed + ", " + okCount + ", " + errCount + ", " + minLatency + ", " + maxLatency + ", " + avgLatency); + } + return true; + } + } + + @Override + public void onProgress(RouteMetricSet metrics) { + //metrics.visit(new PrintVisitor(output), false); + } + + @Override + public void done(RouteMetricSet metrics) { + try { + output.println("# Time used, num ok, num error, min latency, max latency, average latency"); + metrics.visit(new PrintVisitor(output), false); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/FileRequest.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/FileRequest.java new file mode 100755 index 00000000000..3479221257d --- /dev/null +++ 
b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/FileRequest.java @@ -0,0 +1,14 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespafeeder; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +
/**
 * An {@link InputStreamRequest} whose input is read from a file.
 */
public class FileRequest extends InputStreamRequest {

    /**
     * Opens {@code f} for reading and uses it as the request input.
     *
     * @param f the file to read
     * @throws FileNotFoundException if the file cannot be opened
     */
    FileRequest(File f) throws FileNotFoundException {
        super(new FileInputStream(f));
    }

}
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/InputStreamRequest.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/InputStreamRequest.java new file mode 100644 index 00000000000..e69eb6727b0 --- /dev/null +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/InputStreamRequest.java @@ -0,0 +1,38 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespafeeder; + +import com.yahoo.container.jdisc.HttpRequest; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +/** + * This is needed because whoever wrote this library moronically decided to pass in-process communication through + * the HTTP layer. As the feeder is being phased out in favor of the standalone HTTP client we don't bother to clean + * it up properly. 
+ * + * @author bratseth + */ +public class InputStreamRequest { + + private InputStream input; + private Map<String, String> properties = new HashMap<>(); + + protected InputStreamRequest(InputStream input) { + this.input = input; + } + + public void setProperty(String key, String value) { + properties.put(key, value); + } + + public String getProperty(String key) { + return properties.get(key); + } + + public HttpRequest toRequest() { + return HttpRequest.createTestRequest("", com.yahoo.jdisc.http.HttpRequest.Method.POST, input, properties); + } + +} diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/ProgressPrinter.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/ProgressPrinter.java new file mode 100644 index 00000000000..52087d33a47 --- /dev/null +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/ProgressPrinter.java @@ -0,0 +1,149 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespafeeder; + +import com.yahoo.clientmetrics.MessageTypeMetricSet; +import com.yahoo.clientmetrics.RouteMetricSet; +import com.yahoo.concurrent.Timer; +import com.yahoo.metrics.Metric; +import com.yahoo.metrics.MetricSet; +import com.yahoo.metrics.MetricVisitor; +import com.yahoo.metrics.SumMetric; + +import java.io.IOException; +import java.io.PrintStream; +import java.math.RoundingMode; +import java.text.NumberFormat; +import java.util.Locale; + +/** + * Class that takes progress from the feed and prints to a stream. 
+ */ +public class ProgressPrinter implements RouteMetricSet.ProgressCallback { + private long startTime = 0; + private long lastProgressTime = 0; + private long lastVerboseProgress = 0; + final Timer timer; + final PrintStream output; + + public ProgressPrinter(Timer timer, PrintStream output) { + this.timer = timer; + this.output = output; + + startTime = timer.milliTime(); + lastProgressTime = startTime; + lastVerboseProgress = startTime; + } + + class PrintVisitor extends MetricVisitor { + final PrintStream out; + final NumberFormat format; + + PrintVisitor(PrintStream out) { + this.out = out; + format = NumberFormat.getNumberInstance(Locale.US); + format.setMaximumFractionDigits(2); + format.setMinimumFractionDigits(2); + format.setMinimumIntegerDigits(1); + format.setParseIntegerOnly(false); + format.setRoundingMode(RoundingMode.HALF_UP); + format.setGroupingUsed(false); + } + + @Override + public boolean visitMetricSet(MetricSet set, boolean autoGenerated) { + if (set instanceof MessageTypeMetricSet && !set.getName().equals("total")) { + Metric m = set.getMetric("latency"); + Metric count = set.getMetric("count"); + Metric err = set.getMetric("errors.total"); + + long okCount = 0, errCount = 0, ignored = 0; + long minLatency = 0, maxLatency = 0, avgLatency = 0; + + if (m != null) { + minLatency = m.getLongValue("min"); + maxLatency = m.getLongValue("max"); + avgLatency = m.getLongValue("average"); + } + if (count != null) { + okCount = count.getLongValue("count"); + } + Metric ignoredMetric = set.getMetric("ignored"); + if (ignoredMetric != null) { + ignored = ignoredMetric.getLongValue("count"); + } + + if (err != null) { + errCount = err.getLongValue("count"); + } + + long timeSinceStart = timer.milliTime() - startTime; + + out.println(((MessageTypeMetricSet)set).getMessageName() + ":\t" + + "ok: " + okCount + + " msgs/sec: " + format.format((double)okCount * 1000 / timeSinceStart) + + " failed: " + errCount + + " ignored: " + ignored + + " latency(min, 
max, avg): " + minLatency + ", " + maxLatency + ", " + avgLatency); + } + return true; + } + } + + public static String getDashes(int count) { + String dashes = ""; + for (int i = 0; i < count; i++) { + dashes += "-"; + } + + return dashes; + } + + public synchronized void renderStatusText(RouteMetricSet metrics, PrintStream stream) throws IOException { + String headline = "Messages sent to vespa (route " + metrics.getName() + ") :"; + stream.println(headline); + stream.println(getDashes(headline.length())); + metrics.visit(new PrintVisitor(stream), false); + } + + public long getOkMessageCount(RouteMetricSet metrics) { + SumMetric sum = (SumMetric)metrics.getMetric("total"); + + MetricSet ms = (MetricSet)sum.generateSum(); + if (ms != null) { + Metric latency = ms.getMetric("latency"); + if (latency != null) { + return latency.getLongValue("count"); + } + } + + return 0; + } + + @Override + public void onProgress(RouteMetricSet metrics) { + try { + long timeNow = timer.milliTime(); + + if (timeNow - lastVerboseProgress > 30000) { + output.println("\n"); + renderStatusText(metrics, output); + lastVerboseProgress = timeNow; + } else if (timeNow - lastProgressTime > 1000) { + output.print("\rSuccessfully sent " + getOkMessageCount(metrics) + " messages so far"); + lastProgressTime = timeNow; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void done(RouteMetricSet metrics) { + try { + output.println("\n"); + renderStatusText(metrics, output); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java new file mode 100755 index 00000000000..a6ede66c43d --- /dev/null +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java @@ -0,0 +1,171 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespafeeder; + +import com.yahoo.clientmetrics.RouteMetricSet; +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentTypeManagerConfigurer; +import com.yahoo.feedapi.FeedContext; +import com.yahoo.feedhandler.FeedResponse; +import com.yahoo.feedhandler.NullFeedMetric; +import com.yahoo.feedhandler.VespaFeedHandler; +import com.yahoo.log.LogSetup; +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.vespaclient.ClusterList; + +import java.io.*; +import java.util.*; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +
/**
 * Command line entry point that feeds the given files, or standard in when no files are
 * given, through a {@link VespaFeedHandler}.
 */
public class VespaFeeder {

    /** Upper bound on the number of individual errors listed by {@link #renderErrors(List)}. */
    private static final int MAX_RENDERED_ERRORS = 10;

    Arguments args;
    DocumentTypeManager manager;
    Executor threadPool = Executors.newCachedThreadPool(ThreadFactoryFactory.getThreadFactory("vespafeeder"));

    public VespaFeeder(Arguments args, DocumentTypeManager manager) {
        this.args = args;
        this.manager = manager;
    }

    /** Carries the rendered list of feed errors as its message. */
    public static class FeedErrorException extends Exception {
        String message;

        public FeedErrorException(String message) {
            // Also hand the text to Throwable so toString() and stack traces include it.
            super(message);
            this.message = message;
        }

        @Override
        public String getMessage() {
            return message;
        }

    }

    /**
     * Builds an exception whose message is a headline, a dashed underline of the same length,
     * and at most the first ten errors, one per line.
     *
     * @param errors the error descriptions; an empty list gives an exception with an empty message
     * @return the exception, for the caller to throw
     */
    static FeedErrorException renderErrors(List<String> errors) {
        StringBuilder buffer = new StringBuilder();

        if (!errors.isEmpty()) {
            String headline = (errors.size() > MAX_RENDERED_ERRORS)
                    ? "First " + MAX_RENDERED_ERRORS + " errors (of " + errors.size() + "):"
                    : "Errors:";
            buffer.append(headline).append("\n");
            for (int i = 0; i < headline.length(); ++i) {
                buffer.append("-");
            }
            buffer.append("\n");

            int shown = Math.min(errors.size(), MAX_RENDERED_ERRORS);
            for (String error : errors.subList(0, shown)) {
                buffer.append(" ").append(error).append("\n");
            }
        }

        return new FeedErrorException(buffer.toString());
    }

    /** Returns a {@link BenchmarkProgressPrinter} in "benchmark" mode and a {@link ProgressPrinter} otherwise. */
    public RouteMetricSet.ProgressCallback createProgressCallback(PrintStream output) {
        if ("benchmark".equals(args.getMode())) {
            return new BenchmarkProgressPrinter(SystemTimer.INSTANCE, output);
        } else {
            return new ProgressPrinter(SystemTimer.INSTANCE, output);
        }
    }

    /**
     * Feeds {@code stdin} if no files were named on the command line, otherwise each named file
     * in order, stopping at the first unsuccessful response.
     *
     * @param stdin the stream to feed from when no files are given
     * @param output where progress and, in verbose mode, file sizes are printed
     * @throws FeedErrorException if a feed response reports failure
     * @throws Exception for any other failure while opening or feeding the input
     */
    void parseFiles(InputStream stdin, PrintStream output) throws Exception {
        FeedContext context = new FeedContext(
                args.getPropertyProcessor(),
                args.getSessionFactory(),
                manager,
                new ClusterList(), new NullFeedMetric());

        VespaFeedHandler handler = VespaFeedHandler.createFromContext(context, threadPool);

        if (args.getFiles().isEmpty()) {
            BufferedInputStream input = new BufferedInputStream(stdin);
            InputStreamRequest req = new InputStreamRequest(input);
            setProperties(req, input);
            feed(handler, req, output);
            return;
        }

        if (args.isVerbose()) {
            for (String fileName : args.getFiles()) {
                long thisSize = new File(fileName).length();
                output.println("Size of file '" + fileName + "' is " + thisSize + " B.");
            }
        }

        for (String fileName : args.getFiles()) {
            FileRequest req = new FileRequest(new File(fileName));
            // The file is opened a second time only to peek at its first byte; try-with-resources
            // guarantees this second stream is closed even if the peek fails.
            try (BufferedInputStream inputSnooper = new BufferedInputStream(new FileInputStream(fileName))) {
                setProperties(req, inputSnooper);
            }
            feed(handler, req, output);
        }
    }

    /** Hands one request to the handler and throws the rendered error list if the response is not a success. */
    private void feed(VespaFeedHandler handler, InputStreamRequest req, PrintStream output) throws Exception {
        FeedResponse response = (FeedResponse) handler.handle(req.toRequest(), createProgressCallback(output));
        if (!response.isSuccess()) {
            throw renderErrors(response.getErrorList());
        }
    }

    // use BufferedInputStream to enforce the input.markSupported() == true
    private void setProperties(InputStreamRequest req, BufferedInputStream input) throws IOException {
        setPriority(req);
        setCreateIfNonExistent(req);
        setJsonInput(req, input);
    }

    /** Copies the {@code --priority} value onto the request, if one was given. */
    private void setPriority(InputStreamRequest req) {
        if (args.getPriority() != null) {
            req.setProperty("priority", args.getPriority());
        }
    }

    /** Sets the "createifnonexistent" request property when the feeder config enables it. */
    private void setCreateIfNonExistent(InputStreamRequest req) {
        if (args.getFeederConfig().createifnonexistent()) {
            req.setProperty("createifnonexistent", "true");
        }
    }

    /**
     * Peeks at the first byte of {@code input} without consuming it and records on the request
     * whether the feed is JSON.
     */
    // package access for easy testing
    static void setJsonInput(InputStreamRequest req, BufferedInputStream input) throws IOException {
        input.mark(4);
        int b = input.read();
        input.reset();
        // A valid JSON feed will always start with '['
        boolean isJson = (b == '[');
        req.setProperty(VespaFeedHandler.JSON_INPUT, Boolean.toString(isJson));
    }

    /** Runs the feeder and exits with status 0 on success or after showing help, and 1 on any failure. */
    public static void main(String[] args) {
        LogSetup.initVespaLogging("vespafeeder");

        try {
            Arguments arguments = new Arguments(args, null);

            DocumentTypeManager manager = new DocumentTypeManager();
            DocumentTypeManagerConfigurer.configure(manager, "client").close();

            VespaFeeder feeder = new VespaFeeder(arguments, manager);
            feeder.parseFiles(System.in, System.out);
            System.exit(0);
        } catch (Arguments.HelpShownException e) {
            System.exit(0);
        } catch (IllegalArgumentException e) {
            // No message here: nothing is printed at this level for a rejected argument.
            System.exit(1);
        } catch (FileNotFoundException e) {
            System.err.println("Could not open file " + e.getMessage());
            System.exit(1);
        } catch (FeedErrorException e) {
            System.err.println("\n" + e.getMessage());
            System.exit(1);
        } catch (Exception e) {
            System.err.println("Got exception " + e.getMessage() + ", aborting feed.");
            System.exit(1);
        }
    }

}
 |