diff options
97 files changed, 546 insertions, 721 deletions
diff --git a/config/src/tests/configfetcher/configfetcher.cpp b/config/src/tests/configfetcher/configfetcher.cpp index 02d90352f69..eab2e8a8c84 100644 --- a/config/src/tests/configfetcher/configfetcher.cpp +++ b/config/src/tests/configfetcher/configfetcher.cpp @@ -1,6 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/config/helper/configfetcher.h> +#include <vespa/vespalib/util/exception.h> #include <fstream> #include "config-my.h" #include <atomic> @@ -151,7 +152,7 @@ TEST_F("verify that config generation can be obtained from config fetcher", Conf if (cb._configured) { break; } - FastOS_Thread::Sleep(10); + std::this_thread::sleep_for(std::chrono::milliseconds(10));; } EXPECT_EQUAL(2, fetcher.getGeneration()); EXPECT_EQUAL("bar", cb._config.get()->myField); diff --git a/config/src/tests/configparser/configparser.cpp b/config/src/tests/configparser/configparser.cpp index 40c0a5b99bd..f9e36a11def 100644 --- a/config/src/tests/configparser/configparser.cpp +++ b/config/src/tests/configparser/configparser.cpp @@ -2,6 +2,7 @@ #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/config/config.h> #include <vespa/config/common/configparser.h> +#include <vespa/config/common/exceptions.h> #include "config-foo.h" #include <fstream> #include <vespa/vespalib/stllike/asciistream.h> diff --git a/config/src/tests/configretriever/configretriever.cpp b/config/src/tests/configretriever/configretriever.cpp index 8273eee2c73..8e2d38e3c4d 100644 --- a/config/src/tests/configretriever/configretriever.cpp +++ b/config/src/tests/configretriever/configretriever.cpp @@ -9,6 +9,7 @@ #include <vespa/config/retriever/simpleconfigurer.h> #include <vespa/config/common/configholder.h> #include <vespa/config/subscription/configsubscription.h> +#include <vespa/config/common/exceptions.h> #include "config-bootstrap.h" #include "config-foo.h" #include "config-bar.h" diff --git a/config/src/tests/file_subscription/file_subscription.cpp b/config/src/tests/file_subscription/file_subscription.cpp index 622610ac1b5..25d27bcf905 100644 --- a/config/src/tests/file_subscription/file_subscription.cpp +++ b/config/src/tests/file_subscription/file_subscription.cpp @@ -3,6 +3,7 @@ #include <vespa/config/config.h> #include <vespa/config/common/configholder.h> #include <vespa/config/file/filesource.h> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/sync.h> #include <fstream> #include <config-my.h> diff --git a/config/src/tests/functiontest/functiontest.cpp b/config/src/tests/functiontest/functiontest.cpp index 28b688c189e..34429ab4016 100644 --- a/config/src/tests/functiontest/functiontest.cpp +++ b/config/src/tests/functiontest/functiontest.cpp @@ -1,6 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/config/config.h> +#include <vespa/config/common/exceptions.h> #include "config-function-test.h" #include <fstream> diff --git a/config/src/tests/subscriber/subscriber.cpp b/config/src/tests/subscriber/subscriber.cpp index 3bae5ed85b1..ce9a8fcbb6a 100644 --- a/config/src/tests/subscriber/subscriber.cpp +++ b/config/src/tests/subscriber/subscriber.cpp @@ -4,6 +4,7 @@ #include <vespa/config/common/misc.h> #include <vespa/config/common/configholder.h> #include <vespa/config/subscription/configsubscription.h> +#include <vespa/config/common/exceptions.h> #include <fstream> #include "config-foo.h" #include "config-bar.h" @@ -281,9 +282,9 @@ TEST_MT_FFF("requireThatConfigIsReturnedWhenUpdatedDuringNextConfig", 2, MyManag verifyConfig("foo2", f3.h1->getConfig()); verifyConfig("bar", f3.h2->getConfig()); } else { - FastOS_Thread::Sleep(300); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); f1.updateValue(0, createFooValue("foo2"), 2); - FastOS_Thread::Sleep(300); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); f1.updateGeneration(1, 2); } } @@ -331,7 +332,7 @@ TEST_MT_FFF("requireThatNextConfigIsInterruptedOnClose", 2, MyManager, APIFixtur ASSERT_TRUE(timer.MilliSecsToNow() >= 500.0); ASSERT_TRUE(timer.MilliSecsToNow() < 60000.0); } else { - FastOS_Thread::Sleep(1000); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); f3.s.close(); } } @@ -514,7 +515,7 @@ TEST_MT_FF("requireThatConfigSubscriberWaitsUntilNextConfigSucceeds", 2, MyManag verifyConfig("foo2", h1->getConfig()); // First update is skipped } else { TEST_BARRIER(); - FastOS_Thread::Sleep(1000); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); f1.updateValue(0, createFooValue("foo2"), 3); } } diff --git a/config/src/vespa/config/common/configparser.cpp b/config/src/vespa/config/common/configparser.cpp index e0a0b0138b9..c490133172f 100644 --- a/config/src/vespa/config/common/configparser.cpp +++ b/config/src/vespa/config/common/configparser.cpp @@ -1,11 +1,17 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "configparser.h" +#include "exceptions.h" #include "misc.h" #include <vespa/vespalib/stllike/asciistream.h> namespace config { +void ConfigParser::throwNoDefaultValue(const vespalib::stringref & key) { + throw InvalidConfigException("Config parameter " + key + " has no " + "default value and is not specified in config", VESPA_STRLOC); +} + vespalib::string ConfigParser::deQuote(const vespalib::stringref & source) { diff --git a/config/src/vespa/config/common/configparser.h b/config/src/vespa/config/common/configparser.h index 613dfc33d94..cde036281a5 100644 --- a/config/src/vespa/config/common/configparser.h +++ b/config/src/vespa/config/common/configparser.h @@ -1,7 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/stringfmt.h> #include <map> #include <set> @@ -25,6 +24,7 @@ private: static std::map<vespalib::string, vsvector> splitMap( const vsvector & config); static vespalib::string deQuote(const vespalib::stringref & source); + static void throwNoDefaultValue(const vespalib::stringref & key); template<typename T> static T convert(const vsvector &); @@ -81,8 +81,7 @@ ConfigParser::parseInternal(const vespalib::stringref & key, const V & config) V lines = getLinesForKey(key, config); if (lines.size() == 0) { - throw InvalidConfigException("Config parameter " + key + " has no " - "default value and is not specified in config", VESPA_STRLOC); + throwNoDefaultValue(key); } return convert<T>(lines); } diff --git a/config/src/vespa/config/helper/configfetcher.cpp b/config/src/vespa/config/helper/configfetcher.cpp index 0a4b95e9153..1f1bfe69bb8 100644 --- a/config/src/vespa/config/helper/configfetcher.cpp +++ b/config/src/vespa/config/helper/configfetcher.cpp @@ -1,7 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "configfetcher.h" - +#include <vespa/vespalib/util/thread.h> #include <vespa/log/log.h> LOG_SETUP(".config.helper.configfetcher"); @@ -9,7 +9,7 @@ namespace config { ConfigFetcher::ConfigFetcher(const IConfigContext::SP & context) : _poller(context), - _thread(_poller), + _thread(std::make_unique<vespalib::Thread>(_poller)), _closed(false), _started(false) { @@ -17,20 +17,25 @@ ConfigFetcher::ConfigFetcher(const IConfigContext::SP & context) ConfigFetcher::ConfigFetcher(const SourceSpec & spec) : _poller(IConfigContext::SP(new ConfigContext(spec))), - _thread(_poller), + _thread(std::make_unique<vespalib::Thread>(_poller)), _closed(false), _started(false) { } void +ConfigFetcher::subscribeGenerationChanges(IGenerationCallback * callback) { + _poller.subscribeGenerationChanges(callback); +} + +void ConfigFetcher::start() { if (!_closed) { LOG(debug, "Polling for config"); _poller.poll(); LOG(debug, "Starting fetcher thread..."); - _thread.start(); + _thread->start(); _started = true; LOG(debug, "Fetcher thread started"); } @@ -47,7 +52,7 @@ ConfigFetcher::close() if (!_closed) { _poller.close(); if (_started) - _thread.join(); + _thread->join(); } } diff --git a/config/src/vespa/config/helper/configfetcher.h b/config/src/vespa/config/helper/configfetcher.h index ed04dc62f50..872937d8635 100644 --- a/config/src/vespa/config/helper/configfetcher.h +++ b/config/src/vespa/config/helper/configfetcher.h @@ -2,11 +2,11 @@ #pragma once #include "configpoller.h" -#include <vespa/config/config.h> #include <vespa/config/common/timingvalues.h> -#include <vespa/vespalib/util/thread.h> #include <atomic> +namespace vespalib { class Thread; } + namespace config { /** @@ -22,16 +22,14 @@ public: template <typename ConfigType> void subscribe(const std::string & configId, IFetcherCallback<ConfigType> * callback, uint64_t subscribeTimeout = DEFAULT_SUBSCRIBE_TIMEOUT); - void subscribeGenerationChanges(IGenerationCallback * callback) { - _poller.subscribeGenerationChanges(callback); - } + void subscribeGenerationChanges(IGenerationCallback * callback); void start(); void close(); int64_t getGeneration() const { return _poller.getGeneration(); } private: ConfigPoller _poller; - vespalib::Thread _thread; + std::unique_ptr<vespalib::Thread> _thread; std::atomic<bool> _closed; std::atomic<bool> _started; }; diff --git a/config/src/vespa/config/print/fileconfigsnapshotreader.cpp b/config/src/vespa/config/print/fileconfigsnapshotreader.cpp index 0cc7fe9fe38..951619aee6f 100644 --- a/config/src/vespa/config/print/fileconfigsnapshotreader.cpp +++ b/config/src/vespa/config/print/fileconfigsnapshotreader.cpp @@ -1,10 +1,10 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <fstream> -#include <sstream> #include "fileconfigsnapshotreader.h" #include "jsonconfigformatter.h" -#include <iostream> +#include <vespa/config/common/exceptions.h> +#include <fstream> +#include <sstream> namespace config { diff --git a/config/src/vespa/config/print/fileconfigsnapshotwriter.cpp b/config/src/vespa/config/print/fileconfigsnapshotwriter.cpp index de5c3af065d..5717e5e9781 100644 --- a/config/src/vespa/config/print/fileconfigsnapshotwriter.cpp +++ b/config/src/vespa/config/print/fileconfigsnapshotwriter.cpp @@ -1,8 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <fstream> #include "fileconfigsnapshotwriter.h" #include "jsonconfigformatter.h" +#include <vespa/config/common/exceptions.h> +#include <fstream> namespace config { diff --git a/config/src/vespa/config/retriever/configretriever.cpp b/config/src/vespa/config/retriever/configretriever.cpp index 240ba3bac00..9d49ad3324e 100644 --- a/config/src/vespa/config/retriever/configretriever.cpp +++ b/config/src/vespa/config/retriever/configretriever.cpp @@ -1,6 +1,8 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "configretriever.h" +#include <vespa/config/common/exceptions.h> + namespace config { diff --git a/config/src/vespa/config/retriever/configsnapshot.cpp b/config/src/vespa/config/retriever/configsnapshot.cpp index b04f44bf600..717c10d30e0 100644 --- a/config/src/vespa/config/retriever/configsnapshot.cpp +++ b/config/src/vespa/config/retriever/configsnapshot.cpp @@ -1,10 +1,12 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "configsnapshot.h" -#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/config/subscription/configsubscription.h> +#include <vespa/config/print/configdatabuffer.h> +#include <vespa/config/common/exceptions.h> #include <vespa/config/common/misc.h> #include <vespa/vespalib/data/slime/slime.h> -#include <vespa/vespalib/data/memory.h> +#include <vespa/vespalib/stllike/asciistream.h> using vespalib::Slime; using vespalib::slime::Cursor; @@ -16,23 +18,19 @@ namespace config { const int64_t ConfigSnapshot::SNAPSHOT_FORMAT_VERSION = 1; ConfigSnapshot::ConfigSnapshot() - : _valueMap(), - _generation(0) -{} + : _valueMap(), + _generation(0) {} -ConfigSnapshot::~ConfigSnapshot() -{ +ConfigSnapshot::~ConfigSnapshot() { } -ConfigSnapshot::ConfigSnapshot(const ConfigSnapshot & rhs) : - _valueMap(rhs._valueMap), - _generation(rhs._generation) -{ +ConfigSnapshot::ConfigSnapshot(const ConfigSnapshot &rhs) : + _valueMap(rhs._valueMap), + _generation(rhs._generation) { } ConfigSnapshot & -ConfigSnapshot::operator = (const ConfigSnapshot & rhs) -{ +ConfigSnapshot::operator=(const ConfigSnapshot &rhs) { if (&rhs != this) { ConfigSnapshot tmp(rhs); tmp.swap(*this); @@ -41,25 +39,31 @@ ConfigSnapshot::operator = (const ConfigSnapshot & rhs) } void -ConfigSnapshot::swap(ConfigSnapshot & rhs) -{ +ConfigSnapshot::swap(ConfigSnapshot &rhs) { _valueMap.swap(rhs._valueMap); std::swap(_generation, rhs._generation); } -ConfigSnapshot::ConfigSnapshot(const SubscriptionList & subscriptionList, int64_t generation) - : _valueMap(), - _generation(generation) -{ +ConfigSnapshot::ConfigSnapshot(const SubscriptionList &subscriptionList, int64_t generation) + : _valueMap(), + _generation(generation) { for (SubscriptionList::const_iterator it(subscriptionList.begin()), mt(subscriptionList.end()); it != mt; it++) { _valueMap[(*it)->getKey()] = Value((*it)->getLastGenerationChanged(), (*it)->getConfig()); } } -ConfigSnapshot::ConfigSnapshot(const ValueMap & valueMap, int64_t generation) - : _valueMap(valueMap), - _generation(generation) -{ +ConfigSnapshot::ConfigSnapshot(const ValueMap &valueMap, int64_t generation) + : _valueMap(valueMap), + _generation(generation) { +} + +ConfigSnapshot::ValueMap::const_iterator +ConfigSnapshot::find(const ConfigKey &key) const { + ValueMap::const_iterator it(_valueMap.find(key)); + if (it == _valueMap.end()) { + throw IllegalConfigKeyException("Unable to find config for key " + key.toString()); + } + return it; } ConfigSnapshot diff --git a/config/src/vespa/config/retriever/configsnapshot.h b/config/src/vespa/config/retriever/configsnapshot.h index 4cb4eeb3e23..24a16876720 100644 --- a/config/src/vespa/config/retriever/configsnapshot.h +++ b/config/src/vespa/config/retriever/configsnapshot.h @@ -1,14 +1,16 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/config/subscription/configsubscription.h> -#include <vespa/config/print/configdatabuffer.h> +#include "configkeyset.h" +#include <vespa/config/common/configvalue.h> #include <vespa/vespalib/stllike/string.h> #include <map> -#include "configkeyset.h" namespace config { +class ConfigSubscription; +class ConfigDataBuffer; + /** * A ConfigSnapshot contains a map of config keys to config instances. You may * request an instance of a config by calling the getConfig method. @@ -16,7 +18,7 @@ namespace config { class ConfigSnapshot { public: - typedef std::vector<ConfigSubscription::SP> SubscriptionList; + typedef std::vector<std::shared_ptr<ConfigSubscription>> SubscriptionList; /** * Construct an empty config snapshot. @@ -112,6 +114,8 @@ private: void deserializeV2(vespalib::slime::Inspector & root); Value deserializeValueV2(vespalib::slime::Inspector & inspector) const; + ValueMap::const_iterator find(const ConfigKey & key) const; + ValueMap _valueMap; int64_t _generation; }; @@ -119,4 +123,3 @@ private: } // namespace config #include "configsnapshot.hpp" - diff --git a/config/src/vespa/config/retriever/configsnapshot.hpp b/config/src/vespa/config/retriever/configsnapshot.hpp index 395d36bb4f6..bb07431cdef 100644 --- a/config/src/vespa/config/retriever/configsnapshot.hpp +++ b/config/src/vespa/config/retriever/configsnapshot.hpp @@ -1,7 +1,5 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/config/common/exceptions.h> - namespace config { template <typename ConfigType> @@ -9,11 +7,7 @@ std::unique_ptr<ConfigType> ConfigSnapshot::getConfig(const vespalib::string & configId) const { ConfigKey key(ConfigKey::create<ConfigType>(configId)); - ValueMap::const_iterator it(_valueMap.find(key)); - if (it == _valueMap.end()) { - throw IllegalConfigKeyException("Unable to find config for key " + key.toString()); - } - return it->second.second.newInstance<ConfigType>(); + return find(key)->second.second.newInstance<ConfigType>(); } template <typename ConfigType> @@ -21,11 +15,7 @@ bool ConfigSnapshot::isChanged(const vespalib::string & configId, int64_t currentGeneration) const { ConfigKey key(ConfigKey::create<ConfigType>(configId)); - ValueMap::const_iterator it(_valueMap.find(key)); - if (it == _valueMap.end()) { - throw IllegalConfigKeyException("Unable to find config for key " + key.toString()); - } - return currentGeneration < it->second.first; + return currentGeneration < find(key)->second.first; } template <typename ConfigType> diff --git a/configd/src/apps/sentinel/sentinel.cpp b/configd/src/apps/sentinel/sentinel.cpp index fed65c7fbab..d557cc04993 100644 --- a/configd/src/apps/sentinel/sentinel.cpp +++ b/configd/src/apps/sentinel/sentinel.cpp @@ -1,19 +1,15 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <sys/types.h> +#include <vespa/config/common/exceptions.h> #include <signal.h> -#include <cstring> #include <unistd.h> #include <sys/time.h> - #include <vespa/defaults.h> +#include "config-handler.h" + #include <vespa/log/log.h> LOG_SETUP("config-sentinel"); -#include <vespa/config-sentinel.h> - -#include "config-handler.h" - using namespace config; constexpr uint64_t CONFIG_TIMEOUT_MS = 3 * 60 * 1000; diff --git a/configgen/src/main/java/com/yahoo/config/codegen/CppClassBuilder.java b/configgen/src/main/java/com/yahoo/config/codegen/CppClassBuilder.java index 49f5275c7fe..fa16765d3da 100644 --- a/configgen/src/main/java/com/yahoo/config/codegen/CppClassBuilder.java +++ b/configgen/src/main/java/com/yahoo/config/codegen/CppClassBuilder.java @@ -562,6 +562,7 @@ public class CppClassBuilder implements ClassBuilder { } w.write("\n"); w.write("#include <vespa/config/common/configvalue.h>\n"); + w.write("#include <vespa/config/common/exceptions.h>\n"); w.write("#include <vespa/config/configgen/configpayload.h>\n"); w.write("#include <vespa/config/print/configdatabuffer.h>\n"); w.write("#include <vespa/config/common/configparser.h>\n"); diff --git a/configutil/src/lib/configstatus.cpp b/configutil/src/lib/configstatus.cpp index 8b721543c54..3cafccdf62e 100644 --- a/configutil/src/lib/configstatus.cpp +++ b/configutil/src/lib/configstatus.cpp @@ -7,6 +7,7 @@ #include <vbench/http/http_result_handler.h> #include <vbench/http/server_spec.h> #include <vbench/http/http_client.h> +#include <vespa/config/common/exceptions.h> #include <iostream> using configdefinitions::tagsContain; diff --git a/configutil/src/lib/modelinspect.cpp b/configutil/src/lib/modelinspect.cpp index 12a262bc6aa..e9aa744c478 100644 --- a/configutil/src/lib/modelinspect.cpp +++ b/configutil/src/lib/modelinspect.cpp @@ -3,6 +3,7 @@ #include "modelinspect.h" #include <lib/tags.h> #include <vespa/config/helper/configgetter.hpp> +#include <vespa/config/common/exceptions.h> #include <iostream> using configdefinitions::tagsContain; @@ -26,7 +27,7 @@ ModelInspect::ModelInspect(Flags flags, const config::ConfigUri uri, std::ostrea try { _cfg = config::ConfigGetter<cloud::config::ModelConfig>::getConfig(uri.getConfigId(), uri.getContext()); - } catch(config::ConfigRuntimeException &e) { + } catch (config::ConfigRuntimeException &e) { std::cerr << e.getMessage() << "\n"; } if (_cfg.get() != NULL) { diff --git a/documentapi/src/tests/loadtypes/loadtypetest.cpp b/documentapi/src/tests/loadtypes/loadtypetest.cpp index 03c1da178b0..9afd5783546 100644 --- a/documentapi/src/tests/loadtypes/loadtypetest.cpp +++ b/documentapi/src/tests/loadtypes/loadtypetest.cpp @@ -3,6 +3,7 @@ #include <vespa/documentapi/loadtypes/loadtypeset.h> #include <vespa/vdstestlib/cppunit/macros.h> #include <vespa/config/config.h> +#include <vespa/config/common/exceptions.h> namespace documentapi { diff --git a/documentapi/src/vespa/documentapi/loadtypes/loadtypeset.cpp b/documentapi/src/vespa/documentapi/loadtypes/loadtypeset.cpp index 062ed58ca87..34d843b2a01 100644 --- a/documentapi/src/vespa/documentapi/loadtypes/loadtypeset.cpp +++ b/documentapi/src/vespa/documentapi/loadtypes/loadtypeset.cpp @@ -3,6 +3,7 @@ #include "loadtypeset.h" #include <vespa/config-load-type.h> #include <vespa/config/config.h> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/config/helper/configgetter.hpp> diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/messagetypepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/messagetypepolicy.cpp index ba4d0cff079..b494ad5673c 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/messagetypepolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/messagetypepolicy.cpp @@ -2,8 +2,7 @@ #include "messagetypepolicy.h" #include <vespa/documentapi/messagebus/documentprotocol.h> -#include <vespa/messagebus/routing/route.h> -#include <vespa/messagebus/routing/routingcontext.h> +#include <vespa/messagebus/message.h> #include <vespa/vespalib/stllike/hash_map.hpp> using vespa::config::content::MessagetyperouteselectorpolicyConfig; diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp index f19e0c4c85f..780851ab597 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp @@ -9,10 +9,9 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/config-stor-distribution.h> -#include <vespa/config/helper/ifetchercallback.h> -#include <vespa/config/helper/configfetcher.h> -#include <vespa/log/log.h> +#include <vespa/config/subscription/configuri.h> +#include <vespa/log/log.h> LOG_SETUP(".storagepolicy"); using vespalib::make_string; @@ -120,8 +119,7 @@ StoragePolicy::doSelect(mbus::RoutingContext &context) document::BucketId id; switch(msg.getType()) { case DocumentProtocol::MESSAGE_PUTDOCUMENT: - id = _bucketIdFactory.getBucketId( - static_cast<const PutDocumentMessage&>(msg).getDocument()->getId()); + id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument()->getId()); break; case DocumentProtocol::MESSAGE_GETDOCUMENT: diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h index c73c4c8560d..3b45e9c6bf7 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h @@ -2,10 +2,10 @@ #pragma once #include "externslobrokpolicy.h" -#include <vespa/document/bucket/bucketidfactory.h> #include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> -#include <vespa/messagebus/reply.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/messagebus/routing/hop.h> #include <vespa/config/helper/ifetchercallback.h> #include <vespa/config/helper/configfetcher.h> diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp index 222f157b8e4..d4055f51ccd 100644 --- a/filedistribution/src/apps/filedistributor/filedistributor.cpp +++ b/filedistribution/src/apps/filedistributor/filedistributor.cpp @@ -12,6 +12,7 @@ #include <vespa/filedistribution/rpc/filedistributorrpc.h> #include <vespa/filedistribution/common/componentsdeleter.h> #include <vespa/fileacquirer/config-filedistributorrpc.h> +#include <vespa/config/common/exceptions.h> #include <vespa/config-zookeepers.h> #include <vespa/fastos/app.h> #include <boost/program_options.hpp> diff --git a/memfilepersistence/src/vespa/memfilepersistence/common/environment.cpp b/memfilepersistence/src/vespa/memfilepersistence/common/environment.cpp index 11577542573..e70e03b9442 100644 --- a/memfilepersistence/src/vespa/memfilepersistence/common/environment.cpp +++ b/memfilepersistence/src/vespa/memfilepersistence/common/environment.cpp @@ -5,12 +5,12 @@ #include <vespa/vespalib/util/random.h> #include <vespa/vespalib/util/vstringfmt.h> #include <vespa/config/helper/configgetter.hpp> +#include <vespa/config/subscription/configuri.h> #include <vespa/vespalib/stllike/asciistream.h> using config::ConfigGetter; -namespace storage { -namespace memfile { +namespace storage::memfile { namespace { @@ -117,5 +117,4 @@ Environment::swapModifiedBuckets(document::BucketId::List & ids) _modifiedBuckets.swap(ids); } -} // memfile -} // storage +} diff --git a/memfilepersistence/src/vespa/memfilepersistence/common/environment.h b/memfilepersistence/src/vespa/memfilepersistence/common/environment.h index 93ef0768148..3798e24f329 100644 --- a/memfilepersistence/src/vespa/memfilepersistence/common/environment.h +++ b/memfilepersistence/src/vespa/memfilepersistence/common/environment.h @@ -22,9 +22,9 @@ #include <vespa/document/bucket/bucketidfactory.h> #include <vespa/config/helper/configfetcher.h> +namespace config { class ConfigUri; } -namespace storage { -namespace memfile { +namespace storage::memfile { class MemFileMapper; class MemFileCache; @@ -129,6 +129,4 @@ struct DefaultLazyFileFactory vespalib::LazyFile::UP createFile(const std::string& fileName) const override; }; -} // storage -} // memfile - +} diff --git a/memfilepersistence/src/vespa/memfilepersistence/device/mountpointlist.cpp b/memfilepersistence/src/vespa/memfilepersistence/device/mountpointlist.cpp index 614d399ca22..cf66fe691da 100644 --- a/memfilepersistence/src/vespa/memfilepersistence/device/mountpointlist.cpp +++ b/memfilepersistence/src/vespa/memfilepersistence/device/mountpointlist.cpp @@ -5,11 +5,11 @@ #include <vespa/persistence/spi/exceptions.h> #include <vespa/vdslib/state/nodestate.h> #include <vespa/config/helper/configfetcher.h> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/guard.h> #include <vespa/vespalib/text/stringtokenizer.h> #include <fstream> -#include <sstream> #include <vespa/log/log.h> LOG_SETUP(".persistence.mountpointlist"); @@ -39,9 +39,7 @@ MountPointList::getPartitionStates() const for (uint32_t i=0; i<_mountPoints.size(); ++i) { if (!(_mountPoints[i]->isOk())) { const IOEvent* event = _mountPoints[i]->getLastEvent(); - - list[i] = spi::PartitionState(spi::PartitionState::DOWN, - event->getDescription()); + list[i] = spi::PartitionState(spi::PartitionState::DOWN, event->getDescription()); } } diff --git a/memfilepersistence/src/vespa/memfilepersistence/memfile/memfilecache.cpp b/memfilepersistence/src/vespa/memfilepersistence/memfile/memfilecache.cpp index e8fff0facd0..0e3a66e8b25 100644 --- a/memfilepersistence/src/vespa/memfilepersistence/memfile/memfilecache.cpp +++ b/memfilepersistence/src/vespa/memfilepersistence/memfile/memfilecache.cpp @@ -4,12 +4,12 @@ #include <vespa/memfilepersistence/common/environment.h> #include <vespa/memfilepersistence/mapper/memfilemapper.h> #include <vespa/memfilepersistence/spi/memfilepersistenceprovidermetrics.h> +#include <vespa/vespalib/util/exception.h> #include <vespa/log/log.h> LOG_SETUP(".persistence.memfile.cache"); -namespace storage { -namespace memfile { +namespace storage::memfile { void MemFileCache::Entry::setInUse(bool inUse) { @@ -517,7 +517,4 @@ MemFileCache::printCacheEntriesHtml(std::ostream& out) const out << "</ol>\n"; } -} // memfile - -} // storage - +} diff --git a/memfilepersistence/src/vespa/memfilepersistence/tools/dumpslotfile.cpp b/memfilepersistence/src/vespa/memfilepersistence/tools/dumpslotfile.cpp index 072b3e9fae0..bd54efcb57c 100644 --- a/memfilepersistence/src/vespa/memfilepersistence/tools/dumpslotfile.cpp +++ b/memfilepersistence/src/vespa/memfilepersistence/tools/dumpslotfile.cpp @@ -14,16 +14,14 @@ #include <vespa/storageframework/defaultimplementation/memory/nomemorymanager.h> #include <vespa/vespalib/util/programoptions.h> #include <vespa/config/helper/configgetter.hpp> -#include <sstream> - +#include <vespa/config/subscription/configuri.h> using config::ConfigGetter; using document::DocumenttypesConfig; using config::FileSpec; using document::DocumentTypeRepo; -namespace storage { -namespace memfile { +namespace storage::memfile { namespace { std::ostream* cout; @@ -347,8 +345,7 @@ int SlotFileDumper::dump(int argc, const char * const * argv, if (doc.get()) { printDoc(*doc, o); } else { - printFailure("Unable to get document in " + - it->toString(true)); + printFailure("Unable to get document in " + it->toString(true)); } } } @@ -361,5 +358,4 @@ int SlotFileDumper::dump(int argc, const char * const * argv, return 0; } -} // memfile -} // storage +} diff --git a/messagebus/src/vespa/messagebus/callstack.h b/messagebus/src/vespa/messagebus/callstack.h index f4e0ffe0df4..7ee1753e6b4 100644 --- a/messagebus/src/vespa/messagebus/callstack.h +++ b/messagebus/src/vespa/messagebus/callstack.h @@ -2,8 +2,8 @@ #pragma once -#include <vector> #include "context.h" +#include <vector> namespace mbus { diff --git a/messagebus/src/vespa/messagebus/destinationsession.h b/messagebus/src/vespa/messagebus/destinationsession.h index 973d65ec152..fd4005c09be 100644 --- a/messagebus/src/vespa/messagebus/destinationsession.h +++ b/messagebus/src/vespa/messagebus/destinationsession.h @@ -1,8 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <memory> -#include <string> #include "destinationsessionparams.h" #include "imessagehandler.h" #include "reply.h" @@ -10,6 +8,7 @@ namespace mbus { class MessageBus; +class Message; /** * A DestinationSession is used to receive Message objects and reply @@ -18,6 +17,7 @@ class MessageBus; class DestinationSession : public IMessageHandler { private: friend class MessageBus; + using MessageUP = std::unique_ptr<Message>; MessageBus &_mbus; string _name; @@ -62,7 +62,7 @@ public: * * @param msg the Message you want to acknowledge */ - void acknowledge(Message::UP msg); + void acknowledge(MessageUP msg); /** * Send a Reply as a response to a Message. The Reply will be routed back to @@ -80,7 +80,7 @@ public: * * @param message the Message */ - void handleMessage(Message::UP message) override; + void handleMessage(MessageUP message) override; /** * Returns the message handler of this session. diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.h b/messagebus/src/vespa/messagebus/destinationsessionparams.h index 4026cdbff91..98d6b38200c 100644 --- a/messagebus/src/vespa/messagebus/destinationsessionparams.h +++ b/messagebus/src/vespa/messagebus/destinationsessionparams.h @@ -1,8 +1,8 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <string> #include "imessagehandler.h" +#include "common.h" namespace mbus { @@ -16,7 +16,7 @@ namespace mbus { */ class DestinationSessionParams { private: - string _name; + string _name; bool _broadcastName; IMessageHandler *_handler; diff --git a/messagebus/src/vespa/messagebus/emptyreply.h b/messagebus/src/vespa/messagebus/emptyreply.h index db8beb33b9b..4bacfb4864b 100644 --- a/messagebus/src/vespa/messagebus/emptyreply.h +++ b/messagebus/src/vespa/messagebus/emptyreply.h @@ -2,6 +2,7 @@ #pragma once #include "reply.h" +#include "blob.h" namespace mbus { diff --git a/messagebus/src/vespa/messagebus/error.h b/messagebus/src/vespa/messagebus/error.h index e3d62bf6bbe..8fa56ef1673 100644 --- a/messagebus/src/vespa/messagebus/error.h +++ b/messagebus/src/vespa/messagebus/error.h @@ -1,7 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/messagebus/common.h> +#include "common.h" #ifdef Error #undef Error diff --git a/messagebus/src/vespa/messagebus/imessagehandler.h b/messagebus/src/vespa/messagebus/imessagehandler.h index b8c50172b6c..a6e7f5fa296 100644 --- a/messagebus/src/vespa/messagebus/imessagehandler.h +++ b/messagebus/src/vespa/messagebus/imessagehandler.h @@ -3,10 +3,11 @@ #pragma once #include <memory> -#include "message.h" namespace mbus { +class Message; + /** * This interface is implemented by application components that want * to handle incoming messages received from either an @@ -26,7 +27,7 @@ public: * * @param message the Message being delivered **/ - virtual void handleMessage(Message::UP message) = 0; + virtual void handleMessage(std::unique_ptr<Message> message) = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/intermediatesession.h b/messagebus/src/vespa/messagebus/intermediatesession.h index b4832d5069a..7d710a2e787 100644 --- a/messagebus/src/vespa/messagebus/intermediatesession.h +++ b/messagebus/src/vespa/messagebus/intermediatesession.h @@ -1,8 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <memory> -#include <string> #include "reply.h" #include "imessagehandler.h" #include "intermediatesessionparams.h" @@ -11,6 +9,7 @@ namespace mbus { class MessageBus; class ReplyGate; +class Message; /** * An IntermediateSession is used to process Message and Reply objects @@ -21,6 +20,7 @@ class IntermediateSession : public IMessageHandler, { private: friend class MessageBus; + using MessageUP = std::unique_ptr<Message>; MessageBus &_mbus; string _name; @@ -70,7 +70,7 @@ public: * * @param msg The message to forward. */ - void forward(Message::UP msg); + void forward(MessageUP msg); /** * Convenience method to call {@link #forward(Routable)}. @@ -87,7 +87,7 @@ public: */ const string getConnectionSpec() const; - void handleMessage(Message::UP message) override; + void handleMessage(MessageUP message) override; void handleReply(Reply::UP reply) override; }; diff --git a/messagebus/src/vespa/messagebus/intermediatesessionparams.h b/messagebus/src/vespa/messagebus/intermediatesessionparams.h index 84224b8803b..ad4a3e96574 100644 --- a/messagebus/src/vespa/messagebus/intermediatesessionparams.h +++ b/messagebus/src/vespa/messagebus/intermediatesessionparams.h @@ -1,9 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <string> #include "imessagehandler.h" #include "ireplyhandler.h" +#include "common.h" namespace mbus { @@ -17,7 +17,7 @@ namespace mbus { */ class IntermediateSessionParams { private: - string _name; + string _name; bool _broadcastName; IMessageHandler *_msgHandler; IReplyHandler *_replyHandler; diff --git a/messagebus/src/vespa/messagebus/ireplyhandler.h b/messagebus/src/vespa/messagebus/ireplyhandler.h index c29717cd748..08361eee65e 100644 --- a/messagebus/src/vespa/messagebus/ireplyhandler.h +++ b/messagebus/src/vespa/messagebus/ireplyhandler.h @@ -2,10 +2,11 @@ #pragma once #include <memory> -#include "reply.h" namespace mbus { +class Reply; + /** * This interface is implemented by application components that want * to handle incoming replies received from either an @@ -25,7 +26,7 @@ public: * * @param reply the Reply being delivered **/ - virtual void handleReply(Reply::UP reply) = 0; + virtual void handleReply(std::unique_ptr<Reply> reply) = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h index cea0aaa91fb..74f2943d43f 100644 --- a/messagebus/src/vespa/messagebus/message.h +++ b/messagebus/src/vespa/messagebus/message.h @@ -1,10 +1,10 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "routable.h" +#include <vespa/messagebus/routing/route.h> #include <vespa/fastos/time.h> #include <memory> -#include <vespa/messagebus/routing/route.h> -#include "routable.h" namespace mbus { diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h index 4d85dd6aca9..b550dd59774 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h @@ -9,6 +9,8 @@ namespace mbus { +class Error; + class PayLoadFiller { public: @@ -59,27 +61,17 @@ public: RPCSendV1(); ~RPCSendV1(); - // Implements RPCSendAdapter. void attach(RPCNetwork &net) override; - // Implements RPCSendAdapter. void send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, uint64_t timeRemaining) override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, uint64_t timeRemaining) override; - // Implements IReplyHandler. - void handleReply(Reply::UP reply) override; - - // Implements IDiscardHandler. + void handleReply(std::unique_ptr<Reply> reply) override; void handleDiscard(Context ctx) override; - - // Implements FRT_Invokable. void invoke(FRT_RPCRequest *req); - - // Implements FRT_IRequestWait. void RequestDone(FRT_RPCRequest *req) override; }; } // namespace mbus - diff --git a/messagebus/src/vespa/messagebus/reply.cpp b/messagebus/src/vespa/messagebus/reply.cpp index 6a6cc417265..06f84a15d5f 100644 --- a/messagebus/src/vespa/messagebus/reply.cpp +++ b/messagebus/src/vespa/messagebus/reply.cpp @@ -1,10 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "reply.h" #include "emptyreply.h" -#include "error.h" #include "errorcode.h" #include "ireplyhandler.h" #include "message.h" -#include "reply.h" #include "tracelevel.h" #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/backtrace.h> @@ -83,4 +82,14 @@ Reply::hasFatalErrors() const return false; } +void +Reply::setMessage(Message::UP msg) { + _msg = std::move(msg); +} + +Message::UP +Reply::getMessage() { + return std::move(_msg); +} + } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/reply.h b/messagebus/src/vespa/messagebus/reply.h index f352d442931..599baaa9bdb 100644 --- a/messagebus/src/vespa/messagebus/reply.h +++ b/messagebus/src/vespa/messagebus/reply.h @@ -1,9 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vector> #include "error.h" -#include "message.h" +#include "routable.h" +#include <vector> namespace mbus { @@ -17,9 +17,10 @@ class Message; */ class Reply : public Routable { private: - std::vector<Error> _errors; // A list of errors that have occured during the lifetime of this reply. - Message::UP _msg; // The message to which this is a reply. - double _retryDelay; // How to perform resending of this. + using MessageUP = std::unique_ptr<Message>; + std::vector<Error> _errors; // A list of errors that have occured during the lifetime of this reply. + MessageUP _msg; // The message to which this is a reply. + double _retryDelay; // How to perform resending of this. public: /** @@ -87,7 +88,7 @@ public: * * @param msg the Message to attach */ - void setMessage(Message::UP msg) { _msg = std::move(msg); } + void setMessage(MessageUP msg); /** * Detach the Message attached to this Reply. If a Reply contains errors, @@ -96,7 +97,7 @@ public: * * @return the detached Message */ - Message::UP getMessage() { return std::move(_msg); } + MessageUP getMessage(); /** * Returns the retry request of this reply. This can be set using {@link diff --git a/messagebus/src/vespa/messagebus/replygate.cpp b/messagebus/src/vespa/messagebus/replygate.cpp index a32310f08c7..c11d5e3ef88 100644 --- a/messagebus/src/vespa/messagebus/replygate.cpp +++ b/messagebus/src/vespa/messagebus/replygate.cpp @@ -1,5 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "replygate.h" +#include "message.h" +#include "reply.h" namespace mbus { diff --git a/messagebus/src/vespa/messagebus/replygate.h b/messagebus/src/vespa/messagebus/replygate.h index 72c9378c62c..078e79aa84a 100644 --- a/messagebus/src/vespa/messagebus/replygate.h +++ b/messagebus/src/vespa/messagebus/replygate.h @@ -2,11 +2,10 @@ #pragma once -#include <vespa/vespalib/util/referencecounter.h> #include "idiscardhandler.h" #include "imessagehandler.h" #include "ireplyhandler.h" -#include "message.h" +#include <vespa/vespalib/util/referencecounter.h> namespace mbus { @@ -43,7 +42,7 @@ public: * the matching Reply has been obtained. In order to obtain the matching * Reply, this method will push this object on the CallStack of the Message. */ - void handleMessage(Message::UP msg) override; + void handleMessage(std::unique_ptr<Message> msg) override; /** * Forward or discard Reply. If the gate is still open, it will forward the @@ -51,7 +50,7 @@ public: * the Reply will be discarded. This method also decreases the reference * counter of this object. */ - void handleReply(Reply::UP reply) override; + void handleReply(std::unique_ptr<Reply> reply) override; // Implements IDiscardHandler. void handleDiscard(Context ctx) override; @@ -64,4 +63,3 @@ public: }; } // namespace mbus - diff --git a/messagebus/src/vespa/messagebus/routable.h b/messagebus/src/vespa/messagebus/routable.h index b02c72efaca..0398fd01035 100644 --- a/messagebus/src/vespa/messagebus/routable.h +++ b/messagebus/src/vespa/messagebus/routable.h @@ -1,11 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <memory> -#include <vespa/messagebus/blob.h> -#include <vespa/messagebus/callstack.h> -#include <vespa/messagebus/context.h> -#include <vespa/messagebus/trace.h> +#include "callstack.h" +#include "trace.h" +#include "common.h" namespace mbus { diff --git a/messagebus/src/vespa/messagebus/staticthrottlepolicy.cpp b/messagebus/src/vespa/messagebus/staticthrottlepolicy.cpp index a1725e608a3..162318a0f76 100644 --- a/messagebus/src/vespa/messagebus/staticthrottlepolicy.cpp +++ b/messagebus/src/vespa/messagebus/staticthrottlepolicy.cpp @@ -1,5 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + #include "staticthrottlepolicy.h" +#include "message.h" namespace mbus { diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.h b/messagebus/src/vespa/messagebus/testlib/receptor.h index 16c5cfcfc4c..12894652fb7 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.h +++ b/messagebus/src/vespa/messagebus/testlib/receptor.h @@ -4,6 +4,8 @@ #include <vespa/messagebus/imessagehandler.h> #include <vespa/messagebus/ireplyhandler.h> +#include <vespa/messagebus/message.h> +#include <vespa/messagebus/reply.h> #include <vespa/vespalib/util/sync.h> namespace mbus { diff --git a/messagebus/src/vespa/messagebus/testlib/slobrok.h b/messagebus/src/vespa/messagebus/testlib/slobrok.h index 1677f6635f7..8105f20007c 100644 --- a/messagebus/src/vespa/messagebus/testlib/slobrok.h +++ b/messagebus/src/vespa/messagebus/testlib/slobrok.h @@ -4,6 +4,7 @@ #include <vespa/messagebus/common.h> #include <vespa/slobrok/cfg.h> +#include <vespa/fastos/thread.h> namespace slobrok { class SBEnv; diff --git a/messagebus/src/vespa/messagebus/tracenode.h b/messagebus/src/vespa/messagebus/tracenode.h index 1dddf39428b..fe04abb116b 100644 --- a/messagebus/src/vespa/messagebus/tracenode.h +++ b/messagebus/src/vespa/messagebus/tracenode.h @@ -1,7 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/messagebus/common.h> #include <vespa/vespalib/trace/tracenode.h> namespace mbus { diff --git a/searchcommon/src/vespa/searchcommon/common/schema.cpp b/searchcommon/src/vespa/searchcommon/common/schema.cpp index 720b4572fa8..c52a886fb3a 100644 --- a/searchcommon/src/vespa/searchcommon/common/schema.cpp +++ b/searchcommon/src/vespa/searchcommon/common/schema.cpp @@ -11,7 +11,6 @@ LOG_SETUP(".index.schema"); using namespace config; using namespace search::index; -using config::InvalidConfigException; namespace { diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp index 57c4b62cddb..2235cf3e342 100644 --- a/searchcore/src/apps/proton/proton.cpp +++ b/searchcore/src/apps/proton/proton.cpp @@ -9,6 +9,7 @@ #include <vespa/vespalib/util/signalhandler.h> #include <vespa/vespalib/util/programoptions.h> #include <vespa/vespalib/io/fileutil.h> +#include <vespa/config/common/exceptions.h> #include <vespa/fastos/app.h> #include <string> diff --git a/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp b/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp index 5731700a55f..46250301c18 100644 --- a/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp +++ b/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp @@ -6,6 +6,7 @@ #include <vespa/config-rank-profiles.h> #include <vespa/config/config.h> #include <vespa/config/helper/legacy.h> +#include <vespa/config/common/exceptions.h> #include <vespa/eval/eval/tensor_spec.h> #include <vespa/eval/eval/value_cache/constant_value.h> #include <vespa/eval/tensor/default_tensor_engine.h> diff --git a/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp b/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp index 423a3f509c0..59f0635d56a 100644 --- a/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp +++ b/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp @@ -18,6 +18,7 @@ #include <vespa/vespalib/util/slaveproc.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/objects/nbostream.h> +#include <vespa/config/common/exceptions.h> #include <vespa/config/helper/configgetter.hpp> #include <iostream> diff --git a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp index 58844dc969c..7aa4bf908f2 100644 --- a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp +++ b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp @@ -3,6 +3,7 @@ #include <vespa/slobrok/sbmirror.h> #include <vespa/config-slobroks.h> #include <vespa/config/common/configsystem.h> +#include <vespa/config/common/exceptions.h> #include <vespa/fnet/frt/frt.h> #include <vespa/vespalib/util/host_name.h> #include <vespa/vespalib/util/stringfmt.h> diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp index ed5e069c3ee..a99707bc275 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp @@ -5,7 +5,8 @@ #include "datasetcollection.h" #include "plain_dataset.h" #include "engine_base.h" -#include <vespa/searchcore/fdispatch/common/appcontext.h> +#include <vespa/config/common/exceptions.h> +#include <set> #include <vespa/log/log.h> LOG_SETUP(".search.nodemanager"); @@ -16,9 +17,10 @@ FastS_NodeManager::configure(std::unique_ptr<PartitionsConfig> cfg) LOG(config, "configuring datasetcollection from '%s'", _configUri.getConfigId().c_str()); SetPartMap(*cfg, 2000); - _componentConfig.addConfig(vespalib::ComponentConfigProducer::Config("fdispatch.nodemanager", - _fetcher->getGeneration(), - "will not update generation unless config has changed")); + _componentConfig.addConfig( + vespalib::ComponentConfigProducer::Config("fdispatch.nodemanager", + _fetcher->getGeneration(), + "will not update generation unless config has changed")); } @@ -26,15 +28,11 @@ class AdminBadEngines { std::set<vespalib::string> _bad; public: - void - addAdminBad(const vespalib::string &name) - { + void addAdminBad(const vespalib::string &name) { _bad.insert(name); } - bool - isAdminBad(const vespalib::string &name) const - { + bool isAdminBad(const vespalib::string &name) const { return _bad.find(name) != _bad.end(); } }; diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.h b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.h index 1d2aa617f35..47d06033175 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.h +++ b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.h @@ -7,6 +7,7 @@ #include <vespa/config/helper/configfetcher.h> #include <vespa/searchcore/fdispatch/common/queryperf.h> #include <vespa/vespalib/net/simple_component_config_producer.h> +#include <vespa/config/subscription/configuri.h> using vespa::config::search::core::PartitionsConfig; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp index 8c737f26af7..a678678edd0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp @@ -13,6 +13,7 @@ #include <vespa/searchsummary/config/config-juniperrc.h> #include <vespa/searchcore/config/config-ranking-constants.h> #include <vespa/vespalib/time/time_box.h> +#include <thread> LOG_SETUP(".proton.server.documentdbconfigmanager"); @@ -217,7 +218,7 @@ DocumentDBConfigManager::update(const ConfigSnapshot &snapshot) while (timeBox.hasTimeLeft() && (filePath == "")) { filePath = fileAcquirer.wait_for(rc.fileref, timeBox.timeLeft()); if (filePath == "") { - FastOS_Thread::Sleep(100); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } LOG(info, "Got file path from file acquirer: '%s' (name='%s', type='%s', ref='%s')", diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp index b5d76c3f60a..44a99395427 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp @@ -2,10 +2,12 @@ #include "proton_config_fetcher.h" #include "bootstrapconfig.h" -#include <vespa/vespalib/util/exceptions.h> -#include <thread> #include "proton_config_snapshot.h" #include "i_proton_configurer.h" +#include <vespa/config/common/exceptions.h> +#include <vespa/vespalib/util/exceptions.h> +#include <thread> + #include <vespa/log/log.h> LOG_SETUP(".proton.server.proton_config_fetcher"); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index 1b0959d63fd..b07a80d92f4 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -1,8 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserverapp.h" -#include <vespa/log/log.h> +#include <vespa/config/subscription/configuri.h> +#include <vespa/log/log.h> LOG_SETUP(".translogserverapp"); using search::common::FileHeaderContext; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h index 4ee29b91bda..f88fcb98421 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h @@ -6,6 +6,8 @@ #include <vespa/config/helper/configfetcher.h> #include <vespa/vespalib/util/ptrholder.h> +namespace config { class ConfigUri; } + namespace search { namespace common { class FileHeaderContext; } diff --git a/slobrok/src/apps/slobrok/slobrok.cpp b/slobrok/src/apps/slobrok/slobrok.cpp index fb785a55562..50d0c2a0399 100644 --- a/slobrok/src/apps/slobrok/slobrok.cpp +++ b/slobrok/src/apps/slobrok/slobrok.cpp @@ -1,6 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/fnet/fnet.h> #include <vespa/slobrok/server/sbenv.h> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/fastos/app.h> diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index 88d34f228c4..d525bbd3e62 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -6,6 +6,7 @@ #include <sstream> #include <vespa/vespalib/net/state_server.h> #include <vespa/vespalib/util/host_name.h> +#include <vespa/vespalib/util/exception.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/transport.h> diff --git a/storage/src/tests/bucketmover/bucketmovertest.cpp b/storage/src/tests/bucketmover/bucketmovertest.cpp index c233765ad27..9940cb0e55b 100644 --- a/storage/src/tests/bucketmover/bucketmovertest.cpp +++ b/storage/src/tests/bucketmover/bucketmovertest.cpp @@ -6,7 +6,7 @@ #include <tests/common/dummystoragelink.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> -#include <vespa/storage/bucketdb/storbucketdb.h> +#include <vespa/config/common/exceptions.h> bool debug = false; diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp index 15f0373e680..8eb1ed69bf8 100644 --- a/storage/src/tests/common/metricstest.cpp +++ b/storage/src/tests/common/metricstest.cpp @@ -11,9 +11,10 @@ #include <tests/common/teststorageapp.h> #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> -#include <thread> #include <vespa/metrics/metricmanager.h> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <thread> #include <vespa/log/log.h> LOG_SETUP(".test.metrics"); diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 52b976586e8..a62e6fab640 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -1,4 +1,14 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <tests/common/testhelper.h> +#include <tests/common/storagelinktest.h> +#include <tests/common/teststorageapp.h> +#include <tests/persistence/filestorage/forwardingmessagesender.h> +#include <vespa/storage/storageserver/statemanager.h> +#include <vespa/storage/bucketdb/bucketmanager.h> +#include <vespa/storage/persistence/persistencethread.h> +#include <vespa/storage/persistence/filestorage/filestormanager.h> +#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> #include <vespa/document/update/assignvalueupdate.h> #include <vespa/document/datatype/datatype.h> #include <vespa/document/fieldvalue/document.h> @@ -17,17 +27,9 @@ #include <vespa/storageapi/message/multioperation.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> -#include <vespa/storage/bucketdb/bucketmanager.h> -#include <vespa/storage/persistence/persistencethread.h> -#include <vespa/storage/persistence/filestorage/filestormanager.h> -#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> -#include <tests/common/testhelper.h> -#include <tests/common/storagelinktest.h> -#include <tests/common/teststorageapp.h> -#include <tests/persistence/filestorage/forwardingmessagesender.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/storageapi/message/batch.h> -#include <vespa/storage/storageserver/statemanager.h> +#include <vespa/config/common/exceptions.h> #include <vespa/fastos/file.h> #include <vespa/log/log.h> diff --git a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp index 58b4ae4d475..4c6e4e42b06 100644 --- a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp +++ b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp @@ -6,6 +6,7 @@ #include <tests/common/teststorageapp.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> +#include <vespa/config/common/exceptions.h> namespace storage { diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index a661c5c445e..12130db59d1 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -1,12 +1,12 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vector> #include <vespa/vdstestlib/cppunit/macros.h> #include <vespa/storage/persistence/messages.h> #include <tests/persistence/common/persistenceproviderwrapper.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <tests/persistence/common/filestortestfixture.h> #include <vespa/vespalib/util/barrier.h> +#include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/stllike/hash_set_insert.hpp> #include <vespa/log/log.h> diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp index 751d6b535a3..ff7f5f02077 100644 --- a/storage/src/tests/storageserver/bouncertest.cpp +++ b/storage/src/tests/storageserver/bouncertest.cpp @@ -1,16 +1,16 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <boost/pointer_cast.hpp> #include <cppunit/extensions/HelperMacros.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> #include <vespa/storageapi/message/stat.h> -#include <vespa/vdslib/state/nodestate.h> #include <vespa/storage/storageserver/bouncer.h> #include <tests/common/teststorageapp.h> #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> #include <vespa/storageapi/message/persistence.h> +#include <vespa/config/common/exceptions.h> + namespace storage { diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index c01b24aae8d..cf96605b3ce 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -3,6 +3,7 @@ #include <vespa/storage/storageserver/communicationmanager.h> #include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/rpcmessagebus.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storageframework/defaultimplementation/memory/nomemorymanager.h> @@ -27,12 +28,10 @@ struct CommunicationManagerTest : public CppUnit::TestFixture { std::shared_ptr<api::StorageCommand> createDummyCommand( api::StorageMessage::Priority priority) { - auto cmd = std::make_shared<api::GetCommand>( - document::BucketId(0), - document::DocumentId("doc::mydoc"), - "[all]"); - cmd->setAddress(api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 1)); + auto cmd = std::make_shared<api::GetCommand>(document::BucketId(0), + document::DocumentId("doc::mydoc"), + "[all]"); + cmd->setAddress(api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 1)); cmd->setPriority(priority); return cmd; } diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp index 26317465b5a..8858d5433a2 100644 --- a/storage/src/tests/storageserver/documentapiconvertertest.cpp +++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp @@ -11,6 +11,7 @@ #include <vespa/messagebus/emptyreply.h> #include <vespa/document/datatype/documenttype.h> #include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/config/subscription/configuri.h> #include <vespa/vespalib/testkit/test_kit.h> using document::DataType; diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp index 8a164361c88..39d423f37fe 100644 --- a/storage/src/tests/storageserver/statereportertest.cpp +++ b/storage/src/tests/storageserver/statereportertest.cpp @@ -9,6 +9,7 @@ #include <tests/common/teststorageapp.h> #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/data/slime/slime.h> #include <vespa/log/log.h> diff --git a/storage/src/tests/storageserver/testvisitormessagesession.h b/storage/src/tests/storageserver/testvisitormessagesession.h index 9ebebf73bea..193b6be133f 100644 --- a/storage/src/tests/storageserver/testvisitormessagesession.h +++ b/storage/src/tests/storageserver/testvisitormessagesession.h @@ -6,6 +6,7 @@ #include <vespa/storage/visiting/visitorthread.h> #include <vespa/documentapi/messagebus/messages/documentmessage.h> #include <vespa/storage/storageserver/priorityconverter.h> +#include <vespa/config/subscription/configuri.h> #include <deque> namespace storage { diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index 1824b976aea..ff1c3e3a5de 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -16,6 +16,7 @@ #include <vespa/documentapi/messagebus/messages/multioperationmessage.h> #include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> #include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/exceptions.h> namespace storage { diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index 84f1297b05f..e5759e7fa85 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -15,6 +15,7 @@ #include <vespa/documentapi/messagebus/messages/multioperationmessage.h> #include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> #include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/exceptions.h> #include <thread> diff --git a/storage/src/vespa/storage/bucketmover/bucketmover.cpp b/storage/src/vespa/storage/bucketmover/bucketmover.cpp index b38c061de44..b5522360b11 100644 --- a/storage/src/vespa/storage/bucketmover/bucketmover.cpp +++ b/storage/src/vespa/storage/bucketmover/bucketmover.cpp @@ -6,7 +6,9 @@ #include <vespa/storage/common/bucketmessages.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/storageutil/log.h> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/stringfmt.h> +#include <thread> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".bucketmover"); @@ -176,7 +178,7 @@ BucketMover::sendNewMoves() // what is happening. (Cannot use wait() here as reply of // message sent will signal the monitor) if (_config->operationDelay != 0) { - FastOS_Thread::Sleep(_config->operationDelay); + std::this_thread::sleep_for(std::chrono::milliseconds(_config->operationDelay)); } } diff --git a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp index f0e640afe27..9d98deef199 100644 --- a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp @@ -2,8 +2,9 @@ #include "modifiedbucketchecker.h" #include "filestormanager.h" -#include <vespa/log/log.h> +#include <vespa/config/common/exceptions.h> +#include <vespa/log/log.h> LOG_SETUP(".persistence.filestor.modifiedbucketchecker"); namespace storage { @@ -81,9 +82,7 @@ ModifiedBucketChecker::onClose() void ModifiedBucketChecker::run(framework::ThreadHandle& thread) { - LOG(debug, - "Started modified bucket checker thread with pid %d", - getpid()); + LOG(debug, "Started modified bucket checker thread with pid %d", getpid()); while (!thread.interrupted()) { thread.registerTick(); @@ -202,4 +201,3 @@ ModifiedBucketChecker::tick() } } // ns storage - diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp index c285645309a..71becba40df 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.cpp +++ b/storage/src/vespa/storage/storageserver/bouncer.cpp @@ -3,6 +3,8 @@ #include "bouncer.h" #include <vespa/storageapi/message/state.h> #include <vespa/storageapi/message/persistence.h> +#include <vespa/config/subscription/configuri.h> +#include <vespa/config/common/exceptions.h> #include <sstream> #include <vespa/log/log.h> diff --git a/storage/src/vespa/storage/storageserver/bouncer.h b/storage/src/vespa/storage/storageserver/bouncer.h index 8f6706d2cf8..6be465a8aa5 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.h +++ b/storage/src/vespa/storage/storageserver/bouncer.h @@ -19,6 +19,8 @@ #include <vespa/storage/config/config-stor-bouncer.h> #include <vespa/vespalib/util/sync.h> +namespace config { class ConfigUri; } + namespace storage { class Bouncer : public StorageLink, @@ -82,6 +84,3 @@ private: }; } // storage - - - diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp b/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp index 05b581d839a..f606ba5b584 100644 --- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp +++ b/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp @@ -1,13 +1,13 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bucketintegritychecker.h" - #include <vespa/storage/common/bucketmessages.h> #include <vespa/storage/storageutil/log.h> #include <vespa/storage/bucketdb/storbucketdb.h> #include <vespa/storageapi/message/state.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/storage/bucketdb/lockablemap.hpp> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/log/bufferedlogger.h> diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index f32b1c242cf..5b0bb84a8fa 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -2,18 +2,22 @@ #include "communicationmanager.h" #include "fnetlistener.h" #include "rpcrequestwrapper.h" +#include <vespa/storage/config/config-stor-server.h> +#include <vespa/storage/common/nodestateupdater.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> #include <vespa/storageapi/message/state.h> +#include <vespa/messagebus/rpcmessagebus.h> #include <vespa/messagebus/emptyreply.h> -#include <vespa/storage/config/config-stor-server.h> -#include <vespa/storage/common/nodestateupdater.h> #include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".communication.manager"); +using vespalib::make_string; + namespace storage { PriorityQueue::PriorityQueue() : @@ -97,8 +101,7 @@ CommunicationManager::getAllocationType(api::StorageMessage& msg) const void -CommunicationManager::receiveStorageReply( - const std::shared_ptr<api::StorageReply>& reply) +CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageReply>& reply) { assert(reply.get()); enqueue(reply); @@ -107,8 +110,7 @@ CommunicationManager::receiveStorageReply( namespace { vespalib::string getNodeId(StorageComponent& sc) { vespalib::asciistream ost; - ost << sc.getClusterName() << "/" << sc.getNodeType() - << "/" << sc.getIndex(); + ost << sc.getClusterName() << "/" << sc.getNodeType() << "/" << sc.getIndex(); return ost.str(); } @@ -124,14 +126,10 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) // Relaxed load since we're not doing any dependent reads that aren't // already covered by some other form of explicit synchronization. if (_closed.load(std::memory_order_relaxed)) { - LOG(debug, "Not handling command of type %d as we have closed down", - msg->getType()); - MBUS_TRACE(msg->getTrace(), 6, - "Communication manager: Failing message as we are closed"); + LOG(debug, "Not handling command of type %d as we have closed down", msg->getType()); + MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Failing message as we are closed"); std::unique_ptr<mbus::Reply> reply(new mbus::EmptyReply()); - reply->addError(mbus::Error( - documentapi::DocumentProtocol::ERROR_ABORTED, - "Node shutting down")); + reply->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_ABORTED, "Node shutting down")); msg->swapState(*reply); _messageBusSession->reply(std::move(reply)); return; @@ -139,40 +137,33 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) const vespalib::string & protocolName = msg->getProtocol(); if (protocolName == documentapi::DocumentProtocol::NAME) { - std::unique_ptr<documentapi::DocumentMessage> docMsgPtr( - static_cast<documentapi::DocumentMessage*>(msg.release())); + std::unique_ptr<documentapi::DocumentMessage> docMsgPtr(static_cast<documentapi::DocumentMessage*>(msg.release())); assert(docMsgPtr.get()); std::unique_ptr<api::StorageCommand> cmd( - _docApiConverter.toStorageAPI( - static_cast<documentapi::DocumentMessage&>(*docMsgPtr), - _component.getTypeRepo())); + _docApiConverter.toStorageAPI(static_cast<documentapi::DocumentMessage&>(*docMsgPtr), _component.getTypeRepo())); if (!cmd.get()) { - LOGBM(warning, "Unsupported message: StorageApi could not convert " - "message of type %d to a storageapi message", + LOGBM(warning, "Unsupported message: StorageApi could not convert message of type %d to a storageapi message", docMsgPtr->getType()); _metrics.convertToStorageAPIFailures.inc(); return; } cmd->setTrace(docMsgPtr->getTrace()); - cmd->setTransportContext(std::unique_ptr<api::TransportContext>( - new StorageTransportContext(std::move(docMsgPtr)))); + cmd->setTransportContext(std::unique_ptr<api::TransportContext>(new StorageTransportContext(std::move(docMsgPtr)))); - enqueue(std::shared_ptr<api::StorageCommand>(cmd.release())); + enqueue(std::shared_ptr<api::StorageCommand>(std::move(cmd))); } else if (protocolName == mbusprot::StorageProtocol::NAME) { - std::unique_ptr<mbusprot::StorageCommand> storMsgPtr( - static_cast<mbusprot::StorageCommand*>(msg.release())); + std::unique_ptr<mbusprot::StorageCommand> storMsgPtr(static_cast<mbusprot::StorageCommand*>(msg.release())); assert(storMsgPtr.get()); const std::shared_ptr<api::StorageCommand> & cmd = storMsgPtr->getCommand(); cmd->setTimeout(storMsgPtr->getTimeRemaining()); cmd->setTrace(storMsgPtr->getTrace()); - cmd->setTransportContext(std::unique_ptr<api::TransportContext>( - new StorageTransportContext(std::move(storMsgPtr)))); + cmd->setTransportContext(std::unique_ptr<api::TransportContext>(new StorageTransportContext(std::move(storMsgPtr)))); enqueue(cmd); } else { @@ -184,41 +175,35 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) void CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply) { - MBUS_TRACE(reply->getTrace(), 4, getNodeId(_component) - + "Communication manager: Received reply from message bus"); + MBUS_TRACE(reply->getTrace(), 4, getNodeId(_component) + "Communication manager: Received reply from message bus"); // Relaxed load since we're not doing any dependent reads that aren't // already covered by some other form of explicit synchronization. if (_closed.load(std::memory_order_relaxed)) { - LOG(debug, "Not handling reply of type %d as we have closed down", - reply->getType()); + LOG(debug, "Not handling reply of type %d as we have closed down", reply->getType()); return; } LOG(spam, "Got reply of type %d, trace is %s", reply->getType(), reply->getTrace().toString().c_str()); // EmptyReply must be converted to real replies before processing. if (reply->getType() == 0) { - std::unique_ptr<mbus::Message> message(reply->getMessage().release()); + std::unique_ptr<mbus::Message> message(reply->getMessage()); if (message.get()) { std::unique_ptr<mbus::Reply> convertedReply; const vespalib::string& protocolName = message->getProtocol(); if (protocolName == documentapi::DocumentProtocol::NAME) { - convertedReply.reset(static_cast<documentapi::DocumentMessage*>( - message.get())->createReply().release()); + convertedReply = static_cast<documentapi::DocumentMessage &>(*message).createReply(); } else if (protocolName == mbusprot::StorageProtocol::NAME) { std::shared_ptr<api::StorageReply> repl( - static_cast<mbusprot::StorageCommand*>(message.get()) - ->getCommand()->makeReply().release()); - mbusprot::StorageReply::UP sreply( - new mbusprot::StorageReply(repl)); + static_cast<mbusprot::StorageCommand &>(*message).getCommand()->makeReply()); + mbusprot::StorageReply::UP sreply(new mbusprot::StorageReply(repl)); if (reply->hasErrors()) { // Convert only the first error since storageapi only // supports one return code. uint32_t mbuscode = reply->getError(0).getCode(); - api::ReturnCode::Result code( - (api::ReturnCode::Result) mbuscode); + api::ReturnCode::Result code((api::ReturnCode::Result) mbuscode); // Encode mbuscode into message not to lose it sreply->getReply()->setResult(storage::api::ReturnCode( code, @@ -229,20 +214,18 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply) + reply->getError(0).getService() + vespalib::string(")"))); } - convertedReply.reset(sreply.release()); + convertedReply = std::move(sreply); } else { - LOG(warning, "Received reply of unhandled protocol '%s'", - protocolName.c_str()); + LOG(warning, "Received reply of unhandled protocol '%s'", protocolName.c_str()); return; } convertedReply->swapState(*reply); - convertedReply->setMessage(mbus::Message::UP(message.release())); - reply.reset(convertedReply.release()); + convertedReply->setMessage(std::move(message)); + reply = std::move(convertedReply); } if (reply->getType() == 0) { - LOG(warning, "Failed to convert empty reply by reflecting on " - "local message copy."); + LOG(warning, "Failed to convert empty reply by reflecting on local message copy."); return; } } @@ -252,48 +235,38 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply) if (protocolName == documentapi::DocumentProtocol::NAME) { std::shared_ptr<api::StorageCommand> originalCommand; - { vespalib::LockGuard lock(_messageBusSentLock); - typedef std::map<api::StorageMessage::Id, - api::StorageCommand::SP> MessageMap; - MessageMap::iterator iter( - _messageBusSent.find(reply->getContext().value.UINT64)); + typedef std::map<api::StorageMessage::Id, api::StorageCommand::SP> MessageMap; + MessageMap::iterator iter(_messageBusSent.find(reply->getContext().value.UINT64)); if (iter != _messageBusSent.end()) { originalCommand.swap(iter->second); _messageBusSent.erase(iter); } else { - LOG(warning, "Failed to convert reply - original sent " - "command doesn't exist"); + LOG(warning, "Failed to convert reply - original sent command doesn't exist"); return; } } std::shared_ptr<api::StorageReply> sar( - _docApiConverter.toStorageAPI( - static_cast<documentapi::DocumentReply&>(*reply), - *originalCommand).release()); + _docApiConverter.toStorageAPI(static_cast<documentapi::DocumentReply&>(*reply), *originalCommand)); if (sar.get()) { sar->setTrace(reply->getTrace()); receiveStorageReply(sar); } } else if (protocolName == mbusprot::StorageProtocol::NAME) { - mbusprot::StorageReply* sr( - static_cast<mbusprot::StorageReply*>(reply.get())); + mbusprot::StorageReply* sr(static_cast<mbusprot::StorageReply*>(reply.get())); sr->getReply()->setTrace(reply->getTrace()); receiveStorageReply(sr->getReply()); } else { - LOGBM(warning, "Received unsupported reply type %d for protocol " - "'%s'.", + LOGBM(warning, "Received unsupported reply type %d for protocol '%s'.", reply->getType(), reply->getProtocol().c_str()); } } } -CommunicationManager::CommunicationManager( - StorageComponentRegister& compReg, - const config::ConfigUri & configUri) +CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) : StorageLink("Communication manager"), _component(compReg, "communicationmanager"), _metrics(_component.getLoadTypes()->getMetricLoadTypes()), @@ -381,8 +354,7 @@ void CommunicationManager::onClose() while (_eventQueue.size() > 0) { assert(_eventQueue.getNext(msg, 0)); if (!msg->getType().isReply()) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(*msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(*msg).makeReply()); reply->setResult(code); sendReply(reply); } @@ -401,12 +373,23 @@ CommunicationManager::configureMessageBusLimits( : cfg.mbusContentNodeMaxPendingSize); } -void CommunicationManager::configure( - std::unique_ptr<CommunicationManagerConfig> config) +void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config) { // Only allow dynamic (live) reconfiguration of message bus limits. if (_mbus.get()) { configureMessageBusLimits(*config); + if (_mbus->getRPCNetwork().getPort() != config->mbusport) { + auto m = make_string("mbus port changed from %d to %d. Will conduct a quick, but controlled restart.", + _mbus->getRPCNetwork().getPort(), config->mbusport); + LOG(warning, "%s", m.c_str()); + _component.requestShutdown(m); + } + if (_listener->getListenPort() != config->rpcport) { + auto m = make_string("rpc port changed from %d to %d. Will conduct a quick, but controlled restart.", + _listener->getListenPort(), config->rpcport); + LOG(warning, "%s", m.c_str()); + _component.requestShutdown(m); + } return; }; @@ -423,18 +406,12 @@ void CommunicationManager::configure( // Configure messagebus here as we for legacy reasons have // config here. - _mbus.reset(new mbus::RPCMessageBus( - mbus::ProtocolSet() - .add(mbus::IProtocol::SP( - new documentapi::DocumentProtocol( - *_component.getLoadTypes(), - _component.getTypeRepo()))) - .add(mbus::IProtocol::SP( - new mbusprot::StorageProtocol( - _component.getTypeRepo(), - *_component.getLoadTypes()))), - params, - _configUri)); + _mbus = std::make_unique<mbus::RPCMessageBus>( + mbus::ProtocolSet() + .add(std::make_shared<documentapi::DocumentProtocol>(*_component.getLoadTypes(), _component.getTypeRepo())) + .add(std::make_shared<mbusprot::StorageProtocol>(_component.getTypeRepo(), *_component.getLoadTypes())), + params, + _configUri); configureMessageBusLimits(*config); } @@ -458,8 +435,7 @@ void CommunicationManager::configure( void CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) { - MBUS_TRACE(msg->getTrace(), 9, - "Communication manager: Sending message down chain."); + MBUS_TRACE(msg->getTrace(), 9, "Communication manager: Sending message down chain."); framework::MilliSecTimer startTime(_component.getClock()); try { LOG(spam, "Process: %s", msg->toString().c_str()); @@ -469,14 +445,11 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) } LOG(spam, "Done processing: %s", msg->toString().c_str()); - _metrics.messageProcessTime[msg->getLoadType()].addValue( - startTime.getElapsedTimeAsDouble()); + _metrics.messageProcessTime[msg->getLoadType()].addValue(startTime.getElapsedTimeAsDouble()); } catch (std::exception& e) { - LOGBP(error, "When running command %s, caught exception %s. " - "Discarding message", + LOGBP(error, "When running command %s, caught exception %s. Discarding message", msg->toString().c_str(), e.what()); - _metrics.exceptionMessageProcessTime[msg->getLoadType()].addValue( - startTime.getElapsedTimeAsDouble()); + _metrics.exceptionMessageProcessTime[msg->getLoadType()].addValue(startTime.getElapsedTimeAsDouble()); } catch (...) { LOG(fatal, "Caught fatal exception in communication manager"); throw; @@ -486,19 +459,17 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) void CommunicationManager::enqueue(const std::shared_ptr<api::StorageMessage> & msg) { + using MemoryToken = framework::MemoryToken; assert(msg.get()); const uint32_t memoryFootprint = msg->getMemoryFootprint(); - framework::MemoryToken::UP token = _component.getMemoryManager().allocate( - getAllocationType(*msg), - memoryFootprint * 2, memoryFootprint * 2, - msg->getPriority()); + MemoryToken::UP token = _component.getMemoryManager().allocate(getAllocationType(*msg), memoryFootprint * 2, + memoryFootprint * 2, msg->getPriority()); - if (token.get()) { - msg->setMemoryToken(std::unique_ptr<framework::MemoryToken>(token.release())); + if (token) { + msg->setMemoryToken(std::move(token)); - LOG(spam, "Enq storage message %s, priority %d", - msg->toString().c_str(), msg->getPriority()); + LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); _eventQueue.enqueue(msg); } else { _metrics.failedDueToTooLittleMemory.inc(); @@ -510,10 +481,8 @@ CommunicationManager::enqueue(const std::shared_ptr<api::StorageMessage> & msg) api::StorageCommand* cmd(dynamic_cast<api::StorageCommand*>(msg.get())); if (cmd) { - std::shared_ptr<api::StorageReply> reply( - cmd->makeReply().release()); - reply->setResult(api::ReturnCode( - api::ReturnCode::BUSY, ost.str())); + std::shared_ptr<api::StorageReply> reply(cmd->makeReply()); + reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, ost.str())); sendReply(reply); } } @@ -522,28 +491,22 @@ CommunicationManager::enqueue(const std::shared_ptr<api::StorageMessage> & msg) bool CommunicationManager::onUp(const std::shared_ptr<api::StorageMessage> & msg) { - MBUS_TRACE(msg->getTrace(), 6, - "Communication manager: Sending " + msg->toString()); + MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Sending " + msg->toString()); if (msg->getType().isReply()) { - if (static_cast<api::StorageReply&>(*msg).getResult().failed()) { - LOG(debug, "Request %s failed: %s", - msg->getType().toString().c_str(), - static_cast<api::StorageReply&>(*msg) - .getResult().toString().c_str()); + const api::StorageReply & m = static_cast<const api::StorageReply&>(*msg); + if (m.getResult().failed()) { + LOG(debug, "Request %s failed: %s", msg->getType().toString().c_str(), m.getResult().toString().c_str()); } - return sendReply( - std::static_pointer_cast<api::StorageReply>(msg)); + return sendReply(std::static_pointer_cast<api::StorageReply>(msg)); } else { - return sendCommand( - std::static_pointer_cast<api::StorageCommand>(msg)); + return sendCommand(std::static_pointer_cast<api::StorageCommand>(msg)); } } void -CommunicationManager::sendMessageBusMessage( - const std::shared_ptr<api::StorageCommand>& msg, - std::unique_ptr<mbus::Message> mbusMsg, - const mbus::Route& route) +CommunicationManager::sendMessageBusMessage(const std::shared_ptr<api::StorageCommand>& msg, + std::unique_ptr<mbus::Message> mbusMsg, + const mbus::Route& route) { // Relaxed load since we're not doing any dependent reads that aren't // already covered by some other form of explicit synchronization. @@ -553,20 +516,16 @@ CommunicationManager::sendMessageBusMessage( LOG(spam, "Sending message bus msg of type %d", mbusMsg->getType()); - MBUS_TRACE(mbusMsg->getTrace(), 6, - "Communication manager: Passing message to source session"); + MBUS_TRACE(mbusMsg->getTrace(), 6, "Communication manager: Passing message to source session"); mbus::Result result = _sourceSession->send(std::move(mbusMsg), route); if (!result.isAccepted()) { - std::shared_ptr<api::StorageReply> reply(msg->makeReply().release()); + std::shared_ptr<api::StorageReply> reply(msg->makeReply()); if (reply.get()) { if (result.getError().getCode() > mbus::ErrorCode::FATAL_ERROR) { - reply->setResult(api::ReturnCode( - api::ReturnCode::ABORTED, - result.getError().getMessage())); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, result.getError().getMessage())); } else { - reply->setResult(api::ReturnCode( - api::ReturnCode::BUSY, result.getError().getMessage())); + reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, result.getError().getMessage())); } } else { LOG(spam, "Failed to synthesize reply"); @@ -581,18 +540,17 @@ CommunicationManager::sendCommand( const std::shared_ptr<api::StorageCommand> & msg) { if (!msg->getAddress()) { - LOGBP(warning, "Got command without address of type %s in " - "CommunicationManager::sendCommand", - msg->getType().getName().c_str()); + LOGBP(warning, "Got command without address of type %s in CommunicationManager::sendCommand", + msg->getType().getName().c_str()); return false; } if (!msg->sourceIndexSet()) { msg->setSourceIndex(_component.getIndex()); } - // Components can not specify what storage node to send to - // without specifying protocol. This is a workaround, such that code - // doesn't have to care whether message is in documentapi or storage - // protocol. + // Components can not specify what storage node to send to + // without specifying protocol. This is a workaround, such that code + // doesn't have to care whether message is in documentapi or storage + // protocol. api::StorageMessageAddress address(*msg->getAddress()); switch (msg->getType().getId()) { case api::MessageType::STATBUCKET_ID: { @@ -608,9 +566,7 @@ CommunicationManager::sendCommand( switch (address.getProtocol()) { case api::StorageMessageAddress::STORAGE: { - LOG(spam, "Send to %s: %s", - address.toString().c_str(), - msg->toString().c_str()); + LOG(spam, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str()); std::unique_ptr<mbus::Message> cmd(new mbusprot::StorageCommand(msg)); @@ -623,16 +579,12 @@ CommunicationManager::sendCommand( } case api::StorageMessageAddress::DOCUMENT: { - MBUS_TRACE(msg->getTrace(), 7, - "Communication manager: Converting storageapi message to " - "documentapi"); + MBUS_TRACE(msg->getTrace(), 7, "Communication manager: Converting storageapi message to documentapi"); - std::unique_ptr<mbus::Message> mbusMsg( - _docApiConverter.toDocumentAPI(*msg, _component.getTypeRepo())); + std::unique_ptr<mbus::Message> mbusMsg(_docApiConverter.toDocumentAPI(*msg, _component.getTypeRepo())); if (mbusMsg.get()) { - MBUS_TRACE(msg->getTrace(), 7, - "Communication manager: Converted OK"); + MBUS_TRACE(msg->getTrace(), 7, "Communication manager: Converted OK"); mbusMsg->setTrace(msg->getTrace()); mbusMsg->setRetryEnabled(address.retryEnabled()); @@ -655,22 +607,15 @@ CommunicationManager::sendCommand( } void -CommunicationManager::serializeNodeState( - const api::GetNodeStateReply& gns, - std::ostream& os, - bool includeDescription, - bool includeDiskDescription, - bool useOldFormat) const +CommunicationManager::serializeNodeState(const api::GetNodeStateReply& gns, std::ostream& os, + bool includeDescription, bool includeDiskDescription, bool useOldFormat) const { vespalib::asciistream tmp; if (gns.hasNodeState()) { - gns.getNodeState().serialize( - tmp, "", includeDescription, - includeDiskDescription, useOldFormat); + gns.getNodeState().serialize(tmp, "", includeDescription, includeDiskDescription, useOldFormat); } else { - _component.getStateUpdater().getReportedNodeState()->serialize( - tmp, "", includeDescription, - includeDiskDescription, useOldFormat); + _component.getStateUpdater().getReportedNodeState()->serialize(tmp, "", includeDescription, + includeDiskDescription, useOldFormat); } os << tmp.str(); } @@ -682,17 +627,14 @@ CommunicationManager::sendDirectRPCReply( { std::string requestName(request.getMethodName()); if (requestName == "getnodestate3") { - api::GetNodeStateReply& gns( - static_cast<api::GetNodeStateReply&>(*reply)); + api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, true, true, false); request.addReturnString(ns.str().c_str()); request.addReturnString(gns.getNodeInfo().c_str()); - LOGBP(debug, "Sending getnodestate3 reply with host info '%s'.", - gns.getNodeInfo().c_str()); + LOGBP(debug, "Sending getnodestate3 reply with host info '%s'.", gns.getNodeInfo().c_str()); } else if (requestName == "getnodestate2") { - api::GetNodeStateReply& gns( - static_cast<api::GetNodeStateReply&>(*reply)); + api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, true, true, false); request.addReturnString(ns.str().c_str()); @@ -704,13 +646,11 @@ CommunicationManager::sendDirectRPCReply( request.addReturnString(reply->getResult().getMessage().c_str()); if (reply->getType() == api::MessageType::GETNODESTATE_REPLY) { - api::GetNodeStateReply& gns( - static_cast<api::GetNodeStateReply&>(*reply)); + api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, false, false, true); request.addReturnString(ns.str().c_str()); - request.addReturnInt(static_cast<int>( - gns.getNodeState().getInitProgress().getValue() * 100)); + request.addReturnInt(static_cast<int>(gns.getNodeState().getInitProgress().getValue() * 100)); } } @@ -730,35 +670,28 @@ CommunicationManager::sendMessageBusReply( // If this was originally documentapi, create a reply now and transfer the // state. if (context._docAPIMsg.get()) { - if (reply->getResult().getResult() - == api::ReturnCode::WRONG_DISTRIBUTION) - { - replyUP.reset(new documentapi::WrongDistributionReply( - reply->getResult().getMessage())); + if (reply->getResult().getResult() == api::ReturnCode::WRONG_DISTRIBUTION) { + replyUP.reset(new documentapi::WrongDistributionReply(reply->getResult().getMessage())); replyUP->swapState(*context._docAPIMsg); replyUP->setTrace(reply->getTrace()); - replyUP->addError(mbus::Error( - documentapi::DocumentProtocol::ERROR_WRONG_DISTRIBUTION, - reply->getResult().getMessage())); + replyUP->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_WRONG_DISTRIBUTION, + reply->getResult().getMessage())); } else { replyUP = context._docAPIMsg->createReply(); replyUP->swapState(*context._docAPIMsg); replyUP->setTrace(reply->getTrace()); - replyUP->setMessage(std::unique_ptr<mbus::Message>( - context._docAPIMsg.release())); + replyUP->setMessage(std::move(context._docAPIMsg)); _docApiConverter.transferReplyState(*reply, *replyUP); } } else if (context._storageProtocolMsg.get()) { replyUP.reset(new mbusprot::StorageReply(reply)); if (reply->getResult().getResult() != api::ReturnCode::OK) { - replyUP->addError(mbus::Error(reply->getResult().getResult(), - reply->getResult().getMessage())); + replyUP->addError(mbus::Error(reply->getResult().getResult(), reply->getResult().getMessage())); } replyUP->swapState(*context._storageProtocolMsg); replyUP->setTrace(reply->getTrace()); - replyUP->setMessage(mbus::Message::UP( - context._storageProtocolMsg.release())); + replyUP->setMessage(std::move(context._storageProtocolMsg)); } if (replyUP.get() != NULL) { @@ -783,19 +716,13 @@ CommunicationManager::sendReply( // Relaxed load since we're not doing any dependent reads that aren't // already covered by some other form of explicit synchronization. if (_closed.load(std::memory_order_relaxed)) { - reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, - "Node is shutting down")); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down")); } - std::unique_ptr<StorageTransportContext> context( - static_cast<StorageTransportContext*>( - reply->getTransportContext().release())); + std::unique_ptr<StorageTransportContext> context(static_cast<StorageTransportContext*>(reply->getTransportContext().release())); if (!context.get()) { - LOG(spam, - "No transport context in reply %s", - reply->toString().c_str()); - + LOG(spam, "No transport context in reply %s", reply->toString().c_str()); return false; } @@ -836,8 +763,7 @@ CommunicationManager::updateMetrics(const MetricLockGuard &) } void -CommunicationManager::print(std::ostream& out, bool verbose, - const std::string& indent) const +CommunicationManager::print(std::ostream& out, bool verbose, const std::string& indent) const { (void) verbose; (void) indent; out << "CommunicationManager"; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index c5d17294dd7..921ca1400fa 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -19,12 +19,20 @@ #include <vespa/storageframework/generic/metric/metricupdatehook.h> #include <vespa/storageapi/mbusprot/storagecommand.h> #include <vespa/storageapi/mbusprot/storagereply.h> -#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/imessagehandler.h> +#include <vespa/messagebus/ireplyhandler.h> +#include <vespa/config/helper/configfetcher.h> #include <vespa/vespalib/util/document_runnable.h> +#include <vespa/config/subscription/configuri.h> #include <map> #include <queue> #include <atomic> +namespace mbus { + class RPCMessageBus; + class SourceSession; + class DestinationSession; +} namespace storage { class VisitorMbusSession; @@ -108,8 +116,8 @@ public: ~StorageTransportContext(); std::unique_ptr<documentapi::DocumentMessage> _docAPIMsg; - std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg; - std::unique_ptr<RPCRequestWrapper> _request; + std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg; + std::unique_ptr<RPCRequestWrapper> _request; }; class CommunicationManager : public StorageLink, @@ -138,28 +146,20 @@ private: void process(const std::shared_ptr<api::StorageMessage>& msg); - using CommunicationManagerConfig - = vespa::config::content::core::StorCommunicationmanagerConfig; + using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig; void configureMessageBusLimits(const CommunicationManagerConfig& cfg); - void configure(std::unique_ptr<CommunicationManagerConfig> config) override; - void receiveStorageReply(const std::shared_ptr<api::StorageReply>&); - void serializeNodeState( - const api::GetNodeStateReply& gns, - std::ostream& os, - bool includeDescription, - bool includeDiskDescription, - bool useOldFormat) const; + void serializeNodeState(const api::GetNodeStateReply& gns, std::ostream& os, bool includeDescription, + bool includeDiskDescription, bool useOldFormat) const; static const uint64_t FORWARDED_MESSAGE = 0; std::unique_ptr<mbus::RPCMessageBus> _mbus; - mbus::DestinationSession::UP _messageBusSession; - mbus::SourceSession::UP _sourceSession; - mbus::SourceSession::UP _visitorSourceSession; + std::unique_ptr<mbus::DestinationSession> _messageBusSession; + std::unique_ptr<mbus::SourceSession> _sourceSession; uint32_t _count; vespalib::Lock _messageBusSentLock; diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp index ae789891852..2906667d1ee 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp @@ -1,5 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "documentapiconverter.h" +#include "priorityconverter.h" #include <vespa/documentapi/documentapi.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/storageapi/message/datagram.h> @@ -11,9 +12,6 @@ #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/stat.h> #include <vespa/storageapi/message/batch.h> -#include <vespa/messagebus/errorcode.h> -#include <vespa/storageapi/messageapi/returncode.h> -#include <vespa/vdslib/container/documentlist.h> #include <vespa/document/bucket/bucketidfactory.h> #include <vespa/log/log.h> @@ -21,6 +19,12 @@ LOG_SETUP(".documentapiconverter"); namespace storage { +DocumentApiConverter::DocumentApiConverter(const config::ConfigUri & configUri) + : _priConverter(std::make_unique<PriorityConverter>(configUri)) +{} + +DocumentApiConverter::~DocumentApiConverter() {} + std::unique_ptr<api::StorageCommand> DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg, const document::DocumentTypeRepo::SP &repo) @@ -31,55 +35,42 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg, switch (fromMsg.getType()) { case DocumentProtocol::MESSAGE_PUTDOCUMENT: { - documentapi::PutDocumentMessage& from( - static_cast<documentapi::PutDocumentMessage&>(fromMsg)); - api::PutCommand::UP to(new api::PutCommand( - document::BucketId(0), from.getDocument(), - from.getTimestamp())); + documentapi::PutDocumentMessage& from(static_cast<documentapi::PutDocumentMessage&>(fromMsg)); + api::PutCommand::UP to(new api::PutCommand(document::BucketId(0), from.getDocument(), from.getTimestamp())); to->setCondition(from.getCondition()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: { - documentapi::UpdateDocumentMessage& from( - static_cast<documentapi::UpdateDocumentMessage&>(fromMsg)); - api::UpdateCommand::UP to(new api::UpdateCommand( - document::BucketId(0), from.getDocumentUpdate(), - from.getNewTimestamp())); + documentapi::UpdateDocumentMessage& from(static_cast<documentapi::UpdateDocumentMessage&>(fromMsg)); + api::UpdateCommand::UP to(new api::UpdateCommand(document::BucketId(0), from.getDocumentUpdate(), + from.getNewTimestamp())); to->setOldTimestamp(from.getOldTimestamp()); to->setCondition(from.getCondition()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: { - documentapi::RemoveDocumentMessage& from( - static_cast<documentapi::RemoveDocumentMessage&>(fromMsg)); - api::RemoveCommand::UP to(new api::RemoveCommand( - document::BucketId(0), from.getDocumentId(), 0)); + documentapi::RemoveDocumentMessage& from(static_cast<documentapi::RemoveDocumentMessage&>(fromMsg)); + api::RemoveCommand::UP to(new api::RemoveCommand(document::BucketId(0), from.getDocumentId(), 0)); to->setCondition(from.getCondition()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case DocumentProtocol::MESSAGE_GETDOCUMENT: { - documentapi::GetDocumentMessage& from( - static_cast<documentapi::GetDocumentMessage&>(fromMsg)); - api::GetCommand::UP to(new api::GetCommand( - document::BucketId(0), from.getDocumentId(), - from.getFieldSet())); + documentapi::GetDocumentMessage& from(static_cast<documentapi::GetDocumentMessage&>(fromMsg)); + api::GetCommand::UP to(new api::GetCommand(document::BucketId(0), from.getDocumentId(), from.getFieldSet())); toMsg.reset(to.release()); break; } case DocumentProtocol::MESSAGE_CREATEVISITOR: { - documentapi::CreateVisitorMessage& from( - static_cast<documentapi::CreateVisitorMessage&>(fromMsg)); - api::CreateVisitorCommand::UP to(new api::CreateVisitorCommand( - from.getLibraryName(), - from.getInstanceId(), - from.getDocumentSelection())); + documentapi::CreateVisitorMessage& from(static_cast<documentapi::CreateVisitorMessage&>(fromMsg)); + api::CreateVisitorCommand::UP to(new api::CreateVisitorCommand(from.getLibraryName(), from.getInstanceId(), + from.getDocumentSelection())); to->setControlDestination(from.getControlDestination()); to->setDataDestination(from.getDataDestination()); @@ -94,76 +85,57 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg, to->setVisitorDispatcherVersion(from.getVisitorDispatcherVersion()); to->setVisitorOrdering(from.getVisitorOrdering()); to->setMaxBucketsPerVisitor(from.getMaxBucketsPerVisitor()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case DocumentProtocol::MESSAGE_DESTROYVISITOR: { - documentapi::DestroyVisitorMessage& from( - static_cast<documentapi::DestroyVisitorMessage&>(fromMsg)); - api::DestroyVisitorCommand::UP to(new api::DestroyVisitorCommand( - from.getInstanceId())); - toMsg.reset(to.release()); + documentapi::DestroyVisitorMessage& from(static_cast<documentapi::DestroyVisitorMessage&>(fromMsg)); + toMsg = std::make_unique<api::DestroyVisitorCommand>(from.getInstanceId()); break; } case DocumentProtocol::MESSAGE_MULTIOPERATION: { - documentapi::MultiOperationMessage& from( - static_cast<documentapi::MultiOperationMessage&>(fromMsg)); - api::MultiOperationCommand::UP to(new api::MultiOperationCommand(repo, - from.getBucketId(), from.getBuffer(), - from.keepTimeStamps())); - toMsg.reset(to.release()); + documentapi::MultiOperationMessage& from(static_cast<documentapi::MultiOperationMessage&>(fromMsg)); + toMsg = std::make_unique<api::MultiOperationCommand>(repo, from.getBucketId(), from.getBuffer(), + from.keepTimeStamps()); break; } case DocumentProtocol::MESSAGE_BATCHDOCUMENTUPDATE: { - documentapi::BatchDocumentUpdateMessage& from( - static_cast<documentapi::BatchDocumentUpdateMessage&>(fromMsg)); - api::BatchDocumentUpdateCommand::UP to( - new api::BatchDocumentUpdateCommand(from.getUpdates())); - toMsg.reset(to.release()); + documentapi::BatchDocumentUpdateMessage& from(static_cast<documentapi::BatchDocumentUpdateMessage&>(fromMsg)); + toMsg = std::make_unique<api::BatchDocumentUpdateCommand>(from.getUpdates()); break; } case DocumentProtocol::MESSAGE_STATBUCKET: { - documentapi::StatBucketMessage& from( - static_cast<documentapi::StatBucketMessage&>(fromMsg)); - api::StatBucketCommand::UP to(new api::StatBucketCommand( - from.getBucketId(), from.getDocumentSelection())); - toMsg.reset(to.release()); + documentapi::StatBucketMessage& from(static_cast<documentapi::StatBucketMessage&>(fromMsg)); + toMsg = std::make_unique<api::StatBucketCommand>(from.getBucketId(), from.getDocumentSelection()); break; } case DocumentProtocol::MESSAGE_GETBUCKETLIST: { - documentapi::GetBucketListMessage& from( - static_cast<documentapi::GetBucketListMessage&>(fromMsg)); - api::GetBucketListCommand::UP to(new api::GetBucketListCommand( - from.getBucketId())); - toMsg.reset(to.release()); + documentapi::GetBucketListMessage& from(static_cast<documentapi::GetBucketListMessage&>(fromMsg)); + toMsg = std::make_unique<api::GetBucketListCommand>(from.getBucketId()); break; } case DocumentProtocol::MESSAGE_VISITORINFO: { - documentapi::VisitorInfoMessage& from( - static_cast<documentapi::VisitorInfoMessage&>(fromMsg)); + documentapi::VisitorInfoMessage& from(static_cast<documentapi::VisitorInfoMessage&>(fromMsg)); api::VisitorInfoCommand::UP to(new api::VisitorInfoCommand); for (uint32_t i = 0; i < from.getFinishedBuckets().size(); ++i) { to->setBucketCompleted(from.getFinishedBuckets()[i], 0); } if (!from.getErrorMessage().empty()) { - to->setErrorCode(api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, from.getErrorMessage())); + to->setErrorCode(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, from.getErrorMessage())); } - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case DocumentProtocol::MESSAGE_REMOVELOCATION: { - documentapi::RemoveLocationMessage& from( - static_cast<documentapi::RemoveLocationMessage&>(fromMsg)); - api::RemoveLocationCommand::UP to(new api::RemoveLocationCommand( - from.getDocumentSelection(), document::BucketId(0))); + documentapi::RemoveLocationMessage& from(static_cast<documentapi::RemoveLocationMessage&>(fromMsg)); + api::RemoveLocationCommand::UP to(new api::RemoveLocationCommand(from.getDocumentSelection(), document::BucketId(0))); toMsg.reset(to.release()); break; } @@ -177,8 +149,7 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg, timeout = INT_MAX; } toMsg->setTimeout(timeout); - toMsg->setPriority( - _priConverter.toStoragePriority(fromMsg.getPriority())); + toMsg->setPriority(_priConverter->toStoragePriority(fromMsg.getPriority())); toMsg->setLoadType(fromMsg.getLoadType()); LOG(spam, "Converted command %s, loadtype %d, mapped priority %d to %d", @@ -193,34 +164,27 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentReply& fromReply, api::StorageCommand& fromCommand) { if (LOG_WOULD_LOG(spam)) { - LOG(spam, "Trace for reply:\n%s", - fromReply.getTrace().toString().c_str()); + LOG(spam, "Trace for reply:\n%s", fromReply.getTrace().toString().c_str()); } std::unique_ptr<api::StorageReply> toMsg; switch (fromReply.getType()) { case documentapi::DocumentProtocol::REPLY_CREATEVISITOR: { - documentapi::CreateVisitorReply& fromRep( - static_cast<documentapi::CreateVisitorReply&>(fromReply)); - const api::CreateVisitorCommand& fromCmd( - static_cast<const api::CreateVisitorCommand&>(fromCommand)); + documentapi::CreateVisitorReply& fromRep(static_cast<documentapi::CreateVisitorReply&>(fromReply)); + const api::CreateVisitorCommand& fromCmd(static_cast<const api::CreateVisitorCommand&>(fromCommand)); api::CreateVisitorReply::UP to(new api::CreateVisitorReply(fromCmd)); to->setVisitorStatistics(fromRep.getVisitorStatistics()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case documentapi::DocumentProtocol::REPLY_STATBUCKET: { - documentapi::StatBucketReply& fromRep( - static_cast<documentapi::StatBucketReply&>(fromReply)); - const api::StatBucketCommand& fromCmd( - static_cast<const api::StatBucketCommand&>(fromCommand)); + documentapi::StatBucketReply& fromRep(static_cast<documentapi::StatBucketReply&>(fromReply)); + const api::StatBucketCommand& fromCmd(static_cast<const api::StatBucketCommand&>(fromCommand)); - api::StatBucketReply::UP to( - new api::StatBucketReply(fromCmd, fromRep.getResults())); - toMsg.reset(to.release()); + toMsg = std::make_unique<api::StatBucketReply>(fromCmd, fromRep.getResults()); break; } default: @@ -230,131 +194,98 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentReply& fromReply, if (toMsg.get()) { if (fromReply.hasErrors()) { - toMsg->setResult(api::ReturnCode( - (api::ReturnCode::Result) fromReply.getError(0).getCode(), - fromReply.getError(0).getMessage())); - toMsg->setPriority( - _priConverter.toStoragePriority(fromReply.getPriority())); + toMsg->setResult(api::ReturnCode((api::ReturnCode::Result) fromReply.getError(0).getCode(), + fromReply.getError(0).getMessage())); + toMsg->setPriority(_priConverter->toStoragePriority(fromReply.getPriority())); } } return std::move(toMsg); } std::unique_ptr<mbus::Message> -DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, - const document::DocumentTypeRepo::SP &repo) +DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, const document::DocumentTypeRepo::SP &repo) { std::unique_ptr<mbus::Message> toMsg; switch (fromMsg.getType().getId()) { case api::MessageType::PUT_ID: { api::PutCommand& from(static_cast<api::PutCommand&>(fromMsg)); - documentapi::PutDocumentMessage::UP to( - new documentapi::PutDocumentMessage(from.getDocument())); + documentapi::PutDocumentMessage::UP to(new documentapi::PutDocumentMessage(from.getDocument())); to->setTimestamp(from.getTimestamp()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::UPDATE_ID: { api::UpdateCommand& from(static_cast<api::UpdateCommand&>(fromMsg)); - documentapi::UpdateDocumentMessage::UP to( - new documentapi::UpdateDocumentMessage(from.getUpdate())); + documentapi::UpdateDocumentMessage::UP to(new documentapi::UpdateDocumentMessage(from.getUpdate())); to->setOldTimestamp(from.getOldTimestamp()); to->setNewTimestamp(from.getTimestamp()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::REMOVE_ID: { api::RemoveCommand& from(static_cast<api::RemoveCommand&>(fromMsg)); - documentapi::RemoveDocumentMessage::UP to( - new documentapi::RemoveDocumentMessage(from.getDocumentId())); - toMsg.reset(to.release()); + toMsg = std::make_unique<documentapi::RemoveDocumentMessage>(from.getDocumentId()); break; } case api::MessageType::VISITOR_INFO_ID: { - api::VisitorInfoCommand& from( - static_cast<api::VisitorInfoCommand&>(fromMsg)); - documentapi::VisitorInfoMessage::UP to( - new documentapi::VisitorInfoMessage); + api::VisitorInfoCommand& from(static_cast<api::VisitorInfoCommand&>(fromMsg)); + documentapi::VisitorInfoMessage::UP to(new documentapi::VisitorInfoMessage); for (uint32_t i = 0; i < from.getCompletedBucketsList().size(); ++i) { - to->getFinishedBuckets().push_back( - from.getCompletedBucketsList()[i].bucketId); + to->getFinishedBuckets().push_back(from.getCompletedBucketsList()[i].bucketId); } to->setErrorMessage(from.getErrorCode().getMessage()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::DOCBLOCK_ID: { api::DocBlockCommand& from(static_cast<api::DocBlockCommand&>(fromMsg)); - documentapi::MultiOperationMessage::UP to( - new documentapi::MultiOperationMessage( - from.getBucketId(), - from.getDocumentBlock(), - from.keepTimeStamps())); - toMsg.reset(to.release()); + toMsg = std::make_unique<documentapi::MultiOperationMessage>(from.getBucketId(), from.getDocumentBlock(), + from.keepTimeStamps()); break; } case api::MessageType::SEARCHRESULT_ID: { - api::SearchResultCommand& from( - static_cast<api::SearchResultCommand&>(fromMsg)); - documentapi::SearchResultMessage::UP to( - new documentapi::SearchResultMessage(from)); - toMsg.reset(to.release()); + api::SearchResultCommand& from(static_cast<api::SearchResultCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::SearchResultMessage>(from); break; } case api::MessageType::QUERYRESULT_ID: { - api::QueryResultCommand& from( - static_cast<api::QueryResultCommand&>(fromMsg)); - documentapi::QueryResultMessage::UP to( - new documentapi::QueryResultMessage( - from.getSearchResult(), from.getDocumentSummary())); - toMsg.reset(to.release()); + api::QueryResultCommand& from(static_cast<api::QueryResultCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::QueryResultMessage>(from.getSearchResult(), from.getDocumentSummary()); break; } case api::MessageType::DOCUMENTSUMMARY_ID: { - api::DocumentSummaryCommand& from( - static_cast<api::DocumentSummaryCommand&>(fromMsg)); - documentapi::DocumentSummaryMessage::UP to( - new documentapi::DocumentSummaryMessage(from)); - toMsg.reset(to.release()); + api::DocumentSummaryCommand& from(static_cast<api::DocumentSummaryCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::DocumentSummaryMessage>(from); break; } case api::MessageType::MULTIOPERATION_ID: { - api::MultiOperationCommand& from( - static_cast<api::MultiOperationCommand&>(fromMsg)); - documentapi::MultiOperationMessage::UP to( - new documentapi::MultiOperationMessage(repo, - from.getBucketId(), - from.getBuffer(), - from.keepTimeStamps())); - toMsg.reset(to.release()); + api::MultiOperationCommand& from(static_cast<api::MultiOperationCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::MultiOperationMessage>(repo, from.getBucketId(), from.getBuffer(), + from.keepTimeStamps()); break; } case api::MessageType::MAPVISITOR_ID: { - api::MapVisitorCommand& from( - static_cast<api::MapVisitorCommand&>(fromMsg)); - documentapi::MapVisitorMessage::UP to( - new documentapi::MapVisitorMessage); + api::MapVisitorCommand& from(static_cast<api::MapVisitorCommand&>(fromMsg)); + documentapi::MapVisitorMessage::UP to(new documentapi::MapVisitorMessage); to->getData() = from.getData(); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::DOCUMENTLIST_ID: { - api::DocumentListCommand& from( - static_cast<api::DocumentListCommand&>(fromMsg)); - documentapi::DocumentListMessage::UP to( - new documentapi::DocumentListMessage(from.getBucketId())); + api::DocumentListCommand& from(static_cast<api::DocumentListCommand&>(fromMsg)); + documentapi::DocumentListMessage::UP to(new documentapi::DocumentListMessage(from.getBucketId())); for (uint32_t i = 0; i < from.getDocuments().size(); i++) { to->getDocuments().push_back( @@ -363,28 +294,21 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, from.getDocuments()[i]._doc, from.getDocuments()[i]._removeEntry)); } - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::EMPTYBUCKETS_ID: { - api::EmptyBucketsCommand& from( - static_cast<api::EmptyBucketsCommand&>(fromMsg)); - std::unique_ptr<documentapi::EmptyBucketsMessage> to( - new documentapi::EmptyBucketsMessage(from.getBuckets())); - toMsg.reset(to.release()); + api::EmptyBucketsCommand& from(static_cast<api::EmptyBucketsCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::EmptyBucketsMessage>(from.getBuckets()); break; } case api::MessageType::VISITOR_CREATE_ID: { - api::CreateVisitorCommand& from( - static_cast<api::CreateVisitorCommand&>(fromMsg)); + api::CreateVisitorCommand& from(static_cast<api::CreateVisitorCommand&>(fromMsg)); documentapi::CreateVisitorMessage::UP to( - new documentapi::CreateVisitorMessage( - from.getLibraryName(), - from.getInstanceId(), - from.getControlDestination(), - from.getDataDestination())); + new documentapi::CreateVisitorMessage(from.getLibraryName(), from.getInstanceId(), + from.getControlDestination(), from.getDataDestination())); to->setDocumentSelection(from.getDocumentSelection()); to->setMaximumPendingReplyCount(from.getMaximumPendingReplyCount()); to->setParameters(from.getParameters()); @@ -396,27 +320,21 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, to->getBuckets() = from.getBuckets(); to->setVisitorOrdering(from.getVisitorOrdering()); to->setMaxBucketsPerVisitor(from.getMaxBucketsPerVisitor()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::VISITOR_DESTROY_ID: { - api::DestroyVisitorCommand& from( - static_cast<api::DestroyVisitorCommand&>(fromMsg)); - documentapi::DestroyVisitorMessage::UP to( - new documentapi::DestroyVisitorMessage); + api::DestroyVisitorCommand& from(static_cast<api::DestroyVisitorCommand&>(fromMsg)); + documentapi::DestroyVisitorMessage::UP to(new documentapi::DestroyVisitorMessage); to->setInstanceId(from.getInstanceId()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::STATBUCKET_ID: { - api::StatBucketCommand& from( - static_cast<api::StatBucketCommand&>(fromMsg)); - documentapi::StatBucketMessage::UP to( - new documentapi::StatBucketMessage( - from.getBucketId(), from.getDocumentSelection())); - toMsg.reset(to.release()); + api::StatBucketCommand& from(static_cast<api::StatBucketCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::StatBucketMessage>(from.getBucketId(), from.getDocumentSelection()); break; } default: @@ -434,13 +352,11 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, } void -DocumentApiConverter::transferReplyState(api::StorageReply& fromMsg, - mbus::Reply& toMsg) +DocumentApiConverter::transferReplyState(api::StorageReply& fromMsg, mbus::Reply& toMsg) { // First map error codes. if (fromMsg.getResult().failed()) { - mbus::Error error(mbus::Error(fromMsg.getResult().getResult(), - fromMsg.getResult().toString())); + mbus::Error error(mbus::Error(fromMsg.getResult().getResult(), fromMsg.getResult().toString())); toMsg.addError(error); LOG(debug, "Converted storageapi error code %d to %s", fromMsg.getResult().getResult(), error.toString().c_str()); @@ -449,65 +365,49 @@ DocumentApiConverter::transferReplyState(api::StorageReply& fromMsg, using documentapi::DocumentProtocol; if (toMsg.getType() == DocumentProtocol::REPLY_GETDOCUMENT) { api::GetReply& from(static_cast<api::GetReply&>(fromMsg)); - documentapi::GetDocumentReply& to( - static_cast<documentapi::GetDocumentReply&>(toMsg)); + documentapi::GetDocumentReply& to(static_cast<documentapi::GetDocumentReply&>(toMsg)); if (from.getDocument().get() != 0) { to.setDocument(from.getDocument()); to.setLastModified(from.getLastModifiedTimestamp()); } } else if (toMsg.getType() == DocumentProtocol::REPLY_REMOVEDOCUMENT) { api::RemoveReply& from(static_cast<api::RemoveReply&>(fromMsg)); - documentapi::RemoveDocumentReply& to( - static_cast<documentapi::RemoveDocumentReply&>(toMsg)); + documentapi::RemoveDocumentReply& to(static_cast<documentapi::RemoveDocumentReply&>(toMsg)); to.setWasFound(from.wasFound()); to.setHighestModificationTimestamp(from.getTimestamp()); } else if (toMsg.getType() == DocumentProtocol::REPLY_PUTDOCUMENT) { api::PutReply& from(static_cast<api::PutReply&>(fromMsg)); - documentapi::WriteDocumentReply& to( - static_cast<documentapi::WriteDocumentReply&>(toMsg)); + documentapi::WriteDocumentReply& to(static_cast<documentapi::WriteDocumentReply&>(toMsg)); to.setHighestModificationTimestamp(from.getTimestamp()); } else if (toMsg.getType() == DocumentProtocol::REPLY_MULTIOPERATION) { - api::MultiOperationReply& from( - static_cast<api::MultiOperationReply&>(fromMsg)); - documentapi::WriteDocumentReply& to( - static_cast<documentapi::WriteDocumentReply&>(toMsg)); - to.setHighestModificationTimestamp( - from.getHighestModificationTimestamp()); + api::MultiOperationReply& from(static_cast<api::MultiOperationReply&>(fromMsg)); + documentapi::WriteDocumentReply& to(static_cast<documentapi::WriteDocumentReply&>(toMsg)); + to.setHighestModificationTimestamp(from.getHighestModificationTimestamp()); } else if (toMsg.getType() == DocumentProtocol::REPLY_UPDATEDOCUMENT) { api::UpdateReply& from(static_cast<api::UpdateReply&>(fromMsg)); - documentapi::UpdateDocumentReply& to( - static_cast<documentapi::UpdateDocumentReply&>(toMsg)); + documentapi::UpdateDocumentReply& to(static_cast<documentapi::UpdateDocumentReply&>(toMsg)); to.setWasFound(from.wasFound()); to.setHighestModificationTimestamp(from.getTimestamp()); } else if (toMsg.getType() == DocumentProtocol::REPLY_STATBUCKET) { api::StatBucketReply& from(static_cast<api::StatBucketReply&>(fromMsg)); - documentapi::StatBucketReply& to( - static_cast<documentapi::StatBucketReply&>(toMsg)); + documentapi::StatBucketReply& to(static_cast<documentapi::StatBucketReply&>(toMsg)); to.setResults(from.getResults()); } else if (toMsg.getType() == DocumentProtocol::REPLY_GETBUCKETLIST) { - api::GetBucketListReply& from( - static_cast<api::GetBucketListReply&>(fromMsg)); - documentapi::GetBucketListReply& to( - static_cast<documentapi::GetBucketListReply&>(toMsg)); - const std::vector<api::GetBucketListReply::BucketInfo>& buckets( - from.getBuckets()); + api::GetBucketListReply& from(static_cast<api::GetBucketListReply&>(fromMsg)); + documentapi::GetBucketListReply& to(static_cast<documentapi::GetBucketListReply&>(toMsg)); + const std::vector<api::GetBucketListReply::BucketInfo>& buckets(from.getBuckets()); for (uint32_t i = 0; i < buckets.size(); i++) { to.getBuckets().push_back( - documentapi::GetBucketListReply::BucketInfo( - buckets[i]._bucket, buckets[i]._bucketInformation)); + documentapi::GetBucketListReply::BucketInfo(buckets[i]._bucket, buckets[i]._bucketInformation)); } } else if (toMsg.getType() == DocumentProtocol::REPLY_CREATEVISITOR) { - api::CreateVisitorReply& from( - static_cast<api::CreateVisitorReply&>(fromMsg)); - documentapi::CreateVisitorReply& to( - static_cast<documentapi::CreateVisitorReply&>(toMsg)); + api::CreateVisitorReply& from(static_cast<api::CreateVisitorReply&>(fromMsg)); + documentapi::CreateVisitorReply& to(static_cast<documentapi::CreateVisitorReply&>(toMsg)); to.setLastBucket(from.getLastBucket()); to.setVisitorStatistics(from.getVisitorStatistics()); } else if (toMsg.getType() == DocumentProtocol::REPLY_BATCHDOCUMENTUPDATE) { - api::BatchDocumentUpdateReply& from( - static_cast<api::BatchDocumentUpdateReply&>(fromMsg)); - documentapi::BatchDocumentUpdateReply& to( - static_cast<documentapi::BatchDocumentUpdateReply&>(toMsg)); + api::BatchDocumentUpdateReply& from(static_cast<api::BatchDocumentUpdateReply&>(fromMsg)); + documentapi::BatchDocumentUpdateReply& to(static_cast<documentapi::BatchDocumentUpdateReply&>(toMsg)); to.getDocumentsNotFound() = from.getDocumentsNotFound(); } } diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.h b/storage/src/vespa/storage/storageserver/documentapiconverter.h index bd620f58dc0..f53b538272a 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.h +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.h @@ -1,15 +1,19 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "priorityconverter.h" -#include <vespa/storageapi/messageapi/storagecommand.h> -#include <vespa/storageapi/messageapi/storagereply.h> #include <vespa/documentapi/messagebus/messages/documentmessage.h> #include <vespa/documentapi/messagebus/messages/documentreply.h> #include <vespa/document/repo/documenttyperepo.h> +namespace config { class ConfigUri; } namespace storage { +namespace api { + class StorageCommand; + class StorageReply; +} + +class PriorityConverter; /** Converts messages from storageapi to documentapi and vice versa. @@ -17,25 +21,16 @@ namespace storage { class DocumentApiConverter { public: - DocumentApiConverter(const config::ConfigUri & configUri) - : _priConverter(configUri) {} - - std::unique_ptr<storage::api::StorageCommand> toStorageAPI( - documentapi::DocumentMessage& msg, - const document::DocumentTypeRepo::SP &repo); - - std::unique_ptr<storage::api::StorageReply> toStorageAPI(documentapi::DocumentReply& reply, api::StorageCommand& originalCommand); + DocumentApiConverter(const config::ConfigUri & configUri); + ~DocumentApiConverter(); + std::unique_ptr<api::StorageCommand> toStorageAPI(documentapi::DocumentMessage& msg, const document::DocumentTypeRepo::SP &repo); + std::unique_ptr<api::StorageReply> toStorageAPI(documentapi::DocumentReply& reply, api::StorageCommand& originalCommand); void transferReplyState(storage::api::StorageReply& from, mbus::Reply& to); - - std::unique_ptr<mbus::Message> toDocumentAPI( - storage::api::StorageCommand& cmd, - const document::DocumentTypeRepo::SP &repo); - - const PriorityConverter& getPriorityConverter() const { return _priConverter; } + std::unique_ptr<mbus::Message> toDocumentAPI(api::StorageCommand& cmd, const document::DocumentTypeRepo::SP &repo); + const PriorityConverter& getPriorityConverter() const { return *_priConverter; } private: - PriorityConverter _priConverter; + std::unique_ptr<PriorityConverter> _priConverter; }; } // namespace storage - diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp index 7daf2fb4777..9cc4a643d55 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp @@ -5,6 +5,7 @@ #include <vespa/storageapi/message/state.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/host_name.h> +#include <vespa/fnet/frt/supervisor.h> #include <sstream> #include <vespa/log/log.h> @@ -35,6 +36,12 @@ FNetListener::~FNetListener() } } +int +FNetListener::getListenPort() const +{ + return _orb->GetListenPort(); +} + void FNetListener::registerHandle(const vespalib::stringref & handle) { _slobrokRegister.registerName(handle); @@ -58,42 +65,28 @@ FNetListener::initRPC() { FRT_ReflectionBuilder rb(_orb.get()); - rb.DefineMethod( - "getnodestate3", "sii", "ss", true, - FRT_METHOD(FNetListener::RPC_getNodeState2), - this); + rb.DefineMethod("getnodestate3", "sii", "ss", true, FRT_METHOD(FNetListener::RPC_getNodeState2), this); rb.MethodDesc("Get state of this node"); rb.ParamDesc("nodestate", "Expected state of given node. If correct, the " "request will be queued on target until it changes. To not give " "any state use the string 'unknown', enforcing a direct reply."); - rb.ParamDesc("timeout", "Timeout of message in milliseconds, set by the " - "state requester"); + rb.ParamDesc("timeout", "Timeout of message in milliseconds, set by the state requester"); rb.ReturnDesc("nodestate", "State string for this node"); rb.ReturnDesc("hostinfo", "Information about host this node is running on"); //------------------------------------------------------------------------- - rb.DefineMethod( - "getnodestate2", "si", "s", true, - FRT_METHOD(FNetListener::RPC_getNodeState2), - this); + rb.DefineMethod("getnodestate2", "si", "s", true, FRT_METHOD(FNetListener::RPC_getNodeState2), this); rb.MethodDesc("Get state of this node"); rb.ParamDesc("nodestate", "Expected state of given node. If correct, the " "request will be queued on target until it changes. To not give " "any state use the string 'unknown', enforcing a direct reply."); - rb.ParamDesc("timeout", "Timeout of message in milliseconds, set by the " - "state requester"); + rb.ParamDesc("timeout", "Timeout of message in milliseconds, set by the state requester"); rb.ReturnDesc("nodestate", "State string for this node"); //------------------------------------------------------------------------- - rb.DefineMethod( - "setsystemstate2", "s", "", true, - FRT_METHOD(FNetListener::RPC_setSystemState2), - this); + rb.DefineMethod("setsystemstate2", "s", "", true, FRT_METHOD(FNetListener::RPC_setSystemState2), this); rb.MethodDesc("Set systemstate on this node"); rb.ParamDesc("systemstate", "New systemstate to set"); //------------------------------------------------------------------------- - rb.DefineMethod( - "getcurrenttime", "", "lis", true, - FRT_METHOD(FNetListener::RPC_getCurrentTime), - this); + rb.DefineMethod("getcurrenttime", "", "lis", true, FRT_METHOD(FNetListener::RPC_getCurrentTime), this); rb.MethodDesc("Get current time on this node"); rb.ReturnDesc("seconds", "Current time in seconds since epoch"); rb.ReturnDesc("nanoseconds", "additional nanoseconds since epoch"); @@ -133,10 +126,9 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req) req->GetParams()->GetValue(0)._string._len); std::shared_ptr<api::GetNodeStateCommand> cmd( - new api::GetNodeStateCommand( - expected != "unknown" ? - std::unique_ptr<lib::NodeState>(new lib::NodeState(expected)) : - std::unique_ptr<lib::NodeState>())); + new api::GetNodeStateCommand(expected != "unknown" + ? std::make_unique<lib::NodeState>(expected) + : std::unique_ptr<lib::NodeState>())); cmd->setPriority(api::StorageMessage::VERYHIGH); cmd->setTimeout(req->GetParams()->GetValue(1)._intval32); @@ -144,9 +136,7 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req) cmd->setSourceIndex(req->GetParams()->GetValue(2)._intval32); } // Create a request object to avoid needing a separate transport type - std::unique_ptr<RPCRequestWrapper> request(new RPCRequestWrapper(req)); - cmd->setTransportContext(std::unique_ptr<api::TransportContext>( - new StorageTransportContext(std::move(request)))); + cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req))); req->Detach(); _comManager.enqueue(cmd); } @@ -160,17 +150,14 @@ FNetListener::RPC_setSystemState2(FRT_RPCRequest *req) return; } vespalib::string systemStateStr(req->GetParams()->GetValue(0)._string._str, - req->GetParams()->GetValue(0)._string._len); + req->GetParams()->GetValue(0)._string._len); lib::ClusterState systemState(systemStateStr); - std::shared_ptr<api::SetSystemStateCommand> cmd( - new api::SetSystemStateCommand(systemState)); + std::shared_ptr<api::SetSystemStateCommand> cmd(std::make_shared<api::SetSystemStateCommand>(systemState)); cmd->setPriority(api::StorageMessage::VERYHIGH); - // Create a request object to avoid needing a separate transport type - std::unique_ptr<RPCRequestWrapper> request(new RPCRequestWrapper(req)); - cmd->setTransportContext(std::unique_ptr<api::TransportContext>( - new StorageTransportContext(std::move(request)))); + // Create a request object to avoid needing a separate transport type + cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req))); req->Detach(); _comManager.enqueue(cmd); } diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h index 781f818b673..ecbfb0ce9d9 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.h +++ b/storage/src/vespa/storage/storageserver/fnetlistener.h @@ -22,6 +22,7 @@ public: void registerHandle(const vespalib::stringref & handle); void close(); + int getListenPort() const; // Used by unit tests. bool serviceExists(const vespalib::stringref & connectionSpec); diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index ede7be3b9ad..99ce5c5df64 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -1,12 +1,13 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "mergethrottler.h" -#include "storagemetricsset.h" -#include <sstream> -#include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/util/stringfmt.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/persistence/messages.h> +#include <vespa/messagebus/message.h> +#include <vespa/config/common/exceptions.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/util/stringfmt.h> + #include <vespa/log/log.h> LOG_SETUP(".mergethrottler"); diff --git a/storage/src/vespa/storage/storageserver/priorityconverter.cpp b/storage/src/vespa/storage/storageserver/priorityconverter.cpp index c8cf9e5fc29..1ab820c6918 100644 --- a/storage/src/vespa/storage/storageserver/priorityconverter.cpp +++ b/storage/src/vespa/storage/storageserver/priorityconverter.cpp @@ -2,6 +2,7 @@ #include "priorityconverter.h" #include <vespa/documentapi/messagebus/documentprotocol.h> +#include <vespa/config/subscription/configuri.h> namespace storage { diff --git a/storage/src/vespa/storage/storageserver/priorityconverter.h b/storage/src/vespa/storage/storageserver/priorityconverter.h index d5d2953ea45..0daf5b8c891 100644 --- a/storage/src/vespa/storage/storageserver/priorityconverter.h +++ b/storage/src/vespa/storage/storageserver/priorityconverter.h @@ -3,11 +3,14 @@ #pragma once #include <vespa/storage/config/config-stor-prioritymapping.h> -#include <vespa/config/config.h> +#include <vespa/config/helper/configfetcher.h> #include <vespa/documentapi/messagebus/priority.h> +#include <vespa/vespalib/util/sync.h> #include <atomic> #include <array> +namespace config {class ConfigUri; } + namespace storage { class PriorityConverter diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 5c74f520cde..b05d159ef08 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -3,19 +3,21 @@ #include "servicelayernode.h" #include "bouncer.h" #include "bucketintegritychecker.h" -#include <vespa/storage/bucketmover/bucketmover.h> #include "communicationmanager.h" #include "changedbucketownershiphandler.h" #include "mergethrottler.h" #include "opslogger.h" #include "statemanager.h" +#include "priorityconverter.h" #include <vespa/storage/visiting/messagebusvisitormessagesession.h> #include <vespa/storage/visiting/visitormanager.h> #include <vespa/storage/bucketdb/bucketmanager.h> #include <vespa/storage/bucketdb/storagebucketdbinitializer.h> +#include <vespa/storage/bucketmover/bucketmover.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> #include <vespa/persistence/spi/exceptions.h> +#include <vespa/messagebus/rpcmessagebus.h> #include <vespa/log/log.h> LOG_SETUP(".node.servicelayer"); @@ -28,8 +30,7 @@ ServiceLayerNode::ServiceLayerNode( ApplicationGenerationFetcher& generationFetcher, spi::PersistenceProvider& persistenceProvider, const VisitorFactory::Map& externalVisitors) - : StorageNode(configUri, context, generationFetcher, - std::unique_ptr<HostInfo>(new HostInfo)), + : StorageNode(configUri, context, generationFetcher, std::unique_ptr<HostInfo>(new HostInfo)), _context(context), _persistenceProvider(persistenceProvider), _partitions(0), @@ -46,19 +47,15 @@ void ServiceLayerNode::init() _init_has_been_called = true; spi::Result initResult(_persistenceProvider.initialize()); if (initResult.hasError()) { - LOG(error, "Failed to initialize persistence provider: %s", - initResult.toString().c_str()); - throw spi::HandledException( - "Failed provider init: " + initResult.toString(), VESPA_STRLOC); + LOG(error, "Failed to initialize persistence provider: %s", initResult.toString().c_str()); + throw spi::HandledException("Failed provider init: " + initResult.toString(), VESPA_STRLOC); } spi::PartitionStateListResult result( _persistenceProvider.getPartitionStates()); if (result.hasError()) { - LOG(error, "Failed to get partition list from persistence provider: %s", - result.toString().c_str()); - throw spi::HandledException("Failed to get partition list: " - + result.toString(), VESPA_STRLOC); + LOG(error, "Failed to get partition list from persistence provider: %s", result.toString().c_str()); + throw spi::HandledException("Failed to get partition list: " + result.toString(), VESPA_STRLOC); } _partitions = result.getList(); if (_partitions.size() == 0) { @@ -76,8 +73,7 @@ void ServiceLayerNode::init() LOG(warning, "Network failure: '%s'", e.what()); throw; } catch (const vespalib::Exception & e) { - LOG(error, "Caught exception %s during startup. Calling destruct " - "functions in hopes of dying gracefully.", + LOG(error, "Caught exception %s during startup. Calling destruct functions in hopes of dying gracefully.", e.getMessage().c_str()); requestShutdown("Failed to initialize: " + e.getMessage()); throw; @@ -135,8 +131,7 @@ ServiceLayerNode::initializeNodeSpecific() if (_partitions[i].getState() == spi::PartitionState::UP) { ++usablePartitions; } else { - lib::DiskState diskState(lib::State::DOWN, - _partitions[i].getReason()); + lib::DiskState diskState(lib::State::DOWN, _partitions[i].getReason()); ns.setDiskState(i, diskState); } } @@ -150,8 +145,7 @@ ServiceLayerNode::initializeNodeSpecific() ns.setReliability(_serverConfig->nodeReliability); for (uint16_t i=0; i<_serverConfig->diskCapacity.size(); ++i) { if (i >= ns.getDiskCount()) { - LOG(warning, "Capacity configured for partition %" PRIu64 " but only " - "%u partitions found.", + LOG(warning, "Capacity configured for partition %" PRIu64 " but only %u partitions found.", _serverConfig->diskCapacity.size(), ns.getDiskCount()); continue; } @@ -159,8 +153,7 @@ ServiceLayerNode::initializeNodeSpecific() ds.setCapacity(_serverConfig->diskCapacity[i]); ns.setDiskState(i, ds); } - LOG(debug, "Adjusting reported node state to include partition count and " - "states, capacity and reliability: %s", + LOG(debug, "Adjusting reported node state to include partition count and states, capacity and reliability: %s", ns.toString().c_str()); _component->getStateUpdater().setReportedNodeState(ns); } @@ -180,35 +173,28 @@ ServiceLayerNode::handleLiveConfigUpdate() DIFFERWARN(diskCount, "Cannot alter partition count of node live"); { updated = false; - NodeStateUpdater::Lock::SP lock( - _component->getStateUpdater().grabStateChangeLock()); - lib::NodeState ns( - *_component->getStateUpdater().getReportedNodeState()); + NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); + lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState()); if (DIFFER(nodeCapacity)) { - LOG(info, "Live config update: Updating node capacity " - "from %f to %f.", + LOG(info, "Live config update: Updating node capacity from %f to %f.", oldC.nodeCapacity, newC.nodeCapacity); ASSIGN(nodeCapacity); ns.setCapacity(newC.nodeCapacity); } if (DIFFER(diskCapacity)) { - for (uint32_t i=0; - i<newC.diskCapacity.size() && i<ns.getDiskCount(); ++i) - { + for (uint32_t i=0; i<newC.diskCapacity.size() && i<ns.getDiskCount(); ++i) { if (newC.diskCapacity[i] != oldC.diskCapacity[i]) { lib::DiskState ds(ns.getDiskState(i)); ds.setCapacity(newC.diskCapacity[i]); ns.setDiskState(i, ds); - LOG(info, "Live config update: Disk capacity of " - "disk %u changed from %f to %f.", + LOG(info, "Live config update: Disk capacity of disk %u changed from %f to %f.", i, oldC.diskCapacity[i], newC.diskCapacity[i]); } } ASSIGN(diskCapacity); } if (DIFFER(nodeReliability)) { - LOG(info, "Live config update: Node reliability changed " - "from %u to %u.", + LOG(info, "Live config update: Node reliability changed from %u to %u.", oldC.nodeReliability, newC.nodeReliability); ASSIGN(nodeReliability); ns.setReliability(newC.nodeReliability); @@ -246,16 +232,14 @@ ServiceLayerNode::createSession(Visitor& visitor, VisitorThread& thread) srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP()); srcParams.setReplyHandler(*mbusSession); mbusSession->setSourceSession( - _communicationManager->getMessageBus().getMessageBus() - .createSourceSession(srcParams)); + _communicationManager->getMessageBus().getMessageBus().createSourceSession(srcParams)); return VisitorMessageSession::UP(std::move(mbusSession)); } documentapi::Priority::Value ServiceLayerNode::toDocumentPriority(uint8_t storagePriority) const { - return _communicationManager->getPriorityConverter(). - toDocumentPriority(storagePriority); + return _communicationManager->getPriorityConverter().toDocumentPriority(storagePriority); } StorageLink::UP @@ -264,8 +248,7 @@ ServiceLayerNode::createChain() ServiceLayerComponentRegister& compReg(_context.getComponentRegister()); StorageLink::UP chain; - chain.reset(_communicationManager = new CommunicationManager( - compReg, _configUri)); + chain.reset(_communicationManager = new CommunicationManager(compReg, _configUri)); chain->push_back(StorageLink::UP(new Bouncer(compReg, _configUri))); if (_noUsablePartitionMode) { /* @@ -279,8 +262,7 @@ ServiceLayerNode::createChain() chain->push_back(StorageLink::UP(new MergeThrottler(_configUri, compReg))); chain->push_back(StorageLink::UP(new ChangedBucketOwnershipHandler(_configUri, compReg))); chain->push_back(StorageLink::UP(new BucketIntegrityChecker(_configUri, compReg))); - chain->push_back(StorageLink::UP( - new bucketmover::BucketMover(_configUri, compReg))); + chain->push_back(StorageLink::UP(new bucketmover::BucketMover(_configUri, compReg))); chain->push_back(StorageLink::UP(new StorageBucketDBInitializer( _configUri, _partitions, getDoneInitializeHandler(), compReg))); chain->push_back(StorageLink::UP(new BucketManager( diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index 5e1f8b4df79..e069202e5f9 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -9,9 +9,9 @@ #include "recoveryvisitor.h" #include <vespa/storageframework/generic/memory/memorymanagerinterface.h> #include <vespa/storage/common/statusmessages.h> +#include <vespa/config/common/exceptions.h> #include <vespa/documentapi/loadtypes/loadtypeset.h> #include <vespa/vespalib/util/stringfmt.h> -#include <sstream> #include <vespa/log/log.h> LOG_SETUP(".visitor.manager"); diff --git a/streamingvisitors/src/vespa/searchvisitor/rankmanager.cpp b/streamingvisitors/src/vespa/searchvisitor/rankmanager.cpp index 4befe163377..59e301e5dd3 100644 --- a/streamingvisitors/src/vespa/searchvisitor/rankmanager.cpp +++ b/streamingvisitors/src/vespa/searchvisitor/rankmanager.cpp @@ -4,6 +4,7 @@ #include <vespa/searchlib/features/setup.h> #include <vespa/searchlib/fef/functiontablefactory.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/exception.h> #include <vespa/log/log.h> LOG_SETUP(".searchvisitor.rankmanager"); diff --git a/vdslib/src/tests/distribution/distributiontest.cpp b/vdslib/src/tests/distribution/distributiontest.cpp index 7e10d61634a..127ba20b0df 100644 --- a/vdslib/src/tests/distribution/distributiontest.cpp +++ b/vdslib/src/tests/distribution/distributiontest.cpp @@ -3,9 +3,6 @@ #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/distribution/idealnodecalculator.h> #include <vespa/config/helper/configfetcher.h> -#include <chrono> -#include <thread> -#include <fstream> #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/testkit/test_kit.h> @@ -15,8 +12,11 @@ #include <vespa/vdstestlib/cppunit/macros.h> #include <vespa/config-stor-distribution.h> #include <vespa/config/helper/configgetter.hpp> +#include <vespa/config/subscription/configuri.h> #include <vespa/fastos/file.h> - +#include <chrono> +#include <thread> +#include <fstream> namespace storage { namespace lib { diff --git a/vespaclient/src/vespa/vespaclient/vespadoclocator/locator.cpp b/vespaclient/src/vespa/vespaclient/vespadoclocator/locator.cpp index af805d461fc..79d245f6a73 100644 --- a/vespaclient/src/vespa/vespaclient/vespadoclocator/locator.cpp +++ b/vespaclient/src/vespa/vespaclient/vespadoclocator/locator.cpp @@ -1,6 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <boost/tokenizer.hpp> +#include "locator.h" #include <vespa/documentapi/messagebus/documentprotocol.h> #include <vespa/messagebus/configagent.h> #include <vespa/messagebus/iconfighandler.h> @@ -8,9 +8,9 @@ #include <vespa/vdslib/bucketdistribution.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/config/helper/configgetter.hpp> - - -#include "locator.h" +#include <vespa/config/common/exceptions.h> +#include <vespa/config/subscription/configuri.h> +#include <boost/tokenizer.hpp> typedef std::map<std::string, uint32_t> ClusterMap; using namespace config; diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp index baf7c01d631..e01104943cd 100644 --- a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp +++ b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp @@ -11,6 +11,7 @@ #include <vespa/messagebus/routing/routedirective.h> #include <vespa/messagebus/rpcmessagebus.h> #include <vespa/slobrok/sbmirror.h> +#include <vespa/config/common/exceptions.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/vespalib/util/stringfmt.h> diff --git a/vespalib/src/vespa/vespalib/trace/tracenode.h b/vespalib/src/vespa/vespalib/trace/tracenode.h index fbb428c43d2..e425ebd564a 100644 --- a/vespalib/src/vespa/vespalib/trace/tracenode.h +++ b/vespalib/src/vespa/vespalib/trace/tracenode.h @@ -1,8 +1,8 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vector> #include <vespa/vespalib/stllike/string.h> +#include <vector> namespace vespalib { @@ -25,7 +25,7 @@ private: TraceNode *_parent; bool _strict; bool _hasNote; - string _note; + string _note; std::vector<TraceNode> _children; int64_t _timestamp; |