From d51ddcbcebc9eb31e91e569fad8ae4907d63a939 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 6 Mar 2018 13:08:55 +0100 Subject: Revert "Add cluster state RPC v3 support on content nodes" --- storage/src/tests/storageserver/CMakeLists.txt | 1 - .../src/tests/storageserver/fnet_listener_test.cpp | 194 --------------------- .../src/vespa/storage/storageserver/CMakeLists.txt | 1 - .../storageserver/cluster_state_bundle_codec.h | 28 --- .../storage/storageserver/communicationmanager.h | 17 +- .../storageserver/encoded_cluster_state_bundle.h | 19 -- .../vespa/storage/storageserver/fnetlistener.cpp | 72 ++------ .../src/vespa/storage/storageserver/fnetlistener.h | 24 +-- .../vespa/storage/storageserver/message_enqueuer.h | 17 -- .../storage/storageserver/rpcrequestwrapper.h | 5 +- .../slime_cluster_state_bundle_codec.cpp | 125 ------------- .../slime_cluster_state_bundle_codec.h | 24 --- .../vespa/vdslib/state/cluster_state_bundle.cpp | 41 +---- .../src/vespa/vdslib/state/cluster_state_bundle.h | 16 -- vdslib/src/vespa/vdslib/state/clusterstate.cpp | 2 +- vdslib/src/vespa/vdslib/state/clusterstate.h | 4 +- 16 files changed, 32 insertions(+), 558 deletions(-) delete mode 100644 storage/src/tests/storageserver/fnet_listener_test.cpp delete mode 100644 storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h delete mode 100644 storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h delete mode 100644 storage/src/vespa/storage/storageserver/message_enqueuer.h delete mode 100644 storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp delete mode 100644 storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt index c28a6102b71..95faf7e433e 100644 --- a/storage/src/tests/storageserver/CMakeLists.txt +++ b/storage/src/tests/storageserver/CMakeLists.txt @@ -7,7 +7,6 @@ vespa_add_library(storage_teststorageserver TEST communicationmanagertest.cpp configurable_bucket_resolver_test.cpp documentapiconvertertest.cpp - fnet_listener_test.cpp mergethrottlertest.cpp priorityconvertertest.cpp service_layer_error_listener_test.cpp diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp deleted file mode 100644 index cc9c424ac28..00000000000 --- a/storage/src/tests/storageserver/fnet_listener_test.cpp +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace storage { - -using document::FixedBucketSpaces; - -class FNetListenerTest : public CppUnit::TestFixture { -public: - CPPUNIT_TEST_SUITE(FNetListenerTest); - CPPUNIT_TEST(baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle); - CPPUNIT_TEST(set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle); - CPPUNIT_TEST(compressed_bundle_is_transparently_uncompressed); - CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed); - CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error); - CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error); - CPPUNIT_TEST_SUITE_END(); - - void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle(); - void set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle(); - void compressed_bundle_is_transparently_uncompressed(); - void set_distribution_rpc_is_immediately_failed_if_listener_is_closed(); - void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error(); - void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error(); -}; - -CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest); - -namespace { - -struct MockOperationEnqueuer : MessageEnqueuer { - std::vector> _enqueued; - - void enqueue(std::shared_ptr msg) override { - _enqueued.emplace_back(std::move(msg)); - } -}; - -struct DummyReturnHandler : FRT_IReturnHandler { - void HandleReturn() override {} - FNET_Connection* GetConnection() override { return nullptr; } -}; - -struct Fixture { - // TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests - mbus::Slobrok slobrok; - vdstestlib::DirConfig config; - MockOperationEnqueuer enqueuer; - std::unique_ptr fnet_listener; - SlimeClusterStateBundleCodec codec; - DummyReturnHandler return_handler; - bool request_is_detached{false}; - FRT_RPCRequest* bound_request{nullptr}; - - Fixture() : config(getStandardConfig(true)) { - config.getConfig("stor-server").set("node_index", "1"); - addSlobrokConfig(config, slobrok); - fnet_listener = std::make_unique(enqueuer, config.getConfigId(), 0); - } - - ~Fixture() { - // Must destroy any associated message contexts that may have refs to FRT_Request - // instance _before_ we destroy the request itself. - enqueuer._enqueued.clear(); - if (bound_request) { - bound_request->SubRef(); - } - } - - void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) { - bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting - auto* params = bound_request->GetParams(); - params->AddInt8(static_cast(encoded_bundle._compression_type)); - params->AddInt32(uncompressed_length); - const auto buf_len = encoded_bundle._buffer->getDataLen(); - params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len); - - bound_request->SetDetachedPT(&request_is_detached); - bound_request->SetReturnHandler(&return_handler); - } - - void create_request(const lib::ClusterStateBundle& bundle) { - // Only 1 request allowed per fixture due to lifetime handling snags - assert(bound_request == nullptr); - auto encoded_bundle = codec.encode(bundle); - bind_request_params(encoded_bundle, encoded_bundle._uncompressed_length); - } - - void assert_enqueued_operation_has_bundle(const lib::ClusterStateBundle& expectedBundle) { - CPPUNIT_ASSERT(bound_request != nullptr); - CPPUNIT_ASSERT(request_is_detached); - CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size()); - auto& state_request = dynamic_cast(*enqueuer._enqueued[0]); - CPPUNIT_ASSERT_EQUAL(expectedBundle, state_request.getClusterStateBundle()); - } - - void assert_request_received_and_propagated(const lib::ClusterStateBundle& bundle) { - create_request(bundle); - fnet_listener->RPC_setDistributionStates(bound_request); - assert_enqueued_operation_has_bundle(bundle); - } - - void assert_request_returns_error_response(RPCRequestWrapper::ErrorCode error_code) { - fnet_listener->RPC_setDistributionStates(bound_request); - CPPUNIT_ASSERT(!request_is_detached); - CPPUNIT_ASSERT(bound_request->IsError()); - CPPUNIT_ASSERT_EQUAL(static_cast(error_code), bound_request->GetErrorCode()); - } - - lib::ClusterStateBundle dummy_baseline_bundle() const { - return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3")); - } -}; - -std::shared_ptr state_of(vespalib::stringref state) { - return std::make_shared(state); -} - -vespalib::string make_compressable_state_string() { - vespalib::asciistream ss; - for (int i = 0; i < 99; ++i) { - ss << " ." << i << ".s:d"; - } - return vespalib::make_string("version:123 distributor:100%s storage:100%s", - ss.str().c_str(), ss.str().c_str()); -} - -} - -void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() { - Fixture f; - auto baseline = f.dummy_baseline_bundle(); - - f.assert_request_received_and_propagated(baseline); -} - -void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() { - Fixture f; - lib::ClusterStateBundle spaces_bundle( - lib::ClusterState("version:123 distributor:3 storage:3"), - {{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")}, - {FixedBucketSpaces::global_space(), state_of("version:123 distributor:3 .1.s:d storage:3")}}); - - f.assert_request_received_and_propagated(spaces_bundle); -} - -void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() { - Fixture f; - auto state_str = make_compressable_state_string(); - lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)}; - - f.create_request(compressable_bundle); - // First verify that the bundle is sent in compressed form - CPPUNIT_ASSERT(f.bound_request->GetParams()->GetValue(2)._data._len < state_str.size()); - // Ensure we uncompress it to the original form - f.fnet_listener->RPC_setDistributionStates(f.bound_request); - f.assert_enqueued_operation_has_bundle(compressable_bundle); -} - -void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() { - Fixture f; - f.create_request(f.dummy_baseline_bundle()); - f.fnet_listener->close(); - f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN); -} - -void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() { - Fixture f; - auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); - f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1); - f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); -} - -void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() { - Fixture f; - auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); - f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100); - f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); -} - -} diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt index 8c9add78fd4..0f90a5a8afb 100644 --- a/storage/src/vespa/storage/storageserver/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt @@ -20,7 +20,6 @@ vespa_add_library(storage_storageserver service_layer_error_listener.cpp servicelayernode.cpp servicelayernodecontext.cpp - slime_cluster_state_bundle_codec.cpp statemanager.cpp statereporter.cpp storagemetricsset.cpp diff --git a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h deleted file mode 100644 index 41c5db9876d..00000000000 --- a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "encoded_cluster_state_bundle.h" - -namespace storage { - -namespace lib { class ClusterStateBundle; } - -/** - * Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC. - * - * Implementations may choose to compress the encoded representation of the bundle. - * - * It is important that the input given to decode() is exactly equal to that given from - * encode() for the results to be correct. Implementations must ensure that this information - * is enough to losslessly reconstruct the full encoded ClusterStateBundle. - */ -class ClusterStateBundleCodec { -public: - virtual ~ClusterStateBundleCodec() = default; - - virtual EncodedClusterStateBundle encode(const lib::ClusterStateBundle&) const = 0; - virtual std::shared_ptr decode(const EncodedClusterStateBundle&) const = 0; -}; - -} diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 5c7d5812fdb..a29f3c8c3e3 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -12,7 +12,6 @@ #include "communicationmanagermetrics.h" #include "documentapiconverter.h" -#include "message_enqueuer.h" #include #include #include @@ -86,14 +85,12 @@ public: std::unique_ptr _request; }; -class CommunicationManager final - : public StorageLink, - public framework::Runnable, - private config::IFetcherCallback, - public mbus::IMessageHandler, - public mbus::IReplyHandler, - private framework::MetricUpdateHook, - public MessageEnqueuer +class CommunicationManager : public StorageLink, + public framework::Runnable, + private config::IFetcherCallback, + public mbus::IMessageHandler, + public mbus::IReplyHandler, + private framework::MetricUpdateHook { private: CommunicationManager(const CommunicationManager&); @@ -153,7 +150,7 @@ public: const config::ConfigUri & configUri); ~CommunicationManager(); - void enqueue(std::shared_ptr msg) override; + void enqueue(std::shared_ptr msg); mbus::RPCMessageBus& getMessageBus() { assert(_mbus.get()); return *_mbus; } const PriorityConverter& getPriorityConverter() const { return _docApiConverter.getPriorityConverter(); } diff --git a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h b/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h deleted file mode 100644 index 6f25a6b67a6..00000000000 --- a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include -#include - -namespace storage { - -/** - * Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle. - */ -struct EncodedClusterStateBundle { - vespalib::compression::CompressionConfig::Type _compression_type; - uint32_t _uncompressed_length; - std::unique_ptr _buffer; -}; - -} diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp index 32ea8982ffb..7a0711c9f7c 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp @@ -2,7 +2,6 @@ #include "fnetlistener.h" #include "communicationmanager.h" #include "rpcrequestwrapper.h" -#include "slime_cluster_state_bundle_codec.h" #include #include #include @@ -10,12 +9,13 @@ #include #include + LOG_SETUP(".rpc.listener"); namespace storage { -FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::ConfigUri & configUri, uint32_t port) - : _messageEnqueuer(messageEnqueuer), +FNetListener::FNetListener(CommunicationManager& comManager, const config::ConfigUri & configUri, uint32_t port) + : _comManager(comManager), _orb(std::make_unique()), _closed(false), _slobrokRegister(*_orb, configUri) @@ -86,12 +86,6 @@ FNetListener::initRPC() rb.MethodDesc("Set systemstate on this node"); rb.ParamDesc("systemstate", "New systemstate to set"); //------------------------------------------------------------------------- - rb.DefineMethod("setdistributionstates", "bix", "", true, FRT_METHOD(FNetListener::RPC_setDistributionStates), this); - rb.MethodDesc("Set distribution states for cluster and bucket spaces"); - rb.ParamDesc("compressionType", "Compression type for payload"); - rb.ParamDesc("uncompressedSize", "Uncompressed size for payload"); - rb.ParamDesc("payload", "Binary Slime format payload"); - //------------------------------------------------------------------------- rb.DefineMethod("getcurrenttime", "", "lis", true, FRT_METHOD(FNetListener::RPC_getCurrentTime), this); rb.MethodDesc("Get current time on this node"); rb.ReturnDesc("seconds", "Current time in seconds since epoch"); @@ -119,13 +113,6 @@ FNetListener::RPC_getCurrentTime(FRT_RPCRequest *req) return; } -void FNetListener::detach_and_forward_to_enqueuer(std::shared_ptr cmd, FRT_RPCRequest *req) { - // Create a request object to avoid needing a separate transport type - cmd->setTransportContext(std::make_unique(std::make_unique(req))); - req->Detach(); - _messageEnqueuer.enqueue(std::move(cmd)); -} - void FNetListener::RPC_getNodeState2(FRT_RPCRequest *req) { @@ -147,7 +134,10 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req) if (req->GetParams()->GetNumValues() > 2) { cmd->setSourceIndex(req->GetParams()->GetValue(2)._intval32); } - detach_and_forward_to_enqueuer(std::move(cmd), req); + // Create a request object to avoid needing a separate transport type + cmd->setTransportContext(std::make_unique(std::make_unique(req))); + req->Detach(); + _comManager.enqueue(std::move(cmd)); } void @@ -165,50 +155,10 @@ FNetListener::RPC_setSystemState2(FRT_RPCRequest *req) auto cmd(std::make_shared(lib::ClusterStateBundle(systemState))); cmd->setPriority(api::StorageMessage::VERYHIGH); - detach_and_forward_to_enqueuer(std::move(cmd), req); -} - -namespace { - -std::shared_ptr decode_bundle_from_params(const FRT_Values& params) { - const uint32_t uncompressed_length = params[1]._intval32; - if (uncompressed_length > FNetListener::StateBundleMaxUncompressedSize) { - throw std::range_error(vespalib::make_string("RPC ClusterStateBundle uncompressed size (%u) is " - "greater than max size (%u)", uncompressed_length, - FNetListener::StateBundleMaxUncompressedSize)); - } - SlimeClusterStateBundleCodec codec; - EncodedClusterStateBundle encoded_bundle; - encoded_bundle._compression_type = vespalib::compression::CompressionConfig::toType(params[0]._intval8); - encoded_bundle._uncompressed_length = uncompressed_length; - // Caution: type cast to const ptr is essential or DataBuffer behavior changes! - encoded_bundle._buffer = std::make_unique( - static_cast(params[2]._data._buf), params[2]._data._len); - return codec.decode(encoded_bundle); -} - -} - -void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) { - if (_closed) { - LOG(debug, "Not handling RPC call setDistributionStates() as we have closed"); - req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down"); - return; - } - std::shared_ptr state_bundle; - try { - state_bundle = decode_bundle_from_params(*req->GetParams()); - } catch (std::exception& e) { - LOG(error, "setDistributionStates RPC failed decoding: %s", e.what()); - req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what()); - return; - } - - // TODO add constructor taking in shared_ptr directly instead? - auto cmd = std::make_shared(*state_bundle); - cmd->setPriority(api::StorageMessage::VERYHIGH); - - detach_and_forward_to_enqueuer(std::move(cmd), req); + // Create a request object to avoid needing a separate transport type + cmd->setTransportContext(std::make_unique(std::make_unique(req))); + req->Detach(); + _comManager.enqueue(std::move(cmd)); } } diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h index bf004a3eb2b..05d2d275838 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.h +++ b/storage/src/vespa/storage/storageserver/fnetlistener.h @@ -2,43 +2,37 @@ #pragma once #include -#include namespace storage { -namespace api { class StorageMessage; } - -class MessageEnqueuer; +class CommunicationManager; class StorageServerInterface; class FNetListener : public FRT_Invokable { public: - static constexpr uint32_t StateBundleMaxUncompressedSize = 1024 * 1024 * 16; - - FNetListener(MessageEnqueuer& messageEnqueuer, - const config::ConfigUri & configUri, - uint32_t port); - ~FNetListener() override; + FNetListener(CommunicationManager& comManager, + const config::ConfigUri & configUri, uint32_t port); + ~FNetListener(); void initRPC(); void RPC_getNodeState2(FRT_RPCRequest *req); void RPC_setSystemState2(FRT_RPCRequest *req); void RPC_getCurrentTime(FRT_RPCRequest *req); - void RPC_setDistributionStates(FRT_RPCRequest* req); void registerHandle(const vespalib::stringref & handle); void close(); int getListenPort() const; + // Used by unit tests. + bool serviceExists(const vespalib::stringref & connectionSpec); + private: - MessageEnqueuer& _messageEnqueuer; + CommunicationManager& _comManager; std::unique_ptr _orb; - std::atomic _closed; + bool _closed; slobrok::api::RegisterAPI _slobrokRegister; vespalib::string _handle; - - void detach_and_forward_to_enqueuer(std::shared_ptr cmd, FRT_RPCRequest *req); }; } diff --git a/storage/src/vespa/storage/storageserver/message_enqueuer.h b/storage/src/vespa/storage/storageserver/message_enqueuer.h deleted file mode 100644 index 921328a054b..00000000000 --- a/storage/src/vespa/storage/storageserver/message_enqueuer.h +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include - -namespace storage { -namespace api { class StorageMessage; } - -// Interface for enqueuing a StorageMessage for further processing -class MessageEnqueuer { -public: - virtual ~MessageEnqueuer() = default; - virtual void enqueue(std::shared_ptr msg) = 0; -}; - -} diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h index 8412e911601..40a5470ebb9 100644 --- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h +++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h @@ -12,13 +12,12 @@ namespace storage { **/ class RPCRequestWrapper { public: - enum ErrorCode { + enum { ERR_HANDLE_NOT_CONNECTED = 75000, // > 0xffff ERR_HANDLE_GONE = 75001, ERR_REQUEST_DELETED = 75002, ERR_HANDLE_DISABLED = 75003, - ERR_NODE_SHUTTING_DOWN = 75004, - ERR_BAD_REQUEST = 75005 + ERR_NODE_SHUTTING_DOWN = 75004 }; RPCRequestWrapper(FRT_RPCRequest *req); diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp deleted file mode 100644 index 5b7e0ab4621..00000000000 --- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "slime_cluster_state_bundle_codec.h" -#include -#include -#include -#include -#include - -using document::FixedBucketSpaces; -using vespalib::slime::Cursor; -using vespalib::slime::BinaryFormat; -using vespalib::DataBuffer; -using vespalib::ConstBufferRef; -using vespalib::compression::CompressionConfig; -using vespalib::compression::decompress; -using vespalib::compression::compress; -using vespalib::Memory; -using namespace vespalib::slime; - -namespace storage { - -// TODO find a suitable home for this class to avoid dupes with rpcsendv2.cpp -namespace { -class OutputBuf : public vespalib::Output { -public: - explicit OutputBuf(size_t estimatedSize) : _buf(estimatedSize) { } - vespalib::DataBuffer & getBuf() { return _buf; } -private: - vespalib::WritableMemory reserve(size_t bytes) override { - _buf.ensureFree(bytes); - return vespalib::WritableMemory(_buf.getFree(), _buf.getFreeLen()); - } - Output &commit(size_t bytes) override { - _buf.moveFreeToData(bytes); - return *this; - } - vespalib::DataBuffer _buf; -}; - -vespalib::string serialize_state(const lib::ClusterState& state) { - vespalib::asciistream as; - state.serialize(as, false); - return as.str(); -} - -} - -// Only used from unit tests; the cluster controller encodes all bundles -// we decode in practice. -EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode( - const lib::ClusterStateBundle& bundle) const -{ - vespalib::Slime slime; - Cursor& root = slime.setObject(); - Cursor& states = root.setObject("states"); - states.setString("baseline", serialize_state(*bundle.getBaselineClusterState())); - Cursor& spaces = states.setObject("spaces"); - for (const auto& sp : bundle.getDerivedClusterStates()) { - spaces.setString(FixedBucketSpaces::to_string(sp.first), serialize_state(*sp.second)); - } - - OutputBuf out_buf(4096); - BinaryFormat::encode(slime, out_buf); - ConstBufferRef to_compress(out_buf.getBuf().getData(), out_buf.getBuf().getDataLen()); - auto buf = std::make_unique(vespalib::roundUp2inN(out_buf.getBuf().getDataLen())); - auto actual_type = compress(CompressionConfig::LZ4, to_compress, *buf, false); - - EncodedClusterStateBundle encoded_bundle; - encoded_bundle._compression_type = actual_type; - assert(to_compress.size() <= INT32_MAX); - encoded_bundle._uncompressed_length = to_compress.size(); - encoded_bundle._buffer = std::move(buf); - return encoded_bundle; -} - -namespace { - -static const Memory StatesField("states"); -static const Memory BaselineField("baseline"); -static const Memory SpacesField("spaces"); - -struct StateInserter : vespalib::slime::ObjectTraverser { - lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states; - - explicit StateInserter(lib::ClusterStateBundle::BucketSpaceStateMapping& space_states) - : _space_states(space_states) {} - - void field(const Memory& symbol, const Inspector& inspector) override { - _space_states.emplace(FixedBucketSpaces::from_string(symbol.make_stringref()), - std::make_shared(inspector.asString().make_string())); - } -}; - -} - -std::shared_ptr SlimeClusterStateBundleCodec::decode( - const EncodedClusterStateBundle& encoded_bundle) const -{ - ConstBufferRef blob(encoded_bundle._buffer->getData(), encoded_bundle._buffer->getDataLen()); - DataBuffer uncompressed; - decompress(encoded_bundle._compression_type, encoded_bundle._uncompressed_length, - blob, uncompressed, false); - if (encoded_bundle._uncompressed_length != uncompressed.getDataLen()) { - throw std::range_error(vespalib::make_string("ClusterStateBundle indicated uncompressed size (%u) is " - "not equal to actual uncompressed size (%zu)", - encoded_bundle._uncompressed_length, - uncompressed.getDataLen())); - } - - vespalib::Slime slime; - BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), slime); - Inspector& root = slime.get(); - Inspector& states = root[StatesField]; - lib::ClusterState baseline(states[BaselineField].asString().make_string()); - - Inspector& spaces = states[SpacesField]; - lib::ClusterStateBundle::BucketSpaceStateMapping space_states; - StateInserter inserter(space_states); - spaces.traverse(inserter); - // TODO add shared_ptr constructor for baseline? - return std::make_shared(baseline, std::move(space_states)); -} - -} diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h deleted file mode 100644 index 1fb95134059..00000000000 --- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "cluster_state_bundle_codec.h" -#include - -namespace storage { - -/** - * Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding - * to implement (de-)serialization of ClusterStateBundle instances. Encoding format is - * intentionally extensible so that we may add other information to it later. - * - * LZ4 compression is transparently applied during encoding and decompression is - * subsequently applied during decoding. - */ -class SlimeClusterStateBundleCodec : public ClusterStateBundleCodec { -public: - EncodedClusterStateBundle encode(const lib::ClusterStateBundle&) const override; - std::shared_ptr decode(const EncodedClusterStateBundle&) const override; -}; - -} diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp index cf9c0ba6dff..a2bbba5e52c 100644 --- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp +++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp @@ -1,9 +1,7 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include #include "cluster_state_bundle.h" #include "clusterstate.h" -#include namespace storage::lib { @@ -12,13 +10,6 @@ ClusterStateBundle::ClusterStateBundle(const ClusterState &baselineClusterState) { } -ClusterStateBundle::ClusterStateBundle(const ClusterState& baselineClusterState, - BucketSpaceStateMapping derivedBucketSpaceStates) - : _baselineClusterState(std::make_shared(baselineClusterState)), - _derivedBucketSpaceStates(std::move(derivedBucketSpaceStates)) -{ -} - ClusterStateBundle::~ClusterStateBundle() = default; const std::shared_ptr & @@ -31,7 +22,6 @@ const std::shared_ptr & ClusterStateBundle::getDerivedClusterState(document::BucketSpace) const { // For now, just return the baseline cluster state. - // TODO use _derivedBucketSpaceStates return _baselineClusterState; } @@ -44,36 +34,7 @@ ClusterStateBundle::getVersion() const bool ClusterStateBundle::operator==(const ClusterStateBundle &rhs) const { - if (!(*_baselineClusterState == *rhs._baselineClusterState)) { - return false; - } - if (_derivedBucketSpaceStates.size() != rhs._derivedBucketSpaceStates.size()) { - return false; - } - // Can't do a regular operator== comparison since we must check equality - // of cluster state _values_, not their _pointers_. - for (auto& lhs_ds : _derivedBucketSpaceStates) { - auto rhs_iter = rhs._derivedBucketSpaceStates.find(lhs_ds.first); - if ((rhs_iter == rhs._derivedBucketSpaceStates.end()) - || !(*lhs_ds.second == *rhs_iter->second)) { - return false; - } - } - return true; -} - -std::ostream& operator<<(std::ostream& os, const ClusterStateBundle& bundle) { - os << "ClusterStateBundle('" << *bundle.getBaselineClusterState(); - if (!bundle.getDerivedClusterStates().empty()) { - // Output ordering is undefined for of per-space states. - for (auto& ds : bundle.getDerivedClusterStates()) { - os << "', "; - os << document::FixedBucketSpaces::to_string(ds.first); - os << " '" << *ds.second; - } - } - os << "')"; - return os; + return *_baselineClusterState == *rhs._baselineClusterState; } } diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h index a8ee0b54713..77d26092f4e 100644 --- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h +++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h @@ -3,8 +3,6 @@ #pragma once #include -#include -#include namespace storage::lib { @@ -16,28 +14,14 @@ class ClusterState; */ class ClusterStateBundle { -public: - using BucketSpaceStateMapping = std::unordered_map< - document::BucketSpace, - std::shared_ptr, - document::BucketSpace::hash - >; std::shared_ptr _baselineClusterState; - BucketSpaceStateMapping _derivedBucketSpaceStates; public: explicit ClusterStateBundle(const ClusterState &baselineClusterState); - ClusterStateBundle(const ClusterState& baselineClusterState, - BucketSpaceStateMapping derivedBucketSpaceStates); ~ClusterStateBundle(); const std::shared_ptr &getBaselineClusterState() const; const std::shared_ptr &getDerivedClusterState(document::BucketSpace bucketSpace) const; - const BucketSpaceStateMapping& getDerivedClusterStates() const noexcept { - return _derivedBucketSpaceStates; - } uint32_t getVersion() const; bool operator==(const ClusterStateBundle &rhs) const; }; -std::ostream& operator<<(std::ostream&, const ClusterStateBundle&); - } diff --git a/vdslib/src/vespa/vdslib/state/clusterstate.cpp b/vdslib/src/vespa/vdslib/state/clusterstate.cpp index fbc9943e22d..9a69ed98c79 100644 --- a/vdslib/src/vespa/vdslib/state/clusterstate.cpp +++ b/vdslib/src/vespa/vdslib/state/clusterstate.cpp @@ -55,7 +55,7 @@ struct NodeData { } }; -ClusterState::ClusterState(const vespalib::string& serialized) +ClusterState::ClusterState(const vespalib::stringref & serialized) : Printable(), _version(0), _clusterState(&State::UP), diff --git a/vdslib/src/vespa/vdslib/state/clusterstate.h b/vdslib/src/vespa/vdslib/state/clusterstate.h index 9e8dd0f292a..b60f2f6f1cc 100644 --- a/vdslib/src/vespa/vdslib/state/clusterstate.h +++ b/vdslib/src/vespa/vdslib/state/clusterstate.h @@ -36,9 +36,7 @@ public: ClusterState(); ClusterState(const ClusterState&); - // FIXME make ClusterState parsing not require null termination of string, - // then move to vespalib::stringref - explicit ClusterState(const vespalib::string& serialized); + explicit ClusterState(const vespalib::stringref & serialized); ~ClusterState(); std::string getTextualDifference(const ClusterState& other) const; -- cgit v1.2.3