summary | refs | log | tree | commit | diff | stats
path: root/vespaclient-java
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@yahooinc.com> 2023-03-01 14:21:24 +0100
committer: Tor Brede Vekterli <vekterli@yahooinc.com> 2023-03-01 16:11:30 +0100
commit: 0347203cbb768e2e11e4aa8c6b6b27e67624ac36 (patch)
tree: 95f9121e6a6755eda7ec403b176e9065919ac10d /vespaclient-java
parent: cdc86f437afbb34cabc9f05db951bef6ad206121 (diff)
Backport visit slicing to `vespa-visit` CLI tool
Allows for efficient parallelization across multiple visitor instances, mirroring the existing support in Document V1. Also clean up some legacy option value parsing code. Note: changing the parsed type for `maxtotalhits` from `int` to `long` is intentional; the internal limit is already a `long` and a cluster may have a lot more than `INT32_MAX` documents.
Diffstat (limited to 'vespaclient-java')
-rw-r--r-- vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java 81
-rw-r--r-- vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java 60
2 files changed, 111 insertions, 30 deletions
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java b/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java
index 822c64ea5fa..f2ddd4ed8ea 100644
--- a/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java
+++ b/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java
@@ -364,6 +364,23 @@ public class VdsVisit {
.hasArg(false)
.build());
+ options.addOption(Option.builder()
+ .longOpt("slices")
+ .desc("Split the document corpus into this number of independent slices. " +
+ "This lets multiple, concurrent series of visitors advance the same logical " +
+ "visit independently, by specifying a different --sliceid for each.")
+ .hasArg(true)
+ .type(Number.class)
+ .build());
+
+ options.addOption(Option.builder()
+ .longOpt("sliceid")
+ .desc("The slice number of the visit represented by this visitor. " +
+ "This number must be non-negative and less than the number of slices specified for the visit.")
+ .hasArg(true)
+ .type(Number.class)
+ .build());
+
return options;
}
@@ -382,6 +399,8 @@ public class VdsVisit {
private boolean jsonLinesOutput = false;
private boolean tensorShortForm = false; // TODO Vespa 9: change default to true
private boolean tensorDirectValues = false; // TODO Vespa 9: change default to true
+ private int slices = 1;
+ private int sliceId = 0;
public VisitorParameters getVisitorParameters() {
return visitorParameters;
@@ -489,6 +508,30 @@ public class VdsVisit {
this.tensorDirectValues = tensorDirectValues;
}
+ public int slices() {
+ return slices;
+ }
+
+ public void setSlices(int slices) {
+ this.slices = slices;
+ }
+
+ public int sliceId() {
+ return sliceId;
+ }
+
+ public void setSliceId(int sliceId) {
+ this.sliceId = sliceId;
+ }
+
+ }
+
+ private static int optionAsInt(CommandLine cmdLine, String optName) throws org.apache.commons.cli.ParseException {
+ return ((Number)cmdLine.getParsedOptionValue(optName)).intValue();
+ }
+
+ private static long optionAsLong(CommandLine cmdLine, String optName) throws org.apache.commons.cli.ParseException {
+ return ((Number)cmdLine.getParsedOptionValue(optName)).longValue();
}
protected static class ArgumentParser {
@@ -517,10 +560,10 @@ public class VdsVisit {
params.setBucketSpace(line.getOptionValue("bucketspace"));
}
if (line.hasOption("f")) {
- params.setFromTimestamp(((Number) line.getParsedOptionValue("f")).longValue());
+ params.setFromTimestamp(optionAsLong(line, "f"));
}
if (line.hasOption("t")) {
- params.setToTimestamp(((Number) line.getParsedOptionValue("t")).longValue());
+ params.setToTimestamp(optionAsLong(line, "t"));
}
if (line.hasOption("e")) {
throw new IllegalArgumentException("Headers only option has been removed.");
@@ -534,10 +577,10 @@ public class VdsVisit {
params.visitInconsistentBuckets(true);
}
if (line.hasOption("m")) {
- params.setMaxPending(((Number) line.getParsedOptionValue("m")).intValue());
+ params.setMaxPending(optionAsInt(line, "m"));
}
if (line.hasOption("b")) {
- params.setMaxBucketsPerVisitor(((Number) line.getParsedOptionValue("b")).intValue());
+ params.setMaxBucketsPerVisitor(optionAsInt(line, "b"));
}
if (line.hasOption("i")) {
allParams.setPrintIdsOnly(true);
@@ -547,11 +590,11 @@ public class VdsVisit {
params.setResumeFileName(line.getOptionValue("p"));
}
if (line.hasOption("o")) {
- allParams.setFullTimeout(((Number) line.getParsedOptionValue("o")).intValue());
+ allParams.setFullTimeout(optionAsInt(line, "o"));
params.setTimeoutMs(allParams.getFullTimeout());
}
if (line.hasOption("u")) {
- params.setTimeoutMs(((Number) line.getParsedOptionValue("u")).intValue());
+ params.setTimeoutMs(optionAsInt(line, "u"));
}
if (line.hasOption("visitlibrary")) {
params.setVisitorLibrary(line.getOptionValue("visitlibrary"));
@@ -582,13 +625,13 @@ public class VdsVisit {
allParams.setAbortOnClusterDown(true);
}
if (line.hasOption("processtime")) {
- allParams.setProcessTime(((Number) line.getParsedOptionValue("processtime")).intValue());
+ allParams.setProcessTime(optionAsInt(line, "processtime"));
}
if (line.hasOption("maxtotalhits")) {
- params.setMaxTotalHits(((Number)line.getParsedOptionValue("maxtotalhits")).intValue());
+ params.setMaxTotalHits(optionAsLong(line, "maxtotalhits"));
}
if (line.hasOption("tracelevel")) {
- params.setTraceLevel(((Number)line.getParsedOptionValue("tracelevel")).intValue());
+ params.setTraceLevel(optionAsInt(line, "tracelevel"));
}
if (line.hasOption("priority")) {
try {
@@ -608,7 +651,7 @@ public class VdsVisit {
}
if (line.hasOption("maxpendingsuperbuckets")) {
StaticThrottlePolicy throttlePolicy = new StaticThrottlePolicy();
- throttlePolicy.setMaxPendingCount(((Number)line.getParsedOptionValue("maxpendingsuperbuckets")).intValue());
+ throttlePolicy.setMaxPendingCount(optionAsInt(line, "maxpendingsuperbuckets"));
params.setThrottlePolicy(throttlePolicy);
}
if (line.hasOption("shorttensors")) {
@@ -617,6 +660,13 @@ public class VdsVisit {
if (line.hasOption("tensorvalues")) {
allParams.setTensorDirectValues(true);
}
+ if (line.hasOption("slices") != line.hasOption("sliceid")) {
+ throw new IllegalArgumentException("Both --slices and --sliceid must be specified when visiting with slicing");
+ }
+ if (line.hasOption("slices")) {
+ allParams.setSlices(optionAsInt(line, "slices"));
+ allParams.setSliceId(optionAsInt(line, "sliceid"));
+ }
boolean jsonOutput = line.hasOption("jsonoutput");
boolean jsonl = line.hasOption("jsonl");
@@ -632,6 +682,14 @@ public class VdsVisit {
allParams.setJsonOutput(!xmlOutput);
}
+ if (allParams.slices() != 1 || allParams.sliceId() != 0) {
+ if ((allParams.slices() < 1) || (allParams.sliceId() < 0) || (allParams.sliceId() >= allParams.slices())) {
+ throw new IllegalArgumentException("--slices must be greater than 0 and --sliceid must be in the " +
+ "range [0, the value provided for --slices)");
+ }
+ params.slice(allParams.slices(), allParams.sliceId());
+ }
+
allParams.setVisitorParameters(params);
return allParams;
}
@@ -730,6 +788,9 @@ public class VdsVisit {
if (params.skipBucketsOnFatalErrors()) {
out.println("Skip visiting super buckets with fatal errors.");
}
+ if (params.getSlices() > 1) {
+ out.format("Visiting slice %d out of %s slices\n", params.getSliceId(), params.getSlices());
+ }
}
private void onDocumentSelectionException(Exception e) {
diff --git a/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java b/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java
index 434d91b3ea3..a0db3f973a4 100644
--- a/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java
+++ b/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java
@@ -19,8 +19,11 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -77,7 +80,6 @@ public class VdsVisitTestCase {
/**
* Test the parameters that could not be used in conjunction with
* those in the first parameter test.
- * @throws Exception
*/
@Test
void testCommandLineShortOptions2() throws Exception {
@@ -113,6 +115,11 @@ public class VdsVisitTestCase {
assertTrue(allParams.isPrintIdsOnly());
}
+ private static String joinLines(String... lines) {
+ String nl = System.getProperty("line.separator"); // in case of \r\n instead of just \n...
+ return String.join(nl, lines) + nl;
+ }
+
@Test
void testCommandLineLongOptions() throws Exception {
// short options testing (for options that do not collide with each other)
@@ -140,7 +147,9 @@ public class VdsVisitTestCase {
"--abortonclusterdown",
"--visitremoves",
"--bucketspace", "outerspace",
- "--shorttensors"
+ "--shorttensors",
+ "--slices", "16",
+ "--sliceid", "5"
};
VdsVisit.ArgumentParser parser = createMockArgumentParser();
VdsVisit.VdsVisitParameters allParams = parser.parse(args);
@@ -177,33 +186,44 @@ public class VdsVisitTestCase {
assertTrue(allParams.getAbortOnClusterDown());
assertTrue(params.visitRemoves());
+ assertEquals(16, params.getSlices());
+ assertEquals(5, params.getSliceId());
+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
PrintStream printStream = new PrintStream(outputStream);
VdsVisit.verbosePrintParameters(allParams, printStream);
printStream.flush();
String nl = System.getProperty("line.separator"); // the joys of running tests on windows
- assertEquals(
- "Time out visitor after 123456789 ms." + nl +
- "Visiting documents matching: 'id.user=1234'" + nl +
- "Visiting bucket space: outerspace" + nl +
- "Visiting in the inclusive timestamp range 5678 - 9012." + nl +
- "Visiting field set foodoc.bar,foodoc.baz." + nl +
- "Visiting inconsistent buckets." + nl +
- "Including remove entries." + nl +
- "Tracking progress in file: foo-progress.txt" + nl +
- "Let visitor have maximum 6000 replies pending on data handlers per storage node visitor." + nl +
- "Visit maximum 5 buckets per visitor." + nl +
- "Sending data to data handler at: foo.remote" + nl +
- "Using visitor library 'fnord'." + nl +
- "Adding the following library specific parameters:" + nl +
- " asdf = rargh" + nl +
- "Visitor priority NORMAL_1" + nl +
- "Skip visiting super buckets with fatal errors." + nl,
- outputStream.toString("utf-8"));
+ assertEquals(joinLines(
+ "Time out visitor after 123456789 ms.",
+ "Visiting documents matching: 'id.user=1234'",
+ "Visiting bucket space: outerspace",
+ "Visiting in the inclusive timestamp range 5678 - 9012.",
+ "Visiting field set foodoc.bar,foodoc.baz.",
+ "Visiting inconsistent buckets.",
+ "Including remove entries.",
+ "Tracking progress in file: foo-progress.txt",
+ "Let visitor have maximum 6000 replies pending on data handlers per storage node visitor.",
+ "Visit maximum 5 buckets per visitor.",
+ "Sending data to data handler at: foo.remote",
+ "Using visitor library 'fnord'.",
+ "Adding the following library specific parameters:",
+ " asdf = rargh",
+ "Visitor priority NORMAL_1",
+ "Skip visiting super buckets with fatal errors.",
+ "Visiting slice 5 out of 16 slices"),
+ outputStream.toString(StandardCharsets.UTF_8));
}
private static String[] emptyArgList() { return new String[]{}; }
+ @Test
+ void slicing_is_disabled_by_default() throws Exception {
+ var allParams = createMockArgumentParser().parse(emptyArgList());
+ assertEquals(1, allParams.slices()); // 1 slice; the entire cluster
+ assertEquals(0, allParams.sliceId());
+ }
+
// TODO Vespa 9: change default from long to short
@Test
void tensor_output_format_is_long_by_default() throws Exception {