summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-06-08 13:27:29 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-06-08 13:28:18 +0200
commita222a37083bf74b4b2992e91a4ec90921b0cc2bf (patch)
tree1cac7aa9d5b476b4ec03c1e43866baa9b34b2393 /storage
parent25809a32e2a9227d92e355483984e9ba592358f4 (diff)
Only include what you really need
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp2
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp11
-rw-r--r--storage/src/tests/storageserver/documentapiconvertertest.cpp1
-rw-r--r--storage/src/tests/storageserver/testvisitormessagesession.h1
-rw-r--r--storage/src/vespa/storage/bucketmover/bucketmover.cpp3
-rw-r--r--storage/src/vespa/storage/storageserver/bouncer.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/bouncer.h5
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp5
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h21
-rw-r--r--storage/src/vespa/storage/storageserver/documentapiconverter.cpp312
-rw-r--r--storage/src/vespa/storage/storageserver/documentapiconverter.h33
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp1
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/priorityconverter.cpp1
-rw-r--r--storage/src/vespa/storage/storageserver/priorityconverter.h5
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp62
16 files changed, 183 insertions, 290 deletions
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index a661c5c445e..12130db59d1 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -1,12 +1,12 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vector>
#include <vespa/vdstestlib/cppunit/macros.h>
#include <vespa/storage/persistence/messages.h>
#include <tests/persistence/common/persistenceproviderwrapper.h>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
#include <tests/persistence/common/filestortestfixture.h>
#include <vespa/vespalib/util/barrier.h>
+#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/stllike/hash_set_insert.hpp>
#include <vespa/log/log.h>
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index c01b24aae8d..cf96605b3ce 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -3,6 +3,7 @@
#include <vespa/storage/storageserver/communicationmanager.h>
#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/messagebus/rpcmessagebus.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storageframework/defaultimplementation/memory/nomemorymanager.h>
@@ -27,12 +28,10 @@ struct CommunicationManagerTest : public CppUnit::TestFixture {
std::shared_ptr<api::StorageCommand> createDummyCommand(
api::StorageMessage::Priority priority)
{
- auto cmd = std::make_shared<api::GetCommand>(
- document::BucketId(0),
- document::DocumentId("doc::mydoc"),
- "[all]");
- cmd->setAddress(api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 1));
+ auto cmd = std::make_shared<api::GetCommand>(document::BucketId(0),
+ document::DocumentId("doc::mydoc"),
+ "[all]");
+ cmd->setAddress(api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 1));
cmd->setPriority(priority);
return cmd;
}
diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp
index 26317465b5a..8858d5433a2 100644
--- a/storage/src/tests/storageserver/documentapiconvertertest.cpp
+++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp
@@ -11,6 +11,7 @@
#include <vespa/messagebus/emptyreply.h>
#include <vespa/document/datatype/documenttype.h>
#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/config/subscription/configuri.h>
#include <vespa/vespalib/testkit/test_kit.h>
using document::DataType;
diff --git a/storage/src/tests/storageserver/testvisitormessagesession.h b/storage/src/tests/storageserver/testvisitormessagesession.h
index 9ebebf73bea..193b6be133f 100644
--- a/storage/src/tests/storageserver/testvisitormessagesession.h
+++ b/storage/src/tests/storageserver/testvisitormessagesession.h
@@ -6,6 +6,7 @@
#include <vespa/storage/visiting/visitorthread.h>
#include <vespa/documentapi/messagebus/messages/documentmessage.h>
#include <vespa/storage/storageserver/priorityconverter.h>
+#include <vespa/config/subscription/configuri.h>
#include <deque>
namespace storage {
diff --git a/storage/src/vespa/storage/bucketmover/bucketmover.cpp b/storage/src/vespa/storage/bucketmover/bucketmover.cpp
index b38c061de44..063050baa3c 100644
--- a/storage/src/vespa/storage/bucketmover/bucketmover.cpp
+++ b/storage/src/vespa/storage/bucketmover/bucketmover.cpp
@@ -7,6 +7,7 @@
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/storageutil/log.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <thread>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".bucketmover");
@@ -176,7 +177,7 @@ BucketMover::sendNewMoves()
// what is happening. (Cannot use wait() here as reply of
// message sent will signal the monitor)
if (_config->operationDelay != 0) {
- FastOS_Thread::Sleep(_config->operationDelay);
+ std::this_thread::sleep_for(std::chrono::milliseconds(_config->operationDelay));
}
}
diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp
index c285645309a..71becba40df 100644
--- a/storage/src/vespa/storage/storageserver/bouncer.cpp
+++ b/storage/src/vespa/storage/storageserver/bouncer.cpp
@@ -3,6 +3,8 @@
#include "bouncer.h"
#include <vespa/storageapi/message/state.h>
#include <vespa/storageapi/message/persistence.h>
+#include <vespa/config/subscription/configuri.h>
+#include <vespa/config/common/exceptions.h>
#include <sstream>
#include <vespa/log/log.h>
diff --git a/storage/src/vespa/storage/storageserver/bouncer.h b/storage/src/vespa/storage/storageserver/bouncer.h
index 8f6706d2cf8..6be465a8aa5 100644
--- a/storage/src/vespa/storage/storageserver/bouncer.h
+++ b/storage/src/vespa/storage/storageserver/bouncer.h
@@ -19,6 +19,8 @@
#include <vespa/storage/config/config-stor-bouncer.h>
#include <vespa/vespalib/util/sync.h>
+namespace config { class ConfigUri; }
+
namespace storage {
class Bouncer : public StorageLink,
@@ -82,6 +84,3 @@ private:
};
} // storage
-
-
-
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index f32b1c242cf..9087482cb42 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -2,12 +2,13 @@
#include "communicationmanager.h"
#include "fnetlistener.h"
#include "rpcrequestwrapper.h"
+#include <vespa/storage/config/config-stor-server.h>
+#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h>
#include <vespa/storageapi/message/state.h>
+#include <vespa/messagebus/rpcmessagebus.h>
#include <vespa/messagebus/emptyreply.h>
-#include <vespa/storage/config/config-stor-server.h>
-#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index c5d17294dd7..6c8923b4c08 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -19,12 +19,20 @@
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageapi/mbusprot/storagecommand.h>
#include <vespa/storageapi/mbusprot/storagereply.h>
-#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/imessagehandler.h>
+#include <vespa/messagebus/ireplyhandler.h>
+#include <vespa/config/helper/configfetcher.h>
#include <vespa/vespalib/util/document_runnable.h>
+#include <vespa/config/subscription/configuri.h>
#include <map>
#include <queue>
#include <atomic>
+namespace mbus {
+ class RPCMessageBus;
+ class SourceSession;
+ class DestinationSession;
+}
namespace storage {
class VisitorMbusSession;
@@ -108,8 +116,8 @@ public:
~StorageTransportContext();
std::unique_ptr<documentapi::DocumentMessage> _docAPIMsg;
- std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg;
- std::unique_ptr<RPCRequestWrapper> _request;
+ std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg;
+ std::unique_ptr<RPCRequestWrapper> _request;
};
class CommunicationManager : public StorageLink,
@@ -142,9 +150,7 @@ private:
= vespa::config::content::core::StorCommunicationmanagerConfig;
void configureMessageBusLimits(const CommunicationManagerConfig& cfg);
-
void configure(std::unique_ptr<CommunicationManagerConfig> config) override;
-
void receiveStorageReply(const std::shared_ptr<api::StorageReply>&);
void serializeNodeState(
@@ -157,9 +163,8 @@ private:
static const uint64_t FORWARDED_MESSAGE = 0;
std::unique_ptr<mbus::RPCMessageBus> _mbus;
- mbus::DestinationSession::UP _messageBusSession;
- mbus::SourceSession::UP _sourceSession;
- mbus::SourceSession::UP _visitorSourceSession;
+ std::unique_ptr<mbus::DestinationSession> _messageBusSession;
+ std::unique_ptr<mbus::SourceSession> _sourceSession;
uint32_t _count;
vespalib::Lock _messageBusSentLock;
diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
index ae789891852..2906667d1ee 100644
--- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
+++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
@@ -1,5 +1,6 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "documentapiconverter.h"
+#include "priorityconverter.h"
#include <vespa/documentapi/documentapi.h>
#include <vespa/storageapi/message/visitor.h>
#include <vespa/storageapi/message/datagram.h>
@@ -11,9 +12,6 @@
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageapi/message/stat.h>
#include <vespa/storageapi/message/batch.h>
-#include <vespa/messagebus/errorcode.h>
-#include <vespa/storageapi/messageapi/returncode.h>
-#include <vespa/vdslib/container/documentlist.h>
#include <vespa/document/bucket/bucketidfactory.h>
#include <vespa/log/log.h>
@@ -21,6 +19,12 @@ LOG_SETUP(".documentapiconverter");
namespace storage {
+DocumentApiConverter::DocumentApiConverter(const config::ConfigUri & configUri)
+ : _priConverter(std::make_unique<PriorityConverter>(configUri))
+{}
+
+DocumentApiConverter::~DocumentApiConverter() {}
+
std::unique_ptr<api::StorageCommand>
DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg,
const document::DocumentTypeRepo::SP &repo)
@@ -31,55 +35,42 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg,
switch (fromMsg.getType()) {
case DocumentProtocol::MESSAGE_PUTDOCUMENT:
{
- documentapi::PutDocumentMessage& from(
- static_cast<documentapi::PutDocumentMessage&>(fromMsg));
- api::PutCommand::UP to(new api::PutCommand(
- document::BucketId(0), from.getDocument(),
- from.getTimestamp()));
+ documentapi::PutDocumentMessage& from(static_cast<documentapi::PutDocumentMessage&>(fromMsg));
+ api::PutCommand::UP to(new api::PutCommand(document::BucketId(0), from.getDocument(), from.getTimestamp()));
to->setCondition(from.getCondition());
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case DocumentProtocol::MESSAGE_UPDATEDOCUMENT:
{
- documentapi::UpdateDocumentMessage& from(
- static_cast<documentapi::UpdateDocumentMessage&>(fromMsg));
- api::UpdateCommand::UP to(new api::UpdateCommand(
- document::BucketId(0), from.getDocumentUpdate(),
- from.getNewTimestamp()));
+ documentapi::UpdateDocumentMessage& from(static_cast<documentapi::UpdateDocumentMessage&>(fromMsg));
+ api::UpdateCommand::UP to(new api::UpdateCommand(document::BucketId(0), from.getDocumentUpdate(),
+ from.getNewTimestamp()));
to->setOldTimestamp(from.getOldTimestamp());
to->setCondition(from.getCondition());
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
{
- documentapi::RemoveDocumentMessage& from(
- static_cast<documentapi::RemoveDocumentMessage&>(fromMsg));
- api::RemoveCommand::UP to(new api::RemoveCommand(
- document::BucketId(0), from.getDocumentId(), 0));
+ documentapi::RemoveDocumentMessage& from(static_cast<documentapi::RemoveDocumentMessage&>(fromMsg));
+ api::RemoveCommand::UP to(new api::RemoveCommand(document::BucketId(0), from.getDocumentId(), 0));
to->setCondition(from.getCondition());
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case DocumentProtocol::MESSAGE_GETDOCUMENT:
{
- documentapi::GetDocumentMessage& from(
- static_cast<documentapi::GetDocumentMessage&>(fromMsg));
- api::GetCommand::UP to(new api::GetCommand(
- document::BucketId(0), from.getDocumentId(),
- from.getFieldSet()));
+ documentapi::GetDocumentMessage& from(static_cast<documentapi::GetDocumentMessage&>(fromMsg));
+ api::GetCommand::UP to(new api::GetCommand(document::BucketId(0), from.getDocumentId(), from.getFieldSet()));
toMsg.reset(to.release());
break;
}
case DocumentProtocol::MESSAGE_CREATEVISITOR:
{
- documentapi::CreateVisitorMessage& from(
- static_cast<documentapi::CreateVisitorMessage&>(fromMsg));
- api::CreateVisitorCommand::UP to(new api::CreateVisitorCommand(
- from.getLibraryName(),
- from.getInstanceId(),
- from.getDocumentSelection()));
+ documentapi::CreateVisitorMessage& from(static_cast<documentapi::CreateVisitorMessage&>(fromMsg));
+ api::CreateVisitorCommand::UP to(new api::CreateVisitorCommand(from.getLibraryName(), from.getInstanceId(),
+ from.getDocumentSelection()));
to->setControlDestination(from.getControlDestination());
to->setDataDestination(from.getDataDestination());
@@ -94,76 +85,57 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg,
to->setVisitorDispatcherVersion(from.getVisitorDispatcherVersion());
to->setVisitorOrdering(from.getVisitorOrdering());
to->setMaxBucketsPerVisitor(from.getMaxBucketsPerVisitor());
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case DocumentProtocol::MESSAGE_DESTROYVISITOR:
{
- documentapi::DestroyVisitorMessage& from(
- static_cast<documentapi::DestroyVisitorMessage&>(fromMsg));
- api::DestroyVisitorCommand::UP to(new api::DestroyVisitorCommand(
- from.getInstanceId()));
- toMsg.reset(to.release());
+ documentapi::DestroyVisitorMessage& from(static_cast<documentapi::DestroyVisitorMessage&>(fromMsg));
+ toMsg = std::make_unique<api::DestroyVisitorCommand>(from.getInstanceId());
break;
}
case DocumentProtocol::MESSAGE_MULTIOPERATION:
{
- documentapi::MultiOperationMessage& from(
- static_cast<documentapi::MultiOperationMessage&>(fromMsg));
- api::MultiOperationCommand::UP to(new api::MultiOperationCommand(repo,
- from.getBucketId(), from.getBuffer(),
- from.keepTimeStamps()));
- toMsg.reset(to.release());
+ documentapi::MultiOperationMessage& from(static_cast<documentapi::MultiOperationMessage&>(fromMsg));
+ toMsg = std::make_unique<api::MultiOperationCommand>(repo, from.getBucketId(), from.getBuffer(),
+ from.keepTimeStamps());
break;
}
case DocumentProtocol::MESSAGE_BATCHDOCUMENTUPDATE:
{
- documentapi::BatchDocumentUpdateMessage& from(
- static_cast<documentapi::BatchDocumentUpdateMessage&>(fromMsg));
- api::BatchDocumentUpdateCommand::UP to(
- new api::BatchDocumentUpdateCommand(from.getUpdates()));
- toMsg.reset(to.release());
+ documentapi::BatchDocumentUpdateMessage& from(static_cast<documentapi::BatchDocumentUpdateMessage&>(fromMsg));
+ toMsg = std::make_unique<api::BatchDocumentUpdateCommand>(from.getUpdates());
break;
}
case DocumentProtocol::MESSAGE_STATBUCKET:
{
- documentapi::StatBucketMessage& from(
- static_cast<documentapi::StatBucketMessage&>(fromMsg));
- api::StatBucketCommand::UP to(new api::StatBucketCommand(
- from.getBucketId(), from.getDocumentSelection()));
- toMsg.reset(to.release());
+ documentapi::StatBucketMessage& from(static_cast<documentapi::StatBucketMessage&>(fromMsg));
+ toMsg = std::make_unique<api::StatBucketCommand>(from.getBucketId(), from.getDocumentSelection());
break;
}
case DocumentProtocol::MESSAGE_GETBUCKETLIST:
{
- documentapi::GetBucketListMessage& from(
- static_cast<documentapi::GetBucketListMessage&>(fromMsg));
- api::GetBucketListCommand::UP to(new api::GetBucketListCommand(
- from.getBucketId()));
- toMsg.reset(to.release());
+ documentapi::GetBucketListMessage& from(static_cast<documentapi::GetBucketListMessage&>(fromMsg));
+ toMsg = std::make_unique<api::GetBucketListCommand>(from.getBucketId());
break;
}
case DocumentProtocol::MESSAGE_VISITORINFO:
{
- documentapi::VisitorInfoMessage& from(
- static_cast<documentapi::VisitorInfoMessage&>(fromMsg));
+ documentapi::VisitorInfoMessage& from(static_cast<documentapi::VisitorInfoMessage&>(fromMsg));
api::VisitorInfoCommand::UP to(new api::VisitorInfoCommand);
for (uint32_t i = 0; i < from.getFinishedBuckets().size(); ++i) {
to->setBucketCompleted(from.getFinishedBuckets()[i], 0);
}
if (!from.getErrorMessage().empty()) {
- to->setErrorCode(api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE, from.getErrorMessage()));
+ to->setErrorCode(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, from.getErrorMessage()));
}
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case DocumentProtocol::MESSAGE_REMOVELOCATION:
{
- documentapi::RemoveLocationMessage& from(
- static_cast<documentapi::RemoveLocationMessage&>(fromMsg));
- api::RemoveLocationCommand::UP to(new api::RemoveLocationCommand(
- from.getDocumentSelection(), document::BucketId(0)));
+ documentapi::RemoveLocationMessage& from(static_cast<documentapi::RemoveLocationMessage&>(fromMsg));
+ api::RemoveLocationCommand::UP to(new api::RemoveLocationCommand(from.getDocumentSelection(), document::BucketId(0)));
toMsg.reset(to.release());
break;
}
@@ -177,8 +149,7 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg,
timeout = INT_MAX;
}
toMsg->setTimeout(timeout);
- toMsg->setPriority(
- _priConverter.toStoragePriority(fromMsg.getPriority()));
+ toMsg->setPriority(_priConverter->toStoragePriority(fromMsg.getPriority()));
toMsg->setLoadType(fromMsg.getLoadType());
LOG(spam, "Converted command %s, loadtype %d, mapped priority %d to %d",
@@ -193,34 +164,27 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentReply& fromReply,
api::StorageCommand& fromCommand)
{
if (LOG_WOULD_LOG(spam)) {
- LOG(spam, "Trace for reply:\n%s",
- fromReply.getTrace().toString().c_str());
+ LOG(spam, "Trace for reply:\n%s", fromReply.getTrace().toString().c_str());
}
std::unique_ptr<api::StorageReply> toMsg;
switch (fromReply.getType()) {
case documentapi::DocumentProtocol::REPLY_CREATEVISITOR:
{
- documentapi::CreateVisitorReply& fromRep(
- static_cast<documentapi::CreateVisitorReply&>(fromReply));
- const api::CreateVisitorCommand& fromCmd(
- static_cast<const api::CreateVisitorCommand&>(fromCommand));
+ documentapi::CreateVisitorReply& fromRep(static_cast<documentapi::CreateVisitorReply&>(fromReply));
+ const api::CreateVisitorCommand& fromCmd(static_cast<const api::CreateVisitorCommand&>(fromCommand));
api::CreateVisitorReply::UP to(new api::CreateVisitorReply(fromCmd));
to->setVisitorStatistics(fromRep.getVisitorStatistics());
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case documentapi::DocumentProtocol::REPLY_STATBUCKET:
{
- documentapi::StatBucketReply& fromRep(
- static_cast<documentapi::StatBucketReply&>(fromReply));
- const api::StatBucketCommand& fromCmd(
- static_cast<const api::StatBucketCommand&>(fromCommand));
+ documentapi::StatBucketReply& fromRep(static_cast<documentapi::StatBucketReply&>(fromReply));
+ const api::StatBucketCommand& fromCmd(static_cast<const api::StatBucketCommand&>(fromCommand));
- api::StatBucketReply::UP to(
- new api::StatBucketReply(fromCmd, fromRep.getResults()));
- toMsg.reset(to.release());
+ toMsg = std::make_unique<api::StatBucketReply>(fromCmd, fromRep.getResults());
break;
}
default:
@@ -230,131 +194,98 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentReply& fromReply,
if (toMsg.get()) {
if (fromReply.hasErrors()) {
- toMsg->setResult(api::ReturnCode(
- (api::ReturnCode::Result) fromReply.getError(0).getCode(),
- fromReply.getError(0).getMessage()));
- toMsg->setPriority(
- _priConverter.toStoragePriority(fromReply.getPriority()));
+ toMsg->setResult(api::ReturnCode((api::ReturnCode::Result) fromReply.getError(0).getCode(),
+ fromReply.getError(0).getMessage()));
+ toMsg->setPriority(_priConverter->toStoragePriority(fromReply.getPriority()));
}
}
return std::move(toMsg);
}
std::unique_ptr<mbus::Message>
-DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg,
- const document::DocumentTypeRepo::SP &repo)
+DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg, const document::DocumentTypeRepo::SP &repo)
{
std::unique_ptr<mbus::Message> toMsg;
switch (fromMsg.getType().getId()) {
case api::MessageType::PUT_ID:
{
api::PutCommand& from(static_cast<api::PutCommand&>(fromMsg));
- documentapi::PutDocumentMessage::UP to(
- new documentapi::PutDocumentMessage(from.getDocument()));
+ documentapi::PutDocumentMessage::UP to(new documentapi::PutDocumentMessage(from.getDocument()));
to->setTimestamp(from.getTimestamp());
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case api::MessageType::UPDATE_ID:
{
api::UpdateCommand& from(static_cast<api::UpdateCommand&>(fromMsg));
- documentapi::UpdateDocumentMessage::UP to(
- new documentapi::UpdateDocumentMessage(from.getUpdate()));
+ documentapi::UpdateDocumentMessage::UP to(new documentapi::UpdateDocumentMessage(from.getUpdate()));
to->setOldTimestamp(from.getOldTimestamp());
to->setNewTimestamp(from.getTimestamp());
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case api::MessageType::REMOVE_ID:
{
api::RemoveCommand& from(static_cast<api::RemoveCommand&>(fromMsg));
- documentapi::RemoveDocumentMessage::UP to(
- new documentapi::RemoveDocumentMessage(from.getDocumentId()));
- toMsg.reset(to.release());
+ toMsg = std::make_unique<documentapi::RemoveDocumentMessage>(from.getDocumentId());
break;
}
case api::MessageType::VISITOR_INFO_ID:
{
- api::VisitorInfoCommand& from(
- static_cast<api::VisitorInfoCommand&>(fromMsg));
- documentapi::VisitorInfoMessage::UP to(
- new documentapi::VisitorInfoMessage);
+ api::VisitorInfoCommand& from(static_cast<api::VisitorInfoCommand&>(fromMsg));
+ documentapi::VisitorInfoMessage::UP to(new documentapi::VisitorInfoMessage);
for (uint32_t i = 0; i < from.getCompletedBucketsList().size(); ++i) {
- to->getFinishedBuckets().push_back(
- from.getCompletedBucketsList()[i].bucketId);
+ to->getFinishedBuckets().push_back(from.getCompletedBucketsList()[i].bucketId);
}
to->setErrorMessage(from.getErrorCode().getMessage());
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case api::MessageType::DOCBLOCK_ID:
{
api::DocBlockCommand& from(static_cast<api::DocBlockCommand&>(fromMsg));
- documentapi::MultiOperationMessage::UP to(
- new documentapi::MultiOperationMessage(
- from.getBucketId(),
- from.getDocumentBlock(),
- from.keepTimeStamps()));
- toMsg.reset(to.release());
+ toMsg = std::make_unique<documentapi::MultiOperationMessage>(from.getBucketId(), from.getDocumentBlock(),
+ from.keepTimeStamps());
break;
}
case api::MessageType::SEARCHRESULT_ID:
{
- api::SearchResultCommand& from(
- static_cast<api::SearchResultCommand&>(fromMsg));
- documentapi::SearchResultMessage::UP to(
- new documentapi::SearchResultMessage(from));
- toMsg.reset(to.release());
+ api::SearchResultCommand& from(static_cast<api::SearchResultCommand&>(fromMsg));
+ toMsg = std::make_unique<documentapi::SearchResultMessage>(from);
break;
}
case api::MessageType::QUERYRESULT_ID:
{
- api::QueryResultCommand& from(
- static_cast<api::QueryResultCommand&>(fromMsg));
- documentapi::QueryResultMessage::UP to(
- new documentapi::QueryResultMessage(
- from.getSearchResult(), from.getDocumentSummary()));
- toMsg.reset(to.release());
+ api::QueryResultCommand& from(static_cast<api::QueryResultCommand&>(fromMsg));
+ toMsg = std::make_unique<documentapi::QueryResultMessage>(from.getSearchResult(), from.getDocumentSummary());
break;
}
case api::MessageType::DOCUMENTSUMMARY_ID:
{
- api::DocumentSummaryCommand& from(
- static_cast<api::DocumentSummaryCommand&>(fromMsg));
- documentapi::DocumentSummaryMessage::UP to(
- new documentapi::DocumentSummaryMessage(from));
- toMsg.reset(to.release());
+ api::DocumentSummaryCommand& from(static_cast<api::DocumentSummaryCommand&>(fromMsg));
+ toMsg = std::make_unique<documentapi::DocumentSummaryMessage>(from);
break;
}
case api::MessageType::MULTIOPERATION_ID:
{
- api::MultiOperationCommand& from(
- static_cast<api::MultiOperationCommand&>(fromMsg));
- documentapi::MultiOperationMessage::UP to(
- new documentapi::MultiOperationMessage(repo,
- from.getBucketId(),
- from.getBuffer(),
- from.keepTimeStamps()));
- toMsg.reset(to.release());
+ api::MultiOperationCommand& from(static_cast<api::MultiOperationCommand&>(fromMsg));
+ toMsg = std::make_unique<documentapi::MultiOperationMessage>(repo, from.getBucketId(), from.getBuffer(),
+ from.keepTimeStamps());
break;
}
case api::MessageType::MAPVISITOR_ID:
{
- api::MapVisitorCommand& from(
- static_cast<api::MapVisitorCommand&>(fromMsg));
- documentapi::MapVisitorMessage::UP to(
- new documentapi::MapVisitorMessage);
+ api::MapVisitorCommand& from(static_cast<api::MapVisitorCommand&>(fromMsg));
+ documentapi::MapVisitorMessage::UP to(new documentapi::MapVisitorMessage);
to->getData() = from.getData();
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case api::MessageType::DOCUMENTLIST_ID:
{
- api::DocumentListCommand& from(
- static_cast<api::DocumentListCommand&>(fromMsg));
- documentapi::DocumentListMessage::UP to(
- new documentapi::DocumentListMessage(from.getBucketId()));
+ api::DocumentListCommand& from(static_cast<api::DocumentListCommand&>(fromMsg));
+ documentapi::DocumentListMessage::UP to(new documentapi::DocumentListMessage(from.getBucketId()));
for (uint32_t i = 0; i < from.getDocuments().size(); i++) {
to->getDocuments().push_back(
@@ -363,28 +294,21 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg,
from.getDocuments()[i]._doc,
from.getDocuments()[i]._removeEntry));
}
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case api::MessageType::EMPTYBUCKETS_ID:
{
- api::EmptyBucketsCommand& from(
- static_cast<api::EmptyBucketsCommand&>(fromMsg));
- std::unique_ptr<documentapi::EmptyBucketsMessage> to(
- new documentapi::EmptyBucketsMessage(from.getBuckets()));
- toMsg.reset(to.release());
+ api::EmptyBucketsCommand& from(static_cast<api::EmptyBucketsCommand&>(fromMsg));
+ toMsg = std::make_unique<documentapi::EmptyBucketsMessage>(from.getBuckets());
break;
}
case api::MessageType::VISITOR_CREATE_ID:
{
- api::CreateVisitorCommand& from(
- static_cast<api::CreateVisitorCommand&>(fromMsg));
+ api::CreateVisitorCommand& from(static_cast<api::CreateVisitorCommand&>(fromMsg));
documentapi::CreateVisitorMessage::UP to(
- new documentapi::CreateVisitorMessage(
- from.getLibraryName(),
- from.getInstanceId(),
- from.getControlDestination(),
- from.getDataDestination()));
+ new documentapi::CreateVisitorMessage(from.getLibraryName(), from.getInstanceId(),
+ from.getControlDestination(), from.getDataDestination()));
to->setDocumentSelection(from.getDocumentSelection());
to->setMaximumPendingReplyCount(from.getMaximumPendingReplyCount());
to->setParameters(from.getParameters());
@@ -396,27 +320,21 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg,
to->getBuckets() = from.getBuckets();
to->setVisitorOrdering(from.getVisitorOrdering());
to->setMaxBucketsPerVisitor(from.getMaxBucketsPerVisitor());
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case api::MessageType::VISITOR_DESTROY_ID:
{
- api::DestroyVisitorCommand& from(
- static_cast<api::DestroyVisitorCommand&>(fromMsg));
- documentapi::DestroyVisitorMessage::UP to(
- new documentapi::DestroyVisitorMessage);
+ api::DestroyVisitorCommand& from(static_cast<api::DestroyVisitorCommand&>(fromMsg));
+ documentapi::DestroyVisitorMessage::UP to(new documentapi::DestroyVisitorMessage);
to->setInstanceId(from.getInstanceId());
- toMsg.reset(to.release());
+ toMsg = std::move(to);
break;
}
case api::MessageType::STATBUCKET_ID:
{
- api::StatBucketCommand& from(
- static_cast<api::StatBucketCommand&>(fromMsg));
- documentapi::StatBucketMessage::UP to(
- new documentapi::StatBucketMessage(
- from.getBucketId(), from.getDocumentSelection()));
- toMsg.reset(to.release());
+ api::StatBucketCommand& from(static_cast<api::StatBucketCommand&>(fromMsg));
+ toMsg = std::make_unique<documentapi::StatBucketMessage>(from.getBucketId(), from.getDocumentSelection());
break;
}
default:
@@ -434,13 +352,11 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg,
}
void
-DocumentApiConverter::transferReplyState(api::StorageReply& fromMsg,
- mbus::Reply& toMsg)
+DocumentApiConverter::transferReplyState(api::StorageReply& fromMsg, mbus::Reply& toMsg)
{
// First map error codes.
if (fromMsg.getResult().failed()) {
- mbus::Error error(mbus::Error(fromMsg.getResult().getResult(),
- fromMsg.getResult().toString()));
+ mbus::Error error(mbus::Error(fromMsg.getResult().getResult(), fromMsg.getResult().toString()));
toMsg.addError(error);
LOG(debug, "Converted storageapi error code %d to %s",
fromMsg.getResult().getResult(), error.toString().c_str());
@@ -449,65 +365,49 @@ DocumentApiConverter::transferReplyState(api::StorageReply& fromMsg,
using documentapi::DocumentProtocol;
if (toMsg.getType() == DocumentProtocol::REPLY_GETDOCUMENT) {
api::GetReply& from(static_cast<api::GetReply&>(fromMsg));
- documentapi::GetDocumentReply& to(
- static_cast<documentapi::GetDocumentReply&>(toMsg));
+ documentapi::GetDocumentReply& to(static_cast<documentapi::GetDocumentReply&>(toMsg));
if (from.getDocument().get() != 0) {
to.setDocument(from.getDocument());
to.setLastModified(from.getLastModifiedTimestamp());
}
} else if (toMsg.getType() == DocumentProtocol::REPLY_REMOVEDOCUMENT) {
api::RemoveReply& from(static_cast<api::RemoveReply&>(fromMsg));
- documentapi::RemoveDocumentReply& to(
- static_cast<documentapi::RemoveDocumentReply&>(toMsg));
+ documentapi::RemoveDocumentReply& to(static_cast<documentapi::RemoveDocumentReply&>(toMsg));
to.setWasFound(from.wasFound());
to.setHighestModificationTimestamp(from.getTimestamp());
} else if (toMsg.getType() == DocumentProtocol::REPLY_PUTDOCUMENT) {
api::PutReply& from(static_cast<api::PutReply&>(fromMsg));
- documentapi::WriteDocumentReply& to(
- static_cast<documentapi::WriteDocumentReply&>(toMsg));
+ documentapi::WriteDocumentReply& to(static_cast<documentapi::WriteDocumentReply&>(toMsg));
to.setHighestModificationTimestamp(from.getTimestamp());
} else if (toMsg.getType() == DocumentProtocol::REPLY_MULTIOPERATION) {
- api::MultiOperationReply& from(
- static_cast<api::MultiOperationReply&>(fromMsg));
- documentapi::WriteDocumentReply& to(
- static_cast<documentapi::WriteDocumentReply&>(toMsg));
- to.setHighestModificationTimestamp(
- from.getHighestModificationTimestamp());
+ api::MultiOperationReply& from(static_cast<api::MultiOperationReply&>(fromMsg));
+ documentapi::WriteDocumentReply& to(static_cast<documentapi::WriteDocumentReply&>(toMsg));
+ to.setHighestModificationTimestamp(from.getHighestModificationTimestamp());
} else if (toMsg.getType() == DocumentProtocol::REPLY_UPDATEDOCUMENT) {
api::UpdateReply& from(static_cast<api::UpdateReply&>(fromMsg));
- documentapi::UpdateDocumentReply& to(
- static_cast<documentapi::UpdateDocumentReply&>(toMsg));
+ documentapi::UpdateDocumentReply& to(static_cast<documentapi::UpdateDocumentReply&>(toMsg));
to.setWasFound(from.wasFound());
to.setHighestModificationTimestamp(from.getTimestamp());
} else if (toMsg.getType() == DocumentProtocol::REPLY_STATBUCKET) {
api::StatBucketReply& from(static_cast<api::StatBucketReply&>(fromMsg));
- documentapi::StatBucketReply& to(
- static_cast<documentapi::StatBucketReply&>(toMsg));
+ documentapi::StatBucketReply& to(static_cast<documentapi::StatBucketReply&>(toMsg));
to.setResults(from.getResults());
} else if (toMsg.getType() == DocumentProtocol::REPLY_GETBUCKETLIST) {
- api::GetBucketListReply& from(
- static_cast<api::GetBucketListReply&>(fromMsg));
- documentapi::GetBucketListReply& to(
- static_cast<documentapi::GetBucketListReply&>(toMsg));
- const std::vector<api::GetBucketListReply::BucketInfo>& buckets(
- from.getBuckets());
+ api::GetBucketListReply& from(static_cast<api::GetBucketListReply&>(fromMsg));
+ documentapi::GetBucketListReply& to(static_cast<documentapi::GetBucketListReply&>(toMsg));
+ const std::vector<api::GetBucketListReply::BucketInfo>& buckets(from.getBuckets());
for (uint32_t i = 0; i < buckets.size(); i++) {
to.getBuckets().push_back(
- documentapi::GetBucketListReply::BucketInfo(
- buckets[i]._bucket, buckets[i]._bucketInformation));
+ documentapi::GetBucketListReply::BucketInfo(buckets[i]._bucket, buckets[i]._bucketInformation));
}
} else if (toMsg.getType() == DocumentProtocol::REPLY_CREATEVISITOR) {
- api::CreateVisitorReply& from(
- static_cast<api::CreateVisitorReply&>(fromMsg));
- documentapi::CreateVisitorReply& to(
- static_cast<documentapi::CreateVisitorReply&>(toMsg));
+ api::CreateVisitorReply& from(static_cast<api::CreateVisitorReply&>(fromMsg));
+ documentapi::CreateVisitorReply& to(static_cast<documentapi::CreateVisitorReply&>(toMsg));
to.setLastBucket(from.getLastBucket());
to.setVisitorStatistics(from.getVisitorStatistics());
} else if (toMsg.getType() == DocumentProtocol::REPLY_BATCHDOCUMENTUPDATE) {
- api::BatchDocumentUpdateReply& from(
- static_cast<api::BatchDocumentUpdateReply&>(fromMsg));
- documentapi::BatchDocumentUpdateReply& to(
- static_cast<documentapi::BatchDocumentUpdateReply&>(toMsg));
+ api::BatchDocumentUpdateReply& from(static_cast<api::BatchDocumentUpdateReply&>(fromMsg));
+ documentapi::BatchDocumentUpdateReply& to(static_cast<documentapi::BatchDocumentUpdateReply&>(toMsg));
to.getDocumentsNotFound() = from.getDocumentsNotFound();
}
}
diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.h b/storage/src/vespa/storage/storageserver/documentapiconverter.h
index bd620f58dc0..f53b538272a 100644
--- a/storage/src/vespa/storage/storageserver/documentapiconverter.h
+++ b/storage/src/vespa/storage/storageserver/documentapiconverter.h
@@ -1,15 +1,19 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "priorityconverter.h"
-#include <vespa/storageapi/messageapi/storagecommand.h>
-#include <vespa/storageapi/messageapi/storagereply.h>
#include <vespa/documentapi/messagebus/messages/documentmessage.h>
#include <vespa/documentapi/messagebus/messages/documentreply.h>
#include <vespa/document/repo/documenttyperepo.h>
+namespace config { class ConfigUri; }
namespace storage {
+namespace api {
+ class StorageCommand;
+ class StorageReply;
+}
+
+class PriorityConverter;
/**
Converts messages from storageapi to documentapi and
vice versa.
@@ -17,25 +21,16 @@ namespace storage {
class DocumentApiConverter
{
public:
- DocumentApiConverter(const config::ConfigUri & configUri)
- : _priConverter(configUri) {}
-
- std::unique_ptr<storage::api::StorageCommand> toStorageAPI(
- documentapi::DocumentMessage& msg,
- const document::DocumentTypeRepo::SP &repo);
-
- std::unique_ptr<storage::api::StorageReply> toStorageAPI(documentapi::DocumentReply& reply, api::StorageCommand& originalCommand);
+ DocumentApiConverter(const config::ConfigUri & configUri);
+ ~DocumentApiConverter();
+ std::unique_ptr<api::StorageCommand> toStorageAPI(documentapi::DocumentMessage& msg, const document::DocumentTypeRepo::SP &repo);
+ std::unique_ptr<api::StorageReply> toStorageAPI(documentapi::DocumentReply& reply, api::StorageCommand& originalCommand);
void transferReplyState(storage::api::StorageReply& from, mbus::Reply& to);
-
- std::unique_ptr<mbus::Message> toDocumentAPI(
- storage::api::StorageCommand& cmd,
- const document::DocumentTypeRepo::SP &repo);
-
- const PriorityConverter& getPriorityConverter() const { return _priConverter; }
+ std::unique_ptr<mbus::Message> toDocumentAPI(api::StorageCommand& cmd, const document::DocumentTypeRepo::SP &repo);
+ const PriorityConverter& getPriorityConverter() const { return *_priConverter; }
private:
- PriorityConverter _priConverter;
+ std::unique_ptr<PriorityConverter> _priConverter;
};
} // namespace storage
-
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index 7daf2fb4777..0572d17af4c 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -5,6 +5,7 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/host_name.h>
+#include <vespa/fnet/frt/supervisor.h>
#include <sstream>
#include <vespa/log/log.h>
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index ede7be3b9ad..d37b45435ce 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -2,11 +2,13 @@
#include "mergethrottler.h"
#include "storagemetricsset.h"
-#include <sstream>
-#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/persistence/messages.h>
+#include <vespa/messagebus/message.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <sstream>
+
#include <vespa/log/log.h>
LOG_SETUP(".mergethrottler");
diff --git a/storage/src/vespa/storage/storageserver/priorityconverter.cpp b/storage/src/vespa/storage/storageserver/priorityconverter.cpp
index c8cf9e5fc29..1ab820c6918 100644
--- a/storage/src/vespa/storage/storageserver/priorityconverter.cpp
+++ b/storage/src/vespa/storage/storageserver/priorityconverter.cpp
@@ -2,6 +2,7 @@
#include "priorityconverter.h"
#include <vespa/documentapi/messagebus/documentprotocol.h>
+#include <vespa/config/subscription/configuri.h>
namespace storage {
diff --git a/storage/src/vespa/storage/storageserver/priorityconverter.h b/storage/src/vespa/storage/storageserver/priorityconverter.h
index d5d2953ea45..0daf5b8c891 100644
--- a/storage/src/vespa/storage/storageserver/priorityconverter.h
+++ b/storage/src/vespa/storage/storageserver/priorityconverter.h
@@ -3,11 +3,14 @@
#pragma once
#include <vespa/storage/config/config-stor-prioritymapping.h>
-#include <vespa/config/config.h>
+#include <vespa/config/helper/configfetcher.h>
#include <vespa/documentapi/messagebus/priority.h>
+#include <vespa/vespalib/util/sync.h>
#include <atomic>
#include <array>
+namespace config {class ConfigUri; }
+
namespace storage {
class PriorityConverter
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 5c74f520cde..b05d159ef08 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -3,19 +3,21 @@
#include "servicelayernode.h"
#include "bouncer.h"
#include "bucketintegritychecker.h"
-#include <vespa/storage/bucketmover/bucketmover.h>
#include "communicationmanager.h"
#include "changedbucketownershiphandler.h"
#include "mergethrottler.h"
#include "opslogger.h"
#include "statemanager.h"
+#include "priorityconverter.h"
#include <vespa/storage/visiting/messagebusvisitormessagesession.h>
#include <vespa/storage/visiting/visitormanager.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
#include <vespa/storage/bucketdb/storagebucketdbinitializer.h>
+#include <vespa/storage/bucketmover/bucketmover.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
#include <vespa/persistence/spi/exceptions.h>
+#include <vespa/messagebus/rpcmessagebus.h>
#include <vespa/log/log.h>
LOG_SETUP(".node.servicelayer");
@@ -28,8 +30,7 @@ ServiceLayerNode::ServiceLayerNode(
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,
const VisitorFactory::Map& externalVisitors)
- : StorageNode(configUri, context, generationFetcher,
- std::unique_ptr<HostInfo>(new HostInfo)),
+ : StorageNode(configUri, context, generationFetcher, std::unique_ptr<HostInfo>(new HostInfo)),
_context(context),
_persistenceProvider(persistenceProvider),
_partitions(0),
@@ -46,19 +47,15 @@ void ServiceLayerNode::init()
_init_has_been_called = true;
spi::Result initResult(_persistenceProvider.initialize());
if (initResult.hasError()) {
- LOG(error, "Failed to initialize persistence provider: %s",
- initResult.toString().c_str());
- throw spi::HandledException(
- "Failed provider init: " + initResult.toString(), VESPA_STRLOC);
+ LOG(error, "Failed to initialize persistence provider: %s", initResult.toString().c_str());
+ throw spi::HandledException("Failed provider init: " + initResult.toString(), VESPA_STRLOC);
}
spi::PartitionStateListResult result(
_persistenceProvider.getPartitionStates());
if (result.hasError()) {
- LOG(error, "Failed to get partition list from persistence provider: %s",
- result.toString().c_str());
- throw spi::HandledException("Failed to get partition list: "
- + result.toString(), VESPA_STRLOC);
+ LOG(error, "Failed to get partition list from persistence provider: %s", result.toString().c_str());
+ throw spi::HandledException("Failed to get partition list: " + result.toString(), VESPA_STRLOC);
}
_partitions = result.getList();
if (_partitions.size() == 0) {
@@ -76,8 +73,7 @@ void ServiceLayerNode::init()
LOG(warning, "Network failure: '%s'", e.what());
throw;
} catch (const vespalib::Exception & e) {
- LOG(error, "Caught exception %s during startup. Calling destruct "
- "functions in hopes of dying gracefully.",
+ LOG(error, "Caught exception %s during startup. Calling destruct functions in hopes of dying gracefully.",
e.getMessage().c_str());
requestShutdown("Failed to initialize: " + e.getMessage());
throw;
@@ -135,8 +131,7 @@ ServiceLayerNode::initializeNodeSpecific()
if (_partitions[i].getState() == spi::PartitionState::UP) {
++usablePartitions;
} else {
- lib::DiskState diskState(lib::State::DOWN,
- _partitions[i].getReason());
+ lib::DiskState diskState(lib::State::DOWN, _partitions[i].getReason());
ns.setDiskState(i, diskState);
}
}
@@ -150,8 +145,7 @@ ServiceLayerNode::initializeNodeSpecific()
ns.setReliability(_serverConfig->nodeReliability);
for (uint16_t i=0; i<_serverConfig->diskCapacity.size(); ++i) {
if (i >= ns.getDiskCount()) {
- LOG(warning, "Capacity configured for partition %" PRIu64 " but only "
- "%u partitions found.",
+ LOG(warning, "Capacity configured for partition %" PRIu64 " but only %u partitions found.",
_serverConfig->diskCapacity.size(), ns.getDiskCount());
continue;
}
@@ -159,8 +153,7 @@ ServiceLayerNode::initializeNodeSpecific()
ds.setCapacity(_serverConfig->diskCapacity[i]);
ns.setDiskState(i, ds);
}
- LOG(debug, "Adjusting reported node state to include partition count and "
- "states, capacity and reliability: %s",
+ LOG(debug, "Adjusting reported node state to include partition count and states, capacity and reliability: %s",
ns.toString().c_str());
_component->getStateUpdater().setReportedNodeState(ns);
}
@@ -180,35 +173,28 @@ ServiceLayerNode::handleLiveConfigUpdate()
DIFFERWARN(diskCount, "Cannot alter partition count of node live");
{
updated = false;
- NodeStateUpdater::Lock::SP lock(
- _component->getStateUpdater().grabStateChangeLock());
- lib::NodeState ns(
- *_component->getStateUpdater().getReportedNodeState());
+ NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
+ lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
if (DIFFER(nodeCapacity)) {
- LOG(info, "Live config update: Updating node capacity "
- "from %f to %f.",
+ LOG(info, "Live config update: Updating node capacity from %f to %f.",
oldC.nodeCapacity, newC.nodeCapacity);
ASSIGN(nodeCapacity);
ns.setCapacity(newC.nodeCapacity);
}
if (DIFFER(diskCapacity)) {
- for (uint32_t i=0;
- i<newC.diskCapacity.size() && i<ns.getDiskCount(); ++i)
- {
+ for (uint32_t i=0; i<newC.diskCapacity.size() && i<ns.getDiskCount(); ++i) {
if (newC.diskCapacity[i] != oldC.diskCapacity[i]) {
lib::DiskState ds(ns.getDiskState(i));
ds.setCapacity(newC.diskCapacity[i]);
ns.setDiskState(i, ds);
- LOG(info, "Live config update: Disk capacity of "
- "disk %u changed from %f to %f.",
+ LOG(info, "Live config update: Disk capacity of disk %u changed from %f to %f.",
i, oldC.diskCapacity[i], newC.diskCapacity[i]);
}
}
ASSIGN(diskCapacity);
}
if (DIFFER(nodeReliability)) {
- LOG(info, "Live config update: Node reliability changed "
- "from %u to %u.",
+ LOG(info, "Live config update: Node reliability changed from %u to %u.",
oldC.nodeReliability, newC.nodeReliability);
ASSIGN(nodeReliability);
ns.setReliability(newC.nodeReliability);
@@ -246,16 +232,14 @@ ServiceLayerNode::createSession(Visitor& visitor, VisitorThread& thread)
srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP());
srcParams.setReplyHandler(*mbusSession);
mbusSession->setSourceSession(
- _communicationManager->getMessageBus().getMessageBus()
- .createSourceSession(srcParams));
+ _communicationManager->getMessageBus().getMessageBus().createSourceSession(srcParams));
return VisitorMessageSession::UP(std::move(mbusSession));
}
documentapi::Priority::Value
ServiceLayerNode::toDocumentPriority(uint8_t storagePriority) const
{
- return _communicationManager->getPriorityConverter().
- toDocumentPriority(storagePriority);
+ return _communicationManager->getPriorityConverter().toDocumentPriority(storagePriority);
}
StorageLink::UP
@@ -264,8 +248,7 @@ ServiceLayerNode::createChain()
ServiceLayerComponentRegister& compReg(_context.getComponentRegister());
StorageLink::UP chain;
- chain.reset(_communicationManager = new CommunicationManager(
- compReg, _configUri));
+ chain.reset(_communicationManager = new CommunicationManager(compReg, _configUri));
chain->push_back(StorageLink::UP(new Bouncer(compReg, _configUri)));
if (_noUsablePartitionMode) {
/*
@@ -279,8 +262,7 @@ ServiceLayerNode::createChain()
chain->push_back(StorageLink::UP(new MergeThrottler(_configUri, compReg)));
chain->push_back(StorageLink::UP(new ChangedBucketOwnershipHandler(_configUri, compReg)));
chain->push_back(StorageLink::UP(new BucketIntegrityChecker(_configUri, compReg)));
- chain->push_back(StorageLink::UP(
- new bucketmover::BucketMover(_configUri, compReg)));
+ chain->push_back(StorageLink::UP(new bucketmover::BucketMover(_configUri, compReg)));
chain->push_back(StorageLink::UP(new StorageBucketDBInitializer(
_configUri, _partitions, getDoneInitializeHandler(), compReg)));
chain->push_back(StorageLink::UP(new BucketManager(