diff options
author | Haavard <havardpe@yahoo-inc.com> | 2017-07-04 14:33:24 +0000 |
---|---|---|
committer | Haavard <havardpe@yahoo-inc.com> | 2017-07-05 14:17:07 +0000 |
commit | 9b35cbe66cfeadcd42d233f171a19c1e7edb593d (patch) | |
tree | 8b8a1f933d9c8a9b6c33fea4df57423fca809104 /vespalib/src | |
parent | 61c0690fc5538257c6729f98e3695fa47a586437 (diff) |
async resolver
Diffstat (limited to 'vespalib/src')
-rw-r--r-- | vespalib/src/tests/net/async_resolver/CMakeLists.txt | 8 | ||||
-rw-r--r-- | vespalib/src/tests/net/async_resolver/async_resolver_test.cpp | 288 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/async_resolver.cpp | 166 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/async_resolver.h | 130 |
5 files changed, 593 insertions, 0 deletions
diff --git a/vespalib/src/tests/net/async_resolver/CMakeLists.txt b/vespalib/src/tests/net/async_resolver/CMakeLists.txt new file mode 100644 index 00000000000..8023bd3055a --- /dev/null +++ b/vespalib/src/tests/net/async_resolver/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_async_resolver_test_app TEST + SOURCES + async_resolver_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_async_resolver_test_app COMMAND vespalib_async_resolver_test_app) diff --git a/vespalib/src/tests/net/async_resolver/async_resolver_test.cpp b/vespalib/src/tests/net/async_resolver/async_resolver_test.cpp new file mode 100644 index 00000000000..bcff1f15a36 --- /dev/null +++ b/vespalib/src/tests/net/async_resolver/async_resolver_test.cpp @@ -0,0 +1,288 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/net/async_resolver.h> +#include <vespa/vespalib/net/socket_spec.h> +#include <vespa/vespalib/util/sync.h> +#include <atomic> + +using namespace vespalib; + +struct ResultSetter : public AsyncResolver::ResultHandler { + SocketAddress &addr; + std::atomic<bool> done; + ResultSetter(SocketAddress &addr_out) : addr(addr_out), done(false) {} + void handle_result(SocketAddress result) override { + addr = result; + done = true; + } +}; + +struct MyClock : public AsyncResolver::Clock { + using time_point = AsyncResolver::time_point; + using seconds = AsyncResolver::seconds; + time_point my_now; + void set_now(seconds t) { + my_now = time_point(std::chrono::duration_cast<time_point::duration>(t)); + } + AsyncResolver::time_point now() override { return my_now; } +}; + +struct BlockingHostResolver : public AsyncResolver::HostResolver { + CountDownLatch callers; + Gate barrier; + BlockingHostResolver(size_t num_callers) + : callers(num_callers), barrier() {} + vespalib::string ip_address(const vespalib::string &) override { + callers.countDown(); + barrier.await(); + return "127.0.0.7"; + } + void wait_for_callers() { callers.await(); } + void release_callers() { barrier.countDown(); } +}; + +struct MyHostResolver : public AsyncResolver::HostResolver { + std::mutex ip_lock; + std::map<vespalib::string,vespalib::string> ip_map; + std::map<vespalib::string, size_t> ip_cnt; + MyHostResolver() : ip_lock(), ip_map(), ip_cnt() {} + vespalib::string ip_address(const vespalib::string &host) override { + std::lock_guard<std::mutex> guard(ip_lock); + ++ip_cnt[host]; + return ip_map[host]; + } + void set_ip_addr(const vespalib::string &host, const vespalib::string &ip_addr) { + std::lock_guard<std::mutex> guard(ip_lock); + ip_map[host] = ip_addr; + } + size_t get_cnt(const vespalib::string &host) { + std::lock_guard<std::mutex> guard(ip_lock); + return ip_cnt[host]; + } + size_t get_total_cnt() { + size_t total = 0; + std::lock_guard<std::mutex> guard(ip_lock); + for (const auto &entry: ip_cnt) { + total += entry.second; + } + return total; + } +}; + +struct ResolveFixture { + std::shared_ptr<MyClock> clock; + std::shared_ptr<MyHostResolver> host_resolver; + AsyncResolver::SP async_resolver; + void set_ip_addr(const vespalib::string &host, const vespalib::string &ip) { + host_resolver->set_ip_addr(host, ip); + } + size_t get_cnt(const vespalib::string &host) { return host_resolver->get_cnt(host); } + size_t get_total_cnt() { return host_resolver->get_total_cnt(); } + void set_now(double s) { clock->set_now(MyClock::seconds(s)); } + ResolveFixture() : clock(new MyClock()), host_resolver(new MyHostResolver()), async_resolver() { + AsyncResolver::Params params; + params.clock = clock; + params.resolver = host_resolver; + params.max_result_age = AsyncResolver::seconds(60.0); + params.max_resolve_time = AsyncResolver::seconds(1.0); + params.num_threads = 4; + async_resolver = AsyncResolver::create(params); + set_ip_addr("localhost", "127.0.0.1"); + set_ip_addr("127.0.0.1", "127.0.0.1"); + set_ip_addr("a", "127.0.1.1"); + set_ip_addr("b", "127.0.2.1"); + set_ip_addr("c", "127.0.3.1"); + set_ip_addr("d", "127.0.4.1"); + set_ip_addr("e", "127.0.5.1"); + } + vespalib::string resolve(const vespalib::string &spec) { + SocketAddress result; + auto handler = std::make_shared<ResultSetter>(result); + async_resolver->resolve_async(spec, handler); + async_resolver->wait_for_pending_resolves(); + ASSERT_TRUE(handler->done); + return result.spec(); + } +}; + +//----------------------------------------------------------------------------- + +TEST("require that async resolver internal duration type is appropriate") { + AsyncResolver::seconds my_secs = std::chrono::milliseconds(500); + EXPECT_EQUAL(my_secs.count(), 0.5); +} + +TEST("require that default async resolver is tuned as expected") { + AsyncResolver::Params params; + EXPECT_EQUAL(params.max_result_age.count(), 60.0); + EXPECT_EQUAL(params.max_resolve_time.count(), 1.0); + EXPECT_EQUAL(params.num_threads, 4u); +} + +TEST("require that shared async resolver is shared") { + auto resolver1 = AsyncResolver::get_shared(); + auto resolver2 = AsyncResolver::get_shared(); + EXPECT_TRUE(resolver1.get() != nullptr); + EXPECT_TRUE(resolver2.get() != nullptr); + EXPECT_TRUE(resolver1.get() == resolver2.get()); +} + +TEST("require that shared async resolver can resolve connect spec") { + vespalib::string spec("tcp/localhost:123"); + SocketAddress result; + auto resolver = AsyncResolver::get_shared(); + auto handler = std::make_shared<ResultSetter>(result); + resolver->resolve_async(spec, handler); + resolver->wait_for_pending_resolves(); + vespalib::string resolved = result.spec(); + fprintf(stderr, "resolver(spec:%s) -> '%s'\n", spec.c_str(), resolved.c_str()); + EXPECT_TRUE(handler->done); + EXPECT_NOT_EQUAL(resolved, spec); + EXPECT_EQUAL(resolved, SocketSpec(spec).client_address().spec()); + EXPECT_EQUAL(resolved, SocketAddress::select_remote(123, "localhost").spec()); +} + +TEST("require that steady clock is steady clock") { + AsyncResolver::SteadyClock clock; + auto past = std::chrono::steady_clock::now(); + for (size_t i = 0; i < 10; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + auto now = ((i % 2) == 0) ? clock.now() : std::chrono::steady_clock::now(); + EXPECT_GREATER_EQUAL(now.time_since_epoch().count(), past.time_since_epoch().count()); + past = now; + } +} + +TEST("require that simple host resolver can resolve host name") { + vespalib::string host_name("localhost"); + AsyncResolver::SimpleHostResolver resolver; + auto resolved = resolver.ip_address(host_name); + fprintf(stderr, "resolver(host_name:%s) -> '%s'\n", host_name.c_str(), resolved.c_str()); + EXPECT_NOT_EQUAL(resolved, host_name); + EXPECT_EQUAL(resolved, SocketSpec("tcp/localhost:123").client_address().ip_address()); + EXPECT_EQUAL(resolved, SocketAddress::select_remote(123, "localhost").ip_address()); +} + +TEST_F("require that alternative host name resolution works", ResolveFixture()) { + f1.set_ip_addr("host_name", "127.0.0.7"); + EXPECT_EQUAL(f1.resolve("tcp/host_name:123"), "tcp/127.0.0.7:123"); +} + +TEST_F("require that async resolver can be used to resolve connect specs without host names", ResolveFixture()) { + EXPECT_EQUAL(f1.resolve("this is bogus"), "invalid"); + EXPECT_EQUAL(f1.resolve("tcp/123"), SocketSpec("tcp/123").client_address().spec()); + EXPECT_EQUAL(f1.resolve("ipc/file:my_socket"), "ipc/file:my_socket"); + EXPECT_EQUAL(f1.resolve("ipc/name:my_socket"), "ipc/name:my_socket"); + EXPECT_EQUAL(f1.get_total_cnt(), 0u); +} + +TEST_F("require that resolved hosts are cached", ResolveFixture()) { + EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "tcp/127.0.0.1:123"); + EXPECT_EQUAL(f1.resolve("tcp/localhost:456"), "tcp/127.0.0.1:456"); + EXPECT_EQUAL(f1.get_cnt("localhost"), 1u); + EXPECT_EQUAL(f1.get_total_cnt(), 1u); +} + +TEST_F("require that host names resolving to themselves (ip addresses) are also cached", ResolveFixture()) { + EXPECT_EQUAL(f1.resolve("tcp/127.0.0.1:123"), "tcp/127.0.0.1:123"); + EXPECT_EQUAL(f1.resolve("tcp/127.0.0.1:456"), "tcp/127.0.0.1:456"); + EXPECT_EQUAL(f1.get_cnt("127.0.0.1"), 1u); + EXPECT_EQUAL(f1.get_total_cnt(), 1u); +} + +TEST_F("require that cached results expire at the right time", ResolveFixture()) { + EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "tcp/127.0.0.1:123"); + f1.set_ip_addr("localhost", "127.0.0.2"); + f1.set_now(59.5); + EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "tcp/127.0.0.1:123"); + f1.set_now(60.0); + EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "tcp/127.0.0.2:123"); + EXPECT_EQUAL(f1.get_cnt("localhost"), 2u); + EXPECT_EQUAL(f1.get_total_cnt(), 2u); +} + +TEST_F("require that missing ip address gives invalid address", ResolveFixture()) { + f1.set_ip_addr("localhost", ""); + EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "invalid"); + EXPECT_EQUAL(f1.get_cnt("localhost"), 1u); + EXPECT_EQUAL(f1.get_total_cnt(), 1u); +} + +TEST_F("require that empty lookup results are cached", ResolveFixture()) { + f1.set_ip_addr("localhost", ""); + EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "invalid"); + f1.set_ip_addr("localhost", "127.0.0.1"); + f1.set_now(59.5); + EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "invalid"); + f1.set_now(60.0); + EXPECT_EQUAL(f1.resolve("tcp/localhost:123"), "tcp/127.0.0.1:123"); + EXPECT_EQUAL(f1.get_cnt("localhost"), 2u); + EXPECT_EQUAL(f1.get_total_cnt(), 2u); +} + +TEST_F("require that multiple cache entries can be evicted at the same time", ResolveFixture()) { + EXPECT_EQUAL(f1.resolve("tcp/a:123"), "tcp/127.0.1.1:123"); + f1.set_now(10.0); + EXPECT_EQUAL(f1.resolve("tcp/b:123"), "tcp/127.0.2.1:123"); + f1.set_now(20.0); + EXPECT_EQUAL(f1.resolve("tcp/c:123"), "tcp/127.0.3.1:123"); + f1.set_now(30.0); + EXPECT_EQUAL(f1.resolve("tcp/d:123"), "tcp/127.0.4.1:123"); + f1.set_now(40.0); + EXPECT_EQUAL(f1.resolve("tcp/e:123"), "tcp/127.0.5.1:123"); + EXPECT_EQUAL(f1.get_total_cnt(), 5u); + f1.set_now(85.0); // c too old, d still good + EXPECT_EQUAL(f1.resolve("tcp/c:123"), "tcp/127.0.3.1:123"); + EXPECT_EQUAL(f1.get_total_cnt(), 6u); + EXPECT_EQUAL(f1.resolve("tcp/d:123"), "tcp/127.0.4.1:123"); + EXPECT_EQUAL(f1.get_total_cnt(), 6u); +} + +TEST_F("require that slow host lookups trigger warning (manual log inspection)", TimeBomb(60)) { + auto my_clock = std::make_shared<MyClock>(); + auto host_resolver = std::make_shared<BlockingHostResolver>(1); + AsyncResolver::Params params; + params.clock = my_clock; + params.resolver = host_resolver; + params.max_resolve_time = AsyncResolver::seconds(1.0); + auto resolver = AsyncResolver::create(params); + SocketAddress result; + auto handler = std::make_shared<ResultSetter>(result); + resolver->resolve_async("tcp/some_host:123", handler); + host_resolver->wait_for_callers(); + my_clock->set_now(MyClock::seconds(1.0)); + EXPECT_TRUE(!handler->done); + host_resolver->release_callers(); + resolver->wait_for_pending_resolves(); + EXPECT_TRUE(handler->done); + EXPECT_EQUAL(result.spec(), "tcp/127.0.0.7:123"); +} + +TEST_F("require that discarding result handlers will avoid pending work (but complete started work)", TimeBomb(60)) { + auto host_resolver = std::make_shared<BlockingHostResolver>(2); + AsyncResolver::Params params; + params.resolver = host_resolver; + params.num_threads = 2; + auto resolver = AsyncResolver::create(params); + SocketAddress result1; + SocketAddress result2; + SocketAddress result3; + auto handler1 = std::make_shared<ResultSetter>(result1); + auto handler2 = std::make_shared<ResultSetter>(result2); + auto handler3 = std::make_shared<ResultSetter>(result3); + resolver->resolve_async("tcp/x:123", handler1); + resolver->resolve_async("tcp/y:123", handler2); + resolver->resolve_async("tcp/z:123", handler3); + host_resolver->wait_for_callers(); + handler1.reset(); + handler2.reset(); + handler3.reset(); + host_resolver->release_callers(); + resolver->wait_for_pending_resolves(); + EXPECT_EQUAL(result1.spec(), "tcp/127.0.0.7:123"); + EXPECT_EQUAL(result2.spec(), "tcp/127.0.0.7:123"); + EXPECT_EQUAL(result3.spec(), "invalid"); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/net/CMakeLists.txt b/vespalib/src/vespa/vespalib/net/CMakeLists.txt index 26c5c867631..42023cb73b0 100644 --- a/vespalib/src/vespa/vespalib/net/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/net/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(vespalib_vespalib_net OBJECT SOURCES + async_resolver.cpp lazy_resolver.cpp selector.cpp server_socket.cpp diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.cpp b/vespalib/src/vespa/vespalib/net/async_resolver.cpp new file mode 100644 index 00000000000..c50fd722c45 --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/async_resolver.cpp @@ -0,0 +1,166 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "async_resolver.h" +#include "socket_spec.h" + +#include <vespa/log/log.h> +LOG_SETUP(".vespalib.net.async_resolver"); + +namespace vespalib { + +VESPA_THREAD_STACK_TAG(async_resolver_executor_thread); + +//----------------------------------------------------------------------------- + +AsyncResolver::time_point +AsyncResolver::SteadyClock::now() +{ + return std::chrono::steady_clock::now(); +} + +//----------------------------------------------------------------------------- + +vespalib::string +AsyncResolver::SimpleHostResolver::ip_address(const vespalib::string &host_name) +{ + return SocketAddress::select_remote(80, host_name.c_str()).ip_address(); +} + +//----------------------------------------------------------------------------- + +AsyncResolver::Params::Params() + : clock(std::make_shared<SteadyClock>()), + resolver(std::make_shared<SimpleHostResolver>()), + max_result_age(60.0), + max_resolve_time(1.0), + num_threads(4) +{ +} + +//----------------------------------------------------------------------------- + +vespalib::string +AsyncResolver::LoggingHostResolver::ip_address(const vespalib::string &host_name) +{ + auto before = _clock->now(); + vespalib::string ip_address = _resolver->ip_address(host_name); + seconds resolve_time = (_clock->now() - before); + if (resolve_time >= _max_resolve_time) { + LOG(warning, "slow resolve time: '%s' -> '%s' (%g s)", + host_name.c_str(), ip_address.c_str(), resolve_time.count()); + } + if (ip_address.empty()) { + LOG(warning, "could not resolve host name: '%s'", host_name.c_str()); + } + return ip_address; +} + +//----------------------------------------------------------------------------- + +bool +AsyncResolver::CachingHostResolver::lookup(const vespalib::string &host_name, vespalib::string &ip_address) +{ + auto now = _clock->now(); + std::lock_guard<std::mutex> guard(_lock); + while (!_queue.empty() && (_queue.front()->second.end_time <= now)) { + _map.erase(_queue.front()); + _queue.pop(); + } + auto pos = _map.find(host_name); + if (pos != _map.end()) { + ip_address = pos->second.ip_address; + return true; + } + return false; +} + +void +AsyncResolver::CachingHostResolver::store(const vespalib::string &host_name, const vespalib::string &ip_address) +{ + auto end_time = _clock->now() + std::chrono::duration_cast<time_point::duration>(_max_result_age); + std::lock_guard<std::mutex> guard(_lock); + auto res = _map.emplace(host_name, Entry(ip_address, end_time)); + if (res.second) { + _queue.push(res.first); + } +} + +AsyncResolver::CachingHostResolver::CachingHostResolver(Clock::SP clock, HostResolver::SP resolver, seconds max_result_age) + : _clock(std::move(clock)), + _resolver(std::move(resolver)), + _max_result_age(max_result_age), + _lock(), + _map(), + _queue() +{ +} + +vespalib::string +AsyncResolver::CachingHostResolver::ip_address(const vespalib::string &host_name) +{ + vespalib::string ip_address; + if (lookup(host_name, ip_address)) { + return ip_address; + } + ip_address = _resolver->ip_address(host_name); + store(host_name, ip_address); + return ip_address; +} + +//----------------------------------------------------------------------------- + +void +AsyncResolver::ResolveTask::run() +{ + if (ResultHandler::SP handler = weak_handler.lock()) { + SocketSpec socket_spec(spec); + if (!socket_spec.valid()) { + LOG(warning, "invalid socket spec: '%s'", spec.c_str()); + } + if (!socket_spec.host().empty()) { + socket_spec = socket_spec.replace_host(resolver.ip_address(socket_spec.host())); + } + handler->handle_result(socket_spec.client_address()); + } +} + +//----------------------------------------------------------------------------- + +std::mutex AsyncResolver::_shared_lock; +AsyncResolver::SP AsyncResolver::_shared_resolver(nullptr); + +AsyncResolver::AsyncResolver(HostResolver::SP resolver, size_t num_threads) + : _resolver(std::move(resolver)), + _executor(num_threads, 128*1024, async_resolver_executor_thread) +{ +} + +void +AsyncResolver::resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler) +{ + auto task = std::make_unique<ResolveTask>(spec, *_resolver, std::move(result_handler)); + auto rejected = _executor.execute(std::move(task)); + assert(!rejected); +} + +AsyncResolver::SP +AsyncResolver::create(Params params) +{ + auto logger = std::make_shared<LoggingHostResolver>(params.clock, std::move(params.resolver), params.max_resolve_time); + auto cacher = std::make_shared<CachingHostResolver>(std::move(params.clock), std::move(logger), params.max_result_age); + return SP(new AsyncResolver(std::move(cacher), params.num_threads)); +} + +AsyncResolver::SP +AsyncResolver::get_shared() +{ + std::lock_guard<std::mutex> guard(_shared_lock); + if (!_shared_resolver) { + _shared_resolver = create(Params()); + } + return _shared_resolver; +} + +//----------------------------------------------------------------------------- + +} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.h b/vespalib/src/vespa/vespalib/net/async_resolver.h new file mode 100644 index 00000000000..8323881ee7e --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/async_resolver.h @@ -0,0 +1,130 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "socket_address.h" +#include "socket_spec.h" +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/arrayqueue.hpp> +#include <chrono> +#include <memory> +#include <mutex> +#include <map> + +namespace vespalib { + +/** + * Component used to perform asynchronous resolving of connect + * specs. Internal worker threads are used to perform synchronous + * resolving with caching. Results are delivered to a result handler + * that is tracked using a weak pointer while the operation is + * pending. This enables us to skip resolving specs that are no longer + * needed by the client. Use the get_shared function to obtain a + * shared default-constructed instance. It will be created on the + * first call and cleaned up on program exit. + **/ +class AsyncResolver +{ +public: + using SP = std::shared_ptr<AsyncResolver>; + using time_point = std::chrono::steady_clock::time_point; + using seconds = std::chrono::duration<double>; + + struct ResultHandler { + virtual void handle_result(SocketAddress addr) = 0; + virtual ~ResultHandler() {} + using SP = std::shared_ptr<ResultHandler>; + using WP = std::weak_ptr<ResultHandler>; + }; + + struct Clock { + virtual time_point now() = 0; + virtual ~Clock() {} + using SP = std::shared_ptr<Clock>; + }; + + struct HostResolver { + virtual vespalib::string ip_address(const vespalib::string &host_name) = 0; + virtual ~HostResolver() {} + using SP = std::shared_ptr<HostResolver>; + }; + + struct SteadyClock : public Clock { + time_point now() override; + }; + + struct SimpleHostResolver : public HostResolver { + vespalib::string ip_address(const vespalib::string &host_name) override; + }; + + struct Params { + Clock::SP clock; + HostResolver::SP resolver; + seconds max_result_age; + seconds max_resolve_time; + size_t num_threads; + Params(); + ~Params() {} + }; + +private: + class LoggingHostResolver : public HostResolver { + private: + Clock::SP _clock; + HostResolver::SP _resolver; + seconds _max_resolve_time; + public: + LoggingHostResolver(Clock::SP clock, HostResolver::SP resolver, seconds max_resolve_time) + : _clock(std::move(clock)), _resolver(std::move(resolver)), _max_resolve_time(max_resolve_time) {} + vespalib::string ip_address(const vespalib::string &host_name) override; + }; + + class CachingHostResolver : public HostResolver { + private: + struct Entry { + vespalib::string ip_address; + time_point end_time; + Entry(const vespalib::string &ip, time_point end) + : ip_address(ip), end_time(end) {} + }; + using Map = std::map<vespalib::string,Entry>; + using Itr = Map::iterator; + Clock::SP _clock; + HostResolver::SP _resolver; + seconds _max_result_age; + std::mutex _lock; + Map _map; + ArrayQueue<Itr> _queue; + + bool lookup(const vespalib::string &host_name, vespalib::string &ip_address); + void resolve(const vespalib::string &host_name, vespalib::string &ip_address); + void store(const vespalib::string &host_name, const vespalib::string &ip_address); + + public: + CachingHostResolver(Clock::SP clock, HostResolver::SP resolver, seconds max_result_age); + vespalib::string ip_address(const vespalib::string &host_name) override; + }; + + struct ResolveTask : public Executor::Task { + vespalib::string spec; + HostResolver &resolver; + ResultHandler::WP weak_handler; + ResolveTask(const vespalib::string &spec_in, HostResolver &resolver_in, ResultHandler::WP weak_handler_in) + : spec(spec_in), resolver(resolver_in), weak_handler(std::move(weak_handler_in)) {} + void run() override; + }; + + HostResolver::SP _resolver; + ThreadStackExecutor _executor; + static std::mutex _shared_lock; + static AsyncResolver::SP _shared_resolver; + + AsyncResolver(HostResolver::SP resolver, size_t num_threads); +public: + void resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler); + void wait_for_pending_resolves() { _executor.sync(); } + static AsyncResolver::SP create(Params params); + static AsyncResolver::SP get_shared(); +}; + +} // namespace vespalib |