summaryrefslogtreecommitdiffstats
path: root/vespaclient-java/src/main/java/com/yahoo/vespafeeder
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@yahoo-inc.com>2017-06-06 14:13:55 +0200
committerBjørn Christian Seime <bjorncs@yahoo-inc.com>2017-06-06 14:13:55 +0200
commit8f2d9c36cccb34a1594d5f1cf82ac852ad5e712d (patch)
tree15addc0903bb361f6df8c5833877844d068687ab /vespaclient-java/src/main/java/com/yahoo/vespafeeder
parentef89ead652b55d7742767aea7e9c3d9243f19336 (diff)
Move vespaclient-java to Vespa open-source
Diffstat (limited to 'vespaclient-java/src/main/java/com/yahoo/vespafeeder')
-rw-r--r--vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java191
-rw-r--r--vespaclient-java/src/main/java/com/yahoo/vespafeeder/BenchmarkProgressPrinter.java76
-rwxr-xr-xvespaclient-java/src/main/java/com/yahoo/vespafeeder/FileRequest.java14
-rw-r--r--vespaclient-java/src/main/java/com/yahoo/vespafeeder/InputStreamRequest.java38
-rw-r--r--vespaclient-java/src/main/java/com/yahoo/vespafeeder/ProgressPrinter.java149
-rwxr-xr-xvespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java171
6 files changed, 639 insertions, 0 deletions
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java
new file mode 100644
index 00000000000..249ed1fd70c
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java
@@ -0,0 +1,191 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespafeeder;
+
+import com.yahoo.vespa.config.content.LoadTypeConfig;
+import com.yahoo.feedapi.DummySessionFactory;
+import com.yahoo.feedapi.MessageBusSessionFactory;
+import com.yahoo.feedapi.MessagePropertyProcessor;
+import com.yahoo.feedapi.SessionFactory;
+import com.yahoo.vespaclient.config.FeederConfig;
+
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.lang.System.out;
+
+/**
+ * Argument parsing class for the vespa feeder.
+ */
+public class Arguments {
+ public FeederConfig getFeederConfig() {
+ return new FeederConfig(feederConfigBuilder);
+ }
+
+ public List<String> getFiles() {
+ return files;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public boolean isVerbose() {
+ return verbose;
+ }
+
+ private FeederConfig.Builder feederConfigBuilder = new FeederConfig.Builder();
+ private List<String> files = new ArrayList<String>();
+ private String dumpDocumentsFile = null;
+ private String mode = "standard";
+ private boolean validateOnly = false;
+ private boolean verbose = false;
+ SessionFactory sessionFactory = null;
+ MessagePropertyProcessor propertyProcessor = null;
+ private String priority = null;
+
+ public MessagePropertyProcessor getPropertyProcessor() {
+ return propertyProcessor;
+ }
+
+ public void help() {
+ out.println("This is a tool for feeding xml (deprecated) or json data to a Vespa application.\n" +
+ "\n" +
+ "The options are:\n" +
+ " --abortondataerror arg (true) Whether or not to abort if the xml input has \n" +
+ " errors (true|false).\n" +
+ " --abortonsenderror arg (true) Whether or not to abort if an error occured while\n" +
+ " sending operations to Vespa (true|false).\n" +
+ " --file arg The name of the input files to read. These can \n" +
+ " also be passed as arguments without the option \n" +
+ " prefix. If none is given, this tool parses \n" +
+ " identifiers from standard in.\n" +
+ " -h [ --help ] Shows this help page.\n" +
+ " --maxpending arg The maximum number of operations that are allowed\n" +
+ " to be pending at any given time. NOTE: This disables dynamic throttling. Use with care.\n" +
+ " --maxpendingsize arg The maximum size (in bytes) of operations that \n" +
+ " are allowed to be pending at any given time. \n" +
+ " --maxfeedrate arg Limits the feed rate to the given number (operations/second). You may still want to increase\n" +
+ " the max pending size if your feed rate doesn't reach the desired number.\n" +
+ " --mode arg (=standard) The mode to run vespafeeder in (standard | benchmark).\n" +
+ " --noretry Turns off retries of recoverable failures.\n" +
+ " --retrydelay arg (=1) The time (in seconds) to wait between retries of \n" +
+ " a failed operation.\n" +
+ " --route arg (=default) The route to send the data to.\n" +
+ " --timeout arg (=180) The time (in seconds) allowed for sending \n" +
+ " operations.\n" +
+ " --trace arg (=0) The trace level of network traffic.\n" +
+ " --validate Run validation tool on input files instead of \n" +
+ " feeding them.\n" +
+ " --dumpDocuments <filename> Specify a file where documents in the put are serialized.\n" +
+ " --priority arg Specify priority of sent messages (see documentation for priority values)\n" +
+ " --create-if-non-existent Enable setting of create-if-non-existent to true on all document updates in the given xml feed.\n" +
+ " -v [ --verbose ] Enable verbose output of progress.\n");
+ }
+
+ public class HelpShownException extends Exception {
+
+ }
+
+ public Arguments(String[] argList, SessionFactory factory) throws HelpShownException, FileNotFoundException {
+ parse(argList);
+
+ if (factory != null) {
+ sessionFactory = factory;
+ } else if (validateOnly) {
+ if (dumpDocumentsFile != null) {
+ BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(dumpDocumentsFile));
+ sessionFactory = new DummySessionFactory(null, out);
+ } else {
+ sessionFactory = new DummySessionFactory(null, null);
+ }
+ } else {
+ sessionFactory = new MessageBusSessionFactory(propertyProcessor);
+ }
+ }
+
+ void parse(String[] argList) throws HelpShownException {
+ List<String> args = new LinkedList<String>();
+ args.addAll(Arrays.asList(argList));
+
+ while (!args.isEmpty()) {
+ String arg = args.remove(0);
+
+ if (arg.equals("-h") || arg.equals("--help")) {
+ help();
+ throw new HelpShownException();
+ } else if ("--abortondataerror".equals(arg)) {
+ feederConfigBuilder.abortondocumenterror(getBoolean(getParam(args, arg)));
+ } else if ("--abortonsenderror".equals(arg)) {
+ feederConfigBuilder.abortonsenderror(getBoolean(getParam(args, arg)));
+ } else if ("--file".equals(arg)) {
+ files.add(getParam(args, arg));
+ } else if ("--maxpending".equals(arg)) {
+ feederConfigBuilder.maxpendingdocs(Integer.parseInt(getParam(args, arg)));
+ } else if ("--maxpendingsize".equals(arg)) {
+ feederConfigBuilder.maxpendingbytes(Integer.parseInt(getParam(args, arg)));
+ } else if ("--mode".equals(arg)) {
+ mode = getParam(args, arg);
+ } else if ("--noretry".equals(arg)) {
+ feederConfigBuilder.retryenabled(false);
+ } else if ("--retrydelay".equals(arg)) {
+ feederConfigBuilder.retrydelay(Integer.parseInt(getParam(args, arg)));
+ } else if ("--route".equals(arg)) {
+ feederConfigBuilder.route(getParam(args, arg));
+ } else if ("--timeout".equals(arg)) {
+ feederConfigBuilder.timeout(Double.parseDouble(getParam(args, arg)));
+ } else if ("--trace".equals(arg)) {
+ feederConfigBuilder.tracelevel(Integer.parseInt(getParam(args, arg)));
+ } else if ("--validate".equals(arg)) {
+ validateOnly = true;
+ } else if ("--dumpDocuments".equals(arg)) {
+ dumpDocumentsFile = getParam(args, arg);
+ } else if ("--maxfeedrate".equals(arg)) {
+ feederConfigBuilder.maxfeedrate(Double.parseDouble(getParam(args, arg)));
+ } else if ("--create-if-non-existent".equals(arg)) {
+ feederConfigBuilder.createifnonexistent(true);
+ } else if ("-v".equals(arg) || "--verbose".equals(arg)) {
+ verbose = true;
+ } else if ("--priority".equals(arg)) {
+ priority = getParam(args, arg);
+ } else {
+ files.add(arg);
+ }
+ }
+
+ propertyProcessor = new MessagePropertyProcessor(getFeederConfig(), new LoadTypeConfig(new LoadTypeConfig.Builder()));
+ }
+
+ private String getParam(List<String> args, String arg) throws IllegalArgumentException {
+ try {
+ return args.remove(0);
+ } catch (Exception e) {
+ System.err.println("--" + arg + " requires an argument");
+ throw new IllegalArgumentException(arg);
+ }
+ }
+
+ private Boolean getBoolean(String arg) {
+ if (arg.equalsIgnoreCase("yes")) {
+ return true;
+ } else if (arg.equalsIgnoreCase("no")) {
+ return false;
+ } else {
+ return Boolean.parseBoolean(arg);
+ }
+ }
+
+ public String getPriority() {
+ return priority;
+ }
+
+ public SessionFactory getSessionFactory() {
+ return sessionFactory;
+ }
+
+
+}
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/BenchmarkProgressPrinter.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/BenchmarkProgressPrinter.java
new file mode 100644
index 00000000000..cc0d8f8c780
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/BenchmarkProgressPrinter.java
@@ -0,0 +1,76 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespafeeder;
+
+import com.yahoo.clientmetrics.MessageTypeMetricSet;
+import com.yahoo.clientmetrics.RouteMetricSet;
+import com.yahoo.concurrent.Timer;
+import com.yahoo.metrics.Metric;
+import com.yahoo.metrics.MetricSet;
+import com.yahoo.metrics.MetricVisitor;
+
+import java.io.PrintStream;
+
+/**
+ * Class that takes progress from the feed and prints to a stream.
+ */
+public class BenchmarkProgressPrinter implements RouteMetricSet.ProgressCallback {
+ private final long startTime;
+ private final Timer timer;
+ private final PrintStream output;
+
+ public BenchmarkProgressPrinter(Timer timer, PrintStream output) {
+ this.timer = timer;
+ this.output = output;
+ this.startTime = timer.milliTime();
+ }
+
+ class PrintVisitor extends MetricVisitor {
+ private final PrintStream out;
+
+ PrintVisitor(PrintStream out) {
+ this.out = out;
+ }
+
+ @Override
+ public boolean visitMetricSet(MetricSet set, boolean autoGenerated) {
+ if (set instanceof MessageTypeMetricSet && set.getName().equals("total")) {
+ Metric m = set.getMetric("latency");
+ Metric count = set.getMetric("count");
+ Metric err = set.getMetric("errors.total");
+
+ long okCount = 0, errCount = 0, minLatency = 0, maxLatency = 0, avgLatency = 0;
+
+ if (m != null) {
+ minLatency = m.getLongValue("min");
+ maxLatency = m.getLongValue("max");
+ avgLatency = m.getLongValue("average");
+ }
+ if (count != null) {
+ okCount = count.getLongValue("count");
+ }
+
+ if (err != null) {
+ errCount = err.getLongValue("count");
+ }
+ long timeUsed = timer.milliTime() - startTime;
+ out.println(timeUsed + ", " + okCount + ", " + errCount + ", " + minLatency + ", " + maxLatency + ", " + avgLatency);
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public void onProgress(RouteMetricSet metrics) {
+ //metrics.visit(new PrintVisitor(output), false);
+ }
+
+ @Override
+ public void done(RouteMetricSet metrics) {
+ try {
+ output.println("# Time used, num ok, num error, min latency, max latency, average latency");
+ metrics.visit(new PrintVisitor(output), false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/FileRequest.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/FileRequest.java
new file mode 100755
index 00000000000..3479221257d
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/FileRequest.java
@@ -0,0 +1,14 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespafeeder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+
+public class FileRequest extends InputStreamRequest {
+
+ FileRequest(File f) throws FileNotFoundException {
+ super(new FileInputStream(f));
+ }
+
+}
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/InputStreamRequest.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/InputStreamRequest.java
new file mode 100644
index 00000000000..e69eb6727b0
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/InputStreamRequest.java
@@ -0,0 +1,38 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespafeeder;
+
+import com.yahoo.container.jdisc.HttpRequest;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This is needed because whoever wrote this library moronically decided to pass in-process communication through
+ * the HTTP layer. As the feeded is being phased out in favor of the standalone HTTP client we don't bother to clean
+ * it up properly.
+ *
+ * @author bratseth
+ */
+public class InputStreamRequest {
+
+ private InputStream input;
+ private Map<String, String> properties = new HashMap<>();
+
+ protected InputStreamRequest(InputStream input) {
+ this.input = input;
+ }
+
+ public void setProperty(String key, String value) {
+ properties.put(key, value);
+ }
+
+ public String getProperty(String key) {
+ return properties.get(key);
+ }
+
+ public HttpRequest toRequest() {
+ return HttpRequest.createTestRequest("", com.yahoo.jdisc.http.HttpRequest.Method.POST, input, properties);
+ }
+
+}
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/ProgressPrinter.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/ProgressPrinter.java
new file mode 100644
index 00000000000..52087d33a47
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/ProgressPrinter.java
@@ -0,0 +1,149 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespafeeder;
+
+import com.yahoo.clientmetrics.MessageTypeMetricSet;
+import com.yahoo.clientmetrics.RouteMetricSet;
+import com.yahoo.concurrent.Timer;
+import com.yahoo.metrics.Metric;
+import com.yahoo.metrics.MetricSet;
+import com.yahoo.metrics.MetricVisitor;
+import com.yahoo.metrics.SumMetric;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.math.RoundingMode;
+import java.text.NumberFormat;
+import java.util.Locale;
+
+/**
+ * Class that takes progress from the feed and prints to a stream.
+ */
+public class ProgressPrinter implements RouteMetricSet.ProgressCallback {
+ private long startTime = 0;
+ private long lastProgressTime = 0;
+ private long lastVerboseProgress = 0;
+ final Timer timer;
+ final PrintStream output;
+
+ public ProgressPrinter(Timer timer, PrintStream output) {
+ this.timer = timer;
+ this.output = output;
+
+ startTime = timer.milliTime();
+ lastProgressTime = startTime;
+ lastVerboseProgress = startTime;
+ }
+
+ class PrintVisitor extends MetricVisitor {
+ final PrintStream out;
+ final NumberFormat format;
+
+ PrintVisitor(PrintStream out) {
+ this.out = out;
+ format = NumberFormat.getNumberInstance(Locale.US);
+ format.setMaximumFractionDigits(2);
+ format.setMinimumFractionDigits(2);
+ format.setMinimumIntegerDigits(1);
+ format.setParseIntegerOnly(false);
+ format.setRoundingMode(RoundingMode.HALF_UP);
+ format.setGroupingUsed(false);
+ }
+
+ @Override
+ public boolean visitMetricSet(MetricSet set, boolean autoGenerated) {
+ if (set instanceof MessageTypeMetricSet && !set.getName().equals("total")) {
+ Metric m = set.getMetric("latency");
+ Metric count = set.getMetric("count");
+ Metric err = set.getMetric("errors.total");
+
+ long okCount = 0, errCount = 0, ignored = 0;
+ long minLatency = 0, maxLatency = 0, avgLatency = 0;
+
+ if (m != null) {
+ minLatency = m.getLongValue("min");
+ maxLatency = m.getLongValue("max");
+ avgLatency = m.getLongValue("average");
+ }
+ if (count != null) {
+ okCount = count.getLongValue("count");
+ }
+ Metric ignoredMetric = set.getMetric("ignored");
+ if (ignoredMetric != null) {
+ ignored = ignoredMetric.getLongValue("count");
+ }
+
+ if (err != null) {
+ errCount = err.getLongValue("count");
+ }
+
+ long timeSinceStart = timer.milliTime() - startTime;
+
+ out.println(((MessageTypeMetricSet)set).getMessageName() + ":\t" +
+ "ok: " + okCount +
+ " msgs/sec: " + format.format((double)okCount * 1000 / timeSinceStart) +
+ " failed: " + errCount +
+ " ignored: " + ignored +
+ " latency(min, max, avg): " + minLatency + ", " + maxLatency + ", " + avgLatency);
+ }
+ return true;
+ }
+ }
+
+ public static String getDashes(int count) {
+ String dashes = "";
+ for (int i = 0; i < count; i++) {
+ dashes += "-";
+ }
+
+ return dashes;
+ }
+
+ public synchronized void renderStatusText(RouteMetricSet metrics, PrintStream stream) throws IOException {
+ String headline = "Messages sent to vespa (route " + metrics.getName() + ") :";
+ stream.println(headline);
+ stream.println(getDashes(headline.length()));
+ metrics.visit(new PrintVisitor(stream), false);
+ }
+
+ public long getOkMessageCount(RouteMetricSet metrics) {
+ SumMetric sum = (SumMetric)metrics.getMetric("total");
+
+ MetricSet ms = (MetricSet)sum.generateSum();
+ if (ms != null) {
+ Metric latency = ms.getMetric("latency");
+ if (latency != null) {
+ return latency.getLongValue("count");
+ }
+ }
+
+ return 0;
+ }
+
+ @Override
+ public void onProgress(RouteMetricSet metrics) {
+ try {
+ long timeNow = timer.milliTime();
+
+ if (timeNow - lastVerboseProgress > 30000) {
+ output.println("\n");
+ renderStatusText(metrics, output);
+ lastVerboseProgress = timeNow;
+ } else if (timeNow - lastProgressTime > 1000) {
+ output.print("\rSuccessfully sent " + getOkMessageCount(metrics) + " messages so far");
+ lastProgressTime = timeNow;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void done(RouteMetricSet metrics) {
+ try {
+ output.println("\n");
+ renderStatusText(metrics, output);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java
new file mode 100755
index 00000000000..a6ede66c43d
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java
@@ -0,0 +1,171 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespafeeder;
+
+import com.yahoo.clientmetrics.RouteMetricSet;
+import com.yahoo.concurrent.ThreadFactoryFactory;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentTypeManagerConfigurer;
+import com.yahoo.feedapi.FeedContext;
+import com.yahoo.feedhandler.FeedResponse;
+import com.yahoo.feedhandler.NullFeedMetric;
+import com.yahoo.feedhandler.VespaFeedHandler;
+import com.yahoo.log.LogSetup;
+import com.yahoo.concurrent.SystemTimer;
+import com.yahoo.vespaclient.ClusterList;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class VespaFeeder {
+
+ Arguments args;
+ DocumentTypeManager manager;
+ Executor threadPool = Executors.newCachedThreadPool(ThreadFactoryFactory.getThreadFactory("vespafeeder"));
+
+ public VespaFeeder(Arguments args, DocumentTypeManager manager) {
+ this.args = args;
+ this.manager = manager;
+ }
+
+ public static class FeedErrorException extends Exception {
+ String message;
+
+ public FeedErrorException(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public String getMessage() {
+ return message;
+ }
+
+ }
+
+ static FeedErrorException renderErrors(List<String> errors) {
+ StringBuilder buffer = new StringBuilder();
+
+ if (!errors.isEmpty()) {
+ String headline = (errors.size() > 10) ? "First 10 errors (of " + errors.size() + "):" : "Errors:";
+ buffer.append(headline).append("\n");
+ for (int i = 0; i < headline.length(); ++i) {
+ buffer.append("-");
+ }
+ buffer.append("\n");
+ for (int i = 0; i < errors.size() && i < 10; ++i) {
+ buffer.append(" ").append(errors.get(i)).append("\n");
+ }
+ }
+
+ return new FeedErrorException(buffer.toString());
+ }
+
+ public RouteMetricSet.ProgressCallback createProgressCallback(PrintStream output) {
+ if ("benchmark".equals(args.getMode())) {
+ return new BenchmarkProgressPrinter(SystemTimer.INSTANCE, output);
+ } else {
+ return new ProgressPrinter(SystemTimer.INSTANCE, output);
+ }
+ }
+
+ void parseFiles(InputStream stdin, PrintStream output) throws Exception {
+ FeedContext context = new FeedContext(
+ args.getPropertyProcessor(),
+ args.getSessionFactory(),
+ manager,
+ new ClusterList(), new NullFeedMetric());
+
+ final BufferedInputStream input = new BufferedInputStream(stdin);
+ VespaFeedHandler handler = VespaFeedHandler.createFromContext(context, threadPool);
+
+ if (args.getFiles().isEmpty()) {
+ InputStreamRequest req = new InputStreamRequest(input);
+ setProperties(req, input);
+ FeedResponse response = (FeedResponse)handler.handle(req.toRequest(), createProgressCallback(output));
+ if ( ! response.isSuccess()) {
+ throw renderErrors(response.getErrorList());
+ }
+ } else {
+ if (args.isVerbose()) {
+ for (String fileName : args.getFiles()) {
+ long thisSize = new File(fileName).length();
+ output.println("Size of file '" + fileName + "' is " + thisSize + " B.");
+ }
+ }
+
+ for (String fileName : args.getFiles()) {
+ File f = new File(fileName);
+ FileRequest req = new FileRequest(f);
+ final BufferedInputStream inputSnooper = new BufferedInputStream(new FileInputStream(fileName));
+ setProperties(req, inputSnooper);
+ inputSnooper.close();
+ FeedResponse response = (FeedResponse)handler.handle(req.toRequest(), createProgressCallback(output));
+ if (!response.isSuccess()) {
+ throw renderErrors(response.getErrorList());
+ }
+ }
+ }
+ }
+
+ // use BufferedInputStream to enforce the input.markSupported() == true
+ private void setProperties(InputStreamRequest req, BufferedInputStream input) throws IOException {
+ setPriority(req);
+ setCreateIfNonExistent(req);
+ setJsonInput(req, input);
+ }
+
+ private void setPriority(InputStreamRequest req) {
+ if (args.getPriority() != null) {
+ req.setProperty("priority", args.getPriority());
+ }
+ }
+
+ private void setCreateIfNonExistent(InputStreamRequest req) {
+ if (args.getFeederConfig().createifnonexistent()) {
+ req.setProperty("createifnonexistent", "true");
+ }
+ }
+
+ // package access for easy testing
+ static void setJsonInput(InputStreamRequest req, BufferedInputStream input) throws IOException {
+ input.mark(4);
+ int b = input.read();
+ input.reset();
+ // A valid JSON feed will always start with '['
+ if (b == '[') {
+ req.setProperty(VespaFeedHandler.JSON_INPUT, Boolean.TRUE.toString());
+ } else {
+ req.setProperty(VespaFeedHandler.JSON_INPUT, Boolean.FALSE.toString());
+ }
+ }
+
+ public static void main(String[] args) {
+ LogSetup.initVespaLogging("vespafeeder");
+
+ try {
+ Arguments arguments = new Arguments(args, null);
+
+ DocumentTypeManager manager = new DocumentTypeManager();
+ DocumentTypeManagerConfigurer.configure(manager, "client").close();
+
+ VespaFeeder feeder = new VespaFeeder(arguments, manager);
+ feeder.parseFiles(System.in, System.out);
+ System.exit(0);
+ } catch (Arguments.HelpShownException e) {
+ System.exit(0);
+ } catch (IllegalArgumentException e) {
+ System.exit(1);
+ } catch (FileNotFoundException e) {
+ System.err.println("Could not open file " + e.getMessage());
+ System.exit(1);
+ } catch (FeedErrorException e) {
+ System.err.println("\n" + e.getMessage());
+ System.exit(1);
+ } catch (Exception e) {
+ System.err.println("Got exception " + e.getMessage() + ", aborting feed.");
+ System.exit(1);
+ }
+ }
+
+}