summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirstorli@yahoo.no>2018-03-07 09:17:48 +0100
committerGitHub <noreply@github.com>2018-03-07 09:17:48 +0100
commit0f19cbae3b5062026cd3aeef68ceb2c3f6b3180b (patch)
tree9d9cb3d102d4a19456b7ce3d72317019fb9867d5
parent47089f7581931bd9fa577a067f0736d60d6c598e (diff)
parent01a5247ef6fb29b6639f7c509c9ebe2fca92c517 (diff)
Merge pull request #5229 from vespa-engine/vekterli/add-cluster-state-rpc-v3-support-on-content-nodes-take-2
Add cluster state RPC v3 support on content nodes - take 2
-rw-r--r--storage/src/tests/storageserver/CMakeLists.txt1
-rw-r--r--storage/src/tests/storageserver/fnet_listener_test.cpp194
-rw-r--r--storage/src/vespa/storage/storageserver/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h28
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h17
-rw-r--r--storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h19
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp72
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.h24
-rw-r--r--storage/src/vespa/storage/storageserver/message_enqueuer.h17
-rw-r--r--storage/src/vespa/storage/storageserver/rpcrequestwrapper.h5
-rw-r--r--storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp125
-rw-r--r--storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h24
-rw-r--r--vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp41
-rw-r--r--vdslib/src/vespa/vdslib/state/cluster_state_bundle.h16
-rw-r--r--vdslib/src/vespa/vdslib/state/clusterstate.cpp2
-rw-r--r--vdslib/src/vespa/vdslib/state/clusterstate.h4
17 files changed, 559 insertions, 33 deletions
diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt
index 95faf7e433e..c28a6102b71 100644
--- a/storage/src/tests/storageserver/CMakeLists.txt
+++ b/storage/src/tests/storageserver/CMakeLists.txt
@@ -7,6 +7,7 @@ vespa_add_library(storage_teststorageserver TEST
communicationmanagertest.cpp
configurable_bucket_resolver_test.cpp
documentapiconvertertest.cpp
+ fnet_listener_test.cpp
mergethrottlertest.cpp
priorityconvertertest.cpp
service_layer_error_listener_test.cpp
diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp
new file mode 100644
index 00000000000..cc9c424ac28
--- /dev/null
+++ b/storage/src/tests/storageserver/fnet_listener_test.cpp
@@ -0,0 +1,194 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storage/storageserver/fnetlistener.h>
+#include <vespa/storage/storageserver/message_enqueuer.h>
+#include <vespa/storage/storageserver/rpcrequestwrapper.h>
+#include <vespa/storage/storageserver/slime_cluster_state_bundle_codec.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/vdstestlib/cppunit/dirconfig.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <tests/common/testhelper.h>
+#include <vector>
+
+namespace storage {
+
+using document::FixedBucketSpaces;
+
+class FNetListenerTest : public CppUnit::TestFixture {
+public:
+ CPPUNIT_TEST_SUITE(FNetListenerTest);
+ CPPUNIT_TEST(baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle);
+ CPPUNIT_TEST(set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle);
+ CPPUNIT_TEST(compressed_bundle_is_transparently_uncompressed);
+ CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed);
+ CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error);
+ CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error);
+ CPPUNIT_TEST_SUITE_END();
+
+ void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle();
+ void set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle();
+ void compressed_bundle_is_transparently_uncompressed();
+ void set_distribution_rpc_is_immediately_failed_if_listener_is_closed();
+ void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error();
+ void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest);
+
+namespace {
+
+struct MockOperationEnqueuer : MessageEnqueuer {
+ std::vector<std::shared_ptr<api::StorageMessage>> _enqueued;
+
+ void enqueue(std::shared_ptr<api::StorageMessage> msg) override {
+ _enqueued.emplace_back(std::move(msg));
+ }
+};
+
+struct DummyReturnHandler : FRT_IReturnHandler {
+ void HandleReturn() override {}
+ FNET_Connection* GetConnection() override { return nullptr; }
+};
+
+struct Fixture {
+ // TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests
+ mbus::Slobrok slobrok;
+ vdstestlib::DirConfig config;
+ MockOperationEnqueuer enqueuer;
+ std::unique_ptr<FNetListener> fnet_listener;
+ SlimeClusterStateBundleCodec codec;
+ DummyReturnHandler return_handler;
+ bool request_is_detached{false};
+ FRT_RPCRequest* bound_request{nullptr};
+
+ Fixture() : config(getStandardConfig(true)) {
+ config.getConfig("stor-server").set("node_index", "1");
+ addSlobrokConfig(config, slobrok);
+ fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0);
+ }
+
+ ~Fixture() {
+ // Must destroy any associated message contexts that may have refs to FRT_Request
+ // instance _before_ we destroy the request itself.
+ enqueuer._enqueued.clear();
+ if (bound_request) {
+ bound_request->SubRef();
+ }
+ }
+
+ void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) {
+ bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
+ auto* params = bound_request->GetParams();
+ params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type));
+ params->AddInt32(uncompressed_length);
+ const auto buf_len = encoded_bundle._buffer->getDataLen();
+ params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len);
+
+ bound_request->SetDetachedPT(&request_is_detached);
+ bound_request->SetReturnHandler(&return_handler);
+ }
+
+ void create_request(const lib::ClusterStateBundle& bundle) {
+ // Only 1 request allowed per fixture due to lifetime handling snags
+ assert(bound_request == nullptr);
+ auto encoded_bundle = codec.encode(bundle);
+ bind_request_params(encoded_bundle, encoded_bundle._uncompressed_length);
+ }
+
+ void assert_enqueued_operation_has_bundle(const lib::ClusterStateBundle& expectedBundle) {
+ CPPUNIT_ASSERT(bound_request != nullptr);
+ CPPUNIT_ASSERT(request_is_detached);
+ CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size());
+ auto& state_request = dynamic_cast<const api::SetSystemStateCommand&>(*enqueuer._enqueued[0]);
+ CPPUNIT_ASSERT_EQUAL(expectedBundle, state_request.getClusterStateBundle());
+ }
+
+ void assert_request_received_and_propagated(const lib::ClusterStateBundle& bundle) {
+ create_request(bundle);
+ fnet_listener->RPC_setDistributionStates(bound_request);
+ assert_enqueued_operation_has_bundle(bundle);
+ }
+
+ void assert_request_returns_error_response(RPCRequestWrapper::ErrorCode error_code) {
+ fnet_listener->RPC_setDistributionStates(bound_request);
+ CPPUNIT_ASSERT(!request_is_detached);
+ CPPUNIT_ASSERT(bound_request->IsError());
+ CPPUNIT_ASSERT_EQUAL(static_cast<uint32_t>(error_code), bound_request->GetErrorCode());
+ }
+
+ lib::ClusterStateBundle dummy_baseline_bundle() const {
+ return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"));
+ }
+};
+
+std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) {
+ return std::make_shared<const lib::ClusterState>(state);
+}
+
+vespalib::string make_compressable_state_string() {
+ vespalib::asciistream ss;
+ for (int i = 0; i < 99; ++i) {
+ ss << " ." << i << ".s:d";
+ }
+ return vespalib::make_string("version:123 distributor:100%s storage:100%s",
+ ss.str().c_str(), ss.str().c_str());
+}
+
+}
+
+void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() {
+ Fixture f;
+ auto baseline = f.dummy_baseline_bundle();
+
+ f.assert_request_received_and_propagated(baseline);
+}
+
+void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() {
+ Fixture f;
+ lib::ClusterStateBundle spaces_bundle(
+ lib::ClusterState("version:123 distributor:3 storage:3"),
+ {{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")},
+ {FixedBucketSpaces::global_space(), state_of("version:123 distributor:3 .1.s:d storage:3")}});
+
+ f.assert_request_received_and_propagated(spaces_bundle);
+}
+
+void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
+ Fixture f;
+ auto state_str = make_compressable_state_string();
+ lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)};
+
+ f.create_request(compressable_bundle);
+ // First verify that the bundle is sent in compressed form
+ CPPUNIT_ASSERT(f.bound_request->GetParams()->GetValue(2)._data._len < state_str.size());
+ // Ensure we uncompress it to the original form
+ f.fnet_listener->RPC_setDistributionStates(f.bound_request);
+ f.assert_enqueued_operation_has_bundle(compressable_bundle);
+}
+
+void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() {
+ Fixture f;
+ f.create_request(f.dummy_baseline_bundle());
+ f.fnet_listener->close();
+ f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN);
+}
+
+void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() {
+ Fixture f;
+ auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
+ f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1);
+ f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
+}
+
+void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() {
+ Fixture f;
+ auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
+ f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100);
+ f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt
index 0f90a5a8afb..8c9add78fd4 100644
--- a/storage/src/vespa/storage/storageserver/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt
@@ -20,6 +20,7 @@ vespa_add_library(storage_storageserver
service_layer_error_listener.cpp
servicelayernode.cpp
servicelayernodecontext.cpp
+ slime_cluster_state_bundle_codec.cpp
statemanager.cpp
statereporter.cpp
storagemetricsset.cpp
diff --git a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h
new file mode 100644
index 00000000000..41c5db9876d
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h
@@ -0,0 +1,28 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "encoded_cluster_state_bundle.h"
+
+namespace storage {
+
+namespace lib { class ClusterStateBundle; }
+
+/**
+ * Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC.
+ *
+ * Implementations may choose to compress the encoded representation of the bundle.
+ *
+ * It is important that the input given to decode() is exactly equal to that given from
+ * encode() for the results to be correct. Implementations must ensure that this information
+ * is enough to losslessly reconstruct the full encoded ClusterStateBundle.
+ */
+class ClusterStateBundleCodec {
+public:
+ virtual ~ClusterStateBundleCodec() = default;
+
+ virtual EncodedClusterStateBundle encode(const lib::ClusterStateBundle&) const = 0;
+ virtual std::shared_ptr<const lib::ClusterStateBundle> decode(const EncodedClusterStateBundle&) const = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 5b927d36bb5..b9721635c24 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -635,7 +635,7 @@ CommunicationManager::sendDirectRPCReply(
serializeNodeState(gns, ns, true, true, false);
request.addReturnString(ns.str().c_str());
LOGBP(debug, "Sending getnodestate2 reply with no host info.");
- } else if (requestName == "setsystemstate2") {
+ } else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") {
// No data to return
} else {
request.addReturnInt(reply->getResult().getResult());
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index a29f3c8c3e3..5c7d5812fdb 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -12,6 +12,7 @@
#include "communicationmanagermetrics.h"
#include "documentapiconverter.h"
+#include "message_enqueuer.h"
#include <vespa/storage/common/storagelink.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/config/config-stor-communicationmanager.h>
@@ -85,12 +86,14 @@ public:
std::unique_ptr<RPCRequestWrapper> _request;
};
-class CommunicationManager : public StorageLink,
- public framework::Runnable,
- private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
- public mbus::IMessageHandler,
- public mbus::IReplyHandler,
- private framework::MetricUpdateHook
+class CommunicationManager final
+ : public StorageLink,
+ public framework::Runnable,
+ private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
+ public mbus::IMessageHandler,
+ public mbus::IReplyHandler,
+ private framework::MetricUpdateHook,
+ public MessageEnqueuer
{
private:
CommunicationManager(const CommunicationManager&);
@@ -150,7 +153,7 @@ public:
const config::ConfigUri & configUri);
~CommunicationManager();
- void enqueue(std::shared_ptr<api::StorageMessage> msg);
+ void enqueue(std::shared_ptr<api::StorageMessage> msg) override;
mbus::RPCMessageBus& getMessageBus() { assert(_mbus.get()); return *_mbus; }
const PriorityConverter& getPriorityConverter() const { return _docApiConverter.getPriorityConverter(); }
diff --git a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h b/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h
new file mode 100644
index 00000000000..6f25a6b67a6
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h
@@ -0,0 +1,19 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/data/databuffer.h>
+#include <vespa/vespalib/util/compressor.h>
+
+namespace storage {
+
+/**
+ * Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle.
+ */
+struct EncodedClusterStateBundle {
+ vespalib::compression::CompressionConfig::Type _compression_type;
+ uint32_t _uncompressed_length;
+ std::unique_ptr<vespalib::DataBuffer> _buffer;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index 7a0711c9f7c..32ea8982ffb 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -2,6 +2,7 @@
#include "fnetlistener.h"
#include "communicationmanager.h"
#include "rpcrequestwrapper.h"
+#include "slime_cluster_state_bundle_codec.h"
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/host_name.h>
@@ -9,13 +10,12 @@
#include <sstream>
#include <vespa/log/log.h>
-
LOG_SETUP(".rpc.listener");
namespace storage {
-FNetListener::FNetListener(CommunicationManager& comManager, const config::ConfigUri & configUri, uint32_t port)
- : _comManager(comManager),
+FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::ConfigUri & configUri, uint32_t port)
+ : _messageEnqueuer(messageEnqueuer),
_orb(std::make_unique<FRT_Supervisor>()),
_closed(false),
_slobrokRegister(*_orb, configUri)
@@ -86,6 +86,12 @@ FNetListener::initRPC()
rb.MethodDesc("Set systemstate on this node");
rb.ParamDesc("systemstate", "New systemstate to set");
//-------------------------------------------------------------------------
+ rb.DefineMethod("setdistributionstates", "bix", "", true, FRT_METHOD(FNetListener::RPC_setDistributionStates), this);
+ rb.MethodDesc("Set distribution states for cluster and bucket spaces");
+ rb.ParamDesc("compressionType", "Compression type for payload");
+ rb.ParamDesc("uncompressedSize", "Uncompressed size for payload");
+ rb.ParamDesc("payload", "Binary Slime format payload");
+ //-------------------------------------------------------------------------
rb.DefineMethod("getcurrenttime", "", "lis", true, FRT_METHOD(FNetListener::RPC_getCurrentTime), this);
rb.MethodDesc("Get current time on this node");
rb.ReturnDesc("seconds", "Current time in seconds since epoch");
@@ -113,6 +119,13 @@ FNetListener::RPC_getCurrentTime(FRT_RPCRequest *req)
return;
}
+void FNetListener::detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req) {
+ // Create a request object to avoid needing a separate transport type
+ cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
+ req->Detach();
+ _messageEnqueuer.enqueue(std::move(cmd));
+}
+
void
FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
{
@@ -134,10 +147,7 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
if (req->GetParams()->GetNumValues() > 2) {
cmd->setSourceIndex(req->GetParams()->GetValue(2)._intval32);
}
- // Create a request object to avoid needing a separate transport type
- cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
- req->Detach();
- _comManager.enqueue(std::move(cmd));
+ detach_and_forward_to_enqueuer(std::move(cmd), req);
}
void
@@ -155,10 +165,50 @@ FNetListener::RPC_setSystemState2(FRT_RPCRequest *req)
auto cmd(std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState)));
cmd->setPriority(api::StorageMessage::VERYHIGH);
- // Create a request object to avoid needing a separate transport type
- cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
- req->Detach();
- _comManager.enqueue(std::move(cmd));
+ detach_and_forward_to_enqueuer(std::move(cmd), req);
+}
+
+namespace {
+
+std::shared_ptr<const lib::ClusterStateBundle> decode_bundle_from_params(const FRT_Values& params) {
+ const uint32_t uncompressed_length = params[1]._intval32;
+ if (uncompressed_length > FNetListener::StateBundleMaxUncompressedSize) {
+ throw std::range_error(vespalib::make_string("RPC ClusterStateBundle uncompressed size (%u) is "
+ "greater than max size (%u)", uncompressed_length,
+ FNetListener::StateBundleMaxUncompressedSize));
+ }
+ SlimeClusterStateBundleCodec codec;
+ EncodedClusterStateBundle encoded_bundle;
+ encoded_bundle._compression_type = vespalib::compression::CompressionConfig::toType(params[0]._intval8);
+ encoded_bundle._uncompressed_length = uncompressed_length;
+ // Caution: type cast to const ptr is essential or DataBuffer behavior changes!
+ encoded_bundle._buffer = std::make_unique<vespalib::DataBuffer>(
+ static_cast<const char*>(params[2]._data._buf), params[2]._data._len);
+ return codec.decode(encoded_bundle);
+}
+
+}
+
+void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
+ if (_closed) {
+ LOG(debug, "Not handling RPC call setDistributionStates() as we have closed");
+ req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
+ return;
+ }
+ std::shared_ptr<const lib::ClusterStateBundle> state_bundle;
+ try {
+ state_bundle = decode_bundle_from_params(*req->GetParams());
+ } catch (std::exception& e) {
+ LOG(error, "setDistributionStates RPC failed decoding: %s", e.what());
+ req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what());
+ return;
+ }
+
+ // TODO add constructor taking in shared_ptr directly instead?
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle);
+ cmd->setPriority(api::StorageMessage::VERYHIGH);
+
+ detach_and_forward_to_enqueuer(std::move(cmd), req);
}
}
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h
index 05d2d275838..bf004a3eb2b 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.h
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.h
@@ -2,37 +2,43 @@
#pragma once
#include <vespa/slobrok/sbregister.h>
+#include <atomic>
namespace storage {
-class CommunicationManager;
+namespace api { class StorageMessage; }
+
+class MessageEnqueuer;
class StorageServerInterface;
class FNetListener : public FRT_Invokable
{
public:
- FNetListener(CommunicationManager& comManager,
- const config::ConfigUri & configUri, uint32_t port);
- ~FNetListener();
+ static constexpr uint32_t StateBundleMaxUncompressedSize = 1024 * 1024 * 16;
+
+ FNetListener(MessageEnqueuer& messageEnqueuer,
+ const config::ConfigUri & configUri,
+ uint32_t port);
+ ~FNetListener() override;
void initRPC();
void RPC_getNodeState2(FRT_RPCRequest *req);
void RPC_setSystemState2(FRT_RPCRequest *req);
void RPC_getCurrentTime(FRT_RPCRequest *req);
+ void RPC_setDistributionStates(FRT_RPCRequest* req);
void registerHandle(const vespalib::stringref & handle);
void close();
int getListenPort() const;
- // Used by unit tests.
- bool serviceExists(const vespalib::stringref & connectionSpec);
-
private:
- CommunicationManager& _comManager;
+ MessageEnqueuer& _messageEnqueuer;
std::unique_ptr<FRT_Supervisor> _orb;
- bool _closed;
+ std::atomic<bool> _closed;
slobrok::api::RegisterAPI _slobrokRegister;
vespalib::string _handle;
+
+ void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req);
};
}
diff --git a/storage/src/vespa/storage/storageserver/message_enqueuer.h b/storage/src/vespa/storage/storageserver/message_enqueuer.h
new file mode 100644
index 00000000000..921328a054b
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/message_enqueuer.h
@@ -0,0 +1,17 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <memory>
+
+namespace storage {
+namespace api { class StorageMessage; }
+
+// Interface for enqueuing a StorageMessage for further processing
+class MessageEnqueuer {
+public:
+ virtual ~MessageEnqueuer() = default;
+ virtual void enqueue(std::shared_ptr<api::StorageMessage> msg) = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
index 40a5470ebb9..8412e911601 100644
--- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
+++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
@@ -12,12 +12,13 @@ namespace storage {
**/
class RPCRequestWrapper {
public:
- enum {
+ enum ErrorCode {
ERR_HANDLE_NOT_CONNECTED = 75000, // > 0xffff
ERR_HANDLE_GONE = 75001,
ERR_REQUEST_DELETED = 75002,
ERR_HANDLE_DISABLED = 75003,
- ERR_NODE_SHUTTING_DOWN = 75004
+ ERR_NODE_SHUTTING_DOWN = 75004,
+ ERR_BAD_REQUEST = 75005
};
RPCRequestWrapper(FRT_RPCRequest *req);
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
new file mode 100644
index 00000000000..5b7e0ab4621
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
@@ -0,0 +1,125 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "slime_cluster_state_bundle_codec.h"
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <vespa/vespalib/data/slime/slime.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+
+using document::FixedBucketSpaces;
+using vespalib::slime::Cursor;
+using vespalib::slime::BinaryFormat;
+using vespalib::DataBuffer;
+using vespalib::ConstBufferRef;
+using vespalib::compression::CompressionConfig;
+using vespalib::compression::decompress;
+using vespalib::compression::compress;
+using vespalib::Memory;
+using namespace vespalib::slime;
+
+namespace storage {
+
+// TODO find a suitable home for this class to avoid dupes with rpcsendv2.cpp
+namespace {
+class OutputBuf : public vespalib::Output {
+public:
+ explicit OutputBuf(size_t estimatedSize) : _buf(estimatedSize) { }
+ vespalib::DataBuffer & getBuf() { return _buf; }
+private:
+ vespalib::WritableMemory reserve(size_t bytes) override {
+ _buf.ensureFree(bytes);
+ return vespalib::WritableMemory(_buf.getFree(), _buf.getFreeLen());
+ }
+ Output &commit(size_t bytes) override {
+ _buf.moveFreeToData(bytes);
+ return *this;
+ }
+ vespalib::DataBuffer _buf;
+};
+
+vespalib::string serialize_state(const lib::ClusterState& state) {
+ vespalib::asciistream as;
+ state.serialize(as, false);
+ return as.str();
+}
+
+}
+
+// Only used from unit tests; the cluster controller encodes all bundles
+// we decode in practice.
+EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode(
+ const lib::ClusterStateBundle& bundle) const
+{
+ vespalib::Slime slime;
+ Cursor& root = slime.setObject();
+ Cursor& states = root.setObject("states");
+ states.setString("baseline", serialize_state(*bundle.getBaselineClusterState()));
+ Cursor& spaces = states.setObject("spaces");
+ for (const auto& sp : bundle.getDerivedClusterStates()) {
+ spaces.setString(FixedBucketSpaces::to_string(sp.first), serialize_state(*sp.second));
+ }
+
+ OutputBuf out_buf(4096);
+ BinaryFormat::encode(slime, out_buf);
+ ConstBufferRef to_compress(out_buf.getBuf().getData(), out_buf.getBuf().getDataLen());
+ auto buf = std::make_unique<DataBuffer>(vespalib::roundUp2inN(out_buf.getBuf().getDataLen()));
+ auto actual_type = compress(CompressionConfig::LZ4, to_compress, *buf, false);
+
+ EncodedClusterStateBundle encoded_bundle;
+ encoded_bundle._compression_type = actual_type;
+ assert(to_compress.size() <= INT32_MAX);
+ encoded_bundle._uncompressed_length = to_compress.size();
+ encoded_bundle._buffer = std::move(buf);
+ return encoded_bundle;
+}
+
+namespace {
+
+static const Memory StatesField("states");
+static const Memory BaselineField("baseline");
+static const Memory SpacesField("spaces");
+
+struct StateInserter : vespalib::slime::ObjectTraverser {
+ lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states;
+
+ explicit StateInserter(lib::ClusterStateBundle::BucketSpaceStateMapping& space_states)
+ : _space_states(space_states) {}
+
+ void field(const Memory& symbol, const Inspector& inspector) override {
+ _space_states.emplace(FixedBucketSpaces::from_string(symbol.make_stringref()),
+ std::make_shared<const lib::ClusterState>(inspector.asString().make_string()));
+ }
+};
+
+}
+
+std::shared_ptr<const lib::ClusterStateBundle> SlimeClusterStateBundleCodec::decode(
+ const EncodedClusterStateBundle& encoded_bundle) const
+{
+ ConstBufferRef blob(encoded_bundle._buffer->getData(), encoded_bundle._buffer->getDataLen());
+ DataBuffer uncompressed;
+ decompress(encoded_bundle._compression_type, encoded_bundle._uncompressed_length,
+ blob, uncompressed, false);
+ if (encoded_bundle._uncompressed_length != uncompressed.getDataLen()) {
+ throw std::range_error(vespalib::make_string("ClusterStateBundle indicated uncompressed size (%u) is "
+ "not equal to actual uncompressed size (%zu)",
+ encoded_bundle._uncompressed_length,
+ uncompressed.getDataLen()));
+ }
+
+ vespalib::Slime slime;
+ BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), slime);
+ Inspector& root = slime.get();
+ Inspector& states = root[StatesField];
+ lib::ClusterState baseline(states[BaselineField].asString().make_string());
+
+ Inspector& spaces = states[SpacesField];
+ lib::ClusterStateBundle::BucketSpaceStateMapping space_states;
+ StateInserter inserter(space_states);
+ spaces.traverse(inserter);
+ // TODO add shared_ptr constructor for baseline?
+ return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states));
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h
new file mode 100644
index 00000000000..1fb95134059
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h
@@ -0,0 +1,24 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "cluster_state_bundle_codec.h"
+#include <memory>
+
+namespace storage {
+
+/**
+ * Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding
+ * to implement (de-)serialization of ClusterStateBundle instances. Encoding format is
+ * intentionally extensible so that we may add other information to it later.
+ *
+ * LZ4 compression is transparently applied during encoding and decompression is
+ * subsequently applied during decoding.
+ */
+class SlimeClusterStateBundleCodec : public ClusterStateBundleCodec {
+public:
+ EncodedClusterStateBundle encode(const lib::ClusterStateBundle&) const override;
+ std::shared_ptr<const lib::ClusterStateBundle> decode(const EncodedClusterStateBundle&) const override;
+};
+
+}
diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
index a2bbba5e52c..cf9c0ba6dff 100644
--- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
+++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
@@ -1,7 +1,9 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include "cluster_state_bundle.h"
#include "clusterstate.h"
+#include <iostream>
namespace storage::lib {
@@ -10,6 +12,13 @@ ClusterStateBundle::ClusterStateBundle(const ClusterState &baselineClusterState)
{
}
+ClusterStateBundle::ClusterStateBundle(const ClusterState& baselineClusterState,
+ BucketSpaceStateMapping derivedBucketSpaceStates)
+ : _baselineClusterState(std::make_shared<const ClusterState>(baselineClusterState)),
+ _derivedBucketSpaceStates(std::move(derivedBucketSpaceStates))
+{
+}
+
ClusterStateBundle::~ClusterStateBundle() = default;
const std::shared_ptr<const lib::ClusterState> &
@@ -22,6 +31,7 @@ const std::shared_ptr<const lib::ClusterState> &
ClusterStateBundle::getDerivedClusterState(document::BucketSpace) const
{
// For now, just return the baseline cluster state.
+ // TODO use _derivedBucketSpaceStates
return _baselineClusterState;
}
@@ -34,7 +44,36 @@ ClusterStateBundle::getVersion() const
bool
ClusterStateBundle::operator==(const ClusterStateBundle &rhs) const
{
- return *_baselineClusterState == *rhs._baselineClusterState;
+ if (!(*_baselineClusterState == *rhs._baselineClusterState)) {
+ return false;
+ }
+ if (_derivedBucketSpaceStates.size() != rhs._derivedBucketSpaceStates.size()) {
+ return false;
+ }
+ // Can't do a regular operator== comparison since we must check equality
+ // of cluster state _values_, not their _pointers_.
+ for (auto& lhs_ds : _derivedBucketSpaceStates) {
+ auto rhs_iter = rhs._derivedBucketSpaceStates.find(lhs_ds.first);
+ if ((rhs_iter == rhs._derivedBucketSpaceStates.end())
+ || !(*lhs_ds.second == *rhs_iter->second)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+std::ostream& operator<<(std::ostream& os, const ClusterStateBundle& bundle) {
+ os << "ClusterStateBundle('" << *bundle.getBaselineClusterState();
+ if (!bundle.getDerivedClusterStates().empty()) {
+ // Output ordering is undefined for of per-space states.
+ for (auto& ds : bundle.getDerivedClusterStates()) {
+ os << "', ";
+ os << document::FixedBucketSpaces::to_string(ds.first);
+ os << " '" << *ds.second;
+ }
+ }
+ os << "')";
+ return os;
}
}
diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
index 77d26092f4e..a8ee0b54713 100644
--- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
+++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
@@ -3,6 +3,8 @@
#pragma once
#include <vespa/document/bucket/bucketspace.h>
+#include <unordered_map>
+#include <iosfwd>
namespace storage::lib {
@@ -14,14 +16,28 @@ class ClusterState;
*/
class ClusterStateBundle
{
+public:
+ using BucketSpaceStateMapping = std::unordered_map<
+ document::BucketSpace,
+ std::shared_ptr<const ClusterState>,
+ document::BucketSpace::hash
+ >;
std::shared_ptr<const ClusterState> _baselineClusterState;
+ BucketSpaceStateMapping _derivedBucketSpaceStates;
public:
explicit ClusterStateBundle(const ClusterState &baselineClusterState);
+ ClusterStateBundle(const ClusterState& baselineClusterState,
+ BucketSpaceStateMapping derivedBucketSpaceStates);
~ClusterStateBundle();
const std::shared_ptr<const ClusterState> &getBaselineClusterState() const;
const std::shared_ptr<const ClusterState> &getDerivedClusterState(document::BucketSpace bucketSpace) const;
+ const BucketSpaceStateMapping& getDerivedClusterStates() const noexcept {
+ return _derivedBucketSpaceStates;
+ }
uint32_t getVersion() const;
bool operator==(const ClusterStateBundle &rhs) const;
};
+std::ostream& operator<<(std::ostream&, const ClusterStateBundle&);
+
}
diff --git a/vdslib/src/vespa/vdslib/state/clusterstate.cpp b/vdslib/src/vespa/vdslib/state/clusterstate.cpp
index 9a69ed98c79..fbc9943e22d 100644
--- a/vdslib/src/vespa/vdslib/state/clusterstate.cpp
+++ b/vdslib/src/vespa/vdslib/state/clusterstate.cpp
@@ -55,7 +55,7 @@ struct NodeData {
}
};
-ClusterState::ClusterState(const vespalib::stringref & serialized)
+ClusterState::ClusterState(const vespalib::string& serialized)
: Printable(),
_version(0),
_clusterState(&State::UP),
diff --git a/vdslib/src/vespa/vdslib/state/clusterstate.h b/vdslib/src/vespa/vdslib/state/clusterstate.h
index b60f2f6f1cc..9e8dd0f292a 100644
--- a/vdslib/src/vespa/vdslib/state/clusterstate.h
+++ b/vdslib/src/vespa/vdslib/state/clusterstate.h
@@ -36,7 +36,9 @@ public:
ClusterState();
ClusterState(const ClusterState&);
- explicit ClusterState(const vespalib::stringref & serialized);
+ // FIXME make ClusterState parsing not require null termination of string,
+ // then move to vespalib::stringref
+ explicit ClusterState(const vespalib::string& serialized);
~ClusterState();
std::string getTextualDifference(const ClusterState& other) const;