summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-01-05 14:56:24 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-01-05 14:56:24 +0000
commitcc59a63bfdf4c58aa0c523f04ef8913c04640389 (patch)
tree3148624fd91861ce83084db576153aaab4e73214
parentf979a03306ad9d80263684d71dc25a6c66188249 (diff)
Drop uncompressed buffer immediately after compression is completed.
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.h6
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp4
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h7
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h2
6 files changed, 15 insertions, 8 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
index 1e9923793bd..967f8748174 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
@@ -65,6 +65,8 @@ Packet::Packet(const void * buf, size_t sz) :
}
}
+Packet::~Packet() = default;
+
void
Packet::merge(const Packet & packet)
{
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h
index 98601525fdb..135681037ef 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.h
@@ -70,6 +70,11 @@ public:
public:
explicit Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { }
Packet(const void * buf, size_t sz);
+ Packet(const Packet &) = delete;
+ Packet & operator =(const Packet &) = delete;
+ Packet(Packet &&) noexcept = default;
+ Packet & operator =(Packet &&) noexcept = default;
+ ~Packet();
void add(const Entry & data);
void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); }
const SerialNumRange & range() const { return _range; }
@@ -133,6 +138,7 @@ public:
void add(const Packet & packet, Writer::DoneCallback onDone);
size_t sizeBytes() const { return _data.sizeBytes(); }
const Packet & getPacket() const { return _data; }
+ Packet stealPacket() { return std::move(_data); }
size_t getNumCallBacks() const { return _callBacks->size(); }
Writer::CommitResult createCommitResult() const;
void setCommitDoneCallback(Writer::DoneCallback onDone) { _onCommitDone = std::move(onDone); }
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 96b94955570..bbde39a42f6 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -35,7 +35,7 @@ createCommitChunk(const DomainConfig &cfg) {
}
-Domain::Domain(const string &domainName, const string & baseDir, Executor & executor,
+Domain::Domain(const string &domainName, const string & baseDir, vespalib::Executor & executor,
const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext)
: _config(cfg),
_currentChunk(createCommitChunk(cfg)),
@@ -420,7 +420,7 @@ Domain::doCommit(const SerializedChunk & serialized) {
}
cleanSessions();
LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.",
- serialized.commitChunk().getNumCallBacks(), serialized.getNumEntries(), serialized.getData().size());
+ serialized.getNumCallBacks(), serialized.getNumEntries(), serialized.getData().size());
}
bool
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index eb3d0b6b10b..8dd5164689e 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -17,10 +17,9 @@ class Domain : public Writer
{
public:
using SP = std::shared_ptr<Domain>;
- using Executor = vespalib::SyncableThreadExecutor;
using DomainPartSP = std::shared_ptr<DomainPart>;
using FileHeaderContext = common::FileHeaderContext;
- Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & executor,
+ Domain(const vespalib::string &name, const vespalib::string &baseDir, vespalib::Executor & executor,
const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext);
~Domain() override;
@@ -85,8 +84,8 @@ private:
DomainConfig _config;
std::unique_ptr<CommitChunk> _currentChunk;
SerialNum _lastSerial;
- std::unique_ptr<Executor> _singleCommitter;
- Executor &_executor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _singleCommitter;
+ vespalib::Executor &_executor;
std::atomic<int> _sessionId;
std::mutex _syncMonitor;
std::condition_variable _syncCond;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
index d86d9dc763c..2a4a042d967 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
@@ -141,7 +141,7 @@ SerializedChunk::SerializedChunk(std::unique_ptr<CommitChunk> commitChunk, Encod
_range(_commitChunk->getPacket().range()),
_numEntries(_commitChunk->getPacket().size())
{
- const Packet & packet = _commitChunk->getPacket();
+ Packet packet = _commitChunk->stealPacket();
nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size());
IChunk::UP chunk = IChunk::create(encoding, compressionLevel);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
index cf8d12c1feb..b8a1f519c50 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
@@ -47,7 +47,7 @@ public:
vespalib::ConstBufferRef getData() const;
SerialNumRange range() const { return _range; }
size_t getNumEntries() const { return _numEntries; }
- const CommitChunk & commitChunk() const { return *_commitChunk; }
+ size_t getNumCallBacks() const { return _commitChunk->getNumCallBacks(); }
private:
// CommitChunk is required to ensure we do not reply until committed to the TLS.
std::unique_ptr<CommitChunk> _commitChunk;