summaryrefslogtreecommitdiffstats
path: root/fnet/src/tests/frt/rpc/sharedblob.cpp
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /fnet/src/tests/frt/rpc/sharedblob.cpp
Publish
Diffstat (limited to 'fnet/src/tests/frt/rpc/sharedblob.cpp')
-rw-r--r--fnet/src/tests/frt/rpc/sharedblob.cpp256
1 files changed, 256 insertions, 0 deletions
diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp
new file mode 100644
index 00000000000..bb115f1c65f
--- /dev/null
+++ b/fnet/src/tests/frt/rpc/sharedblob.cpp
@@ -0,0 +1,256 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/fnet/frt/frt.h>
+#include <vector>
+
+struct MyBlob : FRT_ISharedBlob
+{
+ int refcnt;
+ MyBlob() : refcnt(1) {}
+ virtual uint32_t getLen() { return (strlen("blob_test") + 1); }
+ virtual const char *getData() { return "blob_test"; }
+ virtual void addRef() { ++refcnt; }
+ virtual void subRef() { --refcnt; }
+};
+
+struct Data
+{
+ enum {
+ SMALL = (FRT_MemoryTub::ALLOC_LIMIT / 2),
+ LARGE = (FRT_MemoryTub::ALLOC_LIMIT * 2)
+ };
+
+ char *buf;
+ uint32_t len;
+
+ Data(const char *pt, uint32_t l) : buf(new char[l]), len(l) {
+ memcpy(buf, pt, len);
+ }
+ Data(uint32_t l, char c) : buf(new char[l]), len(l) {
+ memset(buf, c, len);
+ }
+ Data(const Data &rhs) : buf(new char[rhs.len]), len(rhs.len) {
+ memcpy(buf, rhs.buf, len);
+ }
+ Data &operator=(const Data &rhs) {
+ if (this != &rhs) {
+ delete [] buf;
+ buf = new char[rhs.len];
+ len = rhs.len;
+ memcpy(buf, rhs.buf, len);
+ }
+ return *this;
+ }
+ bool check(uint32_t l, char c) {
+ if (l != len) {
+ fprintf(stderr, "blob length was %u, expected %u\n", len, l);
+ return false;
+ }
+ for (uint32_t i = 0; i < l; ++i) {
+ if (buf[i] != c) {
+ fprintf(stderr, "byte at offset %u was %c, expected %c\n", i, buf[i], c);
+ return false;
+ }
+ }
+ return true;
+ }
+ ~Data() {
+ delete [] buf;
+ }
+};
+
+struct DataSet
+{
+ std::vector<Data> blobs;
+
+ void sample(FRT_Values &v) {
+ blobs.push_back(Data(v.GetNumValues(), 'V'));
+ for (uint32_t i = 0; i < v.GetNumValues(); ++i) {
+ if (v.GetType(i) == FRT_VALUE_DATA) {
+ blobs.push_back(Data(1, 'x'));
+ blobs.push_back(Data(v[i]._data._buf, v[i]._data._len));
+ } else if (v.GetType(i) == FRT_VALUE_DATA_ARRAY) {
+ blobs.push_back(Data(v[i]._data_array._len, 'X'));
+ for (uint32_t j = 0; j < v[i]._data_array._len; ++j) {
+ blobs.push_back(Data(v[i]._data_array._pt[j]._buf,
+ v[i]._data_array._pt[j]._len));
+ }
+ }
+ }
+ }
+};
+
+struct ServerSampler : public FRT_Invokable
+{
+ DataSet &dataSet;
+ FRT_RPCRequest *clientReq;
+ FRT_RPCRequest *serverReq;
+
+ ServerSampler(DataSet &ds, FRT_RPCRequest *cr) : dataSet(ds), clientReq(cr), serverReq(0) {}
+
+ void RPC_test(FRT_RPCRequest *req)
+ {
+ if (clientReq != 0) {
+ dataSet.sample(*clientReq->GetParams()); // client params after drop
+ }
+
+ // store away parameters
+ FNET_DataBuffer buf;
+ buf.EnsureFree(req->GetParams()->GetLength());
+ req->GetParams()->EncodeCopy(&buf);
+
+ dataSet.sample(*req->GetParams()); // server params before drop
+ req->DiscardBlobs();
+ dataSet.sample(*req->GetParams()); // server params after drop
+
+ // restore parameters into return values
+ req->GetReturn()->DecodeCopy(&buf, buf.GetDataLen());
+
+ dataSet.sample(*req->GetReturn()); // server return before drop
+
+ // keep request to sample return after drop
+ req->AddRef();
+ serverReq = req;
+ }
+};
+
+TEST("testExplicitShared") {
+ FRT_Supervisor orb;
+ MyBlob blob;
+
+ FRT_RPCRequest *req = orb.AllocRPCRequest();
+ EXPECT_TRUE(blob.refcnt == 1);
+
+ req->GetParams()->AddSharedData(&blob);
+ req->GetParams()->AddInt32(42);
+ req->GetParams()->AddSharedData(&blob);
+ req->GetParams()->AddInt32(84);
+ req->GetParams()->AddSharedData(&blob);
+
+ EXPECT_TRUE(blob.refcnt == 4);
+ EXPECT_TRUE(strcmp(req->GetParamSpec(), "xixix") == 0);
+ EXPECT_TRUE(req->GetParams()->GetValue(0)._data._len == blob.getLen());
+ EXPECT_TRUE(req->GetParams()->GetValue(0)._data._buf == blob.getData());
+ EXPECT_TRUE(req->GetParams()->GetValue(1)._intval32 == 42);
+ EXPECT_TRUE(req->GetParams()->GetValue(2)._data._len == blob.getLen());
+ EXPECT_TRUE(req->GetParams()->GetValue(2)._data._buf == blob.getData());
+ EXPECT_TRUE(req->GetParams()->GetValue(3)._intval32 == 84);
+ EXPECT_TRUE(req->GetParams()->GetValue(4)._data._len == blob.getLen());
+ EXPECT_TRUE(req->GetParams()->GetValue(4)._data._buf == blob.getData());
+
+ req->CreateRequestPacket(true)->Free(); // fake request send.
+
+ EXPECT_TRUE(blob.refcnt == 1);
+ EXPECT_TRUE(strcmp(req->GetParamSpec(), "xixix") == 0);
+ EXPECT_TRUE(req->GetParams()->GetValue(0)._data._len == 0);
+ EXPECT_TRUE(req->GetParams()->GetValue(0)._data._buf == NULL);
+ EXPECT_TRUE(req->GetParams()->GetValue(1)._intval32 == 42);
+ EXPECT_TRUE(req->GetParams()->GetValue(2)._data._len == 0);
+ EXPECT_TRUE(req->GetParams()->GetValue(2)._data._buf == NULL);
+ EXPECT_TRUE(req->GetParams()->GetValue(3)._intval32 == 84);
+ EXPECT_TRUE(req->GetParams()->GetValue(4)._data._len == 0);
+ EXPECT_TRUE(req->GetParams()->GetValue(4)._data._buf == NULL);
+
+ req = orb.AllocRPCRequest(req);
+
+ req->GetParams()->AddSharedData(&blob);
+ req->GetParams()->AddInt32(42);
+ req->GetParams()->AddSharedData(&blob);
+ req->GetParams()->AddInt32(84);
+ req->GetParams()->AddSharedData(&blob);
+
+ EXPECT_TRUE(blob.refcnt == 4);
+ req->SubRef();
+ EXPECT_TRUE(blob.refcnt == 1);
+}
+
+TEST("testImplicitShared") {
+ DataSet dataSet;
+ FRT_Supervisor orb;
+ FRT_RPCRequest *req = orb.AllocRPCRequest();
+ ServerSampler serverSampler(dataSet, req);
+ {
+ FRT_ReflectionBuilder rb(&orb);
+ rb.DefineMethod("test", "*", "*", true,
+ FRT_METHOD(ServerSampler::RPC_test), &serverSampler);
+ }
+ orb.Listen(0);
+ int port = orb.GetListenPort();
+ ASSERT_TRUE(port != 0);
+ orb.Start();
+
+ char tmp[64];
+ snprintf(tmp, sizeof(tmp), "tcp/localhost:%d", port);
+ FRT_Target *target = orb.GetTarget(tmp);
+ req->SetMethodName("test");
+ {
+ Data data(Data::SMALL, 'a');
+ req->GetParams()->AddData(data.buf, data.len);
+ }
+ {
+ Data data(Data::LARGE, 'b');
+ req->GetParams()->AddData(data.buf, data.len);
+ }
+ {
+ char *data = req->GetParams()->AddData(Data::LARGE);
+ memset(data, 'c', Data::LARGE);
+ }
+ {
+ Data data1(Data::SMALL, 'd');
+ Data data2(Data::LARGE, 'e');
+ FRT_DataValue *arr = req->GetParams()->AddDataArray(2);
+ req->GetParams()->SetData(&arr[0], data1.buf, data1.len);
+ req->GetParams()->SetData(&arr[1], data2.buf, data2.len);
+ }
+
+ dataSet.sample(*req->GetParams()); // client params before drop
+
+ target->InvokeSync(req, 30.0);
+
+ if (serverSampler.serverReq != 0) {
+ dataSet.sample(*serverSampler.serverReq->GetReturn()); // server return after drop
+ }
+ dataSet.sample(*req->GetReturn()); // client return before drop
+
+ req->DiscardBlobs();
+
+ dataSet.sample(*req->GetReturn()); // client return after drop
+
+ // verify blob samples
+ EXPECT_EQUAL(dataSet.blobs.size(), 80u);
+
+ for (int i = 0; i < 80; i += 20) {
+ // before discard (client params, server params, server return, client return)
+ EXPECT_TRUE(dataSet.blobs[i + 0].check(4, 'V'));
+ EXPECT_TRUE(dataSet.blobs[i + 1].check(1, 'x'));
+ EXPECT_TRUE(dataSet.blobs[i + 2].check(Data::SMALL, 'a'));
+ EXPECT_TRUE(dataSet.blobs[i + 3].check(1, 'x'));
+ EXPECT_TRUE(dataSet.blobs[i + 4].check(Data::LARGE, 'b'));
+ EXPECT_TRUE(dataSet.blobs[i + 5].check(1, 'x'));
+ EXPECT_TRUE(dataSet.blobs[i + 6].check(Data::LARGE, 'c'));
+ EXPECT_TRUE(dataSet.blobs[i + 7].check(2, 'X'));
+ EXPECT_TRUE(dataSet.blobs[i + 8].check(Data::SMALL, 'd'));
+ EXPECT_TRUE(dataSet.blobs[i + 9].check(Data::LARGE, 'e'));
+
+ // after discard (client params, server params, server return, client return)
+ EXPECT_TRUE(dataSet.blobs[i + 10].check(4, 'V'));
+ EXPECT_TRUE(dataSet.blobs[i + 11].check(1, 'x'));
+ EXPECT_TRUE(dataSet.blobs[i + 12].check(Data::SMALL, 'a'));
+ EXPECT_TRUE(dataSet.blobs[i + 13].check(1, 'x'));
+ EXPECT_TRUE(dataSet.blobs[i + 14].check(0, 0));
+ EXPECT_TRUE(dataSet.blobs[i + 15].check(1, 'x'));
+ EXPECT_TRUE(dataSet.blobs[i + 16].check(0, 0));
+ EXPECT_TRUE(dataSet.blobs[i + 17].check(2, 'X'));
+ EXPECT_TRUE(dataSet.blobs[i + 18].check(Data::SMALL, 'd'));
+ EXPECT_TRUE(dataSet.blobs[i + 19].check(0, 0));
+ }
+
+ if (serverSampler.serverReq != 0) {
+ serverSampler.serverReq->SubRef();
+ }
+ req->SubRef();
+ target->SubRef();
+ orb.ShutDown(true);
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }