aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-11 15:58:53 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-10-11 19:38:04 +0200
commit1aeded94c43e98b1b82c4a71530ea46ab2389bb2 (patch)
treec8cdfa5a401bbbd75133b44de95782cfb138d741
parente3dea9daba5b89fab618921a94f9394f8ae9144c (diff)
Remove subscribe to TLS.
-rw-r--r--searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h69
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h8
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp60
-rw-r--r--searchlib/src/tests/transactionlogstress/translogstress.cpp137
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp24
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h1
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.cpp38
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.h18
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp57
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.h27
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp32
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h1
15 files changed, 55 insertions, 441 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
index 6d57795a734..17f1faffbba 100644
--- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
+++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
@@ -315,7 +315,6 @@ public:
}
return RPC::OK;
}
- virtual void inSync() override { }
virtual void eof() override { _eof = true; }
bool isEof() const { return _eof; }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 0ed3de93965..6b48c94fabd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -776,14 +776,6 @@ FeedHandler::eof()
_writeService.master().execute(makeTask(makeClosure(this, &FeedHandler::performEof)));
}
-
-void
-FeedHandler::inSync()
-{
- // Called by visit callback thread, when in sync
-}
-
-
void
FeedHandler::
performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp)
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 32ff8f5d690..94e70ff0fd2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -184,22 +184,19 @@ public:
search::transactionlog::Writer & writer,
TlsWriter * tlsWriter = nullptr);
- virtual
- ~FeedHandler();
+ virtual~FeedHandler();
/**
* Init this feed handler.
*
* @param oldestConfigSerial The serial number of the oldest config snapshot.
*/
- void
- init(SerialNum oldestConfigSerial);
+ void init(SerialNum oldestConfigSerial);
/**
* Close this feed handler and its components.
*/
- void
- close();
+ void close();
/**
* Start replay of the transaction log.
@@ -223,8 +220,7 @@ public:
*
* @param flushedSerial serial number flushed for all relevant flush targets.
*/
- void
- flushDone(SerialNum flushedSerial);
+ void flushDone(SerialNum flushedSerial);
/**
* Used to flip between normal and recovery feed states.
@@ -235,21 +231,14 @@ public:
* Update the active feed view.
* Always called by the master write thread so locking is not needed.
*/
- void
- setActiveFeedView(IFeedView *feedView)
- {
+ void setActiveFeedView(IFeedView *feedView) {
_activeFeedView = feedView;
}
- void
- setBucketDBHandler(bucketdb::IBucketDBHandler *bucketDBHandler)
- {
+ void setBucketDBHandler(bucketdb::IBucketDBHandler *bucketDBHandler) {
_bucketDBHandler = bucketDBHandler;
}
- /**
- * Wait until transaction log is replayed.
- */
void waitForReplayDone();
void setSerialNum(SerialNum serialNum) { _serialNum = serialNum; }
@@ -270,46 +259,18 @@ public:
void performOperation(FeedTokenUP token, FeedOperationUP op);
void handleOperation(FeedToken token, FeedOperationUP op);
- /**
- * Implements IDocumentMoveHandler
- */
- virtual void handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCallback> moveDoneCtx) override;
-
- /**
- * Implements IHeartBeatHandler
- */
- virtual void
- heartBeat() override;
-
- virtual void
- sync();
-
- /**
- * Implements TransLogClient::Session::Callback.
- */
- virtual RPC::Result
- receive(const Packet &packet) override;
-
- virtual void
- eof() override;
-
- virtual void
- inSync() override;
+ void handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCallback> moveDoneCtx) override;
+ void heartBeat() override;
- /**
- * Implements IPruneRemovedDocumentsHandler
- */
- void
- performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override;
+ virtual void sync();
- void
- syncTls(SerialNum syncTo);
-
- void
- storeRemoteOperation(const FeedOperation &op);
+ RPC::Result receive(const Packet &packet) override;
- // Implements IOperationStorer
- virtual void storeOperation(FeedOperation &op) override;
+ void eof() override;
+ void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override;
+ void syncTls(SerialNum syncTo);
+ void storeRemoteOperation(const FeedOperation &op);
+ void storeOperation(FeedOperation &op) override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp
index 62ea321efbb..95f31f141d7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp
@@ -23,10 +23,7 @@ TransactionLogManagerBase::TransactionLogManagerBase(
{
}
-TransactionLogManagerBase::~TransactionLogManagerBase()
-{
-}
-
+TransactionLogManagerBase::~TransactionLogManagerBase() = default;
TransactionLogManagerBase::StatusResult
TransactionLogManagerBase::init()
@@ -65,7 +62,6 @@ TransactionLogManagerBase::init()
return res;
}
-
void
TransactionLogManagerBase::internalStartReplay()
{
@@ -77,7 +73,6 @@ TransactionLogManagerBase::internalStartReplay()
_replayStartTime = timer.MilliSecs();
}
-
void
TransactionLogManagerBase::markReplayStarted()
{
@@ -85,7 +80,6 @@ TransactionLogManagerBase::markReplayStarted()
_replayStarted = true;
}
-
void TransactionLogManagerBase::changeReplayDone()
{
vespalib::MonitorGuard guard(_replayMonitor);
@@ -93,7 +87,6 @@ void TransactionLogManagerBase::changeReplayDone()
guard.broadcast();
}
-
void
TransactionLogManagerBase::waitForReplayDone() const
{
@@ -103,7 +96,6 @@ TransactionLogManagerBase::waitForReplayDone() const
}
}
-
void
TransactionLogManagerBase::close()
{
@@ -117,11 +109,6 @@ TransactionLogManagerBase::close()
}
}
-TransLogClient::Subscriber::UP TransactionLogManagerBase::createTlcSubscriber(
- TransLogClient::Session::Callback &callback) {
- return _tlc.createSubscriber(_domainName, callback);
-}
-
TransLogClient::Visitor::UP TransactionLogManagerBase::createTlcVisitor(
TransLogClient::Session::Callback &callback) {
return _tlc.createVisitor(_domainName, callback);
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h
index ae2a8356016..1b109d8d9e1 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h
@@ -51,10 +51,7 @@ public:
void changeReplayDone();
void close();
- TransLogClient::Subscriber::UP createTlcSubscriber(
- TransLogClient::Session::Callback &callback);
- TransLogClient::Visitor::UP createTlcVisitor(
- TransLogClient::Session::Callback &callback);
+ TransLogClient::Visitor::UP createTlcVisitor(TransLogClient::Session::Callback &callback);
void waitForReplayDone() const;
@@ -64,8 +61,7 @@ public:
bool getReplayDone() const;
bool isDoingReplay() const;
void logReplayComplete() const;
- const vespalib::string &getRpcTarget() const
- { return _tlc.getRPCTarget(); }
+ const vespalib::string &getRpcTarget() const { return _tlc.getRPCTarget(); }
void
markReplayStarted();
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index efd3b70858a..9f83db9b23a 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -42,7 +42,6 @@ private:
uint32_t countFiles(const vespalib::string &dir);
void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries);
bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name);
- bool subscribeDomainTest(TransLogClient & tls, const vespalib::string & name);
bool partialUpdateTest();
bool test1();
bool testRemove();
@@ -59,22 +58,20 @@ private:
TEST_APPHOOK(Test);
-class CallBackTest : public TransLogClient::Subscriber::Callback
+class CallBackTest : public TransLogClient::Visitor::Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
- virtual void inSync() override { _inSync = true; }
virtual void eof() override { _eof = true; }
typedef std::map<SerialNum, ByteBuffer> PacketMap;
PacketMap _packetMap;
public:
- CallBackTest() : _inSync(false), _eof(false) { }
+ CallBackTest() : _eof(false) { }
size_t size() const { return _packetMap.size(); }
bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); }
- void clear() { _inSync = false; _eof = false; _packetMap.clear(); }
+ void clear() { _eof = false; _packetMap.clear(); }
const ByteBuffer & packet(SerialNum n) { return (_packetMap.find(n)->second); }
- bool _inSync;
bool _eof;
};
@@ -91,16 +88,14 @@ RPC::Result CallBackTest::receive(const Packet & p)
return RPC::OK;
}
-class CallBackManyTest : public TransLogClient::Subscriber::Callback
+class CallBackManyTest : public TransLogClient::Visitor::Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
- virtual void inSync() override { _inSync = true; }
virtual void eof() override { _eof = true; }
public:
- CallBackManyTest(size_t start) : _inSync(false), _eof(false), _count(start), _value(start) { }
- void clear() { _inSync = false; _eof = false; _count = 0; _value = 0; }
- bool _inSync;
+ CallBackManyTest(size_t start) : _eof(false), _count(start), _value(start) { }
+ void clear() { _eof = false; _count = 0; _value = 0; }
bool _eof;
size_t _count;
size_t _value;
@@ -121,21 +116,19 @@ RPC::Result CallBackManyTest::receive(const Packet & p)
return RPC::OK;
}
-class CallBackUpdate : public TransLogClient::Subscriber::Callback
+class CallBackUpdate : public TransLogClient::Visitor::Callback
{
public:
typedef std::map<SerialNum, Identifiable *> PacketMap;
private:
virtual RPC::Result receive(const Packet & packet) override;
- virtual void inSync() override { _inSync = true; }
virtual void eof() override { _eof = true; }
PacketMap _packetMap;
public:
- CallBackUpdate() : _inSync(false), _eof(false) { }
+ CallBackUpdate() : _eof(false) { }
virtual ~CallBackUpdate() { while (_packetMap.begin() != _packetMap.end()) { delete _packetMap.begin()->second; _packetMap.erase(_packetMap.begin()); } }
bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); }
const PacketMap & map() const { return _packetMap; }
- bool _inSync;
bool _eof;
};
@@ -176,16 +169,14 @@ class CallBackStatsTest : public TransLogClient::Session::Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
- virtual void inSync() override { _inSync = true; }
virtual void eof() override { _eof = true; }
public:
- CallBackStatsTest() : _inSync(false), _eof(false),
+ CallBackStatsTest() : _eof(false),
_count(0), _inOrder(0),
_firstSerial(0), _lastSerial(0),
_prevSerial(0) { }
- void clear() { _inSync = false; _eof = false; _count = 0; _inOrder = 0;
+ void clear() { _eof = false; _count = 0; _inOrder = 0;
_firstSerial = 0; _lastSerial = 0; _inOrder = 0; }
- bool _inSync;
bool _eof;
uint64_t _count;
uint64_t _inOrder; // increase when next entry is one above previous
@@ -258,7 +249,6 @@ bool Test::partialUpdateTest()
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(5, 7) );
for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca._inSync );
ASSERT_TRUE( ca._eof );
ASSERT_TRUE( ca.map().size() == 1);
ASSERT_TRUE( ca.hasSerial(7) );
@@ -268,7 +258,6 @@ bool Test::partialUpdateTest()
ASSERT_TRUE(visitor1.get());
ASSERT_TRUE( visitor1->visit(4, 5) );
for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca1._inSync );
ASSERT_TRUE( ca1._eof );
ASSERT_TRUE( ca1.map().size() == 0);
@@ -277,7 +266,6 @@ bool Test::partialUpdateTest()
ASSERT_TRUE(visitor2.get());
ASSERT_TRUE( visitor2->visit(5, 6) );
for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca2._inSync );
ASSERT_TRUE( ca2._eof );
ASSERT_TRUE( ca2.map().size() == 0);
@@ -286,7 +274,6 @@ bool Test::partialUpdateTest()
ASSERT_TRUE(visitor3.get());
ASSERT_TRUE( visitor3->visit(5, 1000) );
for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca3._inSync );
ASSERT_TRUE( ca3._eof );
ASSERT_TRUE( ca3.map().size() == 1);
ASSERT_TRUE( ca3.hasSerial(7) );
@@ -451,7 +438,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(0, 1) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- EXPECT_TRUE( ! ca._inSync );
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ca.hasSerial(1) );
@@ -462,7 +448,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(1, 2) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- EXPECT_TRUE( ! ca._inSync );
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ! ca.hasSerial(1) );
@@ -474,7 +459,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c
EXPECT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(0, 3) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- EXPECT_TRUE( ! ca._inSync );
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ca.hasSerial(1) );
@@ -486,7 +470,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(2, 3) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- EXPECT_TRUE( ! ca._inSync );
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( !ca.hasSerial(1) );
@@ -497,23 +480,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c
return retval;
}
-bool Test::subscribeDomainTest(TransLogClient & tls, const vespalib::string & name)
-{
- bool retval(true);
- CallBackTest ca;
- TransLogClient::Subscriber::UP subscriber = tls.createSubscriber(name, ca);
- ASSERT_TRUE(subscriber.get());
- ASSERT_TRUE( subscriber->subscribe(0) );
- for (size_t i(0); ! ca._inSync && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ca._inSync );
- ASSERT_TRUE( ! ca.hasSerial(0) );
- ASSERT_TRUE( ! ca._eof );
- ASSERT_TRUE( ca.hasSerial(1) );
- ASSERT_TRUE( ca.hasSerial(2) );
- ASSERT_TRUE( ca.hasSerial(3) );
- return retval;
-}
-
bool Test::test1()
{
DummyFileHeaderContext fileHeaderContext;
@@ -525,7 +491,6 @@ bool Test::test1()
TransLogClient::Session::UP s1 = openDomainTest(tls, name);
fillDomainTest(s1.get(), name);
visitDomainTest(tls, s1.get(), name);
- subscribeDomainTest(tls, name);
return true;
}
@@ -569,7 +534,6 @@ bool Test::testRemove()
TransLogClient::Session::UP s1 = openDomainTest(tls, name);
fillDomainTest(s1.get(), name);
visitDomainTest(tls, s1.get(), name);
- subscribeDomainTest(tls, name);
ASSERT_TRUE(tls.remove(name));
return true;
@@ -584,7 +548,6 @@ bool Test::test2()
vespalib::string name("test1");
TransLogClient::Session::UP s1 = openDomainTest(tls, name);
visitDomainTest(tls, s1.get(), name);
- subscribeDomainTest(tls, name);
return true;
}
@@ -603,7 +566,6 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain,
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) {
FastOS_Thread::Sleep(10);
}
- ASSERT_TRUE(!ca._inSync);
ASSERT_TRUE(ca._eof);
EXPECT_EQUAL(expFirstSerial, ca._firstSerial);
EXPECT_EQUAL(expLastSerial, ca._lastSerial);
@@ -651,7 +613,6 @@ void Test::testMany()
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca._inSync );
ASSERT_TRUE( ca._eof );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
@@ -673,7 +634,6 @@ void Test::testMany()
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca._inSync );
ASSERT_TRUE( ca._eof );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp
index aabfdb62a96..abba84b75b6 100644
--- a/searchlib/src/tests/transactionlogstress/translogstress.cpp
+++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp
@@ -23,11 +23,9 @@ using vespalib::make_string;
using vespalib::ConstBufferRef;
using search::index::DummyFileHeaderContext;
-namespace search {
-namespace transactionlog {
+namespace search::transactionlog {
using ClientSession = TransLogClient::Session;
-using Subscriber = TransLogClient::Subscriber;
using Visitor = TransLogClient::Visitor;
//-----------------------------------------------------------------------------
@@ -308,89 +306,12 @@ public:
_generator(generator), _name(name), _id(id), _validate(validate) {}
virtual ~Agent() {}
virtual RPC::Result receive(const Packet & packet) override = 0;
- virtual void inSync() override {}
virtual void eof() override {}
virtual void failed() {}
};
//-----------------------------------------------------------------------------
-// SubscriberAgent
-//-----------------------------------------------------------------------------
-class SubscriberAgent : public Agent
-{
-private:
- std::unique_ptr<Subscriber> _subscriber;
- SerialNum _from;
- SerialNum _next;
- Monitor _monitor;
-
- SerialNum getNext() {
- MonitorGuard guard(_monitor);
- return _next++;
- }
-
-public:
- SubscriberAgent(const std::string & tlsSpec, const std::string & domain,
- const EntryGenerator & generator, SerialNum from, uint32_t id, bool validate) :
- Agent(tlsSpec, domain, generator, "SubscriberAgent", id, validate),
- _subscriber(), _from(from), _next(from + 1) {}
- virtual ~SubscriberAgent() {}
- void start();
- void stop();
- SerialNum getExpectedNext() const {
- MonitorGuard guard(_monitor);
- return _next;
- }
- SerialNumRange getRange() const { return SerialNumRange(_from, _next - 1); }
- virtual RPC::Result receive(const Packet & packet) override;
-};
-
-void
-SubscriberAgent::start()
-{
- _subscriber = _client.createSubscriber(_domain, *this);
- if (_subscriber.get() == NULL) {
- throw std::runtime_error(vespalib::make_string
- ("SubscriberAgent[%u]: Could not open subscriber to %s", _id, _tlsSpec.c_str()));
- }
- if (!_subscriber->subscribe(_from)) {
- throw std::runtime_error(vespalib::make_string
- ("SubscriberAgent[%u]: Could not subscribe to %s from serialnumber %" PRIu64,
- _id, _tlsSpec.c_str(), _from));
- }
-}
-
-void
-SubscriberAgent::stop()
-{
- _subscriber.reset();
-}
-
-RPC::Result
-SubscriberAgent::receive(const Packet & packet)
-{
- auto handle = packet.getHandle();
- while (handle.size() > 0) {
- Packet::Entry entry;
- entry.deserialize(handle);
- Packet::Entry expected = _generator.getRandomEntry(getNext());
- if (_validate) {
- if (!EntryComparator::cmp(entry, expected)) {
- throw std::runtime_error(vespalib::make_string
- ("SubscriberAgent[%u]: Got %s, expected %s", _id,
- EntryPrinter::toStr(entry).c_str(),
- EntryPrinter::toStr(expected).c_str()));
- }
- }
- }
- LOG(info, "SubscriberAgent[%u]: received %s", _id, PacketPrinter::toStr(packet).c_str());
-
- return RPC::OK;
-}
-
-
-//-----------------------------------------------------------------------------
// VisitorAgent
//-----------------------------------------------------------------------------
class VisitorAgent : public Agent
@@ -534,7 +455,6 @@ private:
TransLogClient _client;
std::unique_ptr<ClientSession> _session;
EntryGenerator _generator;
- std::vector<std::shared_ptr<SubscriberAgent> > _subscribers;
std::vector<std::shared_ptr<VisitorAgent> > _visitors;
std::vector<std::shared_ptr<VisitorAgent> > _rndVisitors;
uint64_t _visitorInterval; // in milliseconds
@@ -548,29 +468,22 @@ private:
void makeRandomVisitorVector();
public:
- ControllerThread(const std::string & tlsSpec, const std::string & domain,
- const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors,
- uint64_t visitorInterval, uint64_t pruneInterval);
+ ControllerThread(const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator,
+ uint32_t numVisitors, uint64_t visitorInterval, uint64_t pruneInterval);
~ControllerThread();
- void startSubscribers();
uint32_t runningVisitors();
- std::vector<std::shared_ptr<SubscriberAgent> > & getSubscribers() { return _subscribers; }
std::vector<std::shared_ptr<VisitorAgent> > & getVisitors() { return _visitors; }
virtual void doRun() override;
};
ControllerThread::ControllerThread(const std::string & tlsSpec, const std::string & domain,
- const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors,
+ const EntryGenerator & generator, uint32_t numVisitors,
uint64_t visitorInterval, uint64_t pruneInterval)
: _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec.c_str()), _session(),
- _generator(generator), _subscribers(), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval),
+ _generator(generator), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval),
_pruneInterval(pruneInterval), _pruneTimer(), _begin(0), _end(0), _count(0)
{
- for (uint32_t i = 0; i < numSubscribers; ++i) {
- _subscribers.push_back(std::make_shared<SubscriberAgent>(tlsSpec, domain, generator, 0, i, true));
- }
-
for (uint32_t i = 0; i < numVisitors; ++i) {
_visitors.push_back(std::make_shared<VisitorAgent>(tlsSpec, domain, generator, i, true));
}
@@ -598,14 +511,6 @@ ControllerThread::makeRandomVisitorVector()
}
void
-ControllerThread::startSubscribers()
-{
- for (size_t i = 0; i < _subscribers.size(); ++i) {
- _subscribers[i]->start();
- }
-}
-
-void
ControllerThread::doRun()
{
_session = _client.open(_domain);
@@ -641,12 +546,6 @@ ControllerThread::doRun()
safePrune = _visitors[i]->getFrom();
}
}
- for (size_t i = 0; i < _subscribers.size(); ++i) {
- SerialNum next = _subscribers[i]->getExpectedNext();
- if (next < safePrune) {
- safePrune = next;
- }
- }
LOG(info, "ControllerThread: status: begin(%" PRIu64 "), end(%" PRIu64 "), count(%zu)", _begin, _end, _count);
LOG(info, "ControllerThread: prune [%" PRIu64 ", %" PRIu64 ">", _begin, safePrune);
if (!_session->erase(safePrune)) {
@@ -672,7 +571,6 @@ private:
uint64_t stressTime;
uint32_t feedRate;
- uint32_t numSubscribers;
uint32_t numVisitors;
uint64_t visitorInterval;
uint64_t pruneInterval;
@@ -683,7 +581,7 @@ private:
long baseSeed;
Config() :
- domainPartSize(0), packetSize(0), stressTime(0), feedRate(0), numSubscribers(0),
+ domainPartSize(0), packetSize(0), stressTime(0), feedRate(0),
numVisitors(0), visitorInterval(0), pruneInterval(0), minStrLen(0), maxStrLen(0), baseSeed(0) {}
};
@@ -702,7 +600,6 @@ TransLogStress::printConfig()
std::cout << "######## Config ########" << std::endl;
std::cout << "stressTime: " << _cfg.stressTime / 1000 << " s" << std::endl;
std::cout << "feedRate: " << _cfg.feedRate << " per/sec" << std::endl;
- std::cout << "numSubscribers: " << _cfg.numSubscribers << std::endl;
std::cout << "numVisitors: " << _cfg.numVisitors << std::endl;
std::cout << "visitorInterval: " << _cfg.visitorInterval << " ms" << std::endl;
std::cout << "pruneInterval: " << _cfg.pruneInterval / 1000 << " s" << std::endl;
@@ -733,7 +630,6 @@ TransLogStress::Main()
_cfg.stressTime = 1000 * 60;
_cfg.feedRate = 10000;
- _cfg.numSubscribers = 1;
_cfg.numVisitors = 1;
_cfg.visitorInterval = 1000 * 1;
_cfg.pruneInterval = 1000 * 12;
@@ -763,9 +659,6 @@ TransLogStress::Main()
case 'f':
_cfg.feedRate = atoi(arg);
break;
- case 's':
- _cfg.numSubscribers = atoi(arg);
- break;
case 'v':
_cfg.numVisitors = atoi(arg);
break;
@@ -830,13 +723,9 @@ TransLogStress::Main()
FastOS_Thread::Sleep(sleepTime);
- ControllerThread controller(tlsSpec, domain, generator, _cfg.numSubscribers, _cfg.numVisitors,
- _cfg.visitorInterval, _cfg.pruneInterval);
+ ControllerThread controller(tlsSpec, domain, generator, _cfg.numVisitors, _cfg.visitorInterval, _cfg.pruneInterval);
threadPool.NewThread(&controller);
- // start subscribers
- controller.startSubscribers();
-
// stop feeder and controller
FastOS_Thread::Sleep(_cfg.stressTime);
printConfig();
@@ -862,24 +751,12 @@ TransLogStress::Main()
std::cout << "</visitor>" << std::endl;
}
- // stop subscribers
- LOG(info, "Stop subscribers...");
- std::vector<std::shared_ptr<SubscriberAgent> > & subscribers = controller.getSubscribers();
- for (size_t i = 0; i < subscribers.size(); ++i) {
- subscribers[i]->stop();
- std::cout << "<subscriber id='" << i << "'>" << std::endl;
- std::cout << " <from>" << subscribers[i]->getRange().from() << "</from>" << std::endl;
- std::cout << " <to>" << subscribers[i]->getRange().to() << "</to>" << std::endl;
- std::cout << "</subscriber>" << std::endl;
- }
-
threadPool.Close();
return 0;
}
}
-}
int main(int argc, char ** argv)
{
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 5d66dded0dd..e46b8744ae8 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -253,7 +253,7 @@ void Domain::cleanSessions()
LockGuard guard(_sessionLock);
for (SessionList::iterator it(_sessions.begin()), mt(_sessions.end()); it != mt; ) {
Session * session(it->second.get());
- if ((!session->continous() && session->inSync())) {
+ if (session->inSync()) {
_sessions.erase(it++);
} else if (session->finished()) {
_sessions.erase(it++);
@@ -295,16 +295,6 @@ void Domain::commit(const Packet & packet)
}
dp->commit(entry.serial(), packet);
cleanSessions();
-
- LockGuard guard(_sessionLock);
- for (auto & it : _sessions) {
- const Session::SP & session(it.second);
- if (session->continous()) {
- if (session->ok()) {
- Session::enQ(session, entry.serial(), packet);
- }
- }
- }
}
bool Domain::erase(SerialNum to)
@@ -378,18 +368,6 @@ int Domain::closeSession(int sessionId)
return retval;
}
-int Domain::subscribe(const Domain::SP & domain, SerialNum from, FRT_Supervisor & supervisor, FNET_Connection *conn)
-{
- assert(this == domain.get());
- cleanSessions();
- SerialNumRange range(from, end());
- Session * session = new Session(_sessionId++, range, domain, supervisor, conn, true);
- LockGuard guard(_sessionLock);
- _sessions[session->id()] = Session::SP(session);
- return session->id();
-}
-
-
Domain::SerialNumList
Domain::scanDir()
{
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index dfd201d182f..ab7ded91e5b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -49,7 +49,6 @@ public:
void commit(const Packet & packet);
int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn);
- int subscribe(const Domain::SP & self, SerialNum from, FRT_Supervisor & supervisor, FNET_Connection *conn);
SerialNum begin() const;
SerialNum end() const;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
index 302e0c12dda..3ea656be9a2 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
@@ -19,17 +19,7 @@ namespace {
vespalib::Executor::Task::UP
Session::createTask(const Session::SP & session)
{
- if (session->continous()) {
- return Task::UP(new SubscribeTask(session));
- } else {
- return Task::UP(new VisitTask(session));
- }
-}
-
-void
-Session::SubscribeTask::run()
-{
- _session->subscribe();
+ return Task::UP(new VisitTask(session));
}
void
@@ -110,14 +100,6 @@ Session::enQ(const SP & session, SerialNum serial, const Packet & packet)
}
void
-Session::subscribe()
-{
- visit();
- sendPending();
- sendSync();
-}
-
-void
Session::sendPending()
{
for (;;) {
@@ -147,7 +129,7 @@ void
Session::finalize()
{
if (!ok()) {
- LOG(error, "[%d] : Error in %s(%" PRIu64 " - %" PRIu64 "), stopping since I have no idea on what to do.", _id, (continous() ? "subscriber" : "visitor"), _range.from(), _range.to());
+ LOG(error, "[%d] : Error in %s(%" PRIu64 " - %" PRIu64 "), stopping since I have no idea on what to do.", _id, "visitor", _range.from(), _range.to());
}
LOG(debug, "[%d] : Stopped %" PRIu64 " - %" PRIu64, _id, _range.from(), _range.to());
_finished = true;
@@ -206,13 +188,12 @@ Session::rpcAsync(FRT_RPCRequest * req)
}
Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d,
- FRT_Supervisor & supervisor, FNET_Connection *conn, bool subscriber) :
+ FRT_Supervisor & supervisor, FNET_Connection *conn) :
_supervisor(supervisor),
_connection(conn),
_domain(d),
_range(r),
_id(sId),
- _subscriber(subscriber),
_inSync(false),
_ok(true),
_finished(false),
@@ -254,19 +235,6 @@ Session::send(FRT_RPCRequest * req, bool wait)
}
bool
-Session::sendSync()
-{
- FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
- req->SetMethodName("syncCallback");
- req->GetParams()->AddString(_domain->name().c_str());
- req->GetParams()->AddInt32(id());
- bool retval(send(req, true));
- LockGuard guard(_lock);
- _inSync = true;
- return retval;
-}
-
-bool
Session::sendDone()
{
FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h
index 26e448540c3..ac6f496e151 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.h
@@ -13,7 +13,7 @@ namespace search::transactionlog {
class Domain;
class DomainPart;
-typedef std::shared_ptr<Domain> DomainSP;
+using DomainSP = std::shared_ptr<Domain>;
class Session : public FRT_IRequestWait
{
@@ -24,12 +24,11 @@ public:
typedef std::shared_ptr<Session> SP;
Session(const Session &) = delete;
Session & operator = (const Session &) = delete;
- Session(int sId, const SerialNumRange & r, const DomainSP & d, FRT_Supervisor & supervisor, FNET_Connection *conn, bool subscriber=false);
- virtual ~Session();
+ Session(int sId, const SerialNumRange & r, const DomainSP & d, FRT_Supervisor & supervisor, FNET_Connection *conn);
+ ~Session();
const SerialNumRange & range() const { return _range; }
int id() const { return _id; }
bool inSync() const;
- bool continous() const { return _subscriber; }
bool ok() const { return _ok; }
bool finished() const;
static void enQ(const SP & session, SerialNum serial, const Packet & packet);
@@ -51,13 +50,7 @@ private:
void run() override;
Session::SP _session;
};
- class SubscribeTask : public Task {
- public:
- SubscribeTask(const Session::SP & session) : _session(session) { }
- private:
- void run() override;
- Session::SP _session;
- };
+
class SendTask : public Task {
public:
SendTask(const Session::SP & session) : _session(session) { }
@@ -70,11 +63,9 @@ private:
bool send(const Packet & packet);
void sendPacket(SerialNum serial, const Packet & packet);
bool sendDone();
- bool sendSync();
void sendPending();
void visit();
void visitOnly();
- void subscribe();
void finalize();
bool visit(FastOS_FileInterface & file, DomainPart & dp) __attribute__((noinline));
int32_t rpc(FRT_RPCRequest * req);
@@ -84,7 +75,6 @@ private:
DomainSP _domain;
SerialNumRange _range;
int _id;
- bool _subscriber;
bool _inSync;
bool _ok;
bool _finished;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
index af2e8ad47a1..aa2b558ea0c 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
@@ -88,12 +88,8 @@ TransLogClient::Session::UP TransLogClient::open(const vespalib::string & domain
return session;
}
-TransLogClient::Subscriber::UP TransLogClient::createSubscriber(const vespalib::string & domain, TransLogClient::Session::Callback & callBack)
-{
- return TransLogClient::Subscriber::UP(new Subscriber(domain, *this, callBack));
-}
-
-TransLogClient::Visitor::UP TransLogClient::createVisitor(const vespalib::string & domain, TransLogClient::Session::Callback & callBack)
+TransLogClient::Visitor::UP
+TransLogClient::createVisitor(const vespalib::string & domain, TransLogClient::Session::Callback & callBack)
{
return TransLogClient::Visitor::UP(new Visitor(domain, *this, callBack));
}
@@ -151,13 +147,6 @@ void TransLogClient::exportRPC(FRT_Supervisor & supervisor)
rb.ReturnDesc("result", "A resultcode(int) of the operation. Non zero number indicates error.");
//-- Visit Callbacks -----------------------------------------------------------
- rb.DefineMethod("syncCallback", "si", "i", false, FRT_METHOD(TransLogClient::syncCallbackRPC), this);
- rb.MethodDesc("Will tell you that now you are uptodate on the subscribtion.");
- rb.ParamDesc("name", "The name of the domain.");
- rb.ParamDesc("session", "Session handle.");
- rb.ReturnDesc("result", "A resultcode(int) of the operation. Non zero number indicates error.");
-
- //-- Visit Callbacks -----------------------------------------------------------
rb.DefineMethod("eofCallback", "si", "i", false, FRT_METHOD(TransLogClient::eofCallbackRPC), this);
rb.MethodDesc("Will tell you that you are done with the visitor.");
rb.ParamDesc("name", "The name of the domain.");
@@ -182,24 +171,6 @@ void TransLogClient::visitCallbackRPC(FRT_RPCRequest *req)
LOG(debug, "visitCallback(%s, %d)=%d done", domainName, sessionId, retval);
}
-void TransLogClient::syncCallbackRPC(FRT_RPCRequest *req)
-{
- uint32_t retval(uint32_t(-1));
- FRT_Values & params = *req->GetParams();
- FRT_Values & ret = *req->GetReturn();
- const char * domainName = params[0]._string._str;
- int32_t sessionId(params[1]._intval32);
- LOG(debug, "syncCallback(%s, %d)", domainName, sessionId);
- LockGuard guard(_lock);
- Session * session(findSession(domainName, sessionId));
- if (session != NULL) {
- session->inSync();
- retval = 0;
- }
- ret.AddInt32(retval);
- LOG(debug, "syncCallback(%s, %d)=%d done", domainName, sessionId, retval);
-}
-
void TransLogClient::eofCallbackRPC(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
@@ -322,21 +293,12 @@ int TransLogClient::SessionKey::cmp(const TransLogClient::SessionKey & b) const
return diff;
}
-TransLogClient::Subscriber::Subscriber(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) :
+TransLogClient::Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) :
Session(domain, tlc),
_callback(callBack)
{
}
-TransLogClient::Subscriber::~Subscriber()
-{
-}
-
-TransLogClient::Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) :
- Subscriber(domain, tlc, callBack)
-{
-}
-
bool TransLogClient::Session::init(FRT_RPCRequest *req)
{
int32_t retval(_tlc.rpc(req));
@@ -364,15 +326,6 @@ bool TransLogClient::Visitor::visit(const SerialNum & from, const SerialNum & to
return init(req);
}
-bool TransLogClient::Subscriber::subscribe(const SerialNum & from)
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainSubscribe");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddInt64(from);
- return init(req);
-}
-
bool TransLogClient::Session::run()
{
FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
@@ -402,8 +355,6 @@ bool TransLogClient::Session::close()
return (retval == 0);
}
-TransLogClient::Visitor::~Visitor()
-{
-}
+TransLogClient::Visitor::~Visitor() = default;
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
index a3351549df8..87901890673 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
@@ -28,7 +28,6 @@ public:
public:
virtual ~Callback() { }
virtual RPC::Result receive(const Packet & packet) = 0;
- virtual void inSync() { }
virtual void eof() { }
};
public:
@@ -46,7 +45,6 @@ public:
bool sync(const SerialNum &syncTo, SerialNum &syncedTo);
virtual RPC::Result visit(const Packet & ) { return RPC::OK; }
- virtual void inSync() { }
virtual void eof() { }
bool close();
void clear();
@@ -60,32 +58,22 @@ public:
int _sessionId;
};
/// Here you connect to the incomming data getting everything from <from>
- class Subscriber : public Session
+ class Visitor : public Session
{
public:
- typedef std::unique_ptr<Subscriber> UP;
- typedef std::shared_ptr<Subscriber> SP;
+ typedef std::unique_ptr<Visitor> UP;
+ typedef std::shared_ptr<Visitor> SP;
- Subscriber(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack);
- bool subscribe(const SerialNum & from);
- ~Subscriber();
+ Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack);
+ bool visit(const SerialNum & from, const SerialNum & to);
+ virtual ~Visitor();
RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); }
- void inSync() override { _callback.inSync(); }
void eof() override { _callback.eof(); }
private:
Callback & _callback;
};
/// Here you read the incomming data getting everything from <from>
- class Visitor : public Subscriber
- {
- public:
- typedef std::unique_ptr<Visitor> UP;
- typedef std::shared_ptr<Visitor> SP;
- Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack);
- bool visit(const SerialNum & from, const SerialNum & to);
- virtual ~Visitor();
- };
public:
typedef std::unique_ptr<TransLogClient> UP;
@@ -100,8 +88,6 @@ public:
Session::UP open(const vespalib::string & domain);
/// Here you can get a list of available domains.
bool listDomains(std::vector<vespalib::string> & dir);
- /// Here you get a subscriber
- Subscriber::UP createSubscriber(const vespalib::string & domain, Session::Callback & callBack);
Visitor::UP createVisitor(const vespalib::string & domain, Session::Callback & callBack);
bool isConnected() const;
@@ -111,7 +97,6 @@ public:
private:
void exportRPC(FRT_Supervisor & supervisor);
void visitCallbackRPC(FRT_RPCRequest *req);
- void syncCallbackRPC(FRT_RPCRequest *req);
void eofCallbackRPC(FRT_RPCRequest *req);
int32_t rpc(FRT_RPCRequest * req);
Session * findSession(const vespalib::string & domain, int sessionId);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index e1fc4399613..1eb6301f3f5 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -166,8 +166,6 @@ void TransLogServer::run()
bool immediate = true;
if (strcmp(req->GetMethodName(), "domainSessionClose") == 0) {
domainSessionClose(req);
- } else if (strcmp(req->GetMethodName(), "domainSubscribe") == 0) {
- domainSubscribe(req);
} else if (strcmp(req->GetMethodName(), "domainVisit") == 0) {
domainVisit(req);
} else if (strcmp(req->GetMethodName(), "createDomain") == 0) {
@@ -302,13 +300,6 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
rb.ParamDesc("to", "Will erase all up and including.");
rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error.");
- //-- Domain Subscribe -----------------------------------------------------------
- rb.DefineMethod("domainSubscribe", "sl", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
- rb.MethodDesc("This will create a subscription. It will live till the connection is closed.");
- rb.ParamDesc("name", "The name of the domain.");
- rb.ParamDesc("from", "Will return all entries following(not including) <from>.");
- rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error. Positive number is the sessionid");
-
//-- Domain Visit -----------------------------------------------------------
rb.DefineMethod("domainVisit", "sll", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("This will create a visitor that return all operations in the range.");
@@ -332,14 +323,11 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error. 1 means busy -> retry. 0 is OK.");
//-- Domain Sync --
- rb.DefineMethod("domainSync", "sl", "il", true,
- FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("domainSync", "sl", "il", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("Sync domain to given entry");
rb.ParamDesc("name", "The name of the domain.");
rb.ParamDesc("syncto", "Entry to sync to");
- rb.ReturnDesc("result",
- "A resultcode(int) of the operation. "
- "Negative number indicates error.");
+ rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error.");
rb.ReturnDesc("syncedto", "Entry synced to");
}
@@ -497,22 +485,6 @@ void TransLogServer::domainCommit(FRT_RPCRequest *req)
}
}
-void TransLogServer::domainSubscribe(FRT_RPCRequest *req)
-{
- uint32_t retval(uint32_t(-1));
- FRT_Values & params = *req->GetParams();
- FRT_Values & ret = *req->GetReturn();
- const char * domainName = params[0]._string._str;
- LOG(debug, "domainSubscribe(%s)", domainName);
- Domain::SP domain(findDomain(domainName));
- if (domain) {
- SerialNum from(params[1]._intval64);
- LOG(debug, "domainSubscribe(%s, %" PRIu64 ")", domainName, from);
- retval = domain->subscribe(domain, from, *_supervisor, req->GetConnection());
- }
- ret.AddInt32(retval);
-}
-
void TransLogServer::domainVisit(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 225206dc0f5..8b612e4967a 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -61,7 +61,6 @@ private:
void domainSessionRun(FRT_RPCRequest *req);
void domainPrune(FRT_RPCRequest *req);
void domainVisit(FRT_RPCRequest *req);
- void domainSubscribe(FRT_RPCRequest *req);
void domainSessionClose(FRT_RPCRequest *req);
void domainSync(FRT_RPCRequest *req);