summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-03-02 10:00:59 +0000
committerTor Brede Vekterli <vekterli@oath.com>2018-03-06 14:31:03 +0000
commitd3cd3115fc79ed62774c21684b7d39e45c484ff2 (patch)
treeca484557b1daf558ce4ec49a71d4750e8774d79e
parent0b7c34053ae921722ce103edc61487e017a87bf7 (diff)
Add handling of v3 setDistributionStates RPC from cluster controller
ClusterStateBundle is populated with derived states, but these are not yet exposed via the bucket space getter function.
-rw-r--r--storage/src/tests/storageserver/CMakeLists.txt1
-rw-r--r--storage/src/tests/storageserver/fnet_listener_test.cpp194
-rw-r--r--storage/src/vespa/storage/storageserver/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h28
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h17
-rw-r--r--storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h19
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp74
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.h24
-rw-r--r--storage/src/vespa/storage/storageserver/message_enqueuer.h17
-rw-r--r--storage/src/vespa/storage/storageserver/rpcrequestwrapper.h5
-rw-r--r--storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp125
-rw-r--r--storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h24
-rw-r--r--vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp41
-rw-r--r--vdslib/src/vespa/vdslib/state/cluster_state_bundle.h16
14 files changed, 556 insertions, 30 deletions
diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt
index 95faf7e433e..c28a6102b71 100644
--- a/storage/src/tests/storageserver/CMakeLists.txt
+++ b/storage/src/tests/storageserver/CMakeLists.txt
@@ -7,6 +7,7 @@ vespa_add_library(storage_teststorageserver TEST
communicationmanagertest.cpp
configurable_bucket_resolver_test.cpp
documentapiconvertertest.cpp
+ fnet_listener_test.cpp
mergethrottlertest.cpp
priorityconvertertest.cpp
service_layer_error_listener_test.cpp
diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp
new file mode 100644
index 00000000000..cc9c424ac28
--- /dev/null
+++ b/storage/src/tests/storageserver/fnet_listener_test.cpp
@@ -0,0 +1,194 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storage/storageserver/fnetlistener.h>
+#include <vespa/storage/storageserver/message_enqueuer.h>
+#include <vespa/storage/storageserver/rpcrequestwrapper.h>
+#include <vespa/storage/storageserver/slime_cluster_state_bundle_codec.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/vdstestlib/cppunit/dirconfig.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <tests/common/testhelper.h>
+#include <vector>
+
+namespace storage {
+
+using document::FixedBucketSpaces;
+
+class FNetListenerTest : public CppUnit::TestFixture {
+public:
+ CPPUNIT_TEST_SUITE(FNetListenerTest);
+ CPPUNIT_TEST(baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle);
+ CPPUNIT_TEST(set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle);
+ CPPUNIT_TEST(compressed_bundle_is_transparently_uncompressed);
+ CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed);
+ CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error);
+ CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error);
+ CPPUNIT_TEST_SUITE_END();
+
+ void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle();
+ void set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle();
+ void compressed_bundle_is_transparently_uncompressed();
+ void set_distribution_rpc_is_immediately_failed_if_listener_is_closed();
+ void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error();
+ void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest);
+
+namespace {
+
+struct MockOperationEnqueuer : MessageEnqueuer {
+ std::vector<std::shared_ptr<api::StorageMessage>> _enqueued;
+
+ void enqueue(std::shared_ptr<api::StorageMessage> msg) override {
+ _enqueued.emplace_back(std::move(msg));
+ }
+};
+
+struct DummyReturnHandler : FRT_IReturnHandler {
+ void HandleReturn() override {}
+ FNET_Connection* GetConnection() override { return nullptr; }
+};
+
+struct Fixture {
+ // TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests
+ mbus::Slobrok slobrok;
+ vdstestlib::DirConfig config;
+ MockOperationEnqueuer enqueuer;
+ std::unique_ptr<FNetListener> fnet_listener;
+ SlimeClusterStateBundleCodec codec;
+ DummyReturnHandler return_handler;
+ bool request_is_detached{false};
+ FRT_RPCRequest* bound_request{nullptr};
+
+ Fixture() : config(getStandardConfig(true)) {
+ config.getConfig("stor-server").set("node_index", "1");
+ addSlobrokConfig(config, slobrok);
+ fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0);
+ }
+
+ ~Fixture() {
+ // Must destroy any associated message contexts that may have refs to FRT_Request
+ // instance _before_ we destroy the request itself.
+ enqueuer._enqueued.clear();
+ if (bound_request) {
+ bound_request->SubRef();
+ }
+ }
+
+ void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) {
+ bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
+ auto* params = bound_request->GetParams();
+ params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type));
+ params->AddInt32(uncompressed_length);
+ const auto buf_len = encoded_bundle._buffer->getDataLen();
+ params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len);
+
+ bound_request->SetDetachedPT(&request_is_detached);
+ bound_request->SetReturnHandler(&return_handler);
+ }
+
+ void create_request(const lib::ClusterStateBundle& bundle) {
+ // Only 1 request allowed per fixture due to lifetime handling snags
+ assert(bound_request == nullptr);
+ auto encoded_bundle = codec.encode(bundle);
+ bind_request_params(encoded_bundle, encoded_bundle._uncompressed_length);
+ }
+
+ void assert_enqueued_operation_has_bundle(const lib::ClusterStateBundle& expectedBundle) {
+ CPPUNIT_ASSERT(bound_request != nullptr);
+ CPPUNIT_ASSERT(request_is_detached);
+ CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size());
+ auto& state_request = dynamic_cast<const api::SetSystemStateCommand&>(*enqueuer._enqueued[0]);
+ CPPUNIT_ASSERT_EQUAL(expectedBundle, state_request.getClusterStateBundle());
+ }
+
+ void assert_request_received_and_propagated(const lib::ClusterStateBundle& bundle) {
+ create_request(bundle);
+ fnet_listener->RPC_setDistributionStates(bound_request);
+ assert_enqueued_operation_has_bundle(bundle);
+ }
+
+ void assert_request_returns_error_response(RPCRequestWrapper::ErrorCode error_code) {
+ fnet_listener->RPC_setDistributionStates(bound_request);
+ CPPUNIT_ASSERT(!request_is_detached);
+ CPPUNIT_ASSERT(bound_request->IsError());
+ CPPUNIT_ASSERT_EQUAL(static_cast<uint32_t>(error_code), bound_request->GetErrorCode());
+ }
+
+ lib::ClusterStateBundle dummy_baseline_bundle() const {
+ return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"));
+ }
+};
+
+std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) {
+ return std::make_shared<const lib::ClusterState>(state);
+}
+
+vespalib::string make_compressable_state_string() {
+ vespalib::asciistream ss;
+ for (int i = 0; i < 99; ++i) {
+ ss << " ." << i << ".s:d";
+ }
+ return vespalib::make_string("version:123 distributor:100%s storage:100%s",
+ ss.str().c_str(), ss.str().c_str());
+}
+
+}
+
+void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() {
+ Fixture f;
+ auto baseline = f.dummy_baseline_bundle();
+
+ f.assert_request_received_and_propagated(baseline);
+}
+
+void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() {
+ Fixture f;
+ lib::ClusterStateBundle spaces_bundle(
+ lib::ClusterState("version:123 distributor:3 storage:3"),
+ {{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")},
+ {FixedBucketSpaces::global_space(), state_of("version:123 distributor:3 .1.s:d storage:3")}});
+
+ f.assert_request_received_and_propagated(spaces_bundle);
+}
+
+void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
+ Fixture f;
+ auto state_str = make_compressable_state_string();
+ lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)};
+
+ f.create_request(compressable_bundle);
+ // First verify that the bundle is sent in compressed form
+ CPPUNIT_ASSERT(f.bound_request->GetParams()->GetValue(2)._data._len < state_str.size());
+ // Ensure we uncompress it to the original form
+ f.fnet_listener->RPC_setDistributionStates(f.bound_request);
+ f.assert_enqueued_operation_has_bundle(compressable_bundle);
+}
+
+void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() {
+ Fixture f;
+ f.create_request(f.dummy_baseline_bundle());
+ f.fnet_listener->close();
+ f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN);
+}
+
+void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() {
+ Fixture f;
+ auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
+ f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1);
+ f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
+}
+
+void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() {
+ Fixture f;
+ auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
+ f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100);
+ f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt
index 0f90a5a8afb..8c9add78fd4 100644
--- a/storage/src/vespa/storage/storageserver/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt
@@ -20,6 +20,7 @@ vespa_add_library(storage_storageserver
service_layer_error_listener.cpp
servicelayernode.cpp
servicelayernodecontext.cpp
+ slime_cluster_state_bundle_codec.cpp
statemanager.cpp
statereporter.cpp
storagemetricsset.cpp
diff --git a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h
new file mode 100644
index 00000000000..41c5db9876d
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h
@@ -0,0 +1,28 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "encoded_cluster_state_bundle.h"
+
+namespace storage {
+
+namespace lib { class ClusterStateBundle; }
+
+/**
+ * Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC.
+ *
+ * Implementations may choose to compress the encoded representation of the bundle.
+ *
+ * It is important that the input given to decode() is exactly equal to that given from
+ * encode() for the results to be correct. Implementations must ensure that this information
+ * is enough to losslessly reconstruct the full encoded ClusterStateBundle.
+ */
+class ClusterStateBundleCodec {
+public:
+ virtual ~ClusterStateBundleCodec() = default;
+
+ virtual EncodedClusterStateBundle encode(const lib::ClusterStateBundle&) const = 0;
+ virtual std::shared_ptr<const lib::ClusterStateBundle> decode(const EncodedClusterStateBundle&) const = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index a29f3c8c3e3..5c7d5812fdb 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -12,6 +12,7 @@
#include "communicationmanagermetrics.h"
#include "documentapiconverter.h"
+#include "message_enqueuer.h"
#include <vespa/storage/common/storagelink.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/config/config-stor-communicationmanager.h>
@@ -85,12 +86,14 @@ public:
std::unique_ptr<RPCRequestWrapper> _request;
};
-class CommunicationManager : public StorageLink,
- public framework::Runnable,
- private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
- public mbus::IMessageHandler,
- public mbus::IReplyHandler,
- private framework::MetricUpdateHook
+class CommunicationManager final
+ : public StorageLink,
+ public framework::Runnable,
+ private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
+ public mbus::IMessageHandler,
+ public mbus::IReplyHandler,
+ private framework::MetricUpdateHook,
+ public MessageEnqueuer
{
private:
CommunicationManager(const CommunicationManager&);
@@ -150,7 +153,7 @@ public:
const config::ConfigUri & configUri);
~CommunicationManager();
- void enqueue(std::shared_ptr<api::StorageMessage> msg);
+ void enqueue(std::shared_ptr<api::StorageMessage> msg) override;
mbus::RPCMessageBus& getMessageBus() { assert(_mbus.get()); return *_mbus; }
const PriorityConverter& getPriorityConverter() const { return _docApiConverter.getPriorityConverter(); }
diff --git a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h b/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h
new file mode 100644
index 00000000000..6f25a6b67a6
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h
@@ -0,0 +1,19 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/data/databuffer.h>
+#include <vespa/vespalib/util/compressor.h>
+
+namespace storage {
+
+/**
+ * Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle.
+ */
+struct EncodedClusterStateBundle {
+ vespalib::compression::CompressionConfig::Type _compression_type;
+ uint32_t _uncompressed_length;
+ std::unique_ptr<vespalib::DataBuffer> _buffer;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index 7a0711c9f7c..f95281a5007 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -2,20 +2,22 @@
#include "fnetlistener.h"
#include "communicationmanager.h"
#include "rpcrequestwrapper.h"
+#include "slime_cluster_state_bundle_codec.h"
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/host_name.h>
#include <vespa/fnet/frt/supervisor.h>
#include <sstream>
-#include <vespa/log/log.h>
+#include <iostream> // XXXXX
+#include <vespa/log/log.h>
LOG_SETUP(".rpc.listener");
namespace storage {
-FNetListener::FNetListener(CommunicationManager& comManager, const config::ConfigUri & configUri, uint32_t port)
- : _comManager(comManager),
+FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::ConfigUri & configUri, uint32_t port)
+ : _messageEnqueuer(messageEnqueuer),
_orb(std::make_unique<FRT_Supervisor>()),
_closed(false),
_slobrokRegister(*_orb, configUri)
@@ -86,6 +88,12 @@ FNetListener::initRPC()
rb.MethodDesc("Set systemstate on this node");
rb.ParamDesc("systemstate", "New systemstate to set");
//-------------------------------------------------------------------------
+ rb.DefineMethod("setdistributionstates", "bix", "", true, FRT_METHOD(FNetListener::RPC_setDistributionStates), this);
+ rb.MethodDesc("Set distribution states for cluster and bucket spaces");
+ rb.ParamDesc("compressionType", "Compression type for payload");
+ rb.ParamDesc("uncompressedSize", "Uncompressed size for payload");
+ rb.ParamDesc("payload", "Binary Slime format payload");
+ //-------------------------------------------------------------------------
rb.DefineMethod("getcurrenttime", "", "lis", true, FRT_METHOD(FNetListener::RPC_getCurrentTime), this);
rb.MethodDesc("Get current time on this node");
rb.ReturnDesc("seconds", "Current time in seconds since epoch");
@@ -113,6 +121,13 @@ FNetListener::RPC_getCurrentTime(FRT_RPCRequest *req)
return;
}
+void FNetListener::detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req) {
+ // Create a request object to avoid needing a separate transport type
+ cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
+ req->Detach();
+ _messageEnqueuer.enqueue(std::move(cmd));
+}
+
void
FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
{
@@ -134,10 +149,7 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
if (req->GetParams()->GetNumValues() > 2) {
cmd->setSourceIndex(req->GetParams()->GetValue(2)._intval32);
}
- // Create a request object to avoid needing a separate transport type
- cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
- req->Detach();
- _comManager.enqueue(std::move(cmd));
+ detach_and_forward_to_enqueuer(std::move(cmd), req);
}
void
@@ -155,10 +167,50 @@ FNetListener::RPC_setSystemState2(FRT_RPCRequest *req)
auto cmd(std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState)));
cmd->setPriority(api::StorageMessage::VERYHIGH);
- // Create a request object to avoid needing a separate transport type
- cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
- req->Detach();
- _comManager.enqueue(std::move(cmd));
+ detach_and_forward_to_enqueuer(std::move(cmd), req);
+}
+
+namespace {
+
+std::shared_ptr<const lib::ClusterStateBundle> decode_bundle_from_params(const FRT_Values& params) {
+ const uint32_t uncompressed_length = params[1]._intval32;
+ if (uncompressed_length > FNetListener::StateBundleMaxUncompressedSize) {
+ throw std::range_error(vespalib::make_string("RPC ClusterStateBundle uncompressed size (%u) is "
+ "greater than max size (%u)", uncompressed_length,
+ FNetListener::StateBundleMaxUncompressedSize));
+ }
+ SlimeClusterStateBundleCodec codec;
+ EncodedClusterStateBundle encoded_bundle;
+ encoded_bundle._compression_type = vespalib::compression::CompressionConfig::toType(params[0]._intval8);
+ encoded_bundle._uncompressed_length = uncompressed_length;
+ // Caution: type cast to const ptr is essential or DataBuffer behavior changes!
+ encoded_bundle._buffer = std::make_unique<vespalib::DataBuffer>(
+ static_cast<const char*>(params[2]._data._buf), params[2]._data._len);
+ return codec.decode(encoded_bundle);
+}
+
+}
+
+void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
+ if (_closed) {
+ LOG(debug, "Not handling RPC call setDistributionStates() as we have closed");
+ req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
+ return;
+ }
+ std::shared_ptr<const lib::ClusterStateBundle> state_bundle;
+ try {
+ state_bundle = decode_bundle_from_params(*req->GetParams());
+ } catch (std::exception& e) {
+ LOG(error, "setDistributionStates RPC failed decoding: %s", e.what());
+ req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what());
+ return;
+ }
+
+ // TODO add constructor taking in shared_ptr directly instead?
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle);
+ cmd->setPriority(api::StorageMessage::VERYHIGH);
+
+ detach_and_forward_to_enqueuer(std::move(cmd), req);
}
}
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h
index 05d2d275838..bf004a3eb2b 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.h
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.h
@@ -2,37 +2,43 @@
#pragma once
#include <vespa/slobrok/sbregister.h>
+#include <atomic>
namespace storage {
-class CommunicationManager;
+namespace api { class StorageMessage; }
+
+class MessageEnqueuer;
class StorageServerInterface;
class FNetListener : public FRT_Invokable
{
public:
- FNetListener(CommunicationManager& comManager,
- const config::ConfigUri & configUri, uint32_t port);
- ~FNetListener();
+ static constexpr uint32_t StateBundleMaxUncompressedSize = 1024 * 1024 * 16;
+
+ FNetListener(MessageEnqueuer& messageEnqueuer,
+ const config::ConfigUri & configUri,
+ uint32_t port);
+ ~FNetListener() override;
void initRPC();
void RPC_getNodeState2(FRT_RPCRequest *req);
void RPC_setSystemState2(FRT_RPCRequest *req);
void RPC_getCurrentTime(FRT_RPCRequest *req);
+ void RPC_setDistributionStates(FRT_RPCRequest* req);
void registerHandle(const vespalib::stringref & handle);
void close();
int getListenPort() const;
- // Used by unit tests.
- bool serviceExists(const vespalib::stringref & connectionSpec);
-
private:
- CommunicationManager& _comManager;
+ MessageEnqueuer& _messageEnqueuer;
std::unique_ptr<FRT_Supervisor> _orb;
- bool _closed;
+ std::atomic<bool> _closed;
slobrok::api::RegisterAPI _slobrokRegister;
vespalib::string _handle;
+
+ void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req);
};
}
diff --git a/storage/src/vespa/storage/storageserver/message_enqueuer.h b/storage/src/vespa/storage/storageserver/message_enqueuer.h
new file mode 100644
index 00000000000..921328a054b
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/message_enqueuer.h
@@ -0,0 +1,17 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <memory>
+
+namespace storage {
+namespace api { class StorageMessage; }
+
+// Interface for enqueuing a StorageMessage for further processing
+class MessageEnqueuer {
+public:
+ virtual ~MessageEnqueuer() = default;
+ virtual void enqueue(std::shared_ptr<api::StorageMessage> msg) = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
index 40a5470ebb9..8412e911601 100644
--- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
+++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
@@ -12,12 +12,13 @@ namespace storage {
**/
class RPCRequestWrapper {
public:
- enum {
+ enum ErrorCode {
ERR_HANDLE_NOT_CONNECTED = 75000, // > 0xffff
ERR_HANDLE_GONE = 75001,
ERR_REQUEST_DELETED = 75002,
ERR_HANDLE_DISABLED = 75003,
- ERR_NODE_SHUTTING_DOWN = 75004
+ ERR_NODE_SHUTTING_DOWN = 75004,
+ ERR_BAD_REQUEST = 75005
};
RPCRequestWrapper(FRT_RPCRequest *req);
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
new file mode 100644
index 00000000000..5b7e0ab4621
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
@@ -0,0 +1,125 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "slime_cluster_state_bundle_codec.h"
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <vespa/vespalib/data/slime/slime.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+
+using document::FixedBucketSpaces;
+using vespalib::slime::Cursor;
+using vespalib::slime::BinaryFormat;
+using vespalib::DataBuffer;
+using vespalib::ConstBufferRef;
+using vespalib::compression::CompressionConfig;
+using vespalib::compression::decompress;
+using vespalib::compression::compress;
+using vespalib::Memory;
+using namespace vespalib::slime;
+
+namespace storage {
+
+// TODO find a suitable home for this class to avoid dupes with rpcsendv2.cpp
+namespace {
+class OutputBuf : public vespalib::Output {
+public:
+ explicit OutputBuf(size_t estimatedSize) : _buf(estimatedSize) { }
+ vespalib::DataBuffer & getBuf() { return _buf; }
+private:
+ vespalib::WritableMemory reserve(size_t bytes) override {
+ _buf.ensureFree(bytes);
+ return vespalib::WritableMemory(_buf.getFree(), _buf.getFreeLen());
+ }
+ Output &commit(size_t bytes) override {
+ _buf.moveFreeToData(bytes);
+ return *this;
+ }
+ vespalib::DataBuffer _buf;
+};
+
+vespalib::string serialize_state(const lib::ClusterState& state) {
+ vespalib::asciistream as;
+ state.serialize(as, false);
+ return as.str();
+}
+
+}
+
+// Only used from unit tests; the cluster controller encodes all bundles
+// we decode in practice.
+EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode(
+ const lib::ClusterStateBundle& bundle) const
+{
+ vespalib::Slime slime;
+ Cursor& root = slime.setObject();
+ Cursor& states = root.setObject("states");
+ states.setString("baseline", serialize_state(*bundle.getBaselineClusterState()));
+ Cursor& spaces = states.setObject("spaces");
+ for (const auto& sp : bundle.getDerivedClusterStates()) {
+ spaces.setString(FixedBucketSpaces::to_string(sp.first), serialize_state(*sp.second));
+ }
+
+ OutputBuf out_buf(4096);
+ BinaryFormat::encode(slime, out_buf);
+ ConstBufferRef to_compress(out_buf.getBuf().getData(), out_buf.getBuf().getDataLen());
+ auto buf = std::make_unique<DataBuffer>(vespalib::roundUp2inN(out_buf.getBuf().getDataLen()));
+ auto actual_type = compress(CompressionConfig::LZ4, to_compress, *buf, false);
+
+ EncodedClusterStateBundle encoded_bundle;
+ encoded_bundle._compression_type = actual_type;
+ assert(to_compress.size() <= INT32_MAX);
+ encoded_bundle._uncompressed_length = to_compress.size();
+ encoded_bundle._buffer = std::move(buf);
+ return encoded_bundle;
+}
+
+namespace {
+
+static const Memory StatesField("states");
+static const Memory BaselineField("baseline");
+static const Memory SpacesField("spaces");
+
+struct StateInserter : vespalib::slime::ObjectTraverser {
+ lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states;
+
+ explicit StateInserter(lib::ClusterStateBundle::BucketSpaceStateMapping& space_states)
+ : _space_states(space_states) {}
+
+ void field(const Memory& symbol, const Inspector& inspector) override {
+ _space_states.emplace(FixedBucketSpaces::from_string(symbol.make_stringref()),
+ std::make_shared<const lib::ClusterState>(inspector.asString().make_string()));
+ }
+};
+
+}
+
+std::shared_ptr<const lib::ClusterStateBundle> SlimeClusterStateBundleCodec::decode(
+ const EncodedClusterStateBundle& encoded_bundle) const
+{
+ ConstBufferRef blob(encoded_bundle._buffer->getData(), encoded_bundle._buffer->getDataLen());
+ DataBuffer uncompressed;
+ decompress(encoded_bundle._compression_type, encoded_bundle._uncompressed_length,
+ blob, uncompressed, false);
+ if (encoded_bundle._uncompressed_length != uncompressed.getDataLen()) {
+ throw std::range_error(vespalib::make_string("ClusterStateBundle indicated uncompressed size (%u) is "
+ "not equal to actual uncompressed size (%zu)",
+ encoded_bundle._uncompressed_length,
+ uncompressed.getDataLen()));
+ }
+
+ vespalib::Slime slime;
+ BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), slime);
+ Inspector& root = slime.get();
+ Inspector& states = root[StatesField];
+ lib::ClusterState baseline(states[BaselineField].asString().make_string());
+
+ Inspector& spaces = states[SpacesField];
+ lib::ClusterStateBundle::BucketSpaceStateMapping space_states;
+ StateInserter inserter(space_states);
+ spaces.traverse(inserter);
+ // TODO add shared_ptr constructor for baseline?
+ return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states));
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h
new file mode 100644
index 00000000000..1fb95134059
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h
@@ -0,0 +1,24 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "cluster_state_bundle_codec.h"
+#include <memory>
+
+namespace storage {
+
+/**
+ * Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding
+ * to implement (de-)serialization of ClusterStateBundle instances. Encoding format is
+ * intentionally extensible so that we may add other information to it later.
+ *
+ * LZ4 compression is transparently applied during encoding and decompression is
+ * subsequently applied during decoding.
+ */
+class SlimeClusterStateBundleCodec : public ClusterStateBundleCodec {
+public:
+ EncodedClusterStateBundle encode(const lib::ClusterStateBundle&) const override;
+ std::shared_ptr<const lib::ClusterStateBundle> decode(const EncodedClusterStateBundle&) const override;
+};
+
+}
diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
index a2bbba5e52c..cf9c0ba6dff 100644
--- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
+++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
@@ -1,7 +1,9 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include "cluster_state_bundle.h"
#include "clusterstate.h"
+#include <iostream>
namespace storage::lib {
@@ -10,6 +12,13 @@ ClusterStateBundle::ClusterStateBundle(const ClusterState &baselineClusterState)
{
}
+ClusterStateBundle::ClusterStateBundle(const ClusterState& baselineClusterState,
+ BucketSpaceStateMapping derivedBucketSpaceStates)
+ : _baselineClusterState(std::make_shared<const ClusterState>(baselineClusterState)),
+ _derivedBucketSpaceStates(std::move(derivedBucketSpaceStates))
+{
+}
+
ClusterStateBundle::~ClusterStateBundle() = default;
const std::shared_ptr<const lib::ClusterState> &
@@ -22,6 +31,7 @@ const std::shared_ptr<const lib::ClusterState> &
ClusterStateBundle::getDerivedClusterState(document::BucketSpace) const
{
// For now, just return the baseline cluster state.
+ // TODO use _derivedBucketSpaceStates
return _baselineClusterState;
}
@@ -34,7 +44,36 @@ ClusterStateBundle::getVersion() const
bool
ClusterStateBundle::operator==(const ClusterStateBundle &rhs) const
{
- return *_baselineClusterState == *rhs._baselineClusterState;
+ if (!(*_baselineClusterState == *rhs._baselineClusterState)) {
+ return false;
+ }
+ if (_derivedBucketSpaceStates.size() != rhs._derivedBucketSpaceStates.size()) {
+ return false;
+ }
+ // Can't do a regular operator== comparison since we must check equality
+ // of cluster state _values_, not their _pointers_.
+ for (auto& lhs_ds : _derivedBucketSpaceStates) {
+ auto rhs_iter = rhs._derivedBucketSpaceStates.find(lhs_ds.first);
+ if ((rhs_iter == rhs._derivedBucketSpaceStates.end())
+ || !(*lhs_ds.second == *rhs_iter->second)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+std::ostream& operator<<(std::ostream& os, const ClusterStateBundle& bundle) {
+ os << "ClusterStateBundle('" << *bundle.getBaselineClusterState();
+ if (!bundle.getDerivedClusterStates().empty()) {
+ // Output ordering is undefined for of per-space states.
+ for (auto& ds : bundle.getDerivedClusterStates()) {
+ os << "', ";
+ os << document::FixedBucketSpaces::to_string(ds.first);
+ os << " '" << *ds.second;
+ }
+ }
+ os << "')";
+ return os;
}
}
diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
index 77d26092f4e..a8ee0b54713 100644
--- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
+++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
@@ -3,6 +3,8 @@
#pragma once
#include <vespa/document/bucket/bucketspace.h>
+#include <unordered_map>
+#include <iosfwd>
namespace storage::lib {
@@ -14,14 +16,28 @@ class ClusterState;
*/
class ClusterStateBundle
{
+public:
+ using BucketSpaceStateMapping = std::unordered_map<
+ document::BucketSpace,
+ std::shared_ptr<const ClusterState>,
+ document::BucketSpace::hash
+ >;
std::shared_ptr<const ClusterState> _baselineClusterState;
+ BucketSpaceStateMapping _derivedBucketSpaceStates;
public:
explicit ClusterStateBundle(const ClusterState &baselineClusterState);
+ ClusterStateBundle(const ClusterState& baselineClusterState,
+ BucketSpaceStateMapping derivedBucketSpaceStates);
~ClusterStateBundle();
const std::shared_ptr<const ClusterState> &getBaselineClusterState() const;
const std::shared_ptr<const ClusterState> &getDerivedClusterState(document::BucketSpace bucketSpace) const;
+ const BucketSpaceStateMapping& getDerivedClusterStates() const noexcept {
+ return _derivedBucketSpaceStates;
+ }
uint32_t getVersion() const;
bool operator==(const ClusterStateBundle &rhs) const;
};
+std::ostream& operator<<(std::ostream&, const ClusterStateBundle&);
+
}