summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@yahoo-inc.com>2017-03-23 10:59:35 +0000
committerTor Egge <Tor.Egge@yahoo-inc.com>2017-03-23 10:59:35 +0000
commit27691bcd31ecc462ed440dc753d1878937ea0fab (patch)
treeb180fbd0755eaa178b7bf8f2a23062e238091ef4 /searchcore
parent4f4e4a45d5d5e9d002bce40c64259f10af2d9f30 (diff)
Use new AttributeDirectory in flush targets.
Remove old methods for pruning old snapshots/removing attributes.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/attribute/attribute_manager/attribute_manager_test.cpp12
-rw-r--r--searchcore/src/tests/proton/attribute/attributeflush_test.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp169
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp138
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h37
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp147
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h40
10 files changed, 179 insertions, 404 deletions
diff --git a/searchcore/src/tests/proton/attribute/attribute_manager/attribute_manager_test.cpp b/searchcore/src/tests/proton/attribute/attribute_manager/attribute_manager_test.cpp
index ad694348029..a38e5c583a5 100644
--- a/searchcore/src/tests/proton/attribute/attribute_manager/attribute_manager_test.cpp
+++ b/searchcore/src/tests/proton/attribute/attribute_manager/attribute_manager_test.cpp
@@ -409,7 +409,7 @@ TEST_F("require that reconfig can add attributes", Fixture)
newSpec.push_back(AttrSpec("a2", INT32_SINGLE));
newSpec.push_back(AttrSpec("a3", INT32_SINGLE));
- SequentialAttributeManager sam(f._m, AttrMgrSpec(newSpec, f._m.getNumDocs(), 0));
+ SequentialAttributeManager sam(f._m, AttrMgrSpec(newSpec, f._m.getNumDocs(), 10));
std::vector<AttributeGuard> list;
sam.mgr.getAttributeList(list);
std::sort(list.begin(), list.end(), [](const AttributeGuard & a, const AttributeGuard & b) {
@@ -432,7 +432,7 @@ TEST_F("require that reconfig can remove attributes", Fixture)
AttrSpecList newSpec;
newSpec.push_back(AttrSpec("a2", INT32_SINGLE));
- SequentialAttributeManager sam(f._m, AttrMgrSpec(newSpec, 1, 0));
+ SequentialAttributeManager sam(f._m, AttrMgrSpec(newSpec, 1, 10));
std::vector<AttributeGuard> list;
sam.mgr.getAttributeList(list);
EXPECT_EQUAL(1u, list.size());
@@ -471,7 +471,7 @@ TEST_F("require that new attributes after reconfig are initialized", Fixture)
EXPECT_EQUAL(0u, a3->getStatus().getLastSyncToken());
}
-TEST_F("require that removed attributes can resurrect", BaseFixture)
+TEST_F("require that removed attributes cannot resurrect", BaseFixture)
{
proton::AttributeManager::SP am1(
new proton::AttributeManager(test_dir, "test.subdb",
@@ -499,11 +499,11 @@ TEST_F("require that removed attributes can resurrect", BaseFixture)
AttributeGuard &ag1(*ag1ap);
ASSERT_TRUE(ag1.valid());
EXPECT_EQUAL(5u, ag1->getNumDocs());
- EXPECT_EQUAL(10, ag1->getInt(1));
- EXPECT_EQUAL(10, ag1->getInt(2));
+ EXPECT_TRUE(search::attribute::isUndefined<int32_t>(ag1->getInt(1)));
+ EXPECT_TRUE(search::attribute::isUndefined<int32_t>(ag1->getInt(2)));
EXPECT_TRUE(search::attribute::isUndefined<int32_t>(ag1->getInt(3)));
EXPECT_TRUE(search::attribute::isUndefined<int32_t>(ag1->getInt(4)));
- EXPECT_EQUAL(16u, ag1->getStatus().getLastSyncToken());
+ EXPECT_EQUAL(0u, ag1->getStatus().getLastSyncToken());
}
TEST_F("require that extra attribute is not treated as removed", Fixture)
diff --git a/searchcore/src/tests/proton/attribute/attributeflush_test.cpp b/searchcore/src/tests/proton/attribute/attributeflush_test.cpp
index 91b05e4e026..d3adbed63a0 100644
--- a/searchcore/src/tests/proton/attribute/attributeflush_test.cpp
+++ b/searchcore/src/tests/proton/attribute/attributeflush_test.cpp
@@ -6,6 +6,7 @@ LOG_SETUP("attributeflush_test");
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/sync.h>
#include <vespa/searchcore/proton/attribute/attributemanager.h>
+#include <vespa/searchcore/proton/attribute/attributedisklayout.h>
#include <vespa/searchcore/proton/attribute/attribute_writer.h>
#include <vespa/searchcore/proton/attribute/flushableattribute.h>
#include <vespa/searchlib/attribute/attributefactory.h>
@@ -78,6 +79,9 @@ public:
void
doFlushing(Executor::Task::UP task)
{
+ if (!task) {
+ return;
+ }
Executor::Task::UP wrapper(new TaskWrapper(std::move(task), gate));
Executor::Task::UP ok = _executor.execute(std::move(wrapper));
assert(ok.get() == NULL);
@@ -410,8 +414,9 @@ Test::requireThatCleanUpIsPerformedAfterFlush(void)
info.addSnapshot(IndexMetaInfo::Snapshot(true, 10, "snapshot-10"));
info.addSnapshot(IndexMetaInfo::Snapshot(false, 20, "snapshot-20"));
EXPECT_TRUE(info.save());
+ auto diskLayout = AttributeDiskLayout::create("flush");
- FlushableAttribute fa(av, "flush", TuneFileAttributes(),
+ FlushableAttribute fa(av, diskLayout->getAttributeDir("a6"), TuneFileAttributes(),
f._fileHeaderContext, f._attributeFieldWriter,
f._hwInfo);
fa.initFlush(30)->run();
@@ -457,7 +462,9 @@ Test::requireThatOnlyOneFlusherCanRunAtTheSameTime(void)
for (size_t i = 10; i < 100; ++i) {
av->commit(i, i);
vespalib::Executor::Task::UP task = ft->initFlush(i);
- exec.execute(std::move(task));
+ if (task) {
+ exec.execute(std::move(task));
+ }
}
exec.sync();
exec.shutdown();
@@ -470,9 +477,6 @@ Test::requireThatOnlyOneFlusherCanRunAtTheSameTime(void)
}
IndexMetaInfo::Snapshot best = info.getBestSnapshot();
EXPECT_EQUAL(true, best.valid);
- EXPECT_EQUAL(99u, best.syncToken);
- FlushStats stats = ft->getLastFlushStats();
- EXPECT_EQUAL("flush/a8/snapshot-99", stats.getPath());
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp
index 265e9bb11a6..4e96ca59eaf 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp
@@ -60,6 +60,9 @@ AttributeDirectory::getDirName() const
diskLayout = _diskLayout.lock();
}
assert(diskLayout);
+ if (_name.empty()) {
+ return diskLayout->getBaseDir();
+ }
return AttributeDiskLayout::getAttributeBaseDir(diskLayout->getBaseDir(), _name);
}
@@ -98,10 +101,8 @@ AttributeDirectory::saveSnapInfo()
vespalib::string
AttributeDirectory::getSnapshotDir(search::SerialNum serialNum)
{
- auto snap = _snapInfo.getSnapshot(serialNum);
- assert(snap.syncToken == serialNum);
vespalib::string dirName(getDirName());
- return dirName + "/" + snap.dirName;
+ return dirName + "/" + getSnapshotDirComponent(serialNum);
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp
index f17f7c76566..cec46694ae2 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp
@@ -16,6 +16,16 @@ using search::AttributeVector;
namespace proton
{
+namespace {
+
+vespalib::string
+getSnapshotDir(uint64_t syncToken)
+{
+ return vespalib::make_string("snapshot-%" PRIu64, syncToken);
+}
+
+}
+
AttributeDiskLayout::AttributeDiskLayout(const vespalib::string &baseDir, PrivateConstructorTag)
: _baseDir(baseDir),
_mutex(),
@@ -34,24 +44,6 @@ AttributeDiskLayout::createBaseDir()
}
vespalib::string
-AttributeDiskLayout::getSnapshotDir(uint64_t syncToken)
-{
- return vespalib::make_string("snapshot-%" PRIu64, syncToken);
-}
-
-vespalib::string
-AttributeDiskLayout::getSnapshotRemoveDir(const vespalib::string &baseDir,
- const vespalib::string &snapDir)
-{
- if (baseDir.empty()) {
- return snapDir;
- }
- return vespalib::make_string("%s/%s",
- baseDir.c_str(),
- snapDir.c_str());
-}
-
-vespalib::string
AttributeDiskLayout::getAttributeBaseDir(const vespalib::string &baseDir,
const vespalib::string &attrName)
{
@@ -73,139 +65,13 @@ AttributeDiskLayout::getAttributeFileName(const vespalib::string &baseDir,
attrName);
}
-bool
-AttributeDiskLayout::removeOldSnapshots(IndexMetaInfo &snapInfo,
- vespalib::Lock &snapInfoLock)
-{
- IndexMetaInfo::Snapshot best = snapInfo.getBestSnapshot();
- if (!best.valid) {
- return true;
- }
- std::vector<IndexMetaInfo::Snapshot> toRemove;
- const IndexMetaInfo::SnapshotList & list = snapInfo.snapshots();
- for (const auto &snap : list) {
- if (!(snap == best)) {
- toRemove.push_back(snap);
- }
- }
- LOG(debug,
- "About to remove %zu old snapshots. "
- "Will keep best snapshot with sync token %" PRIu64,
- toRemove.size(),
- best.syncToken);
- for (const auto &snap : toRemove) {
- if (snap.valid) {
- {
- vespalib::LockGuard guard(snapInfoLock);
- snapInfo.invalidateSnapshot(snap.syncToken);
- }
- if (!snapInfo.save()) {
- LOG(warning,
- "Could not save meta info file in directory '%s' after "
- "invalidating snapshot with sync token %" PRIu64,
- snapInfo.getPath().c_str(),
- snap.syncToken);
- return false;
- }
- }
- vespalib::string rmDir =
- getSnapshotRemoveDir(snapInfo.getPath(), snap.dirName);
- FastOS_StatInfo statInfo;
- if (!FastOS_File::Stat(rmDir.c_str(), &statInfo) &&
- statInfo._error == FastOS_StatInfo::FileNotFound)
- {
- // Directory already removed
- } else {
- FastOS_FileInterface:: EmptyAndRemoveDirectory(rmDir.c_str());
-#if 0
- LOG(warning,
- "Could not remove snapshot directory '%s'",
- rmDir.c_str());
- return false;
-#endif
- }
- {
- vespalib::LockGuard guard(snapInfoLock);
- snapInfo.removeSnapshot(snap.syncToken);
- }
- if (!snapInfo.save()) {
- LOG(warning,
- "Could not save meta info file in directory '%s' after "
- "removing snapshot with sync token %" PRIu64,
- snapInfo.getPath().c_str(), snap.syncToken);
- return false;
- }
- LOG(debug, "Removed snapshot directory '%s'", rmDir.c_str());
- }
- return true;
-}
-
-bool
-AttributeDiskLayout::removeAttribute(const vespalib::string &baseDir,
- const vespalib::string &attrName,
- uint64_t wipeSerial)
-{
- const vespalib::string currDir = getAttributeBaseDir(baseDir, attrName);
- search::IndexMetaInfo snapInfo(currDir);
- IndexMetaInfo::Snapshot best = snapInfo.getBestSnapshot();
- if (best.valid && best.syncToken >= wipeSerial) {
- return true; // Attribute has been resurrected and flushed later on
- }
- const vespalib::string rmDir =
- getAttributeBaseDir(baseDir,
- vespalib::make_string("remove.%s",
- attrName.c_str()));
-
- FastOS_StatInfo statInfo;
- if (FastOS_File::Stat(rmDir.c_str(), &statInfo) && statInfo._isDirectory) {
- FastOS_FileInterface::EmptyAndRemoveDirectory(rmDir.c_str());
-#if 0
- if (!FastOS_FileInterface::EmptyAndRemoveDirectory(rmDir.c_str())) {
- LOG(warning,
- "Could not remove attribute directory '%s'",
- rmDir.c_str());
- return false;
- }
-#endif
- }
- if (!FastOS_File::Stat(currDir.c_str(), &statInfo) &&
- statInfo._error == FastOS_StatInfo::FileNotFound)
- {
- // Directory already removed
- return true;
- }
- if (!FastOS_FileInterface::MoveFile(currDir.c_str(), rmDir.c_str())) {
- LOG(warning,
- "Could not move attribute directory '%s' to '%s'",
- currDir.c_str(),
- rmDir.c_str());
- return false;
- }
- FastOS_FileInterface::EmptyAndRemoveDirectory(rmDir.c_str());
-#if 0
- if (!FastOS_FileInterface::EmptyAndRemoveDirectory(rmDir.c_str())) {
- LOG(warning,
- "Could not remove attribute directory '%s'",
- rmDir.c_str());
- return false;
- }
-#endif
- return true;
-}
-
std::vector<vespalib::string>
-AttributeDiskLayout::listAttributes(const vespalib::string &baseDir)
+AttributeDiskLayout::listAttributes()
{
std::vector<vespalib::string> attributes;
- FastOS_DirectoryScan dir(baseDir.c_str());
- while (dir.ReadNext()) {
- if (strcmp(dir.GetName(), "..") != 0 &&
- strcmp(dir.GetName(), ".") != 0)
- {
- if (dir.IsDirectory()) {
- attributes.emplace_back(dir.GetName());
- }
- }
+ std::shared_lock<std::shared_timed_mutex> guard(_mutex);
+ for (const auto &dir : _dirs) {
+ attributes.emplace_back(dir.first);
}
return attributes;
}
@@ -284,5 +150,12 @@ AttributeDiskLayout::create(const vespalib::string &baseDir)
return diskLayout;
}
+std::shared_ptr<AttributeDiskLayout>
+AttributeDiskLayout::createSimple(const vespalib::string &baseDir)
+{
+ auto diskLayout = std::make_shared<AttributeDiskLayout>(baseDir, PrivateConstructorTag());
+ return diskLayout;
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.h b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.h
index 466f9426f56..e002d1655d3 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.h
@@ -24,9 +24,6 @@ private:
mutable std::shared_timed_mutex _mutex;
std::map<vespalib::string, std::shared_ptr<AttributeDirectory>> _dirs;
- static vespalib::string getSnapshotDir(uint64_t syncToken);
- static vespalib::string getSnapshotRemoveDir(const vespalib::string &baseDir, const vespalib::string &snapDir);
-
void scanDir();
struct PrivateConstructorTag { };
public:
@@ -34,15 +31,14 @@ public:
~AttributeDiskLayout();
static vespalib::string getAttributeBaseDir(const vespalib::string &baseDir, const vespalib::string &attrName);
static search::AttributeVector::BaseName getAttributeFileName(const vespalib::string &baseDir, const vespalib::string &attrName, uint64_t syncToken);
- static bool removeOldSnapshots(search::IndexMetaInfo &snapInfo, vespalib::Lock &snapInfoLock);
- static bool removeAttribute(const vespalib::string &baseDir, const vespalib::string &attrName, uint64_t wipeSerial);
- static std::vector<vespalib::string> listAttributes(const vespalib::string &baseDir);
+ std::vector<vespalib::string> listAttributes();
const vespalib::string &getBaseDir() const { return _baseDir; }
void createBaseDir();
std::shared_ptr<AttributeDirectory> getAttributeDir(const vespalib::string &name);
std::shared_ptr<AttributeDirectory> createAttributeDir(const vespalib::string &name);
void removeAttributeDir(const vespalib::string &name, search::SerialNum serialNum);
static std::shared_ptr<AttributeDiskLayout> create(const vespalib::string &baseDir);
+ static std::shared_ptr<AttributeDiskLayout> createSimple(const vespalib::string &baseDir);
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
index 30bd4900f64..bb48f63dda9 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
@@ -54,7 +54,7 @@ AttributeManager::addAttribute(const AttributeWrap &attribute)
if ( ! attribute.isExtra() ) {
// Flushing of extra attributes is handled elsewhere
_flushables[attribute->getName()] = FlushableAttribute::SP
- (new FlushableAttribute(attribute, _diskLayout->getBaseDir(),
+ (new FlushableAttribute(attribute, _diskLayout->createAttributeDir(attribute->getName()),
_tuneFileAttributes,
_fileHeaderContext,
_attributeFieldWriter,
@@ -104,11 +104,11 @@ AttributeManager::flushRemovedAttributes(const AttributeManager &currMgr,
if (!newSpec.hasAttribute(kv.first) &&
!kv.second.isExtra() &&
kv.second->getStatus().getLastSyncToken() <
- newSpec.getCurrentSerialNum()) {
+ newSpec.getCurrentSerialNum() - 1) {
FlushableAttribute::SP flushable =
currMgr.findFlushable(kv.first);
vespalib::Executor::Task::UP flushTask =
- flushable->initFlush(newSpec.getCurrentSerialNum());
+ flushable->initFlush(newSpec.getCurrentSerialNum() - 1);
if (flushTask.get() != NULL) {
LOG(debug, "Flushing removed attribute vector '%s' with %u docs and serial number %lu",
kv.first.c_str(), kv.second->getNumDocs(), kv.second->getStatus().getLastSyncToken());
@@ -441,12 +441,11 @@ AttributeManager::getAttributeListAll(std::vector<AttributeGuard> &list) const
void
AttributeManager::wipeHistory(search::SerialNum wipeSerial)
{
- const vespalib::string &baseDir = _diskLayout->getBaseDir();
- std::vector<vespalib::string> attributes = AttributeDiskLayout::listAttributes(baseDir);
+ std::vector<vespalib::string> attributes = _diskLayout->listAttributes();
for (const auto &attribute : attributes) {
auto itr = _attributes.find(attribute);
if (itr == _attributes.end()) {
- AttributeDiskLayout::removeAttribute(baseDir, attribute, wipeSerial);
+ _diskLayout->removeAttributeDir(attribute, wipeSerial);
}
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
index d5829a05164..3f48761a42f 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
@@ -12,6 +12,7 @@
#include <vespa/searchlib/common/serialnumfileheadercontext.h>
#include <vespa/searchlib/common/isequencedtaskexecutor.h>
#include <future>
+#include "attribute_directory.h"
#include <vespa/log/log.h>
LOG_SETUP(".proton.attribute.flushableattribute");
@@ -25,7 +26,37 @@ using vespalib::makeClosure;
namespace proton {
-FlushableAttribute::Flusher::Flusher(FlushableAttribute & fattr, SerialNum syncToken)
+/**
+ * Task performing the actual flushing to disk.
+ **/
+class FlushableAttribute::Flusher : public Task {
+private:
+ FlushableAttribute & _fattr;
+ search::AttributeMemorySaveTarget _saveTarget;
+ std::unique_ptr<search::AttributeSaver> _saver;
+ uint64_t _syncToken;
+ search::AttributeVector::BaseName _flushFile;
+
+ bool saveAttribute(); // not updating snap info.
+public:
+ Flusher(FlushableAttribute & fattr, uint64_t syncToken, AttributeDirectory::Writer &writer);
+ ~Flusher();
+ uint64_t getSyncToken() const { return _syncToken; }
+ bool flush(AttributeDirectory::Writer &writer);
+ void updateStats();
+ bool cleanUp(AttributeDirectory::Writer &writer);
+ // Implements vespalib::Executor::Task
+ virtual void run();
+
+ virtual SerialNum
+ getFlushSerial(void) const
+ {
+ return _syncToken;
+ }
+};
+
+
+FlushableAttribute::Flusher::Flusher(FlushableAttribute & fattr, SerialNum syncToken, AttributeDirectory::Writer &writer)
: _fattr(fattr),
_saveTarget(),
_saver(),
@@ -38,9 +69,7 @@ FlushableAttribute::Flusher::Flusher(FlushableAttribute & fattr, SerialNum syncT
if (attr.canShrinkLidSpace()) {
attr.shrinkLidSpace();
}
- _flushFile = AttributeDiskLayout::getAttributeFileName(_fattr._baseDir,
- attr.getName(),
- _syncToken);
+ _flushFile = writer.getSnapshotDir(_syncToken) + "/" + attr.getName();
attr.setBaseFileName(_flushFile);
_saver = attr.initSave();
if (!_saver) {
@@ -55,18 +84,6 @@ FlushableAttribute::Flusher::~Flusher()
}
bool
-FlushableAttribute::Flusher::saveSnapInfo()
-{
- if (!_fattr._snapInfo.save()) {
- LOG(warning,
- "Could not save meta-info file for attribute vector '%s' to disk",
- _fattr._attr->getBaseFileName().c_str());
- return false;
- }
- return true;
-}
-
-bool
FlushableAttribute::Flusher::saveAttribute()
{
vespalib::mkdir(_flushFile.getDirName(), false);
@@ -93,31 +110,16 @@ FlushableAttribute::Flusher::saveAttribute()
}
bool
-FlushableAttribute::Flusher::flush()
+FlushableAttribute::Flusher::flush(AttributeDirectory::Writer &writer)
{
- IndexMetaInfo::Snapshot newSnap(false, _syncToken,
- _flushFile.getSnapshotName());
- {
- vespalib::LockGuard guard(_fattr._snapInfoLock);
- _fattr._snapInfo.addSnapshot(newSnap);
- }
- if (!saveSnapInfo()) {
- return false;
- }
+ writer.createInvalidSnapshot(_syncToken);
if (!saveAttribute()) {
LOG(warning, "Could not write attribute vector '%s' to disk",
_flushFile.c_str());
return false;
}
- {
- vespalib::LockGuard guard(_fattr._snapInfoLock);
- _fattr._snapInfo.validateSnapshot(_syncToken);
- }
- if (!saveSnapInfo()) {
- return false;
- }
- _fattr._lastFlushTime =
- search::FileKit::getModificationTime(_flushFile.getDirName());
+ writer.markValidSnapshot(_syncToken);
+ writer.setLastFlushTime(search::FileKit::getModificationTime(_flushFile.getDirName()));
return true;
}
@@ -128,17 +130,11 @@ FlushableAttribute::Flusher::updateStats()
}
bool
-FlushableAttribute::Flusher::cleanUp()
+FlushableAttribute::Flusher::cleanUp(AttributeDirectory::Writer &writer)
{
if (_fattr._cleanUpAfterFlush) {
- if (!AttributeDiskLayout::removeOldSnapshots(_fattr._snapInfo,
- _fattr._snapInfoLock)) {
- LOG(warning,
- "Encountered problems when removing old snapshot directories"
- "after flushing attribute vector '%s' to disk",
- _fattr._attr->getBaseFileName().c_str());
- return false;
- }
+ writer.invalidateOldSnapshots();
+ writer.removeInvalidSnapshots(false);
}
return true;
}
@@ -146,23 +142,23 @@ FlushableAttribute::Flusher::cleanUp()
void
FlushableAttribute::Flusher::run()
{
- vespalib::LockGuard guard(_fattr._flusherLock);
- if (_syncToken <= _fattr.getFlushedSerialNum()) {
+ auto writer = _fattr._attrDir->getWriter();
+ if (!writer || _syncToken <= _fattr.getFlushedSerialNum()) {
// another flusher has created an equal or better snapshot
// after this flusher was created
return;
}
- if (!flush()) {
+ if (!flush(*writer)) {
// TODO (geirst): throw exception ?
}
updateStats();
- if (!cleanUp()) {
+ if (!cleanUp(*writer)) {
// TODO (geirst): throw exception ?
}
}
FlushableAttribute::FlushableAttribute(const AttributeVector::SP attr,
- const vespalib::string & baseDir,
+ const std::shared_ptr<AttributeDirectory> &attrDir,
const TuneFileAttributes &
tuneFileAttributes,
const FileHeaderContext &
@@ -175,28 +171,14 @@ FlushableAttribute::FlushableAttribute(const AttributeVector::SP attr,
attr->getName().c_str()),
Type::SYNC, Component::ATTRIBUTE),
_attr(attr),
- _baseDir(baseDir),
- _snapInfo(AttributeDiskLayout::getAttributeBaseDir(baseDir,
- attr->getName())),
- _snapInfoLock(),
- _flusherLock(),
_cleanUpAfterFlush(true),
_lastStats(),
_tuneFileAttributes(tuneFileAttributes),
_fileHeaderContext(fileHeaderContext),
- _lastFlushTime(),
_attributeFieldWriter(attributeFieldWriter),
- _hwInfo(hwInfo)
+ _hwInfo(hwInfo),
+ _attrDir(attrDir)
{
- if (!_snapInfo.load()) {
- _snapInfo.save();
- } else {
- vespalib::string dirName =
- AttributeDiskLayout::getAttributeFileName(_baseDir,
- _attr->getName(),
- getFlushedSerialNum()).getDirName();
- _lastFlushTime = search::FileKit::getModificationTime(dirName);
- }
_lastStats.setPathElementsToLog(8);
}
@@ -209,9 +191,7 @@ FlushableAttribute::~FlushableAttribute()
IFlushTarget::SerialNum
FlushableAttribute::getFlushedSerialNum() const
{
- vespalib::LockGuard guard(_snapInfoLock);
- IndexMetaInfo::Snapshot bestSnap = _snapInfo.getBestSnapshot();
- return bestSnap.valid ? bestSnap.syncToken : 0;
+ return _attrDir->getFlushedSerialNum();
}
IFlushTarget::MemoryGain
@@ -253,34 +233,36 @@ FlushableAttribute::getApproxDiskGain() const
IFlushTarget::Time
FlushableAttribute::getLastFlushTime() const
{
- return _lastFlushTime;
+ return _attrDir->getLastFlushTime();
}
IFlushTarget::Task::UP
FlushableAttribute::internalInitFlush(SerialNum currentSerial)
{
- // Called by document db executor
- (void)currentSerial;
+ // Called by attribute field writer thread while document db executor waits
_attr->removeAllOldGenerations();
- SerialNum syncToken = currentSerial;
- syncToken = std::max(currentSerial,
- _attr->getStatus().getLastSyncToken());
+ SerialNum syncToken = std::max(currentSerial,
+ _attr->getStatus().getLastSyncToken());
+ auto writer = _attrDir->tryGetWriter();
+ if (!writer) {
+ return Task::UP();
+ }
if (syncToken <= getFlushedSerialNum()) {
- vespalib::LockGuard guard(_flusherLock);
- _lastFlushTime = fastos::ClockSystem::now();
+ writer->setLastFlushTime(fastos::ClockSystem::now());
LOG(debug,
"No attribute vector to flush."
" Update flush time to current: lastFlushTime(%f)",
- _lastFlushTime.sec());
+ getLastFlushTime().sec());
return Task::UP();
}
- return Task::UP(new Flusher(*this, syncToken));
+ return Task::UP(new Flusher(*this, syncToken, *writer));
}
IFlushTarget::Task::UP
FlushableAttribute::initFlush(SerialNum currentSerial)
{
+ // Called by document db executor
std::promise<IFlushTarget::Task::UP> promise;
std::future<IFlushTarget::Task::UP> future = promise.get_future();
_attributeFieldWriter.execute(_attr->getName(),
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h
index 7ef3235b0d8..e72ec166453 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h
@@ -22,6 +22,8 @@ namespace proton {
using searchcorespi::FlushStats;
using searchcorespi::IFlushTarget;
+class AttributeDirectory;
+
/**
* Implementation of IFlushTarget interface for attribute vectors.
*/
@@ -31,45 +33,16 @@ private:
/**
* Task performing the actual flushing to disk.
**/
- class Flusher : public Task {
- private:
- FlushableAttribute & _fattr;
- search::AttributeMemorySaveTarget _saveTarget;
- std::unique_ptr<search::AttributeSaver> _saver;
- uint64_t _syncToken;
- search::AttributeVector::BaseName _flushFile;
-
- bool saveAttribute(); // not updating snap info.
- public:
- Flusher(FlushableAttribute & fattr, uint64_t syncToken);
- ~Flusher();
- uint64_t getSyncToken() const { return _syncToken; }
- bool saveSnapInfo();
- bool flush();
- void updateStats();
- bool cleanUp();
- // Implements vespalib::Executor::Task
- virtual void run();
-
- virtual SerialNum
- getFlushSerial(void) const
- {
- return _syncToken;
- }
- };
+ class Flusher;
search::AttributeVector::SP _attr;
- vespalib::string _baseDir;
- search::IndexMetaInfo _snapInfo;
- vespalib::Lock _snapInfoLock;
- vespalib::Lock _flusherLock;
bool _cleanUpAfterFlush;
FlushStats _lastStats;
const search::TuneFileAttributes _tuneFileAttributes;
const search::common::FileHeaderContext &_fileHeaderContext;
- fastos::TimeStamp _lastFlushTime;
search::ISequencedTaskExecutor &_attributeFieldWriter;
HwInfo _hwInfo;
+ std::shared_ptr<AttributeDirectory> _attrDir;
Task::UP internalInitFlush(SerialNum currentSerial);
@@ -83,7 +56,7 @@ public:
* fileHeaderContext must be kept alive by caller.
**/
FlushableAttribute(const search::AttributeVector::SP attr,
- const vespalib::string & baseDir,
+ const std::shared_ptr<AttributeDirectory> &attrDir,
const search::TuneFileAttributes &tuneFileAttributes,
const search::common::FileHeaderContext &
fileHeaderContext,
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp
index b189b176b80..d97f90ebc2b 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp
@@ -17,6 +17,8 @@ LOG_SETUP(".proton.documentmetastore.documentmetastoreflushtarget");
#include <fstream>
#include <vespa/searchlib/common/serialnumfileheadercontext.h>
#include <vespa/searchcore/proton/server/itlssyncer.h>
+#include <vespa/searchcore/proton/attribute/attributedisklayout.h>
+#include <vespa/searchcore/proton/attribute/attribute_directory.h>
using namespace search;
using namespace vespalib;
@@ -27,9 +29,38 @@ using vespalib::makeClosure;
namespace proton {
+/**
+ * Task performing the actual flushing to disk.
+ **/
+class DocumentMetaStoreFlushTarget::Flusher : public Task {
+private:
+ DocumentMetaStoreFlushTarget &_dmsft;
+ std::unique_ptr<search::AttributeSaver> _saver;
+ uint64_t _syncToken;
+ vespalib::string _flushDir;
+
+ bool saveDocumentMetaStore(); // not updating snap info.
+public:
+ Flusher(DocumentMetaStoreFlushTarget &dmsft, uint64_t syncToken, AttributeDirectory::Writer &writer);
+ ~Flusher();
+ uint64_t getSyncToken() const { return _syncToken; }
+ bool saveSnapInfo();
+ bool flush(AttributeDirectory::Writer &writer);
+ void updateStats();
+ bool cleanUp(AttributeDirectory::Writer &writer);
+ // Implements vespalib::Executor::Task
+ virtual void run();
+
+ virtual SerialNum
+ getFlushSerial(void) const
+ {
+ return _syncToken;
+ }
+};
+
DocumentMetaStoreFlushTarget::Flusher::
Flusher(DocumentMetaStoreFlushTarget &dmsft,
- SerialNum syncToken)
+ SerialNum syncToken, AttributeDirectory::Writer &writer)
: _dmsft(dmsft),
_saver(),
_syncToken(syncToken),
@@ -40,7 +71,7 @@ Flusher(DocumentMetaStoreFlushTarget &dmsft,
if (dms.canShrinkLidSpace()) {
dms.shrinkLidSpace();
}
- _flushDir = _dmsft.getSnapshotDir(syncToken);
+ _flushDir = writer.getSnapshotDir(syncToken);
vespalib::string newBaseFileName(_flushDir + "/" + dms.getName());
dms.setBaseFileName(newBaseFileName);
_saver = dms.initSave();
@@ -53,19 +84,6 @@ DocumentMetaStoreFlushTarget::Flusher::~Flusher()
}
bool
-DocumentMetaStoreFlushTarget::Flusher::saveSnapInfo()
-{
- if (!_dmsft._snapInfo.save()) {
- LOG(warning,
- "Could not save meta-info file for document meta store"
- " '%s' to disk",
- _dmsft._dms->getBaseFileName().c_str());
- return false;
- }
- return true;
-}
-
-bool
DocumentMetaStoreFlushTarget::Flusher::saveDocumentMetaStore()
{
vespalib::mkdir(_flushDir, false);
@@ -90,17 +108,9 @@ DocumentMetaStoreFlushTarget::Flusher::saveDocumentMetaStore()
}
bool
-DocumentMetaStoreFlushTarget::Flusher::flush()
+DocumentMetaStoreFlushTarget::Flusher::flush(AttributeDirectory::Writer &writer)
{
- IndexMetaInfo::Snapshot newSnap(false, _syncToken,
- getSnapshotName(_syncToken));
- {
- vespalib::LockGuard guard(_dmsft._snapInfoLock);
- _dmsft._snapInfo.addSnapshot(newSnap);
- }
- if (!saveSnapInfo()) {
- return false;
- }
+ writer.createInvalidSnapshot(_syncToken);
if (!saveDocumentMetaStore()) {
LOG(warning, "Could not write document meta store '%s' to disk",
_dmsft._dms->getBaseFileName().c_str());
@@ -111,16 +121,14 @@ DocumentMetaStoreFlushTarget::Flusher::flush()
* flush is activated to ensure that same future will occur that has
* already been observable in the saved document meta store (future
* timestamp or bucket id).
+ *
+ * Assume only flush engine tries to flush document meta store, i.e.
+ * noone else tries to get writer while flush task is flushing
+ * document meta store to disk.
*/
_dmsft._tlsSyncer.sync();
- {
- vespalib::LockGuard guard(_dmsft._snapInfoLock);
- _dmsft._snapInfo.validateSnapshot(_syncToken);
- }
- if (!saveSnapInfo()) {
- return false;
- }
- _dmsft._lastFlushTime = search::FileKit::getModificationTime(_flushDir);
+ writer.markValidSnapshot(_syncToken);
+ writer.setLastFlushTime(search::FileKit::getModificationTime(_flushDir));
return true;
}
@@ -131,17 +139,11 @@ DocumentMetaStoreFlushTarget::Flusher::updateStats()
}
bool
-DocumentMetaStoreFlushTarget::Flusher::cleanUp()
+DocumentMetaStoreFlushTarget::Flusher::cleanUp(AttributeDirectory::Writer &writer)
{
if (_dmsft._cleanUpAfterFlush) {
- if (!AttributeDiskLayout::removeOldSnapshots(_dmsft._snapInfo,
- _dmsft._snapInfoLock)) {
- LOG(warning,
- "Encountered problems when removing old snapshot directories"
- "after flushing document meta store '%s' to disk",
- _dmsft._dms->getBaseFileName().c_str());
- return false;
- }
+ writer.invalidateOldSnapshots();
+ writer.removeInvalidSnapshots(false);
}
return true;
}
@@ -149,17 +151,17 @@ DocumentMetaStoreFlushTarget::Flusher::cleanUp()
void
DocumentMetaStoreFlushTarget::Flusher::run()
{
- vespalib::LockGuard guard(_dmsft._flusherLock);
- if (_syncToken <= _dmsft.getFlushedSerialNum()) {
+ auto writer = _dmsft._dmsDir->getWriter();
+ if (!writer || _syncToken <= _dmsft.getFlushedSerialNum()) {
// another flusher has created an equal or better snapshot
// after this flusher was created
return;
}
- if (!flush()) {
+ if (!flush(*writer)) {
// TODO (geirst): throw exception ?
}
updateStats();
- if (!cleanUp()) {
+ if (!cleanUp(*writer)) {
// TODO (geirst): throw exception ?
}
}
@@ -175,23 +177,15 @@ DocumentMetaStoreFlushTarget(const DocumentMetaStore::SP dms,
_dms(dms),
_tlsSyncer(tlsSyncer),
_baseDir(baseDir),
- _snapInfo(_baseDir),
- _snapInfoLock(),
- _flusherLock(),
_cleanUpAfterFlush(true),
_lastStats(),
_tuneFileAttributes(tuneFileAttributes),
_fileHeaderContext(fileHeaderContext),
- _lastFlushTime(),
- _hwInfo(hwInfo)
+ _hwInfo(hwInfo),
+ _diskLayout(AttributeDiskLayout::createSimple(baseDir)),
+ _dmsDir(_diskLayout->createAttributeDir(""))
{
- if (!_snapInfo.load()) {
- _snapInfo.save();
- } else {
- vespalib::string dirName(getSnapshotDir(getFlushedSerialNum()));
- _lastFlushTime = search::FileKit::getModificationTime(dirName);
- }
_lastStats.setPathElementsToLog(8);
}
@@ -201,28 +195,10 @@ DocumentMetaStoreFlushTarget::~DocumentMetaStoreFlushTarget()
}
-vespalib::string
-DocumentMetaStoreFlushTarget::getSnapshotName(uint64_t syncToken)
-{
- return vespalib::make_string("snapshot-%" PRIu64, syncToken);
-}
-
-
-vespalib::string
-DocumentMetaStoreFlushTarget::getSnapshotDir(uint64_t syncToken)
-{
- return vespalib::make_string("%s/%s",
- _baseDir.c_str(),
- getSnapshotName(syncToken).c_str());
-}
-
-
IFlushTarget::SerialNum
DocumentMetaStoreFlushTarget::getFlushedSerialNum() const
{
- vespalib::LockGuard guard(_snapInfoLock);
- IndexMetaInfo::Snapshot bestSnap = _snapInfo.getBestSnapshot();
- return bestSnap.valid ? bestSnap.syncToken : 0;
+ return _dmsDir->getFlushedSerialNum();
}
@@ -255,7 +231,7 @@ DocumentMetaStoreFlushTarget::getApproxDiskGain() const
IFlushTarget::Time
DocumentMetaStoreFlushTarget::getLastFlushTime() const
{
- return _lastFlushTime;
+ return _dmsDir->getLastFlushTime();
}
@@ -263,21 +239,22 @@ IFlushTarget::Task::UP
DocumentMetaStoreFlushTarget::initFlush(SerialNum currentSerial)
{
// Called by document db executor
- (void)currentSerial;
_dms->removeAllOldGenerations();
- SerialNum syncToken = currentSerial;
- syncToken = std::max(currentSerial,
- _dms->getStatus().getLastSyncToken());
+ SerialNum syncToken = std::max(currentSerial,
+ _dms->getStatus().getLastSyncToken());
+ auto writer = _dmsDir->tryGetWriter();
+ if (!writer) {
+ return Task::UP();
+ }
if (syncToken <= getFlushedSerialNum()) {
- vespalib::LockGuard guard(_flusherLock);
- _lastFlushTime = fastos::ClockSystem::now();
+ writer->setLastFlushTime(fastos::ClockSystem::now());
LOG(debug,
"No document meta store to flush."
" Update flush time to current: lastFlushTime(%f)",
- _lastFlushTime.sec());
+ getLastFlushTime().sec());
return Task::UP();
}
- return Task::UP(new Flusher(*this, syncToken));
+ return Task::UP(new Flusher(*this, syncToken, *writer));
}
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h
index d26c55906e2..f6ea0613e7f 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h
@@ -25,6 +25,8 @@ namespace proton
{
class ITlsSyncer;
+class AttributeDiskLayout;
+class AttributeDirectory;
using searchcorespi::FlushStats;
using searchcorespi::IFlushTarget;
@@ -38,50 +40,18 @@ private:
/**
* Task performing the actual flushing to disk.
**/
- class Flusher : public Task {
- private:
- DocumentMetaStoreFlushTarget &_dmsft;
- std::unique_ptr<search::AttributeSaver> _saver;
- uint64_t _syncToken;
- vespalib::string _flushDir;
-
- bool saveDocumentMetaStore(); // not updating snap info.
- public:
- Flusher(DocumentMetaStoreFlushTarget &dmsft, uint64_t syncToken);
- ~Flusher();
- uint64_t getSyncToken() const { return _syncToken; }
- bool saveSnapInfo();
- bool flush();
- void updateStats();
- bool cleanUp();
- // Implements vespalib::Executor::Task
- virtual void run();
-
- virtual SerialNum
- getFlushSerial(void) const
- {
- return _syncToken;
- }
- };
+ class Flusher;
DocumentMetaStore::SP _dms;
ITlsSyncer &_tlsSyncer;
vespalib::string _baseDir;
- search::IndexMetaInfo _snapInfo;
- vespalib::Lock _snapInfoLock;
- vespalib::Lock _flusherLock;
bool _cleanUpAfterFlush;
FlushStats _lastStats;
const search::TuneFileAttributes _tuneFileAttributes;
const search::common::FileHeaderContext &_fileHeaderContext;
- fastos::TimeStamp _lastFlushTime;
HwInfo _hwInfo;
-
- static vespalib::string
- getSnapshotName(uint64_t syncToken);
-
- vespalib::string
- getSnapshotDir(uint64_t syncToken);
+ std::shared_ptr<AttributeDiskLayout> _diskLayout;
+ std::shared_ptr<AttributeDirectory> _dmsDir;
public:
typedef std::shared_ptr<DocumentMetaStoreFlushTarget> SP;