summaryrefslogtreecommitdiffstats
path: root/persistence
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2017-10-03 11:37:51 +0000
committerTor Egge <Tor.Egge@oath.com>2017-10-03 11:40:31 +0000
commit32f8d5bb20479e0895f97cebe9f575d7b09de8c5 (patch)
tree9d4d7808a5da6315df2088c03f419d7ece0c250f /persistence
parent6e134d121f14dd940f84b25b5bc27ad22d80af2f (diff)
Remove unused support for SPI over RPC.
Diffstat (limited to 'persistence')
-rw-r--r--persistence/CMakeLists.txt2
-rw-r--r--persistence/src/tests/proxy/.gitignore10
-rw-r--r--persistence/src/tests/proxy/CMakeLists.txt31
-rw-r--r--persistence/src/tests/proxy/dummy_provider_factory.h35
-rw-r--r--persistence/src/tests/proxy/external_providerproxy_conformancetest.cpp42
-rw-r--r--persistence/src/tests/proxy/mockprovider.h163
-rw-r--r--persistence/src/tests/proxy/providerproxy_conformancetest.cpp61
-rw-r--r--persistence/src/tests/proxy/providerproxy_test.cpp404
-rw-r--r--persistence/src/tests/proxy/providerstub_test.cpp543
-rw-r--r--persistence/src/tests/proxy/proxy_factory_wrapper.h56
-rwxr-xr-xpersistence/src/tests/proxy/proxy_test.sh6
-rw-r--r--persistence/src/tests/proxy/proxyfactory.h37
-rw-r--r--persistence/src/vespa/persistence/CMakeLists.txt1
-rw-r--r--persistence/src/vespa/persistence/proxy/.gitignore2
-rw-r--r--persistence/src/vespa/persistence/proxy/CMakeLists.txt8
-rw-r--r--persistence/src/vespa/persistence/proxy/buildid.cpp8
-rw-r--r--persistence/src/vespa/persistence/proxy/buildid.h12
-rw-r--r--persistence/src/vespa/persistence/proxy/providerproxy.cpp493
-rw-r--r--persistence/src/vespa/persistence/proxy/providerproxy.h76
-rw-r--r--persistence/src/vespa/persistence/proxy/providerstub.cpp928
-rw-r--r--persistence/src/vespa/persistence/proxy/providerstub.h97
21 files changed, 0 insertions, 3015 deletions
diff --git a/persistence/CMakeLists.txt b/persistence/CMakeLists.txt
index b4cb36e8bcf..d868bfcba6e 100644
--- a/persistence/CMakeLists.txt
+++ b/persistence/CMakeLists.txt
@@ -17,7 +17,6 @@ vespa_define_module(
src/vespa/persistence
src/vespa/persistence/conformancetest
src/vespa/persistence/dummyimpl
- src/vespa/persistence/proxy
src/vespa/persistence/spi
TEST_DEPENDS
@@ -26,7 +25,6 @@ vespa_define_module(
TESTS
src/tests
src/tests/dummyimpl
- src/tests/proxy
src/tests/spi
)
diff --git a/persistence/src/tests/proxy/.gitignore b/persistence/src/tests/proxy/.gitignore
deleted file mode 100644
index 03bce028dd9..00000000000
--- a/persistence/src/tests/proxy/.gitignore
+++ /dev/null
@@ -1,10 +0,0 @@
-/.depend
-/Makefile
-/providerstub_test
-/providerproxy_test
-/providerproxy_conformancetest
-/vespa-external-providerproxy-conformancetest
-persistence_providerproxy_conformance_test_app
-persistence_providerproxy_test_app
-persistence_providerstub_test_app
-persistence_external_providerproxy_conformancetest_app
diff --git a/persistence/src/tests/proxy/CMakeLists.txt b/persistence/src/tests/proxy/CMakeLists.txt
deleted file mode 100644
index 598a3a6a69d..00000000000
--- a/persistence/src/tests/proxy/CMakeLists.txt
+++ /dev/null
@@ -1,31 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(persistence_providerstub_test_app
- SOURCES
- providerstub_test.cpp
- DEPENDS
- persistence
-)
-vespa_add_executable(persistence_providerproxy_test_app
- SOURCES
- providerproxy_test.cpp
- DEPENDS
- persistence
-)
-vespa_add_executable(persistence_providerproxy_conformance_test_app TEST
- SOURCES
- providerproxy_conformancetest.cpp
- DEPENDS
- persistence
- persistence_persistence_conformancetest
-)
-vespa_add_executable(persistence_external_providerproxy_conformancetest_app
- SOURCES
- external_providerproxy_conformancetest.cpp
- OUTPUT_NAME vespa-external-providerproxy-conformancetest
- INSTALL bin
- DEPENDS
- persistence
- persistence_persistence_conformancetest
-)
-vespa_add_test(NAME persistence_providerproxy_conformance_test_app COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/proxy_test.sh
- DEPENDS persistence_providerstub_test_app persistence_providerproxy_test_app persistence_providerproxy_conformance_test_app)
diff --git a/persistence/src/tests/proxy/dummy_provider_factory.h b/persistence/src/tests/proxy/dummy_provider_factory.h
deleted file mode 100644
index 808bce29fac..00000000000
--- a/persistence/src/tests/proxy/dummy_provider_factory.h
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/persistence/proxy/providerstub.h>
-#include <memory>
-
-namespace storage {
-namespace spi {
-
-/**
- * A simple rpc server persistence provider factory that will only
- * work once, by returning a precreated persistence provider instance.
- **/
-struct DummyProviderFactory : ProviderStub::PersistenceProviderFactory
-{
- typedef std::unique_ptr<DummyProviderFactory> UP;
- typedef storage::spi::PersistenceProvider Provider;
-
- mutable std::unique_ptr<Provider> provider;
-
- DummyProviderFactory(std::unique_ptr<Provider> p) : provider(std::move(p)) {}
-
- std::unique_ptr<Provider> create() const override {
- ASSERT_TRUE(provider.get() != 0);
- std::unique_ptr<Provider> ret = std::move(provider);
- ASSERT_TRUE(provider.get() == 0);
- return ret;
- }
-};
-
-} // namespace spi
-} // namespace storage
-
diff --git a/persistence/src/tests/proxy/external_providerproxy_conformancetest.cpp b/persistence/src/tests/proxy/external_providerproxy_conformancetest.cpp
deleted file mode 100644
index adf7a84dbd4..00000000000
--- a/persistence/src/tests/proxy/external_providerproxy_conformancetest.cpp
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "proxyfactory.h"
-#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/document/repo/documenttyperepo.h>
-#include <vespa/persistence/conformancetest/conformancetest.h>
-#include <vespa/persistence/proxy/providerproxy.h>
-#include <vespa/persistence/proxy/providerstub.h>
-
-using namespace storage::spi;
-typedef document::DocumentTypeRepo Repo;
-typedef ConformanceTest::PersistenceFactory Factory;
-
-namespace {
-
-struct ConformanceFixture : public ConformanceTest {
- ConformanceFixture(Factory::UP f) : ConformanceTest(std::move(f)) { setUp(); }
- ~ConformanceFixture() { tearDown(); }
-};
-
-Factory::UP getFactory() {
- return Factory::UP(new ProxyFactory());
-}
-
-#define CONVERT_TEST(testFunction, makeFactory) \
-namespace ns_ ## testFunction { \
-TEST_F(TEST_STR(testFunction) " " TEST_STR(makeFactory), ConformanceFixture(makeFactory)) { \
- f.testFunction(); \
-} \
-} // namespace testFunction
-
-#undef CPPUNIT_TEST
-#define CPPUNIT_TEST(testFunction) CONVERT_TEST(testFunction, MAKE_FACTORY)
-
-#define MAKE_FACTORY getFactory()
-DEFINE_CONFORMANCE_TESTS();
-
-} // namespace
-
-TEST_MAIN() {
- TEST_RUN_ALL();
-}
diff --git a/persistence/src/tests/proxy/mockprovider.h b/persistence/src/tests/proxy/mockprovider.h
deleted file mode 100644
index fda0b4aa922..00000000000
--- a/persistence/src/tests/proxy/mockprovider.h
+++ /dev/null
@@ -1,163 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/document/fieldvalue/document.h>
-
-namespace storage {
-namespace spi {
-
-struct MockProvider : PersistenceProvider {
- enum Function { NONE, INITIALIZE, GET_PARTITION_STATES, LIST_BUCKETS,
- SET_CLUSTER_STATE,
- SET_ACTIVE_STATE, GET_BUCKET_INFO, PUT, REMOVE_BY_ID,
- REMOVE_IF_FOUND, REPLACE_WITH_REMOVE, UPDATE, FLUSH, GET,
- CREATE_ITERATOR, ITERATE, DESTROY_ITERATOR, CREATE_BUCKET,
- DELETE_BUCKET, GET_MODIFIED_BUCKETS, SPLIT, JOIN, MOVE, MAINTAIN,
- REMOVE_ENTRY };
-
- mutable Function last_called;
-
- MockProvider() : last_called(NONE) {}
-
- Result initialize() override {
- last_called = INITIALIZE;
- return Result();
- }
-
- PartitionStateListResult getPartitionStates() const override {
- last_called = GET_PARTITION_STATES;
- return PartitionStateListResult(PartitionStateList(1u));
- }
-
- BucketIdListResult listBuckets(PartitionId id) const override {
- last_called = LIST_BUCKETS;
- BucketIdListResult::List result;
- result.push_back(document::BucketId(id));
- return BucketIdListResult(result);
- }
-
- Result setClusterState(const ClusterState &) override {
- last_called = SET_CLUSTER_STATE;
- return Result();
- }
-
- Result setActiveState(const Bucket &, BucketInfo::ActiveState) override {
- last_called = SET_ACTIVE_STATE;
- return Result();
- }
-
- BucketInfoResult getBucketInfo(const Bucket &bucket) const override {
- last_called = GET_BUCKET_INFO;
- return BucketInfoResult(BucketInfo(BucketChecksum(1), 2, 3,
- bucket.getBucketId().getRawId(),
- bucket.getPartition(),
- BucketInfo::READY,
- BucketInfo::ACTIVE));
- }
-
- Result put(const Bucket &, Timestamp, const DocumentSP&, Context&) override {
- last_called = PUT;
- return Result();
- }
-
- RemoveResult remove(const Bucket &, Timestamp, const DocumentId &, Context&) override {
- last_called = REMOVE_BY_ID;
- return RemoveResult(true);
- }
-
- RemoveResult removeIfFound(const Bucket &, Timestamp, const DocumentId &, Context&) override {
- last_called = REMOVE_IF_FOUND;
- return RemoveResult(true);
- }
-
- virtual RemoveResult replaceWithRemove(const Bucket &, Timestamp,
- const DocumentId &, Context&) {
- last_called = REPLACE_WITH_REMOVE;
- return RemoveResult(true);
- }
-
- UpdateResult update(const Bucket &, Timestamp timestamp, const DocumentUpdateSP&, Context&) override {
- last_called = UPDATE;
- return UpdateResult(Timestamp(timestamp - 10));
- }
-
- Result flush(const Bucket&, Context&) override {
- last_called = FLUSH;
- return Result();
- }
-
- GetResult get(const Bucket &, const document::FieldSet&, const DocumentId&, Context&) const override {
- last_called = GET;
- return GetResult(Document::UP(new Document),
- Timestamp(6u));
- }
-
- CreateIteratorResult createIterator(const Bucket& bucket,
- const document::FieldSet&,
- const Selection&,
- IncludedVersions,
- Context&) override
- {
- last_called = CREATE_ITERATOR;
- return CreateIteratorResult(IteratorId(bucket.getPartition()));
- }
-
- IterateResult iterate(IteratorId, uint64_t, Context&) const override {
- last_called = ITERATE;
- IterateResult::List result;
- result.push_back(DocEntry::UP(new DocEntry(Timestamp(1), 0)));
- return IterateResult(std::move(result), true);
- }
-
- Result destroyIterator(IteratorId, Context&) override {
- last_called = DESTROY_ITERATOR;
- return Result();
- }
-
- Result createBucket(const Bucket&, Context&) override {
- last_called = CREATE_BUCKET;
- return Result();
- }
- Result deleteBucket(const Bucket&, Context&) override {
- last_called = DELETE_BUCKET;
- return Result();
- }
-
- BucketIdListResult getModifiedBuckets() const override {
- last_called = GET_MODIFIED_BUCKETS;
- BucketIdListResult::List list;
- list.push_back(document::BucketId(2));
- list.push_back(document::BucketId(3));
- return BucketIdListResult(list);
- }
-
- Result split(const Bucket &, const Bucket &, const Bucket &, Context&) override {
- last_called = SPLIT;
- return Result();
- }
-
- Result join(const Bucket &, const Bucket &, const Bucket &, Context&) override {
- last_called = JOIN;
- return Result();
- }
-
- Result move(const Bucket &, PartitionId, Context&) override {
- last_called = MOVE;
- return Result();
- }
-
-
- Result maintain(const Bucket &, MaintenanceLevel) override {
- last_called = MAINTAIN;
- return Result();
- }
-
- Result removeEntry(const Bucket &, Timestamp, Context&) override {
- last_called = REMOVE_ENTRY;
- return Result();
- }
-};
-
-} // namespace spi
-} // namespace storage
diff --git a/persistence/src/tests/proxy/providerproxy_conformancetest.cpp b/persistence/src/tests/proxy/providerproxy_conformancetest.cpp
deleted file mode 100644
index fda3f42f0d5..00000000000
--- a/persistence/src/tests/proxy/providerproxy_conformancetest.cpp
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/document/repo/documenttyperepo.h>
-#include <vespa/persistence/conformancetest/conformancetest.h>
-#include <vespa/persistence/dummyimpl/dummypersistence.h>
-#include <vespa/persistence/proxy/providerproxy.h>
-#include <vespa/persistence/proxy/providerstub.h>
-#include "proxy_factory_wrapper.h"
-
-using namespace storage::spi;
-typedef document::DocumentTypeRepo Repo;
-typedef ConformanceTest::PersistenceFactory Factory;
-
-namespace {
-
-struct DummyFactory : Factory {
- PersistenceProvider::UP getPersistenceImplementation(const Repo::SP& repo,
- const Repo::DocumenttypesConfig &) override {
- return PersistenceProvider::UP(new dummy::DummyPersistence(repo, 4));
- }
-
- bool supportsActiveState() const override {
- return true;
- }
-};
-
-struct ConformanceFixture : public ConformanceTest {
- ConformanceFixture(Factory::UP f) : ConformanceTest(std::move(f)) { setUp(); }
- ~ConformanceFixture() { tearDown(); }
-};
-
-Factory::UP dummyViaProxy(size_t n) {
- if (n == 0) {
- return Factory::UP(new DummyFactory());
- }
- return Factory::UP(new ProxyFactoryWrapper(dummyViaProxy(n - 1)));
-}
-
-#define CONVERT_TEST(testFunction, makeFactory) \
-namespace ns_ ## testFunction { \
-TEST_F(TEST_STR(testFunction) " " TEST_STR(makeFactory), ConformanceFixture(makeFactory)) { \
- f.testFunction(); \
-} \
-} // namespace testFunction
-
-#undef CPPUNIT_TEST
-#define CPPUNIT_TEST(testFunction) CONVERT_TEST(testFunction, MAKE_FACTORY)
-
-#define MAKE_FACTORY dummyViaProxy(1)
-DEFINE_CONFORMANCE_TESTS();
-
-#undef MAKE_FACTORY
-#define MAKE_FACTORY dummyViaProxy(7)
-DEFINE_CONFORMANCE_TESTS();
-
-} // namespace
-
-TEST_MAIN() {
- TEST_RUN_ALL();
-}
diff --git a/persistence/src/tests/proxy/providerproxy_test.cpp b/persistence/src/tests/proxy/providerproxy_test.cpp
deleted file mode 100644
index 5fcfe0b3cab..00000000000
--- a/persistence/src/tests/proxy/providerproxy_test.cpp
+++ /dev/null
@@ -1,404 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-// Unit tests for providerproxy.
-
-#include "dummy_provider_factory.h"
-#include "mockprovider.h"
-#include <vespa/document/bucket/bucketid.h>
-#include <vespa/document/datatype/datatype.h>
-#include <vespa/document/repo/documenttyperepo.h>
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/datatype/documenttype.h>
-#include <vespa/persistence/proxy/providerproxy.h>
-#include <vespa/persistence/proxy/providerstub.h>
-#include <vespa/persistence/spi/abstractpersistenceprovider.h>
-#include <vespa/vespalib/testkit/testapp.h>
-#include <vespa/vespalib/util/closure.h>
-#include <vespa/vespalib/util/closuretask.h>
-#include <vespa/vespalib/util/sync.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
-#include <vespa/document/fieldset/fieldsets.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/config-stor-distribution.h>
-
-using document::BucketId;
-using document::DataType;
-using document::DocumentTypeRepo;
-using std::ostringstream;
-using vespalib::Gate;
-using vespalib::ThreadStackExecutor;
-using vespalib::makeClosure;
-using vespalib::makeTask;
-using namespace storage::spi;
-using namespace storage;
-
-namespace {
-
-const int port = 14863;
-const string connect_spec = "tcp/localhost:14863";
-LoadType defaultLoadType(0, "default");
-
-void startServer(const DocumentTypeRepo *repo, Gate *gate) {
- DummyProviderFactory factory(MockProvider::UP(new MockProvider));
- ProviderStub stub(port, 8, *repo, factory);
- gate->await();
- EXPECT_TRUE(stub.hasClient());
-}
-
-TEST("require that client can start connecting before server is up") {
- const DocumentTypeRepo repo;
- Gate gate;
- ThreadStackExecutor executor(1, 65536);
- executor.execute(makeTask(makeClosure(startServer, &repo, &gate)));
- ProviderProxy proxy(connect_spec, repo);
- gate.countDown();
- executor.sync();
-}
-
-TEST("require that when the server goes down it causes permanent failure.") {
- const DocumentTypeRepo repo;
- DummyProviderFactory factory(MockProvider::UP(new MockProvider));
- ProviderStub::UP server(new ProviderStub(port, 8, repo, factory));
- ProviderProxy proxy(connect_spec, repo);
- server.reset(0);
-
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
- Result result = proxy.flush(bucket, context);
- EXPECT_EQUAL(Result::FATAL_ERROR, result.getErrorCode());
-}
-
-struct Fixture {
- MockProvider &mock_spi;
- DummyProviderFactory factory;
- DocumentTypeRepo repo;
- ProviderStub stub;
- ProviderProxy proxy;
-
- Fixture()
- : mock_spi(*(new MockProvider)),
- factory(PersistenceProvider::UP(&mock_spi)),
- repo(),
- stub(port, 8, repo, factory),
- proxy(connect_spec, repo) {}
-};
-
-TEST_F("require that client handles initialize", Fixture) {
- Result result = f.proxy.initialize();
- EXPECT_EQUAL(MockProvider::INITIALIZE, f.mock_spi.last_called);
-}
-
-TEST_F("require that client handles getPartitionStates", Fixture) {
- PartitionStateListResult result = f.proxy.getPartitionStates();
- EXPECT_EQUAL(MockProvider::GET_PARTITION_STATES, f.mock_spi.last_called);
- EXPECT_EQUAL(1u, result.getList().size());
-}
-
-TEST_F("require that client handles listBuckets", Fixture) {
- const PartitionId partition_id(42);
-
- BucketIdListResult result = f.proxy.listBuckets(partition_id);
- EXPECT_EQUAL(MockProvider::LIST_BUCKETS, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
- ASSERT_EQUAL(1u, result.getList().size());
-}
-
-TEST_F("require that client handles setClusterState", Fixture) {
- lib::ClusterState s("version:1 storage:3 distributor:3");
- lib::Distribution d(lib::Distribution::getDefaultDistributionConfig(3, 3));
- ClusterState state(s, 0, d);
-
- Result result = f.proxy.setClusterState(state);
- EXPECT_EQUAL(MockProvider::SET_CLUSTER_STATE, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
-}
-
-TEST_F("require that client handles setActiveState", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- const BucketInfo::ActiveState bucket_state = BucketInfo::NOT_ACTIVE;
-
- Result result = f.proxy.setActiveState(bucket, bucket_state);
- EXPECT_EQUAL(MockProvider::SET_ACTIVE_STATE, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
-}
-
-TEST_F("require that client handles getBucketInfo", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
-
- BucketInfoResult result = f.proxy.getBucketInfo(bucket);
- EXPECT_EQUAL(MockProvider::GET_BUCKET_INFO, f.mock_spi.last_called);
-
- const BucketInfo& info(result.getBucketInfo());
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
- EXPECT_EQUAL(1u, info.getChecksum());
- EXPECT_EQUAL(2u, info.getDocumentCount());
- EXPECT_EQUAL(3u, info.getDocumentSize());
- EXPECT_EQUAL(bucket_id, info.getEntryCount());
- EXPECT_EQUAL(partition_id, info.getUsedSize());
- EXPECT_EQUAL(true, info.isReady());
- EXPECT_EQUAL(true, info.isActive());
-}
-
-TEST_F("require that client handles put", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- const Timestamp timestamp(84);
- Document::SP doc(new Document());
-
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
- Result result = f.proxy.put(bucket, timestamp, doc, context);
- EXPECT_EQUAL(MockProvider::PUT, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
-}
-
-TEST_F("require that client handles remove by id", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- const Timestamp timestamp(84);
- const DocumentId id("doc:test:1");
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- RemoveResult result = f.proxy.remove(bucket, timestamp, id, context);
- EXPECT_EQUAL(MockProvider::REMOVE_BY_ID, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
- EXPECT_EQUAL(true, result.wasFound());
-}
-
-TEST_F("require that client handles removeIfFound", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- const Timestamp timestamp(84);
- const DocumentId id("doc:test:1");
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- RemoveResult result = f.proxy.removeIfFound(bucket, timestamp, id, context);
- EXPECT_EQUAL(MockProvider::REMOVE_IF_FOUND, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
- EXPECT_EQUAL(true, result.wasFound());
-}
-
-TEST_F("require that client handles update", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- const Timestamp timestamp(84);
- DocumentUpdate::SP update(new DocumentUpdate(*DataType::DOCUMENT, DocumentId("doc:test:1")));
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- UpdateResult result = f.proxy.update(bucket, timestamp, update, context);
- EXPECT_EQUAL(MockProvider::UPDATE, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
- EXPECT_EQUAL(timestamp - 10, result.getExistingTimestamp());
-}
-
-TEST_F("require that client handles flush", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- Result result = f.proxy.flush(bucket, context);
- EXPECT_EQUAL(MockProvider::FLUSH, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
-}
-
-TEST_F("require that client handles get", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
-
- document::AllFields field_set;
- const DocumentId id("doc:test:1");
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- GetResult result = f.proxy.get(bucket, field_set, id, context);
- EXPECT_EQUAL(MockProvider::GET, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
- EXPECT_EQUAL(6u, result.getTimestamp());
- ASSERT_TRUE(result.hasDocument());
- EXPECT_EQUAL(Document(), result.getDocument());
-}
-
-TEST_F("require that client handles createIterator", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- const DocumentSelection doc_sel("docsel");
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- document::AllFields field_set;
-
- Selection selection(doc_sel);
- selection.setFromTimestamp(Timestamp(84));
- selection.setToTimestamp(Timestamp(126));
-
- CreateIteratorResult result =
- f.proxy.createIterator(bucket, field_set, selection,
- NEWEST_DOCUMENT_ONLY, context);
-
- EXPECT_EQUAL(MockProvider::CREATE_ITERATOR, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
- EXPECT_EQUAL(partition_id, result.getIteratorId());
-}
-
-TEST_F("require that client handles iterate", Fixture) {
- const IteratorId iterator_id(42);
- const uint64_t max_byte_size = 21;
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- IterateResult result = f.proxy.iterate(iterator_id, max_byte_size, context);
- EXPECT_EQUAL(MockProvider::ITERATE, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
- EXPECT_EQUAL(1u, result.getEntries().size());
- EXPECT_TRUE(result.isCompleted());
-}
-
-TEST_F("require that client handles destroyIterator", Fixture) {
- const IteratorId iterator_id(42);
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- f.proxy.destroyIterator(iterator_id, context);
- EXPECT_EQUAL(MockProvider::DESTROY_ITERATOR, f.mock_spi.last_called);
-}
-
-TEST_F("require that client handles createBucket", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- f.proxy.createBucket(bucket, context);
- EXPECT_EQUAL(MockProvider::CREATE_BUCKET, f.mock_spi.last_called);
-}
-
-TEST_F("require that server accepts deleteBucket", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- f.proxy.deleteBucket(bucket, context);
- EXPECT_EQUAL(MockProvider::DELETE_BUCKET, f.mock_spi.last_called);
-}
-
-TEST_F("require that client handles getModifiedBuckets", Fixture) {
- BucketIdListResult modifiedBuckets = f.proxy.getModifiedBuckets();
- EXPECT_EQUAL(MockProvider::GET_MODIFIED_BUCKETS, f.mock_spi.last_called);
-
- EXPECT_EQUAL(2u, modifiedBuckets.getList().size());
-}
-
-TEST_F("require that client handles split", Fixture) {
- const uint64_t bucket_id_1 = 21;
- const PartitionId partition_id_1(42);
- const Bucket bucket_1(BucketId(bucket_id_1), partition_id_1);
- const uint64_t bucket_id_2 = 210;
- const PartitionId partition_id_2(420);
- const Bucket bucket_2(BucketId(bucket_id_2), partition_id_2);
- const uint64_t bucket_id_3 = 2100;
- const PartitionId partition_id_3(4200);
- const Bucket bucket_3(BucketId(bucket_id_3), partition_id_3);
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- Result result = f.proxy.split(bucket_1, bucket_2, bucket_3, context);
- EXPECT_EQUAL(MockProvider::SPLIT, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
-}
-
-TEST_F("require that client handles join", Fixture) {
- const uint64_t bucket_id_1 = 21;
- const PartitionId partition_id_1(42);
- const Bucket bucket_1(BucketId(bucket_id_1), partition_id_1);
- const uint64_t bucket_id_2 = 210;
- const PartitionId partition_id_2(420);
- const Bucket bucket_2(BucketId(bucket_id_2), partition_id_2);
- const uint64_t bucket_id_3 = 2100;
- const PartitionId partition_id_3(4200);
- const Bucket bucket_3(BucketId(bucket_id_3), partition_id_3);
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- Result result = f.proxy.join(bucket_1, bucket_2, bucket_3, context);
- EXPECT_EQUAL(MockProvider::JOIN, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
-}
-
-TEST_F("require that client handles move", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId from_partition_id(42);
- const PartitionId to_partition_id(43);
- const Bucket bucket(BucketId(bucket_id), from_partition_id);
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- Result result = f.proxy.move(bucket, to_partition_id, context);
- EXPECT_EQUAL(MockProvider::MOVE, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
-}
-
-TEST_F("require that client handles maintain", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
-
- Result result = f.proxy.maintain(bucket, HIGH);
- EXPECT_EQUAL(MockProvider::MAINTAIN, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
-}
-
-TEST_F("require that client handles remove entry", Fixture) {
- const uint64_t bucket_id = 21;
- const PartitionId partition_id(42);
- const Bucket bucket(BucketId(bucket_id), partition_id);
- const Timestamp timestamp(345);
- Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0));
-
- Result result = f.proxy.removeEntry(bucket, timestamp, context);
- EXPECT_EQUAL(MockProvider::REMOVE_ENTRY, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0, result.getErrorCode());
- EXPECT_EQUAL("", result.getErrorMessage());
-}
-
-} // namespace
-
-TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/persistence/src/tests/proxy/providerstub_test.cpp b/persistence/src/tests/proxy/providerstub_test.cpp
deleted file mode 100644
index fa94cfa0cdc..00000000000
--- a/persistence/src/tests/proxy/providerstub_test.cpp
+++ /dev/null
@@ -1,543 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-// Unit tests for providerstub.
-
-#include <vespa/document/datatype/datatype.h>
-#include <vespa/document/repo/documenttyperepo.h>
-#include <vespa/document/serialization/vespadocumentserializer.h>
-#include <vespa/document/util/bytebuffer.h>
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/datatype/documenttype.h>
-#include <vespa/persistence/proxy/buildid.h>
-#include <vespa/persistence/proxy/providerstub.h>
-#include <vespa/persistence/spi/abstractpersistenceprovider.h>
-#include <vespa/vespalib/objects/nbostream.h>
-#include <vespa/vespalib/testkit/testapp.h>
-#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/config-stor-distribution.h>
-#include <vespa/fnet/frt/supervisor.h>
-#include <vespa/fnet/frt/rpcrequest.h>
-#include <vespa/fnet/frt/target.h>
-
-using document::BucketId;
-using document::ByteBuffer;
-using document::DataType;
-using document::DocumentTypeRepo;
-using document::VespaDocumentSerializer;
-using vespalib::nbostream;
-using namespace storage::spi;
-using namespace storage;
-
-#include <tests/proxy/mockprovider.h>
-#include "dummy_provider_factory.h"
-
-namespace {
-
-const int port = 14863;
-const char connect_spec[] = "tcp/localhost:14863";
-const string build_id = getBuildId();
-
-struct Fixture {
- MockProvider &mock_spi;
- DummyProviderFactory factory;
- DocumentTypeRepo repo;
- ProviderStub stub;
- FRT_Supervisor supervisor;
- FRT_RPCRequest *current_request;
- FRT_Target *target;
-
- Fixture()
- : mock_spi(*(new MockProvider())),
- factory(PersistenceProvider::UP(&mock_spi)),
- repo(),
- stub(port, 8, repo, factory),
- supervisor(),
- current_request(0),
- target(supervisor.GetTarget(connect_spec))
- {
- supervisor.Start();
- ASSERT_TRUE(target);
- }
- ~Fixture() {
- if (current_request) {
- current_request->SubRef();
- }
- target->SubRef();
- supervisor.ShutDown(true);
- }
- FRT_RPCRequest *getRequest(const string &name) {
- FRT_RPCRequest *req = supervisor.AllocRPCRequest(current_request);
- current_request = req;
- req->SetMethodName(name.c_str());
- return req;
- }
- void callRpc(FRT_RPCRequest *req, const string &return_spec) {
- target->InvokeSync(req, 5.0);
- req->CheckReturnTypes(return_spec.c_str());
- if (!EXPECT_EQUAL(uint32_t(FRTE_NO_ERROR), req->GetErrorCode())) {
- TEST_FATAL(req->GetErrorMessage());
- }
- }
- void failRpc(FRT_RPCRequest *req, uint32_t error_code) {
- target->InvokeSync(req, 5.0);
- EXPECT_EQUAL(error_code, req->GetErrorCode());
- }
-};
-
-struct ConnectedFixture : Fixture {
- ConnectedFixture() {
- FRT_RPCRequest *req = getRequest("vespa.persistence.connect");
- req->GetParams()->AddString(build_id.data(), build_id.size());
- callRpc(req, "");
- }
-};
-
-TEST("print build id") { fprintf(stderr, "build id: '%s'\n", getBuildId()); }
-
-TEST_F("require that server accepts connect", Fixture) {
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.connect");
- req->GetParams()->AddString(build_id.data(), build_id.size());
- f.callRpc(req, "");
- EXPECT_TRUE(f.stub.hasClient());
-}
-
-TEST_F("require that connect can be called twice", ConnectedFixture) {
- EXPECT_TRUE(f.stub.hasClient());
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.connect");
- req->GetParams()->AddString(build_id.data(), build_id.size());
- f.callRpc(req, "");
- EXPECT_TRUE(f.stub.hasClient());
-}
-
-TEST_F("require that connect fails with wrong build id", Fixture) {
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.connect");
- const string wrong_id = "wrong build id";
- req->GetParams()->AddString(wrong_id.data(), wrong_id.size());
- f.failRpc(req, FRTE_RPC_METHOD_FAILED);
- string prefix("Wrong build id. Got 'wrong build id', required ");
- EXPECT_EQUAL(prefix,
- string(req->GetErrorMessage()).substr(0, prefix.size()));
- EXPECT_FALSE(f.stub.hasClient());
-}
-
-TEST_F("require that only one client can connect", ConnectedFixture) {
- EXPECT_TRUE(f.stub.hasClient());
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.connect");
- req->GetParams()->AddString(build_id.data(), build_id.size());
- FRT_Target *target = f.supervisor.GetTarget(connect_spec);
- target->InvokeSync(req, 5.0);
- target->SubRef();
- EXPECT_EQUAL(uint32_t(FRTE_RPC_METHOD_FAILED), req->GetErrorCode());
- EXPECT_EQUAL("Server is already connected",
- string(req->GetErrorMessage()));
-}
-
-TEST_F("require that server accepts getPartitionStates", ConnectedFixture) {
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.getPartitionStates");
- f.callRpc(req, "bsIS");
- EXPECT_EQUAL(MockProvider::GET_PARTITION_STATES, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
- EXPECT_EQUAL(1u, req->GetReturn()->GetValue(2)._int32_array._len);
- EXPECT_EQUAL(1u, req->GetReturn()->GetValue(3)._string_array._len);
-}
-
-TEST_F("require that server accepts listBuckets", ConnectedFixture) {
- const uint64_t partition_id = 42;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.listBuckets");
- req->GetParams()->AddInt64(partition_id);
- f.callRpc(req, "bsL");
- EXPECT_EQUAL(MockProvider::LIST_BUCKETS, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
- EXPECT_EQUAL(1u, req->GetReturn()->GetValue(2)._int64_array._len);
- EXPECT_EQUAL(partition_id,
- req->GetReturn()->GetValue(2)._int64_array._pt[0]);
-}
-
-TEST_F("require that server accepts setClusterState", ConnectedFixture) {
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.setClusterState");
-
- lib::ClusterState s("version:1 storage:3 distributor:3");
- lib::Distribution d(lib::Distribution::getDefaultDistributionConfig(3, 3));
- ClusterState state(s, 0, d);
- vespalib::nbostream o;
- state.serialize(o);
- req->GetParams()->AddData(o.c_str(), o.size());
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::SET_CLUSTER_STATE, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
-}
-
-TEST_F("require that server accepts setActiveState", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
- const BucketInfo::ActiveState bucket_state = BucketInfo::NOT_ACTIVE;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.setActiveState");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- req->GetParams()->AddInt8(bucket_state);
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::SET_ACTIVE_STATE, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
-}
-
-TEST_F("require that server accepts getBucketInfo", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.getBucketInfo");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- f.callRpc(req, "bsiiiiibb");
- EXPECT_EQUAL(MockProvider::GET_BUCKET_INFO, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
- EXPECT_EQUAL(1u, req->GetReturn()->GetValue(2)._intval32);
- EXPECT_EQUAL(2u, req->GetReturn()->GetValue(3)._intval32);
- EXPECT_EQUAL(3u, req->GetReturn()->GetValue(4)._intval32);
- EXPECT_EQUAL(bucket_id, req->GetReturn()->GetValue(5)._intval32);
- EXPECT_EQUAL(partition_id, req->GetReturn()->GetValue(6)._intval32);
- EXPECT_EQUAL(static_cast<uint8_t>(BucketInfo::READY),
- req->GetReturn()->GetValue(7)._intval8);
- EXPECT_EQUAL(static_cast<uint8_t>(BucketInfo::ACTIVE),
- req->GetReturn()->GetValue(8)._intval8);
-}
-
-TEST_F("require that server accepts put", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
- const Timestamp timestamp(84);
- Document::UP doc(new Document);
- nbostream stream;
- VespaDocumentSerializer serializer(stream);
- serializer.write(*doc, document::COMPLETE);
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.put");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- req->GetParams()->AddInt64(timestamp);
- req->GetParams()->AddData(stream.c_str(), stream.size());
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::PUT, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
-}
-
-void testRemove(ConnectedFixture &f, const string &rpc_name,
- MockProvider::Function func) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
- const Timestamp timestamp(84);
- const DocumentId id("doc:test:1");
-
- FRT_RPCRequest *req = f.getRequest(rpc_name);
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- req->GetParams()->AddInt64(timestamp);
- req->GetParams()->AddString(id.toString().data(), id.toString().size());
- f.callRpc(req, "bsb");
- EXPECT_EQUAL(func, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
- EXPECT_TRUE(req->GetReturn()->GetValue(2)._intval8);
-}
-
-TEST_F("require that server accepts remove by id", ConnectedFixture) {
- testRemove(f, "vespa.persistence.removeById", MockProvider::REMOVE_BY_ID);
-}
-
-TEST_F("require that server accepts removeIfFound", ConnectedFixture) {
- testRemove(f, "vespa.persistence.removeIfFound",
- MockProvider::REMOVE_IF_FOUND);
-}
-
-TEST_F("require that server accepts update", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
- const Timestamp timestamp(84);
- DocumentUpdate update(*DataType::DOCUMENT, DocumentId("doc:test:1"));
- vespalib::nbostream stream;
- update.serializeHEAD(stream);
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.update");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- req->GetParams()->AddInt64(timestamp);
- req->GetParams()->AddData(stream.c_str(), stream.size());
- f.callRpc(req, "bsl");
- EXPECT_EQUAL(MockProvider::UPDATE, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
- EXPECT_EQUAL(timestamp - 10, req->GetReturn()->GetValue(2)._intval64);
-}
-
-TEST_F("require that server accepts flush", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.flush");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::FLUSH, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
-}
-
-TEST_F("require that server accepts get", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
- const string field_set_1 = "[all]";
- const DocumentId id("doc:test:1");
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.get");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- req->GetParams()->AddString(field_set_1.data(), field_set_1.size());
- req->GetParams()->AddString(id.toString().data(), id.toString().size());
- f.callRpc(req, "bslx");
- EXPECT_EQUAL(MockProvider::GET, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
- EXPECT_EQUAL(6u, req->GetReturn()->GetValue(2)._intval64);
- EXPECT_EQUAL(25u, req->GetReturn()->GetValue(3)._data._len);
-}
-
-TEST_F("require that server accepts createIterator", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
- const string doc_sel = "docsel";
- const Timestamp timestamp_from(84);
- const Timestamp timestamp_to(126);
- const Timestamp timestamp_subset(168);
- const string field_set_1 = "[all]";
- const bool include_removes = false;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.createIterator");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- req->GetParams()->AddString(field_set_1.data(), field_set_1.size());
- req->GetParams()->AddString(doc_sel.data(), doc_sel.size());
- req->GetParams()->AddInt64(timestamp_from);
- req->GetParams()->AddInt64(timestamp_to);
- req->GetParams()->AddInt64Array(1)[0] = timestamp_subset;
- req->GetParams()->AddInt8(include_removes);
-
- f.callRpc(req, "bsl");
- EXPECT_EQUAL(MockProvider::CREATE_ITERATOR, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
- EXPECT_EQUAL(partition_id, req->GetReturn()->GetValue(2)._intval64);
-}
-
-TEST_F("require that server accepts iterate", ConnectedFixture) {
- const uint64_t iterator_id = 42;
- const uint64_t max_byte_size = 21;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.iterate");
- req->GetParams()->AddInt64(iterator_id);
- req->GetParams()->AddInt64(max_byte_size);
- f.callRpc(req, "bsLISXb");
- EXPECT_EQUAL(MockProvider::ITERATE, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
- EXPECT_EQUAL(1u, req->GetReturn()->GetValue(2)._int64_array._len);
- EXPECT_EQUAL(1u, req->GetReturn()->GetValue(3)._int32_array._len);
- EXPECT_EQUAL(1u, req->GetReturn()->GetValue(4)._string_array._len);
- EXPECT_EQUAL(1u, req->GetReturn()->GetValue(5)._data_array._len);
- EXPECT_TRUE(req->GetReturn()->GetValue(6)._intval8);
-}
-
-TEST_F("require that server accepts destroyIterator", ConnectedFixture) {
- const uint64_t iterator_id = 42;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.destroyIterator");
- req->GetParams()->AddInt64(iterator_id);
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::DESTROY_ITERATOR, f.mock_spi.last_called);
-}
-
-TEST_F("require that server accepts createBucket", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.createBucket");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::CREATE_BUCKET, f.mock_spi.last_called);
-}
-
-TEST_F("require that server accepts deleteBucket", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.deleteBucket");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::DELETE_BUCKET, f.mock_spi.last_called);
-}
-
-TEST_F("require that server accepts getModifiedBuckets", ConnectedFixture) {
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.getModifiedBuckets");
- f.callRpc(req, "bsL");
- EXPECT_EQUAL(MockProvider::GET_MODIFIED_BUCKETS, f.mock_spi.last_called);
- EXPECT_EQUAL(2u, req->GetReturn()->GetValue(2)._int64_array._len);
-}
-
-TEST_F("require that server accepts split", ConnectedFixture) {
- const uint64_t bucket_id_1 = 21;
- const uint64_t partition_id_1 = 42;
- const uint64_t bucket_id_2 = 210;
- const uint64_t partition_id_2 = 420;
- const uint64_t bucket_id_3 = 2100;
- const uint64_t partition_id_3 = 4200;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.split");
- req->GetParams()->AddInt64(bucket_id_1);
- req->GetParams()->AddInt64(partition_id_1);
- req->GetParams()->AddInt64(bucket_id_2);
- req->GetParams()->AddInt64(partition_id_2);
- req->GetParams()->AddInt64(bucket_id_3);
- req->GetParams()->AddInt64(partition_id_3);
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::SPLIT, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
-}
-
-TEST_F("require that server accepts join", ConnectedFixture) {
- const uint64_t bucket_id_1 = 21;
- const uint64_t partition_id_1 = 42;
- const uint64_t bucket_id_2 = 210;
- const uint64_t partition_id_2 = 420;
- const uint64_t bucket_id_3 = 2100;
- const uint64_t partition_id_3 = 4200;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.join");
- req->GetParams()->AddInt64(bucket_id_1);
- req->GetParams()->AddInt64(partition_id_1);
- req->GetParams()->AddInt64(bucket_id_2);
- req->GetParams()->AddInt64(partition_id_2);
- req->GetParams()->AddInt64(bucket_id_3);
- req->GetParams()->AddInt64(partition_id_3);
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::JOIN, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
-}
-
-TEST_F("require that server accepts move", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t from_partition_id = 42;
- const uint64_t to_partition_id = 43;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.move");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(from_partition_id);
- req->GetParams()->AddInt64(to_partition_id);
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::MOVE, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
-}
-
-TEST_F("require that server accepts maintain", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
- const MaintenanceLevel verification_level = HIGH;
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.maintain");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- req->GetParams()->AddInt8(verification_level);
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::MAINTAIN, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
-}
-
-TEST_F("require that server accepts remove_entry", ConnectedFixture) {
- const uint64_t bucket_id = 21;
- const uint64_t partition_id = 42;
- const Timestamp timestamp(345);
-
- FRT_RPCRequest *req = f.getRequest("vespa.persistence.removeEntry");
- req->GetParams()->AddInt64(bucket_id);
- req->GetParams()->AddInt64(partition_id);
- req->GetParams()->AddInt64(timestamp);
- f.callRpc(req, "bs");
- EXPECT_EQUAL(MockProvider::REMOVE_ENTRY, f.mock_spi.last_called);
-
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(0)._intval8);
- EXPECT_EQUAL(0u, req->GetReturn()->GetValue(1)._string._len);
-}
-
-void checkRpcFails(const string &name, const string &param_spec, Fixture &f) {
- TEST_STATE(name.c_str());
- FRT_RPCRequest *req = f.getRequest("vespa.persistence." + name);
- for (size_t i = 0; i < param_spec.size(); ++i) {
- switch(param_spec[i]) {
- case 'b': req->GetParams()->AddInt8(0); break;
- case 'l': req->GetParams()->AddInt64(0); break;
- case 'L': req->GetParams()->AddInt64Array(0); break;
- case 's': req->GetParams()->AddString(0, 0); break;
- case 'S': req->GetParams()->AddStringArray(0); break;
- case 'x': req->GetParams()->AddData(0, 0); break;
- }
- }
- f.failRpc(req, FRTE_RPC_METHOD_FAILED);
-}
-
-TEST_F("require that unconnected server fails all SPI calls.", Fixture)
-{
- checkRpcFails("initialize", "", f);
- checkRpcFails("getPartitionStates", "", f);
- checkRpcFails("listBuckets", "l", f);
- checkRpcFails("setClusterState", "x", f);
- checkRpcFails("setActiveState", "llb", f);
- checkRpcFails("getBucketInfo", "ll", f);
- checkRpcFails("put", "lllx", f);
- checkRpcFails("removeById", "llls", f);
- checkRpcFails("removeIfFound", "llls", f);
- checkRpcFails("update", "lllx", f);
- checkRpcFails("flush", "ll", f);
- checkRpcFails("get", "llss", f);
- checkRpcFails("createIterator", "llssllLb", f);
- checkRpcFails("iterate", "ll", f);
- checkRpcFails("destroyIterator", "l", f);
- checkRpcFails("createBucket", "ll", f);
- checkRpcFails("deleteBucket", "ll", f);
- checkRpcFails("getModifiedBuckets", "", f);
- checkRpcFails("split", "llllll", f);
- checkRpcFails("join", "llllll", f);
- checkRpcFails("maintain", "llb", f);
- checkRpcFails("removeEntry", "lll", f);
-}
-
-} // namespace
-
-TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/persistence/src/tests/proxy/proxy_factory_wrapper.h b/persistence/src/tests/proxy/proxy_factory_wrapper.h
deleted file mode 100644
index c42b262bcac..00000000000
--- a/persistence/src/tests/proxy/proxy_factory_wrapper.h
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/persistence/conformancetest/conformancetest.h>
-#include <vespa/persistence/proxy/providerstub.h>
-#include <vespa/persistence/proxy/providerproxy.h>
-#include "dummy_provider_factory.h"
-
-namespace storage {
-namespace spi {
-
-/**
- * Generic wrapper for persistence conformance test factories. This
- * wrapper will take any other factory and expose a factory interface
- * that will create persistence instances that communicate with
- * persistence instances created by the wrapped factory using the RPC
- * persistence Proxy.
- **/
-struct ProxyFactoryWrapper : ConformanceTest::PersistenceFactory
-{
- typedef storage::spi::ConformanceTest::PersistenceFactory Factory;
- typedef storage::spi::PersistenceProvider Provider;
- typedef storage::spi::ProviderStub Server;
- typedef storage::spi::ProviderProxy Client;
- typedef document::DocumentTypeRepo Repo;
-
- Factory::UP factory;
- ProxyFactoryWrapper(Factory::UP f) : factory(std::move(f)) {}
-
- struct Wrapper : Client {
- DummyProviderFactory::UP provider;
- Server::UP server;
- Wrapper(DummyProviderFactory::UP p, Server::UP s, const Repo &repo)
- : Client(vespalib::make_string("tcp/localhost:%u", s->getPort()), repo),
- provider(std::move(p)),
- server(std::move(s))
- {}
- };
-
- virtual Provider::UP
- getPersistenceImplementation(const Repo::SP &repo,
- const Repo::DocumenttypesConfig &typesCfg) override{
- DummyProviderFactory::UP provider(new DummyProviderFactory(factory->getPersistenceImplementation(repo, typesCfg)));
- Server::UP server(new Server(0, 8, *repo, *provider));
- return Provider::UP(new Wrapper(std::move(provider), std::move(server), *repo));
- }
-
- bool supportsActiveState() const override {
- return factory->supportsActiveState();
- }
-};
-} // namespace spi
-} // namespace storage
-
diff --git a/persistence/src/tests/proxy/proxy_test.sh b/persistence/src/tests/proxy/proxy_test.sh
deleted file mode 100755
index 637ff192356..00000000000
--- a/persistence/src/tests/proxy/proxy_test.sh
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/bin/bash
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-set -e
-$VALGRIND ./persistence_providerstub_test_app
-$VALGRIND ./persistence_providerproxy_test_app
-$VALGRIND ./persistence_providerproxy_conformance_test_app
diff --git a/persistence/src/tests/proxy/proxyfactory.h b/persistence/src/tests/proxy/proxyfactory.h
deleted file mode 100644
index b785fab4290..00000000000
--- a/persistence/src/tests/proxy/proxyfactory.h
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/persistence/conformancetest/conformancetest.h>
-#include <vespa/persistence/proxy/providerstub.h>
-#include <vespa/persistence/proxy/providerproxy.h>
-
-namespace storage {
-namespace spi {
-
-/**
- * Generic wrapper for persistence conformance test factories. This
- * wrapper will take any other factory and expose a factory interface
- * that will create persistence instances that communicate with
- * persistence instances created by the wrapped factory using the RPC
- * persistence Proxy.
- **/
-struct ProxyFactory : ConformanceTest::PersistenceFactory
-{
- using Provider = storage::spi::PersistenceProvider;
- using Client = storage::spi::ProviderProxy;
- using Repo = document::DocumentTypeRepo;
-
- ProxyFactory() {}
-
- Provider::UP
- getPersistenceImplementation(const Repo::SP &repo, const Repo::DocumenttypesConfig &) override {
- return Provider::UP(new Client("tcp/localhost:3456", *repo));
- }
-
- bool supportsActiveState() const override {
- return false;
- }
-};
-} // namespace spi
-} // namespace storage
diff --git a/persistence/src/vespa/persistence/CMakeLists.txt b/persistence/src/vespa/persistence/CMakeLists.txt
index 3b7920128ce..da8eda2164f 100644
--- a/persistence/src/vespa/persistence/CMakeLists.txt
+++ b/persistence/src/vespa/persistence/CMakeLists.txt
@@ -3,7 +3,6 @@ vespa_add_library(persistence
SOURCES
$<TARGET_OBJECTS:persistence_dummyimpl>
$<TARGET_OBJECTS:persistence_spi>
- $<TARGET_OBJECTS:persistence_proxy>
INSTALL lib64
DEPENDS
)
diff --git a/persistence/src/vespa/persistence/proxy/.gitignore b/persistence/src/vespa/persistence/proxy/.gitignore
deleted file mode 100644
index 7e7c0fe7fae..00000000000
--- a/persistence/src/vespa/persistence/proxy/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-/.depend
-/Makefile
diff --git a/persistence/src/vespa/persistence/proxy/CMakeLists.txt b/persistence/src/vespa/persistence/proxy/CMakeLists.txt
deleted file mode 100644
index fdebad2fe49..00000000000
--- a/persistence/src/vespa/persistence/proxy/CMakeLists.txt
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(persistence_proxy OBJECT
- SOURCES
- buildid.cpp
- providerproxy.cpp
- providerstub.cpp
- DEPENDS
-)
diff --git a/persistence/src/vespa/persistence/proxy/buildid.cpp b/persistence/src/vespa/persistence/proxy/buildid.cpp
deleted file mode 100644
index 2ac018069b8..00000000000
--- a/persistence/src/vespa/persistence/proxy/buildid.cpp
+++ /dev/null
@@ -1,8 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "buildid.h"
-#include <vespa/vespalib/component/vtag.h>
-
-const char *storage::spi::getBuildId() {
- return vespalib::VersionTagComponent;
-}
diff --git a/persistence/src/vespa/persistence/proxy/buildid.h b/persistence/src/vespa/persistence/proxy/buildid.h
deleted file mode 100644
index ab32b09c533..00000000000
--- a/persistence/src/vespa/persistence/proxy/buildid.h
+++ /dev/null
@@ -1,12 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-namespace storage {
-namespace spi {
-
-const char *getBuildId();
-
-} // namespace spi
-} // namespace storage
-
diff --git a/persistence/src/vespa/persistence/proxy/providerproxy.cpp b/persistence/src/vespa/persistence/proxy/providerproxy.cpp
deleted file mode 100644
index 52e641db74f..00000000000
--- a/persistence/src/vespa/persistence/proxy/providerproxy.cpp
+++ /dev/null
@@ -1,493 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "providerproxy.h"
-#include "buildid.h"
-#include <vespa/document/repo/documenttyperepo.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/fieldset/fieldsetrepo.h>
-#include <vespa/document/serialization/vespadocumentdeserializer.h>
-#include <vespa/document/serialization/vespadocumentserializer.h>
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/objects/nbostream.h>
-#include <vespa/fnet/frt/frt.h>
-#include <vespa/log/log.h>
-LOG_SETUP(".providerproxy");
-
-using document::BucketId;
-using document::ByteBuffer;
-using document::DocumentTypeRepo;
-using document::VespaDocumentDeserializer;
-using document::VespaDocumentSerializer;
-using vespalib::nbostream;
-
-namespace storage {
-namespace spi {
-namespace {
-void addBucket(FRT_Values &values, const Bucket &bucket) {
- values.AddInt64(bucket.getBucketId().getId());
- values.AddInt64(bucket.getPartition());
-}
-
-void addDocument(FRT_Values &values, const Document &doc) {
- nbostream stream;
- VespaDocumentSerializer serializer(stream);
- serializer.write(doc, document::COMPLETE);
- values.AddData(stream.c_str(), stream.size());
-}
-
-void addString(FRT_Values &values, const string &s) {
- values.AddString(s.data(), s.size());
-}
-
-void addSelection(FRT_Values &values, const Selection &selection) {
- addString(values, selection.getDocumentSelection().getDocumentSelection());
- values.AddInt64(selection.getFromTimestamp());
- values.AddInt64(selection.getToTimestamp());
- std::copy(selection.getTimestampSubset().begin(),
- selection.getTimestampSubset().end(),
- values.AddInt64Array(selection.getTimestampSubset().size()));
-}
-
-void addDocumentUpdate(FRT_Values &values, const DocumentUpdate &update) {
- nbostream stream;
- update.serializeHEAD(stream);
- values.AddData(stream.c_str(), stream.size());
-}
-
-Document::UP readDocument(nbostream &stream, const DocumentTypeRepo &repo) {
- const uint16_t version = 8;
- VespaDocumentDeserializer deserializer(repo, stream, version);
- Document::UP doc(new Document);
- deserializer.read(*doc);
- return doc;
-}
-
-string getString(const FRT_StringValue &str) {
- return string(str._str, str._len);
-}
-
-string getString(const FRT_Value &value) {
- return getString(value._string);
-}
-
-template <typename ResultType>
-ResultType readError(const FRT_Values &values) {
- uint8_t error_code = values[0]._intval8;
- string error_msg = getString(values[1]);
- return ResultType(Result::ErrorType(error_code), error_msg);
-}
-
-bool invokeRpc(FRT_Target *target, FRT_RPCRequest &req, const char *res_spec) {
- target->InvokeSync(&req, 0.0); // no timeout
- req.CheckReturnTypes(res_spec);
- return req.GetErrorCode() == FRTE_NO_ERROR;
-}
-
-struct RequestScopedPtr : vespalib::noncopyable {
- FRT_RPCRequest *req;
- RequestScopedPtr(FRT_RPCRequest *r) : req(r) { assert(req); }
- ~RequestScopedPtr() { req->SubRef(); }
- FRT_RPCRequest *operator->() { return req; }
- FRT_RPCRequest &operator*() { return *req; }
-};
-} // namespace
-
-template <typename ResultType>
-ResultType ProviderProxy::invokeRpc_Return(FRT_RPCRequest &req,
- const char *res_spec) const
-{
- if (!invokeRpc(_target, req, res_spec)) {
-
-
- return ResultType(Result::FATAL_ERROR,
- vespalib::make_string("Error %s when running RPC request %s",
- req.GetErrorMessage(),
- req.GetMethodName()));
- }
- return readResult<ResultType>(*req.GetReturn());
-}
-
-template <typename ResultType>
-ResultType ProviderProxy::readResult(const FRT_Values &values) const {
- if (values[0]._intval8 != Result::NONE) {
- return readError<ResultType>(values);
- }
- return readNoError<ResultType>(values);
-}
-
-template <>
-Result ProviderProxy::readNoError(const FRT_Values &) const {
- return Result();
-}
-
-template <>
-PartitionStateListResult
-ProviderProxy::readNoError(const FRT_Values &values) const {
- FRT_LPT(uint32_t) state_array = values[2]._int32_array;
- FRT_LPT(FRT_StringValue) reason_array = values[3]._string_array;
- PartitionStateList states(state_array._len);
- for (size_t i = 0; i < state_array._len; ++i) {
- PartitionState::State state =
- static_cast<PartitionState::State>(state_array._pt[i]);
- string reason = getString(reason_array._pt[i]);
- states[i] = PartitionState(state, reason);
- }
- return PartitionStateListResult(states);
-}
-
-template <>
-BucketIdListResult ProviderProxy::readNoError(const FRT_Values &values) const {
- BucketIdListResult::List list;
- for (uint32_t i = 0; i < values[2]._int64_array._len; ++i) {
- list.push_back(BucketId(values[2]._int64_array._pt[i]));
- }
- return BucketIdListResult(list);
-}
-
-template <>
-BucketInfoResult ProviderProxy::readNoError(const FRT_Values &values) const {
- BucketInfo info(BucketChecksum(values[2]._intval32),
- values[3]._intval32,
- values[4]._intval32,
- values[5]._intval32,
- values[6]._intval32,
- static_cast<BucketInfo::ReadyState>(
- values[7]._intval8),
- static_cast<BucketInfo::ActiveState>(
- values[8]._intval8));
- return BucketInfoResult(info);
-}
-
-template <>
-RemoveResult ProviderProxy::readNoError(const FRT_Values &values) const {
- return RemoveResult(values[2]._intval8);
-}
-
-template <>
-UpdateResult ProviderProxy::readNoError(const FRT_Values &values) const {
- return UpdateResult(Timestamp(values[2]._intval64));
-}
-
-template <>
-GetResult ProviderProxy::readNoError(const FRT_Values &values) const {
- nbostream stream(values[3]._data._buf, values[3]._data._len);
- if (stream.empty()) {
- return GetResult();
- }
- return GetResult(readDocument(stream, *_repo),
- Timestamp(values[2]._intval64));
-}
-
-template <>
-CreateIteratorResult ProviderProxy::readNoError(const FRT_Values &values) const
-{
- return CreateIteratorResult(IteratorId(values[2]._intval64));
-}
-
-template <>
-IterateResult ProviderProxy::readNoError(const FRT_Values &values) const {
- IterateResult::List result;
- assert(values[2]._int64_array._len == values[3]._int32_array._len &&
- values[2]._int64_array._len == values[4]._string_array._len &&
- values[2]._int64_array._len == values[5]._data_array._len);
- for (uint32_t i = 0; i < values[2]._int64_array._len; ++i) {
- Timestamp timestamp(values[2]._int64_array._pt[i]);
- uint32_t meta_flags = values[3]._int32_array._pt[i];
- string doc_id(getString(values[4]._string_array._pt[i]));
- nbostream stream(values[5]._data_array._pt[i]._buf,
- values[5]._data_array._pt[i]._len);
- DocEntry::UP entry;
- if (!stream.empty()) {
- Document::UP doc = readDocument(stream, *_repo);
- entry.reset(new DocEntry(timestamp, meta_flags, std::move(doc)));
- } else if (!doc_id.empty()) {
- entry.reset(
- new DocEntry(timestamp, meta_flags, DocumentId(doc_id)));
- } else {
- entry.reset(new DocEntry(timestamp, meta_flags));
- }
- result.push_back(std::move(entry));
- }
-
- return IterateResult(std::move(result), values[6]._intval8);
-}
-
-namespace {
-bool shouldFailFast(uint32_t error_code) {
- return error_code != FRTE_RPC_TIMEOUT
- && error_code != FRTE_RPC_CONNECTION
- && error_code != FRTE_RPC_OVERLOAD
- && error_code != FRTE_NO_ERROR;
-}
-} // namespace
-
-ProviderProxy::ProviderProxy(const vespalib::string &connect_spec,
- const DocumentTypeRepo &repo)
- : _supervisor(new FRT_Supervisor()),
- _target(0),
- _repo(&repo)
-{
- _supervisor->Start();
- bool connected = false;
- _target = _supervisor->GetTarget(connect_spec.c_str());
- for (size_t i = 0; !connected && (i < (100 + 300)); ++i) {
- FRT_RPCRequest *req = new FRT_RPCRequest();
- req->SetMethodName("vespa.persistence.connect");
- const string build_id = getBuildId();
- req->GetParams()->AddString(build_id.data(), build_id.size());
- _target->InvokeSync(req, 5.0);
- connected = req->CheckReturnTypes("");
- uint32_t error_code = req->GetErrorCode();
- req->SubRef();
- if (!connected) {
- if (shouldFailFast(error_code)) {
- break;
- }
- _target->SubRef();
- if (i < 100) {
- FastOS_Thread::Sleep(100); // retry each 100ms for 10s
- } else {
- FastOS_Thread::Sleep(1000); // retry each 1s for 5m
- }
- _target = _supervisor->GetTarget(connect_spec.c_str());
- }
- }
- if (!connected) {
- LOG(error, "could not connect to peer");
- }
-}
-
-ProviderProxy::~ProviderProxy() {
- _target->SubRef();
- _supervisor->ShutDown(true);
-}
-
-Result ProviderProxy::initialize() {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.initialize");
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-PartitionStateListResult ProviderProxy::getPartitionStates() const {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.getPartitionStates");
- return invokeRpc_Return<PartitionStateListResult>(*req, "bsIS");
-}
-
-BucketIdListResult ProviderProxy::listBuckets(PartitionId partition) const {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.listBuckets");
- req->GetParams()->AddInt64(partition);
-
- return invokeRpc_Return<BucketIdListResult>(*req, "bsL");
-}
-
-Result ProviderProxy::setClusterState(const ClusterState& clusterState) {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.setClusterState");
-
- vespalib::nbostream o;
- clusterState.serialize(o);
- req->GetParams()->AddData(o.c_str(), o.size());
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-Result ProviderProxy::setActiveState(const Bucket &bucket,
- BucketInfo::ActiveState newState) {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.setActiveState");
- addBucket(*req->GetParams(), bucket);
- req->GetParams()->AddInt8(newState);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-BucketInfoResult ProviderProxy::getBucketInfo(const Bucket &bucket) const {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.getBucketInfo");
- addBucket(*req->GetParams(), bucket);
- return invokeRpc_Return<BucketInfoResult>(*req, "bsiiiiibb");
-}
-
-Result ProviderProxy::put(const Bucket &bucket, Timestamp timestamp,
- const Document::SP& doc, Context&)
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.put");
- addBucket(*req->GetParams(), bucket);
- req->GetParams()->AddInt64(timestamp);
- addDocument(*req->GetParams(), *doc);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-RemoveResult ProviderProxy::remove(const Bucket &bucket,
- Timestamp timestamp,
- const DocumentId &id,
- Context&)
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.removeById");
- addBucket(*req->GetParams(), bucket);
- req->GetParams()->AddInt64(timestamp);
- addString(*req->GetParams(), id.toString());
- return invokeRpc_Return<RemoveResult>(*req, "bsb");
-}
-
-RemoveResult ProviderProxy::removeIfFound(const Bucket &bucket,
- Timestamp timestamp,
- const DocumentId &id,
- Context&)
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.removeIfFound");
- addBucket(*req->GetParams(), bucket);
- req->GetParams()->AddInt64(timestamp);
- addString(*req->GetParams(), id.toString());
- return invokeRpc_Return<RemoveResult>(*req, "bsb");
-}
-
-UpdateResult ProviderProxy::update(const Bucket &bucket, Timestamp timestamp,
- const DocumentUpdate::SP& doc_update,
- Context&)
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.update");
- addBucket(*req->GetParams(), bucket);
- req->GetParams()->AddInt64(timestamp);
- addDocumentUpdate(*req->GetParams(), *doc_update);
- return invokeRpc_Return<UpdateResult>(*req, "bsl");
-}
-
-Result ProviderProxy::flush(const Bucket &bucket, Context&) {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.flush");
- addBucket(*req->GetParams(), bucket);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-GetResult ProviderProxy::get(const Bucket &bucket,
- const document::FieldSet& fieldSet,
- const DocumentId &doc_id,
- Context&) const
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.get");
- document::FieldSetRepo repo;
- addBucket(*req->GetParams(), bucket);
- addString(*req->GetParams(), repo.serialize(fieldSet));
- addString(*req->GetParams(), doc_id.toString());
- return invokeRpc_Return<GetResult>(*req, "bslx");
-}
-
-CreateIteratorResult ProviderProxy::createIterator(const Bucket &bucket,
- const document::FieldSet& fieldSet,
- const Selection &select,
- IncludedVersions versions,
- Context&)
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.createIterator");
- addBucket(*req->GetParams(), bucket);
-
- document::FieldSetRepo repo;
- addString(*req->GetParams(), repo.serialize(fieldSet));
- addSelection(*req->GetParams(), select);
- req->GetParams()->AddInt8(versions);
- return invokeRpc_Return<CreateIteratorResult>(*req, "bsl");
-}
-
-IterateResult ProviderProxy::iterate(IteratorId id,
- uint64_t max_byte_size,
- Context&) const
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.iterate");
- req->GetParams()->AddInt64(id);
- req->GetParams()->AddInt64(max_byte_size);
- return invokeRpc_Return<IterateResult>(*req, "bsLISXb");
-}
-
-Result ProviderProxy::destroyIterator(IteratorId id, Context&) {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.destroyIterator");
- req->GetParams()->AddInt64(id);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-Result ProviderProxy::createBucket(const Bucket &bucket, Context&) {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.createBucket");
- addBucket(*req->GetParams(), bucket);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-Result ProviderProxy::deleteBucket(const Bucket &bucket, Context&) {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.deleteBucket");
- addBucket(*req->GetParams(), bucket);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-BucketIdListResult ProviderProxy::getModifiedBuckets() const {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.getModifiedBuckets");
- return invokeRpc_Return<BucketIdListResult>(*req, "bsL");
-}
-
-Result ProviderProxy::split(const Bucket &source,
- const Bucket &target1,
- const Bucket &target2,
- Context&)
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.split");
- addBucket(*req->GetParams(), source);
- addBucket(*req->GetParams(), target1);
- addBucket(*req->GetParams(), target2);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-Result ProviderProxy::join(const Bucket &source1,
- const Bucket &source2,
- const Bucket &target,
- Context&)
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.join");
- addBucket(*req->GetParams(), source1);
- addBucket(*req->GetParams(), source2);
- addBucket(*req->GetParams(), target);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-Result ProviderProxy::move(const Bucket &source,
- PartitionId target,
- Context&)
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.move");
- addBucket(*req->GetParams(), source);
- req->GetParams()->AddInt64(target);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-Result ProviderProxy::maintain(const Bucket &bucket, MaintenanceLevel level) {
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.maintain");
- addBucket(*req->GetParams(), bucket);
- req->GetParams()->AddInt8(level);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-Result ProviderProxy::removeEntry(const Bucket &bucket, Timestamp timestamp,
- Context&)
-{
- RequestScopedPtr req(_supervisor->AllocRPCRequest());
- req->SetMethodName("vespa.persistence.removeEntry");
- addBucket(*req->GetParams(), bucket);
- req->GetParams()->AddInt64(timestamp);
- return invokeRpc_Return<Result>(*req, "bs");
-}
-
-} // namespace spi
-} // namespace storage
diff --git a/persistence/src/vespa/persistence/proxy/providerproxy.h b/persistence/src/vespa/persistence/proxy/providerproxy.h
deleted file mode 100644
index 7fa59fe07d0..00000000000
--- a/persistence/src/vespa/persistence/proxy/providerproxy.h
+++ /dev/null
@@ -1,76 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/persistence/spi/persistenceprovider.h>
-
-class FRT_Target;
-class FRT_Supervisor;
-class FRT_RPCRequest;
-class FRT_Values;
-
-namespace document {
- class DocumentTypeRepo;
-}
-
-namespace storage {
-namespace spi {
-
-class ProviderProxy : public PersistenceProvider {
- std::unique_ptr<FRT_Supervisor> _supervisor;
- FRT_Target *_target;
- const document::DocumentTypeRepo *_repo;
-
- template <typename ResultType>
- ResultType invokeRpc_Return(FRT_RPCRequest &req, const char *res_spec) const;
- template <typename ResultType>
- ResultType readResult(const FRT_Values &values) const;
- template <typename ResultType>
- ResultType readNoError(const FRT_Values &values) const;
-
-public:
- typedef std::unique_ptr<ProviderProxy> UP;
-
- ProviderProxy(const vespalib::string &connect_spec, const document::DocumentTypeRepo &repo);
- ~ProviderProxy();
-
- void setRepo(const document::DocumentTypeRepo &repo) {
- _repo = &repo;
- }
-
- Result initialize() override;
- PartitionStateListResult getPartitionStates() const override;
- BucketIdListResult listBuckets(PartitionId) const override;
- Result setClusterState(const ClusterState&) override;
- Result setActiveState(const Bucket&, BucketInfo::ActiveState) override;
- BucketInfoResult getBucketInfo(const Bucket &) const override;
-
- Result put(const Bucket &, Timestamp, const DocumentSP&, Context&) override;
- RemoveResult remove(const Bucket &, Timestamp, const DocumentId &, Context&) override;
- RemoveResult removeIfFound(const Bucket &, Timestamp, const DocumentId &, Context&) override;
- UpdateResult update(const Bucket &, Timestamp, const DocumentUpdateSP&, Context&) override;
-
- Result flush(const Bucket &, Context&) override;
-
- GetResult get(const Bucket &, const document::FieldSet&, const DocumentId &, Context&) const override;
-
- CreateIteratorResult createIterator(const Bucket &, const document::FieldSet&, const Selection&,
- IncludedVersions versions, Context&) override;
-
- IterateResult iterate(IteratorId, uint64_t max_byte_size, Context&) const override;
- Result destroyIterator(IteratorId, Context&) override;
-
- Result createBucket(const Bucket &, Context&) override;
- Result deleteBucket(const Bucket &, Context&) override;
- BucketIdListResult getModifiedBuckets() const override;
- Result split(const Bucket &source, const Bucket &target1, const Bucket &target2, Context&) override;
- Result join(const Bucket &source1, const Bucket &source2, const Bucket &target, Context&) override;
- Result move(const Bucket &source, PartitionId partition, Context&) override;
-
- Result maintain(const Bucket &, MaintenanceLevel) override;
- Result removeEntry(const Bucket &, Timestamp, Context&) override;
-};
-
-} // namespace spi
-} // namespace storage
-
diff --git a/persistence/src/vespa/persistence/proxy/providerstub.cpp b/persistence/src/vespa/persistence/proxy/providerstub.cpp
deleted file mode 100644
index b4137f0eb0c..00000000000
--- a/persistence/src/vespa/persistence/proxy/providerstub.cpp
+++ /dev/null
@@ -1,928 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "buildid.h"
-#include "providerstub.h"
-#include <vespa/document/serialization/vespadocumentdeserializer.h>
-#include <vespa/document/serialization/vespadocumentserializer.h>
-#include <vespa/document/util/bytebuffer.h>
-#include <vespa/document/base/documentid.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/vespalib/objects/nbostream.h>
-#include <vespa/vespalib/util/closuretask.h>
-#include <vespa/document/fieldset/fieldsetrepo.h>
-#include <vespa/fnet/frt/values.h>
-#include <vespa/fnet/frt/supervisor.h>
-#include <vespa/fnet/frt/rpcrequest.h>
-
-
-using document::BucketId;
-using document::ByteBuffer;
-using document::DocumentTypeRepo;
-using document::VespaDocumentDeserializer;
-using document::VespaDocumentSerializer;
-using std::vector;
-using vespalib::Closure;
-using vespalib::makeClosure;
-using vespalib::makeTask;
-using vespalib::nbostream;
-
-namespace storage::spi {
-namespace {
-
-LoadType defaultLoadType(0, "default");
-
-// Serialize return values
-void addResult(FRT_Values &ret, const Result &result) {
- ret.AddInt8(result.getErrorCode());
- ret.AddString(result.getErrorMessage().data(),
- result.getErrorMessage().size());
-}
-
-void addPartitionStateListResult(FRT_Values &ret,
- const PartitionStateListResult &result) {
- addResult(ret, result);
- PartitionStateList states = result.getList();
- uint32_t *stateValues = ret.AddInt32Array(states.size());
- FRT_StringValue *reasons = ret.AddStringArray(states.size());
- for (size_t i = 0; i < states.size(); ++i) {
- stateValues[i] = states[i].getState();
- string reason(states[i].getReason());
- ret.SetString(&reasons[i], reason.data(), reason.size());
- }
-}
-
-void addBucketInfoResult(FRT_Values &ret, const BucketInfoResult &result) {
- addResult(ret, result);
- const BucketInfo& info = result.getBucketInfo();
- ret.AddInt32(info.getChecksum());
- ret.AddInt32(info.getDocumentCount());
- ret.AddInt32(info.getDocumentSize());
- ret.AddInt32(info.getEntryCount());
- ret.AddInt32(info.getUsedSize());
- ret.AddInt8(static_cast<uint8_t>(info.isReady()));
- ret.AddInt8(static_cast<uint8_t>(info.isActive()));
-}
-
-void addRemoveResult(FRT_Values &ret, const RemoveResult &result) {
- addResult(ret, result);
- ret.AddInt8(result.wasFound());
-}
-
-void addUpdateResult(FRT_Values &ret, const UpdateResult &result) {
- addResult(ret, result);
- ret.AddInt64(result.getExistingTimestamp());
-}
-
-void addGetResult(FRT_Values &ret, const GetResult &result) {
- addResult(ret, result);
- ret.AddInt64(result.getTimestamp());
- if (result.hasDocument()) {
- nbostream stream;
- VespaDocumentSerializer serializer(stream);
- serializer.write(result.getDocument(), document::COMPLETE);
- ret.AddData(stream.c_str(), stream.size());
- } else {
- ret.AddData(0, 0);
- }
-}
-
-void addCreateIteratorResult(FRT_Values &ret,
- const CreateIteratorResult &result)
-{
- addResult(ret, result);
- ret.AddInt64(result.getIteratorId());
-}
-
-void addIterateResult(FRT_Values &ret, const IterateResult &result)
-{
- addResult(ret, result);
-
- const vector<DocEntry::UP> &entries = result.getEntries();
- uint64_t *timestamps = ret.AddInt64Array(entries.size());
- uint32_t *flags = ret.AddInt32Array(entries.size());
- assert(sizeof(DocEntry::SizeType) == sizeof(uint32_t));
- FRT_StringValue *doc_id_array = ret.AddStringArray(entries.size());
- FRT_DataValue *doc_array = ret.AddDataArray(entries.size());
-
- for (size_t i = 0; i < entries.size(); ++i) {
- string doc_id_str;
- nbostream stream;
- const DocumentId *doc_id = entries[i]->getDocumentId();
- if (doc_id) {
- doc_id_str = doc_id->toString();
- }
- const Document *doc = entries[i]->getDocument();
- if (doc) {
- VespaDocumentSerializer serializer(stream);
- serializer.write(*doc, document::COMPLETE);
- }
-
- timestamps[i] = entries[i]->getTimestamp();
- flags[i] = entries[i]->getFlags();
- ret.SetString(&doc_id_array[i], doc_id_str.data(), doc_id_str.size());
- ret.SetData(&doc_array[i], stream.c_str(), stream.size());
- }
-
- ret.AddInt8(result.isCompleted());
-}
-
-void addBucketIdListResult(FRT_Values &ret, const BucketIdListResult& result) {
- addResult(ret, result);
-
- size_t modified_bucket_size = result.getList().size();
- uint64_t *bucket_id = ret.AddInt64Array(modified_bucket_size);
- for (size_t i = 0; i < modified_bucket_size; ++i) {
- bucket_id[i] = result.getList()[i].getRawId();
- }
-}
-
-string getString(const FRT_StringValue &str) {
- return string(str._str, str._len);
-}
-
-string getString(const FRT_Value &value) {
- return getString(value._string);
-}
-
-Bucket getBucket(const FRT_Value &bucket_val, const FRT_Value &partition_val) {
- BucketId bucket_id(bucket_val._intval64);
- PartitionId partition_id(partition_val._intval64);
- return Bucket(bucket_id, partition_id);
-}
-
-Document::UP getDocument(const FRT_Value &val, const DocumentTypeRepo &repo) {
- nbostream stream(val._data._buf, val._data._len);
- const uint16_t version = 8;
- VespaDocumentDeserializer deserializer(repo, stream, version);
- Document::UP doc(new Document);
- deserializer.read(*doc);
- return doc;
-}
-
-Selection getSelection(const FRT_Values &params, int i) {
- DocumentSelection doc_sel(getString(params[i]));
- Timestamp timestamp_from(params[i + 1]._intval64);
- Timestamp timestamp_to(params[i + 2]._intval64);
- FRT_Array<uint64_t> array = params[i + 3]._int64_array;
- TimestampList timestamp_subset(array._pt, array._pt + array._len);
-
- Selection selection(doc_sel);
- selection.setFromTimestamp(timestamp_from);
- selection.setToTimestamp(timestamp_to);
- selection.setTimestampSubset(timestamp_subset);
- return selection;
-}
-
-void addConnect(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.connect",
- "s", "", true, func, obj);
- rb.MethodDesc("Set up connection to proxy.");
- rb.ParamDesc("build_id", "Id to make sure client and server come from the "
- "same build.");
-}
-
-void addGetPartitionStates(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.getPartitionStates",
- "", "bsIS", true, func, obj);
- rb.MethodDesc("???");
- rb.ReturnDesc("ret", "An array of serialized PartitionStates.");
-}
-
-void doGetPartitionStates(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &ret = *req->GetReturn();
- addPartitionStateListResult(ret, provider->getPartitionStates());
- req->Return();
-}
-
-void addInitialize(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.initialize",
- "", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doInitialize(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &ret = *req->GetReturn();
- addResult(ret, provider->initialize());
- req->Return();
-}
-
-void addListBuckets(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.listBuckets",
- "l", "bsL", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("partition_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
- rb.ReturnDesc("bucket_ids", "An array of BucketIds.");
-}
-
-void doListBuckets(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- PartitionId partition_id(params[0]._intval64);
-
- FRT_Values &ret = *req->GetReturn();
- addBucketIdListResult(ret, provider->listBuckets(partition_id));
- req->Return();
-}
-
-void addSetClusterState(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.setClusterState",
- "x", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("cluster_state", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doSetClusterState(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- vespalib::nbostream stream(params[0]._data._buf, params[0]._data._len);
-
- ClusterState state(stream);
- FRT_Values &ret = *req->GetReturn();
- addResult(ret, provider->setClusterState(state));
- req->Return();
-}
-
-void addSetActiveState(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.setActiveState",
- "llb", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ParamDesc("bucket_state", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doSetActiveState(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
- BucketInfo::ActiveState state = BucketInfo::ActiveState(params[2]._intval8);
-
- FRT_Values &ret = *req->GetReturn();
- addResult(ret, provider->setActiveState(bucket, state));
- req->Return();
-}
-
-void addGetBucketInfo(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.getBucketInfo",
- "ll", "bsiiiiibb", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
- rb.ReturnDesc("checksum", "");
- rb.ReturnDesc("document_count", "");
- rb.ReturnDesc("document_size", "");
- rb.ReturnDesc("entry_count", "");
- rb.ReturnDesc("used_size", "");
- rb.ReturnDesc("ready", "");
- rb.ReturnDesc("active", "");
-}
-
-void doGetBucketInfo(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
-
- FRT_Values &ret = *req->GetReturn();
- addBucketInfoResult(ret, provider->getBucketInfo(bucket));
- req->Return();
-}
-
-void addPut(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.put",
- "lllx", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ParamDesc("timestamp", "");
- rb.ParamDesc("document", "A serialized document");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doPut(FRT_RPCRequest *req, PersistenceProvider *provider,
- const DocumentTypeRepo *repo)
-{
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
- Timestamp timestamp(params[2]._intval64);
- Document::SP doc(getDocument(params[3], *repo).release());
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addResult(ret, provider->put(bucket, timestamp, doc, context));
- req->Return();
-}
-
-void addRemoveById(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.removeById",
- "llls", "bsb", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ParamDesc("timestamp", "");
- rb.ParamDesc("document_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
- rb.ReturnDesc("existed", "");
-}
-
-void doRemoveById(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
- Timestamp timestamp(params[2]._intval64);
- DocumentId id(getString(params[3]));
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addRemoveResult(ret, provider->remove(bucket, timestamp, id, context));
- req->Return();
-}
-
-void addRemoveIfFound(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.removeIfFound",
- "llls", "bsb", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ParamDesc("timestamp", "");
- rb.ParamDesc("document_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
- rb.ReturnDesc("existed", "");
-}
-
-void doRemoveIfFound(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
- Timestamp timestamp(params[2]._intval64);
- DocumentId id(getString(params[3]));
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addRemoveResult(ret,
- provider->removeIfFound(bucket, timestamp, id, context));
- req->Return();
-}
-
-void addUpdate(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.update",
- "lllx", "bsl", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ParamDesc("timestamp", "");
- rb.ParamDesc("document_update", "A serialized DocumentUpdate");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
- rb.ReturnDesc("existing timestamp", "");
-}
-
-void doUpdate(FRT_RPCRequest *req, PersistenceProvider *provider,
- const DocumentTypeRepo *repo)
-{
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
- Timestamp timestamp(params[2]._intval64);
- ByteBuffer buffer(params[3]._data._buf, params[3]._data._len);
- auto update = std::make_shared<DocumentUpdate>(*repo, buffer,
- DocumentUpdate::
- SerializeVersion::
- SERIALIZE_HEAD);
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addUpdateResult(ret, provider->update(bucket, timestamp, update, context));
- req->Return();
-}
-
-void addFlush(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.flush", "ll", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doFlush(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addResult(ret, provider->flush(bucket, context));
- req->Return();
-}
-
-void addGet(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.get",
- "llss", "bslx", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ParamDesc("field_set", "Array of fields in the set");
- rb.ParamDesc("document_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
- rb.ReturnDesc("timestamp", "");
- rb.ReturnDesc("document", "A serialized document");
-}
-
-void doGet(FRT_RPCRequest *req,
- PersistenceProvider *provider,
- const DocumentTypeRepo* repo)
-{
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
-
- document::FieldSetRepo fsr;
- document::FieldSet::UP fieldSet = fsr.parse(*repo, getString(params[2]));
- DocumentId id(getString(params[3]));
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addGetResult(ret, provider->get(bucket, *fieldSet, id, context));
- req->Return();
-}
-
-void addCreateIterator(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.createIterator",
- "llssllLb", "bsl", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ParamDesc("field_set", "Field set string (comma-separated list of strings)");
- rb.ParamDesc("document_selection_string", "");
- rb.ParamDesc("timestamp_from", "");
- rb.ParamDesc("timestamp_to", "");
- rb.ParamDesc("timestamp_subset", "");
- rb.ParamDesc("includedversions", "");
-
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
- rb.ReturnDesc("iterator_id", "");
-}
-
-void doCreateIterator(FRT_RPCRequest *req, PersistenceProvider *provider,
- const DocumentTypeRepo* repo)
-{
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
-
- document::FieldSetRepo fsr;
- document::FieldSet::UP fieldSet = fsr.parse(*repo, getString(params[2]));
- Selection selection = getSelection(params, 3);
- IncludedVersions versions =
- static_cast<IncludedVersions>(params[7]._intval8);
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addCreateIteratorResult(ret, provider->createIterator(
- bucket, *fieldSet, selection, versions, context));
- req->Return();
-}
-
-void addIterate(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.iterate",
- "ll", "bsLISXb", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("iterator_id", "");
- rb.ParamDesc("max_byte_size", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
- rb.ReturnDesc("doc_entry_timestamp", "Array of timestamps for DocEntries");
- rb.ReturnDesc("doc_entry_flags", "Array of flags for DocEntries");
- rb.ReturnDesc("doc_entry_doc_id", "Array of DocumentIds for DocEntries");
- rb.ReturnDesc("doc_entry_doc", "Array of Documents for DocEntries");
- rb.ReturnDesc("completed", "bool");
-}
-
-void doIterate(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- IteratorId id(params[0]._intval64);
- uint64_t max_byte_size = params[1]._intval64;
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addIterateResult(ret, provider->iterate(id, max_byte_size, context));
- req->Return();
-}
-
-void addDestroyIterator(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.destroyIterator",
- "l", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("iterator_id", "");
-}
-
-void doDestroyIterator(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- IteratorId id(params[0]._intval64);
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addResult(ret, provider->destroyIterator(id, context));
- req->Return();
-}
-
-void addCreateBucket(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.createBucket",
- "ll", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doCreateBucket(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addResult(ret, provider->createBucket(bucket, context));
- req->Return();
-}
-
-void addDeleteBucket(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.deleteBucket",
- "ll", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doDeleteBucket(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addResult(ret, provider->deleteBucket(bucket, context));
- req->Return();
-}
-
-void addGetModifiedBuckets(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.getModifiedBuckets",
- "", "bsL", true, func, obj);
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
- rb.ReturnDesc("modified_buckets_bucket_ids", "Array of bucket ids");
-}
-
-void doGetModifiedBuckets(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &ret = *req->GetReturn();
- addBucketIdListResult(ret, provider->getModifiedBuckets());
- req->Return();
-}
-
-void addSplit(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.split",
- "llllll", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("source_bucket_id", "");
- rb.ParamDesc("source_partition_id", "");
- rb.ParamDesc("target1_bucket_id", "");
- rb.ParamDesc("target1_partition_id", "");
- rb.ParamDesc("target2_bucket_id", "");
- rb.ParamDesc("target2_partition_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doSplit(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket source = getBucket(params[0], params[1]);
- Bucket target1 = getBucket(params[2], params[3]);
- Bucket target2 = getBucket(params[4], params[5]);
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addResult(ret, provider->split(source, target1, target2, context));
- req->Return();
-}
-
-void addJoin(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.join",
- "llllll", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("source1_bucket_id", "");
- rb.ParamDesc("source1_partition_id", "");
- rb.ParamDesc("source2_bucket_id", "");
- rb.ParamDesc("source2_partition_id", "");
- rb.ParamDesc("target_bucket_id", "");
- rb.ParamDesc("target_partition_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doJoin(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket source1 = getBucket(params[0], params[1]);
- Bucket source2 = getBucket(params[2], params[3]);
- Bucket target = getBucket(params[4], params[5]);
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addResult(ret, provider->join(source1, source2, target, context));
- req->Return();
-}
-
-void addMove(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.move",
- "lll", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("source_bucket_id", "");
- rb.ParamDesc("source_partition_id", "");
- rb.ParamDesc("target_partition_id", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doMove(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket source = getBucket(params[0], params[1]);
- PartitionId partition_id(params[2]._intval64);
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addResult(ret, provider->move(source, partition_id, context));
- req->Return();
-}
-
-
-void addMaintain(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.maintain",
- "llb", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ParamDesc("verification_level", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doMaintain(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
- MaintenanceLevel level =
- static_cast<MaintenanceLevel>(params[2]._intval8);
-
- FRT_Values &ret = *req->GetReturn();
- addResult(ret, provider->maintain(bucket, level));
- req->Return();
-}
-
-void addRemoveEntry(
- FRT_ReflectionBuilder &rb, FRT_METHOD_PT func, FRT_Invokable *obj) {
- rb.DefineMethod("vespa.persistence.removeEntry",
- "lll", "bs", true, func, obj);
- rb.MethodDesc("???");
- rb.ParamDesc("bucket_id", "");
- rb.ParamDesc("partition_id", "");
- rb.ParamDesc("timestamp", "");
- rb.ReturnDesc("error_code", "");
- rb.ReturnDesc("error_message", "");
-}
-
-void doRemoveEntry(FRT_RPCRequest *req, PersistenceProvider *provider) {
- FRT_Values &params = *req->GetParams();
- Bucket bucket = getBucket(params[0], params[1]);
- Timestamp timestamp(params[2]._intval64);
-
- FRT_Values &ret = *req->GetReturn();
- Context context(defaultLoadType, Priority(0x80), Trace::TraceLevel(0));
- addResult(ret, provider->removeEntry(bucket, timestamp, context));
- req->Return();
-}
-
-const uint32_t magic_number = 0xf00ba2;
-
-bool checkConnection(FNET_Connection *connection) {
- return connection && connection->GetContext()._value.INT == magic_number;
-}
-} //namespace
-
-void ProviderStub::HOOK_fini(FRT_RPCRequest *req) {
- FNET_Connection *connection = req->GetConnection();
- if (checkConnection(connection)) {
- assert(_provider.get() != 0);
- _providerCleanupTask.ScheduleNow();
- }
-}
-
-void ProviderStub::RPC_connect(FRT_RPCRequest *req) {
- FRT_Values &params = *req->GetParams();
- FNET_Connection *connection = req->GetConnection();
- if (checkConnection(connection)) {
- return;
- }
- string build_id = getString(params[0]);
- if (build_id != getBuildId()) {
- req->SetError(FRTE_RPC_METHOD_FAILED,
- ("Wrong build id. Got '" + build_id +
- "', required '" + getBuildId() + "'").c_str());
- return;
- } else if (_provider.get()) {
- req->SetError(FRTE_RPC_METHOD_FAILED, "Server is already connected");
- return;
- }
- if (!connection) {
- req->SetError(FRTE_RPC_METHOD_FAILED);
- return;
- }
- connection->SetContext(FNET_Context(magic_number));
- _provider = _factory.create();
-}
-
-void ProviderStub::detachAndRun(FRT_RPCRequest *req, Closure::UP closure) {
- if (!checkConnection(req->GetConnection())) {
- req->SetError(FRTE_RPC_METHOD_FAILED);
- return;
- }
- assert(_provider.get() != 0);
- req->Detach();
- _executor.execute(makeTask(std::move(closure)));
-}
-
-void ProviderStub::RPC_getPartitionStates(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doGetPartitionStates, req, _provider.get()));
-}
-
-void ProviderStub::RPC_initialize(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doInitialize, req, _provider.get()));
-}
-
-void ProviderStub::RPC_listBuckets(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doListBuckets, req, _provider.get()));
-}
-
-void ProviderStub::RPC_setClusterState(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doSetClusterState, req, _provider.get()));
-}
-
-void ProviderStub::RPC_setActiveState(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doSetActiveState, req, _provider.get()));
-}
-
-void ProviderStub::RPC_getBucketInfo(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doGetBucketInfo, req, _provider.get()));
-}
-
-void ProviderStub::RPC_put(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doPut, req, _provider.get(), _repo));
-}
-
-void ProviderStub::RPC_removeById(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doRemoveById, req, _provider.get()));
-}
-
-void ProviderStub::RPC_removeIfFound(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doRemoveIfFound, req, _provider.get()));
-}
-
-void ProviderStub::RPC_update(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doUpdate, req, _provider.get(), _repo));
-}
-
-void ProviderStub::RPC_flush(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doFlush, req, _provider.get()));
-}
-
-void ProviderStub::RPC_get(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doGet, req, _provider.get(), _repo));
-}
-
-void ProviderStub::RPC_createIterator(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doCreateIterator, req, _provider.get(), _repo));
-}
-
-void ProviderStub::RPC_iterate(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doIterate, req, _provider.get()));
-}
-
-void ProviderStub::RPC_destroyIterator(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doDestroyIterator, req, _provider.get()));
-}
-
-void ProviderStub::RPC_createBucket(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doCreateBucket, req, _provider.get()));
-}
-
-void ProviderStub::RPC_deleteBucket(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doDeleteBucket, req, _provider.get()));
-}
-
-void ProviderStub::RPC_getModifiedBuckets(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doGetModifiedBuckets, req, _provider.get()));
-}
-
-void ProviderStub::RPC_split(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doSplit, req, _provider.get()));
-}
-
-void ProviderStub::RPC_join(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doJoin, req, _provider.get()));
-}
-
-void ProviderStub::RPC_move(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doMove, req, _provider.get()));
-}
-
-void ProviderStub::RPC_maintain(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doMaintain, req, _provider.get()));
-}
-
-void ProviderStub::RPC_removeEntry(FRT_RPCRequest *req) {
- detachAndRun(req, makeClosure(doRemoveEntry, req, _provider.get()));
-}
-
-void ProviderStub::SetupRpcCalls() {
- FRT_ReflectionBuilder rb(_supervisor.get());
- addConnect(rb, FRT_METHOD(ProviderStub::RPC_connect), this);
- addInitialize(rb, FRT_METHOD(ProviderStub::RPC_initialize), this);
- addGetPartitionStates(rb, FRT_METHOD(ProviderStub::RPC_getPartitionStates), this);
- addListBuckets(rb, FRT_METHOD(ProviderStub::RPC_listBuckets), this);
- addSetClusterState(rb, FRT_METHOD(ProviderStub::RPC_setClusterState), this);
- addSetActiveState(rb, FRT_METHOD(ProviderStub::RPC_setActiveState), this);
- addGetBucketInfo(rb, FRT_METHOD(ProviderStub::RPC_getBucketInfo), this);
- addPut(rb, FRT_METHOD(ProviderStub::RPC_put), this);
- addRemoveById(rb, FRT_METHOD(ProviderStub::RPC_removeById), this);
- addRemoveIfFound(rb, FRT_METHOD(ProviderStub::RPC_removeIfFound), this);
- addUpdate(rb, FRT_METHOD(ProviderStub::RPC_update), this);
- addFlush(rb, FRT_METHOD(ProviderStub::RPC_flush), this);
- addGet(rb, FRT_METHOD(ProviderStub::RPC_get), this);
- addCreateIterator(rb, FRT_METHOD(ProviderStub::RPC_createIterator), this);
- addIterate(rb, FRT_METHOD(ProviderStub::RPC_iterate), this);
- addDestroyIterator(rb, FRT_METHOD(ProviderStub::RPC_destroyIterator), this);
- addCreateBucket(rb, FRT_METHOD(ProviderStub::RPC_createBucket), this);
- addDeleteBucket(rb, FRT_METHOD(ProviderStub::RPC_deleteBucket), this);
- addGetModifiedBuckets(rb, FRT_METHOD(ProviderStub::RPC_getModifiedBuckets), this);
- addSplit(rb, FRT_METHOD(ProviderStub::RPC_split), this);
- addJoin(rb, FRT_METHOD(ProviderStub::RPC_join), this);
- addMove(rb, FRT_METHOD(ProviderStub::RPC_move), this);
- addMaintain(rb, FRT_METHOD(ProviderStub::RPC_maintain), this);
- addRemoveEntry(rb, FRT_METHOD(ProviderStub::RPC_removeEntry), this);
-}
-
-ProviderStub::ProviderStub(int port, uint32_t threads,
- const document::DocumentTypeRepo &repo,
- PersistenceProviderFactory &factory)
- : _supervisor(std::make_unique<FRT_Supervisor>()),
- _executor(threads, 256*1024),
- _repo(&repo),
- _factory(factory),
- _provider(),
- _providerCleanupTask(_supervisor->GetScheduler(), _executor, _provider)
-{
- SetupRpcCalls();
- _supervisor->SetSessionFiniHook(FRT_METHOD(ProviderStub::HOOK_fini), this);
- _supervisor->Start();
- _supervisor->Listen(port);
-}
-
-ProviderStub::~ProviderStub() {
- _supervisor->ShutDown(true);
- sync();
-}
-
-int
-ProviderStub::getPort() const {
- return _supervisor->GetListenPort();
-}
-
-}
diff --git a/persistence/src/vespa/persistence/proxy/providerstub.h b/persistence/src/vespa/persistence/proxy/providerstub.h
deleted file mode 100644
index cd0665171b1..00000000000
--- a/persistence/src/vespa/persistence/proxy/providerstub.h
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/vespalib/util/closure.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
-#include <vespa/fnet/frt/invokable.h>
-#include <vespa/fnet/task.h>
-#include <memory>
-
-class FRT_Supervisor;
-
-namespace document { class DocumentTypeRepo; }
-
-namespace storage {
-namespace spi {
-class PersistenceProvider;
-
-class ProviderStub : private FRT_Invokable
-{
-public:
- struct PersistenceProviderFactory {
- virtual std::unique_ptr<PersistenceProvider> create() const = 0;
- virtual ~PersistenceProviderFactory() {}
- };
-
-private:
- struct ProviderCleanupTask : FNET_Task {
- vespalib::ThreadStackExecutor &executor;
- std::unique_ptr<PersistenceProvider> &provider;
- ProviderCleanupTask(FNET_Scheduler *s,
- vespalib::ThreadStackExecutor &e,
- std::unique_ptr<PersistenceProvider> &p)
- : FNET_Task(s), executor(e), provider(p) {}
- void PerformTask() override {
- executor.sync();
- assert(provider.get() != 0);
- provider.reset();
- }
- };
-
- std::unique_ptr<FRT_Supervisor> _supervisor;
- vespalib::ThreadStackExecutor _executor;
- const document::DocumentTypeRepo *_repo;
- PersistenceProviderFactory &_factory;
- std::unique_ptr<PersistenceProvider> _provider;
- ProviderCleanupTask _providerCleanupTask;
-
- void HOOK_fini(FRT_RPCRequest *req);
-
- void detachAndRun(FRT_RPCRequest *req, vespalib::Closure::UP closure);
- void RPC_connect(FRT_RPCRequest *req);
- void RPC_initialize(FRT_RPCRequest *req);
- void RPC_getPartitionStates(FRT_RPCRequest *req);
- void RPC_listBuckets(FRT_RPCRequest *req);
- void RPC_setClusterState(FRT_RPCRequest *req);
- void RPC_setActiveState(FRT_RPCRequest *req);
- void RPC_getBucketInfo(FRT_RPCRequest *req);
- void RPC_put(FRT_RPCRequest *req);
- void RPC_removeById(FRT_RPCRequest *req);
- void RPC_removeIfFound(FRT_RPCRequest *req);
- void RPC_update(FRT_RPCRequest *req);
- void RPC_flush(FRT_RPCRequest *req);
- void RPC_get(FRT_RPCRequest *req);
- void RPC_createIterator(FRT_RPCRequest *req);
- void RPC_iterate(FRT_RPCRequest *req);
- void RPC_destroyIterator(FRT_RPCRequest *req);
- void RPC_createBucket(FRT_RPCRequest *req);
- void RPC_deleteBucket(FRT_RPCRequest *req);
- void RPC_getModifiedBuckets(FRT_RPCRequest *req);
- void RPC_split(FRT_RPCRequest *req);
- void RPC_join(FRT_RPCRequest *req);
- void RPC_move(FRT_RPCRequest *req);
- void RPC_maintain(FRT_RPCRequest *req);
- void RPC_removeEntry(FRT_RPCRequest *req);
-
- void SetupRpcCalls();
-
-public:
- typedef std::unique_ptr<ProviderStub> UP;
-
- ProviderStub(int port, uint32_t threads,
- const document::DocumentTypeRepo &repo,
- PersistenceProviderFactory &factory);
- ~ProviderStub();
-
- bool hasClient() const { return (_provider.get() != 0); }
- int getPort() const;
- void setRepo(const document::DocumentTypeRepo &repo) {
- _repo = &repo;
- }
- void sync() { _executor.sync(); }
-};
-
-} // namespace spi
-} // namespace storage
-