summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-11 07:17:02 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-09-11 07:26:50 +0000
commitd93eabf248753da362bd4ac4f468612bfb8e21ed (patch)
tree7cbfde86e4357b681c3859dbd5a51eade8f60f23
parentd481454782579688bb4db794f09c29124a35d51e (diff)
Reduce visibility to avoid having to see everything.
-rw-r--r--fnet/src/vespa/fnet/task.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h17
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt1
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.h11
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp21
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h70
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp16
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainconfig.h63
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp13
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.h8
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h3
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.cpp1
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.h8
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp3
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h15
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp29
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h22
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp3
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h4
23 files changed, 180 insertions, 141 deletions
diff --git a/fnet/src/vespa/fnet/task.cpp b/fnet/src/vespa/fnet/task.cpp
index 0888cba6582..f5263450e6a 100644
--- a/fnet/src/vespa/fnet/task.cpp
+++ b/fnet/src/vespa/fnet/task.cpp
@@ -14,9 +14,7 @@ FNET_Task::FNET_Task(FNET_Scheduler *scheduler)
}
-FNET_Task::~FNET_Task()
-{
-}
+FNET_Task::~FNET_Task() = default;
void
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp
index 85644c2111a..348adf5bf41 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp
@@ -17,7 +17,7 @@ TransLogServerMetrics::DomainMetrics::DomainMetrics(metrics::MetricSet *parent,
{
}
-TransLogServerMetrics::DomainMetrics::~DomainMetrics() {}
+TransLogServerMetrics::DomainMetrics::~DomainMetrics() = default;
void
TransLogServerMetrics::DomainMetrics::update(const DomainInfo &stats)
@@ -66,7 +66,7 @@ TransLogServerMetrics::TransLogServerMetrics(metrics::MetricSet *parent)
{
}
-TransLogServerMetrics::~TransLogServerMetrics() { }
+TransLogServerMetrics::~TransLogServerMetrics() = default;
void
TransLogServerMetrics::update(const DomainStats &stats)
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h
index 830d643e93e..727b0c6304d 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h
@@ -3,7 +3,7 @@
#pragma once
#include <vespa/metrics/metrics.h>
-#include <vespa/searchlib/transactionlog/domain.h>
+#include <vespa/searchlib/transactionlog/domainconfig.h>
namespace proton {
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index b297dd860ea..c40f1263324 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -22,6 +22,7 @@
#include <vespa/searchcore/proton/summaryengine/docsum_by_slime.h>
#include <vespa/searchcore/proton/matchengine/matchengine.h>
#include <vespa/searchlib/transactionlog/trans_log_server_explorer.h>
+#include <vespa/searchlib/transactionlog/translogserverapp.h>
#include <vespa/searchlib/util/fileheadertk.h>
#include <vespa/searchlib/common/packets.h>
#include <vespa/document/base/exceptions.h>
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index 6b561ad5deb..55fd3594463 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -19,7 +19,6 @@
#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
#include <vespa/searchlib/common/fileheadercontext.h>
#include <vespa/searchlib/engine/monitorapi.h>
-#include <vespa/searchlib/transactionlog/translogserverapp.h>
#include <vespa/vespalib/net/component_config_producer.h>
#include <vespa/vespalib/net/generic_state_handler.h>
#include <vespa/vespalib/net/json_get_handler.h>
@@ -31,7 +30,7 @@
#include <shared_mutex>
namespace vespalib { class StateServer; }
-
+namespace search::transactionlog { class TransLogServerApp; }
namespace proton {
class DiskMemUsageSampler;
@@ -52,12 +51,12 @@ class Proton : public IProtonConfigurerOwner,
public vespalib::StateExplorer
{
private:
- typedef search::transactionlog::TransLogServerApp TLS;
- typedef search::engine::MonitorRequest MonitorRequest;
- typedef search::engine::MonitorReply MonitorReply;
- typedef search::engine::MonitorClient MonitorClient;
- typedef std::map<DocTypeName, DocumentDB::SP> DocumentDBMap;
- typedef BootstrapConfig::ProtonConfigSP ProtonConfigSP;
+ using TLS = search::transactionlog::TransLogServerApp;
+ using MonitorRequest = search::engine::MonitorRequest;
+ using MonitorReply = search::engine::MonitorReply;
+ using MonitorClient = search::engine::MonitorClient;
+ using DocumentDBMap = std::map<DocTypeName, DocumentDB::SP>;
+ using ProtonConfigSP = BootstrapConfig::ProtonConfigSP;
using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
using BucketSpace = document::BucketSpace;
@@ -91,7 +90,7 @@ private:
MetricsUpdateHook _metricsHook;
std::unique_ptr<MetricsEngine> _metricsEngine;
ProtonFileHeaderContext _fileHeaderContext;
- TLS::UP _tls;
+ std::unique_ptr<TLS> _tls;
std::unique_ptr<DiskMemUsageSampler> _diskMemUsageSampler;
PersistenceEngine::UP _persistenceEngine;
DocumentDBMap _documentDBMap;
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index b7eb56d1fd9..a20e0cc3aaa 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -256,7 +256,7 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size())));
EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())),
std::runtime_error,
- "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3).");
+ "commit failed with code -2. server says: Exception during commit on " + name + " : Incoming serial number(1) must be bigger than the last one (3).");
EXPECT_EQUAL(a.size(), 1u);
EXPECT_EQUAL(a.range().from(), 1u);
EXPECT_EQUAL(a.range().to(), 1u);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
index 4ead34552bd..5dca84a26c1 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_library(searchlib_transactionlog OBJECT
chunks.cpp
common.cpp
domain.cpp
+ domainconfig.cpp
domainpart.cpp
ichunk.cpp
nosyncproxy.cpp
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h
index 0deceb2668a..5cb1d67d525 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.h
@@ -90,8 +90,17 @@ int makeDirectory(const char * dir);
class Writer {
public:
using DoneCallback = std::shared_ptr<IDestructorCallback>;
- virtual ~Writer() { }
+ virtual ~Writer() = default;
virtual void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) = 0;
};
+class Destination {
+public:
+ virtual ~Destination() = default;
+ virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0;
+ virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0;
+ virtual bool connected() const = 0;
+ virtual bool ok() const = 0;
+};
+
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 6b47393336a..aaf70964068 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "domain.h"
+#include "domainpart.h"
+#include "session.h"
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/io/fileutil.h>
@@ -15,10 +17,11 @@
LOG_SETUP(".transactionlog.domain");
using vespalib::string;
-using vespalib::make_string;
+using vespalib::make_string_short::fmt;
using vespalib::LockGuard;
using vespalib::makeTask;
using vespalib::makeClosure;
+using vespalib::makeLambdaTask;
using vespalib::Monitor;
using vespalib::MonitorGuard;
using search::common::FileHeaderContext;
@@ -29,14 +32,6 @@ namespace search::transactionlog {
VESPA_THREAD_STACK_TAG(domain_commit_executor);
-DomainConfig::DomainConfig()
- : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none),
- _compressionLevel(9),
- _partSizeLimit(0x10000000), // 256M
- _chunkSizeLimit(0x40000), // 256k
- _chunkAgeLimit(10ms)
-{ }
-
Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor,
Executor & sessionExecutor, const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext)
: _config(cfg),
@@ -60,10 +55,10 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm
{
int retval(0);
if ((retval = makeDirectory(_baseDir.c_str())) != 0) {
- throw runtime_error(make_string("Failed creating basedirectory %s r(%d), e(%d)", _baseDir.c_str(), retval, errno));
+ throw runtime_error(fmt("Failed creating basedirectory %s r(%d), e(%d)", _baseDir.c_str(), retval, errno));
}
if ((retval = makeDirectory(dir().c_str())) != 0) {
- throw runtime_error(make_string("Failed creating domaindir %s r(%d), e(%d)", dir().c_str(), retval, errno));
+ throw runtime_error(fmt("Failed creating domaindir %s r(%d), e(%d)", dir().c_str(), retval, errno));
}
SerialNumList partIdVector = scanDir();
const int64_t lastPart = partIdVector.empty() ? 0 : partIdVector.back();
@@ -356,7 +351,7 @@ Domain::erase(SerialNum to)
int
Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to,
- std::unique_ptr<Session::Destination> dest)
+ std::unique_ptr<Destination> dest)
{
assert(this == domain.get());
cleanSessions();
@@ -442,7 +437,7 @@ Domain::scanDir()
continue;
const char *p = ename + wantPrefixLen + 1;
uint64_t num = strtoull(p, NULL, 10);
- string checkName = make_string("%s-%016" PRIu64, _name.c_str(), num);
+ string checkName = fmt("%s-%016" PRIu64, _name.c_str(), num);
if (strcmp(checkName.c_str(), ename) != 0)
continue;
res.push_back(static_cast<SerialNum>(num));
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index a6c6dad5fe8..9c8b578a2c9 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -1,70 +1,23 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "domainpart.h"
-#include "session.h"
+#include "domainconfig.h"
+#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/threadexecutor.h>
-#include <vespa/vespalib/util/time.h>
-#include <vespa/fastos/thread.h>
-#include <chrono>
+#include <atomic>
+namespace search::common { class FileHeaderContext; }
namespace search::transactionlog {
-class DomainConfig {
-public:
- using duration = vespalib::duration;
- DomainConfig();
- DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; }
- DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; }
- DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; }
- DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; }
- DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; }
- Encoding getEncoding() const { return _encoding; }
- size_t getPartSizeLimit() const { return _partSizeLimit; }
- size_t getChunkSizeLimit() const { return _chunkSizeLimit; }
- duration getChunkAgeLimit() const { return _chunkAgeLimit; }
- uint8_t getCompressionlevel() const { return _compressionLevel; }
-private:
- Encoding _encoding;
- uint8_t _compressionLevel;
- size_t _partSizeLimit;
- size_t _chunkSizeLimit;
- duration _chunkAgeLimit;
-};
-
-struct PartInfo {
- SerialNumRange range;
- size_t numEntries;
- size_t byteSize;
- vespalib::string file;
- PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in)
- : range(range_in),
- numEntries(numEntries_in),
- byteSize(byteSize_in),
- file(file_in)
- {}
-};
-
-struct DomainInfo {
- using DurationSeconds = std::chrono::duration<double>;
- SerialNumRange range;
- size_t numEntries;
- size_t byteSize;
- DurationSeconds maxSessionRunTime;
- std::vector<PartInfo> parts;
- DomainInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, DurationSeconds maxSessionRunTime_in)
- : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), maxSessionRunTime(maxSessionRunTime_in), parts() {}
- DomainInfo()
- : range(), numEntries(0), byteSize(0), maxSessionRunTime(), parts() {}
-};
-
-typedef std::map<vespalib::string, DomainInfo> DomainStats;
+class DomainPart;
+class Session;
class Domain
{
public:
using SP = std::shared_ptr<Domain>;
using Executor = vespalib::SyncableThreadExecutor;
+ using DomainPartSP = std::shared_ptr<DomainPart>;
Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor,
Executor & sessionExecutor, const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext);
@@ -75,12 +28,13 @@ public:
bool erase(SerialNum to);
void commit(const Packet & packet, Writer::DoneCallback onDone);
- int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest);
+ int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Destination> dest);
SerialNum begin() const;
SerialNum end() const;
SerialNum getSynced() const;
void triggerSyncNow();
+ bool commitIfStale();
bool getMarkedDeleted() const { return _markedDeleted; }
void markDeleted() { _markedDeleted = true; }
@@ -91,7 +45,7 @@ public:
int closeSession(int sessionId);
SerialNum findOldestActiveVisit() const;
- DomainPart::SP findPart(SerialNum s);
+ DomainPartSP findPart(SerialNum s);
static vespalib::string
getDir(const vespalib::string & base, const vespalib::string & domain) {
@@ -115,8 +69,8 @@ private:
SerialNumList scanDir();
- using SessionList = std::map<int, Session::SP>;
- using DomainPartList = std::map<int64_t, DomainPart::SP>;
+ using SessionList = std::map<int, std::shared_ptr<Session>>;
+ using DomainPartList = std::map<int64_t, DomainPartSP>;
using DurationSeconds = std::chrono::duration<double>;
DomainConfig _config;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp
new file mode 100644
index 00000000000..beac8cf714b
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp
@@ -0,0 +1,16 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "domainconfig.h"
+
+namespace search::transactionlog {
+
+DomainConfig::DomainConfig()
+ : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none),
+ _compressionLevel(9),
+ _fSyncOnCommit(false),
+ _partSizeLimit(0x10000000), // 256M
+ _chunkSizeLimit(0x40000), // 256k
+ _chunkAgeLimit(10ms)
+{ }
+
+}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h
new file mode 100644
index 00000000000..ada1e20e095
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h
@@ -0,0 +1,63 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "ichunk.h"
+#include <vespa/vespalib/util/time.h>
+#include <map>
+
+namespace search::transactionlog {
+
+class DomainConfig {
+public:
+ using duration = vespalib::duration;
+ DomainConfig();
+ DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; }
+ DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; }
+ DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; }
+ DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; }
+ DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; }
+ DomainConfig & setFSyncOnCommit(bool v) { _fSyncOnCommit = v; return *this; }
+ Encoding getEncoding() const { return _encoding; }
+ size_t getPartSizeLimit() const { return _partSizeLimit; }
+ size_t getChunkSizeLimit() const { return _chunkSizeLimit; }
+ duration getChunkAgeLimit() const { return _chunkAgeLimit; }
+ uint8_t getCompressionlevel() const { return _compressionLevel; }
+ bool getFSyncOnCommit() const { return _fSyncOnCommit; }
+private:
+ Encoding _encoding;
+ uint8_t _compressionLevel;
+ bool _fSyncOnCommit;
+ size_t _partSizeLimit;
+ size_t _chunkSizeLimit;
+ duration _chunkAgeLimit;
+};
+
+struct PartInfo {
+ SerialNumRange range;
+ size_t numEntries;
+ size_t byteSize;
+ vespalib::string file;
+ PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in)
+ : range(range_in),
+ numEntries(numEntries_in),
+ byteSize(byteSize_in),
+ file(file_in)
+ {}
+};
+
+struct DomainInfo {
+ using DurationSeconds = std::chrono::duration<double>;
+ SerialNumRange range;
+ size_t numEntries;
+ size_t byteSize;
+ DurationSeconds maxSessionRunTime;
+ std::vector<PartInfo> parts;
+ DomainInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, DurationSeconds maxSessionRunTime_in)
+ : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), maxSessionRunTime(maxSessionRunTime_in), parts() {}
+ DomainInfo()
+ : range(), numEntries(0), byteSize(0), maxSessionRunTime(), parts() {}
+};
+
+using DomainStats = std::map<vespalib::string, DomainInfo>;
+
+}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index 96e75c35a81..8855183226d 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -409,7 +409,7 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
_sz++;
_range.to(entry.serial());
} else {
- throw runtime_error(fmt("Incomming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").",
+ throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").",
entry.serial(), _range.to()));
}
}
@@ -429,7 +429,8 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
}
}
-void DomainPart::sync()
+void
+DomainPart::sync()
{
SerialNum syncSerial(0);
{
@@ -449,7 +450,7 @@ DomainPart::visit(SerialNumRange &r, Packet &packet)
{
bool retval(false);
LockGuard guard(_lock);
- LOG(debug, "Visit r(%" PRIu64 ", %" PRIu64 "] Checking %" PRIu64 " packets",
+ LOG(spam, "Visit r(%" PRIu64 ", %" PRIu64 "] Checking %" PRIu64 " packets",
r.from(), r.to(), uint64_t(_packets.size()));
if ( ! isClosed() ) {
PacketList::const_iterator start(_packets.lower_bound(r.from() + 1));
@@ -474,7 +475,7 @@ DomainPart::visit(SerialNumRange &r, Packet &packet)
((next != end) || ((next != _packets.end()) && ((r.to() + 1) == next->first))))
{
packet = start->second;
- LOG(debug, "Visit whole packet[%" PRIu64 ", %" PRIu64 "]", packet.range().from(), packet.range().to());
+ LOG(spam, "Visit whole packet[%" PRIu64 ", %" PRIu64 "]", packet.range().from(), packet.range().to());
if (next != _packets.end()) {
r.from(next->first - 1);
retval = true;
@@ -484,7 +485,7 @@ DomainPart::visit(SerialNumRange &r, Packet &packet)
} else {
const nbostream & tmp = start->second.getHandle();
nbostream_longlivedbuf h(tmp.data(), tmp.size());
- LOG(debug, "Visit partial[%" PRIu64 ", %" PRIu64 "] (%zd, %zd, %zd)",
+ LOG(spam, "Visit partial[%" PRIu64 ", %" PRIu64 "] (%zd, %zd, %zd)",
start->second.range().from(), start->second.range().to(), h.rp(), h.size(), h.capacity());
Packet newPacket(h.size());
for (; (h.size() > 0) && (r.from() < r.to()); ) {
@@ -546,6 +547,8 @@ DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk)
if ( ! file.CheckedWrite(os.data(), os.size()) ) {
throw runtime_error(handleWriteError("Failed writing the entry.", file, byteSize(), chunk.range(), os.size()));
}
+ LOG(debug, "Wrote chunk with %zu entries and %zu bytes, range[%" PRIu64 ", %" PRIu64 "] encoding(wanted=%x, real=%x)",
+ chunk.getEntries().size(), os.size(), chunk.range().from(), chunk.range().to(), _encoding.getRaw(), realEncoding.getRaw());
_writtenSerial = chunk.range().to();
_byteSize.fetch_add(os.size(), std::memory_order_release);
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
index 5256b731125..31d6938b654 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
@@ -15,12 +15,10 @@ namespace search::common { class FileHeaderContext; }
namespace search::transactionlog {
class DomainPart {
-private:
- DomainPart(const DomainPart &);
- DomainPart& operator=(const DomainPart &);
-
public:
- typedef std::shared_ptr<DomainPart> SP;
+ using SP = std::shared_ptr<DomainPart>;
+ DomainPart(const DomainPart &) = delete;
+ DomainPart& operator=(const DomainPart &) = delete;
DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding,
uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
index aa2a2085aab..722aae828db 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
@@ -3,9 +3,6 @@
#pragma once
#include "common.h"
-#include <memory>
-
-namespace vespalib { class nbostream; }
namespace search::transactionlog {
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
index dda840808ce..c91b719be37 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "session.h"
#include "domain.h"
+#include "domainpart.h"
#include <vespa/fastlib/io/bufferedfile.h>
#include <vespa/log/log.h>
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h
index 9b8d23371e8..ddbe218ed4e 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.h
@@ -23,14 +23,6 @@ private:
using time_point = std::chrono::time_point<std::chrono::steady_clock>;
public:
- class Destination {
- public:
- virtual ~Destination() = default;
- virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0;
- virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0;
- virtual bool connected() const = 0;
- virtual bool ok() const = 0;
- };
typedef std::shared_ptr<Session> SP;
Session(const Session &) = delete;
Session & operator = (const Session &) = delete;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp
index 1d1edbed658..bdf12ab64e8 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "trans_log_server_explorer.h"
+#include "translogserver.h"
+#include "domain.h"
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/util/time.h>
#include <vespa/fastos/file.h>
@@ -44,6 +46,7 @@ struct DomainExplorer : vespalib::StateExplorer {
} // namespace search::transactionlog::<unnamed>
+TransLogServerExplorer::~TransLogServerExplorer() = default;
void
TransLogServerExplorer::get_state(const Inserter &inserter, bool full) const
{
diff --git a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h
index 65d3a687bc9..66fb1698104 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h
@@ -2,24 +2,27 @@
#pragma once
-#include "translogserver.h"
#include <vespa/vespalib/net/state_explorer.h>
namespace search::transactionlog {
+class TransLogServer;
+
/**
* Class used to explore the state of a transaction log server.
*/
class TransLogServerExplorer : public vespalib::StateExplorer
{
private:
- TransLogServer::SP _server;
+ using TransLogServerSP = std::shared_ptr<TransLogServer>;
+ TransLogServerSP _server;
public:
- TransLogServerExplorer(TransLogServer::SP server) : _server(std::move(server)) {}
- virtual void get_state(const vespalib::slime::Inserter &inserter, bool full) const override;
- virtual std::vector<vespalib::string> get_children_names() const override;
- virtual std::unique_ptr<StateExplorer> get_child(vespalib::stringref name) const override;
+ TransLogServerExplorer(TransLogServerSP server) : _server(std::move(server)) {}
+ ~TransLogServerExplorer() override;
+ void get_state(const vespalib::slime::Inserter &inserter, bool full) const override;
+ std::vector<vespalib::string> get_children_names() const override;
+ std::unique_ptr<StateExplorer> get_child(vespalib::stringref name) const override;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index edfbf846688..93b65002492 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -1,9 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogserver.h"
+#include "domain.h"
#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/vespalib/util/time.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/rpcrequest.h>
@@ -42,7 +42,6 @@ public:
void PerformTask() override;
};
-
SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain,
const TransLogServer::Session::SP &session, SerialNum syncTo)
: FNET_Task(supervisor->GetScheduler()),
@@ -53,10 +52,8 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const
{
}
-
SyncHandler::~SyncHandler() = default;
-
void
SyncHandler::PerformTask()
{
@@ -207,10 +204,16 @@ TransLogServer::run()
LOG(info, "TLS Stopped");
}
+vespalib::duration
+TransLogServer::getChunkAgeLimit() const
+{
+ Guard domainGuard(_domainMutex);
+ return _domainConfig.getChunkAgeLimit();
+}
TransLogServer &
TransLogServer::setDomainConfig(const DomainConfig & cfg) {
- Guard domainGuard(_lock);
+ Guard domainGuard(_domainMutex);
_domainConfig = cfg;
for(auto &domain: _domains) {
domain.second->setConfig(cfg);
@@ -222,7 +225,7 @@ DomainStats
TransLogServer::getDomainStats() const
{
DomainStats retval;
- Guard domainGuard(_lock);
+ Guard domainGuard(_domainMutex);
for (const auto &elem : _domains) {
retval[elem.first] = elem.second->getDomainInfo();
}
@@ -233,7 +236,7 @@ std::vector<vespalib::string>
TransLogServer::getDomainNames()
{
std::vector<vespalib::string> names;
- Guard guard(_lock);
+ Guard guard(_domainMutex);
for(const auto &domain: _domains) {
names.push_back(domain.first);
}
@@ -243,7 +246,7 @@ TransLogServer::getDomainNames()
Domain::SP
TransLogServer::findDomain(stringref domainName)
{
- Guard domainGuard(_lock);
+ Guard domainGuard(_domainMutex);
Domain::SP domain;
DomainList::iterator found(_domains.find(domainName));
if (found != _domains.end()) {
@@ -363,7 +366,7 @@ writeDomainDir(std::lock_guard<std::mutex> &guard,
vespalib::File::sync(dir);
}
-class RPCDestination : public Session::Destination {
+class RPCDestination : public Destination {
public:
RPCDestination(FRT_Supervisor & supervisor, FNET_Connection * connection)
: _supervisor(supervisor), _connection(connection), _ok(true)
@@ -448,7 +451,7 @@ TransLogServer::createDomain(FRT_RPCRequest *req)
try {
domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor,
_sessionExecutor, _domainConfig, _fileHeaderContext);
- Guard domainGuard(_lock);
+ Guard domainGuard(_domainMutex);
_domains[domain->name()] = domain;
writeDomainDir(domainGuard, dir(), domainList(), _domains);
} catch (const std::exception & e) {
@@ -477,12 +480,12 @@ TransLogServer::deleteDomain(FRT_RPCRequest *req)
try {
if (domain) {
domain->markDeleted();
- Guard domainGuard(_lock);
+ Guard domainGuard(_domainMutex);
_domains.erase(domainName);
}
vespalib::rmdir(Domain::getDir(dir(), domainName), true);
vespalib::File::sync(dir());
- Guard domainGuard(_lock);
+ Guard domainGuard(_domainMutex);
writeDomainDir(domainGuard, dir(), domainList(), _domains);
} catch (const std::exception & e) {
msg = make_string("Failed deleting %s domain. Exception = %s", domainName, e.what());
@@ -523,7 +526,7 @@ TransLogServer::listDomains(FRT_RPCRequest *req)
LOG(debug, "listDomains()");
vespalib::string domains;
- Guard domainGuard(_lock);
+ Guard domainGuard(_domainMutex);
for(DomainList::const_iterator it(_domains.begin()), mt(_domains.end()); it != mt; it++) {
domains += it->second->name();
domains += "\n";
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 3f945977386..9351f3244dd 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -1,30 +1,29 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "domain.h"
+#include "domainconfig.h"
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/document/util/queue.h>
#include <vespa/fnet/frt/invokable.h>
#include <mutex>
-
class FRT_Supervisor;
class FNET_Transport;
+namespace std {class thread; }
namespace search::common { class FileHeaderContext; }
-
namespace search::transactionlog {
class TransLogServerExplorer;
+class Domain;
class TransLogServer : public document::Runnable, private FRT_Invokable, public Writer
{
public:
friend class TransLogServerExplorer;
- typedef std::unique_ptr<TransLogServer> UP;
- typedef std::shared_ptr<TransLogServer> SP;
-
+ using SP = std::shared_ptr<TransLogServer>;
+ using DomainSP = std::shared_ptr<Domain>;
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads);
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
@@ -33,8 +32,10 @@ public:
const common::FileHeaderContext &fileHeaderContext);
~TransLogServer() override;
DomainStats getDomainStats() const;
+ bool commitIfStale();
void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override;
TransLogServer & setDomainConfig(const DomainConfig & cfg);
+ vespalib::duration getChunkAgeLimit() const;
class Session
{
@@ -71,13 +72,13 @@ private:
void downSession(FRT_RPCRequest *req);
std::vector<vespalib::string> getDomainNames();
- Domain::SP findDomain(vespalib::stringref name);
+ DomainSP findDomain(vespalib::stringref name);
vespalib::string dir() const { return _baseDir + "/" + _name; }
vespalib::string domainList() const { return dir() + "/" + _name + ".domains"; }
static const Session::SP & getSession(FRT_RPCRequest *req);
- using DomainList = std::map<vespalib::string, Domain::SP >;
+ using DomainList = std::map<vespalib::string, DomainSP >;
vespalib::string _name;
vespalib::string _baseDir;
@@ -87,12 +88,15 @@ private:
std::unique_ptr<FastOS_ThreadPool> _threadPool;
std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _supervisor;
+ std::unique_ptr<std::thread> _staleCommitThread;
DomainList _domains;
- mutable std::mutex _lock; // Protects _domains
+ mutable std::mutex _domainMutex; // Protects _domains
+ std::condition_variable _domainCondition;
std::mutex _fileLock; // Protects the creating and deleting domains including file system operations.
document::Queue<FRT_RPCRequest *> _reqQ;
const common::FileHeaderContext &_fileHeaderContext;
using Guard = std::lock_guard<std::mutex>;
+ using MonitorGuard = std::unique_lock<std::mutex>;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
index d83623661ff..12c38ab5739 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
@@ -66,7 +66,8 @@ getDomainConfig(const searchlib::TranslogserverConfig & cfg) {
.setCompressionLevel(cfg.compression.level)
.setPartSizeLimit(cfg.filesizemax)
.setChunkSizeLimit(cfg.chunk.sizelimit)
- .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit));
+ .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit))
+ .setFSyncOnCommit(cfg.usefsync);
return dcfg;
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h
index d46805c105c..fb93559c29f 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h
@@ -23,11 +23,9 @@ private:
void configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg) override ;
public:
- typedef std::unique_ptr<TransLogServerApp> UP;
-
TransLogServerApp(const config::ConfigUri & tlsConfigUri,
const common::FileHeaderContext &fileHeaderContext);
- ~TransLogServerApp();
+ ~TransLogServerApp() override;
TransLogServer::SP getTransLogServer() const;