From 32f8d5bb20479e0895f97cebe9f575d7b09de8c5 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Tue, 3 Oct 2017 11:37:51 +0000 Subject: Remove unused support for SPI over RPC. --- persistence/src/tests/proxy/.gitignore | 10 - persistence/src/tests/proxy/CMakeLists.txt | 31 - .../src/tests/proxy/dummy_provider_factory.h | 35 - .../external_providerproxy_conformancetest.cpp | 42 - persistence/src/tests/proxy/mockprovider.h | 163 ---- .../tests/proxy/providerproxy_conformancetest.cpp | 61 -- persistence/src/tests/proxy/providerproxy_test.cpp | 404 --------- persistence/src/tests/proxy/providerstub_test.cpp | 543 ------------ .../src/tests/proxy/proxy_factory_wrapper.h | 56 -- persistence/src/tests/proxy/proxy_test.sh | 6 - persistence/src/tests/proxy/proxyfactory.h | 37 - persistence/src/vespa/persistence/CMakeLists.txt | 1 - persistence/src/vespa/persistence/proxy/.gitignore | 2 - .../src/vespa/persistence/proxy/CMakeLists.txt | 8 - .../src/vespa/persistence/proxy/buildid.cpp | 8 - persistence/src/vespa/persistence/proxy/buildid.h | 12 - .../src/vespa/persistence/proxy/providerproxy.cpp | 493 ----------- .../src/vespa/persistence/proxy/providerproxy.h | 76 -- .../src/vespa/persistence/proxy/providerstub.cpp | 928 --------------------- .../src/vespa/persistence/proxy/providerstub.h | 97 --- 20 files changed, 3013 deletions(-) delete mode 100644 persistence/src/tests/proxy/.gitignore delete mode 100644 persistence/src/tests/proxy/CMakeLists.txt delete mode 100644 persistence/src/tests/proxy/dummy_provider_factory.h delete mode 100644 persistence/src/tests/proxy/external_providerproxy_conformancetest.cpp delete mode 100644 persistence/src/tests/proxy/mockprovider.h delete mode 100644 persistence/src/tests/proxy/providerproxy_conformancetest.cpp delete mode 100644 persistence/src/tests/proxy/providerproxy_test.cpp delete mode 100644 persistence/src/tests/proxy/providerstub_test.cpp delete mode 100644 persistence/src/tests/proxy/proxy_factory_wrapper.h delete mode 100755 persistence/src/tests/proxy/proxy_test.sh delete mode 100644 persistence/src/tests/proxy/proxyfactory.h delete mode 100644 persistence/src/vespa/persistence/proxy/.gitignore delete mode 100644 persistence/src/vespa/persistence/proxy/CMakeLists.txt delete mode 100644 persistence/src/vespa/persistence/proxy/buildid.cpp delete mode 100644 persistence/src/vespa/persistence/proxy/buildid.h delete mode 100644 persistence/src/vespa/persistence/proxy/providerproxy.cpp delete mode 100644 persistence/src/vespa/persistence/proxy/providerproxy.h delete mode 100644 persistence/src/vespa/persistence/proxy/providerstub.cpp delete mode 100644 persistence/src/vespa/persistence/proxy/providerstub.h (limited to 'persistence/src') diff --git a/persistence/src/tests/proxy/.gitignore b/persistence/src/tests/proxy/.gitignore deleted file mode 100644 index 03bce028dd9..00000000000 --- a/persistence/src/tests/proxy/.gitignore +++ /dev/null @@ -1,10 +0,0 @@ -/.depend -/Makefile -/providerstub_test -/providerproxy_test -/providerproxy_conformancetest -/vespa-external-providerproxy-conformancetest -persistence_providerproxy_conformance_test_app -persistence_providerproxy_test_app -persistence_providerstub_test_app -persistence_external_providerproxy_conformancetest_app diff --git a/persistence/src/tests/proxy/CMakeLists.txt b/persistence/src/tests/proxy/CMakeLists.txt deleted file mode 100644 index 598a3a6a69d..00000000000 --- a/persistence/src/tests/proxy/CMakeLists.txt +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(persistence_providerstub_test_app - SOURCES - providerstub_test.cpp - DEPENDS - persistence -) -vespa_add_executable(persistence_providerproxy_test_app - SOURCES - providerproxy_test.cpp - DEPENDS - persistence -) -vespa_add_executable(persistence_providerproxy_conformance_test_app TEST - SOURCES - providerproxy_conformancetest.cpp - DEPENDS - persistence - persistence_persistence_conformancetest -) -vespa_add_executable(persistence_external_providerproxy_conformancetest_app - SOURCES - external_providerproxy_conformancetest.cpp - OUTPUT_NAME vespa-external-providerproxy-conformancetest - INSTALL bin - DEPENDS - persistence - persistence_persistence_conformancetest -) -vespa_add_test(NAME persistence_providerproxy_conformance_test_app COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/proxy_test.sh - DEPENDS persistence_providerstub_test_app persistence_providerproxy_test_app persistence_providerproxy_conformance_test_app) diff --git a/persistence/src/tests/proxy/dummy_provider_factory.h b/persistence/src/tests/proxy/dummy_provider_factory.h deleted file mode 100644 index 808bce29fac..00000000000 --- a/persistence/src/tests/proxy/dummy_provider_factory.h +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include -#include -#include - -namespace storage { -namespace spi { - -/** - * A simple rpc server persistence provider factory that will only - * work once, by returning a precreated persistence provider instance. - **/ -struct DummyProviderFactory : ProviderStub::PersistenceProviderFactory -{ - typedef std::unique_ptr UP; - typedef storage::spi::PersistenceProvider Provider; - - mutable std::unique_ptr provider; - - DummyProviderFactory(std::unique_ptr p) : provider(std::move(p)) {} - - std::unique_ptr create() const override { - ASSERT_TRUE(provider.get() != 0); - std::unique_ptr ret = std::move(provider); - ASSERT_TRUE(provider.get() == 0); - return ret; - } -}; - -} // namespace spi -} // namespace storage - diff --git a/persistence/src/tests/proxy/external_providerproxy_conformancetest.cpp b/persistence/src/tests/proxy/external_providerproxy_conformancetest.cpp deleted file mode 100644 index adf7a84dbd4..00000000000 --- a/persistence/src/tests/proxy/external_providerproxy_conformancetest.cpp +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "proxyfactory.h" -#include -#include -#include -#include -#include - -using namespace storage::spi; -typedef document::DocumentTypeRepo Repo; -typedef ConformanceTest::PersistenceFactory Factory; - -namespace { - -struct ConformanceFixture : public ConformanceTest { - ConformanceFixture(Factory::UP f) : ConformanceTest(std::move(f)) { setUp(); } - ~ConformanceFixture() { tearDown(); } -}; - -Factory::UP getFactory() { - return Factory::UP(new ProxyFactory()); -} - -#define CONVERT_TEST(testFunction, makeFactory) \ -namespace ns_ ## testFunction { \ -TEST_F(TEST_STR(testFunction) " " TEST_STR(makeFactory), ConformanceFixture(makeFactory)) { \ - f.testFunction(); \ -} \ -} // namespace testFunction - -#undef CPPUNIT_TEST -#define CPPUNIT_TEST(testFunction) CONVERT_TEST(testFunction, MAKE_FACTORY) - -#define MAKE_FACTORY getFactory() -DEFINE_CONFORMANCE_TESTS(); - -} // namespace - -TEST_MAIN() { - TEST_RUN_ALL(); -} diff --git a/persistence/src/tests/proxy/mockprovider.h b/persistence/src/tests/proxy/mockprovider.h deleted file mode 100644 index fda0b4aa922..00000000000 --- a/persistence/src/tests/proxy/mockprovider.h +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include -#include - -namespace storage { -namespace spi { - -struct MockProvider : PersistenceProvider { - enum Function { NONE, INITIALIZE, GET_PARTITION_STATES, LIST_BUCKETS, - SET_CLUSTER_STATE, - SET_ACTIVE_STATE, GET_BUCKET_INFO, PUT, REMOVE_BY_ID, - REMOVE_IF_FOUND, REPLACE_WITH_REMOVE, UPDATE, FLUSH, GET, - CREATE_ITERATOR, ITERATE, DESTROY_ITERATOR, CREATE_BUCKET, - DELETE_BUCKET, GET_MODIFIED_BUCKETS, SPLIT, JOIN, MOVE, MAINTAIN, - REMOVE_ENTRY }; - - mutable Function last_called; - - MockProvider() : last_called(NONE) {} - - Result initialize() override { - last_called = INITIALIZE; - return Result(); - } - - PartitionStateListResult getPartitionStates() const override { - last_called = GET_PARTITION_STATES; - return PartitionStateListResult(PartitionStateList(1u)); - } - - BucketIdListResult listBuckets(PartitionId id) const override { - last_called = LIST_BUCKETS; - BucketIdListResult::List result; - result.push_back(document::BucketId(id)); - return BucketIdListResult(result); - } - - Result setClusterState(const ClusterState &) override { - last_called = SET_CLUSTER_STATE; - return Result(); - } - - Result setActiveState(const Bucket &, BucketInfo::ActiveState) override { - last_called = SET_ACTIVE_STATE; - return Result(); - } - - BucketInfoResult getBucketInfo(const Bucket &bucket) const override { - last_called = GET_BUCKET_INFO; - return BucketInfoResult(BucketInfo(BucketChecksum(1), 2, 3, - bucket.getBucketId().getRawId(), - bucket.getPartition(), - BucketInfo::READY, - BucketInfo::ACTIVE)); - } - - Result put(const Bucket &, Timestamp, const DocumentSP&, Context&) override { - last_called = PUT; - return Result(); - } - - RemoveResult remove(const Bucket &, Timestamp, const DocumentId &, Context&) override { - last_called = REMOVE_BY_ID; - return RemoveResult(true); - } - - RemoveResult removeIfFound(const Bucket &, Timestamp, const DocumentId &, Context&) override { - last_called = REMOVE_IF_FOUND; - return RemoveResult(true); - } - - virtual RemoveResult replaceWithRemove(const Bucket &, Timestamp, - const DocumentId &, Context&) { - last_called = REPLACE_WITH_REMOVE; - return RemoveResult(true); - } - - UpdateResult update(const Bucket &, Timestamp timestamp, const DocumentUpdateSP&, Context&) override { - last_called = UPDATE; - return UpdateResult(Timestamp(timestamp - 10)); - } - - Result flush(const Bucket&, Context&) override { - last_called = FLUSH; - return Result(); - } - - GetResult get(const Bucket &, const document::FieldSet&, const DocumentId&, Context&) const override { - last_called = GET; - return GetResult(Document::UP(new Document), - Timestamp(6u)); - } - - CreateIteratorResult createIterator(const Bucket& bucket, - const document::FieldSet&, - const Selection&, - IncludedVersions, - Context&) override - { - last_called = CREATE_ITERATOR; - return CreateIteratorResult(IteratorId(bucket.getPartition())); - } - - IterateResult iterate(IteratorId, uint64_t, Context&) const override { - last_called = ITERATE; - IterateResult::List result; - result.push_back(DocEntry::UP(new DocEntry(Timestamp(1), 0))); - return IterateResult(std::move(result), true); - } - - Result destroyIterator(IteratorId, Context&) override { - last_called = DESTROY_ITERATOR; - return Result(); - } - - Result createBucket(const Bucket&, Context&) override { - last_called = CREATE_BUCKET; - return Result(); - } - Result deleteBucket(const Bucket&, Context&) override { - last_called = DELETE_BUCKET; - return Result(); - } - - BucketIdListResult getModifiedBuckets() const override { - last_called = GET_MODIFIED_BUCKETS; - BucketIdListResult::List list; - list.push_back(document::BucketId(2)); - list.push_back(document::BucketId(3)); - return BucketIdListResult(list); - } - - Result split(const Bucket &, const Bucket &, const Bucket &, Context&) override { - last_called = SPLIT; - return Result(); - } - - Result join(const Bucket &, const Bucket &, const Bucket &, Context&) override { - last_called = JOIN; - return Result(); - } - - Result move(const Bucket &, PartitionId, Context&) override { - last_called = MOVE; - return Result(); - } - - - Result maintain(const Bucket &, MaintenanceLevel) override { - last_called = MAINTAIN; - return Result(); - } - - Result removeEntry(const Bucket &, Timestamp, Context&) override { - last_called = REMOVE_ENTRY; - return Result(); - } -}; - -} // namespace spi -} // namespace storage diff --git a/persistence/src/tests/proxy/providerproxy_conformancetest.cpp b/persistence/src/tests/proxy/providerproxy_conformancetest.cpp deleted file mode 100644 index fda3f42f0d5..00000000000 --- a/persistence/src/tests/proxy/providerproxy_conformancetest.cpp +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include -#include -#include -#include -#include -#include -#include "proxy_factory_wrapper.h" - -using namespace storage::spi; -typedef document::DocumentTypeRepo Repo; -typedef ConformanceTest::PersistenceFactory Factory; - -namespace { - -struct DummyFactory : Factory { - PersistenceProvider::UP getPersistenceImplementation(const Repo::SP& repo, - const Repo::DocumenttypesConfig &) override { - return PersistenceProvider::UP(new dummy::DummyPersistence(repo, 4)); - } - - bool supportsActiveState() const override { - return true; - } -}; - -struct ConformanceFixture : public ConformanceTest { - ConformanceFixture(Factory::UP f) : ConformanceTest(std::move(f)) { setUp(); } - ~ConformanceFixture() { tearDown(); } -}; - -Factory::UP dummyViaProxy(size_t n) { - if (n == 0) { - return Factory::UP(new DummyFactory()); - } - return Factory::UP(new ProxyFactoryWrapper(dummyViaProxy(n - 1))); -} - -#define CONVERT_TEST(testFunction, makeFactory) \ -namespace ns_ ## testFunction { \ -TEST_F(TEST_STR(testFunction) " " TEST_STR(makeFactory), ConformanceFixture(makeFactory)) { \ - f.testFunction(); \ -} \ -} // namespace testFunction - -#undef CPPUNIT_TEST -#define CPPUNIT_TEST(testFunction) CONVERT_TEST(testFunction, MAKE_FACTORY) - -#define MAKE_FACTORY dummyViaProxy(1) -DEFINE_CONFORMANCE_TESTS(); - -#undef MAKE_FACTORY -#define MAKE_FACTORY dummyViaProxy(7) -DEFINE_CONFORMANCE_TESTS(); - -} // namespace - -TEST_MAIN() { - TEST_RUN_ALL(); -} diff --git a/persistence/src/tests/proxy/providerproxy_test.cpp b/persistence/src/tests/proxy/providerproxy_test.cpp deleted file mode 100644 index 5fcfe0b3cab..00000000000 --- a/persistence/src/tests/proxy/providerproxy_test.cpp +++ /dev/null @@ -1,404 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// Unit tests for providerproxy. - -#include "dummy_provider_factory.h" -#include "mockprovider.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using document::BucketId; -using document::DataType; -using document::DocumentTypeRepo; -using std::ostringstream; -using vespalib::Gate; -using vespalib::ThreadStackExecutor; -using vespalib::makeClosure; -using vespalib::makeTask; -using namespace storage::spi; -using namespace storage; - -namespace { - -const int port = 14863; -const string connect_spec = "tcp/localhost:14863"; -LoadType defaultLoadType(0, "default"); - -void startServer(const DocumentTypeRepo *repo, Gate *gate) { - DummyProviderFactory factory(MockProvider::UP(new MockProvider)); - ProviderStub stub(port, 8, *repo, factory); - gate->await(); - EXPECT_TRUE(stub.hasClient()); -} - -TEST("require that client can start connecting before server is up") { - const DocumentTypeRepo repo; - Gate gate; - ThreadStackExecutor executor(1, 65536); - executor.execute(makeTask(makeClosure(startServer, &repo, &gate))); - ProviderProxy proxy(connect_spec, repo); - gate.countDown(); - executor.sync(); -} - -TEST("require that when the server goes down it causes permanent failure.") { - const DocumentTypeRepo repo; - DummyProviderFactory factory(MockProvider::UP(new MockProvider)); - ProviderStub::UP server(new ProviderStub(port, 8, repo, factory)); - ProviderProxy proxy(connect_spec, repo); - server.reset(0); - - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - Result result = proxy.flush(bucket, context); - EXPECT_EQUAL(Result::FATAL_ERROR, result.getErrorCode()); -} - -struct Fixture { - MockProvider &mock_spi; - DummyProviderFactory factory; - DocumentTypeRepo repo; - ProviderStub stub; - ProviderProxy proxy; - - Fixture() - : mock_spi(*(new MockProvider)), - factory(PersistenceProvider::UP(&mock_spi)), - repo(), - stub(port, 8, repo, factory), - proxy(connect_spec, repo) {} -}; - -TEST_F("require that client handles initialize", Fixture) { - Result result = f.proxy.initialize(); - EXPECT_EQUAL(MockProvider::INITIALIZE, f.mock_spi.last_called); -} - -TEST_F("require that client handles getPartitionStates", Fixture) { - PartitionStateListResult result = f.proxy.getPartitionStates(); - EXPECT_EQUAL(MockProvider::GET_PARTITION_STATES, f.mock_spi.last_called); - EXPECT_EQUAL(1u, result.getList().size()); -} - -TEST_F("require that client handles listBuckets", Fixture) { - const PartitionId partition_id(42); - - BucketIdListResult result = f.proxy.listBuckets(partition_id); - EXPECT_EQUAL(MockProvider::LIST_BUCKETS, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); - ASSERT_EQUAL(1u, result.getList().size()); -} - -TEST_F("require that client handles setClusterState", Fixture) { - lib::ClusterState s("version:1 storage:3 distributor:3"); - lib::Distribution d(lib::Distribution::getDefaultDistributionConfig(3, 3)); - ClusterState state(s, 0, d); - - Result result = f.proxy.setClusterState(state); - EXPECT_EQUAL(MockProvider::SET_CLUSTER_STATE, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); -} - -TEST_F("require that client handles setActiveState", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - const BucketInfo::ActiveState bucket_state = BucketInfo::NOT_ACTIVE; - - Result result = f.proxy.setActiveState(bucket, bucket_state); - EXPECT_EQUAL(MockProvider::SET_ACTIVE_STATE, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); -} - -TEST_F("require that client handles getBucketInfo", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - - BucketInfoResult result = f.proxy.getBucketInfo(bucket); - EXPECT_EQUAL(MockProvider::GET_BUCKET_INFO, f.mock_spi.last_called); - - const BucketInfo& info(result.getBucketInfo()); - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); - EXPECT_EQUAL(1u, info.getChecksum()); - EXPECT_EQUAL(2u, info.getDocumentCount()); - EXPECT_EQUAL(3u, info.getDocumentSize()); - EXPECT_EQUAL(bucket_id, info.getEntryCount()); - EXPECT_EQUAL(partition_id, info.getUsedSize()); - EXPECT_EQUAL(true, info.isReady()); - EXPECT_EQUAL(true, info.isActive()); -} - -TEST_F("require that client handles put", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - const Timestamp timestamp(84); - Document::SP doc(new Document()); - - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - Result result = f.proxy.put(bucket, timestamp, doc, context); - EXPECT_EQUAL(MockProvider::PUT, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); -} - -TEST_F("require that client handles remove by id", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - const Timestamp timestamp(84); - const DocumentId id("doc:test:1"); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - RemoveResult result = f.proxy.remove(bucket, timestamp, id, context); - EXPECT_EQUAL(MockProvider::REMOVE_BY_ID, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); - EXPECT_EQUAL(true, result.wasFound()); -} - -TEST_F("require that client handles removeIfFound", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - const Timestamp timestamp(84); - const DocumentId id("doc:test:1"); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - RemoveResult result = f.proxy.removeIfFound(bucket, timestamp, id, context); - EXPECT_EQUAL(MockProvider::REMOVE_IF_FOUND, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); - EXPECT_EQUAL(true, result.wasFound()); -} - -TEST_F("require that client handles update", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - const Timestamp timestamp(84); - DocumentUpdate::SP update(new DocumentUpdate(*DataType::DOCUMENT, DocumentId("doc:test:1"))); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - UpdateResult result = f.proxy.update(bucket, timestamp, update, context); - EXPECT_EQUAL(MockProvider::UPDATE, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); - EXPECT_EQUAL(timestamp - 10, result.getExistingTimestamp()); -} - -TEST_F("require that client handles flush", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - Result result = f.proxy.flush(bucket, context); - EXPECT_EQUAL(MockProvider::FLUSH, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); -} - -TEST_F("require that client handles get", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - - document::AllFields field_set; - const DocumentId id("doc:test:1"); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - GetResult result = f.proxy.get(bucket, field_set, id, context); - EXPECT_EQUAL(MockProvider::GET, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); - EXPECT_EQUAL(6u, result.getTimestamp()); - ASSERT_TRUE(result.hasDocument()); - EXPECT_EQUAL(Document(), result.getDocument()); -} - -TEST_F("require that client handles createIterator", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - const DocumentSelection doc_sel("docsel"); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - document::AllFields field_set; - - Selection selection(doc_sel); - selection.setFromTimestamp(Timestamp(84)); - selection.setToTimestamp(Timestamp(126)); - - CreateIteratorResult result = - f.proxy.createIterator(bucket, field_set, selection, - NEWEST_DOCUMENT_ONLY, context); - - EXPECT_EQUAL(MockProvider::CREATE_ITERATOR, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); - EXPECT_EQUAL(partition_id, result.getIteratorId()); -} - -TEST_F("require that client handles iterate", Fixture) { - const IteratorId iterator_id(42); - const uint64_t max_byte_size = 21; - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - IterateResult result = f.proxy.iterate(iterator_id, max_byte_size, context); - EXPECT_EQUAL(MockProvider::ITERATE, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); - EXPECT_EQUAL(1u, result.getEntries().size()); - EXPECT_TRUE(result.isCompleted()); -} - -TEST_F("require that client handles destroyIterator", Fixture) { - const IteratorId iterator_id(42); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - f.proxy.destroyIterator(iterator_id, context); - EXPECT_EQUAL(MockProvider::DESTROY_ITERATOR, f.mock_spi.last_called); -} - -TEST_F("require that client handles createBucket", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - f.proxy.createBucket(bucket, context); - EXPECT_EQUAL(MockProvider::CREATE_BUCKET, f.mock_spi.last_called); -} - -TEST_F("require that server accepts deleteBucket", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - f.proxy.deleteBucket(bucket, context); - EXPECT_EQUAL(MockProvider::DELETE_BUCKET, f.mock_spi.last_called); -} - -TEST_F("require that client handles getModifiedBuckets", Fixture) { - BucketIdListResult modifiedBuckets = f.proxy.getModifiedBuckets(); - EXPECT_EQUAL(MockProvider::GET_MODIFIED_BUCKETS, f.mock_spi.last_called); - - EXPECT_EQUAL(2u, modifiedBuckets.getList().size()); -} - -TEST_F("require that client handles split", Fixture) { - const uint64_t bucket_id_1 = 21; - const PartitionId partition_id_1(42); - const Bucket bucket_1(BucketId(bucket_id_1), partition_id_1); - const uint64_t bucket_id_2 = 210; - const PartitionId partition_id_2(420); - const Bucket bucket_2(BucketId(bucket_id_2), partition_id_2); - const uint64_t bucket_id_3 = 2100; - const PartitionId partition_id_3(4200); - const Bucket bucket_3(BucketId(bucket_id_3), partition_id_3); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - Result result = f.proxy.split(bucket_1, bucket_2, bucket_3, context); - EXPECT_EQUAL(MockProvider::SPLIT, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); -} - -TEST_F("require that client handles join", Fixture) { - const uint64_t bucket_id_1 = 21; - const PartitionId partition_id_1(42); - const Bucket bucket_1(BucketId(bucket_id_1), partition_id_1); - const uint64_t bucket_id_2 = 210; - const PartitionId partition_id_2(420); - const Bucket bucket_2(BucketId(bucket_id_2), partition_id_2); - const uint64_t bucket_id_3 = 2100; - const PartitionId partition_id_3(4200); - const Bucket bucket_3(BucketId(bucket_id_3), partition_id_3); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - Result result = f.proxy.join(bucket_1, bucket_2, bucket_3, context); - EXPECT_EQUAL(MockProvider::JOIN, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); -} - -TEST_F("require that client handles move", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId from_partition_id(42); - const PartitionId to_partition_id(43); - const Bucket bucket(BucketId(bucket_id), from_partition_id); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - Result result = f.proxy.move(bucket, to_partition_id, context); - EXPECT_EQUAL(MockProvider::MOVE, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); -} - -TEST_F("require that client handles maintain", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - - Result result = f.proxy.maintain(bucket, HIGH); - EXPECT_EQUAL(MockProvider::MAINTAIN, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); -} - -TEST_F("require that client handles remove entry", Fixture) { - const uint64_t bucket_id = 21; - const PartitionId partition_id(42); - const Bucket bucket(BucketId(bucket_id), partition_id); - const Timestamp timestamp(345); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - - Result result = f.proxy.removeEntry(bucket, timestamp, context); - EXPECT_EQUAL(MockProvider::REMOVE_ENTRY, f.mock_spi.last_called); - - EXPECT_EQUAL(0, result.getErrorCode()); - EXPECT_EQUAL("", result.getErrorMessage()); -} - -} // namespace - -TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/persistence/src/tests/proxy/providerstub_test.cpp b/persistence/src/tests/proxy/providerstub_test.cpp deleted file mode 100644 index fa94cfa0cdc..00000000000 --- a/persistence/src/tests/proxy/providerstub_test.cpp +++ /dev/null @@ -1,543 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// Unit tests for providerstub. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using document::BucketId; -using document::ByteBuffer; -using document::DataType; -using document::DocumentTypeRepo; -using document::VespaDocumentSerializer; -using vespalib::nbostream; -using namespace storage::spi; -using namespace storage; - -#include -#include "dummy_provider_factory.h" - -namespace { - -const int port = 14863; -const char connect_spec[] = "tcp/localhost:14863"; -const string build_id = getBuildId(); - -struct Fixture { - MockProvider &mock_spi; - DummyProviderFactory factory; - DocumentTypeRepo repo; - ProviderStub stub; - FRT_Supervisor supervisor; - FRT_RPCRequest *current_request; - FRT_Target *target; - - Fixture() - : mock_spi(*(new MockProvider())), - factory(PersistenceProvider::UP(&mock_spi)), - repo(), - stub(port, 8, repo, factory), - supervisor(), - current_request(0), - target(supervisor.GetTarget(connect_spec)) - { - supervisor.Start(); - ASSERT_TRUE(target); - } - ~Fixture() { - if (current_request) { - current_request->SubRef(); - } - target->SubRef(); - supervisor.ShutDown(true); - } - FRT_RPCRequest *getRequest(const string &name) { - FRT_RPCRequest *req = supervisor.AllocRPCRequest(current_request); - current_request = req; - req->SetMethodName(name.c_str()); - return req; - } - void callRpc(FRT_RPCRequest *req, const string &return_spec) { - target->InvokeSync(req, 5.0); - req->CheckReturnTypes(return_spec.c_str()); - if (!EXPECT_EQUAL(uint32_t(FRTE_NO_ERROR), req->GetErrorCode())) { - TEST_FATAL(req->GetErrorMessage()); - } - } - void failRpc(FRT_RPCRequest *req, uint32_t error_code) { - target->InvokeSync(req, 5.0); - EXPECT_EQUAL(error_code, req->GetErrorCode()); - } -}; - -struct ConnectedFixture : Fixture { - ConnectedFixture() { - FRT_RPCRequest *req = getRequest("vespa.persistence.connect"); - req->GetParams()->AddString(build_id.data(), build_id.size()); - callRpc(req, ""); - } -}; - -TEST("print build id") { fprintf(stderr, "build id: '%s'\n", getBuildId()); } - -TEST_F("require that server accepts connect", Fixture) { - FRT_RPCRequest *req = f.getRequest("vespa.persistence.connect"); - req->GetParams()->AddString(build_id.data(), build_id.size()); - f.callRpc(req, ""); - EXPECT_TRUE(f.stub.hasClient()); -} - -TEST_F("require that connect can be called twice", ConnectedFixture) { - EXPECT_TRUE(f.stub.hasClient()); - FRT_RPCRequest *req = f.getRequest("vespa.persistence.connect"); - req->GetParams()->AddString(build_id.data(), build_id.size()); - f.callRpc(req, ""); - EXPECT_TRUE(f.stub.hasClient()); -} - -TEST_F("require that connect fails with wrong build id", Fixture) { - FRT_RPCRequest *req = f.getRequest("vespa.persistence.connect"); - const string wrong_id = "wrong build id"; - req->GetParams()->AddString(wrong_id.data(), wrong_id.size()); - f.failRpc(req, FRTE_RPC_METHOD_FAILED); - string prefix("Wrong build id. Got 'wrong build id', required "); - EXPECT_EQUAL(prefix, - string(req->GetErrorMessage()).substr(0, prefix.size())); - EXPECT_FALSE(f.stub.hasClient()); -} - -TEST_F("require that only one client can connect", ConnectedFixture) { - EXPECT_TRUE(f.stub.hasClient()); - FRT_RPCRequest *req = f.getRequest("vespa.persistence.connect"); - req->GetParams()->AddString(build_id.data(), build_id.size()); - FRT_Target *target = f.supervisor.GetTarget(connect_spec); - target->InvokeSync(req, 5.0); - target->SubRef(); - EXPECT_EQUAL(uint32_t(FRTE_RPC_METHOD_FAILED), req->GetErrorCode()); - EXPECT_EQUAL("Server is already connected", - string(req->GetErrorMessage())); -} - -TEST_F("require that server accepts getPartitionStates", ConnectedFixture) { - FRT_RPCRequest *req = f.getRequest("vespa.persistence.getPartitionStates"); - f.callRpc(req, "bsIS"); - EXPECT_EQUAL(MockProvider::GET_PARTITION_STATES, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); - EXPECT_EQUAL(1u, req->GetReturn()->GetValue(2)._int32_array._len); - EXPECT_EQUAL(1u, req->GetReturn()->GetValue(3)._string_array._len); -} - -TEST_F("require that server accepts listBuckets", ConnectedFixture) { - const uint64_t partition_id = 42; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.listBuckets"); - req->GetParams()->AddInt64(partition_id); - f.callRpc(req, "bsL"); - EXPECT_EQUAL(MockProvider::LIST_BUCKETS, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); - EXPECT_EQUAL(1u, req->GetReturn()->GetValue(2)._int64_array._len); - EXPECT_EQUAL(partition_id, - req->GetReturn()->GetValue(2)._int64_array._pt[0]); -} - -TEST_F("require that server accepts setClusterState", ConnectedFixture) { - FRT_RPCRequest *req = f.getRequest("vespa.persistence.setClusterState"); - - lib::ClusterState s("version:1 storage:3 distributor:3"); - lib::Distribution d(lib::Distribution::getDefaultDistributionConfig(3, 3)); - ClusterState state(s, 0, d); - vespalib::nbostream o; - state.serialize(o); - req->GetParams()->AddData(o.c_str(), o.size()); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::SET_CLUSTER_STATE, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); -} - -TEST_F("require that server accepts setActiveState", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - const BucketInfo::ActiveState bucket_state = BucketInfo::NOT_ACTIVE; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.setActiveState"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - req->GetParams()->AddInt8(bucket_state); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::SET_ACTIVE_STATE, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); -} - -TEST_F("require that server accepts getBucketInfo", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.getBucketInfo"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - f.callRpc(req, "bsiiiiibb"); - EXPECT_EQUAL(MockProvider::GET_BUCKET_INFO, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); - EXPECT_EQUAL(1u, req->GetReturn()->GetValue(2)._intval32); - EXPECT_EQUAL(2u, req->GetReturn()->GetValue(3)._intval32); - EXPECT_EQUAL(3u, req->GetReturn()->GetValue(4)._intval32); - EXPECT_EQUAL(bucket_id, req->GetReturn()->GetValue(5)._intval32); - EXPECT_EQUAL(partition_id, req->GetReturn()->GetValue(6)._intval32); - EXPECT_EQUAL(static_cast(BucketInfo::READY), - req->GetReturn()->GetValue(7)._intval8); - EXPECT_EQUAL(static_cast(BucketInfo::ACTIVE), - req->GetReturn()->GetValue(8)._intval8); -} - -TEST_F("require that server accepts put", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - const Timestamp timestamp(84); - Document::UP doc(new Document); - nbostream stream; - VespaDocumentSerializer serializer(stream); - serializer.write(*doc, document::COMPLETE); - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.put"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - req->GetParams()->AddInt64(timestamp); - req->GetParams()->AddData(stream.c_str(), stream.size()); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::PUT, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); -} - -void testRemove(ConnectedFixture &f, const string &rpc_name, - MockProvider::Function func) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - const Timestamp timestamp(84); - const DocumentId id("doc:test:1"); - - FRT_RPCRequest *req = f.getRequest(rpc_name); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - req->GetParams()->AddInt64(timestamp); - req->GetParams()->AddString(id.toString().data(), id.toString().size()); - f.callRpc(req, "bsb"); - EXPECT_EQUAL(func, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); - EXPECT_TRUE(req->GetReturn()->GetValue(2)._intval8); -} - -TEST_F("require that server accepts remove by id", ConnectedFixture) { - testRemove(f, "vespa.persistence.removeById", MockProvider::REMOVE_BY_ID); -} - -TEST_F("require that server accepts removeIfFound", ConnectedFixture) { - testRemove(f, "vespa.persistence.removeIfFound", - MockProvider::REMOVE_IF_FOUND); -} - -TEST_F("require that server accepts update", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - const Timestamp timestamp(84); - DocumentUpdate update(*DataType::DOCUMENT, DocumentId("doc:test:1")); - vespalib::nbostream stream; - update.serializeHEAD(stream); - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.update"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - req->GetParams()->AddInt64(timestamp); - req->GetParams()->AddData(stream.c_str(), stream.size()); - f.callRpc(req, "bsl"); - EXPECT_EQUAL(MockProvider::UPDATE, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); - EXPECT_EQUAL(timestamp - 10, req->GetReturn()->GetValue(2)._intval64); -} - -TEST_F("require that server accepts flush", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.flush"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::FLUSH, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); -} - -TEST_F("require that server accepts get", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - const string field_set_1 = "[all]"; - const DocumentId id("doc:test:1"); - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.get"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - req->GetParams()->AddString(field_set_1.data(), field_set_1.size()); - req->GetParams()->AddString(id.toString().data(), id.toString().size()); - f.callRpc(req, "bslx"); - EXPECT_EQUAL(MockProvider::GET, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); - EXPECT_EQUAL(6u, req->GetReturn()->GetValue(2)._intval64); - EXPECT_EQUAL(25u, req->GetReturn()->GetValue(3)._data._len); -} - -TEST_F("require that server accepts createIterator", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - const string doc_sel = "docsel"; - const Timestamp timestamp_from(84); - const Timestamp timestamp_to(126); - const Timestamp timestamp_subset(168); - const string field_set_1 = "[all]"; - const bool include_removes = false; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.createIterator"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - req->GetParams()->AddString(field_set_1.data(), field_set_1.size()); - req->GetParams()->AddString(doc_sel.data(), doc_sel.size()); - req->GetParams()->AddInt64(timestamp_from); - req->GetParams()->AddInt64(timestamp_to); - req->GetParams()->AddInt64Array(1)[0] = timestamp_subset; - req->GetParams()->AddInt8(include_removes); - - f.callRpc(req, "bsl"); - EXPECT_EQUAL(MockProvider::CREATE_ITERATOR, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); - EXPECT_EQUAL(partition_id, req->GetReturn()->GetValue(2)._intval64); -} - -TEST_F("require that server accepts iterate", ConnectedFixture) { - const uint64_t iterator_id = 42; - const uint64_t max_byte_size = 21; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.iterate"); - req->GetParams()->AddInt64(iterator_id); - req->GetParams()->AddInt64(max_byte_size); - f.callRpc(req, "bsLISXb"); - EXPECT_EQUAL(MockProvider::ITERATE, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); - EXPECT_EQUAL(1u, req->GetReturn()->GetValue(2)._int64_array._len); - EXPECT_EQUAL(1u, req->GetReturn()->GetValue(3)._int32_array._len); - EXPECT_EQUAL(1u, req->GetReturn()->GetValue(4)._string_array._len); - EXPECT_EQUAL(1u, req->GetReturn()->GetValue(5)._data_array._len); - EXPECT_TRUE(req->GetReturn()->GetValue(6)._intval8); -} - -TEST_F("require that server accepts destroyIterator", ConnectedFixture) { - const uint64_t iterator_id = 42; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.destroyIterator"); - req->GetParams()->AddInt64(iterator_id); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::DESTROY_ITERATOR, f.mock_spi.last_called); -} - -TEST_F("require that server accepts createBucket", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.createBucket"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::CREATE_BUCKET, f.mock_spi.last_called); -} - -TEST_F("require that server accepts deleteBucket", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.deleteBucket"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::DELETE_BUCKET, f.mock_spi.last_called); -} - -TEST_F("require that server accepts getModifiedBuckets", ConnectedFixture) { - FRT_RPCRequest *req = f.getRequest("vespa.persistence.getModifiedBuckets"); - f.callRpc(req, "bsL"); - EXPECT_EQUAL(MockProvider::GET_MODIFIED_BUCKETS, f.mock_spi.last_called); - EXPECT_EQUAL(2u, req->GetReturn()->GetValue(2)._int64_array._len); -} - -TEST_F("require that server accepts split", ConnectedFixture) { - const uint64_t bucket_id_1 = 21; - const uint64_t partition_id_1 = 42; - const uint64_t bucket_id_2 = 210; - const uint64_t partition_id_2 = 420; - const uint64_t bucket_id_3 = 2100; - const uint64_t partition_id_3 = 4200; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.split"); - req->GetParams()->AddInt64(bucket_id_1); - req->GetParams()->AddInt64(partition_id_1); - req->GetParams()->AddInt64(bucket_id_2); - req->GetParams()->AddInt64(partition_id_2); - req->GetParams()->AddInt64(bucket_id_3); - req->GetParams()->AddInt64(partition_id_3); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::SPLIT, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); -} - -TEST_F("require that server accepts join", ConnectedFixture) { - const uint64_t bucket_id_1 = 21; - const uint64_t partition_id_1 = 42; - const uint64_t bucket_id_2 = 210; - const uint64_t partition_id_2 = 420; - const uint64_t bucket_id_3 = 2100; - const uint64_t partition_id_3 = 4200; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.join"); - req->GetParams()->AddInt64(bucket_id_1); - req->GetParams()->AddInt64(partition_id_1); - req->GetParams()->AddInt64(bucket_id_2); - req->GetParams()->AddInt64(partition_id_2); - req->GetParams()->AddInt64(bucket_id_3); - req->GetParams()->AddInt64(partition_id_3); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::JOIN, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); -} - -TEST_F("require that server accepts move", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t from_partition_id = 42; - const uint64_t to_partition_id = 43; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.move"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(from_partition_id); - req->GetParams()->AddInt64(to_partition_id); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::MOVE, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); -} - -TEST_F("require that server accepts maintain", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - const MaintenanceLevel verification_level = HIGH; - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.maintain"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - req->GetParams()->AddInt8(verification_level); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::MAINTAIN, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); -} - -TEST_F("require that server accepts remove_entry", ConnectedFixture) { - const uint64_t bucket_id = 21; - const uint64_t partition_id = 42; - const Timestamp timestamp(345); - - FRT_RPCRequest *req = f.getRequest("vespa.persistence.removeEntry"); - req->GetParams()->AddInt64(bucket_id); - req->GetParams()->AddInt64(partition_id); - req->GetParams()->AddInt64(timestamp); - f.callRpc(req, "bs"); - EXPECT_EQUAL(MockProvider::REMOVE_ENTRY, f.mock_spi.last_called); - - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8); - EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len); -} - -void checkRpcFails(const string &name, const string ¶m_spec, Fixture &f) { - TEST_STATE(name.c_str()); - FRT_RPCRequest *req = f.getRequest("vespa.persistence." + name); - for (size_t i = 0; i < param_spec.size(); ++i) { - switch(param_spec[i]) { - case 'b': req->GetParams()->AddInt8(0); break; - case 'l': req->GetParams()->AddInt64(0); break; - case 'L': req->GetParams()->AddInt64Array(0); break; - case 's': req->GetParams()->AddString(0, 0); break; - case 'S': req->GetParams()->AddStringArray(0); break; - case 'x': req->GetParams()->AddData(0, 0); break; - } - } - f.failRpc(req, FRTE_RPC_METHOD_FAILED); -} - -TEST_F("require that unconnected server fails all SPI calls.", Fixture) -{ - checkRpcFails("initialize", "", f); - checkRpcFails("getPartitionStates", "", f); - checkRpcFails("listBuckets", "l", f); - checkRpcFails("setClusterState", "x", f); - checkRpcFails("setActiveState", "llb", f); - checkRpcFails("getBucketInfo", "ll", f); - checkRpcFails("put", "lllx", f); - checkRpcFails("removeById", "llls", f); - checkRpcFails("removeIfFound", "llls", f); - checkRpcFails("update", "lllx", f); - checkRpcFails("flush", "ll", f); - checkRpcFails("get", "llss", f); - checkRpcFails("createIterator", "llssllLb", f); - checkRpcFails("iterate", "ll", f); - checkRpcFails("destroyIterator", "l", f); - checkRpcFails("createBucket", "ll", f); - checkRpcFails("deleteBucket", "ll", f); - checkRpcFails("getModifiedBuckets", "", f); - checkRpcFails("split", "llllll", f); - checkRpcFails("join", "llllll", f); - checkRpcFails("maintain", "llb", f); - checkRpcFails("removeEntry", "lll", f); -} - -} // namespace - -TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/persistence/src/tests/proxy/proxy_factory_wrapper.h b/persistence/src/tests/proxy/proxy_factory_wrapper.h deleted file mode 100644 index c42b262bcac..00000000000 --- a/persistence/src/tests/proxy/proxy_factory_wrapper.h +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include -#include -#include -#include -#include "dummy_provider_factory.h" - -namespace storage { -namespace spi { - -/** - * Generic wrapper for persistence conformance test factories. This - * wrapper will take any other factory and expose a factory interface - * that will create persistence instances that communicate with - * persistence instances created by the wrapped factory using the RPC - * persistence Proxy. - **/ -struct ProxyFactoryWrapper : ConformanceTest::PersistenceFactory -{ - typedef storage::spi::ConformanceTest::PersistenceFactory Factory; - typedef storage::spi::PersistenceProvider Provider; - typedef storage::spi::ProviderStub Server; - typedef storage::spi::ProviderProxy Client; - typedef document::DocumentTypeRepo Repo; - - Factory::UP factory; - ProxyFactoryWrapper(Factory::UP f) : factory(std::move(f)) {} - - struct Wrapper : Client { - DummyProviderFactory::UP provider; - Server::UP server; - Wrapper(DummyProviderFactory::UP p, Server::UP s, const Repo &repo) - : Client(vespalib::make_string("tcp/localhost:%u", s->getPort()), repo), - provider(std::move(p)), - server(std::move(s)) - {} - }; - - virtual Provider::UP - getPersistenceImplementation(const Repo::SP &repo, - const Repo::DocumenttypesConfig &typesCfg) override{ - DummyProviderFactory::UP provider(new DummyProviderFactory(factory->getPersistenceImplementation(repo, typesCfg))); - Server::UP server(new Server(0, 8, *repo, *provider)); - return Provider::UP(new Wrapper(std::move(provider), std::move(server), *repo)); - } - - bool supportsActiveState() const override { - return factory->supportsActiveState(); - } -}; -} // namespace spi -} // namespace storage - diff --git a/persistence/src/tests/proxy/proxy_test.sh b/persistence/src/tests/proxy/proxy_test.sh deleted file mode 100755 index 637ff192356..00000000000 --- a/persistence/src/tests/proxy/proxy_test.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -set -e -$VALGRIND ./persistence_providerstub_test_app -$VALGRIND ./persistence_providerproxy_test_app -$VALGRIND ./persistence_providerproxy_conformance_test_app diff --git a/persistence/src/tests/proxy/proxyfactory.h b/persistence/src/tests/proxy/proxyfactory.h deleted file mode 100644 index b785fab4290..00000000000 --- a/persistence/src/tests/proxy/proxyfactory.h +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include -#include -#include - -namespace storage { -namespace spi { - -/** - * Generic wrapper for persistence conformance test factories. This - * wrapper will take any other factory and expose a factory interface - * that will create persistence instances that communicate with - * persistence instances created by the wrapped factory using the RPC - * persistence Proxy. - **/ -struct ProxyFactory : ConformanceTest::PersistenceFactory -{ - using Provider = storage::spi::PersistenceProvider; - using Client = storage::spi::ProviderProxy; - using Repo = document::DocumentTypeRepo; - - ProxyFactory() {} - - Provider::UP - getPersistenceImplementation(const Repo::SP &repo, const Repo::DocumenttypesConfig &) override { - return Provider::UP(new Client("tcp/localhost:3456", *repo)); - } - - bool supportsActiveState() const override { - return false; - } -}; -} // namespace spi -} // namespace storage diff --git a/persistence/src/vespa/persistence/CMakeLists.txt b/persistence/src/vespa/persistence/CMakeLists.txt index 3b7920128ce..da8eda2164f 100644 --- a/persistence/src/vespa/persistence/CMakeLists.txt +++ b/persistence/src/vespa/persistence/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_library(persistence SOURCES $ $ - $ INSTALL lib64 DEPENDS ) diff --git a/persistence/src/vespa/persistence/proxy/.gitignore b/persistence/src/vespa/persistence/proxy/.gitignore deleted file mode 100644 index 7e7c0fe7fae..00000000000 --- a/persistence/src/vespa/persistence/proxy/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/.depend -/Makefile diff --git a/persistence/src/vespa/persistence/proxy/CMakeLists.txt b/persistence/src/vespa/persistence/proxy/CMakeLists.txt deleted file mode 100644 index fdebad2fe49..00000000000 --- a/persistence/src/vespa/persistence/proxy/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(persistence_proxy OBJECT - SOURCES - buildid.cpp - providerproxy.cpp - providerstub.cpp - DEPENDS -) diff --git a/persistence/src/vespa/persistence/proxy/buildid.cpp b/persistence/src/vespa/persistence/proxy/buildid.cpp deleted file mode 100644 index 2ac018069b8..00000000000 --- a/persistence/src/vespa/persistence/proxy/buildid.cpp +++ /dev/null @@ -1,8 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "buildid.h" -#include - -const char *storage::spi::getBuildId() { - return vespalib::VersionTagComponent; -} diff --git a/persistence/src/vespa/persistence/proxy/buildid.h b/persistence/src/vespa/persistence/proxy/buildid.h deleted file mode 100644 index ab32b09c533..00000000000 --- a/persistence/src/vespa/persistence/proxy/buildid.h +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -namespace storage { -namespace spi { - -const char *getBuildId(); - -} // namespace spi -} // namespace storage - diff --git a/persistence/src/vespa/persistence/proxy/providerproxy.cpp b/persistence/src/vespa/persistence/proxy/providerproxy.cpp deleted file mode 100644 index 52e641db74f..00000000000 --- a/persistence/src/vespa/persistence/proxy/providerproxy.cpp +++ /dev/null @@ -1,493 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "providerproxy.h" -#include "buildid.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -LOG_SETUP(".providerproxy"); - -using document::BucketId; -using document::ByteBuffer; -using document::DocumentTypeRepo; -using document::VespaDocumentDeserializer; -using document::VespaDocumentSerializer; -using vespalib::nbostream; - -namespace storage { -namespace spi { -namespace { -void addBucket(FRT_Values &values, const Bucket &bucket) { - values.AddInt64(bucket.getBucketId().getId()); - values.AddInt64(bucket.getPartition()); -} - -void addDocument(FRT_Values &values, const Document &doc) { - nbostream stream; - VespaDocumentSerializer serializer(stream); - serializer.write(doc, document::COMPLETE); - values.AddData(stream.c_str(), stream.size()); -} - -void addString(FRT_Values &values, const string &s) { - values.AddString(s.data(), s.size()); -} - -void addSelection(FRT_Values &values, const Selection &selection) { - addString(values, selection.getDocumentSelection().getDocumentSelection()); - values.AddInt64(selection.getFromTimestamp()); - values.AddInt64(selection.getToTimestamp()); - std::copy(selection.getTimestampSubset().begin(), - selection.getTimestampSubset().end(), - values.AddInt64Array(selection.getTimestampSubset().size())); -} - -void addDocumentUpdate(FRT_Values &values, const DocumentUpdate &update) { - nbostream stream; - update.serializeHEAD(stream); - values.AddData(stream.c_str(), stream.size()); -} - -Document::UP readDocument(nbostream &stream, const DocumentTypeRepo &repo) { - const uint16_t version = 8; - VespaDocumentDeserializer deserializer(repo, stream, version); - Document::UP doc(new Document); - deserializer.read(*doc); - return doc; -} - -string getString(const FRT_StringValue &str) { - return string(str._str, str._len); -} - -string getString(const FRT_Value &value) { - return getString(value._string); -} - -template -ResultType readError(const FRT_Values &values) { - uint8_t error_code = values[0]._intval8; - string error_msg = getString(values[1]); - return ResultType(Result::ErrorType(error_code), error_msg); -} - -bool invokeRpc(FRT_Target *target, FRT_RPCRequest &req, const char *res_spec) { - target->InvokeSync(&req, 0.0); // no timeout - req.CheckReturnTypes(res_spec); - return req.GetErrorCode() == FRTE_NO_ERROR; -} - -struct RequestScopedPtr : vespalib::noncopyable { - FRT_RPCRequest *req; - RequestScopedPtr(FRT_RPCRequest *r) : req(r) { assert(req); } - ~RequestScopedPtr() { req->SubRef(); } - FRT_RPCRequest *operator->() { return req; } - FRT_RPCRequest &operator*() { return *req; } -}; -} // namespace - -template -ResultType ProviderProxy::invokeRpc_Return(FRT_RPCRequest &req, - const char *res_spec) const -{ - if (!invokeRpc(_target, req, res_spec)) { - - - return ResultType(Result::FATAL_ERROR, - vespalib::make_string("Error %s when running RPC request %s", - req.GetErrorMessage(), - req.GetMethodName())); - } - return readResult(*req.GetReturn()); -} - -template -ResultType ProviderProxy::readResult(const FRT_Values &values) const { - if (values[0]._intval8 != Result::NONE) { - return readError(values); - } - return readNoError(values); -} - -template <> -Result ProviderProxy::readNoError(const FRT_Values &) const { - return Result(); -} - -template <> -PartitionStateListResult -ProviderProxy::readNoError(const FRT_Values &values) const { - FRT_LPT(uint32_t) state_array = values[2]._int32_array; - FRT_LPT(FRT_StringValue) reason_array = values[3]._string_array; - PartitionStateList states(state_array._len); - for (size_t i = 0; i < state_array._len; ++i) { - PartitionState::State state = - static_cast(state_array._pt[i]); - string reason = getString(reason_array._pt[i]); - states[i] = PartitionState(state, reason); - } - return PartitionStateListResult(states); -} - -template <> -BucketIdListResult ProviderProxy::readNoError(const FRT_Values &values) const { - BucketIdListResult::List list; - for (uint32_t i = 0; i < values[2]._int64_array._len; ++i) { - list.push_back(BucketId(values[2]._int64_array._pt[i])); - } - return BucketIdListResult(list); -} - -template <> -BucketInfoResult ProviderProxy::readNoError(const FRT_Values &values) const { - BucketInfo info(BucketChecksum(values[2]._intval32), - values[3]._intval32, - values[4]._intval32, - values[5]._intval32, - values[6]._intval32, - static_cast( - values[7]._intval8), - static_cast( - values[8]._intval8)); - return BucketInfoResult(info); -} - -template <> -RemoveResult ProviderProxy::readNoError(const FRT_Values &values) const { - return RemoveResult(values[2]._intval8); -} - -template <> -UpdateResult ProviderProxy::readNoError(const FRT_Values &values) const { - return UpdateResult(Timestamp(values[2]._intval64)); -} - -template <> -GetResult ProviderProxy::readNoError(const FRT_Values &values) const { - nbostream stream(values[3]._data._buf, values[3]._data._len); - if (stream.empty()) { - return GetResult(); - } - return GetResult(readDocument(stream, *_repo), - Timestamp(values[2]._intval64)); -} - -template <> -CreateIteratorResult ProviderProxy::readNoError(const FRT_Values &values) const -{ - return CreateIteratorResult(IteratorId(values[2]._intval64)); -} - -template <> -IterateResult ProviderProxy::readNoError(const FRT_Values &values) const { - IterateResult::List result; - assert(values[2]._int64_array._len == values[3]._int32_array._len && - values[2]._int64_array._len == values[4]._string_array._len && - values[2]._int64_array._len == values[5]._data_array._len); - for (uint32_t i = 0; i < values[2]._int64_array._len; ++i) { - Timestamp timestamp(values[2]._int64_array._pt[i]); - uint32_t meta_flags = values[3]._int32_array._pt[i]; - string doc_id(getString(values[4]._string_array._pt[i])); - nbostream stream(values[5]._data_array._pt[i]._buf, - values[5]._data_array._pt[i]._len); - DocEntry::UP entry; - if (!stream.empty()) { - Document::UP doc = readDocument(stream, *_repo); - entry.reset(new DocEntry(timestamp, meta_flags, std::move(doc))); - } else if (!doc_id.empty()) { - entry.reset( - new DocEntry(timestamp, meta_flags, DocumentId(doc_id))); - } else { - entry.reset(new DocEntry(timestamp, meta_flags)); - } - result.push_back(std::move(entry)); - } - - return IterateResult(std::move(result), values[6]._intval8); -} - -namespace { -bool shouldFailFast(uint32_t error_code) { - return error_code != FRTE_RPC_TIMEOUT - && error_code != FRTE_RPC_CONNECTION - && error_code != FRTE_RPC_OVERLOAD - && error_code != FRTE_NO_ERROR; -} -} // namespace - -ProviderProxy::ProviderProxy(const vespalib::string &connect_spec, - const DocumentTypeRepo &repo) - : _supervisor(new FRT_Supervisor()), - _target(0), - _repo(&repo) -{ - _supervisor->Start(); - bool connected = false; - _target = _supervisor->GetTarget(connect_spec.c_str()); - for (size_t i = 0; !connected && (i < (100 + 300)); ++i) { - FRT_RPCRequest *req = new FRT_RPCRequest(); - req->SetMethodName("vespa.persistence.connect"); - const string build_id = getBuildId(); - req->GetParams()->AddString(build_id.data(), build_id.size()); - _target->InvokeSync(req, 5.0); - connected = req->CheckReturnTypes(""); - uint32_t error_code = req->GetErrorCode(); - req->SubRef(); - if (!connected) { - if (shouldFailFast(error_code)) { - break; - } - _target->SubRef(); - if (i < 100) { - FastOS_Thread::Sleep(100); // retry each 100ms for 10s - } else { - FastOS_Thread::Sleep(1000); // retry each 1s for 5m - } - _target = _supervisor->GetTarget(connect_spec.c_str()); - } - } - if (!connected) { - LOG(error, "could not connect to peer"); - } -} - -ProviderProxy::~ProviderProxy() { - _target->SubRef(); - _supervisor->ShutDown(true); -} - -Result ProviderProxy::initialize() { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.initialize"); - return invokeRpc_Return(*req, "bs"); -} - -PartitionStateListResult ProviderProxy::getPartitionStates() const { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.getPartitionStates"); - return invokeRpc_Return(*req, "bsIS"); -} - -BucketIdListResult ProviderProxy::listBuckets(PartitionId partition) const { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.listBuckets"); - req->GetParams()->AddInt64(partition); - - return invokeRpc_Return(*req, "bsL"); -} - -Result ProviderProxy::setClusterState(const ClusterState& clusterState) { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.setClusterState"); - - vespalib::nbostream o; - clusterState.serialize(o); - req->GetParams()->AddData(o.c_str(), o.size()); - return invokeRpc_Return(*req, "bs"); -} - -Result ProviderProxy::setActiveState(const Bucket &bucket, - BucketInfo::ActiveState newState) { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.setActiveState"); - addBucket(*req->GetParams(), bucket); - req->GetParams()->AddInt8(newState); - return invokeRpc_Return(*req, "bs"); -} - -BucketInfoResult ProviderProxy::getBucketInfo(const Bucket &bucket) const { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.getBucketInfo"); - addBucket(*req->GetParams(), bucket); - return invokeRpc_Return(*req, "bsiiiiibb"); -} - -Result ProviderProxy::put(const Bucket &bucket, Timestamp timestamp, - const Document::SP& doc, Context&) -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.put"); - addBucket(*req->GetParams(), bucket); - req->GetParams()->AddInt64(timestamp); - addDocument(*req->GetParams(), *doc); - return invokeRpc_Return(*req, "bs"); -} - -RemoveResult ProviderProxy::remove(const Bucket &bucket, - Timestamp timestamp, - const DocumentId &id, - Context&) -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.removeById"); - addBucket(*req->GetParams(), bucket); - req->GetParams()->AddInt64(timestamp); - addString(*req->GetParams(), id.toString()); - return invokeRpc_Return(*req, "bsb"); -} - -RemoveResult ProviderProxy::removeIfFound(const Bucket &bucket, - Timestamp timestamp, - const DocumentId &id, - Context&) -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.removeIfFound"); - addBucket(*req->GetParams(), bucket); - req->GetParams()->AddInt64(timestamp); - addString(*req->GetParams(), id.toString()); - return invokeRpc_Return(*req, "bsb"); -} - -UpdateResult ProviderProxy::update(const Bucket &bucket, Timestamp timestamp, - const DocumentUpdate::SP& doc_update, - Context&) -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.update"); - addBucket(*req->GetParams(), bucket); - req->GetParams()->AddInt64(timestamp); - addDocumentUpdate(*req->GetParams(), *doc_update); - return invokeRpc_Return(*req, "bsl"); -} - -Result ProviderProxy::flush(const Bucket &bucket, Context&) { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.flush"); - addBucket(*req->GetParams(), bucket); - return invokeRpc_Return(*req, "bs"); -} - -GetResult ProviderProxy::get(const Bucket &bucket, - const document::FieldSet& fieldSet, - const DocumentId &doc_id, - Context&) const -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.get"); - document::FieldSetRepo repo; - addBucket(*req->GetParams(), bucket); - addString(*req->GetParams(), repo.serialize(fieldSet)); - addString(*req->GetParams(), doc_id.toString()); - return invokeRpc_Return(*req, "bslx"); -} - -CreateIteratorResult ProviderProxy::createIterator(const Bucket &bucket, - const document::FieldSet& fieldSet, - const Selection &select, - IncludedVersions versions, - Context&) -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.createIterator"); - addBucket(*req->GetParams(), bucket); - - document::FieldSetRepo repo; - addString(*req->GetParams(), repo.serialize(fieldSet)); - addSelection(*req->GetParams(), select); - req->GetParams()->AddInt8(versions); - return invokeRpc_Return(*req, "bsl"); -} - -IterateResult ProviderProxy::iterate(IteratorId id, - uint64_t max_byte_size, - Context&) const -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.iterate"); - req->GetParams()->AddInt64(id); - req->GetParams()->AddInt64(max_byte_size); - return invokeRpc_Return(*req, "bsLISXb"); -} - -Result ProviderProxy::destroyIterator(IteratorId id, Context&) { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.destroyIterator"); - req->GetParams()->AddInt64(id); - return invokeRpc_Return(*req, "bs"); -} - -Result ProviderProxy::createBucket(const Bucket &bucket, Context&) { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.createBucket"); - addBucket(*req->GetParams(), bucket); - return invokeRpc_Return(*req, "bs"); -} - -Result ProviderProxy::deleteBucket(const Bucket &bucket, Context&) { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.deleteBucket"); - addBucket(*req->GetParams(), bucket); - return invokeRpc_Return(*req, "bs"); -} - -BucketIdListResult ProviderProxy::getModifiedBuckets() const { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.getModifiedBuckets"); - return invokeRpc_Return(*req, "bsL"); -} - -Result ProviderProxy::split(const Bucket &source, - const Bucket &target1, - const Bucket &target2, - Context&) -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.split"); - addBucket(*req->GetParams(), source); - addBucket(*req->GetParams(), target1); - addBucket(*req->GetParams(), target2); - return invokeRpc_Return(*req, "bs"); -} - -Result ProviderProxy::join(const Bucket &source1, - const Bucket &source2, - const Bucket &target, - Context&) -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.join"); - addBucket(*req->GetParams(), source1); - addBucket(*req->GetParams(), source2); - addBucket(*req->GetParams(), target); - return invokeRpc_Return(*req, "bs"); -} - -Result ProviderProxy::move(const Bucket &source, - PartitionId target, - Context&) -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.move"); - addBucket(*req->GetParams(), source); - req->GetParams()->AddInt64(target); - return invokeRpc_Return(*req, "bs"); -} - -Result ProviderProxy::maintain(const Bucket &bucket, MaintenanceLevel level) { - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.maintain"); - addBucket(*req->GetParams(), bucket); - req->GetParams()->AddInt8(level); - return invokeRpc_Return(*req, "bs"); -} - -Result ProviderProxy::removeEntry(const Bucket &bucket, Timestamp timestamp, - Context&) -{ - RequestScopedPtr req(_supervisor->AllocRPCRequest()); - req->SetMethodName("vespa.persistence.removeEntry"); - addBucket(*req->GetParams(), bucket); - req->GetParams()->AddInt64(timestamp); - return invokeRpc_Return(*req, "bs"); -} - -} // namespace spi -} // namespace storage diff --git a/persistence/src/vespa/persistence/proxy/providerproxy.h b/persistence/src/vespa/persistence/proxy/providerproxy.h deleted file mode 100644 index 7fa59fe07d0..00000000000 --- a/persistence/src/vespa/persistence/proxy/providerproxy.h +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include - -class FRT_Target; -class FRT_Supervisor; -class FRT_RPCRequest; -class FRT_Values; - -namespace document { - class DocumentTypeRepo; -} - -namespace storage { -namespace spi { - -class ProviderProxy : public PersistenceProvider { - std::unique_ptr _supervisor; - FRT_Target *_target; - const document::DocumentTypeRepo *_repo; - - template - ResultType invokeRpc_Return(FRT_RPCRequest &req, const char *res_spec) const; - template - ResultType readResult(const FRT_Values &values) const; - template - ResultType readNoError(const FRT_Values &values) const; - -public: - typedef std::unique_ptr UP; - - ProviderProxy(const vespalib::string &connect_spec, const document::DocumentTypeRepo &repo); - ~ProviderProxy(); - - void setRepo(const document::DocumentTypeRepo &repo) { - _repo = &repo; - } - - Result initialize() override; - PartitionStateListResult getPartitionStates() const override; - BucketIdListResult listBuckets(PartitionId) const override; - Result setClusterState(const ClusterState&) override; - Result setActiveState(const Bucket&, BucketInfo::ActiveState) override; - BucketInfoResult getBucketInfo(const Bucket &) const override; - - Result put(const Bucket &, Timestamp, const DocumentSP&, Context&) override; - RemoveResult remove(const Bucket &, Timestamp, const DocumentId &, Context&) override; - RemoveResult removeIfFound(const Bucket &, Timestamp, const DocumentId &, Context&) override; - UpdateResult update(const Bucket &, Timestamp, const DocumentUpdateSP&, Context&) override; - - Result flush(const Bucket &, Context&) override; - - GetResult get(const Bucket &, const document::FieldSet&, const DocumentId &, Context&) const override; - - CreateIteratorResult createIterator(const Bucket &, const document::FieldSet&, const Selection&, - IncludedVersions versions, Context&) override; - - IterateResult iterate(IteratorId, uint64_t max_byte_size, Context&) const override; - Result destroyIterator(IteratorId, Context&) override; - - Result createBucket(const Bucket &, Context&) override; - Result deleteBucket(const Bucket &, Context&) override; - BucketIdListResult getModifiedBuckets() const override; - Result split(const Bucket &source, const Bucket &target1, const Bucket &target2, Context&) override; - Result join(const Bucket &source1, const Bucket &source2, const Bucket &target, Context&) override; - Result move(const Bucket &source, PartitionId partition, Context&) override; - - Result maintain(const Bucket &, MaintenanceLevel) override; - Result removeEntry(const Bucket &, Timestamp, Context&) override; -}; - -} // namespace spi -} // namespace storage - diff --git a/persistence/src/vespa/persistence/proxy/providerstub.cpp b/persistence/src/vespa/persistence/proxy/providerstub.cpp deleted file mode 100644 index b4137f0eb0c..00000000000 --- a/persistence/src/vespa/persistence/proxy/providerstub.cpp +++ /dev/null @@ -1,928 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "buildid.h" -#include "providerstub.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -using document::BucketId; -using document::ByteBuffer; -using document::DocumentTypeRepo; -using document::VespaDocumentDeserializer; -using document::VespaDocumentSerializer; -using std::vector; -using vespalib::Closure; -using vespalib::makeClosure; -using vespalib::makeTask; -using vespalib::nbostream; - -namespace storage::spi { -namespace { - -LoadType defaultLoadType(0, "default"); - -// Serialize return values -void addResult(FRT_Values &ret, const Result &result) { - ret.AddInt8(result.getErrorCode()); - ret.AddString(result.getErrorMessage().data(), - result.getErrorMessage().size()); -} - -void addPartitionStateListResult(FRT_Values &ret, - const PartitionStateListResult &result) { - addResult(ret, result); - PartitionStateList states = result.getList(); - uint32_t *stateValues = ret.AddInt32Array(states.size()); - FRT_StringValue *reasons = ret.AddStringArray(states.size()); - for (size_t i = 0; i < states.size(); ++i) { - stateValues[i] = states[i].getState(); - string reason(states[i].getReason()); - ret.SetString(&reasons[i], reason.data(), reason.size()); - } -} - -void addBucketInfoResult(FRT_Values &ret, const BucketInfoResult &result) { - addResult(ret, result); - const BucketInfo& info = result.getBucketInfo(); - ret.AddInt32(info.getChecksum()); - ret.AddInt32(info.getDocumentCount()); - ret.AddInt32(info.getDocumentSize()); - ret.AddInt32(info.getEntryCount()); - ret.AddInt32(info.getUsedSize()); - ret.AddInt8(static_cast(info.isReady())); - ret.AddInt8(static_cast(info.isActive())); -} - -void addRemoveResult(FRT_Values &ret, const RemoveResult &result) { - addResult(ret, result); - ret.AddInt8(result.wasFound()); -} - -void addUpdateResult(FRT_Values &ret, const UpdateResult &result) { - addResult(ret, result); - ret.AddInt64(result.getExistingTimestamp()); -} - -void addGetResult(FRT_Values &ret, const GetResult &result) { - addResult(ret, result); - ret.AddInt64(result.getTimestamp()); - if (result.hasDocument()) { - nbostream stream; - VespaDocumentSerializer serializer(stream); - serializer.write(result.getDocument(), document::COMPLETE); - ret.AddData(stream.c_str(), stream.size()); - } else { - ret.AddData(0, 0); - } -} - -void addCreateIteratorResult(FRT_Values &ret, - const CreateIteratorResult &result) -{ - addResult(ret, result); - ret.AddInt64(result.getIteratorId()); -} - -void addIterateResult(FRT_Values &ret, const IterateResult &result) -{ - addResult(ret, result); - - const vector &entries = result.getEntries(); - uint64_t *timestamps = ret.AddInt64Array(entries.size()); - uint32_t *flags = ret.AddInt32Array(entries.size()); - assert(sizeof(DocEntry::SizeType) == sizeof(uint32_t)); - FRT_StringValue *doc_id_array = ret.AddStringArray(entries.size()); - FRT_DataValue *doc_array = ret.AddDataArray(entries.size()); - - for (size_t i = 0; i < entries.size(); ++i) { - string doc_id_str; - nbostream stream; - const DocumentId *doc_id = entries[i]->getDocumentId(); - if (doc_id) { - doc_id_str = doc_id->toString(); - } - const Document *doc = entries[i]->getDocument(); - if (doc) { - VespaDocumentSerializer serializer(stream); - serializer.write(*doc, document::COMPLETE); - } - - timestamps[i] = entries[i]->getTimestamp(); - flags[i] = entries[i]->getFlags(); - ret.SetString(&doc_id_array[i], doc_id_str.data(), doc_id_str.size()); - ret.SetData(&doc_array[i], stream.c_str(), stream.size()); - } - - ret.AddInt8(result.isCompleted()); -} - -void addBucketIdListResult(FRT_Values &ret, const BucketIdListResult& result) { - addResult(ret, result); - - size_t modified_bucket_size = result.getList().size(); - uint64_t *bucket_id = ret.AddInt64Array(modified_bucket_size); - for (size_t i = 0; i < modified_bucket_size; ++i) { - bucket_id[i] = result.getList()[i].getRawId(); - } -} - -string getString(const FRT_StringValue &str) { - return string(str._str, str._len); -} - -string getString(const FRT_Value &value) { - return getString(value._string); -} - -Bucket getBucket(const FRT_Value &bucket_val, const FRT_Value &partition_val) { - BucketId bucket_id(bucket_val._intval64); - PartitionId partition_id(partition_val._intval64); - return Bucket(bucket_id, partition_id); -} - -Document::UP getDocument(const FRT_Value &val, const DocumentTypeRepo &repo) { - nbostream stream(val._data._buf, val._data._len); - const uint16_t version = 8; - VespaDocumentDeserializer deserializer(repo, stream, version); - Document::UP doc(new Document); - deserializer.read(*doc); - return doc; -} - -Selection getSelection(const FRT_Values ¶ms, int i) { - DocumentSelection doc_sel(getString(params[i])); - Timestamp timestamp_from(params[i + 1]._intval64); - Timestamp timestamp_to(params[i + 2]._intval64); - FRT_Array array = params[i + 3]._int64_array; - TimestampList timestamp_subset(array._pt, array._pt + array._len); - - Selection selection(doc_sel); - selection.setFromTimestamp(timestamp_from); - selection.setToTimestamp(timestamp_to); - selection.setTimestampSubset(timestamp_subset); - return selection; -} - -void addConnect( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.connect", - "s", "", true, func, obj); - rb.MethodDesc("Set up connection to proxy."); - rb.ParamDesc("build_id", "Id to make sure client and server come from the " - "same build."); -} - -void addGetPartitionStates( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.getPartitionStates", - "", "bsIS", true, func, obj); - rb.MethodDesc("???"); - rb.ReturnDesc("ret", "An array of serialized PartitionStates."); -} - -void doGetPartitionStates(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values &ret = *req->GetReturn(); - addPartitionStateListResult(ret, provider->getPartitionStates()); - req->Return(); -} - -void addInitialize( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.initialize", - "", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doInitialize(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values &ret = *req->GetReturn(); - addResult(ret, provider->initialize()); - req->Return(); -} - -void addListBuckets( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.listBuckets", - "l", "bsL", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("partition_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); - rb.ReturnDesc("bucket_ids", "An array of BucketIds."); -} - -void doListBuckets(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - PartitionId partition_id(params[0]._intval64); - - FRT_Values &ret = *req->GetReturn(); - addBucketIdListResult(ret, provider->listBuckets(partition_id)); - req->Return(); -} - -void addSetClusterState( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.setClusterState", - "x", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("cluster_state", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doSetClusterState(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - vespalib::nbostream stream(params[0]._data._buf, params[0]._data._len); - - ClusterState state(stream); - FRT_Values &ret = *req->GetReturn(); - addResult(ret, provider->setClusterState(state)); - req->Return(); -} - -void addSetActiveState( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.setActiveState", - "llb", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ParamDesc("bucket_state", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doSetActiveState(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - BucketInfo::ActiveState state = BucketInfo::ActiveState(params[2]._intval8); - - FRT_Values &ret = *req->GetReturn(); - addResult(ret, provider->setActiveState(bucket, state)); - req->Return(); -} - -void addGetBucketInfo( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.getBucketInfo", - "ll", "bsiiiiibb", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); - rb.ReturnDesc("checksum", ""); - rb.ReturnDesc("document_count", ""); - rb.ReturnDesc("document_size", ""); - rb.ReturnDesc("entry_count", ""); - rb.ReturnDesc("used_size", ""); - rb.ReturnDesc("ready", ""); - rb.ReturnDesc("active", ""); -} - -void doGetBucketInfo(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - - FRT_Values &ret = *req->GetReturn(); - addBucketInfoResult(ret, provider->getBucketInfo(bucket)); - req->Return(); -} - -void addPut( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.put", - "lllx", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ParamDesc("timestamp", ""); - rb.ParamDesc("document", "A serialized document"); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doPut(FRT_RPCRequest *req, PersistenceProvider *provider, - const DocumentTypeRepo *repo) -{ - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - Timestamp timestamp(params[2]._intval64); - Document::SP doc(getDocument(params[3], *repo).release()); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addResult(ret, provider->put(bucket, timestamp, doc, context)); - req->Return(); -} - -void addRemoveById( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.removeById", - "llls", "bsb", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ParamDesc("timestamp", ""); - rb.ParamDesc("document_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); - rb.ReturnDesc("existed", ""); -} - -void doRemoveById(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - Timestamp timestamp(params[2]._intval64); - DocumentId id(getString(params[3])); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addRemoveResult(ret, provider->remove(bucket, timestamp, id, context)); - req->Return(); -} - -void addRemoveIfFound( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.removeIfFound", - "llls", "bsb", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ParamDesc("timestamp", ""); - rb.ParamDesc("document_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); - rb.ReturnDesc("existed", ""); -} - -void doRemoveIfFound(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - Timestamp timestamp(params[2]._intval64); - DocumentId id(getString(params[3])); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addRemoveResult(ret, - provider->removeIfFound(bucket, timestamp, id, context)); - req->Return(); -} - -void addUpdate( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.update", - "lllx", "bsl", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ParamDesc("timestamp", ""); - rb.ParamDesc("document_update", "A serialized DocumentUpdate"); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); - rb.ReturnDesc("existing timestamp", ""); -} - -void doUpdate(FRT_RPCRequest *req, PersistenceProvider *provider, - const DocumentTypeRepo *repo) -{ - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - Timestamp timestamp(params[2]._intval64); - ByteBuffer buffer(params[3]._data._buf, params[3]._data._len); - auto update = std::make_shared(*repo, buffer, - DocumentUpdate:: - SerializeVersion:: - SERIALIZE_HEAD); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addUpdateResult(ret, provider->update(bucket, timestamp, update, context)); - req->Return(); -} - -void addFlush( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.flush", "ll", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doFlush(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addResult(ret, provider->flush(bucket, context)); - req->Return(); -} - -void addGet( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.get", - "llss", "bslx", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ParamDesc("field_set", "Array of fields in the set"); - rb.ParamDesc("document_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); - rb.ReturnDesc("timestamp", ""); - rb.ReturnDesc("document", "A serialized document"); -} - -void doGet(FRT_RPCRequest *req, - PersistenceProvider *provider, - const DocumentTypeRepo* repo) -{ - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - - document::FieldSetRepo fsr; - document::FieldSet::UP fieldSet = fsr.parse(*repo, getString(params[2])); - DocumentId id(getString(params[3])); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addGetResult(ret, provider->get(bucket, *fieldSet, id, context)); - req->Return(); -} - -void addCreateIterator( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.createIterator", - "llssllLb", "bsl", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ParamDesc("field_set", "Field set string (comma-separated list of strings)"); - rb.ParamDesc("document_selection_string", ""); - rb.ParamDesc("timestamp_from", ""); - rb.ParamDesc("timestamp_to", ""); - rb.ParamDesc("timestamp_subset", ""); - rb.ParamDesc("includedversions", ""); - - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); - rb.ReturnDesc("iterator_id", ""); -} - -void doCreateIterator(FRT_RPCRequest *req, PersistenceProvider *provider, - const DocumentTypeRepo* repo) -{ - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - - document::FieldSetRepo fsr; - document::FieldSet::UP fieldSet = fsr.parse(*repo, getString(params[2])); - Selection selection = getSelection(params, 3); - IncludedVersions versions = - static_cast(params[7]._intval8); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addCreateIteratorResult(ret, provider->createIterator( - bucket, *fieldSet, selection, versions, context)); - req->Return(); -} - -void addIterate( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.iterate", - "ll", "bsLISXb", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("iterator_id", ""); - rb.ParamDesc("max_byte_size", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); - rb.ReturnDesc("doc_entry_timestamp", "Array of timestamps for DocEntries"); - rb.ReturnDesc("doc_entry_flags", "Array of flags for DocEntries"); - rb.ReturnDesc("doc_entry_doc_id", "Array of DocumentIds for DocEntries"); - rb.ReturnDesc("doc_entry_doc", "Array of Documents for DocEntries"); - rb.ReturnDesc("completed", "bool"); -} - -void doIterate(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - IteratorId id(params[0]._intval64); - uint64_t max_byte_size = params[1]._intval64; - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addIterateResult(ret, provider->iterate(id, max_byte_size, context)); - req->Return(); -} - -void addDestroyIterator( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.destroyIterator", - "l", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("iterator_id", ""); -} - -void doDestroyIterator(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - IteratorId id(params[0]._intval64); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addResult(ret, provider->destroyIterator(id, context)); - req->Return(); -} - -void addCreateBucket( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.createBucket", - "ll", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doCreateBucket(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addResult(ret, provider->createBucket(bucket, context)); - req->Return(); -} - -void addDeleteBucket( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.deleteBucket", - "ll", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doDeleteBucket(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addResult(ret, provider->deleteBucket(bucket, context)); - req->Return(); -} - -void addGetModifiedBuckets( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.getModifiedBuckets", - "", "bsL", true, func, obj); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); - rb.ReturnDesc("modified_buckets_bucket_ids", "Array of bucket ids"); -} - -void doGetModifiedBuckets(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values &ret = *req->GetReturn(); - addBucketIdListResult(ret, provider->getModifiedBuckets()); - req->Return(); -} - -void addSplit( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.split", - "llllll", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("source_bucket_id", ""); - rb.ParamDesc("source_partition_id", ""); - rb.ParamDesc("target1_bucket_id", ""); - rb.ParamDesc("target1_partition_id", ""); - rb.ParamDesc("target2_bucket_id", ""); - rb.ParamDesc("target2_partition_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doSplit(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket source = getBucket(params[0], params[1]); - Bucket target1 = getBucket(params[2], params[3]); - Bucket target2 = getBucket(params[4], params[5]); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addResult(ret, provider->split(source, target1, target2, context)); - req->Return(); -} - -void addJoin( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.join", - "llllll", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("source1_bucket_id", ""); - rb.ParamDesc("source1_partition_id", ""); - rb.ParamDesc("source2_bucket_id", ""); - rb.ParamDesc("source2_partition_id", ""); - rb.ParamDesc("target_bucket_id", ""); - rb.ParamDesc("target_partition_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doJoin(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket source1 = getBucket(params[0], params[1]); - Bucket source2 = getBucket(params[2], params[3]); - Bucket target = getBucket(params[4], params[5]); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addResult(ret, provider->join(source1, source2, target, context)); - req->Return(); -} - -void addMove( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.move", - "lll", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("source_bucket_id", ""); - rb.ParamDesc("source_partition_id", ""); - rb.ParamDesc("target_partition_id", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doMove(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket source = getBucket(params[0], params[1]); - PartitionId partition_id(params[2]._intval64); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addResult(ret, provider->move(source, partition_id, context)); - req->Return(); -} - - -void addMaintain( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.maintain", - "llb", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ParamDesc("verification_level", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doMaintain(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - MaintenanceLevel level = - static_cast(params[2]._intval8); - - FRT_Values &ret = *req->GetReturn(); - addResult(ret, provider->maintain(bucket, level)); - req->Return(); -} - -void addRemoveEntry( - FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) { - rb.DefineMethod("vespa.persistence.removeEntry", - "lll", "bs", true, func, obj); - rb.MethodDesc("???"); - rb.ParamDesc("bucket_id", ""); - rb.ParamDesc("partition_id", ""); - rb.ParamDesc("timestamp", ""); - rb.ReturnDesc("error_code", ""); - rb.ReturnDesc("error_message", ""); -} - -void doRemoveEntry(FRT_RPCRequest *req, PersistenceProvider *provider) { - FRT_Values ¶ms = *req->GetParams(); - Bucket bucket = getBucket(params[0], params[1]); - Timestamp timestamp(params[2]._intval64); - - FRT_Values &ret = *req->GetReturn(); - Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0)); - addResult(ret, provider->removeEntry(bucket, timestamp, context)); - req->Return(); -} - -const uint32_t magic_number = 0xf00ba2; - -bool checkConnection(FNET_Connection *connection) { - return connection && connection->GetContext()._value.INT == magic_number; -} -} //namespace - -void ProviderStub::HOOK_fini(FRT_RPCRequest *req) { - FNET_Connection *connection = req->GetConnection(); - if (checkConnection(connection)) { - assert(_provider.get() != 0); - _providerCleanupTask.ScheduleNow(); - } -} - -void ProviderStub::RPC_connect(FRT_RPCRequest *req) { - FRT_Values ¶ms = *req->GetParams(); - FNET_Connection *connection = req->GetConnection(); - if (checkConnection(connection)) { - return; - } - string build_id = getString(params[0]); - if (build_id != getBuildId()) { - req->SetError(FRTE_RPC_METHOD_FAILED, - ("Wrong build id. Got '" + build_id + - "', required '" + getBuildId() + "'").c_str()); - return; - } else if (_provider.get()) { - req->SetError(FRTE_RPC_METHOD_FAILED, "Server is already connected"); - return; - } - if (!connection) { - req->SetError(FRTE_RPC_METHOD_FAILED); - return; - } - connection->SetContext(FNET_Context(magic_number)); - _provider = _factory.create(); -} - -void ProviderStub::detachAndRun(FRT_RPCRequest *req, Closure::UP closure) { - if (!checkConnection(req->GetConnection())) { - req->SetError(FRTE_RPC_METHOD_FAILED); - return; - } - assert(_provider.get() != 0); - req->Detach(); - _executor.execute(makeTask(std::move(closure))); -} - -void ProviderStub::RPC_getPartitionStates(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doGetPartitionStates, req, _provider.get())); -} - -void ProviderStub::RPC_initialize(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doInitialize, req, _provider.get())); -} - -void ProviderStub::RPC_listBuckets(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doListBuckets, req, _provider.get())); -} - -void ProviderStub::RPC_setClusterState(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doSetClusterState, req, _provider.get())); -} - -void ProviderStub::RPC_setActiveState(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doSetActiveState, req, _provider.get())); -} - -void ProviderStub::RPC_getBucketInfo(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doGetBucketInfo, req, _provider.get())); -} - -void ProviderStub::RPC_put(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doPut, req, _provider.get(), _repo)); -} - -void ProviderStub::RPC_removeById(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doRemoveById, req, _provider.get())); -} - -void ProviderStub::RPC_removeIfFound(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doRemoveIfFound, req, _provider.get())); -} - -void ProviderStub::RPC_update(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doUpdate, req, _provider.get(), _repo)); -} - -void ProviderStub::RPC_flush(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doFlush, req, _provider.get())); -} - -void ProviderStub::RPC_get(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doGet, req, _provider.get(), _repo)); -} - -void ProviderStub::RPC_createIterator(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doCreateIterator, req, _provider.get(), _repo)); -} - -void ProviderStub::RPC_iterate(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doIterate, req, _provider.get())); -} - -void ProviderStub::RPC_destroyIterator(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doDestroyIterator, req, _provider.get())); -} - -void ProviderStub::RPC_createBucket(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doCreateBucket, req, _provider.get())); -} - -void ProviderStub::RPC_deleteBucket(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doDeleteBucket, req, _provider.get())); -} - -void ProviderStub::RPC_getModifiedBuckets(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doGetModifiedBuckets, req, _provider.get())); -} - -void ProviderStub::RPC_split(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doSplit, req, _provider.get())); -} - -void ProviderStub::RPC_join(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doJoin, req, _provider.get())); -} - -void ProviderStub::RPC_move(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doMove, req, _provider.get())); -} - -void ProviderStub::RPC_maintain(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doMaintain, req, _provider.get())); -} - -void ProviderStub::RPC_removeEntry(FRT_RPCRequest *req) { - detachAndRun(req, makeClosure(doRemoveEntry, req, _provider.get())); -} - -void ProviderStub::SetupRpcCalls() { - FRT_ReflectionBuilder rb(_supervisor.get()); - addConnect(rb, FRT_METHOD(ProviderStub::RPC_connect), this); - addInitialize(rb, FRT_METHOD(ProviderStub::RPC_initialize), this); - addGetPartitionStates(rb, FRT_METHOD(ProviderStub::RPC_getPartitionStates), this); - addListBuckets(rb, FRT_METHOD(ProviderStub::RPC_listBuckets), this); - addSetClusterState(rb, FRT_METHOD(ProviderStub::RPC_setClusterState), this); - addSetActiveState(rb, FRT_METHOD(ProviderStub::RPC_setActiveState), this); - addGetBucketInfo(rb, FRT_METHOD(ProviderStub::RPC_getBucketInfo), this); - addPut(rb, FRT_METHOD(ProviderStub::RPC_put), this); - addRemoveById(rb, FRT_METHOD(ProviderStub::RPC_removeById), this); - addRemoveIfFound(rb, FRT_METHOD(ProviderStub::RPC_removeIfFound), this); - addUpdate(rb, FRT_METHOD(ProviderStub::RPC_update), this); - addFlush(rb, FRT_METHOD(ProviderStub::RPC_flush), this); - addGet(rb, FRT_METHOD(ProviderStub::RPC_get), this); - addCreateIterator(rb, FRT_METHOD(ProviderStub::RPC_createIterator), this); - addIterate(rb, FRT_METHOD(ProviderStub::RPC_iterate), this); - addDestroyIterator(rb, FRT_METHOD(ProviderStub::RPC_destroyIterator), this); - addCreateBucket(rb, FRT_METHOD(ProviderStub::RPC_createBucket), this); - addDeleteBucket(rb, FRT_METHOD(ProviderStub::RPC_deleteBucket), this); - addGetModifiedBuckets(rb, FRT_METHOD(ProviderStub::RPC_getModifiedBuckets), this); - addSplit(rb, FRT_METHOD(ProviderStub::RPC_split), this); - addJoin(rb, FRT_METHOD(ProviderStub::RPC_join), this); - addMove(rb, FRT_METHOD(ProviderStub::RPC_move), this); - addMaintain(rb, FRT_METHOD(ProviderStub::RPC_maintain), this); - addRemoveEntry(rb, FRT_METHOD(ProviderStub::RPC_removeEntry), this); -} - -ProviderStub::ProviderStub(int port, uint32_t threads, - const document::DocumentTypeRepo &repo, - PersistenceProviderFactory &factory) - : _supervisor(std::make_unique()), - _executor(threads, 256*1024), - _repo(&repo), - _factory(factory), - _provider(), - _providerCleanupTask(_supervisor->GetScheduler(), _executor, _provider) -{ - SetupRpcCalls(); - _supervisor->SetSessionFiniHook(FRT_METHOD(ProviderStub::HOOK_fini), this); - _supervisor->Start(); - _supervisor->Listen(port); -} - -ProviderStub::~ProviderStub() { - _supervisor->ShutDown(true); - sync(); -} - -int -ProviderStub::getPort() const { - return _supervisor->GetListenPort(); -} - -} diff --git a/persistence/src/vespa/persistence/proxy/providerstub.h b/persistence/src/vespa/persistence/proxy/providerstub.h deleted file mode 100644 index cd0665171b1..00000000000 --- a/persistence/src/vespa/persistence/proxy/providerstub.h +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include -#include -#include -#include -#include - -class FRT_Supervisor; - -namespace document { class DocumentTypeRepo; } - -namespace storage { -namespace spi { -class PersistenceProvider; - -class ProviderStub : private FRT_Invokable -{ -public: - struct PersistenceProviderFactory { - virtual std::unique_ptr create() const = 0; - virtual ~PersistenceProviderFactory() {} - }; - -private: - struct ProviderCleanupTask : FNET_Task { - vespalib::ThreadStackExecutor &executor; - std::unique_ptr &provider; - ProviderCleanupTask(FNET_Scheduler *s, - vespalib::ThreadStackExecutor &e, - std::unique_ptr &p) - : FNET_Task(s), executor(e), provider(p) {} - void PerformTask() override { - executor.sync(); - assert(provider.get() != 0); - provider.reset(); - } - }; - - std::unique_ptr _supervisor; - vespalib::ThreadStackExecutor _executor; - const document::DocumentTypeRepo *_repo; - PersistenceProviderFactory &_factory; - std::unique_ptr _provider; - ProviderCleanupTask _providerCleanupTask; - - void HOOK_fini(FRT_RPCRequest *req); - - void detachAndRun(FRT_RPCRequest *req, vespalib::Closure::UP closure); - void RPC_connect(FRT_RPCRequest *req); - void RPC_initialize(FRT_RPCRequest *req); - void RPC_getPartitionStates(FRT_RPCRequest *req); - void RPC_listBuckets(FRT_RPCRequest *req); - void RPC_setClusterState(FRT_RPCRequest *req); - void RPC_setActiveState(FRT_RPCRequest *req); - void RPC_getBucketInfo(FRT_RPCRequest *req); - void RPC_put(FRT_RPCRequest *req); - void RPC_removeById(FRT_RPCRequest *req); - void RPC_removeIfFound(FRT_RPCRequest *req); - void RPC_update(FRT_RPCRequest *req); - void RPC_flush(FRT_RPCRequest *req); - void RPC_get(FRT_RPCRequest *req); - void RPC_createIterator(FRT_RPCRequest *req); - void RPC_iterate(FRT_RPCRequest *req); - void RPC_destroyIterator(FRT_RPCRequest *req); - void RPC_createBucket(FRT_RPCRequest *req); - void RPC_deleteBucket(FRT_RPCRequest *req); - void RPC_getModifiedBuckets(FRT_RPCRequest *req); - void RPC_split(FRT_RPCRequest *req); - void RPC_join(FRT_RPCRequest *req); - void RPC_move(FRT_RPCRequest *req); - void RPC_maintain(FRT_RPCRequest *req); - void RPC_removeEntry(FRT_RPCRequest *req); - - void SetupRpcCalls(); - -public: - typedef std::unique_ptr UP; - - ProviderStub(int port, uint32_t threads, - const document::DocumentTypeRepo &repo, - PersistenceProviderFactory &factory); - ~ProviderStub(); - - bool hasClient() const { return (_provider.get() != 0); } - int getPort() const; - void setRepo(const document::DocumentTypeRepo &repo) { - _repo = &repo; - } - void sync() { _executor.sync(); } -}; - -} // namespace spi -} // namespace storage - -- cgit v1.2.3