aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-11-29 13:44:13 +0100
committerGitHub <noreply@github.com>2018-11-29 13:44:13 +0100
commitc1fc295f963cf7e9a117dac9d0dabf7151ec0c1b (patch)
treeb5b56d4d265f75dfb2811ae39aebb0c7722adb52 /vespalib
parent1b378e87eaf64857ca914094a624952731f0c827 (diff)
parentc52bd31c7208d4bab093efb24acb9a3452a0b936 (diff)
Merge pull request #7786 from vespa-engine/havardpe/initial-portal-code
initial portal code
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/CMakeLists.txt5
-rw-r--r--vespalib/src/tests/portal/CMakeLists.txt8
-rw-r--r--vespalib/src/tests/portal/handle_manager/CMakeLists.txt8
-rw-r--r--vespalib/src/tests/portal/handle_manager/handle_manager_test.cpp146
-rw-r--r--vespalib/src/tests/portal/http_request/CMakeLists.txt8
-rw-r--r--vespalib/src/tests/portal/http_request/http_request_test.cpp120
-rw-r--r--vespalib/src/tests/portal/portal_test.cpp298
-rw-r--r--vespalib/src/tests/portal/reactor/CMakeLists.txt8
-rw-r--r--vespalib/src/tests/portal/reactor/reactor_test.cpp162
-rw-r--r--vespalib/src/vespa/vespalib/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/portal/CMakeLists.txt11
-rw-r--r--vespalib/src/vespa/vespalib/portal/README22
-rw-r--r--vespalib/src/vespa/vespalib/portal/handle_manager.cpp105
-rw-r--r--vespalib/src/vespa/vespalib/portal/handle_manager.h98
-rw-r--r--vespalib/src/vespa/vespalib/portal/http_connection.cpp242
-rw-r--r--vespalib/src/vespa/vespalib/portal/http_connection.h54
-rw-r--r--vespalib/src/vespa/vespalib/portal/http_request.cpp166
-rw-r--r--vespalib/src/vespa/vespalib/portal/http_request.h48
-rw-r--r--vespalib/src/vespa/vespalib/portal/listener.cpp37
-rw-r--r--vespalib/src/vespa/vespalib/portal/listener.h25
-rw-r--r--vespalib/src/vespa/vespalib/portal/portal.cpp188
-rw-r--r--vespalib/src/vespa/vespalib/portal/portal.h114
-rw-r--r--vespalib/src/vespa/vespalib/portal/reactor.cpp128
-rw-r--r--vespalib/src/vespa/vespalib/portal/reactor.h68
24 files changed, 2070 insertions, 0 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index 6491bdfb036..8bd3dda4e5a 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -67,6 +67,10 @@ vespa_define_module(
src/tests/net/tls/transport_options
src/tests/objects/nbostream
src/tests/optimized
+ src/tests/portal
+ src/tests/portal/handle_manager
+ src/tests/portal/http_request
+ src/tests/portal/reactor
src/tests/printable
src/tests/priority_queue
src/tests/random
@@ -130,6 +134,7 @@ vespa_define_module(
src/vespa/vespalib/net/tls
src/vespa/vespalib/net/tls/impl
src/vespa/vespalib/objects
+ src/vespa/vespalib/portal
src/vespa/vespalib/stllike
src/vespa/vespalib/test
src/vespa/vespalib/testkit
diff --git a/vespalib/src/tests/portal/CMakeLists.txt b/vespalib/src/tests/portal/CMakeLists.txt
new file mode 100644
index 00000000000..3c39f286a15
--- /dev/null
+++ b/vespalib/src/tests/portal/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_portal_test_app TEST
+ SOURCES
+ portal_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_portal_test_app COMMAND vespalib_portal_test_app)
diff --git a/vespalib/src/tests/portal/handle_manager/CMakeLists.txt b/vespalib/src/tests/portal/handle_manager/CMakeLists.txt
new file mode 100644
index 00000000000..b12b72f9ad6
--- /dev/null
+++ b/vespalib/src/tests/portal/handle_manager/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_handle_manager_test_app TEST
+ SOURCES
+ handle_manager_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_handle_manager_test_app COMMAND vespalib_handle_manager_test_app)
diff --git a/vespalib/src/tests/portal/handle_manager/handle_manager_test.cpp b/vespalib/src/tests/portal/handle_manager/handle_manager_test.cpp
new file mode 100644
index 00000000000..4ce25aa0a7a
--- /dev/null
+++ b/vespalib/src/tests/portal/handle_manager/handle_manager_test.cpp
@@ -0,0 +1,146 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/portal/handle_manager.h>
+#include <vespa/vespalib/util/gate.h>
+
+#include <thread>
+#include <chrono>
+#include <atomic>
+
+using namespace vespalib;
+using namespace vespalib::portal;
+
+TEST_F("require that handles can be created, locked and destroyed", TimeBomb(60)) {
+ HandleManager manager;
+ uint64_t handle = manager.create();
+ EXPECT_TRUE(handle != manager.null_handle());
+ {
+ HandleGuard guard = manager.lock(handle);
+ EXPECT_TRUE(guard.valid());
+ EXPECT_EQUAL(guard.handle(), handle);
+ }
+ EXPECT_TRUE(manager.destroy(handle));
+ {
+ HandleGuard guard = manager.lock(handle);
+ EXPECT_TRUE(!guard.valid());
+ EXPECT_EQUAL(guard.handle(), manager.null_handle());
+ }
+}
+
+TEST_F("require that multiple guards can be taken for the same handle", TimeBomb(60)) {
+ HandleManager manager;
+ uint64_t handle = manager.create();
+ EXPECT_TRUE(handle != manager.null_handle());
+ {
+ HandleGuard guard1 = manager.lock(handle);
+ HandleGuard guard2 = manager.lock(handle); // <- does not block
+ EXPECT_TRUE(guard1.valid());
+ EXPECT_EQUAL(guard1.handle(), handle);
+ EXPECT_TRUE(guard2.valid());
+ EXPECT_EQUAL(guard2.handle(), handle);
+ }
+ EXPECT_TRUE(manager.destroy(handle));
+}
+
+TEST_F("require that handles are independent", TimeBomb(60)) {
+ HandleManager manager;
+ uint64_t handle1 = manager.create();
+ uint64_t handle2 = manager.create();
+ uint64_t handle3 = manager.create();
+ EXPECT_TRUE(handle1 != manager.null_handle());
+ EXPECT_TRUE(handle2 != manager.null_handle());
+ EXPECT_TRUE(handle3 != manager.null_handle());
+ EXPECT_TRUE(handle1 != handle2);
+ EXPECT_TRUE(handle1 != handle3);
+ EXPECT_TRUE(handle2 != handle3);
+ {
+ HandleGuard guard1 = manager.lock(handle1);
+ HandleGuard guard2 = manager.lock(handle2);
+ EXPECT_TRUE(guard1.valid());
+ EXPECT_EQUAL(guard1.handle(), handle1);
+ EXPECT_TRUE(guard2.valid());
+ EXPECT_EQUAL(guard2.handle(), handle2);
+ EXPECT_TRUE(manager.destroy(handle3)); // <- does not block
+ HandleGuard guard3 = manager.lock(handle3);
+ EXPECT_TRUE(!guard3.valid());
+ EXPECT_EQUAL(guard3.handle(), manager.null_handle());
+ }
+ EXPECT_TRUE(manager.destroy(handle1));
+ EXPECT_TRUE(manager.destroy(handle2));
+ EXPECT_TRUE(!manager.destroy(handle3));
+}
+
+struct Fixture {
+ HandleManager manager;
+ uint64_t handle;
+ Gate gate;
+ std::atomic<size_t> cnt1;
+ std::atomic<size_t> cnt2;
+ Fixture() : manager(), handle(manager.create()), gate(), cnt1(0), cnt2(0) {}
+};
+
+TEST_MT_FF("require that destroy waits for active handle guards", 2, Fixture(), TimeBomb(60)) {
+ if (thread_id == 0) {
+ {
+ auto guard = f1.manager.lock(f1.handle);
+ TEST_BARRIER(); // #1
+ EXPECT_TRUE(!f1.gate.await(20));
+ }
+ EXPECT_TRUE(f1.gate.await(60000));
+ } else {
+ TEST_BARRIER(); // #1
+ EXPECT_TRUE(f1.manager.destroy(f1.handle));
+ f1.gate.countDown();
+ }
+}
+
+TEST_MT_FF("require that destroy disables ability to lock handles", 3, Fixture(), TimeBomb(60)) {
+ if (thread_id == 0) {
+ auto guard = f1.manager.lock(f1.handle);
+ ASSERT_TRUE(guard.valid());
+ TEST_BARRIER(); // #1
+ while (f1.cnt1 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ EXPECT_EQUAL(f1.cnt2, 0u);
+ } else if (thread_id == 1) {
+ TEST_BARRIER(); // #1
+ EXPECT_TRUE(f1.manager.destroy(f1.handle));
+ EXPECT_EQUAL(f1.cnt1, 1u);
+ ++f1.cnt2;
+ } else {
+ TEST_BARRIER(); // #1
+ while (f1.cnt1 == 0) {
+ auto guard = f1.manager.lock(f1.handle);
+ if (guard.valid()) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ } else {
+ EXPECT_EQUAL(f1.cnt2, 0u);
+ ++f1.cnt1;
+ }
+ }
+ }
+}
+
+TEST_MT_FF("require that a single destroy call returns true", 10, Fixture(), TimeBomb(60)) {
+ if (thread_id == 0) { // 1 thread here
+ auto guard = f1.manager.lock(f1.handle);
+ ASSERT_TRUE(guard.valid());
+ TEST_BARRIER(); // #1
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ } else { // 'many' threads here
+ TEST_BARRIER(); // #1
+ if (f1.manager.destroy(f1.handle)) {
+ ++f1.cnt1;
+ } else {
+ ++f1.cnt2;
+ }
+ }
+ TEST_BARRIER(); // #2
+ EXPECT_EQUAL(f1.cnt1, 1u);
+ EXPECT_GREATER(num_threads, 5u);
+ EXPECT_EQUAL(f1.cnt2, (num_threads - 2));
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/tests/portal/http_request/CMakeLists.txt b/vespalib/src/tests/portal/http_request/CMakeLists.txt
new file mode 100644
index 00000000000..00dbf356634
--- /dev/null
+++ b/vespalib/src/tests/portal/http_request/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_portal_http_request_test_app TEST
+ SOURCES
+ http_request_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_portal_http_request_test_app COMMAND vespalib_portal_http_request_test_app)
diff --git a/vespalib/src/tests/portal/http_request/http_request_test.cpp b/vespalib/src/tests/portal/http_request/http_request_test.cpp
new file mode 100644
index 00000000000..6e1527efa4b
--- /dev/null
+++ b/vespalib/src/tests/portal/http_request/http_request_test.cpp
@@ -0,0 +1,120 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/portal/http_request.h>
+
+using namespace vespalib;
+using namespace vespalib::portal;
+
+vespalib::string simple_req("GET /my/path HTTP/1.1\r\n"
+ "Host: my.host.com:80\r\n"
+ "CustomHeader: CustomValue\r\n"
+ "\r\n123456789");
+size_t simple_req_padding = 9;
+size_t simple_req_size = (simple_req.size() - simple_req_padding);
+
+void verify_simple_req(const HttpRequest &req) {
+ EXPECT_TRUE(!req.need_more_data());
+ EXPECT_TRUE(req.valid());
+ EXPECT_TRUE(req.is_get());
+ EXPECT_EQUAL(req.get_uri(), "/my/path");
+ EXPECT_EQUAL(req.get_header("host"), "my.host.com:80");
+ EXPECT_EQUAL(req.get_header("customheader"), "CustomValue");
+ EXPECT_EQUAL(req.get_header("non-existing-header"), "");
+}
+
+HttpRequest make_request(const vespalib::string &req) {
+ HttpRequest result;
+ ASSERT_EQUAL(result.handle_data(req.data(), req.size()), req.size());
+ ASSERT_TRUE(result.valid());
+ return result;
+}
+
+void verify_invalid_request(const vespalib::string &req) {
+ HttpRequest result;
+ EXPECT_EQUAL(result.handle_data(req.data(), req.size()), req.size());
+ EXPECT_TRUE(!result.need_more_data());
+ EXPECT_TRUE(!result.valid());
+}
+
+TEST("require that request can be parsed in one go") {
+ HttpRequest req;
+ EXPECT_EQUAL(req.handle_data(simple_req.data(), simple_req_size), simple_req_size);
+ TEST_DO(verify_simple_req(req));
+}
+
+TEST("require that trailing data is not consumed") {
+ HttpRequest req;
+ EXPECT_EQUAL(req.handle_data(simple_req.data(), simple_req.size()), simple_req_size);
+ TEST_DO(verify_simple_req(req));
+}
+
+TEST("require that request can be parsed incrementally") {
+ HttpRequest req;
+ size_t chunk = 7;
+ size_t done = 0;
+ while (done < simple_req_size) {
+ size_t expect = std::min(simple_req_size - done, chunk);
+ EXPECT_EQUAL(req.handle_data(simple_req.data() + done, chunk), expect);
+ done += expect;
+ }
+ EXPECT_EQUAL(done, simple_req_size);
+ TEST_DO(verify_simple_req(req));
+}
+
+TEST("require that header continuation is replaced by single space") {
+ auto req = make_request("GET /my/path HTTP/1.1\r\n"
+ "test: one\r\n"
+ " two\r\n"
+ "\tthree\r\n"
+ "\r\n");
+ EXPECT_EQUAL(req.get_header("test"), "one two three");
+}
+
+TEST("require that duplicate headers are combined as list") {
+ auto req = make_request("GET /my/path HTTP/1.1\r\n"
+ "test: one\r\n"
+ "test: two\r\n"
+ "test: three\r\n"
+ "\r\n");
+ EXPECT_EQUAL(req.get_header("test"), "one,two,three");
+}
+
+TEST("require that leading and trailing whitespaces are stripped") {
+ auto req = make_request("GET /my/path HTTP/1.1\r\n"
+ "test: one \r\n"
+ " , two \r\n"
+ "test: three \r\n"
+ "\r\n");
+ EXPECT_EQUAL(req.get_header("test"), "one , two,three");
+}
+
+TEST("require that non-GET requests are detected") {
+ auto req = make_request("POST /my/path HTTP/1.1\r\n"
+ "\r\n");
+ EXPECT_TRUE(!req.is_get());
+}
+
+TEST("require that request line must contain all relevant parts") {
+ TEST_DO(verify_invalid_request("/my/path HTTP/1.1\r\n"));
+ TEST_DO(verify_invalid_request("GET HTTP/1.1\r\n"));
+ TEST_DO(verify_invalid_request("GET /my/path\r\n"));
+}
+
+TEST("require that first header line cannot be a continuation") {
+ TEST_DO(verify_invalid_request("GET /my/path HTTP/1.1\r\n"
+ " two\r\n"));
+}
+
+TEST("require that header name is not allowed to be empty") {
+ TEST_DO(verify_invalid_request("GET /my/path HTTP/1.1\r\n"
+ ": value\r\n"));
+}
+
+TEST("require that header line must contain separator") {
+ TEST_DO(verify_invalid_request("GET /my/path HTTP/1.1\r\n"
+ "ok-header: ok-value\r\n"
+ "missing separator\r\n"));
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/tests/portal/portal_test.cpp b/vespalib/src/tests/portal/portal_test.cpp
new file mode 100644
index 00000000000..cac4b78d3e2
--- /dev/null
+++ b/vespalib/src/tests/portal/portal_test.cpp
@@ -0,0 +1,298 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/portal/portal.h>
+#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/net/socket_spec.h>
+#include <vespa/vespalib/net/crypto_engine.h>
+#include <vespa/vespalib/net/sync_crypto_socket.h>
+#include <vespa/vespalib/net/tls/tls_crypto_engine.h>
+#include <vespa/vespalib/net/tls/maybe_tls_crypto_engine.h>
+#include <vespa/vespalib/test/make_tls_options_for_testing.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+
+using namespace vespalib;
+
+//-----------------------------------------------------------------------------
+
+vespalib::string do_http(int port, CryptoEngine::SP crypto, const vespalib::string &method, const vespalib::string &uri) {
+ auto socket = SocketSpec::from_port(port).client_address().connect();
+ ASSERT_TRUE(socket.valid());
+ auto conn = SyncCryptoSocket::create(*crypto, std::move(socket), false);
+ vespalib::string http_req = vespalib::make_string("%s %s HTTP/1.1\r\n"
+ "Host: localhost:%d\r\n"
+ "\r\n", method.c_str(), uri.c_str(), port);
+ ASSERT_EQUAL(conn->write(http_req.data(), http_req.size()), ssize_t(http_req.size()));
+ char buf[1024];
+ vespalib::string result;
+ ssize_t res = conn->read(buf, sizeof(buf));
+ while (res > 0) {
+ result.append(vespalib::stringref(buf, res));
+ res = conn->read(buf, sizeof(buf));
+ }
+ ASSERT_EQUAL(res, 0);
+ return result;
+}
+
+vespalib::string fetch(int port, CryptoEngine::SP crypto, const vespalib::string &path) {
+ return do_http(port, std::move(crypto), "GET", path);
+}
+
+//-----------------------------------------------------------------------------
+
+vespalib::string make_expected_response(const vespalib::string &content_type, const vespalib::string &content) {
+ return vespalib::make_string("HTTP/1.1 200 OK\r\n"
+ "Connection: close\r\n"
+ "Content-Type: %s\r\n"
+ "\r\n"
+ "%s", content_type.c_str(), content.c_str());
+}
+
+vespalib::string make_expected_error(int code, const vespalib::string &message) {
+ return vespalib::make_string("HTTP/1.1 %d %s\r\n"
+ "Connection: close\r\n"
+ "\r\n", code, message.c_str());
+}
+
+//-----------------------------------------------------------------------------
+
+struct Encryption {
+ vespalib::string name;
+ CryptoEngine::SP engine;
+ ~Encryption();
+};
+Encryption::~Encryption() = default;
+
+auto null_crypto() { return std::make_shared<NullCryptoEngine>(); }
+auto xor_crypto() { return std::make_shared<XorCryptoEngine>(); }
+auto tls_crypto() { return std::make_shared<TlsCryptoEngine>(vespalib::test::make_tls_options_for_testing()); }
+auto maybe_tls_crypto(bool client_tls) { return std::make_shared<MaybeTlsCryptoEngine>(tls_crypto(), client_tls); }
+
+std::vector<Encryption> crypto_list = {{"no encryption", null_crypto()},
+ {"ad-hoc xor", xor_crypto()},
+ {"always TLS", tls_crypto()},
+ {"maybe TLS; yes", maybe_tls_crypto(true)},
+ {"maybe TLS; no", maybe_tls_crypto(false)}};
+
+//-----------------------------------------------------------------------------
+
+struct MyGetHandler : public Portal::GetHandler {
+ std::function<void(Portal::GetRequest)> fun;
+ template <typename F>
+ MyGetHandler(F &&f) : fun(std::move(f)) {}
+ void get(Portal::GetRequest request) const override {
+ fun(std::move(request));
+ }
+ ~MyGetHandler();
+};
+MyGetHandler::~MyGetHandler() = default;
+
+//-----------------------------------------------------------------------------
+
+TEST("require that failed portal listening throws exception") {
+ EXPECT_EXCEPTION(Portal::create(null_crypto(), -37), PortListenException, "-37");
+}
+
+TEST("require that portal can listen to auto-selected port") {
+ auto portal = Portal::create(null_crypto(), 0);
+ EXPECT_GREATER(portal->listen_port(), 0);
+}
+
+TEST("require that simple GET works with various encryption strategies") {
+ vespalib::string path = "/test";
+ vespalib::string type = "application/json";
+ vespalib::string content = "[1,2,3]";
+ MyGetHandler handler([&](Portal::GetRequest request)
+ {
+ EXPECT_EQUAL(request.get_uri(), path);
+ request.respond_with_content(type, content);
+ });
+ for (const Encryption &crypto: crypto_list) {
+ fprintf(stderr, "... testing simple GET with encryption: '%s'\n", crypto.name.c_str());
+ auto portal = Portal::create(crypto.engine, 0);
+ auto bound = portal->bind(path, handler);
+ auto expect = make_expected_response(type, content);
+ auto result = fetch(portal->listen_port(), crypto.engine, path);
+ EXPECT_EQUAL(result, expect);
+ bound.reset();
+ result = fetch(portal->listen_port(), crypto.engine, path);
+ expect = make_expected_error(404, "Not Found");
+ EXPECT_EQUAL(result, expect);
+ }
+}
+
+//-----------------------------------------------------------------------------
+
+TEST("require that methods other than GET return not implemented error") {
+ auto portal = Portal::create(null_crypto(), 0);
+ auto expect_get = make_expected_error(404, "Not Found");
+ auto expect_other = make_expected_error(501, "Not Implemented");
+ for (const auto &method: {"OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"}) {
+ auto result = do_http(portal->listen_port(), null_crypto(), method, "/test");
+ if (vespalib::string("GET") == method) {
+ EXPECT_EQUAL(result, expect_get);
+ } else {
+ EXPECT_EQUAL(result, expect_other);
+ }
+ }
+}
+
+TEST("require that GET handler can return HTTP error") {
+ vespalib::string path = "/test";
+ auto portal = Portal::create(null_crypto(), 0);
+ auto expect = make_expected_error(123, "My Error");
+ MyGetHandler handler([](Portal::GetRequest request)
+ {
+ request.respond_with_error(123, "My Error");
+ });
+ auto bound = portal->bind(path, handler);
+ auto result = fetch(portal->listen_port(), null_crypto(), path);
+ EXPECT_EQUAL(result, expect);
+}
+
+TEST("require that get requests dropped on the floor returns HTTP error") {
+ vespalib::string path = "/test";
+ auto portal = Portal::create(null_crypto(), 0);
+ auto expect = make_expected_error(500, "Internal Server Error");
+ MyGetHandler handler([](Portal::GetRequest){});
+ auto bound = portal->bind(path, handler);
+ auto result = fetch(portal->listen_port(), null_crypto(), path);
+ EXPECT_EQUAL(result, expect);
+}
+
+struct GetTask : public Executor::Task {
+ Portal::GetRequest request;
+ GetTask(Portal::GetRequest request_in) : request(std::move(request_in)) {}
+ void run() override {
+ request.respond_with_content("text/plain", "hello");
+ }
+};
+
+TEST("require that GET requests can be completed in another thread") {
+ vespalib::string path = "/test";
+ ThreadStackExecutor executor(1, 128 * 1024);
+ auto portal = Portal::create(null_crypto(), 0);
+ auto expect = make_expected_response("text/plain", "hello");
+ MyGetHandler handler([&executor](Portal::GetRequest request)
+ {
+ executor.execute(std::make_unique<GetTask>(std::move(request)));
+ });
+ auto bound = portal->bind(path, handler);
+ auto result = fetch(portal->listen_port(), null_crypto(), path);
+ EXPECT_EQUAL(result, expect);
+ executor.shutdown().sync();
+}
+
+TEST("require that bogus request returns HTTP error") {
+ auto portal = Portal::create(null_crypto(), 0);
+ auto expect = make_expected_error(400, "Bad Request");
+ auto result = do_http(portal->listen_port(), null_crypto(), "this request is", " totally bogus\r\n");
+ EXPECT_EQUAL(result, expect);
+}
+
+TEST("require that the handler with the longest matching prefix is selected") {
+ auto portal = Portal::create(null_crypto(), 0);
+ MyGetHandler handler1([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler1"); });
+ MyGetHandler handler2([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler2"); });
+ MyGetHandler handler3([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler3"); });
+ auto bound1 = portal->bind("/foo", handler1);
+ auto bound3 = portal->bind("/foo/bar/baz", handler3);
+ auto bound2 = portal->bind("/foo/bar", handler2);
+ EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler1"));
+ EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo/bar"), make_expected_response("text/plain", "handler2"));
+ EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo/bar/baz"), make_expected_response("text/plain", "handler3"));
+ bound3.reset();
+ EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo/bar/baz"), make_expected_response("text/plain", "handler2"));
+ bound2.reset();
+ EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo/bar/baz"), make_expected_response("text/plain", "handler1"));
+}
+
+TEST("require that newer handlers with the same prefix shadows older ones") {
+ auto portal = Portal::create(null_crypto(), 0);
+ MyGetHandler handler1([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler1"); });
+ MyGetHandler handler2([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler2"); });
+ MyGetHandler handler3([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler3"); });
+ auto bound1 = portal->bind("/foo", handler1);
+ EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler1"));
+ auto bound2 = portal->bind("/foo", handler2);
+ EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler2"));
+ auto bound3 = portal->bind("/foo", handler3);
+ EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler3"));
+ bound3.reset();
+ EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler2"));
+ bound2.reset();
+ EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler1"));
+}
+
+TEST("require that connection errors do not block shutdown by leaking resources (also tests tight shutdown timing)") {
+ MyGetHandler handler([](Portal::GetRequest request)
+ {
+ std::this_thread::sleep_for(std::chrono::milliseconds(5));
+ request.respond_with_content("application/json", "[1,2,3]");
+ });
+ for (const Encryption &crypto: crypto_list) {
+ fprintf(stderr, "... testing connection errors with encryption: '%s'\n", crypto.name.c_str());
+ auto portal = Portal::create(crypto.engine, 0);
+ auto bound = portal->bind("/test", handler);
+ { // close before sending anything
+ auto socket = SocketSpec::from_port(portal->listen_port()).client_address().connect();
+ auto conn = SyncCryptoSocket::create(*crypto.engine, std::move(socket), false);
+ }
+ { // send partial request then close connection
+ auto socket = SocketSpec::from_port(portal->listen_port()).client_address().connect();
+ auto conn = SyncCryptoSocket::create(*crypto.engine, std::move(socket), false);
+ vespalib::string req = "GET /test HTTP/1.1\r\n"
+ "Host: local";
+ ASSERT_EQUAL(conn->write(req.data(), req.size()), ssize_t(req.size()));
+ }
+ { // send request then close without reading response
+ auto socket = SocketSpec::from_port(portal->listen_port()).client_address().connect();
+ auto conn = SyncCryptoSocket::create(*crypto.engine, std::move(socket), false);
+ vespalib::string req = "GET /test HTTP/1.1\r\n"
+ "Host: localhost\r\n"
+ "\r\n";
+ ASSERT_EQUAL(conn->write(req.data(), req.size()), ssize_t(req.size()));
+ }
+ }
+}
+
+struct WaitingFixture {
+ Portal::SP portal;
+ Gate enter_callback;
+ Gate exit_callback;
+ MyGetHandler handler;
+ Portal::Token::UP bound;
+ WaitingFixture() : portal(Portal::create(null_crypto(), 0)),
+ enter_callback(),
+ exit_callback(),
+ handler([this](Portal::GetRequest request)
+ {
+ enter_callback.countDown();
+ request.respond_with_content("application/json", "[1,2,3]");
+ exit_callback.await();
+ }),
+ bound(portal->bind("/test", handler)) {}
+};
+
+TEST_MT_FFF("require that bind token destruction waits for active requests", 3,
+ WaitingFixture(), Gate(), TimeBomb(60))
+{
+ if (thread_id == 0) {
+ f1.enter_callback.await();
+ EXPECT_TRUE(!f2.await(20));
+ f1.exit_callback.countDown();
+ EXPECT_TRUE(f2.await(60000));
+ } else if (thread_id == 1) {
+ f1.enter_callback.await();
+ f1.bound.reset();
+ f2.countDown();
+ } else {
+ auto result = fetch(f1.portal->listen_port(), null_crypto(), "/test");
+ EXPECT_EQUAL(result, make_expected_response("application/json", "[1,2,3]"));
+ }
+}
+
+//-----------------------------------------------------------------------------
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/tests/portal/reactor/CMakeLists.txt b/vespalib/src/tests/portal/reactor/CMakeLists.txt
new file mode 100644
index 00000000000..2ed829fcfc0
--- /dev/null
+++ b/vespalib/src/tests/portal/reactor/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_reactor_test_app TEST
+ SOURCES
+ reactor_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_reactor_test_app COMMAND vespalib_reactor_test_app)
diff --git a/vespalib/src/tests/portal/reactor/reactor_test.cpp b/vespalib/src/tests/portal/reactor/reactor_test.cpp
new file mode 100644
index 00000000000..31bda119fd9
--- /dev/null
+++ b/vespalib/src/tests/portal/reactor/reactor_test.cpp
@@ -0,0 +1,162 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/net/socket_handle.h>
+#include <vespa/vespalib/portal/reactor.h>
+#include <vespa/vespalib/util/gate.h>
+
+#include <thread>
+#include <chrono>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+
+using namespace vespalib;
+using namespace vespalib::portal;
+
+struct SocketPair {
+ SocketHandle main;
+ SocketHandle other;
+ SocketPair() : main(), other() {
+ int sockets[2];
+ ASSERT_EQUAL(0, socketpair(AF_UNIX, SOCK_STREAM | O_NONBLOCK, 0, sockets));
+ main.reset(sockets[0]);
+ other.reset(sockets[1]);
+ // make main socket both readable and writable
+ ASSERT_EQUAL(other.write("x", 1), 1);
+ }
+ ~SocketPair();
+};
+SocketPair::~SocketPair() = default;
+
+std::atomic<size_t> tick_cnt = 0;
+int tick() {
+ ++tick_cnt;
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ return 0;
+}
+
+void wait_tick() {
+ size_t sample = tick_cnt;
+ while (sample == tick_cnt) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+}
+
+struct SimpleHandler : Reactor::EventHandler {
+ SocketPair sockets;
+ std::atomic<size_t> read_cnt;
+ std::atomic<size_t> write_cnt;
+ Reactor::Token::UP token;
+ SimpleHandler(Reactor &reactor, bool read, bool write)
+ : sockets(), read_cnt(0), write_cnt(0), token()
+ {
+ token = reactor.attach(*this, sockets.main.get(), read, write);
+ }
+ void handle_event(bool read, bool write) override {
+ if (read) {
+ ++read_cnt;
+ }
+ if (write) {
+ ++write_cnt;
+ }
+ }
+ void verify(bool read, bool write) {
+ size_t read_sample = read_cnt;
+ size_t write_sample = write_cnt;
+ wait_tick();
+ wait_tick();
+ EXPECT_EQUAL((read_sample != read_cnt), read);
+ EXPECT_EQUAL((write_sample != write_cnt), write);
+ }
+ ~SimpleHandler();
+};
+SimpleHandler::~SimpleHandler() = default;
+
+struct DeletingHandler : SimpleHandler {
+ Gate token_deleted;
+ DeletingHandler(Reactor &reactor) : SimpleHandler(reactor, true, true),
+ token_deleted() {}
+ void handle_event(bool read, bool write) override {
+ SimpleHandler::handle_event(read, write);
+ token.reset();
+ token_deleted.countDown();
+ }
+ ~DeletingHandler();
+};
+DeletingHandler::~DeletingHandler() = default;
+
+struct WaitingHandler : SimpleHandler {
+ Gate enter_callback;
+ Gate exit_callback;
+ WaitingHandler(Reactor &reactor) : SimpleHandler(reactor, true, true),
+ enter_callback(), exit_callback() {}
+ void handle_event(bool read, bool write) override {
+ enter_callback.countDown();
+ SimpleHandler::handle_event(read, write);
+ exit_callback.await();
+ }
+ ~WaitingHandler();
+};
+WaitingHandler::~WaitingHandler() = default;
+
+//-----------------------------------------------------------------------------
+
+TEST_FF("require that reactor can produce async io events", Reactor(tick), TimeBomb(60)) {
+ for (bool read: {true, false}) {
+ for (bool write: {true, false}) {
+ {
+ SimpleHandler handler(f1, read, write);
+ TEST_DO(handler.verify(read, write));
+ }
+ }
+ }
+}
+
+TEST_FF("require that reactor token can be used to change active io events", Reactor(tick), TimeBomb(60)) {
+ SimpleHandler handler(f1, false, false);
+ TEST_DO(handler.verify(false, false));
+ for (int i = 0; i < 2; ++i) {
+ for (bool read: {true, false}) {
+ for (bool write: {true, false}) {
+ handler.token->update(read, write);
+ wait_tick(); // avoid stale events
+ TEST_DO(handler.verify(read, write));
+ }
+ }
+ }
+}
+
+TEST_FF("require that deleting reactor token disables io events", Reactor(tick), TimeBomb(60)) {
+ SimpleHandler handler(f1, true, true);
+ TEST_DO(handler.verify(true, true));
+ handler.token.reset();
+ TEST_DO(handler.verify(false, false));
+}
+
+TEST_FF("require that reactor token can be destroyed during io event handling", Reactor(tick), TimeBomb(60)) {
+ DeletingHandler handler(f1);
+ handler.token_deleted.await();
+ TEST_DO(handler.verify(false, false));
+ EXPECT_EQUAL(handler.read_cnt, 1u);
+ EXPECT_EQUAL(handler.write_cnt, 1u);
+}
+
+TEST_MT_FFFF("require that reactor token destruction waits for io event handling", 2,
+ Reactor(), WaitingHandler(f1), Gate(), TimeBomb(60))
+{
+ if (thread_id == 0) {
+ f2.enter_callback.await();
+ TEST_BARRIER(); // #1
+ EXPECT_TRUE(!f3.await(20));
+ f2.exit_callback.countDown();
+ EXPECT_TRUE(f3.await(60000));
+ } else {
+ TEST_BARRIER(); // #1
+ f2.token.reset();
+ f3.countDown();
+ }
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/CMakeLists.txt b/vespalib/src/vespa/vespalib/CMakeLists.txt
index 8261bb8874e..676362a7aef 100644
--- a/vespalib/src/vespa/vespalib/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/CMakeLists.txt
@@ -12,6 +12,7 @@ vespa_add_library(vespalib
$<TARGET_OBJECTS:vespalib_vespalib_net_tls>
$<TARGET_OBJECTS:vespalib_vespalib_net_tls_impl>
$<TARGET_OBJECTS:vespalib_vespalib_objects>
+ $<TARGET_OBJECTS:vespalib_vespalib_portal>
$<TARGET_OBJECTS:vespalib_vespalib_stllike>
$<TARGET_OBJECTS:vespalib_vespalib_test>
$<TARGET_OBJECTS:vespalib_vespalib_testkit>
diff --git a/vespalib/src/vespa/vespalib/portal/CMakeLists.txt b/vespalib/src/vespa/vespalib/portal/CMakeLists.txt
new file mode 100644
index 00000000000..cb27aafdcf1
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/CMakeLists.txt
@@ -0,0 +1,11 @@
+# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(vespalib_vespalib_portal OBJECT
+ SOURCES
+ handle_manager.cpp
+ http_connection.cpp
+ http_request.cpp
+ listener.cpp
+ portal.cpp
+ reactor.cpp
+ DEPENDS
+)
diff --git a/vespalib/src/vespa/vespalib/portal/README b/vespalib/src/vespa/vespalib/portal/README
new file mode 100644
index 00000000000..dfaf904ae95
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/README
@@ -0,0 +1,22 @@
+Portal is the codename of a component that does 3 things:
+
+(1) it is a simple HTTP server able to answer GET requests
+(2) it accepts incoming connections from other applications
+(3) it establishes outgoing connections to other applications
+
+The idea is that each resource is either something to be queried with
+a GET request or something to be connected to with a connection
+upgrade similar to how websocket connections are established. This
+allows an application to use a single port for all network traffic.
+
+For backwards compatibility, incoming connections can also be accepted
+directly from a server socket using a separate port. The important
+thing is that the component that will be using the connections is
+relieved of the complexities of accepting and establishing them. The
+flip to using HTTP connection upgrade later on should requite minor
+changes in how the portal API is used.
+
+notable http server limitations:
+ * HEAD method not supported
+ * http connection keep-alive not supported
+ * absolute uri in request-line not supported
diff --git a/vespalib/src/vespa/vespalib/portal/handle_manager.cpp b/vespalib/src/vespa/vespalib/portal/handle_manager.cpp
new file mode 100644
index 00000000000..b78541deb19
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/handle_manager.cpp
@@ -0,0 +1,105 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "handle_manager.h"
+
+#include <cassert>
+
+namespace vespalib::portal {
+
+void
+HandleGuard::unlock()
+{
+ if (valid()) {
+ _manager->unlock(_handle);
+ _manager = nullptr;
+ _handle = 0;
+ }
+}
+
+HandleGuard::~HandleGuard()
+{
+ unlock();
+}
+
+//-----------------------------------------------------------------------------
+
+HandleManager::Entry::~Entry()
+{
+ assert(use_cnt == 0);
+ assert(wait_cnt == 0);
+}
+
+void
+HandleManager::unlock(uint64_t handle)
+{
+ std::lock_guard guard(_lock);
+ auto pos = _repo.find(handle);
+ assert(pos != _repo.end());
+ --pos->second.use_cnt;
+ if (pos->second.should_notify()) {
+ pos->second.cond.notify_all();
+ }
+}
+
+HandleManager::HandleManager()
+ : _lock(),
+ _next_handle(1),
+ _repo()
+{
+}
+
+HandleManager::~HandleManager() = default;
+
+size_t
+HandleManager::size() const
+{
+ std::lock_guard guard(_lock);
+ return _repo.size();
+}
+
+uint64_t
+HandleManager::create()
+{
+ std::lock_guard guard(_lock);
+ uint64_t handle = _next_handle++;
+ _repo[handle];
+ return handle;
+}
+
+HandleGuard
+HandleManager::lock(uint64_t handle)
+{
+ std::lock_guard guard(_lock);
+ auto pos = _repo.find(handle);
+ if (pos == _repo.end()) {
+ return HandleGuard();
+ }
+ if (pos->second.disable) {
+ return HandleGuard();
+ }
+ ++pos->second.use_cnt;
+ return HandleGuard(*this, handle);
+}
+
+bool
+HandleManager::destroy(uint64_t handle)
+{
+ std::unique_lock guard(_lock);
+ auto pos = _repo.find(handle);
+ if (pos == _repo.end()) {
+ return false;
+ }
+ pos->second.disable = true;
+ ++pos->second.wait_cnt;
+ while (pos->second.use_cnt > 0) {
+ pos->second.cond.wait(guard);
+ }
+ --pos->second.wait_cnt;
+ if (pos->second.should_erase()) {
+ _repo.erase(pos);
+ return true;
+ }
+ return false;
+}
+
+} // namespace vespalib::portal
diff --git a/vespalib/src/vespa/vespalib/portal/handle_manager.h b/vespalib/src/vespa/vespalib/portal/handle_manager.h
new file mode 100644
index 00000000000..8b3456dc75c
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/handle_manager.h
@@ -0,0 +1,98 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <condition_variable>
+#include <map>
+#include <memory>
+#include <mutex>
+
+namespace vespalib::portal {
+
+class HandleManager;
+
+/**
+ * A guard that makes sure a handle remains valid while using it.
+ **/
+class HandleGuard {
+ friend class HandleManager;
+private:
+ HandleManager *_manager;
+ uint64_t _handle;
+ HandleGuard(HandleManager &manager_in, uint64_t handle_in)
+ : _manager(&manager_in), _handle(handle_in) {}
+ void unlock();
+public:
+ HandleGuard() : _manager(nullptr), _handle(0) {}
+ HandleGuard(const HandleGuard &) = delete;
+ HandleGuard &operator=(const HandleGuard &) = delete;
+ HandleGuard(HandleGuard &&rhs)
+ : _manager(rhs._manager),
+ _handle(rhs._handle)
+ {
+ rhs._manager = nullptr;
+ rhs._handle = 0;
+ }
+ HandleGuard &operator=(HandleGuard &&rhs) {
+ unlock();
+ _manager = rhs._manager;
+ _handle = rhs._handle;
+ rhs._manager = nullptr;
+ rhs._handle = 0;
+ return *this;
+ }
+ bool valid() { return (_manager != nullptr); }
+ uint64_t handle() const { return _handle; }
+ ~HandleGuard();
+};
+
+/**
+ * A manager keeping track of all currently active handles. The
+ * 'create' function will create a unique handle and return it. The
+ * 'lock' function is used to obtain guard for a specific handle,
+ * making sure it remains valid while using it. Calling the 'destroy'
+ * function will tag the handle for destruction and also wait until
+ * the handle is no longer in use. Any subsequent calls to 'lock'
+ * after the handle has been tagged for destruction will return an
+ * invalid guard, making it important to check the return value of
+ * 'lock'. The 'destroy' function can be called by multiple actors at
+ * any time. Only one of these calls will return true, indicating
+ * credit for the destruction of the handle and responsibility for
+ * cleaning up after it.
+ **/
+class HandleManager
+{
+ friend class HandleGuard;
+private:
+ struct Entry {
+ std::condition_variable cond;
+ bool disable;
+ size_t use_cnt;
+ size_t wait_cnt;
+ bool should_notify() const {
+ return ((use_cnt == 0) && (wait_cnt > 0));
+ }
+ bool should_erase() const {
+ return (disable && (use_cnt == 0) && (wait_cnt == 0));
+ }
+ Entry() : cond(), disable(false), use_cnt(0), wait_cnt(0) {}
+ ~Entry();
+ };
+
+ mutable std::mutex _lock;
+ uint64_t _next_handle;
+ std::map<uint64_t,Entry> _repo;
+
+ void unlock(uint64_t handle);
+public:
+ HandleManager();
+ ~HandleManager();
+ size_t size() const;
+ bool empty() const { return (size() == 0); }
+ uint64_t create();
+ HandleGuard lock(uint64_t handle);
+ bool destroy(uint64_t handle);
+ static uint64_t null_handle() { return 0; }
+};
+
+} // namespace vespalib::portal
diff --git a/vespalib/src/vespa/vespalib/portal/http_connection.cpp b/vespalib/src/vespa/vespalib/portal/http_connection.cpp
new file mode 100644
index 00000000000..2bed10bfd2d
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/http_connection.cpp
@@ -0,0 +1,242 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "http_connection.h"
+#include <vespa/vespalib/data/output_writer.h>
+#include <cassert>
+
+namespace vespalib::portal {
+
+namespace {
+
+constexpr size_t CHUNK_SIZE = 4096;
+
+enum class ReadRes { OK, END, FAIL };
+enum class WriteRes { OK, BLOCKED, FAIL };
+
+bool is_blocked(int res) {
+ return ((res == -1) && ((errno == EWOULDBLOCK) || (errno == EAGAIN)));
+}
+
+ReadRes drain(CryptoSocket &socket, SmartBuffer &buffer) {
+ size_t chunk_size = std::max(CHUNK_SIZE, socket.min_read_buffer_size());
+ for (;;) {
+ auto chunk = buffer.reserve(chunk_size);
+ auto res = socket.drain(chunk.data, chunk.size);
+ if (res > 0) {
+ buffer.commit(res);
+ } else if (res == 0) {
+ return ReadRes::OK;
+ } else {
+ return ReadRes::FAIL;
+ }
+ }
+}
+
+ReadRes read(CryptoSocket &socket, SmartBuffer &buffer) {
+ size_t chunk_size = std::max(CHUNK_SIZE, socket.min_read_buffer_size());
+ auto chunk = buffer.reserve(chunk_size);
+ auto res = socket.read(chunk.data, chunk.size);
+ if (res > 0) {
+ buffer.commit(res);
+ } else if (res == 0) {
+ return ReadRes::END;
+ } else if (is_blocked(res)) {
+ return ReadRes::OK;
+ } else {
+ return ReadRes::FAIL;
+ }
+ return drain(socket, buffer);
+}
+
+WriteRes flush(CryptoSocket &socket) {
+ for (;;) {
+ auto res = socket.flush();
+ if (res > 0) {
+ // flush more
+ } else if (res == 0) {
+ return WriteRes::OK;
+ } else if (is_blocked(res)) {
+ return WriteRes::BLOCKED;
+ } else {
+ return WriteRes::FAIL;
+ }
+ }
+}
+
+WriteRes write(CryptoSocket &socket, SmartBuffer &buffer) {
+ auto chunk = buffer.obtain();
+ if (chunk.size > 0) {
+ auto res = socket.write(chunk.data, chunk.size);
+ if (res > 0) {
+ buffer.evict(res);
+ } else if (is_blocked(res)) {
+ return WriteRes::BLOCKED;
+ } else {
+ assert(res < 0);
+ return WriteRes::FAIL;
+ }
+ }
+ return flush(socket);
+}
+
+WriteRes half_close(CryptoSocket &socket) {
+ auto res = socket.half_close();
+ if (res == 0) {
+ return WriteRes::OK;
+ } else if (is_blocked(res)) {
+ return WriteRes::BLOCKED;
+ } else {
+ return WriteRes::FAIL;
+ }
+}
+
+} // namespace vespalib::portal::<unnamed>
+
+void
+HttpConnection::set_state(State state, bool read, bool write)
+{
+ _token->update(read, write);
+ _state = state;
+}
+
+void
+HttpConnection::do_handshake()
+{
+ switch (_socket->handshake()) {
+ case vespalib::CryptoSocket::HandshakeResult::FAIL: return set_state(State::NOTIFY, false, false);
+ case vespalib::CryptoSocket::HandshakeResult::DONE: return set_state(State::READ_REQUEST, true, false);
+ case vespalib::CryptoSocket::HandshakeResult::NEED_READ: return set_state(State::HANDSHAKE, true, false);
+ case vespalib::CryptoSocket::HandshakeResult::NEED_WRITE: return set_state(State::HANDSHAKE, false, true);
+ }
+}
+
+void
+HttpConnection::do_read_request()
+{
+ if (read(*_socket, _input) != ReadRes::OK) {
+ return set_state(State::NOTIFY, false, false);
+ }
+ auto data = _input.obtain();
+ auto consumed = _request.handle_data(data.data, data.size);
+ _input.evict(consumed);
+ if (!_request.need_more_data()) {
+ set_state(State::DISPATCH, false, false);
+ }
+}
+
+void
+HttpConnection::do_dispatch()
+{
+ set_state(State::WAIT, false, false);
+ return _handler(this); // callback is final touch
+}
+
+void
+HttpConnection::do_wait()
+{
+ if (_reply_ready) {
+ set_state(State::WRITE_REPLY, false, true);
+ }
+}
+
+void
+HttpConnection::do_write_reply()
+{
+ if (write(*_socket, _output) == WriteRes::FAIL) {
+ return set_state(State::NOTIFY, false, false);
+ }
+ if (_output.obtain().size == 0) {
+ set_state(State::CLOSE, false, true);
+ }
+}
+
+void
+HttpConnection::do_close()
+{
+ if (half_close(*_socket) != WriteRes::BLOCKED) {
+ set_state(State::NOTIFY, false, false);
+ }
+}
+
+void
+HttpConnection::do_notify()
+{
+ set_state(State::END, false, false);
+ return _handler(this); // callback is final touch
+}
+
+HttpConnection::HttpConnection(HandleGuard guard, Reactor &reactor, CryptoSocket::UP socket, handler_fun_t handler)
+ : _guard(std::move(guard)),
+ _state(State::HANDSHAKE),
+ _socket(std::move(socket)),
+ _input(CHUNK_SIZE * 2),
+ _output(CHUNK_SIZE * 2),
+ _request(),
+ _handler(std::move(handler)),
+ _reply_ready(false),
+ _token()
+{
+ _token = reactor.attach(*this, _socket->get_fd(), true, true);
+}
+
+HttpConnection::~HttpConnection()
+{
+ _token.reset();
+}
+
+void
+HttpConnection::handle_event(bool, bool)
+{
+ if (_state == State::HANDSHAKE) {
+ do_handshake();
+ }
+ if (_state == State::READ_REQUEST) {
+ do_read_request();
+ }
+ if (_state == State::DISPATCH) {
+ return do_dispatch(); // callback is final touch
+ }
+ if (_state == State::WAIT) {
+ do_wait();
+ }
+ if (_state == State::WRITE_REPLY) {
+ do_write_reply();
+ }
+ if (_state == State::CLOSE) {
+ do_close();
+ }
+ if (_state == State::NOTIFY) {
+ return do_notify(); // callback is final touch
+ }
+}
+
+void
+HttpConnection::respond_with_content(const vespalib::string &content_type,
+ const vespalib::string &content)
+{
+ {
+ OutputWriter dst(_output, CHUNK_SIZE);
+ dst.printf("HTTP/1.1 200 OK\r\n");
+ dst.printf("Connection: close\r\n");
+ dst.printf("Content-Type: %s\r\n", content_type.c_str());
+ dst.printf("\r\n");
+ dst.write(content.data(), content.size());
+ }
+ _reply_ready = true;
+ _token->update(false, true);
+}
+
+void
+HttpConnection::respond_with_error(int code, const vespalib::string &msg)
+{
+ {
+ OutputWriter dst(_output, CHUNK_SIZE);
+ dst.printf("HTTP/1.1 %d %s\r\n", code, msg.c_str());
+ dst.printf("Connection: close\r\n");
+ dst.printf("\r\n");
+ }
+ _reply_ready = true;
+ _token->update(false, true);
+}
+
+} // namespace vespalib::portal
diff --git a/vespalib/src/vespa/vespalib/portal/http_connection.h b/vespalib/src/vespa/vespalib/portal/http_connection.h
new file mode 100644
index 00000000000..296a915c873
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/http_connection.h
@@ -0,0 +1,54 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "reactor.h"
+#include "http_request.h"
+#include "handle_manager.h"
+#include <vespa/vespalib/net/crypto_socket.h>
+#include <vespa/vespalib/data/smart_buffer.h>
+#include <functional>
+#include <atomic>
+
+namespace vespalib::portal {
+
+class HttpConnection : public Reactor::EventHandler
+{
+public:
+ enum class State { HANDSHAKE, READ_REQUEST, DISPATCH, WAIT, WRITE_REPLY, CLOSE, NOTIFY, END };
+private:
+ using handler_fun_t = std::function<void(HttpConnection*)>;
+
+ HandleGuard _guard;
+ State _state;
+ CryptoSocket::UP _socket;
+ SmartBuffer _input;
+ SmartBuffer _output;
+ HttpRequest _request;
+ handler_fun_t _handler;
+ std::atomic<bool> _reply_ready;
+ Reactor::Token::UP _token;
+
+ void set_state(State state, bool read, bool write);
+
+ void do_handshake();
+ void do_read_request();
+ void do_dispatch();
+ void do_wait();
+ void do_write_reply();
+ void do_close();
+ void do_notify();
+
+public:
+ using UP = std::unique_ptr<HttpConnection>;
+ HttpConnection(HandleGuard guard, Reactor &reactor, CryptoSocket::UP socket, handler_fun_t handler);
+ ~HttpConnection();
+ void handle_event(bool read, bool write) override;
+ State get_state() const { return _state; }
+ const HttpRequest &get_request() const { return _request; }
+ void respond_with_content(const vespalib::string &content_type,
+ const vespalib::string &content);
+ void respond_with_error(int code, const vespalib::string &msg);
+};
+
+} // namespace vespalib::portal
diff --git a/vespalib/src/vespa/vespalib/portal/http_request.cpp b/vespalib/src/vespa/vespalib/portal/http_request.cpp
new file mode 100644
index 00000000000..d49fc2e70f4
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/http_request.cpp
@@ -0,0 +1,166 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "http_request.h"
+
+#include <algorithm>
+#include <vector>
+
+namespace vespalib::portal {
+
+namespace {
+
+void strip_cr(vespalib::string &str) {
+ if (!str.empty() && str[str.size() - 1] == '\r') {
+ str.resize(str.size() - 1);
+ }
+}
+
+std::vector<vespalib::string> split(vespalib::stringref str, vespalib::stringref sep) {
+ vespalib::string token;
+ std::vector<vespalib::string> list;
+ for (char c: str) {
+ if (sep.find(c) == vespalib::stringref::npos) {
+ token.push_back(c);
+ } else if (!token.empty()) {
+ list.push_back(token);
+ token.clear();
+ }
+ }
+ if (!token.empty()) {
+ list.push_back(token);
+ }
+ return list;
+}
+
+} // namespace vespalib::portal::<unnamed>
+
+void
+HttpRequest::set_done()
+{
+ _done = true;
+}
+
+void
+HttpRequest::set_error()
+{
+ _error = true;
+}
+
+void
+HttpRequest::handle_request_line(const vespalib::string &line)
+{
+ auto parts = split(line, " ");
+ if (parts.size() != 3) {
+ return set_error(); // malformed request line
+ }
+ _method = parts[0];
+ _uri = parts[1];
+ _version = parts[2];
+}
+
+void
+HttpRequest::handle_header_line(const vespalib::string &line)
+{
+ if (line.empty()) {
+ return set_done();
+ }
+ size_t pos = 0;
+ size_t end = line.size();
+ bool continuation = (line[0] == ' ') || (line[0] == '\t');
+ if (!continuation) {
+ pos = line.find(":");
+ if (pos == vespalib::string::npos) {
+ return set_error(); // missing header: value separator
+ } else {
+ _header_name.assign(line, 0, pos++);
+ std::transform(_header_name.begin(), _header_name.end(),
+ _header_name.begin(), ::tolower);
+ }
+ }
+ if (_header_name.empty()) {
+ return set_error(); // missing header name
+ }
+ while ((pos < end) && (isspace(line[pos]))) {
+ ++pos; // strip leading whitespace
+ }
+ while ((pos < end) && (isspace(line[end - 1]))) {
+ --end; // strip trailing whitespace
+ }
+ auto header_insert_result = _headers.insert(std::make_pair(_header_name, vespalib::string()));
+ bool header_found = !header_insert_result.second;
+ vespalib::string &header_value = header_insert_result.first->second;
+ if (header_found) {
+ if (continuation) {
+ header_value.push_back(' ');
+ } else { // duplicate header
+ header_value.push_back(',');
+ }
+ }
+ header_value.append(line.data() + pos, end - pos);
+}
+
+void
+HttpRequest::handle_line(const vespalib::string &line)
+{
+ if (_first) {
+ handle_request_line(line);
+ _first = false;
+ } else {
+ handle_header_line(line);
+ }
+}
+
+HttpRequest::HttpRequest()
+ : _method(),
+ _uri(),
+ _version(),
+ _headers(),
+ _host(),
+ _empty(),
+ _first(true),
+ _done(false),
+ _error(false),
+ _header_name(),
+ _line_buffer()
+{
+}
+
+HttpRequest::~HttpRequest() = default;
+
+size_t
+HttpRequest::handle_data(const char *buf, size_t len)
+{
+ size_t used = 0;
+ while (need_more_data() && (used < len)) {
+ char c = buf[used++];
+ if (c != '\n') {
+ _line_buffer.push_back(c);
+ } else {
+ strip_cr(_line_buffer);
+ handle_line(_line_buffer);
+ _line_buffer.clear();
+ }
+ }
+ return used;
+}
+
+void
+HttpRequest::resolve_host(const vespalib::string &my_host)
+{
+ _host = get_header("host");
+ if (_host.empty()) {
+ _host = my_host;
+ }
+}
+
+const vespalib::string &
+HttpRequest::get_header(const vespalib::string &name) const
+{
+ auto pos = _headers.find(name);
+ if (pos == _headers.end()) {
+ return _empty;
+ }
+ return pos->second;
+}
+
+} // namespace vespalib::portal
diff --git a/vespalib/src/vespa/vespalib/portal/http_request.h b/vespalib/src/vespa/vespalib/portal/http_request.h
new file mode 100644
index 00000000000..51c7ab08da9
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/http_request.h
@@ -0,0 +1,48 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+
+#include <map>
+
+namespace vespalib::portal {
+
+class HttpRequest
+{
+private:
+ // http stuff
+ vespalib::string _method;
+ vespalib::string _uri;
+ vespalib::string _version;
+ std::map<vespalib::string, vespalib::string> _headers;
+ vespalib::string _host;
+ // internal state
+ vespalib::string _empty;
+ bool _first;
+ bool _done;
+ bool _error;
+ vespalib::string _header_name;
+ vespalib::string _line_buffer;
+
+ void set_done();
+ void set_error();
+
+ void handle_request_line(const vespalib::string &line);
+ void handle_header_line(const vespalib::string &line);
+ void handle_line(const vespalib::string &line);
+
+public:
+ HttpRequest();
+ ~HttpRequest();
+ size_t handle_data(const char *buf, size_t len);
+ bool need_more_data() const { return (!_error && !_done); }
+ bool valid() const { return (!_error && _done); }
+ bool is_get() const { return _method == "GET"; }
+ void resolve_host(const vespalib::string &my_host);
+ const vespalib::string &get_header(const vespalib::string &name) const;
+ const vespalib::string &get_host() const { return _host; }
+ const vespalib::string &get_uri() const { return _uri; }
+};
+
+} // namespace vespalib::portal
diff --git a/vespalib/src/vespa/vespalib/portal/listener.cpp b/vespalib/src/vespa/vespalib/portal/listener.cpp
new file mode 100644
index 00000000000..1c4478dcd63
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/listener.cpp
@@ -0,0 +1,37 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "listener.h"
+#include <vespa/vespalib/util/exceptions.h>
+#include <cassert>
+
+namespace vespalib::portal {
+
+Listener::Listener(Reactor &reactor, int port, std::function<void(SocketHandle)> handler)
+ : _server_socket(port),
+ _handler(std::move(handler)),
+ _token()
+{
+ if (_server_socket.valid()) {
+ bool async = _server_socket.set_blocking(false);
+ assert(async);
+ _token = reactor.attach(*this, _server_socket.get_fd(), true, false);
+ } else {
+ throw PortListenException(port, "PORTAL");
+ }
+}
+
+Listener::~Listener()
+{
+ _token.reset();
+}
+
+void
+Listener::handle_event(bool, bool)
+{
+ SocketHandle handle = _server_socket.accept();
+ if (handle.valid()) {
+ _handler(std::move(handle));
+ }
+}
+
+} // namespace vespalib::portal
diff --git a/vespalib/src/vespa/vespalib/portal/listener.h b/vespalib/src/vespa/vespalib/portal/listener.h
new file mode 100644
index 00000000000..e2c20477f04
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/listener.h
@@ -0,0 +1,25 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "reactor.h"
+#include <vespa/vespalib/net/server_socket.h>
+#include <functional>
+
+namespace vespalib::portal {
+
+class Listener : public Reactor::EventHandler
+{
+private:
+ ServerSocket _server_socket;
+ std::function<void(SocketHandle)> _handler;
+ Reactor::Token::UP _token;
+public:
+ using UP = std::unique_ptr<Listener>;
+ Listener(Reactor &reactor, int port, std::function<void(SocketHandle)> handler);
+ ~Listener();
+ int listen_port() const { return _server_socket.address().port(); }
+ void handle_event(bool read, bool write) override;
+};
+
+} // namespace vespalib::portal
diff --git a/vespalib/src/vespa/vespalib/portal/portal.cpp b/vespalib/src/vespa/vespalib/portal/portal.cpp
new file mode 100644
index 00000000000..c96d7f9f83e
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/portal.cpp
@@ -0,0 +1,188 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "portal.h"
+#include "http_connection.h"
+#include <cassert>
+
+namespace vespalib {
+
+namespace {
+
+template <typename T>
+void remove_handle(T &collection, uint64_t handle) {
+ collection.erase(std::remove_if(collection.begin(), collection.end(),
+ [handle](const typename T::value_type &item)
+ { return (item.handle == handle); }),
+ collection.end());
+}
+
+} // namespace vespalib::<unnamed>
+
+using HttpConnection = portal::HttpConnection;
+
+Portal::Token::~Token()
+{
+ _portal.cancel_token(*this);
+}
+
+const vespalib::string &
+Portal::GetRequest::get_header(const vespalib::string &name) const
+{
+ assert(active());
+ return _conn->get_request().get_header(name);
+}
+
+const vespalib::string &
+Portal::GetRequest::get_host() const
+{
+ assert(active());
+ return _conn->get_request().get_host();
+}
+
+const vespalib::string &
+Portal::GetRequest::get_uri() const
+{
+ assert(active());
+ return _conn->get_request().get_uri();
+}
+
+void
+Portal::GetRequest::respond_with_content(const vespalib::string &content_type,
+ const vespalib::string &content)
+{
+ assert(active());
+ _conn->respond_with_content(content_type, content);
+ _conn = nullptr;
+}
+
+void
+Portal::GetRequest::respond_with_error(int code, const vespalib::string &msg)
+{
+ assert(active());
+ _conn->respond_with_error(code, msg);
+ _conn = nullptr;
+}
+
+Portal::GetRequest::~GetRequest()
+{
+ if (active()) {
+ respond_with_error(500, "Internal Server Error");
+ }
+}
+
+Portal::GetHandler::~GetHandler() = default;
+
+Portal::Token::UP
+Portal::make_token()
+{
+ return Token::UP(new Token(*this, _handle_manager.create()));
+}
+
+void
+Portal::cancel_token(Token &token)
+{
+ _handle_manager.destroy(token._handle);
+ evict_handle(token._handle);
+}
+
+portal::HandleGuard
+Portal::lookup_get_handler(const vespalib::string &uri, const GetHandler *&handler)
+{
+ std::lock_guard guard(_lock);
+ for (const auto &entry: _bind_list) {
+ if (starts_with(uri, entry.prefix)) {
+ auto handle_guard = _handle_manager.lock(entry.handle);
+ if (handle_guard.valid()) {
+ handler = entry.handler;
+ return handle_guard;
+ }
+ }
+ }
+ return portal::HandleGuard();
+}
+
+void
+Portal::evict_handle(uint64_t handle)
+{
+ std::lock_guard guard(_lock);
+ remove_handle(_bind_list, handle);
+}
+
+void
+Portal::handle_accept(portal::HandleGuard guard, SocketHandle socket)
+{
+ new HttpConnection(std::move(guard), _reactor, _crypto->create_crypto_socket(std::move(socket), true),
+ [this](HttpConnection *conn)
+ {
+ handle_http(conn);
+ });
+}
+
+void
+Portal::handle_http(portal::HttpConnection *conn)
+{
+ if (conn->get_state() == HttpConnection::State::WAIT) {
+ if (!conn->get_request().valid()) {
+ conn->respond_with_error(400, "Bad Request");
+ } else if (!conn->get_request().is_get()) {
+ conn->respond_with_error(501, "Not Implemented");
+ } else {
+ const GetHandler *get_handler = nullptr;
+ auto guard = lookup_get_handler(conn->get_request().get_uri(), get_handler);
+ if (guard.valid()) {
+ assert(get_handler != nullptr);
+ get_handler->get(GetRequest(*conn));
+ } else {
+ conn->respond_with_error(404, "Not Found");
+ }
+ }
+ } else {
+ assert(conn->get_state() == HttpConnection::State::END);
+ delete(conn);
+ }
+}
+
+Portal::Portal(CryptoEngine::SP crypto, int port)
+ : _crypto(std::move(crypto)),
+ _reactor(),
+ _handle_manager(),
+ _conn_handle(_handle_manager.create()),
+ _listener(),
+ _lock(),
+ _bind_list()
+{
+ _listener = std::make_unique<portal::Listener>(_reactor, port,
+ [this](SocketHandle socket)
+ {
+ auto guard = _handle_manager.lock(_conn_handle);
+ if (guard.valid()) {
+ handle_accept(std::move(guard), std::move(socket));
+ }
+ });
+}
+
+Portal::~Portal()
+{
+ _listener.reset();
+ _handle_manager.destroy(_conn_handle);
+ assert(_handle_manager.empty());
+ assert(_bind_list.empty());
+}
+
+Portal::SP
+Portal::create(CryptoEngine::SP crypto, int port)
+{
+ return Portal::SP(new Portal(std::move(crypto), port));
+}
+
+Portal::Token::UP
+Portal::bind(const vespalib::string &path_prefix, const GetHandler &handler)
+{
+ auto token = make_token();
+ std::lock_guard guard(_lock);
+ _bind_list.emplace_back(token->_handle, path_prefix, handler);
+ std::sort(_bind_list.begin(), _bind_list.end());
+ return token;
+}
+
+} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/portal/portal.h b/vespalib/src/vespa/vespalib/portal/portal.h
new file mode 100644
index 00000000000..0c75734de0b
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/portal.h
@@ -0,0 +1,114 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "listener.h"
+#include "reactor.h"
+#include "handle_manager.h"
+
+#include <vespa/vespalib/net/crypto_engine.h>
+#include <vespa/vespalib/net/crypto_socket.h>
+#include <vespa/vespalib/stllike/string.h>
+
+#include <map>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+namespace vespalib {
+
+namespace portal { class HttpConnection; }
+
+/**
+ * Minimal HTTP server and connection establishment manager.
+ **/
+class Portal
+{
+public:
+ using SP = std::shared_ptr<Portal>;
+
+ class Token {
+ friend class Portal;
+ private:
+ Portal &_portal;
+ uint64_t _handle;
+ Token(const Token &) = delete;
+ Token &operator=(const Token &) = delete;
+ Token(Token &&) = delete;
+ Token &operator=(Token &&) = delete;
+ Token(Portal &portal, uint64_t handle)
+ : _portal(portal), _handle(handle) {}
+ uint64_t handle() const { return _handle; }
+ public:
+ using UP = std::unique_ptr<Token>;
+ ~Token();
+ };
+
+ class GetRequest {
+ friend class Portal;
+ private:
+ portal::HttpConnection *_conn;
+ GetRequest(portal::HttpConnection &conn) : _conn(&conn) {}
+ public:
+ GetRequest(const GetRequest &rhs) = delete;
+ GetRequest &operator=(const GetRequest &rhs) = delete;
+ GetRequest &operator=(GetRequest &&rhs) = delete;
+ GetRequest(GetRequest &&rhs) : _conn(rhs._conn) {
+ rhs._conn = nullptr;
+ }
+ bool active() const { return (_conn != nullptr); }
+ const vespalib::string &get_header(const vespalib::string &name) const;
+ const vespalib::string &get_host() const;
+ const vespalib::string &get_uri() const;
+ void respond_with_content(const vespalib::string &content_type,
+ const vespalib::string &content);
+ void respond_with_error(int code, const vespalib::string &msg);
+ ~GetRequest();
+ };
+
+ struct GetHandler {
+ virtual void get(GetRequest request) const = 0;
+ virtual ~GetHandler();
+ };
+
+private:
+ struct BindState {
+ uint64_t handle;
+ vespalib::string prefix;
+ const GetHandler *handler;
+ BindState(uint64_t handle_in, vespalib::string prefix_in, const GetHandler &handler_in)
+ : handle(handle_in), prefix(prefix_in), handler(&handler_in) {}
+ bool operator<(const BindState &rhs) const {
+ if (prefix.size() == rhs.prefix.size()) {
+ return (handle > rhs.handle);
+ }
+ return (prefix.size() > rhs.prefix.size());
+ }
+ };
+
+ CryptoEngine::SP _crypto;
+ portal::Reactor _reactor;
+ portal::HandleManager _handle_manager;
+ uint64_t _conn_handle;
+ portal::Listener::UP _listener;
+ std::mutex _lock;
+ std::vector<BindState> _bind_list;
+
+ Token::UP make_token();
+ void cancel_token(Token &token);
+
+ portal::HandleGuard lookup_get_handler(const vespalib::string &uri, const GetHandler *&handler);
+ void evict_handle(uint64_t handle);
+
+ void handle_accept(portal::HandleGuard guard, SocketHandle socket);
+ void handle_http(portal::HttpConnection *conn);
+
+ Portal(CryptoEngine::SP crypto, int port);
+public:
+ ~Portal();
+ static SP create(CryptoEngine::SP crypto, int port);
+ int listen_port() const { return _listener->listen_port(); }
+ Token::UP bind(const vespalib::string &path_prefix, const GetHandler &handler);
+};
+
+} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/portal/reactor.cpp b/vespalib/src/vespa/vespalib/portal/reactor.cpp
new file mode 100644
index 00000000000..52a59f617fb
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/reactor.cpp
@@ -0,0 +1,128 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "reactor.h"
+#include <cassert>
+
+namespace vespalib::portal {
+
+Reactor::EventHandler::~EventHandler() = default;
+
+//-----------------------------------------------------------------------------
+
+Reactor::Token::Token(Reactor &reactor, EventHandler &handler, int fd, bool read, bool write)
+ : _reactor(reactor), _handler(handler), _fd(fd)
+{
+ ++_reactor._token_cnt;
+ _reactor._selector.add(_fd, _handler, read, write);
+}
+
+void
+Reactor::Token::update(bool read, bool write)
+{
+ _reactor._selector.update(_fd, _handler, read, write);
+}
+
+Reactor::Token::~Token()
+{
+ _reactor._selector.remove(_fd);
+ _reactor.cancel_token(*this);
+ --_reactor._token_cnt;
+}
+
+//-----------------------------------------------------------------------------
+
+void
+Reactor::cancel_token(const Token &)
+{
+ if (std::this_thread::get_id() == _thread.get_id()) {
+ _skip_events = true;
+ } else {
+ std::unique_lock guard(_lock);
+ size_t old_gen = _sync_seq;
+ ++_wait_cnt;
+ guard.unlock(); // UNLOCK
+ _selector.wakeup();
+ guard.lock(); // LOCK
+ while (_sync_seq == old_gen) {
+ _cond.wait(guard);
+ }
+ --_wait_cnt;
+ }
+}
+
+void
+Reactor::release_tokens()
+{
+ std::lock_guard guard(_lock);
+ if (_wait_cnt > 0) {
+ ++_sync_seq;
+ _cond.notify_all();
+ }
+}
+
+//-----------------------------------------------------------------------------
+
+void
+Reactor::handle_wakeup()
+{
+ _was_woken = true;
+}
+
+void
+Reactor::handle_event(EventHandler &handler, bool read, bool write)
+{
+ if (!_skip_events) {
+ handler.handle_event(read, write);
+ }
+}
+
+void
+Reactor::event_loop()
+{
+ while (!_done) {
+ _selector.poll(_tick());
+ _selector.dispatch(*this);
+ if (_skip_events) {
+ _skip_events = false;
+ }
+ if (_was_woken) {
+ release_tokens();
+ _was_woken = false;
+ }
+ }
+}
+
+//-----------------------------------------------------------------------------
+
+Reactor::Reactor(std::function<int()> tick)
+ : _selector(),
+ _tick(std::move(tick)),
+ _done(false),
+ _was_woken(false),
+ _skip_events(false),
+ _lock(),
+ _cond(),
+ _sync_seq(0),
+ _wait_cnt(0),
+ _token_cnt(0),
+ _thread(&Reactor::event_loop, this)
+{
+}
+
+Reactor::~Reactor()
+{
+ assert(_token_cnt == 0);
+ _done = true;
+ _selector.wakeup();
+ _thread.join();
+}
+
+Reactor::Token::UP
+Reactor::attach(EventHandler &handler, int fd, bool read, bool write)
+{
+ return Token::UP(new Token(*this, handler, fd, read, write));
+}
+
+//-----------------------------------------------------------------------------
+
+} // namespace vespalib::portal
diff --git a/vespalib/src/vespa/vespalib/portal/reactor.h b/vespalib/src/vespa/vespalib/portal/reactor.h
new file mode 100644
index 00000000000..a7961c3c943
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/portal/reactor.h
@@ -0,0 +1,68 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/net/selector.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+namespace vespalib::portal {
+
+class Reactor
+{
+public:
+ struct EventHandler {
+ virtual void handle_event(bool read, bool write) = 0;
+ virtual ~EventHandler();
+ };
+ friend class Selector<EventHandler>;
+ class Token {
+ friend class Reactor;
+ private:
+ Reactor &_reactor;
+ EventHandler &_handler;
+ int _fd;
+ Token(const Token &) = delete;
+ Token &operator=(const Token &) = delete;
+ Token(Token &&) = delete;
+ Token &operator=(Token &&) = delete;
+ Token(Reactor &reactor, EventHandler &handler, int fd, bool read, bool write);
+ public:
+ using UP = std::unique_ptr<Token>;
+ void update(bool read, bool write);
+ ~Token();
+ };
+
+private:
+ Selector<EventHandler> _selector;
+ std::function<int()> _tick;
+ std::atomic<bool> _done;
+ bool _was_woken;
+ bool _skip_events;
+ std::mutex _lock;
+ std::condition_variable _cond;
+ size_t _sync_seq;
+ size_t _wait_cnt;
+ std::atomic<size_t> _token_cnt;
+ std::thread _thread;
+
+ void cancel_token(const Token &token);
+ void release_tokens();
+
+ void handle_wakeup();
+ void handle_event(EventHandler &handler, bool read, bool write);
+ void event_loop();
+
+public:
+ Reactor(std::function<int()> tick);
+ Reactor() : Reactor([](){ return -1; }) {}
+ ~Reactor();
+ Token::UP attach(EventHandler &handler, int fd, bool read, bool write);
+};
+
+} // namespace vespalib::portal