// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespavisit;

import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
import com.yahoo.documentapi.messagebus.MessageBusParams;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.log.LogSetup;
import com.yahoo.document.select.OrderingSpecification;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.vespaclient.ClusterDef;
import com.yahoo.vespaclient.ClusterList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import java.io.*;
import java.nio.charset.Charset;
import java.util.Map;

/**
 * Client using visiting, used by the vespa-visit command line tool.
 *
 * The entry point is {@link #main(String[])}: it parses the command line into the
 * {@code params} field, resolves the route to visit, and then calls {@link #run()},
 * which terminates the JVM with the status returned by {@link #doRun()}.
 *
 * Creation of the visitor session and registration of the JVM shutdown hook go through
 * the {@link VisitorSessionAccessorFactory} and {@link ShutdownHookRegistrar} interfaces,
 * which can be supplied through the two-argument constructor.
 *
 * @author Einar M R Rosenvinge
 */
public class VdsVisit {

    // Parsed command line; assigned in main() before run() is called.
    private VdsVisitParameters params;
    // Handed to the default session accessor factory by the no-arg constructor.
    // The two-argument constructor does not use it.
    private MessageBusParams mbparams = new MessageBusParams(new LoadTypeSet());
    // Assigned in doRun(); read by CleanUpThread, hence null-checked there.
    private VisitorSession session;
    private final VisitorSessionAccessorFactory sessionAccessorFactory;
    // Assigned in doRun() after the shutdown hook has been registered; read by CleanUpThread.
    private VisitorSessionAccessor sessionAccessor;
    private ShutdownHookRegistrar shutdownHookRegistrar;

    /** Registers a thread to be run as a shutdown hook. */
    public interface ShutdownHookRegistrar {
        void registerShutdownHook(Thread thread);
    }

    /** Creates visitor sessions and releases whatever backs them. */
    public interface VisitorSessionAccessor {
        VisitorSession createVisitorSession(VisitorParameters params) throws ParseException;
        void shutdown();
    }

    /** Creates {@link VisitorSessionAccessor} instances; doRun() calls it once per run. */
    public interface VisitorSessionAccessorFactory {
        VisitorSessionAccessor createVisitorSessionAccessor();
    }

    /** Accessor that delegates both operations to a {@link MessageBusDocumentAccess}. */
    private static class MessageBusVisitorSessionAccessor implements VisitorSessionAccessor {
        private MessageBusDocumentAccess access;

        private MessageBusVisitorSessionAccessor(MessageBusParams mbparams) {
            access = new MessageBusDocumentAccess(mbparams);
        }

        @Override
        public VisitorSession createVisitorSession(VisitorParameters params) throws ParseException {
            return access.createVisitorSession(params);
        }

        @Override
        public void shutdown() {
            access.shutdown();
        }
    }

    /** Factory that builds a new {@link MessageBusVisitorSessionAccessor} on every call. */
    private static class MessageBusVisitorSessionAccessorFactory implements VisitorSessionAccessorFactory {
        MessageBusParams mbparams;

        private MessageBusVisitorSessionAccessorFactory(MessageBusParams mbparams) {
            this.mbparams = mbparams;
        }

        @Override
        public VisitorSessionAccessor createVisitorSessionAccessor() {
            return new MessageBusVisitorSessionAccessor(mbparams);
        }
    }

    /** Registrar that adds the thread as a shutdown hook on the current JVM {@link Runtime}. */
    private static class JvmRuntimeShutdownHookRegistrar implements ShutdownHookRegistrar {
        @Override
        public void registerShutdownHook(Thread thread) {
            Runtime.getRuntime().addShutdownHook(thread);
        }
    }

    /** Creates a client wired to message bus document access and the JVM shutdown hook. */
    public VdsVisit() {
        this.sessionAccessorFactory = new MessageBusVisitorSessionAccessorFactory(mbparams);
        this.shutdownHookRegistrar = new JvmRuntimeShutdownHookRegistrar();
    }

    /**
     * Creates a client with caller-supplied collaborators.
     *
     * @param sessionAccessorFactory factory for the accessor that creates the visitor session
     * @param shutdownHookRegistrar  receives the clean-up thread that doRun() registers
     */
    public VdsVisit(VisitorSessionAccessorFactory sessionAccessorFactory,
                    ShutdownHookRegistrar shutdownHookRegistrar) {
        this.sessionAccessorFactory = sessionAccessorFactory;
        this.shutdownHookRegistrar = shutdownHookRegistrar;
    }

