summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-14 13:51:39 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-09-14 13:51:39 +0200
commitbeb641c2a972af3c992ce46407297e87c3d8e193 (patch)
treefd095454748a433b28ce5b2a86e2c2874e87fbe2
parente165d6b5512922999b91f78054d3070e33307c8b (diff)
Use multiple threads in decompress due to more expensive zstd
-rw-r--r--searchlib/src/vespa/searchlib/docstore/filechunk.cpp77
-rw-r--r--searchlib/src/vespa/searchlib/docstore/filechunk.h4
-rw-r--r--searchlib/src/vespa/searchlib/docstore/lid_info.h2
-rw-r--r--searchlib/src/vespa/searchlib/docstore/logdatastore.cpp6
-rw-r--r--searchlib/src/vespa/searchlib/docstore/logdatastore.h2
5 files changed, 63 insertions, 28 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
index 2a748a302c6..c823bacb10c 100644
--- a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
@@ -10,8 +10,11 @@
#include <vespa/vespalib/util/array.hpp>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/searchlib/util/filekit.h>
+#include <vespa/searchlib/common/lambdatask.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/fastos/file.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <future>
#include <vespa/log/log.h>
LOG_SETUP(".search.filechunk");
@@ -296,35 +299,65 @@ FileChunk::getModificationTime() const
return _modificationTime;
}
+namespace {
+
+using FutureChunk = std::future<Chunk::UP>;
+
+struct FixedParams {
+ const IGetLid & db;
+ IWriteData & dest;
+ const vespalib::GenerationHandler::Guard & lidReadGuard;
+ uint32_t fileId;
+ IFileChunkVisitorProgress *visitorProgress;
+};
+
void
-FileChunk::appendTo(const IGetLid & db, IWriteData & dest,
- uint32_t numChunks,
- IFileChunkVisitorProgress *visitorProgress)
+appendChunks(FixedParams * args, Chunk::UP chunk)
+{
+ const Chunk::LidList ll(chunk->getUniqueLids());
+ for (const Chunk::Entry & e : ll) {
+ LidInfo lidInfo(args->fileId, chunk->getId(), e.netSize());
+ if (args->db.getLid(args->lidReadGuard, e.getLid()) == lidInfo) {
+ vespalib::LockGuard guard(args->db.getLidGuard(e.getLid()));
+ if (args->db.getLid(args->lidReadGuard, e.getLid()) == lidInfo) {
+ // I am still in use so I need to taken care of.
+ vespalib::ConstBufferRef data(chunk->getLid(e.getLid()));
+ args->dest.write(guard, chunk->getId(), e.getLid(), data.c_str(), data.size());
+ }
+ }
+ }
+ if (args->visitorProgress != NULL) {
+ args->visitorProgress->updateProgress();
+ }
+}
+
+}
+
+
+void
+FileChunk::appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest,
+ uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress)
{
assert(frozen() || visitorProgress);
vespalib::GenerationHandler::Guard lidReadGuard(db.getLidReadGuard());
assert(numChunks <= getNumChunks());
+ FixedParams fixedParams = {db, dest, lidReadGuard, getFileId().getId(), visitorProgress};
+ vespalib::ThreadStackExecutor singleExecutor(1, 64*1024);
for (size_t chunkId(0); chunkId < numChunks; chunkId++) {
- const ChunkInfo & cInfo(_chunkInfo[chunkId]);
- vespalib::DataBuffer whole(0ul, ALIGNMENT);
- FileRandRead::FSP keepAlive(_file->read(cInfo.getOffset(), whole, cInfo.getSize()));
- Chunk chunk(chunkId, whole.getData(), whole.getDataLen());
- const Chunk::LidList ll(chunk.getUniqueLids());
- for (const Chunk::Entry & e : ll) {
- LidInfo lidInfo(getFileId().getId(), chunk.getId(), e.netSize());
- if (db.getLid(lidReadGuard, e.getLid()) == lidInfo) {
- vespalib::LockGuard guard(db.getLidGuard(e.getLid()));
- if (db.getLid(lidReadGuard, e.getLid()) == lidInfo) {
- // I am still in use so I need to taken care of.
- vespalib::ConstBufferRef data(chunk.getLid(e.getLid()));
- dest.write(guard, chunk.getId(), e.getLid(), data.c_str(), data.size());
- }
- }
- }
- if (visitorProgress != NULL) {
- visitorProgress->updateProgress();
- }
+ std::promise<Chunk::UP> promisedChunk;
+ std::future<Chunk::UP> futureChunk = promisedChunk.get_future();
+ executor.execute(makeLambdaTask([promise = std::move(promisedChunk), chunkId, this]() mutable {
+ const ChunkInfo & cInfo(_chunkInfo[chunkId]);
+ vespalib::DataBuffer whole(0ul, ALIGNMENT);
+ FileRandRead::FSP keepAlive(_file->read(cInfo.getOffset(), whole, cInfo.getSize()));
+ promise.set_value(std::make_unique<Chunk>(chunkId, whole.getData(), whole.getDataLen()));
+ }));
+
+ singleExecutor.execute(makeLambdaTask([args = &fixedParams, chunk = std::move(futureChunk)]() mutable {
+ appendChunks(args, chunk.get());
+ }));
}
+ singleExecutor.sync();
dest.close();
}
diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.h b/searchlib/src/vespa/searchlib/docstore/filechunk.h
index 326a185f9de..a7b6556a0a3 100644
--- a/searchlib/src/vespa/searchlib/docstore/filechunk.h
+++ b/searchlib/src/vespa/searchlib/docstore/filechunk.h
@@ -18,6 +18,7 @@ class FastOS_FileInterface;
namespace vespalib {
class DataBuffer;
class GenericHeader;
+ class Executor;
}
namespace search {
@@ -161,7 +162,8 @@ public:
virtual bool frozen() const { return true; }
const vespalib::string & getName() const { return _name; }
void compact(const IGetLid & iGetLid);
- void appendTo(const IGetLid & db, IWriteData & dest, uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress);
+ void appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest,
+ uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress);
/**
* Must be called after chunk has been created to allow correct
* underlying file object to be created. Must be called before
diff --git a/searchlib/src/vespa/searchlib/docstore/lid_info.h b/searchlib/src/vespa/searchlib/docstore/lid_info.h
index e86a86bce49..10ddd868c41 100644
--- a/searchlib/src/vespa/searchlib/docstore/lid_info.h
+++ b/searchlib/src/vespa/searchlib/docstore/lid_info.h
@@ -79,7 +79,7 @@ public:
using Guard = vespalib::GenerationHandler::Guard;
virtual ~IGetLid() { }
- virtual LidInfo getLid(Guard & guard, uint32_t lid) const = 0;
+ virtual LidInfo getLid(const Guard & guard, uint32_t lid) const = 0;
virtual vespalib::LockGuard getLidGuard(uint32_t lid) const = 0;
virtual Guard getLidReadGuard() const = 0;
};
diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
index 024e64c0bdd..4fa4142813c 100644
--- a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
@@ -461,7 +461,7 @@ void LogDataStore::compactFile(FileId fileId)
compacter.reset(new docstore::Compacter(*this));
}
- fc->appendTo(*this, *compacter, fc->getNumChunks(), nullptr);
+ fc->appendTo(_executor, *this, *compacter, fc->getNumChunks(), nullptr);
if (destinationFileId.isActive()) {
flushActiveAndWait(0);
@@ -1067,7 +1067,7 @@ LogDataStore::accept(IDataStoreVisitor &visitor,
WrapVisitorProgress wrapProgress(visitorProgress, totalChunks);
for (FileId fcId : fileChunks) {
FileChunk & fc = *_fileChunks[fcId.getId()];
- fc.appendTo(*this, wrap, fc.getNumChunks(), &wrapProgress);
+ fc.appendTo(_executor, *this, wrap, fc.getNumChunks(), &wrapProgress);
if (prune) {
internalFlushAll();
FileChunk::UP toDie;
@@ -1078,7 +1078,7 @@ LogDataStore::accept(IDataStoreVisitor &visitor,
toDie->erase();
}
}
- lfc.appendTo(*this, wrap, lastChunks, &wrapProgress);
+ lfc.appendTo(_executor, *this, wrap, lastChunks, &wrapProgress);
if (prune) {
internalFlushAll();
}
diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.h b/searchlib/src/vespa/searchlib/docstore/logdatastore.h
index 080e6f80503..eb46e5438a9 100644
--- a/searchlib/src/vespa/searchlib/docstore/logdatastore.h
+++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.h
@@ -176,7 +176,7 @@ public:
}
// Implements IGetLid API
- LidInfo getLid(Guard & guard, uint32_t lid) const override {
+ LidInfo getLid(const Guard & guard, uint32_t lid) const override {
(void) guard;
if (lid < getDocIdLimit()) {
return _lidInfo[lid];