diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-06-08 13:27:29 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-06-08 13:28:18 +0200 |
commit | a222a37083bf74b4b2992e91a4ec90921b0cc2bf (patch) | |
tree | 1cac7aa9d5b476b4ec03c1e43866baa9b34b2393 | |
parent | 25809a32e2a9227d92e355483984e9ba592358f4 (diff) |
Only include what you really need
58 files changed, 315 insertions, 418 deletions
diff --git a/config/src/tests/configfetcher/configfetcher.cpp b/config/src/tests/configfetcher/configfetcher.cpp index 02d90352f69..eab2e8a8c84 100644 --- a/config/src/tests/configfetcher/configfetcher.cpp +++ b/config/src/tests/configfetcher/configfetcher.cpp @@ -1,6 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/config/helper/configfetcher.h> +#include <vespa/vespalib/util/exception.h> #include <fstream> #include "config-my.h" #include <atomic> @@ -151,7 +152,7 @@ TEST_F("verify that config generation can be obtained from config fetcher", Conf if (cb._configured) { break; } - FastOS_Thread::Sleep(10); + std::this_thread::sleep_for(std::chrono::milliseconds(10));; } EXPECT_EQUAL(2, fetcher.getGeneration()); EXPECT_EQUAL("bar", cb._config.get()->myField); diff --git a/config/src/tests/subscriber/subscriber.cpp b/config/src/tests/subscriber/subscriber.cpp index 3bae5ed85b1..f0b7fe18930 100644 --- a/config/src/tests/subscriber/subscriber.cpp +++ b/config/src/tests/subscriber/subscriber.cpp @@ -281,9 +281,9 @@ TEST_MT_FFF("requireThatConfigIsReturnedWhenUpdatedDuringNextConfig", 2, MyManag verifyConfig("foo2", f3.h1->getConfig()); verifyConfig("bar", f3.h2->getConfig()); } else { - FastOS_Thread::Sleep(300); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); f1.updateValue(0, createFooValue("foo2"), 2); - FastOS_Thread::Sleep(300); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); f1.updateGeneration(1, 2); } } @@ -331,7 +331,7 @@ TEST_MT_FFF("requireThatNextConfigIsInterruptedOnClose", 2, MyManager, APIFixtur ASSERT_TRUE(timer.MilliSecsToNow() >= 500.0); ASSERT_TRUE(timer.MilliSecsToNow() < 60000.0); } else { - FastOS_Thread::Sleep(1000); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); f3.s.close(); } } @@ -514,7 +514,7 @@ TEST_MT_FF("requireThatConfigSubscriberWaitsUntilNextConfigSucceeds", 2, MyManag verifyConfig("foo2", h1->getConfig()); // First update is skipped } else { TEST_BARRIER(); - FastOS_Thread::Sleep(1000); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); f1.updateValue(0, createFooValue("foo2"), 3); } } diff --git a/config/src/vespa/config/common/configparser.cpp b/config/src/vespa/config/common/configparser.cpp index e0a0b0138b9..c490133172f 100644 --- a/config/src/vespa/config/common/configparser.cpp +++ b/config/src/vespa/config/common/configparser.cpp @@ -1,11 +1,17 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "configparser.h" +#include "exceptions.h" #include "misc.h" #include <vespa/vespalib/stllike/asciistream.h> namespace config { +void ConfigParser::throwNoDefaultValue(const vespalib::stringref & key) { + throw InvalidConfigException("Config parameter " + key + " has no " + "default value and is not specified in config", VESPA_STRLOC); +} + vespalib::string ConfigParser::deQuote(const vespalib::stringref & source) { diff --git a/config/src/vespa/config/common/configparser.h b/config/src/vespa/config/common/configparser.h index 613dfc33d94..cde036281a5 100644 --- a/config/src/vespa/config/common/configparser.h +++ b/config/src/vespa/config/common/configparser.h @@ -1,7 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/stringfmt.h> #include <map> #include <set> @@ -25,6 +24,7 @@ private: static std::map<vespalib::string, vsvector> splitMap( const vsvector & config); static vespalib::string deQuote(const vespalib::stringref & source); + static void throwNoDefaultValue(const vespalib::stringref & key); template<typename T> static T convert(const vsvector &); @@ -81,8 +81,7 @@ ConfigParser::parseInternal(const vespalib::stringref & key, const V & config) V lines = getLinesForKey(key, config); if (lines.size() == 0) { - throw InvalidConfigException("Config parameter " + key + " has no " - "default value and is not specified in config", VESPA_STRLOC); + throwNoDefaultValue(key); } return convert<T>(lines); } diff --git a/config/src/vespa/config/helper/configfetcher.cpp b/config/src/vespa/config/helper/configfetcher.cpp index 0a4b95e9153..1f1bfe69bb8 100644 --- a/config/src/vespa/config/helper/configfetcher.cpp +++ b/config/src/vespa/config/helper/configfetcher.cpp @@ -1,7 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "configfetcher.h" - +#include <vespa/vespalib/util/thread.h> #include <vespa/log/log.h> LOG_SETUP(".config.helper.configfetcher"); @@ -9,7 +9,7 @@ namespace config { ConfigFetcher::ConfigFetcher(const IConfigContext::SP & context) : _poller(context), - _thread(_poller), + _thread(std::make_unique<vespalib::Thread>(_poller)), _closed(false), _started(false) { @@ -17,20 +17,25 @@ ConfigFetcher::ConfigFetcher(const IConfigContext::SP & context) ConfigFetcher::ConfigFetcher(const SourceSpec & spec) : _poller(IConfigContext::SP(new ConfigContext(spec))), - _thread(_poller), + _thread(std::make_unique<vespalib::Thread>(_poller)), _closed(false), _started(false) { } void +ConfigFetcher::subscribeGenerationChanges(IGenerationCallback * callback) { + _poller.subscribeGenerationChanges(callback); +} + +void ConfigFetcher::start() { if (!_closed) { LOG(debug, "Polling for config"); _poller.poll(); LOG(debug, "Starting fetcher thread..."); - _thread.start(); + _thread->start(); _started = true; LOG(debug, "Fetcher thread started"); } @@ -47,7 +52,7 @@ ConfigFetcher::close() if (!_closed) { _poller.close(); if (_started) - _thread.join(); + _thread->join(); } } diff --git a/config/src/vespa/config/helper/configfetcher.h b/config/src/vespa/config/helper/configfetcher.h index ed04dc62f50..872937d8635 100644 --- a/config/src/vespa/config/helper/configfetcher.h +++ b/config/src/vespa/config/helper/configfetcher.h @@ -2,11 +2,11 @@ #pragma once #include "configpoller.h" -#include <vespa/config/config.h> #include <vespa/config/common/timingvalues.h> -#include <vespa/vespalib/util/thread.h> #include <atomic> +namespace vespalib { class Thread; } + namespace config { /** @@ -22,16 +22,14 @@ public: template <typename ConfigType> void subscribe(const std::string & configId, IFetcherCallback<ConfigType> * callback, uint64_t subscribeTimeout = DEFAULT_SUBSCRIBE_TIMEOUT); - void subscribeGenerationChanges(IGenerationCallback * callback) { - _poller.subscribeGenerationChanges(callback); - } + void subscribeGenerationChanges(IGenerationCallback * callback); void start(); void close(); int64_t getGeneration() const { return _poller.getGeneration(); } private: ConfigPoller _poller; - vespalib::Thread _thread; + std::unique_ptr<vespalib::Thread> _thread; std::atomic<bool> _closed; std::atomic<bool> _started; }; diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/messagetypepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/messagetypepolicy.cpp index ba4d0cff079..b494ad5673c 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/messagetypepolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/messagetypepolicy.cpp @@ -2,8 +2,7 @@ #include "messagetypepolicy.h" #include <vespa/documentapi/messagebus/documentprotocol.h> -#include <vespa/messagebus/routing/route.h> -#include <vespa/messagebus/routing/routingcontext.h> +#include <vespa/messagebus/message.h> #include <vespa/vespalib/stllike/hash_map.hpp> using vespa::config::content::MessagetyperouteselectorpolicyConfig; diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp index f19e0c4c85f..780851ab597 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp @@ -9,10 +9,9 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/config-stor-distribution.h> -#include <vespa/config/helper/ifetchercallback.h> -#include <vespa/config/helper/configfetcher.h> -#include <vespa/log/log.h> +#include <vespa/config/subscription/configuri.h> +#include <vespa/log/log.h> LOG_SETUP(".storagepolicy"); using vespalib::make_string; @@ -120,8 +119,7 @@ StoragePolicy::doSelect(mbus::RoutingContext &context) document::BucketId id; switch(msg.getType()) { case DocumentProtocol::MESSAGE_PUTDOCUMENT: - id = _bucketIdFactory.getBucketId( - static_cast<const PutDocumentMessage&>(msg).getDocument()->getId()); + id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument()->getId()); break; case DocumentProtocol::MESSAGE_GETDOCUMENT: diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h index c73c4c8560d..3b45e9c6bf7 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h @@ -2,10 +2,10 @@ #pragma once #include "externslobrokpolicy.h" -#include <vespa/document/bucket/bucketidfactory.h> #include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> -#include <vespa/messagebus/reply.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/messagebus/routing/hop.h> #include <vespa/config/helper/ifetchercallback.h> #include <vespa/config/helper/configfetcher.h> diff --git a/memfilepersistence/src/vespa/memfilepersistence/common/environment.cpp b/memfilepersistence/src/vespa/memfilepersistence/common/environment.cpp index 11577542573..e70e03b9442 100644 --- a/memfilepersistence/src/vespa/memfilepersistence/common/environment.cpp +++ b/memfilepersistence/src/vespa/memfilepersistence/common/environment.cpp @@ -5,12 +5,12 @@ #include <vespa/vespalib/util/random.h> #include <vespa/vespalib/util/vstringfmt.h> #include <vespa/config/helper/configgetter.hpp> +#include <vespa/config/subscription/configuri.h> #include <vespa/vespalib/stllike/asciistream.h> using config::ConfigGetter; -namespace storage { -namespace memfile { +namespace storage::memfile { namespace { @@ -117,5 +117,4 @@ Environment::swapModifiedBuckets(document::BucketId::List & ids) _modifiedBuckets.swap(ids); } -} // memfile -} // storage +} diff --git a/memfilepersistence/src/vespa/memfilepersistence/common/environment.h b/memfilepersistence/src/vespa/memfilepersistence/common/environment.h index 93ef0768148..3798e24f329 100644 --- a/memfilepersistence/src/vespa/memfilepersistence/common/environment.h +++ b/memfilepersistence/src/vespa/memfilepersistence/common/environment.h @@ -22,9 +22,9 @@ #include <vespa/document/bucket/bucketidfactory.h> #include <vespa/config/helper/configfetcher.h> +namespace config { class ConfigUri; } -namespace storage { -namespace memfile { +namespace storage::memfile { class MemFileMapper; class MemFileCache; @@ -129,6 +129,4 @@ struct DefaultLazyFileFactory vespalib::LazyFile::UP createFile(const std::string& fileName) const override; }; -} // storage -} // memfile - +} diff --git a/memfilepersistence/src/vespa/memfilepersistence/device/mountpointlist.cpp b/memfilepersistence/src/vespa/memfilepersistence/device/mountpointlist.cpp index 614d399ca22..cf66fe691da 100644 --- a/memfilepersistence/src/vespa/memfilepersistence/device/mountpointlist.cpp +++ b/memfilepersistence/src/vespa/memfilepersistence/device/mountpointlist.cpp @@ -5,11 +5,11 @@ #include <vespa/persistence/spi/exceptions.h> #include <vespa/vdslib/state/nodestate.h> #include <vespa/config/helper/configfetcher.h> +#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/guard.h> #include <vespa/vespalib/text/stringtokenizer.h> #include <fstream> -#include <sstream> #include <vespa/log/log.h> LOG_SETUP(".persistence.mountpointlist"); @@ -39,9 +39,7 @@ MountPointList::getPartitionStates() const for (uint32_t i=0; i<_mountPoints.size(); ++i) { if (!(_mountPoints[i]->isOk())) { const IOEvent* event = _mountPoints[i]->getLastEvent(); - - list[i] = spi::PartitionState(spi::PartitionState::DOWN, - event->getDescription()); + list[i] = spi::PartitionState(spi::PartitionState::DOWN, event->getDescription()); } } diff --git a/memfilepersistence/src/vespa/memfilepersistence/memfile/memfilecache.cpp b/memfilepersistence/src/vespa/memfilepersistence/memfile/memfilecache.cpp index e8fff0facd0..0e3a66e8b25 100644 --- a/memfilepersistence/src/vespa/memfilepersistence/memfile/memfilecache.cpp +++ b/memfilepersistence/src/vespa/memfilepersistence/memfile/memfilecache.cpp @@ -4,12 +4,12 @@ #include <vespa/memfilepersistence/common/environment.h> #include <vespa/memfilepersistence/mapper/memfilemapper.h> #include <vespa/memfilepersistence/spi/memfilepersistenceprovidermetrics.h> +#include <vespa/vespalib/util/exception.h> #include <vespa/log/log.h> LOG_SETUP(".persistence.memfile.cache"); -namespace storage { -namespace memfile { +namespace storage::memfile { void MemFileCache::Entry::setInUse(bool inUse) { @@ -517,7 +517,4 @@ MemFileCache::printCacheEntriesHtml(std::ostream& out) const out << "</ol>\n"; } -} // memfile - -} // storage - +} diff --git a/memfilepersistence/src/vespa/memfilepersistence/tools/dumpslotfile.cpp b/memfilepersistence/src/vespa/memfilepersistence/tools/dumpslotfile.cpp index 072b3e9fae0..bd54efcb57c 100644 --- a/memfilepersistence/src/vespa/memfilepersistence/tools/dumpslotfile.cpp +++ b/memfilepersistence/src/vespa/memfilepersistence/tools/dumpslotfile.cpp @@ -14,16 +14,14 @@ #include <vespa/storageframework/defaultimplementation/memory/nomemorymanager.h> #include <vespa/vespalib/util/programoptions.h> #include <vespa/config/helper/configgetter.hpp> -#include <sstream> - +#include <vespa/config/subscription/configuri.h> using config::ConfigGetter; using document::DocumenttypesConfig; using config::FileSpec; using document::DocumentTypeRepo; -namespace storage { -namespace memfile { +namespace storage::memfile { namespace { std::ostream* cout; @@ -347,8 +345,7 @@ int SlotFileDumper::dump(int argc, const char * const * argv, if (doc.get()) { printDoc(*doc, o); } else { - printFailure("Unable to get document in " + - it->toString(true)); + printFailure("Unable to get document in " + it->toString(true)); } } } @@ -361,5 +358,4 @@ int SlotFileDumper::dump(int argc, const char * const * argv, return 0; } -} // memfile -} // storage +} diff --git a/messagebus/src/vespa/messagebus/callstack.h b/messagebus/src/vespa/messagebus/callstack.h index f4e0ffe0df4..7ee1753e6b4 100644 --- a/messagebus/src/vespa/messagebus/callstack.h +++ b/messagebus/src/vespa/messagebus/callstack.h @@ -2,8 +2,8 @@ #pragma once -#include <vector> #include "context.h" +#include <vector> namespace mbus { diff --git a/messagebus/src/vespa/messagebus/destinationsession.h b/messagebus/src/vespa/messagebus/destinationsession.h index 973d65ec152..fd4005c09be 100644 --- a/messagebus/src/vespa/messagebus/destinationsession.h +++ b/messagebus/src/vespa/messagebus/destinationsession.h @@ -1,8 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <memory> -#include <string> #include "destinationsessionparams.h" #include "imessagehandler.h" #include "reply.h" @@ -10,6 +8,7 @@ namespace mbus { class MessageBus; +class Message; /** * A DestinationSession is used to receive Message objects and reply @@ -18,6 +17,7 @@ class MessageBus; class DestinationSession : public IMessageHandler { private: friend class MessageBus; + using MessageUP = std::unique_ptr<Message>; MessageBus &_mbus; string _name; @@ -62,7 +62,7 @@ public: * * @param msg the Message you want to acknowledge */ - void acknowledge(Message::UP msg); + void acknowledge(MessageUP msg); /** * Send a Reply as a response to a Message. The Reply will be routed back to @@ -80,7 +80,7 @@ public: * * @param message the Message */ - void handleMessage(Message::UP message) override; + void handleMessage(MessageUP message) override; /** * Returns the message handler of this session. diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.h b/messagebus/src/vespa/messagebus/destinationsessionparams.h index 4026cdbff91..98d6b38200c 100644 --- a/messagebus/src/vespa/messagebus/destinationsessionparams.h +++ b/messagebus/src/vespa/messagebus/destinationsessionparams.h @@ -1,8 +1,8 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <string> #include "imessagehandler.h" +#include "common.h" namespace mbus { @@ -16,7 +16,7 @@ namespace mbus { */ class DestinationSessionParams { private: - string _name; + string _name; bool _broadcastName; IMessageHandler *_handler; diff --git a/messagebus/src/vespa/messagebus/emptyreply.h b/messagebus/src/vespa/messagebus/emptyreply.h index db8beb33b9b..4bacfb4864b 100644 --- a/messagebus/src/vespa/messagebus/emptyreply.h +++ b/messagebus/src/vespa/messagebus/emptyreply.h @@ -2,6 +2,7 @@ #pragma once #include "reply.h" +#include "blob.h" namespace mbus { diff --git a/messagebus/src/vespa/messagebus/error.h b/messagebus/src/vespa/messagebus/error.h index e3d62bf6bbe..8fa56ef1673 100644 --- a/messagebus/src/vespa/messagebus/error.h +++ b/messagebus/src/vespa/messagebus/error.h @@ -1,7 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/messagebus/common.h> +#include "common.h" #ifdef Error #undef Error diff --git a/messagebus/src/vespa/messagebus/imessagehandler.h b/messagebus/src/vespa/messagebus/imessagehandler.h index b8c50172b6c..a6e7f5fa296 100644 --- a/messagebus/src/vespa/messagebus/imessagehandler.h +++ b/messagebus/src/vespa/messagebus/imessagehandler.h @@ -3,10 +3,11 @@ #pragma once #include <memory> -#include "message.h" namespace mbus { +class Message; + /** * This interface is implemented by application components that want * to handle incoming messages received from either an @@ -26,7 +27,7 @@ public: * * @param message the Message being delivered **/ - virtual void handleMessage(Message::UP message) = 0; + virtual void handleMessage(std::unique_ptr<Message> message) = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/intermediatesession.h b/messagebus/src/vespa/messagebus/intermediatesession.h index b4832d5069a..7d710a2e787 100644 --- a/messagebus/src/vespa/messagebus/intermediatesession.h +++ b/messagebus/src/vespa/messagebus/intermediatesession.h @@ -1,8 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <memory> -#include <string> #include "reply.h" #include "imessagehandler.h" #include "intermediatesessionparams.h" @@ -11,6 +9,7 @@ namespace mbus { class MessageBus; class ReplyGate; +class Message; /** * An IntermediateSession is used to process Message and Reply objects @@ -21,6 +20,7 @@ class IntermediateSession : public IMessageHandler, { private: friend class MessageBus; + using MessageUP = std::unique_ptr<Message>; MessageBus &_mbus; string _name; @@ -70,7 +70,7 @@ public: * * @param msg The message to forward. */ - void forward(Message::UP msg); + void forward(MessageUP msg); /** * Convenience method to call {@link #forward(Routable)}. @@ -87,7 +87,7 @@ public: */ const string getConnectionSpec() const; - void handleMessage(Message::UP message) override; + void handleMessage(MessageUP message) override; void handleReply(Reply::UP reply) override; }; diff --git a/messagebus/src/vespa/messagebus/intermediatesessionparams.h b/messagebus/src/vespa/messagebus/intermediatesessionparams.h index 84224b8803b..ad4a3e96574 100644 --- a/messagebus/src/vespa/messagebus/intermediatesessionparams.h +++ b/messagebus/src/vespa/messagebus/intermediatesessionparams.h @@ -1,9 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <string> #include "imessagehandler.h" #include "ireplyhandler.h" +#include "common.h" namespace mbus { @@ -17,7 +17,7 @@ namespace mbus { */ class IntermediateSessionParams { private: - string _name; + string _name; bool _broadcastName; IMessageHandler *_msgHandler; IReplyHandler *_replyHandler; diff --git a/messagebus/src/vespa/messagebus/ireplyhandler.h b/messagebus/src/vespa/messagebus/ireplyhandler.h index c29717cd748..08361eee65e 100644 --- a/messagebus/src/vespa/messagebus/ireplyhandler.h +++ b/messagebus/src/vespa/messagebus/ireplyhandler.h @@ -2,10 +2,11 @@ #pragma once #include <memory> -#include "reply.h" namespace mbus { +class Reply; + /** * This interface is implemented by application components that want * to handle incoming replies received from either an @@ -25,7 +26,7 @@ public: * * @param reply the Reply being delivered **/ - virtual void handleReply(Reply::UP reply) = 0; + virtual void handleReply(std::unique_ptr<Reply> reply) = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h index cea0aaa91fb..74f2943d43f 100644 --- a/messagebus/src/vespa/messagebus/message.h +++ b/messagebus/src/vespa/messagebus/message.h @@ -1,10 +1,10 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "routable.h" +#include <vespa/messagebus/routing/route.h> #include <vespa/fastos/time.h> #include <memory> -#include <vespa/messagebus/routing/route.h> -#include "routable.h" namespace mbus { diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h index 4d85dd6aca9..b550dd59774 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h @@ -9,6 +9,8 @@ namespace mbus { +class Error; + class PayLoadFiller { public: @@ -59,27 +61,17 @@ public: RPCSendV1(); ~RPCSendV1(); - // Implements RPCSendAdapter. void attach(RPCNetwork &net) override; - // Implements RPCSendAdapter. void send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, uint64_t timeRemaining) override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, uint64_t timeRemaining) override; - // Implements IReplyHandler. - void handleReply(Reply::UP reply) override; - - // Implements IDiscardHandler. + void handleReply(std::unique_ptr<Reply> reply) override; void handleDiscard(Context ctx) override; - - // Implements FRT_Invokable. void invoke(FRT_RPCRequest *req); - - // Implements FRT_IRequestWait. void RequestDone(FRT_RPCRequest *req) override; }; } // namespace mbus - diff --git a/messagebus/src/vespa/messagebus/reply.cpp b/messagebus/src/vespa/messagebus/reply.cpp index 6a6cc417265..06f84a15d5f 100644 --- a/messagebus/src/vespa/messagebus/reply.cpp +++ b/messagebus/src/vespa/messagebus/reply.cpp @@ -1,10 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "reply.h" #include "emptyreply.h" -#include "error.h" #include "errorcode.h" #include "ireplyhandler.h" #include "message.h" -#include "reply.h" #include "tracelevel.h" #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/backtrace.h> @@ -83,4 +82,14 @@ Reply::hasFatalErrors() const return false; } +void +Reply::setMessage(Message::UP msg) { + _msg = std::move(msg); +} + +Message::UP +Reply::getMessage() { + return std::move(_msg); +} + } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/reply.h b/messagebus/src/vespa/messagebus/reply.h index f352d442931..599baaa9bdb 100644 --- a/messagebus/src/vespa/messagebus/reply.h +++ b/messagebus/src/vespa/messagebus/reply.h @@ -1,9 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vector> #include "error.h" -#include "message.h" +#include "routable.h" +#include <vector> namespace mbus { @@ -17,9 +17,10 @@ class Message; */ class Reply : public Routable { private: - std::vector<Error> _errors; // A list of errors that have occured during the lifetime of this reply. - Message::UP _msg; // The message to which this is a reply. - double _retryDelay; // How to perform resending of this. + using MessageUP = std::unique_ptr<Message>; + std::vector<Error> _errors; // A list of errors that have occured during the lifetime of this reply. + MessageUP _msg; // The message to which this is a reply. + double _retryDelay; // How to perform resending of this. public: /** @@ -87,7 +88,7 @@ public: * * @param msg the Message to attach */ - void setMessage(Message::UP msg) { _msg = std::move(msg); } + void setMessage(MessageUP msg); /** * Detach the Message attached to this Reply. If a Reply contains errors, @@ -96,7 +97,7 @@ public: * * @return the detached Message */ - Message::UP getMessage() { return std::move(_msg); } + MessageUP getMessage(); /** * Returns the retry request of this reply. This can be set using {@link diff --git a/messagebus/src/vespa/messagebus/replygate.cpp b/messagebus/src/vespa/messagebus/replygate.cpp index a32310f08c7..c11d5e3ef88 100644 --- a/messagebus/src/vespa/messagebus/replygate.cpp +++ b/messagebus/src/vespa/messagebus/replygate.cpp @@ -1,5 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "replygate.h" +#include "message.h" +#include "reply.h" namespace mbus { diff --git a/messagebus/src/vespa/messagebus/replygate.h b/messagebus/src/vespa/messagebus/replygate.h index 72c9378c62c..078e79aa84a 100644 --- a/messagebus/src/vespa/messagebus/replygate.h +++ b/messagebus/src/vespa/messagebus/replygate.h @@ -2,11 +2,10 @@ #pragma once -#include <vespa/vespalib/util/referencecounter.h> #include "idiscardhandler.h" #include "imessagehandler.h" #include "ireplyhandler.h" -#include "message.h" +#include <vespa/vespalib/util/referencecounter.h> namespace mbus { @@ -43,7 +42,7 @@ public: * the matching Reply has been obtained. In order to obtain the matching * Reply, this method will push this object on the CallStack of the Message. */ - void handleMessage(Message::UP msg) override; + void handleMessage(std::unique_ptr<Message> msg) override; /** * Forward or discard Reply. If the gate is still open, it will forward the @@ -51,7 +50,7 @@ public: * the Reply will be discarded. This method also decreases the reference * counter of this object. */ - void handleReply(Reply::UP reply) override; + void handleReply(std::unique_ptr<Reply> reply) override; // Implements IDiscardHandler. void handleDiscard(Context ctx) override; @@ -64,4 +63,3 @@ public: }; } // namespace mbus - diff --git a/messagebus/src/vespa/messagebus/routable.h b/messagebus/src/vespa/messagebus/routable.h index b02c72efaca..0398fd01035 100644 --- a/messagebus/src/vespa/messagebus/routable.h +++ b/messagebus/src/vespa/messagebus/routable.h @@ -1,11 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <memory> -#include <vespa/messagebus/blob.h> -#include <vespa/messagebus/callstack.h> -#include <vespa/messagebus/context.h> -#include <vespa/messagebus/trace.h> +#include "callstack.h" +#include "trace.h" +#include "common.h" namespace mbus { diff --git a/messagebus/src/vespa/messagebus/staticthrottlepolicy.cpp b/messagebus/src/vespa/messagebus/staticthrottlepolicy.cpp index a1725e608a3..162318a0f76 100644 --- a/messagebus/src/vespa/messagebus/staticthrottlepolicy.cpp +++ b/messagebus/src/vespa/messagebus/staticthrottlepolicy.cpp @@ -1,5 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + #include "staticthrottlepolicy.h" +#include "message.h" namespace mbus { diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.h b/messagebus/src/vespa/messagebus/testlib/receptor.h index 16c5cfcfc4c..12894652fb7 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.h +++ b/messagebus/src/vespa/messagebus/testlib/receptor.h @@ -4,6 +4,8 @@ #include <vespa/messagebus/imessagehandler.h> #include <vespa/messagebus/ireplyhandler.h> +#include <vespa/messagebus/message.h> +#include <vespa/messagebus/reply.h> #include <vespa/vespalib/util/sync.h> namespace mbus { diff --git a/messagebus/src/vespa/messagebus/testlib/slobrok.h b/messagebus/src/vespa/messagebus/testlib/slobrok.h index 1677f6635f7..8105f20007c 100644 --- a/messagebus/src/vespa/messagebus/testlib/slobrok.h +++ b/messagebus/src/vespa/messagebus/testlib/slobrok.h @@ -4,6 +4,7 @@ #include <vespa/messagebus/common.h> #include <vespa/slobrok/cfg.h> +#include <vespa/fastos/thread.h> namespace slobrok { class SBEnv; diff --git a/messagebus/src/vespa/messagebus/tracenode.h b/messagebus/src/vespa/messagebus/tracenode.h index 1dddf39428b..fe04abb116b 100644 --- a/messagebus/src/vespa/messagebus/tracenode.h +++ b/messagebus/src/vespa/messagebus/tracenode.h @@ -1,7 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/messagebus/common.h> #include <vespa/vespalib/trace/tracenode.h> namespace mbus { diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp index ed5e069c3ee..a99707bc275 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp @@ -5,7 +5,8 @@ #include "datasetcollection.h" #include "plain_dataset.h" #include "engine_base.h" -#include <vespa/searchcore/fdispatch/common/appcontext.h> +#include <vespa/config/common/exceptions.h> +#include <set> #include <vespa/log/log.h> LOG_SETUP(".search.nodemanager"); @@ -16,9 +17,10 @@ FastS_NodeManager::configure(std::unique_ptr<PartitionsConfig> cfg) LOG(config, "configuring datasetcollection from '%s'", _configUri.getConfigId().c_str()); SetPartMap(*cfg, 2000); - _componentConfig.addConfig(vespalib::ComponentConfigProducer::Config("fdispatch.nodemanager", - _fetcher->getGeneration(), - "will not update generation unless config has changed")); + _componentConfig.addConfig( + vespalib::ComponentConfigProducer::Config("fdispatch.nodemanager", + _fetcher->getGeneration(), + "will not update generation unless config has changed")); } @@ -26,15 +28,11 @@ class AdminBadEngines { std::set<vespalib::string> _bad; public: - void - addAdminBad(const vespalib::string &name) - { + void addAdminBad(const vespalib::string &name) { _bad.insert(name); } - bool - isAdminBad(const vespalib::string &name) const - { + bool isAdminBad(const vespalib::string &name) const { return _bad.find(name) != _bad.end(); } }; diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.h b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.h index 1d2aa617f35..47d06033175 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.h +++ b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.h @@ -7,6 +7,7 @@ #include <vespa/config/helper/configfetcher.h> #include <vespa/searchcore/fdispatch/common/queryperf.h> #include <vespa/vespalib/net/simple_component_config_producer.h> +#include <vespa/config/subscription/configuri.h> using vespa::config::search::core::PartitionsConfig; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp index 8c737f26af7..a678678edd0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp @@ -13,6 +13,7 @@ #include <vespa/searchsummary/config/config-juniperrc.h> #include <vespa/searchcore/config/config-ranking-constants.h> #include <vespa/vespalib/time/time_box.h> +#include <thread> LOG_SETUP(".proton.server.documentdbconfigmanager"); @@ -217,7 +218,7 @@ DocumentDBConfigManager::update(const ConfigSnapshot &snapshot) while (timeBox.hasTimeLeft() && (filePath == "")) { filePath = fileAcquirer.wait_for(rc.fileref, timeBox.timeLeft()); if (filePath == "") { - FastOS_Thread::Sleep(100); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } LOG(info, "Got file path from file acquirer: '%s' (name='%s', type='%s', ref='%s')", diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index 1b0959d63fd..b07a80d92f4 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -1,8 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserverapp.h" -#include <vespa/log/log.h> +#include <vespa/config/subscription/configuri.h> +#include <vespa/log/log.h> LOG_SETUP(".translogserverapp"); using search::common::FileHeaderContext; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h index 4ee29b91bda..f88fcb98421 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h @@ -6,6 +6,8 @@ #include <vespa/config/helper/configfetcher.h> #include <vespa/vespalib/util/ptrholder.h> +namespace config { class ConfigUri; } + namespace search { namespace common { class FileHeaderContext; } diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index a661c5c445e..12130db59d1 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -1,12 +1,12 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vector> #include <vespa/vdstestlib/cppunit/macros.h> #include <vespa/storage/persistence/messages.h> #include <tests/persistence/common/persistenceproviderwrapper.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <tests/persistence/common/filestortestfixture.h> #include <vespa/vespalib/util/barrier.h> +#include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/stllike/hash_set_insert.hpp> #include <vespa/log/log.h> diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index c01b24aae8d..cf96605b3ce 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -3,6 +3,7 @@ #include <vespa/storage/storageserver/communicationmanager.h> #include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/rpcmessagebus.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storageframework/defaultimplementation/memory/nomemorymanager.h> @@ -27,12 +28,10 @@ struct CommunicationManagerTest : public CppUnit::TestFixture { std::shared_ptr<api::StorageCommand> createDummyCommand( api::StorageMessage::Priority priority) { - auto cmd = std::make_shared<api::GetCommand>( - document::BucketId(0), - document::DocumentId("doc::mydoc"), - "[all]"); - cmd->setAddress(api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 1)); + auto cmd = std::make_shared<api::GetCommand>(document::BucketId(0), + document::DocumentId("doc::mydoc"), + "[all]"); + cmd->setAddress(api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 1)); cmd->setPriority(priority); return cmd; } diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp index 26317465b5a..8858d5433a2 100644 --- a/storage/src/tests/storageserver/documentapiconvertertest.cpp +++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp @@ -11,6 +11,7 @@ #include <vespa/messagebus/emptyreply.h> #include <vespa/document/datatype/documenttype.h> #include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/config/subscription/configuri.h> #include <vespa/vespalib/testkit/test_kit.h> using document::DataType; diff --git a/storage/src/tests/storageserver/testvisitormessagesession.h b/storage/src/tests/storageserver/testvisitormessagesession.h index 9ebebf73bea..193b6be133f 100644 --- a/storage/src/tests/storageserver/testvisitormessagesession.h +++ b/storage/src/tests/storageserver/testvisitormessagesession.h @@ -6,6 +6,7 @@ #include <vespa/storage/visiting/visitorthread.h> #include <vespa/documentapi/messagebus/messages/documentmessage.h> #include <vespa/storage/storageserver/priorityconverter.h> +#include <vespa/config/subscription/configuri.h> #include <deque> namespace storage { diff --git a/storage/src/vespa/storage/bucketmover/bucketmover.cpp b/storage/src/vespa/storage/bucketmover/bucketmover.cpp index b38c061de44..063050baa3c 100644 --- a/storage/src/vespa/storage/bucketmover/bucketmover.cpp +++ b/storage/src/vespa/storage/bucketmover/bucketmover.cpp @@ -7,6 +7,7 @@ #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/storageutil/log.h> #include <vespa/vespalib/util/stringfmt.h> +#include <thread> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".bucketmover"); @@ -176,7 +177,7 @@ BucketMover::sendNewMoves() // what is happening. (Cannot use wait() here as reply of // message sent will signal the monitor) if (_config->operationDelay != 0) { - FastOS_Thread::Sleep(_config->operationDelay); + std::this_thread::sleep_for(std::chrono::milliseconds(_config->operationDelay)); } } diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp index c285645309a..71becba40df 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.cpp +++ b/storage/src/vespa/storage/storageserver/bouncer.cpp @@ -3,6 +3,8 @@ #include "bouncer.h" #include <vespa/storageapi/message/state.h> #include <vespa/storageapi/message/persistence.h> +#include <vespa/config/subscription/configuri.h> +#include <vespa/config/common/exceptions.h> #include <sstream> #include <vespa/log/log.h> diff --git a/storage/src/vespa/storage/storageserver/bouncer.h b/storage/src/vespa/storage/storageserver/bouncer.h index 8f6706d2cf8..6be465a8aa5 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.h +++ b/storage/src/vespa/storage/storageserver/bouncer.h @@ -19,6 +19,8 @@ #include <vespa/storage/config/config-stor-bouncer.h> #include <vespa/vespalib/util/sync.h> +namespace config { class ConfigUri; } + namespace storage { class Bouncer : public StorageLink, @@ -82,6 +84,3 @@ private: }; } // storage - - - diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index f32b1c242cf..9087482cb42 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -2,12 +2,13 @@ #include "communicationmanager.h" #include "fnetlistener.h" #include "rpcrequestwrapper.h" +#include <vespa/storage/config/config-stor-server.h> +#include <vespa/storage/common/nodestateupdater.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> #include <vespa/storageapi/message/state.h> +#include <vespa/messagebus/rpcmessagebus.h> #include <vespa/messagebus/emptyreply.h> -#include <vespa/storage/config/config-stor-server.h> -#include <vespa/storage/common/nodestateupdater.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/stllike/hash_map.hpp> diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index c5d17294dd7..6c8923b4c08 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -19,12 +19,20 @@ #include <vespa/storageframework/generic/metric/metricupdatehook.h> #include <vespa/storageapi/mbusprot/storagecommand.h> #include <vespa/storageapi/mbusprot/storagereply.h> -#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/imessagehandler.h> +#include <vespa/messagebus/ireplyhandler.h> +#include <vespa/config/helper/configfetcher.h> #include <vespa/vespalib/util/document_runnable.h> +#include <vespa/config/subscription/configuri.h> #include <map> #include <queue> #include <atomic> +namespace mbus { + class RPCMessageBus; + class SourceSession; + class DestinationSession; +} namespace storage { class VisitorMbusSession; @@ -108,8 +116,8 @@ public: ~StorageTransportContext(); std::unique_ptr<documentapi::DocumentMessage> _docAPIMsg; - std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg; - std::unique_ptr<RPCRequestWrapper> _request; + std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg; + std::unique_ptr<RPCRequestWrapper> _request; }; class CommunicationManager : public StorageLink, @@ -142,9 +150,7 @@ private: = vespa::config::content::core::StorCommunicationmanagerConfig; void configureMessageBusLimits(const CommunicationManagerConfig& cfg); - void configure(std::unique_ptr<CommunicationManagerConfig> config) override; - void receiveStorageReply(const std::shared_ptr<api::StorageReply>&); void serializeNodeState( @@ -157,9 +163,8 @@ private: static const uint64_t FORWARDED_MESSAGE = 0; std::unique_ptr<mbus::RPCMessageBus> _mbus; - mbus::DestinationSession::UP _messageBusSession; - mbus::SourceSession::UP _sourceSession; - mbus::SourceSession::UP _visitorSourceSession; + std::unique_ptr<mbus::DestinationSession> _messageBusSession; + std::unique_ptr<mbus::SourceSession> _sourceSession; uint32_t _count; vespalib::Lock _messageBusSentLock; diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp index ae789891852..2906667d1ee 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp @@ -1,5 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "documentapiconverter.h" +#include "priorityconverter.h" #include <vespa/documentapi/documentapi.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/storageapi/message/datagram.h> @@ -11,9 +12,6 @@ #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/stat.h> #include <vespa/storageapi/message/batch.h> -#include <vespa/messagebus/errorcode.h> -#include <vespa/storageapi/messageapi/returncode.h> -#include <vespa/vdslib/container/documentlist.h> #include <vespa/document/bucket/bucketidfactory.h> #include <vespa/log/log.h> @@ -21,6 +19,12 @@ LOG_SETUP(".documentapiconverter"); namespace storage { +DocumentApiConverter::DocumentApiConverter(const config::ConfigUri & configUri) + : _priConverter(std::make_unique<PriorityConverter>(configUri)) +{} + +DocumentApiConverter::~DocumentApiConverter() {} + std::unique_ptr<api::StorageCommand> DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg, const document::DocumentTypeRepo::SP &repo) @@ -31,55 +35,42 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg, switch (fromMsg.getType()) { case DocumentProtocol::MESSAGE_PUTDOCUMENT: { - documentapi::PutDocumentMessage& from( - static_cast<documentapi::PutDocumentMessage&>(fromMsg)); - api::PutCommand::UP to(new api::PutCommand( - document::BucketId(0), from.getDocument(), - from.getTimestamp())); + documentapi::PutDocumentMessage& from(static_cast<documentapi::PutDocumentMessage&>(fromMsg)); + api::PutCommand::UP to(new api::PutCommand(document::BucketId(0), from.getDocument(), from.getTimestamp())); to->setCondition(from.getCondition()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: { - documentapi::UpdateDocumentMessage& from( - static_cast<documentapi::UpdateDocumentMessage&>(fromMsg)); - api::UpdateCommand::UP to(new api::UpdateCommand( - document::BucketId(0), from.getDocumentUpdate(), - from.getNewTimestamp())); + documentapi::UpdateDocumentMessage& from(static_cast<documentapi::UpdateDocumentMessage&>(fromMsg)); + api::UpdateCommand::UP to(new api::UpdateCommand(document::BucketId(0), from.getDocumentUpdate(), + from.getNewTimestamp())); to->setOldTimestamp(from.getOldTimestamp()); to->setCondition(from.getCondition()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: { - documentapi::RemoveDocumentMessage& from( - static_cast<documentapi::RemoveDocumentMessage&>(fromMsg)); - api::RemoveCommand::UP to(new api::RemoveCommand( - document::BucketId(0), from.getDocumentId(), 0)); + documentapi::RemoveDocumentMessage& from(static_cast<documentapi::RemoveDocumentMessage&>(fromMsg)); + api::RemoveCommand::UP to(new api::RemoveCommand(document::BucketId(0), from.getDocumentId(), 0)); to->setCondition(from.getCondition()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case DocumentProtocol::MESSAGE_GETDOCUMENT: { - documentapi::GetDocumentMessage& from( - static_cast<documentapi::GetDocumentMessage&>(fromMsg)); - api::GetCommand::UP to(new api::GetCommand( - document::BucketId(0), from.getDocumentId(), - from.getFieldSet())); + documentapi::GetDocumentMessage& from(static_cast<documentapi::GetDocumentMessage&>(fromMsg)); + api::GetCommand::UP to(new api::GetCommand(document::BucketId(0), from.getDocumentId(), from.getFieldSet())); toMsg.reset(to.release()); break; } case DocumentProtocol::MESSAGE_CREATEVISITOR: { - documentapi::CreateVisitorMessage& from( - static_cast<documentapi::CreateVisitorMessage&>(fromMsg)); - api::CreateVisitorCommand::UP to(new api::CreateVisitorCommand( - from.getLibraryName(), - from.getInstanceId(), - from.getDocumentSelection())); + documentapi::CreateVisitorMessage& from(static_cast<documentapi::CreateVisitorMessage&>(fromMsg)); + api::CreateVisitorCommand::UP to(new api::CreateVisitorCommand(from.getLibraryName(), from.getInstanceId(), + from.getDocumentSelection())); to->setControlDestination(from.getControlDestination()); to->setDataDestination(from.getDataDestination()); @@ -94,76 +85,57 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg, to->setVisitorDispatcherVersion(from.getVisitorDispatcherVersion()); to->setVisitorOrdering(from.getVisitorOrdering()); to->setMaxBucketsPerVisitor(from.getMaxBucketsPerVisitor()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case DocumentProtocol::MESSAGE_DESTROYVISITOR: { - documentapi::DestroyVisitorMessage& from( - static_cast<documentapi::DestroyVisitorMessage&>(fromMsg)); - api::DestroyVisitorCommand::UP to(new api::DestroyVisitorCommand( - from.getInstanceId())); - toMsg.reset(to.release()); + documentapi::DestroyVisitorMessage& from(static_cast<documentapi::DestroyVisitorMessage&>(fromMsg)); + toMsg = std::make_unique<api::DestroyVisitorCommand>(from.getInstanceId()); break; } case DocumentProtocol::MESSAGE_MULTIOPERATION: { - documentapi::MultiOperationMessage& from( - static_cast<documentapi::MultiOperationMessage&>(fromMsg)); - api::MultiOperationCommand::UP to(new api::MultiOperationCommand(repo, - from.getBucketId(), from.getBuffer(), - from.keepTimeStamps())); - toMsg.reset(to.release()); + documentapi::MultiOperationMessage& from(static_cast<documentapi::MultiOperationMessage&>(fromMsg)); + toMsg = std::make_unique<api::MultiOperationCommand>(repo, from.getBucketId(), from.getBuffer(), + from.keepTimeStamps()); break; } case DocumentProtocol::MESSAGE_BATCHDOCUMENTUPDATE: { - documentapi::BatchDocumentUpdateMessage& from( - static_cast<documentapi::BatchDocumentUpdateMessage&>(fromMsg)); - api::BatchDocumentUpdateCommand::UP to( - new api::BatchDocumentUpdateCommand(from.getUpdates())); - toMsg.reset(to.release()); + documentapi::BatchDocumentUpdateMessage& from(static_cast<documentapi::BatchDocumentUpdateMessage&>(fromMsg)); + toMsg = std::make_unique<api::BatchDocumentUpdateCommand>(from.getUpdates()); break; } case DocumentProtocol::MESSAGE_STATBUCKET: { - documentapi::StatBucketMessage& from( - static_cast<documentapi::StatBucketMessage&>(fromMsg)); - api::StatBucketCommand::UP to(new api::StatBucketCommand( - from.getBucketId(), from.getDocumentSelection())); - toMsg.reset(to.release()); + documentapi::StatBucketMessage& from(static_cast<documentapi::StatBucketMessage&>(fromMsg)); + toMsg = std::make_unique<api::StatBucketCommand>(from.getBucketId(), from.getDocumentSelection()); break; } case DocumentProtocol::MESSAGE_GETBUCKETLIST: { - documentapi::GetBucketListMessage& from( - static_cast<documentapi::GetBucketListMessage&>(fromMsg)); - api::GetBucketListCommand::UP to(new api::GetBucketListCommand( - from.getBucketId())); - toMsg.reset(to.release()); + documentapi::GetBucketListMessage& from(static_cast<documentapi::GetBucketListMessage&>(fromMsg)); + toMsg = std::make_unique<api::GetBucketListCommand>(from.getBucketId()); break; } case DocumentProtocol::MESSAGE_VISITORINFO: { - documentapi::VisitorInfoMessage& from( - static_cast<documentapi::VisitorInfoMessage&>(fromMsg)); + documentapi::VisitorInfoMessage& from(static_cast<documentapi::VisitorInfoMessage&>(fromMsg)); api::VisitorInfoCommand::UP to(new api::VisitorInfoCommand); for (uint32_t i = 0; i < from.getFinishedBuckets().size(); ++i) { to->setBucketCompleted(from.getFinishedBuckets()[i], 0); } if (!from.getErrorMessage().empty()) { - to->setErrorCode(api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, from.getErrorMessage())); + to->setErrorCode(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, from.getErrorMessage())); } - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case DocumentProtocol::MESSAGE_REMOVELOCATION: { - documentapi::RemoveLocationMessage& from( - static_cast<documentapi::RemoveLocationMessage&>(fromMsg)); - api::RemoveLocationCommand::UP to(new api::RemoveLocationCommand( - from.getDocumentSelection(), document::BucketId(0))); + documentapi::RemoveLocationMessage& from(static_cast<documentapi::RemoveLocationMessage&>(fromMsg)); + api::RemoveLocationCommand::UP to(new api::RemoveLocationCommand(from.getDocumentSelection(), document::BucketId(0))); toMsg.reset(to.release()); break; } @@ -177,8 +149,7 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg, timeout = INT_MAX; } toMsg->setTimeout(timeout); - toMsg->setPriority( - _priConverter.toStoragePriority(fromMsg.getPriority())); + toMsg->setPriority(_priConverter->toStoragePriority(fromMsg.getPriority())); toMsg->setLoadType(fromMsg.getLoadType()); LOG(spam, "Converted command %s, loadtype %d, mapped priority %d to %d", @@ -193,34 +164,27 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentReply& fromReply, api::StorageCommand& fromCommand) { if (LOG_WOULD_LOG(spam)) { - LOG(spam, "Trace for reply:\n%s", - fromReply.getTrace().toString().c_str()); + LOG(spam, "Trace for reply:\n%s", fromReply.getTrace().toString().c_str()); } std::unique_ptr<api::StorageReply> toMsg; switch (fromReply.getType()) { case documentapi::DocumentProtocol::REPLY_CREATEVISITOR: { - documentapi::CreateVisitorReply& fromRep( - static_cast<documentapi::CreateVisitorReply&>(fromReply)); - const api::CreateVisitorCommand& fromCmd( - static_cast<const api::CreateVisitorCommand&>(fromCommand)); + documentapi::CreateVisitorReply& fromRep(static_cast<documentapi::CreateVisitorReply&>(fromReply)); + const api::CreateVisitorCommand& fromCmd(static_cast<const api::CreateVisitorCommand&>(fromCommand)); api::CreateVisitorReply::UP to(new api::CreateVisitorReply(fromCmd)); to->setVisitorStatistics(fromRep.getVisitorStatistics()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case documentapi::DocumentProtocol::REPLY_STATBUCKET: { - documentapi::StatBucketReply& fromRep( - static_cast<documentapi::StatBucketReply&>(fromReply)); - const api::StatBucketCommand& fromCmd( - static_cast<const api::StatBucketCommand&>(fromCommand)); + documentapi::StatBucketReply& fromRep(static_cast<documentapi::StatBucketReply&>(fromReply)); + const api::StatBucketCommand& fromCmd(static_cast<const api::StatBucketCommand&>(fromCommand)); - api::StatBucketReply::UP to( - new api::StatBucketReply(fromCmd, fromRep.getResults())); - toMsg.reset(to.release()); + toMsg = std::make_unique<api::StatBucketReply>(fromCmd, fromRep.getResults()); break; } default: @@ -230,131 +194,98 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentReply& fromReply, if (toMsg.get()) { if (fromReply.hasErrors()) { - toMsg->setResult(api::ReturnCode( - (api::ReturnCode::Result) fromReply.getError(0).getCode(), - fromReply.getError(0).getMessage())); - toMsg->setPriority( - _priConverter.toStoragePriority(fromReply.getPriority())); + toMsg->setResult(api::ReturnCode((api::ReturnCode::Result) fromReply.getError(0).getCode(), + fromReply.getError(0).getMessage())); + toMsg->setPriority(_priConverter->toStoragePriority(fromReply.getPriority())); } } return std::move(toMsg); } std::unique_ptr<mbus::Message> -DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, - const document::DocumentTypeRepo::SP &repo) +DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, const document::DocumentTypeRepo::SP &repo) { std::unique_ptr<mbus::Message> toMsg; switch (fromMsg.getType().getId()) { case api::MessageType::PUT_ID: { api::PutCommand& from(static_cast<api::PutCommand&>(fromMsg)); - documentapi::PutDocumentMessage::UP to( - new documentapi::PutDocumentMessage(from.getDocument())); + documentapi::PutDocumentMessage::UP to(new documentapi::PutDocumentMessage(from.getDocument())); to->setTimestamp(from.getTimestamp()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::UPDATE_ID: { api::UpdateCommand& from(static_cast<api::UpdateCommand&>(fromMsg)); - documentapi::UpdateDocumentMessage::UP to( - new documentapi::UpdateDocumentMessage(from.getUpdate())); + documentapi::UpdateDocumentMessage::UP to(new documentapi::UpdateDocumentMessage(from.getUpdate())); to->setOldTimestamp(from.getOldTimestamp()); to->setNewTimestamp(from.getTimestamp()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::REMOVE_ID: { api::RemoveCommand& from(static_cast<api::RemoveCommand&>(fromMsg)); - documentapi::RemoveDocumentMessage::UP to( - new documentapi::RemoveDocumentMessage(from.getDocumentId())); - toMsg.reset(to.release()); + toMsg = std::make_unique<documentapi::RemoveDocumentMessage>(from.getDocumentId()); break; } case api::MessageType::VISITOR_INFO_ID: { - api::VisitorInfoCommand& from( - static_cast<api::VisitorInfoCommand&>(fromMsg)); - documentapi::VisitorInfoMessage::UP to( - new documentapi::VisitorInfoMessage); + api::VisitorInfoCommand& from(static_cast<api::VisitorInfoCommand&>(fromMsg)); + documentapi::VisitorInfoMessage::UP to(new documentapi::VisitorInfoMessage); for (uint32_t i = 0; i < from.getCompletedBucketsList().size(); ++i) { - to->getFinishedBuckets().push_back( - from.getCompletedBucketsList()[i].bucketId); + to->getFinishedBuckets().push_back(from.getCompletedBucketsList()[i].bucketId); } to->setErrorMessage(from.getErrorCode().getMessage()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::DOCBLOCK_ID: { api::DocBlockCommand& from(static_cast<api::DocBlockCommand&>(fromMsg)); - documentapi::MultiOperationMessage::UP to( - new documentapi::MultiOperationMessage( - from.getBucketId(), - from.getDocumentBlock(), - from.keepTimeStamps())); - toMsg.reset(to.release()); + toMsg = std::make_unique<documentapi::MultiOperationMessage>(from.getBucketId(), from.getDocumentBlock(), + from.keepTimeStamps()); break; } case api::MessageType::SEARCHRESULT_ID: { - api::SearchResultCommand& from( - static_cast<api::SearchResultCommand&>(fromMsg)); - documentapi::SearchResultMessage::UP to( - new documentapi::SearchResultMessage(from)); - toMsg.reset(to.release()); + api::SearchResultCommand& from(static_cast<api::SearchResultCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::SearchResultMessage>(from); break; } case api::MessageType::QUERYRESULT_ID: { - api::QueryResultCommand& from( - static_cast<api::QueryResultCommand&>(fromMsg)); - documentapi::QueryResultMessage::UP to( - new documentapi::QueryResultMessage( - from.getSearchResult(), from.getDocumentSummary())); - toMsg.reset(to.release()); + api::QueryResultCommand& from(static_cast<api::QueryResultCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::QueryResultMessage>(from.getSearchResult(), from.getDocumentSummary()); break; } case api::MessageType::DOCUMENTSUMMARY_ID: { - api::DocumentSummaryCommand& from( - static_cast<api::DocumentSummaryCommand&>(fromMsg)); - documentapi::DocumentSummaryMessage::UP to( - new documentapi::DocumentSummaryMessage(from)); - toMsg.reset(to.release()); + api::DocumentSummaryCommand& from(static_cast<api::DocumentSummaryCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::DocumentSummaryMessage>(from); break; } case api::MessageType::MULTIOPERATION_ID: { - api::MultiOperationCommand& from( - static_cast<api::MultiOperationCommand&>(fromMsg)); - documentapi::MultiOperationMessage::UP to( - new documentapi::MultiOperationMessage(repo, - from.getBucketId(), - from.getBuffer(), - from.keepTimeStamps())); - toMsg.reset(to.release()); + api::MultiOperationCommand& from(static_cast<api::MultiOperationCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::MultiOperationMessage>(repo, from.getBucketId(), from.getBuffer(), + from.keepTimeStamps()); break; } case api::MessageType::MAPVISITOR_ID: { - api::MapVisitorCommand& from( - static_cast<api::MapVisitorCommand&>(fromMsg)); - documentapi::MapVisitorMessage::UP to( - new documentapi::MapVisitorMessage); + api::MapVisitorCommand& from(static_cast<api::MapVisitorCommand&>(fromMsg)); + documentapi::MapVisitorMessage::UP to(new documentapi::MapVisitorMessage); to->getData() = from.getData(); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::DOCUMENTLIST_ID: { - api::DocumentListCommand& from( - static_cast<api::DocumentListCommand&>(fromMsg)); - documentapi::DocumentListMessage::UP to( - new documentapi::DocumentListMessage(from.getBucketId())); + api::DocumentListCommand& from(static_cast<api::DocumentListCommand&>(fromMsg)); + documentapi::DocumentListMessage::UP to(new documentapi::DocumentListMessage(from.getBucketId())); for (uint32_t i = 0; i < from.getDocuments().size(); i++) { to->getDocuments().push_back( @@ -363,28 +294,21 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, from.getDocuments()[i]._doc, from.getDocuments()[i]._removeEntry)); } - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::EMPTYBUCKETS_ID: { - api::EmptyBucketsCommand& from( - static_cast<api::EmptyBucketsCommand&>(fromMsg)); - std::unique_ptr<documentapi::EmptyBucketsMessage> to( - new documentapi::EmptyBucketsMessage(from.getBuckets())); - toMsg.reset(to.release()); + api::EmptyBucketsCommand& from(static_cast<api::EmptyBucketsCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::EmptyBucketsMessage>(from.getBuckets()); break; } case api::MessageType::VISITOR_CREATE_ID: { - api::CreateVisitorCommand& from( - static_cast<api::CreateVisitorCommand&>(fromMsg)); + api::CreateVisitorCommand& from(static_cast<api::CreateVisitorCommand&>(fromMsg)); documentapi::CreateVisitorMessage::UP to( - new documentapi::CreateVisitorMessage( - from.getLibraryName(), - from.getInstanceId(), - from.getControlDestination(), - from.getDataDestination())); + new documentapi::CreateVisitorMessage(from.getLibraryName(), from.getInstanceId(), + from.getControlDestination(), from.getDataDestination())); to->setDocumentSelection(from.getDocumentSelection()); to->setMaximumPendingReplyCount(from.getMaximumPendingReplyCount()); to->setParameters(from.getParameters()); @@ -396,27 +320,21 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, to->getBuckets() = from.getBuckets(); to->setVisitorOrdering(from.getVisitorOrdering()); to->setMaxBucketsPerVisitor(from.getMaxBucketsPerVisitor()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::VISITOR_DESTROY_ID: { - api::DestroyVisitorCommand& from( - static_cast<api::DestroyVisitorCommand&>(fromMsg)); - documentapi::DestroyVisitorMessage::UP to( - new documentapi::DestroyVisitorMessage); + api::DestroyVisitorCommand& from(static_cast<api::DestroyVisitorCommand&>(fromMsg)); + documentapi::DestroyVisitorMessage::UP to(new documentapi::DestroyVisitorMessage); to->setInstanceId(from.getInstanceId()); - toMsg.reset(to.release()); + toMsg = std::move(to); break; } case api::MessageType::STATBUCKET_ID: { - api::StatBucketCommand& from( - static_cast<api::StatBucketCommand&>(fromMsg)); - documentapi::StatBucketMessage::UP to( - new documentapi::StatBucketMessage( - from.getBucketId(), from.getDocumentSelection())); - toMsg.reset(to.release()); + api::StatBucketCommand& from(static_cast<api::StatBucketCommand&>(fromMsg)); + toMsg = std::make_unique<documentapi::StatBucketMessage>(from.getBucketId(), from.getDocumentSelection()); break; } default: @@ -434,13 +352,11 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, } void -DocumentApiConverter::transferReplyState(api::StorageReply& fromMsg, - mbus::Reply& toMsg) +DocumentApiConverter::transferReplyState(api::StorageReply& fromMsg, mbus::Reply& toMsg) { // First map error codes. if (fromMsg.getResult().failed()) { - mbus::Error error(mbus::Error(fromMsg.getResult().getResult(), - fromMsg.getResult().toString())); + mbus::Error error(mbus::Error(fromMsg.getResult().getResult(), fromMsg.getResult().toString())); toMsg.addError(error); LOG(debug, "Converted storageapi error code %d to %s", fromMsg.getResult().getResult(), error.toString().c_str()); @@ -449,65 +365,49 @@ DocumentApiConverter::transferReplyState(api::StorageReply& fromMsg, using documentapi::DocumentProtocol; if (toMsg.getType() == DocumentProtocol::REPLY_GETDOCUMENT) { api::GetReply& from(static_cast<api::GetReply&>(fromMsg)); - documentapi::GetDocumentReply& to( - static_cast<documentapi::GetDocumentReply&>(toMsg)); + documentapi::GetDocumentReply& to(static_cast<documentapi::GetDocumentReply&>(toMsg)); if (from.getDocument().get() != 0) { to.setDocument(from.getDocument()); to.setLastModified(from.getLastModifiedTimestamp()); } } else if (toMsg.getType() == DocumentProtocol::REPLY_REMOVEDOCUMENT) { api::RemoveReply& from(static_cast<api::RemoveReply&>(fromMsg)); - documentapi::RemoveDocumentReply& to( - static_cast<documentapi::RemoveDocumentReply&>(toMsg)); + documentapi::RemoveDocumentReply& to(static_cast<documentapi::RemoveDocumentReply&>(toMsg)); to.setWasFound(from.wasFound()); to.setHighestModificationTimestamp(from.getTimestamp()); } else if (toMsg.getType() == DocumentProtocol::REPLY_PUTDOCUMENT) { api::PutReply& from(static_cast<api::PutReply&>(fromMsg)); - documentapi::WriteDocumentReply& to( - static_cast<documentapi::WriteDocumentReply&>(toMsg)); + documentapi::WriteDocumentReply& to(static_cast<documentapi::WriteDocumentReply&>(toMsg)); to.setHighestModificationTimestamp(from.getTimestamp()); } else if (toMsg.getType() == DocumentProtocol::REPLY_MULTIOPERATION) { - api::MultiOperationReply& from( - static_cast<api::MultiOperationReply&>(fromMsg)); - documentapi::WriteDocumentReply& to( - static_cast<documentapi::WriteDocumentReply&>(toMsg)); - to.setHighestModificationTimestamp( - from.getHighestModificationTimestamp()); + api::MultiOperationReply& from(static_cast<api::MultiOperationReply&>(fromMsg)); + documentapi::WriteDocumentReply& to(static_cast<documentapi::WriteDocumentReply&>(toMsg)); + to.setHighestModificationTimestamp(from.getHighestModificationTimestamp()); } else if (toMsg.getType() == DocumentProtocol::REPLY_UPDATEDOCUMENT) { api::UpdateReply& from(static_cast<api::UpdateReply&>(fromMsg)); - documentapi::UpdateDocumentReply& to( - static_cast<documentapi::UpdateDocumentReply&>(toMsg)); + documentapi::UpdateDocumentReply& to(static_cast<documentapi::UpdateDocumentReply&>(toMsg)); to.setWasFound(from.wasFound()); to.setHighestModificationTimestamp(from.getTimestamp()); } else if (toMsg.getType() == DocumentProtocol::REPLY_STATBUCKET) { api::StatBucketReply& from(static_cast<api::StatBucketReply&>(fromMsg)); - documentapi::StatBucketReply& to( - static_cast<documentapi::StatBucketReply&>(toMsg)); + documentapi::StatBucketReply& to(static_cast<documentapi::StatBucketReply&>(toMsg)); to.setResults(from.getResults()); } else if (toMsg.getType() == DocumentProtocol::REPLY_GETBUCKETLIST) { - api::GetBucketListReply& from( - static_cast<api::GetBucketListReply&>(fromMsg)); - documentapi::GetBucketListReply& to( - static_cast<documentapi::GetBucketListReply&>(toMsg)); - const std::vector<api::GetBucketListReply::BucketInfo>& buckets( - from.getBuckets()); + api::GetBucketListReply& from(static_cast<api::GetBucketListReply&>(fromMsg)); + documentapi::GetBucketListReply& to(static_cast<documentapi::GetBucketListReply&>(toMsg)); + const std::vector<api::GetBucketListReply::BucketInfo>& buckets(from.getBuckets()); for (uint32_t i = 0; i < buckets.size(); i++) { to.getBuckets().push_back( - documentapi::GetBucketListReply::BucketInfo( - buckets[i]._bucket, buckets[i]._bucketInformation)); + documentapi::GetBucketListReply::BucketInfo(buckets[i]._bucket, buckets[i]._bucketInformation)); } } else if (toMsg.getType() == DocumentProtocol::REPLY_CREATEVISITOR) { - api::CreateVisitorReply& from( - static_cast<api::CreateVisitorReply&>(fromMsg)); - documentapi::CreateVisitorReply& to( - static_cast<documentapi::CreateVisitorReply&>(toMsg)); + api::CreateVisitorReply& from(static_cast<api::CreateVisitorReply&>(fromMsg)); + documentapi::CreateVisitorReply& to(static_cast<documentapi::CreateVisitorReply&>(toMsg)); to.setLastBucket(from.getLastBucket()); to.setVisitorStatistics(from.getVisitorStatistics()); } else if (toMsg.getType() == DocumentProtocol::REPLY_BATCHDOCUMENTUPDATE) { - api::BatchDocumentUpdateReply& from( - static_cast<api::BatchDocumentUpdateReply&>(fromMsg)); - documentapi::BatchDocumentUpdateReply& to( - static_cast<documentapi::BatchDocumentUpdateReply&>(toMsg)); + api::BatchDocumentUpdateReply& from(static_cast<api::BatchDocumentUpdateReply&>(fromMsg)); + documentapi::BatchDocumentUpdateReply& to(static_cast<documentapi::BatchDocumentUpdateReply&>(toMsg)); to.getDocumentsNotFound() = from.getDocumentsNotFound(); } } diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.h b/storage/src/vespa/storage/storageserver/documentapiconverter.h index bd620f58dc0..f53b538272a 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.h +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.h @@ -1,15 +1,19 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "priorityconverter.h" -#include <vespa/storageapi/messageapi/storagecommand.h> -#include <vespa/storageapi/messageapi/storagereply.h> #include <vespa/documentapi/messagebus/messages/documentmessage.h> #include <vespa/documentapi/messagebus/messages/documentreply.h> #include <vespa/document/repo/documenttyperepo.h> +namespace config { class ConfigUri; } namespace storage { +namespace api { + class StorageCommand; + class StorageReply; +} + +class PriorityConverter; /** Converts messages from storageapi to documentapi and vice versa. @@ -17,25 +21,16 @@ namespace storage { class DocumentApiConverter { public: - DocumentApiConverter(const config::ConfigUri & configUri) - : _priConverter(configUri) {} - - std::unique_ptr<storage::api::StorageCommand> toStorageAPI( - documentapi::DocumentMessage& msg, - const document::DocumentTypeRepo::SP &repo); - - std::unique_ptr<storage::api::StorageReply> toStorageAPI(documentapi::DocumentReply& reply, api::StorageCommand& originalCommand); + DocumentApiConverter(const config::ConfigUri & configUri); + ~DocumentApiConverter(); + std::unique_ptr<api::StorageCommand> toStorageAPI(documentapi::DocumentMessage& msg, const document::DocumentTypeRepo::SP &repo); + std::unique_ptr<api::StorageReply> toStorageAPI(documentapi::DocumentReply& reply, api::StorageCommand& originalCommand); void transferReplyState(storage::api::StorageReply& from, mbus::Reply& to); - - std::unique_ptr<mbus::Message> toDocumentAPI( - storage::api::StorageCommand& cmd, - const document::DocumentTypeRepo::SP &repo); - - const PriorityConverter& getPriorityConverter() const { return _priConverter; } + std::unique_ptr<mbus::Message> toDocumentAPI(api::StorageCommand& cmd, const document::DocumentTypeRepo::SP &repo); + const PriorityConverter& getPriorityConverter() const { return *_priConverter; } private: - PriorityConverter _priConverter; + std::unique_ptr<PriorityConverter> _priConverter; }; } // namespace storage - diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp index 7daf2fb4777..0572d17af4c 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp @@ -5,6 +5,7 @@ #include <vespa/storageapi/message/state.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/host_name.h> +#include <vespa/fnet/frt/supervisor.h> #include <sstream> #include <vespa/log/log.h> diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index ede7be3b9ad..d37b45435ce 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -2,11 +2,13 @@ #include "mergethrottler.h" #include "storagemetricsset.h" -#include <sstream> -#include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/util/stringfmt.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/persistence/messages.h> +#include <vespa/messagebus/message.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <sstream> + #include <vespa/log/log.h> LOG_SETUP(".mergethrottler"); diff --git a/storage/src/vespa/storage/storageserver/priorityconverter.cpp b/storage/src/vespa/storage/storageserver/priorityconverter.cpp index c8cf9e5fc29..1ab820c6918 100644 --- a/storage/src/vespa/storage/storageserver/priorityconverter.cpp +++ b/storage/src/vespa/storage/storageserver/priorityconverter.cpp @@ -2,6 +2,7 @@ #include "priorityconverter.h" #include <vespa/documentapi/messagebus/documentprotocol.h> +#include <vespa/config/subscription/configuri.h> namespace storage { diff --git a/storage/src/vespa/storage/storageserver/priorityconverter.h b/storage/src/vespa/storage/storageserver/priorityconverter.h index d5d2953ea45..0daf5b8c891 100644 --- a/storage/src/vespa/storage/storageserver/priorityconverter.h +++ b/storage/src/vespa/storage/storageserver/priorityconverter.h @@ -3,11 +3,14 @@ #pragma once #include <vespa/storage/config/config-stor-prioritymapping.h> -#include <vespa/config/config.h> +#include <vespa/config/helper/configfetcher.h> #include <vespa/documentapi/messagebus/priority.h> +#include <vespa/vespalib/util/sync.h> #include <atomic> #include <array> +namespace config {class ConfigUri; } + namespace storage { class PriorityConverter diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 5c74f520cde..b05d159ef08 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -3,19 +3,21 @@ #include "servicelayernode.h" #include "bouncer.h" #include "bucketintegritychecker.h" -#include <vespa/storage/bucketmover/bucketmover.h> #include "communicationmanager.h" #include "changedbucketownershiphandler.h" #include "mergethrottler.h" #include "opslogger.h" #include "statemanager.h" +#include "priorityconverter.h" #include <vespa/storage/visiting/messagebusvisitormessagesession.h> #include <vespa/storage/visiting/visitormanager.h> #include <vespa/storage/bucketdb/bucketmanager.h> #include <vespa/storage/bucketdb/storagebucketdbinitializer.h> +#include <vespa/storage/bucketmover/bucketmover.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> #include <vespa/persistence/spi/exceptions.h> +#include <vespa/messagebus/rpcmessagebus.h> #include <vespa/log/log.h> LOG_SETUP(".node.servicelayer"); @@ -28,8 +30,7 @@ ServiceLayerNode::ServiceLayerNode( ApplicationGenerationFetcher& generationFetcher, spi::PersistenceProvider& persistenceProvider, const VisitorFactory::Map& externalVisitors) - : StorageNode(configUri, context, generationFetcher, - std::unique_ptr<HostInfo>(new HostInfo)), + : StorageNode(configUri, context, generationFetcher, std::unique_ptr<HostInfo>(new HostInfo)), _context(context), _persistenceProvider(persistenceProvider), _partitions(0), @@ -46,19 +47,15 @@ void ServiceLayerNode::init() _init_has_been_called = true; spi::Result initResult(_persistenceProvider.initialize()); if (initResult.hasError()) { - LOG(error, "Failed to initialize persistence provider: %s", - initResult.toString().c_str()); - throw spi::HandledException( - "Failed provider init: " + initResult.toString(), VESPA_STRLOC); + LOG(error, "Failed to initialize persistence provider: %s", initResult.toString().c_str()); + throw spi::HandledException("Failed provider init: " + initResult.toString(), VESPA_STRLOC); } spi::PartitionStateListResult result( _persistenceProvider.getPartitionStates()); if (result.hasError()) { - LOG(error, "Failed to get partition list from persistence provider: %s", - result.toString().c_str()); - throw spi::HandledException("Failed to get partition list: " - + result.toString(), VESPA_STRLOC); + LOG(error, "Failed to get partition list from persistence provider: %s", result.toString().c_str()); + throw spi::HandledException("Failed to get partition list: " + result.toString(), VESPA_STRLOC); } _partitions = result.getList(); if (_partitions.size() == 0) { @@ -76,8 +73,7 @@ void ServiceLayerNode::init() LOG(warning, "Network failure: '%s'", e.what()); throw; } catch (const vespalib::Exception & e) { - LOG(error, "Caught exception %s during startup. Calling destruct " - "functions in hopes of dying gracefully.", + LOG(error, "Caught exception %s during startup. Calling destruct functions in hopes of dying gracefully.", e.getMessage().c_str()); requestShutdown("Failed to initialize: " + e.getMessage()); throw; @@ -135,8 +131,7 @@ ServiceLayerNode::initializeNodeSpecific() if (_partitions[i].getState() == spi::PartitionState::UP) { ++usablePartitions; } else { - lib::DiskState diskState(lib::State::DOWN, - _partitions[i].getReason()); + lib::DiskState diskState(lib::State::DOWN, _partitions[i].getReason()); ns.setDiskState(i, diskState); } } @@ -150,8 +145,7 @@ ServiceLayerNode::initializeNodeSpecific() ns.setReliability(_serverConfig->nodeReliability); for (uint16_t i=0; i<_serverConfig->diskCapacity.size(); ++i) { if (i >= ns.getDiskCount()) { - LOG(warning, "Capacity configured for partition %" PRIu64 " but only " - "%u partitions found.", + LOG(warning, "Capacity configured for partition %" PRIu64 " but only %u partitions found.", _serverConfig->diskCapacity.size(), ns.getDiskCount()); continue; } @@ -159,8 +153,7 @@ ServiceLayerNode::initializeNodeSpecific() ds.setCapacity(_serverConfig->diskCapacity[i]); ns.setDiskState(i, ds); } - LOG(debug, "Adjusting reported node state to include partition count and " - "states, capacity and reliability: %s", + LOG(debug, "Adjusting reported node state to include partition count and states, capacity and reliability: %s", ns.toString().c_str()); _component->getStateUpdater().setReportedNodeState(ns); } @@ -180,35 +173,28 @@ ServiceLayerNode::handleLiveConfigUpdate() DIFFERWARN(diskCount, "Cannot alter partition count of node live"); { updated = false; - NodeStateUpdater::Lock::SP lock( - _component->getStateUpdater().grabStateChangeLock()); - lib::NodeState ns( - *_component->getStateUpdater().getReportedNodeState()); + NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); + lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState()); if (DIFFER(nodeCapacity)) { - LOG(info, "Live config update: Updating node capacity " - "from %f to %f.", + LOG(info, "Live config update: Updating node capacity from %f to %f.", oldC.nodeCapacity, newC.nodeCapacity); ASSIGN(nodeCapacity); ns.setCapacity(newC.nodeCapacity); } if (DIFFER(diskCapacity)) { - for (uint32_t i=0; - i<newC.diskCapacity.size() && i<ns.getDiskCount(); ++i) - { + for (uint32_t i=0; i<newC.diskCapacity.size() && i<ns.getDiskCount(); ++i) { if (newC.diskCapacity[i] != oldC.diskCapacity[i]) { lib::DiskState ds(ns.getDiskState(i)); ds.setCapacity(newC.diskCapacity[i]); ns.setDiskState(i, ds); - LOG(info, "Live config update: Disk capacity of " - "disk %u changed from %f to %f.", + LOG(info, "Live config update: Disk capacity of disk %u changed from %f to %f.", i, oldC.diskCapacity[i], newC.diskCapacity[i]); } } ASSIGN(diskCapacity); } if (DIFFER(nodeReliability)) { - LOG(info, "Live config update: Node reliability changed " - "from %u to %u.", + LOG(info, "Live config update: Node reliability changed from %u to %u.", oldC.nodeReliability, newC.nodeReliability); ASSIGN(nodeReliability); ns.setReliability(newC.nodeReliability); @@ -246,16 +232,14 @@ ServiceLayerNode::createSession(Visitor& visitor, VisitorThread& thread) srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP()); srcParams.setReplyHandler(*mbusSession); mbusSession->setSourceSession( - _communicationManager->getMessageBus().getMessageBus() - .createSourceSession(srcParams)); + _communicationManager->getMessageBus().getMessageBus().createSourceSession(srcParams)); return VisitorMessageSession::UP(std::move(mbusSession)); } documentapi::Priority::Value ServiceLayerNode::toDocumentPriority(uint8_t storagePriority) const { - return _communicationManager->getPriorityConverter(). - toDocumentPriority(storagePriority); + return _communicationManager->getPriorityConverter().toDocumentPriority(storagePriority); } StorageLink::UP @@ -264,8 +248,7 @@ ServiceLayerNode::createChain() ServiceLayerComponentRegister& compReg(_context.getComponentRegister()); StorageLink::UP chain; - chain.reset(_communicationManager = new CommunicationManager( - compReg, _configUri)); + chain.reset(_communicationManager = new CommunicationManager(compReg, _configUri)); chain->push_back(StorageLink::UP(new Bouncer(compReg, _configUri))); if (_noUsablePartitionMode) { /* @@ -279,8 +262,7 @@ ServiceLayerNode::createChain() chain->push_back(StorageLink::UP(new MergeThrottler(_configUri, compReg))); chain->push_back(StorageLink::UP(new ChangedBucketOwnershipHandler(_configUri, compReg))); chain->push_back(StorageLink::UP(new BucketIntegrityChecker(_configUri, compReg))); - chain->push_back(StorageLink::UP( - new bucketmover::BucketMover(_configUri, compReg))); + chain->push_back(StorageLink::UP(new bucketmover::BucketMover(_configUri, compReg))); chain->push_back(StorageLink::UP(new StorageBucketDBInitializer( _configUri, _partitions, getDoneInitializeHandler(), compReg))); chain->push_back(StorageLink::UP(new BucketManager( diff --git a/vdslib/src/tests/distribution/distributiontest.cpp b/vdslib/src/tests/distribution/distributiontest.cpp index 7e10d61634a..127ba20b0df 100644 --- a/vdslib/src/tests/distribution/distributiontest.cpp +++ b/vdslib/src/tests/distribution/distributiontest.cpp @@ -3,9 +3,6 @@ #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/distribution/idealnodecalculator.h> #include <vespa/config/helper/configfetcher.h> -#include <chrono> -#include <thread> -#include <fstream> #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/testkit/test_kit.h> @@ -15,8 +12,11 @@ #include <vespa/vdstestlib/cppunit/macros.h> #include <vespa/config-stor-distribution.h> #include <vespa/config/helper/configgetter.hpp> +#include <vespa/config/subscription/configuri.h> #include <vespa/fastos/file.h> - +#include <chrono> +#include <thread> +#include <fstream> namespace storage { namespace lib { diff --git a/vespaclient/src/vespa/vespaclient/vespadoclocator/locator.cpp b/vespaclient/src/vespa/vespaclient/vespadoclocator/locator.cpp index af805d461fc..79d245f6a73 100644 --- a/vespaclient/src/vespa/vespaclient/vespadoclocator/locator.cpp +++ b/vespaclient/src/vespa/vespaclient/vespadoclocator/locator.cpp @@ -1,6 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <boost/tokenizer.hpp> +#include "locator.h" #include <vespa/documentapi/messagebus/documentprotocol.h> #include <vespa/messagebus/configagent.h> #include <vespa/messagebus/iconfighandler.h> @@ -8,9 +8,9 @@ #include <vespa/vdslib/bucketdistribution.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/config/helper/configgetter.hpp> - - -#include "locator.h" +#include <vespa/config/common/exceptions.h> +#include <vespa/config/subscription/configuri.h> +#include <boost/tokenizer.hpp> typedef std::map<std::string, uint32_t> ClusterMap; using namespace config; diff --git a/vespalib/src/vespa/vespalib/trace/tracenode.h b/vespalib/src/vespa/vespalib/trace/tracenode.h index fbb428c43d2..e425ebd564a 100644 --- a/vespalib/src/vespa/vespalib/trace/tracenode.h +++ b/vespalib/src/vespa/vespalib/trace/tracenode.h @@ -1,8 +1,8 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vector> #include <vespa/vespalib/stllike/string.h> +#include <vector> namespace vespalib { @@ -25,7 +25,7 @@ private: TraceNode *_parent; bool _strict; bool _hasNote; - string _note; + string _note; std::vector<TraceNode> _children; int64_t _timestamp; |