diff options
Diffstat (limited to 'searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp')
-rw-r--r-- | searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp | 634 |
1 files changed, 634 insertions, 0 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp new file mode 100644 index 00000000000..eb6b77cd1b3 --- /dev/null +++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp @@ -0,0 +1,634 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("vespa-transactionlog-inspect"); + +#include <vespa/config/helper/configgetter.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/searchcore/proton/server/replaypacketdispatcher.h> +#include <vespa/searchlib/common/fileheadercontext.h> +#include <vespa/searchlib/transactionlog/translogclient.h> +#include <vespa/searchlib/transactionlog/translogserver.h> +#include <vespa/vespalib/util/programoptions.h> +#include <iostream> + +using namespace proton; +using namespace search; +using namespace search::common; +using namespace search::transactionlog; + +using document::DocumenttypesConfig; +using document::DocumentTypeRepo; + +typedef std::shared_ptr<DocumenttypesConfig> DocumenttypesConfigSP; +typedef std::unique_ptr<IReplayPacketHandler> IReplayPacketHandlerUP; + +struct DummyFileHeaderContext : public FileHeaderContext +{ + typedef std::unique_ptr<DummyFileHeaderContext> UP; + virtual void addTags(vespalib::GenericHeader &, const vespalib::string &) const {} +}; + + +namespace +{ + +class ConfigFile +{ + typedef std::shared_ptr<ConfigFile> SP; + + vespalib::string _name; + time_t _modTime; + std::vector<char> _content; + +public: + ConfigFile(void); + + const vespalib::string & + getName(void) const + { + return _name; + } + + vespalib::nbostream & + deserialize(vespalib::nbostream &stream); + + void + print(void) const; +}; + + +ConfigFile::ConfigFile(void) + : _name(), + _modTime(0), + _content() +{ +} + + +vespalib::nbostream & +ConfigFile::deserialize(vespalib::nbostream &stream) +{ + stream >> _name; + assert(strchr(_name.c_str(), '/') == NULL); + stream >> _modTime; + uint32_t sz; + stream >> sz; + _content.resize(sz); + assert(stream.size() >= sz); + memcpy(&_content[0], stream.peek(), sz); + stream.adjustReadPos(sz); + return stream; +} + +void +ConfigFile::print(void) const +{ + std::cout << "Name: " << _name << "\n" << + "ModTime: " << _modTime << "\n" << + "Content-Length: " << _content.size() << "\n\n"; + std::cout.write(&_content[0], _content.size()); + std::cout << "\n-----------------------------" << std::endl; +} + +vespalib::nbostream & +operator>>(vespalib::nbostream &stream, ConfigFile &configFile) +{ + return configFile.deserialize(stream); +} + + +} + +struct DummyStreamHandler : public NewConfigOperation::IStreamHandler { + std::map<std::string, ConfigFile> _cfs; + + DummyStreamHandler(void) + : NewConfigOperation::IStreamHandler(), + _cfs() + { + } + + virtual void + serializeConfig(SerialNum, vespalib::nbostream &) + { + } + + virtual void + deserializeConfig(SerialNum, vespalib::nbostream &is) + { + _cfs.clear(); + uint32_t numConfigs; + is >> numConfigs; + for (uint32_t i = 0; i < numConfigs; ++i) { + ConfigFile cf; + is >> cf; + _cfs[cf.getName()] = cf; + } + assert(is.size() == 0); + } +}; + +struct DocTypeRepo { + DocumenttypesConfigSP docTypeCfg; + DocumentTypeRepo docTypeRepo; + DocTypeRepo(const std::string &configDir) + : docTypeCfg(config::ConfigGetter<DocumenttypesConfig>:: + getConfig("", config::DirSpec(configDir)).release()), + docTypeRepo(*docTypeCfg) + { + } +}; + + +/** + * Class the receives all concrete operations as part of a domain visit + * and prints the content of them to standard out. + */ +class OperationPrinter : public IReplayPacketHandler +{ +private: + DocumentTypeRepo &_repo; + DummyStreamHandler _streamHandler; + size_t _counter; + +protected: + void print(const FeedOperation &op) { + std::cout << "OP[" << (_counter++) << "]: " << op.toString() << std::endl; + } + +public: + OperationPrinter(DocumentTypeRepo &repo) + : _repo(repo), + _streamHandler(), + _counter(0) + { + } + virtual void replay(const PutOperation &op) { print(op); } + virtual void replay(const RemoveOperation &op) { print(op); } + virtual void replay(const UpdateOperation &op) { print(op); } + virtual void replay(const NoopOperation &op) { print(op); } + virtual void replay(const NewConfigOperation &op) + { + print(op); + typedef std::map<std::string, ConfigFile>::const_iterator I; + for (I i(_streamHandler._cfs.begin()), ie(_streamHandler._cfs.end()); + i != ie; ++i) { + i->second.print(); + } + } + + virtual void replay(const WipeHistoryOperation &op) { print(op); } + virtual void replay(const DeleteBucketOperation &op) { print(op); } + virtual void replay(const SplitBucketOperation &op) { print(op); } + virtual void replay(const JoinBucketsOperation &op) { print(op); } + virtual void replay(const PruneRemovedDocumentsOperation &op) { print(op); } + virtual void replay(const SpoolerReplayStartOperation &op) { print(op); } + virtual void replay(const SpoolerReplayCompleteOperation &op) { print(op); } + virtual void replay(const MoveOperation &op) { print(op); } + virtual void replay(const CreateBucketOperation &op) { print(op); } + virtual void replay(const CompactLidSpaceOperation &op) { print(op); } + virtual NewConfigOperation::IStreamHandler &getNewConfigStreamHandler() { + return _streamHandler; + } + virtual document::DocumentTypeRepo &getDeserializeRepo() { + return _repo; + } +}; + + +/** + * Class the receives all concrete operations as part of a domain visit + * and prints all document operations to standard out. + */ +class DocumentPrinter : public OperationPrinter +{ +private: + bool _printXml; + bool _verbose; + + void printXml(const vespalib::xml::XmlSerializable &toPrint) { + vespalib::xml::XmlOutputStream out(std::cout); + toPrint.printXml(out); + std::cout << std::endl; + } + + void printXml(const document::FieldValue &toPrint) { + vespalib::xml::XmlOutputStream out(std::cout); + toPrint.printXml(out); + std::cout << std::endl; + } + + void printText(const document::Printable &toPrint) { + toPrint.print(std::cout, _verbose); + std::cout << std::endl; + } + + void printText(const document::FieldValue &toPrint) { + toPrint.print(std::cout, _verbose); + std::cout << std::endl; + } + +public: + DocumentPrinter(DocumentTypeRepo &repo, bool printXml_, bool verbose) + : OperationPrinter(repo), + _printXml(printXml_), + _verbose(verbose) + { + } + virtual void replay(const PutOperation &op) { + print(op); + if (op.getDocument().get() != NULL) { + if (_printXml) { + printXml(*op.getDocument()); + } else { + printText(*op.getDocument()); + } + } + } + virtual void replay(const RemoveOperation &op) { + print(op); + } + virtual void replay(const UpdateOperation &op) { + print(op); + if (op.getUpdate().get() != NULL) { + if (_printXml) { + printXml(*op.getUpdate()); + } else { + printText(*op.getUpdate()); + } + } + } + virtual void replay(const NoopOperation &) { } + virtual void replay(const NewConfigOperation &) { } + virtual void replay(const WipeHistoryOperation &) { } + virtual void replay(const DeleteBucketOperation &) { } + virtual void replay(const SplitBucketOperation &) { } + virtual void replay(const JoinBucketsOperation &) { } + virtual void replay(const PruneRemovedDocumentsOperation &) { } + virtual void replay(const SpoolerReplayStartOperation &) { } + virtual void replay(const SpoolerReplayCompleteOperation &) { } + virtual void replay(const MoveOperation &) { } + virtual void replay(const CreateBucketOperation &) { } +}; + + +/** + * Class that receives packets from the tls as part of a domain visit + * and dispatches each packet entry to the ReplayPacketDispatcher that + * transforms them into concrete operations. + */ +class VisitorCallback : public TransLogClient::Session::Callback +{ +private: + ReplayPacketDispatcher _dispatcher; + bool _eof; + +public: + VisitorCallback(IReplayPacketHandler &handler) + : _dispatcher(handler), + _eof(false) + { + } + virtual RPC::Result receive(const Packet &packet) { + vespalib::nbostream handle(packet.getHandle().c_str(), + packet.getHandle().size(), + true); + try { + while (handle.size() > 0) { + Packet::Entry entry; + entry.deserialize(handle); + _dispatcher.replayEntry(entry); + } + } catch (const std::exception &e) { + std::cerr << "Error while handling transaction log packet: '" + << std::string(e.what()) << "'" << std::endl; + return RPC::ERROR; + } + return RPC::OK; + } + virtual void inSync() { } + virtual void eof() { _eof = true; } + bool isEof() const { return _eof; } +}; + + +/** + * Interface for a utility. + */ +struct Utility +{ + virtual ~Utility() {} + typedef std::unique_ptr<Utility> UP; + virtual int run() = 0; +}; + + +/** + * Base options used by a utility class. + */ +class BaseOptions +{ +protected: + vespalib::ProgramOptions _opts; + +public: + std::string tlsDir; + std::string tlsName; + int listenPort; + typedef std::unique_ptr<BaseOptions> UP; + BaseOptions(int argc, const char* const* argv) + : _opts(argc, argv) + { + _opts.addOption("tlsdir", tlsDir, "Tls directory"); + _opts.addOption("tlsname", tlsName, std::string("tls"), "Name of the tls"); + _opts.addOption("listenport", listenPort, 13701, "Tcp listen port"); + } + virtual ~BaseOptions() {} + void usage() { _opts.writeSyntaxPage(std::cout); } + virtual void parse() { _opts.parse(); } + virtual std::string toString() const { + return vespalib::make_string("tlsdir=%s, tlsname=%s, listenport=%d", + tlsDir.c_str(), tlsName.c_str(), listenPort); + } + virtual Utility::UP createUtility() const = 0; +}; + +/** + * Base class for a utility with tls server and tls client. + */ +class BaseUtility : public Utility +{ +protected: + const BaseOptions &_bopts; + DummyFileHeaderContext _fileHeader; + TransLogServer _server; + TransLogClient _client; + +public: + BaseUtility(const BaseOptions &bopts) + : _bopts(bopts), + _fileHeader(), + _server(_bopts.tlsName, _bopts.listenPort, _bopts.tlsDir, _fileHeader), + _client(vespalib::make_string("tcp/localhost:%d", _bopts.listenPort)) + { + } + virtual int run() = 0; +}; + + +/** + * Program options used by ListDomainsUtility. + */ +struct ListDomainsOptions : public BaseOptions +{ + ListDomainsOptions(int argc, const char* const* argv) + : BaseOptions(argc, argv) + { + _opts.setSyntaxMessage("Utility to list all domains in a tls"); + } + static std::string command() { return "listdomains"; } + virtual Utility::UP createUtility() const; +}; + +/** + * Utility to list all domains in a tls. + */ +class ListDomainsUtility : public BaseUtility +{ +public: + ListDomainsUtility(const ListDomainsOptions &opts) + : BaseUtility(opts) + { + } + virtual int run() { + std::cout << ListDomainsOptions::command() << ": " << _bopts.toString() << std::endl; + + std::vector<vespalib::string> domains; + _client.listDomains(domains); + std::cout << "Listing status for " << domains.size() << " domain(s):" << std::endl; + for (size_t i = 0; i < domains.size(); ++i) { + TransLogClient::Session::UP session = _client.open(domains[i]); + SerialNum first; + SerialNum last; + size_t count; + session->status(first, last, count); + std::cout << "Domain '" << domains[i] << "': first=" << first << ", last=" << last; + std::cout << ", count=" << count << std::endl; + } + return 0; + } +}; + +Utility::UP +ListDomainsOptions::createUtility() const +{ + return Utility::UP(new ListDomainsUtility(*this)); +} + + +/** + * Program options used by DumpOperationsUtility. + */ +struct DumpOperationsOptions : public BaseOptions +{ + std::string domainName; + SerialNum firstSerialNum; + SerialNum lastSerialNum; + std::string configDir; + DumpOperationsOptions(int argc, const char* const* argv) + : BaseOptions(argc, argv) + { + _opts.addOption("domain", domainName, "Name of the domain"); + _opts.addOption("first", firstSerialNum, "Serial number of first operation"); + _opts.addOption("last", lastSerialNum, "Serial number of last operation"); + _opts.addOption("configdir", configDir, "Config directory (with documenttypes.cfg)"); + _opts.setSyntaxMessage("Utility to dump a range of operations ([first,last]) in a tls domain"); + } + static std::string command() { return "dumpoperations"; } + virtual std::string toString() const { + return vespalib::make_string("%s, domain=%s, first=%" PRIu64 ", last=%" PRIu64 ", configdir=%s", + BaseOptions::toString().c_str(), domainName.c_str(), + firstSerialNum, lastSerialNum, + configDir.c_str()); + } + virtual Utility::UP createUtility() const; +}; + +/** + * Utility to dump a range of operations in a tls domain. + */ +class DumpOperationsUtility : public BaseUtility +{ +protected: + const DumpOperationsOptions &_oopts; + + virtual IReplayPacketHandlerUP createHandler(DocumentTypeRepo &repo) { + return IReplayPacketHandlerUP(new OperationPrinter(repo)); + } + + int doRun() { + DocTypeRepo repo(_oopts.configDir); + IReplayPacketHandlerUP handler = createHandler(repo.docTypeRepo); + VisitorCallback callback(*handler); + TransLogClient::Visitor::UP visitor = _client.createVisitor(_oopts.domainName, callback); + bool visitOk = visitor->visit(_oopts.firstSerialNum-1, _oopts.lastSerialNum); + if (!visitOk) { + std::cerr << "Visiting domain '" << _oopts.domainName << "' [" << _oopts.firstSerialNum << "," + << _oopts.lastSerialNum << "] failed" << std::endl; + return 1; + } + for (size_t i = 0; !callback.isEof() && (i < 60 * 60); i++ ) { + FastOS_Thread::Sleep(1000); + } + return 0; + } + +public: + DumpOperationsUtility(const DumpOperationsOptions &oopts) + : BaseUtility(oopts), + _oopts(oopts) + { + } + virtual int run() { + std::cout << DumpOperationsOptions::command() << ": " << _oopts.toString() << std::endl; + return doRun(); + } +}; + +Utility::UP +DumpOperationsOptions::createUtility() const +{ + return Utility::UP(new DumpOperationsUtility(*this)); +} + + +/** + * Program options used by DumpDocumentsUtility. + */ +struct DumpDocumentsOptions : public DumpOperationsOptions +{ + std::string format; + bool verbose; + DumpDocumentsOptions(int argc, const char* const* argv) + : DumpOperationsOptions(argc, argv) + { + _opts.addOption("format", format, std::string("xml"), "Format in which the document operations should be dumped ('xml' or 'text')"); + _opts.addOption("verbose", verbose, false, "Whether the document operations should be dumped verbosely"); + _opts.setSyntaxMessage("Utility to dump a range of document operations ([first,last]) in a tls domain"); + } + static std::string command() { return "dumpdocuments"; } + virtual void parse() { + DumpOperationsOptions::parse(); + if (format != "xml" && format != "text") { + throw vespalib::InvalidCommandLineArgumentsException("Expected 'format' to be 'xml' or 'text'"); + } + } + virtual std::string toString() const { + return vespalib::make_string("%s, format=%s, verbose=%s", + DumpOperationsOptions::toString().c_str(), + format.c_str(), (verbose ? "true" : "false")); + } + virtual Utility::UP createUtility() const; +}; + +/** + * Utility to dump a range of document operations in a tls domain. + */ +class DumpDocumentsUtility : public DumpOperationsUtility +{ +protected: + const DumpDocumentsOptions &_dopts; + virtual IReplayPacketHandlerUP createHandler(DocumentTypeRepo &repo) { + return IReplayPacketHandlerUP(new DocumentPrinter(repo, _dopts.format == "xml", _dopts.verbose)); + } + +public: + DumpDocumentsUtility(const DumpDocumentsOptions &dopts) + : DumpOperationsUtility(dopts), + _dopts(dopts) + { + } + virtual int run() { + std::cout << DumpDocumentsOptions::command() << ": " << _oopts.toString() << std::endl; + return doRun(); + } +}; + +Utility::UP +DumpDocumentsOptions::createUtility() const +{ + return Utility::UP(new DumpDocumentsUtility(*this)); +} + + +/** + * Main application. + */ +class App : public FastOS_Application +{ +private: + std::string _programName; + std::string _tmpArg; + + void combineFirstArgs() { + _tmpArg = vespalib::make_string("%s %s", _argv[0], _argv[1]).c_str(); + _argv[1] = &_tmpArg[0]; + } + void replaceFirstArg(const std::string &replace) { + _tmpArg = vespalib::make_string("%s %s", _programName.c_str(), replace.c_str()).c_str(); + _argv[0] = &_tmpArg[0]; + } + void usageHeader() { + std::cout << _programName << " version 0.0\n"; + } + void usage() { + usageHeader(); + replaceFirstArg(ListDomainsOptions::command()); + ListDomainsOptions(_argc, _argv).usage(); + replaceFirstArg(DumpOperationsOptions::command()); + DumpOperationsOptions(_argc, _argv).usage(); + replaceFirstArg(DumpDocumentsOptions::command()); + DumpDocumentsOptions(_argc, _argv).usage(); + } + +public: + int Main() { + _programName = _argv[0]; + if (_argc < 2) { + usage(); + return 1; + } + BaseOptions::UP opts; + if (strcmp(_argv[1], ListDomainsOptions::command().c_str()) == 0) { + combineFirstArgs(); + opts.reset(new ListDomainsOptions(_argc-1, _argv+1)); + } else if (strcmp(_argv[1], DumpOperationsOptions::command().c_str()) == 0) { + combineFirstArgs(); + opts.reset(new DumpOperationsOptions(_argc-1, _argv+1)); + } else if (strcmp(_argv[1], DumpDocumentsOptions::command().c_str()) == 0) { + combineFirstArgs(); + opts.reset(new DumpDocumentsOptions(_argc-1, _argv+1)); + } + if (opts.get() != NULL) { + try { + opts->parse(); + } catch (const vespalib::InvalidCommandLineArgumentsException &e) { + std::cerr << "Error parsing program options: " << e.getMessage() << std::endl; + usageHeader(); + opts->usage(); + return 1; + } + return opts->createUtility()->run(); + } + usage(); + return 1; + } +}; + +int +main(int argc, char **argv) +{ + App app; + return app.Entry(argc, argv); +} |