summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirstorli@yahoo.no>2018-03-06 13:16:07 +0100
committerGitHub <noreply@github.com>2018-03-06 13:16:07 +0100
commitf29fd212b8a04e162b56e1a00925388af570efde (patch)
treedf4168b61a0d78e1e0cd8b0d64ddefb2f22ffa43
parent15b30e6df3fbc615178b28f4531f065c8fa10694 (diff)
parentd51ddcbcebc9eb31e91e569fad8ae4907d63a939 (diff)
Merge pull request #5223 from vespa-engine/revert-5214-vekterli/add-cluster-state-rpc-v3-support-on-content-nodes
Revert "Add cluster state RPC v3 support on content nodes"
-rw-r--r--storage/src/tests/storageserver/CMakeLists.txt1
-rw-r--r--storage/src/tests/storageserver/fnet_listener_test.cpp194
-rw-r--r--storage/src/vespa/storage/storageserver/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h28
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h17
-rw-r--r--storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h19
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp72
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.h24
-rw-r--r--storage/src/vespa/storage/storageserver/message_enqueuer.h17
-rw-r--r--storage/src/vespa/storage/storageserver/rpcrequestwrapper.h5
-rw-r--r--storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp125
-rw-r--r--storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h24
-rw-r--r--vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp41
-rw-r--r--vdslib/src/vespa/vdslib/state/cluster_state_bundle.h16
-rw-r--r--vdslib/src/vespa/vdslib/state/clusterstate.cpp2
-rw-r--r--vdslib/src/vespa/vdslib/state/clusterstate.h4
16 files changed, 32 insertions, 558 deletions
diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt
index c28a6102b71..95faf7e433e 100644
--- a/storage/src/tests/storageserver/CMakeLists.txt
+++ b/storage/src/tests/storageserver/CMakeLists.txt
@@ -7,7 +7,6 @@ vespa_add_library(storage_teststorageserver TEST
communicationmanagertest.cpp
configurable_bucket_resolver_test.cpp
documentapiconvertertest.cpp
- fnet_listener_test.cpp
mergethrottlertest.cpp
priorityconvertertest.cpp
service_layer_error_listener_test.cpp
diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp
deleted file mode 100644
index cc9c424ac28..00000000000
--- a/storage/src/tests/storageserver/fnet_listener_test.cpp
+++ /dev/null
@@ -1,194 +0,0 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/storage/storageserver/fnetlistener.h>
-#include <vespa/storage/storageserver/message_enqueuer.h>
-#include <vespa/storage/storageserver/rpcrequestwrapper.h>
-#include <vespa/storage/storageserver/slime_cluster_state_bundle_codec.h>
-#include <vespa/storageapi/message/state.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/vdstestlib/cppunit/macros.h>
-#include <vespa/vdstestlib/cppunit/dirconfig.h>
-#include <vespa/messagebus/testlib/slobrok.h>
-#include <tests/common/testhelper.h>
-#include <vector>
-
-namespace storage {
-
-using document::FixedBucketSpaces;
-
-class FNetListenerTest : public CppUnit::TestFixture {
-public:
- CPPUNIT_TEST_SUITE(FNetListenerTest);
- CPPUNIT_TEST(baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle);
- CPPUNIT_TEST(set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle);
- CPPUNIT_TEST(compressed_bundle_is_transparently_uncompressed);
- CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed);
- CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error);
- CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error);
- CPPUNIT_TEST_SUITE_END();
-
- void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle();
- void set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle();
- void compressed_bundle_is_transparently_uncompressed();
- void set_distribution_rpc_is_immediately_failed_if_listener_is_closed();
- void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error();
- void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error();
-};
-
-CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest);
-
-namespace {
-
-struct MockOperationEnqueuer : MessageEnqueuer {
- std::vector<std::shared_ptr<api::StorageMessage>> _enqueued;
-
- void enqueue(std::shared_ptr<api::StorageMessage> msg) override {
- _enqueued.emplace_back(std::move(msg));
- }
-};
-
-struct DummyReturnHandler : FRT_IReturnHandler {
- void HandleReturn() override {}
- FNET_Connection* GetConnection() override { return nullptr; }
-};
-
-struct Fixture {
- // TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests
- mbus::Slobrok slobrok;
- vdstestlib::DirConfig config;
- MockOperationEnqueuer enqueuer;
- std::unique_ptr<FNetListener> fnet_listener;
- SlimeClusterStateBundleCodec codec;
- DummyReturnHandler return_handler;
- bool request_is_detached{false};
- FRT_RPCRequest* bound_request{nullptr};
-
- Fixture() : config(getStandardConfig(true)) {
- config.getConfig("stor-server").set("node_index", "1");
- addSlobrokConfig(config, slobrok);
- fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0);
- }
-
- ~Fixture() {
- // Must destroy any associated message contexts that may have refs to FRT_Request
- // instance _before_ we destroy the request itself.
- enqueuer._enqueued.clear();
- if (bound_request) {
- bound_request->SubRef();
- }
- }
-
- void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) {
- bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
- auto* params = bound_request->GetParams();
- params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type));
- params->AddInt32(uncompressed_length);
- const auto buf_len = encoded_bundle._buffer->getDataLen();
- params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len);
-
- bound_request->SetDetachedPT(&request_is_detached);
- bound_request->SetReturnHandler(&return_handler);
- }
-
- void create_request(const lib::ClusterStateBundle& bundle) {
- // Only 1 request allowed per fixture due to lifetime handling snags
- assert(bound_request == nullptr);
- auto encoded_bundle = codec.encode(bundle);
- bind_request_params(encoded_bundle, encoded_bundle._uncompressed_length);
- }
-
- void assert_enqueued_operation_has_bundle(const lib::ClusterStateBundle& expectedBundle) {
- CPPUNIT_ASSERT(bound_request != nullptr);
- CPPUNIT_ASSERT(request_is_detached);
- CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size());
- auto& state_request = dynamic_cast<const api::SetSystemStateCommand&>(*enqueuer._enqueued[0]);
- CPPUNIT_ASSERT_EQUAL(expectedBundle, state_request.getClusterStateBundle());
- }
-
- void assert_request_received_and_propagated(const lib::ClusterStateBundle& bundle) {
- create_request(bundle);
- fnet_listener->RPC_setDistributionStates(bound_request);
- assert_enqueued_operation_has_bundle(bundle);
- }
-
- void assert_request_returns_error_response(RPCRequestWrapper::ErrorCode error_code) {
- fnet_listener->RPC_setDistributionStates(bound_request);
- CPPUNIT_ASSERT(!request_is_detached);
- CPPUNIT_ASSERT(bound_request->IsError());
- CPPUNIT_ASSERT_EQUAL(static_cast<uint32_t>(error_code), bound_request->GetErrorCode());
- }
-
- lib::ClusterStateBundle dummy_baseline_bundle() const {
- return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"));
- }
-};
-
-std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) {
- return std::make_shared<const lib::ClusterState>(state);
-}
-
-vespalib::string make_compressable_state_string() {
- vespalib::asciistream ss;
- for (int i = 0; i < 99; ++i) {
- ss << " ." << i << ".s:d";
- }
- return vespalib::make_string("version:123 distributor:100%s storage:100%s",
- ss.str().c_str(), ss.str().c_str());
-}
-
-}
-
-void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() {
- Fixture f;
- auto baseline = f.dummy_baseline_bundle();
-
- f.assert_request_received_and_propagated(baseline);
-}
-
-void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() {
- Fixture f;
- lib::ClusterStateBundle spaces_bundle(
- lib::ClusterState("version:123 distributor:3 storage:3"),
- {{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")},
- {FixedBucketSpaces::global_space(), state_of("version:123 distributor:3 .1.s:d storage:3")}});
-
- f.assert_request_received_and_propagated(spaces_bundle);
-}
-
-void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
- Fixture f;
- auto state_str = make_compressable_state_string();
- lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)};
-
- f.create_request(compressable_bundle);
- // First verify that the bundle is sent in compressed form
- CPPUNIT_ASSERT(f.bound_request->GetParams()->GetValue(2)._data._len < state_str.size());
- // Ensure we uncompress it to the original form
- f.fnet_listener->RPC_setDistributionStates(f.bound_request);
- f.assert_enqueued_operation_has_bundle(compressable_bundle);
-}
-
-void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() {
- Fixture f;
- f.create_request(f.dummy_baseline_bundle());
- f.fnet_listener->close();
- f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN);
-}
-
-void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() {
- Fixture f;
- auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
- f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1);
- f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
-}
-
-void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() {
- Fixture f;
- auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
- f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100);
- f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
-}
-
-}
diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt
index 8c9add78fd4..0f90a5a8afb 100644
--- a/storage/src/vespa/storage/storageserver/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt
@@ -20,7 +20,6 @@ vespa_add_library(storage_storageserver
service_layer_error_listener.cpp
servicelayernode.cpp
servicelayernodecontext.cpp
- slime_cluster_state_bundle_codec.cpp
statemanager.cpp
statereporter.cpp
storagemetricsset.cpp
diff --git a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h
deleted file mode 100644
index 41c5db9876d..00000000000
--- a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "encoded_cluster_state_bundle.h"
-
-namespace storage {
-
-namespace lib { class ClusterStateBundle; }
-
-/**
- * Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC.
- *
- * Implementations may choose to compress the encoded representation of the bundle.
- *
- * It is important that the input given to decode() is exactly equal to that given from
- * encode() for the results to be correct. Implementations must ensure that this information
- * is enough to losslessly reconstruct the full encoded ClusterStateBundle.
- */
-class ClusterStateBundleCodec {
-public:
- virtual ~ClusterStateBundleCodec() = default;
-
- virtual EncodedClusterStateBundle encode(const lib::ClusterStateBundle&) const = 0;
- virtual std::shared_ptr<const lib::ClusterStateBundle> decode(const EncodedClusterStateBundle&) const = 0;
-};
-
-}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 5c7d5812fdb..a29f3c8c3e3 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -12,7 +12,6 @@
#include "communicationmanagermetrics.h"
#include "documentapiconverter.h"
-#include "message_enqueuer.h"
#include <vespa/storage/common/storagelink.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/config/config-stor-communicationmanager.h>
@@ -86,14 +85,12 @@ public:
std::unique_ptr<RPCRequestWrapper> _request;
};
-class CommunicationManager final
- : public StorageLink,
- public framework::Runnable,
- private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
- public mbus::IMessageHandler,
- public mbus::IReplyHandler,
- private framework::MetricUpdateHook,
- public MessageEnqueuer
+class CommunicationManager : public StorageLink,
+ public framework::Runnable,
+ private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
+ public mbus::IMessageHandler,
+ public mbus::IReplyHandler,
+ private framework::MetricUpdateHook
{
private:
CommunicationManager(const CommunicationManager&);
@@ -153,7 +150,7 @@ public:
const config::ConfigUri & configUri);
~CommunicationManager();
- void enqueue(std::shared_ptr<api::StorageMessage> msg) override;
+ void enqueue(std::shared_ptr<api::StorageMessage> msg);
mbus::RPCMessageBus& getMessageBus() { assert(_mbus.get()); return *_mbus; }
const PriorityConverter& getPriorityConverter() const { return _docApiConverter.getPriorityConverter(); }
diff --git a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h b/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h
deleted file mode 100644
index 6f25a6b67a6..00000000000
--- a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/vespalib/data/databuffer.h>
-#include <vespa/vespalib/util/compressor.h>
-
-namespace storage {
-
-/**
- * Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle.
- */
-struct EncodedClusterStateBundle {
- vespalib::compression::CompressionConfig::Type _compression_type;
- uint32_t _uncompressed_length;
- std::unique_ptr<vespalib::DataBuffer> _buffer;
-};
-
-}
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index 32ea8982ffb..7a0711c9f7c 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -2,7 +2,6 @@
#include "fnetlistener.h"
#include "communicationmanager.h"
#include "rpcrequestwrapper.h"
-#include "slime_cluster_state_bundle_codec.h"
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/host_name.h>
@@ -10,12 +9,13 @@
#include <sstream>
#include <vespa/log/log.h>
+
LOG_SETUP(".rpc.listener");
namespace storage {
-FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::ConfigUri & configUri, uint32_t port)
- : _messageEnqueuer(messageEnqueuer),
+FNetListener::FNetListener(CommunicationManager& comManager, const config::ConfigUri & configUri, uint32_t port)
+ : _comManager(comManager),
_orb(std::make_unique<FRT_Supervisor>()),
_closed(false),
_slobrokRegister(*_orb, configUri)
@@ -86,12 +86,6 @@ FNetListener::initRPC()
rb.MethodDesc("Set systemstate on this node");
rb.ParamDesc("systemstate", "New systemstate to set");
//-------------------------------------------------------------------------
- rb.DefineMethod("setdistributionstates", "bix", "", true, FRT_METHOD(FNetListener::RPC_setDistributionStates), this);
- rb.MethodDesc("Set distribution states for cluster and bucket spaces");
- rb.ParamDesc("compressionType", "Compression type for payload");
- rb.ParamDesc("uncompressedSize", "Uncompressed size for payload");
- rb.ParamDesc("payload", "Binary Slime format payload");
- //-------------------------------------------------------------------------
rb.DefineMethod("getcurrenttime", "", "lis", true, FRT_METHOD(FNetListener::RPC_getCurrentTime), this);
rb.MethodDesc("Get current time on this node");
rb.ReturnDesc("seconds", "Current time in seconds since epoch");
@@ -119,13 +113,6 @@ FNetListener::RPC_getCurrentTime(FRT_RPCRequest *req)
return;
}
-void FNetListener::detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req) {
- // Create a request object to avoid needing a separate transport type
- cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
- req->Detach();
- _messageEnqueuer.enqueue(std::move(cmd));
-}
-
void
FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
{
@@ -147,7 +134,10 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
if (req->GetParams()->GetNumValues() > 2) {
cmd->setSourceIndex(req->GetParams()->GetValue(2)._intval32);
}
- detach_and_forward_to_enqueuer(std::move(cmd), req);
+ // Create a request object to avoid needing a separate transport type
+ cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
+ req->Detach();
+ _comManager.enqueue(std::move(cmd));
}
void
@@ -165,50 +155,10 @@ FNetListener::RPC_setSystemState2(FRT_RPCRequest *req)
auto cmd(std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState)));
cmd->setPriority(api::StorageMessage::VERYHIGH);
- detach_and_forward_to_enqueuer(std::move(cmd), req);
-}
-
-namespace {
-
-std::shared_ptr<const lib::ClusterStateBundle> decode_bundle_from_params(const FRT_Values& params) {
- const uint32_t uncompressed_length = params[1]._intval32;
- if (uncompressed_length > FNetListener::StateBundleMaxUncompressedSize) {
- throw std::range_error(vespalib::make_string("RPC ClusterStateBundle uncompressed size (%u) is "
- "greater than max size (%u)", uncompressed_length,
- FNetListener::StateBundleMaxUncompressedSize));
- }
- SlimeClusterStateBundleCodec codec;
- EncodedClusterStateBundle encoded_bundle;
- encoded_bundle._compression_type = vespalib::compression::CompressionConfig::toType(params[0]._intval8);
- encoded_bundle._uncompressed_length = uncompressed_length;
- // Caution: type cast to const ptr is essential or DataBuffer behavior changes!
- encoded_bundle._buffer = std::make_unique<vespalib::DataBuffer>(
- static_cast<const char*>(params[2]._data._buf), params[2]._data._len);
- return codec.decode(encoded_bundle);
-}
-
-}
-
-void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
- if (_closed) {
- LOG(debug, "Not handling RPC call setDistributionStates() as we have closed");
- req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
- return;
- }
- std::shared_ptr<const lib::ClusterStateBundle> state_bundle;
- try {
- state_bundle = decode_bundle_from_params(*req->GetParams());
- } catch (std::exception& e) {
- LOG(error, "setDistributionStates RPC failed decoding: %s", e.what());
- req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what());
- return;
- }
-
- // TODO add constructor taking in shared_ptr directly instead?
- auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle);
- cmd->setPriority(api::StorageMessage::VERYHIGH);
-
- detach_and_forward_to_enqueuer(std::move(cmd), req);
+ // Create a request object to avoid needing a separate transport type
+ cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
+ req->Detach();
+ _comManager.enqueue(std::move(cmd));
}
}
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h
index bf004a3eb2b..05d2d275838 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.h
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.h
@@ -2,43 +2,37 @@
#pragma once
#include <vespa/slobrok/sbregister.h>
-#include <atomic>
namespace storage {
-namespace api { class StorageMessage; }
-
-class MessageEnqueuer;
+class CommunicationManager;
class StorageServerInterface;
class FNetListener : public FRT_Invokable
{
public:
- static constexpr uint32_t StateBundleMaxUncompressedSize = 1024 * 1024 * 16;
-
- FNetListener(MessageEnqueuer& messageEnqueuer,
- const config::ConfigUri & configUri,
- uint32_t port);
- ~FNetListener() override;
+ FNetListener(CommunicationManager& comManager,
+ const config::ConfigUri & configUri, uint32_t port);
+ ~FNetListener();
void initRPC();
void RPC_getNodeState2(FRT_RPCRequest *req);
void RPC_setSystemState2(FRT_RPCRequest *req);
void RPC_getCurrentTime(FRT_RPCRequest *req);
- void RPC_setDistributionStates(FRT_RPCRequest* req);
void registerHandle(const vespalib::stringref & handle);
void close();
int getListenPort() const;
+ // Used by unit tests.
+ bool serviceExists(const vespalib::stringref & connectionSpec);
+
private:
- MessageEnqueuer& _messageEnqueuer;
+ CommunicationManager& _comManager;
std::unique_ptr<FRT_Supervisor> _orb;
- std::atomic<bool> _closed;
+ bool _closed;
slobrok::api::RegisterAPI _slobrokRegister;
vespalib::string _handle;
-
- void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req);
};
}
diff --git a/storage/src/vespa/storage/storageserver/message_enqueuer.h b/storage/src/vespa/storage/storageserver/message_enqueuer.h
deleted file mode 100644
index 921328a054b..00000000000
--- a/storage/src/vespa/storage/storageserver/message_enqueuer.h
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <memory>
-
-namespace storage {
-namespace api { class StorageMessage; }
-
-// Interface for enqueuing a StorageMessage for further processing
-class MessageEnqueuer {
-public:
- virtual ~MessageEnqueuer() = default;
- virtual void enqueue(std::shared_ptr<api::StorageMessage> msg) = 0;
-};
-
-}
diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
index 8412e911601..40a5470ebb9 100644
--- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
+++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
@@ -12,13 +12,12 @@ namespace storage {
**/
class RPCRequestWrapper {
public:
- enum ErrorCode {
+ enum {
ERR_HANDLE_NOT_CONNECTED = 75000, // > 0xffff
ERR_HANDLE_GONE = 75001,
ERR_REQUEST_DELETED = 75002,
ERR_HANDLE_DISABLED = 75003,
- ERR_NODE_SHUTTING_DOWN = 75004,
- ERR_BAD_REQUEST = 75005
+ ERR_NODE_SHUTTING_DOWN = 75004
};
RPCRequestWrapper(FRT_RPCRequest *req);
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
deleted file mode 100644
index 5b7e0ab4621..00000000000
--- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
+++ /dev/null
@@ -1,125 +0,0 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "slime_cluster_state_bundle_codec.h"
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vdslib/state/cluster_state_bundle.h>
-#include <vespa/vespalib/data/slime/slime.h>
-#include <vespa/vespalib/stllike/asciistream.h>
-
-using document::FixedBucketSpaces;
-using vespalib::slime::Cursor;
-using vespalib::slime::BinaryFormat;
-using vespalib::DataBuffer;
-using vespalib::ConstBufferRef;
-using vespalib::compression::CompressionConfig;
-using vespalib::compression::decompress;
-using vespalib::compression::compress;
-using vespalib::Memory;
-using namespace vespalib::slime;
-
-namespace storage {
-
-// TODO find a suitable home for this class to avoid dupes with rpcsendv2.cpp
-namespace {
-class OutputBuf : public vespalib::Output {
-public:
- explicit OutputBuf(size_t estimatedSize) : _buf(estimatedSize) { }
- vespalib::DataBuffer & getBuf() { return _buf; }
-private:
- vespalib::WritableMemory reserve(size_t bytes) override {
- _buf.ensureFree(bytes);
- return vespalib::WritableMemory(_buf.getFree(), _buf.getFreeLen());
- }
- Output &commit(size_t bytes) override {
- _buf.moveFreeToData(bytes);
- return *this;
- }
- vespalib::DataBuffer _buf;
-};
-
-vespalib::string serialize_state(const lib::ClusterState& state) {
- vespalib::asciistream as;
- state.serialize(as, false);
- return as.str();
-}
-
-}
-
-// Only used from unit tests; the cluster controller encodes all bundles
-// we decode in practice.
-EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode(
- const lib::ClusterStateBundle& bundle) const
-{
- vespalib::Slime slime;
- Cursor& root = slime.setObject();
- Cursor& states = root.setObject("states");
- states.setString("baseline", serialize_state(*bundle.getBaselineClusterState()));
- Cursor& spaces = states.setObject("spaces");
- for (const auto& sp : bundle.getDerivedClusterStates()) {
- spaces.setString(FixedBucketSpaces::to_string(sp.first), serialize_state(*sp.second));
- }
-
- OutputBuf out_buf(4096);
- BinaryFormat::encode(slime, out_buf);
- ConstBufferRef to_compress(out_buf.getBuf().getData(), out_buf.getBuf().getDataLen());
- auto buf = std::make_unique<DataBuffer>(vespalib::roundUp2inN(out_buf.getBuf().getDataLen()));
- auto actual_type = compress(CompressionConfig::LZ4, to_compress, *buf, false);
-
- EncodedClusterStateBundle encoded_bundle;
- encoded_bundle._compression_type = actual_type;
- assert(to_compress.size() <= INT32_MAX);
- encoded_bundle._uncompressed_length = to_compress.size();
- encoded_bundle._buffer = std::move(buf);
- return encoded_bundle;
-}
-
-namespace {
-
-static const Memory StatesField("states");
-static const Memory BaselineField("baseline");
-static const Memory SpacesField("spaces");
-
-struct StateInserter : vespalib::slime::ObjectTraverser {
- lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states;
-
- explicit StateInserter(lib::ClusterStateBundle::BucketSpaceStateMapping& space_states)
- : _space_states(space_states) {}
-
- void field(const Memory& symbol, const Inspector& inspector) override {
- _space_states.emplace(FixedBucketSpaces::from_string(symbol.make_stringref()),
- std::make_shared<const lib::ClusterState>(inspector.asString().make_string()));
- }
-};
-
-}
-
-std::shared_ptr<const lib::ClusterStateBundle> SlimeClusterStateBundleCodec::decode(
- const EncodedClusterStateBundle& encoded_bundle) const
-{
- ConstBufferRef blob(encoded_bundle._buffer->getData(), encoded_bundle._buffer->getDataLen());
- DataBuffer uncompressed;
- decompress(encoded_bundle._compression_type, encoded_bundle._uncompressed_length,
- blob, uncompressed, false);
- if (encoded_bundle._uncompressed_length != uncompressed.getDataLen()) {
- throw std::range_error(vespalib::make_string("ClusterStateBundle indicated uncompressed size (%u) is "
- "not equal to actual uncompressed size (%zu)",
- encoded_bundle._uncompressed_length,
- uncompressed.getDataLen()));
- }
-
- vespalib::Slime slime;
- BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), slime);
- Inspector& root = slime.get();
- Inspector& states = root[StatesField];
- lib::ClusterState baseline(states[BaselineField].asString().make_string());
-
- Inspector& spaces = states[SpacesField];
- lib::ClusterStateBundle::BucketSpaceStateMapping space_states;
- StateInserter inserter(space_states);
- spaces.traverse(inserter);
- // TODO add shared_ptr constructor for baseline?
- return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states));
-}
-
-}
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h
deleted file mode 100644
index 1fb95134059..00000000000
--- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "cluster_state_bundle_codec.h"
-#include <memory>
-
-namespace storage {
-
-/**
- * Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding
- * to implement (de-)serialization of ClusterStateBundle instances. Encoding format is
- * intentionally extensible so that we may add other information to it later.
- *
- * LZ4 compression is transparently applied during encoding and decompression is
- * subsequently applied during decoding.
- */
-class SlimeClusterStateBundleCodec : public ClusterStateBundleCodec {
-public:
- EncodedClusterStateBundle encode(const lib::ClusterStateBundle&) const override;
- std::shared_ptr<const lib::ClusterStateBundle> decode(const EncodedClusterStateBundle&) const override;
-};
-
-}
diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
index cf9c0ba6dff..a2bbba5e52c 100644
--- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
+++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
@@ -1,9 +1,7 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include "cluster_state_bundle.h"
#include "clusterstate.h"
-#include <iostream>
namespace storage::lib {
@@ -12,13 +10,6 @@ ClusterStateBundle::ClusterStateBundle(const ClusterState &baselineClusterState)
{
}
-ClusterStateBundle::ClusterStateBundle(const ClusterState& baselineClusterState,
- BucketSpaceStateMapping derivedBucketSpaceStates)
- : _baselineClusterState(std::make_shared<const ClusterState>(baselineClusterState)),
- _derivedBucketSpaceStates(std::move(derivedBucketSpaceStates))
-{
-}
-
ClusterStateBundle::~ClusterStateBundle() = default;
const std::shared_ptr<const lib::ClusterState> &
@@ -31,7 +22,6 @@ const std::shared_ptr<const lib::ClusterState> &
ClusterStateBundle::getDerivedClusterState(document::BucketSpace) const
{
// For now, just return the baseline cluster state.
- // TODO use _derivedBucketSpaceStates
return _baselineClusterState;
}
@@ -44,36 +34,7 @@ ClusterStateBundle::getVersion() const
bool
ClusterStateBundle::operator==(const ClusterStateBundle &rhs) const
{
- if (!(*_baselineClusterState == *rhs._baselineClusterState)) {
- return false;
- }
- if (_derivedBucketSpaceStates.size() != rhs._derivedBucketSpaceStates.size()) {
- return false;
- }
- // Can't do a regular operator== comparison since we must check equality
- // of cluster state _values_, not their _pointers_.
- for (auto& lhs_ds : _derivedBucketSpaceStates) {
- auto rhs_iter = rhs._derivedBucketSpaceStates.find(lhs_ds.first);
- if ((rhs_iter == rhs._derivedBucketSpaceStates.end())
- || !(*lhs_ds.second == *rhs_iter->second)) {
- return false;
- }
- }
- return true;
-}
-
-std::ostream& operator<<(std::ostream& os, const ClusterStateBundle& bundle) {
- os << "ClusterStateBundle('" << *bundle.getBaselineClusterState();
- if (!bundle.getDerivedClusterStates().empty()) {
- // Output ordering is undefined for of per-space states.
- for (auto& ds : bundle.getDerivedClusterStates()) {
- os << "', ";
- os << document::FixedBucketSpaces::to_string(ds.first);
- os << " '" << *ds.second;
- }
- }
- os << "')";
- return os;
+ return *_baselineClusterState == *rhs._baselineClusterState;
}
}
diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
index a8ee0b54713..77d26092f4e 100644
--- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
+++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
@@ -3,8 +3,6 @@
#pragma once
#include <vespa/document/bucket/bucketspace.h>
-#include <unordered_map>
-#include <iosfwd>
namespace storage::lib {
@@ -16,28 +14,14 @@ class ClusterState;
*/
class ClusterStateBundle
{
-public:
- using BucketSpaceStateMapping = std::unordered_map<
- document::BucketSpace,
- std::shared_ptr<const ClusterState>,
- document::BucketSpace::hash
- >;
std::shared_ptr<const ClusterState> _baselineClusterState;
- BucketSpaceStateMapping _derivedBucketSpaceStates;
public:
explicit ClusterStateBundle(const ClusterState &baselineClusterState);
- ClusterStateBundle(const ClusterState& baselineClusterState,
- BucketSpaceStateMapping derivedBucketSpaceStates);
~ClusterStateBundle();
const std::shared_ptr<const ClusterState> &getBaselineClusterState() const;
const std::shared_ptr<const ClusterState> &getDerivedClusterState(document::BucketSpace bucketSpace) const;
- const BucketSpaceStateMapping& getDerivedClusterStates() const noexcept {
- return _derivedBucketSpaceStates;
- }
uint32_t getVersion() const;
bool operator==(const ClusterStateBundle &rhs) const;
};
-std::ostream& operator<<(std::ostream&, const ClusterStateBundle&);
-
}
diff --git a/vdslib/src/vespa/vdslib/state/clusterstate.cpp b/vdslib/src/vespa/vdslib/state/clusterstate.cpp
index fbc9943e22d..9a69ed98c79 100644
--- a/vdslib/src/vespa/vdslib/state/clusterstate.cpp
+++ b/vdslib/src/vespa/vdslib/state/clusterstate.cpp
@@ -55,7 +55,7 @@ struct NodeData {
}
};
-ClusterState::ClusterState(const vespalib::string& serialized)
+ClusterState::ClusterState(const vespalib::stringref & serialized)
: Printable(),
_version(0),
_clusterState(&State::UP),
diff --git a/vdslib/src/vespa/vdslib/state/clusterstate.h b/vdslib/src/vespa/vdslib/state/clusterstate.h
index 9e8dd0f292a..b60f2f6f1cc 100644
--- a/vdslib/src/vespa/vdslib/state/clusterstate.h
+++ b/vdslib/src/vespa/vdslib/state/clusterstate.h
@@ -36,9 +36,7 @@ public:
ClusterState();
ClusterState(const ClusterState&);
- // FIXME make ClusterState parsing not require null termination of string,
- // then move to vespalib::stringref
- explicit ClusterState(const vespalib::string& serialized);
+ explicit ClusterState(const vespalib::stringref & serialized);
~ClusterState();
std::string getTextualDifference(const ClusterState& other) const;