    /**
     * Command line entry point.
     *
     * Exits with status 0 after printing the syntax page when the argument parser returns
     * null, and with status 1 when the arguments cannot be parsed, when an
     * IllegalArgumentException is thrown during setup, or when run() throws.
     *
     * @param args the command line arguments
     */
    public static void main(String args[]) {
        LogSetup.initVespaLogging("vespa-visit");
        VdsVisit vdsVisit = new VdsVisit();
        Options options = createOptions();
        try {
            ArgumentParser parser = new ArgumentParser(options);
            vdsVisit.params = parser.parse(args);
            // A null result is treated as a request for the syntax page.
            if (vdsVisit.params == null) {
                vdsVisit.printSyntax(options);
                System.exit(0);
            }
            ClusterList clusterList = new ClusterList("client");
            vdsVisit.params.getVisitorParameters().setRoute(
                    resolveClusterRoute(clusterList, vdsVisit.params.getCluster()));
        } catch (org.apache.commons.cli.ParseException e) {
            System.err.println("Failed to parse arguments. Try --help for syntax. " + e.getMessage());
            System.exit(1);
        } catch (IllegalArgumentException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        if (vdsVisit.params.isVerbose()) {
            verbosePrintParameters(vdsVisit.params, System.err);
        }
        try {
            // run() ends in System.exit(), so control only returns here if it throws.
            vdsVisit.run();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Prints the usage text for the given options.
     *
     * @param options the options to describe
     */
    private void printSyntax(Options options) {
        HelpFormatter formatter = new HelpFormatter();
        // NOTE(review): the usage string ends in a trailing space; a "<...>" placeholder may
        // have been lost in the same way as the span flagged in createOptions() - confirm
        // against version control.
        formatter.printHelp("vespa-visit ", "Visit documents from VDS", options , "");
    }

    /**
     * Builds the command line options understood by the tool.
     *
     * Options expecting a numeric argument are declared with type {@code Number.class}.
     * The end of this method is missing from the reviewed text; see the note in the body.
     */
    @SuppressWarnings("AccessStaticViaInstance")
    protected static Options createOptions() {
        Options options = new Options();
        options.addOption("h", "help", false, "Show this syntax page.");
        options.addOption(Option.builder("d")
                .longOpt("datahandler")
                .hasArg(true)
                .argName("target")
                .desc("Send results to the given target.")
                .build());
        options.addOption(Option.builder("s")
                .longOpt("selection")
                .hasArg(true)
                .argName("selection")
                .desc("What documents to visit.")
                .build());
        options.addOption(Option.builder("f")
                .longOpt("from")
                .hasArg(true)
                .argName("timestamp")
                .desc("Only visit from the given timestamp (microseconds).")
                .type(Number.class)
                .build());
        options.addOption(Option.builder("t")
                .longOpt("to")
                .hasArg(true)
                .argName("timestamp")
                .desc("Only visit up to the given timestamp (microseconds).")
                .type(Number.class).build());
        options.addOption("e", "headersonly", false, "Only visit headers of documents.[Deprecated]");
        options.addOption(Option.builder("l")
                .longOpt("fieldset")
                .hasArg(true)
                .argName("fieldset")
                .desc("Retrieve the specified fields only (see http://docs.vespa.ai/documentation/reference/fieldsets.html). Default is [all].")
                .build());
        options.addOption(Option.builder()
                .longOpt("visitinconsistentbuckets")
                .hasArg(false)
                .desc("Don't wait for inconsistent buckets to become consistent.")
                .build());
        options.addOption(Option.builder("m")
                .longOpt("maxpending")
                .hasArg(true)
                .argName("num")
                .desc("Maximum pending messages to data handlers per storage visitor.")
                .type(Number.class)
                .build());
        options.addOption(Option.builder()
                .longOpt("maxpendingsuperbuckets")
                .hasArg(true)
                .argName("num")
                .desc("Maximum pending visitor messages from the vespa-visit client. If set, dynamic throttling of visitors will be disabled!")
                .type(Number.class)
                .build());
        options.addOption(Option.builder("b")
                .longOpt("maxbuckets")
                .hasArg(true)
                .argName("num")
                .desc("Maximum buckets per visitor.")
                .type(Number.class)
                .build());
        options.addOption("i", "printids", false, "Display only document identifiers.");
        options.addOption(Option.builder("p")
                .longOpt("progress")
                .hasArg(true)
                .argName("file")
                .desc("Use given file to track progress.")
                .build());
        options.addOption(Option.builder("o")
                .longOpt("timeout")
                .hasArg(true)
                .argName("milliseconds")
                .desc("Time out visitor after given time.")
                .type(Number.class)
                .build());
        options.addOption(Option.builder("u")
                .longOpt("buckettimeout")
                .hasArg(true)
                .argName("milliseconds")
                .desc("Fail visitor if visiting a single bucket takes longer than this (default same as timeout)")
                .type(Number.class)
                .build());
        options.addOption(Option.builder()
                .longOpt("visitlibrary")
                .hasArg(true)
                .argName("string")
                .desc("Use the given visitor library.")
                .build());
        // NOTE(review): the text is damaged on the ".argName(" line below. Everything between
        // the characters "key>" and " 0) {" is missing, apparently because "<...>" spans were
        // stripped from the file. The lost span covers the rest of createOptions(), whatever
        // declared VdsVisitParameters, ArgumentParser and resolveClusterRoute(...) (all used by
        // this class but not visible here), and the head of the method whose tail follows.
        // The file cannot compile in this state; restore the span from version control rather
        // than reconstructing it by hand.
        options.addOption(Option.builder()
                .longOpt("libraryparam")
                .numberOfArgs(2)
                .argName("key> 0) { out.println("Adding the following library specific parameters:");
            // NOTE(review): from here to the end of this method is the surviving tail of a
            // routine that prints the parsed parameters to "out". It is presumably
            // verbosePrintParameters(params, out), which main() calls with System.err - confirm.
            // The generic arguments of Map.Entry were lost with the rest; entry.getValue() is
            // decoded as UTF-8 bytes below, so the value type is presumably byte[] - confirm.
            for (Map.Entry entry : params.getLibraryParameters().entrySet()) {
                out.println(" " + entry.getKey() + " = " + new String(entry.getValue(), Charset.forName("utf-8")));
            }
        }
        // Priority is only reported when it differs from NORMAL_3.
        if (params.getPriority() != DocumentProtocol.Priority.NORMAL_3) {
            out.println("Visitor priority " + params.getPriority().name());
        }
        if (params.skipBucketsOnFatalErrors()) {
            out.println("Skip visiting super buckets with fatal errors.");
        }
    }

    /**
     * Reports an illegal document selection string on stderr and exits with status 1.
     *
     * @param e the exception that triggered the report
     */
    private void onDocumentSelectionException(Exception e) {
        // NOTE(review): e is not used, so the reason the selection was rejected is never shown.
        System.err.println("Illegal document selection string '" +
                           params.getVisitorParameters().getDocumentSelection() + "'.\n");
        System.exit(1);
    }

    /**
     * Reports illegal arguments, including the exception message, on stderr and exits with status 1.
     *
     * @param e the exception whose message is printed
     */
    private void onIllegalArgumentException(Exception e) {
        System.err.println("Illegal arguments : \n");
        System.err.println(e.getMessage());
        System.exit(1);
    }

    /** Runs the visitor and terminates the JVM with the status returned by {@link #doRun()}. */
    public void run() {
        System.exit(doRun());
    }

    /**
     * Performs the visit described by the {@code params} field.
     *
     * Steps, in order: load a resume token from the progress file if one is named and
     * readable, register the clean-up shutdown hook, create the session accessor, wire a
     * StdOutVisitorHandler into the visitor parameters, create the visitor session, and wait
     * until it reports that it is done.
     *
     * @return 0 when the control handler's result code is SUCCESS, otherwise 1; also 1 when
     *         reading the progress file fails or an unexpected exception is caught
     */
    protected int doRun() {
        VisitorParameters visitorParameters = params.getVisitorParameters();
        // If progress file already exists, create resume token from it
        if (visitorParameters.getResumeFileName() != null && !"".equals(visitorParameters.getResumeFileName())) {
            try {
                File file = new File(visitorParameters.getResumeFileName());
                // NOTE(review): "fos" is an input stream despite the name. It is closed only on
                // the success path; if read() throws, the stream stays open. try-with-resources
                // would close it on every path.
                FileInputStream fos = new FileInputStream(file);
                StringBuilder builder = new StringBuilder();
                byte[] b = new byte[100000];
                int length;
                // NOTE(review): each chunk is decoded separately with the platform default
                // charset, so a multi-byte character straddling a chunk boundary would be
                // corrupted. Harmless if the token file is plain ASCII - confirm.
                while ((length = fos.read(b)) > 0) {
                    builder.append(new String(b, 0, length));
                }
                fos.close();
                visitorParameters.setResumeToken(new ProgressToken(builder.toString()));
                if (params.isVerbose()) {
                    System.err.format("Resuming visitor already %.1f %% finished.\n",
                                      visitorParameters.getResumeToken().percentFinished());
                }
            } catch (FileNotFoundException e) {
                // Ignore; file has not been created yet but will be shortly.
            } catch (IOException e) {
                System.err.println("Could not open progress file: " + visitorParameters.getResumeFileName());
                e.printStackTrace(System.err);
                return 1;
            }
        }

        // The hook is registered before session and sessionAccessor are assigned, which is why
        // CleanUpThread null-checks both fields.
        initShutdownHook();
        sessionAccessor = sessionAccessorFactory.createVisitorSessionAccessor();

        VdsVisitHandler handler;
        // params.isVerbose() is passed for three consecutive constructor arguments.
        handler = new StdOutVisitorHandler(
                params.isPrintIdsOnly(),
                params.isVerbose(),
                params.isVerbose(),
                params.isVerbose(),
                params.getStatisticsParts() != null,
                params.getAbortOnClusterDown(),
                params.getProcessTime(),
                params.jsonOutput);

        // NOTE(review): only null is checked here, while the resume-file read above also
        // excludes the empty string - confirm that passing "" to the handler is intended.
        if (visitorParameters.getResumeFileName() != null) {
            handler.setProgressFileName(visitorParameters.getResumeFileName());
        }

        visitorParameters.setControlHandler(handler.getControlHandler());
        // The handler's data handler is installed only when no remote data handler is set.
        if (visitorParameters.getRemoteDataHandler() == null) {
            visitorParameters.setLocalDataHandler(handler.getDataHandler());
        }

        // Each comma-separated statistics part becomes a library parameter set to "true".
        if (params.getStatisticsParts() != null) {
            String[] parts = params.getStatisticsParts().split(",");
            for (String s : parts) {
                visitorParameters.setLibraryParameter(s, "true");
            }
        }

        try {
            session = sessionAccessor.createVisitorSession(visitorParameters);
            // Keep waiting until waitUntilDone() returns true.
            while (true) {
                try {
                    if (session.waitUntilDone(params.getFullTimeout())) break;
                // NOTE(review): interrupts are swallowed and the wait is retried, so this loop
                // cannot be interrupted and the thread's interrupt status is lost. Confirm this
                // is deliberate before changing it.
                } catch (InterruptedException e) {}
            }
            if (visitorParameters.getTraceLevel() > 0) {
                System.out.println(session.getTrace().toString());
            }
        } catch (ParseException e) {
            onDocumentSelectionException(e);
        } catch (IllegalArgumentException e) {
            onIllegalArgumentException(e);
        } catch (Exception e) {
            System.err.println("Document selection string was: " + visitorParameters.getDocumentSelection());
            System.err.println("Caught unexpected exception: ");
            e.printStackTrace(System.err);
            return 1;
        }

        if (visitorParameters.getControlHandler().getResult().code == VisitorControlHandler.CompletionCode.SUCCESS) {
            return 0;
        } else {
            return 1;
        }
    }

    /** Registers a new {@link CleanUpThread} with the configured shutdown hook registrar. */
    private void initShutdownHook() {
        shutdownHookRegistrar.registerShutdownHook(new CleanUpThread());
    }

    /**
     * Thread that destroys the visitor session and then shuts down the session accessor,
     * skipping whichever of the two is still null.
     *
     * It is an inner (non-static) class because it reads the session and sessionAccessor
     * fields of the enclosing VdsVisit instance.
     */
    class CleanUpThread extends Thread {
        public void run() {
            // NOTE(review): both fields are written in doRun() and read here without
            // synchronization or volatile.
            try {
                if (session != null) {
                    session.destroy();
                }
            } catch (IllegalStateException ise) {
                //ignore this
            }
            // The accessor is shut down even if destroying the session threw IllegalStateException.
            try {
                if (sessionAccessor != null) {
                    sessionAccessor.shutdown();
                }
            } catch (IllegalStateException ise) {
                //ignore this too
            }
        }
    }
}