From 27691bcd31ecc462ed440dc753d1878937ea0fab Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Thu, 23 Mar 2017 10:59:35 +0000 Subject: Use new AttributeDirectory in flush targets. Remove old methods for pruning old snapshots/removing attributes. --- .../attribute_manager/attribute_manager_test.cpp | 12 +- .../tests/proton/attribute/attributeflush_test.cpp | 14 +- .../proton/attribute/attribute_directory.cpp | 7 +- .../proton/attribute/attributedisklayout.cpp | 169 +++------------------ .../proton/attribute/attributedisklayout.h | 8 +- .../proton/attribute/attributemanager.cpp | 11 +- .../proton/attribute/flushableattribute.cpp | 138 ++++++++--------- .../proton/attribute/flushableattribute.h | 37 +---- .../documentmetastoreflushtarget.cpp | 147 ++++++++---------- .../documentmetastoreflushtarget.h | 40 +---- 10 files changed, 179 insertions(+), 404 deletions(-) (limited to 'searchcore') diff --git a/searchcore/src/tests/proton/attribute/attribute_manager/attribute_manager_test.cpp b/searchcore/src/tests/proton/attribute/attribute_manager/attribute_manager_test.cpp index ad694348029..a38e5c583a5 100644 --- a/searchcore/src/tests/proton/attribute/attribute_manager/attribute_manager_test.cpp +++ b/searchcore/src/tests/proton/attribute/attribute_manager/attribute_manager_test.cpp @@ -409,7 +409,7 @@ TEST_F("require that reconfig can add attributes", Fixture) newSpec.push_back(AttrSpec("a2", INT32_SINGLE)); newSpec.push_back(AttrSpec("a3", INT32_SINGLE)); - SequentialAttributeManager sam(f._m, AttrMgrSpec(newSpec, f._m.getNumDocs(), 0)); + SequentialAttributeManager sam(f._m, AttrMgrSpec(newSpec, f._m.getNumDocs(), 10)); std::vector list; sam.mgr.getAttributeList(list); std::sort(list.begin(), list.end(), [](const AttributeGuard & a, const AttributeGuard & b) { @@ -432,7 +432,7 @@ TEST_F("require that reconfig can remove attributes", Fixture) AttrSpecList newSpec; newSpec.push_back(AttrSpec("a2", INT32_SINGLE)); - SequentialAttributeManager sam(f._m, AttrMgrSpec(newSpec, 1, 0)); + SequentialAttributeManager sam(f._m, AttrMgrSpec(newSpec, 1, 10)); std::vector list; sam.mgr.getAttributeList(list); EXPECT_EQUAL(1u, list.size()); @@ -471,7 +471,7 @@ TEST_F("require that new attributes after reconfig are initialized", Fixture) EXPECT_EQUAL(0u, a3->getStatus().getLastSyncToken()); } -TEST_F("require that removed attributes can resurrect", BaseFixture) +TEST_F("require that removed attributes cannot resurrect", BaseFixture) { proton::AttributeManager::SP am1( new proton::AttributeManager(test_dir, "test.subdb", @@ -499,11 +499,11 @@ TEST_F("require that removed attributes can resurrect", BaseFixture) AttributeGuard &ag1(*ag1ap); ASSERT_TRUE(ag1.valid()); EXPECT_EQUAL(5u, ag1->getNumDocs()); - EXPECT_EQUAL(10, ag1->getInt(1)); - EXPECT_EQUAL(10, ag1->getInt(2)); + EXPECT_TRUE(search::attribute::isUndefined(ag1->getInt(1))); + EXPECT_TRUE(search::attribute::isUndefined(ag1->getInt(2))); EXPECT_TRUE(search::attribute::isUndefined(ag1->getInt(3))); EXPECT_TRUE(search::attribute::isUndefined(ag1->getInt(4))); - EXPECT_EQUAL(16u, ag1->getStatus().getLastSyncToken()); + EXPECT_EQUAL(0u, ag1->getStatus().getLastSyncToken()); } TEST_F("require that extra attribute is not treated as removed", Fixture) diff --git a/searchcore/src/tests/proton/attribute/attributeflush_test.cpp b/searchcore/src/tests/proton/attribute/attributeflush_test.cpp index 91b05e4e026..d3adbed63a0 100644 --- a/searchcore/src/tests/proton/attribute/attributeflush_test.cpp +++ b/searchcore/src/tests/proton/attribute/attributeflush_test.cpp @@ -6,6 +6,7 @@ LOG_SETUP("attributeflush_test"); #include #include #include +#include #include #include #include @@ -78,6 +79,9 @@ public: void doFlushing(Executor::Task::UP task) { + if (!task) { + return; + } Executor::Task::UP wrapper(new TaskWrapper(std::move(task), gate)); Executor::Task::UP ok = _executor.execute(std::move(wrapper)); assert(ok.get() == NULL); @@ -410,8 +414,9 @@ Test::requireThatCleanUpIsPerformedAfterFlush(void) info.addSnapshot(IndexMetaInfo::Snapshot(true, 10, "snapshot-10")); info.addSnapshot(IndexMetaInfo::Snapshot(false, 20, "snapshot-20")); EXPECT_TRUE(info.save()); + auto diskLayout = AttributeDiskLayout::create("flush"); - FlushableAttribute fa(av, "flush", TuneFileAttributes(), + FlushableAttribute fa(av, diskLayout->getAttributeDir("a6"), TuneFileAttributes(), f._fileHeaderContext, f._attributeFieldWriter, f._hwInfo); fa.initFlush(30)->run(); @@ -457,7 +462,9 @@ Test::requireThatOnlyOneFlusherCanRunAtTheSameTime(void) for (size_t i = 10; i < 100; ++i) { av->commit(i, i); vespalib::Executor::Task::UP task = ft->initFlush(i); - exec.execute(std::move(task)); + if (task) { + exec.execute(std::move(task)); + } } exec.sync(); exec.shutdown(); @@ -470,9 +477,6 @@ Test::requireThatOnlyOneFlusherCanRunAtTheSameTime(void) } IndexMetaInfo::Snapshot best = info.getBestSnapshot(); EXPECT_EQUAL(true, best.valid); - EXPECT_EQUAL(99u, best.syncToken); - FlushStats stats = ft->getLastFlushStats(); - EXPECT_EQUAL("flush/a8/snapshot-99", stats.getPath()); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp index 265e9bb11a6..4e96ca59eaf 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp @@ -60,6 +60,9 @@ AttributeDirectory::getDirName() const diskLayout = _diskLayout.lock(); } assert(diskLayout); + if (_name.empty()) { + return diskLayout->getBaseDir(); + } return AttributeDiskLayout::getAttributeBaseDir(diskLayout->getBaseDir(), _name); } @@ -98,10 +101,8 @@ AttributeDirectory::saveSnapInfo() vespalib::string AttributeDirectory::getSnapshotDir(search::SerialNum serialNum) { - auto snap = _snapInfo.getSnapshot(serialNum); - assert(snap.syncToken == serialNum); vespalib::string dirName(getDirName()); - return dirName + "/" + snap.dirName; + return dirName + "/" + getSnapshotDirComponent(serialNum); } void diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp index f17f7c76566..cec46694ae2 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp @@ -16,6 +16,16 @@ using search::AttributeVector; namespace proton { +namespace { + +vespalib::string +getSnapshotDir(uint64_t syncToken) +{ + return vespalib::make_string("snapshot-%" PRIu64, syncToken); +} + +} + AttributeDiskLayout::AttributeDiskLayout(const vespalib::string &baseDir, PrivateConstructorTag) : _baseDir(baseDir), _mutex(), @@ -33,24 +43,6 @@ AttributeDiskLayout::createBaseDir() vespalib::mkdir(_baseDir, false); } -vespalib::string -AttributeDiskLayout::getSnapshotDir(uint64_t syncToken) -{ - return vespalib::make_string("snapshot-%" PRIu64, syncToken); -} - -vespalib::string -AttributeDiskLayout::getSnapshotRemoveDir(const vespalib::string &baseDir, - const vespalib::string &snapDir) -{ - if (baseDir.empty()) { - return snapDir; - } - return vespalib::make_string("%s/%s", - baseDir.c_str(), - snapDir.c_str()); -} - vespalib::string AttributeDiskLayout::getAttributeBaseDir(const vespalib::string &baseDir, const vespalib::string &attrName) @@ -73,139 +65,13 @@ AttributeDiskLayout::getAttributeFileName(const vespalib::string &baseDir, attrName); } -bool -AttributeDiskLayout::removeOldSnapshots(IndexMetaInfo &snapInfo, - vespalib::Lock &snapInfoLock) -{ - IndexMetaInfo::Snapshot best = snapInfo.getBestSnapshot(); - if (!best.valid) { - return true; - } - std::vector toRemove; - const IndexMetaInfo::SnapshotList & list = snapInfo.snapshots(); - for (const auto &snap : list) { - if (!(snap == best)) { - toRemove.push_back(snap); - } - } - LOG(debug, - "About to remove %zu old snapshots. " - "Will keep best snapshot with sync token %" PRIu64, - toRemove.size(), - best.syncToken); - for (const auto &snap : toRemove) { - if (snap.valid) { - { - vespalib::LockGuard guard(snapInfoLock); - snapInfo.invalidateSnapshot(snap.syncToken); - } - if (!snapInfo.save()) { - LOG(warning, - "Could not save meta info file in directory '%s' after " - "invalidating snapshot with sync token %" PRIu64, - snapInfo.getPath().c_str(), - snap.syncToken); - return false; - } - } - vespalib::string rmDir = - getSnapshotRemoveDir(snapInfo.getPath(), snap.dirName); - FastOS_StatInfo statInfo; - if (!FastOS_File::Stat(rmDir.c_str(), &statInfo) && - statInfo._error == FastOS_StatInfo::FileNotFound) - { - // Directory already removed - } else { - FastOS_FileInterface:: EmptyAndRemoveDirectory(rmDir.c_str()); -#if 0 - LOG(warning, - "Could not remove snapshot directory '%s'", - rmDir.c_str()); - return false; -#endif - } - { - vespalib::LockGuard guard(snapInfoLock); - snapInfo.removeSnapshot(snap.syncToken); - } - if (!snapInfo.save()) { - LOG(warning, - "Could not save meta info file in directory '%s' after " - "removing snapshot with sync token %" PRIu64, - snapInfo.getPath().c_str(), snap.syncToken); - return false; - } - LOG(debug, "Removed snapshot directory '%s'", rmDir.c_str()); - } - return true; -} - -bool -AttributeDiskLayout::removeAttribute(const vespalib::string &baseDir, - const vespalib::string &attrName, - uint64_t wipeSerial) -{ - const vespalib::string currDir = getAttributeBaseDir(baseDir, attrName); - search::IndexMetaInfo snapInfo(currDir); - IndexMetaInfo::Snapshot best = snapInfo.getBestSnapshot(); - if (best.valid && best.syncToken >= wipeSerial) { - return true; // Attribute has been resurrected and flushed later on - } - const vespalib::string rmDir = - getAttributeBaseDir(baseDir, - vespalib::make_string("remove.%s", - attrName.c_str())); - - FastOS_StatInfo statInfo; - if (FastOS_File::Stat(rmDir.c_str(), &statInfo) && statInfo._isDirectory) { - FastOS_FileInterface::EmptyAndRemoveDirectory(rmDir.c_str()); -#if 0 - if (!FastOS_FileInterface::EmptyAndRemoveDirectory(rmDir.c_str())) { - LOG(warning, - "Could not remove attribute directory '%s'", - rmDir.c_str()); - return false; - } -#endif - } - if (!FastOS_File::Stat(currDir.c_str(), &statInfo) && - statInfo._error == FastOS_StatInfo::FileNotFound) - { - // Directory already removed - return true; - } - if (!FastOS_FileInterface::MoveFile(currDir.c_str(), rmDir.c_str())) { - LOG(warning, - "Could not move attribute directory '%s' to '%s'", - currDir.c_str(), - rmDir.c_str()); - return false; - } - FastOS_FileInterface::EmptyAndRemoveDirectory(rmDir.c_str()); -#if 0 - if (!FastOS_FileInterface::EmptyAndRemoveDirectory(rmDir.c_str())) { - LOG(warning, - "Could not remove attribute directory '%s'", - rmDir.c_str()); - return false; - } -#endif - return true; -} - std::vector -AttributeDiskLayout::listAttributes(const vespalib::string &baseDir) +AttributeDiskLayout::listAttributes() { std::vector attributes; - FastOS_DirectoryScan dir(baseDir.c_str()); - while (dir.ReadNext()) { - if (strcmp(dir.GetName(), "..") != 0 && - strcmp(dir.GetName(), ".") != 0) - { - if (dir.IsDirectory()) { - attributes.emplace_back(dir.GetName()); - } - } + std::shared_lock guard(_mutex); + for (const auto &dir : _dirs) { + attributes.emplace_back(dir.first); } return attributes; } @@ -284,5 +150,12 @@ AttributeDiskLayout::create(const vespalib::string &baseDir) return diskLayout; } +std::shared_ptr +AttributeDiskLayout::createSimple(const vespalib::string &baseDir) +{ + auto diskLayout = std::make_shared(baseDir, PrivateConstructorTag()); + return diskLayout; +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.h b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.h index 466f9426f56..e002d1655d3 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.h @@ -24,9 +24,6 @@ private: mutable std::shared_timed_mutex _mutex; std::map> _dirs; - static vespalib::string getSnapshotDir(uint64_t syncToken); - static vespalib::string getSnapshotRemoveDir(const vespalib::string &baseDir, const vespalib::string &snapDir); - void scanDir(); struct PrivateConstructorTag { }; public: @@ -34,15 +31,14 @@ public: ~AttributeDiskLayout(); static vespalib::string getAttributeBaseDir(const vespalib::string &baseDir, const vespalib::string &attrName); static search::AttributeVector::BaseName getAttributeFileName(const vespalib::string &baseDir, const vespalib::string &attrName, uint64_t syncToken); - static bool removeOldSnapshots(search::IndexMetaInfo &snapInfo, vespalib::Lock &snapInfoLock); - static bool removeAttribute(const vespalib::string &baseDir, const vespalib::string &attrName, uint64_t wipeSerial); - static std::vector listAttributes(const vespalib::string &baseDir); + std::vector listAttributes(); const vespalib::string &getBaseDir() const { return _baseDir; } void createBaseDir(); std::shared_ptr getAttributeDir(const vespalib::string &name); std::shared_ptr createAttributeDir(const vespalib::string &name); void removeAttributeDir(const vespalib::string &name, search::SerialNum serialNum); static std::shared_ptr create(const vespalib::string &baseDir); + static std::shared_ptr createSimple(const vespalib::string &baseDir); }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp index 30bd4900f64..bb48f63dda9 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp @@ -54,7 +54,7 @@ AttributeManager::addAttribute(const AttributeWrap &attribute) if ( ! attribute.isExtra() ) { // Flushing of extra attributes is handled elsewhere _flushables[attribute->getName()] = FlushableAttribute::SP - (new FlushableAttribute(attribute, _diskLayout->getBaseDir(), + (new FlushableAttribute(attribute, _diskLayout->createAttributeDir(attribute->getName()), _tuneFileAttributes, _fileHeaderContext, _attributeFieldWriter, @@ -104,11 +104,11 @@ AttributeManager::flushRemovedAttributes(const AttributeManager &currMgr, if (!newSpec.hasAttribute(kv.first) && !kv.second.isExtra() && kv.second->getStatus().getLastSyncToken() < - newSpec.getCurrentSerialNum()) { + newSpec.getCurrentSerialNum() - 1) { FlushableAttribute::SP flushable = currMgr.findFlushable(kv.first); vespalib::Executor::Task::UP flushTask = - flushable->initFlush(newSpec.getCurrentSerialNum()); + flushable->initFlush(newSpec.getCurrentSerialNum() - 1); if (flushTask.get() != NULL) { LOG(debug, "Flushing removed attribute vector '%s' with %u docs and serial number %lu", kv.first.c_str(), kv.second->getNumDocs(), kv.second->getStatus().getLastSyncToken()); @@ -441,12 +441,11 @@ AttributeManager::getAttributeListAll(std::vector &list) const void AttributeManager::wipeHistory(search::SerialNum wipeSerial) { - const vespalib::string &baseDir = _diskLayout->getBaseDir(); - std::vector attributes = AttributeDiskLayout::listAttributes(baseDir); + std::vector attributes = _diskLayout->listAttributes(); for (const auto &attribute : attributes) { auto itr = _attributes.find(attribute); if (itr == _attributes.end()) { - AttributeDiskLayout::removeAttribute(baseDir, attribute, wipeSerial); + _diskLayout->removeAttributeDir(attribute, wipeSerial); } } } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp index d5829a05164..3f48761a42f 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp @@ -12,6 +12,7 @@ #include #include #include +#include "attribute_directory.h" #include LOG_SETUP(".proton.attribute.flushableattribute"); @@ -25,7 +26,37 @@ using vespalib::makeClosure; namespace proton { -FlushableAttribute::Flusher::Flusher(FlushableAttribute & fattr, SerialNum syncToken) +/** + * Task performing the actual flushing to disk. + **/ +class FlushableAttribute::Flusher : public Task { +private: + FlushableAttribute & _fattr; + search::AttributeMemorySaveTarget _saveTarget; + std::unique_ptr _saver; + uint64_t _syncToken; + search::AttributeVector::BaseName _flushFile; + + bool saveAttribute(); // not updating snap info. +public: + Flusher(FlushableAttribute & fattr, uint64_t syncToken, AttributeDirectory::Writer &writer); + ~Flusher(); + uint64_t getSyncToken() const { return _syncToken; } + bool flush(AttributeDirectory::Writer &writer); + void updateStats(); + bool cleanUp(AttributeDirectory::Writer &writer); + // Implements vespalib::Executor::Task + virtual void run(); + + virtual SerialNum + getFlushSerial(void) const + { + return _syncToken; + } +}; + + +FlushableAttribute::Flusher::Flusher(FlushableAttribute & fattr, SerialNum syncToken, AttributeDirectory::Writer &writer) : _fattr(fattr), _saveTarget(), _saver(), @@ -38,9 +69,7 @@ FlushableAttribute::Flusher::Flusher(FlushableAttribute & fattr, SerialNum syncT if (attr.canShrinkLidSpace()) { attr.shrinkLidSpace(); } - _flushFile = AttributeDiskLayout::getAttributeFileName(_fattr._baseDir, - attr.getName(), - _syncToken); + _flushFile = writer.getSnapshotDir(_syncToken) + "/" + attr.getName(); attr.setBaseFileName(_flushFile); _saver = attr.initSave(); if (!_saver) { @@ -54,18 +83,6 @@ FlushableAttribute::Flusher::~Flusher() // empty } -bool -FlushableAttribute::Flusher::saveSnapInfo() -{ - if (!_fattr._snapInfo.save()) { - LOG(warning, - "Could not save meta-info file for attribute vector '%s' to disk", - _fattr._attr->getBaseFileName().c_str()); - return false; - } - return true; -} - bool FlushableAttribute::Flusher::saveAttribute() { @@ -93,31 +110,16 @@ FlushableAttribute::Flusher::saveAttribute() } bool -FlushableAttribute::Flusher::flush() +FlushableAttribute::Flusher::flush(AttributeDirectory::Writer &writer) { - IndexMetaInfo::Snapshot newSnap(false, _syncToken, - _flushFile.getSnapshotName()); - { - vespalib::LockGuard guard(_fattr._snapInfoLock); - _fattr._snapInfo.addSnapshot(newSnap); - } - if (!saveSnapInfo()) { - return false; - } + writer.createInvalidSnapshot(_syncToken); if (!saveAttribute()) { LOG(warning, "Could not write attribute vector '%s' to disk", _flushFile.c_str()); return false; } - { - vespalib::LockGuard guard(_fattr._snapInfoLock); - _fattr._snapInfo.validateSnapshot(_syncToken); - } - if (!saveSnapInfo()) { - return false; - } - _fattr._lastFlushTime = - search::FileKit::getModificationTime(_flushFile.getDirName()); + writer.markValidSnapshot(_syncToken); + writer.setLastFlushTime(search::FileKit::getModificationTime(_flushFile.getDirName())); return true; } @@ -128,17 +130,11 @@ FlushableAttribute::Flusher::updateStats() } bool -FlushableAttribute::Flusher::cleanUp() +FlushableAttribute::Flusher::cleanUp(AttributeDirectory::Writer &writer) { if (_fattr._cleanUpAfterFlush) { - if (!AttributeDiskLayout::removeOldSnapshots(_fattr._snapInfo, - _fattr._snapInfoLock)) { - LOG(warning, - "Encountered problems when removing old snapshot directories" - "after flushing attribute vector '%s' to disk", - _fattr._attr->getBaseFileName().c_str()); - return false; - } + writer.invalidateOldSnapshots(); + writer.removeInvalidSnapshots(false); } return true; } @@ -146,23 +142,23 @@ FlushableAttribute::Flusher::cleanUp() void FlushableAttribute::Flusher::run() { - vespalib::LockGuard guard(_fattr._flusherLock); - if (_syncToken <= _fattr.getFlushedSerialNum()) { + auto writer = _fattr._attrDir->getWriter(); + if (!writer || _syncToken <= _fattr.getFlushedSerialNum()) { // another flusher has created an equal or better snapshot // after this flusher was created return; } - if (!flush()) { + if (!flush(*writer)) { // TODO (geirst): throw exception ? } updateStats(); - if (!cleanUp()) { + if (!cleanUp(*writer)) { // TODO (geirst): throw exception ? } } FlushableAttribute::FlushableAttribute(const AttributeVector::SP attr, - const vespalib::string & baseDir, + const std::shared_ptr &attrDir, const TuneFileAttributes & tuneFileAttributes, const FileHeaderContext & @@ -175,28 +171,14 @@ FlushableAttribute::FlushableAttribute(const AttributeVector::SP attr, attr->getName().c_str()), Type::SYNC, Component::ATTRIBUTE), _attr(attr), - _baseDir(baseDir), - _snapInfo(AttributeDiskLayout::getAttributeBaseDir(baseDir, - attr->getName())), - _snapInfoLock(), - _flusherLock(), _cleanUpAfterFlush(true), _lastStats(), _tuneFileAttributes(tuneFileAttributes), _fileHeaderContext(fileHeaderContext), - _lastFlushTime(), _attributeFieldWriter(attributeFieldWriter), - _hwInfo(hwInfo) + _hwInfo(hwInfo), + _attrDir(attrDir) { - if (!_snapInfo.load()) { - _snapInfo.save(); - } else { - vespalib::string dirName = - AttributeDiskLayout::getAttributeFileName(_baseDir, - _attr->getName(), - getFlushedSerialNum()).getDirName(); - _lastFlushTime = search::FileKit::getModificationTime(dirName); - } _lastStats.setPathElementsToLog(8); } @@ -209,9 +191,7 @@ FlushableAttribute::~FlushableAttribute() IFlushTarget::SerialNum FlushableAttribute::getFlushedSerialNum() const { - vespalib::LockGuard guard(_snapInfoLock); - IndexMetaInfo::Snapshot bestSnap = _snapInfo.getBestSnapshot(); - return bestSnap.valid ? bestSnap.syncToken : 0; + return _attrDir->getFlushedSerialNum(); } IFlushTarget::MemoryGain @@ -253,34 +233,36 @@ FlushableAttribute::getApproxDiskGain() const IFlushTarget::Time FlushableAttribute::getLastFlushTime() const { - return _lastFlushTime; + return _attrDir->getLastFlushTime(); } IFlushTarget::Task::UP FlushableAttribute::internalInitFlush(SerialNum currentSerial) { - // Called by document db executor - (void)currentSerial; + // Called by attribute field writer thread while document db executor waits _attr->removeAllOldGenerations(); - SerialNum syncToken = currentSerial; - syncToken = std::max(currentSerial, - _attr->getStatus().getLastSyncToken()); + SerialNum syncToken = std::max(currentSerial, + _attr->getStatus().getLastSyncToken()); + auto writer = _attrDir->tryGetWriter(); + if (!writer) { + return Task::UP(); + } if (syncToken <= getFlushedSerialNum()) { - vespalib::LockGuard guard(_flusherLock); - _lastFlushTime = fastos::ClockSystem::now(); + writer->setLastFlushTime(fastos::ClockSystem::now()); LOG(debug, "No attribute vector to flush." " Update flush time to current: lastFlushTime(%f)", - _lastFlushTime.sec()); + getLastFlushTime().sec()); return Task::UP(); } - return Task::UP(new Flusher(*this, syncToken)); + return Task::UP(new Flusher(*this, syncToken, *writer)); } IFlushTarget::Task::UP FlushableAttribute::initFlush(SerialNum currentSerial) { + // Called by document db executor std::promise promise; std::future future = promise.get_future(); _attributeFieldWriter.execute(_attr->getName(), diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h index 7ef3235b0d8..e72ec166453 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h @@ -22,6 +22,8 @@ namespace proton { using searchcorespi::FlushStats; using searchcorespi::IFlushTarget; +class AttributeDirectory; + /** * Implementation of IFlushTarget interface for attribute vectors. */ @@ -31,45 +33,16 @@ private: /** * Task performing the actual flushing to disk. **/ - class Flusher : public Task { - private: - FlushableAttribute & _fattr; - search::AttributeMemorySaveTarget _saveTarget; - std::unique_ptr _saver; - uint64_t _syncToken; - search::AttributeVector::BaseName _flushFile; - - bool saveAttribute(); // not updating snap info. - public: - Flusher(FlushableAttribute & fattr, uint64_t syncToken); - ~Flusher(); - uint64_t getSyncToken() const { return _syncToken; } - bool saveSnapInfo(); - bool flush(); - void updateStats(); - bool cleanUp(); - // Implements vespalib::Executor::Task - virtual void run(); - - virtual SerialNum - getFlushSerial(void) const - { - return _syncToken; - } - }; + class Flusher; search::AttributeVector::SP _attr; - vespalib::string _baseDir; - search::IndexMetaInfo _snapInfo; - vespalib::Lock _snapInfoLock; - vespalib::Lock _flusherLock; bool _cleanUpAfterFlush; FlushStats _lastStats; const search::TuneFileAttributes _tuneFileAttributes; const search::common::FileHeaderContext &_fileHeaderContext; - fastos::TimeStamp _lastFlushTime; search::ISequencedTaskExecutor &_attributeFieldWriter; HwInfo _hwInfo; + std::shared_ptr _attrDir; Task::UP internalInitFlush(SerialNum currentSerial); @@ -83,7 +56,7 @@ public: * fileHeaderContext must be kept alive by caller. **/ FlushableAttribute(const search::AttributeVector::SP attr, - const vespalib::string & baseDir, + const std::shared_ptr &attrDir, const search::TuneFileAttributes &tuneFileAttributes, const search::common::FileHeaderContext & fileHeaderContext, diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp index b189b176b80..d97f90ebc2b 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp @@ -17,6 +17,8 @@ LOG_SETUP(".proton.documentmetastore.documentmetastoreflushtarget"); #include #include #include +#include +#include using namespace search; using namespace vespalib; @@ -27,9 +29,38 @@ using vespalib::makeClosure; namespace proton { +/** + * Task performing the actual flushing to disk. + **/ +class DocumentMetaStoreFlushTarget::Flusher : public Task { +private: + DocumentMetaStoreFlushTarget &_dmsft; + std::unique_ptr _saver; + uint64_t _syncToken; + vespalib::string _flushDir; + + bool saveDocumentMetaStore(); // not updating snap info. +public: + Flusher(DocumentMetaStoreFlushTarget &dmsft, uint64_t syncToken, AttributeDirectory::Writer &writer); + ~Flusher(); + uint64_t getSyncToken() const { return _syncToken; } + bool saveSnapInfo(); + bool flush(AttributeDirectory::Writer &writer); + void updateStats(); + bool cleanUp(AttributeDirectory::Writer &writer); + // Implements vespalib::Executor::Task + virtual void run(); + + virtual SerialNum + getFlushSerial(void) const + { + return _syncToken; + } +}; + DocumentMetaStoreFlushTarget::Flusher:: Flusher(DocumentMetaStoreFlushTarget &dmsft, - SerialNum syncToken) + SerialNum syncToken, AttributeDirectory::Writer &writer) : _dmsft(dmsft), _saver(), _syncToken(syncToken), @@ -40,7 +71,7 @@ Flusher(DocumentMetaStoreFlushTarget &dmsft, if (dms.canShrinkLidSpace()) { dms.shrinkLidSpace(); } - _flushDir = _dmsft.getSnapshotDir(syncToken); + _flushDir = writer.getSnapshotDir(syncToken); vespalib::string newBaseFileName(_flushDir + "/" + dms.getName()); dms.setBaseFileName(newBaseFileName); _saver = dms.initSave(); @@ -52,19 +83,6 @@ DocumentMetaStoreFlushTarget::Flusher::~Flusher() // empty } -bool -DocumentMetaStoreFlushTarget::Flusher::saveSnapInfo() -{ - if (!_dmsft._snapInfo.save()) { - LOG(warning, - "Could not save meta-info file for document meta store" - " '%s' to disk", - _dmsft._dms->getBaseFileName().c_str()); - return false; - } - return true; -} - bool DocumentMetaStoreFlushTarget::Flusher::saveDocumentMetaStore() { @@ -90,17 +108,9 @@ DocumentMetaStoreFlushTarget::Flusher::saveDocumentMetaStore() } bool -DocumentMetaStoreFlushTarget::Flusher::flush() +DocumentMetaStoreFlushTarget::Flusher::flush(AttributeDirectory::Writer &writer) { - IndexMetaInfo::Snapshot newSnap(false, _syncToken, - getSnapshotName(_syncToken)); - { - vespalib::LockGuard guard(_dmsft._snapInfoLock); - _dmsft._snapInfo.addSnapshot(newSnap); - } - if (!saveSnapInfo()) { - return false; - } + writer.createInvalidSnapshot(_syncToken); if (!saveDocumentMetaStore()) { LOG(warning, "Could not write document meta store '%s' to disk", _dmsft._dms->getBaseFileName().c_str()); @@ -111,16 +121,14 @@ DocumentMetaStoreFlushTarget::Flusher::flush() * flush is activated to ensure that same future will occur that has * already been observable in the saved document meta store (future * timestamp or bucket id). + * + * Assume only flush engine tries to flush document meta store, i.e. + * noone else tries to get writer while flush task is flushing + * document meta store to disk. */ _dmsft._tlsSyncer.sync(); - { - vespalib::LockGuard guard(_dmsft._snapInfoLock); - _dmsft._snapInfo.validateSnapshot(_syncToken); - } - if (!saveSnapInfo()) { - return false; - } - _dmsft._lastFlushTime = search::FileKit::getModificationTime(_flushDir); + writer.markValidSnapshot(_syncToken); + writer.setLastFlushTime(search::FileKit::getModificationTime(_flushDir)); return true; } @@ -131,17 +139,11 @@ DocumentMetaStoreFlushTarget::Flusher::updateStats() } bool -DocumentMetaStoreFlushTarget::Flusher::cleanUp() +DocumentMetaStoreFlushTarget::Flusher::cleanUp(AttributeDirectory::Writer &writer) { if (_dmsft._cleanUpAfterFlush) { - if (!AttributeDiskLayout::removeOldSnapshots(_dmsft._snapInfo, - _dmsft._snapInfoLock)) { - LOG(warning, - "Encountered problems when removing old snapshot directories" - "after flushing document meta store '%s' to disk", - _dmsft._dms->getBaseFileName().c_str()); - return false; - } + writer.invalidateOldSnapshots(); + writer.removeInvalidSnapshots(false); } return true; } @@ -149,17 +151,17 @@ DocumentMetaStoreFlushTarget::Flusher::cleanUp() void DocumentMetaStoreFlushTarget::Flusher::run() { - vespalib::LockGuard guard(_dmsft._flusherLock); - if (_syncToken <= _dmsft.getFlushedSerialNum()) { + auto writer = _dmsft._dmsDir->getWriter(); + if (!writer || _syncToken <= _dmsft.getFlushedSerialNum()) { // another flusher has created an equal or better snapshot // after this flusher was created return; } - if (!flush()) { + if (!flush(*writer)) { // TODO (geirst): throw exception ? } updateStats(); - if (!cleanUp()) { + if (!cleanUp(*writer)) { // TODO (geirst): throw exception ? } } @@ -175,23 +177,15 @@ DocumentMetaStoreFlushTarget(const DocumentMetaStore::SP dms, _dms(dms), _tlsSyncer(tlsSyncer), _baseDir(baseDir), - _snapInfo(_baseDir), - _snapInfoLock(), - _flusherLock(), _cleanUpAfterFlush(true), _lastStats(), _tuneFileAttributes(tuneFileAttributes), _fileHeaderContext(fileHeaderContext), - _lastFlushTime(), - _hwInfo(hwInfo) + _hwInfo(hwInfo), + _diskLayout(AttributeDiskLayout::createSimple(baseDir)), + _dmsDir(_diskLayout->createAttributeDir("")) { - if (!_snapInfo.load()) { - _snapInfo.save(); - } else { - vespalib::string dirName(getSnapshotDir(getFlushedSerialNum())); - _lastFlushTime = search::FileKit::getModificationTime(dirName); - } _lastStats.setPathElementsToLog(8); } @@ -201,28 +195,10 @@ DocumentMetaStoreFlushTarget::~DocumentMetaStoreFlushTarget() } -vespalib::string -DocumentMetaStoreFlushTarget::getSnapshotName(uint64_t syncToken) -{ - return vespalib::make_string("snapshot-%" PRIu64, syncToken); -} - - -vespalib::string -DocumentMetaStoreFlushTarget::getSnapshotDir(uint64_t syncToken) -{ - return vespalib::make_string("%s/%s", - _baseDir.c_str(), - getSnapshotName(syncToken).c_str()); -} - - IFlushTarget::SerialNum DocumentMetaStoreFlushTarget::getFlushedSerialNum() const { - vespalib::LockGuard guard(_snapInfoLock); - IndexMetaInfo::Snapshot bestSnap = _snapInfo.getBestSnapshot(); - return bestSnap.valid ? bestSnap.syncToken : 0; + return _dmsDir->getFlushedSerialNum(); } @@ -255,7 +231,7 @@ DocumentMetaStoreFlushTarget::getApproxDiskGain() const IFlushTarget::Time DocumentMetaStoreFlushTarget::getLastFlushTime() const { - return _lastFlushTime; + return _dmsDir->getLastFlushTime(); } @@ -263,21 +239,22 @@ IFlushTarget::Task::UP DocumentMetaStoreFlushTarget::initFlush(SerialNum currentSerial) { // Called by document db executor - (void)currentSerial; _dms->removeAllOldGenerations(); - SerialNum syncToken = currentSerial; - syncToken = std::max(currentSerial, - _dms->getStatus().getLastSyncToken()); + SerialNum syncToken = std::max(currentSerial, + _dms->getStatus().getLastSyncToken()); + auto writer = _dmsDir->tryGetWriter(); + if (!writer) { + return Task::UP(); + } if (syncToken <= getFlushedSerialNum()) { - vespalib::LockGuard guard(_flusherLock); - _lastFlushTime = fastos::ClockSystem::now(); + writer->setLastFlushTime(fastos::ClockSystem::now()); LOG(debug, "No document meta store to flush." " Update flush time to current: lastFlushTime(%f)", - _lastFlushTime.sec()); + getLastFlushTime().sec()); return Task::UP(); } - return Task::UP(new Flusher(*this, syncToken)); + return Task::UP(new Flusher(*this, syncToken, *writer)); } diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h index d26c55906e2..f6ea0613e7f 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h @@ -25,6 +25,8 @@ namespace proton { class ITlsSyncer; +class AttributeDiskLayout; +class AttributeDirectory; using searchcorespi::FlushStats; using searchcorespi::IFlushTarget; @@ -38,50 +40,18 @@ private: /** * Task performing the actual flushing to disk. **/ - class Flusher : public Task { - private: - DocumentMetaStoreFlushTarget &_dmsft; - std::unique_ptr _saver; - uint64_t _syncToken; - vespalib::string _flushDir; - - bool saveDocumentMetaStore(); // not updating snap info. - public: - Flusher(DocumentMetaStoreFlushTarget &dmsft, uint64_t syncToken); - ~Flusher(); - uint64_t getSyncToken() const { return _syncToken; } - bool saveSnapInfo(); - bool flush(); - void updateStats(); - bool cleanUp(); - // Implements vespalib::Executor::Task - virtual void run(); - - virtual SerialNum - getFlushSerial(void) const - { - return _syncToken; - } - }; + class Flusher; DocumentMetaStore::SP _dms; ITlsSyncer &_tlsSyncer; vespalib::string _baseDir; - search::IndexMetaInfo _snapInfo; - vespalib::Lock _snapInfoLock; - vespalib::Lock _flusherLock; bool _cleanUpAfterFlush; FlushStats _lastStats; const search::TuneFileAttributes _tuneFileAttributes; const search::common::FileHeaderContext &_fileHeaderContext; - fastos::TimeStamp _lastFlushTime; HwInfo _hwInfo; - - static vespalib::string - getSnapshotName(uint64_t syncToken); - - vespalib::string - getSnapshotDir(uint64_t syncToken); + std::shared_ptr _diskLayout; + std::shared_ptr _dmsDir; public: typedef std::shared_ptr SP; -- cgit v1.2.3