// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespavisit; import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.fieldset.DocIdOnly; import com.yahoo.document.fieldset.DocumentOnly; import com.yahoo.document.select.parser.ParseException; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; import com.yahoo.documentapi.messagebus.MessageBusParams; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.log.LogSetup; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.vespaclient.ClusterDef; import com.yahoo.vespaclient.ClusterList; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import java.io.IOException; import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.Map; import java.util.stream.Collectors; /** * Client using visiting, used by the vespa-visit command line tool. * * @author Einar M R Rosenvinge */ public class VdsVisit { private VdsVisitParameters params; private MessageBusParams mbparams = new MessageBusParams(); private VisitorSession session; private final VisitorSessionAccessorFactory sessionAccessorFactory; private VisitorSessionAccessor sessionAccessor; private ShutdownHookRegistrar shutdownHookRegistrar; public interface ShutdownHookRegistrar { void registerShutdownHook(Thread thread); } public interface VisitorSessionAccessor { VisitorSession createVisitorSession(VisitorParameters params) throws ParseException; void shutdown(); } public interface VisitorSessionAccessorFactory { VisitorSessionAccessor createVisitorSessionAccessor(); } private static class MessageBusVisitorSessionAccessor implements VisitorSessionAccessor { private MessageBusDocumentAccess access; private MessageBusVisitorSessionAccessor(MessageBusParams mbparams) { access = new MessageBusDocumentAccess(mbparams); } @Override public VisitorSession createVisitorSession(VisitorParameters params) throws ParseException { return access.createVisitorSession(params); } @Override public void shutdown() { access.shutdown(); } } private static class MessageBusVisitorSessionAccessorFactory implements VisitorSessionAccessorFactory { MessageBusParams mbparams; private MessageBusVisitorSessionAccessorFactory(MessageBusParams mbparams) { this.mbparams = mbparams; } @Override public VisitorSessionAccessor createVisitorSessionAccessor() { return new MessageBusVisitorSessionAccessor(mbparams); } } private static class JvmRuntimeShutdownHookRegistrar implements ShutdownHookRegistrar { @Override public void registerShutdownHook(Thread thread) { Runtime.getRuntime().addShutdownHook(thread); } } public VdsVisit() { this.sessionAccessorFactory = new MessageBusVisitorSessionAccessorFactory(mbparams); this.shutdownHookRegistrar = new JvmRuntimeShutdownHookRegistrar(); } public VdsVisit(VisitorSessionAccessorFactory sessionAccessorFactory, ShutdownHookRegistrar shutdownHookRegistrar) { this.sessionAccessorFactory = sessionAccessorFactory; this.shutdownHookRegistrar = shutdownHookRegistrar; } public static void main(String args[]) { LogSetup.initVespaLogging("vespa-visit"); VdsVisit vdsVisit = new VdsVisit(); Options options = createOptions(); try { ArgumentParser parser = new ArgumentParser(options); vdsVisit.params = parser.parse(args); if (vdsVisit.params == null) { vdsVisit.printSyntax(options); System.exit(0); } ClusterList clusterList = new ClusterList("client"); vdsVisit.params.getVisitorParameters().setRoute( resolveClusterRoute(clusterList, vdsVisit.params.getCluster())); } catch (org.apache.commons.cli.ParseException e) { System.err.println("Failed to parse arguments. Try --help for syntax. " + e.getMessage()); System.exit(1); } catch (IllegalArgumentException e) { System.err.println(e.getMessage()); System.exit(1); } if (vdsVisit.params.isVerbose()) { verbosePrintParameters(vdsVisit.params, System.err); } try { vdsVisit.run(); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } private void printSyntax(Options options) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("vespa-visit ", "Visit documents from Vespa", options , ""); } @SuppressWarnings("AccessStaticViaInstance") protected static Options createOptions() { Options options = new Options(); options.addOption("h", "help", false, "Show this syntax page."); options.addOption(Option.builder("d") .longOpt("datahandler") .hasArg(true) .argName("target") .desc("Send results to the given target.") .build()); options.addOption(Option.builder("s") .longOpt("selection") .hasArg(true) .argName("selection") .desc("What documents to visit.") .build()); options.addOption(Option.builder("f") .longOpt("from") .hasArg(true) .argName("timestamp") .desc("Only visit from the given timestamp (microseconds).") .type(Number.class) .build()); options.addOption(Option.builder("t") .longOpt("to") .hasArg(true) .argName("timestamp") .desc("Only visit up to the given timestamp (microseconds).") .type(Number.class).build()); options.addOption(Option.builder("l") .longOpt("fieldset") .hasArg(true) .argName("fieldset") .desc("Retrieve the specified fields only (see https://docs.vespa.ai/en/documents.html#fieldsets). Default is [document].") .build()); options.addOption(Option.builder() .longOpt("visitinconsistentbuckets") .hasArg(false) .desc("Don't wait for inconsistent buckets to become consistent.") .build()); options.addOption(Option.builder("m") .longOpt("maxpending") .hasArg(true) .argName("num") .desc("Maximum pending messages to data handlers per storage visitor.") .type(Number.class) .build()); options.addOption(Option.builder() .longOpt("maxpendingsuperbuckets") .hasArg(true) .argName("num") .desc("Maximum pending visitor messages from the vespa-visit client. If set, dynamic throttling of visitors will be disabled!") .type(Number.class) .build()); options.addOption(Option.builder("b") .longOpt("maxbuckets") .hasArg(true) .argName("num") .desc("Maximum buckets per visitor.") .type(Number.class) .build()); options.addOption("i", "printids", false, "Display only document identifiers."); options.addOption(Option.builder("p") .longOpt("progress") .hasArg(true) .argName("file") .desc("Use given file to track progress.") .build()); options.addOption(Option.builder("o") .longOpt("timeout") .hasArg(true) .argName("milliseconds") .desc("Time out visitor after given time.") .type(Number.class) .build()); options.addOption(Option.builder("u") .longOpt("buckettimeout") .hasArg(true) .argName("milliseconds") .desc("Fail visitor if visiting a single bucket takes longer than this (default same as timeout)") .type(Number.class) .build()); options.addOption(Option.builder() .longOpt("visitlibrary") .hasArg(true) .argName("string") .desc("Use the given visitor library.") .build()); options.addOption(Option.builder() .longOpt("libraryparam") .numberOfArgs(2) .argName("key> = allParams.slices())) { throw new IllegalArgumentException("--slices must be greater than 0 and --sliceid must be in the " + "range [0, the value provided for --slices)"); } params.slice(allParams.slices(), allParams.sliceId()); } allParams.setVisitorParameters(params); return allParams; } } // For unit testing only protected void setVdsVisitParameters(VdsVisitParameters vdsVisitParameters) { this.params = vdsVisitParameters; } protected static String resolveClusterRoute(ClusterList clusters, String wantedCluster) { if (clusters.getStorageClusters().size() == 0) { throw new IllegalArgumentException("Your Vespa cluster does not have any content clusters " + "declared. Visiting feature is not available."); } ClusterDef found = null; String names = clusters.getStorageClusters() .stream().map(c -> "'" + c.getName() + "'") .collect(Collectors.joining(", ")); if (wantedCluster != null) { for (ClusterDef c : clusters.getStorageClusters()) { if (c.getName().equals(wantedCluster)) { found = c; } } if (found == null) { throw new IllegalArgumentException("Your vespa cluster contains the content clusters " + names + ", not '" + wantedCluster + "'. Please select a valid vespa cluster."); } } else if (clusters.getStorageClusters().size() == 1) { found = clusters.getStorageClusters().get(0); } else { throw new IllegalArgumentException("Your vespa cluster contains the content clusters " + names + ". Please use the -c option to select one of them as a target for visiting."); } return found.getRoute(); } protected static void verbosePrintParameters(VdsVisitParameters vdsParams, PrintStream out) { VisitorParameters params = vdsParams.getVisitorParameters(); if (params.getTimeoutMs() != -1) { out.println("Time out visitor after " + params.getTimeoutMs() + " ms."); } if (params.getDocumentSelection() == null || params.getDocumentSelection().equals("")) { out.println("Visiting all documents"); } else { out.println("Visiting documents matching: " + params.getDocumentSelection()); } out.println(String.format("Visiting bucket space: %s", params.getBucketSpace())); if (params.getFromTimestamp() != 0 && params.getToTimestamp() != 0) { out.println("Visiting in the inclusive timestamp range " + params.getFromTimestamp() + " - " + params.getToTimestamp() + "."); } else if (params.getFromTimestamp() != 0) { out.println("Visiting from and including timestamp " + params.getFromTimestamp() + "."); } else if (params.getToTimestamp() != 0) { out.println("Visiting to and including timestamp " + params.getToTimestamp() + "."); } out.println("Visiting field set " + params.fieldSet() + "."); if (params.visitInconsistentBuckets()) { out.println("Visiting inconsistent buckets."); } if (params.visitRemoves()) { out.println("Including remove entries."); } if (params.getResumeFileName() != null && !"".equals(params.getResumeFileName())) { out.println("Tracking progress in file: " + params.getResumeFileName()); } if (vdsParams.isPrintIdsOnly()) { out.println("Only showing document identifiers."); } out.println("Let visitor have maximum " + params.getMaxPending() + " replies pending on data handlers per storage node visitor."); out.println("Visit maximum " + params.getMaxBucketsPerVisitor() + " buckets per visitor."); if (params.getRemoteDataHandler() != null) { out.println("Sending data to data handler at: " + params.getRemoteDataHandler()); } if (params.getRoute() != null) { out.println("Visiting cluster '" + params.getRoute() + "'."); } if (params.getVisitorLibrary() != null) { out.println("Using visitor library '" + params.getVisitorLibrary() + "'."); } if (params.getLibraryParameters().size() > 0) { out.println("Adding the following library specific parameters:"); for (Map.Entry entry : params.getLibraryParameters().entrySet()) { out.println(" " + entry.getKey() + " = " + new String(entry.getValue(), StandardCharsets.UTF_8)); } } if (params.getPriority() != DocumentProtocol.Priority.NORMAL_3) { out.println("Visitor priority " + params.getPriority().name()); } if (params.skipBucketsOnFatalErrors()) { out.println("Skip visiting super buckets with fatal errors."); } if (params.getSlices() > 1) { out.format("Visiting slice %d out of %s slices\n", params.getSliceId(), params.getSlices()); } } private void onDocumentSelectionException(Exception e) { System.err.println("Illegal document selection string '" + params.getVisitorParameters().getDocumentSelection() + "'.\n"); System.exit(1); } private void onIllegalArgumentException(Exception e) { System.err.println("Illegal arguments : \n"); System.err.println(e.getMessage()); System.exit(1); } public void run() { System.exit(doRun()); } protected int doRun() { VisitorParameters visitorParameters = params.getVisitorParameters(); // If progress file already exists, create resume token from it if (visitorParameters.getResumeFileName() != null && !"".equals(visitorParameters.getResumeFileName())) { try { var progressFileContents = Files.readString(Path.of(visitorParameters.getResumeFileName())); visitorParameters.setResumeToken(new ProgressToken(progressFileContents)); if (params.isVerbose()) { System.err.format("Resuming visitor already %.1f %% finished.\n", visitorParameters.getResumeToken().percentFinished()); } } catch (NoSuchFileException e) { // Ignore; file has not been created yet but will be shortly. } catch (IOException e) { System.err.println("Could not open progress file: " + visitorParameters.getResumeFileName()); e.printStackTrace(System.err); return 1; } } initShutdownHook(); sessionAccessor = sessionAccessorFactory.createVisitorSessionAccessor(); VdsVisitHandler handler; var handlerParams = new StdOutVisitorHandler.Params(); handlerParams.printIds = params.isPrintIdsOnly(); handlerParams.indentXml = params.isVerbose(); handlerParams.showProgress = params.isVerbose(); handlerParams.showStatistics = params.isVerbose(); handlerParams.doStatistics = params.getStatisticsParts() != null; handlerParams.abortOnClusterDown = params.getAbortOnClusterDown(); handlerParams.processTimeMilliSecs = params.getProcessTime(); handlerParams.outputFormat = params.stdOutHandlerOutputFormat(); handlerParams.tensorShortForm = params.tensorShortForm(); handlerParams.tensorDirectValues = params.tensorDirectValues(); handlerParams.nullRender = params.nullRender(); handler = new StdOutVisitorHandler(handlerParams); if (visitorParameters.getResumeFileName() != null) { handler.setProgressFileName(visitorParameters.getResumeFileName()); } visitorParameters.setControlHandler(handler.getControlHandler()); if (visitorParameters.getRemoteDataHandler() == null) { visitorParameters.setLocalDataHandler(handler.getDataHandler()); } if (params.getStatisticsParts() != null) { String[] parts = params.getStatisticsParts().split(","); for (String s : parts) { visitorParameters.setLibraryParameter(s, "true"); } } try { session = sessionAccessor.createVisitorSession(visitorParameters); while (true) { try { if (session.waitUntilDone(params.getFullTimeout())) break; } catch (InterruptedException e) {} } if (visitorParameters.getTraceLevel() > 0) { System.out.println(session.getTrace().toString()); } } catch (ParseException e) { onDocumentSelectionException(e); } catch (IllegalArgumentException e) { onIllegalArgumentException(e); } catch (Exception e) { System.err.println("Document selection string was: " + visitorParameters.getDocumentSelection()); System.err.println("Caught unexpected exception: "); e.printStackTrace(System.err); return 1; } if (visitorParameters.getControlHandler().getResult().code == VisitorControlHandler.CompletionCode.SUCCESS) { return 0; } else { return 1; } } private void initShutdownHook() { shutdownHookRegistrar.registerShutdownHook(new CleanUpThread()); } class CleanUpThread extends Thread { public void run() { try { if (session != null) { session.destroy(); } } catch (IllegalStateException ise) { //ignore this } try { if (sessionAccessor != null) { sessionAccessor.shutdown(); } } catch (IllegalStateException ise) { //ignore this too } } } }