diff options
7 files changed, 97 insertions, 55 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java index 5c910983bc9..1587e2696b8 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java @@ -54,7 +54,7 @@ import static org.junit.Assert.fail; */ public abstract class FleetControllerTest implements Waiter { - private static Logger log = Logger.getLogger(FleetControllerTest.class.getName()); + private static final Logger log = Logger.getLogger(FleetControllerTest.class.getName()); private static final int DEFAULT_NODE_COUNT = 10; Supervisor supervisor; @@ -342,10 +342,6 @@ public abstract class FleetControllerTest implements Waiter { fleetController.waitForCompleteCycle(timeoutMS); } - protected void verifyNodeEvents(Node n, String exp) { - verifyNodeEvents(n, exp, null); - } - private static class ExpectLine { Pattern regex; int matchedCount = 0; @@ -390,17 +386,15 @@ public abstract class FleetControllerTest implements Waiter { * <li>The rest of the line is a regular expression. * </ul> */ - private void verifyNodeEvents(Node n, String exp, String ignoreRegex) { - Pattern ignorePattern = (ignoreRegex == null ? null : Pattern.compile(ignoreRegex)); + protected void verifyNodeEvents(Node n, String exp) { List<NodeEvent> events = fleetController.getNodeEvents(n); String[] expectLines = exp.split("\n"); - List<ExpectLine> expected = new ArrayList<ExpectLine>(); + List<ExpectLine> expected = new ArrayList<>(); for (String line : expectLines) { expected.add(new ExpectLine(line)); } boolean mismatch = false; - StringBuilder eventLog = new StringBuilder(); StringBuilder errors = new StringBuilder(); int gotno = 0; @@ -413,16 +407,10 @@ public abstract class FleetControllerTest implements Waiter { eventLine = e.toString(); } - if (ignorePattern != null && ignorePattern.matcher(eventLine).matches()) { - ++gotno; - continue; - } - ExpectLine pattern = null; if (expno < expected.size()) { pattern = expected.get(expno); } - eventLog.append(eventLine).append("\n"); if (pattern == null) { errors.append("Exhausted expected list before matching event " + gotno @@ -456,9 +444,6 @@ public abstract class FleetControllerTest implements Waiter { StringBuilder eventsGotten = new StringBuilder(); for (Event e : events) { String eventLine = e.toString(); - if (ignorePattern != null && ignorePattern.matcher(eventLine).matches()) { - continue; - } eventsGotten.append(eventLine).append("\n"); } errors.append("\nExpected events matching:\n" + exp + "\n"); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java index 557765ca761..d14f6701288 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java @@ -16,6 +16,7 @@ import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestRule; +import org.junit.rules.Timeout; import java.util.ArrayList; import java.util.List; @@ -31,14 +32,17 @@ import static org.junit.Assert.assertTrue; public class MasterElectionTest extends FleetControllerTest { - private static Logger log = Logger.getLogger(MasterElectionTest.class.getName()); + private static final Logger log = Logger.getLogger(MasterElectionTest.class.getName()); private Supervisor supervisor; - private List<FleetController> fleetControllers = new ArrayList<>(); + private final List<FleetController> fleetControllers = new ArrayList<>(); @Rule public TestRule cleanupZookeeperLogsOnSuccess = new CleanupZookeeperLogsOnSuccess(); + @Rule + public Timeout globalTimeout= Timeout.seconds(120); + private static int defaultZkSessionTimeoutInMillis() { return 30_000; } protected void setUpFleetController(int count, boolean useFakeTimer, FleetControllerOptions options) throws Exception { @@ -74,12 +78,12 @@ public class MasterElectionTest extends FleetControllerTest { private void waitForZookeeperDisconnected() throws TimeoutException { long maxTime = System.currentTimeMillis() + timeoutMS; - for(FleetController f : fleetControllers) { - while (true) { - if (!f.hasZookeeperConnection()) break; + for (FleetController f : fleetControllers) { + while (f.hasZookeeperConnection()) { timer.advanceTime(1000); - try{ Thread.sleep(1); } catch (InterruptedException e) {} - if (System.currentTimeMillis() > maxTime) throw new TimeoutException("Failed to notice zookeeper down within timeout of " + timeoutMS + " ms"); + try { Thread.sleep(1); } catch (InterruptedException e) {} + if (System.currentTimeMillis() > maxTime) + throw new TimeoutException("Failed to notice zookeeper down within timeout of " + timeoutMS + " ms"); } } waitForCompleteCycles(); @@ -226,24 +230,20 @@ public class MasterElectionTest extends FleetControllerTest { startingTest("MasterElectionTest::testClusterStateVersionIncreasesAcrossMasterElections"); FleetControllerOptions options = defaultOptions("mycluster"); options.masterZooKeeperCooldownPeriod = 1; - setUpFleetController(5, false, options); + setUpFleetController(3, false, options); // Currently need to have content nodes present for the cluster controller to even bother // attempting to persisting its cluster state version to ZK. setUpVdsNodes(false, new DummyVdsNodeOptions()); fleetController = fleetControllers.get(0); // Required to prevent waitForStableSystem from NPE'ing waitForStableSystem(); waitForMaster(0); - Stream.of(0, 1, 2, 3, 4).forEach(this::waitForCompleteCycle); + Stream.of(0, 1, 2).forEach(this::waitForCompleteCycle); StrictlyIncreasingVersionChecker checker = StrictlyIncreasingVersionChecker.bootstrappedWith( fleetControllers.get(0).getClusterState()); fleetControllers.get(0).shutdown(); waitForMaster(1); - Stream.of(1, 2, 3, 4).forEach(this::waitForCompleteCycle); + Stream.of(1, 2).forEach(this::waitForCompleteCycle); checker.updateAndVerify(fleetControllers.get(1).getClusterState()); - fleetControllers.get(1).shutdown(); - waitForMaster(2); // Still a quorum available - Stream.of(2, 3, 4).forEach(this::waitForCompleteCycle); - checker.updateAndVerify(fleetControllers.get(2).getClusterState()); } @Test @@ -275,7 +275,7 @@ public class MasterElectionTest extends FleetControllerTest { FleetControllerOptions options = defaultOptions("mycluster"); options.masterZooKeeperCooldownPeriod = 100; options.zooKeeperServerAddress = "localhost"; - setUpFleetController(5, false, options); + setUpFleetController(3, false, options); waitForMaster(0); log.log(Level.INFO, "STOPPING ZOOKEEPER SERVER AT " + zooKeeperServer.getAddress()); @@ -329,12 +329,21 @@ public class MasterElectionTest extends FleetControllerTest { long endTime = System.currentTimeMillis() + timeoutMS; while (System.currentTimeMillis() < endTime) { boolean allOk = true; - for (int i=0; i<nodes.length; ++i) { + for (int node : nodes) { Request req = new Request("getMaster"); - connections.get(nodes[i]).invokeSync(req, FleetControllerTest.timeoutS); - if (req.isError()) { allOk = false; break; } - if (master != null && master != req.returnValues().get(0).asInt32()) { allOk = false; break; } - if (reason != null && !reason.equals(req.returnValues().get(1).asString())) { allOk = false; break; } + connections.get(node).invokeSync(req, FleetControllerTest.timeoutS); + if (req.isError()) { + allOk = false; + break; + } + if (master != null && master != req.returnValues().get(0).asInt32()) { + allOk = false; + break; + } + if (reason != null && ! reason.equals(req.returnValues().get(1).asString())) { + allOk = false; + break; + } } if (allOk) return; try{ Thread.sleep(100); } catch (InterruptedException e) {} @@ -354,7 +363,7 @@ public class MasterElectionTest extends FleetControllerTest { waitForMaster(0); supervisor = new Supervisor(new Transport()); - List<Target> connections = new ArrayList<Target>(); + List<Target> connections = new ArrayList<>(); for (FleetController fleetController : fleetControllers) { int rpcPort = fleetController.getRpcPort(); Target connection = supervisor.connect(new Spec("localhost", rpcPort)); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java index 73b4163d542..7861db249bd 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java @@ -14,10 +14,10 @@ import java.time.Duration; * This class sets up a zookeeper server, such that we can test fleetcontroller zookeeper parts without stubbing in the client. */ public class ZooKeeperTestServer { - private File zooKeeperDir; - private ZooKeeperServer server; + private final File zooKeeperDir; + private final ZooKeeperServer server; private static final Duration tickTime = Duration.ofMillis(2000); - private NIOServerCnxnFactory factory; + private final NIOServerCnxnFactory factory; private static final String DIR_PREFIX = "test_fltctrl_zk"; private static final String DIR_POSTFIX = "sdir"; @@ -39,7 +39,7 @@ public class ZooKeeperTestServer { try{ factory.startup(server); } catch (InterruptedException e) { - throw (RuntimeException) new IllegalStateException("Interrupted during test startup: ").initCause(e); + throw new IllegalStateException("Interrupted during test startup: ", e); } } diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index 7fb33dc242b..8228ceb5e79 100644 --- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -308,6 +308,7 @@ TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) { TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_reply) { auto cmd = _node_0->create_dummy_put_command(); cmd->setAddress(non_existing_address()); + cmd->getTrace().setLevel(9); _node_0->send_request(cmd); auto bounced_msg = _node_0->wait_and_receive_single_message(); @@ -323,6 +324,7 @@ TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_repl to_slobrok_id(non_existing_address()).c_str(), vespalib::HostName::get().c_str()); EXPECT_EQ(put_reply->getResult(), api::ReturnCode(expected_code, expected_msg)); + EXPECT_THAT(put_reply->getTrace().toString(), HasSubstr("The service must be having problems")); } TEST_F(StorageApiRpcServiceTest, request_metadata_is_propagated_to_receiver) { @@ -408,4 +410,17 @@ TEST_F(StorageApiRpcServiceTest, malformed_request_payload_returns_rpc_error) { // TODO also test bad response header/payload +TEST_F(StorageApiRpcServiceTest, trace_events_are_emitted_for_send_and_receive) { + auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){ + cmd.getTrace().setLevel(9); + }); + auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd); + auto trace_str = recv_reply->getTrace().toString(); + // Ordering of traced events matter, so we use a cheeky regex. + EXPECT_THAT(trace_str, ContainsRegex("Sending request from.+" + "Request received at.+" + "Sending response from.+" + "Response received at")); +} + } diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h index 722d1fd3a81..740277218c3 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h @@ -48,6 +48,7 @@ public: // Hostname of host node is running on. [[nodiscard]] const vespalib::string& hostname() const noexcept { return _hostname; } + [[nodiscard]] const vespalib::string handle() const noexcept { return _handle; } const RpcTargetFactory& target_factory() const; private: diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index a8cdc0bfeea..8b5c7706510 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -16,12 +16,14 @@ #include <vespa/vespalib/data/databuffer.h> #include <vespa/vespalib/util/compressor.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/trace/tracelevel.h> #include <cassert> #include <vespa/log/log.h> LOG_SETUP(".storage.storage_api_rpc_service"); using vespalib::compression::CompressionConfig; +using vespalib::TraceLevel; namespace storage::rpc { @@ -177,16 +179,28 @@ void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) { scmd->getTrace().setLevel(hdr.trace_level()); scmd->setTimeout(std::chrono::milliseconds(hdr.time_remaining_ms())); req->DiscardBlobs(); + if (scmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + scmd->getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Request received at '%s' (tcp/%s:%d) with %u bytes of payload", + _rpc_resources.handle().c_str(), + _rpc_resources.hostname().c_str(), + _rpc_resources.listen_port(), + uncompressed_size)); + } detach_and_forward_to_enqueuer(std::move(scmd), req); } else { req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request payload"); } } -void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) { +void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply) { LOG(spam, "Server: encoding rpc.v1 response header and payload"); auto* ret = request.GetReturn(); + if (reply.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + reply.getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Sending response from '%s'", _rpc_resources.handle().c_str())); + } // TODO skip encoding header altogether if no relevant fields set? protobuf::ResponseHeader hdr; if (reply.getTrace().getLevel() > 0) { @@ -206,11 +220,21 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma if (!target) { auto reply = cmd->makeReply(); reply->setResult(make_no_address_for_service_error(*cmd->getAddress())); + if (reply->getTrace().shouldTrace(TraceLevel::ERROR)) { + reply->getTrace().trace(TraceLevel::ERROR, reply->getResult().getMessage()); + } // Always dispatch async for synchronously generated replies, or we risk nuking the // stack if the reply receiver keeps resending synchronously as well. _message_dispatcher.dispatch_async(std::move(reply)); return; } + if (cmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + cmd->getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Sending request from '%s' to '%s' (%s) with timeout of %g seconds", + _rpc_resources.handle().c_str(), + CachingRpcTargetResolver::address_to_slobrok_id(*cmd->getAddress()).c_str(), + target->_spec.c_str(), vespalib::to_s(cmd->getTimeout()))); + } std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest()); req->SetMethodName(rpc_v1_method_name()); @@ -233,12 +257,12 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req); auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP); + auto& cmd = *req_ctx->_originator_cmd; if (!req->CheckReturnTypes("bixbix")) { handle_request_done_rpc_error(*req, *req_ctx); return; } LOG(spam, "Client: received rpc.v1 OK response"); - const auto& ret = *req->GetReturn(); protobuf::ResponseHeader hdr; if (!decode_header_from_rpc_params(ret, hdr)) { @@ -246,8 +270,10 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { return; } std::unique_ptr<mbusprot::StorageReply> wrapped_reply; - bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) { + uint32_t uncompressed_size = 0; + bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, &uncompressed_size, req_ctx](auto& codec, auto payload) { wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd); + uncompressed_size = payload.size(); }); if (!ok) { assert(!wrapped_reply); @@ -259,9 +285,16 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { assert(reply); if (!hdr.trace_payload().empty()) { - req_ctx->_originator_cmd->getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload())); + cmd.getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload())); + } + if (cmd.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + cmd.getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Response received at '%s' with %u bytes of payload", + _rpc_resources.handle().c_str(), + uncompressed_size)); } - reply->getTrace().swap(req_ctx->_originator_cmd->getTrace()); + reply->getTrace().swap(cmd.getTrace()); + reply->setApproxByteSize(uncompressed_size); // TODO ensure that no implicit long-lived refs end up pointing into RPC memory...! req->DiscardBlobs(); @@ -291,10 +324,13 @@ void StorageApiRpcService::handle_request_done_decode_error(const RpcRequestCont void StorageApiRpcService::create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error) { auto error_reply = cmd.makeReply(); - LOG(debug, "Client: rpc.v1 failed decode from %s: '%s'", + LOG(debug, "Client: rpc.v1 failed for target '%s': '%s'", cmd.getAddress()->toString().c_str(), error.toString().c_str()); + error_reply->getTrace().swap(cmd.getTrace()); + if (error_reply->getTrace().shouldTrace(TraceLevel::ERROR)) { + error_reply->getTrace().trace(TraceLevel::ERROR, error.getMessage()); + } error_reply->setResult(std::move(error)); - // TODO needs tracing of received-event! _message_dispatcher.dispatch_sync(std::move(error_reply)); } @@ -355,9 +391,6 @@ bool StorageApiRpcService::target_supports_direct_rpc( /* * Major TODOs: - * - tracing and trace propagation - * - forwards/backwards compatibility - * - what to remap bounced Not Found errors to internally? * - lifetime semantics of FRT targets vs requests created from them? * - lifetime of document type/fieldset repos vs messages * - is repo ref squirreled away into the messages anywhere? diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index 39508e51841..cb2344ccd13 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -58,14 +58,13 @@ public: [[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept; void RPC_rpc_v1_send(FRT_RPCRequest* req); - void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply); + void encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply); void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd); static constexpr const char* rpc_v1_method_name() noexcept { return "storageapi.v1.send"; } private: - // TODO dedupe void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req); struct RpcRequestContext { |