summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-06 22:13:29 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-10-10 16:28:01 +0200
commit961e88a3c382a2390262f390b3fb3d554d010d9a (patch)
tree19ce3dde9c1ecb58d1808240e988cffe5647065c /searchlib
parentf28c605b6ba4e2674bcef0de2a6d477370537268 (diff)
executor -> sessionExecutor and some minor syntax changes. No Sematics.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp31
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h42
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp6
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h34
4 files changed, 48 insertions, 65 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index a324e42ab24..d5c9fa5afcb 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/fastos/file.h>
+#include <algorithm>
#include <vespa/log/log.h>
LOG_SETUP(".transactionlog.domain");
@@ -20,15 +21,11 @@ using std::runtime_error;
namespace search::transactionlog {
-Domain::Domain(const string &domainName,
- const string & baseDir,
- vespalib::ThreadStackExecutor & executor,
- uint64_t domainPartSize,
- bool useFsync,
- DomainPart::Crc defaultCrcType,
- const FileHeaderContext &fileHeaderContext) :
+Domain::Domain(const string &domainName, const string & baseDir,
+ vespalib::ThreadExecutor & sessionExecutor, uint64_t domainPartSize, bool useFsync,
+ DomainPart::Crc defaultCrcType, const FileHeaderContext &fileHeaderContext) :
_defaultCrcType(defaultCrcType),
- _executor(executor),
+ _sessionExecutor(sessionExecutor),
_sessionId(1),
_useFsync(useFsync),
_syncMonitor(),
@@ -54,10 +51,10 @@ Domain::Domain(const string &domainName,
const int64_t lastPart = partIdVector.empty() ? 0 : partIdVector.back();
for (const int64_t partId : partIdVector) {
if ( partId != -1) {
- _executor.execute(makeTask(makeClosure(this, &Domain::addPart, partId, partId == lastPart)));
+ _sessionExecutor.execute(makeTask(makeClosure(this, &Domain::addPart, partId, partId == lastPart)));
}
}
- _executor.sync();
+ _sessionExecutor.sync();
if (_parts.empty() || _parts.crbegin()->second->isClosed()) {
_parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _useFsync, _defaultCrcType, _fileHeaderContext, false));
}
@@ -111,8 +108,7 @@ Domain::getDomainInfo() const
DomainInfo info(SerialNumRange(begin(guard), end(guard)), size(guard), byteSize(guard));
for (const auto &entry: _parts) {
const DomainPart &part = *entry.second;
- info.parts.emplace_back(PartInfo(part.range(), part.size(),
- part.byteSize(), part.fileName()));
+ info.parts.emplace_back(PartInfo(part.range(), part.size(), part.byteSize(), part.fileName()));
}
return info;
}
@@ -198,7 +194,7 @@ Domain::triggerSyncNow()
if (!_pendingSync) {
_pendingSync = true;
DomainPart::SP dp(_parts.rbegin()->second);
- _executor.execute(Sync::UP(new Sync(_syncMonitor, dp, _pendingSync)));
+ _sessionExecutor.execute(Sync::UP(new Sync(_syncMonitor, dp, _pendingSync)));
}
}
@@ -302,7 +298,7 @@ void Domain::commit(const Packet & packet)
}
}
-bool Domain::erase(const SerialNum & to)
+bool Domain::erase(SerialNum to)
{
bool retval(true);
/// Do not erase the last element
@@ -320,7 +316,8 @@ bool Domain::erase(const SerialNum & to)
return retval;
}
-int Domain::visit(const Domain::SP & domain, const SerialNum & from, const SerialNum & to, FRT_Supervisor & supervisor, FNET_Connection *conn)
+int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to,
+ FRT_Supervisor & supervisor, FNET_Connection *conn)
{
assert(this == domain.get());
cleanSessions();
@@ -354,7 +351,7 @@ int Domain::closeSession(int sessionId)
SessionList::iterator found = _sessions.find(sessionId);
if (found != _sessions.end()) {
retval = 1;
- _executor.sync();
+ _sessionExecutor.sync();
}
}
if (retval == 1) {
@@ -371,7 +368,7 @@ int Domain::closeSession(int sessionId)
return retval;
}
-int Domain::subscribe(const Domain::SP & domain, const SerialNum & from, FRT_Supervisor & supervisor, FNET_Connection *conn)
+int Domain::subscribe(const Domain::SP & domain, SerialNum from, FRT_Supervisor & supervisor, FNET_Connection *conn)
{
assert(this == domain.get());
cleanSessions();
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index f70ce7654c1..555744f3dac 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -1,9 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/searchlib/transactionlog/domainpart.h>
-#include <vespa/searchlib/transactionlog/session.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
+#include "domainpart.h"
+#include "session.h"
+#include <vespa/vespalib/util/threadexecutor.h>
namespace search::transactionlog {
@@ -36,30 +36,19 @@ class Domain
{
public:
typedef std::shared_ptr<Domain> SP;
- Domain(const vespalib::string &name,
- const vespalib::string &baseDir,
- vespalib::ThreadStackExecutor & executor,
- uint64_t domainPartSize,
- bool useFsync,
- DomainPart::Crc defaultCrcType,
- const common::FileHeaderContext &fileHeaderContext);
+ Domain(const vespalib::string &name, const vespalib::string &baseDir,
+ vespalib::ThreadExecutor & sessionExecutor, uint64_t domainPartSize, bool useFsync,
+ DomainPart::Crc defaultCrcType, const common::FileHeaderContext &fileHeaderContext);
virtual ~Domain();
DomainInfo getDomainInfo() const;
-
const vespalib::string & name() const { return _name; }
- bool erase(const SerialNum & to);
+ bool erase(SerialNum to);
void commit(const Packet & packet);
- int
- visit(const Domain::SP & self,
- const SerialNum & from,
- const SerialNum & to,
- FRT_Supervisor & supervisor,
- FNET_Connection *conn);
-
- int subscribe(const Domain::SP & self, const SerialNum & from, FRT_Supervisor & supervisor, FNET_Connection *conn);
+ int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn);
+ int subscribe(const Domain::SP & self, SerialNum from, FRT_Supervisor & supervisor, FNET_Connection *conn);
SerialNum begin() const;
SerialNum end() const;
@@ -82,7 +71,7 @@ public:
return base + "/" + domain;
}
vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task) {
- return _executor.execute(std::move(task));
+ return _sessionExecutor.execute(std::move(task));
}
uint64_t size() const;
private:
@@ -94,16 +83,16 @@ private:
vespalib::string dir() const { return getDir(_baseDir, _name); }
void addPart(int64_t partId, bool isLastPart);
- typedef std::vector<SerialNum> SerialNumList;
+ using SerialNumList = std::vector<SerialNum>;
SerialNumList scanDir();
- typedef std::map<int, Session::SP > SessionList;
- typedef std::map<int64_t, DomainPart::SP > DomainPartList;
- typedef vespalib::ThreadStackExecutor Executor;
+ using SessionList = std::map<int, Session::SP>;
+ using DomainPartList = std::map<int64_t, DomainPart::SP>;
+ using Executor = vespalib::ThreadExecutor;
DomainPart::Crc _defaultCrcType;
- Executor & _executor;
+ Executor & _sessionExecutor;
std::atomic<int> _sessionId;
const bool _useFsync;
vespalib::Monitor _syncMonitor;
@@ -117,7 +106,6 @@ private:
vespalib::string _baseDir;
const common::FileHeaderContext &_fileHeaderContext;
bool _markedDeleted;
- bool _urgentSync;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index ad5654978ae..b273f5f092a 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -91,7 +91,7 @@ TransLogServer::TransLogServer(const vespalib::string &name,
_domainPartSize(domainPartSize),
_useFsync(useFsync),
_defaultCrcType(defaultCrcType),
- _executor(maxThreads, 128*1024),
+ _sessionExecutor(maxThreads, 128*1024),
_threadPool(8192, 1),
_supervisor(std::make_unique<FRT_Supervisor>()),
_domains(),
@@ -109,7 +109,7 @@ TransLogServer::TransLogServer(const vespalib::string &name,
try {
Domain::SP domain(new Domain(domainName,
dir(),
- _executor,
+ _sessionExecutor,
_domainPartSize,
_useFsync,
_defaultCrcType,
@@ -361,7 +361,7 @@ void TransLogServer::createDomain(FRT_RPCRequest *req)
try {
domain.reset(new Domain(domainName,
dir(),
- _executor,
+ _sessionExecutor,
_domainPartSize,
_useFsync,
_defaultCrcType,
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index e7aca212b07..d04fb4f56d1 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -3,6 +3,7 @@
#include "domain.h"
#include <vespa/vespalib/util/document_runnable.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/document/util/queue.h>
#include <vespa/fnet/frt/invokable.h>
#include <mutex>
@@ -20,8 +21,7 @@ public:
typedef std::unique_ptr<TransLogServer> UP;
typedef std::shared_ptr<TransLogServer> SP;
- TransLogServer(const vespalib::string &name,
- int listenPort,
+ TransLogServer(const vespalib::string &name, int listenPort,
const vespalib::string &baseDir,
const common::FileHeaderContext &fileHeaderContext,
uint64_t domainPartSize=0x10000000,
@@ -29,8 +29,6 @@ public:
size_t maxThreads=4,
DomainPart::Crc defaultCrc=DomainPart::xxh64);
virtual ~TransLogServer();
- uint64_t getDomainPartSize() const { return _domainPartSize; }
- uint64_t setDomainPartSize();
DomainStats getDomainStats() const;
void commit(const vespalib::string & domainName, const Packet & packet) override;
@@ -79,20 +77,20 @@ private:
static const Session::SP & getSession(FRT_RPCRequest *req);
- typedef std::map<vespalib::string, Domain::SP > DomainList;
-
- vespalib::string _name;
- vespalib::string _baseDir;
- const uint64_t _domainPartSize;
- const bool _useFsync;
- const DomainPart::Crc _defaultCrcType;
- vespalib::ThreadStackExecutor _executor;
- FastOS_ThreadPool _threadPool;
- std::unique_ptr<FRT_Supervisor> _supervisor;
- DomainList _domains;
- mutable std::mutex _lock; // Protects _domains
- std::mutex _fileLock; // Protects the creating and deleting domains including file system operations.
- document::Queue<FRT_RPCRequest *> _reqQ;
+ using DomainList = std::map<vespalib::string, Domain::SP >;
+
+ vespalib::string _name;
+ vespalib::string _baseDir;
+ const uint64_t _domainPartSize;
+ const bool _useFsync;
+ const DomainPart::Crc _defaultCrcType;
+ vespalib::ThreadStackExecutor _sessionExecutor;
+ FastOS_ThreadPool _threadPool;
+ std::unique_ptr<FRT_Supervisor> _supervisor;
+ DomainList _domains;
+ mutable std::mutex _lock; // Protects _domains
+ std::mutex _fileLock; // Protects the creating and deleting domains including file system operations.
+ document::Queue<FRT_RPCRequest *> _reqQ;
const common::FileHeaderContext &_fileHeaderContext;
using Guard = std::lock_guard<std::mutex>;
};