summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java21
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java51
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java8
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp15
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h1
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp53
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h3
7 files changed, 97 insertions, 55 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
index 5c910983bc9..1587e2696b8 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
@@ -54,7 +54,7 @@ import static org.junit.Assert.fail;
*/
public abstract class FleetControllerTest implements Waiter {
- private static Logger log = Logger.getLogger(FleetControllerTest.class.getName());
+ private static final Logger log = Logger.getLogger(FleetControllerTest.class.getName());
private static final int DEFAULT_NODE_COUNT = 10;
Supervisor supervisor;
@@ -342,10 +342,6 @@ public abstract class FleetControllerTest implements Waiter {
fleetController.waitForCompleteCycle(timeoutMS);
}
- protected void verifyNodeEvents(Node n, String exp) {
- verifyNodeEvents(n, exp, null);
- }
-
private static class ExpectLine {
Pattern regex;
int matchedCount = 0;
@@ -390,17 +386,15 @@ public abstract class FleetControllerTest implements Waiter {
* <li>The rest of the line is a regular expression.
* </ul>
*/
- private void verifyNodeEvents(Node n, String exp, String ignoreRegex) {
- Pattern ignorePattern = (ignoreRegex == null ? null : Pattern.compile(ignoreRegex));
+ protected void verifyNodeEvents(Node n, String exp) {
List<NodeEvent> events = fleetController.getNodeEvents(n);
String[] expectLines = exp.split("\n");
- List<ExpectLine> expected = new ArrayList<ExpectLine>();
+ List<ExpectLine> expected = new ArrayList<>();
for (String line : expectLines) {
expected.add(new ExpectLine(line));
}
boolean mismatch = false;
- StringBuilder eventLog = new StringBuilder();
StringBuilder errors = new StringBuilder();
int gotno = 0;
@@ -413,16 +407,10 @@ public abstract class FleetControllerTest implements Waiter {
eventLine = e.toString();
}
- if (ignorePattern != null && ignorePattern.matcher(eventLine).matches()) {
- ++gotno;
- continue;
- }
-
ExpectLine pattern = null;
if (expno < expected.size()) {
pattern = expected.get(expno);
}
- eventLog.append(eventLine).append("\n");
if (pattern == null) {
errors.append("Exhausted expected list before matching event " + gotno
@@ -456,9 +444,6 @@ public abstract class FleetControllerTest implements Waiter {
StringBuilder eventsGotten = new StringBuilder();
for (Event e : events) {
String eventLine = e.toString();
- if (ignorePattern != null && ignorePattern.matcher(eventLine).matches()) {
- continue;
- }
eventsGotten.append(eventLine).append("\n");
}
errors.append("\nExpected events matching:\n" + exp + "\n");
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
index 557765ca761..d14f6701288 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
@@ -16,6 +16,7 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
import java.util.ArrayList;
import java.util.List;
@@ -31,14 +32,17 @@ import static org.junit.Assert.assertTrue;
public class MasterElectionTest extends FleetControllerTest {
- private static Logger log = Logger.getLogger(MasterElectionTest.class.getName());
+ private static final Logger log = Logger.getLogger(MasterElectionTest.class.getName());
private Supervisor supervisor;
- private List<FleetController> fleetControllers = new ArrayList<>();
+ private final List<FleetController> fleetControllers = new ArrayList<>();
@Rule
public TestRule cleanupZookeeperLogsOnSuccess = new CleanupZookeeperLogsOnSuccess();
+ @Rule
+ public Timeout globalTimeout= Timeout.seconds(120);
+
private static int defaultZkSessionTimeoutInMillis() { return 30_000; }
protected void setUpFleetController(int count, boolean useFakeTimer, FleetControllerOptions options) throws Exception {
@@ -74,12 +78,12 @@ public class MasterElectionTest extends FleetControllerTest {
private void waitForZookeeperDisconnected() throws TimeoutException {
long maxTime = System.currentTimeMillis() + timeoutMS;
- for(FleetController f : fleetControllers) {
- while (true) {
- if (!f.hasZookeeperConnection()) break;
+ for (FleetController f : fleetControllers) {
+ while (f.hasZookeeperConnection()) {
timer.advanceTime(1000);
- try{ Thread.sleep(1); } catch (InterruptedException e) {}
- if (System.currentTimeMillis() > maxTime) throw new TimeoutException("Failed to notice zookeeper down within timeout of " + timeoutMS + " ms");
+ try { Thread.sleep(1); } catch (InterruptedException e) {}
+ if (System.currentTimeMillis() > maxTime)
+ throw new TimeoutException("Failed to notice zookeeper down within timeout of " + timeoutMS + " ms");
}
}
waitForCompleteCycles();
@@ -226,24 +230,20 @@ public class MasterElectionTest extends FleetControllerTest {
startingTest("MasterElectionTest::testClusterStateVersionIncreasesAcrossMasterElections");
FleetControllerOptions options = defaultOptions("mycluster");
options.masterZooKeeperCooldownPeriod = 1;
- setUpFleetController(5, false, options);
+ setUpFleetController(3, false, options);
// Currently need to have content nodes present for the cluster controller to even bother
// attempting to persisting its cluster state version to ZK.
setUpVdsNodes(false, new DummyVdsNodeOptions());
fleetController = fleetControllers.get(0); // Required to prevent waitForStableSystem from NPE'ing
waitForStableSystem();
waitForMaster(0);
- Stream.of(0, 1, 2, 3, 4).forEach(this::waitForCompleteCycle);
+ Stream.of(0, 1, 2).forEach(this::waitForCompleteCycle);
StrictlyIncreasingVersionChecker checker = StrictlyIncreasingVersionChecker.bootstrappedWith(
fleetControllers.get(0).getClusterState());
fleetControllers.get(0).shutdown();
waitForMaster(1);
- Stream.of(1, 2, 3, 4).forEach(this::waitForCompleteCycle);
+ Stream.of(1, 2).forEach(this::waitForCompleteCycle);
checker.updateAndVerify(fleetControllers.get(1).getClusterState());
- fleetControllers.get(1).shutdown();
- waitForMaster(2); // Still a quorum available
- Stream.of(2, 3, 4).forEach(this::waitForCompleteCycle);
- checker.updateAndVerify(fleetControllers.get(2).getClusterState());
}
@Test
@@ -275,7 +275,7 @@ public class MasterElectionTest extends FleetControllerTest {
FleetControllerOptions options = defaultOptions("mycluster");
options.masterZooKeeperCooldownPeriod = 100;
options.zooKeeperServerAddress = "localhost";
- setUpFleetController(5, false, options);
+ setUpFleetController(3, false, options);
waitForMaster(0);
log.log(Level.INFO, "STOPPING ZOOKEEPER SERVER AT " + zooKeeperServer.getAddress());
@@ -329,12 +329,21 @@ public class MasterElectionTest extends FleetControllerTest {
long endTime = System.currentTimeMillis() + timeoutMS;
while (System.currentTimeMillis() < endTime) {
boolean allOk = true;
- for (int i=0; i<nodes.length; ++i) {
+ for (int node : nodes) {
Request req = new Request("getMaster");
- connections.get(nodes[i]).invokeSync(req, FleetControllerTest.timeoutS);
- if (req.isError()) { allOk = false; break; }
- if (master != null && master != req.returnValues().get(0).asInt32()) { allOk = false; break; }
- if (reason != null && !reason.equals(req.returnValues().get(1).asString())) { allOk = false; break; }
+ connections.get(node).invokeSync(req, FleetControllerTest.timeoutS);
+ if (req.isError()) {
+ allOk = false;
+ break;
+ }
+ if (master != null && master != req.returnValues().get(0).asInt32()) {
+ allOk = false;
+ break;
+ }
+ if (reason != null && ! reason.equals(req.returnValues().get(1).asString())) {
+ allOk = false;
+ break;
+ }
}
if (allOk) return;
try{ Thread.sleep(100); } catch (InterruptedException e) {}
@@ -354,7 +363,7 @@ public class MasterElectionTest extends FleetControllerTest {
waitForMaster(0);
supervisor = new Supervisor(new Transport());
- List<Target> connections = new ArrayList<Target>();
+ List<Target> connections = new ArrayList<>();
for (FleetController fleetController : fleetControllers) {
int rpcPort = fleetController.getRpcPort();
Target connection = supervisor.connect(new Spec("localhost", rpcPort));
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java
index 73b4163d542..7861db249bd 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java
@@ -14,10 +14,10 @@ import java.time.Duration;
* This class sets up a zookeeper server, such that we can test fleetcontroller zookeeper parts without stubbing in the client.
*/
public class ZooKeeperTestServer {
- private File zooKeeperDir;
- private ZooKeeperServer server;
+ private final File zooKeeperDir;
+ private final ZooKeeperServer server;
private static final Duration tickTime = Duration.ofMillis(2000);
- private NIOServerCnxnFactory factory;
+ private final NIOServerCnxnFactory factory;
private static final String DIR_PREFIX = "test_fltctrl_zk";
private static final String DIR_POSTFIX = "sdir";
@@ -39,7 +39,7 @@ public class ZooKeeperTestServer {
try{
factory.startup(server);
} catch (InterruptedException e) {
- throw (RuntimeException) new IllegalStateException("Interrupted during test startup: ").initCause(e);
+ throw new IllegalStateException("Interrupted during test startup: ", e);
}
}
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index 7fb33dc242b..8228ceb5e79 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -308,6 +308,7 @@ TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) {
TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_reply) {
auto cmd = _node_0->create_dummy_put_command();
cmd->setAddress(non_existing_address());
+ cmd->getTrace().setLevel(9);
_node_0->send_request(cmd);
auto bounced_msg = _node_0->wait_and_receive_single_message();
@@ -323,6 +324,7 @@ TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_repl
to_slobrok_id(non_existing_address()).c_str(), vespalib::HostName::get().c_str());
EXPECT_EQ(put_reply->getResult(), api::ReturnCode(expected_code, expected_msg));
+ EXPECT_THAT(put_reply->getTrace().toString(), HasSubstr("The service must be having problems"));
}
TEST_F(StorageApiRpcServiceTest, request_metadata_is_propagated_to_receiver) {
@@ -408,4 +410,17 @@ TEST_F(StorageApiRpcServiceTest, malformed_request_payload_returns_rpc_error) {
// TODO also test bad response header/payload
+TEST_F(StorageApiRpcServiceTest, trace_events_are_emitted_for_send_and_receive) {
+ auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){
+ cmd.getTrace().setLevel(9);
+ });
+ auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd);
+ auto trace_str = recv_reply->getTrace().toString();
+ // Ordering of traced events matter, so we use a cheeky regex.
+ EXPECT_THAT(trace_str, ContainsRegex("Sending request from.+"
+ "Request received at.+"
+ "Sending response from.+"
+ "Response received at"));
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
index 722d1fd3a81..740277218c3 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -48,6 +48,7 @@ public:
// Hostname of host node is running on.
[[nodiscard]] const vespalib::string& hostname() const noexcept { return _hostname; }
+ [[nodiscard]] const vespalib::string handle() const noexcept { return _handle; }
const RpcTargetFactory& target_factory() const;
private:
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index a8cdc0bfeea..8b5c7706510 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -16,12 +16,14 @@
#include <vespa/vespalib/data/databuffer.h>
#include <vespa/vespalib/util/compressor.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/trace/tracelevel.h>
#include <cassert>
#include <vespa/log/log.h>
LOG_SETUP(".storage.storage_api_rpc_service");
using vespalib::compression::CompressionConfig;
+using vespalib::TraceLevel;
namespace storage::rpc {
@@ -177,16 +179,28 @@ void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) {
scmd->getTrace().setLevel(hdr.trace_level());
scmd->setTimeout(std::chrono::milliseconds(hdr.time_remaining_ms()));
req->DiscardBlobs();
+ if (scmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ scmd->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Request received at '%s' (tcp/%s:%d) with %u bytes of payload",
+ _rpc_resources.handle().c_str(),
+ _rpc_resources.hostname().c_str(),
+ _rpc_resources.listen_port(),
+ uncompressed_size));
+ }
detach_and_forward_to_enqueuer(std::move(scmd), req);
} else {
req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request payload");
}
}
-void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) {
+void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply) {
LOG(spam, "Server: encoding rpc.v1 response header and payload");
auto* ret = request.GetReturn();
+ if (reply.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ reply.getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Sending response from '%s'", _rpc_resources.handle().c_str()));
+ }
// TODO skip encoding header altogether if no relevant fields set?
protobuf::ResponseHeader hdr;
if (reply.getTrace().getLevel() > 0) {
@@ -206,11 +220,21 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
if (!target) {
auto reply = cmd->makeReply();
reply->setResult(make_no_address_for_service_error(*cmd->getAddress()));
+ if (reply->getTrace().shouldTrace(TraceLevel::ERROR)) {
+ reply->getTrace().trace(TraceLevel::ERROR, reply->getResult().getMessage());
+ }
// Always dispatch async for synchronously generated replies, or we risk nuking the
// stack if the reply receiver keeps resending synchronously as well.
_message_dispatcher.dispatch_async(std::move(reply));
return;
}
+ if (cmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ cmd->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Sending request from '%s' to '%s' (%s) with timeout of %g seconds",
+ _rpc_resources.handle().c_str(),
+ CachingRpcTargetResolver::address_to_slobrok_id(*cmd->getAddress()).c_str(),
+ target->_spec.c_str(), vespalib::to_s(cmd->getTimeout())));
+ }
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest());
req->SetMethodName(rpc_v1_method_name());
@@ -233,12 +257,12 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req);
auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP);
+ auto& cmd = *req_ctx->_originator_cmd;
if (!req->CheckReturnTypes("bixbix")) {
handle_request_done_rpc_error(*req, *req_ctx);
return;
}
LOG(spam, "Client: received rpc.v1 OK response");
-
const auto& ret = *req->GetReturn();
protobuf::ResponseHeader hdr;
if (!decode_header_from_rpc_params(ret, hdr)) {
@@ -246,8 +270,10 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
return;
}
std::unique_ptr<mbusprot::StorageReply> wrapped_reply;
- bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) {
+ uint32_t uncompressed_size = 0;
+ bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, &uncompressed_size, req_ctx](auto& codec, auto payload) {
wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd);
+ uncompressed_size = payload.size();
});
if (!ok) {
assert(!wrapped_reply);
@@ -259,9 +285,16 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
assert(reply);
if (!hdr.trace_payload().empty()) {
- req_ctx->_originator_cmd->getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload()));
+ cmd.getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload()));
+ }
+ if (cmd.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ cmd.getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Response received at '%s' with %u bytes of payload",
+ _rpc_resources.handle().c_str(),
+ uncompressed_size));
}
- reply->getTrace().swap(req_ctx->_originator_cmd->getTrace());
+ reply->getTrace().swap(cmd.getTrace());
+ reply->setApproxByteSize(uncompressed_size);
// TODO ensure that no implicit long-lived refs end up pointing into RPC memory...!
req->DiscardBlobs();
@@ -291,10 +324,13 @@ void StorageApiRpcService::handle_request_done_decode_error(const RpcRequestCont
void StorageApiRpcService::create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error) {
auto error_reply = cmd.makeReply();
- LOG(debug, "Client: rpc.v1 failed decode from %s: '%s'",
+ LOG(debug, "Client: rpc.v1 failed for target '%s': '%s'",
cmd.getAddress()->toString().c_str(), error.toString().c_str());
+ error_reply->getTrace().swap(cmd.getTrace());
+ if (error_reply->getTrace().shouldTrace(TraceLevel::ERROR)) {
+ error_reply->getTrace().trace(TraceLevel::ERROR, error.getMessage());
+ }
error_reply->setResult(std::move(error));
- // TODO needs tracing of received-event!
_message_dispatcher.dispatch_sync(std::move(error_reply));
}
@@ -355,9 +391,6 @@ bool StorageApiRpcService::target_supports_direct_rpc(
/*
* Major TODOs:
- * - tracing and trace propagation
- * - forwards/backwards compatibility
- * - what to remap bounced Not Found errors to internally?
* - lifetime semantics of FRT targets vs requests created from them?
* - lifetime of document type/fieldset repos vs messages
* - is repo ref squirreled away into the messages anywhere?
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index 39508e51841..cb2344ccd13 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -58,14 +58,13 @@ public:
[[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept;
void RPC_rpc_v1_send(FRT_RPCRequest* req);
- void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply);
+ void encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply);
void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd);
static constexpr const char* rpc_v1_method_name() noexcept {
return "storageapi.v1.send";
}
private:
- // TODO dedupe
void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req);
struct RpcRequestContext {