summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/bucketdb/lockablemaptest.cpp1
-rw-r--r--storage/src/tests/common/dummystoragelink.h2
-rw-r--r--storage/src/tests/common/metricstest.cpp10
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp44
-rw-r--r--storage/src/tests/storageserver/statereportertest.cpp46
-rw-r--r--storage/src/vespa/storage/common/statusmetricconsumer.cpp14
-rw-r--r--storage/src/vespa/storage/common/storagelinkqueued.h1
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h8
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.h5
-rw-r--r--storage/src/vespa/storage/distributor/messagetracker.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/sentmessagemap.cpp1
-rw-r--r--storage/src/vespa/storage/frameworkimpl/status/statuswebserver.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h1
-rw-r--r--storage/src/vespa/storage/storageserver/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h1
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h1
-rw-r--r--storage/src/vespa/storage/storageserver/messagesink.cpp83
-rw-r--r--storage/src/vespa/storage/storageserver/messagesink.h33
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp6
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h2
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp64
-rw-r--r--storage/src/vespa/storage/storageserver/storagenodecontext.h6
-rw-r--r--storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.h4
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.h1
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.h1
-rw-r--r--storage/src/vespa/storageapi/mbusprot/serializationhelper.h34
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h1
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp13
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h9
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp4
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h4
-rw-r--r--storage/src/vespa/storageframework/generic/component/component.h12
36 files changed, 128 insertions, 315 deletions
diff --git a/storage/src/tests/bucketdb/lockablemaptest.cpp b/storage/src/tests/bucketdb/lockablemaptest.cpp
index 582e6957c22..3a16ee170fe 100644
--- a/storage/src/tests/bucketdb/lockablemaptest.cpp
+++ b/storage/src/tests/bucketdb/lockablemaptest.cpp
@@ -1,6 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/storage/bucketdb/btree_lockable_map.hpp>
#include <vespa/storage/bucketdb/striped_btree_lockable_map.hpp>
#include <vespa/vespalib/datastore/buffer_type.hpp>
diff --git a/storage/src/tests/common/dummystoragelink.h b/storage/src/tests/common/dummystoragelink.h
index e8ccc38df76..8da92917c08 100644
--- a/storage/src/tests/common/dummystoragelink.h
+++ b/storage/src/tests/common/dummystoragelink.h
@@ -11,8 +11,6 @@
#include <vespa/storage/common/bucketmessages.h>
#include <vespa/storageapi/message/internal.h>
-class FastOS_ThreadPool;
-
namespace storage {
class DummyStorageLink : public StorageLink {
diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp
index 97d1c22364f..7231a071319 100644
--- a/storage/src/tests/common/metricstest.cpp
+++ b/storage/src/tests/common/metricstest.cpp
@@ -52,8 +52,7 @@ namespace {
{
framework::Clock& _clock;
explicit MetricClock(framework::Clock& c) : _clock(c) {}
- [[nodiscard]] time_t getTime() const override { return vespalib::count_s(_clock.getMonotonicTime().time_since_epoch()); }
- [[nodiscard]] time_t getTimeInMilliSecs() const override { return vespalib::count_ms(_clock.getMonotonicTime().time_since_epoch()); }
+ [[nodiscard]] metrics::time_point getTime() const override { return _clock.getSystemTime(); }
};
}
@@ -85,10 +84,7 @@ void MetricsTest::SetUp() {
_metricManager->registerMetric(guard, *_topSet);
}
- _metricsConsumer = std::make_unique<StatusMetricConsumer>(
- _node->getComponentRegister(),
- *_metricManager,
- "status");
+ _metricsConsumer = std::make_unique<StatusMetricConsumer>(_node->getComponentRegister(), *_metricManager, "status");
_filestorMetrics = std::make_shared<FileStorMetrics>();
_filestorMetrics->initDiskMetrics(1, 1);
@@ -100,7 +96,7 @@ void MetricsTest::SetUp() {
_visitorMetrics = std::make_shared<VisitorMetrics>();
_visitorMetrics->initThreads(4);
_topSet->registerMetric(*_visitorMetrics);
- _metricManager->init(config::ConfigUri(_config->getConfigId()), _node->getThreadPool());
+ _metricManager->init(config::ConfigUri(_config->getConfigId()));
}
void MetricsTest::TearDown() {
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 50ad7b54382..7f3fe06fc29 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -533,17 +533,18 @@ TEST_F(FileStorManagerTest, handler_priority) {
ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
-class MessagePusherThread : public document::Runnable {
+class MessagePusherThread {
public:
FileStorHandler& _handler;
Document::SP _doc;
std::atomic<bool> _done;
std::atomic<bool> _threadDone;
-
+ std::thread _thread;
+
MessagePusherThread(FileStorHandler& handler, Document::SP doc);
- ~MessagePusherThread() override;
+ ~MessagePusherThread();
- void run() override {
+ void run() {
while (!_done) {
document::BucketIdFactory factory;
document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId());
@@ -558,11 +559,16 @@ public:
};
MessagePusherThread::MessagePusherThread(FileStorHandler& handler, Document::SP doc)
- : _handler(handler), _doc(std::move(doc)), _done(false), _threadDone(false)
-{}
-MessagePusherThread::~MessagePusherThread() = default;
+ : _handler(handler), _doc(std::move(doc)), _done(false), _threadDone(false), _thread()
+{
+ _thread = std::thread([this](){run();});
+}
+MessagePusherThread::~MessagePusherThread()
+{
+ _thread.join();
+}
-class MessageFetchingThread : public document::Runnable {
+class MessageFetchingThread {
public:
const uint32_t _threadId;
FileStorHandler& _handler;
@@ -571,13 +577,17 @@ public:
std::atomic<bool> _done;
std::atomic<bool> _failed;
std::atomic<bool> _threadDone;
-
+ std::thread _thread;
+
explicit MessageFetchingThread(FileStorHandler& handler)
: _threadId(0), _handler(handler), _config(0), _fetchedCount(0), _done(false),
- _failed(false), _threadDone(false)
- {}
-
- void run() override {
+ _failed(false), _threadDone(false), _thread()
+ {
+ _thread = std::thread([this](){run();});
+ }
+ ~MessageFetchingThread();
+
+ void run() {
while (!_done) {
FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId);
if (msg.msg.get()) {
@@ -596,6 +606,10 @@ public:
_threadDone = true;
};
};
+MessageFetchingThread::~MessageFetchingThread()
+{
+ _thread.join();
+}
TEST_F(FileStorManagerTest, handler_paused_multi_thread) {
FileStorHandlerComponents c(*this);
@@ -606,12 +620,8 @@ TEST_F(FileStorManagerTest, handler_paused_multi_thread) {
Document::SP doc(createDocument(content, "id:footype:testdoctype1:n=1234:bar").release());
- FastOS_ThreadPool pool;
MessagePusherThread pushthread(filestorHandler, doc);
- pushthread.start(pool);
-
MessageFetchingThread thread(filestorHandler);
- thread.start(pool);
for (uint32_t i = 0; i < 50; ++i) {
std::this_thread::sleep_for(2ms);
diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp
index b7903de0fe2..3a772c1ddde 100644
--- a/storage/src/tests/storageserver/statereportertest.cpp
+++ b/storage/src/tests/storageserver/statereportertest.cpp
@@ -30,7 +30,6 @@ public:
};
struct StateReporterTest : Test {
- FastOS_ThreadPool _threadPool;
framework::defaultimplementation::FakeClock* _clock;
std::unique_ptr<TestServiceLayerApp> _node;
std::unique_ptr<DummyStorageLink> _top;
@@ -54,15 +53,13 @@ struct MetricClock : public metrics::MetricManager::Timer
{
framework::Clock& _clock;
explicit MetricClock(framework::Clock& c) : _clock(c) {}
- [[nodiscard]] time_t getTime() const override { return vespalib::count_s(_clock.getMonotonicTime().time_since_epoch()); }
- [[nodiscard]] time_t getTimeInMilliSecs() const override { return vespalib::count_ms(_clock.getMonotonicTime().time_since_epoch()); }
+ [[nodiscard]] metrics::time_point getTime() const override { return _clock.getSystemTime(); }
};
}
StateReporterTest::StateReporterTest()
- : _threadPool(),
- _clock(nullptr),
+ : _clock(nullptr),
_top(),
_stateReporter()
{
@@ -87,17 +84,14 @@ void StateReporterTest::SetUp() {
_metricManager->registerMetric(guard, *_topSet);
}
- _stateReporter = std::make_unique<StateReporter>(
- _node->getComponentRegister(),
- *_metricManager,
- _generationFetcher,
- "status");
+ _stateReporter = std::make_unique<StateReporter>(_node->getComponentRegister(), *_metricManager,
+ _generationFetcher, "status");
_filestorMetrics = std::make_shared<FileStorMetrics>();
_filestorMetrics->initDiskMetrics(1, 1);
_topSet->registerMetric(*_filestorMetrics);
- _metricManager->init(config::ConfigUri(_config->getConfigId()), _node->getThreadPool());
+ _metricManager->init(config::ConfigUri(_config->getConfigId()));
}
void StateReporterTest::TearDown() {
@@ -127,20 +121,14 @@ vespalib::Slime slime; \
#define ASSERT_GENERATION(jsonData, component, generation) \
{ \
PARSE_JSON(jsonData); \
- ASSERT_EQ( \
- generation, \
- slime.get()["config"][component]["generation"].asDouble()); \
+ ASSERT_EQ(generation, slime.get()["config"][component]["generation"].asDouble()); \
}
#define ASSERT_NODE_STATUS(jsonData, code, message) \
{ \
PARSE_JSON(jsonData); \
- ASSERT_EQ( \
- vespalib::string(code), \
- slime.get()["status"]["code"].asString().make_string()); \
- ASSERT_EQ( \
- vespalib::string(message), \
- slime.get()["status"]["message"].asString().make_string()); \
+ ASSERT_EQ(vespalib::string(code), slime.get()["status"]["code"].asString().make_string()); \
+ ASSERT_EQ(vespalib::string(message), slime.get()["status"]["message"].asString().make_string()); \
}
#define ASSERT_METRIC_GET_PUT(jsonData, expGetCount, expPutCount) \
@@ -150,16 +138,11 @@ vespalib::Slime slime; \
double putCount = -1; \
size_t metricCount = slime.get()["metrics"]["values"].children(); \
for (size_t j=0; j<metricCount; j++) { \
- const vespalib::string name = slime.get()["metrics"]["values"][j]["name"] \
- .asString().make_string(); \
- if (name.compare("vds.filestor.allthreads.get.count") == 0) \
- { \
- getCount = slime.get()["metrics"]["values"][j]["values"]["count"] \
- .asDouble(); \
- } else if (name.compare("vds.filestor.allthreads.put.count") == 0) \
- { \
- putCount = slime.get()["metrics"]["values"][j]["values"]["count"] \
- .asDouble(); \
+ const vespalib::string name = slime.get()["metrics"]["values"][j]["name"].asString().make_string(); \
+ if (name.compare("vds.filestor.allthreads.get.count") == 0) { \
+ getCount = slime.get()["metrics"]["values"][j]["values"]["count"].asDouble(); \
+ } else if (name.compare("vds.filestor.allthreads.put.count") == 0) { \
+ putCount = slime.get()["metrics"]["values"][j]["values"]["count"].asDouble(); \
} \
} \
ASSERT_EQ(expGetCount, getCount); \
@@ -228,8 +211,7 @@ TEST_F(StateReporterTest, report_metrics) {
for (uint32_t i = 0; i < 6; ++i) {
_clock->addSecondsToTime(60);
_metricManager->timeChangedNotification();
- while (int64_t(_metricManager->getLastProcessedTime()) < vespalib::count_s(_clock->getMonotonicTime().time_since_epoch()))
- {
+ while (int64_t(_metricManager->getLastProcessedTime()) < vespalib::count_s(_clock->getMonotonicTime().time_since_epoch())) {
std::this_thread::sleep_for(1ms);
}
}
diff --git a/storage/src/vespa/storage/common/statusmetricconsumer.cpp b/storage/src/vespa/storage/common/statusmetricconsumer.cpp
index 8eb3e9f3ab6..c6f73540605 100644
--- a/storage/src/vespa/storage/common/statusmetricconsumer.cpp
+++ b/storage/src/vespa/storage/common/statusmetricconsumer.cpp
@@ -5,7 +5,6 @@
#include <boost/lexical_cast.hpp>
#include <vespa/metrics/jsonwriter.h>
#include <vespa/metrics/textwriter.h>
-#include <vespa/metrics/xmlwriter.h>
#include <vespa/metrics/metricmanager.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vespalib/stllike/asciistream.h>
@@ -37,10 +36,6 @@ StatusMetricConsumer::getReportContentType(const framework::HttpUrlPath& path) c
return "text/plain";
}
- if (path.getAttribute("format") == "xml") {
- return "application/xml";
- }
-
if (path.getAttribute("format") == "text") {
return "text/plain";
}
@@ -67,7 +62,6 @@ StatusMetricConsumer::reportStatus(std::ostream& out,
LOG(debug, "Not calling update hooks as dontcallupdatehooks option has been given");
}
int64_t currentTimeS(vespalib::count_s(_component.getClock().getMonotonicTime().time_since_epoch()));
- bool xml = (path.getAttribute("format") == "xml");
bool json = (path.getAttribute("format") == "json");
int verbosity(path.get("verbosity", 0));
@@ -131,13 +125,7 @@ StatusMetricConsumer::reportStatus(std::ostream& out,
}
std::string consumer = path.getAttribute("consumer", "");
- if (xml) {
- out << "<?xml version=\"1.0\"?>\n";
- vespalib::XmlOutputStream xos(out);
- metrics::XmlWriter xmlWriter(xos, snapshot->getPeriod(), verbosity);
- _manager.visit(metricLock, *snapshot, xmlWriter, consumer);
- out << "\n";
- } else if (json) {
+ if (json) {
vespalib::asciistream jsonStreamData;
vespalib::JsonStream stream(jsonStreamData, true);
stream << Object() << "metrics";
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h
index 74434c0116b..17a344a368a 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.h
+++ b/storage/src/vespa/storage/common/storagelinkqueued.h
@@ -16,7 +16,6 @@
#include "storagelink.h"
#include <vespa/storageframework/generic/thread/runnable.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <deque>
#include <limits>
#include <mutex>
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
index 26ca8963783..ceadd20baca 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
@@ -8,8 +8,7 @@
namespace storage::distributor {
DistributorStripePool::DistributorStripePool(bool test_mode, PrivateCtorTag)
- : _thread_pool(std::make_unique<FastOS_ThreadPool>()),
- _n_stripe_bits(0),
+ : _n_stripe_bits(0),
_stripes(),
_threads(),
_mutex(),
@@ -119,7 +118,7 @@ void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) {
}
std::unique_lock lock(_mutex); // Ensure _threads is visible to all started threads
for (auto& s : _stripes) {
- _threads.emplace_back(_thread_pool->NewThread(s.get()));
+ _threads.start([ptr = s.get()](){ ptr->run(); });
}
}
@@ -131,9 +130,7 @@ void DistributorStripePool::stop_and_join() {
for (auto& s : _stripes) {
s->signal_should_stop();
}
- for (auto* t : _threads) {
- t->Join();
- }
+ _threads.join();
}
void DistributorStripePool::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept {
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 00f5f57edf9..6ac95c27b76 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -2,14 +2,12 @@
#pragma once
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/thread.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>
-class FastOS_ThreadInterface;
-class FastOS_ThreadPool;
-
namespace storage::distributor {
class DistributorStripeThread;
@@ -37,12 +35,10 @@ class TickableStripe;
*/
class DistributorStripePool {
using StripeVector = std::vector<std::unique_ptr<DistributorStripeThread>>;
- using NativeThreadVector = std::vector<FastOS_ThreadInterface*>;
- std::unique_ptr<FastOS_ThreadPool> _thread_pool;
uint8_t _n_stripe_bits;
StripeVector _stripes;
- NativeThreadVector _threads;
+ vespalib::ThreadPool _threads;
std::mutex _mutex;
std::condition_variable _parker_cond;
size_t _parked_threads; // Must be protected by _park_mutex
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp
index 8f37dbbbf5d..72854d9af75 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp
@@ -23,7 +23,7 @@ DistributorStripeThread::DistributorStripeThread(TickableStripe& stripe,
DistributorStripeThread::~DistributorStripeThread() = default;
-void DistributorStripeThread::Run(FastOS_ThreadInterface*, void*) {
+void DistributorStripeThread::run() {
uint32_t tick_waits_inhibited = 0;
while (!should_stop_thread_relaxed()) {
while (should_park_relaxed()) {
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
index 7015d27a53e..8b9453ab3f3 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/fastos/thread.h>
#include <vespa/vespalib/util/time.h>
#include <atomic>
#include <condition_variable>
@@ -21,7 +20,7 @@ class TickableStripe;
* A DistributorStripeThread instance is bidirectionally bound to a particular pool and
* should therefore always be created by the pool itself (never standalone).
*/
-class DistributorStripeThread : private FastOS_Runnable {
+class DistributorStripeThread {
using AtomicDuration = std::atomic<vespalib::duration>;
TickableStripe& _stripe;
@@ -41,7 +40,7 @@ public:
DistributorStripePool& stripe_pool);
~DistributorStripeThread();
- void Run(FastOS_ThreadInterface*, void*) override;
+ void run();
// Wakes up stripe thread if it's currently waiting for an external event to be triggered,
// such as the arrival of a new RPC message. If thread is parked this call will have no
diff --git a/storage/src/vespa/storage/distributor/messagetracker.cpp b/storage/src/vespa/storage/distributor/messagetracker.cpp
index 93db31bdc29..8830e5ecabc 100644
--- a/storage/src/vespa/storage/distributor/messagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/messagetracker.cpp
@@ -3,6 +3,7 @@
#include "messagetracker.h"
#include <vespa/storageapi/messageapi/bucketcommand.h>
#include <vespa/storageapi/messageapi/bucketreply.h>
+#include <cinttypes>
#include <vespa/log/log.h>
LOG_SETUP(".messagetracker");
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 55fe2e039e1..bdf4fa2ba72 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -12,7 +12,7 @@
#include <vespa/storage/distributor/distributor_bucket_space_repo.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <cinttypes>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.callback.twophaseupdate");
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.cpp b/storage/src/vespa/storage/distributor/sentmessagemap.cpp
index 44dd4fbde89..4b7292c1e81 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.cpp
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/operations/operation.h>
#include <sstream>
#include <set>
+#include <cinttypes>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.callback.map");
diff --git a/storage/src/vespa/storage/frameworkimpl/status/statuswebserver.cpp b/storage/src/vespa/storage/frameworkimpl/status/statuswebserver.cpp
index 8690f6e122d..0b4e32d637d 100644
--- a/storage/src/vespa/storage/frameworkimpl/status/statuswebserver.cpp
+++ b/storage/src/vespa/storage/frameworkimpl/status/statuswebserver.cpp
@@ -10,6 +10,7 @@
#include <vespa/vespalib/component/vtag.h>
#include <vespa/vespalib/net/connection_auth_context.h>
#include <vespa/vespalib/net/crypto_engine.h>
+#include <vespa/vespalib/net/tls/statistics.h>
#include <vespa/config/subscription/configuri.h>
#include <vespa/config/helper/configfetcher.hpp>
#include <functional>
@@ -203,6 +204,7 @@ StatusWebServer::handlePage(const framework::HttpUrlPath& urlpath, vespalib::Por
if (auth_ctx.capabilities().contains_all(reporter->required_capabilities())) {
invoke_reporter(*reporter, urlpath, request);
} else {
+ vespalib::net::tls::CapabilityStatistics::get().inc_status_capability_checks_failed();
// TODO should print peer address as well; not currently exposed
LOG(warning, "Peer with %s denied status page access to '%s' due to insufficient "
"credentials (had %s, needed %s)",
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 08a48cc2d8a..99f61c62cd1 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -10,7 +10,6 @@
#include "filestorhandler.h"
#include "service_layer_host_info_reporter.h"
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/document/bucket/bucketid.h>
#include <vespa/persistence/spi/bucketexecutor.h>
diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt
index 8b82bb251b2..009f8170669 100644
--- a/storage/src/vespa/storage/storageserver/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt
@@ -14,7 +14,6 @@ vespa_add_library(storage_storageserver OBJECT
documentapiconverter.cpp
fnet_metrics_wrapper.cpp
mergethrottler.cpp
- messagesink.cpp
opslogger.cpp
priorityconverter.cpp
rpcrequestwrapper.cpp
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 593640e7d03..156ec8bc031 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -23,7 +23,6 @@
#include <vespa/messagebus/imessagehandler.h>
#include <vespa/messagebus/ireplyhandler.h>
#include <vespa/config/helper/ifetchercallback.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/config/subscription/configuri.h>
#include <vespa/config-bucketspaces.h>
#include <map>
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 0adfb209e66..dddcd42aad7 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -15,7 +15,6 @@
#include <vespa/storageframework/generic/thread/runnable.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/bucket/bucket.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/metrics/metricset.h>
#include <vespa/metrics/summetric.h>
#include <vespa/metrics/countmetric.h>
diff --git a/storage/src/vespa/storage/storageserver/messagesink.cpp b/storage/src/vespa/storage/storageserver/messagesink.cpp
deleted file mode 100644
index 94762e545d9..00000000000
--- a/storage/src/vespa/storage/storageserver/messagesink.cpp
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "messagesink.h"
-#include <vespa/storageapi/message/persistence.h>
-#include <ostream>
-
-using std::shared_ptr;
-
-namespace storage {
-
-MessageSink::MessageSink()
- : StorageLink("Message Sink")
-{
-}
-
-MessageSink::~MessageSink()
-{
- closeNextLink();
-}
-
-void
-MessageSink::print(std::ostream& out, bool verbose,
- const std::string& indent) const
-{
- (void) verbose; (void) indent;
- out << "MessageSink";
-}
-
-namespace {
-#if 0
- std::string getTimeString() {
- char timeBuf[200];
- time_t tm;
- struct tm tms;
- time(&tm);
- gmtime_r(&tm, &tms);
- strftime(timeBuf, sizeof(timeBuf), "%Y-%m-%d:%H:%M:%S %Z", &tms);
- return std::string(timeBuf);
- }
-#endif
-}
-
-IMPL_MSG_COMMAND_H(MessageSink, Get)
-{
- //LOG(event, "[%s] Get %s", getTimeString().c_str(),
- // cmd->getDocumentId()->toString());
- shared_ptr<api::StorageReply> rmsg(new api::GetReply(*cmd));
- rmsg->setResult(api::ReturnCode::NOT_IMPLEMENTED);
- sendUp(rmsg);
- return true;
-}
-
-IMPL_MSG_COMMAND_H(MessageSink, Put)
-{
- //LOG(event, "[%s] Put %s", getTimeString().c_str(),
- // cmd->getDocumentId()->toString());
- shared_ptr<api::StorageReply> rmsg(new api::PutReply(*cmd));
- rmsg->setResult(api::ReturnCode::OK);
- sendUp(rmsg);
- return true;
-}
-
-IMPL_MSG_COMMAND_H(MessageSink, Remove)
-{
- //LOG(event, "[%s] Remove %s", getTimeString().c_str(),
- // cmd->getDocumentId()->toString());
- shared_ptr<api::StorageReply> rmsg(new api::RemoveReply(*cmd));
- rmsg->setResult(api::ReturnCode::OK);
- sendUp(rmsg);
- return true;
-}
-
-IMPL_MSG_COMMAND_H(MessageSink, Revert)
-{
- //LOG(event, "[%s] Revert %s", getTimeString().c_str(),
- // cmd->getDocumentId()->toString());
- shared_ptr<api::StorageReply> rmsg(new api::RevertReply(*cmd));
- rmsg->setResult(api::ReturnCode::OK);
- sendUp(rmsg);
- return true;
-}
-
-} // storage
diff --git a/storage/src/vespa/storage/storageserver/messagesink.h b/storage/src/vespa/storage/storageserver/messagesink.h
deleted file mode 100644
index d98d0439b48..00000000000
--- a/storage/src/vespa/storage/storageserver/messagesink.h
+++ /dev/null
@@ -1,33 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * @class storage::MessageSink
- * @ingroup storageserver
- *
- * @brief This class grabs persistence messages, and answers them without doing anything.
- *
- * @version $Id$
- */
-
-#pragma once
-
-#include <vespa/storage/common/storagelink.h>
-
-namespace storage {
-
-class MessageSink : public StorageLink {
-public:
- explicit MessageSink();
- MessageSink(const MessageSink &) = delete;
- MessageSink& operator=(const MessageSink &) = delete;
- ~MessageSink();
-
- void print(std::ostream& out, bool verbose, const std::string& indent) const override;
-
-private:
- DEF_MSG_COMMAND_H(Get);
- DEF_MSG_COMMAND_H(Put);
- DEF_MSG_COMMAND_H(Remove);
- DEF_MSG_COMMAND_H(Revert);
-};
-
-}
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index e940dc71722..3f015d91a4a 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpc_target.h"
#include "shared_rpc_resources.h"
-#include <vespa/fastos/thread.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/fnet/transport.h>
@@ -66,8 +65,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri,
int rpc_server_port,
size_t rpc_thread_pool_size,
size_t rpc_events_before_wakeup)
- : _thread_pool(std::make_unique<FastOS_ThreadPool>()),
- _transport(std::make_unique<FNET_Transport>(fnet::TransportConfig(rpc_thread_pool_size).
+ : _transport(std::make_unique<FNET_Transport>(fnet::TransportConfig(rpc_thread_pool_size).
events_before_wakeup(rpc_events_before_wakeup))),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_slobrok_register(std::make_unique<slobrok::api::RegisterAPI>(*_orb, slobrok::ConfiguratorFactory(config_uri))),
@@ -92,7 +90,7 @@ void SharedRpcResources::start_server_and_register_slobrok(vespalib::stringref m
if (!_orb->Listen(_rpc_server_port)) {
throw IllegalStateException(fmt("Failed to listen to RPC port %d", _rpc_server_port), VESPA_STRLOC);
}
- _transport->Start(_thread_pool.get());
+ _transport->Start();
_slobrok_register->registerName(my_handle);
wait_until_slobrok_is_ready();
_handle = my_handle;
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
index a30fcdc4ea7..953492089c1 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -6,7 +6,6 @@
#include <vespa/vespalib/stllike/string.h>
#include <memory>
-class FastOS_ThreadPool;
class FNET_Transport;
class FRT_Supervisor;
@@ -19,7 +18,6 @@ namespace storage::rpc {
class SharedRpcResources {
class RpcTargetFactoryImpl;
- std::unique_ptr<FastOS_ThreadPool> _thread_pool;
std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _orb;
std::unique_ptr<slobrok::api::RegisterAPI> _slobrok_register;
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 2836ab80acf..9f8456afc37 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -33,34 +33,36 @@ namespace storage {
namespace {
- using vespalib::getLastErrorString;
+using vespalib::getLastErrorString;
- void writePidFile(const vespalib::string& pidfile)
- {
- ssize_t rv = -1;
- vespalib::string mypid = vespalib::make_string("%d\n", getpid());
- size_t lastSlash = pidfile.rfind('/');
- if (lastSlash != vespalib::string::npos) {
- std::filesystem::create_directories(std::filesystem::path(pidfile.substr(0, lastSlash)));
- }
- int fd = open(pidfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
- if (fd != -1) {
- rv = write(fd, mypid.c_str(), mypid.size());
- close(fd);
- }
- if (rv < 1) {
- LOG(warning, "Failed to write pidfile '%s': %s",
- pidfile.c_str(), getLastErrorString().c_str());
- }
+void
+writePidFile(const vespalib::string& pidfile)
+{
+ ssize_t rv = -1;
+ vespalib::string mypid = vespalib::make_string("%d\n", getpid());
+ size_t lastSlash = pidfile.rfind('/');
+ if (lastSlash != vespalib::string::npos) {
+ std::filesystem::create_directories(std::filesystem::path(pidfile.substr(0, lastSlash)));
+ }
+ int fd = open(pidfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
+ if (fd != -1) {
+ rv = write(fd, mypid.c_str(), mypid.size());
+ close(fd);
}
+ if (rv < 1) {
+ LOG(warning, "Failed to write pidfile '%s': %s",
+ pidfile.c_str(), getLastErrorString().c_str());
+ }
+}
- void removePidFile(const vespalib::string& pidfile)
- {
- if (unlink(pidfile.c_str()) != 0) {
- LOG(warning, "Failed to delete pidfile '%s': %s",
- pidfile.c_str(), getLastErrorString().c_str());
- }
+void
+removePidFile(const vespalib::string& pidfile)
+{
+ if (unlink(pidfile.c_str()) != 0) {
+ LOG(warning, "Failed to delete pidfile '%s': %s",
+ pidfile.c_str(), getLastErrorString().c_str());
}
+}
} // End of anonymous namespace
@@ -201,7 +203,7 @@ StorageNode::initialize()
// have been created, such that we don't need to pay the extra cost of
// reinitializing metric manager often.
if ( ! _context.getComponentRegister().getMetricManager().isInitialized() ) {
- _context.getComponentRegister().getMetricManager().init(_configUri, _context.getThreadPool());
+ _context.getComponentRegister().getMetricManager().init(_configUri);
}
if (_chain) {
@@ -429,7 +431,8 @@ StorageNode::shutdown()
LOG(debug, "Done shutting down node");
}
-void StorageNode::configure(std::unique_ptr<StorServerConfig> config) {
+void
+StorageNode::configure(std::unique_ptr<StorServerConfig> config) {
log_config_received(*config);
// When we get config, we try to grab the config lock to ensure noone
// else is doing configuration work, and then we write the new config
@@ -445,7 +448,8 @@ void StorageNode::configure(std::unique_ptr<StorServerConfig> config) {
}
}
-void StorageNode::configure(std::unique_ptr<UpgradingConfig> config) {
+void
+StorageNode::configure(std::unique_ptr<UpgradingConfig> config) {
log_config_received(*config);
{
std::lock_guard configLockGuard(_configLock);
@@ -457,7 +461,8 @@ void StorageNode::configure(std::unique_ptr<UpgradingConfig> config) {
}
}
-void StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) {
+void
+StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) {
log_config_received(*config);
{
std::lock_guard configLockGuard(_configLock);
@@ -486,7 +491,8 @@ StorageNode::configure(std::unique_ptr<document::config::DocumenttypesConfig> co
}
}
-void StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) {
+void
+StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) {
log_config_received(*config);
{
std::lock_guard configLockGuard(_configLock);
diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.h b/storage/src/vespa/storage/storageserver/storagenodecontext.h
index f07bdd37cd4..52709fb1d9b 100644
--- a/storage/src/vespa/storage/storageserver/storagenodecontext.h
+++ b/storage/src/vespa/storage/storageserver/storagenodecontext.h
@@ -28,12 +28,6 @@ struct StorageNodeContext {
*/
ComponentRegister& getComponentRegister() { return *_componentRegister; }
- /**
- * There currently exist threads that doesn't use the component model.
- * Let the backend threadpool be accessible for now.
- */
- FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); }
-
~StorageNodeContext();
protected:
diff --git a/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.cpp b/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.cpp
index 5e281152b2b..ad74e020a82 100644
--- a/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.cpp
+++ b/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.cpp
@@ -27,9 +27,14 @@ TlsStatisticsMetricsWrapper::TlsStatisticsMetricsWrapper(metrics::MetricSet* own
"connections broken due to failures during frame encoding or decoding", this),
failed_tls_config_reloads("failed-tls-config-reloads", {}, "Number of times "
"background reloading of TLS config has failed", this),
+ rpc_capability_checks_failed("rpc-capability-checks-failed", {},
+ "Number of RPC operations that failed to due one or more missing capabilities", this),
+ status_capability_checks_failed("status-capability-checks-failed", {},
+ "Number of status page operations that failed to due one or more missing capabilities", this),
last_client_stats_snapshot(),
last_server_stats_snapshot(),
- last_config_stats_snapshot()
+ last_config_stats_snapshot(),
+ last_capability_stats_snapshot()
{}
TlsStatisticsMetricsWrapper::~TlsStatisticsMetricsWrapper() = default;
@@ -60,9 +65,16 @@ void TlsStatisticsMetricsWrapper::update_metrics_with_snapshot_delta() {
failed_tls_config_reloads.set(config_delta.failed_config_reloads);
+ auto capability_current = vespalib::net::tls::CapabilityStatistics::get().snapshot();
+ auto capability_delta = capability_current.subtract(last_capability_stats_snapshot);
+
+ rpc_capability_checks_failed.set(capability_delta.rpc_capability_checks_failed);
+ status_capability_checks_failed.set(capability_delta.status_capability_checks_failed);
+
last_server_stats_snapshot = server_current;
last_client_stats_snapshot = client_current;
last_config_stats_snapshot = config_current;
+ last_capability_stats_snapshot = capability_current;
}
}
diff --git a/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.h b/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.h
index 7bb51acd1fe..daf02b53b7a 100644
--- a/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.h
+++ b/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.h
@@ -29,9 +29,13 @@ class TlsStatisticsMetricsWrapper : public metrics::MetricSet {
metrics::LongCountMetric failed_tls_config_reloads;
+ metrics::LongCountMetric rpc_capability_checks_failed;
+ metrics::LongCountMetric status_capability_checks_failed;
+
vespalib::net::tls::ConnectionStatistics::Snapshot last_client_stats_snapshot;
vespalib::net::tls::ConnectionStatistics::Snapshot last_server_stats_snapshot;
vespalib::net::tls::ConfigStatistics::Snapshot last_config_stats_snapshot;
+ vespalib::net::tls::CapabilityStatistics::Snapshot last_capability_stats_snapshot;
public:
explicit TlsStatisticsMetricsWrapper(metrics::MetricSet* owner);
diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h
index 0c5ec08fb4c..02bb37db59f 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.h
+++ b/storage/src/vespa/storage/visiting/visitormanager.h
@@ -30,7 +30,6 @@
#include <vespa/storageapi/message/internal.h>
#include <vespa/storageapi/message/visitor.h>
#include <vespa/config/helper/ifetchercallback.h>
-#include <vespa/vespalib/util/document_runnable.h>
namespace config {
class ConfigUri;
diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h
index 729b675df3a..f6204fed438 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.h
+++ b/storage/src/vespa/storage/visiting/visitorthread.h
@@ -22,7 +22,6 @@
#include <vespa/storageframework/generic/thread/runnable.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <vespa/metrics/metrictimer.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <atomic>
#include <deque>
diff --git a/storage/src/vespa/storageapi/mbusprot/serializationhelper.h b/storage/src/vespa/storageapi/mbusprot/serializationhelper.h
index 457a6178704..671ffbddd6f 100644
--- a/storage/src/vespa/storageapi/mbusprot/serializationhelper.h
+++ b/storage/src/vespa/storageapi/mbusprot/serializationhelper.h
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/fastos/types.h>
#include <vespa/document/base/globalid.h>
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/util/bytebuffer.h>
@@ -13,12 +12,6 @@ namespace storage::mbusprot {
class SerializationHelper
{
public:
- static int64_t getLong(document::ByteBuffer& buf) {
- int64_t tmp;
- buf.getLongNetwork(tmp);
- return tmp;
- }
-
static int32_t getInt(document::ByteBuffer& buf) {
int32_t tmp;
buf.getIntNetwork(tmp);
@@ -46,26 +39,6 @@ public:
return s;
}
- static bool getBoolean(document::ByteBuffer& buf) {
- uint8_t tmp;
- buf.getByte(tmp);
- return (tmp == 1);
- }
-
- static api::ReturnCode getReturnCode(document::ByteBuffer& buf) {
- api::ReturnCode::Result result = (api::ReturnCode::Result) getInt(buf);
- vespalib::stringref message = getString(buf);
- return api::ReturnCode(result, message);
- }
-
- static void putReturnCode(const api::ReturnCode& code, vespalib::GrowableByteBuffer& buf)
- {
- buf.putInt(code.getResult());
- buf.putString(code.getMessage());
- }
-
- static const uint32_t BUCKET_INFO_SERIALIZED_SIZE = sizeof(uint32_t) * 3;
-
static document::GlobalId getGlobalId(document::ByteBuffer& buf) {
std::vector<char> buffer(getShort(buf));
for (uint32_t i=0; i<buffer.size(); ++i) {
@@ -74,13 +47,6 @@ public:
return document::GlobalId(&buffer[0]);
}
- static void putGlobalId(const document::GlobalId& gid, vespalib::GrowableByteBuffer& buf)
- {
- buf.putShort(document::GlobalId::LENGTH);
- for (uint32_t i=0; i<document::GlobalId::LENGTH; ++i) {
- buf.putByte(gid.get()[i]);
- }
- }
static document::Document::UP getDocument(document::ByteBuffer& buf, const document::DocumentTypeRepo& repo)
{
uint32_t size = getInt(buf);
diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h
index d228dace1ed..1aede4d12e8 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h
@@ -31,7 +31,6 @@ public:
virtual ComponentRegisterImpl& getComponentRegister() { return *_compReg; }
FakeClock& getClock() { return _clock; }
ThreadPoolImpl& getThreadPoolImpl() { return _threadPool; }
- FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); }
};
}
diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
index 925c9cda248..c1fa2aac708 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
+++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
@@ -5,6 +5,7 @@
#include <vespa/storageframework/generic/clock/clock.h>
#include <vespa/vespalib/util/atomic.h>
#include <vespa/vespalib/util/signalhandler.h>
+#include <cinttypes>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".framework.thread.impl");
@@ -28,11 +29,11 @@ ThreadImpl::ThreadImpl(ThreadPoolImpl& pool,
_tickDataPtr(0),
_interrupted(false),
_joined(false),
- _thread(*this),
+ _thread(),
_cpu_category(cpu_category)
{
_tickData[load_relaxed(_tickDataPtr)]._lastTick = pool.getClock().getMonotonicTime();
- _thread.start(_pool.getThreadPool());
+ _thread = std::thread([this](){run();});
}
ThreadImpl::~ThreadImpl()
@@ -70,19 +71,21 @@ void
ThreadImpl::interrupt()
{
_interrupted.store(true, std::memory_order_relaxed);
- _thread.stop();
}
void
ThreadImpl::join()
{
- _thread.join();
+ if (_thread.joinable()) {
+ _thread.join();
+ }
}
vespalib::string
ThreadImpl::get_live_thread_stack_trace() const
{
- return vespalib::SignalHandler::get_cross_thread_stack_trace(_thread.native_thread_id());
+ auto native_handle = const_cast<std::thread&>(_thread).native_handle();
+ return vespalib::SignalHandler::get_cross_thread_stack_trace(native_handle);
}
void
diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
index d95ba2a37ef..68ed63ea17c 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
@@ -4,7 +4,6 @@
#include <vespa/storageframework/generic/thread/thread.h>
#include <vespa/vespalib/util/cpu_usage.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <array>
#include <atomic>
#include <optional>
@@ -15,12 +14,6 @@ struct ThreadPoolImpl;
class ThreadImpl final : public Thread
{
- struct BackendThread : public document::Runnable {
- ThreadImpl& _impl;
- explicit BackendThread(ThreadImpl& impl) : _impl(impl) {}
- void run() override { _impl.run(); }
- };
-
/**
* Internal data race free implementation of tick data that maps to and
* from ThreadTickData. We hide the atomicity of this since atomic vars
@@ -52,7 +45,7 @@ class ThreadImpl final : public Thread
std::atomic<uint32_t> _tickDataPtr;
std::atomic<bool> _interrupted;
bool _joined;
- BackendThread _thread;
+ std::thread _thread;
std::optional<vespalib::CpuUsage::Category> _cpu_category;
void run();
diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
index 95959d06b54..068de8f5880 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
+++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
@@ -15,8 +15,7 @@ using vespalib::IllegalStateException;
namespace storage::framework::defaultimplementation {
ThreadPoolImpl::ThreadPoolImpl(Clock& clock)
- : _backendThreadPool(std::make_unique<FastOS_ThreadPool>()),
- _clock(clock),
+ : _clock(clock),
_stopping(false)
{ }
@@ -44,7 +43,6 @@ ThreadPoolImpl::~ThreadPoolImpl()
}
std::this_thread::sleep_for(10ms);
}
- _backendThreadPool->Close();
}
Thread::UP
diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
index b788a3eed78..4319b4a0efe 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
@@ -4,8 +4,6 @@
#include <vespa/storageframework/generic/thread/threadpool.h>
-class FastOS_ThreadPool;
-
namespace storage::framework {
struct Clock;
}
@@ -15,7 +13,6 @@ class ThreadImpl;
struct ThreadPoolImpl final : public ThreadPool
{
- std::unique_ptr<FastOS_ThreadPool> _backendThreadPool;
std::vector<ThreadImpl*> _threads;
mutable std::mutex _threadVectorLock;
Clock & _clock;
@@ -30,7 +27,6 @@ public:
std::optional<vespalib::CpuUsage::Category> cpu_category) override;
void visitThreads(ThreadVisitor&) const override;
void unregisterThread(ThreadImpl&);
- FastOS_ThreadPool& getThreadPool() { return *_backendThreadPool; }
Clock& getClock() { return _clock; }
};
diff --git a/storage/src/vespa/storageframework/generic/component/component.h b/storage/src/vespa/storageframework/generic/component/component.h
index 9a5e524e504..47469cce05d 100644
--- a/storage/src/vespa/storageframework/generic/component/component.h
+++ b/storage/src/vespa/storageframework/generic/component/component.h
@@ -45,12 +45,12 @@
* optimize clock fetching as we see fit later.
*
* - A thread pool is given. This makes us able to use a thread pool.
- * (Allthough currently we don't really need a thread pool, as threads
- * typically live for the whole lifetime of the server. But currently we are
- * forced to use a thread pool due to fastos.) Another feature of this is
- * that the thread interface has built in information needed to detect
- * deadlocks and report status about thread behavior, such that deadlock
- * detecting and thread status can be shown without the threads themselves
+ * (Allthough currently we don't really need a thread pool, as
+ * threads typically live for the whole lifetime of the
+ * server. Another feature of this is that the thread interface has
+ * built in information needed to detect deadlocks and report
+ * status about thread behavior, such that deadlock detecting and
+ * thread status can be shown without the threads themselves
* depending on how this is done.
*
* - A memory manager may also be provided, allowing components to request