diff options
author | Håvard Pettersen <havardpe@oath.com> | 2018-10-29 10:04:19 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2018-11-27 15:11:06 +0000 |
commit | c52bd31c7208d4bab093efb24acb9a3452a0b936 (patch) | |
tree | 1fc9c4b9b03dc1fe305e995a41c413da76df493a | |
parent | 95323a3a983e3b87f83ea7eaee990ce6070ed699 (diff) |
initial portal code
24 files changed, 2070 insertions, 0 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index 6491bdfb036..8bd3dda4e5a 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -67,6 +67,10 @@ vespa_define_module( src/tests/net/tls/transport_options src/tests/objects/nbostream src/tests/optimized + src/tests/portal + src/tests/portal/handle_manager + src/tests/portal/http_request + src/tests/portal/reactor src/tests/printable src/tests/priority_queue src/tests/random @@ -130,6 +134,7 @@ vespa_define_module( src/vespa/vespalib/net/tls src/vespa/vespalib/net/tls/impl src/vespa/vespalib/objects + src/vespa/vespalib/portal src/vespa/vespalib/stllike src/vespa/vespalib/test src/vespa/vespalib/testkit diff --git a/vespalib/src/tests/portal/CMakeLists.txt b/vespalib/src/tests/portal/CMakeLists.txt new file mode 100644 index 00000000000..3c39f286a15 --- /dev/null +++ b/vespalib/src/tests/portal/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_portal_test_app TEST + SOURCES + portal_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_portal_test_app COMMAND vespalib_portal_test_app) diff --git a/vespalib/src/tests/portal/handle_manager/CMakeLists.txt b/vespalib/src/tests/portal/handle_manager/CMakeLists.txt new file mode 100644 index 00000000000..b12b72f9ad6 --- /dev/null +++ b/vespalib/src/tests/portal/handle_manager/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_handle_manager_test_app TEST + SOURCES + handle_manager_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_handle_manager_test_app COMMAND vespalib_handle_manager_test_app) diff --git a/vespalib/src/tests/portal/handle_manager/handle_manager_test.cpp b/vespalib/src/tests/portal/handle_manager/handle_manager_test.cpp new file mode 100644 index 00000000000..4ce25aa0a7a --- /dev/null +++ b/vespalib/src/tests/portal/handle_manager/handle_manager_test.cpp @@ -0,0 +1,146 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/portal/handle_manager.h> +#include <vespa/vespalib/util/gate.h> + +#include <thread> +#include <chrono> +#include <atomic> + +using namespace vespalib; +using namespace vespalib::portal; + +TEST_F("require that handles can be created, locked and destroyed", TimeBomb(60)) { + HandleManager manager; + uint64_t handle = manager.create(); + EXPECT_TRUE(handle != manager.null_handle()); + { + HandleGuard guard = manager.lock(handle); + EXPECT_TRUE(guard.valid()); + EXPECT_EQUAL(guard.handle(), handle); + } + EXPECT_TRUE(manager.destroy(handle)); + { + HandleGuard guard = manager.lock(handle); + EXPECT_TRUE(!guard.valid()); + EXPECT_EQUAL(guard.handle(), manager.null_handle()); + } +} + +TEST_F("require that multiple guards can be taken for the same handle", TimeBomb(60)) { + HandleManager manager; + uint64_t handle = manager.create(); + EXPECT_TRUE(handle != manager.null_handle()); + { + HandleGuard guard1 = manager.lock(handle); + HandleGuard guard2 = manager.lock(handle); // <- does not block + EXPECT_TRUE(guard1.valid()); + EXPECT_EQUAL(guard1.handle(), handle); + EXPECT_TRUE(guard2.valid()); + EXPECT_EQUAL(guard2.handle(), handle); + } + EXPECT_TRUE(manager.destroy(handle)); +} + +TEST_F("require that handles are independent", TimeBomb(60)) { + HandleManager manager; + uint64_t handle1 = manager.create(); + uint64_t handle2 = manager.create(); + uint64_t handle3 = manager.create(); + EXPECT_TRUE(handle1 != manager.null_handle()); + EXPECT_TRUE(handle2 != manager.null_handle()); + EXPECT_TRUE(handle3 != manager.null_handle()); + EXPECT_TRUE(handle1 != handle2); + EXPECT_TRUE(handle1 != handle3); + EXPECT_TRUE(handle2 != handle3); + { + HandleGuard guard1 = manager.lock(handle1); + HandleGuard guard2 = manager.lock(handle2); + EXPECT_TRUE(guard1.valid()); + EXPECT_EQUAL(guard1.handle(), handle1); + EXPECT_TRUE(guard2.valid()); + EXPECT_EQUAL(guard2.handle(), handle2); + EXPECT_TRUE(manager.destroy(handle3)); // <- does not block + HandleGuard guard3 = manager.lock(handle3); + EXPECT_TRUE(!guard3.valid()); + EXPECT_EQUAL(guard3.handle(), manager.null_handle()); + } + EXPECT_TRUE(manager.destroy(handle1)); + EXPECT_TRUE(manager.destroy(handle2)); + EXPECT_TRUE(!manager.destroy(handle3)); +} + +struct Fixture { + HandleManager manager; + uint64_t handle; + Gate gate; + std::atomic<size_t> cnt1; + std::atomic<size_t> cnt2; + Fixture() : manager(), handle(manager.create()), gate(), cnt1(0), cnt2(0) {} +}; + +TEST_MT_FF("require that destroy waits for active handle guards", 2, Fixture(), TimeBomb(60)) { + if (thread_id == 0) { + { + auto guard = f1.manager.lock(f1.handle); + TEST_BARRIER(); // #1 + EXPECT_TRUE(!f1.gate.await(20)); + } + EXPECT_TRUE(f1.gate.await(60000)); + } else { + TEST_BARRIER(); // #1 + EXPECT_TRUE(f1.manager.destroy(f1.handle)); + f1.gate.countDown(); + } +} + +TEST_MT_FF("require that destroy disables ability to lock handles", 3, Fixture(), TimeBomb(60)) { + if (thread_id == 0) { + auto guard = f1.manager.lock(f1.handle); + ASSERT_TRUE(guard.valid()); + TEST_BARRIER(); // #1 + while (f1.cnt1 == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + EXPECT_EQUAL(f1.cnt2, 0u); + } else if (thread_id == 1) { + TEST_BARRIER(); // #1 + EXPECT_TRUE(f1.manager.destroy(f1.handle)); + EXPECT_EQUAL(f1.cnt1, 1u); + ++f1.cnt2; + } else { + TEST_BARRIER(); // #1 + while (f1.cnt1 == 0) { + auto guard = f1.manager.lock(f1.handle); + if (guard.valid()) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } else { + EXPECT_EQUAL(f1.cnt2, 0u); + ++f1.cnt1; + } + } + } +} + +TEST_MT_FF("require that a single destroy call returns true", 10, Fixture(), TimeBomb(60)) { + if (thread_id == 0) { // 1 thread here + auto guard = f1.manager.lock(f1.handle); + ASSERT_TRUE(guard.valid()); + TEST_BARRIER(); // #1 + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } else { // 'many' threads here + TEST_BARRIER(); // #1 + if (f1.manager.destroy(f1.handle)) { + ++f1.cnt1; + } else { + ++f1.cnt2; + } + } + TEST_BARRIER(); // #2 + EXPECT_EQUAL(f1.cnt1, 1u); + EXPECT_GREATER(num_threads, 5u); + EXPECT_EQUAL(f1.cnt2, (num_threads - 2)); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/portal/http_request/CMakeLists.txt b/vespalib/src/tests/portal/http_request/CMakeLists.txt new file mode 100644 index 00000000000..00dbf356634 --- /dev/null +++ b/vespalib/src/tests/portal/http_request/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_portal_http_request_test_app TEST + SOURCES + http_request_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_portal_http_request_test_app COMMAND vespalib_portal_http_request_test_app) diff --git a/vespalib/src/tests/portal/http_request/http_request_test.cpp b/vespalib/src/tests/portal/http_request/http_request_test.cpp new file mode 100644 index 00000000000..6e1527efa4b --- /dev/null +++ b/vespalib/src/tests/portal/http_request/http_request_test.cpp @@ -0,0 +1,120 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/portal/http_request.h> + +using namespace vespalib; +using namespace vespalib::portal; + +vespalib::string simple_req("GET /my/path HTTP/1.1\r\n" + "Host: my.host.com:80\r\n" + "CustomHeader: CustomValue\r\n" + "\r\n123456789"); +size_t simple_req_padding = 9; +size_t simple_req_size = (simple_req.size() - simple_req_padding); + +void verify_simple_req(const HttpRequest &req) { + EXPECT_TRUE(!req.need_more_data()); + EXPECT_TRUE(req.valid()); + EXPECT_TRUE(req.is_get()); + EXPECT_EQUAL(req.get_uri(), "/my/path"); + EXPECT_EQUAL(req.get_header("host"), "my.host.com:80"); + EXPECT_EQUAL(req.get_header("customheader"), "CustomValue"); + EXPECT_EQUAL(req.get_header("non-existing-header"), ""); +} + +HttpRequest make_request(const vespalib::string &req) { + HttpRequest result; + ASSERT_EQUAL(result.handle_data(req.data(), req.size()), req.size()); + ASSERT_TRUE(result.valid()); + return result; +} + +void verify_invalid_request(const vespalib::string &req) { + HttpRequest result; + EXPECT_EQUAL(result.handle_data(req.data(), req.size()), req.size()); + EXPECT_TRUE(!result.need_more_data()); + EXPECT_TRUE(!result.valid()); +} + +TEST("require that request can be parsed in one go") { + HttpRequest req; + EXPECT_EQUAL(req.handle_data(simple_req.data(), simple_req_size), simple_req_size); + TEST_DO(verify_simple_req(req)); +} + +TEST("require that trailing data is not consumed") { + HttpRequest req; + EXPECT_EQUAL(req.handle_data(simple_req.data(), simple_req.size()), simple_req_size); + TEST_DO(verify_simple_req(req)); +} + +TEST("require that request can be parsed incrementally") { + HttpRequest req; + size_t chunk = 7; + size_t done = 0; + while (done < simple_req_size) { + size_t expect = std::min(simple_req_size - done, chunk); + EXPECT_EQUAL(req.handle_data(simple_req.data() + done, chunk), expect); + done += expect; + } + EXPECT_EQUAL(done, simple_req_size); + TEST_DO(verify_simple_req(req)); +} + +TEST("require that header continuation is replaced by single space") { + auto req = make_request("GET /my/path HTTP/1.1\r\n" + "test: one\r\n" + " two\r\n" + "\tthree\r\n" + "\r\n"); + EXPECT_EQUAL(req.get_header("test"), "one two three"); +} + +TEST("require that duplicate headers are combined as list") { + auto req = make_request("GET /my/path HTTP/1.1\r\n" + "test: one\r\n" + "test: two\r\n" + "test: three\r\n" + "\r\n"); + EXPECT_EQUAL(req.get_header("test"), "one,two,three"); +} + +TEST("require that leading and trailing whitespaces are stripped") { + auto req = make_request("GET /my/path HTTP/1.1\r\n" + "test: one \r\n" + " , two \r\n" + "test: three \r\n" + "\r\n"); + EXPECT_EQUAL(req.get_header("test"), "one , two,three"); +} + +TEST("require that non-GET requests are detected") { + auto req = make_request("POST /my/path HTTP/1.1\r\n" + "\r\n"); + EXPECT_TRUE(!req.is_get()); +} + +TEST("require that request line must contain all relevant parts") { + TEST_DO(verify_invalid_request("/my/path HTTP/1.1\r\n")); + TEST_DO(verify_invalid_request("GET HTTP/1.1\r\n")); + TEST_DO(verify_invalid_request("GET /my/path\r\n")); +} + +TEST("require that first header line cannot be a continuation") { + TEST_DO(verify_invalid_request("GET /my/path HTTP/1.1\r\n" + " two\r\n")); +} + +TEST("require that header name is not allowed to be empty") { + TEST_DO(verify_invalid_request("GET /my/path HTTP/1.1\r\n" + ": value\r\n")); +} + +TEST("require that header line must contain separator") { + TEST_DO(verify_invalid_request("GET /my/path HTTP/1.1\r\n" + "ok-header: ok-value\r\n" + "missing separator\r\n")); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/portal/portal_test.cpp b/vespalib/src/tests/portal/portal_test.cpp new file mode 100644 index 00000000000..cac4b78d3e2 --- /dev/null +++ b/vespalib/src/tests/portal/portal_test.cpp @@ -0,0 +1,298 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/portal/portal.h> +#include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/net/socket_spec.h> +#include <vespa/vespalib/net/crypto_engine.h> +#include <vespa/vespalib/net/sync_crypto_socket.h> +#include <vespa/vespalib/net/tls/tls_crypto_engine.h> +#include <vespa/vespalib/net/tls/maybe_tls_crypto_engine.h> +#include <vespa/vespalib/test/make_tls_options_for_testing.h> +#include <vespa/vespalib/util/threadstackexecutor.h> + +using namespace vespalib; + +//----------------------------------------------------------------------------- + +vespalib::string do_http(int port, CryptoEngine::SP crypto, const vespalib::string &method, const vespalib::string &uri) { + auto socket = SocketSpec::from_port(port).client_address().connect(); + ASSERT_TRUE(socket.valid()); + auto conn = SyncCryptoSocket::create(*crypto, std::move(socket), false); + vespalib::string http_req = vespalib::make_string("%s %s HTTP/1.1\r\n" + "Host: localhost:%d\r\n" + "\r\n", method.c_str(), uri.c_str(), port); + ASSERT_EQUAL(conn->write(http_req.data(), http_req.size()), ssize_t(http_req.size())); + char buf[1024]; + vespalib::string result; + ssize_t res = conn->read(buf, sizeof(buf)); + while (res > 0) { + result.append(vespalib::stringref(buf, res)); + res = conn->read(buf, sizeof(buf)); + } + ASSERT_EQUAL(res, 0); + return result; +} + +vespalib::string fetch(int port, CryptoEngine::SP crypto, const vespalib::string &path) { + return do_http(port, std::move(crypto), "GET", path); +} + +//----------------------------------------------------------------------------- + +vespalib::string make_expected_response(const vespalib::string &content_type, const vespalib::string &content) { + return vespalib::make_string("HTTP/1.1 200 OK\r\n" + "Connection: close\r\n" + "Content-Type: %s\r\n" + "\r\n" + "%s", content_type.c_str(), content.c_str()); +} + +vespalib::string make_expected_error(int code, const vespalib::string &message) { + return vespalib::make_string("HTTP/1.1 %d %s\r\n" + "Connection: close\r\n" + "\r\n", code, message.c_str()); +} + +//----------------------------------------------------------------------------- + +struct Encryption { + vespalib::string name; + CryptoEngine::SP engine; + ~Encryption(); +}; +Encryption::~Encryption() = default; + +auto null_crypto() { return std::make_shared<NullCryptoEngine>(); } +auto xor_crypto() { return std::make_shared<XorCryptoEngine>(); } +auto tls_crypto() { return std::make_shared<TlsCryptoEngine>(vespalib::test::make_tls_options_for_testing()); } +auto maybe_tls_crypto(bool client_tls) { return std::make_shared<MaybeTlsCryptoEngine>(tls_crypto(), client_tls); } + +std::vector<Encryption> crypto_list = {{"no encryption", null_crypto()}, + {"ad-hoc xor", xor_crypto()}, + {"always TLS", tls_crypto()}, + {"maybe TLS; yes", maybe_tls_crypto(true)}, + {"maybe TLS; no", maybe_tls_crypto(false)}}; + +//----------------------------------------------------------------------------- + +struct MyGetHandler : public Portal::GetHandler { + std::function<void(Portal::GetRequest)> fun; + template <typename F> + MyGetHandler(F &&f) : fun(std::move(f)) {} + void get(Portal::GetRequest request) const override { + fun(std::move(request)); + } + ~MyGetHandler(); +}; +MyGetHandler::~MyGetHandler() = default; + +//----------------------------------------------------------------------------- + +TEST("require that failed portal listening throws exception") { + EXPECT_EXCEPTION(Portal::create(null_crypto(), -37), PortListenException, "-37"); +} + +TEST("require that portal can listen to auto-selected port") { + auto portal = Portal::create(null_crypto(), 0); + EXPECT_GREATER(portal->listen_port(), 0); +} + +TEST("require that simple GET works with various encryption strategies") { + vespalib::string path = "/test"; + vespalib::string type = "application/json"; + vespalib::string content = "[1,2,3]"; + MyGetHandler handler([&](Portal::GetRequest request) + { + EXPECT_EQUAL(request.get_uri(), path); + request.respond_with_content(type, content); + }); + for (const Encryption &crypto: crypto_list) { + fprintf(stderr, "... testing simple GET with encryption: '%s'\n", crypto.name.c_str()); + auto portal = Portal::create(crypto.engine, 0); + auto bound = portal->bind(path, handler); + auto expect = make_expected_response(type, content); + auto result = fetch(portal->listen_port(), crypto.engine, path); + EXPECT_EQUAL(result, expect); + bound.reset(); + result = fetch(portal->listen_port(), crypto.engine, path); + expect = make_expected_error(404, "Not Found"); + EXPECT_EQUAL(result, expect); + } +} + +//----------------------------------------------------------------------------- + +TEST("require that methods other than GET return not implemented error") { + auto portal = Portal::create(null_crypto(), 0); + auto expect_get = make_expected_error(404, "Not Found"); + auto expect_other = make_expected_error(501, "Not Implemented"); + for (const auto &method: {"OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"}) { + auto result = do_http(portal->listen_port(), null_crypto(), method, "/test"); + if (vespalib::string("GET") == method) { + EXPECT_EQUAL(result, expect_get); + } else { + EXPECT_EQUAL(result, expect_other); + } + } +} + +TEST("require that GET handler can return HTTP error") { + vespalib::string path = "/test"; + auto portal = Portal::create(null_crypto(), 0); + auto expect = make_expected_error(123, "My Error"); + MyGetHandler handler([](Portal::GetRequest request) + { + request.respond_with_error(123, "My Error"); + }); + auto bound = portal->bind(path, handler); + auto result = fetch(portal->listen_port(), null_crypto(), path); + EXPECT_EQUAL(result, expect); +} + +TEST("require that get requests dropped on the floor returns HTTP error") { + vespalib::string path = "/test"; + auto portal = Portal::create(null_crypto(), 0); + auto expect = make_expected_error(500, "Internal Server Error"); + MyGetHandler handler([](Portal::GetRequest){}); + auto bound = portal->bind(path, handler); + auto result = fetch(portal->listen_port(), null_crypto(), path); + EXPECT_EQUAL(result, expect); +} + +struct GetTask : public Executor::Task { + Portal::GetRequest request; + GetTask(Portal::GetRequest request_in) : request(std::move(request_in)) {} + void run() override { + request.respond_with_content("text/plain", "hello"); + } +}; + +TEST("require that GET requests can be completed in another thread") { + vespalib::string path = "/test"; + ThreadStackExecutor executor(1, 128 * 1024); + auto portal = Portal::create(null_crypto(), 0); + auto expect = make_expected_response("text/plain", "hello"); + MyGetHandler handler([&executor](Portal::GetRequest request) + { + executor.execute(std::make_unique<GetTask>(std::move(request))); + }); + auto bound = portal->bind(path, handler); + auto result = fetch(portal->listen_port(), null_crypto(), path); + EXPECT_EQUAL(result, expect); + executor.shutdown().sync(); +} + +TEST("require that bogus request returns HTTP error") { + auto portal = Portal::create(null_crypto(), 0); + auto expect = make_expected_error(400, "Bad Request"); + auto result = do_http(portal->listen_port(), null_crypto(), "this request is", " totally bogus\r\n"); + EXPECT_EQUAL(result, expect); +} + +TEST("require that the handler with the longest matching prefix is selected") { + auto portal = Portal::create(null_crypto(), 0); + MyGetHandler handler1([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler1"); }); + MyGetHandler handler2([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler2"); }); + MyGetHandler handler3([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler3"); }); + auto bound1 = portal->bind("/foo", handler1); + auto bound3 = portal->bind("/foo/bar/baz", handler3); + auto bound2 = portal->bind("/foo/bar", handler2); + EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler1")); + EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo/bar"), make_expected_response("text/plain", "handler2")); + EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo/bar/baz"), make_expected_response("text/plain", "handler3")); + bound3.reset(); + EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo/bar/baz"), make_expected_response("text/plain", "handler2")); + bound2.reset(); + EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo/bar/baz"), make_expected_response("text/plain", "handler1")); +} + +TEST("require that newer handlers with the same prefix shadows older ones") { + auto portal = Portal::create(null_crypto(), 0); + MyGetHandler handler1([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler1"); }); + MyGetHandler handler2([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler2"); }); + MyGetHandler handler3([](Portal::GetRequest request){ request.respond_with_content("text/plain", "handler3"); }); + auto bound1 = portal->bind("/foo", handler1); + EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler1")); + auto bound2 = portal->bind("/foo", handler2); + EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler2")); + auto bound3 = portal->bind("/foo", handler3); + EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler3")); + bound3.reset(); + EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler2")); + bound2.reset(); + EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler1")); +} + +TEST("require that connection errors do not block shutdown by leaking resources (also tests tight shutdown timing)") { + MyGetHandler handler([](Portal::GetRequest request) + { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + request.respond_with_content("application/json", "[1,2,3]"); + }); + for (const Encryption &crypto: crypto_list) { + fprintf(stderr, "... testing connection errors with encryption: '%s'\n", crypto.name.c_str()); + auto portal = Portal::create(crypto.engine, 0); + auto bound = portal->bind("/test", handler); + { // close before sending anything + auto socket = SocketSpec::from_port(portal->listen_port()).client_address().connect(); + auto conn = SyncCryptoSocket::create(*crypto.engine, std::move(socket), false); + } + { // send partial request then close connection + auto socket = SocketSpec::from_port(portal->listen_port()).client_address().connect(); + auto conn = SyncCryptoSocket::create(*crypto.engine, std::move(socket), false); + vespalib::string req = "GET /test HTTP/1.1\r\n" + "Host: local"; + ASSERT_EQUAL(conn->write(req.data(), req.size()), ssize_t(req.size())); + } + { // send request then close without reading response + auto socket = SocketSpec::from_port(portal->listen_port()).client_address().connect(); + auto conn = SyncCryptoSocket::create(*crypto.engine, std::move(socket), false); + vespalib::string req = "GET /test HTTP/1.1\r\n" + "Host: localhost\r\n" + "\r\n"; + ASSERT_EQUAL(conn->write(req.data(), req.size()), ssize_t(req.size())); + } + } +} + +struct WaitingFixture { + Portal::SP portal; + Gate enter_callback; + Gate exit_callback; + MyGetHandler handler; + Portal::Token::UP bound; + WaitingFixture() : portal(Portal::create(null_crypto(), 0)), + enter_callback(), + exit_callback(), + handler([this](Portal::GetRequest request) + { + enter_callback.countDown(); + request.respond_with_content("application/json", "[1,2,3]"); + exit_callback.await(); + }), + bound(portal->bind("/test", handler)) {} +}; + +TEST_MT_FFF("require that bind token destruction waits for active requests", 3, + WaitingFixture(), Gate(), TimeBomb(60)) +{ + if (thread_id == 0) { + f1.enter_callback.await(); + EXPECT_TRUE(!f2.await(20)); + f1.exit_callback.countDown(); + EXPECT_TRUE(f2.await(60000)); + } else if (thread_id == 1) { + f1.enter_callback.await(); + f1.bound.reset(); + f2.countDown(); + } else { + auto result = fetch(f1.portal->listen_port(), null_crypto(), "/test"); + EXPECT_EQUAL(result, make_expected_response("application/json", "[1,2,3]")); + } +} + +//----------------------------------------------------------------------------- + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/portal/reactor/CMakeLists.txt b/vespalib/src/tests/portal/reactor/CMakeLists.txt new file mode 100644 index 00000000000..2ed829fcfc0 --- /dev/null +++ b/vespalib/src/tests/portal/reactor/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_reactor_test_app TEST + SOURCES + reactor_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_reactor_test_app COMMAND vespalib_reactor_test_app) diff --git a/vespalib/src/tests/portal/reactor/reactor_test.cpp b/vespalib/src/tests/portal/reactor/reactor_test.cpp new file mode 100644 index 00000000000..31bda119fd9 --- /dev/null +++ b/vespalib/src/tests/portal/reactor/reactor_test.cpp @@ -0,0 +1,162 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/net/socket_handle.h> +#include <vespa/vespalib/portal/reactor.h> +#include <vespa/vespalib/util/gate.h> + +#include <thread> +#include <chrono> + +#include <sys/types.h> +#include <sys/socket.h> +#include <fcntl.h> + +using namespace vespalib; +using namespace vespalib::portal; + +struct SocketPair { + SocketHandle main; + SocketHandle other; + SocketPair() : main(), other() { + int sockets[2]; + ASSERT_EQUAL(0, socketpair(AF_UNIX, SOCK_STREAM | O_NONBLOCK, 0, sockets)); + main.reset(sockets[0]); + other.reset(sockets[1]); + // make main socket both readable and writable + ASSERT_EQUAL(other.write("x", 1), 1); + } + ~SocketPair(); +}; +SocketPair::~SocketPair() = default; + +std::atomic<size_t> tick_cnt = 0; +int tick() { + ++tick_cnt; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + return 0; +} + +void wait_tick() { + size_t sample = tick_cnt; + while (sample == tick_cnt) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } +} + +struct SimpleHandler : Reactor::EventHandler { + SocketPair sockets; + std::atomic<size_t> read_cnt; + std::atomic<size_t> write_cnt; + Reactor::Token::UP token; + SimpleHandler(Reactor &reactor, bool read, bool write) + : sockets(), read_cnt(0), write_cnt(0), token() + { + token = reactor.attach(*this, sockets.main.get(), read, write); + } + void handle_event(bool read, bool write) override { + if (read) { + ++read_cnt; + } + if (write) { + ++write_cnt; + } + } + void verify(bool read, bool write) { + size_t read_sample = read_cnt; + size_t write_sample = write_cnt; + wait_tick(); + wait_tick(); + EXPECT_EQUAL((read_sample != read_cnt), read); + EXPECT_EQUAL((write_sample != write_cnt), write); + } + ~SimpleHandler(); +}; +SimpleHandler::~SimpleHandler() = default; + +struct DeletingHandler : SimpleHandler { + Gate token_deleted; + DeletingHandler(Reactor &reactor) : SimpleHandler(reactor, true, true), + token_deleted() {} + void handle_event(bool read, bool write) override { + SimpleHandler::handle_event(read, write); + token.reset(); + token_deleted.countDown(); + } + ~DeletingHandler(); +}; +DeletingHandler::~DeletingHandler() = default; + +struct WaitingHandler : SimpleHandler { + Gate enter_callback; + Gate exit_callback; + WaitingHandler(Reactor &reactor) : SimpleHandler(reactor, true, true), + enter_callback(), exit_callback() {} + void handle_event(bool read, bool write) override { + enter_callback.countDown(); + SimpleHandler::handle_event(read, write); + exit_callback.await(); + } + ~WaitingHandler(); +}; +WaitingHandler::~WaitingHandler() = default; + +//----------------------------------------------------------------------------- + +TEST_FF("require that reactor can produce async io events", Reactor(tick), TimeBomb(60)) { + for (bool read: {true, false}) { + for (bool write: {true, false}) { + { + SimpleHandler handler(f1, read, write); + TEST_DO(handler.verify(read, write)); + } + } + } +} + +TEST_FF("require that reactor token can be used to change active io events", Reactor(tick), TimeBomb(60)) { + SimpleHandler handler(f1, false, false); + TEST_DO(handler.verify(false, false)); + for (int i = 0; i < 2; ++i) { + for (bool read: {true, false}) { + for (bool write: {true, false}) { + handler.token->update(read, write); + wait_tick(); // avoid stale events + TEST_DO(handler.verify(read, write)); + } + } + } +} + +TEST_FF("require that deleting reactor token disables io events", Reactor(tick), TimeBomb(60)) { + SimpleHandler handler(f1, true, true); + TEST_DO(handler.verify(true, true)); + handler.token.reset(); + TEST_DO(handler.verify(false, false)); +} + +TEST_FF("require that reactor token can be destroyed during io event handling", Reactor(tick), TimeBomb(60)) { + DeletingHandler handler(f1); + handler.token_deleted.await(); + TEST_DO(handler.verify(false, false)); + EXPECT_EQUAL(handler.read_cnt, 1u); + EXPECT_EQUAL(handler.write_cnt, 1u); +} + +TEST_MT_FFFF("require that reactor token destruction waits for io event handling", 2, + Reactor(), WaitingHandler(f1), Gate(), TimeBomb(60)) +{ + if (thread_id == 0) { + f2.enter_callback.await(); + TEST_BARRIER(); // #1 + EXPECT_TRUE(!f3.await(20)); + f2.exit_callback.countDown(); + EXPECT_TRUE(f3.await(60000)); + } else { + TEST_BARRIER(); // #1 + f2.token.reset(); + f3.countDown(); + } +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/CMakeLists.txt b/vespalib/src/vespa/vespalib/CMakeLists.txt index 8261bb8874e..676362a7aef 100644 --- a/vespalib/src/vespa/vespalib/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/CMakeLists.txt @@ -12,6 +12,7 @@ vespa_add_library(vespalib $<TARGET_OBJECTS:vespalib_vespalib_net_tls> $<TARGET_OBJECTS:vespalib_vespalib_net_tls_impl> $<TARGET_OBJECTS:vespalib_vespalib_objects> + $<TARGET_OBJECTS:vespalib_vespalib_portal> $<TARGET_OBJECTS:vespalib_vespalib_stllike> $<TARGET_OBJECTS:vespalib_vespalib_test> $<TARGET_OBJECTS:vespalib_vespalib_testkit> diff --git a/vespalib/src/vespa/vespalib/portal/CMakeLists.txt b/vespalib/src/vespa/vespalib/portal/CMakeLists.txt new file mode 100644 index 00000000000..cb27aafdcf1 --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/CMakeLists.txt @@ -0,0 +1,11 @@ +# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(vespalib_vespalib_portal OBJECT + SOURCES + handle_manager.cpp + http_connection.cpp + http_request.cpp + listener.cpp + portal.cpp + reactor.cpp + DEPENDS +) diff --git a/vespalib/src/vespa/vespalib/portal/README b/vespalib/src/vespa/vespalib/portal/README new file mode 100644 index 00000000000..dfaf904ae95 --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/README @@ -0,0 +1,22 @@ +Portal is the codename of a component that does 3 things: + +(1) it is a simple HTTP server able to answer GET requests +(2) it accepts incoming connections from other applications +(3) it establishes outgoing connections to other applications + +The idea is that each resource is either something to be queried with +a GET request or something to be connected to with a connection +upgrade similar to how websocket connections are established. This +allows an application to use a single port for all network traffic. + +For backwards compatibility, incoming connections can also be accepted +directly from a server socket using a separate port. The important +thing is that the component that will be using the connections is +relieved of the complexities of accepting and establishing them. The +flip to using HTTP connection upgrade later on should requite minor +changes in how the portal API is used. + +notable http server limitations: + * HEAD method not supported + * http connection keep-alive not supported + * absolute uri in request-line not supported diff --git a/vespalib/src/vespa/vespalib/portal/handle_manager.cpp b/vespalib/src/vespa/vespalib/portal/handle_manager.cpp new file mode 100644 index 00000000000..b78541deb19 --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/handle_manager.cpp @@ -0,0 +1,105 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "handle_manager.h" + +#include <cassert> + +namespace vespalib::portal { + +void +HandleGuard::unlock() +{ + if (valid()) { + _manager->unlock(_handle); + _manager = nullptr; + _handle = 0; + } +} + +HandleGuard::~HandleGuard() +{ + unlock(); +} + +//----------------------------------------------------------------------------- + +HandleManager::Entry::~Entry() +{ + assert(use_cnt == 0); + assert(wait_cnt == 0); +} + +void +HandleManager::unlock(uint64_t handle) +{ + std::lock_guard guard(_lock); + auto pos = _repo.find(handle); + assert(pos != _repo.end()); + --pos->second.use_cnt; + if (pos->second.should_notify()) { + pos->second.cond.notify_all(); + } +} + +HandleManager::HandleManager() + : _lock(), + _next_handle(1), + _repo() +{ +} + +HandleManager::~HandleManager() = default; + +size_t +HandleManager::size() const +{ + std::lock_guard guard(_lock); + return _repo.size(); +} + +uint64_t +HandleManager::create() +{ + std::lock_guard guard(_lock); + uint64_t handle = _next_handle++; + _repo[handle]; + return handle; +} + +HandleGuard +HandleManager::lock(uint64_t handle) +{ + std::lock_guard guard(_lock); + auto pos = _repo.find(handle); + if (pos == _repo.end()) { + return HandleGuard(); + } + if (pos->second.disable) { + return HandleGuard(); + } + ++pos->second.use_cnt; + return HandleGuard(*this, handle); +} + +bool +HandleManager::destroy(uint64_t handle) +{ + std::unique_lock guard(_lock); + auto pos = _repo.find(handle); + if (pos == _repo.end()) { + return false; + } + pos->second.disable = true; + ++pos->second.wait_cnt; + while (pos->second.use_cnt > 0) { + pos->second.cond.wait(guard); + } + --pos->second.wait_cnt; + if (pos->second.should_erase()) { + _repo.erase(pos); + return true; + } + return false; +} + +} // namespace vespalib::portal diff --git a/vespalib/src/vespa/vespalib/portal/handle_manager.h b/vespalib/src/vespa/vespalib/portal/handle_manager.h new file mode 100644 index 00000000000..8b3456dc75c --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/handle_manager.h @@ -0,0 +1,98 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <condition_variable> +#include <map> +#include <memory> +#include <mutex> + +namespace vespalib::portal { + +class HandleManager; + +/** + * A guard that makes sure a handle remains valid while using it. + **/ +class HandleGuard { + friend class HandleManager; +private: + HandleManager *_manager; + uint64_t _handle; + HandleGuard(HandleManager &manager_in, uint64_t handle_in) + : _manager(&manager_in), _handle(handle_in) {} + void unlock(); +public: + HandleGuard() : _manager(nullptr), _handle(0) {} + HandleGuard(const HandleGuard &) = delete; + HandleGuard &operator=(const HandleGuard &) = delete; + HandleGuard(HandleGuard &&rhs) + : _manager(rhs._manager), + _handle(rhs._handle) + { + rhs._manager = nullptr; + rhs._handle = 0; + } + HandleGuard &operator=(HandleGuard &&rhs) { + unlock(); + _manager = rhs._manager; + _handle = rhs._handle; + rhs._manager = nullptr; + rhs._handle = 0; + return *this; + } + bool valid() { return (_manager != nullptr); } + uint64_t handle() const { return _handle; } + ~HandleGuard(); +}; + +/** + * A manager keeping track of all currently active handles. The + * 'create' function will create a unique handle and return it. The + * 'lock' function is used to obtain guard for a specific handle, + * making sure it remains valid while using it. Calling the 'destroy' + * function will tag the handle for destruction and also wait until + * the handle is no longer in use. Any subsequent calls to 'lock' + * after the handle has been tagged for destruction will return an + * invalid guard, making it important to check the return value of + * 'lock'. The 'destroy' function can be called by multiple actors at + * any time. Only one of these calls will return true, indicating + * credit for the destruction of the handle and responsibility for + * cleaning up after it. + **/ +class HandleManager +{ + friend class HandleGuard; +private: + struct Entry { + std::condition_variable cond; + bool disable; + size_t use_cnt; + size_t wait_cnt; + bool should_notify() const { + return ((use_cnt == 0) && (wait_cnt > 0)); + } + bool should_erase() const { + return (disable && (use_cnt == 0) && (wait_cnt == 0)); + } + Entry() : cond(), disable(false), use_cnt(0), wait_cnt(0) {} + ~Entry(); + }; + + mutable std::mutex _lock; + uint64_t _next_handle; + std::map<uint64_t,Entry> _repo; + + void unlock(uint64_t handle); +public: + HandleManager(); + ~HandleManager(); + size_t size() const; + bool empty() const { return (size() == 0); } + uint64_t create(); + HandleGuard lock(uint64_t handle); + bool destroy(uint64_t handle); + static uint64_t null_handle() { return 0; } +}; + +} // namespace vespalib::portal diff --git a/vespalib/src/vespa/vespalib/portal/http_connection.cpp b/vespalib/src/vespa/vespalib/portal/http_connection.cpp new file mode 100644 index 00000000000..2bed10bfd2d --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/http_connection.cpp @@ -0,0 +1,242 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "http_connection.h" +#include <vespa/vespalib/data/output_writer.h> +#include <cassert> + +namespace vespalib::portal { + +namespace { + +constexpr size_t CHUNK_SIZE = 4096; + +enum class ReadRes { OK, END, FAIL }; +enum class WriteRes { OK, BLOCKED, FAIL }; + +bool is_blocked(int res) { + return ((res == -1) && ((errno == EWOULDBLOCK) || (errno == EAGAIN))); +} + +ReadRes drain(CryptoSocket &socket, SmartBuffer &buffer) { + size_t chunk_size = std::max(CHUNK_SIZE, socket.min_read_buffer_size()); + for (;;) { + auto chunk = buffer.reserve(chunk_size); + auto res = socket.drain(chunk.data, chunk.size); + if (res > 0) { + buffer.commit(res); + } else if (res == 0) { + return ReadRes::OK; + } else { + return ReadRes::FAIL; + } + } +} + +ReadRes read(CryptoSocket &socket, SmartBuffer &buffer) { + size_t chunk_size = std::max(CHUNK_SIZE, socket.min_read_buffer_size()); + auto chunk = buffer.reserve(chunk_size); + auto res = socket.read(chunk.data, chunk.size); + if (res > 0) { + buffer.commit(res); + } else if (res == 0) { + return ReadRes::END; + } else if (is_blocked(res)) { + return ReadRes::OK; + } else { + return ReadRes::FAIL; + } + return drain(socket, buffer); +} + +WriteRes flush(CryptoSocket &socket) { + for (;;) { + auto res = socket.flush(); + if (res > 0) { + // flush more + } else if (res == 0) { + return WriteRes::OK; + } else if (is_blocked(res)) { + return WriteRes::BLOCKED; + } else { + return WriteRes::FAIL; + } + } +} + +WriteRes write(CryptoSocket &socket, SmartBuffer &buffer) { + auto chunk = buffer.obtain(); + if (chunk.size > 0) { + auto res = socket.write(chunk.data, chunk.size); + if (res > 0) { + buffer.evict(res); + } else if (is_blocked(res)) { + return WriteRes::BLOCKED; + } else { + assert(res < 0); + return WriteRes::FAIL; + } + } + return flush(socket); +} + +WriteRes half_close(CryptoSocket &socket) { + auto res = socket.half_close(); + if (res == 0) { + return WriteRes::OK; + } else if (is_blocked(res)) { + return WriteRes::BLOCKED; + } else { + return WriteRes::FAIL; + } +} + +} // namespace vespalib::portal::<unnamed> + +void +HttpConnection::set_state(State state, bool read, bool write) +{ + _token->update(read, write); + _state = state; +} + +void +HttpConnection::do_handshake() +{ + switch (_socket->handshake()) { + case vespalib::CryptoSocket::HandshakeResult::FAIL: return set_state(State::NOTIFY, false, false); + case vespalib::CryptoSocket::HandshakeResult::DONE: return set_state(State::READ_REQUEST, true, false); + case vespalib::CryptoSocket::HandshakeResult::NEED_READ: return set_state(State::HANDSHAKE, true, false); + case vespalib::CryptoSocket::HandshakeResult::NEED_WRITE: return set_state(State::HANDSHAKE, false, true); + } +} + +void +HttpConnection::do_read_request() +{ + if (read(*_socket, _input) != ReadRes::OK) { + return set_state(State::NOTIFY, false, false); + } + auto data = _input.obtain(); + auto consumed = _request.handle_data(data.data, data.size); + _input.evict(consumed); + if (!_request.need_more_data()) { + set_state(State::DISPATCH, false, false); + } +} + +void +HttpConnection::do_dispatch() +{ + set_state(State::WAIT, false, false); + return _handler(this); // callback is final touch +} + +void +HttpConnection::do_wait() +{ + if (_reply_ready) { + set_state(State::WRITE_REPLY, false, true); + } +} + +void +HttpConnection::do_write_reply() +{ + if (write(*_socket, _output) == WriteRes::FAIL) { + return set_state(State::NOTIFY, false, false); + } + if (_output.obtain().size == 0) { + set_state(State::CLOSE, false, true); + } +} + +void +HttpConnection::do_close() +{ + if (half_close(*_socket) != WriteRes::BLOCKED) { + set_state(State::NOTIFY, false, false); + } +} + +void +HttpConnection::do_notify() +{ + set_state(State::END, false, false); + return _handler(this); // callback is final touch +} + +HttpConnection::HttpConnection(HandleGuard guard, Reactor &reactor, CryptoSocket::UP socket, handler_fun_t handler) + : _guard(std::move(guard)), + _state(State::HANDSHAKE), + _socket(std::move(socket)), + _input(CHUNK_SIZE * 2), + _output(CHUNK_SIZE * 2), + _request(), + _handler(std::move(handler)), + _reply_ready(false), + _token() +{ + _token = reactor.attach(*this, _socket->get_fd(), true, true); +} + +HttpConnection::~HttpConnection() +{ + _token.reset(); +} + +void +HttpConnection::handle_event(bool, bool) +{ + if (_state == State::HANDSHAKE) { + do_handshake(); + } + if (_state == State::READ_REQUEST) { + do_read_request(); + } + if (_state == State::DISPATCH) { + return do_dispatch(); // callback is final touch + } + if (_state == State::WAIT) { + do_wait(); + } + if (_state == State::WRITE_REPLY) { + do_write_reply(); + } + if (_state == State::CLOSE) { + do_close(); + } + if (_state == State::NOTIFY) { + return do_notify(); // callback is final touch + } +} + +void +HttpConnection::respond_with_content(const vespalib::string &content_type, + const vespalib::string &content) +{ + { + OutputWriter dst(_output, CHUNK_SIZE); + dst.printf("HTTP/1.1 200 OK\r\n"); + dst.printf("Connection: close\r\n"); + dst.printf("Content-Type: %s\r\n", content_type.c_str()); + dst.printf("\r\n"); + dst.write(content.data(), content.size()); + } + _reply_ready = true; + _token->update(false, true); +} + +void +HttpConnection::respond_with_error(int code, const vespalib::string &msg) +{ + { + OutputWriter dst(_output, CHUNK_SIZE); + dst.printf("HTTP/1.1 %d %s\r\n", code, msg.c_str()); + dst.printf("Connection: close\r\n"); + dst.printf("\r\n"); + } + _reply_ready = true; + _token->update(false, true); +} + +} // namespace vespalib::portal diff --git a/vespalib/src/vespa/vespalib/portal/http_connection.h b/vespalib/src/vespa/vespalib/portal/http_connection.h new file mode 100644 index 00000000000..296a915c873 --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/http_connection.h @@ -0,0 +1,54 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "reactor.h" +#include "http_request.h" +#include "handle_manager.h" +#include <vespa/vespalib/net/crypto_socket.h> +#include <vespa/vespalib/data/smart_buffer.h> +#include <functional> +#include <atomic> + +namespace vespalib::portal { + +class HttpConnection : public Reactor::EventHandler +{ +public: + enum class State { HANDSHAKE, READ_REQUEST, DISPATCH, WAIT, WRITE_REPLY, CLOSE, NOTIFY, END }; +private: + using handler_fun_t = std::function<void(HttpConnection*)>; + + HandleGuard _guard; + State _state; + CryptoSocket::UP _socket; + SmartBuffer _input; + SmartBuffer _output; + HttpRequest _request; + handler_fun_t _handler; + std::atomic<bool> _reply_ready; + Reactor::Token::UP _token; + + void set_state(State state, bool read, bool write); + + void do_handshake(); + void do_read_request(); + void do_dispatch(); + void do_wait(); + void do_write_reply(); + void do_close(); + void do_notify(); + +public: + using UP = std::unique_ptr<HttpConnection>; + HttpConnection(HandleGuard guard, Reactor &reactor, CryptoSocket::UP socket, handler_fun_t handler); + ~HttpConnection(); + void handle_event(bool read, bool write) override; + State get_state() const { return _state; } + const HttpRequest &get_request() const { return _request; } + void respond_with_content(const vespalib::string &content_type, + const vespalib::string &content); + void respond_with_error(int code, const vespalib::string &msg); +}; + +} // namespace vespalib::portal diff --git a/vespalib/src/vespa/vespalib/portal/http_request.cpp b/vespalib/src/vespa/vespalib/portal/http_request.cpp new file mode 100644 index 00000000000..d49fc2e70f4 --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/http_request.cpp @@ -0,0 +1,166 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "http_request.h" + +#include <algorithm> +#include <vector> + +namespace vespalib::portal { + +namespace { + +void strip_cr(vespalib::string &str) { + if (!str.empty() && str[str.size() - 1] == '\r') { + str.resize(str.size() - 1); + } +} + +std::vector<vespalib::string> split(vespalib::stringref str, vespalib::stringref sep) { + vespalib::string token; + std::vector<vespalib::string> list; + for (char c: str) { + if (sep.find(c) == vespalib::stringref::npos) { + token.push_back(c); + } else if (!token.empty()) { + list.push_back(token); + token.clear(); + } + } + if (!token.empty()) { + list.push_back(token); + } + return list; +} + +} // namespace vespalib::portal::<unnamed> + +void +HttpRequest::set_done() +{ + _done = true; +} + +void +HttpRequest::set_error() +{ + _error = true; +} + +void +HttpRequest::handle_request_line(const vespalib::string &line) +{ + auto parts = split(line, " "); + if (parts.size() != 3) { + return set_error(); // malformed request line + } + _method = parts[0]; + _uri = parts[1]; + _version = parts[2]; +} + +void +HttpRequest::handle_header_line(const vespalib::string &line) +{ + if (line.empty()) { + return set_done(); + } + size_t pos = 0; + size_t end = line.size(); + bool continuation = (line[0] == ' ') || (line[0] == '\t'); + if (!continuation) { + pos = line.find(":"); + if (pos == vespalib::string::npos) { + return set_error(); // missing header: value separator + } else { + _header_name.assign(line, 0, pos++); + std::transform(_header_name.begin(), _header_name.end(), + _header_name.begin(), ::tolower); + } + } + if (_header_name.empty()) { + return set_error(); // missing header name + } + while ((pos < end) && (isspace(line[pos]))) { + ++pos; // strip leading whitespace + } + while ((pos < end) && (isspace(line[end - 1]))) { + --end; // strip trailing whitespace + } + auto header_insert_result = _headers.insert(std::make_pair(_header_name, vespalib::string())); + bool header_found = !header_insert_result.second; + vespalib::string &header_value = header_insert_result.first->second; + if (header_found) { + if (continuation) { + header_value.push_back(' '); + } else { // duplicate header + header_value.push_back(','); + } + } + header_value.append(line.data() + pos, end - pos); +} + +void +HttpRequest::handle_line(const vespalib::string &line) +{ + if (_first) { + handle_request_line(line); + _first = false; + } else { + handle_header_line(line); + } +} + +HttpRequest::HttpRequest() + : _method(), + _uri(), + _version(), + _headers(), + _host(), + _empty(), + _first(true), + _done(false), + _error(false), + _header_name(), + _line_buffer() +{ +} + +HttpRequest::~HttpRequest() = default; + +size_t +HttpRequest::handle_data(const char *buf, size_t len) +{ + size_t used = 0; + while (need_more_data() && (used < len)) { + char c = buf[used++]; + if (c != '\n') { + _line_buffer.push_back(c); + } else { + strip_cr(_line_buffer); + handle_line(_line_buffer); + _line_buffer.clear(); + } + } + return used; +} + +void +HttpRequest::resolve_host(const vespalib::string &my_host) +{ + _host = get_header("host"); + if (_host.empty()) { + _host = my_host; + } +} + +const vespalib::string & +HttpRequest::get_header(const vespalib::string &name) const +{ + auto pos = _headers.find(name); + if (pos == _headers.end()) { + return _empty; + } + return pos->second; +} + +} // namespace vespalib::portal diff --git a/vespalib/src/vespa/vespalib/portal/http_request.h b/vespalib/src/vespa/vespalib/portal/http_request.h new file mode 100644 index 00000000000..51c7ab08da9 --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/http_request.h @@ -0,0 +1,48 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/stllike/string.h> + +#include <map> + +namespace vespalib::portal { + +class HttpRequest +{ +private: + // http stuff + vespalib::string _method; + vespalib::string _uri; + vespalib::string _version; + std::map<vespalib::string, vespalib::string> _headers; + vespalib::string _host; + // internal state + vespalib::string _empty; + bool _first; + bool _done; + bool _error; + vespalib::string _header_name; + vespalib::string _line_buffer; + + void set_done(); + void set_error(); + + void handle_request_line(const vespalib::string &line); + void handle_header_line(const vespalib::string &line); + void handle_line(const vespalib::string &line); + +public: + HttpRequest(); + ~HttpRequest(); + size_t handle_data(const char *buf, size_t len); + bool need_more_data() const { return (!_error && !_done); } + bool valid() const { return (!_error && _done); } + bool is_get() const { return _method == "GET"; } + void resolve_host(const vespalib::string &my_host); + const vespalib::string &get_header(const vespalib::string &name) const; + const vespalib::string &get_host() const { return _host; } + const vespalib::string &get_uri() const { return _uri; } +}; + +} // namespace vespalib::portal diff --git a/vespalib/src/vespa/vespalib/portal/listener.cpp b/vespalib/src/vespa/vespalib/portal/listener.cpp new file mode 100644 index 00000000000..1c4478dcd63 --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/listener.cpp @@ -0,0 +1,37 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "listener.h" +#include <vespa/vespalib/util/exceptions.h> +#include <cassert> + +namespace vespalib::portal { + +Listener::Listener(Reactor &reactor, int port, std::function<void(SocketHandle)> handler) + : _server_socket(port), + _handler(std::move(handler)), + _token() +{ + if (_server_socket.valid()) { + bool async = _server_socket.set_blocking(false); + assert(async); + _token = reactor.attach(*this, _server_socket.get_fd(), true, false); + } else { + throw PortListenException(port, "PORTAL"); + } +} + +Listener::~Listener() +{ + _token.reset(); +} + +void +Listener::handle_event(bool, bool) +{ + SocketHandle handle = _server_socket.accept(); + if (handle.valid()) { + _handler(std::move(handle)); + } +} + +} // namespace vespalib::portal diff --git a/vespalib/src/vespa/vespalib/portal/listener.h b/vespalib/src/vespa/vespalib/portal/listener.h new file mode 100644 index 00000000000..e2c20477f04 --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/listener.h @@ -0,0 +1,25 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "reactor.h" +#include <vespa/vespalib/net/server_socket.h> +#include <functional> + +namespace vespalib::portal { + +class Listener : public Reactor::EventHandler +{ +private: + ServerSocket _server_socket; + std::function<void(SocketHandle)> _handler; + Reactor::Token::UP _token; +public: + using UP = std::unique_ptr<Listener>; + Listener(Reactor &reactor, int port, std::function<void(SocketHandle)> handler); + ~Listener(); + int listen_port() const { return _server_socket.address().port(); } + void handle_event(bool read, bool write) override; +}; + +} // namespace vespalib::portal diff --git a/vespalib/src/vespa/vespalib/portal/portal.cpp b/vespalib/src/vespa/vespalib/portal/portal.cpp new file mode 100644 index 00000000000..c96d7f9f83e --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/portal.cpp @@ -0,0 +1,188 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "portal.h" +#include "http_connection.h" +#include <cassert> + +namespace vespalib { + +namespace { + +template <typename T> +void remove_handle(T &collection, uint64_t handle) { + collection.erase(std::remove_if(collection.begin(), collection.end(), + [handle](const typename T::value_type &item) + { return (item.handle == handle); }), + collection.end()); +} + +} // namespace vespalib::<unnamed> + +using HttpConnection = portal::HttpConnection; + +Portal::Token::~Token() +{ + _portal.cancel_token(*this); +} + +const vespalib::string & +Portal::GetRequest::get_header(const vespalib::string &name) const +{ + assert(active()); + return _conn->get_request().get_header(name); +} + +const vespalib::string & +Portal::GetRequest::get_host() const +{ + assert(active()); + return _conn->get_request().get_host(); +} + +const vespalib::string & +Portal::GetRequest::get_uri() const +{ + assert(active()); + return _conn->get_request().get_uri(); +} + +void +Portal::GetRequest::respond_with_content(const vespalib::string &content_type, + const vespalib::string &content) +{ + assert(active()); + _conn->respond_with_content(content_type, content); + _conn = nullptr; +} + +void +Portal::GetRequest::respond_with_error(int code, const vespalib::string &msg) +{ + assert(active()); + _conn->respond_with_error(code, msg); + _conn = nullptr; +} + +Portal::GetRequest::~GetRequest() +{ + if (active()) { + respond_with_error(500, "Internal Server Error"); + } +} + +Portal::GetHandler::~GetHandler() = default; + +Portal::Token::UP +Portal::make_token() +{ + return Token::UP(new Token(*this, _handle_manager.create())); +} + +void +Portal::cancel_token(Token &token) +{ + _handle_manager.destroy(token._handle); + evict_handle(token._handle); +} + +portal::HandleGuard +Portal::lookup_get_handler(const vespalib::string &uri, const GetHandler *&handler) +{ + std::lock_guard guard(_lock); + for (const auto &entry: _bind_list) { + if (starts_with(uri, entry.prefix)) { + auto handle_guard = _handle_manager.lock(entry.handle); + if (handle_guard.valid()) { + handler = entry.handler; + return handle_guard; + } + } + } + return portal::HandleGuard(); +} + +void +Portal::evict_handle(uint64_t handle) +{ + std::lock_guard guard(_lock); + remove_handle(_bind_list, handle); +} + +void +Portal::handle_accept(portal::HandleGuard guard, SocketHandle socket) +{ + new HttpConnection(std::move(guard), _reactor, _crypto->create_crypto_socket(std::move(socket), true), + [this](HttpConnection *conn) + { + handle_http(conn); + }); +} + +void +Portal::handle_http(portal::HttpConnection *conn) +{ + if (conn->get_state() == HttpConnection::State::WAIT) { + if (!conn->get_request().valid()) { + conn->respond_with_error(400, "Bad Request"); + } else if (!conn->get_request().is_get()) { + conn->respond_with_error(501, "Not Implemented"); + } else { + const GetHandler *get_handler = nullptr; + auto guard = lookup_get_handler(conn->get_request().get_uri(), get_handler); + if (guard.valid()) { + assert(get_handler != nullptr); + get_handler->get(GetRequest(*conn)); + } else { + conn->respond_with_error(404, "Not Found"); + } + } + } else { + assert(conn->get_state() == HttpConnection::State::END); + delete(conn); + } +} + +Portal::Portal(CryptoEngine::SP crypto, int port) + : _crypto(std::move(crypto)), + _reactor(), + _handle_manager(), + _conn_handle(_handle_manager.create()), + _listener(), + _lock(), + _bind_list() +{ + _listener = std::make_unique<portal::Listener>(_reactor, port, + [this](SocketHandle socket) + { + auto guard = _handle_manager.lock(_conn_handle); + if (guard.valid()) { + handle_accept(std::move(guard), std::move(socket)); + } + }); +} + +Portal::~Portal() +{ + _listener.reset(); + _handle_manager.destroy(_conn_handle); + assert(_handle_manager.empty()); + assert(_bind_list.empty()); +} + +Portal::SP +Portal::create(CryptoEngine::SP crypto, int port) +{ + return Portal::SP(new Portal(std::move(crypto), port)); +} + +Portal::Token::UP +Portal::bind(const vespalib::string &path_prefix, const GetHandler &handler) +{ + auto token = make_token(); + std::lock_guard guard(_lock); + _bind_list.emplace_back(token->_handle, path_prefix, handler); + std::sort(_bind_list.begin(), _bind_list.end()); + return token; +} + +} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/portal/portal.h b/vespalib/src/vespa/vespalib/portal/portal.h new file mode 100644 index 00000000000..0c75734de0b --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/portal.h @@ -0,0 +1,114 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "listener.h" +#include "reactor.h" +#include "handle_manager.h" + +#include <vespa/vespalib/net/crypto_engine.h> +#include <vespa/vespalib/net/crypto_socket.h> +#include <vespa/vespalib/stllike/string.h> + +#include <map> +#include <memory> +#include <mutex> +#include <thread> + +namespace vespalib { + +namespace portal { class HttpConnection; } + +/** + * Minimal HTTP server and connection establishment manager. + **/ +class Portal +{ +public: + using SP = std::shared_ptr<Portal>; + + class Token { + friend class Portal; + private: + Portal &_portal; + uint64_t _handle; + Token(const Token &) = delete; + Token &operator=(const Token &) = delete; + Token(Token &&) = delete; + Token &operator=(Token &&) = delete; + Token(Portal &portal, uint64_t handle) + : _portal(portal), _handle(handle) {} + uint64_t handle() const { return _handle; } + public: + using UP = std::unique_ptr<Token>; + ~Token(); + }; + + class GetRequest { + friend class Portal; + private: + portal::HttpConnection *_conn; + GetRequest(portal::HttpConnection &conn) : _conn(&conn) {} + public: + GetRequest(const GetRequest &rhs) = delete; + GetRequest &operator=(const GetRequest &rhs) = delete; + GetRequest &operator=(GetRequest &&rhs) = delete; + GetRequest(GetRequest &&rhs) : _conn(rhs._conn) { + rhs._conn = nullptr; + } + bool active() const { return (_conn != nullptr); } + const vespalib::string &get_header(const vespalib::string &name) const; + const vespalib::string &get_host() const; + const vespalib::string &get_uri() const; + void respond_with_content(const vespalib::string &content_type, + const vespalib::string &content); + void respond_with_error(int code, const vespalib::string &msg); + ~GetRequest(); + }; + + struct GetHandler { + virtual void get(GetRequest request) const = 0; + virtual ~GetHandler(); + }; + +private: + struct BindState { + uint64_t handle; + vespalib::string prefix; + const GetHandler *handler; + BindState(uint64_t handle_in, vespalib::string prefix_in, const GetHandler &handler_in) + : handle(handle_in), prefix(prefix_in), handler(&handler_in) {} + bool operator<(const BindState &rhs) const { + if (prefix.size() == rhs.prefix.size()) { + return (handle > rhs.handle); + } + return (prefix.size() > rhs.prefix.size()); + } + }; + + CryptoEngine::SP _crypto; + portal::Reactor _reactor; + portal::HandleManager _handle_manager; + uint64_t _conn_handle; + portal::Listener::UP _listener; + std::mutex _lock; + std::vector<BindState> _bind_list; + + Token::UP make_token(); + void cancel_token(Token &token); + + portal::HandleGuard lookup_get_handler(const vespalib::string &uri, const GetHandler *&handler); + void evict_handle(uint64_t handle); + + void handle_accept(portal::HandleGuard guard, SocketHandle socket); + void handle_http(portal::HttpConnection *conn); + + Portal(CryptoEngine::SP crypto, int port); +public: + ~Portal(); + static SP create(CryptoEngine::SP crypto, int port); + int listen_port() const { return _listener->listen_port(); } + Token::UP bind(const vespalib::string &path_prefix, const GetHandler &handler); +}; + +} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/portal/reactor.cpp b/vespalib/src/vespa/vespalib/portal/reactor.cpp new file mode 100644 index 00000000000..52a59f617fb --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/reactor.cpp @@ -0,0 +1,128 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "reactor.h" +#include <cassert> + +namespace vespalib::portal { + +Reactor::EventHandler::~EventHandler() = default; + +//----------------------------------------------------------------------------- + +Reactor::Token::Token(Reactor &reactor, EventHandler &handler, int fd, bool read, bool write) + : _reactor(reactor), _handler(handler), _fd(fd) +{ + ++_reactor._token_cnt; + _reactor._selector.add(_fd, _handler, read, write); +} + +void +Reactor::Token::update(bool read, bool write) +{ + _reactor._selector.update(_fd, _handler, read, write); +} + +Reactor::Token::~Token() +{ + _reactor._selector.remove(_fd); + _reactor.cancel_token(*this); + --_reactor._token_cnt; +} + +//----------------------------------------------------------------------------- + +void +Reactor::cancel_token(const Token &) +{ + if (std::this_thread::get_id() == _thread.get_id()) { + _skip_events = true; + } else { + std::unique_lock guard(_lock); + size_t old_gen = _sync_seq; + ++_wait_cnt; + guard.unlock(); // UNLOCK + _selector.wakeup(); + guard.lock(); // LOCK + while (_sync_seq == old_gen) { + _cond.wait(guard); + } + --_wait_cnt; + } +} + +void +Reactor::release_tokens() +{ + std::lock_guard guard(_lock); + if (_wait_cnt > 0) { + ++_sync_seq; + _cond.notify_all(); + } +} + +//----------------------------------------------------------------------------- + +void +Reactor::handle_wakeup() +{ + _was_woken = true; +} + +void +Reactor::handle_event(EventHandler &handler, bool read, bool write) +{ + if (!_skip_events) { + handler.handle_event(read, write); + } +} + +void +Reactor::event_loop() +{ + while (!_done) { + _selector.poll(_tick()); + _selector.dispatch(*this); + if (_skip_events) { + _skip_events = false; + } + if (_was_woken) { + release_tokens(); + _was_woken = false; + } + } +} + +//----------------------------------------------------------------------------- + +Reactor::Reactor(std::function<int()> tick) + : _selector(), + _tick(std::move(tick)), + _done(false), + _was_woken(false), + _skip_events(false), + _lock(), + _cond(), + _sync_seq(0), + _wait_cnt(0), + _token_cnt(0), + _thread(&Reactor::event_loop, this) +{ +} + +Reactor::~Reactor() +{ + assert(_token_cnt == 0); + _done = true; + _selector.wakeup(); + _thread.join(); +} + +Reactor::Token::UP +Reactor::attach(EventHandler &handler, int fd, bool read, bool write) +{ + return Token::UP(new Token(*this, handler, fd, read, write)); +} + +//----------------------------------------------------------------------------- + +} // namespace vespalib::portal diff --git a/vespalib/src/vespa/vespalib/portal/reactor.h b/vespalib/src/vespa/vespalib/portal/reactor.h new file mode 100644 index 00000000000..a7961c3c943 --- /dev/null +++ b/vespalib/src/vespa/vespalib/portal/reactor.h @@ -0,0 +1,68 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/net/selector.h> + +#include <atomic> +#include <condition_variable> +#include <functional> +#include <memory> +#include <mutex> +#include <thread> + +namespace vespalib::portal { + +class Reactor +{ +public: + struct EventHandler { + virtual void handle_event(bool read, bool write) = 0; + virtual ~EventHandler(); + }; + friend class Selector<EventHandler>; + class Token { + friend class Reactor; + private: + Reactor &_reactor; + EventHandler &_handler; + int _fd; + Token(const Token &) = delete; + Token &operator=(const Token &) = delete; + Token(Token &&) = delete; + Token &operator=(Token &&) = delete; + Token(Reactor &reactor, EventHandler &handler, int fd, bool read, bool write); + public: + using UP = std::unique_ptr<Token>; + void update(bool read, bool write); + ~Token(); + }; + +private: + Selector<EventHandler> _selector; + std::function<int()> _tick; + std::atomic<bool> _done; + bool _was_woken; + bool _skip_events; + std::mutex _lock; + std::condition_variable _cond; + size_t _sync_seq; + size_t _wait_cnt; + std::atomic<size_t> _token_cnt; + std::thread _thread; + + void cancel_token(const Token &token); + void release_tokens(); + + void handle_wakeup(); + void handle_event(EventHandler &handler, bool read, bool write); + void event_loop(); + +public: + Reactor(std::function<int()> tick); + Reactor() : Reactor([](){ return -1; }) {} + ~Reactor(); + Token::UP attach(EventHandler &handler, int fd, bool read, bool write); +}; + +} // namespace vespalib::portal |