aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib/src
diff options
context:
space:
mode:
authorHaavard <havardpe@yahoo-inc.com>2017-07-04 14:33:24 +0000
committerHaavard <havardpe@yahoo-inc.com>2017-07-05 14:17:07 +0000
commit9b35cbe66cfeadcd42d233f171a19c1e7edb593d (patch)
tree8b8a1f933d9c8a9b6c33fea4df57423fca809104 /vespalib/src
parent61c0690fc5538257c6729f98e3695fa47a586437 (diff)
async resolver
Diffstat (limited to 'vespalib/src')
-rw-r--r--vespalib/src/tests/net/async_resolver/CMakeLists.txt8
-rw-r--r--vespalib/src/tests/net/async_resolver/async_resolver_test.cpp288
-rw-r--r--vespalib/src/vespa/vespalib/net/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/net/async_resolver.cpp166
-rw-r--r--vespalib/src/vespa/vespalib/net/async_resolver.h130
5 files changed, 593 insertions, 0 deletions
diff --git a/vespalib/src/tests/net/async_resolver/CMakeLists.txt b/vespalib/src/tests/net/async_resolver/CMakeLists.txt
new file mode 100644
index 00000000000..8023bd3055a
--- /dev/null
+++ b/vespalib/src/tests/net/async_resolver/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_async_resolver_test_app TEST
+ SOURCES
+ async_resolver_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_async_resolver_test_app COMMAND vespalib_async_resolver_test_app)
diff --git a/vespalib/src/tests/net/async_resolver/async_resolver_test.cpp b/vespalib/src/tests/net/async_resolver/async_resolver_test.cpp
new file mode 100644
index 00000000000..bcff1f15a36
--- /dev/null
+++ b/vespalib/src/tests/net/async_resolver/async_resolver_test.cpp
@@ -0,0 +1,288 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/net/async_resolver.h>
+#include <vespa/vespalib/net/socket_spec.h>
+#include <vespa/vespalib/util/sync.h>
+#include <atomic>
+
+using namespace vespalib;
+
+struct ResultSetter : public AsyncResolver::ResultHandler {
+ SocketAddress &addr;
+ std::atomic<bool> done;
+ ResultSetter(SocketAddress &addr_out) : addr(addr_out), done(false) {}
+ void handle_result(SocketAddress result) override {
+ addr = result;
+ done = true;
+ }
+};
+
+struct MyClock : public AsyncResolver::Clock {
+ using time_point = AsyncResolver::time_point;
+ using seconds = AsyncResolver::seconds;
+ time_point my_now;
+ void set_now(seconds t) {
+ my_now = time_point(std::chrono::duration_cast<time_point::duration>(t));
+ }
+ AsyncResolver::time_point now() override { return my_now; }
+};
+
+struct BlockingHostResolver : public AsyncResolver::HostResolver {
+ CountDownLatch callers;
+ Gate barrier;
+ BlockingHostResolver(size_t num_callers)
+ : callers(num_callers), barrier() {}
+ vespalib::string ip_address(const vespalib::string &) override {
+ callers.countDown();
+ barrier.await();
+ return "127.0.0.7";
+ }
+ void wait_for_callers() { callers.await(); }
+ void release_callers() { barrier.countDown(); }
+};
+
+struct MyHostResolver : public AsyncResolver::HostResolver {
+ std::mutex ip_lock;
+ std::map<vespalib::string,vespalib::string> ip_map;
+ std::map<vespalib::string, size_t> ip_cnt;
+ MyHostResolver() : ip_lock(), ip_map(), ip_cnt() {}
+ vespalib::string ip_address(const vespalib::string &host) override {
+ std::lock_guard<std::mutex> guard(ip_lock);
+ ++ip_cnt[host];
+ return ip_map[host];
+ }
+ void set_ip_addr(const vespalib::string &host, const vespalib::string &ip_addr) {
+ std::lock_guard<std::mutex> guard(ip_lock);
+ ip_map[host] = ip_addr;
+ }
+ size_t get_cnt(const vespalib::string &host) {
+ std::lock_guard<std::mutex> guard(ip_lock);
+ return ip_cnt[host];
+ }
+ size_t get_total_cnt() {
+ size_t total = 0;
+ std::lock_guard<std::mutex> guard(ip_lock);
+ for (const auto &entry: ip_cnt) {
+ total += entry.second;
+ }
+ return total;
+ }
+};
+
+struct ResolveFixture {
+ std::shared_ptr<MyClock> clock;
+ std::shared_ptr<MyHostResolver> host_resolver;
+ AsyncResolver::SP async_resolver;
+ void set_ip_addr(const vespalib::string &host, const vespalib::string &ip) {
+ host_resolver->set_ip_addr(host, ip);
+ }
+ size_t get_cnt(const vespalib::string &host) { return host_resolver->get_cnt(host); }
+ size_t get_total_cnt() { return host_resolver->get_total_cnt(); }
+ void set_now(double s) { clock->set_now(MyClock::seconds(s)); }
+ ResolveFixture() : clock(new MyClock()), host_resolver(new MyHostResolver()), async_resolver() {
+ AsyncResolver::Params params;
+ params.clock = clock;
+ params.resolver = host_resolver;
+ params.max_result_age = AsyncResolver::seconds(60.0);
+ params.max_resolve_time = AsyncResolver::seconds(1.0);
+ params.num_threads = 4;
+ async_resolver = AsyncResolver::create(params);
+ set_ip_addr("localhost", "127.0.0.1");
+ set_ip_addr("127.0.0.1", "127.0.0.1");
+ set_ip_addr("a", "127.0.1.1");
+ set_ip_addr("b", "127.0.2.1");
+ set_ip_addr("c", "127.0.3.1");
+ set_ip_addr("d", "127.0.4.1");
+ set_ip_addr("e", "127.0.5.1");
+ }
+ vespalib::string resolve(const vespalib::string &spec) {
+ SocketAddress result;
+ auto handler = std::make_shared<ResultSetter>(result);
+ async_resolver->resolve_async(spec, handler);
+ async_resolver->wait_for_pending_resolves();
+ ASSERT_TRUE(handler->done);
+ return result.spec();
+ }
+};
+
+//-----------------------------------------------------------------------------
+
+TEST("require that async resolver internal duration type is appropriate") {
+ AsyncResolver::seconds my_secs = std::chrono::milliseconds(500);
+ EXPECT_EQUAL(my_secs.count(), 0.5);
+}
+
+TEST("require that default async resolver is tuned as expected") {
+ AsyncResolver::Params params;
+ EXPECT_EQUAL(params.max_result_age.count(), 60.0);
+ EXPECT_EQUAL(params.max_resolve_time.count(), 1.0);
+ EXPECT_EQUAL(params.num_threads, 4u);
+}
+
+TEST("require that shared async resolver is shared") {
+ auto resolver1 = AsyncResolver::get_shared();
+ auto resolver2 = AsyncResolver::get_shared();
+ EXPECT_TRUE(resolver1.get() != nullptr);
+ EXPECT_TRUE(resolver2.get() != nullptr);
+ EXPECT_TRUE(resolver1.get() == resolver2.get());
+}
+
+TEST("require that shared async resolver can resolve connect spec") {
+ vespalib::string spec("tcp/localhost:123");
+ SocketAddress result;
+ auto resolver = AsyncResolver::get_shared();
+ auto handler = std::make_shared<ResultSetter>(result);
+ resolver->resolve_async(spec, handler);
+ resolver->wait_for_pending_resolves();
+ vespalib::string resolved = result.spec();
+ fprintf(stderr, "resolver(spec:%s) -> '%s'\n", spec.c_str(), resolved.c_str());
+ EXPECT_TRUE(handler->done);
+ EXPECT_NOT_EQUAL(resolved, spec);
+ EXPECT_EQUAL(resolved, SocketSpec(spec).client_address().spec());
+ EXPECT_EQUAL(resolved, SocketAddress::select_remote(123, "localhost").spec());
+}
+
+TEST("require that steady clock is steady clock") {
+ AsyncResolver::SteadyClock clock;
+ auto past = std::chrono::steady_clock::now();
+ for (size_t i = 0; i < 10; ++i) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ auto now = ((i % 2) == 0) ? clock.now() : std::chrono::steady_clock::now();
+ EXPECT_GREATER_EQUAL(now.time_since_epoch().count(), past.time_since_epoch().count());
+ past = now;
+ }
+}
+
+TEST("require that simple host resolver can resolve host name") {
+ vespalib::string host_name("localhost");
+ AsyncResolver::SimpleHostResolver resolver;
+ auto resolved = resolver.ip_address(host_name);
+ fprintf(stderr, "resolver(host_name:%s) -> '%s'\n", host_name.c_str(), resolved.c_str());
+ EXPECT_NOT_EQUAL(resolved, host_name);
+ EXPECT_EQUAL(resolved, SocketSpec("tcp/localhost:123").client_address().ip_address());
+ EXPECT_EQUAL(resolved, SocketAddress::select_remote(123, "localhost").ip_address());
+}
+
+TEST_F("require that alternative host name resolution works", ResolveFixture()) {
+ f1.set_ip_addr("host_name", "127.0.0.7");
+ EXPECT_EQUAL(f1.resolve("tcp/host_name:123"), "tcp/127.0.0.7:123");
+}
+
+TEST_F("require that async resolver can be used to resolve connect specs without host names", ResolveFixture()) {
+ EXPECT_EQUAL(f1.resolve("this is bogus"), "invalid");
+ EXPECT_EQUAL(f1.resolve("tcp/123"), SocketSpec("tcp/123").client_address().spec());
+ EXPECT_EQUAL(f1.resolve("ipc/file:my_socket"), "ipc/file:my_socket");
+ EXPECT_EQUAL(f1.resolve("ipc/name:my_socket"), "ipc/name:my_socket");
+ EXPECT_EQUAL(f1.get_total_cnt(), 0u);
+}
+
+TEST_F("require that resolved hosts are cached", ResolveFixture()) {
+ EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "tcp/127.0.0.1:123");
+ EXPECT_EQUAL(f1.resolve("tcp/localhost:456"), "tcp/127.0.0.1:456");
+ EXPECT_EQUAL(f1.get_cnt("localhost"), 1u);
+ EXPECT_EQUAL(f1.get_total_cnt(), 1u);
+}
+
+TEST_F("require that host names resolving to themselves (ip addresses) are also cached", ResolveFixture()) {
+ EXPECT_EQUAL(f1.resolve("tcp/127.0.0.1:123"), "tcp/127.0.0.1:123");
+ EXPECT_EQUAL(f1.resolve("tcp/127.0.0.1:456"), "tcp/127.0.0.1:456");
+ EXPECT_EQUAL(f1.get_cnt("127.0.0.1"), 1u);
+ EXPECT_EQUAL(f1.get_total_cnt(), 1u);
+}
+
+TEST_F("require that cached results expire at the right time", ResolveFixture()) {
+ EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "tcp/127.0.0.1:123");
+ f1.set_ip_addr("localhost", "127.0.0.2");
+ f1.set_now(59.5);
+ EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "tcp/127.0.0.1:123");
+ f1.set_now(60.0);
+ EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "tcp/127.0.0.2:123");
+ EXPECT_EQUAL(f1.get_cnt("localhost"), 2u);
+ EXPECT_EQUAL(f1.get_total_cnt(), 2u);
+}
+
+TEST_F("require that missing ip address gives invalid address", ResolveFixture()) {
+ f1.set_ip_addr("localhost", "");
+ EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "invalid");
+ EXPECT_EQUAL(f1.get_cnt("localhost"), 1u);
+ EXPECT_EQUAL(f1.get_total_cnt(), 1u);
+}
+
+TEST_F("require that empty lookup results are cached", ResolveFixture()) {
+ f1.set_ip_addr("localhost", "");
+ EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "invalid");
+ f1.set_ip_addr("localhost", "127.0.0.1");
+ f1.set_now(59.5);
+ EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "invalid");
+ f1.set_now(60.0);
+ EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "tcp/127.0.0.1:123");
+ EXPECT_EQUAL(f1.get_cnt("localhost"), 2u);
+ EXPECT_EQUAL(f1.get_total_cnt(), 2u);
+}
+
+TEST_F("require that multiple cache entries can be evicted at the same time", ResolveFixture()) {
+ EXPECT_EQUAL(f1.resolve("tcp/a:123"), "tcp/127.0.1.1:123");
+ f1.set_now(10.0);
+ EXPECT_EQUAL(f1.resolve("tcp/b:123"), "tcp/127.0.2.1:123");
+ f1.set_now(20.0);
+ EXPECT_EQUAL(f1.resolve("tcp/c:123"), "tcp/127.0.3.1:123");
+ f1.set_now(30.0);
+ EXPECT_EQUAL(f1.resolve("tcp/d:123"), "tcp/127.0.4.1:123");
+ f1.set_now(40.0);
+ EXPECT_EQUAL(f1.resolve("tcp/e:123"), "tcp/127.0.5.1:123");
+ EXPECT_EQUAL(f1.get_total_cnt(), 5u);
+ f1.set_now(85.0); // c too old, d still good
+ EXPECT_EQUAL(f1.resolve("tcp/c:123"), "tcp/127.0.3.1:123");
+ EXPECT_EQUAL(f1.get_total_cnt(), 6u);
+ EXPECT_EQUAL(f1.resolve("tcp/d:123"), "tcp/127.0.4.1:123");
+ EXPECT_EQUAL(f1.get_total_cnt(), 6u);
+}
+
+TEST_F("require that slow host lookups trigger warning (manual log inspection)", TimeBomb(60)) {
+ auto my_clock = std::make_shared<MyClock>();
+ auto host_resolver = std::make_shared<BlockingHostResolver>(1);
+ AsyncResolver::Params params;
+ params.clock = my_clock;
+ params.resolver = host_resolver;
+ params.max_resolve_time = AsyncResolver::seconds(1.0);
+ auto resolver = AsyncResolver::create(params);
+ SocketAddress result;
+ auto handler = std::make_shared<ResultSetter>(result);
+ resolver->resolve_async("tcp/some_host:123", handler);
+ host_resolver->wait_for_callers();
+ my_clock->set_now(MyClock::seconds(1.0));
+ EXPECT_TRUE(!handler->done);
+ host_resolver->release_callers();
+ resolver->wait_for_pending_resolves();
+ EXPECT_TRUE(handler->done);
+ EXPECT_EQUAL(result.spec(), "tcp/127.0.0.7:123");
+}
+
+TEST_F("require that discarding result handlers will avoid pending work (but complete started work)", TimeBomb(60)) {
+ auto host_resolver = std::make_shared<BlockingHostResolver>(2);
+ AsyncResolver::Params params;
+ params.resolver = host_resolver;
+ params.num_threads = 2;
+ auto resolver = AsyncResolver::create(params);
+ SocketAddress result1;
+ SocketAddress result2;
+ SocketAddress result3;
+ auto handler1 = std::make_shared<ResultSetter>(result1);
+ auto handler2 = std::make_shared<ResultSetter>(result2);
+ auto handler3 = std::make_shared<ResultSetter>(result3);
+ resolver->resolve_async("tcp/x:123", handler1);
+ resolver->resolve_async("tcp/y:123", handler2);
+ resolver->resolve_async("tcp/z:123", handler3);
+ host_resolver->wait_for_callers();
+ handler1.reset();
+ handler2.reset();
+ handler3.reset();
+ host_resolver->release_callers();
+ resolver->wait_for_pending_resolves();
+ EXPECT_EQUAL(result1.spec(), "tcp/127.0.0.7:123");
+ EXPECT_EQUAL(result2.spec(), "tcp/127.0.0.7:123");
+ EXPECT_EQUAL(result3.spec(), "invalid");
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/net/CMakeLists.txt b/vespalib/src/vespa/vespalib/net/CMakeLists.txt
index 26c5c867631..42023cb73b0 100644
--- a/vespalib/src/vespa/vespalib/net/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/net/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(vespalib_vespalib_net OBJECT
SOURCES
+ async_resolver.cpp
lazy_resolver.cpp
selector.cpp
server_socket.cpp
diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.cpp b/vespalib/src/vespa/vespalib/net/async_resolver.cpp
new file mode 100644
index 00000000000..c50fd722c45
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/async_resolver.cpp
@@ -0,0 +1,166 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "async_resolver.h"
+#include "socket_spec.h"
+
+#include <vespa/log/log.h>
+LOG_SETUP(".vespalib.net.async_resolver");
+
+namespace vespalib {
+
+VESPA_THREAD_STACK_TAG(async_resolver_executor_thread);
+
+//-----------------------------------------------------------------------------
+
+AsyncResolver::time_point
+AsyncResolver::SteadyClock::now()
+{
+ return std::chrono::steady_clock::now();
+}
+
+//-----------------------------------------------------------------------------
+
+vespalib::string
+AsyncResolver::SimpleHostResolver::ip_address(const vespalib::string &host_name)
+{
+ return SocketAddress::select_remote(80, host_name.c_str()).ip_address();
+}
+
+//-----------------------------------------------------------------------------
+
+AsyncResolver::Params::Params()
+ : clock(std::make_shared<SteadyClock>()),
+ resolver(std::make_shared<SimpleHostResolver>()),
+ max_result_age(60.0),
+ max_resolve_time(1.0),
+ num_threads(4)
+{
+}
+
+//-----------------------------------------------------------------------------
+
+vespalib::string
+AsyncResolver::LoggingHostResolver::ip_address(const vespalib::string &host_name)
+{
+ auto before = _clock->now();
+ vespalib::string ip_address = _resolver->ip_address(host_name);
+ seconds resolve_time = (_clock->now() - before);
+ if (resolve_time >= _max_resolve_time) {
+ LOG(warning, "slow resolve time: '%s' -> '%s' (%g s)",
+ host_name.c_str(), ip_address.c_str(), resolve_time.count());
+ }
+ if (ip_address.empty()) {
+ LOG(warning, "could not resolve host name: '%s'", host_name.c_str());
+ }
+ return ip_address;
+}
+
+//-----------------------------------------------------------------------------
+
+bool
+AsyncResolver::CachingHostResolver::lookup(const vespalib::string &host_name, vespalib::string &ip_address)
+{
+ auto now = _clock->now();
+ std::lock_guard<std::mutex> guard(_lock);
+ while (!_queue.empty() && (_queue.front()->second.end_time <= now)) {
+ _map.erase(_queue.front());
+ _queue.pop();
+ }
+ auto pos = _map.find(host_name);
+ if (pos != _map.end()) {
+ ip_address = pos->second.ip_address;
+ return true;
+ }
+ return false;
+}
+
+void
+AsyncResolver::CachingHostResolver::store(const vespalib::string &host_name, const vespalib::string &ip_address)
+{
+ auto end_time = _clock->now() + std::chrono::duration_cast<time_point::duration>(_max_result_age);
+ std::lock_guard<std::mutex> guard(_lock);
+ auto res = _map.emplace(host_name, Entry(ip_address, end_time));
+ if (res.second) {
+ _queue.push(res.first);
+ }
+}
+
+AsyncResolver::CachingHostResolver::CachingHostResolver(Clock::SP clock, HostResolver::SP resolver, seconds max_result_age)
+ : _clock(std::move(clock)),
+ _resolver(std::move(resolver)),
+ _max_result_age(max_result_age),
+ _lock(),
+ _map(),
+ _queue()
+{
+}
+
+vespalib::string
+AsyncResolver::CachingHostResolver::ip_address(const vespalib::string &host_name)
+{
+ vespalib::string ip_address;
+ if (lookup(host_name, ip_address)) {
+ return ip_address;
+ }
+ ip_address = _resolver->ip_address(host_name);
+ store(host_name, ip_address);
+ return ip_address;
+}
+
+//-----------------------------------------------------------------------------
+
+void
+AsyncResolver::ResolveTask::run()
+{
+ if (ResultHandler::SP handler = weak_handler.lock()) {
+ SocketSpec socket_spec(spec);
+ if (!socket_spec.valid()) {
+ LOG(warning, "invalid socket spec: '%s'", spec.c_str());
+ }
+ if (!socket_spec.host().empty()) {
+ socket_spec = socket_spec.replace_host(resolver.ip_address(socket_spec.host()));
+ }
+ handler->handle_result(socket_spec.client_address());
+ }
+}
+
+//-----------------------------------------------------------------------------
+
+std::mutex AsyncResolver::_shared_lock;
+AsyncResolver::SP AsyncResolver::_shared_resolver(nullptr);
+
+AsyncResolver::AsyncResolver(HostResolver::SP resolver, size_t num_threads)
+ : _resolver(std::move(resolver)),
+ _executor(num_threads, 128*1024, async_resolver_executor_thread)
+{
+}
+
+void
+AsyncResolver::resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler)
+{
+ auto task = std::make_unique<ResolveTask>(spec, *_resolver, std::move(result_handler));
+ auto rejected = _executor.execute(std::move(task));
+ assert(!rejected);
+}
+
+AsyncResolver::SP
+AsyncResolver::create(Params params)
+{
+ auto logger = std::make_shared<LoggingHostResolver>(params.clock, std::move(params.resolver), params.max_resolve_time);
+ auto cacher = std::make_shared<CachingHostResolver>(std::move(params.clock), std::move(logger), params.max_result_age);
+ return SP(new AsyncResolver(std::move(cacher), params.num_threads));
+}
+
+AsyncResolver::SP
+AsyncResolver::get_shared()
+{
+ std::lock_guard<std::mutex> guard(_shared_lock);
+ if (!_shared_resolver) {
+ _shared_resolver = create(Params());
+ }
+ return _shared_resolver;
+}
+
+//-----------------------------------------------------------------------------
+
+} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.h b/vespalib/src/vespa/vespalib/net/async_resolver.h
new file mode 100644
index 00000000000..8323881ee7e
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/async_resolver.h
@@ -0,0 +1,130 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "socket_address.h"
+#include "socket_spec.h"
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/arrayqueue.hpp>
+#include <chrono>
+#include <memory>
+#include <mutex>
+#include <map>
+
+namespace vespalib {
+
+/**
+ * Component used to perform asynchronous resolving of connect
+ * specs. Internal worker threads are used to perform synchronous
+ * resolving with caching. Results are delivered to a result handler
+ * that is tracked using a weak pointer while the operation is
+ * pending. This enables us to skip resolving specs that are no longer
+ * needed by the client. Use the get_shared function to obtain a
+ * shared default-constructed instance. It will be created on the
+ * first call and cleaned up on program exit.
+ **/
+class AsyncResolver
+{
+public:
+ using SP = std::shared_ptr<AsyncResolver>;
+ using time_point = std::chrono::steady_clock::time_point;
+ using seconds = std::chrono::duration<double>;
+
+ struct ResultHandler {
+ virtual void handle_result(SocketAddress addr) = 0;
+ virtual ~ResultHandler() {}
+ using SP = std::shared_ptr<ResultHandler>;
+ using WP = std::weak_ptr<ResultHandler>;
+ };
+
+ struct Clock {
+ virtual time_point now() = 0;
+ virtual ~Clock() {}
+ using SP = std::shared_ptr<Clock>;
+ };
+
+ struct HostResolver {
+ virtual vespalib::string ip_address(const vespalib::string &host_name) = 0;
+ virtual ~HostResolver() {}
+ using SP = std::shared_ptr<HostResolver>;
+ };
+
+ struct SteadyClock : public Clock {
+ time_point now() override;
+ };
+
+ struct SimpleHostResolver : public HostResolver {
+ vespalib::string ip_address(const vespalib::string &host_name) override;
+ };
+
+ struct Params {
+ Clock::SP clock;
+ HostResolver::SP resolver;
+ seconds max_result_age;
+ seconds max_resolve_time;
+ size_t num_threads;
+ Params();
+ ~Params() {}
+ };
+
+private:
+ class LoggingHostResolver : public HostResolver {
+ private:
+ Clock::SP _clock;
+ HostResolver::SP _resolver;
+ seconds _max_resolve_time;
+ public:
+ LoggingHostResolver(Clock::SP clock, HostResolver::SP resolver, seconds max_resolve_time)
+ : _clock(std::move(clock)), _resolver(std::move(resolver)), _max_resolve_time(max_resolve_time) {}
+ vespalib::string ip_address(const vespalib::string &host_name) override;
+ };
+
+ class CachingHostResolver : public HostResolver {
+ private:
+ struct Entry {
+ vespalib::string ip_address;
+ time_point end_time;
+ Entry(const vespalib::string &ip, time_point end)
+ : ip_address(ip), end_time(end) {}
+ };
+ using Map = std::map<vespalib::string,Entry>;
+ using Itr = Map::iterator;
+ Clock::SP _clock;
+ HostResolver::SP _resolver;
+ seconds _max_result_age;
+ std::mutex _lock;
+ Map _map;
+ ArrayQueue<Itr> _queue;
+
+ bool lookup(const vespalib::string &host_name, vespalib::string &ip_address);
+ void resolve(const vespalib::string &host_name, vespalib::string &ip_address);
+ void store(const vespalib::string &host_name, const vespalib::string &ip_address);
+
+ public:
+ CachingHostResolver(Clock::SP clock, HostResolver::SP resolver, seconds max_result_age);
+ vespalib::string ip_address(const vespalib::string &host_name) override;
+ };
+
+ struct ResolveTask : public Executor::Task {
+ vespalib::string spec;
+ HostResolver &resolver;
+ ResultHandler::WP weak_handler;
+ ResolveTask(const vespalib::string &spec_in, HostResolver &resolver_in, ResultHandler::WP weak_handler_in)
+ : spec(spec_in), resolver(resolver_in), weak_handler(std::move(weak_handler_in)) {}
+ void run() override;
+ };
+
+ HostResolver::SP _resolver;
+ ThreadStackExecutor _executor;
+ static std::mutex _shared_lock;
+ static AsyncResolver::SP _shared_resolver;
+
+ AsyncResolver(HostResolver::SP resolver, size_t num_threads);
+public:
+ void resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler);
+ void wait_for_pending_resolves() { _executor.sync(); }
+ static AsyncResolver::SP create(Params params);
+ static AsyncResolver::SP get_shared();
+};
+
+} // namespace vespalib