// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespavisit;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
import com.yahoo.documentapi.messagebus.MessageBusParams;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.log.LogSetup;
import com.yahoo.document.select.OrderingSpecification;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.vespaclient.ClusterDef;
import com.yahoo.vespaclient.ClusterList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import java.io.*;
import java.nio.charset.Charset;
import java.util.Map;
/**
* Command-line client ({@code vespa-visit}) for visiting documents stored in VDS.
*
* @author Einar M R Rosenvinge, based on work by Håkon Humberset
*/
public class VdsVisit {
/** Parsed command-line parameters; set by {@link #main} from the program arguments. */
private VdsVisitParameters params;
/** Message bus parameters handed to the default session accessor factory. */
private MessageBusParams mbparams = new MessageBusParams(new LoadTypeSet());
// session and sessionAccessor are assigned by the thread running doRun() but read by
// CleanUpThread, which the JVM runs as a shutdown hook on a different thread. volatile
// guarantees the hook observes these writes so it can tear the session down (e.g. on ^C).
private volatile VisitorSession session;
private final VisitorSessionAccessorFactory sessionAccessorFactory;
private volatile VisitorSessionAccessor sessionAccessor;
private ShutdownHookRegistrar shutdownHookRegistrar;
/**
 * Abstraction over shutdown-hook registration, so that an alternative implementation can be
 * supplied through the {@code VdsVisit(VisitorSessionAccessorFactory, ShutdownHookRegistrar)}
 * constructor instead of registering with the JVM runtime.
 */
public interface ShutdownHookRegistrar {
    /** Registers {@code thread} as a hook to be run on shutdown. */
    void registerShutdownHook(Thread thread);
}
/** Opens visitor sessions and releases the access object backing them. */
public interface VisitorSessionAccessor {
    /**
     * Creates a visitor session configured by {@code params}.
     *
     * @throws ParseException presumably when the document selection in {@code params}
     *         cannot be parsed (doRun reports it as an illegal selection)
     */
    VisitorSession createVisitorSession(VisitorParameters params) throws ParseException;

    /** Shuts down the underlying access; invoked from the shutdown hook. */
    void shutdown();
}
/** Supplies the {@link VisitorSessionAccessor} that a visiting run should use. */
public interface VisitorSessionAccessorFactory {
    /** Returns the accessor to open the visitor session with. */
    VisitorSessionAccessor createVisitorSessionAccessor();
}
/** {@link VisitorSessionAccessor} backed by a {@link MessageBusDocumentAccess}. */
private static class MessageBusVisitorSessionAccessor implements VisitorSessionAccessor {
    // Assigned exactly once in the constructor; final documents and enforces that.
    private final MessageBusDocumentAccess access;

    /** Creates the underlying document access from the given message bus parameters. */
    private MessageBusVisitorSessionAccessor(MessageBusParams mbparams) {
        access = new MessageBusDocumentAccess(mbparams);
    }

    @Override
    public VisitorSession createVisitorSession(VisitorParameters params) throws ParseException {
        return access.createVisitorSession(params);
    }

    @Override
    public void shutdown() {
        access.shutdown();
    }
}
/** Factory that creates a new {@link MessageBusVisitorSessionAccessor} on every call. */
private static class MessageBusVisitorSessionAccessorFactory implements VisitorSessionAccessorFactory {
    // Previously package-private and mutable; it is only ever set in the constructor.
    private final MessageBusParams mbparams;

    /** @param mbparams parameters passed to each accessor this factory creates */
    private MessageBusVisitorSessionAccessorFactory(MessageBusParams mbparams) {
        this.mbparams = mbparams;
    }

