diff options
Diffstat (limited to 'slobrok')
122 files changed, 9415 insertions, 0 deletions
diff --git a/slobrok/.gitignore b/slobrok/.gitignore new file mode 100644 index 00000000000..a9b20e8992d --- /dev/null +++ b/slobrok/.gitignore @@ -0,0 +1,2 @@ +Makefile +Testing diff --git a/slobrok/CMakeLists.txt b/slobrok/CMakeLists.txt new file mode 100644 index 00000000000..1e074c1f814 --- /dev/null +++ b/slobrok/CMakeLists.txt @@ -0,0 +1,28 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_define_module( + DEPENDS + fastos + vespalib + fnet + configdefinitions + + LIBS + src/vespa/slobrok + src/vespa/slobrok/server + + APPS + src/apps/check_slobrok + src/apps/sbcmd + src/apps/slobrok + + TESTS + src/tests/backoff + src/tests/configure + src/tests/mirrorapi + src/tests/multi + src/tests/oldapi + src/tests/registerapi + src/tests/standalone + src/tests/startsome + src/tests/startup +) diff --git a/slobrok/OWNERS b/slobrok/OWNERS new file mode 100644 index 00000000000..67cd2820bb8 --- /dev/null +++ b/slobrok/OWNERS @@ -0,0 +1 @@ +arnej27959 diff --git a/slobrok/README b/slobrok/README new file mode 100644 index 00000000000..1a453ba93c2 --- /dev/null +++ b/slobrok/README @@ -0,0 +1 @@ +Service Location Broker (a simple naming service) diff --git a/slobrok/src/.gitignore b/slobrok/src/.gitignore new file mode 100644 index 00000000000..42d192f90a7 --- /dev/null +++ b/slobrok/src/.gitignore @@ -0,0 +1,4 @@ +Makefile.ini +config_command.sh +project.dsw +/slobrok.mak diff --git a/slobrok/src/Doxyfile b/slobrok/src/Doxyfile new file mode 100644 index 00000000000..2413c16fcb1 --- /dev/null +++ b/slobrok/src/Doxyfile @@ -0,0 +1,228 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +# Doxyfile 1.4.7 + +#--------------------------------------------------------------------------- +# Project related configuration options +#--------------------------------------------------------------------------- +PROJECT_NAME = slobrok +PROJECT_NUMBER = +OUTPUT_DIRECTORY = ../doc/doxygen +CREATE_SUBDIRS = NO +OUTPUT_LANGUAGE = English +USE_WINDOWS_ENCODING = NO +BRIEF_MEMBER_DESC = YES +REPEAT_BRIEF = YES +ABBREVIATE_BRIEF = +ALWAYS_DETAILED_SEC = NO +INLINE_INHERITED_MEMB = NO +FULL_PATH_NAMES = NO +STRIP_FROM_PATH = +STRIP_FROM_INC_PATH = +SHORT_NAMES = NO +JAVADOC_AUTOBRIEF = NO +MULTILINE_CPP_IS_BRIEF = NO +DETAILS_AT_TOP = NO +INHERIT_DOCS = YES +SEPARATE_MEMBER_PAGES = NO +TAB_SIZE = 8 +ALIASES = +OPTIMIZE_OUTPUT_FOR_C = NO +OPTIMIZE_OUTPUT_JAVA = NO +BUILTIN_STL_SUPPORT = NO +DISTRIBUTE_GROUP_DOC = NO +SUBGROUPING = YES +#--------------------------------------------------------------------------- +# Build related configuration options +#--------------------------------------------------------------------------- +EXTRACT_ALL = YES +EXTRACT_PRIVATE = NO +EXTRACT_STATIC = NO +EXTRACT_LOCAL_CLASSES = YES +EXTRACT_LOCAL_METHODS = NO +HIDE_UNDOC_MEMBERS = NO +HIDE_UNDOC_CLASSES = NO +HIDE_FRIEND_COMPOUNDS = NO +HIDE_IN_BODY_DOCS = NO +INTERNAL_DOCS = NO +CASE_SENSE_NAMES = YES +HIDE_SCOPE_NAMES = NO +SHOW_INCLUDE_FILES = YES +INLINE_INFO = YES +SORT_MEMBER_DOCS = YES +SORT_BRIEF_DOCS = NO +SORT_BY_SCOPE_NAME = YES +GENERATE_TODOLIST = YES +GENERATE_TESTLIST = YES +GENERATE_BUGLIST = YES +GENERATE_DEPRECATEDLIST= YES +ENABLED_SECTIONS = +MAX_INITIALIZER_LINES = 30 +SHOW_USED_FILES = YES +SHOW_DIRECTORIES = YES +FILE_VERSION_FILTER = +#--------------------------------------------------------------------------- +# configuration options related to warning and progress messages +#--------------------------------------------------------------------------- +QUIET = NO +WARNINGS = YES +WARN_IF_UNDOCUMENTED = YES +WARN_IF_DOC_ERROR = YES +WARN_NO_PARAMDOC = NO +WARN_FORMAT = "$file:$line: $text" +WARN_LOGFILE = +#--------------------------------------------------------------------------- +# configuration options related to the input files +#--------------------------------------------------------------------------- +INPUT = +FILE_PATTERNS = +RECURSIVE = YES +EXCLUDE = regress +EXCLUDE_SYMLINKS = NO +EXCLUDE_PATTERNS = tstdst* \ + sbcmd* \ + check_slobrok* +EXAMPLE_PATH = +EXAMPLE_PATTERNS = +EXAMPLE_RECURSIVE = NO +IMAGE_PATH = +INPUT_FILTER = +FILTER_PATTERNS = +FILTER_SOURCE_FILES = NO +#--------------------------------------------------------------------------- +# configuration options related to source browsing +#--------------------------------------------------------------------------- +SOURCE_BROWSER = NO +INLINE_SOURCES = NO +STRIP_CODE_COMMENTS = YES +REFERENCED_BY_RELATION = YES +REFERENCES_RELATION = YES +REFERENCES_LINK_SOURCE = YES +USE_HTAGS = NO +VERBATIM_HEADERS = YES +#--------------------------------------------------------------------------- +# configuration options related to the alphabetical class index +#--------------------------------------------------------------------------- +ALPHABETICAL_INDEX = NO +COLS_IN_ALPHA_INDEX = 5 +IGNORE_PREFIX = +#--------------------------------------------------------------------------- +# configuration options related to the HTML output +#--------------------------------------------------------------------------- +GENERATE_HTML = YES +HTML_OUTPUT = html +HTML_FILE_EXTENSION = .html +HTML_HEADER = +HTML_FOOTER = +HTML_STYLESHEET = +HTML_ALIGN_MEMBERS = YES +GENERATE_HTMLHELP = NO +CHM_FILE = +HHC_LOCATION = +GENERATE_CHI = NO +BINARY_TOC = NO +TOC_EXPAND = NO +DISABLE_INDEX = NO +ENUM_VALUES_PER_LINE = 4 +GENERATE_TREEVIEW = NO +TREEVIEW_WIDTH = 250 +#--------------------------------------------------------------------------- +# configuration options related to the LaTeX output +#--------------------------------------------------------------------------- +GENERATE_LATEX = YES +LATEX_OUTPUT = latex +LATEX_CMD_NAME = latex +MAKEINDEX_CMD_NAME = makeindex +COMPACT_LATEX = NO +PAPER_TYPE = a4wide +EXTRA_PACKAGES = +LATEX_HEADER = +PDF_HYPERLINKS = NO +USE_PDFLATEX = NO +LATEX_BATCHMODE = NO +LATEX_HIDE_INDICES = NO +#--------------------------------------------------------------------------- +# configuration options related to the RTF output +#--------------------------------------------------------------------------- +GENERATE_RTF = NO +RTF_OUTPUT = rtf +COMPACT_RTF = NO +RTF_HYPERLINKS = NO +RTF_STYLESHEET_FILE = +RTF_EXTENSIONS_FILE = +#--------------------------------------------------------------------------- +# configuration options related to the man page output +#--------------------------------------------------------------------------- +GENERATE_MAN = NO +MAN_OUTPUT = man +MAN_EXTENSION = .3 +MAN_LINKS = NO +#--------------------------------------------------------------------------- +# configuration options related to the XML output +#--------------------------------------------------------------------------- +GENERATE_XML = NO +XML_OUTPUT = xml +XML_SCHEMA = +XML_DTD = +XML_PROGRAMLISTING = YES +#--------------------------------------------------------------------------- +# configuration options for the AutoGen Definitions output +#--------------------------------------------------------------------------- +GENERATE_AUTOGEN_DEF = NO +#--------------------------------------------------------------------------- +# configuration options related to the Perl module output +#--------------------------------------------------------------------------- +GENERATE_PERLMOD = NO +PERLMOD_LATEX = NO +PERLMOD_PRETTY = YES +PERLMOD_MAKEVAR_PREFIX = +#--------------------------------------------------------------------------- +# Configuration options related to the preprocessor +#--------------------------------------------------------------------------- +ENABLE_PREPROCESSING = YES +MACRO_EXPANSION = NO +EXPAND_ONLY_PREDEF = NO +SEARCH_INCLUDES = YES +INCLUDE_PATH = +INCLUDE_FILE_PATTERNS = +PREDEFINED = +EXPAND_AS_DEFINED = +SKIP_FUNCTION_MACROS = YES +#--------------------------------------------------------------------------- +# Configuration::additions related to external references +#--------------------------------------------------------------------------- +TAGFILES = ../../vespalib/doc/doxygen/vespalib.tag=../vespalib +GENERATE_TAGFILE = ../doc/doxygen/slobrok.tag +ALLEXTERNALS = NO +EXTERNAL_GROUPS = YES +PERL_PATH = /usr/bin/perl +#--------------------------------------------------------------------------- +# Configuration options related to the dot tool +#--------------------------------------------------------------------------- +CLASS_DIAGRAMS = YES +HIDE_UNDOC_RELATIONS = YES +HAVE_DOT = YES +CLASS_GRAPH = YES +COLLABORATION_GRAPH = YES +GROUP_GRAPHS = YES +UML_LOOK = NO +TEMPLATE_RELATIONS = NO +INCLUDE_GRAPH = YES +INCLUDED_BY_GRAPH = YES +CALL_GRAPH = YES +CALLER_GRAPH = NO +GRAPHICAL_HIERARCHY = YES +DIRECTORY_GRAPH = YES +DOT_IMAGE_FORMAT = png +DOT_PATH = +DOTFILE_DIRS = +MAX_DOT_GRAPH_WIDTH = 1024 +MAX_DOT_GRAPH_HEIGHT = 1024 +MAX_DOT_GRAPH_DEPTH = 0 +DOT_TRANSPARENT = NO +DOT_MULTI_TARGETS = NO +GENERATE_LEGEND = YES +DOT_CLEANUP = YES +#--------------------------------------------------------------------------- +# Configuration::additions related to the search engine +#--------------------------------------------------------------------------- +SEARCHENGINE = NO diff --git a/slobrok/src/apps/check_slobrok/.gitignore b/slobrok/src/apps/check_slobrok/.gitignore new file mode 100644 index 00000000000..28d7034bd09 --- /dev/null +++ b/slobrok/src/apps/check_slobrok/.gitignore @@ -0,0 +1,4 @@ +/.depend +/Makefile +/check_slobrok +slobrok_check_slobrok_app diff --git a/slobrok/src/apps/check_slobrok/CMakeLists.txt b/slobrok/src/apps/check_slobrok/CMakeLists.txt new file mode 100644 index 00000000000..638f1c8c9d1 --- /dev/null +++ b/slobrok/src/apps/check_slobrok/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_check_slobrok_app + SOURCES + check_slobrok.cpp + INSTALL bin + DEPENDS +) diff --git a/slobrok/src/apps/check_slobrok/check_slobrok.cpp b/slobrok/src/apps/check_slobrok/check_slobrok.cpp new file mode 100644 index 00000000000..294cbe844c7 --- /dev/null +++ b/slobrok/src/apps/check_slobrok/check_slobrok.cpp @@ -0,0 +1,115 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP("check_slobrok"); + +#include <vespa/fnet/frt/frt.h> + +#include <string> +#include <sstream> + + +class Slobrok_Checker : public FastOS_Application +{ +private: + FRT_Supervisor *_supervisor; + FRT_Target *_target; + + Slobrok_Checker(const Slobrok_Checker &); + Slobrok_Checker &operator=(const Slobrok_Checker &); + +public: + Slobrok_Checker() : _supervisor(NULL), _target(NULL) {} + virtual ~Slobrok_Checker(); + int usage(); + void initRPC(const char *spec); + void finiRPC(); + virtual int Main(); +}; + + +Slobrok_Checker::~Slobrok_Checker() +{ + LOG_ASSERT(_supervisor == NULL); + LOG_ASSERT(_target == NULL); +} + + +int +Slobrok_Checker::usage() +{ + fprintf(stderr, "usage: %s <port>\n", _argv[0]); + return 1; +} + + +void +Slobrok_Checker::initRPC(const char *spec) +{ + _supervisor = new FRT_Supervisor(); + _target = _supervisor->GetTarget(spec); + _supervisor->Start(); +} + + +void +Slobrok_Checker::finiRPC() +{ + if (_target != NULL) { + _target->SubRef(); + _target = NULL; + } + if (_supervisor != NULL) { + _supervisor->ShutDown(true); + delete _supervisor; + _supervisor = NULL; + } +} + + +int +Slobrok_Checker::Main() +{ + if (_argc != 2) { + return usage(); + } + int port = atoi(_argv[1]); + if (port == 0) { + initRPC(_argv[1]); + } else { + std::ostringstream tmp; + tmp << "tcp/localhost:"; + tmp << port; + initRPC(tmp.str().c_str()); + } + + FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); + + req->SetMethodName("slobrok.system.version"); + _target->InvokeSync(req, 5.0); + int failed = 0; + + if (req->IsError()) { + printf("vespa_slobrok %d: %s\n", + req->GetErrorCode(), req->GetErrorMessage()); + failed = 1; + } else { + FRT_Values &answer = *(req->GetReturn()); + const char *atypes = answer.GetTypeString(); + if (strcmp(atypes, "s") == 0) { + printf("vespa_slobrok-%s OK\n", answer[0]._string._str); + } else { + printf("vespa_slobrok bad rpc return type %s\n", atypes); + failed = 1; + } + } + finiRPC(); + return failed; +} + +int main(int argc, char **argv) +{ + Slobrok_Checker sb_checker; + return sb_checker.Entry(argc, argv); +} diff --git a/slobrok/src/apps/sbcmd/.gitignore b/slobrok/src/apps/sbcmd/.gitignore new file mode 100644 index 00000000000..c035466f4f9 --- /dev/null +++ b/slobrok/src/apps/sbcmd/.gitignore @@ -0,0 +1,3 @@ +/.depend +/Makefile +/sbcmd diff --git a/slobrok/src/apps/sbcmd/CMakeLists.txt b/slobrok/src/apps/sbcmd/CMakeLists.txt new file mode 100644 index 00000000000..3eea280ac55 --- /dev/null +++ b/slobrok/src/apps/sbcmd/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_sbcmd_app + SOURCES + sbcmd.cpp + OUTPUT_NAME sbcmd + INSTALL bin + DEPENDS +) diff --git a/slobrok/src/apps/sbcmd/sbcmd.cpp b/slobrok/src/apps/sbcmd/sbcmd.cpp new file mode 100644 index 00000000000..3ba4182de98 --- /dev/null +++ b/slobrok/src/apps/sbcmd/sbcmd.cpp @@ -0,0 +1,199 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP("sb-cmd"); + +#include <vespa/fnet/frt/frt.h> + +#include <string> +#include <sstream> + + +class Slobrok_CMD : public FastOS_Application +{ +private: + FRT_Supervisor *_supervisor; + FRT_Target *_target; + + Slobrok_CMD(const Slobrok_CMD &); + Slobrok_CMD &operator=(const Slobrok_CMD &); + +public: + Slobrok_CMD() : _supervisor(NULL), _target(NULL) {} + virtual ~Slobrok_CMD(); + int usage(); + void initRPC(const char *spec); + void finiRPC(); + virtual int Main(); +}; + + +Slobrok_CMD::~Slobrok_CMD() +{ + LOG_ASSERT(_supervisor == NULL); + LOG_ASSERT(_target == NULL); +} + + +int +Slobrok_CMD::usage() +{ + fprintf(stderr, "usage: %s <port|spec> <cmd> [args]\n", _argv[0]); + fprintf(stderr, "with cmd one of:\n"); + fprintf(stderr, " slobrok.callback.listNamesServed\n"); + fprintf(stderr, " slobrok.internal.listManagedRpcServers\n"); + fprintf(stderr, " slobrok.admin.listAllRpcServers\n"); + fprintf(stderr, " slobrok.lookupRpcServer {pattern}\n"); + fprintf(stderr, " slobrok.registerRpcServer name {spec}\n"); + fprintf(stderr, " slobrok.unregisterRpcServer {name} {spec}\n"); + fprintf(stderr, " slobrok.admin.addPeer {name} {spec}\n"); + fprintf(stderr, " slobrok.admin.removePeer {name} {spec}\n"); + fprintf(stderr, " slobrok.system.stop\n"); + fprintf(stderr, " slobrok.system.version\n"); + fprintf(stderr, " system.stop\n"); + return 1; +} + + +void +Slobrok_CMD::initRPC(const char *spec) +{ + _supervisor = new FRT_Supervisor(); + _target = _supervisor->GetTarget(spec); + _supervisor->Start(); +} + + +void +Slobrok_CMD::finiRPC() +{ + if (_target != NULL) { + _target->SubRef(); + _target = NULL; + } + if (_supervisor != NULL) { + _supervisor->ShutDown(true); + delete _supervisor; + _supervisor = NULL; + } +} + + +int +Slobrok_CMD::Main() +{ + if (_argc < 3) { + return usage(); + } + int port = atoi(_argv[1]); + if (port == 0) { + initRPC(_argv[1]); + } else { + std::ostringstream tmp; + tmp << "tcp/localhost:"; + tmp << port; + initRPC(tmp.str().c_str()); + } + + bool threeTables = false; + bool twoTables = false; + + FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); + + req->SetMethodName(_argv[2]); + if (strcmp(_argv[2], "slobrok.admin.listAllRpcServers") == 0) { + threeTables = true; + // no params + } else if (strcmp(_argv[2], "slobrok.internal.listManagedRpcServers") == 0) { + twoTables = true; + // no params + } else if (strcmp(_argv[2], "slobrok.callback.listNamesServed") == 0 + || strcmp(_argv[2], "slobrok.internal.listManagedRpcServers") == 0 + || strcmp(_argv[2], "slobrok.admin.listAllRpcServers") == 0 + || strcmp(_argv[2], "slobrok.system.stop") == 0 + || strcmp(_argv[2], "slobrok.system.version") == 0 + || strcmp(_argv[2], "system.stop") == 0) + { + // no params + } else if (strcmp(_argv[2], "slobrok.lookupRpcServer") == 0 + && _argc == 4) + { + twoTables = true; + // one param + req->GetParams()->AddString(_argv[3]); + } else if ((strcmp(_argv[2], "slobrok.registerRpcServer") == 0 + || strcmp(_argv[2], "slobrok.unregisterRpcServer") == 0 + || strcmp(_argv[2], "slobrok.admin.addPeer") == 0 + || strcmp(_argv[2], "slobrok.admin.removePeer") == 0) + && _argc == 5) + { + // two params + req->GetParams()->AddString(_argv[3]); + req->GetParams()->AddString(_argv[4]); + } else { + finiRPC(); + return usage(); + } + _target->InvokeSync(req, 5.0); + + if (req->IsError()) { + fprintf(stderr, "sb-cmd error %d: %s\n", + req->GetErrorCode(), req->GetErrorMessage()); + } else { + FRT_Values &answer = *(req->GetReturn()); + const char *atypes = answer.GetTypeString(); + if (threeTables + && strcmp(atypes, "SSS") == 0 + && answer[0]._string_array._len > 0 + && answer[0]._string_array._len == answer[1]._string_array._len + && answer[0]._string_array._len == answer[2]._string_array._len) + { + for (uint32_t j = 0; j < answer[0]._string_array._len; j++) { + printf("%s\t%s\t%s\n", + answer[0]._string_array._pt[j]._str, + answer[1]._string_array._pt[j]._str, + answer[2]._string_array._pt[j]._str); + } + } else if (twoTables + && strcmp(atypes, "SS") == 0 + && answer[0]._string_array._len > 0 + && answer[0]._string_array._len == answer[1]._string_array._len) + { + for (uint32_t j = 0; j < answer[0]._string_array._len; j++) { + printf("%s\t%s\n", + answer[0]._string_array._pt[j]._str, + answer[1]._string_array._pt[j]._str); + } + } else { + fprintf(stderr, "sb-cmd OK, returntypes '%s'\n", atypes); + uint32_t idx = 0; + while (atypes != NULL && *atypes != '\0') { + switch (*atypes) { + case 's': + printf(" string = '%s'\n", answer[idx]._string._str); + break; + case 'S': + printf(" strings [%d]\n", answer[idx]._string_array._len); + for (uint32_t j = 0; j < answer[idx]._string_array._len; j++) { + printf("\t'%s'\n", answer[idx]._string_array._pt[j]._str); + } + break; + default: + printf(" unknown type %c\n", *atypes); + } + ++atypes; + ++idx; + } + } + } + req->SubRef(); + finiRPC(); + return 0; +} + +int main(int argc, char **argv) +{ + Slobrok_CMD sb_cmd; + return sb_cmd.Entry(argc, argv); +} diff --git a/slobrok/src/apps/slobrok/.gitignore b/slobrok/src/apps/slobrok/.gitignore new file mode 100644 index 00000000000..e0038c324fb --- /dev/null +++ b/slobrok/src/apps/slobrok/.gitignore @@ -0,0 +1,3 @@ +/.depend +/Makefile +/slobrok diff --git a/slobrok/src/apps/slobrok/CMakeLists.txt b/slobrok/src/apps/slobrok/CMakeLists.txt new file mode 100644 index 00000000000..1a735a366e4 --- /dev/null +++ b/slobrok/src/apps/slobrok/CMakeLists.txt @@ -0,0 +1,10 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_app + SOURCES + slobrok.cpp + OUTPUT_NAME slobrok + INSTALL bin + DEPENDS + slobrok_slobrokserver + slobrok +) diff --git a/slobrok/src/apps/slobrok/slobrok.cpp b/slobrok/src/apps/slobrok/slobrok.cpp new file mode 100644 index 00000000000..53d2a61001c --- /dev/null +++ b/slobrok/src/apps/slobrok/slobrok.cpp @@ -0,0 +1,114 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("slobrok"); + +#include <vespa/fnet/fnet.h> + +#include <vespa/slobrok/server/sbenv.h> + +/** + * @brief namespace for the actual slobrok application. + **/ +namespace slobrok { + +class App : public FastOS_Application +{ +public: + int Main(); +}; + +static std::unique_ptr<SBEnv> mainobj; + +extern "C" { +static void sigtermhandler(int signo); +}; + +static void +sigtermhandler(int signo) +{ + (void) signo; + if (mainobj) { + mainobj->shutdown(); + } +} + +static void +hook_sigterm(void) +{ + struct sigaction act; + act.sa_handler = sigtermhandler; + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + sigaction(SIGTERM, &act, NULL); +} + + +int +App::Main() +{ + uint32_t portnum = 2773; + uint32_t statePort = 0; + vespalib::string cfgId; + + int argi = 1; + const char* optArg; + char c; + while ((c = GetOpt("c:s:p:", optArg, argi)) != -1) { + switch (c) { + case 'c': + cfgId = std::string(optArg); + break; + case 's': + statePort = atoi(optArg); + break; + case 'p': + portnum = atoi(optArg); + break; + default: + LOG(error, "unknown option letter '%c'", c); + return 1; + } + } + int res = 1; + try { + if (cfgId.empty()) { + LOG(debug, "no config id specified"); + ConfigShim shim(portnum); + mainobj.reset(new SBEnv(shim)); + } else { + ConfigShim shim(portnum, statePort, cfgId); + mainobj.reset(new SBEnv(shim)); + } + hook_sigterm(); + res = mainobj->MainLoop(); + } catch (const config::ConfigTimeoutException &e) { + LOG(error, "config timeout during construction : %s", e.what()); + EV_STOPPING("slobrok", "config timeout during construction"); + return 1; + } catch (const vespalib::PortListenException &e) { + LOG(error, "Failed listening to network port(%d) with protocol(%s): '%s'", + e.get_port(), e.get_protocol().c_str(), e.what()); + EV_STOPPING("slobrok", "could not listen to our network port"); + return 1; + } catch (const std::exception & e) { + LOG(error, "unknown exception during construction : %s", e.what()); + EV_STOPPING("slobrok", "unknown exception during construction"); + return 2; + } catch (...) { + LOG(error, "unknown exception during construction"); + EV_STOPPING("slobrok", "unknown exception during construction"); + return 3; + } + mainobj.reset(); + return res; +} + +} // namespace slobrok + +int +main(int argc, char **argv) +{ + slobrok::App slobrok; + return slobrok.Entry(argc, argv); +} diff --git a/slobrok/src/tests/backoff/.gitignore b/slobrok/src/tests/backoff/.gitignore new file mode 100644 index 00000000000..45529c14663 --- /dev/null +++ b/slobrok/src/tests/backoff/.gitignore @@ -0,0 +1,4 @@ +.depend +Makefile +*_test +slobrok_backoff_test_app diff --git a/slobrok/src/tests/backoff/CMakeLists.txt b/slobrok/src/tests/backoff/CMakeLists.txt new file mode 100644 index 00000000000..800f984d69d --- /dev/null +++ b/slobrok/src/tests/backoff/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_backoff_test_app + SOURCES + testbackoff.cpp + DEPENDS + slobrok + slobrok_slobrokserver +) +vespa_add_test(NAME slobrok_backoff_test_app COMMAND slobrok_backoff_test_app) diff --git a/slobrok/src/tests/backoff/DESC b/slobrok/src/tests/backoff/DESC new file mode 100644 index 00000000000..9955163994a --- /dev/null +++ b/slobrok/src/tests/backoff/DESC @@ -0,0 +1 @@ +BackOff test. Take a look at testbackoff.cpp for details. diff --git a/slobrok/src/tests/backoff/FILES b/slobrok/src/tests/backoff/FILES new file mode 100644 index 00000000000..39cce25d2e1 --- /dev/null +++ b/slobrok/src/tests/backoff/FILES @@ -0,0 +1 @@ +testbackoff.cpp diff --git a/slobrok/src/tests/backoff/testbackoff.cpp b/slobrok/src/tests/backoff/testbackoff.cpp new file mode 100644 index 00000000000..af0dbd5bc81 --- /dev/null +++ b/slobrok/src/tests/backoff/testbackoff.cpp @@ -0,0 +1,100 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("backoff_test"); +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/slobrok/backoff.h> +#include <algorithm> + +using slobrok::api::BackOff; + +TEST_SETUP(Test); + +//----------------------------------------------------------------------------- + +static double expectWait[21] = { + 0.5, 1.0, 1.5, 2.0, 2.5, + 3.0, 3.5, 4.0, 4.5, + 5.0, 6.0, 7.0, 8.0, 9.0, + 10, 15, 20, 25, 30, 30, 30 +}; + +int +Test::Main() +{ + TEST_INIT("backoff_test"); + + BackOff one; + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(0.500, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(1.000, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(1.500, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(2.000, one.get()); + EXPECT_TRUE(one.shouldWarn()); + + EXPECT_EQUAL(2.500, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(3.000, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(3.500, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(4.000, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(4.500, one.get()); + EXPECT_TRUE(one.shouldWarn()); + + EXPECT_EQUAL(5.000, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(6.000, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(7.000, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(8.000, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(9.000, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(10.00, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(15.00, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(20.00, one.get()); + EXPECT_TRUE(one.shouldWarn()); + + EXPECT_EQUAL(25.00, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(30.00, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(30.00, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(30.00, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(30.00, one.get()); + EXPECT_FALSE(one.shouldWarn()); + EXPECT_EQUAL(30.00, one.get()); + EXPECT_FALSE(one.shouldWarn()); + + TEST_FLUSH(); + + BackOff two; + for (int i = 0; i < 21; i++) { + EXPECT_EQUAL(expectWait[i], two.get()); + if (i == 3 || i == 8 || i == 16) { + EXPECT_TRUE(two.shouldWarn()); + } else { + EXPECT_FALSE(two.shouldWarn()); + } + } + two.reset(); + for (int i = 0; i < 21; i++) { + EXPECT_EQUAL(expectWait[i], two.get()); + if (i == 7 || i == 15) { + EXPECT_TRUE(two.shouldWarn()); + } else { + EXPECT_FALSE(two.shouldWarn()); + } + } + TEST_DONE(); +} diff --git a/slobrok/src/tests/configure/.gitignore b/slobrok/src/tests/configure/.gitignore new file mode 100644 index 00000000000..346a031bc0d --- /dev/null +++ b/slobrok/src/tests/configure/.gitignore @@ -0,0 +1,12 @@ +.depend +Makefile +configure_test +gencfg +slobrok1.cfg +slobrok1.cfg.new +slobrok1.out +slobrok2.cfg +slobrok2.cfg.new +slobrok2.out +slobrok_configure_test_app +slobrok_gencfg_app diff --git a/slobrok/src/tests/configure/CMakeLists.txt b/slobrok/src/tests/configure/CMakeLists.txt new file mode 100644 index 00000000000..a9988ead03f --- /dev/null +++ b/slobrok/src/tests/configure/CMakeLists.txt @@ -0,0 +1,18 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_gencfg_app + SOURCES + gencfg.cpp + DEPENDS +) +vespa_add_executable(slobrok_configure_test_app + SOURCES + configure.cpp + DEPENDS + slobrok + slobrok_slobrokserver +) +vespa_add_test( + NAME slobrok_configure_test_app + COMMAND slobrok_configure_test_app + ENVIRONMENT "VESPA_LOG_LEVEL=\"all -debug -spam\"" +) diff --git a/slobrok/src/tests/configure/DESC b/slobrok/src/tests/configure/DESC new file mode 100644 index 00000000000..893e0fb704c --- /dev/null +++ b/slobrok/src/tests/configure/DESC @@ -0,0 +1 @@ +configure test. Take a look at configure.cpp for details. diff --git a/slobrok/src/tests/configure/FILES b/slobrok/src/tests/configure/FILES new file mode 100644 index 00000000000..573a507f04b --- /dev/null +++ b/slobrok/src/tests/configure/FILES @@ -0,0 +1 @@ +configure.cpp diff --git a/slobrok/src/tests/configure/configure.cpp b/slobrok/src/tests/configure/configure.cpp new file mode 100644 index 00000000000..e8977f678e4 --- /dev/null +++ b/slobrok/src/tests/configure/configure.cpp @@ -0,0 +1,223 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("configure_test"); +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/slobrok/sbmirror.h> +#include <vespa/slobrok/sbregister.h> +#include <vespa/slobrok/server/slobrokserver.h> +#include <vespa/config/config.h> +#include <vespa/config-slobroks.h> +#include <sstream> +#include <algorithm> +#include <iostream> +#include <vespa/vespalib/util/host_name.h> + +using slobrok::api::MirrorAPI; +using slobrok::api::RegisterAPI; + +using slobrok::ConfigShim; +using slobrok::SlobrokServer; +using slobrok::ConfiguratorFactory; + +TEST_SETUP(Test); + + +std::string +createSpec(int port) +{ + if (port == 0) { + return std::string(); + } + std::ostringstream str; + str << "tcp/"; + str << vespalib::HostName::get(); + str << ":"; + str << port; + return str.str(); +} + + +struct SpecList +{ + MirrorAPI::SpecList _specList; + SpecList() : _specList() {} + SpecList(MirrorAPI::SpecList input) : _specList(input) {} + SpecList &add(const char *name, const char *spec) { + _specList.push_back(make_pair(std::string(name), + std::string(spec))); + return *this; + } + void sort() { + std::sort(_specList.begin(), _specList.end()); + } + bool operator==(SpecList &rhs) { // NB: MUTATE! + sort(); + rhs.sort(); + return _specList == rhs._specList; + } + std::string strVal() { + sort(); + std::string ret = "{"; + for (MirrorAPI::SpecList::iterator it = _specList.begin(); + it != _specList.end(); + ++it) + { + ret += "["; + ret += (*it).first; + ret += " -> "; + ret += (*it).second; + ret += "]"; + }; + ret += "}"; + return ret; + } +}; + + +bool +compare(MirrorAPI &api, const char *pattern, SpecList expect) +{ + for (int i = 0; i < 250; ++i) { + SpecList actual(api.lookup(pattern)); + if (actual == expect) { + return true; + } + FastOS_Thread::Sleep(100); + } + SpecList actual(api.lookup(pattern)); + std::cerr << "Actual: " << actual.strVal() << std::endl; + return false; +} + +int +Test::Main() +{ + TEST_INIT("configure_test"); + + FRT_Supervisor orb1; + FRT_Supervisor orb2; + + config::ConfigSet set; + cloud::config::SlobroksConfigBuilder srv1Builder; + srv1Builder.slobrok.resize(2); + srv1Builder.slobrok[0].connectionspec = createSpec(18524); + srv1Builder.slobrok[1].connectionspec = createSpec(18525); + + cloud::config::SlobroksConfigBuilder srv2Builder; + srv2Builder.slobrok.resize(2); + srv2Builder.slobrok[0].connectionspec = createSpec(18524); + srv2Builder.slobrok[1].connectionspec = createSpec(18525); + + set.addBuilder("server1", &srv1Builder); + set.addBuilder("server2", &srv2Builder); + + cloud::config::SlobroksConfigBuilder cli1Builder; + cli1Builder.slobrok.resize(1); + cli1Builder.slobrok[0].connectionspec = createSpec(18524); + + cloud::config::SlobroksConfigBuilder cli2Builder; + cli2Builder.slobrok.resize(1); + cli2Builder.slobrok[0].connectionspec = createSpec(18525); + + cloud::config::SlobroksConfigBuilder cli3Builder; + cli3Builder.slobrok.resize(1); + cli3Builder.slobrok[0].connectionspec = createSpec(18524); + + set.addBuilder("client1", &cli1Builder); + set.addBuilder("client2", &cli2Builder); + set.addBuilder("client3", &cli3Builder); + + config::IConfigContext::SP cfgCtx(new config::ConfigContext(set)); + ConfigShim srvConfig1(18524, "server1", cfgCtx); + ConfigShim srvConfig2(18525, "server2", cfgCtx); + + ConfiguratorFactory cliConfig1(config::ConfigUri("client1", cfgCtx)); + ConfiguratorFactory cliConfig2(config::ConfigUri("client2", cfgCtx)); + ConfiguratorFactory cliConfig3(config::ConfigUri("client3", cfgCtx)); + + SlobrokServer serverOne(srvConfig1); + SlobrokServer serverTwo(srvConfig2); + + MirrorAPI mirror1(orb1, cliConfig3); // NB this one will be changed + MirrorAPI mirror2(orb2, cliConfig2); + + RegisterAPI reg1(orb1, cliConfig1); + RegisterAPI reg2(orb2, cliConfig2); + + orb1.Listen(18526); + orb2.Listen(18527); + std::string myspec1 = createSpec(orb1.GetListenPort()); + std::string myspec2 = createSpec(orb2.GetListenPort()); + orb1.Start(); + orb2.Start(); + + reg1.registerName("A"); + reg2.registerName("B"); + + EXPECT_TRUE(compare(mirror1, "*", SpecList() + .add("A", myspec1.c_str()) + .add("B", myspec2.c_str()))); + EXPECT_TRUE(compare(mirror2, "*", SpecList() + .add("A", myspec1.c_str()) + .add("B", myspec2.c_str()))); + + TEST_FLUSH(); + + reg1.unregisterName("A"); + reg2.unregisterName("B"); + + EXPECT_TRUE(compare(mirror1, "*", SpecList())); + EXPECT_TRUE(compare(mirror2, "*", SpecList())); + + srv1Builder.slobrok.resize(1); + srv1Builder.slobrok[0].connectionspec = createSpec(18524); + srv2Builder.slobrok.resize(1); + srv2Builder.slobrok[0].connectionspec = createSpec(18525); + cfgCtx->reload(); + + FastOS_Thread::Sleep(6000); // reconfiguration time + + reg1.registerName("A"); + reg2.registerName("B"); + + FRT_Supervisor orb3; + FRT_Supervisor orb4; + RegisterAPI reg3(orb3, cliConfig1); + RegisterAPI reg4(orb4, cliConfig2); + orb3.Listen(18528); + orb4.Listen(18529); + std::string myspec3 = createSpec(orb3.GetListenPort()); + std::string myspec4 = createSpec(orb4.GetListenPort()); + orb3.Start(); + orb4.Start(); + reg3.registerName("B"); + reg4.registerName("A"); + + EXPECT_TRUE(compare(mirror1, "*", SpecList() + .add("A", myspec1.c_str()) + .add("B", myspec3.c_str()))); + EXPECT_TRUE(compare(mirror2, "*", SpecList() + .add("A", myspec4.c_str()) + .add("B", myspec2.c_str()))); + + TEST_FLUSH(); + + // test mirror API reconfiguration + cli3Builder.slobrok.resize(1); + cli3Builder.slobrok[0].connectionspec = createSpec(18525); + cfgCtx->reload(); + + EXPECT_TRUE(compare(mirror1, "*", SpecList() + .add("A", myspec4.c_str()) + .add("B", myspec2.c_str()))); + + serverOne.stop(); + serverTwo.stop(); + + orb1.ShutDown(true); + orb2.ShutDown(true); + orb3.ShutDown(true); + orb4.ShutDown(true); + TEST_DONE(); +} diff --git a/slobrok/src/tests/configure/gencfg.cpp b/slobrok/src/tests/configure/gencfg.cpp new file mode 100644 index 00000000000..85cc8629830 --- /dev/null +++ b/slobrok/src/tests/configure/gencfg.cpp @@ -0,0 +1,16 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <vespa/vespalib/util/host_name.h> + +int +main(int, char **) +{ + printf("slobrok[2]\n"); + printf("slobrok[0].connectionspec tcp/%s:18524\n", vespalib::HostName::get().c_str()); + printf("slobrok[1].connectionspec tcp/%s:18525\n", vespalib::HostName::get().c_str()); + + return 0; +} diff --git a/slobrok/src/tests/mirrorapi/.gitignore b/slobrok/src/tests/mirrorapi/.gitignore new file mode 100644 index 00000000000..a8e84620f18 --- /dev/null +++ b/slobrok/src/tests/mirrorapi/.gitignore @@ -0,0 +1,6 @@ +.depend +Makefile +*_test +slobrok.out +slobrok_mirror_match_test_app +slobrok_mirrorapi_test_app diff --git a/slobrok/src/tests/mirrorapi/CMakeLists.txt b/slobrok/src/tests/mirrorapi/CMakeLists.txt new file mode 100644 index 00000000000..8ae5107dbcb --- /dev/null +++ b/slobrok/src/tests/mirrorapi/CMakeLists.txt @@ -0,0 +1,16 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_mirrorapi_test_app + SOURCES + mirrorapi.cpp + DEPENDS + slobrok + slobrok_slobrokserver +) +vespa_add_test(NAME slobrok_mirrorapi_test_app COMMAND slobrok_mirrorapi_test_app) +vespa_add_executable(slobrok_mirror_match_test_app + SOURCES + match_test.cpp + DEPENDS + slobrok +) +vespa_add_test(NAME slobrok_mirror_match_test_app COMMAND slobrok_mirror_match_test_app) diff --git a/slobrok/src/tests/mirrorapi/DESC b/slobrok/src/tests/mirrorapi/DESC new file mode 100644 index 00000000000..5b7a0ac4677 --- /dev/null +++ b/slobrok/src/tests/mirrorapi/DESC @@ -0,0 +1 @@ +mirrorapi test. Take a look at mirrorapi.cpp for details. diff --git a/slobrok/src/tests/mirrorapi/FILES b/slobrok/src/tests/mirrorapi/FILES new file mode 100644 index 00000000000..a46f09cf2ac --- /dev/null +++ b/slobrok/src/tests/mirrorapi/FILES @@ -0,0 +1 @@ +mirrorapi.cpp diff --git a/slobrok/src/tests/mirrorapi/match_test.cpp b/slobrok/src/tests/mirrorapi/match_test.cpp new file mode 100644 index 00000000000..874c39a6d3f --- /dev/null +++ b/slobrok/src/tests/mirrorapi/match_test.cpp @@ -0,0 +1,80 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/slobrok/sbmirror.h> + +class MatchTester : public slobrok::api::IMirrorAPI +{ + virtual SpecList lookup(const std::string &) const override { + return SpecList(); + } + virtual uint32_t updates() const override { return 0; } + + const std::string name; + + void testMatch(const char *n, const char *p, bool expected) + { + TEST_STATE(n); + TEST_STATE(p); + EXPECT_EQUAL(expected, match(n, p)); + } + +public: + MatchTester(const std::string &n) : name(n) {} + + void mustMatch(const std::string &pattern) { + testMatch(name.c_str(), pattern.c_str(), true); + } + void mustNotMatch(const std::string &pattern) { + testMatch(name.c_str(), pattern.c_str(), false); + } +}; + + +TEST("require that pattern matches same string") { + std::string pattern = "foo/bar*zot/qux?foo**bar*/*nop*"; + MatchTester name(pattern); + name.mustMatch(pattern); +} + +TEST("require that star is prefix match") { + MatchTester name("foo/bar.foo/qux.bar/bar123/nop000"); + name.mustMatch("foo/bar.*/qux.*/bar*/nop*"); +} + +TEST("require that star matches empty string") { + MatchTester name("foo/bar./qux./bar/nop"); + name.mustMatch("foo/bar.*/qux.*/bar*/nop*"); +} + +TEST("require that extra char before slash does not match") { + MatchTester name("foo1/bar"); + name.mustNotMatch("foo/*"); +} + +TEST("require that star does not match multiple levels") { + MatchTester name1("foo/bar/qux"); + MatchTester name2("foo/bar/bar/qux"); + name1.mustMatch("foo/*/qux"); + name2.mustNotMatch("foo/*/qux"); +} + +TEST("require that double star matches multiple levels") { + MatchTester name("foo/bar.foo/qux.bar/bar123/nop000"); + name.mustMatch("**"); + name.mustMatch("f**"); + name.mustMatch("foo**"); + name.mustMatch("foo/**"); + name.mustMatch("foo*/**"); +} + +TEST("require that double star matches nothing") { + MatchTester name("A"); + name.mustMatch("A**"); +} + +TEST("require that double star eats rest of name") { + MatchTester name("foo/bar/baz/suffix"); + name.mustNotMatch("foo/**/suffix"); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/slobrok/src/tests/mirrorapi/mirrorapi.cpp b/slobrok/src/tests/mirrorapi/mirrorapi.cpp new file mode 100644 index 00000000000..4370931b785 --- /dev/null +++ b/slobrok/src/tests/mirrorapi/mirrorapi.cpp @@ -0,0 +1,226 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("mirrorapi_test"); +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/slobrok/sbmirror.h> +#include <vespa/config-slobroks.h> +#include <algorithm> +#include <vespa/slobrok/server/slobrokserver.h> + +using slobrok::api::MirrorAPI; +using slobrok::SlobrokServer; + +TEST_SETUP(Test); + +//----------------------------------------------------------------------------- + +class Server : public FRT_Invokable +{ +private: + FRT_Supervisor _orb; + std::string _name; + std::string _slobrokSpec; + +public: + Server(std::string name, int port, std::string slobrokSpec); + ~Server(); + void rpc_listNamesServed(FRT_RPCRequest *req); + void reg(); +}; + + +Server::Server(std::string name, int port, std::string slobrokSpec) + : _orb(), + _name(name), + _slobrokSpec(slobrokSpec) +{ + { + FRT_ReflectionBuilder rb(&_orb); + //--------------------------------------------------------------------- + rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", true, + FRT_METHOD(Server::rpc_listNamesServed), this); + rb.MethodDesc("Look up a rpcserver"); + rb.ReturnDesc("names", "The rpcserver names on this server"); + //--------------------------------------------------------------------- + } + _orb.Listen(port); + _orb.Start(); +} + + +void +Server::reg() +{ + char spec[64]; + sprintf(spec, "tcp/localhost:%d", _orb.GetListenPort()); + + FRT_RPCRequest *req = _orb.AllocRPCRequest(); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString(_name.c_str()); + req->GetParams()->AddString(spec); + + FRT_Target *sb = _orb.GetTarget(_slobrokSpec.c_str()); + sb->InvokeSync(req, 5.0); + sb->SubRef(); + req->SubRef(); +} + + +void +Server::rpc_listNamesServed(FRT_RPCRequest *req) +{ + FRT_Values &dst = *req->GetReturn(); + FRT_StringValue *names = dst.AddStringArray(1); + dst.SetString(&names[0], _name.c_str()); +} + + +Server::~Server() +{ + _orb.ShutDown(true); +} + +//----------------------------------------------------------------------------- + +struct SpecList +{ + MirrorAPI::SpecList _specList; + SpecList() : _specList() {} + SpecList(MirrorAPI::SpecList input) : _specList(input) {} + SpecList &add(const char *name, const char *spec) { + _specList.push_back(make_pair(std::string(name), + std::string(spec))); + return *this; + } + void sort() { + std::sort(_specList.begin(), _specList.end()); + } + bool operator==(SpecList &rhs) { // NB: MUTATE! + sort(); + rhs.sort(); + return _specList == rhs._specList; + } +}; + + +bool +compare(MirrorAPI &api, const char *pattern, SpecList expect) +{ + for (int i = 0; i < 250; ++i) { + SpecList actual(api.lookup(pattern)); + if (actual == expect) { + return true; + } + FastOS_Thread::Sleep(100); + } + return false; +} + + +int +Test::Main() +{ + TEST_INIT("mirrorapi_test"); + + SlobrokServer mock(18501); + FastOS_Thread::Sleep(300); + + Server a("A/x/w", 18502, "tcp/localhost:18501"); + Server b("B/x", 18503, "tcp/localhost:18501"); + Server c("C/x/z", 18504, "tcp/localhost:18501"); + Server d("D/y/z", 18505, "tcp/localhost:18501"); + Server e("E/y", 18506, "tcp/localhost:18501"); + Server f("F/y/w", 18507, "tcp/localhost:18501"); + + cloud::config::SlobroksConfigBuilder specBuilder; + cloud::config::SlobroksConfig::Slobrok slobrok; + slobrok.connectionspec = "tcp/localhost:18501"; + specBuilder.slobrok.push_back(slobrok); + FRT_Supervisor orb; + MirrorAPI mirror(orb, config::ConfigUri::createFromInstance(specBuilder)); + EXPECT_TRUE(!mirror.ready()); + orb.Start(); + FastOS_Thread::Sleep(1000); + + a.reg(); + EXPECT_TRUE(compare(mirror, "A/x/w", SpecList().add("A/x/w", "tcp/localhost:18502"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList())); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList().add("A/x/w", "tcp/localhost:18502"))); + EXPECT_TRUE(compare(mirror, "*/*/w*", SpecList().add("A/x/w", "tcp/localhost:18502"))); + EXPECT_TRUE(compare(mirror, "A**", SpecList().add("A/x/w", "tcp/localhost:18502"))); + EXPECT_TRUE(compare(mirror, "**", SpecList().add("A/x/w", "tcp/localhost:18502"))); + EXPECT_TRUE(mirror.ready()); + + TEST_FLUSH(); + b.reg(); + EXPECT_TRUE(compare(mirror, "B/x", SpecList().add("B/x", "tcp/localhost:18503"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList().add("B/x", "tcp/localhost:18503"))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList().add("A/x/w", "tcp/localhost:18502"))); + + TEST_FLUSH(); + c.reg(); + EXPECT_TRUE(compare(mirror, "C/x/z", SpecList().add("C/x/z", "tcp/localhost:18504"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList().add("B/x", "tcp/localhost:18503"))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", "tcp/localhost:18502") + .add("C/x/z", "tcp/localhost:18504"))); + + TEST_FLUSH(); + d.reg(); + EXPECT_TRUE(compare(mirror, "D/y/z", SpecList().add("D/y/z", "tcp/localhost:18505"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList().add("B/x", "tcp/localhost:18503"))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", "tcp/localhost:18502") + .add("C/x/z", "tcp/localhost:18504") + .add("D/y/z", "tcp/localhost:18505"))); + + TEST_FLUSH(); + e.reg(); + EXPECT_TRUE(compare(mirror, "E/y", SpecList().add("E/y", "tcp/localhost:18506"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList() + .add("B/x", "tcp/localhost:18503") + .add("E/y", "tcp/localhost:18506"))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", "tcp/localhost:18502") + .add("C/x/z", "tcp/localhost:18504") + .add("D/y/z", "tcp/localhost:18505"))); + + TEST_FLUSH(); + f.reg(); + EXPECT_TRUE(compare(mirror, "F/y/w", SpecList().add("F/y/w", "tcp/localhost:18507"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList() + .add("B/x", "tcp/localhost:18503") + .add("E/y", "tcp/localhost:18506"))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", "tcp/localhost:18502") + .add("C/x/z", "tcp/localhost:18504") + .add("D/y/z", "tcp/localhost:18505") + .add("F/y/w", "tcp/localhost:18507"))); + + + EXPECT_TRUE(compare(mirror, "*", SpecList())); + + EXPECT_TRUE(compare(mirror, "B/*", SpecList() + .add("B/x", "tcp/localhost:18503"))); + + EXPECT_TRUE(compare(mirror, "*/y", SpecList() + .add("E/y", "tcp/localhost:18506"))); + + EXPECT_TRUE(compare(mirror, "*/x/*", SpecList() + .add("A/x/w", "tcp/localhost:18502") + .add("C/x/z", "tcp/localhost:18504"))); + + EXPECT_TRUE(compare(mirror, "*/*/z", SpecList() + .add("C/x/z", "tcp/localhost:18504") + .add("D/y/z", "tcp/localhost:18505"))); + + EXPECT_TRUE(compare(mirror, "A/*/z", SpecList())); + + EXPECT_TRUE(compare(mirror, "A/*/w", SpecList() + .add("A/x/w", "tcp/localhost:18502"))); + + mock.stop(); + orb.ShutDown(true); + TEST_DONE(); +} diff --git a/slobrok/src/tests/multi/.gitignore b/slobrok/src/tests/multi/.gitignore new file mode 100644 index 00000000000..a8bc1f97275 --- /dev/null +++ b/slobrok/src/tests/multi/.gitignore @@ -0,0 +1,5 @@ +7.cfg +Makefile +multi_test +slobrok*.out +slobrok_multi_test_app diff --git a/slobrok/src/tests/multi/CMakeLists.txt b/slobrok/src/tests/multi/CMakeLists.txt new file mode 100644 index 00000000000..a133cd79a29 --- /dev/null +++ b/slobrok/src/tests/multi/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_multi_test_app + SOURCES + multi.cpp + DEPENDS +) +vespa_add_test(NAME slobrok_multi_test_app NO_VALGRIND COMMAND sh multi_test.sh) diff --git a/slobrok/src/tests/multi/DESC b/slobrok/src/tests/multi/DESC new file mode 100644 index 00000000000..d38625a647b --- /dev/null +++ b/slobrok/src/tests/multi/DESC @@ -0,0 +1 @@ +multi-slobrok test. Take a look at multi.cpp for details. diff --git a/slobrok/src/tests/multi/FILES b/slobrok/src/tests/multi/FILES new file mode 100644 index 00000000000..bbb1480c9ec --- /dev/null +++ b/slobrok/src/tests/multi/FILES @@ -0,0 +1 @@ +multi.cpp diff --git a/slobrok/src/tests/multi/multi.cpp b/slobrok/src/tests/multi/multi.cpp new file mode 100644 index 00000000000..9a8a7eb41e0 --- /dev/null +++ b/slobrok/src/tests/multi/multi.cpp @@ -0,0 +1,369 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/fnet/frt/frt.h> +#include <string> + +//----------------------------------------------------------------------------- + +class Server : public FRT_Invokable +{ +private: + FRT_Supervisor _orb; + std::string _name; + +public: + Server(std::string name, int port); + ~Server(); + void rpc_listNamesServed(FRT_RPCRequest *req); +}; + + +Server::Server(std::string name, int port) + : _orb(), + _name(name) +{ + { + FRT_ReflectionBuilder rb(&_orb); + //--------------------------------------------------------------------- + rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", true, + FRT_METHOD(Server::rpc_listNamesServed), this); + rb.MethodDesc("Look up a rpcserver"); + rb.ReturnDesc("names", "The rpcserver names on this server"); + //--------------------------------------------------------------------- + } + _orb.Listen(port); + _orb.Start(); +} + + +void +Server::rpc_listNamesServed(FRT_RPCRequest *req) +{ + FRT_Values &dst = *req->GetReturn(); + FRT_StringValue *names = dst.AddStringArray(1); + dst.SetString(&names[0], _name.c_str()); +} + + +Server::~Server() +{ + _orb.ShutDown(true); +} + +//----------------------------------------------------------------------------- + +TEST("multi") { + FRT_Supervisor orb; + orb.Start(); + + FRT_Target *sb = orb.GetTarget(18511); + FRT_RPCRequest *req = NULL; + + // test ping against slobrok + req = orb.AllocRPCRequest(req); + req->SetMethodName("frt.rpc.ping"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + // lookup '*' on empty slobrok + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("*"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + // check managed servers on empty slobrok + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.internal.listManagedRpcServers"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + Server a("A", 18518); + + // register server A + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18518"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + // lookup '*' should give 'A' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("*"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "A") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18518") == 0); + + // lookup 'A' should give 'A' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("A"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "A") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18518") == 0); + + // lookup 'B' should give '' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("B"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + // lookup '*/*' should give '' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("*/*"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + { + Server b("B", 18519); + + // register server B as 'C' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("C"); + req->GetParams()->AddString("tcp/localhost:18519"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(req->IsError()); + + // register server B + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("B"); + req->GetParams()->AddString("tcp/localhost:18519"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + { + Server a2("A", 18520); + + // register server A(2) + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18520"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(req->IsError()); + } + + // lookup '*' should give 'AB | BA' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("*"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 2); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 2); + { + FRT_StringValue *name = req->GetReturn()->GetValue(0)._string_array._pt; + FRT_StringValue *spec = req->GetReturn()->GetValue(1)._string_array._pt; + if (strcmp(name[0]._str, "A") == 0) { + ASSERT_TRUE(strcmp(name[0]._str, "A") == 0); + ASSERT_TRUE(strcmp(name[1]._str, "B") == 0); + ASSERT_TRUE(strcmp(spec[0]._str, "tcp/localhost:18518") == 0); + ASSERT_TRUE(strcmp(spec[1]._str, "tcp/localhost:18519") == 0); + } else { + ASSERT_TRUE(strcmp(name[1]._str, "A") == 0); + ASSERT_TRUE(strcmp(name[0]._str, "B") == 0); + ASSERT_TRUE(strcmp(spec[1]._str, "tcp/localhost:18518") == 0); + ASSERT_TRUE(strcmp(spec[0]._str, "tcp/localhost:18519") == 0); + } + } + } + + FastOS_Thread::Sleep(2000); + + // lookup 'B' should give '' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("B"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + // unregister server A (wrong spec) + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.unregisterRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18519"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(req->IsError()); + + // lookup 'A' should give 'A' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("A"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "A") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18518") == 0); + + // unregister server A + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.unregisterRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18518"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + // lookup 'A' should give '' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("A"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + // lookup '*' on empty slobrok + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("*"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + // unregister server A on empty slobrok + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.unregisterRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18518"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + + FRT_Target *sb1 = orb.GetTarget(18512); + FRT_Target *sb2 = orb.GetTarget(18513); + FRT_Target *sb3 = orb.GetTarget(18514); + FRT_Target *sb4 = orb.GetTarget(18515); + FRT_Target *sb5 = orb.GetTarget(18516); + FRT_Target *sb6 = orb.GetTarget(18517); + + // register server A + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18518"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + + Server c("C", 18521); + Server d("D", 18522); + + for (int i=0; i < 150; i++) { + // register server C + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("C"); + req->GetParams()->AddString("tcp/localhost:18521"); + sb1->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + // register server D + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("D"); + req->GetParams()->AddString("tcp/localhost:18522"); + sb2->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + // lookup 'C' should give 'C' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("C"); + sb3->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "C") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18521") == 0); + + // lookup 'C' should give 'C' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("C"); + sb4->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "C") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18521") == 0); + + // lookup 'C' should give 'C' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("C"); + sb5->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "C") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18521") == 0); + + // lookup 'C' should give 'C' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("C"); + sb6->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "C") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18521") == 0); + + FastOS_Thread::Sleep(200); + + // lookup 'D' should give 'D' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("D"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "D") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18522") == 0); + } + + orb.ShutDown(true); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/slobrok/src/tests/multi/multi_test.sh b/slobrok/src/tests/multi/multi_test.sh new file mode 100755 index 00000000000..8f5805be592 --- /dev/null +++ b/slobrok/src/tests/multi/multi_test.sh @@ -0,0 +1,5 @@ +#/bin/bash +./start.sh +sleep 2 +./slobrok_multi_test_app || (./stop.sh; false) +./stop.sh diff --git a/slobrok/src/tests/multi/start.sh b/slobrok/src/tests/multi/start.sh new file mode 100755 index 00000000000..81e0bb93269 --- /dev/null +++ b/slobrok/src/tests/multi/start.sh @@ -0,0 +1,21 @@ +#!/bin/sh +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +export VESPA_LOG_LEVEL='all -spam' + +sed s=localhost=`hostname`= < template-7.cfg > 7.cfg + +../../apps/slobrok/slobrok -c file:7.cfg -p 18511 > slobrok0.out 2>&1 & +echo $! >> pids.txt +../../apps/slobrok/slobrok -c file:7.cfg -p 18512 > slobrok1.out 2>&1 & +echo $! >> pids.txt +../../apps/slobrok/slobrok -c file:7.cfg -p 18513 > slobrok2.out 2>&1 & +echo $! >> pids.txt +../../apps/slobrok/slobrok -c file:7.cfg -p 18514 > slobrok3.out 2>&1 & +echo $! >> pids.txt +../../apps/slobrok/slobrok -c file:7.cfg -p 18515 > slobrok4.out 2>&1 & +echo $! >> pids.txt +../../apps/slobrok/slobrok -c file:7.cfg -p 18516 > slobrok5.out 2>&1 & +echo $! >> pids.txt +../../apps/slobrok/slobrok -c file:7.cfg -p 18517 > slobrok6.out 2>&1 & +echo $! >> pids.txt diff --git a/slobrok/src/tests/multi/stop.sh b/slobrok/src/tests/multi/stop.sh new file mode 100755 index 00000000000..adf351b03e1 --- /dev/null +++ b/slobrok/src/tests/multi/stop.sh @@ -0,0 +1,21 @@ +#!/bin/sh +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +ok=true + +../../apps/sbcmd/sbcmd 18511 slobrok.system.stop || ok=false +../../apps/sbcmd/sbcmd 18512 slobrok.system.stop || ok=false +../../apps/sbcmd/sbcmd 18513 slobrok.system.stop || ok=false +../../apps/sbcmd/sbcmd 18514 slobrok.system.stop || ok=false +../../apps/sbcmd/sbcmd 18515 slobrok.system.stop || ok=false +../../apps/sbcmd/sbcmd 18516 slobrok.system.stop || ok=false +../../apps/sbcmd/sbcmd 18517 slobrok.system.stop || ok=false + +sleep 2 + +for x in `cat pids.txt`; do + kill $x 2>/dev/null && ok=false +done +rm -f pids.txt + +$ok diff --git a/slobrok/src/tests/multi/template-7.cfg b/slobrok/src/tests/multi/template-7.cfg new file mode 100644 index 00000000000..6769494a51c --- /dev/null +++ b/slobrok/src/tests/multi/template-7.cfg @@ -0,0 +1,8 @@ +slobrok[7] +slobrok[0].connectionspec tcp/localhost:18511 +slobrok[1].connectionspec tcp/localhost:18512 +slobrok[2].connectionspec tcp/localhost:18513 +slobrok[3].connectionspec tcp/localhost:18514 +slobrok[4].connectionspec tcp/localhost:18515 +slobrok[5].connectionspec tcp/localhost:18516 +slobrok[6].connectionspec tcp/localhost:18517 diff --git a/slobrok/src/tests/oldapi/.gitignore b/slobrok/src/tests/oldapi/.gitignore new file mode 100644 index 00000000000..9eea175edb0 --- /dev/null +++ b/slobrok/src/tests/oldapi/.gitignore @@ -0,0 +1,4 @@ +/.depend +/Makefile +/oldapi_test +slobrok_oldapi_test_app diff --git a/slobrok/src/tests/oldapi/CMakeLists.txt b/slobrok/src/tests/oldapi/CMakeLists.txt new file mode 100644 index 00000000000..b4def919688 --- /dev/null +++ b/slobrok/src/tests/oldapi/CMakeLists.txt @@ -0,0 +1,10 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_oldapi_test_app + SOURCES + old.cpp + mirror.cpp + DEPENDS + slobrok + slobrok_slobrokserver +) +vespa_add_test(NAME slobrok_oldapi_test_app COMMAND slobrok_oldapi_test_app) diff --git a/slobrok/src/tests/oldapi/mirror.cpp b/slobrok/src/tests/oldapi/mirror.cpp new file mode 100644 index 00000000000..22304091395 --- /dev/null +++ b/slobrok/src/tests/oldapi/mirror.cpp @@ -0,0 +1,175 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/log/log.h> +LOG_SETUP(".slobrok.mirror"); +#include <vespa/fastos/fastos.h> +#include <vespa/fnet/frt/frt.h> +#include "mirror.h" +#include <memory> + +namespace slobrok { +namespace api { + + +MirrorOld::MirrorOld(FRT_Supervisor &orb, const std::vector<std::string> &slobroks) + : FNET_Task(orb.GetScheduler()), + _orb(orb), + _lock(), + _reqDone(false), + _specs(), + _specsGen(), + _updates(), + _slobrokspecs(), + _idx(0), + _backOff(), + _target(0), + _req(0) +{ + _slobrokspecs = slobroks; + for (uint32_t i = 0; i < slobroks.size(); ++i) { // randomize order + uint32_t x = random() % slobroks.size(); + if (x != i) { + std::swap(_slobrokspecs[i], _slobrokspecs[x]); + } + } + if (_slobrokspecs.size() <= 0) { + LOG(error, "no service location brokers!"); + } + ScheduleNow(); +} + + +MirrorOld::~MirrorOld() +{ + Kill(); + if (_req != 0) { + _req->Abort(); + _req->SubRef(); + } + if (_target != 0) { + _target->SubRef(); + } +} + + +MirrorOld::SpecList +MirrorOld::lookup(const std::string & pattern) const +{ + SpecList ret; + _lock.Lock(); + SpecList::const_iterator end = _specs.end(); + for (SpecList::const_iterator it = _specs.begin(); it != end; ++it) { + if (match(it->first.c_str(), pattern.c_str())) { + ret.push_back(*it); + } + } + _lock.Unlock(); + return ret; +} + + +bool +IMirrorOld::match(const char *name, const char *pattern) +{ + LOG_ASSERT(name != NULL); + LOG_ASSERT(pattern != NULL); + while (*pattern != '\0') { + if (*name == *pattern) { + ++name; + ++pattern; + } else if (*pattern == '*') { + ++pattern; + while (*name != '/' && *name != '\0') { + ++name; + } + } else { + return false; + } + } + return (*name == *pattern); +} + + +void +MirrorOld::PerformTask() +{ + if (_reqDone) { + _reqDone = false; + if (_req->IsError() + || strcmp(_req->GetReturnSpec(), "SSi") != 0 + || (_req->GetReturn()->GetValue(0)._string_array._len != + _req->GetReturn()->GetValue(1)._string_array._len)) + { + if (_target != 0) { + _target->SubRef(); + } + _target = 0; + ScheduleNow(); // try next slobrok + return; + } + + FRT_Values &answer = *(_req->GetReturn()); + + if (_specsGen != answer[2]._intval32) { + SpecList specs; + uint32_t numNames = answer[0]._string_array._len; + FRT_StringValue *n = answer[0]._string_array._pt; + FRT_StringValue *s = answer[1]._string_array._pt; + + for (uint32_t idx = 0; idx < numNames; idx++) { + specs.push_back(std::make_pair(std::string(n[idx]._str), + std::string(s[idx]._str))); + } + + _lock.Lock(); + std::swap(specs, _specs); + _updates.add(); + _lock.Unlock(); + _specsGen.setFromInt(answer[2]._intval32); + } + _backOff.reset(); + Schedule(0.1); // be nice + return; + } + if (_target == 0) { + if (_idx >= _slobrokspecs.size()) { + _idx = 0; + double delay = _backOff.get(); + Schedule(delay); + if (_slobrokspecs.size() < 1) { + // we already logged an error for this + return; + } + if (_backOff.shouldWarn()) { + std::string cps = _slobrokspecs[0]; + for (size_t ss = 1; ss < _slobrokspecs.size(); ++ss) { + cps += " or at "; + cps += _slobrokspecs[ss]; + } + LOG(warning, "cannot connect to location broker at %s " + "(retry in %f seconds)", cps.c_str(), delay); + } + return; + } + _target = _orb.GetTarget(_slobrokspecs[_idx++].c_str()); + LOG_ASSERT(_target != 0); // just in case (tm) + _specsGen.reset(); + } + _req = _orb.AllocRPCRequest(_req); + _req->SetMethodName("slobrok.mirror.fetch"); + _req->GetParams()->AddInt32(_specsGen.getAsInt()); // gencnt + _req->GetParams()->AddInt32(5000); // mstimeout + _target->InvokeAsync(_req, 40.0, this); +} + + +void +MirrorOld::RequestDone(FRT_RPCRequest *req) +{ + LOG_ASSERT(req == _req && !_reqDone); + (void) req; + _reqDone = true; + ScheduleNow(); +} + +} // namespace api +} // namespace slobrok diff --git a/slobrok/src/tests/oldapi/mirror.h b/slobrok/src/tests/oldapi/mirror.h new file mode 100644 index 00000000000..4bd100f5b5b --- /dev/null +++ b/slobrok/src/tests/oldapi/mirror.h @@ -0,0 +1,135 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#ifndef Old_MIRROR_H +#define Old_MIRROR_H + +#include <vespa/fnet/frt/frt.h> +#include <vespa/vespalib/util/gencnt.h> +#include <vespa/slobrok/backoff.h> + +namespace slobrok { +namespace api { + +/** + * @brief Defines an interface for the name server lookup. + **/ +class IMirrorOld { +protected: + static bool match(const char *name, const char *pattern); + +public: + /** + * @brief Release any allocated resources. + **/ + virtual ~IMirrorOld() { } + + /** + * @brief vector of <name, connectionspec> pairs. + * + * The first element of each pair is a string containing the + * service name. The second is the connection spec, typically + * "tcp/foo.bar.com:42" + **/ + typedef std::vector< std::pair<std::string, std::string> > SpecList; + + /** + * Obtain all the services matching a given pattern. + * + * The pattern is matched against all service names in the local + * mirror repository. A service name may contain '/' as a + * separator token. A pattern may contain '*' to match anything up + * to the next '/' (or the end of the name). This means that the + * pattern 'foo/<!-- slash-star -->*<!-- star-slash -->/baz' would + * match the service names 'foo/bar/baz' and 'foo/xyz/baz'. The + * pattern 'foo/b*' would match 'foo/bar', but neither 'foo/xyz' + * nor 'foo/bar/baz'. The pattern 'a*b' will never match anything. + * + * @return a list of all matching services, with corresponding connect specs + * @param pattern The pattern used for matching + **/ + virtual SpecList lookup(const std::string & pattern) const = 0; + + /** + * Obtain the number of updates seen by this mirror. The value may + * wrap, but will never become 0 again. This can be used for name + * lookup optimization, because the results returned by lookup() + * will never change unless this number also changes. + * + * @return number of slobrok updates seen + **/ + virtual uint32_t updates() const = 0; +}; + +/** + * @brief A MirrorOld object is used to keep track of the services + * registered with a slobrok cluster. + * + * Updates to the service repository are fetched in the + * background. Lookups against this object is done using an internal + * mirror of the service repository. + **/ +class MirrorOld : public FNET_Task, + public FRT_IRequestWait, + public IMirrorOld +{ +public: + /** + * @brief Create a new MirrorOld using the given Supervisor and slobrok + * connect specs. + * + * @param orb the Supervisor to use + * @param slobroks slobrok connect spec list + **/ + MirrorOld(FRT_Supervisor &orb, const std::vector<std::string> &slobroks); + + /** + * @brief Clean up. + **/ + ~MirrorOld(); + + // Inherit doc from IMirrorOld. + SpecList lookup(const std::string & pattern) const; + + // Inherit doc from IMirrorOld. + uint32_t updates() const { return _updates.getAsInt(); } + + /** + * @brief Ask if the MirrorOld has got any useful information from + * the Slobrok + * + * On application startup it is often useful to run the event loop + * for some time until this functions returns true (or if it never + * does, time out and tell the user there was no answer from any + * Service Location Broker). + * + * @return true if the MirrorOld object has + * asked for updates from a Slobrok and got any answer back + **/ + bool ready() const { return _updates.getAsInt() != 0; } + +private: + MirrorOld(const MirrorOld &); + MirrorOld &operator=(const MirrorOld &); + + /** from FNET_Task, polls slobrok **/ + void PerformTask(); + + /** from FRT_IRequestWait **/ + void RequestDone(FRT_RPCRequest *req); + + FRT_Supervisor &_orb; + mutable FastOS_Mutex _lock; + bool _reqDone; + SpecList _specs; + vespalib::GenCnt _specsGen; + vespalib::GenCnt _updates; + std::vector<std::string> _slobrokspecs; + uint32_t _idx; + BackOff _backOff; + FRT_Target *_target; + FRT_RPCRequest *_req; +}; + +} // namespace api +} // namespace slobrok + +#endif diff --git a/slobrok/src/tests/oldapi/old.cpp b/slobrok/src/tests/oldapi/old.cpp new file mode 100644 index 00000000000..8b620cda3b2 --- /dev/null +++ b/slobrok/src/tests/oldapi/old.cpp @@ -0,0 +1,221 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("mirrorapi_test"); +#include <vespa/vespalib/testkit/testapp.h> +#include "mirror.h" +#include <vespa/config-slobroks.h> +#include <algorithm> +#include <vespa/slobrok/server/slobrokserver.h> + +using slobrok::api::MirrorOld; +using slobrok::SlobrokServer; + +TEST_SETUP(Test); + +//----------------------------------------------------------------------------- + +class Server : public FRT_Invokable +{ +private: + FRT_Supervisor _orb; + std::string _name; + std::string _slobrokSpec; + +public: + Server(std::string name, int port, std::string slobrokSpec); + ~Server(); + void rpc_listNamesServed(FRT_RPCRequest *req); + void reg(); +}; + + +Server::Server(std::string name, int port, std::string slobrokSpec) + : _orb(), + _name(name), + _slobrokSpec(slobrokSpec) +{ + { + FRT_ReflectionBuilder rb(&_orb); + //--------------------------------------------------------------------- + rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", true, + FRT_METHOD(Server::rpc_listNamesServed), this); + rb.MethodDesc("Look up a rpcserver"); + rb.ReturnDesc("names", "The rpcserver names on this server"); + //--------------------------------------------------------------------- + } + _orb.Listen(port); + _orb.Start(); +} + + +void +Server::reg() +{ + char spec[64]; + sprintf(spec, "tcp/localhost:%d", _orb.GetListenPort()); + + FRT_RPCRequest *req = _orb.AllocRPCRequest(); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString(_name.c_str()); + req->GetParams()->AddString(spec); + + FRT_Target *sb = _orb.GetTarget(_slobrokSpec.c_str()); + sb->InvokeSync(req, 5.0); + sb->SubRef(); + req->SubRef(); +} + + +void +Server::rpc_listNamesServed(FRT_RPCRequest *req) +{ + FRT_Values &dst = *req->GetReturn(); + FRT_StringValue *names = dst.AddStringArray(1); + dst.SetString(&names[0], _name.c_str()); +} + + +Server::~Server() +{ + _orb.ShutDown(true); +} + +//----------------------------------------------------------------------------- + +struct SpecList +{ + MirrorOld::SpecList _specList; + SpecList() : _specList() {} + SpecList(MirrorOld::SpecList input) : _specList(input) {} + SpecList &add(const char *name, const char *spec) { + _specList.push_back(make_pair(std::string(name), + std::string(spec))); + return *this; + } + void sort() { + std::sort(_specList.begin(), _specList.end()); + } + bool operator==(SpecList &rhs) { // NB: MUTATE! + sort(); + rhs.sort(); + return _specList == rhs._specList; + } +}; + + +bool +compare(MirrorOld &api, const char *pattern, SpecList expect) +{ + for (int i = 0; i < 250; ++i) { + SpecList actual(api.lookup(pattern)); + if (actual == expect) { + return true; + } + FastOS_Thread::Sleep(100); + } + return false; +} + + +int +Test::Main() +{ + TEST_INIT("mirrorapi_test"); + + SlobrokServer mock(18531); + FastOS_Thread::Sleep(300); + + Server a("A/x/w", 18532, "tcp/localhost:18531"); + Server b("B/x", 18533, "tcp/localhost:18531"); + Server c("C/x/z", 18534, "tcp/localhost:18531"); + Server d("D/y/z", 18535, "tcp/localhost:18531"); + Server e("E/y", 18536, "tcp/localhost:18531"); + Server f("F/y/w", 18537, "tcp/localhost:18531"); + + std::vector<std::string> slobrokSpecs; + slobrokSpecs.push_back("tcp/localhost:18531"); + FRT_Supervisor orb; + MirrorOld mirror(orb, slobrokSpecs); + EXPECT_TRUE(!mirror.ready()); + orb.Start(); + FastOS_Thread::Sleep(1000); + + a.reg(); + EXPECT_TRUE(compare(mirror, "A/x/w", SpecList().add("A/x/w", "tcp/localhost:18532"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList())); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList().add("A/x/w", "tcp/localhost:18532"))); + EXPECT_TRUE(mirror.ready()); + + TEST_FLUSH(); + b.reg(); + EXPECT_TRUE(compare(mirror, "B/x", SpecList().add("B/x", "tcp/localhost:18533"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList().add("B/x", "tcp/localhost:18533"))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList().add("A/x/w", "tcp/localhost:18532"))); + + TEST_FLUSH(); + c.reg(); + EXPECT_TRUE(compare(mirror, "C/x/z", SpecList().add("C/x/z", "tcp/localhost:18534"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList().add("B/x", "tcp/localhost:18533"))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", "tcp/localhost:18532") + .add("C/x/z", "tcp/localhost:18534"))); + + TEST_FLUSH(); + d.reg(); + EXPECT_TRUE(compare(mirror, "D/y/z", SpecList().add("D/y/z", "tcp/localhost:18535"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList().add("B/x", "tcp/localhost:18533"))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", "tcp/localhost:18532") + .add("C/x/z", "tcp/localhost:18534") + .add("D/y/z", "tcp/localhost:18535"))); + + TEST_FLUSH(); + e.reg(); + EXPECT_TRUE(compare(mirror, "E/y", SpecList().add("E/y", "tcp/localhost:18536"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList() + .add("B/x", "tcp/localhost:18533") + .add("E/y", "tcp/localhost:18536"))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", "tcp/localhost:18532") + .add("C/x/z", "tcp/localhost:18534") + .add("D/y/z", "tcp/localhost:18535"))); + + TEST_FLUSH(); + f.reg(); + EXPECT_TRUE(compare(mirror, "F/y/w", SpecList().add("F/y/w", "tcp/localhost:18537"))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList() + .add("B/x", "tcp/localhost:18533") + .add("E/y", "tcp/localhost:18536"))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", "tcp/localhost:18532") + .add("C/x/z", "tcp/localhost:18534") + .add("D/y/z", "tcp/localhost:18535") + .add("F/y/w", "tcp/localhost:18537"))); + + + EXPECT_TRUE(compare(mirror, "*", SpecList())); + + EXPECT_TRUE(compare(mirror, "B/*", SpecList() + .add("B/x", "tcp/localhost:18533"))); + + EXPECT_TRUE(compare(mirror, "*/y", SpecList() + .add("E/y", "tcp/localhost:18536"))); + + EXPECT_TRUE(compare(mirror, "*/x/*", SpecList() + .add("A/x/w", "tcp/localhost:18532") + .add("C/x/z", "tcp/localhost:18534"))); + + EXPECT_TRUE(compare(mirror, "*/*/z", SpecList() + .add("C/x/z", "tcp/localhost:18534") + .add("D/y/z", "tcp/localhost:18535"))); + + EXPECT_TRUE(compare(mirror, "A/*/z", SpecList())); + + EXPECT_TRUE(compare(mirror, "A/*/w", SpecList() + .add("A/x/w", "tcp/localhost:18532"))); + + mock.stop(); + orb.ShutDown(true); + TEST_DONE(); +} diff --git a/slobrok/src/tests/registerapi/.gitignore b/slobrok/src/tests/registerapi/.gitignore new file mode 100644 index 00000000000..869419c2766 --- /dev/null +++ b/slobrok/src/tests/registerapi/.gitignore @@ -0,0 +1,5 @@ +.depend +Makefile +registerapi_test +slobrok.out +slobrok_registerapi_test_app diff --git a/slobrok/src/tests/registerapi/CMakeLists.txt b/slobrok/src/tests/registerapi/CMakeLists.txt new file mode 100644 index 00000000000..de4f84cbf9f --- /dev/null +++ b/slobrok/src/tests/registerapi/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_registerapi_test_app + SOURCES + registerapi.cpp + DEPENDS + slobrok + slobrok_slobrokserver +) +vespa_add_test(NAME slobrok_registerapi_test_app COMMAND slobrok_registerapi_test_app) diff --git a/slobrok/src/tests/registerapi/DESC b/slobrok/src/tests/registerapi/DESC new file mode 100644 index 00000000000..a448754c118 --- /dev/null +++ b/slobrok/src/tests/registerapi/DESC @@ -0,0 +1 @@ +registerapi test. Take a look at registerapi.cpp for details. diff --git a/slobrok/src/tests/registerapi/FILES b/slobrok/src/tests/registerapi/FILES new file mode 100644 index 00000000000..d04c46a17ee --- /dev/null +++ b/slobrok/src/tests/registerapi/FILES @@ -0,0 +1 @@ +registerapi.cpp diff --git a/slobrok/src/tests/registerapi/registerapi.cpp b/slobrok/src/tests/registerapi/registerapi.cpp new file mode 100644 index 00000000000..8e4cb2404f9 --- /dev/null +++ b/slobrok/src/tests/registerapi/registerapi.cpp @@ -0,0 +1,219 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("registerapi_test"); +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/util/host_name.h> +#include <vespa/slobrok/sbmirror.h> +#include <vespa/slobrok/sbregister.h> +#include <vespa/slobrok/server/slobrokserver.h> +#include <sstream> +#include <algorithm> + +using slobrok::api::MirrorAPI; +using slobrok::api::RegisterAPI; +using slobrok::SlobrokServer; + + +TEST_SETUP(Test); + + +std::string +createSpec(FRT_Supervisor &orb) +{ + if (orb.GetListenPort() == 0) { + return std::string(); + } + std::ostringstream str; + str << "tcp/"; + str << vespalib::HostName::get(); + str << ":"; + str << orb.GetListenPort(); + return str.str(); +} + + +struct SpecList +{ + MirrorAPI::SpecList _specList; + SpecList() : _specList() {} + SpecList(MirrorAPI::SpecList input) : _specList(input) {} + SpecList &add(const char *name, const char *spec) { + _specList.push_back(make_pair(std::string(name), + std::string(spec))); + return *this; + } + void sort() { + std::sort(_specList.begin(), _specList.end()); + } + bool operator==(SpecList &rhs) { // NB: MUTATE! + sort(); + rhs.sort(); + return _specList == rhs._specList; + } +}; + + +bool +compare(MirrorAPI &api, const char *pattern, SpecList expect) +{ + for (int i = 0; i < 250; ++i) { + SpecList actual(api.lookup(pattern)); + if (actual == expect) { + return true; + } + FastOS_Thread::Sleep(100); + } + return false; +} + +int +Test::Main() +{ + TEST_INIT("registerapi_test"); + + SlobrokServer mock(18548); + FastOS_Thread::Sleep(300); + + cloud::config::SlobroksConfigBuilder slobrokSpecs; + cloud::config::SlobroksConfig::Slobrok sb; + sb.connectionspec = "tcp/localhost:18548"; + slobrokSpecs.slobrok.push_back(sb); + slobrok::ConfiguratorFactory config(config::ConfigUri::createFromInstance(slobrokSpecs)); + + FRT_Supervisor orb; + RegisterAPI reg(orb, config); + MirrorAPI mirror(orb, config); + orb.Listen(18549); + std::string myspec = createSpec(orb); + orb.Start(); + + reg.registerName("A/x/w"); + EXPECT_TRUE(reg.busy()); + EXPECT_TRUE(compare(mirror, "A/x/w", SpecList().add("A/x/w", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList())); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList().add("A/x/w", myspec.c_str()))); + + for (int i = 0; i < 30; i++) { + if (reg.busy()) FastOS_Thread::Sleep(100); + } + EXPECT_TRUE(!reg.busy()); + + TEST_FLUSH(); + reg.registerName("B/x"); + EXPECT_TRUE(compare(mirror, "B/x", SpecList().add("B/x", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList().add("B/x", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList().add("A/x/w", myspec.c_str()))); + + TEST_FLUSH(); + reg.registerName("C/x/z"); + EXPECT_TRUE(compare(mirror, "C/x/z", SpecList().add("C/x/z", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList().add("B/x", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", myspec.c_str()) + .add("C/x/z", myspec.c_str()))); + + TEST_FLUSH(); + reg.registerName("D/y/z"); + EXPECT_TRUE(compare(mirror, "D/y/z", SpecList().add("D/y/z", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList().add("B/x", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", myspec.c_str()) + .add("C/x/z", myspec.c_str()) + .add("D/y/z", myspec.c_str()))); + + TEST_FLUSH(); + reg.registerName("E/y"); + EXPECT_TRUE(compare(mirror, "E/y", SpecList().add("E/y", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList() + .add("B/x", myspec.c_str()) + .add("E/y", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", myspec.c_str()) + .add("C/x/z", myspec.c_str()) + .add("D/y/z", myspec.c_str()))); + + TEST_FLUSH(); + reg.registerName("F/y/w"); + EXPECT_TRUE(compare(mirror, "F/y/w", SpecList().add("F/y/w", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*", SpecList() + .add("B/x", myspec.c_str()) + .add("E/y", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", myspec.c_str()) + .add("C/x/z", myspec.c_str()) + .add("D/y/z", myspec.c_str()) + .add("F/y/w", myspec.c_str()))); + + + EXPECT_TRUE(compare(mirror, "*", SpecList())); + + EXPECT_TRUE(compare(mirror, "B/*", SpecList() + .add("B/x", myspec.c_str()))); + + EXPECT_TRUE(compare(mirror, "*/y", SpecList() + .add("E/y", myspec.c_str()))); + + EXPECT_TRUE(compare(mirror, "*/x/*", SpecList() + .add("A/x/w", myspec.c_str()) + .add("C/x/z", myspec.c_str()))); + + EXPECT_TRUE(compare(mirror, "*/*/z", SpecList() + .add("C/x/z", myspec.c_str()) + .add("D/y/z", myspec.c_str()))); + + EXPECT_TRUE(compare(mirror, "A/*/z", SpecList())); + + EXPECT_TRUE(compare(mirror, "A/*/w", SpecList() + .add("A/x/w", myspec.c_str()))); + + TEST_FLUSH(); + reg.unregisterName("E/y"); + reg.unregisterName("C/x/z"); + reg.unregisterName("F/y/w"); + EXPECT_TRUE(compare(mirror, "*/*", SpecList() + .add("B/x", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", myspec.c_str()) + .add("D/y/z", myspec.c_str()))); + + TEST_FLUSH(); + reg.registerName("E/y"); + reg.registerName("C/x/z"); + reg.registerName("F/y/w"); + EXPECT_TRUE(compare(mirror, "*/*", SpecList() + .add("B/x", myspec.c_str()) + .add("E/y", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", myspec.c_str()) + .add("C/x/z", myspec.c_str()) + .add("D/y/z", myspec.c_str()) + .add("F/y/w", myspec.c_str()))); + + TEST_FLUSH(); + reg.unregisterName("E/y"); + reg.unregisterName("C/x/z"); + reg.unregisterName("F/y/w"); + EXPECT_TRUE(compare(mirror, "*/*", SpecList() + .add("B/x", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", myspec.c_str()) + .add("D/y/z", myspec.c_str()))); + + TEST_FLUSH(); + reg.registerName("E/y"); + reg.registerName("C/x/z"); + reg.registerName("F/y/w"); + EXPECT_TRUE(compare(mirror, "*/*", SpecList() + .add("B/x", myspec.c_str()) + .add("E/y", myspec.c_str()))); + EXPECT_TRUE(compare(mirror, "*/*/*", SpecList() + .add("A/x/w", myspec.c_str()) + .add("C/x/z", myspec.c_str()) + .add("D/y/z", myspec.c_str()) + .add("F/y/w", myspec.c_str()))); + + mock.stop(); + orb.ShutDown(true); + TEST_DONE(); +} diff --git a/slobrok/src/tests/standalone/.gitignore b/slobrok/src/tests/standalone/.gitignore new file mode 100644 index 00000000000..233aab0e8e8 --- /dev/null +++ b/slobrok/src/tests/standalone/.gitignore @@ -0,0 +1,5 @@ +.depend +Makefile +slobrok.out +standalone_test +slobrok_standalone_test_app diff --git a/slobrok/src/tests/standalone/CMakeLists.txt b/slobrok/src/tests/standalone/CMakeLists.txt new file mode 100644 index 00000000000..60b10b9ddd0 --- /dev/null +++ b/slobrok/src/tests/standalone/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_standalone_test_app + SOURCES + standalone.cpp + DEPENDS + slobrok_slobrokserver + slobrok +) +vespa_add_test(NAME slobrok_standalone_test_app COMMAND slobrok_standalone_test_app) diff --git a/slobrok/src/tests/standalone/DESC b/slobrok/src/tests/standalone/DESC new file mode 100644 index 00000000000..7086f999d64 --- /dev/null +++ b/slobrok/src/tests/standalone/DESC @@ -0,0 +1 @@ +standalone test. Take a look at standalone.cpp for details. diff --git a/slobrok/src/tests/standalone/FILES b/slobrok/src/tests/standalone/FILES new file mode 100644 index 00000000000..e691561022f --- /dev/null +++ b/slobrok/src/tests/standalone/FILES @@ -0,0 +1 @@ +standalone.cpp diff --git a/slobrok/src/tests/standalone/standalone.cpp b/slobrok/src/tests/standalone/standalone.cpp new file mode 100644 index 00000000000..feeec4f508f --- /dev/null +++ b/slobrok/src/tests/standalone/standalone.cpp @@ -0,0 +1,278 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/fnet/frt/frt.h> +#include <vespa/slobrok/server/slobrokserver.h> +#include <string> + + +//----------------------------------------------------------------------------- + +class Server : public FRT_Invokable +{ +private: + FRT_Supervisor _orb; + std::string _name; + +public: + Server(std::string name, int port); + ~Server(); + void rpc_listNamesServed(FRT_RPCRequest *req); +}; + + +Server::Server(std::string name, int port) + : _orb(), + _name(name) +{ + { + FRT_ReflectionBuilder rb(&_orb); + //--------------------------------------------------------------------- + rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", true, + FRT_METHOD(Server::rpc_listNamesServed), this); + rb.MethodDesc("Look up a rpcserver"); + rb.ReturnDesc("names", "The rpcserver names on this server"); + //--------------------------------------------------------------------- + } + _orb.Listen(port); + _orb.Start(); +} + + +void +Server::rpc_listNamesServed(FRT_RPCRequest *req) +{ + FRT_Values &dst = *req->GetReturn(); + FRT_StringValue *names = dst.AddStringArray(1); + dst.SetString(&names[0], _name.c_str()); +} + + +Server::~Server() +{ + _orb.ShutDown(true); +} + +//----------------------------------------------------------------------------- + +TEST("standalone") { + slobrok::SlobrokServer slobrokServer(18541); + FastOS_Thread::Sleep(300); + + FRT_Supervisor orb; + orb.Start(); + + FRT_Target *sb = orb.GetTarget(18541); + FRT_RPCRequest *req = NULL; + + // test ping against slobrok + req = orb.AllocRPCRequest(req); + req->SetMethodName("frt.rpc.ping"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + // lookup '*' on empty slobrok + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("*"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + // check managed servers on empty slobrok + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.internal.listManagedRpcServers"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + Server a("A", 18542); + + // register server A + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18542"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + // lookup '*' should give 'A' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("*"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "A") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18542") == 0); + + // lookup 'A' should give 'A' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("A"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "A") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18542") == 0); + + // lookup 'B' should give '' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("B"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + // lookup '*/*' should give '' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("*/*"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + { + Server b("B", 18543); + + // register server B as 'C' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("C"); + req->GetParams()->AddString("tcp/localhost:18543"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(req->IsError()); + + // register server B + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("B"); + req->GetParams()->AddString("tcp/localhost:18543"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + { + Server a2("A", 18544); + + // register server A(2) + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18544"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(req->IsError()); + } + + // lookup '*' should give 'AB | BA' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("*"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 2); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 2); + { + FRT_StringValue *name = req->GetReturn()->GetValue(0)._string_array._pt; + FRT_StringValue *spec = req->GetReturn()->GetValue(1)._string_array._pt; + if (strcmp(name[0]._str, "A") == 0) { + ASSERT_TRUE(strcmp(name[0]._str, "A") == 0); + ASSERT_TRUE(strcmp(name[1]._str, "B") == 0); + ASSERT_TRUE(strcmp(spec[0]._str, "tcp/localhost:18542") == 0); + ASSERT_TRUE(strcmp(spec[1]._str, "tcp/localhost:18543") == 0); + } else { + ASSERT_TRUE(strcmp(name[1]._str, "A") == 0); + ASSERT_TRUE(strcmp(name[0]._str, "B") == 0); + ASSERT_TRUE(strcmp(spec[1]._str, "tcp/localhost:18542") == 0); + ASSERT_TRUE(strcmp(spec[0]._str, "tcp/localhost:18543") == 0); + } + } + } + + FastOS_Thread::Sleep(2000); + + // lookup 'B' should give '' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("B"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + // unregister server A (wrong spec) + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.unregisterRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18543"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(req->IsError()); + + // lookup 'A' should give 'A' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("A"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 1); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 1); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(0)._string_array._pt[0]._str, "A") == 0); + ASSERT_TRUE(strcmp(req->GetReturn()->GetValue(1)._string_array._pt[0]._str, "tcp/localhost:18542") == 0); + + // unregister server A + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.unregisterRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18542"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + // lookup 'A' should give '' + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("A"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + // lookup '*' on empty slobrok + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.lookupRpcServer"); + req->GetParams()->AddString("*"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + ASSERT_TRUE(strcmp(req->GetReturnSpec(), "SS") == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(0)._string_array._len == 0); + ASSERT_TRUE(req->GetReturn()->GetValue(1)._string_array._len == 0); + + // unregister server A on empty slobrok + req = orb.AllocRPCRequest(req); + req->SetMethodName("slobrok.unregisterRpcServer"); + req->GetParams()->AddString("A"); + req->GetParams()->AddString("tcp/localhost:18542"); + sb->InvokeSync(req, 5.0); + ASSERT_TRUE(!req->IsError()); + + sb->SubRef(); + req->SubRef(); + + slobrokServer.stop(); + orb.ShutDown(true); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/slobrok/src/tests/startsome/.gitignore b/slobrok/src/tests/startsome/.gitignore new file mode 100644 index 00000000000..590f1278584 --- /dev/null +++ b/slobrok/src/tests/startsome/.gitignore @@ -0,0 +1,7 @@ +/.depend +/Makefile +/rpc-method-list +/rpc_info +/tstdst +slobrok_rpc_info_app +slobrok_tstdst_app diff --git a/slobrok/src/tests/startsome/CMakeLists.txt b/slobrok/src/tests/startsome/CMakeLists.txt new file mode 100644 index 00000000000..d22dc1215ee --- /dev/null +++ b/slobrok/src/tests/startsome/CMakeLists.txt @@ -0,0 +1,12 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_tstdst_app + SOURCES + tstdst.cpp + DEPENDS +) +vespa_add_test(NAME slobrok_tstdst_app NO_VALGRIND COMMAND sh startsome.sh) +vespa_add_executable(slobrok_rpc_info_app + SOURCES + rpc_info.cpp + DEPENDS +) diff --git a/slobrok/src/tests/startsome/rpc_info.cpp b/slobrok/src/tests/startsome/rpc_info.cpp new file mode 100644 index 00000000000..09bc6727a2d --- /dev/null +++ b/slobrok/src/tests/startsome/rpc_info.cpp @@ -0,0 +1,137 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("rpc_info"); +#include <vespa/fnet/frt/frt.h> + +class RPCInfo : public FastOS_Application +{ +public: + + void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor) + { + if ((*req) != NULL) + (*req)->SubRef(); + (*req) = supervisor->AllocRPCRequest(); + } + + void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2) + { + if (r1 != NULL) + r1->SubRef(); + if (r2 != NULL) + r2->SubRef(); + } + + void DumpMethodInfo(const char *indent, FRT_RPCRequest *info, + const char *name) + { + if (info->IsError()) { + printf("%sMETHOD %s\n", indent, name); + printf("%s [error(%d): %s]\n\n", indent, + info->GetErrorCode(), + info->GetErrorMessage()); + return; + } + + const char *desc = info->GetReturn()->GetValue(0)._string._str; + const char *arg = info->GetReturn()->GetValue(1)._string._str; + const char *ret = info->GetReturn()->GetValue(2)._string._str; + uint32_t argCnt = strlen(arg); + uint32_t retCnt = strlen(ret); + FRT_StringValue *argName = info->GetReturn()->GetValue(3)._string_array._pt; + FRT_StringValue *argDesc = info->GetReturn()->GetValue(4)._string_array._pt; + FRT_StringValue *retName = info->GetReturn()->GetValue(5)._string_array._pt; + FRT_StringValue *retDesc = info->GetReturn()->GetValue(6)._string_array._pt; + + printf("%sMETHOD %s\n", indent, name); + printf("%s DESCRIPTION:\n" + "%s %s\n", indent, indent, desc); + + if (argCnt > 0) { + printf("%s PARAMS:\n", indent); + for (uint32_t a = 0; a < argCnt; a++) + printf("%s [%c][%s] %s\n", indent, arg[a], argName[a]._str, + argDesc[a]._str); + } + + if (retCnt > 0) { + printf("%s RETURN:\n", indent); + for (uint32_t r = 0; r < retCnt; r++) + printf("%s [%c][%s] %s\n", indent, ret[r], retName[r]._str, + retDesc[r]._str); + } + printf("\n"); + } + + + int Main() + { + if (_argc < 2) { + printf("usage : rpc_info <connectspec> [verbose]\n"); + return 1; + } + + bool verbose = (_argc > 2 && strcmp(_argv[2], "verbose") == 0); + FRT_Supervisor supervisor; + FRT_Target *target = supervisor.GetTarget(_argv[1]); + FRT_RPCRequest *m_list = NULL; + FRT_RPCRequest *info = NULL; + supervisor.Start(); + + GetReq(&info, &supervisor); + info->SetMethodName("frt.rpc.ping"); + target->InvokeSync(info, 5.0); + if (info->IsError()) { + fprintf(stderr, "Error talking to %s\n", _argv[1]); + FreeReqs(m_list, info); + supervisor.ShutDown(true); + return 1; + } + + GetReq(&m_list, &supervisor); + m_list->SetMethodName("frt.rpc.getMethodList"); + target->InvokeSync(m_list, 5.0); + + if (!m_list->IsError()) { + + uint32_t numMethods = m_list->GetReturn()->GetValue(0)._string_array._len; + FRT_StringValue *methods = m_list->GetReturn()->GetValue(0)._string_array._pt; + FRT_StringValue *arglist = m_list->GetReturn()->GetValue(1)._string_array._pt; + FRT_StringValue *retlist = m_list->GetReturn()->GetValue(2)._string_array._pt; + + for (uint32_t m = 0; m < numMethods; m++) { + + if (verbose) { + + GetReq(&info, &supervisor); + info->SetMethodName("frt.rpc.getMethodInfo"); + info->GetParams()->AddString(methods[m]._str); + target->InvokeSync(info, 5.0); + DumpMethodInfo("", info, methods[m]._str); + + } else { + + printf("METHOD [%s] <- %s <- [%s]\n", + retlist[m]._str, methods[m]._str, arglist[m]._str); + } + } + } else { + fprintf(stderr, " [error(%d): %s]\n", + m_list->GetErrorCode(), + m_list->GetErrorMessage()); + } + FreeReqs(m_list, info); + target->SubRef(); + supervisor.ShutDown(true); + return 0; + } +}; + + +int +main(int argc, char **argv) +{ + RPCInfo myapp; + return myapp.Entry(argc, argv); +} diff --git a/slobrok/src/tests/startsome/startsome.sh b/slobrok/src/tests/startsome/startsome.sh new file mode 100755 index 00000000000..e3f9d9ea0dd --- /dev/null +++ b/slobrok/src/tests/startsome/startsome.sh @@ -0,0 +1,108 @@ +#!/bin/sh +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +SBCMD=../../apps/sbcmd/sbcmd +SLOBROK=../../apps/slobrok/slobrok + + +listall () { + sleep 0.2 + echo port 18481: + ${SBCMD} 18481 slobrok.lookupRpcServer '*/*' + ${SBCMD} 18481 slobrok.admin.listAllRpcServers + + echo port 18482: + ${SBCMD} 18482 slobrok.lookupRpcServer '*/*' + + echo port 18483: + ${SBCMD} 18483 slobrok.lookupRpcServer '*/*' + + echo port 18484: + ${SBCMD} 18484 slobrok.lookupRpcServer '*/*' + sleep 0.2 +} + + +${SLOBROK} -p 18481 & +${SLOBROK} -p 18482 & +${SLOBROK} -p 18483 & +${SLOBROK} -p 18484 & + +sleep 1 + +./slobrok_rpc_info_app \ + tcp/`hostname`:18481 verbose > rpc-method-list + +echo port 18481: +${SBCMD} 18481 slobrok.callback.listNamesServed +echo port 18482: +${SBCMD} 18482 slobrok.callback.listNamesServed +echo port 18483: +${SBCMD} 18483 slobrok.callback.listNamesServed +echo port 18484: +${SBCMD} 18484 slobrok.callback.listNamesServed + +listall + +./slobrok_tstdst_app -s 18481 -p 18485 & +./slobrok_tstdst_app -s 18482 -p 18486 & +./slobrok_tstdst_app -s 18483 -p 18487 -n othertest/17 & +./slobrok_tstdst_app -s 18484 -p 18488 -n testrpcsrv/13 & + +listall + +add=tcp/`hostname`:18482 +${SBCMD} 18481 slobrok.admin.addPeer $add $add +add=tcp/`hostname`:18483 +${SBCMD} 18481 slobrok.admin.addPeer $add $add +add=tcp/`hostname`:18484 +${SBCMD} 18481 slobrok.admin.addPeer $add $add +add=tcp/`hostname`:18481 +${SBCMD} 18484 slobrok.admin.addPeer $add $add + +listall + +./slobrok_tstdst_app -s 18484 -p 18489 -n testrpcsrv/19 & + +listall + +rem=tcp/`hostname`:18482 +${SBCMD} 18481 slobrok.admin.removePeer $rem $rem + +./slobrok_tstdst_app -s 18482 -p 18490 -n testrpcsrv/19 & + +listall + +./slobrok_tstdst_app -s 18483 -p 18491 -n testrpcsrv/19 & + +listall + +./slobrok_tstdst_app -s 18481 -p 18492 -n testrpcsrv/19 & + +listall +${SBCMD} 18481 slobrok.admin.listAllRpcServers +sleep 3 + +${SBCMD} 18485 system.stop +${SBCMD} 18486 system.stop +${SBCMD} 18487 system.stop + +listall + +${SBCMD} 18488 system.stop + +listall + +${SBCMD} 18482 slobrok.system.stop +${SBCMD} 18483 slobrok.system.stop +${SBCMD} 18484 slobrok.system.stop + +listall + +${SBCMD} 18481 slobrok.system.stop +${SBCMD} 18489 system.stop +${SBCMD} 18491 system.stop +${SBCMD} 18492 system.stop +${SBCMD} 18490 system.stop + +wait diff --git a/slobrok/src/tests/startsome/tstdst.cpp b/slobrok/src/tests/startsome/tstdst.cpp new file mode 100644 index 00000000000..7e5b21dfc8f --- /dev/null +++ b/slobrok/src/tests/startsome/tstdst.cpp @@ -0,0 +1,224 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("testrpcserver"); + +#include <vespa/vespalib/util/host_name.h> +#include <vespa/fnet/fnet.h> +#include <vespa/fnet/frt/frt.h> + +#include <vector> +#include <string> +#include <sstream> + +class FNET_Transport; +class FNET_Scheduler; +class FRT_Supervisor; + +namespace testrpcserver { + +class RPCHooks; + +class TstEnv : public FRT_IRequestWait +{ +private: + FNET_Transport *_transport; + FRT_Supervisor *_supervisor; + + int _myport; + int _sbport; + RPCHooks *_rpcHooks; + + FNET_Transport *getTransport() { return _transport; } + FRT_Supervisor *getSupervisor() { return _supervisor; } + + TstEnv(const TstEnv &); // Not used + TstEnv &operator=(const TstEnv &); // Not used +public: + const char * const _id; + + explicit TstEnv(int sbp, int myp, const char *n); + virtual ~TstEnv(); + + int MainLoop(); + + void shutdown() { getTransport()->ShutDown(false); } + void RequestDone(FRT_RPCRequest* req) { + if (req->IsError()) { + LOG(error, "registration failed: %s", req->GetErrorMessage()); + } else { + LOG(debug, "registered"); + } + } +}; + + +class RPCHooks : public FRT_Invokable +{ +private: + TstEnv &_env; + + RPCHooks(const RPCHooks &); // Not used + RPCHooks &operator=(const RPCHooks &); // Not used +public: + explicit RPCHooks(TstEnv &env); + virtual ~RPCHooks(); + + void initRPC(FRT_Supervisor *supervisor); +private: + void rpc_listNamesServed(FRT_RPCRequest *req); + + void rpc_stop(FRT_RPCRequest *req); +}; + + +RPCHooks::RPCHooks(TstEnv &env) + : _env(env) +{ +} +RPCHooks::~RPCHooks() +{ +} + + +void +RPCHooks::initRPC(FRT_Supervisor *supervisor) +{ + + FRT_ReflectionBuilder rb(supervisor); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", true, + FRT_METHOD(RPCHooks::rpc_listNamesServed), this); + rb.MethodDesc("Look up a rpcserver"); + rb.ReturnDesc("names", "The rpcserver names on this server"); + //------------------------------------------------------------------------- + rb.DefineMethod("system.stop", "", "", true, + FRT_METHOD(RPCHooks::rpc_stop), this); + rb.MethodDesc("Shut down the application"); + //------------------------------------------------------------------------- +} + + +void +RPCHooks::rpc_listNamesServed(FRT_RPCRequest *req) +{ + std::vector<const char *> rpcsrvlist; + rpcsrvlist.push_back("testrpcsrv/17"); + rpcsrvlist.push_back("testrpcsrv/191"); + rpcsrvlist.push_back(_env._id); + + FRT_Values &dst = *req->GetReturn(); + + FRT_StringValue *names = dst.AddStringArray(rpcsrvlist.size()); + + for (uint32_t i = 0; i < rpcsrvlist.size(); ++i) { + dst.SetString(&names[i], rpcsrvlist[i]); + } + if (rpcsrvlist.size() < 1) { + req->SetError(FRTE_RPC_METHOD_FAILED, "no rpcserver names"); + } +} + + +// System API methods +void +RPCHooks::rpc_stop(FRT_RPCRequest *req) +{ + (void) req; + LOG(debug, "RPC: Shutdown"); + _env.shutdown(); +} + + +TstEnv::TstEnv(int sbp, int myp, const char *n) + : _transport(new FNET_Transport()), + _supervisor(new FRT_Supervisor(_transport, NULL)), + _myport(myp), + _sbport(sbp), + _rpcHooks(NULL), + _id(n) +{ + _rpcHooks = new RPCHooks(*this); + _rpcHooks->initRPC(getSupervisor()); +} + + +TstEnv::~TstEnv() +{ + delete _rpcHooks; +} + + +int +TstEnv::MainLoop() +{ + srandom(time(NULL)); + + if (! getSupervisor()->Listen(_myport)) { + LOG(error, "TestRpcServer: unable to listen to port %d", _myport); + return 1; + } + + std::string myrpcsrv = _id; + + std::ostringstream tmp; + tmp << "tcp/" << vespalib::HostName::get() << ":" << _myport; + std::string myspec = tmp.str(); + tmp.str(""); + tmp << "tcp/" << vespalib::HostName::get() << ":" << _sbport; + std::string sbspec = tmp.str(); + + FRT_RPCRequest *req = getSupervisor()->AllocRPCRequest(); + req->SetMethodName("slobrok.registerRpcServer"); + req->GetParams()->AddString(myrpcsrv.c_str()); + req->GetParams()->AddString(myspec.c_str()); + FRT_Target *slobrok = getSupervisor()->GetTarget(sbspec.c_str()); + slobrok->InvokeAsync(req, 5.0, this); + getTransport()->Main(); + return 0; +} + + +class App : public FastOS_Application +{ +public: + int Main() { + int sbport = 2773; + int myport = 2774; + const char *rpcsrvname = "testrpcsrv/17"; + + int argi = 1; + const char* optArg; + char c; + while ((c = GetOpt("n:p:s:", optArg, argi)) != -1) { + switch (c) { + case 'p': + myport = atoi(optArg); + break; + case 's': + sbport = atoi(optArg); + break; + case 'n': + rpcsrvname = optArg; + break; + default: + LOG(error, "unknown option letter '%c'", c); + return 1; + } + } + + TstEnv *mainobj = new TstEnv(sbport, myport, rpcsrvname); + int res = mainobj->MainLoop(); + delete mainobj; + return res; + } +}; + +} // namespace testrpcserver + +int +main(int argc, char **argv) +{ + testrpcserver::App tstdst; + return tstdst.Entry(argc, argv); +} diff --git a/slobrok/src/tests/startup/.gitignore b/slobrok/src/tests/startup/.gitignore new file mode 100644 index 00000000000..5dae353d999 --- /dev/null +++ b/slobrok/src/tests/startup/.gitignore @@ -0,0 +1,2 @@ +.depend +Makefile diff --git a/slobrok/src/tests/startup/CMakeLists.txt b/slobrok/src/tests/startup/CMakeLists.txt new file mode 100644 index 00000000000..5c90dd5bfcc --- /dev/null +++ b/slobrok/src/tests/startup/CMakeLists.txt @@ -0,0 +1 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. diff --git a/slobrok/src/tests/startup/run.sh b/slobrok/src/tests/startup/run.sh new file mode 100755 index 00000000000..d83dc886113 --- /dev/null +++ b/slobrok/src/tests/startup/run.sh @@ -0,0 +1,9 @@ +#!/bin/sh +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +cmd=../../apps/slobrok/slobrok + +$cmd -c mumbojumbo +if [ $? -eq 1 ]; then + exit 0 +fi +exit 1 diff --git a/slobrok/src/vespa/slobrok/.gitignore b/slobrok/src/vespa/slobrok/.gitignore new file mode 100644 index 00000000000..2e5cbf39a54 --- /dev/null +++ b/slobrok/src/vespa/slobrok/.gitignore @@ -0,0 +1,6 @@ +.depend +Makefile +config-slobroks.cpp +config-slobroks.h +features.h +/libslobrok.so.5.1 diff --git a/slobrok/src/vespa/slobrok/CMakeLists.txt b/slobrok/src/vespa/slobrok/CMakeLists.txt new file mode 100644 index 00000000000..98caf3ce407 --- /dev/null +++ b/slobrok/src/vespa/slobrok/CMakeLists.txt @@ -0,0 +1,10 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(slobrok + SOURCES + sblist.cpp + cfg.cpp + sbmirror.cpp + sbregister.cpp + INSTALL lib64 + DEPENDS +) diff --git a/slobrok/src/vespa/slobrok/backoff.h b/slobrok/src/vespa/slobrok/backoff.h new file mode 100644 index 00000000000..c0cf4a4e061 --- /dev/null +++ b/slobrok/src/vespa/slobrok/backoff.h @@ -0,0 +1,48 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +namespace slobrok { +namespace api { + +class BackOff +{ +private: + double _time; + double _warntime; + double _nextwarn; + +public: + BackOff() : _time(0.50), _warntime(0.0), _nextwarn(4.0) {} + void reset() { _time = 0.50; _warntime = 0.0; _nextwarn = 15.0; } + double get() { + double ret = _time; + _warntime += ret; + if (_time < 5.0) { + _time += 0.5; + } else if (_time < 10.0) { + _time += 1.0; + } else if (_time < 30.0) { + _time += 5; + } else { + // max retry time is 30 seconds + _time = 30.0; + } + return ret; + } + bool shouldWarn() { + if (_warntime > _nextwarn) { + _warntime = 0.0; + _nextwarn *= 4.0; + if (_nextwarn > 86400.0) { + _nextwarn = 86400.0; + } + return true; + } else { + return false; + } + } +}; + +} // namespace api +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/cfg.cpp b/slobrok/src/vespa/slobrok/cfg.cpp new file mode 100644 index 00000000000..50320f7dd23 --- /dev/null +++ b/slobrok/src/vespa/slobrok/cfg.cpp @@ -0,0 +1,70 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include "cfg.h" +#include <vespa/log/log.h> +#include <vespa/vespalib/util/sync.h> + +LOG_SETUP(".slobrok.configurator"); + +namespace slobrok { + +namespace { + +std::vector<std::string> +extract(const cloud::config::SlobroksConfig &cfg) +{ + std::vector<std::string> r; + for (size_t i = 0; i < cfg.slobrok.size(); ++i) { + std::string spec(cfg.slobrok[i].connectionspec); + r.push_back(spec); + } + return r; +} + +} // namespace <unnamed> + +bool +Configurator::poll() +{ + bool retval = _subscriber.nextGeneration(0); + if (retval) { + std::unique_ptr<cloud::config::SlobroksConfig> cfg = _handle->getConfig(); + _target.setup(extract(*cfg)); + } + return retval; +} + + +Configurator::Configurator(Configurable& target, const config::ConfigUri & uri) + : _subscriber(uri.getContext()), + _handle(_subscriber.subscribe<cloud::config::SlobroksConfig>(uri.getConfigId())), + _target(target) + +{ +} + +ConfiguratorFactory::ConfiguratorFactory(const config::ConfigUri& uri) + : _uri(uri) +{ +} + +ConfiguratorFactory::ConfiguratorFactory(const std::vector<std::string> & spec) + : _uri(config::ConfigUri::createEmpty()) +{ + cloud::config::SlobroksConfigBuilder builder; + for (size_t i = 0; i < spec.size(); i++) { + cloud::config::SlobroksConfig::Slobrok sb; + sb.connectionspec = spec[i]; + builder.slobrok.push_back(sb); + } + _uri = config::ConfigUri::createFromInstance(builder); +} + +Configurator::UP +ConfiguratorFactory::create(Configurable& target) const +{ + Configurator::UP r(new Configurator(target, _uri)); + return r; +} + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/cfg.h b/slobrok/src/vespa/slobrok/cfg.h new file mode 100644 index 00000000000..cb660cd0222 --- /dev/null +++ b/slobrok/src/vespa/slobrok/cfg.h @@ -0,0 +1,45 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <string> +#include <vector> +#include <memory> +#include <vespa/vespalib/util/ptrholder.h> +#include <vespa/config-slobroks.h> +#include <vespa/config/config.h> + +namespace slobrok { + +class Configurable { +public: + virtual void setup(const std::vector<std::string> &slobrokSpecs) = 0; + virtual ~Configurable() { } +}; + + +class Configurator { +private: + config::ConfigSubscriber _subscriber; + config::ConfigHandle<cloud::config::SlobroksConfig>::UP _handle; + Configurable &_target; +public: + Configurator(Configurable &target, const config::ConfigUri & uri); + bool poll(); + typedef std::unique_ptr<Configurator> UP; + + int64_t getGeneration() const { return _subscriber.getGeneration(); } +}; + +class ConfiguratorFactory { +private: + config::ConfigUri _uri; +public: + ConfiguratorFactory(const config::ConfigUri & uri); + // Convenience. Might belong somewhere else + ConfiguratorFactory(const std::vector<std::string> & spec); + + Configurator::UP create(Configurable &target) const; +}; + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/sblist.cpp b/slobrok/src/vespa/slobrok/sblist.cpp new file mode 100644 index 00000000000..be702c6c5c2 --- /dev/null +++ b/slobrok/src/vespa/slobrok/sblist.cpp @@ -0,0 +1,101 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include "sblist.h" +#include <vespa/log/log.h> +LOG_SETUP(".slobrok.list"); +#include <vespa/vespalib/util/random.h> + + +using vespalib::LockGuard; + +namespace slobrok { +namespace api { + +SlobrokList::SlobrokList() + : _lock(), + _slobrokSpecs(), + _nextSpec(0), + _currSpec(1), + _retryCount(0) +{ +} + + +bool +SlobrokList::contains(const std::string &spec) +{ + LockGuard guard(_lock); + if (_currSpec < _slobrokSpecs.size()) { + if (spec == _slobrokSpecs[_currSpec]) return true; + } + for (size_t i = 0; i < _slobrokSpecs.size(); ++i) { + if (spec == _slobrokSpecs[i]) { + _currSpec = i; + return true; + } + } + return false; +} + + +std::string +SlobrokList::nextSlobrokSpec() +{ + LockGuard guard(_lock); + std::string v; + _currSpec = _nextSpec; + if (_nextSpec < _slobrokSpecs.size()) { + ++_nextSpec; + v = _slobrokSpecs[_currSpec]; + } else { + _nextSpec = 0; + ++_retryCount; + } + return v; +} + + +std::string +SlobrokList::logString() +{ + LockGuard guard(_lock); + if (_slobrokSpecs.size() == 0) { + return "[empty service location broker list]"; + } + std::string v; + v = _slobrokSpecs[0]; + for (size_t i = 1 ; i < _slobrokSpecs.size(); ++i) { + v += " or "; + v += _slobrokSpecs[i]; + } + return v; +} + + +void +SlobrokList::setup(const std::vector<std::string> &specList) +{ + if (specList.size() == 0) return; + size_t cfgSz = specList.size(); + LockGuard guard(_lock); + _slobrokSpecs.clear(); + _nextSpec = 0; + _currSpec = cfgSz; + for (size_t i = 0; i < cfgSz; ++i) { + _slobrokSpecs.push_back(specList[i]); + } + + vespalib::RandomGen randomizer(time(NULL)); + // randomize order + for (size_t i = 0; i + 1 < cfgSz; ++i) { + size_t lim = cfgSz - i; + size_t x = randomizer.nextUint32() % lim; + if (x != 0) { + std::swap(_slobrokSpecs[i], _slobrokSpecs[i+x]); + } + } +} + + +} // namespace api +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/sblist.h b/slobrok/src/vespa/slobrok/sblist.h new file mode 100644 index 00000000000..4265c2a1d57 --- /dev/null +++ b/slobrok/src/vespa/slobrok/sblist.h @@ -0,0 +1,57 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <string> +#include <vector> +#include <vespa/vespalib/util/sync.h> +#include "cfg.h" + +namespace slobrok { +namespace api { + +/** + * @brief List of connection specs for service location brokers + **/ +class SlobrokList : public Configurable { +public: + /** + * @brief Create a new SlobrokList object, initially empty + **/ + SlobrokList(); + + /** + * setup with a list of connection specs; + * should be called at least once. + * @param specList should not be an empty list. + **/ + virtual void setup(const std::vector<std::string> &specList); + + /** + * retrieve the spec for next slobrok server to try. + * NOTE: when the list is exhausted the empty string will + * be returned once before looping and retrying. + **/ + std::string nextSlobrokSpec(); + + /** obtain how many times we have tried all possible servers */ + size_t retryCount() const { return _retryCount; } + + /** check if setup has been called successfully */ + bool ok() const { return _slobrokSpecs.size() > 0; } + + /** return a string (for logging) with all specs in the list */ + std::string logString(); + + /** check if the list contains a given spec */ + bool contains(const std::string &spec); +private: + vespalib::Lock _lock; + std::vector<std::string> _slobrokSpecs; + size_t _nextSpec; + size_t _currSpec; + size_t _retryCount; +}; + +} // namespace api +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/sbmirror.cpp b/slobrok/src/vespa/slobrok/sbmirror.cpp new file mode 100644 index 00000000000..7b0974d5b0d --- /dev/null +++ b/slobrok/src/vespa/slobrok/sbmirror.cpp @@ -0,0 +1,378 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".slobrok.mirror"); +#include <vespa/fnet/frt/frt.h> +#include <vespa/config-slobroks.h> +#include "sbmirror.h" +#include <memory> + +using vespalib::LockGuard; + +namespace slobrok { +namespace api { + +MirrorAPI::MirrorAPI(FRT_Supervisor &orb, const ConfiguratorFactory & config) + : FNET_Task(orb.GetScheduler()), + _orb(orb), + _lock(), + _reqPending(false), + _scheduled(false), + _reqDone(false), + _useOldProto(false), + _specs(), + _specsGen(), + _updates(), + _slobrokSpecs(), + _configurator(config.create(_slobrokSpecs)), + _currSlobrok(""), + _rpc_ms(100), + _idx(0), + _backOff(), + _target(0), + _req(0) +{ + _configurator->poll(); + LOG_ASSERT(_slobrokSpecs.ok()); + ScheduleNow(); +} + + +MirrorAPI::~MirrorAPI() +{ + Kill(); + _configurator.reset(0); + if (_req != 0) { + _req->Abort(); + _req->SubRef(); + } + if (_target != 0) { + _target->SubRef(); + } +} + + +MirrorAPI::SpecList +MirrorAPI::lookup(const std::string & pattern) const +{ + SpecList ret; + LockGuard guard(_lock); + SpecList::const_iterator end = _specs.end(); + for (SpecList::const_iterator it = _specs.begin(); it != end; ++it) { + if (match(it->first.c_str(), pattern.c_str())) { + ret.push_back(*it); + } + } + return ret; +} + + +/* + * Match a single name against a pattern. + * A pattern can contain '*' to match until the next '/' separator, + * and end with '**' to match the rest of the name. + * Note that this isn't quite globbing, as there is no backtracking. + * Corresponds to code in: jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java + */ +bool +IMirrorAPI::match(const char *name, const char *pattern) +{ + LOG_ASSERT(name != NULL); + LOG_ASSERT(pattern != NULL); + while (*pattern != '\0') { + if (*name == *pattern) { + ++name; + ++pattern; + } else if (*pattern == '*') { + ++pattern; + while (*name != '/' && *name != '\0') { + ++name; + } + if (*pattern == '*') { + while (*name != '\0') { + ++name; + } + } + } else { + return false; + } + } + return (*name == *pattern); +} + + +void +MirrorAPI::updateTo(SpecList& newSpecs, uint32_t newGen) +{ + { + LockGuard guard(_lock); + std::swap(newSpecs, _specs); + _updates.add(); + } + _specsGen.setFromInt(newGen); + if (_rpc_ms < 15000) { + _rpc_ms = 15000; + } +} + +bool +MirrorAPI::ready() const +{ + LockGuard guard(_lock); + return _updates.getAsInt() != 0; +} + + +// returns true if reconnect is needed +bool +MirrorAPI::handleMirrorFetch() +{ + if (strcmp(_req->GetReturnSpec(), "SSi") != 0) { + LOG(warning, "unknown return types '%s' from RPC request", _req->GetReturnSpec()); + return true; + } + + FRT_Values &answer = *(_req->GetReturn()); + + uint32_t numNames = answer[0]._string_array._len; + FRT_StringValue *n = answer[0]._string_array._pt; + uint32_t numSpecs = answer[1]._string_array._len; + FRT_StringValue *s = answer[1]._string_array._pt; + uint32_t newGen = answer[2]._intval32; + + if (numNames != numSpecs) { + LOG(warning, "inconsistent array lengths from RPC mirror request"); + return true; + } + + if (_specsGen != newGen) { + SpecList specs; + + for (uint32_t idx = 0; idx < numNames; idx++) { + specs.push_back(std::make_pair(std::string(n[idx]._str), + std::string(s[idx]._str))); + } + updateTo(specs, newGen); + } + return false; +} + + +// returns true if reconnect is needed +bool +MirrorAPI::handleIncrementalFetch() +{ + if (strcmp(_req->GetReturnSpec(), "iSSSi") != 0) { + LOG(warning, "unknown return types '%s' from RPC request", _req->GetReturnSpec()); + return true; + } + FRT_Values &answer = *(_req->GetReturn()); + + uint32_t diff_from = answer[0]._intval32; + uint32_t numRemove = answer[1]._string_array._len; + FRT_StringValue *r = answer[1]._string_array._pt; + uint32_t numNames = answer[2]._string_array._len; + FRT_StringValue *n = answer[2]._string_array._pt; + uint32_t numSpecs = answer[3]._string_array._len; + FRT_StringValue *s = answer[3]._string_array._pt; + uint32_t diff_to = answer[4]._intval32; + + if (diff_from != 0 && diff_from != _specsGen.getAsInt()) { + LOG(warning, "bad old specs gen %u from RPC incremental request for [0/%u]", + diff_from, _specsGen.getAsInt()); + return true; + } + + if (numNames != numSpecs) { + LOG(warning, "inconsistent array lengths from RPC mirror request"); + return true; + } + + LOG(spam, "got incremental diff from %d to %d (had %d)", + diff_from, diff_to, _specsGen.getAsInt()); + + if (_specsGen == diff_from && _specsGen == diff_to) { + // nop + if (numRemove != 0 || numNames != 0) { + LOG(spam, "incremental diff [%u;%u] nop, but numRemove=%u, numNames=%u", + diff_from, diff_to, numRemove, numNames); + } + } else if (diff_from == 0) { + // full dump + if (numRemove != 0) { + LOG(spam, "incremental diff [%u;%u] full dump, but numRemove=%u, numNames=%u", + diff_from, diff_to, numRemove, numNames); + } + SpecList specs; + for (uint32_t idx = 0; idx < numNames; idx++) { + specs.push_back( + std::make_pair(std::string(n[idx]._str), + std::string(s[idx]._str))); + } + updateTo(specs, diff_to); + } else if (_specsGen == diff_from) { + // incremental update + SpecList specs; + SpecList::const_iterator end = _specs.end(); + for (SpecList::const_iterator it = _specs.begin(); + it != end; + ++it) + { + bool keep = true; + for (uint32_t idx = 0; idx < numRemove; idx++) { + if (it->first == r[idx]._str) keep = false; + } + for (uint32_t idx = 0; idx < numNames; idx++) { + if (it->first == n[idx]._str) keep = false; + } + if (keep) specs.push_back(*it); + } + for (uint32_t idx = 0; idx < numNames; idx++) { + specs.push_back( + std::make_pair(std::string(n[idx]._str), + std::string(s[idx]._str))); + } + updateTo(specs, diff_to); + } + return false; +} + + +void +MirrorAPI::handleReconfig() +{ + if (_configurator->poll() && _target != 0) { + if (! _slobrokSpecs.contains(_currSlobrok)) { + std::string cps = _slobrokSpecs.logString(); + LOG(warning, "current server %s not in list of location brokers: %s", + _currSlobrok.c_str(), cps.c_str()); + _target->SubRef(); + _target = 0; + } + } +} + +bool +MirrorAPI::handleReqDone() +{ + if (_reqDone) { + _reqDone = false; + _reqPending = false; + bool reconn = (_target == 0); + + if (_req->IsError()) { + if (_req->GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD && !_useOldProto) { + _useOldProto = true; + return false; + } + reconn = true; + } else if (_useOldProto) { + reconn = handleMirrorFetch(); + } else { + reconn = handleIncrementalFetch(); + } + + if (reconn) { + if (_target != 0) { + _target->SubRef(); + } + _target = 0; + } else { + _backOff.reset(); + // req done OK + return true; + } + } + return false; +} + + +void +MirrorAPI::handleReconnect() +{ + if (_target == 0) { + _currSlobrok = _slobrokSpecs.nextSlobrokSpec(); + if (_currSlobrok.size() > 0) { + _target = _orb.GetTarget(_currSlobrok.c_str()); + } + _specsGen.reset(); + _useOldProto = false; + if (_target == 0) { + if (_rpc_ms < 50000) { + _rpc_ms += 100; + } + double delay = _backOff.get(); + reSched(delay); + if (_backOff.shouldWarn()) { + std::string cps = _slobrokSpecs.logString(); + LOG(warning, "cannot connect to location broker at %s (retry in %f seconds)", + cps.c_str(), delay); + } + } + } +} + + +void +MirrorAPI::makeRequest() +{ + if (_target == 0) return; + if (_reqPending) { + LOG(error, "cannot make new request, one is pending already"); + abort(); + } + if (_scheduled) { + LOG(error, "cannot make new request, re-schedule is pending"); + abort(); + } + + _req = _orb.AllocRPCRequest(_req); + if (_useOldProto) { + _req->SetMethodName("slobrok.mirror.fetch"); + } else { + _req->SetMethodName("slobrok.incremental.fetch"); + } + _req->GetParams()->AddInt32(_specsGen.getAsInt()); // gencnt + _req->GetParams()->AddInt32(5000); // mstimeout + _target->InvokeAsync(_req, 0.001 * _rpc_ms, this); + _reqPending = true; +} + +void +MirrorAPI::reSched(double seconds) +{ + if (_scheduled) { + LOG(error, "already scheduled when asked to re-schedule in %f seconds", seconds); + abort(); + } + Schedule(seconds); + _scheduled = true; +} + +void +MirrorAPI::PerformTask() +{ + _scheduled = false; + handleReconfig(); + if (handleReqDone()) { + reSched(0.1); // be nice, do not make request again immediately + return; + } + handleReconnect(); + if (! _scheduled) { + makeRequest(); + } +} + + +void +MirrorAPI::RequestDone(FRT_RPCRequest *req) +{ + LOG_ASSERT(req == _req && !_reqDone); + (void) req; + _reqDone = true; + ScheduleNow(); +} + +} // namespace api +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/sbmirror.h b/slobrok/src/vespa/slobrok/sbmirror.h new file mode 100644 index 00000000000..7336e511d92 --- /dev/null +++ b/slobrok/src/vespa/slobrok/sbmirror.h @@ -0,0 +1,167 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> +#include <vespa/vespalib/util/gencnt.h> +#include <vespa/vespalib/util/sync.h> +#include "backoff.h" +#include "sblist.h" +#include "cfg.h" +#include <string> +#include <vector> + +namespace slobrok { +namespace api { + +/** + * @brief Defines an interface for the name server lookup. + **/ +class IMirrorAPI { +protected: + static bool match(const char *name, const char *pattern); + +public: + /** + * @brief Release any allocated resources. + **/ + virtual ~IMirrorAPI() { } + + /** + * @brief pair of <name, connectionspec>. + * + * The first element of pair is a string containing the service name. + * The second is the connection spec, typically "tcp/foo.bar.com:42" + **/ + typedef std::pair<std::string, std::string> Spec; + + /** + * @brief vector of <name, connectionspec> pairs. + * + * The first element of each pair is a string containing the service name. + * The second is the connection spec, typically "tcp/foo.bar.com:42" + **/ + typedef std::vector<Spec> SpecList; + + /** + * Obtain all the services matching a given pattern. + * + * The pattern is matched against all service names in the local mirror repository. A service name may contain '/' + * as a separator token. A pattern may contain '*' to match anything up to the next '/' (or the end of the + * name). This means that the pattern 'foo/<!-- slash-star -->*<!-- star-slash -->/baz' would match the service + * names 'foo/bar/baz' and 'foo/xyz/baz'. The pattern 'foo/b*' would match 'foo/bar', but neither 'foo/xyz' nor + * 'foo/bar/baz'. The pattern 'a*b' will never match anything. + * + * @return a list of all matching services, with corresponding connect specs + * @param pattern The pattern used for matching + **/ + virtual SpecList lookup(const std::string & pattern) const = 0; + + /** + * Obtain the number of updates seen by this mirror. The value may wrap, but will never become 0 again. This can be + * used for name lookup optimization, because the results returned by lookup() will never change unless this number + * also changes. + * + * @return number of slobrok updates seen + **/ + virtual uint32_t updates() const = 0; +}; + +/** + * @brief A MirrorAPI object is used to keep track of the services registered + * with a slobrok cluster. + * + * Updates to the service repository are + * fetched in the background. Lookups against this object is done + * using an internal mirror of the service repository. + **/ +class MirrorAPI : public FNET_Task, + public FRT_IRequestWait, + public IMirrorAPI +{ +public: + /** + * @brief vector of <string> pairs. + * + * Elements are connection specs, typically "tcp/foo.bar.com:42" + **/ + typedef std::vector<std::string> StringList; + + /** + * @brief Create a new MirrorAPI object using config + * + * uses the given Supervisor and config to create a MirrorAPI object. + * + * @param orb the Supervisor to use + * @param config how to get the connect spec list + **/ + MirrorAPI(FRT_Supervisor &orb, const ConfiguratorFactory & config); + + /** + * @brief Clean up. + **/ + ~MirrorAPI(); + + // Inherit doc from IMirrorAPI. + SpecList lookup(const std::string & pattern) const; + + // Inherit doc from IMirrorAPI. + uint32_t updates() const { return _updates.getAsInt(); } + + /** + * @brief Ask if the MirrorAPI has got any useful information from + * the Slobrok + * + * On application startup it is often useful to run the event loop + * for some time until this functions returns true (or if it never + * does, time out and tell the user there was no answer from any + * Service Location Broker). + * + * @return true if the MirrorAPI object has + * asked for updates from a Slobrok and got any answer back + **/ + bool ready() const; + +private: + MirrorAPI(const MirrorAPI &); + MirrorAPI &operator=(const MirrorAPI &); + + /** from FNET_Task, polls slobrok **/ + void PerformTask(); + + /** from FRT_IRequestWait **/ + void RequestDone(FRT_RPCRequest *req); + + void updateTo(SpecList& newSpecs, uint32_t newGen); + + bool handleIncrementalFetch(); + bool handleMirrorFetch(); + + void handleReconfig(); + bool handleReqDone(); + void handleReconnect(); + void makeRequest(); + + void reSched(double seconds); + + FRT_Supervisor &_orb; + mutable vespalib::Lock _lock; + bool _reqPending; + bool _scheduled; + bool _reqDone; + bool _useOldProto; + SpecList _specs; + vespalib::GenCnt _specsGen; + vespalib::GenCnt _updates; + SlobrokList _slobrokSpecs; + Configurator::UP _configurator; + std::string _currSlobrok; + int _rpc_ms; + uint32_t _idx; + BackOff _backOff; + FRT_Target *_target; + FRT_RPCRequest *_req; +}; + +} // namespace api +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/sbregister.cpp b/slobrok/src/vespa/slobrok/sbregister.cpp new file mode 100644 index 00000000000..0ca5d51aad2 --- /dev/null +++ b/slobrok/src/vespa/slobrok/sbregister.cpp @@ -0,0 +1,316 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".slobrok.register"); +#include <vespa/fnet/frt/frt.h> +#include <vespa/config-slobroks.h> +#include "sbregister.h" +#include <memory> +#include <sstream> +#include <vespa/vespalib/util/host_name.h> + +namespace { + + +vespalib::string +createSpec(FRT_Supervisor &orb) +{ + vespalib::string spec; + if (orb.GetListenPort() != 0) { + vespalib::asciistream str; + str << "tcp/"; + str << vespalib::HostName::get(); + str << ":"; + str << orb.GetListenPort(); + spec = str.str(); + } + return spec; +} + + +void +discard(std::vector<vespalib::string> &vec, const vespalib::stringref & val) +{ + uint32_t i = 0; + uint32_t size = vec.size(); + while (i < size) { + if (vec[i] == val) { + std::swap(vec[i], vec[size - 1]); + vec.pop_back(); + --size; + } else { + ++i; + } + } + LOG_ASSERT(size == vec.size()); +} + +} // namespace <unnamed> + +namespace slobrok { +namespace api { + +RegisterAPI::RegisterAPI(FRT_Supervisor &orb, const ConfiguratorFactory & config) + : FNET_Task(orb.GetScheduler()), + _orb(orb), + _hooks(*this), + _lock(), + _reqDone(false), + _busy(false), + _slobrokSpecs(), + _configurator(config.create(_slobrokSpecs)), + _currSlobrok(""), + _idx(0), + _backOff(), + _names(), + _pending(), + _unreg(), + _target(0), + _req(0) +{ + _configurator->poll(); + LOG_ASSERT(_slobrokSpecs.ok()); + ScheduleNow(); +} + + +RegisterAPI::~RegisterAPI() +{ + Kill(); + _configurator.reset(0); + if (_req != 0) { + _req->Abort(); + _req->SubRef(); + } + if (_target != 0) { + _target->SubRef(); + } +} + + +void +RegisterAPI::registerName(const vespalib::stringref & name) +{ + _lock.Lock(); + for (uint32_t i = 0; i < _names.size(); ++i) { + if (_names[i] == name) { + _lock.Unlock(); + return; + } + } + _busy = true; + _names.push_back(name); + _pending.push_back(name); + discard(_unreg, name); + ScheduleNow(); + _lock.Unlock(); +} + + +void +RegisterAPI::unregisterName(const vespalib::stringref & name) +{ + _lock.Lock(); + _busy = true; + discard(_names, name); + discard(_pending, name); + _unreg.push_back(name); + ScheduleNow(); + _lock.Unlock(); +} + +// handle any request that completed +void +RegisterAPI::handleReqDone() +{ + if (_reqDone) { + _reqDone = false; + if (_req->IsError()) { + if (_req->GetErrorCode() != FRTE_RPC_METHOD_FAILED) { + LOG(debug, "register failed: %s (code %d)", + _req->GetErrorMessage(), _req->GetErrorCode()); + // unexpected error; close our connection to this + // slobrok server and try again with a fresh slate + if (_target != 0) { + _target->SubRef(); + } + _target = 0; + _busy = true; + } else { + LOG(warning, "%s(%s -> %s) failed: %s", + _req->GetMethodName(), + (*_req->GetParams())[0]._string._str, + (*_req->GetParams())[1]._string._str, + _req->GetErrorMessage()); + } + } else { + // reset backoff strategy on any successful request + _backOff.reset(); + } + _req->SubRef(); + _req = 0; + } +} + +// do we need to try to reconnect? +void +RegisterAPI::handleReconnect() +{ + if (_configurator->poll() && _target != 0) { + if (! _slobrokSpecs.contains(_currSlobrok)) { + vespalib::string cps = _slobrokSpecs.logString(); + LOG(warning, "current server %s not in list of location brokers: %s", + _currSlobrok.c_str(), cps.c_str()); + _target->SubRef(); + _target = 0; + } + } + if (_target == 0) { + _currSlobrok = _slobrokSpecs.nextSlobrokSpec(); + if (_currSlobrok.size() > 0) { + // try next possible server. + _target = _orb.GetTarget(_currSlobrok.c_str()); + } + _lock.Lock(); + // now that we have a new connection, we need to + // immediately re-register everything. + _pending = _names; + _lock.Unlock(); + if (_target == 0) { + // we have tried all possible servers. + // start from the top after a delay, + // possibly with a warning. + double delay = _backOff.get(); + Schedule(delay); + if (_backOff.shouldWarn()) { + vespalib::string cps = _slobrokSpecs.logString(); + LOG(warning, "cannot connect to location broker at %s " + "(retry in %f seconds)", cps.c_str(), delay); + } else { + LOG(debug, "slobrok retry in %f seconds", delay); + } + return; + } + } +} + +// perform any unregister or register that is pending +void +RegisterAPI::handlePending() +{ + bool unreg = false; + bool reg = false; + vespalib::string name; + _lock.Lock(); + // pop off the todo stack, unregister has priority + if (_unreg.size() > 0) { + name = _unreg.back(); + _unreg.pop_back(); + unreg = true; + } else if (_pending.size() > 0) { + name = _pending.back(); + _pending.pop_back(); + reg = true; + } + _lock.Unlock(); + + if (unreg) { + // start a new unregister request + LOG_ASSERT(!reg); + _req = _orb.AllocRPCRequest(); + _req->SetMethodName("slobrok.unregisterRpcServer"); + _req->GetParams()->AddString(name.c_str()); + LOG(debug, "unregister [%s]", name.c_str()); + _req->GetParams()->AddString(createSpec(_orb).c_str()); + _target->InvokeAsync(_req, 35.0, this); + } else if (reg) { + // start a new register request + _req = _orb.AllocRPCRequest(); + _req->SetMethodName("slobrok.registerRpcServer"); + _req->GetParams()->AddString(name.c_str()); + LOG(debug, "register [%s]", name.c_str()); + _req->GetParams()->AddString(createSpec(_orb).c_str()); + _target->InvokeAsync(_req, 35.0, this); + } else { + // nothing more to do right now; schedule to re-register all + // names after a long delay. + _lock.Lock(); + _pending = _names; + LOG(debug, "done, reschedule in 30s"); + _busy = false; + Schedule(30.0); + _lock.Unlock(); + } +} + +void +RegisterAPI::PerformTask() +{ + handleReqDone(); + if (_req != 0) { + LOG(debug, "req in progress"); + return; // current request still in progress, don't start anything new + } + handleReconnect(); + // still no connection? + if (_target == 0) return; + handlePending(); +} + + +void +RegisterAPI::RequestDone(FRT_RPCRequest *req) +{ + LOG_ASSERT(req == _req && !_reqDone); + (void) req; + _reqDone = true; + ScheduleNow(); +} + +//----------------------------------------------------------------------------- + +RegisterAPI::RPCHooks::RPCHooks(RegisterAPI &owner) + : _owner(owner) +{ + FRT_ReflectionBuilder rb(&_owner._orb); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", true, + FRT_METHOD(RPCHooks::rpc_listNamesServed), this); + rb.MethodDesc("List rpcserver names"); + rb.ReturnDesc("names", "The rpcserver names this server wants to serve"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.callback.notifyUnregistered", "s", "", true, + FRT_METHOD(RPCHooks::rpc_notifyUnregistered), this); + rb.MethodDesc("Notify a server about removed registration"); + rb.ParamDesc("name", "RpcServer name"); + //------------------------------------------------------------------------- +} + + +RegisterAPI::RPCHooks::~RPCHooks() +{ +} + + +void +RegisterAPI::RPCHooks::rpc_listNamesServed(FRT_RPCRequest *req) +{ + FRT_Values &dst = *req->GetReturn(); + _owner._lock.Lock(); + FRT_StringValue *names = dst.AddStringArray(_owner._names.size()); + for (uint32_t i = 0; i < _owner._names.size(); ++i) { + dst.SetString(&names[i], _owner._names[i].c_str()); + } + _owner._lock.Unlock(); +} + + +void +RegisterAPI::RPCHooks::rpc_notifyUnregistered(FRT_RPCRequest *req) +{ + FRT_Values &args = *req->GetParams(); + LOG(warning, "unregistered name %s", args[0]._string._str); +} + +} // namespace api +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/sbregister.h b/slobrok/src/vespa/slobrok/sbregister.h new file mode 100644 index 00000000000..6810a4d506b --- /dev/null +++ b/slobrok/src/vespa/slobrok/sbregister.h @@ -0,0 +1,105 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> +#include <vespa/vespalib/util/gencnt.h> +#include "backoff.h" +#include "sblist.h" +#include "cfg.h" +#include <string> +#include <vector> + +namespace slobrok { +namespace api { + +/** + * @brief A RegisterAPI object is used to register and unregister + * services with a slobrok cluster. + * + * The register/unregister operations performed against this object + * are stored in a todo list that will be performed asynchronously + * against the slobrok cluster as soon as possible. + **/ +class RegisterAPI : public FNET_Task, + public FRT_IRequestWait +{ +public: + /** + * @brief Create a new RegisterAPI using the given Supervisor and config. + * + * @param orb the Supervisor to use + * @param config used to obtain the slobrok connect spec list + **/ + RegisterAPI(FRT_Supervisor &orb, const ConfiguratorFactory & config); + + /** + * @brief Clean up (deregisters all service names). + **/ + ~RegisterAPI(); + + /** + * @brief Register a service with the slobrok cluster. + * @param name service name to register + **/ + void registerName(const vespalib::stringref & name); + + /** + * @brief Unregister a service with the slobrok cluster + * @param name service name to unregister + **/ + void unregisterName(const vespalib::stringref & name); + + /** + * @brief Check progress + * + * @return true if there are outstanding registration requests + **/ + bool busy() const { return _busy; } + +private: + class RPCHooks: public FRT_Invokable + { + private: + RegisterAPI &_owner; + void rpc_listNamesServed(FRT_RPCRequest *req); + void rpc_notifyUnregistered(FRT_RPCRequest *req); + public: + RPCHooks(RegisterAPI &owner); + ~RPCHooks(); + }; + friend class RPCHooks; + + RegisterAPI(const RegisterAPI &); + RegisterAPI &operator=(const RegisterAPI &); + + bool match(const char *name, const char *pattern); + + /** from FNET_Task, poll slobrok **/ + void PerformTask(); + void handleReqDone(); // implementation detail of PerformTask + void handleReconnect(); // implementation detail of PerformTask + void handlePending(); // implementation detail of PerformTask + + /** from FRT_IRequestWait **/ + void RequestDone(FRT_RPCRequest *req); + + FRT_Supervisor &_orb; + RPCHooks _hooks; + FastOS_Mutex _lock; + bool _reqDone; + bool _busy; + SlobrokList _slobrokSpecs; + Configurator::UP _configurator; + vespalib::string _currSlobrok; + uint32_t _idx; + BackOff _backOff; + std::vector<vespalib::string> _names; // registered service names + std::vector<vespalib::string> _pending; // pending service name registrations + std::vector<vespalib::string> _unreg; // pending service name unregistrations + FRT_Target *_target; + FRT_RPCRequest *_req; +}; + +} // namespace api +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/.gitignore b/slobrok/src/vespa/slobrok/server/.gitignore new file mode 100644 index 00000000000..afc60a4ae1d --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/.gitignore @@ -0,0 +1,8 @@ +.depend +Makefile +check_slobrok +rpc-method-list +sbcmd +slobrok +tstdst +/libslobrokserver.so.5.1 diff --git a/slobrok/src/vespa/slobrok/server/CMakeLists.txt b/slobrok/src/vespa/slobrok/server/CMakeLists.txt new file mode 100644 index 00000000000..d20f6850518 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/CMakeLists.txt @@ -0,0 +1,26 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(slobrok_slobrokserver + SOURCES + cmd.cpp + exchange_manager.cpp + history.cpp + i_rpc_server_manager.cpp + i_monitored_server.cpp + managed_rpc_server.cpp + monitor.cpp + named_service.cpp + remote_check.cpp + remote_slobrok.cpp + reserved_name.cpp + rpc_server_manager.cpp + rpc_server_map.cpp + rpchooks.cpp + sbenv.cpp + selfcheck.cpp + slobrokserver.cpp + visible_map.cpp + vtag.cpp + metrics_producer.cpp + INSTALL lib64 + DEPENDS +) diff --git a/slobrok/src/vespa/slobrok/server/cmd.cpp b/slobrok/src/vespa/slobrok/server/cmd.cpp new file mode 100644 index 00000000000..b83bcbe2fba --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/cmd.cpp @@ -0,0 +1,134 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".cmd"); + +#include <vespa/fnet/frt/frt.h> + +#include "cmd.h" +#include "ok_state.h" +#include "named_service.h" +#include "reserved_name.h" +#include "rpc_server_map.h" +#include "rpc_server_manager.h" +#include "remote_slobrok.h" +#include "sbenv.h" + +namespace slobrok { + +//----------------------------------------------------------------------------- + +struct RegRpcSrvData +{ +private: + RegRpcSrvData(const RegRpcSrvData &); + RegRpcSrvData &operator=(const RegRpcSrvData &); + +public: + enum { + RDC_INIT, XCH_WANTADD, CHK_RPCSRV, XCH_DOADD, XCH_IGNORE, RDC_INVAL + } _state; + + SBEnv &env; + const std::string name; + const std::string spec; + FRT_RPCRequest * const registerRequest; + + RegRpcSrvData(SBEnv &e, const char *n, const char *s, FRT_RPCRequest *r) + : _state(RDC_INIT), env(e), name(n), spec(s), registerRequest(r) + {} +}; + +RegRpcSrvCommand +RegRpcSrvCommand::makeRegRpcSrvCmd(SBEnv &env, + const char *name, const char *spec, + FRT_RPCRequest *req) +{ + RegRpcSrvData *data = new RegRpcSrvData(env, name, spec, req); + return RegRpcSrvCommand(data); +} + +RegRpcSrvCommand +RegRpcSrvCommand::makeRemRemCmd(SBEnv &env, + const char *name, const char *spec) +{ + RegRpcSrvData *data = new RegRpcSrvData(env, name, spec, NULL); + data->_state = RegRpcSrvData::XCH_IGNORE; + return RegRpcSrvCommand(data); +} + + +void +RegRpcSrvCommand::doRequest() +{ + LOG_ASSERT(_data->_state == RegRpcSrvData::RDC_INIT); + doneHandler(OkState()); +} + +void +RegRpcSrvCommand::cleanupReservation() +{ + RpcServerMap &map = _data->env._rpcsrvmap; + ReservedName *rsvp = map.getReservation(_data->name.c_str()); + if (rsvp != NULL && rsvp->isLocal) { + map.removeReservation(_data->name.c_str()); + } +} + +void +RegRpcSrvCommand::doneHandler(OkState result) +{ + LOG_ASSERT(_data != NULL); + + if (result.failed()) { + LOG(warning, "failed in state %d: %s", + _data->_state, result.errorMsg.c_str()); + cleanupReservation(); + // XXX should handle different state errors differently? + if (_data->registerRequest != NULL) { + _data->registerRequest->SetError(FRTE_RPC_METHOD_FAILED, + result.errorMsg.c_str()); + _data->registerRequest->Return(); + } else { + LOG(warning, "ignored: %s", result.errorMsg.c_str()); + } + goto alldone; + } + if (_data->_state == RegRpcSrvData::RDC_INIT) { + LOG(spam, "phase wantAdd(%s,%s)", + _data->name.c_str(), _data->spec.c_str()); + _data->_state = RegRpcSrvData::XCH_WANTADD; + _data->env._exchanger.wantAdd(_data->name.c_str(), _data->spec.c_str(), *this); + return; + } else if (_data->_state == RegRpcSrvData::XCH_WANTADD) { + LOG(spam, "phase addManaged(%s,%s)", + _data->name.c_str(), _data->spec.c_str()); + _data->_state = RegRpcSrvData::CHK_RPCSRV; + _data->env._rpcsrvmanager.addManaged(_data->name.c_str(), _data->spec.c_str(), *this); + return; + } else if (_data->_state == RegRpcSrvData::CHK_RPCSRV) { + LOG(spam, "phase doAdd(%s,%s)", _data->name.c_str(), _data->spec.c_str()); + _data->_state = RegRpcSrvData::XCH_DOADD; + _data->env._exchanger.doAdd(_data->name.c_str(), _data->spec.c_str(), *this); + return; + } else if (_data->_state == RegRpcSrvData::XCH_DOADD) { + LOG(debug, "done doAdd(%s,%s)", _data->name.c_str(), _data->spec.c_str()); + _data->_state = RegRpcSrvData::RDC_INVAL; + // all OK + _data->registerRequest->Return(); + goto alldone; + } else if (_data->_state == RegRpcSrvData::XCH_IGNORE) { + goto alldone; + } + // no other state should be possible + abort(); + alldone: + cleanupReservation(); + delete _data; + _data = NULL; +} + +//----------------------------------------------------------------------------- + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/cmd.h b/slobrok/src/vespa/slobrok/server/cmd.h new file mode 100644 index 00000000000..f8147868e20 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/cmd.h @@ -0,0 +1,55 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "ok_state.h" + +class FRT_RPCRequest; + +namespace slobrok { + +class SBEnv; + +struct RegRpcSrvData; + +/** + * @class RegRpcSrvCommand + * @brief Small "script" to handle the various stages of a registration + * + * XXX should change name, also used for other tasks + **/ +class RegRpcSrvCommand +{ +private: + RegRpcSrvData *_data; + RegRpcSrvCommand(RegRpcSrvData *data) : _data(data) {} + void cleanupReservation(); +public: + virtual void doneHandler(OkState result); + void doRequest(); + + RegRpcSrvCommand(const RegRpcSrvCommand &rhs) + : _data(rhs._data) + { + } + + virtual ~RegRpcSrvCommand() {} + + RegRpcSrvCommand& operator=(const RegRpcSrvCommand &rhs) + { + _data = rhs._data; + return *this; + } + + static RegRpcSrvCommand makeRegRpcSrvCmd(SBEnv &env, + const char *name, + const char *spec, + FRT_RPCRequest *req); + static RegRpcSrvCommand makeRemRemCmd(SBEnv &env, + const char *name, + const char *spec); +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/configshim.h b/slobrok/src/vespa/slobrok/server/configshim.h new file mode 100644 index 00000000000..e1c3cb38d80 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/configshim.h @@ -0,0 +1,44 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fastos/fastos.h> +#include <string> +#include <vespa/slobrok/cfg.h> + +namespace slobrok { + +class ConfigShim +{ +private: + uint32_t _port; + uint32_t _statePort; + std::string _configId; + ConfiguratorFactory _factory; + +public: + ConfigShim(uint32_t port) + : _port(port), _statePort(0), _configId(""), + _factory(config::ConfigUri::createEmpty()) {} + + ConfigShim(uint32_t port, uint32_t statePort_in, const std::string& cfgId) + : _port(port), _statePort(statePort_in), _configId(cfgId), + _factory(config::ConfigUri(_configId)) {} + + ConfigShim(uint32_t port, const std::string& cfgId, + config::IConfigContext::SP cfgCtx) + : _port(port), _statePort(0), _configId(cfgId), + _factory(config::ConfigUri(cfgId, cfgCtx)) {} + + uint32_t portNumber() const { return _port; } + + uint32_t statePort() const { return _statePort; } + + std::string configId() const { return _configId; } + + const char *id() const { return _configId.c_str(); } + + const ConfiguratorFactory & factory() const { return _factory; } +}; + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp new file mode 100644 index 00000000000..197a1dd8a18 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp @@ -0,0 +1,286 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".rpcserver"); + +#include <vector> +#include <deque> +#include <stdlib.h> + +#include "exchange_manager.h" + +#include "ok_state.h" +#include "named_service.h" +#include "rpc_server_map.h" +#include "rpc_server_manager.h" +#include "remote_slobrok.h" +#include "sbenv.h" +#include "cmd.h" + +namespace slobrok { + +//----------------------------------------------------------------------------- + +ExchangeManager::ExchangeManager(SBEnv &env, RpcServerMap &rpcsrvmap) + : _partners(NULL), + _env(env), + _rpcsrvmanager(env._rpcsrvmanager), + _rpcsrvmap(rpcsrvmap) +{ +} + + +OkState +ExchangeManager::addPartner(const char *name, const char *spec) +{ + RemoteSlobrok *oldremote = _partners[name]; + if (oldremote != NULL) { + // already a partner, should be OK + if (strcmp(spec, oldremote->getSpec()) != 0) { + return OkState(FRTE_RPC_METHOD_FAILED, + "name already partner with different spec"); + } + // this is probably a good time to try connecting again + if (! oldremote->isConnected()) { + oldremote->tryConnect(); + } + return OkState(); + } + + RemoteSlobrok *partner = new RemoteSlobrok(name, spec, *this); + LOG_ASSERT(_partners.isSet(name) == false); + _partners.set(name, partner); + partner->tryConnect(); + return OkState(); +} + + +void +ExchangeManager::removePartner(const char *name) +{ + // assuming checks already done + RemoteSlobrok *oldremote = _partners.remove(name); + LOG_ASSERT(oldremote != NULL); + delete oldremote; +} + + +std::vector<std::string> +ExchangeManager::getPartnerList() +{ + std::vector<std::string> partnerList; + HashMap<RemoteSlobrok *>::Iterator itr = _partners.iterator(); + for (; itr.valid(); itr.next()) { + partnerList.push_back(std::string(itr.value()->getSpec())); + } + return partnerList; +} + + +void +ExchangeManager::forwardRemove(const char *name, const char *spec) +{ + RegRpcSrvCommand remremhandler + = RegRpcSrvCommand::makeRemRemCmd(_env, name, spec); + WorkPackage *package = new WorkPackage(WorkPackage::OP_REMOVE, + name, spec, *this, + remremhandler); + HashMap<RemoteSlobrok *>::Iterator it = _partners.iterator(); + while (it.valid()) { + RemoteSlobrok *partner = it.value(); + package->addItem(partner); + it.next(); + } + package->expedite(); +} + +void +ExchangeManager::doAdd(const char *name, const char *spec, + RegRpcSrvCommand rdc) +{ + HashMap<RemoteSlobrok *>::Iterator it = + _partners.iterator(); + + WorkPackage *package = + new WorkPackage(WorkPackage::OP_DOADD, name, spec, *this, rdc); + + while (it.valid()) { + RemoteSlobrok *partner = it.value(); + package->addItem(partner); + it.next(); + } + package->expedite(); +} + + +void +ExchangeManager::wantAdd(const char *name, const char *spec, + RegRpcSrvCommand rdc) +{ + WorkPackage *package = + new WorkPackage(WorkPackage::OP_WANTADD, name, spec, *this, rdc); + HashMap<RemoteSlobrok *>::Iterator it = _partners.iterator(); + while (it.valid()) { + RemoteSlobrok *partner = it.value(); + package->addItem(partner); + it.next(); + } + package->expedite(); +} + + +void +ExchangeManager::healthCheck() +{ + int i=0; + HashMap<RemoteSlobrok *>::Iterator it = _partners.iterator(); + while (it.valid()) { + RemoteSlobrok *partner = it.value(); + partner->healthCheck(); + it.next(); + i++; + } + LOG(debug, "ExchangeManager::healthCheck for %d partners", i); +} + +//----------------------------------------------------------------------------- + +ExchangeManager::WorkPackage::WorkItem::WorkItem(WorkPackage &pkg, + RemoteSlobrok *rem, + FRT_RPCRequest *req) + : _pkg(pkg), _pendingReq(req), _remslob(rem) +{ +} + +void +ExchangeManager::WorkPackage::WorkItem::RequestDone(FRT_RPCRequest *req) +{ + bool denied = false; + LOG_ASSERT(req == _pendingReq); + FRT_Values &answer = *(req->GetReturn()); + + if (!req->IsError() && strcmp(answer.GetTypeString(), "is") == 0) { + if (answer[0]._intval32 != 0) { + LOG(warning, "request denied: %s [%d]", + answer[1]._string._str, answer[0]._intval32); + denied = true; + } else { + LOG(spam, "request approved"); + } + } else { + LOG(warning, "error doing workitem: %s", req->GetErrorMessage()); + // XXX tell remslob? + } + req->SubRef(); + _pendingReq = NULL; + _pkg.doneItem(denied); +} + +void +ExchangeManager::WorkPackage::WorkItem::expedite() +{ + _remslob->invokeAsync(_pendingReq, 2.0, this); +} + +ExchangeManager::WorkPackage::WorkItem::~WorkItem() +{ + if (_pendingReq != NULL) { + _pendingReq->Abort(); + LOG_ASSERT(_pendingReq == NULL); + } +} + + +ExchangeManager::WorkPackage::WorkPackage(op_type op, + const char *name, const char *spec, + ExchangeManager &exchanger, + RegRpcSrvCommand donehandler) + : _work(), + _doneCnt(0), + _numDenied(0), + _donehandler(donehandler), + _exchanger(exchanger), + _optype(op), + _name(name), + _spec(spec) +{ +} + +ExchangeManager::WorkPackage::~WorkPackage() +{ + for (size_t i = 0; i < _work.size(); i++) { + delete _work[i]; + _work[i] = NULL; + } +} + +void +ExchangeManager::WorkPackage::doneItem(bool denied) +{ + ++_doneCnt; + if (denied) { + ++_numDenied; + } + LOG(spam, "package done %d/%d, %d denied", + (int)_doneCnt, (int)_work.size(), (int)_numDenied); + if (_doneCnt == _work.size()) { + if (_numDenied > 0) { + _donehandler.doneHandler(OkState(_numDenied, "denied by remote")); + } else { + _donehandler.doneHandler(OkState()); + } + delete this; + } +} + + +void +ExchangeManager::WorkPackage::addItem(RemoteSlobrok *partner) +{ + if (! partner->isConnected()) { + return; + } + FRT_RPCRequest *r = _exchanger._env.getSupervisor()->AllocRPCRequest(); + // XXX should recheck rpcsrvmap again + if (_optype == OP_REMOVE) { + r->SetMethodName("slobrok.internal.doRemove"); + } else if (_optype == OP_WANTADD) { + r->SetMethodName("slobrok.internal.wantAdd"); + } else if (_optype == OP_DOADD) { + r->SetMethodName("slobrok.internal.doAdd"); + } + r->GetParams()->AddString(_exchanger._env.mySpec()); + r->GetParams()->AddString(_name.c_str()); + r->GetParams()->AddString(_spec.c_str()); + + WorkItem *item = new WorkItem(*this, partner, r); + _work.push_back(item); + LOG(spam, "added %s(%s,%s,%s) for %s to workpackage", + r->GetMethodName(), _exchanger._env.mySpec(), + _name.c_str(), _spec.c_str(), partner->getName()); +} + + +void +ExchangeManager::WorkPackage::expedite() +{ + size_t sz = _work.size(); + if (sz == 0) { + // no remotes need doing. + _donehandler.doneHandler(OkState()); + delete this; + return; + } + for (size_t i = 0; i < sz; i++) { + _work[i]->expedite(); + // note that on the last iteration + // this object may be deleted if + // the RPC fails synchronously. + } +} + +//----------------------------------------------------------------------------- + + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.h b/slobrok/src/vespa/slobrok/server/exchange_manager.h new file mode 100644 index 00000000000..b13d64e64f0 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/exchange_manager.h @@ -0,0 +1,123 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <deque> +#include <string> + +#include <vespa/fnet/frt/frt.h> + +#include <vespa/vespalib/util/hashmap.h> +#include "ok_state.h" +#include "cmd.h" +#include "remote_slobrok.h" + +namespace slobrok { + +//----------------------------------------------------------------------------- + +class SBEnv; +class RpcServerMap; +class RpcServerManager; + +using vespalib::HashMap; + +//----------------------------------------------------------------------------- + +/** + * @class ExchangeManager + * @brief Keeps track of and talks to all remote location brokers + * + * Handles a collection of RemoteSlobrok objects; contains classes and + * methods for operating on all remote slobroks in parallel. + **/ +class ExchangeManager +{ +private: + ExchangeManager(const ExchangeManager &); // Not used + ExchangeManager &operator=(const ExchangeManager &); // Not used + + HashMap<RemoteSlobrok *> _partners; + + class WorkPackage; + + class IWorkPkgWait + { + public: + virtual void donePackage(WorkPackage *pkg, bool somedenied) = 0; + virtual ~IWorkPkgWait() {} + }; + + class WorkPackage + { + private: + WorkPackage(const WorkPackage&); // not used + WorkPackage& operator= (const WorkPackage&); // not used + + class WorkItem: public FRT_IRequestWait + { + private: + WorkPackage &_pkg; + FRT_RPCRequest *_pendingReq; + RemoteSlobrok *_remslob; + + WorkItem(const WorkItem&); // not used + WorkItem& operator= (const WorkItem&); // not used + public: + void expedite(); + virtual void RequestDone(FRT_RPCRequest *req); + WorkItem(WorkPackage &pkg, + RemoteSlobrok *rem, + FRT_RPCRequest *req); + ~WorkItem(); + }; + std::vector<WorkItem *> _work; + size_t _doneCnt; + size_t _numDenied; + RegRpcSrvCommand _donehandler; + public: + ExchangeManager &_exchanger; + enum op_type { OP_NOP, OP_WANTADD, OP_DOADD, OP_REMOVE }; + op_type _optype; + const std::string _name; + const std::string _spec; + void addItem(RemoteSlobrok *partner); + void doneItem(bool denied); + void expedite(); + WorkPackage(op_type op, + const char *name, const char *spec, + ExchangeManager &exchanger, + RegRpcSrvCommand donehandler); + ~WorkPackage(); + }; + +public: + ExchangeManager(SBEnv &env, RpcServerMap &rpcsrvmap); + ~ExchangeManager() {} + + SBEnv &_env; + RpcServerManager &_rpcsrvmanager; + RpcServerMap &_rpcsrvmap; + + OkState addPartner(const char *name, const char *spec); + void removePartner(const char *name); + std::vector<std::string> getPartnerList(); + + void registerFrom(RemoteSlobrok *partner); + void reregisterTo(RemoteSlobrok *partner); + + void forwardRemove(const char *name, const char *spec); + + void wantAdd(const char *name, const char *spec, RegRpcSrvCommand rdc); + void doAdd(const char *name, const char *spec, RegRpcSrvCommand rdc); + + RemoteSlobrok *lookupPartner(const char *name) const { + return _partners[name]; + } + + void healthCheck(); +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/history.cpp b/slobrok/src/vespa/slobrok/server/history.cpp new file mode 100644 index 00000000000..fa2bf9c9023 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/history.cpp @@ -0,0 +1,75 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".history"); + +#include "history.h" + +namespace slobrok { + +void +History::verify() const +{ + if (_entries.size() > 0) { + citer_t i = _entries.begin(); + vespalib::GenCnt gen = i->gen; + + while (++i != _entries.end()) { + gen.add(); + LOG_ASSERT(gen == i->gen); + } + } +} + +void +History::add(const char *name, vespalib::GenCnt gen) +{ + HistoryEntry h; + _entries.push_back(h); + _entries.back().name = name; + _entries.back().gen = gen; + + if (_entries.size() > 1500) { + _entries.erase(_entries.begin(), _entries.begin() + 500); + LOG(debug, "history size after trim: %lu", + (unsigned long)_entries.size()); + } + verify(); +} + + +bool +History::has(vespalib::GenCnt gen) const +{ + if (_entries.size() == 0) + return false; + + vespalib::GenCnt first = _entries.front().gen; + vespalib::GenCnt last = _entries.back().gen; + + return gen.inRangeInclusive(first, last); +} + + +std::set<std::string> +History::since(vespalib::GenCnt gen) const +{ + citer_t i = _entries.begin(); + citer_t end = _entries.end(); + while (i != end) { + if (i->gen == gen) break; + ++i; + } + std::set<std::string> ret; + while (i != end) { + ret.insert(i->name); + ++i; + } + LOG_ASSERT(ret.size() > 0); + return ret; +} + +//----------------------------------------------------------------------------- + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/history.h b/slobrok/src/vespa/slobrok/server/history.h new file mode 100644 index 00000000000..9898d5b21cc --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/history.h @@ -0,0 +1,40 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/util/gencnt.h> +#include <vespa/vespalib/util/hashmap.h> +#include <vector> +#include <string> +#include <set> + +namespace slobrok { + +class History +{ +private: + struct HistoryEntry { + std::string name; + vespalib::GenCnt gen; + }; + + std::vector<HistoryEntry> _entries; + + typedef std::vector<HistoryEntry>::iterator iter_t; + typedef std::vector<HistoryEntry>::const_iterator citer_t; + + void verify() const; +public: + void add(const char *name, vespalib::GenCnt gen); + + bool has(vespalib::GenCnt gen) const; + + std::set<std::string> since(vespalib::GenCnt gen) const; + + History() : _entries() {} + ~History() {} +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/i_monitored_server.cpp b/slobrok/src/vespa/slobrok/server/i_monitored_server.cpp new file mode 100644 index 00000000000..b027d4d5859 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/i_monitored_server.cpp @@ -0,0 +1,2 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "i_monitored_server.h" diff --git a/slobrok/src/vespa/slobrok/server/i_monitored_server.h b/slobrok/src/vespa/slobrok/server/i_monitored_server.h new file mode 100644 index 00000000000..1bfe52387e0 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/i_monitored_server.h @@ -0,0 +1,26 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> + +namespace slobrok { + +//----------------------------------------------------------------------------- + +/** + * @class IMonitoredServer + * @brief A server that is monitored by a Monitor object + * + * interface that must be implemented by owners of Monitor objects. + **/ +class IMonitoredServer +{ +public: + virtual void notifyDisconnected() = 0; // lost connection to service + virtual ~IMonitoredServer() {} +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/i_rpc_server_manager.cpp b/slobrok/src/vespa/slobrok/server/i_rpc_server_manager.cpp new file mode 100644 index 00000000000..8476e5a25b1 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/i_rpc_server_manager.cpp @@ -0,0 +1,2 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "i_rpc_server_manager.h" diff --git a/slobrok/src/vespa/slobrok/server/i_rpc_server_manager.h b/slobrok/src/vespa/slobrok/server/i_rpc_server_manager.h new file mode 100644 index 00000000000..b093acd7c2a --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/i_rpc_server_manager.h @@ -0,0 +1,34 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> + +#include "monitor.h" + +#include <string> + +namespace slobrok { + +class ManagedRpcServer; + +//----------------------------------------------------------------------------- + +/** + * @class IRpcServerManager + * @brief A manager for ManagedRpcServer objects. + * + * Interface class. + **/ + +class IRpcServerManager +{ +public: + virtual void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) = 0; + virtual void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) = 0; + virtual FRT_Supervisor *getSupervisor() = 0; + virtual ~IRpcServerManager() {} +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp b/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp new file mode 100644 index 00000000000..88414e7ab19 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp @@ -0,0 +1,131 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".rpcserver"); + +#include "managed_rpc_server.h" +#include "i_rpc_server_manager.h" + +namespace slobrok { + +//----------------------------------------------------------------------------- + +ManagedRpcServer::ManagedRpcServer(const char *name, + const char *spec, + IRpcServerManager &manager) + : NamedService(name, spec), + _mmanager(manager), + _monitor(*this, *manager.getSupervisor()), + _monitoredServer(NULL), + _checkServerReq(NULL) +{ +} + + +void +ManagedRpcServer::healthCheck() +{ + if (_monitoredServer == NULL) { + _monitoredServer = _mmanager.getSupervisor()->GetTarget(_spec.c_str()); + } + if (_checkServerReq == NULL) { + _checkServerReq = _mmanager.getSupervisor()->AllocRPCRequest(); + _checkServerReq->SetMethodName("slobrok.callback.listNamesServed"); + _monitoredServer->InvokeAsync(_checkServerReq, 25.0, this); + } +} + + +ManagedRpcServer::~ManagedRpcServer() +{ + LOG(debug, "(role[%s].~ManagedRpcServer)", _name.c_str()); + cleanupMonitor(); +} + + +void +ManagedRpcServer::cleanupMonitor() +{ + _monitor.disable(); + if (_monitoredServer != NULL) { + _monitoredServer->SubRef(); + _monitoredServer = NULL; + } + if (_checkServerReq != NULL) { + _checkServerReq->Abort(); + // _checkServerReq cleared by RequestDone Method + LOG_ASSERT(_checkServerReq == NULL); + } +} + +void +ManagedRpcServer::notifyDisconnected() +{ + cleanupMonitor(); + _mmanager.notifyFailedRpcSrv(this, "disconnected"); +} + + +bool +ManagedRpcServer::validateRpcServer(uint32_t numstrings, + FRT_StringValue *strings) +{ + for (uint32_t i = 0; i < numstrings; ++i) { + if (strcmp(strings[i]._str, _name.c_str()) == 0) { + return true; + } + } + LOG(info, "REMOVE: server at %s did not have %s in listNamesServed values", + _spec.c_str(), _name.c_str()); + return false; +} + + +void +ManagedRpcServer::RequestDone(FRT_RPCRequest *req) +{ + LOG_ASSERT(req == _checkServerReq); + FRT_Values &answer = *(req->GetReturn()); + + if (req->GetErrorCode() == FRTE_RPC_ABORT) { + LOG(debug, "rpcserver[%s].check aborted", getName()); + req->SubRef(); + _checkServerReq = NULL; + return; + } + + if (req->IsError() + || strcmp(answer.GetTypeString(), "S") != 0 + || ! validateRpcServer(answer[0]._string_array._len, + answer[0]._string_array._pt)) + { + std::string errmsg; + if (req->IsError()) { + errmsg = req->GetErrorMessage(); + } else if (strcmp(answer.GetTypeString(), "S") != 0) { + errmsg = "checkServer wrong return: "; + errmsg += answer.GetTypeString(); + } else { + errmsg = "checkServer failed validation"; + } + req->SubRef(); + _checkServerReq = NULL; + cleanupMonitor(); + _mmanager.notifyFailedRpcSrv(this, errmsg); + return; + } + + // start monitoring connection to server + LOG_ASSERT(_monitoredServer != NULL); + _monitor.enable(_monitoredServer); + + req->SubRef(); + _checkServerReq = NULL; + _mmanager.notifyOkRpcSrv(this); +} + + +//----------------------------------------------------------------------------- + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/managed_rpc_server.h b/slobrok/src/vespa/slobrok/server/managed_rpc_server.h new file mode 100644 index 00000000000..07e0bdaf947 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/managed_rpc_server.h @@ -0,0 +1,60 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> + +#include "monitor.h" +#include "named_service.h" + +#include <string> + +namespace slobrok { + +//----------------------------------------------------------------------------- + +class IRpcServerManager; + +/** + * @class ManagedRpcServer + * @brief A NamedService that is managed by this location broker + * + * This class contains the logic to monitor the connection to a + * NamedService and also to do a healthCheck using the RPC method + * slobrok.checkRpcServer on the connection, notifying its + * manager using callbacks in the IRpcServerManager interface. + **/ + +class ManagedRpcServer: public NamedService, + public FRT_IRequestWait, + public IMonitoredServer +{ +private: + ManagedRpcServer(const ManagedRpcServer&); // Not used + ManagedRpcServer& operator=(const ManagedRpcServer&); // Not used + +public: + ManagedRpcServer(const char *name, + const char *spec, + IRpcServerManager &manager); + virtual ~ManagedRpcServer(); + + void healthCheck(); + +private: + IRpcServerManager &_mmanager; + Monitor _monitor; + FRT_Target *_monitoredServer; + FRT_RPCRequest *_checkServerReq; + + void cleanupMonitor(); + bool validateRpcServer(uint32_t numstrings, + FRT_StringValue *strings); +public: + virtual void RequestDone(FRT_RPCRequest *req); + virtual void notifyDisconnected(); // lost connection to service +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/metrics_producer.cpp b/slobrok/src/vespa/slobrok/server/metrics_producer.cpp new file mode 100644 index 00000000000..a30bdb8233f --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/metrics_producer.cpp @@ -0,0 +1,134 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include "metrics_producer.h" + +#include <vespa/fnet/frt/frt.h> +#include <vespa/vespalib/data/slime/slime.h> + +namespace slobrok { + +namespace { + +class MetricsSnapshotter : public FNET_Task +{ + MetricsProducer &_owner; + + void PerformTask() { + _owner.snapshot(); + Schedule(60.0); + } +public: + MetricsSnapshotter(FNET_Transport &transport, MetricsProducer &owner) + : FNET_Task(transport.GetScheduler()), + _owner(owner) + { + Schedule(60.0); + } + + virtual ~MetricsSnapshotter() { Kill(); } +}; + +class MetricSnapshot +{ +private: + vespalib::Slime _data; + vespalib::slime::Cursor& _metrics; + vespalib::slime::Cursor& _values; + double _snapLen; + +public: + MetricSnapshot(uint32_t prevTime, uint32_t currTime); + void addCount(const char *name, const char *desc, uint32_t count); + + vespalib::string asString() const { + return _data.toString(); + } +}; + +MetricSnapshot::MetricSnapshot(uint32_t prevTime, uint32_t currTime) + : _data(), + _metrics(_data.setObject()), + _values(_metrics.setArray("values")), + _snapLen(currTime - prevTime) +{ + vespalib::slime::Cursor& snapshot = _metrics.setObject("snapshot"); + snapshot.setLong("from", prevTime); + snapshot.setLong("to", currTime); + if (_snapLen < 1.0) { + _snapLen = 1.0; + } +} + + +void +MetricSnapshot::addCount(const char *name, const char *desc, uint32_t count) +{ + using namespace vespalib::slime::convenience; + Cursor& value = _values.addObject(); + value.setString("name", name); + value.setString("description", desc); + Cursor& inner = value.setObject("values"); + inner.setLong("count", count); + inner.setDouble("rate", count / _snapLen); +} + +vespalib::string +makeSnapshot(const RPCHooks::Metrics &prev, const RPCHooks::Metrics &curr, + uint32_t prevTime, uint32_t currTime) +{ + MetricSnapshot snapshot(prevTime, currTime); + snapshot.addCount("slobrok.requests.register", + "count of register requests received", + curr.registerReqs - prev.registerReqs); + snapshot.addCount("slobrok.requests.mirror", + "count of mirroring requests received", + curr.mirrorReqs - prev.mirrorReqs); + snapshot.addCount("slobrok.requests.admin", + "count of administrative requests received", + curr.adminReqs - prev.adminReqs); + return snapshot.asString(); +} + +} // namespace <unnamed> + + +MetricsProducer::MetricsProducer(const RPCHooks &hooks, + FNET_Transport &transport) + : _rpcHooks(hooks), + _lastMetrics{ 0, 0, 0, 0, 0, 0, 0}, + _producer(), + _startTime(time(NULL)), + _lastSnapshotStart(_startTime), + _snapshotter(new MetricsSnapshotter(transport, *this)) +{ +} + +vespalib::string +MetricsProducer::getMetrics(const vespalib::string &consumer) +{ + return _producer.getMetrics(consumer); +} + +vespalib::string +MetricsProducer::getTotalMetrics(const vespalib::string &) +{ + uint32_t now = time(NULL); + RPCHooks::Metrics current = _rpcHooks.getMetrics(); + RPCHooks::Metrics start{0, 0, 0, 0, 0, 0, 0}; + return makeSnapshot(start, current, _startTime, now); +} + + +void +MetricsProducer::snapshot() +{ + uint32_t now = time(NULL); + RPCHooks::Metrics current = _rpcHooks.getMetrics(); + _producer.setMetrics(makeSnapshot(_lastMetrics, current, _lastSnapshotStart, now)); + _lastMetrics = current; + _lastSnapshotStart = now; +} + + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/metrics_producer.h b/slobrok/src/vespa/slobrok/server/metrics_producer.h new file mode 100644 index 00000000000..ca4a222a3cb --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/metrics_producer.h @@ -0,0 +1,32 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "rpchooks.h" +#include <vespa/vespalib/net/metrics_producer.h> +#include <vespa/vespalib/net/simple_metrics_producer.h> +#include <vespa/fnet/frt/frt.h> + +namespace slobrok { + +class MetricsProducer : public vespalib::MetricsProducer +{ +private: + const RPCHooks &_rpcHooks; + RPCHooks::Metrics _lastMetrics; + vespalib::SimpleMetricsProducer _producer; + uint32_t _startTime; + uint32_t _lastSnapshotStart; + std::unique_ptr<FNET_Task> _snapshotter; + +public: + vespalib::string getMetrics(const vespalib::string &consumer) override; + vespalib::string getTotalMetrics(const vespalib::string &consumer) override; + + void snapshot(); + + MetricsProducer(const RPCHooks &hooks, FNET_Transport &transport); +}; + + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/monitor.cpp b/slobrok/src/vespa/slobrok/server/monitor.cpp new file mode 100644 index 00000000000..6ff7d7dd422 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/monitor.cpp @@ -0,0 +1,97 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".monitor"); + +#include "monitor.h" + +namespace slobrok { + +//----------------------------------------------------------------------------- + +Monitor::Monitor(IMonitoredServer &server, FRT_Supervisor &supervisor) + : FNET_Task(supervisor.GetScheduler()), + _monitoredServer(server), + _channel(NULL), + _enabled(false) +{ +} + + +Monitor::~Monitor() +{ + Kill(); // will deadlock if called from task + disconnect(); +} + + +void +Monitor::enable(FRT_Target *monitorTarget) +{ + LOG_ASSERT(monitorTarget != NULL); + Unschedule(); + disconnect(); + _enabled = true; + FNET_Connection *conn = monitorTarget->GetConnection(); + if (conn != NULL) { + _channel = conn->OpenChannel(this, FNET_Context()); + } + if (_channel == NULL) { + ScheduleNow(); + } else { + _channel->SetContext(FNET_Context(_channel)); + } +} + + +void +Monitor::PerformTask() +{ + if (_enabled) { + _monitoredServer.notifyDisconnected(); + } +} + + +void +Monitor::disable() +{ + _enabled = false; + disconnect(); +} + + +void +Monitor::disconnect() +{ + if (_channel != NULL) { + _channel->SetContext(FNET_Context((FNET_Channel *)0)); + if (_channel->GetConnection()->GetState() <= FNET_Connection::FNET_CONNECTED) { + _channel->CloseAndFree(); + } + _channel = NULL; + } +} + + +FNET_IPacketHandler::HP_RetCode +Monitor::HandlePacket(FNET_Packet *packet, + FNET_Context context) +{ + if (context._value.CHANNEL == 0) { + packet->Free(); + return FNET_FREE_CHANNEL; + } + if (!packet->IsChannelLostCMD()) { + packet->Free(); + return FNET_KEEP_CHANNEL; + } + _channel = NULL; + ScheduleNow(); + return FNET_FREE_CHANNEL; +} + +//----------------------------------------------------------------------------- + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/monitor.h b/slobrok/src/vespa/slobrok/server/monitor.h new file mode 100644 index 00000000000..990684df9fc --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/monitor.h @@ -0,0 +1,44 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> + +#include "i_monitored_server.h" + +namespace slobrok { + +//----------------------------------------------------------------------------- + +/** + * @class Monitor + * @brief Utility for monitoring an FNET connection + * + * Connection failure is reported via notifyDisconnected() + * to the owner. + **/ +class Monitor : public FNET_IPacketHandler, + public FNET_Task +{ +private: + IMonitoredServer &_monitoredServer; + FNET_Channel *_channel; + bool _enabled; + Monitor(const Monitor&); + Monitor &operator=(const Monitor&); +public: + explicit Monitor(IMonitoredServer& owner, + FRT_Supervisor &supervisor); + virtual ~Monitor(); + void enable(FRT_Target *monitorTarget); + void disable(); +private: + void disconnect(); + virtual HP_RetCode HandlePacket(FNET_Packet *packet, + FNET_Context context); + virtual void PerformTask(); +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/named_service.cpp b/slobrok/src/vespa/slobrok/server/named_service.cpp new file mode 100644 index 00000000000..f46acd700d2 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/named_service.cpp @@ -0,0 +1,29 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".rpcserver"); + +#include "named_service.h" +#include "i_rpc_server_manager.h" + +namespace slobrok { + +//----------------------------------------------------------------------------- + +NamedService::NamedService(const char *name, + const char *spec) + : _name(name), + _spec(spec) +{ +} + + +NamedService::~NamedService() +{ + LOG(spam, "(role[%s].~NamedService)", _name.c_str()); +} + +//----------------------------------------------------------------------------- + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/named_service.h b/slobrok/src/vespa/slobrok/server/named_service.h new file mode 100644 index 00000000000..f5221b75657 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/named_service.h @@ -0,0 +1,41 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> +#include <string> + +namespace slobrok { + +//----------------------------------------------------------------------------- + +/** + * @class NamedService + * @brief Represents a server with a name and a connection specification. + * + * a NamedService is always part of a collection implementing the + * IRpcSrvCollection interface. + **/ + +class NamedService +{ +private: + NamedService(const NamedService &); // Not used + NamedService &operator=(const NamedService &); // Not used + +protected: + std::string _name; + std::string _spec; + +public: + NamedService(const char *name, + const char *spec); + virtual ~NamedService(); + + const char *getName() const { return _name.c_str(); } + const char *getSpec() const { return _spec.c_str(); } +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/ok_state.h b/slobrok/src/vespa/slobrok/server/ok_state.h new file mode 100644 index 00000000000..fdfee95811e --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/ok_state.h @@ -0,0 +1,31 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <string> + +namespace slobrok { + +/** + * @struct OkState + * @brief object representing the result status of a request + * + * Contains an error code (0 means success) and an error message string. + **/ +struct OkState +{ + const uint32_t errorCode; + const std::string errorMsg; + + OkState(uint32_t code, std::string msg) : errorCode(code), errorMsg(msg) {} + OkState() : errorCode(0), errorMsg() {} + bool ok() const { return errorCode == 0; } + bool failed() const { return errorCode != 0; } + enum SpecialErrorCodes { + ALL_OK = 0, + FAILED = 1, + FAILED_BAD = 13 + }; +}; + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/random.h b/slobrok/src/vespa/slobrok/server/random.h new file mode 100644 index 00000000000..0ab2597dd03 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/random.h @@ -0,0 +1,19 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <stdlib.h> + +namespace { + +const double randmax = RAND_MAX; + +// standard uniform distribution +double uniformRandom() { + return random() / randmax; +} + +double randomIn(double min, double max) { + return min + (uniformRandom() * (max - min)); +} + +} // namespace <unnamed> diff --git a/slobrok/src/vespa/slobrok/server/remote_check.cpp b/slobrok/src/vespa/slobrok/server/remote_check.cpp new file mode 100644 index 00000000000..96360daf873 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/remote_check.cpp @@ -0,0 +1,49 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".remcheck"); + +#include <vespa/fnet/frt/frt.h> + +#include "remote_check.h" +#include "ok_state.h" +#include "named_service.h" +#include "rpc_server_map.h" +#include "rpc_server_manager.h" +#include "remote_slobrok.h" +#include "sbenv.h" +#include "random.h" + +namespace slobrok { + + +RemoteCheck::RemoteCheck(FNET_Scheduler *sched, + RpcServerMap& rpcsrvmap, + RpcServerManager& rpcsrvman, + ExchangeManager& exch) + : FNET_Task(sched), + _rpcsrvmap(rpcsrvmap), _rpcsrvmanager(rpcsrvman), _exchanger(exch) +{ + double seconds = randomIn(15.3, 27.9); + Schedule(seconds); +} + + +RemoteCheck::~RemoteCheck() +{ + Kill(); +} + + +void +RemoteCheck::PerformTask() +{ + LOG(debug, "asking exchanger to health check"); + _exchanger.healthCheck(); + double seconds = randomIn(151.3, 179.7); + Schedule(seconds); +} + + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/remote_check.h b/slobrok/src/vespa/slobrok/server/remote_check.h new file mode 100644 index 00000000000..eaba4041883 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/remote_check.h @@ -0,0 +1,41 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/fnet.h> +#include <vespa/fnet/frt/frt.h> + +namespace slobrok { + +class SBEnv; +class RpcServerMap; +class RpcServerManager; +class ExchangeManager; + +/** + * @class RemoteCheck + * @brief Periodic healthcheck task for remote objects + * + * Checks the health of partner location brokers + * and their NamedService objects periodically. + **/ +class RemoteCheck : public FNET_Task +{ +private: + RpcServerMap &_rpcsrvmap; + RpcServerManager &_rpcsrvmanager; + ExchangeManager &_exchanger; + + RemoteCheck(const RemoteCheck &); // Not used + RemoteCheck &operator=(const RemoteCheck &); // Not used +public: + explicit RemoteCheck(FNET_Scheduler *sched, + RpcServerMap& rpcsrvmap, + RpcServerManager& rpcsrvman, + ExchangeManager& exchanger); + virtual ~RemoteCheck(); +private: + virtual void PerformTask(); +}; + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp new file mode 100644 index 00000000000..42bdbd6373e --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp @@ -0,0 +1,371 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".rpcserver"); + +#include <vector> +#include <deque> +#include <stdlib.h> + +#include "remote_slobrok.h" +#include "ok_state.h" +#include "named_service.h" +#include "rpc_server_map.h" +#include "rpc_server_manager.h" +#include "sbenv.h" +#include "cmd.h" + +namespace slobrok { +namespace { + +class IgnoreReqDone: public FRT_IRequestWait +{ + void RequestDone(FRT_RPCRequest *req) { + req->SubRef(); + } +}; + +} // namespace slobrok::<unnamed> + +//----------------------------------------------------------------------------- + +RemoteSlobrok::RemoteSlobrok(const char *name, const char *spec, + ExchangeManager &manager) + : _exchanger(manager), + _rpcsrvmanager(manager._rpcsrvmanager), + _remote(NULL), + _rpcserver(name, spec, *this), + _reconnecter(getSupervisor()->GetScheduler(), *this), + _failCnt(0), + _remAddPeerReq(NULL), + _remListReq(NULL), + _remAddReq(NULL), + _remRemReq(NULL), + _pending() +{ + _rpcserver.healthCheck(); +} + +static IgnoreReqDone ignorer; + +RemoteSlobrok::~RemoteSlobrok() +{ + _reconnecter.disable(); + + if (_remote != NULL) { + _remote->SubRef(); + _remote = NULL; + } + + if (_remAddPeerReq != NULL) { + _remAddPeerReq->Abort(); + } + if (_remListReq != NULL) { + _remListReq->Abort(); + } + if (_remAddReq != NULL) { + _remAddReq->Abort(); + } + if (_remRemReq != NULL) { + _remRemReq->Abort(); + } + // _rpcserver destructor called automatically +} + + +void +RemoteSlobrok::doPending() +{ + LOG_ASSERT(_remAddReq == NULL); + LOG_ASSERT(_remRemReq == NULL); + + if (_pending.size() > 0) { + NamedService *todo = _pending.front(); + _pending.pop_front(); + + NamedService *rpcsrv = _exchanger._rpcsrvmap.lookup(todo->getName()); + + if (rpcsrv == NULL) { + _remRemReq = getSupervisor()->AllocRPCRequest(); + _remRemReq->SetMethodName("slobrok.internal.doRemove"); + _remRemReq->GetParams()->AddString(_exchanger._env.mySpec()); + _remRemReq->GetParams()->AddString(todo->getName()); + _remRemReq->GetParams()->AddString(todo->getSpec()); + _remote->InvokeAsync(_remRemReq, 2.0, this); + } else { + _remAddReq = getSupervisor()->AllocRPCRequest(); + _remAddReq->SetMethodName("slobrok.internal.doAdd"); + _remAddReq->GetParams()->AddString(_exchanger._env.mySpec()); + _remAddReq->GetParams()->AddString(todo->getName()); + _remAddReq->GetParams()->AddString(rpcsrv->getSpec()); + _remote->InvokeAsync(_remAddReq, 2.0, this); + } + // XXX should save this and pick up on RequestDone() + delete todo; + } + +} + +void +RemoteSlobrok::pushMine() +{ + // all mine + std::vector<const NamedService *> mine = _exchanger._rpcsrvmap.allManaged(); + while (mine.size() > 0) { + const NamedService *now = mine.back(); + mine.pop_back(); + NamedService *copy = new NamedService(now->getName(), now->getSpec()); + _pending.push_back(copy); + } + doPending(); +} + +void +RemoteSlobrok::RequestDone(FRT_RPCRequest *req) +{ + FRT_Values &answer = *(req->GetReturn()); + if (req == _remAddPeerReq) { + // handle response after asking remote slobrok to add me as a peer: + if (req->IsError()) { + FRT_Values &args = *req->GetParams(); + const char *myname = args[0]._string._str; + const char *myspec = args[1]._string._str; + LOG(error, "addPeer(%s, %s) on remote slobrok %s at %s: %s", + myname, myspec, getName(), getSpec(), + req->GetErrorMessage()); + req->SubRef(); + _remAddPeerReq = NULL; + goto retrylater; + } + req->SubRef(); + _remAddPeerReq = NULL; + // next step is to ask the remote to send its list of managed names: + LOG_ASSERT(_remListReq == NULL); + _remListReq = getSupervisor()->AllocRPCRequest(); + _remListReq->SetMethodName("slobrok.internal.listManagedRpcServers"); + if (_remote != NULL) { + _remote->InvokeAsync(_remListReq, 3.0, this); + } + // when _remListReq is returned, our managed list is added + } else if (req == _remListReq) { + // handle the list sent from the remote: + if (req->IsError() + || strcmp(answer.GetTypeString(), "SS") != 0) + { + LOG(error, "error listing remote slobrok %s at %s: %s", + getName(), getSpec(), req->GetErrorMessage()); + req->SubRef(); + _remListReq = NULL; + goto retrylater; + } + uint32_t numNames = answer.GetValue(0)._string_array._len; + uint32_t numSpecs = answer.GetValue(1)._string_array._len; + + if (numNames != numSpecs) { + LOG(error, "inconsistent array lengths from %s at %s", + getName(), getSpec()); + req->SubRef(); + _remListReq = NULL; + goto retrylater; + } + FRT_StringValue *names = answer.GetValue(0)._string_array._pt; + FRT_StringValue *specs = answer.GetValue(1)._string_array._pt; + + for (uint32_t idx = 0; idx < numNames; idx++) { + _rpcsrvmanager.addRemote(names[idx]._str, specs[idx]._str); + } + req->SubRef(); + _remListReq = NULL; + + // next step is to push the ones I own: + pushMine(); + } else if (req == _remAddReq) { + // handle response after pushing some name that we managed: + if (req->IsError() && (req->GetErrorCode() == FRTE_RPC_CONNECTION || + req->GetErrorCode() == FRTE_RPC_TIMEOUT)) + { + LOG(error, "connection error adding to remote slobrok: %s", + req->GetErrorMessage()); + req->SubRef(); + _remAddReq = NULL; + goto retrylater; + } + if (req->IsError()) { + FRT_Values &args = *req->GetParams(); + const char *rpcsrvname = args[1]._string._str; + const char *rpcsrvspec = args[2]._string._str; + LOG(warning, "error adding [%s -> %s] to remote slobrok: %s", + rpcsrvname, rpcsrvspec, req->GetErrorMessage()); + _rpcsrvmanager.removeLocal(rpcsrvname, rpcsrvspec); + } + req->SubRef(); + _remAddReq = NULL; + doPending(); + } else if (req == _remRemReq) { + // handle response after pushing some remove we had pending: + if (req->IsError() && (req->GetErrorCode() == FRTE_RPC_CONNECTION || + req->GetErrorCode() == FRTE_RPC_TIMEOUT)) + { + LOG(error, "connection error adding to remote slobrok: %s", + req->GetErrorMessage()); + req->SubRef(); + _remRemReq = NULL; + goto retrylater; + } + if (req->IsError()) { + LOG(warning, "error removing on remote slobrok: %s", + req->GetErrorMessage()); + } + req->SubRef(); + _remRemReq = NULL; + doPending(); + } else { + LOG(error, "got unknown request back in RequestDone()"); + LOG_ASSERT(req == NULL); + } + + return; + retrylater: + fail(); + return; +} + + +void +RemoteSlobrok::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) +{ + if (++_failCnt > 10) { + LOG(warning, "remote location broker at %s failed: %s", + rpcsrv->getSpec(), errmsg.c_str()); + } else { + LOG(debug, "remote location broker at %s failed: %s", + rpcsrv->getSpec(), errmsg.c_str()); + } + LOG_ASSERT(rpcsrv == &_rpcserver); + fail(); +} + + +void +RemoteSlobrok::fail() +{ + // disconnect + if (_remote != NULL) { + _remote->SubRef(); + _remote = NULL; + } + // schedule reconnect attempt + _reconnecter.scheduleTryConnect(); +} + + +void +RemoteSlobrok::healthCheck() +{ + if (_remote != NULL && + _remAddPeerReq == NULL && + _remListReq == NULL && + _remAddReq == NULL && + _remRemReq == NULL) + { + LOG(debug, "spamming remote at %s with my names", getName()); + pushMine(); + } else { + LOG(debug, "not pushing mine, as we have: remote %p r.a.p.r=%p r.l.r=%p r.a.r=%p r.r.r=%p", + _remote, _remAddPeerReq, _remListReq, _remAddReq, _remRemReq); + } +} + + +void +RemoteSlobrok::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) +{ + LOG_ASSERT(rpcsrv == &_rpcserver); + (void) rpcsrv; + + // connection was OK, so disable any pending reconnect + _reconnecter.disable(); + + if (_remote == NULL) { + _remote = getSupervisor()->GetTarget(getSpec()); + } + + // at this point, we will do (in sequence): + // ask peer to connect to us too; + // ask peer for its list of managed rpcservers, adding to our database + // add our managed rpcserver on peer + // any failure will cause disconnect and retry. + + _remAddPeerReq = getSupervisor()->AllocRPCRequest(); + _remAddPeerReq->SetMethodName("slobrok.admin.addPeer"); + _remAddPeerReq->GetParams()->AddString(_exchanger._env.mySpec()); + _remAddPeerReq->GetParams()->AddString(_exchanger._env.mySpec()); + _remote->InvokeAsync(_remAddPeerReq, 3.0, this); + // when _remAddPeerReq is returned, our managed list is added via doAdd() +} + +void +RemoteSlobrok::tryConnect() +{ + _rpcserver.healthCheck(); +} + +FRT_Supervisor * +RemoteSlobrok::getSupervisor() +{ + return _exchanger._env.getSupervisor(); +} + +//----------------------------------------------------------------------------- + +RemoteSlobrok::Reconnecter::Reconnecter(FNET_Scheduler *sched, + RemoteSlobrok &owner) + : FNET_Task(sched), + _waittime(13), + _owner(owner) +{ +} + +RemoteSlobrok::Reconnecter::~Reconnecter() +{ + Kill(); +} + +void +RemoteSlobrok::Reconnecter::scheduleTryConnect() +{ + if (_waittime < 60) + ++_waittime; + Schedule(_waittime + (random() & 255)/100.0); + +} + +void +RemoteSlobrok::Reconnecter::disable() +{ + // called when connection OK + Unschedule(); + _waittime = 13; +} + +void +RemoteSlobrok::Reconnecter::PerformTask() +{ + _owner.tryConnect(); +} + +void +RemoteSlobrok::invokeAsync(FRT_RPCRequest *req, + double timeout, + FRT_IRequestWait *rwaiter) +{ + LOG_ASSERT(isConnected()); + _remote->InvokeAsync(req, timeout, rwaiter); +} + + +//----------------------------------------------------------------------------- + + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.h b/slobrok/src/vespa/slobrok/server/remote_slobrok.h new file mode 100644 index 00000000000..a8bd5ca908c --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.h @@ -0,0 +1,99 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <deque> +#include <string> + +#include <vespa/fnet/frt/frt.h> + +#include <vespa/vespalib/util/hashmap.h> +#include "ok_state.h" +#include "cmd.h" +#include "i_rpc_server_manager.h" +#include "rpc_server_manager.h" +#include "managed_rpc_server.h" + +namespace slobrok { + +//----------------------------------------------------------------------------- + +class SBEnv; +class RpcServerMap; +class RpcServerManager; +class ExchangeManager; + +using vespalib::HashMap; + +//----------------------------------------------------------------------------- + +/** + * @class RemoteSlobrok + * @brief Keeps track of and talks to a remote location broker + * + * Handles one single partner slobrok + **/ +class RemoteSlobrok: public IRpcServerManager, + public FRT_IRequestWait +{ +private: + RemoteSlobrok(const RemoteSlobrok&); // not used + RemoteSlobrok& operator= (const RemoteSlobrok&); // not used + + class Reconnecter : public FNET_Task + { + private: + int _waittime; + RemoteSlobrok &_owner; + Reconnecter(const Reconnecter &); // not used + Reconnecter &operator=(const Reconnecter &); // not used + public: + explicit Reconnecter(FNET_Scheduler *sched, RemoteSlobrok &owner); + virtual ~Reconnecter(); + void scheduleTryConnect(); + void disable(); + virtual void PerformTask(); + }; + +private: + ExchangeManager &_exchanger; + RpcServerManager &_rpcsrvmanager; + FRT_Target *_remote; + ManagedRpcServer _rpcserver; + Reconnecter _reconnecter; + int _failCnt; + + FRT_RPCRequest *_remAddPeerReq; + FRT_RPCRequest *_remListReq; + FRT_RPCRequest *_remAddReq; + FRT_RPCRequest *_remRemReq; + + std::deque<NamedService *> _pending; + void pushMine(); + void doPending(); + +public: + RemoteSlobrok(const char *name, const char *spec, + ExchangeManager &manager); + virtual ~RemoteSlobrok(); + + void fail(); + bool isConnected() const { return (_remote != NULL); } + void tryConnect(); + void healthCheck(); + void invokeAsync(FRT_RPCRequest *req, + double timeout, + FRT_IRequestWait *rwaiter); + const char *getName() const { return _rpcserver.getName(); } + const char *getSpec() const { return _rpcserver.getSpec(); } + + // interfaces implemented: + virtual void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg); + virtual void notifyOkRpcSrv(ManagedRpcServer *rpcsrv); + virtual void RequestDone(FRT_RPCRequest *req); + virtual FRT_Supervisor *getSupervisor(); +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/reserved_name.cpp b/slobrok/src/vespa/slobrok/server/reserved_name.cpp new file mode 100644 index 00000000000..6b6895315b5 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/reserved_name.cpp @@ -0,0 +1,2 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "reserved_name.h" diff --git a/slobrok/src/vespa/slobrok/server/reserved_name.h b/slobrok/src/vespa/slobrok/server/reserved_name.h new file mode 100644 index 00000000000..b36e8e8ac50 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/reserved_name.h @@ -0,0 +1,41 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> +#include <string> + +#include "named_service.h" + +namespace slobrok { + +//----------------------------------------------------------------------------- + +/** + * @class ReservedName + * @brief A reservation for a name + * + * a reservation expires 15 seconds after it is created. + **/ + +class ReservedName: public NamedService +{ +private: + FastOS_Time _reservedTime; +public: + const bool isLocal; + + ReservedName(const char *name, const char *spec, bool local) + : NamedService(name, spec), _reservedTime(), isLocal(local) + { + _reservedTime.SetNow(); + } + bool stillReserved() { + return (_reservedTime.MilliSecsToNow() < 15000); + } + int seconds() { return _reservedTime.MilliSecsToNow() / 1000; } +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/rpc_server_manager.cpp b/slobrok/src/vespa/slobrok/server/rpc_server_manager.cpp new file mode 100644 index 00000000000..f8d0a4a3897 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/rpc_server_manager.cpp @@ -0,0 +1,361 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".rpcserver"); + +#include <string> +#include <sstream> +#include <vespa/vespalib/util/stringfmt.h> + +#include "rpc_server_manager.h" +#include "ok_state.h" +#include "named_service.h" +#include "reserved_name.h" +#include "rpc_server_map.h" +#include "remote_slobrok.h" +#include "sbenv.h" + +namespace slobrok { + +RpcServerManager::RpcServerManager(SBEnv &sbenv) + : FNET_Task(sbenv.getScheduler()), + _rpcsrvmap(sbenv._rpcsrvmap), + _exchanger(sbenv._exchanger), + _env(sbenv), + _addManageds(), + _deleteList() +{ +} + +static OkState +validateName(const char *rpcsrvname) +{ + const char *p = rpcsrvname; + while (*p != '\0') { + // important: disallow '*' + if (strchr("+,-./:=@[]_{}~<>" + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz", *p) == NULL) + { + std::ostringstream tmp; + tmp << "Illegal character '" << *p << "' ("; + tmp << (int)(*p)<< ") in rpcserver name"; + return OkState(13, tmp.str().c_str()); + } + ++p; + } + if (p == rpcsrvname) { + return OkState(13, "empty rpcserver name"); + } + return OkState(); +} + + +OkState +RpcServerManager::checkPartner(const char *remslobrok) +{ + if (strcmp(remslobrok, _env.mySpec()) == 0) { + return OkState(13, "remote slobrok using my rpcserver name"); + } + RemoteSlobrok *partner = _exchanger.lookupPartner(remslobrok); + if (partner == NULL) { + return OkState(13, "remote slobrok not a partner"); + } + return OkState(); +} + +OkState +RpcServerManager::addRemReservation(const char *remslobrok, + const char *name, + const char *spec) +{ + OkState state = checkPartner(remslobrok); + if (state.failed()) return state; + + OkState valid = validateName(name); + if (valid.failed()) return valid; + + NamedService *old = _rpcsrvmap.lookupManaged(name); + if (old != NULL) { + if (strcmp(old->getSpec(), spec) == 0) { + // was alright already + return OkState(0, "already registered"); + } + LOG(warning, "remote %s tried to register [%s -> %s] but we already have [%s -> %s] registered!", + remslobrok, name, spec, old->getName(), old->getSpec()); + return OkState(FRTE_RPC_METHOD_FAILED, "already managed by me"); + } + if (_rpcsrvmap.conflictingReservation(name, spec)) { + return OkState(FRTE_RPC_METHOD_FAILED, + "registration for name already in progress"); + } + ReservedName *rpcsrv = new ReservedName(name, spec, false); + _rpcsrvmap.addReservation(rpcsrv); + return OkState(0, "done"); +} + + +OkState +RpcServerManager::addPeer(const char *remsbname, + const char *remsbspec) +{ + if (strcmp(remsbname, _env.mySpec()) == 0) { + return OkState(13, "cannot add remote slobrok with my rpcserver name"); + } + return _exchanger.addPartner(remsbname, remsbspec); +} + + +OkState +RpcServerManager::removePeer(const char *remsbname, + const char *remsbspec) +{ + if (strcmp(remsbname, _env.mySpec()) == 0) { + return OkState(13, "cannot remove my own rpcserver name"); + } + RemoteSlobrok *partner = _exchanger.lookupPartner(remsbname); + if (partner == NULL) { + return OkState(0, "remote slobrok not a partner"); + } + if (strcmp(partner->getSpec(), remsbspec) != 0) { + return OkState(13, "peer registered with different spec"); + } + _exchanger.removePartner(remsbname); + return OkState(0, "done"); +} + + +OkState +RpcServerManager::addMyReservation(const char *name, const char *spec) +{ + OkState valid = validateName(name); + if (valid.failed()) return valid; + + NamedService *old = _rpcsrvmap.lookupManaged(name); + if (old != NULL) { + if (strcmp(old->getSpec(), spec) == 0) { + // was alright already + return OkState(0, "already registered"); + } else { + return OkState(FRTE_RPC_METHOD_FAILED, vespalib::make_string( + "name %s registered (to %s), cannot register %s", + name, old->getSpec(), spec)); + } + } + + // check if we already are in the progress of adding this + if (_rpcsrvmap.conflictingReservation(name, spec)) { + ReservedName * rsv = _rpcsrvmap.getReservation(name); + LOG(warning, "conflicting registrations: wanted [%s -> %s] but [%s -> %s] already reserved", + name, spec, rsv->getName(), rsv->getSpec()); + return OkState(FRTE_RPC_METHOD_FAILED, + "registration for name already in progress with a different spec"); + } + _rpcsrvmap.removeReservation(name); + ReservedName *rpcsrv = new ReservedName(name, spec, true); + _rpcsrvmap.addReservation(rpcsrv); + return OkState(0, "done"); +} + + +OkState +RpcServerManager::addRemote(const char *name, + const char *spec) +{ + OkState valid = validateName(name); + if (valid.failed()) return valid; + + if (alreadyManaged(name, spec)) { + return OkState(0, "already correct"); + } + NamedService *old = _rpcsrvmap.lookup(name); + if (old != NULL) { + if (strcmp(old->getSpec(), spec) != 0) { + LOG(warning, "collision on remote add: " + "name %s registered to %s locally, " + "but another location broker wants it registered to %s", + name, old->getSpec(), spec); + removeRemote(name, old->getSpec()); + return OkState(13, "registered, with different spec"); + } + // was alright already, remove reservation + _rpcsrvmap.removeReservation(name); + return OkState(0, "already correct"); + } + _rpcsrvmap.removeReservation(name); + ManagedRpcServer *rpcsrv = new ManagedRpcServer(name, spec, *this); + _rpcsrvmap.addNew(rpcsrv); + rpcsrv->healthCheck(); + return OkState(0, "done"); +} + +OkState +RpcServerManager::remove(ManagedRpcServer *rpcsrv) +{ + NamedService *td = _rpcsrvmap.lookup(rpcsrv->getName()); + if (td == rpcsrv) { + return removeLocal(rpcsrv->getName(), rpcsrv->getSpec()); + } else { + return OkState(1, "not currently registered"); + } +} + + +OkState +RpcServerManager::removeRemote(const char *name, + const char *spec) +{ + NamedService *old = _rpcsrvmap.lookup(name); + if (old == NULL) { + // was alright already, remove any reservation too + _rpcsrvmap.removeReservation(name); + return OkState(0, "already done"); + } + if (strcmp(old->getSpec(), spec) != 0) { + return OkState(1, "name registered, but with different spec"); + } + NamedService *td = _rpcsrvmap.remove(name); + LOG_ASSERT(td == old); + delete td; + return OkState(0, "done"); +} + +OkState +RpcServerManager::removeLocal(const char *name, const char *spec) +{ + NamedService *td = _rpcsrvmap.lookup(name); + if (td == NULL) { + // already removed, nop + return OkState(); + } + + RemoteSlobrok *partner = _exchanger.lookupPartner(name); + if (partner != NULL) { + return OkState(13, "cannot unregister partner slobrok"); + } + + ManagedRpcServer *rpcsrv = _rpcsrvmap.lookupManaged(name); + if (rpcsrv == NULL) { + return OkState(13, "not a local rpcserver"); + } + + if (strcmp(rpcsrv->getSpec(), spec) != 0) { + // the client can probably ignore this "error" + // or log it on level INFO? + return OkState(1, "name registered, but with different spec"); + } + td = _rpcsrvmap.remove(name); + LOG_ASSERT(td == rpcsrv); + delete rpcsrv; + _exchanger.forwardRemove(name, spec); + return OkState(); +} + + +void +RpcServerManager::addManaged(const char *name, const char *spec, + RegRpcSrvCommand rdc) +{ + ManagedRpcServer *rpcsrv = new ManagedRpcServer(name, spec, *this); + _rpcsrvmap.addNew(rpcsrv); + for (size_t i = 0; i < _addManageds.size(); i++) { + if (_addManageds[i].rpcsrv == NULL) { + _addManageds[i].rpcsrv = rpcsrv; + _addManageds[i].handler = rdc; + rpcsrv->healthCheck(); + return; + } + } + MRSandRRSC pair(rpcsrv, rdc); + _addManageds.push_back(pair); + rpcsrv->healthCheck(); + return; +} + + + +bool +RpcServerManager::alreadyManaged(const char *name, const char *spec) +{ + ManagedRpcServer *rpcsrv = _rpcsrvmap.lookupManaged(name); + if (rpcsrv != NULL) { + if (strcmp(rpcsrv->getSpec(), spec) == 0) { + return true; + } + } + return false; +} + + +RpcServerManager::~RpcServerManager() +{ + Kill(); + PerformTask(); +} + + +void +RpcServerManager::PerformTask() +{ + std::vector<ManagedRpcServer *> dl; + std::swap(dl, _deleteList); + for (uint32_t i = 0; i < dl.size(); ++i) { + delete dl[i]; + } +} + + +void +RpcServerManager::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) +{ + bool logged = false; + NamedService *old = _rpcsrvmap.lookup(rpcsrv->getName()); + if (old == rpcsrv) { + old = _rpcsrvmap.remove(rpcsrv->getName()); + LOG_ASSERT(old == rpcsrv); + LOG(info, "managed server %s at %s failed: %s", + rpcsrv->getName(), rpcsrv->getSpec(), errmsg.c_str()); + logged = true; + } + _exchanger.forwardRemove(rpcsrv->getName(), rpcsrv->getSpec()); + for (size_t i = 0; i < _addManageds.size(); ++i) { + if (_addManageds[i].rpcsrv == rpcsrv) { + _addManageds[i].handler + .doneHandler(OkState(13, "failed check using listNames callback")); + _addManageds[i].rpcsrv = 0; + LOG(warning, "rpcserver %s at %s failed while trying to register", + rpcsrv->getName(), rpcsrv->getSpec()); + logged = true; + } + } + if (!logged) { + LOG(warning, "unmanaged server %s at %s failed: %s", + rpcsrv->getName(), rpcsrv->getSpec(), errmsg.c_str()); + } + _deleteList.push_back(rpcsrv); + ScheduleNow(); +} + +void +RpcServerManager::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) +{ + for (size_t i = 0; i < _addManageds.size(); ++i) { + if (_addManageds[i].rpcsrv == rpcsrv) { + _addManageds[i].handler.doneHandler(OkState()); + _addManageds[i].rpcsrv = 0; + } + } + // XXX check if pending wantAdd / doAdd / registerRpcServer +} + +FRT_Supervisor * +RpcServerManager::getSupervisor() +{ + return _env.getSupervisor(); +} + +//----------------------------------------------------------------------------- + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/rpc_server_manager.h b/slobrok/src/vespa/slobrok/server/rpc_server_manager.h new file mode 100644 index 00000000000..e9157cba269 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/rpc_server_manager.h @@ -0,0 +1,113 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <string> +#include <vector> + +#include <vespa/vespalib/util/hashmap.h> + +#include <vespa/fnet/frt/frt.h> + +#include "ok_state.h" +#include "cmd.h" +#include "i_rpc_server_manager.h" +#include "named_service.h" + +namespace slobrok { + +class NamedService; +class ManagedRpcServer; +class RemoteSlobrok; +class ReservedName; +class RpcServerMap; +class ExchangeManager; +class SBEnv; + +/** + * @class RpcServerManager + * @brief Main "business logic" for the service location broker. + * + * Used by all external and some internal operations. + * This class actually implements operations, + * checking for validity, manipulating internal datastructures, + * and initiating synchronization operations to peer slobroks. + **/ +class RpcServerManager : public FNET_Task, + public IRpcServerManager +{ +private: + RpcServerMap &_rpcsrvmap; + ExchangeManager &_exchanger; + SBEnv &_env; + + struct MRSandRRSC { + ManagedRpcServer *rpcsrv; + RegRpcSrvCommand handler; + MRSandRRSC(ManagedRpcServer *d, RegRpcSrvCommand h) + : rpcsrv(d), handler(h) {} + + MRSandRRSC(const MRSandRRSC &rhs) + : rpcsrv(rhs.rpcsrv), + handler(rhs.handler) + { + } + + MRSandRRSC& operator=(const MRSandRRSC &rhs) + { + rpcsrv = rhs.rpcsrv; + handler = rhs.handler; + return *this; + } + }; + std::vector<MRSandRRSC> _addManageds; + std::vector<ManagedRpcServer *> _deleteList; + + RpcServerManager(const RpcServerManager &); // Not used + RpcServerManager &operator=(const RpcServerManager &); // Not used + +public: + OkState checkPartner(const char *remslobrok); + + OkState addPeer(const char *remsbname, + const char *remsbspec); + OkState removePeer(const char *remsbname, + const char *remsbspec); + + OkState addLocal(const char *name, + const char *spec, + FRT_RPCRequest *req); + OkState addRemote(const char *name, + const char *spec); + + OkState addRemReservation(const char *remslobrok, + const char *name, + const char *spec); + OkState addMyReservation(const char *name, + const char *spec); + + bool alreadyManaged(const char *name, + const char *spec); + void addManaged(const char *name, + const char *spec, + RegRpcSrvCommand rdc); + + OkState remove(ManagedRpcServer *rpcsrv); + + OkState removeLocal(const char *name, const char *spec); + + OkState removeRemote(const char *name, + const char *spec); + + RpcServerManager(SBEnv &sbenv); + virtual ~RpcServerManager(); + + virtual void PerformTask(); + virtual void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg); + virtual void notifyOkRpcSrv(ManagedRpcServer *rpcsrv); + virtual FRT_Supervisor *getSupervisor(); +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp b/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp new file mode 100644 index 00000000000..138c9720149 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp @@ -0,0 +1,192 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".rpcsrvmap"); + +#include <vector> + +#include "rpc_server_map.h" +#include "named_service.h" +#include "reserved_name.h" +#include "rpc_server_manager.h" +#include "sbenv.h" + +namespace slobrok { + +//----------------------------------------------------------------------------- + +NamedService * +RpcServerMap::lookup(const char *name) const +{ + NamedService *d = _myrpcsrv_map[name]; + return d; +} + + +NamedService * +RpcServerMap::remove(const char *name) +{ + _visible_map.remove(name); + NamedService *d = _myrpcsrv_map.remove(name); + return d; +} + + + +std::vector<const NamedService *> +RpcServerMap::lookupPattern(const char *pattern) const +{ + std::vector<const NamedService *> retval; + for (HashMap<ManagedRpcServer *>::Iterator it = _myrpcsrv_map.iterator(); + it.valid(); it.next()) + { + if (match(it.key(), pattern)) { + retval.push_back(it.value()); + } + } + return retval; +} + + +std::vector<const NamedService *> +RpcServerMap::allManaged() const +{ + std::vector<const NamedService *> retval; + // get list of all names in myrpcsrv_map + for (HashMap<ManagedRpcServer *>::Iterator it = _myrpcsrv_map.iterator(); + it.valid(); it.next()) + { + retval.push_back(it.value()); + } + return retval; +} + + +void +RpcServerMap::add(NamedService *rpcsrv) +{ + const char *name = rpcsrv->getName(); + + LOG_ASSERT(rpcsrv != NULL); + LOG_ASSERT(_myrpcsrv_map.isSet(name) == false); + + removeReservation(name); + + LOG_ASSERT(_visible_map.lookup(name) == NULL); + _visible_map.addNew(rpcsrv); +} + +void +RpcServerMap::addNew(ManagedRpcServer *rpcsrv) +{ + const char *name = rpcsrv->getName(); + + ManagedRpcServer *oldman = _myrpcsrv_map.remove(name); + + if (oldman != NULL) { + ReservedName *oldres = _reservations[name]; + NamedService *oldvis = _visible_map.remove(name); + + const char *spec = rpcsrv->getSpec(); + LOG(warning, "internal state problem: adding [%s at %s] but already had" + " oldvis %p oldman %p oldres %p", + name, spec, oldvis, oldman, oldres); + if (oldvis != NULL) { + const char *n = oldvis->getName(); + const char *s = oldvis->getSpec(); + LOG(warning, "oldvis: [%s at %s]", n, s); + } + if (oldres != NULL) { + const char *n = oldres->getName(); + const char *s = oldres->getSpec(); + LOG(warning, "oldres: [%s at %s]", n, s); + } + + const char *n = oldman->getName(); + const char *s = oldman->getSpec(); + LOG(warning, "oldman: [%s at %s]", n, s); + delete oldman; + LOG_ASSERT(oldman == oldvis); + } + add(rpcsrv); + _myrpcsrv_map.set(name, rpcsrv); +} + + +void +RpcServerMap::addReservation(ReservedName *rpcsrv) +{ + LOG_ASSERT(rpcsrv != NULL); + LOG_ASSERT(_myrpcsrv_map.isSet(rpcsrv->getName()) == false); + + // must not be reserved for something else already + // this should have been checked already, so assert + LOG_ASSERT(! conflictingReservation(rpcsrv->getName(), rpcsrv->getSpec())); + ReservedName *old = _reservations.set(rpcsrv->getName(), rpcsrv); + LOG_ASSERT(old == NULL + || strcmp(old->getSpec(), rpcsrv->getSpec()) == 0 + || ! old->stillReserved()); + delete old; +} + + +/** check if there is a (different) registration for this name in progress */ +bool +RpcServerMap::conflictingReservation(const char *name, const char *spec) +{ + ReservedName *resv = _reservations[name]; + return (resv != NULL && + resv->stillReserved() && + strcmp(resv->getSpec(), spec) != 0); +} + + +RpcServerMap::~RpcServerMap() +{ + // get list of names in rpcsrv_map + std::vector<const char *> names; + for (HashMap<ManagedRpcServer *>::Iterator it = _myrpcsrv_map.iterator(); + it.valid(); it.next()) + { + names.push_back(it.key()); + } + + for (uint32_t i = 0; i < names.size(); i++) { + NamedService *rpcsrv = _myrpcsrv_map.remove(names[i]); + LOG_ASSERT(rpcsrv != NULL); + delete rpcsrv; + } + LOG_ASSERT(_myrpcsrv_map.size() == 0); +} + + +bool +RpcServerMap::match(const char *name, const char *pattern) +{ + LOG_ASSERT(name != NULL); + LOG_ASSERT(pattern != NULL); + while (*pattern != '\0') { + if (*name == *pattern) { + ++name; + ++pattern; + } else if (*pattern == '*') { + ++pattern; + while (*name != '/' && *name != '\0') { + ++name; + } + } else { + return false; + } + } + return (*name == *pattern); +} + +void +RpcServerMap::removeReservation(const char *name) +{ + delete _reservations.remove(name); +} + + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/rpc_server_map.h b/slobrok/src/vespa/slobrok/server/rpc_server_map.h new file mode 100644 index 00000000000..5f08039057a --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/rpc_server_map.h @@ -0,0 +1,80 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> +#include <vespa/vespalib/util/hashmap.h> +#include <vector> + +#include "visible_map.h" + +namespace slobrok { + +class NamedService; +class ManagedRpcServer; +class RemoteRpcServer; +class ReservedName; + +/** + * @class RpcServerMap + * @brief Contains the actual collections of NamedService (and subclasses) + * objects known by this location broker. + * + * Works as a collection of NamedService objects, but actually contains + * three seperate hashmaps. + **/ + +using vespalib::HashMap; + +class RpcServerMap +{ +private: + VisibleMap _visible_map; + + HashMap<ManagedRpcServer *> _myrpcsrv_map; + + HashMap<ReservedName *> _reservations; + + static bool match(const char *name, const char *pattern); + + RpcServerMap(const RpcServerMap &); // Not used + RpcServerMap &operator=(const RpcServerMap &); // Not use + + void add(NamedService *rpcsrv); + +public: + typedef std::vector<const NamedService *> RpcSrvlist; + + VisibleMap& visibleMap() { return _visible_map; } + + ManagedRpcServer *lookupManaged(const char *name) const { + return _myrpcsrv_map[name]; + } + + NamedService * lookup(const char *name) const; + RpcSrvlist lookupPattern(const char *pattern) const; + RpcSrvlist allVisible() const; + RpcSrvlist allManaged() const; + + void addNew(ManagedRpcServer *rpcsrv); + NamedService * remove(const char *name); + + void addReservation(ReservedName *rpcsrv); + bool conflictingReservation(const char *name, const char *spec); + + ReservedName *getReservation(const char *name) const { + return _reservations[name]; + } + void removeReservation(const char *name); + + RpcServerMap() + : _myrpcsrv_map(NULL), + _reservations(NULL) + { + } + ~RpcServerMap(); +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.cpp b/slobrok/src/vespa/slobrok/server/rpchooks.cpp new file mode 100644 index 00000000000..53d8eb2423a --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/rpchooks.cpp @@ -0,0 +1,632 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include "vtag.h" + +#include <vespa/log/log.h> +LOG_SETUP(".rpchooks"); + +#include <vespa/fnet/frt/frt.h> + +#include "rpchooks.h" +#include "ok_state.h" +#include "named_service.h" +#include "rpc_server_map.h" +#include "rpc_server_manager.h" +#include "remote_slobrok.h" +#include "sbenv.h" +#include "visible_map.h" +#include "rpcmirror.h" + +namespace slobrok { + +namespace { + + + +class MetricsReport : public FNET_Task +{ + RPCHooks &_owner; + + void PerformTask() { + _owner.reportMetrics(); + Schedule(300.0); + } +public: + MetricsReport(FRT_Supervisor *orb, + RPCHooks &owner) + : FNET_Task(orb->GetScheduler()), + _owner(owner) + { + Schedule(0.0); + } + + virtual ~MetricsReport() { Kill(); } +}; + +} // namespace <unnamed> + +//----------------------------------------------------------------------------- + +RPCHooks::RPCHooks(SBEnv &env, + RpcServerMap& rpcsrvmap, + RpcServerManager& rpcsrvman) + : _env(env), _rpcsrvmap(rpcsrvmap), _rpcsrvmanager(rpcsrvman), + _cnts{0, 0, 0, 0, 0, 0, 0}, + _m_reporter(NULL) +{ +} + + +RPCHooks::~RPCHooks() +{ + delete _m_reporter; +} + +void +RPCHooks::reportMetrics() +{ + EV_COUNT("register_reqs", _cnts.registerReqs); + EV_COUNT("mirror_reqs", _cnts.mirrorReqs); + EV_COUNT("wantadd_reqs", _cnts.wantAddReqs); + EV_COUNT("doadd_reqs", _cnts.doAddReqs); + EV_COUNT("doremove_reqs", _cnts.doRemoveReqs); + EV_COUNT("admin_reqs", _cnts.adminReqs); + EV_COUNT("other_reqs", _cnts.otherReqs); +} + + +void +RPCHooks::initRPC(FRT_Supervisor *supervisor) +{ + _m_reporter = new MetricsReport(supervisor, *this); + + FRT_ReflectionBuilder rb(supervisor); + + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.system.resume", "", "", true, + FRT_METHOD(RPCHooks::rpc_resume), this); + rb.MethodDesc("Enable something - currently NOP"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.system.suspend", "", "", true, + FRT_METHOD(RPCHooks::rpc_suspend), this); + rb.MethodDesc("Disable something - currently NOP"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.system.version", "", "s", true, + FRT_METHOD(RPCHooks::rpc_version), this); + rb.MethodDesc("Get location broker version"); + rb.ReturnDesc("version", "version string"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.system.stop", "", "", true, + FRT_METHOD(RPCHooks::rpc_stop), this); + rb.MethodDesc("Shut down the location broker application"); + //------------------------------------------------------------------------- + + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.internal.listManagedRpcServers", "", "SS", true, + FRT_METHOD(RPCHooks::rpc_listManagedRpcServers), this); + rb.MethodDesc("List all rpcservers managed by this location broker"); + rb.ReturnDesc("names", "Managed rpcserver names"); + rb.ReturnDesc("specs", "The connection specifications (in same order)"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.internal.lookupManaged", "s", "ss", true, + FRT_METHOD(RPCHooks::rpc_lookupManaged), this); + rb.MethodDesc("Lookup a specific rpcserver managed by this location broker"); + rb.ParamDesc("name", "Name of rpc server"); + rb.ReturnDesc("name", "Name of rpc server"); + rb.ReturnDesc("spec", "The connection specification"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.internal.wantAdd", "sss", "is", true, + FRT_METHOD(RPCHooks::rpc_wantAdd), this); + rb.MethodDesc("remote location broker wants to add a rpcserver"); + rb.ParamDesc("slobrok", "Name of remote location broker"); + rb.ParamDesc("name", "NamedService name to reserve"); + rb.ParamDesc("spec", "The connection specification"); + rb.ReturnDesc("denied", "non-zero if request was denied"); + rb.ReturnDesc("reason", "reason for denial"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.internal.doAdd", "sss", "is", true, + FRT_METHOD(RPCHooks::rpc_doAdd), this); + rb.MethodDesc("add rpcserver managed by remote location broker"); + rb.ParamDesc("slobrok", "Name of remote location broker"); + rb.ParamDesc("name", "NamedService name to add"); + rb.ParamDesc("spec", "The connection specification"); + rb.ReturnDesc("denied", "non-zero if request was denied"); + rb.ReturnDesc("reason", "reason for denial"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.internal.doRemove", "sss", "is", true, + FRT_METHOD(RPCHooks::rpc_doRemove), this); + rb.MethodDesc("remove rpcserver managed by remote location broker"); + rb.ParamDesc("slobrok", "Name of remote location broker"); + rb.ParamDesc("name", "NamedService name to remove"); + rb.ParamDesc("spec", "The connection specification"); + rb.ReturnDesc("denied", "non-zero if request was denied"); + rb.ReturnDesc("reason", "reason for denial"); + //------------------------------------------------------------------------- + + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", true, + FRT_METHOD(RPCHooks::rpc_listNamesServed), this); + rb.MethodDesc("List rpcservers served"); + rb.ReturnDesc("names", "The rpcserver names this server wants to serve"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.callback.notifyUnregistered", "s", "", true, + FRT_METHOD(RPCHooks::rpc_notifyUnregistered), this); + rb.MethodDesc("Notify a server about removed registration"); + rb.ParamDesc("name", "NamedService name"); + //------------------------------------------------------------------------- + + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.admin.removePeer", "ss", "", true, + FRT_METHOD(RPCHooks::rpc_removePeer), this); + rb.MethodDesc("stop syncing with other location broker"); + rb.ParamDesc("slobrok", "NamedService name of remote location broker"); + rb.ParamDesc("spec", "Connection specification of remote location broker"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.admin.addPeer", "ss", "", true, + FRT_METHOD(RPCHooks::rpc_addPeer), this); + rb.MethodDesc("sync our information with other location broker"); + rb.ParamDesc("slobrok", "NamedService name of remote location broker"); + rb.ParamDesc("spec", "Connection specification of remote location broker"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.admin.listAllRpcServers", "", "SSS", true, + FRT_METHOD(RPCHooks::rpc_listAllRpcServers), this); + rb.MethodDesc("List all known rpcservers"); + rb.ReturnDesc("names", "NamedService names"); + rb.ReturnDesc("specs", "The connection specifications (in same order)"); + rb.ReturnDesc("owners", "Corresponding names of managing location broker"); + //------------------------------------------------------------------------- + + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.unregisterRpcServer", "ss", "", true, + FRT_METHOD(RPCHooks::rpc_unregisterRpcServer), this); + rb.MethodDesc("Unregister a rpcserver"); + rb.ParamDesc("name", "NamedService name"); + rb.ParamDesc("spec", "The connection specification"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.registerRpcServer", "ss", "", true, + FRT_METHOD(RPCHooks::rpc_registerRpcServer), this); + rb.MethodDesc("Register a rpcserver"); + rb.ParamDesc("name", "NamedService name"); + rb.ParamDesc("spec", "The connection specification"); + //------------------------------------------------------------------------- + + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.mirror.fetch", "ii", "SSi", true, + FRT_METHOD(RPCHooks::rpc_mirrorFetch), this); + rb.MethodDesc("Fetch or update mirror of name to spec map"); + rb.ParamDesc("gencnt", "generation already known by client"); + rb.ParamDesc("timeout", "How many milliseconds to wait for changes" + "before returning if nothing has changed (max=10000)"); + rb.ReturnDesc("names", "Array of NamedService names"); + rb.ReturnDesc("specs", "Array of connection specifications (same order)"); + rb.ReturnDesc("newgen", "Generation count for new version of the map"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.incremental.fetch", "ii", "iSSSi", true, + FRT_METHOD(RPCHooks::rpc_incrementalFetch), this); + rb.MethodDesc("Fetch or update mirror of name to spec map"); + rb.ParamDesc("gencnt", "generation already known by client"); + rb.ParamDesc("timeout", "How many milliseconds to wait for changes" + "before returning if nothing has changed (max=10000)"); + + rb.ReturnDesc("oldgen", "diff from generation already known by client"); + rb.ReturnDesc("removed", "Array of NamedService names to remove"); + rb.ReturnDesc("names", "Array of NamedService names with new values"); + rb.ReturnDesc("specs", "Array of connection specifications (same order)"); + rb.ReturnDesc("newgen", "Generation count for new version of the map"); + //------------------------------------------------------------------------- + rb.DefineMethod("slobrok.lookupRpcServer", "s", "SS", true, + FRT_METHOD(RPCHooks::rpc_lookupRpcServer), this); + rb.MethodDesc("Look up rpcservers"); + rb.ParamDesc("pattern", "The pattern of the rpcservers to lookup.\n" + " " + "The pattern may contain * characters to match a component.\n" + " " + "Components are delimited by / characters.\n" + " " + "There is no way to match an arbitrary number of components\n" + " " + "or to match just a part of a component." + ); + rb.ReturnDesc("names", "The rpcserver names matching pattern"); + rb.ReturnDesc("specs", "The connection specifications (in same order)"); + //------------------------------------------------------------------------- +} + + +void +RPCHooks::rpc_listNamesServed(FRT_RPCRequest *req) +{ + FRT_Values &dst = *req->GetReturn(); + FRT_StringValue *names = dst.AddStringArray(1); + dst.SetString(names, _env.mySpec()); + _cnts.otherReqs++; + return; +} + +void +RPCHooks::rpc_notifyUnregistered(FRT_RPCRequest *req) +{ + FRT_Values &args = *req->GetParams(); + const char *dName = args[0]._string._str; + LOG(warning, "got notifyUnregistered '%s'", dName); + _cnts.otherReqs++; + return; +} + + +void +RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req) +{ + FRT_Values &args = *req->GetParams(); + const char *dName = args[0]._string._str; + const char *dSpec = args[1]._string._str; + + LOG(debug, "RPC: invoked registerRpcServer(%s,%s)", dName, dSpec); + _cnts.registerReqs++; + + // is this already OK? + if (_rpcsrvmanager.alreadyManaged(dName, dSpec)) { + LOG(debug, "registerRpcServer(%s,%s) OK, already managed", + dName, dSpec); + return; + } + // can we say now, that this will fail? + OkState state = _rpcsrvmanager.addMyReservation(dName, dSpec); + if (state.failed()) { + req->SetError(FRTE_RPC_METHOD_FAILED, state.errorMsg.c_str()); + LOG(info, "cannot register %s at %s: %s", dName, dSpec, state.errorMsg.c_str()); + return; + } + // need to actually setup management and decide result later + req->Detach(); + RegRpcSrvCommand completer + = RegRpcSrvCommand::makeRegRpcSrvCmd(_env, dName, dSpec, req); + completer.doRequest(); +} + +void +RPCHooks::rpc_unregisterRpcServer(FRT_RPCRequest *req) +{ + FRT_Values &args = *req->GetParams(); + const char *dName = args[0]._string._str; + const char *dSpec = args[1]._string._str; + OkState state = _rpcsrvmanager.removeLocal(dName, dSpec); + if (state.failed()) { + req->SetError(FRTE_RPC_METHOD_FAILED, state.errorMsg.c_str()); + } + LOG(debug, "unregisterRpcServer(%s,%s) %s: %s", + dName, dSpec, + state.ok() ? "OK" : "failed", + state.errorMsg.c_str()); + _cnts.otherReqs++; + return; +} + + +void +RPCHooks::rpc_addPeer(FRT_RPCRequest *req) +{ + FRT_Values &args = *req->GetParams(); + const char *remslobrok = args[0]._string._str; + const char *remsbspec = args[1]._string._str; + + OkState ok = _env.addPeer(remslobrok, remsbspec); + if (ok.failed()) { + req->SetError(FRTE_RPC_METHOD_FAILED, ok.errorMsg.c_str()); + } + LOG(debug, "addPeer(%s,%s) %s: %s", + remslobrok, remsbspec, + ok.ok() ? "OK" : "failed", + ok.errorMsg.c_str()); + _cnts.adminReqs++; + return; +} + + +void +RPCHooks::rpc_removePeer(FRT_RPCRequest *req) +{ + FRT_Values &args = *req->GetParams(); + const char *remslobrok = args[0]._string._str; + const char *remsbspec = args[1]._string._str; + + OkState ok = _env.removePeer(remslobrok, remsbspec); + if (ok.failed()) { + req->SetError(FRTE_RPC_METHOD_FAILED, ok.errorMsg.c_str()); + } + LOG(debug, "removePeer(%s,%s) %s: %s", + remslobrok, remsbspec, + ok.ok() ? "OK" : "failed", + ok.errorMsg.c_str()); + _cnts.adminReqs++; + return; +} + + +void +RPCHooks::rpc_wantAdd(FRT_RPCRequest *req) +{ + FRT_Values &args = *req->GetParams(); + const char *remsb = args[0]._string._str; + const char *dName = args[1]._string._str; + const char *dSpec = args[2]._string._str; + FRT_Values &retval = *req->GetReturn(); + OkState state = _rpcsrvmanager.addRemReservation(remsb, dName, dSpec); + if (state.failed() > 1) { + req->SetError(FRTE_RPC_METHOD_FAILED, state.errorMsg.c_str()); + } + retval.AddInt32(state.errorCode); + retval.AddString(state.errorMsg.c_str()); + LOG(debug, "%s->wantAdd(%s,%s) %s: %s", + remsb, dName, dSpec, + state.ok() ? "OK" : "failed", + state.errorMsg.c_str()); + _cnts.wantAddReqs++; + return; +} + + +void +RPCHooks::rpc_doRemove(FRT_RPCRequest *req) +{ + FRT_Values &args = *req->GetParams(); + const char *rname = args[0]._string._str; + const char *dname = args[1]._string._str; + const char *dspec = args[2]._string._str; + FRT_Values &retval = *req->GetReturn(); + OkState state = _rpcsrvmanager.removeRemote(dname, dspec); + retval.AddInt32(state.errorCode); + retval.AddString(state.errorMsg.c_str()); + if (state.errorCode > 1) { + req->SetError(FRTE_RPC_METHOD_FAILED, state.errorMsg.c_str()); + } + LOG(debug, "%s->doRemove(%s,%s) %s: %s", + rname, dname, dspec, + state.ok() ? "OK" : "failed", + state.errorMsg.c_str()); + _cnts.doRemoveReqs++; + return; +} + +void +RPCHooks::rpc_doAdd(FRT_RPCRequest *req) +{ + FRT_Values &args = *req->GetParams(); + const char *rname = args[0]._string._str; + const char *dname = args[1]._string._str; + const char *dspec = args[2]._string._str; + FRT_Values &retval = *req->GetReturn(); + OkState state = _rpcsrvmanager.addRemote(dname, dspec); + retval.AddInt32(state.errorCode); + retval.AddString(state.errorMsg.c_str()); + if (state.errorCode > 1) { + req->SetError(FRTE_RPC_METHOD_FAILED, state.errorMsg.c_str()); + } + LOG(debug, "%s->doAdd(%s,%s) %s: %s", + rname, dname, dspec, + state.ok() ? "OK" : "failed", + state.errorMsg.c_str()); + _cnts.doAddReqs++; + return; +} + + +void +RPCHooks::rpc_lookupRpcServer(FRT_RPCRequest *req) +{ + _cnts.otherReqs++; + FRT_Values &args = *req->GetParams(); + const char *rpcserverPattern = args[0]._string._str; + LOG(debug, "RPC: lookupRpcServers(%s)", rpcserverPattern); + std::vector<const NamedService *> rpcsrvlist = + _rpcsrvmap.lookupPattern(rpcserverPattern); + FRT_Values &dst = *req->GetReturn(); + FRT_StringValue *names = dst.AddStringArray(rpcsrvlist.size()); + FRT_StringValue *specs = dst.AddStringArray(rpcsrvlist.size()); + for (uint32_t i = 0; i < rpcsrvlist.size(); ++i) { + dst.SetString(&names[i], rpcsrvlist[i]->getName()); + dst.SetString(&specs[i], rpcsrvlist[i]->getSpec()); + } + if (rpcsrvlist.size() < 1) { + LOG(debug, "RPC: lookupRpcServers(%s) -> no match", + rpcserverPattern); + } else { + LOG(debug, "RPC: lookupRpcServers(%s) -> %u matches, first [%s,%s]", + rpcserverPattern, (unsigned int)rpcsrvlist.size(), + rpcsrvlist[0]->getName(), rpcsrvlist[0]->getSpec()); + } + return; +} + + +void +RPCHooks::rpc_listManagedRpcServers(FRT_RPCRequest *req) +{ + _cnts.adminReqs++; + std::vector<const NamedService *> rpcsrvlist = _rpcsrvmap.allManaged(); + FRT_Values &dst = *req->GetReturn(); + FRT_StringValue *names = dst.AddStringArray(rpcsrvlist.size()); + FRT_StringValue *specs = dst.AddStringArray(rpcsrvlist.size()); + for (uint32_t i = 0; i < rpcsrvlist.size(); ++i) { + dst.SetString(&names[i], rpcsrvlist[i]->getName()); + dst.SetString(&specs[i], rpcsrvlist[i]->getSpec()); + } + if (rpcsrvlist.size() < 1) { + LOG(debug, "RPC: listManagedRpcServers() -> 0 managed"); + } else { + LOG(debug, "RPC: listManagedRpcServers() -> %u managed, first [%s,%s]", + (unsigned int)rpcsrvlist.size(), + rpcsrvlist[0]->getName(), rpcsrvlist[0]->getSpec()); + } + return; +} + +void +RPCHooks::rpc_lookupManaged(FRT_RPCRequest *req) +{ + _cnts.adminReqs++; + FRT_Values &args = *req->GetParams(); + const char *name = args[0]._string._str; + LOG(debug, "RPC: lookupManaged(%s)", name); + ManagedRpcServer *found = _rpcsrvmap.lookupManaged(name); + + if (found == NULL) { + req->SetError(FRTE_RPC_METHOD_FAILED, "Not found"); + } else { + FRT_Values &dst = *req->GetReturn(); + dst.AddString(found->getName()); + dst.AddString(found->getSpec()); + } + return; +} + +void +RPCHooks::rpc_listAllRpcServers(FRT_RPCRequest *req) +{ + _cnts.adminReqs++; + + std::vector<const NamedService *> mrpcsrvlist = _rpcsrvmap.allManaged(); + + FRT_Values &dst = *req->GetReturn(); + size_t sz = mrpcsrvlist.size(); + FRT_StringValue *names = dst.AddStringArray(sz); + FRT_StringValue *specs = dst.AddStringArray(sz); + FRT_StringValue *owner = dst.AddStringArray(sz); + + int j = 0; + for (uint32_t i = 0; i < mrpcsrvlist.size(); ++i, ++j) { + dst.SetString(&names[j], mrpcsrvlist[i]->getName()); + dst.SetString(&specs[j], mrpcsrvlist[i]->getSpec()); + dst.SetString(&owner[j], _env.mySpec()); + } + + if (sz > 0) { + LOG(debug, "listManagedRpcServers -> %u, last [%s,%s,%s]", + (unsigned int)mrpcsrvlist.size(), + dst[0]._string_array._pt[sz-1]._str, + dst[1]._string_array._pt[sz-1]._str, + dst[2]._string_array._pt[sz-1]._str); + } else { + LOG(debug, "listManagedRpcServers -> %u", (unsigned int) mrpcsrvlist.size()); + } + return; + +} + + +void +RPCHooks::rpc_mirrorFetch(FRT_RPCRequest *req) +{ + _cnts.mirrorReqs++; + FRT_Values &args = *req->GetParams(); + + vespalib::GenCnt gencnt(args[0]._intval32); + uint32_t msTimeout = args[1]._intval32; + + (new (req->GetMemoryTub()) + MirrorFetch(_env.getSupervisor(), req, _rpcsrvmap.visibleMap(), gencnt))->invoke(msTimeout); +} + +void +RPCHooks::rpc_incrementalFetch(FRT_RPCRequest *req) +{ + _cnts.mirrorReqs++; + FRT_Values &args = *req->GetParams(); + + vespalib::GenCnt gencnt(args[0]._intval32); + uint32_t msTimeout = args[1]._intval32; + + (new (req->GetMemoryTub()) + IncrementalFetch(_env.getSupervisor(), req, _rpcsrvmap.visibleMap(), gencnt))->invoke(msTimeout); +} + + +// System API methods +void +RPCHooks::rpc_stop(FRT_RPCRequest *req) +{ + _cnts.adminReqs++; + (void) req; + LOG(debug, "RPC stop command received, initiating shutdown"); + _env.shutdown(); +} + + +void +RPCHooks::rpc_version(FRT_RPCRequest *req) +{ + _cnts.adminReqs++; + std::string ver; + + char *s = VersionTag; + bool needdate = true; + if (strncmp(VersionTag, "V_", 2) == 0) { + s += 2; + do { + while (strchr("0123456789", *s) != NULL) { + ver.append(s++, 1); + } + if (strncmp(s, "_RELEASE", 8) == 0) { + needdate = false; + break; + } + if (strncmp(s, "_RC", 3) == 0) { + char *e = strchr(s, '-'); + if (e == NULL) { + ver.append(s); + } else { + ver.append(s, e - s); + } + needdate = false; + break; + } + if (*s == '_' && strchr("0123456789", *++s)) { + ver.append("."); + } else { + break; + } + } while (*s && *s != '-'); + } else { + char *e = strchr(s, '-'); + if (e == NULL) { + ver.append(s); + } else { + ver.append(s, e - s); + } + } + if (needdate) { + ver.append("-"); + s = VersionTagDate; + char *e = strchr(s, '-'); + if (e == NULL) { + ver.append(s); + } else { + ver.append(s, e - s); + } + } + LOG(debug, "RPC version: %s", ver.c_str()); + + req->GetReturn()->AddString(ver.c_str()); + return; +} + + +void +RPCHooks::rpc_suspend(FRT_RPCRequest *req) +{ + _cnts.adminReqs++; + req->SetError(FRTE_RPC_METHOD_FAILED, "not implemented"); + LOG(debug, "RPC suspend command received (ignored)"); +} + + +void +RPCHooks::rpc_resume(FRT_RPCRequest *req) +{ + _cnts.adminReqs++; + // XXX always ignored + (void) req; + LOG(debug, "RPC resume command received (ignored)"); +} + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.h b/slobrok/src/vespa/slobrok/server/rpchooks.h new file mode 100644 index 00000000000..0f560d62975 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/rpchooks.h @@ -0,0 +1,84 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> + +namespace slobrok { + +class SBEnv; +class RpcServerMap; +class RpcServerManager; + +/** + * @class RPCHooks + * @brief The FNET-RPC interface to a location broker + * + * Contains methods for receiveing and unpacking requests, + * invoking the right internal method, and (in most cases) + * packaging and returning the result of the request. + **/ +class RPCHooks : public FRT_Invokable +{ +public: + + struct Metrics { + unsigned long registerReqs; + unsigned long mirrorReqs; + unsigned long wantAddReqs; + unsigned long doAddReqs; + unsigned long doRemoveReqs; + unsigned long adminReqs; + unsigned long otherReqs; + }; + +private: + SBEnv &_env; + RpcServerMap &_rpcsrvmap; + RpcServerManager &_rpcsrvmanager; + + RPCHooks(const RPCHooks &); // Not used + RPCHooks &operator=(const RPCHooks &); // Not used + + Metrics _cnts; + FNET_Task *_m_reporter; + +public: + explicit RPCHooks(SBEnv &env, + RpcServerMap& rpcsrvmap, + RpcServerManager& rpcsrvman); + virtual ~RPCHooks(); + + void initRPC(FRT_Supervisor *supervisor); + void reportMetrics(); + const Metrics& getMetrics() const { return _cnts; } + +private: + void rpc_lookupRpcServer(FRT_RPCRequest *req); + + void rpc_registerRpcServer(FRT_RPCRequest *req); + void rpc_unregisterRpcServer(FRT_RPCRequest *req); + + void rpc_addPeer(FRT_RPCRequest *req); + void rpc_removePeer(FRT_RPCRequest *req); + void rpc_listManagedRpcServers(FRT_RPCRequest *req); + void rpc_lookupManaged(FRT_RPCRequest *req); + void rpc_listAllRpcServers(FRT_RPCRequest *req); + void rpc_mirrorFetch(FRT_RPCRequest *req); + void rpc_incrementalFetch(FRT_RPCRequest *req); + void rpc_wantAdd(FRT_RPCRequest *req); + void rpc_doAdd(FRT_RPCRequest *req); + void rpc_doRemove(FRT_RPCRequest *req); + + void rpc_forceUnregisterRpcServer(FRT_RPCRequest *req); + void rpc_listNamesServed(FRT_RPCRequest *req); + void rpc_notifyUnregistered(FRT_RPCRequest *req); + void rpc_getRpcServerHistory(FRT_RPCRequest *req); + + void rpc_stop(FRT_RPCRequest *req); + void rpc_suspend(FRT_RPCRequest *req); + void rpc_version(FRT_RPCRequest *req); + void rpc_resume(FRT_RPCRequest *req); +}; + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/rpcmirror.h b/slobrok/src/vespa/slobrok/server/rpcmirror.h new file mode 100644 index 00000000000..889b00c6ba1 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/rpcmirror.h @@ -0,0 +1,273 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +namespace slobrok { + +namespace { + +class MirrorFetch : public FNET_Task, + public VisibleMap::IUpdateListener +{ +private: + MirrorFetch(const MirrorFetch &); + MirrorFetch& operator=(const MirrorFetch &); + + FRT_RPCRequest *_req; + VisibleMap &_map; + vespalib::GenCnt _gen; + +public: + MirrorFetch(FRT_Supervisor *orb, + FRT_RPCRequest *req, + VisibleMap &map, + vespalib::GenCnt gen); + virtual ~MirrorFetch(); + + void completeReq(); + virtual void PerformTask(); + virtual void updated(VisibleMap &map); + virtual void aborted(VisibleMap &map); + void invoke(uint32_t msTimeout); +}; + + +MirrorFetch::MirrorFetch(FRT_Supervisor *orb, + FRT_RPCRequest *req, + VisibleMap &map, + vespalib::GenCnt gen) + : FNET_Task(orb->GetScheduler()), + _req(req), + _map(map), + _gen(gen) +{ +} + + +MirrorFetch::~MirrorFetch() +{ + LOG_ABORT("Should never be called!"); +} + + +void +MirrorFetch::completeReq() +{ + vespalib::GenCnt newgen = _map.genCnt(); + if (newgen == _gen) { // no change + _req->GetReturn()->AddStringArray(0); + _req->GetReturn()->AddStringArray(0); + } else { + std::vector<const NamedService *> rpcsrvlist = _map.allVisible(); + + FRT_Values &dst = *_req->GetReturn(); + size_t sz = rpcsrvlist.size(); + FRT_StringValue *names = dst.AddStringArray(sz); + FRT_StringValue *specs = dst.AddStringArray(sz); + for (uint32_t i = 0; i < rpcsrvlist.size(); ++i) { + dst.SetString(&names[i], rpcsrvlist[i]->getName()); + dst.SetString(&specs[i], rpcsrvlist[i]->getSpec()); + } + if (sz > 0) { + LOG(debug, "mirrorFetch %p -> %u, last [%s,%s]", + this, + (unsigned int)sz, + dst[0]._string_array._pt[sz-1]._str, + dst[1]._string_array._pt[sz-1]._str); + } else { + LOG(debug, "mirrorFetch %p -> 0 size", this); + } + } + _req->GetReturn()->AddInt32(newgen.getAsInt()); + LOG(debug, "mirrorFetch %p done (gen %d -> gen %d)", + this, _gen.getAsInt(), newgen.getAsInt()); + _req->Return(); +} + + +void +MirrorFetch::PerformTask() +{ + // cancel update notification + _map.removeUpdateListener(this); + completeReq(); +} + + +void +MirrorFetch::updated(VisibleMap &map) +{ + LOG_ASSERT(&map == &_map); + (void) ↦ + // unschedule timeout task + Unschedule(); + completeReq(); +} + + +void +MirrorFetch::aborted(VisibleMap &map) +{ + LOG_ASSERT(&map == &_map); + (void) ↦ + // unschedule timeout task + Unschedule(); + _req->SetError(FRTE_RPC_METHOD_FAILED, "slobrok shutting down"); + _req->Return(); +} + + +void +MirrorFetch::invoke(uint32_t msTimeout) +{ + _req->Detach(); + LOG(debug, "MirrorFetch %p invoked from %s (gen %d, timeout %d ms)", + this, _req->GetConnection()->GetSpec(), _gen.getAsInt(), msTimeout); + if (_map.genCnt() != _gen || msTimeout == 0) { + completeReq(); + } else { + _map.addUpdateListener(this); // register as update listener + if (msTimeout > 10000) + msTimeout = 10000; + Schedule((double) msTimeout / 1000.0); + } +} + + + +class IncrementalFetch : public FNET_Task, + public VisibleMap::IUpdateListener +{ +private: + IncrementalFetch(const IncrementalFetch &); + IncrementalFetch& operator=(const IncrementalFetch &); + + FRT_RPCRequest *_req; + VisibleMap &_map; + vespalib::GenCnt _gen; + +public: + IncrementalFetch(FRT_Supervisor *orb, + FRT_RPCRequest *req, + VisibleMap &map, + vespalib::GenCnt gen); + virtual ~IncrementalFetch(); + + void completeReq(); + virtual void PerformTask(); + virtual void updated(VisibleMap &map); + virtual void aborted(VisibleMap &map); + void invoke(uint32_t msTimeout); +}; + + +IncrementalFetch::IncrementalFetch(FRT_Supervisor *orb, + FRT_RPCRequest *req, + VisibleMap &map, + vespalib::GenCnt gen) + : FNET_Task(orb->GetScheduler()), + _req(req), + _map(map), + _gen(gen) +{ +} + + +IncrementalFetch::~IncrementalFetch() +{ + LOG_ABORT("Should never be called!"); +} + + +void +IncrementalFetch::completeReq() +{ + vespalib::GenCnt newgen = _map.genCnt(); + VisibleMap::MapDiff diff; + FRT_Values &dst = *_req->GetReturn(); + + if (newgen == _gen) { // no change + dst.AddInt32(_gen.getAsInt()); + } else if (_map.hasHistory(_gen)) { + diff = _map.history(_gen); + dst.AddInt32(_gen.getAsInt()); + } else { + dst.AddInt32(0); + diff.updated = _map.allVisible(); + } + + size_t sz = diff.removed.size(); + FRT_StringValue *rem = dst.AddStringArray(sz); + for (uint32_t i = 0; i < sz; ++i) { + dst.SetString(&rem[i], diff.removed[i].c_str()); + } + + sz = diff.updated.size(); + FRT_StringValue *names = dst.AddStringArray(sz); + FRT_StringValue *specs = dst.AddStringArray(sz); + for (uint32_t i = 0; i < sz; ++i) { + dst.SetString(&names[i], diff.updated[i]->getName()); + dst.SetString(&specs[i], diff.updated[i]->getSpec()); + } + + dst.AddInt32(newgen.getAsInt()); + LOG(debug, "mirrorFetch %p done (gen %d -> gen %d)", + this, _gen.getAsInt(), newgen.getAsInt()); + _req->Return(); +} + + +void +IncrementalFetch::PerformTask() +{ + // cancel update notification + _map.removeUpdateListener(this); + completeReq(); +} + + +void +IncrementalFetch::updated(VisibleMap &map) +{ + LOG_ASSERT(&map == &_map); + (void) ↦ + // unschedule timeout task + Unschedule(); + completeReq(); +} + + +void +IncrementalFetch::aborted(VisibleMap &map) +{ + LOG_ASSERT(&map == &_map); + (void) ↦ + // unschedule timeout task + Unschedule(); + _req->SetError(FRTE_RPC_METHOD_FAILED, "slobrok shutting down"); + _req->Return(); +} + + +void +IncrementalFetch::invoke(uint32_t msTimeout) +{ + _req->Detach(); + LOG(debug, "IncrementalFetch %p invoked from %s (gen %d, timeout %d ms)", + this, _req->GetConnection()->GetSpec(), _gen.getAsInt(), msTimeout); + if (_map.genCnt() != _gen || msTimeout == 0) { + completeReq(); + } else { + _map.addUpdateListener(this); // register as update listener + if (msTimeout > 10000) + msTimeout = 10000; + Schedule((double) msTimeout / 1000.0); + } +} + + +} // namespace <unnamed> + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp new file mode 100644 index 00000000000..858c6203f33 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -0,0 +1,268 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/fnet/fnet.h> +#include <vespa/fnet/frt/frt.h> + +#include "sbenv.h" +#include <memory> +#include <string> +#include <sstream> +#include <vespa/vespalib/util/exception.h> +#include <vespa/vespalib/net/state_server.h> +#include <vespa/vespalib/util/host_name.h> + +#include "rpchooks.h" +#include "selfcheck.h" +#include "remote_check.h" + +#include <vespa/log/log.h> +LOG_SETUP(".sbenv"); + + +namespace slobrok { +namespace { + +void +discard(std::vector<std::string> &vec, const std::string & val) +{ + uint32_t i = 0; + uint32_t size = vec.size(); + while (i < size) { + if (vec[i] == val) { + std::swap(vec[i], vec[size - 1]); + vec.pop_back(); + --size; + } else { + ++i; + } + } + LOG_ASSERT(size == vec.size()); +} + + +class ConfigTask : public FNET_Task +{ +private: + Configurator& _configurator; + + ConfigTask(const ConfigTask &); + ConfigTask &operator=(const ConfigTask &); +public: + ConfigTask(FNET_Scheduler *sched, Configurator& configurator); + + virtual ~ConfigTask(); + virtual void PerformTask(); +}; + + +ConfigTask::ConfigTask(FNET_Scheduler *sched, Configurator& configurator) + : FNET_Task(sched), + _configurator(configurator) +{ + Schedule(1.0); +} + + +ConfigTask::~ConfigTask() +{ + Kill(); +} + + +void +ConfigTask::PerformTask() +{ + Schedule(1.0); + LOG(spam, "checking for new config"); + try { + _configurator.poll(); + } catch (std::exception &e) { + LOG(warning, "ConfigTask: poll failed: %s", e.what()); + Schedule(10.0); + } +} + +} // namespace slobrok::<unnamed> + +SBEnv::SBEnv(const ConfigShim &shim) + : _transport(new FNET_Transport()), + _supervisor(new FRT_Supervisor(_transport.get(), NULL)), + _sbPort(shim.portNumber()), + _statePort(shim.statePort()), + _configurator(shim.factory().create(*this)), + _shuttingDown(false), + _partnerList(), + _me(), + _rpcHooks(*this, _rpcsrvmap, _rpcsrvmanager), + _selfchecktask(new SelfCheck(getSupervisor()->GetScheduler(), _rpcsrvmap, _rpcsrvmanager)), + _remotechecktask(new RemoteCheck(getSupervisor()->GetScheduler(), _rpcsrvmap, _rpcsrvmanager, _exchanger)), + _health(), + _metrics(_rpcHooks, *_transport), + _components(), + _rpcsrvmanager(*this), + _exchanger(*this, _rpcsrvmap), + _rpcsrvmap() +{ + srandom(time(NULL) ^ getpid()); + _rpcHooks.initRPC(getSupervisor()); +} + + +SBEnv::~SBEnv() +{ + getTransport()->WaitFinished(); +} + + +void +SBEnv::shutdown() +{ + _shuttingDown = true; + getTransport()->ShutDown(false); +} + +void +SBEnv::resume() +{ + // nop +} + +namespace { + +std::string +createSpec(int port) +{ + if (port == 0) { + return std::string(); + } + std::ostringstream str; + str << "tcp/"; + str << vespalib::HostName::get(); + str << ":"; + str << port; + return str.str(); +} + +vespalib::string +toString(const std::vector<std::string> & v) { + vespalib::asciistream os; + os << "[" << '\n'; + for (const std::string & partner : v) { + os << " " << partner << '\n'; + } + os << ']'; + return os.str(); +} + +} // namespace <unnamed> + +int +SBEnv::MainLoop() +{ + vespalib::StateServer stateServer(_statePort, _health, _metrics, _components); + + if (! getSupervisor()->Listen(_sbPort)) { + LOG(error, "unable to listen to port %d", _sbPort); + EV_STOPPING("slobrok", "could not listen"); + return 1; + } else { + LOG(config, "listening on port %d", _sbPort); + } + + std::string myspec = createSpec(_sbPort); + + _me.reset(new ManagedRpcServer(myspec.c_str(), myspec.c_str(), + _rpcsrvmanager)); + + try { + _configurator->poll(); + ConfigTask configTask(getScheduler(), *_configurator); + LOG(debug, "slobrok: starting main event loop"); + EV_STARTED("slobrok"); + getTransport()->Main(); + LOG(debug, "slobrok: main event loop done"); + } catch (vespalib::Exception &e) { + LOG(error, "invalid config: %s", e.what()); + EV_STOPPING("slobrok", "invalid config"); + return 1; + } catch (...) { + LOG(error, "unknown exception while configuring"); + EV_STOPPING("slobrok", "unknown config exception"); + return 1; + } + EV_STOPPING("slobrok", "clean shutdown"); + return 0; +} + + +void +SBEnv::setup(const std::vector<std::string> &cfg) +{ + _partnerList = cfg; + std::vector<std::string> oldList = _exchanger.getPartnerList(); + LOG(debug, "(re-)configuring. oldlist size %d, configuration list size %d", + (int)oldList.size(), + (int)cfg.size()); + for (uint32_t i = 0; i < cfg.size(); ++i) { + std::string slobrok = cfg[i]; + discard(oldList, slobrok); + if (slobrok != mySpec()) { + OkState res = _rpcsrvmanager.addPeer(slobrok.c_str(), slobrok.c_str()); + if (!res.ok()) { + LOG(warning, "could not add peer %s: %s", slobrok.c_str(), + res.errorMsg.c_str()); + } else { + LOG(config, "added peer %s", slobrok.c_str()); + } + } + } + for (uint32_t i = 0; i < oldList.size(); ++i) { + OkState res = _rpcsrvmanager.removePeer(oldList[i].c_str(), oldList[i].c_str()); + if (!res.ok()) { + LOG(warning, "could not remove peer %s: %s", oldList[i].c_str(), + res.errorMsg.c_str()); + } else { + LOG(config, "removed peer %s", oldList[i].c_str()); + } + } + int64_t curGen = _configurator->getGeneration(); + vespalib::ComponentConfigProducer::Config current("slobroks", curGen, "ok"); + _components.addConfig(current); +} + +OkState +SBEnv::addPeer(const std::string &name, const std::string &spec) +{ + if (spec == mySpec()) { + return OkState(FRTE_RPC_METHOD_FAILED, "cannot add my own spec as peer"); + } + if (_partnerList.size() != 0) { + for (const std::string & partner : _partnerList) { + if (partner == spec) { + return OkState(0, "already configured with peer"); + } + } + vespalib::string peers = toString(_partnerList); + LOG(warning, "got addPeer with non-configured peer %s, check config consistency. configured peers = %s", + spec.c_str(), peers.c_str()); + return OkState(FRTE_RPC_METHOD_FAILED, "configured partner list does not contain peer. configured peers = " + peers); + } + return _rpcsrvmanager.addPeer(name.c_str(), spec.c_str()); +} + +OkState +SBEnv::removePeer(const std::string &name, const std::string &spec) +{ + if (spec == mySpec()) { + return OkState(FRTE_RPC_METHOD_FAILED, "cannot remove my own spec as peer"); + } + for (const std::string & partner : _partnerList) { + if (partner == spec) { + return OkState(FRTE_RPC_METHOD_FAILED, "configured partner list contains peer, cannot remove"); + } + } + return _rpcsrvmanager.removePeer(name.c_str(), spec.c_str()); +} + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h new file mode 100644 index 00000000000..1988e8821a6 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/sbenv.h @@ -0,0 +1,92 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fastos/fastos.h> +#include "named_service.h" +#include "rpc_server_map.h" +#include "rpc_server_manager.h" +#include "remote_slobrok.h" +#include "exchange_manager.h" +#include "configshim.h" +#include "ok_state.h" +#include <vespa/config-slobroks.h> +#include <vespa/slobrok/cfg.h> +#include <vespa/vespalib/net/simple_health_producer.h> +#include "metrics_producer.h" +#include <vespa/vespalib/net/simple_component_config_producer.h> + +class FastOS_ThreadPool; +class FNET_Transport; +class FNET_Scheduler; +class FRT_Supervisor; + +namespace slobrok { + +class NamedService; +class ManagedRpcServer; +class RemoteRpcServer; +class RPCHooks; +class SelfCheck; +class RemoteCheck; + +/** + * @class SBEnv + * @brief Environmental class containing an entire server location broker + * + * XXX more description needed + **/ +class SBEnv : public Configurable +{ +private: + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _supervisor; + + uint32_t _sbPort; + uint32_t _statePort; + Configurator::UP _configurator; + bool _shuttingDown; + + SBEnv(const SBEnv &); // Not used + SBEnv &operator=(const SBEnv &); // Not used + + void setup(const std::vector<std::string> &cfg); + + std::vector<std::string> _partnerList; + std::unique_ptr<ManagedRpcServer> _me; + RPCHooks _rpcHooks; + std::unique_ptr<SelfCheck> _selfchecktask; + std::unique_ptr<RemoteCheck> _remotechecktask; + vespalib::SimpleHealthProducer _health; + MetricsProducer _metrics; + vespalib::SimpleComponentConfigProducer _components; + +public: + explicit SBEnv(const ConfigShim &shim); + virtual ~SBEnv(); + + FNET_Transport *getTransport() { return _transport.get(); } + FNET_Scheduler *getScheduler() { return _transport->GetScheduler(); } + FRT_Supervisor *getSupervisor() { return _supervisor.get(); } + + void shutdown(); + void suspend(); + void resume(); + + RpcServerManager _rpcsrvmanager; + ExchangeManager _exchanger; + RpcServerMap _rpcsrvmap; + + const char *mySpec() const { return _me->getSpec(); } + + bool isSuspended() const { return false; } + bool isShuttingDown() const { return _shuttingDown; } + + int MainLoop(); + + OkState addPeer(const std::string& name, const std::string &spec); + OkState removePeer(const std::string& name, const std::string &spec); + +}; + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/selfcheck.cpp b/slobrok/src/vespa/slobrok/server/selfcheck.cpp new file mode 100644 index 00000000000..bb8cb930bae --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/selfcheck.cpp @@ -0,0 +1,58 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".selfcheck"); + +#include <vespa/fnet/frt/frt.h> + +#include "selfcheck.h" +#include "ok_state.h" +#include "named_service.h" +#include "rpc_server_map.h" +#include "rpc_server_manager.h" +#include "managed_rpc_server.h" +#include "random.h" + +namespace slobrok { + + +SelfCheck::SelfCheck(FNET_Scheduler *sched, + RpcServerMap& rpcsrvmap, + RpcServerManager& rpcsrvman) + : FNET_Task(sched), + _rpcsrvmap(rpcsrvmap), _rpcsrvmanager(rpcsrvman) +{ + // start within 1 second + double seconds = randomIn(0.123, 1.000); + LOG(debug, "selfcheck in %g seconds", seconds); + Schedule(seconds); +} + + +SelfCheck::~SelfCheck() +{ + Kill(); +} + + +void +SelfCheck::PerformTask() +{ + std::vector<const NamedService *> mrpcsrvlist = _rpcsrvmap.allManaged(); + + for (size_t i = 0; i < mrpcsrvlist.size(); ++i) { + const NamedService *r = mrpcsrvlist[i]; + ManagedRpcServer *m = _rpcsrvmap.lookupManaged(r->getName()); + LOG_ASSERT(r == m); + LOG(debug, "managed: %s -> %s", m->getName(), m->getSpec()); + m->healthCheck(); + } + // reschedule in 1-2 seconds: + double seconds = randomIn(0.987, 2.000); + LOG(debug, "selfcheck AGAIN in %g seconds", seconds); + Schedule(seconds); +} + + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/selfcheck.h b/slobrok/src/vespa/slobrok/server/selfcheck.h new file mode 100644 index 00000000000..f8274b44767 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/selfcheck.h @@ -0,0 +1,39 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/fnet.h> +#include <vespa/fnet/frt/frt.h> + +namespace slobrok { + +class SBEnv; +class RpcServerMap; +class RpcServerManager; +class ExchangeManager; + +/** + * @class SelfCheck + * @brief Periodic healthcheck task + * + * Checks the health of this location broker + * and its ManagedRpcServer objects periodically. + **/ +class SelfCheck : public FNET_Task +{ +private: + RpcServerMap &_rpcsrvmap; + RpcServerManager &_rpcsrvmanager; + + SelfCheck(const SelfCheck &); // Not used + SelfCheck &operator=(const SelfCheck &); // Not used +public: + explicit SelfCheck(FNET_Scheduler *sched, + RpcServerMap& rpcsrvmap, + RpcServerManager& rpcsrvman); + virtual ~SelfCheck(); +private: + virtual void PerformTask(); +}; + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp new file mode 100644 index 00000000000..731a2a1da60 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp @@ -0,0 +1,38 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include "slobrokserver.h" + +#include <vespa/log/log.h> +LOG_SETUP(".slobrok.server"); + +namespace slobrok { + +SlobrokServer::SlobrokServer(ConfigShim &shim) + : _env(shim), + _thread(*this) +{ + _thread.start(); +} + +SlobrokServer::SlobrokServer(uint32_t port) + : _env(ConfigShim(port)), + _thread(*this) +{ + _thread.start(); +} + + +SlobrokServer::~SlobrokServer() +{ + _env.shutdown(); + _thread.join(); +} + +void +SlobrokServer::run() +{ + _env.MainLoop(); +} + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/slobrokserver.h b/slobrok/src/vespa/slobrok/server/slobrokserver.h new file mode 100644 index 00000000000..c6c41a45b12 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/slobrokserver.h @@ -0,0 +1,32 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fastos/fastos.h> +#include "sbenv.h" +#include "configshim.h" +#include <vespa/vespalib/util/thread.h> +#include <vespa/vespalib/util/runnable.h> +#include <vespa/config/config.h> +#include <vespa/config-slobroks.h> + +namespace slobrok { + + +class SlobrokServer : public vespalib::Runnable +{ +private: + SBEnv _env; + vespalib::Thread _thread; + +public: + SlobrokServer(ConfigShim &shim); + SlobrokServer(uint32_t port); + ~SlobrokServer(); + + virtual void run(); + + void stop() { _env.shutdown(); } +}; + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/visible_map.cpp b/slobrok/src/vespa/slobrok/server/visible_map.cpp new file mode 100644 index 00000000000..6d9ce1b4c19 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/visible_map.cpp @@ -0,0 +1,177 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> + +#include <vespa/log/log.h> +LOG_SETUP(".vismap"); + +#include "visible_map.h" +#include "named_service.h" + +namespace slobrok { + +void +VisibleMap::updated() +{ + _genCnt.add(); + WaitList waitList; + std::swap(waitList, _waitList); + for (uint32_t i = 0; i < waitList.size(); ++i) { + waitList[i]->updated(*this); + } +} + + +void +VisibleMap::aborted() +{ + WaitList waitList; + std::swap(waitList, _waitList); + for (uint32_t i = 0; i < waitList.size(); ++i) { + waitList[i]->aborted(*this); + } +} + + +void +VisibleMap::addUpdateListener(IUpdateListener *l) +{ + _waitList.push_back(l); +} + + +void +VisibleMap::removeUpdateListener(IUpdateListener *l) +{ + uint32_t i = 0; + uint32_t size = _waitList.size(); + while (i < size) { + if (_waitList[i] == l) { + std::swap(_waitList[i], _waitList[size - 1]); + _waitList.pop_back(); + --size; + } else { + ++i; + } + } + LOG_ASSERT(size == _waitList.size()); +} + +//----------------------------------------------------------------------------- + +std::vector<const NamedService *> +VisibleMap::lookupPattern(const char *pattern) const +{ + std::vector<const NamedService *> retval; + for (iter_t it = _map.iterator(); it.valid(); it.next()) + { + if (match(it.key(), pattern)) { + retval.push_back(it.value()); + } + } + return retval; +} + + +std::vector<const NamedService *> +VisibleMap::allVisible() const +{ + std::vector<const NamedService *> retval; + // get list of all names in myrpcsrvmap + for (iter_t it = _map.iterator(); it.valid(); it.next()) + { + retval.push_back(it.value()); + } + return retval; +} + + + +void +VisibleMap::addNew(NamedService *rpcsrv) +{ + LOG_ASSERT(rpcsrv != NULL); + LOG_ASSERT(_map.isSet(rpcsrv->getName()) == false); + _map.set(rpcsrv->getName(), rpcsrv); + + _history.add(rpcsrv->getName(), _genCnt); + updated(); +} + + +NamedService * +VisibleMap::remove(const char *name) { + + NamedService *d = _map.remove(name); + if (d != NULL) { + _history.add(name, _genCnt); + updated(); + } + return d; +} + + +NamedService * +VisibleMap::update(NamedService *rpcsrv) { + LOG_ASSERT(rpcsrv != NULL); + + NamedService *d = _map.remove(rpcsrv->getName()); + LOG_ASSERT(d != NULL); + + _map.set(rpcsrv->getName(), rpcsrv); + + _history.add(rpcsrv->getName(), _genCnt); + updated(); + + return d; +} + +VisibleMap::MapDiff +VisibleMap::history(const vespalib::GenCnt& gen) const +{ + MapDiff retval; + std::set<std::string> names = _history.since(gen); + for (std::set<std::string>::iterator it = names.begin(); + it != names.end(); + ++it) + { + const NamedService *val = lookup(it->c_str()); + if (val == NULL) { + retval.removed.push_back(*it); + } else { + retval.updated.push_back(val); + } + } + return retval; +} + + +VisibleMap::~VisibleMap() +{ + aborted(); +} + + +bool +VisibleMap::match(const char *name, const char *pattern) +{ + LOG_ASSERT(name != NULL); + LOG_ASSERT(pattern != NULL); + while (*pattern != '\0') { + if (*name == *pattern) { + ++name; + ++pattern; + } else if (*pattern == '*') { + ++pattern; + while (*name != '/' && *name != '\0') { + ++name; + } + } else { + return false; + } + } + return (*name == *pattern); +} + +//----------------------------------------------------------------------------- + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/visible_map.h b/slobrok/src/vespa/slobrok/server/visible_map.h new file mode 100644 index 00000000000..04a61ea1b4d --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/visible_map.h @@ -0,0 +1,99 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/frt.h> +#include <vespa/vespalib/util/hashmap.h> +#include <vespa/vespalib/util/gencnt.h> +#include <vector> +#include <string> + +#include "history.h" + +namespace slobrok { + +class NamedService; + +/** + * @class VisibleMap + * @brief API to the collection of NamedService + * name->spec mappings visible to the world + **/ + +using vespalib::HashMap; + + +class VisibleMap +{ +public: + class IUpdateListener + { + public: + /** + * Signals that the given RPC server map has been updated. The + * notification will be one-shot; to get further update + * notifications you will need to re-register the listener. + * + * @param map the map that became updated + **/ + virtual void updated(VisibleMap &map) = 0; + virtual void aborted(VisibleMap &map) = 0; + protected: + virtual ~IUpdateListener() {} + }; + + typedef std::vector<const NamedService *> RpcSrvlist; + + struct MapDiff + { + std::vector<std::string> removed; + RpcSrvlist updated; + }; + +private: + HashMap<NamedService *> _map; + typedef HashMap<NamedService *>::Iterator iter_t; + + typedef std::vector<IUpdateListener *> WaitList; + WaitList _waitList; + vespalib::GenCnt _genCnt; + History _history; + + static bool match(const char *name, const char *pattern); + + VisibleMap(const VisibleMap &); // Not used + VisibleMap &operator=(const VisibleMap &); // Not use + + void updated(); + void aborted(); + +public: + void addUpdateListener(IUpdateListener *l); + void removeUpdateListener(IUpdateListener *l); + + void addNew(NamedService *rpcsrv); + NamedService *remove(const char *name); + NamedService *update(NamedService *rpcsrv); + + NamedService *lookup(const char *name) const { return _map[name]; } + RpcSrvlist lookupPattern(const char *pattern) const; + RpcSrvlist allVisible() const; + + const vespalib::GenCnt& genCnt() { return _genCnt; } + + bool hasHistory(vespalib::GenCnt gen) const { return _history.has(gen); } + + MapDiff history(const vespalib::GenCnt& gen) const; + + VisibleMap() + : _map(NULL), + _waitList(), + _genCnt(1) + { + } + ~VisibleMap(); +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/vtag.cpp b/slobrok/src/vespa/slobrok/server/vtag.cpp new file mode 100644 index 00000000000..380d653bd28 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/vtag.cpp @@ -0,0 +1,21 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include "vtag.h" + +#ifndef V_TAG +#define V_TAG "NOTAG" +#define V_TAG_DATE "NOTAG" +#define V_TAG_SYSTEM "NOTAG" +#define V_TAG_SYSTEM_REV "NOTAG" +#define V_TAG_BUILDER "NOTAG" +#endif + +namespace slobrok { + +char VersionTag[] = V_TAG; +char VersionTagDate[] = V_TAG_DATE; +char VersionTagSystem[] = V_TAG_SYSTEM; +char VersionTagSystemRev[] = V_TAG_SYSTEM_REV; +char VersionTagBuilder[] = V_TAG_BUILDER; + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/vtag.h b/slobrok/src/vespa/slobrok/server/vtag.h new file mode 100644 index 00000000000..6e9227a5951 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/vtag.h @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +namespace slobrok { + +extern char VersionTag[]; +extern char VersionTagType[]; +extern char VersionTagValue[]; +extern char VersionTagDate[]; +extern char VersionTagSystem[]; +extern char VersionTagSystemRev[]; +extern char VersionTagBuilder[]; + +} // namespace slobrok + diff --git a/slobrok/tmp/.gitignore b/slobrok/tmp/.gitignore new file mode 100644 index 00000000000..6ee112c16d8 --- /dev/null +++ b/slobrok/tmp/.gitignore @@ -0,0 +1,7 @@ +.depend +Makefile +config-slobroks.cpp +config-slobroks.h +sbcmd +slobrok +tstdst |