summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/persistence/persistencetestutils.cpp
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storage/src/tests/persistence/persistencetestutils.cpp
Publish
Diffstat (limited to 'storage/src/tests/persistence/persistencetestutils.cpp')
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp412
1 files changed, 412 insertions, 0 deletions
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
new file mode 100644
index 00000000000..47ec23147f1
--- /dev/null
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -0,0 +1,412 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+
+#include <vespa/document/datatype/documenttype.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <tests/persistence/persistencetestutils.h>
+#include <vespa/persistence/dummyimpl/dummypersistence.h>
+
+using document::DocumentType;
+using storage::framework::defaultimplementation::AllocationLogic;
+
+namespace storage {
+
+namespace {
+
+ spi::LoadType defaultLoadType(0, "default");
+
+ vdstestlib::DirConfig initialize(uint32_t numDisks) {
+ system(vespalib::make_string("rm -rf vdsroot").c_str());
+ for (uint32_t i = 0; i < numDisks; i++) {
+ system(vespalib::make_string("mkdir -p vdsroot/disks/d%d", i).c_str());
+ }
+ vdstestlib::DirConfig config(getStandardConfig(true));
+ return config;
+ }
+
+ template<typename T>
+ struct ConfigReader : public T::Subscriber
+ {
+ T config;
+
+ ConfigReader(const std::string& configId) {
+ T::subscribe(configId, *this);
+ }
+ void configure(const T& c) { config = c; }
+ };
+}
+
+PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks)
+ : _config(initialize(numDisks)),
+ _messageKeeper(),
+ _node(numDisks, NodeIndex(0), _config.getConfigId()),
+ _component(_node.getComponentRegister(), "persistence test env"),
+ _metrics(_component.getLoadTypes()->getMetricLoadTypes())
+{
+ _node.setupDummyPersistence();
+ _metrics.initDiskMetrics(
+ numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1);
+ _handler.reset(new FileStorHandler(
+ _messageKeeper, _metrics,
+ _node.getPersistenceProvider().getPartitionStates().getList(),
+ _node.getComponentRegister(), 255, 0));
+ for (uint32_t i = 0; i < numDisks; i++) {
+ _diskEnvs.push_back(
+ vespalib::LinkedPtr<PersistenceUtil>(
+ new PersistenceUtil(
+ _config.getConfigId(),
+ _node.getComponentRegister(),
+ *_handler,
+ *_metrics.disks[i]->threads[0],
+ i,
+ 255,
+ _node.getPersistenceProvider())));
+ }
+}
+
+PersistenceTestUtils::PersistenceTestUtils()
+{
+}
+
+PersistenceTestUtils::~PersistenceTestUtils()
+{
+}
+
+std::string
+PersistenceTestUtils::dumpBucket(const document::BucketId& bid,
+ uint16_t disk) {
+ return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(spi::Bucket(bid, spi::PartitionId(disk)));
+}
+
+void
+PersistenceTestUtils::setupDisks(uint32_t numDisks) {
+ _env.reset(new PersistenceTestEnvironment(DiskCount(numDisks)));
+}
+
+std::unique_ptr<PersistenceThread>
+PersistenceTestUtils::createPersistenceThread(uint32_t disk)
+{
+ return std::unique_ptr<PersistenceThread>(
+ new PersistenceThread(_env->_node.getComponentRegister(),
+ _env->_config.getConfigId(),
+ getPersistenceProvider(),
+ getEnv()._fileStorHandler,
+ getEnv()._metrics,
+ disk,
+ 255,
+ false));
+}
+
+document::Document::SP
+PersistenceTestUtils::schedulePut(
+ uint32_t location,
+ spi::Timestamp timestamp,
+ uint16_t disk,
+ uint32_t minSize,
+ uint32_t maxSize)
+{
+ document::Document::SP doc(createRandomDocumentAtLocation(
+ location, timestamp, minSize, maxSize));
+ std::shared_ptr<api::StorageMessage> msg(
+ new api::PutCommand(
+ document::BucketId(16, location), doc, timestamp));
+ fsHandler().schedule(msg, disk);
+ return doc;
+}
+
+StorBucketDatabase::WrappedEntry
+PersistenceTestUtils::getBucket(const document::BucketId& id)
+{
+ return _env->_node.getStorageBucketDatabase().get(id, "foo");
+}
+
+StorBucketDatabase::WrappedEntry
+PersistenceTestUtils::createBucket(const document::BucketId& id)
+{
+ return _env->_node.getStorageBucketDatabase().get(
+ id,
+ "foo",
+ StorBucketDatabase::CREATE_IF_NONEXISTING);
+}
+
+spi::PersistenceProvider&
+PersistenceTestUtils::getPersistenceProvider()
+{
+ return _env->_node.getPersistenceProvider();
+}
+
+std::string
+PersistenceTestUtils::getBucketStatus(const document::BucketId& id)
+{
+ std::ostringstream ost;
+ StorBucketDatabase::WrappedEntry entry(
+ _env->_node.getStorageBucketDatabase().get(
+ id, "foo"));
+
+ ost << id << ": ";
+ if (!entry.exist()) {
+ ost << "null";
+ } else {
+ ost << entry->getBucketInfo().getDocumentCount() << "," << entry->disk;
+ }
+
+ return ost.str();
+}
+
+document::Document::SP
+PersistenceTestUtils::doPutOnDisk(
+ uint16_t disk,
+ uint32_t location,
+ spi::Timestamp timestamp,
+ uint32_t minSize,
+ uint32_t maxSize)
+{
+ document::Document::SP doc(createRandomDocumentAtLocation(
+ location, timestamp, minSize, maxSize));
+ spi::Bucket b(document::BucketId(16, location), spi::PartitionId(disk));
+ spi::Context context(defaultLoadType, spi::Priority(0),
+ spi::Trace::TraceLevel(0));
+
+ getPersistenceProvider().createBucket(b, context);
+
+ getPersistenceProvider().put(spi::Bucket(b), timestamp, doc, context);
+
+ getPersistenceProvider().flush(b, context);
+ return doc;
+}
+
+bool
+PersistenceTestUtils::doRemoveOnDisk(
+ uint16_t disk,
+ const document::BucketId& bucketId,
+ const document::DocumentId& docId,
+ spi::Timestamp timestamp,
+ bool persistRemove)
+{
+ spi::Context context(defaultLoadType, spi::Priority(0),
+ spi::Trace::TraceLevel(0));
+ if (persistRemove) {
+ spi::RemoveResult result = getPersistenceProvider().removeIfFound(
+ spi::Bucket(bucketId, spi::PartitionId(disk)),
+ timestamp, docId, context);
+ return result.wasFound();
+ }
+ spi::RemoveResult result = getPersistenceProvider().remove(
+ spi::Bucket(bucketId, spi::PartitionId(disk)),
+ timestamp, docId, context);
+
+ return result.wasFound();
+}
+
+bool
+PersistenceTestUtils::doUnrevertableRemoveOnDisk(
+ uint16_t disk,
+ const document::BucketId& bucketId,
+ const document::DocumentId& docId,
+ spi::Timestamp timestamp)
+{
+ spi::Context context(defaultLoadType, spi::Priority(0),
+ spi::Trace::TraceLevel(0));
+ spi::RemoveResult result = getPersistenceProvider().remove(
+ spi::Bucket(bucketId, spi::PartitionId(disk)),
+ timestamp, docId, context);
+ return result.wasFound();
+}
+
+spi::GetResult
+PersistenceTestUtils::doGetOnDisk(
+ uint16_t disk,
+ const document::BucketId& bucketId,
+ const document::DocumentId& docId,
+ bool headerOnly)
+{
+ document::FieldSet::UP fieldSet(new document::AllFields());
+ spi::Context context(defaultLoadType, spi::Priority(0),
+ spi::Trace::TraceLevel(0));
+ if (headerOnly) {
+ fieldSet.reset(new document::HeaderFields());
+ }
+ return getPersistenceProvider().get(spi::Bucket(
+ bucketId, spi::PartitionId(disk)), *fieldSet, docId, context);
+}
+
+document::DocumentUpdate::SP
+PersistenceTestUtils::createBodyUpdate(
+ const document::DocumentId& docId,
+ const document::FieldValue& updateValue)
+{
+ const DocumentType* docType(_env->_component.getTypeRepo()
+ ->getDocumentType("testdoctype1"));
+ document::DocumentUpdate::SP update(
+ new document::DocumentUpdate(*docType, docId));
+ std::shared_ptr<document::AssignValueUpdate> assignUpdate(
+ new document::AssignValueUpdate(updateValue));
+ document::FieldUpdate fieldUpdate(docType->getField("content"));
+ fieldUpdate.addUpdate(*assignUpdate);
+ update->addUpdate(fieldUpdate);
+ return update;
+}
+
+document::DocumentUpdate::SP
+PersistenceTestUtils::createHeaderUpdate(
+ const document::DocumentId& docId,
+ const document::FieldValue& updateValue)
+{
+ const DocumentType* docType(_env->_component.getTypeRepo()
+ ->getDocumentType("testdoctype1"));
+ document::DocumentUpdate::SP update(
+ new document::DocumentUpdate(*docType, docId));
+ std::shared_ptr<document::AssignValueUpdate> assignUpdate(
+ new document::AssignValueUpdate(updateValue));
+ document::FieldUpdate fieldUpdate(docType->getField("headerval"));
+ fieldUpdate.addUpdate(*assignUpdate);
+ update->addUpdate(fieldUpdate);
+ return update;
+}
+
+uint16_t
+PersistenceTestUtils::getDiskFromBucketDatabaseIfUnset(const document::BucketId& bucket,
+ uint16_t disk)
+{
+ if (disk == 0xffff) {
+ StorBucketDatabase::WrappedEntry entry(
+ getEnv().getBucketDatabase().get(bucket, "createTestBucket"));
+ if (entry.exist()) {
+ return entry->disk;
+ } else {
+ std::ostringstream error;
+ error << bucket << " not in db and disk unset";
+ throw vespalib::IllegalStateException(error.str(), VESPA_STRLOC);
+ }
+ }
+ return disk;
+}
+
+void
+PersistenceTestUtils::doPut(const document::Document::SP& doc,
+ spi::Timestamp time,
+ uint16_t disk,
+ uint16_t usedBits)
+{
+ document::BucketId bucket(
+ _env->_component.getBucketIdFactory().getBucketId(doc->getId()));
+ bucket.setUsedBits(usedBits);
+ disk = getDiskFromBucketDatabaseIfUnset(bucket, disk);
+
+ doPut(doc, bucket, time, disk);
+}
+
+void
+PersistenceTestUtils::doPut(const document::Document::SP& doc,
+ document::BucketId bid,
+ spi::Timestamp time,
+ uint16_t disk)
+{
+ spi::Bucket b(bid, spi::PartitionId(disk));
+ spi::Context context(defaultLoadType, spi::Priority(0),
+ spi::Trace::TraceLevel(0));
+ getPersistenceProvider().createBucket(b, context);
+ getPersistenceProvider().put(b, time, doc, context);
+}
+
+spi::UpdateResult
+PersistenceTestUtils::doUpdate(document::BucketId bid,
+ const document::DocumentUpdate::SP& update,
+ spi::Timestamp time,
+ uint16_t disk)
+{
+ spi::Context context(defaultLoadType, spi::Priority(0),
+ spi::Trace::TraceLevel(0));
+ return getPersistenceProvider().update(
+ spi::Bucket(bid, spi::PartitionId(disk)), time, update, context);
+}
+
+void
+PersistenceTestUtils::doRemove(const document::DocumentId& id, spi::Timestamp time,
+ uint16_t disk, bool unrevertableRemove,
+ uint16_t usedBits)
+{
+ document::BucketId bucket(
+ _env->_component.getBucketIdFactory().getBucketId(id));
+ bucket.setUsedBits(usedBits);
+ disk = getDiskFromBucketDatabaseIfUnset(bucket, disk);
+ spi::Context context(defaultLoadType, spi::Priority(0),
+ spi::Trace::TraceLevel(0));
+ if (unrevertableRemove) {
+ getPersistenceProvider().remove(
+ spi::Bucket(bucket, spi::PartitionId(disk)), time, id, context);
+ } else {
+ spi::RemoveResult result = getPersistenceProvider().removeIfFound(
+ spi::Bucket(bucket, spi::PartitionId(disk)), time, id, context);
+ if (!result.wasFound()) {
+ throw vespalib::IllegalStateException(
+ "Attempted to remove non-existing doc " + id.toString(),
+ VESPA_STRLOC);
+ }
+ }
+}
+
+void
+PersistenceTestUtils::clearBody(document::Document& doc)
+{
+ // FIXME(vekterli): temporary solution while we don't have
+ // fieldset pruning functionality in Document.
+ //doc->getBody().clear();
+ vespalib::nbostream stream;
+ doc.serializeHeader(stream);
+ doc.deserialize(*_env->_component.getTypeRepo(), stream);
+}
+
+document::Document::UP
+PersistenceTestUtils::createRandomDocumentAtLocation(
+ uint64_t location, uint32_t seed,
+ uint32_t minDocSize, uint32_t maxDocSize)
+{
+ return _env->_testDocMan.createRandomDocumentAtLocation(
+ location, seed, minDocSize, maxDocSize);
+}
+
+void
+PersistenceTestUtils::createTestBucket(const document::BucketId& bucket,
+ uint16_t disk)
+{
+
+ uint32_t opsPerType = 2;
+ uint32_t numberOfLocations = 2;
+ uint32_t minDocSize = 0;
+ uint32_t maxDocSize = 128;
+ for (uint32_t useHeaderOnly = 0; useHeaderOnly < 2; ++useHeaderOnly) {
+ bool headerOnly = (useHeaderOnly == 1);
+ for (uint32_t optype=0; optype < 4; ++optype) {
+ for (uint32_t i=0; i<opsPerType; ++i) {
+ uint32_t seed = useHeaderOnly * 10000 + optype * 1000 + i + 1;
+ uint64_t location = (seed % numberOfLocations);
+ location <<= 32;
+ location += (bucket.getRawId() & 0xffffffff);
+ document::Document::SP doc(
+ createRandomDocumentAtLocation(
+ location, seed, minDocSize, maxDocSize));
+ if (headerOnly) {
+ clearBody(*doc);
+ }
+ doPut(doc, spi::Timestamp(seed), disk, bucket.getUsedBits());
+ if (optype == 0) { // Regular put
+ } else if (optype == 1) { // Overwritten later in time
+ document::Document::SP doc2(new document::Document(*doc));
+ doc2->setValue(doc2->getField("content"),
+ document::StringFieldValue("overwritten"));
+ doPut(doc2, spi::Timestamp(seed + 500),
+ disk, bucket.getUsedBits());
+ } else if (optype == 2) { // Removed
+ doRemove(doc->getId(), spi::Timestamp(seed + 500), disk, false,
+ bucket.getUsedBits());
+ } else if (optype == 3) { // Unrevertable removed
+ doRemove(doc->getId(), spi::Timestamp(seed), disk, true,
+ bucket.getUsedBits());
+ }
+ }
+ }
+ }
+}
+
+} // storage