    @Override
    public VisitorSessionAccessor createVisitorSessionAccessor() {
        return new MessageBusVisitorSessionAccessor(mbparams);
    }
}
/** Default {@link ShutdownHookRegistrar}: registers hooks with the running JVM's {@link Runtime}. */
private static class JvmRuntimeShutdownHookRegistrar implements ShutdownHookRegistrar {
    @Override
    public void registerShutdownHook(Thread thread) {
        final Runtime runtime = Runtime.getRuntime();
        runtime.addShutdownHook(thread);
    }
}
/**
 * Creates a client that opens visitor sessions over message bus and registers its cleanup
 * thread with the JVM runtime.
 */
public VdsVisit() {
    this.shutdownHookRegistrar = new JvmRuntimeShutdownHookRegistrar();
    this.sessionAccessorFactory = new MessageBusVisitorSessionAccessorFactory(mbparams);
}
/**
 * Creates a client with caller-supplied collaborators instead of the message bus / JVM defaults.
 *
 * @param sessionAccessorFactory supplies the accessor used to open the visitor session
 * @param shutdownHookRegistrar  receives the cleanup thread that tears the session down
 */
public VdsVisit(VisitorSessionAccessorFactory sessionAccessorFactory,
                ShutdownHookRegistrar shutdownHookRegistrar) {
    this.shutdownHookRegistrar = shutdownHookRegistrar;
    this.sessionAccessorFactory = sessionAccessorFactory;
}
/**
 * Entry point of the {@code vespa-visit} tool: parses the arguments, resolves the route to
 * the requested cluster and runs the visitor, ending the process via {@code System.exit}.
 * Argument errors are reported on standard error with exit status 1.
 */
public static void main(String[] args) {
    LogSetup.initVespaLogging("vespa-visit");
    final VdsVisit client = new VdsVisit();
    final Options options = createOptions();

    try {
        client.params = new ArgumentParser(options).parse(args);
        if (client.params == null) {
            // The parser produced no parameters (presumably --help): show usage, exit cleanly.
            client.printSyntax(options);
            System.exit(0);
        }
        ClusterList clusters = new ClusterList("client");
        VisitorParameters visitorParameters = client.params.getVisitorParameters();
        visitorParameters.setRoute(resolveClusterRoute(clusters, client.params.getCluster()));
    } catch (org.apache.commons.cli.ParseException e) {
        System.err.println("Failed to parse arguments. Try --help for syntax. " + e.getMessage());
        System.exit(1);
    } catch (IllegalArgumentException e) {
        System.err.println(e.getMessage());
        System.exit(1);
    }

    if (client.params.isVerbose()) {
        verbosePrintParameters(client.params, System.err);
    }

    try {
        client.run();
    } catch (Exception e) {
        e.printStackTrace();
        System.exit(1);
    }
}
/** Prints the {@code vespa-visit} usage text for the given options via commons-cli. */
private void printSyntax(Options options) {
    new HelpFormatter().printHelp("vespa-visit ", "Visit documents from VDS", options, "");
}
@SuppressWarnings("AccessStaticViaInstance")
protected static Options createOptions() {
Options options = new Options();
options.addOption("h", "help", false, "Show this syntax page.");
options.addOption(Option.builder("d")
.longOpt("datahandler")
.hasArg(true)
.argName("target")
.desc("Send results to the given target.")
.build());
options.addOption(Option.builder("s")
.longOpt("selection")
.hasArg(true)
.argName("selection")
.desc("What documents to visit.")
.build());
options.addOption(Option.builder("f")
.longOpt("from")
.hasArg(true)
.argName("timestamp")
.desc("Only visit from the given timestamp (microseconds).")
.type(Number.class)
.build());
options.addOption(Option.builder("t")
.longOpt("to")
.hasArg(true)
.argName("timestamp")
.desc("Only visit up to the given timestamp (microseconds).")
.type(Number.class).build());
options.addOption("e", "headersonly", false, "Only visit headers of documents.[Deprecated]");
options.addOption(Option.builder("l")
.longOpt("fieldset")
.hasArg(true)
.argName("fieldset")
.desc("Retrieve the specified fields only (see http://docs.vespa.ai/documentation/reference/fieldsets.html). Default is [all].")
.build());
options.addOption(Option.builder()
.longOpt("visitinconsistentbuckets")
.hasArg(false)
.desc("Don't wait for inconsistent buckets to become consistent.")
.build());
options.addOption(Option.builder("m")
.longOpt("maxpending")
.hasArg(true)
.argName("num")
.desc("Maximum pending messages to data handlers per storage visitor.")
.type(Number.class)
.build());
options.addOption(Option.builder()
.longOpt("maxpendingsuperbuckets")
.hasArg(true)
.argName("num")
.desc("Maximum pending visitor messages from the vespa-visit client. If set, dynamic throttling of visitors will be disabled!")
.type(Number.class)
.build());
options.addOption(Option.builder("b")
.longOpt("maxbuckets")
.hasArg(true)
.argName("num")
.desc("Maximum buckets per visitor.")
.type(Number.class)
.build());
options.addOption("i", "printids", false, "Display only document identifiers.");
options.addOption(Option.builder("p")
.longOpt("progress")
.hasArg(true)
.argName("file")
.desc("Use given file to track progress.")
.build());
options.addOption(Option.builder("o")
.longOpt("timeout")
.hasArg(true)
.argName("milliseconds")
.desc("Time out visitor after given time.")
.type(Number.class)
.build());
options.addOption(Option.builder("u")
.longOpt("buckettimeout")
.hasArg(true)
.argName("milliseconds")
.desc("Fail visitor if visiting a single bucket takes longer than this (default same as timeout)")
.type(Number.class)
.build());
options.addOption(Option.builder()
.longOpt("visitlibrary")
.hasArg(true)
.argName("string")
.desc("Use the given visitor library.")
.build());
options.addOption(Option.builder()
.longOpt("libraryparam")
.numberOfArgs(2)
.argName("key> 0) {
out.println("Adding the following library specific parameters:");
for (Map.Entry entry : params.getLibraryParameters().entrySet()) {
out.println(" " + entry.getKey() + " = " +
new String(entry.getValue(), Charset.forName("utf-8")));
}
}
if (params.getPriority() != DocumentProtocol.Priority.NORMAL_3) {
out.println("Visitor priority " + params.getPriority().name());
}
if (params.skipBucketsOnFatalErrors()) {
out.println("Skip visiting super buckets with fatal errors.");
}
}
/**
 * Reports an unusable document selection on standard error and exits with status 1.
 *
 * @param e the failure raised while creating the session; its message is printed so the
 *          user can see why the selection was rejected
 */
private void onDocumentSelectionException(Exception e) {
    System.err.println("Illegal document selection string '" +
            params.getVisitorParameters().getDocumentSelection() + "'.\n");
    // Include the parser's own diagnostic, as onIllegalArgumentException does for its exception.
    if (e.getMessage() != null) {
        System.err.println(e.getMessage());
    }
    System.exit(1);
}
/** Reports illegal visitor arguments and the exception message on stderr, then exits with status 1. */
private void onIllegalArgumentException(Exception e) {
    final PrintStream err = System.err;
    err.println("Illegal arguments : \n");
    err.println(e.getMessage());
    System.exit(1);
}
/** Performs the visit and terminates the JVM with the status returned by {@link #doRun()}. */
public void run() {
    final int exitCode = doRun();
    System.exit(exitCode);
}
/**
 * Runs the visit described by {@code params} to completion.
 * <p>
 * If a non-empty progress file name is configured and the file exists, visiting resumes from
 * the token stored in it. A cleanup shutdown hook is registered, results are routed either to
 * the configured remote data handler or to a local {@code StdOutVisitorHandler}, and the
 * method then blocks until the session reports that it is done.
 *
 * @return 0 if the visitor completed with {@code SUCCESS}, 1 otherwise. An invalid document
 *         selection or illegal arguments call {@code System.exit(1)} instead of returning.
 */
protected int doRun() {
    VisitorParameters visitorParameters = params.getVisitorParameters();
    String resumeFileName = visitorParameters.getResumeFileName();

    // If progress file already exists, create resume token from it
    if (resumeFileName != null && !resumeFileName.isEmpty()) {
        try {
            // Collect all bytes before decoding (platform default charset) so that a multi-byte
            // character can never be split across read() chunks; try-with-resources closes the
            // stream even when read() fails.
            ByteArrayOutputStream contents = new ByteArrayOutputStream();
            try (FileInputStream in = new FileInputStream(new File(resumeFileName))) {
                byte[] buffer = new byte[100000];
                int length;
                while ((length = in.read(buffer)) > 0) {
                    contents.write(buffer, 0, length);
                }
            }
            visitorParameters.setResumeToken(new ProgressToken(contents.toString()));
            if (params.isVerbose()) {
                System.err.format("Resuming visitor already %.1f %% finished.\n",
                        visitorParameters.getResumeToken().percentFinished());
            }
        } catch (FileNotFoundException e) {
            // Ignore; file has not been created yet but will be shortly.
        } catch (IOException e) {
            System.err.println("Could not open progress file: " + resumeFileName);
            e.printStackTrace(System.err);
            return 1;
        }
    }

    initShutdownHook();
    sessionAccessor = sessionAccessorFactory.createVisitorSessionAccessor();

    VdsVisitHandler handler = new StdOutVisitorHandler(
            params.isPrintIdsOnly(),
            params.isVerbose(),
            params.isVerbose(),
            params.isVerbose(),
            params.getStatisticsParts() != null,
            params.getAbortOnClusterDown(),
            params.getProcessTime(),
            params.jsonOutput);
    if (resumeFileName != null) {
        handler.setProgressFileName(resumeFileName);
    }
    visitorParameters.setControlHandler(handler.getControlHandler());
    // Only deliver documents locally when no remote data handler was requested.
    if (visitorParameters.getRemoteDataHandler() == null) {
        visitorParameters.setLocalDataHandler(handler.getDataHandler());
    }

    // Each requested statistics part is enabled as a library parameter with value "true".
    if (params.getStatisticsParts() != null) {
        for (String part : params.getStatisticsParts().split(",")) {
            visitorParameters.setLibraryParameter(part, "true");
        }
    }

    try {
        session = sessionAccessor.createVisitorSession(visitorParameters);
        while (true) {
            try {
                if (session.waitUntilDone(params.getFullTimeout())) break;
            } catch (InterruptedException ignored) {
                // Deliberately keep waiting; teardown on shutdown is done by CleanUpThread.
                // Re-asserting the interrupt here would presumably make every subsequent wait
                // fail immediately and spin.
            }
        }
        if (visitorParameters.getTraceLevel() > 0) {
            System.out.println(session.getTrace().toString());
        }
    } catch (ParseException e) {
        onDocumentSelectionException(e);
    } catch (IllegalArgumentException e) {
        onIllegalArgumentException(e);
    } catch (Exception e) {
        System.err.println("Document selection string was: " + visitorParameters.getDocumentSelection());
        System.err.println("Caught unexpected exception: ");
        e.printStackTrace(System.err);
        return 1;
    }

    return visitorParameters.getControlHandler().getResult().code
            == VisitorControlHandler.CompletionCode.SUCCESS ? 0 : 1;
}
/** Hands a new {@link CleanUpThread} to the configured {@link ShutdownHookRegistrar}. */
private void initShutdownHook() {
    Thread cleanUp = new CleanUpThread();
    shutdownHookRegistrar.registerShutdownHook(cleanUp);
}
/**
 * Shutdown hook that destroys the visitor session (if one was created) and then shuts down
 * the session accessor (if one was created). An {@link IllegalStateException} from either
 * step is ignored, so a failure in the first step does not prevent the second.
 */
class CleanUpThread extends Thread {
    @Override
    public void run() {
        // Read each field once; the hook runs concurrently with the thread that assigns them.
        VisitorSession currentSession = session;
        try {
            if (currentSession != null) {
                currentSession.destroy();
            }
        } catch (IllegalStateException ignored) {
            // Deliberately ignored: cleanup is best effort and the accessor must still be shut down.
        }
        VisitorSessionAccessor currentAccessor = sessionAccessor;
        try {
            if (currentAccessor != null) {
                currentAccessor.shutdown();
            }
        } catch (IllegalStateException ignored) {
            // Deliberately ignored: cleanup is best effort during JVM shutdown.
        }
    }
}
}