From d3cd3115fc79ed62774c21684b7d39e45c484ff2 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Fri, 2 Mar 2018 10:00:59 +0000 Subject: Add handling of v3 setDistributionStates RPC from cluster controller ClusterStateBundle is populated with derived states, but these are not yet exposed via the bucket space getter function. --- storage/src/tests/storageserver/CMakeLists.txt | 1 + .../src/tests/storageserver/fnet_listener_test.cpp | 194 +++++++++++++++++++++ .../src/vespa/storage/storageserver/CMakeLists.txt | 1 + .../storageserver/cluster_state_bundle_codec.h | 28 +++ .../storage/storageserver/communicationmanager.h | 17 +- .../storageserver/encoded_cluster_state_bundle.h | 19 ++ .../vespa/storage/storageserver/fnetlistener.cpp | 74 ++++++-- .../src/vespa/storage/storageserver/fnetlistener.h | 24 ++- .../vespa/storage/storageserver/message_enqueuer.h | 17 ++ .../storage/storageserver/rpcrequestwrapper.h | 5 +- .../slime_cluster_state_bundle_codec.cpp | 125 +++++++++++++ .../slime_cluster_state_bundle_codec.h | 24 +++ .../vespa/vdslib/state/cluster_state_bundle.cpp | 41 ++++- .../src/vespa/vdslib/state/cluster_state_bundle.h | 16 ++ 14 files changed, 556 insertions(+), 30 deletions(-) create mode 100644 storage/src/tests/storageserver/fnet_listener_test.cpp create mode 100644 storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h create mode 100644 storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h create mode 100644 storage/src/vespa/storage/storageserver/message_enqueuer.h create mode 100644 storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp create mode 100644 storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt index 95faf7e433e..c28a6102b71 100644 --- a/storage/src/tests/storageserver/CMakeLists.txt +++ b/storage/src/tests/storageserver/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_library(storage_teststorageserver TEST communicationmanagertest.cpp configurable_bucket_resolver_test.cpp documentapiconvertertest.cpp + fnet_listener_test.cpp mergethrottlertest.cpp priorityconvertertest.cpp service_layer_error_listener_test.cpp diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp new file mode 100644 index 00000000000..cc9c424ac28 --- /dev/null +++ b/storage/src/tests/storageserver/fnet_listener_test.cpp @@ -0,0 +1,194 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace storage { + +using document::FixedBucketSpaces; + +class FNetListenerTest : public CppUnit::TestFixture { +public: + CPPUNIT_TEST_SUITE(FNetListenerTest); + CPPUNIT_TEST(baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle); + CPPUNIT_TEST(set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle); + CPPUNIT_TEST(compressed_bundle_is_transparently_uncompressed); + CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed); + CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error); + CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error); + CPPUNIT_TEST_SUITE_END(); + + void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle(); + void set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle(); + void compressed_bundle_is_transparently_uncompressed(); + void set_distribution_rpc_is_immediately_failed_if_listener_is_closed(); + void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error(); + void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest); + +namespace { + +struct MockOperationEnqueuer : MessageEnqueuer { + std::vector> _enqueued; + + void enqueue(std::shared_ptr msg) override { + _enqueued.emplace_back(std::move(msg)); + } +}; + +struct DummyReturnHandler : FRT_IReturnHandler { + void HandleReturn() override {} + FNET_Connection* GetConnection() override { return nullptr; } +}; + +struct Fixture { + // TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests + mbus::Slobrok slobrok; + vdstestlib::DirConfig config; + MockOperationEnqueuer enqueuer; + std::unique_ptr fnet_listener; + SlimeClusterStateBundleCodec codec; + DummyReturnHandler return_handler; + bool request_is_detached{false}; + FRT_RPCRequest* bound_request{nullptr}; + + Fixture() : config(getStandardConfig(true)) { + config.getConfig("stor-server").set("node_index", "1"); + addSlobrokConfig(config, slobrok); + fnet_listener = std::make_unique(enqueuer, config.getConfigId(), 0); + } + + ~Fixture() { + // Must destroy any associated message contexts that may have refs to FRT_Request + // instance _before_ we destroy the request itself. + enqueuer._enqueued.clear(); + if (bound_request) { + bound_request->SubRef(); + } + } + + void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) { + bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting + auto* params = bound_request->GetParams(); + params->AddInt8(static_cast(encoded_bundle._compression_type)); + params->AddInt32(uncompressed_length); + const auto buf_len = encoded_bundle._buffer->getDataLen(); + params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len); + + bound_request->SetDetachedPT(&request_is_detached); + bound_request->SetReturnHandler(&return_handler); + } + + void create_request(const lib::ClusterStateBundle& bundle) { + // Only 1 request allowed per fixture due to lifetime handling snags + assert(bound_request == nullptr); + auto encoded_bundle = codec.encode(bundle); + bind_request_params(encoded_bundle, encoded_bundle._uncompressed_length); + } + + void assert_enqueued_operation_has_bundle(const lib::ClusterStateBundle& expectedBundle) { + CPPUNIT_ASSERT(bound_request != nullptr); + CPPUNIT_ASSERT(request_is_detached); + CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size()); + auto& state_request = dynamic_cast(*enqueuer._enqueued[0]); + CPPUNIT_ASSERT_EQUAL(expectedBundle, state_request.getClusterStateBundle()); + } + + void assert_request_received_and_propagated(const lib::ClusterStateBundle& bundle) { + create_request(bundle); + fnet_listener->RPC_setDistributionStates(bound_request); + assert_enqueued_operation_has_bundle(bundle); + } + + void assert_request_returns_error_response(RPCRequestWrapper::ErrorCode error_code) { + fnet_listener->RPC_setDistributionStates(bound_request); + CPPUNIT_ASSERT(!request_is_detached); + CPPUNIT_ASSERT(bound_request->IsError()); + CPPUNIT_ASSERT_EQUAL(static_cast(error_code), bound_request->GetErrorCode()); + } + + lib::ClusterStateBundle dummy_baseline_bundle() const { + return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3")); + } +}; + +std::shared_ptr state_of(vespalib::stringref state) { + return std::make_shared(state); +} + +vespalib::string make_compressable_state_string() { + vespalib::asciistream ss; + for (int i = 0; i < 99; ++i) { + ss << " ." << i << ".s:d"; + } + return vespalib::make_string("version:123 distributor:100%s storage:100%s", + ss.str().c_str(), ss.str().c_str()); +} + +} + +void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() { + Fixture f; + auto baseline = f.dummy_baseline_bundle(); + + f.assert_request_received_and_propagated(baseline); +} + +void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() { + Fixture f; + lib::ClusterStateBundle spaces_bundle( + lib::ClusterState("version:123 distributor:3 storage:3"), + {{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")}, + {FixedBucketSpaces::global_space(), state_of("version:123 distributor:3 .1.s:d storage:3")}}); + + f.assert_request_received_and_propagated(spaces_bundle); +} + +void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() { + Fixture f; + auto state_str = make_compressable_state_string(); + lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)}; + + f.create_request(compressable_bundle); + // First verify that the bundle is sent in compressed form + CPPUNIT_ASSERT(f.bound_request->GetParams()->GetValue(2)._data._len < state_str.size()); + // Ensure we uncompress it to the original form + f.fnet_listener->RPC_setDistributionStates(f.bound_request); + f.assert_enqueued_operation_has_bundle(compressable_bundle); +} + +void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() { + Fixture f; + f.create_request(f.dummy_baseline_bundle()); + f.fnet_listener->close(); + f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN); +} + +void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() { + Fixture f; + auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); + f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1); + f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); +} + +void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() { + Fixture f; + auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); + f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100); + f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); +} + +} diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt index 0f90a5a8afb..8c9add78fd4 100644 --- a/storage/src/vespa/storage/storageserver/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt @@ -20,6 +20,7 @@ vespa_add_library(storage_storageserver service_layer_error_listener.cpp servicelayernode.cpp servicelayernodecontext.cpp + slime_cluster_state_bundle_codec.cpp statemanager.cpp statereporter.cpp storagemetricsset.cpp diff --git a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h new file mode 100644 index 00000000000..41c5db9876d --- /dev/null +++ b/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h @@ -0,0 +1,28 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "encoded_cluster_state_bundle.h" + +namespace storage { + +namespace lib { class ClusterStateBundle; } + +/** + * Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC. + * + * Implementations may choose to compress the encoded representation of the bundle. + * + * It is important that the input given to decode() is exactly equal to that given from + * encode() for the results to be correct. Implementations must ensure that this information + * is enough to losslessly reconstruct the full encoded ClusterStateBundle. + */ +class ClusterStateBundleCodec { +public: + virtual ~ClusterStateBundleCodec() = default; + + virtual EncodedClusterStateBundle encode(const lib::ClusterStateBundle&) const = 0; + virtual std::shared_ptr decode(const EncodedClusterStateBundle&) const = 0; +}; + +} diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index a29f3c8c3e3..5c7d5812fdb 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -12,6 +12,7 @@ #include "communicationmanagermetrics.h" #include "documentapiconverter.h" +#include "message_enqueuer.h" #include #include #include @@ -85,12 +86,14 @@ public: std::unique_ptr _request; }; -class CommunicationManager : public StorageLink, - public framework::Runnable, - private config::IFetcherCallback, - public mbus::IMessageHandler, - public mbus::IReplyHandler, - private framework::MetricUpdateHook +class CommunicationManager final + : public StorageLink, + public framework::Runnable, + private config::IFetcherCallback, + public mbus::IMessageHandler, + public mbus::IReplyHandler, + private framework::MetricUpdateHook, + public MessageEnqueuer { private: CommunicationManager(const CommunicationManager&); @@ -150,7 +153,7 @@ public: const config::ConfigUri & configUri); ~CommunicationManager(); - void enqueue(std::shared_ptr msg); + void enqueue(std::shared_ptr msg) override; mbus::RPCMessageBus& getMessageBus() { assert(_mbus.get()); return *_mbus; } const PriorityConverter& getPriorityConverter() const { return _docApiConverter.getPriorityConverter(); } diff --git a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h b/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h new file mode 100644 index 00000000000..6f25a6b67a6 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h @@ -0,0 +1,19 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include +#include + +namespace storage { + +/** + * Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle. + */ +struct EncodedClusterStateBundle { + vespalib::compression::CompressionConfig::Type _compression_type; + uint32_t _uncompressed_length; + std::unique_ptr _buffer; +}; + +} diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp index 7a0711c9f7c..f95281a5007 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp @@ -2,20 +2,22 @@ #include "fnetlistener.h" #include "communicationmanager.h" #include "rpcrequestwrapper.h" +#include "slime_cluster_state_bundle_codec.h" #include #include #include #include #include -#include +#include // XXXXX +#include LOG_SETUP(".rpc.listener"); namespace storage { -FNetListener::FNetListener(CommunicationManager& comManager, const config::ConfigUri & configUri, uint32_t port) - : _comManager(comManager), +FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::ConfigUri & configUri, uint32_t port) + : _messageEnqueuer(messageEnqueuer), _orb(std::make_unique()), _closed(false), _slobrokRegister(*_orb, configUri) @@ -86,6 +88,12 @@ FNetListener::initRPC() rb.MethodDesc("Set systemstate on this node"); rb.ParamDesc("systemstate", "New systemstate to set"); //------------------------------------------------------------------------- + rb.DefineMethod("setdistributionstates", "bix", "", true, FRT_METHOD(FNetListener::RPC_setDistributionStates), this); + rb.MethodDesc("Set distribution states for cluster and bucket spaces"); + rb.ParamDesc("compressionType", "Compression type for payload"); + rb.ParamDesc("uncompressedSize", "Uncompressed size for payload"); + rb.ParamDesc("payload", "Binary Slime format payload"); + //------------------------------------------------------------------------- rb.DefineMethod("getcurrenttime", "", "lis", true, FRT_METHOD(FNetListener::RPC_getCurrentTime), this); rb.MethodDesc("Get current time on this node"); rb.ReturnDesc("seconds", "Current time in seconds since epoch"); @@ -113,6 +121,13 @@ FNetListener::RPC_getCurrentTime(FRT_RPCRequest *req) return; } +void FNetListener::detach_and_forward_to_enqueuer(std::shared_ptr cmd, FRT_RPCRequest *req) { + // Create a request object to avoid needing a separate transport type + cmd->setTransportContext(std::make_unique(std::make_unique(req))); + req->Detach(); + _messageEnqueuer.enqueue(std::move(cmd)); +} + void FNetListener::RPC_getNodeState2(FRT_RPCRequest *req) { @@ -134,10 +149,7 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req) if (req->GetParams()->GetNumValues() > 2) { cmd->setSourceIndex(req->GetParams()->GetValue(2)._intval32); } - // Create a request object to avoid needing a separate transport type - cmd->setTransportContext(std::make_unique(std::make_unique(req))); - req->Detach(); - _comManager.enqueue(std::move(cmd)); + detach_and_forward_to_enqueuer(std::move(cmd), req); } void @@ -155,10 +167,50 @@ FNetListener::RPC_setSystemState2(FRT_RPCRequest *req) auto cmd(std::make_shared(lib::ClusterStateBundle(systemState))); cmd->setPriority(api::StorageMessage::VERYHIGH); - // Create a request object to avoid needing a separate transport type - cmd->setTransportContext(std::make_unique(std::make_unique(req))); - req->Detach(); - _comManager.enqueue(std::move(cmd)); + detach_and_forward_to_enqueuer(std::move(cmd), req); +} + +namespace { + +std::shared_ptr decode_bundle_from_params(const FRT_Values& params) { + const uint32_t uncompressed_length = params[1]._intval32; + if (uncompressed_length > FNetListener::StateBundleMaxUncompressedSize) { + throw std::range_error(vespalib::make_string("RPC ClusterStateBundle uncompressed size (%u) is " + "greater than max size (%u)", uncompressed_length, + FNetListener::StateBundleMaxUncompressedSize)); + } + SlimeClusterStateBundleCodec codec; + EncodedClusterStateBundle encoded_bundle; + encoded_bundle._compression_type = vespalib::compression::CompressionConfig::toType(params[0]._intval8); + encoded_bundle._uncompressed_length = uncompressed_length; + // Caution: type cast to const ptr is essential or DataBuffer behavior changes! + encoded_bundle._buffer = std::make_unique( + static_cast(params[2]._data._buf), params[2]._data._len); + return codec.decode(encoded_bundle); +} + +} + +void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) { + if (_closed) { + LOG(debug, "Not handling RPC call setDistributionStates() as we have closed"); + req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down"); + return; + } + std::shared_ptr state_bundle; + try { + state_bundle = decode_bundle_from_params(*req->GetParams()); + } catch (std::exception& e) { + LOG(error, "setDistributionStates RPC failed decoding: %s", e.what()); + req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what()); + return; + } + + // TODO add constructor taking in shared_ptr directly instead? + auto cmd = std::make_shared(*state_bundle); + cmd->setPriority(api::StorageMessage::VERYHIGH); + + detach_and_forward_to_enqueuer(std::move(cmd), req); } } diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h index 05d2d275838..bf004a3eb2b 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.h +++ b/storage/src/vespa/storage/storageserver/fnetlistener.h @@ -2,37 +2,43 @@ #pragma once #include +#include namespace storage { -class CommunicationManager; +namespace api { class StorageMessage; } + +class MessageEnqueuer; class StorageServerInterface; class FNetListener : public FRT_Invokable { public: - FNetListener(CommunicationManager& comManager, - const config::ConfigUri & configUri, uint32_t port); - ~FNetListener(); + static constexpr uint32_t StateBundleMaxUncompressedSize = 1024 * 1024 * 16; + + FNetListener(MessageEnqueuer& messageEnqueuer, + const config::ConfigUri & configUri, + uint32_t port); + ~FNetListener() override; void initRPC(); void RPC_getNodeState2(FRT_RPCRequest *req); void RPC_setSystemState2(FRT_RPCRequest *req); void RPC_getCurrentTime(FRT_RPCRequest *req); + void RPC_setDistributionStates(FRT_RPCRequest* req); void registerHandle(const vespalib::stringref & handle); void close(); int getListenPort() const; - // Used by unit tests. - bool serviceExists(const vespalib::stringref & connectionSpec); - private: - CommunicationManager& _comManager; + MessageEnqueuer& _messageEnqueuer; std::unique_ptr _orb; - bool _closed; + std::atomic _closed; slobrok::api::RegisterAPI _slobrokRegister; vespalib::string _handle; + + void detach_and_forward_to_enqueuer(std::shared_ptr cmd, FRT_RPCRequest *req); }; } diff --git a/storage/src/vespa/storage/storageserver/message_enqueuer.h b/storage/src/vespa/storage/storageserver/message_enqueuer.h new file mode 100644 index 00000000000..921328a054b --- /dev/null +++ b/storage/src/vespa/storage/storageserver/message_enqueuer.h @@ -0,0 +1,17 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include + +namespace storage { +namespace api { class StorageMessage; } + +// Interface for enqueuing a StorageMessage for further processing +class MessageEnqueuer { +public: + virtual ~MessageEnqueuer() = default; + virtual void enqueue(std::shared_ptr msg) = 0; +}; + +} diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h index 40a5470ebb9..8412e911601 100644 --- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h +++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h @@ -12,12 +12,13 @@ namespace storage { **/ class RPCRequestWrapper { public: - enum { + enum ErrorCode { ERR_HANDLE_NOT_CONNECTED = 75000, // > 0xffff ERR_HANDLE_GONE = 75001, ERR_REQUEST_DELETED = 75002, ERR_HANDLE_DISABLED = 75003, - ERR_NODE_SHUTTING_DOWN = 75004 + ERR_NODE_SHUTTING_DOWN = 75004, + ERR_BAD_REQUEST = 75005 }; RPCRequestWrapper(FRT_RPCRequest *req); diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp new file mode 100644 index 00000000000..5b7e0ab4621 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp @@ -0,0 +1,125 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "slime_cluster_state_bundle_codec.h" +#include +#include +#include +#include +#include + +using document::FixedBucketSpaces; +using vespalib::slime::Cursor; +using vespalib::slime::BinaryFormat; +using vespalib::DataBuffer; +using vespalib::ConstBufferRef; +using vespalib::compression::CompressionConfig; +using vespalib::compression::decompress; +using vespalib::compression::compress; +using vespalib::Memory; +using namespace vespalib::slime; + +namespace storage { + +// TODO find a suitable home for this class to avoid dupes with rpcsendv2.cpp +namespace { +class OutputBuf : public vespalib::Output { +public: + explicit OutputBuf(size_t estimatedSize) : _buf(estimatedSize) { } + vespalib::DataBuffer & getBuf() { return _buf; } +private: + vespalib::WritableMemory reserve(size_t bytes) override { + _buf.ensureFree(bytes); + return vespalib::WritableMemory(_buf.getFree(), _buf.getFreeLen()); + } + Output &commit(size_t bytes) override { + _buf.moveFreeToData(bytes); + return *this; + } + vespalib::DataBuffer _buf; +}; + +vespalib::string serialize_state(const lib::ClusterState& state) { + vespalib::asciistream as; + state.serialize(as, false); + return as.str(); +} + +} + +// Only used from unit tests; the cluster controller encodes all bundles +// we decode in practice. +EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode( + const lib::ClusterStateBundle& bundle) const +{ + vespalib::Slime slime; + Cursor& root = slime.setObject(); + Cursor& states = root.setObject("states"); + states.setString("baseline", serialize_state(*bundle.getBaselineClusterState())); + Cursor& spaces = states.setObject("spaces"); + for (const auto& sp : bundle.getDerivedClusterStates()) { + spaces.setString(FixedBucketSpaces::to_string(sp.first), serialize_state(*sp.second)); + } + + OutputBuf out_buf(4096); + BinaryFormat::encode(slime, out_buf); + ConstBufferRef to_compress(out_buf.getBuf().getData(), out_buf.getBuf().getDataLen()); + auto buf = std::make_unique(vespalib::roundUp2inN(out_buf.getBuf().getDataLen())); + auto actual_type = compress(CompressionConfig::LZ4, to_compress, *buf, false); + + EncodedClusterStateBundle encoded_bundle; + encoded_bundle._compression_type = actual_type; + assert(to_compress.size() <= INT32_MAX); + encoded_bundle._uncompressed_length = to_compress.size(); + encoded_bundle._buffer = std::move(buf); + return encoded_bundle; +} + +namespace { + +static const Memory StatesField("states"); +static const Memory BaselineField("baseline"); +static const Memory SpacesField("spaces"); + +struct StateInserter : vespalib::slime::ObjectTraverser { + lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states; + + explicit StateInserter(lib::ClusterStateBundle::BucketSpaceStateMapping& space_states) + : _space_states(space_states) {} + + void field(const Memory& symbol, const Inspector& inspector) override { + _space_states.emplace(FixedBucketSpaces::from_string(symbol.make_stringref()), + std::make_shared(inspector.asString().make_string())); + } +}; + +} + +std::shared_ptr SlimeClusterStateBundleCodec::decode( + const EncodedClusterStateBundle& encoded_bundle) const +{ + ConstBufferRef blob(encoded_bundle._buffer->getData(), encoded_bundle._buffer->getDataLen()); + DataBuffer uncompressed; + decompress(encoded_bundle._compression_type, encoded_bundle._uncompressed_length, + blob, uncompressed, false); + if (encoded_bundle._uncompressed_length != uncompressed.getDataLen()) { + throw std::range_error(vespalib::make_string("ClusterStateBundle indicated uncompressed size (%u) is " + "not equal to actual uncompressed size (%zu)", + encoded_bundle._uncompressed_length, + uncompressed.getDataLen())); + } + + vespalib::Slime slime; + BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), slime); + Inspector& root = slime.get(); + Inspector& states = root[StatesField]; + lib::ClusterState baseline(states[BaselineField].asString().make_string()); + + Inspector& spaces = states[SpacesField]; + lib::ClusterStateBundle::BucketSpaceStateMapping space_states; + StateInserter inserter(space_states); + spaces.traverse(inserter); + // TODO add shared_ptr constructor for baseline? + return std::make_shared(baseline, std::move(space_states)); +} + +} diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h new file mode 100644 index 00000000000..1fb95134059 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h @@ -0,0 +1,24 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "cluster_state_bundle_codec.h" +#include + +namespace storage { + +/** + * Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding + * to implement (de-)serialization of ClusterStateBundle instances. Encoding format is + * intentionally extensible so that we may add other information to it later. + * + * LZ4 compression is transparently applied during encoding and decompression is + * subsequently applied during decoding. + */ +class SlimeClusterStateBundleCodec : public ClusterStateBundleCodec { +public: + EncodedClusterStateBundle encode(const lib::ClusterStateBundle&) const override; + std::shared_ptr decode(const EncodedClusterStateBundle&) const override; +}; + +} diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp index a2bbba5e52c..cf9c0ba6dff 100644 --- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp +++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp @@ -1,7 +1,9 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include #include "cluster_state_bundle.h" #include "clusterstate.h" +#include namespace storage::lib { @@ -10,6 +12,13 @@ ClusterStateBundle::ClusterStateBundle(const ClusterState &baselineClusterState) { } +ClusterStateBundle::ClusterStateBundle(const ClusterState& baselineClusterState, + BucketSpaceStateMapping derivedBucketSpaceStates) + : _baselineClusterState(std::make_shared(baselineClusterState)), + _derivedBucketSpaceStates(std::move(derivedBucketSpaceStates)) +{ +} + ClusterStateBundle::~ClusterStateBundle() = default; const std::shared_ptr & @@ -22,6 +31,7 @@ const std::shared_ptr & ClusterStateBundle::getDerivedClusterState(document::BucketSpace) const { // For now, just return the baseline cluster state. + // TODO use _derivedBucketSpaceStates return _baselineClusterState; } @@ -34,7 +44,36 @@ ClusterStateBundle::getVersion() const bool ClusterStateBundle::operator==(const ClusterStateBundle &rhs) const { - return *_baselineClusterState == *rhs._baselineClusterState; + if (!(*_baselineClusterState == *rhs._baselineClusterState)) { + return false; + } + if (_derivedBucketSpaceStates.size() != rhs._derivedBucketSpaceStates.size()) { + return false; + } + // Can't do a regular operator== comparison since we must check equality + // of cluster state _values_, not their _pointers_. + for (auto& lhs_ds : _derivedBucketSpaceStates) { + auto rhs_iter = rhs._derivedBucketSpaceStates.find(lhs_ds.first); + if ((rhs_iter == rhs._derivedBucketSpaceStates.end()) + || !(*lhs_ds.second == *rhs_iter->second)) { + return false; + } + } + return true; +} + +std::ostream& operator<<(std::ostream& os, const ClusterStateBundle& bundle) { + os << "ClusterStateBundle('" << *bundle.getBaselineClusterState(); + if (!bundle.getDerivedClusterStates().empty()) { + // Output ordering is undefined for of per-space states. + for (auto& ds : bundle.getDerivedClusterStates()) { + os << "', "; + os << document::FixedBucketSpaces::to_string(ds.first); + os << " '" << *ds.second; + } + } + os << "')"; + return os; } } diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h index 77d26092f4e..a8ee0b54713 100644 --- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h +++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h @@ -3,6 +3,8 @@ #pragma once #include +#include +#include namespace storage::lib { @@ -14,14 +16,28 @@ class ClusterState; */ class ClusterStateBundle { +public: + using BucketSpaceStateMapping = std::unordered_map< + document::BucketSpace, + std::shared_ptr, + document::BucketSpace::hash + >; std::shared_ptr _baselineClusterState; + BucketSpaceStateMapping _derivedBucketSpaceStates; public: explicit ClusterStateBundle(const ClusterState &baselineClusterState); + ClusterStateBundle(const ClusterState& baselineClusterState, + BucketSpaceStateMapping derivedBucketSpaceStates); ~ClusterStateBundle(); const std::shared_ptr &getBaselineClusterState() const; const std::shared_ptr &getDerivedClusterState(document::BucketSpace bucketSpace) const; + const BucketSpaceStateMapping& getDerivedClusterStates() const noexcept { + return _derivedBucketSpaceStates; + } uint32_t getVersion() const; bool operator==(const ClusterStateBundle &rhs) const; }; +std::ostream& operator<<(std::ostream&, const ClusterStateBundle&); + } -- cgit v1.2.3