From 0347203cbb768e2e11e4aa8c6b6b27e67624ac36 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Wed, 1 Mar 2023 14:21:24 +0100 Subject: Backport visit slicing to `vespa-visit` CLI tool Allows for efficient parallelization across multiple visitor instances, mirroring the existing support in Document V1. Also clean up some legacy option value parsing code. Note: changing the parsed type for `maxtotalhits` from `int` to `long` is intentional; the internal limit is already a `long` and a cluster may have a lot more than `INT32_MAX` documents. --- .../main/java/com/yahoo/vespavisit/VdsVisit.java | 81 +++++++++++++++++++--- 1 file changed, 71 insertions(+), 10 deletions(-) (limited to 'vespaclient-java/src/main/java/com/yahoo') diff --git a/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java b/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java index 822c64ea5fa..f2ddd4ed8ea 100644 --- a/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java @@ -364,6 +364,23 @@ public class VdsVisit { .hasArg(false) .build()); + options.addOption(Option.builder() + .longOpt("slices") + .desc("Split the document corpus into this number of independent slices. " + + "This lets multiple, concurrent series of visitors advance the same logical " + + "visit independently, by specifying a different --sliceid for each.") + .hasArg(true) + .type(Number.class) + .build()); + + options.addOption(Option.builder() + .longOpt("sliceid") + .desc("The slice number of the visit represented by this visitor. " + + "This number must be non-negative and less than the number of slices specified for the visit.") + .hasArg(true) + .type(Number.class) + .build()); + return options; } @@ -382,6 +399,8 @@ public class VdsVisit { private boolean jsonLinesOutput = false; private boolean tensorShortForm = false; // TODO Vespa 9: change default to true private boolean tensorDirectValues = false; // TODO Vespa 9: change default to true + private int slices = 1; + private int sliceId = 0; public VisitorParameters getVisitorParameters() { return visitorParameters; @@ -489,6 +508,30 @@ public class VdsVisit { this.tensorDirectValues = tensorDirectValues; } + public int slices() { + return slices; + } + + public void setSlices(int slices) { + this.slices = slices; + } + + public int sliceId() { + return sliceId; + } + + public void setSliceId(int sliceId) { + this.sliceId = sliceId; + } + + } + + private static int optionAsInt(CommandLine cmdLine, String optName) throws org.apache.commons.cli.ParseException { + return ((Number)cmdLine.getParsedOptionValue(optName)).intValue(); + } + + private static long optionAsLong(CommandLine cmdLine, String optName) throws org.apache.commons.cli.ParseException { + return ((Number)cmdLine.getParsedOptionValue(optName)).longValue(); } protected static class ArgumentParser { @@ -517,10 +560,10 @@ public class VdsVisit { params.setBucketSpace(line.getOptionValue("bucketspace")); } if (line.hasOption("f")) { - params.setFromTimestamp(((Number) line.getParsedOptionValue("f")).longValue()); + params.setFromTimestamp(optionAsLong(line, "f")); } if (line.hasOption("t")) { - params.setToTimestamp(((Number) line.getParsedOptionValue("t")).longValue()); + params.setToTimestamp(optionAsLong(line, "t")); } if (line.hasOption("e")) { throw new IllegalArgumentException("Headers only option has been removed."); @@ -534,10 +577,10 @@ public class VdsVisit { params.visitInconsistentBuckets(true); } if (line.hasOption("m")) { - params.setMaxPending(((Number) line.getParsedOptionValue("m")).intValue()); + params.setMaxPending(optionAsInt(line, "m")); } if (line.hasOption("b")) { - params.setMaxBucketsPerVisitor(((Number) line.getParsedOptionValue("b")).intValue()); + params.setMaxBucketsPerVisitor(optionAsInt(line, "b")); } if (line.hasOption("i")) { allParams.setPrintIdsOnly(true); @@ -547,11 +590,11 @@ public class VdsVisit { params.setResumeFileName(line.getOptionValue("p")); } if (line.hasOption("o")) { - allParams.setFullTimeout(((Number) line.getParsedOptionValue("o")).intValue()); + allParams.setFullTimeout(optionAsInt(line, "o")); params.setTimeoutMs(allParams.getFullTimeout()); } if (line.hasOption("u")) { - params.setTimeoutMs(((Number) line.getParsedOptionValue("u")).intValue()); + params.setTimeoutMs(optionAsInt(line, "u")); } if (line.hasOption("visitlibrary")) { params.setVisitorLibrary(line.getOptionValue("visitlibrary")); @@ -582,13 +625,13 @@ public class VdsVisit { allParams.setAbortOnClusterDown(true); } if (line.hasOption("processtime")) { - allParams.setProcessTime(((Number) line.getParsedOptionValue("processtime")).intValue()); + allParams.setProcessTime(optionAsInt(line, "processtime")); } if (line.hasOption("maxtotalhits")) { - params.setMaxTotalHits(((Number)line.getParsedOptionValue("maxtotalhits")).intValue()); + params.setMaxTotalHits(optionAsLong(line, "maxtotalhits")); } if (line.hasOption("tracelevel")) { - params.setTraceLevel(((Number)line.getParsedOptionValue("tracelevel")).intValue()); + params.setTraceLevel(optionAsInt(line, "tracelevel")); } if (line.hasOption("priority")) { try { @@ -608,7 +651,7 @@ public class VdsVisit { } if (line.hasOption("maxpendingsuperbuckets")) { StaticThrottlePolicy throttlePolicy = new StaticThrottlePolicy(); - throttlePolicy.setMaxPendingCount(((Number)line.getParsedOptionValue("maxpendingsuperbuckets")).intValue()); + throttlePolicy.setMaxPendingCount(optionAsInt(line, "maxpendingsuperbuckets")); params.setThrottlePolicy(throttlePolicy); } if (line.hasOption("shorttensors")) { @@ -617,6 +660,13 @@ public class VdsVisit { if (line.hasOption("tensorvalues")) { allParams.setTensorDirectValues(true); } + if (line.hasOption("slices") != line.hasOption("sliceid")) { + throw new IllegalArgumentException("Both --slices and --sliceid must be specified when visiting with slicing"); + } + if (line.hasOption("slices")) { + allParams.setSlices(optionAsInt(line, "slices")); + allParams.setSliceId(optionAsInt(line, "sliceid")); + } boolean jsonOutput = line.hasOption("jsonoutput"); boolean jsonl = line.hasOption("jsonl"); @@ -632,6 +682,14 @@ public class VdsVisit { allParams.setJsonOutput(!xmlOutput); } + if (allParams.slices() != 1 || allParams.sliceId() != 0) { + if ((allParams.slices() < 1) || (allParams.sliceId() < 0) || (allParams.sliceId() >= allParams.slices())) { + throw new IllegalArgumentException("--slices must be greater than 0 and --sliceid must be in the " + + "range [0, the value provided for --slices)"); + } + params.slice(allParams.slices(), allParams.sliceId()); + } + allParams.setVisitorParameters(params); return allParams; } @@ -730,6 +788,9 @@ public class VdsVisit { if (params.skipBucketsOnFatalErrors()) { out.println("Skip visiting super buckets with fatal errors."); } + if (params.getSlices() > 1) { + out.format("Visiting slice %d out of %s slices\n", params.getSliceId(), params.getSlices()); + } } private void onDocumentSelectionException(Exception e) { -- cgit v1.2.3