summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/PathGroup.java16
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Policy.java27
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java17
-rw-r--r--controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/role/RoleTest.java45
-rw-r--r--persistence/src/vespa/persistence/conformancetest/conformancetest.cpp71
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h5
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.h24
-rw-r--r--searchcore/src/apps/proton/downpersistence.cpp6
-rw-r--r--searchcore/src/apps/proton/downpersistence.h1
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp9
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h6
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp70
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp13
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp15
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h5
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp62
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h6
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp38
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp114
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h6
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.cpp17
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h1
26 files changed, 122 insertions, 467 deletions
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/PathGroup.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/PathGroup.java
index 5c11dfc2a55..ced3d201f6f 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/PathGroup.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/PathGroup.java
@@ -58,13 +58,25 @@ enum PathGroup {
"/application/v4/tenant/{tenant}/application/",
"/application/v4/tenant/{tenant}/cost",
"/application/v4/tenant/{tenant}/cost/{date}",
- "/routing/v1/status/tenant/{tenant}/{*}",
- "/billing/v1/tenant/{tenant}/{*}"),
+ "/routing/v1/status/tenant/{tenant}/{*}"),
tenantKeys(Matcher.tenant,
PathPrefix.api,
"/application/v4/tenant/{tenant}/key/"),
+
+ billingToken(Matcher.tenant,
+ PathPrefix.api,
+ "/billing/v1/tenant/{tenant}/token"),
+
+ billingInstrument(Matcher.tenant,
+ PathPrefix.api,
+ "/billing/v1/tenant/{tenant}/instrument/{*}"),
+
+ billingList(Matcher.tenant,
+ PathPrefix.api,
+ "/billing/v1/tenant/{tenant}/billing/{*}"),
+
applicationKeys(Matcher.tenant,
Matcher.application,
PathPrefix.api,
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Policy.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Policy.java
index cfe8d247e54..0afa0668a00 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Policy.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Policy.java
@@ -142,7 +142,32 @@ enum Policy {
/** Access to /payment/notification */
paymentProcessor(Privilege.grant(Action.create)
.on(PathGroup.paymentProcessor)
- .in(SystemName.PublicCd));
+ .in(SystemName.PublicCd)),
+
+ /** Read your own instrument information */
+ paymentInstrumentRead(Privilege.grant(Action.read)
+ .on(PathGroup.billingInstrument)
+ .in(SystemName.PublicCd)),
+
+ /** Ability to update tenant payment instrument */
+ paymentInstrumentUpdate(Privilege.grant(Action.update)
+ .on(PathGroup.billingInstrument)
+ .in(SystemName.PublicCd)),
+
+ /** Ability to remove your own payment instrument */
+ paymentInstrumentDelete(Privilege.grant(Action.delete)
+ .on(PathGroup.billingInstrument)
+ .in(SystemName.PublicCd)),
+
+ /** Get the token to view instrument form */
+ paymentInstrumentCreate(Privilege.grant(Action.read)
+ .on(PathGroup.billingToken)
+ .in(SystemName.PublicCd)),
+
+ /** Read the generated bills */
+ billingInformationRead(Privilege.grant(Action.read)
+ .on(PathGroup.billingList)
+ .in(SystemName.PublicCd));
private final Set<Privilege> privileges;
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java
index c05936ee593..438e79bcc4f 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java
@@ -43,7 +43,10 @@ public enum RoleDefinition {
reader(Policy.tenantRead,
Policy.applicationRead,
Policy.deploymentRead,
- Policy.publicRead),
+ Policy.publicRead,
+ Policy.paymentInstrumentRead,
+ Policy.paymentInstrumentDelete,
+ Policy.billingInformationRead),
/** User — the dev.ops. role for normal Vespa tenant users */
developer(Policy.applicationCreate,
@@ -52,12 +55,20 @@ public enum RoleDefinition {
Policy.applicationOperations,
Policy.developmentDeployment,
Policy.keyManagement,
- Policy.submission),
+ Policy.submission,
+ Policy.paymentInstrumentRead,
+ Policy.paymentInstrumentDelete,
+ Policy.billingInformationRead),
/** Admin — the administrative function for user management etc. */
administrator(Policy.tenantUpdate,
Policy.tenantManager,
- Policy.applicationManager),
+ Policy.applicationManager,
+ Policy.paymentInstrumentRead,
+ Policy.paymentInstrumentUpdate,
+ Policy.paymentInstrumentDelete,
+ Policy.paymentInstrumentCreate,
+ Policy.billingInformationRead),
/** Headless — the application specific role identified by deployment keys for production */
headless(Policy.submission),
diff --git a/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/role/RoleTest.java b/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/role/RoleTest.java
index 57b4af9d16c..2da93c5ceca 100644
--- a/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/role/RoleTest.java
+++ b/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/role/RoleTest.java
@@ -6,8 +6,10 @@ import com.yahoo.config.provision.SystemName;
import com.yahoo.config.provision.TenantName;
import org.junit.Test;
+import java.awt.event.AdjustmentEvent;
import java.net.URI;
import java.util.List;
+import java.util.stream.Stream;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -19,6 +21,7 @@ public class RoleTest {
private static final Enforcer mainEnforcer = new Enforcer(SystemName.main);
private static final Enforcer publicEnforcer = new Enforcer(SystemName.Public);
+ private static final Enforcer publicCdEnforcer = new Enforcer(SystemName.PublicCd);
@Test
public void operator_membership() {
@@ -143,4 +146,46 @@ public class RoleTest {
}
}
+ @Test
+ public void payment_instrument() {
+ URI paymentInstrumentUri = URI.create("/billing/v1/tenant/t1/instrument/foobar");
+ URI tenantPaymentInstrumentUri = URI.create("/billing/v1/tenant/t1/instrument");
+ URI tokenUri = URI.create("/billing/v1/tenant/t1/token");
+
+ Role user = Role.reader(TenantName.from("t1"));
+ assertTrue(publicCdEnforcer.allows(user, Action.read, paymentInstrumentUri));
+ assertTrue(publicCdEnforcer.allows(user, Action.delete, paymentInstrumentUri));
+ assertFalse(publicCdEnforcer.allows(user, Action.update, tenantPaymentInstrumentUri));
+ assertFalse(publicCdEnforcer.allows(user, Action.read, tokenUri));
+
+ Role developer = Role.developer(TenantName.from("t1"));
+ assertTrue(publicCdEnforcer.allows(developer, Action.read, paymentInstrumentUri));
+ assertTrue(publicCdEnforcer.allows(developer, Action.delete, paymentInstrumentUri));
+ assertFalse(publicCdEnforcer.allows(developer, Action.update, tenantPaymentInstrumentUri));
+ assertFalse(publicCdEnforcer.allows(developer, Action.read, tokenUri));
+
+ Role admin = Role.administrator(TenantName.from("t1"));
+ assertTrue(publicCdEnforcer.allows(admin, Action.read, paymentInstrumentUri));
+ assertTrue(publicCdEnforcer.allows(admin, Action.delete, paymentInstrumentUri));
+ assertTrue(publicCdEnforcer.allows(admin, Action.update, tenantPaymentInstrumentUri));
+ assertTrue(publicCdEnforcer.allows(admin, Action.read, tokenUri));
+ }
+
+ @Test
+ public void billing() {
+ URI billing = URI.create("/billing/v1/tenant/t1/billing");
+
+ Role user = Role.reader(TenantName.from("t1"));
+ Role developer = Role.developer(TenantName.from("t1"));
+ Role admin = Role.administrator(TenantName.from("t1"));
+
+ Stream.of(user, developer, admin).forEach(role -> {
+ assertTrue(publicCdEnforcer.allows(role, Action.read, billing));
+ assertFalse(publicCdEnforcer.allows(role, Action.update, billing));
+ assertFalse(publicCdEnforcer.allows(role, Action.delete, billing));
+ assertFalse(publicCdEnforcer.allows(role, Action.create, billing));
+ });
+
+ }
+
}
diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
index 5c8b5b029d2..298e29cbecb 100644
--- a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
+++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
@@ -300,8 +300,6 @@ feedDocs(PersistenceProvider& spi,
EXPECT_TRUE(!result.hasError());
docs.push_back(DocAndTimestamp(doc, Timestamp(1000 + i)));
}
- spi.flush(bucket, context);
- EXPECT_EQ(Result(), Result(spi.flush(bucket, context)));
return docs;
}
@@ -354,8 +352,6 @@ TEST_F(ConformanceTest, testBasics)
Result(),
Result(spi->remove(bucket, Timestamp(3), doc1->getId(), context)));
- EXPECT_EQ(Result(), Result(spi->flush(bucket, context)));
-
// Iterate first without removes, then with.
for (int iterPass = 0; iterPass < 2; ++iterPass) {
bool includeRemoves = (iterPass == 1);
@@ -441,11 +437,8 @@ TEST_F(ConformanceTest, testListBuckets)
spi->createBucket(bucket3, context);
spi->put(bucket1, Timestamp(1), doc1, context);
- spi->flush(bucket1, context);
spi->put(bucket2, Timestamp(2), doc2, context);
- spi->flush(bucket2, context);
spi->put(bucket3, Timestamp(3), doc3, context);
- spi->flush(bucket3, context);
{
BucketIdListResult result = spi->listBuckets(makeBucketSpace(), PartitionId(1));
@@ -482,7 +475,6 @@ TEST_F(ConformanceTest, testBucketInfo)
spi->put(bucket, Timestamp(2), doc2, context);
const BucketInfo info1 = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
{
EXPECT_EQ(1, (int)info1.getDocumentCount());
@@ -492,7 +484,6 @@ TEST_F(ConformanceTest, testBucketInfo)
spi->put(bucket, Timestamp(3), doc1, context);
const BucketInfo info2 = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
{
EXPECT_EQ(2, (int)info2.getDocumentCount());
@@ -503,7 +494,6 @@ TEST_F(ConformanceTest, testBucketInfo)
spi->put(bucket, Timestamp(4), doc1, context);
const BucketInfo info3 = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
{
EXPECT_EQ(2, (int)info3.getDocumentCount());
@@ -514,7 +504,6 @@ TEST_F(ConformanceTest, testBucketInfo)
spi->remove(bucket, Timestamp(5), doc1->getId(), context);
const BucketInfo info4 = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
{
EXPECT_EQ(1, (int)info4.getDocumentCount());
@@ -544,7 +533,6 @@ TEST_F(ConformanceTest, testOrderIndependentBucketInfo)
{
spi->put(bucket, Timestamp(2), doc1, context);
spi->put(bucket, Timestamp(3), doc2, context);
- spi->flush(bucket, context);
const BucketInfo info(spi->getBucketInfo(bucket).getBucketInfo());
checksumOrdered = info.getChecksum();
@@ -563,7 +551,6 @@ TEST_F(ConformanceTest, testOrderIndependentBucketInfo)
// Swap order of puts
spi->put(bucket, Timestamp(3), doc2, context);
spi->put(bucket, Timestamp(2), doc1, context);
- spi->flush(bucket, context);
const BucketInfo info(spi->getBucketInfo(bucket).getBucketInfo());
checksumUnordered = info.getChecksum();
@@ -589,7 +576,6 @@ TEST_F(ConformanceTest, testPut)
{
const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
EXPECT_EQ(1, (int)info.getDocumentCount());
EXPECT_TRUE(info.getEntryCount() >= info.getDocumentCount());
@@ -615,7 +601,6 @@ TEST_F(ConformanceTest, testPutNewDocumentVersion)
Result result = spi->put(bucket, Timestamp(3), doc1, context);
{
const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
EXPECT_EQ(1, (int)info.getDocumentCount());
EXPECT_TRUE(info.getEntryCount() >= info.getDocumentCount());
@@ -627,7 +612,6 @@ TEST_F(ConformanceTest, testPutNewDocumentVersion)
result = spi->put(bucket, Timestamp(4), doc2, context);
{
const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
EXPECT_EQ(1, (int)info.getDocumentCount());
EXPECT_TRUE(info.getEntryCount() >= info.getDocumentCount());
@@ -666,7 +650,6 @@ TEST_F(ConformanceTest, testPutOlderDocumentVersion)
Result result = spi->put(bucket, Timestamp(5), doc1, context);
const BucketInfo info1 = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
{
EXPECT_EQ(1, (int)info1.getDocumentCount());
EXPECT_TRUE(info1.getEntryCount() >= info1.getDocumentCount());
@@ -678,7 +661,6 @@ TEST_F(ConformanceTest, testPutOlderDocumentVersion)
result = spi->put(bucket, Timestamp(4), doc2, context);
{
const BucketInfo info2 = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
EXPECT_EQ(1, (int)info2.getDocumentCount());
EXPECT_TRUE(info2.getEntryCount() >= info1.getDocumentCount());
@@ -712,7 +694,6 @@ TEST_F(ConformanceTest, testPutDuplicate)
BucketChecksum checksum;
{
const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
EXPECT_EQ(1, (int)info.getDocumentCount());
checksum = info.getChecksum();
}
@@ -721,7 +702,6 @@ TEST_F(ConformanceTest, testPutDuplicate)
{
const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
EXPECT_EQ(1, (int)info.getDocumentCount());
EXPECT_EQ(checksum, info.getChecksum());
}
@@ -746,7 +726,6 @@ TEST_F(ConformanceTest, testRemove)
{
const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
EXPECT_EQ(1, (int)info.getDocumentCount());
EXPECT_TRUE(info.getChecksum() != 0);
@@ -764,7 +743,6 @@ TEST_F(ConformanceTest, testRemove)
{
const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
EXPECT_EQ(0, (int)info.getDocumentCount());
EXPECT_EQ(0, (int)info.getChecksum());
@@ -791,7 +769,6 @@ TEST_F(ConformanceTest, testRemove)
context);
{
const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
EXPECT_EQ(0, (int)info.getDocumentCount());
EXPECT_EQ(0, (int)info.getChecksum());
@@ -799,7 +776,6 @@ TEST_F(ConformanceTest, testRemove)
}
Result result4 = spi->put(bucket, Timestamp(9), doc1, context);
- spi->flush(bucket, context);
EXPECT_TRUE(!result4.hasError());
@@ -809,7 +785,6 @@ TEST_F(ConformanceTest, testRemove)
context);
{
const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo();
- spi->flush(bucket, context);
EXPECT_EQ(0, (int)info.getDocumentCount());
EXPECT_EQ(0, (int)info.getChecksum());
@@ -847,7 +822,6 @@ TEST_F(ConformanceTest, testRemoveMerge)
Timestamp(10),
removeId,
context);
- spi->flush(bucket, context);
EXPECT_EQ(Result::ErrorType::NONE, removeResult.getErrorCode());
EXPECT_EQ(false, removeResult.wasFound());
}
@@ -875,7 +849,6 @@ TEST_F(ConformanceTest, testRemoveMerge)
Timestamp(11),
removeId,
context);
- spi->flush(bucket, context);
EXPECT_EQ(Result::ErrorType::NONE, removeResult.getErrorCode());
EXPECT_EQ(false, removeResult.wasFound());
}
@@ -903,7 +876,6 @@ TEST_F(ConformanceTest, testRemoveMerge)
Timestamp(7),
removeId,
context);
- spi->flush(bucket, context);
EXPECT_EQ(Result::ErrorType::NONE, removeResult.getErrorCode());
EXPECT_EQ(false, removeResult.wasFound());
}
@@ -946,7 +918,6 @@ TEST_F(ConformanceTest, testUpdate)
{
UpdateResult result = spi->update(bucket, Timestamp(3), update,
context);
- spi->flush(bucket, context);
EXPECT_EQ(Result(), Result(result));
EXPECT_EQ(Timestamp(0), result.getExistingTimestamp());
}
@@ -955,7 +926,6 @@ TEST_F(ConformanceTest, testUpdate)
{
UpdateResult result = spi->update(bucket, Timestamp(4), update,
context);
- spi->flush(bucket, context);
EXPECT_EQ(Result::ErrorType::NONE, result.getErrorCode());
EXPECT_EQ(Timestamp(3), result.getExistingTimestamp());
@@ -975,7 +945,6 @@ TEST_F(ConformanceTest, testUpdate)
}
spi->remove(bucket, Timestamp(5), doc1->getId(), context);
- spi->flush(bucket, context);
{
GetResult result = spi->get(bucket,
@@ -991,7 +960,6 @@ TEST_F(ConformanceTest, testUpdate)
{
UpdateResult result = spi->update(bucket, Timestamp(6), update,
context);
- spi->flush(bucket, context);
EXPECT_EQ(Result::ErrorType::NONE, result.getErrorCode());
EXPECT_EQ(Timestamp(0), result.getExistingTimestamp());
@@ -1009,7 +977,6 @@ TEST_F(ConformanceTest, testUpdate)
// Document does not exist (and therefore its condition cannot match by definition),
// but since CreateIfNonExistent is set it should be auto-created anyway.
UpdateResult result = spi->update(bucket, Timestamp(7), update, context);
- spi->flush(bucket, context);
EXPECT_EQ(Result::ErrorType::NONE, result.getErrorCode());
EXPECT_EQ(Timestamp(7), result.getExistingTimestamp());
}
@@ -1044,7 +1011,6 @@ TEST_F(ConformanceTest, testGet)
}
spi->put(bucket, Timestamp(3), doc1, context);
- spi->flush(bucket, context);
{
GetResult result = spi->get(bucket, document::AllFields(),
@@ -1054,7 +1020,6 @@ TEST_F(ConformanceTest, testGet)
}
spi->remove(bucket, Timestamp(4), doc1->getId(), context);
- spi->flush(bucket, context);
{
GetResult result = spi->get(bucket, document::AllFields(),
@@ -1168,7 +1133,6 @@ TEST_F(ConformanceTest, testIterateAllDocsNewestVersionOnly)
spi->put(b, newTimestamp, newDoc, context);
newDocs.push_back(DocAndTimestamp(newDoc, newTimestamp));
}
- spi->flush(b, context);
CreateIteratorResult iter(createIterator(*spi, b, createSelection("")));
@@ -1249,7 +1213,6 @@ TEST_F(ConformanceTest, testIterateMatchTimestampRange)
DocAndTimestamp(doc, Timestamp(1000 + i)));
}
}
- spi->flush(b, context);
Selection sel = Selection(DocumentSelection(""));
sel.setFromTimestamp(fromTimestamp);
@@ -1295,7 +1258,6 @@ TEST_F(ConformanceTest, testIterateExplicitTimestampSubset)
Timestamp(2000),
docsToVisit.front().doc->getId(), context)
.wasFound());
- spi->flush(b, context);
timestampsToVisit.push_back(Timestamp(2000));
removes.insert(docsToVisit.front().doc->getId().toString());
@@ -1339,7 +1301,6 @@ TEST_F(ConformanceTest, testIterateRemoves)
nonRemovedDocs.push_back(docs[i]);
}
}
- spi->flush(b, context);
// First, test iteration without removes
{
@@ -1387,7 +1348,6 @@ TEST_F(ConformanceTest, testIterateMatchSelection)
DocAndTimestamp(doc, Timestamp(1000 + i)));
}
}
- spi->flush(b, context);
CreateIteratorResult iter(
createIterator(*spi,
@@ -1416,7 +1376,6 @@ TEST_F(ConformanceTest, testIterationRequiringDocumentIdOnlyMatching)
// remove entry for it regardless.
EXPECT_TRUE(
!spi->remove(b, Timestamp(2000), removedId, context).wasFound());
- spi->flush(b, context);
Selection sel(createSelection("id == '" + removedId.toString() + "'"));
@@ -1530,7 +1489,6 @@ TEST_F(ConformanceTest, testDeleteBucket)
spi->createBucket(bucket, context);
spi->put(bucket, Timestamp(3), doc1, context);
- spi->flush(bucket, context);
spi->deleteBucket(bucket, context);
testDeleteBucketPostCondition(spi, bucket, *doc1);
@@ -1586,8 +1544,6 @@ TEST_F(ConformanceTest, testSplitNormalCase)
spi->put(bucketC, Timestamp(i + 1), doc1, context);
}
- spi->flush(bucketC, context);
-
spi->split(bucketC, bucketA, bucketB, context);
testSplitNormalCasePostCondition(spi, bucketA, bucketB, bucketC,
testDocMan);
@@ -1657,28 +1613,23 @@ TEST_F(ConformanceTest, testSplitTargetExists)
spi->put(bucketC, Timestamp(i + 1), doc1, context);
}
- spi->flush(bucketC, context);
for (uint32_t i = 10; i < 20; ++i) {
Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x06, i);
spi->put(bucketB, Timestamp(i + 1), doc1, context);
}
- spi->flush(bucketB, context);
EXPECT_TRUE(!spi->getBucketInfo(bucketB).getBucketInfo().isActive());
for (uint32_t i = 10; i < 20; ++i) {
Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x06, i);
spi->put(bucketC, Timestamp(i + 1), doc1, context);
}
- spi->flush(bucketC, context);
for (uint32_t i = 20; i < 25; ++i) {
Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x06, i);
spi->put(bucketB, Timestamp(i + 1), doc1, context);
}
- spi->flush(bucketB, context);
-
spi->split(bucketC, bucketA, bucketB, context);
testSplitTargetExistsPostCondition(spi, bucketA, bucketB, bucketC,
testDocMan);
@@ -1745,8 +1696,6 @@ TEST_F(ConformanceTest, testSplitSingleDocumentInSource)
Document::SP doc = testDocMan.createRandomDocumentAtLocation(0x06, 0);
spi->put(source, Timestamp(1), doc, context);
- spi->flush(source, context);
-
spi->split(source, target1, target2, context);
testSplitSingleDocumentInSourcePostCondition(
spi, source, target1, target2, testDocMan);
@@ -1804,7 +1753,6 @@ ConformanceTest::createAndPopulateJoinSourceBuckets(
source1.getBucketId().getId(), i));
spi.put(source1, Timestamp(i + 1), doc, context);
}
- spi.flush(source1, context);
for (uint32_t i = 10; i < 20; ++i) {
Document::SP doc(
@@ -1812,7 +1760,6 @@ ConformanceTest::createAndPopulateJoinSourceBuckets(
source2.getBucketId().getId(), i));
spi.put(source2, Timestamp(i + 1), doc, context);
}
- spi.flush(source2, context);
}
void
@@ -1912,21 +1859,17 @@ TEST_F(ConformanceTest, testJoinTargetExists)
spi->put(bucketA, Timestamp(i + 1), doc1, context);
}
- spi->flush(bucketA, context);
for (uint32_t i = 10; i < 20; ++i) {
Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x06, i);
spi->put(bucketB, Timestamp(i + 1), doc1, context);
}
- spi->flush(bucketB, context);
for (uint32_t i = 20; i < 30; ++i) {
Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x06, i);
spi->put(bucketC, Timestamp(i + 1), doc1, context);
}
- spi->flush(bucketC, context);
-
spi->join(bucketA, bucketB, bucketC, context);
testJoinTargetExistsPostCondition(spi, bucketA, bucketB, bucketC,
testDocMan);
@@ -1991,7 +1934,6 @@ ConformanceTest::populateBucket(const Bucket& b,
location, i);
spi.put(b, Timestamp(i + 1), doc1, context);
}
- spi.flush(b, context);
}
TEST_F(ConformanceTest, testJoinOneBucket)
@@ -2150,7 +2092,6 @@ TEST_F(ConformanceTest, testMaintain)
spi->createBucket(bucket, context);
spi->put(bucket, Timestamp(3), doc1, context);
- spi->flush(bucket, context);
EXPECT_EQ(Result::ErrorType::NONE,
spi->maintain(bucket, LOW).getErrorCode());
@@ -2229,7 +2170,6 @@ TEST_F(SingleDocTypeConformanceTest, testBucketActivationSplitAndJoin)
spi->createBucket(bucketC, context);
spi->put(bucketC, Timestamp(1), doc1, context);
spi->put(bucketC, Timestamp(2), doc2, context);
- spi->flush(bucketC, context);
spi->setActiveState(bucketC, BucketInfo::ACTIVE);
EXPECT_TRUE(spi->getBucketInfo(bucketC).getBucketInfo().isActive());
@@ -2304,14 +2244,11 @@ TEST_F(ConformanceTest, testRemoveEntry)
spi->createBucket(bucket, context);
spi->put(bucket, Timestamp(3), doc1, context);
- spi->flush(bucket, context);
BucketInfo info1 = spi->getBucketInfo(bucket).getBucketInfo();
{
spi->put(bucket, Timestamp(4), doc2, context);
- spi->flush(bucket, context);
spi->removeEntry(bucket, Timestamp(4), context);
- spi->flush(bucket, context);
BucketInfo info2 = spi->getBucketInfo(bucket).getBucketInfo();
EXPECT_EQ(info1, info2);
}
@@ -2319,9 +2256,7 @@ TEST_F(ConformanceTest, testRemoveEntry)
// Test case where there exists a previous version of the document.
{
spi->put(bucket, Timestamp(5), doc1, context);
- spi->flush(bucket, context);
spi->removeEntry(bucket, Timestamp(5), context);
- spi->flush(bucket, context);
BucketInfo info2 = spi->getBucketInfo(bucket).getBucketInfo();
EXPECT_EQ(info1, info2);
}
@@ -2329,14 +2264,11 @@ TEST_F(ConformanceTest, testRemoveEntry)
// Test case where the newest document version after removeEntrying is a remove.
{
spi->remove(bucket, Timestamp(6), doc1->getId(), context);
- spi->flush(bucket, context);
BucketInfo info2 = spi->getBucketInfo(bucket).getBucketInfo();
EXPECT_EQ(uint32_t(0), info2.getDocumentCount());
spi->put(bucket, Timestamp(7), doc1, context);
- spi->flush(bucket, context);
spi->removeEntry(bucket, Timestamp(7), context);
- spi->flush(bucket, context);
BucketInfo info3 = spi->getBucketInfo(bucket).getBucketInfo();
EXPECT_EQ(info2, info3);
}
@@ -2395,9 +2327,6 @@ TEST_F(ConformanceTest, testBucketSpaces)
spi->put(bucket01, Timestamp(4), doc2, context);
spi->put(bucket11, Timestamp(5), doc3, context);
spi->put(bucket12, Timestamp(6), doc4, context);
- spi->flush(bucket01, context);
- spi->flush(bucket11, context);
- spi->flush(bucket12, context);
// Check bucket lists
assertBucketList(*spi, bucketSpace0, partId, { bucketId1 });
assertBucketList(*spi, bucketSpace1, partId, { bucketId1, bucketId2 });
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index 557a9ec2edd..cbd76951004 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -41,11 +41,6 @@ public:
Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); }
/**
- * Default impl is getBucketInfo();
- */
- Result flush(const Bucket&, Context&) override { return Result(); }
-
- /**
* Default impl is remove().
*/
RemoveResult removeIfFound(const Bucket&, Timestamp, const DocumentId&, Context&) override;
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 857630e2606..c70d5e3f1c3 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -202,30 +202,6 @@ struct PersistenceProvider
virtual UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) = 0;
/**
- * The service layer may choose to batch certain commands. This means that
- * the service layer will lock the bucket only once, then perform several
- * commands, and finally get the bucket info from the bucket, and then
- * flush it. This can be used to improve performance by caching the
- * modifications, and persisting them to disk only when flush is called.
- * The service layer guarantees that after one of these operations, flush()
- * is called, regardless of whether the operation succeeded or not, before
- * another bucket is processed in the same worker thead. The following
- * operations can be batched and have the guarantees
- * above:
- * - put
- * - get
- * - remove (all versions)
- * - update
- * - revert
- * - join
- * <p/>
- * A provider may of course choose to not sync to disk at flush time either,
- * but then data may be more prone to being lost on node issues, and the
- * provider must figure out when to flush its cache itself.
- */
- virtual Result flush(const Bucket&, Context&) = 0;
-
- /**
* Retrieves the latest version of the document specified by the
* document id. If no versions were found, or the document was removed,
* the result should be successful, but contain no document (see GetResult).
diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp
index 4b911eb6d2b..d4ec9cee395 100644
--- a/searchcore/src/apps/proton/downpersistence.cpp
+++ b/searchcore/src/apps/proton/downpersistence.cpp
@@ -98,12 +98,6 @@ UpdateResult DownPersistence::update(const Bucket&, Timestamp,
errorResult.getErrorMessage());
}
-Result
-DownPersistence::flush(const Bucket&, Context&)
-{
- return errorResult;
-}
-
GetResult
DownPersistence::get(const Bucket&, const document::FieldSet&,
const DocumentId&, Context&) const
diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h
index 0a602c4467e..8cdac7aaa1b 100644
--- a/searchcore/src/apps/proton/downpersistence.h
+++ b/searchcore/src/apps/proton/downpersistence.h
@@ -37,7 +37,6 @@ public:
RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
Result removeEntry(const Bucket&, Timestamp, Context&) override;
UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) override;
- Result flush(const Bucket&, Context&) override;
GetResult get(const Bucket&, const document::FieldSet& fieldSet, const DocumentId& id, Context&) const override;
CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fieldSet,
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index 05e51993f49..fb80c25bfb7 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -145,15 +145,6 @@ PersistenceProviderWrapper::get(const spi::Bucket& bucket,
return _spi.get(bucket, fieldSet, id, context);
}
-spi::Result
-PersistenceProviderWrapper::flush(const spi::Bucket& bucket,
- spi::Context& context)
-{
- LOG_SPI("flush(" << bucket << ")");
- CHECK_ERROR(spi::Result, FAIL_FLUSH);
- return _spi.flush(bucket, context);
-}
-
spi::CreateIteratorResult
PersistenceProviderWrapper::createIterator(const spi::Bucket& bucket,
const document::FieldSet& fields,
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index 511ced02118..9bd3653e8a1 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -36,7 +36,6 @@ public:
FAIL_REPLACE_WITH_REMOVE = 1 << 6,
FAIL_UPDATE = 1 << 7,
FAIL_REVERT = 1 << 8,
- FAIL_FLUSH = 1 << 9,
FAIL_CREATE_ITERATOR = 1 << 10,
FAIL_ITERATE = 1 << 11,
FAIL_DESTROY_ITERATOR = 1 << 12,
@@ -44,7 +43,7 @@ public:
FAIL_SPLIT = 1 << 14,
FAIL_JOIN = 1 << 15,
FAIL_CREATE_BUCKET = 1 << 16,
- FAIL_BUCKET_PERSISTENCE = FAIL_PUT|FAIL_REMOVE|FAIL_UPDATE|FAIL_REVERT|FAIL_FLUSH,
+ FAIL_BUCKET_PERSISTENCE = FAIL_PUT|FAIL_REMOVE|FAIL_UPDATE|FAIL_REVERT,
FAIL_ALL_OPERATIONS = 0xffff,
// TODO: add more as needed
};
@@ -55,7 +54,7 @@ private:
uint32_t _failureMask;
public:
PersistenceProviderWrapper(spi::PersistenceProvider& spi);
- ~PersistenceProviderWrapper();
+ ~PersistenceProviderWrapper() override;
/**
* Explicitly set result to anything != NONE to have all operations
@@ -96,7 +95,6 @@ public:
spi::GetResult get(const spi::Bucket&, const document::FieldSet&,
const spi::DocumentId&, spi::Context&) const override ;
- spi::Result flush(const spi::Bucket&, spi::Context&) override;
spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&,
spi::IncludedVersions versions, spi::Context&) override;
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 4576f8a08f8..2cb390fca86 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -698,19 +698,6 @@ TEST_F(FileStorManagerTest, handler_pause) {
ASSERT_EQ(30, filestorHandler.getNextMessage(0, stripeId).second->getPriority());
}
-namespace {
-
-uint64_t getPutTime(api::StorageMessage::SP& msg)
-{
- if (!msg.get()) {
- return (uint64_t)-1;
- }
-
- return static_cast<api::PutCommand*>(msg.get())->getTimestamp();
-};
-
-}
-
TEST_F(FileStorManagerTest, remap_split) {
// Setup a filestorthread to test
DummyStorageLink top;
@@ -768,63 +755,6 @@ TEST_F(FileStorManagerTest, remap_split) {
filestorHandler.dumpQueue(0));
}
-TEST_F(FileStorManagerTest, handler_multi) {
- // Setup a filestorthread to test
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
- // Since we fake time with small numbers, we need to make sure we dont
- // compact them away, as they will seem to be from 1970
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
- filestorHandler.setGetNextMessageTimeout(50);
- uint32_t stripeId = filestorHandler.getNextStripeId(0);
-
- std::string content("Here is some content which is in all documents");
-
- Document::SP doc1(createDocument(content, "id:footype:testdoctype1:n=1234:bar").release());
-
- Document::SP doc2(createDocument(content, "id:footype:testdoctype1:n=4567:bar").release());
-
- document::BucketIdFactory factory;
- document::BucketId bucket1(16, factory.getBucketId(doc1->getId()).getRawId());
- document::BucketId bucket2(16, factory.getBucketId(doc2->getId()).getRawId());
-
- // Populate bucket with the given data
- for (uint32_t i = 1; i < 10; i++) {
- filestorHandler.schedule(
- api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket1), doc1, i)), 0);
- filestorHandler.schedule(
- api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket2), doc2, i + 10)), 0);
- }
-
- {
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId);
- ASSERT_EQ(1, getPutTime(lock.second));
-
- lock = filestorHandler.getNextMessage(0, stripeId, lock);
- ASSERT_EQ(2, getPutTime(lock.second));
-
- lock = filestorHandler.getNextMessage(0, stripeId, lock);
- ASSERT_EQ(3, getPutTime(lock.second));
- }
-
- {
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId);
- ASSERT_EQ(11, getPutTime(lock.second));
-
- lock = filestorHandler.getNextMessage(0, stripeId, lock);
- ASSERT_EQ(12, getPutTime(lock.second));
- }
-}
-
TEST_F(FileStorManagerTest, handler_timeout) {
// Setup a filestorthread to test
DummyStorageLink top;
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 0b2baab5652..7e7a76b95a8 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -626,8 +626,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset,
}
TEST_F(MergeHandlerTest, spi_flush_guard) {
- PersistenceProviderWrapper providerWrapper(
- getPersistenceProvider());
+ PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler(providerWrapper, getEnv());
providerWrapper.setResult(
@@ -635,8 +634,7 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
setUpChain(MIDDLE);
// Fail applying unrevertable remove
- providerWrapper.setFailureMask(
- PersistenceProviderWrapper::FAIL_REMOVE);
+ providerWrapper.setFailureMask(PersistenceProviderWrapper::FAIL_REMOVE);
providerWrapper.clearOperationLog();
try {
@@ -645,11 +643,6 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
} catch (const std::runtime_error& e) {
EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos);
}
- // Test that we always flush after applying diff locally, even when
- // errors are encountered.
- const std::vector<std::string>& opLog(providerWrapper.getOperationLog());
- ASSERT_FALSE(opLog.empty());
- EXPECT_EQ("flush(Bucket(0x40000000000004d2, partition 0))", opLog.back());
}
TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
@@ -901,7 +894,6 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
{ PersistenceProviderWrapper::FAIL_PUT, "Failed put" },
{ PersistenceProviderWrapper::FAIL_REMOVE, "Failed remove" },
- { PersistenceProviderWrapper::FAIL_FLUSH, "Failed flush" },
};
typedef ExpectedExceptionSpec* ExceptionIterator;
@@ -1058,7 +1050,6 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
{ PersistenceProviderWrapper::FAIL_PUT, "Failed put" },
{ PersistenceProviderWrapper::FAIL_REMOVE, "Failed remove" },
- { PersistenceProviderWrapper::FAIL_FLUSH, "Failed flush" },
};
typedef ExpectedExceptionSpec* ExceptionIterator;
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index be276dd7f9d..3754a82e7ae 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -168,19 +168,4 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive
ASSERT_FALSE(lock1.first.get());
}
-TEST_F(PersistenceQueueTest, operation_batching_not_allowed_across_different_lock_modes) {
- Fixture f(*this);
-
- f.filestorHandler->schedule(createPut(1234, 0), _disk);
- f.filestorHandler->schedule(createGet(1234), _disk);
-
- auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
- ASSERT_TRUE(lock0.first);
- ASSERT_TRUE(lock0.second);
- EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
-
- f.filestorHandler->getNextMessage(_disk, f.stripeId, lock0);
- ASSERT_FALSE(lock0.second);
-}
-
} // namespace storage
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index e32fc056413..25c0a36a7f5 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -159,7 +159,6 @@ PersistenceTestUtils::doPutOnDisk(
getPersistenceProvider().put(spi::Bucket(b), timestamp, doc, context);
- getPersistenceProvider().flush(b, context);
return doc;
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 543accc80ae..44cd8f0ab0c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -60,12 +60,6 @@ FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId)
return _impl->getNextMessage(disk, stripeId);
}
-FileStorHandler::LockedMessage &
-FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& lck)
-{
- return _impl->getNextMessage(disk, stripeId, lck);
-}
-
FileStorHandler::BucketLockInterface::SP
FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq)
{
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index db294d7c39f..af0a52b2fa0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -121,11 +121,6 @@ public:
LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId);
/**
- * Returns the next message for the same bucket.
- */
- LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& lock);
-
- /**
* Lock a bucket. By default, each file stor thread has the locks of all
* buckets in their area of responsibility. If they need to access buckets
* outside of their area, they can call this to make sure the thread
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index bfd5233d017..5ad37d585ef 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -319,25 +319,6 @@ FileStorHandlerImpl::getNextStripeId(uint32_t disk) {
return _diskInfo[disk].getNextStripeId();
}
-
-FileStorHandler::LockedMessage &
-FileStorHandlerImpl::getNextMessage(uint16_t diskId, uint32_t stripeId, FileStorHandler::LockedMessage& lck)
-{
- document::Bucket bucket(lck.first->getBucket());
-
- LOG(spam, "Disk %d retrieving message for buffered bucket %s", diskId, bucket.getBucketId().toString().c_str());
-
- assert(diskId < _diskInfo.size());
- Disk& disk(_diskInfo[diskId]);
-
- if (disk.isClosed()) {
- lck.second.reset();
- return lck;
- }
-
- return disk.getNextMessage(stripeId, lck);
-}
-
bool
FileStorHandlerImpl::tryHandlePause(uint16_t disk) const
{
@@ -977,49 +958,6 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk)
return {}; // No message fetched.
}
-FileStorHandler::LockedMessage &
-FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck)
-{
- const document::Bucket & bucket = lck.second->getBucket();
- vespalib::MonitorGuard guard(_lock);
- BucketIdx& idx = bmi::get<2>(_queue);
- std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket);
-
- // No more for this bucket.
- if (range.first == range.second) {
- lck.second.reset();
- return lck;
- }
-
- api::StorageMessage & m(*range.first->_command);
- // For now, don't allow batching of operations across lock requirement modes.
- // We might relax this requirement later once we're 100% sure it can't trigger
- // any unfortunate edge cases.
- if (lck.first->lockingRequirements() != m.lockingRequirements()) {
- lck.second.reset();
- return lck;
- }
-
- std::chrono::milliseconds waitTime(uint64_t(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])));
-
- if (!messageTimedOutInQueue(m, waitTime)) {
- std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command);
- idx.erase(range.first);
- lck.second.swap(msg);
- guard.broadcast();
- } else {
- std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(m).makeReply();
- idx.erase(range.first);
- guard.broadcast();
- guard.unlock();
- msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue"));
- _messageSender.sendReply(msgReply);
-
- lck.second.reset();
- }
- return lck;
-}
-
FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) {
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index da4d242a4c9..7a4f9000e82 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -129,7 +129,6 @@ public:
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk);
- FileStorHandler::LockedMessage & getNextMessage(FileStorHandler::LockedMessage& lock);
void dumpQueue(std::ostream & os) const;
void dumpActiveHtml(std::ostream & os) const;
void dumpQueueHtml(std::ostream & os) const;
@@ -185,9 +184,6 @@ public:
FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, uint32_t timeout) {
return _stripes[stripeId].getNextMessage(timeout, *this);
}
- FileStorHandler::LockedMessage & getNextMessage(uint32_t stripeId, FileStorHandler::LockedMessage & lck) {
- return _stripes[stripeId].getNextMessage(lck);
- }
std::shared_ptr<FileStorHandler::BucketLockInterface>
lock(const document::Bucket & bucket, api::LockingRequirements lockReq) {
return stripe(bucket).lock(bucket, lockReq);
@@ -253,8 +249,6 @@ public:
FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId);
- FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, FileStorHandler::LockedMessage& lock);
-
enum Operation { MOVE, SPLIT, JOIN };
void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index ea6f5e03a80..6eab44923f7 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -802,7 +802,7 @@ void FileStorManager::onFlush(bool downwards)
LOG(debug, "Flushed _filestorHandler->flush(!downwards);");
for (uint32_t i = 0; i < _disks.size(); ++i) {
for (uint32_t j = 0; j < _disks[i].size(); ++j) {
- if (_disks[i][j].get() != NULL) {
+ if (_disks[i][j]) {
_disks[i][j]->flush();
LOG(debug, "flushed disk[%d][%d]", i, j);
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index e5e358cbb60..d47ed28f636 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -91,40 +91,6 @@ public:
}
};
-class FlushGuard
-{
- spi::PersistenceProvider& _spi;
- spi::Bucket _bucket;
- spi::Context& _context;
- bool _hasFlushed;
-public:
- FlushGuard(spi::PersistenceProvider& spi,
- const spi::Bucket& bucket,
- spi::Context& context)
- : _spi(spi),
- _bucket(bucket),
- _context(context),
- _hasFlushed(false)
- {}
- ~FlushGuard()
- {
- if (!_hasFlushed) {
- LOG(debug, "Auto-flushing %s", _bucket.toString().c_str());
- spi::Result result =_spi.flush(_bucket, _context);
- if (result.hasError()) {
- LOG(debug, "Flush %s failed: %s",
- _bucket.toString().c_str(),
- result.toString().c_str());
- }
- }
- }
- void flush() {
- LOG(debug, "Flushing %s", _bucket.toString().c_str());
- _hasFlushed = true;
- checkResult(_spi.flush(_bucket, _context), _bucket, "flush");
- }
-};
-
struct IndirectDocEntryTimestampPredicate
{
bool operator()(const spi::DocEntry::UP& e1,
@@ -607,8 +573,6 @@ MergeHandler::applyDiffLocally(
std::vector<spi::DocEntry::UP> entries;
populateMetaData(bucket, MAX_TIMESTAMP, entries, context);
- FlushGuard flushGuard(_spi, bucket, context);
-
std::shared_ptr<const document::DocumentTypeRepo> repo(_env._component.getTypeRepo());
assert(repo.get() != nullptr);
@@ -702,8 +666,6 @@ MergeHandler::applyDiffLocally(
LOG(debug, "Merge(%s): Applied %u entries locally from ApplyBucketDiff.",
bucket.toString().c_str(), addedCount);
- flushGuard.flush();
-
spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket));
if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) {
LOG(warning, "Failed to get bucket info for %s: %s",
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index e7123164659..aaf62a85e87 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -607,10 +607,6 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context
if (!checkForError(result, *tracker)) {
return tracker;
}
- result = _spi.flush(spi::Bucket(destBucket, spi::PartitionId(_env._partition)), context);
- if (!checkForError(result, *tracker)) {
- return tracker;
- }
uint64_t lastModified = 0;
for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) {
document::Bucket srcBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[i]);
@@ -895,114 +891,24 @@ bool hasBucketInfo(const api::StorageMessage& msg)
}
void
-PersistenceThread::flushAllReplies(
- const document::Bucket& bucket,
- std::vector<std::unique_ptr<MessageTracker> >& replies)
-{
- if (replies.empty()) {
- return;
- }
-
- try {
- if (replies.size() > 1) {
- _env._metrics.batchingSize.addValue(replies.size());
- }
-#ifdef ENABLE_BUCKET_OPERATION_LOGGING
- {
- size_t nputs = 0, nremoves = 0, nother = 0;
- for (size_t i = 0; i < replies.size(); ++i) {
- if (dynamic_cast<api::PutReply*>(replies[i]->getReply().get()))
- {
- ++nputs;
- } else if (dynamic_cast<api::RemoveReply*>(
- replies[i]->getReply().get()))
- {
- ++nremoves;
- } else {
- ++nother;
- }
- }
- LOG_BUCKET_OPERATION(
- bucket.getBucketId(),
- vespalib::make_string(
- "flushing %zu operations (%zu puts, %zu removes, "
- "%zu other)",
- replies.size(), nputs, nremoves, nother));
- }
-#endif
- spi::Bucket b(bucket, spi::PartitionId(_env._partition));
- // Flush is not used for anything currentlu, and the context is not correct either when batching is done
- // So just faking it here.
- spi::Context dummyContext(documentapi::LoadType::DEFAULT, 0, 0);
- spi::Result result = _spi.flush(b, dummyContext);
- uint32_t errorCode = _env.convertErrorCode(result);
- if (errorCode != 0) {
- for (uint32_t i = 0; i < replies.size(); ++i) {
- replies[i]->getReply()->setResult(api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage()));
- }
- }
- } catch (std::exception& e) {
- for (uint32_t i = 0; i < replies.size(); ++i) {
- replies[i]->getReply()->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what()));
- }
- }
-
- for (uint32_t i = 0; i < replies.size(); ++i) {
- LOG(spam, "Sending reply up (batched): %s %" PRIu64,
- replies[i]->getReply()->toString().c_str(), replies[i]->getReply()->getMsgId());
- _env._fileStorHandler.sendReply(replies[i]->getReply());
- }
-
- replies.clear();
-}
-
-void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
-{
+PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage & lock) {
std::vector<MessageTracker::UP> trackers;
document::Bucket bucket = lock.first->getBucket();
- while (lock.second) {
- LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get());
- std::shared_ptr<api::StorageMessage> msg(lock.second);
- bool batchable = isBatchable(*msg);
-
- // If the next operation wasn't batchable, we should flush
- // everything that came before.
- if (!batchable) {
- flushAllReplies(bucket, trackers);
- }
+ LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get());
+ api::StorageMessage & msg(*lock.second);
- std::unique_ptr<MessageTracker> tracker = processMessage(*msg);
- if (!tracker || !tracker->getReply()) {
- // Was a reply
- break;
- }
-
- if (hasBucketInfo(*msg)) {
+ std::unique_ptr<MessageTracker> tracker = processMessage(msg);
+ if (tracker && tracker->getReply()) {
+ if (hasBucketInfo(msg)) {
if (tracker->getReply()->getResult().success()) {
_env.setBucketInfo(*tracker, bucket);
}
}
- if (batchable) {
- LOG(spam, "Adding reply %s to batch for bucket %s",
- tracker->getReply()->toString().c_str(), bucket.getBucketId().toString().c_str());
-
- trackers.push_back(std::move(tracker));
-
- if (trackers.back()->getReply()->getResult().success()) {
- _env._fileStorHandler.getNextMessage(_env._partition, _stripeId, lock);
- } else {
- break;
- }
- } else {
- LOG(spam, "Sending reply up: %s %" PRIu64,
- tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId());
- _env._fileStorHandler.sendReply(tracker->getReply());
- break;
- }
+ LOG(spam, "Sending reply up: %s %" PRIu64,
+ tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId());
+ _env._fileStorHandler.sendReply(tracker->getReply());
}
-
- flushAllReplies(bucket, trackers);
}
void
@@ -1016,7 +922,7 @@ PersistenceThread::run(framework::ThreadHandle& thread)
FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _stripeId));
if (lock.first) {
- processMessages(lock);
+ processLockedMessage(lock);
}
vespalib::MonitorGuard flushMonitorGuard(_flushMonitor);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index e410843c1be..56414835b7b 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -22,7 +22,7 @@ public:
PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri,
spi::PersistenceProvider& provider, FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics, uint16_t deviceIndex);
- ~PersistenceThread();
+ ~PersistenceThread() override;
/** Waits for current operation to be finished. */
void flush() override;
@@ -75,15 +75,13 @@ private:
void handleReply(api::StorageReply&);
MessageTracker::UP processMessage(api::StorageMessage& msg);
- void processMessages(FileStorHandler::LockedMessage & lock);
+ void processLockedMessage(FileStorHandler::LockedMessage & lock);
// Thread main loop
void run(framework::ThreadHandle&) override;
bool checkForError(const spi::Result& response, MessageTracker& tracker);
spi::Bucket getBucket(const DocumentId& id, const document::Bucket &bucket) const;
- void flushAllReplies(const document::Bucket& bucket, std::vector<MessageTracker::UP>& trackers);
-
friend class TestAndSetHelper;
bool tasConditionExists(const api::TestAndSetCommand & cmd);
bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index 5b94a3da027..f37c6723933 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -83,9 +83,9 @@ MessageTracker::UP
ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd,
spi::Context& context)
{
- MessageTracker::UP tracker(new MessageTracker(
+ auto tracker = std::make_unique<MessageTracker>(
_env._metrics.removeLocation[cmd.getLoadType()],
- _env._component.getClock()));
+ _env._component.getClock());
LOG(debug, "RemoveLocation(%s): using selection '%s'",
cmd.getBucketId().toString().c_str(),
@@ -99,13 +99,8 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd,
processor,
spi::NEWEST_DOCUMENT_ONLY,
context);
- spi::Result result = _spi.flush(bucket, context);
- uint32_t code = _env.convertErrorCode(result);
- if (code == 0) {
- tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed));
- } else {
- tracker->fail(code, result.getErrorMessage());
- }
+
+ tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed));
return tracker;
}
@@ -114,9 +109,9 @@ MessageTracker::UP
ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd,
spi::Context& context)
{
- MessageTracker::UP tracker(new MessageTracker(
+ auto tracker = std::make_unique<MessageTracker>(
_env._metrics.statBucket[cmd.getLoadType()],
- _env._component.getClock()));
+ _env._component.getClock());
std::ostringstream ost;
ost << "Persistence bucket " << cmd.getBucketId()
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index fe947bff4d5..76c420e76e6 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -118,12 +118,6 @@ ProviderErrorWrapper::get(const spi::Bucket& bucket,
return checkResult(_impl.get(bucket, fieldSet, docId, context));
}
-spi::Result
-ProviderErrorWrapper::flush(const spi::Bucket& bucket, spi::Context& context)
-{
- return checkResult(_impl.flush(bucket, context));
-}
-
spi::CreateIteratorResult
ProviderErrorWrapper::createIterator(const spi::Bucket& bucket,
const document::FieldSet& fieldSet,
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 3b5ace90d13..292eb004223 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -52,7 +52,6 @@ public:
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;
spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override;
- spi::Result flush(const spi::Bucket&, spi::Context&) override;
spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&,
spi::IncludedVersions versions, spi::Context&) override;
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;