diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-03-01 14:21:24 +0100 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-03-01 16:11:30 +0100 |
commit | 0347203cbb768e2e11e4aa8c6b6b27e67624ac36 (patch) | |
tree | 95f9121e6a6755eda7ec403b176e9065919ac10d /vespaclient-java | |
parent | cdc86f437afbb34cabc9f05db951bef6ad206121 (diff) |
Backport visit slicing to `vespa-visit` CLI tool
Allows for efficient parallelization across multiple visitor
instances, mirroring the existing support in Document V1.
Also clean up some legacy option value parsing code. Note:
changing the parsed type for `maxtotalhits` from `int` to `long`
is intentional; the internal limit is already a `long` and a
cluster may have a lot more than `INT32_MAX` documents.
Diffstat (limited to 'vespaclient-java')
-rw-r--r-- | vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java | 81 | ||||
-rw-r--r-- | vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java | 60 |
2 files changed, 111 insertions, 30 deletions
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java b/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java index 822c64ea5fa..f2ddd4ed8ea 100644 --- a/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java @@ -364,6 +364,23 @@ public class VdsVisit { .hasArg(false) .build()); + options.addOption(Option.builder() + .longOpt("slices") + .desc("Split the document corpus into this number of independent slices. " + + "This lets multiple, concurrent series of visitors advance the same logical " + + "visit independently, by specifying a different --sliceid for each.") + .hasArg(true) + .type(Number.class) + .build()); + + options.addOption(Option.builder() + .longOpt("sliceid") + .desc("The slice number of the visit represented by this visitor. " + + "This number must be non-negative and less than the number of slices specified for the visit.") + .hasArg(true) + .type(Number.class) + .build()); + return options; } @@ -382,6 +399,8 @@ public class VdsVisit { private boolean jsonLinesOutput = false; private boolean tensorShortForm = false; // TODO Vespa 9: change default to true private boolean tensorDirectValues = false; // TODO Vespa 9: change default to true + private int slices = 1; + private int sliceId = 0; public VisitorParameters getVisitorParameters() { return visitorParameters; @@ -489,6 +508,30 @@ public class VdsVisit { this.tensorDirectValues = tensorDirectValues; } + public int slices() { + return slices; + } + + public void setSlices(int slices) { + this.slices = slices; + } + + public int sliceId() { + return sliceId; + } + + public void setSliceId(int sliceId) { + this.sliceId = sliceId; + } + + } + + private static int optionAsInt(CommandLine cmdLine, String optName) throws org.apache.commons.cli.ParseException { + return ((Number)cmdLine.getParsedOptionValue(optName)).intValue(); + } + + private static long optionAsLong(CommandLine cmdLine, String optName) throws org.apache.commons.cli.ParseException { + return ((Number)cmdLine.getParsedOptionValue(optName)).longValue(); } protected static class ArgumentParser { @@ -517,10 +560,10 @@ public class VdsVisit { params.setBucketSpace(line.getOptionValue("bucketspace")); } if (line.hasOption("f")) { - params.setFromTimestamp(((Number) line.getParsedOptionValue("f")).longValue()); + params.setFromTimestamp(optionAsLong(line, "f")); } if (line.hasOption("t")) { - params.setToTimestamp(((Number) line.getParsedOptionValue("t")).longValue()); + params.setToTimestamp(optionAsLong(line, "t")); } if (line.hasOption("e")) { throw new IllegalArgumentException("Headers only option has been removed."); @@ -534,10 +577,10 @@ public class VdsVisit { params.visitInconsistentBuckets(true); } if (line.hasOption("m")) { - params.setMaxPending(((Number) line.getParsedOptionValue("m")).intValue()); + params.setMaxPending(optionAsInt(line, "m")); } if (line.hasOption("b")) { - params.setMaxBucketsPerVisitor(((Number) line.getParsedOptionValue("b")).intValue()); + params.setMaxBucketsPerVisitor(optionAsInt(line, "b")); } if (line.hasOption("i")) { allParams.setPrintIdsOnly(true); @@ -547,11 +590,11 @@ public class VdsVisit { params.setResumeFileName(line.getOptionValue("p")); } if (line.hasOption("o")) { - allParams.setFullTimeout(((Number) line.getParsedOptionValue("o")).intValue()); + allParams.setFullTimeout(optionAsInt(line, "o")); params.setTimeoutMs(allParams.getFullTimeout()); } if (line.hasOption("u")) { - params.setTimeoutMs(((Number) line.getParsedOptionValue("u")).intValue()); + params.setTimeoutMs(optionAsInt(line, "u")); } if (line.hasOption("visitlibrary")) { params.setVisitorLibrary(line.getOptionValue("visitlibrary")); @@ -582,13 +625,13 @@ public class VdsVisit { allParams.setAbortOnClusterDown(true); } if (line.hasOption("processtime")) { - allParams.setProcessTime(((Number) line.getParsedOptionValue("processtime")).intValue()); + allParams.setProcessTime(optionAsInt(line, "processtime")); } if (line.hasOption("maxtotalhits")) { - params.setMaxTotalHits(((Number)line.getParsedOptionValue("maxtotalhits")).intValue()); + params.setMaxTotalHits(optionAsLong(line, "maxtotalhits")); } if (line.hasOption("tracelevel")) { - params.setTraceLevel(((Number)line.getParsedOptionValue("tracelevel")).intValue()); + params.setTraceLevel(optionAsInt(line, "tracelevel")); } if (line.hasOption("priority")) { try { @@ -608,7 +651,7 @@ public class VdsVisit { } if (line.hasOption("maxpendingsuperbuckets")) { StaticThrottlePolicy throttlePolicy = new StaticThrottlePolicy(); - throttlePolicy.setMaxPendingCount(((Number)line.getParsedOptionValue("maxpendingsuperbuckets")).intValue()); + throttlePolicy.setMaxPendingCount(optionAsInt(line, "maxpendingsuperbuckets")); params.setThrottlePolicy(throttlePolicy); } if (line.hasOption("shorttensors")) { @@ -617,6 +660,13 @@ public class VdsVisit { if (line.hasOption("tensorvalues")) { allParams.setTensorDirectValues(true); } + if (line.hasOption("slices") != line.hasOption("sliceid")) { + throw new IllegalArgumentException("Both --slices and --sliceid must be specified when visiting with slicing"); + } + if (line.hasOption("slices")) { + allParams.setSlices(optionAsInt(line, "slices")); + allParams.setSliceId(optionAsInt(line, "sliceid")); + } boolean jsonOutput = line.hasOption("jsonoutput"); boolean jsonl = line.hasOption("jsonl"); @@ -632,6 +682,14 @@ public class VdsVisit { allParams.setJsonOutput(!xmlOutput); } + if (allParams.slices() != 1 || allParams.sliceId() != 0) { + if ((allParams.slices() < 1) || (allParams.sliceId() < 0) || (allParams.sliceId() >= allParams.slices())) { + throw new IllegalArgumentException("--slices must be greater than 0 and --sliceid must be in the " + + "range [0, the value provided for --slices)"); + } + params.slice(allParams.slices(), allParams.sliceId()); + } + allParams.setVisitorParameters(params); return allParams; } @@ -730,6 +788,9 @@ public class VdsVisit { if (params.skipBucketsOnFatalErrors()) { out.println("Skip visiting super buckets with fatal errors."); } + if (params.getSlices() > 1) { + out.format("Visiting slice %d out of %s slices\n", params.getSliceId(), params.getSlices()); + } } private void onDocumentSelectionException(Exception e) { diff --git a/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java b/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java index 434d91b3ea3..a0db3f973a4 100644 --- a/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java +++ b/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java @@ -19,8 +19,11 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -77,7 +80,6 @@ public class VdsVisitTestCase { /** * Test the parameters that could not be used in conjunction with * those in the first parameter test. - * @throws Exception */ @Test void testCommandLineShortOptions2() throws Exception { @@ -113,6 +115,11 @@ public class VdsVisitTestCase { assertTrue(allParams.isPrintIdsOnly()); } + private static String joinLines(String... lines) { + String nl = System.getProperty("line.separator"); // in case of \r\n instead of just \n... + return String.join(nl, lines) + nl; + } + @Test void testCommandLineLongOptions() throws Exception { // short options testing (for options that do not collide with each other) @@ -140,7 +147,9 @@ public class VdsVisitTestCase { "--abortonclusterdown", "--visitremoves", "--bucketspace", "outerspace", - "--shorttensors" + "--shorttensors", + "--slices", "16", + "--sliceid", "5" }; VdsVisit.ArgumentParser parser = createMockArgumentParser(); VdsVisit.VdsVisitParameters allParams = parser.parse(args); @@ -177,33 +186,44 @@ public class VdsVisitTestCase { assertTrue(allParams.getAbortOnClusterDown()); assertTrue(params.visitRemoves()); + assertEquals(16, params.getSlices()); + assertEquals(5, params.getSliceId()); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); PrintStream printStream = new PrintStream(outputStream); VdsVisit.verbosePrintParameters(allParams, printStream); printStream.flush(); String nl = System.getProperty("line.separator"); // the joys of running tests on windows - assertEquals( - "Time out visitor after 123456789 ms." + nl + - "Visiting documents matching: 'id.user=1234'" + nl + - "Visiting bucket space: outerspace" + nl + - "Visiting in the inclusive timestamp range 5678 - 9012." + nl + - "Visiting field set foodoc.bar,foodoc.baz." + nl + - "Visiting inconsistent buckets." + nl + - "Including remove entries." + nl + - "Tracking progress in file: foo-progress.txt" + nl + - "Let visitor have maximum 6000 replies pending on data handlers per storage node visitor." + nl + - "Visit maximum 5 buckets per visitor." + nl + - "Sending data to data handler at: foo.remote" + nl + - "Using visitor library 'fnord'." + nl + - "Adding the following library specific parameters:" + nl + - " asdf = rargh" + nl + - "Visitor priority NORMAL_1" + nl + - "Skip visiting super buckets with fatal errors." + nl, - outputStream.toString("utf-8")); + assertEquals(joinLines( + "Time out visitor after 123456789 ms.", + "Visiting documents matching: 'id.user=1234'", + "Visiting bucket space: outerspace", + "Visiting in the inclusive timestamp range 5678 - 9012.", + "Visiting field set foodoc.bar,foodoc.baz.", + "Visiting inconsistent buckets.", + "Including remove entries.", + "Tracking progress in file: foo-progress.txt", + "Let visitor have maximum 6000 replies pending on data handlers per storage node visitor.", + "Visit maximum 5 buckets per visitor.", + "Sending data to data handler at: foo.remote", + "Using visitor library 'fnord'.", + "Adding the following library specific parameters:", + " asdf = rargh", + "Visitor priority NORMAL_1", + "Skip visiting super buckets with fatal errors.", + "Visiting slice 5 out of 16 slices"), + outputStream.toString(StandardCharsets.UTF_8)); } private static String[] emptyArgList() { return new String[]{}; } + @Test + void slicing_is_disabled_by_default() throws Exception { + var allParams = createMockArgumentParser().parse(emptyArgList()); + assertEquals(1, allParams.slices()); // 1 slice; the entire cluster + assertEquals(0, allParams.sliceId()); + } + // TODO Vespa 9: change default from long to short @Test void tensor_output_format_is_long_by_default() throws Exception { |