summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-03-20 13:22:39 +0100
committerGitHub <noreply@github.com>2018-03-20 13:22:39 +0100
commit76cd17602b493a13d1e993b1f72cfdaf7b627e0b (patch)
tree2d3bd1ee963de6107b80b183a23d03708764fa1a
parente31b9616921f9046a1fabbe9fca9cf3a5d535279 (diff)
parentb9c175537bc743a0bf457d579d03b59867d1a1f1 (diff)
Merge pull request #5378 from vespa-engine/balder/no-more-thread-priorities
Balder/no more thread priorities
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/content/PriorityMapping.java37
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java44
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java9
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def403
-rw-r--r--storage/src/tests/common/testhelper.cpp23
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp413
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp99
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp18
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp32
-rw-r--r--storage/src/vespa/storage/common/storagelink.cpp159
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp44
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h30
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp156
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h24
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp188
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/pausehandler.h34
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp215
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp19
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h6
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h10
-rw-r--r--storageserver/src/tests/storageservertest.cpp3
-rw-r--r--storageserver/src/tests/testhelper.cpp23
23 files changed, 377 insertions, 1614 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/PriorityMapping.java b/config-model/src/main/java/com/yahoo/vespa/model/content/PriorityMapping.java
deleted file mode 100644
index e239c66f1cf..00000000000
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/PriorityMapping.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.model.content;
-
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.vespa.model.builder.xml.dom.ModelElement;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Created with IntelliJ IDEA.
- * User: thomasg
- * Date: 5/7/12
- * Time: 2:00 PM
- * To change this template use File | Settings | File Templates.
- */
-public class PriorityMapping {
- ModelElement clusterXml;
- Map<DocumentProtocol.Priority, Integer> priorityMappings = new HashMap<>();
-
- public PriorityMapping(ModelElement clusterXml) {
- this.clusterXml = clusterXml;
-
- int val = 50;
- for (DocumentProtocol.Priority p : DocumentProtocol.Priority.values()) {
- priorityMappings.put(p, val);
- val += 10;
- }
- priorityMappings.put(DocumentProtocol.Priority.HIGHEST, 0);
- priorityMappings.put(DocumentProtocol.Priority.LOWEST, 255);
- }
-
- public int getPriorityMapping(String priorityName) {
- return priorityMappings.get(Enum.valueOf(DocumentProtocol.Priority.class, priorityName));
- }
-
-}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java
index b4faa6eeb7e..0cc62fb6680 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java
@@ -3,12 +3,8 @@ package com.yahoo.vespa.model.content.storagecluster;
import com.yahoo.vespa.config.content.StorFilestorConfig;
import com.yahoo.vespa.model.builder.xml.dom.ModelElement;
-import com.yahoo.vespa.model.content.PriorityMapping;
import com.yahoo.vespa.model.content.cluster.ContentCluster;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Serves stor-filestor for storage clusters.
*/
@@ -19,7 +15,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer {
return new FileStorProducer(parent, getThreads(clusterElem));
}
- private List<StorFilestorConfig.Threads.Builder> getThreads(ModelElement clusterElem) {
+ private Integer getThreads(ModelElement clusterElem) {
ModelElement tuning = clusterElem.getChild("tuning");
if (tuning == null) {
return null;
@@ -29,44 +25,34 @@ public class FileStorProducer implements StorFilestorConfig.Producer {
return null;
}
- List<StorFilestorConfig.Threads.Builder> retVal = new ArrayList<>();
-
- PriorityMapping mapping = new PriorityMapping(clusterElem);
+ Integer count = threads.getIntegerAttribute("count");
+ if (count != null) {
+ return count;
+ }
+ // Backward compatible fallback
+ int numThreads = 0;
for (ModelElement thread : threads.subElements("thread")) {
- String priorityName = thread.getStringAttribute("lowest-priority");
- if (priorityName == null) {
- priorityName = "LOWEST";
- }
-
- Integer count = thread.getIntegerAttribute("count");
- if (count == null) {
- count = 1;
- }
-
- for (int i = 0; i < count; ++i) {
- retVal.add(new StorFilestorConfig.Threads.Builder().lowestpri(mapping.getPriorityMapping(priorityName)));
- }
+ count = thread.getIntegerAttribute("count");
+ numThreads += (count == null) ? 1 : count;
}
- return retVal;
+ return numThreads;
}
}
- private List<StorFilestorConfig.Threads.Builder> threads;
+ private Integer numThreads;
private ContentCluster cluster;
- public FileStorProducer(ContentCluster parent, List<StorFilestorConfig.Threads.Builder> threads) {
- this.threads = threads;
+ public FileStorProducer(ContentCluster parent, Integer numThreads) {
+ this.numThreads = numThreads;
this.cluster = parent;
}
@Override
public void getConfig(StorFilestorConfig.Builder builder) {
- if (threads != null) {
- for (StorFilestorConfig.Threads.Builder t : threads) {
- builder.threads.add(t);
- }
+ if (numThreads != null) {
+ builder.num_threads(numThreads);
}
builder.enable_multibit_split_optimalization(cluster.getPersistence().enableMultiLevelSplitting());
}
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
index 0fae6c6d751..95da1516e80 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
@@ -120,12 +120,7 @@ public class StorageClusterTest {
StorFilestorConfig config = new StorFilestorConfig(builder);
- assertEquals(4, config.threads().size());
- assertEquals(190, config.threads().get(0).lowestpri());
- assertEquals(190, config.threads().get(1).lowestpri());
- assertEquals(60, config.threads().get(2).lowestpri());
- assertEquals(255, config.threads().get(3).lowestpri());
-
+ assertEquals(4, config.num_threads());
assertEquals(true, config.enable_multibit_split_optimalization());
}
@@ -145,7 +140,7 @@ public class StorageClusterTest {
StorFilestorConfig config = new StorFilestorConfig(builder);
- assertEquals(0, config.threads().size());
+ assertEquals(6, config.num_threads());
}
@Test
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index 80994bfb477..c02a9018064 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -1,79 +1,8 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
namespace=vespa.config.content
-## Use the new storage core
-use_new_core bool default=false restart
-
-## FILE LAYOUT PARAMETERS
-
-## Number of directories per level to spread files across
-## DEPRECATED: see stor-memfilepersistence config instead
-dir_spread int default=256 restart
-
-## Number of directory levels
-## DEPRECATED: see stor-memfilepersistence config instead
-dir_levels int default=1 restart
-
-## FILE SIZE PARAMETERS
-
-## Minimum number of meta data entries in one slotfile. When creating new
-## files or resizing files, enforce it to contain at least this many meta
-## entries. Set to 512 by default, using 20544 bytes total for metadata in
-## new files.
-## DEPRECATED: see stor-memfilepersistence config instead
-minimum_file_meta_slots int default=512
-
-## Maximum number of entries in one slotfile. File must be split before
-## accepting more data
-##
-## Default ensure meta data is less than 512 kB.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_file_meta_slots int default=13106
-
-## Minimum size of header block. At least this amount of header space will
-## be available in new or resized files. For 512 documents, this will be
-## 200 bytes per document.
-## DEPRECATED: see stor-memfilepersistence config instead
-minimum_file_header_block_size int default=102848
-
-## Maximum header block size (in bytes). Since the whole meta data list and
-## header block needs to be read into memory for some operations, a limit
-## can be set for the header block, to avoid consuming too much memory.
-##
-## Default is set high, as we dont configure it automatically right now, so we
-## would rather people configure it down than up.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_file_header_block_size int default=33554432
-
-## Minimum size of a single slotfile. When creating or resizing files, they
-## will never become smaller than this. Default of 1 MB, will be 1807 byte
-## per doc if we use all 512 meta data entries set as default min meta
-## entries.
-## DEPRECATED: see stor-memfilepersistence config instead
-minimum_file_size int default=1048576
-
-## Maximum size of a single slotfile. File must be split before accepting
-## more data. Will return file full errors.
-##
-## Default is set high, as we dont configure it automatically right now, so we
-## would rather people configure it down than up.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_file_size int default=268431360
-
-## When creating new files, always create files as a multiplum of this size.
-## Should be set to the block size on the disk, or possibly a multiplum of
-## that, though we know no advantage of setting it to more than the block
-## size. If you want more free space in files, you should rather adjust that
-## with growfactor or min file sizes.
-## DEPRECATED: see stor-memfilepersistence config instead
-file_block_size int default=4096
-
## DETECT FAILURE PARAMETERS
-## If persistence operations take more than this amount of milliseconds, it
-## will be logged as a warning.
-warn_on_slow_operations int default=5000
-
## After seeing given number of errors on a disk, storage will disable the
## disk and restart. If set to 0, storage will never disable disks. Note
## that if you get disk errors, which arent automatically fixed, this will
@@ -92,75 +21,15 @@ fail_disk_after_error_count int default=1 restart
## during normal load.
disk_operation_timeout int default=0 restart
-## Timestamps provided for entries written are normally provided externally.
-## To detect clock skew between nodes, storage will warn if timestamps look
-## out of sync. Future time clearly indicates clock skew and thus the limit
-## here is low. Time in the past might just indicate that the operation has
-## been queued for a while, so the default value here is not very strict.
-## Time is given in seconds. Note that there are valid usecases for
-## inserting data with old timestamps, for instance when synchronizing two
-## copies of the same data. Warnings should not be printed in these cases.
-time_future_limit int default=5
-time_past_limit int default=3600
-
-## Enabling debug verifications will make storage do extra verifications
-## to find errors as soon as possible. These extra verifications will use up
-## a lot of resources though, and should not be needed in normal operations.
-## They are mostly to be used during test phases or when debugging problems.
-## The value itself is a bit mask, where you can enable different kinds of
-## verifications by setting given bits.
-debug_verifications int default=0 restart
-
-## CONSISTENCY PARAMETERS
-
-## If true, fsync after all disk operations, to ensure no dirty OS file
-## cache afterwards. This is only useful if using cached IO, which is not
-## recommended to start with.
-## DEPRECATED: see stor-memfilepersistence config instead
-fsync_after_each_operation bool default=false restart
-
-## Time period to keep all updates (given in seconds). One can revert any
-## operation done within this time.
-## DEPRECATED: see stor-memfilepersistence config instead
-revert_time_period int default=300
-
-## If a remove is older than the reverttimeperiod, the document it is
-## removing may be removed from the file. There are a few advantages to
-## keeping removes or a bit longer though. If you use this copy to
-## synchronize another copy of data, having the remove entry makes it easy
-## to detect that you should delete this entry from the other data copy.
-## This is useful for internal synchronization of files within VDS if you
-## use multiple copies, or for partial recovery or BCP situations. To
-## guarantuee consistency in such situations, a data destination that have
-## been unavailable for longer than this amount of time, should have its
-## data cleared before being set into the system again. This value is given
-## in seconds, with the default being one week
-## DEPRECATED: see stor-memfilepersistence config instead
-keep_remove_time_period int default=604800
-
-## Maximum number of entries to keep for a single document. Heavy resending
-## within the revert time period may add loads of entries to a file, ruining
-## performance. This setting sets a maximum number of entries for each
-## document. This will make entries potentially live shorter than the revert
-## time period to avoid a resending worst case. A value of 0 means infinite.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_versions_of_single_document_stored int default=0
-
## PERFORMANCE PARAMETERS
-## Number of threads to use for each mountpoint. VDS needs memory per thread
-## to perform disk operations, so increasing this number will increase
-## memory usage, but it will also make the disk queue on a given disk be
-## able to be larger, such that the disk can choose operations to optimize
-## seek time.
-## See benchmarks for performance/memory tradeoff.
-threads[].lowestpri int default=255 restart
+## Number of threads to use for each mountpoint.
+num_threads int default=6 restart
-## Pause operations (and block new ones from starting) with priority
-## lower than this value when executing operations with higher pri than
-## min_priority_to_be_blocking
-max_priority_to_block int default=255 restart
-min_priority_to_be_blocking int default=0 restart
+## When merging, if we find more than this number of documents that exist on all
+## of the same copies, send a separate apply bucket diff with these entries
+## to an optimized merge chain that guarantuees minimum data transfer.
+common_merge_chain_optimalization_minimum_size int default=64 restart
## Chunksize to use while merging buckets between nodes.
##
@@ -169,173 +38,6 @@ min_priority_to_be_blocking int default=0 restart
## while still reading 4k blocks from disk.
bucket_merge_chunk_size int default=4190208 restart
-## When reading a slotfile, one does not know the size of the meta data
-## list, so one have to read a static amount of data, and possibly read more
-## if one didnt read enough. This value needs to be at least 64 byte to read
-## the initial header stating the true size of the meta data list.
-## DEPRECATED: see stor-memfilepersistence config instead
-initial_index_read int default=61440
-
-## Similar to index read, when reading the document identifiers, one does
-## not know the length of the name prior to reading. Thus, if you read less
-## than the size, you will have to do an extra read to get the rest. If you
-## use very long document identifiers you should increase this value to be
-## larger than most of your identifiers.
-## restart flag was added automatically and needs to be verified.
-## DEPRECATED: see stor-memfilepersistence config instead
-initial_name_read int default=512 restart
-
-## When we need to read (or write) multiple entries in a file where we can
-## either read a big enough section to cover all of them. But at some
-## distance between the areas we need, it becomes beneficial to do multiple
-## reads rather than to read over them. This setting set how many sequential
-## bytes we dont need that we allow to be read/written in order to join two
-## logical IO operations together in the application. Setting this low might
-## be ok if system calls are cheap and we manage to queue up next IO
-## operation in time such that the disk dont need to spin an extra round.
-## Setting it high will make the disk more likely to process them together,
-## but the time used to read/write the gap might have been used to do
-## something useful instead.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_gap_to_read_through int default=65536
-
-## Currently not in here as we dont have append functionality yet. Might
-## improve performance in some cases.
-## max_file_appends int default=0
-
-## When writing with direct IO, we need to align blocks to 512b, and to
-## avoid reading we write garbage after each doc to fill 512b block. When
-## resizing or splitting the file we can realign the files such that we
-## remove the gaps of existing data, as we will rewrite everything anyhow.
-## If using very small documents this might improve your disk space
-## utilization. For larger documents it doesnt really reduce much, so might
-## be useful to turn it off to save CPU.
-## DEPRECATED: see stor-memfilepersistence config instead
-remove_file_gaps_on_rewrite bool default=true restart
-
-## The order we try to enforce in file body blocks. Visitors that only need
-## to visit some of the data in a bucket will be able to read less if what
-## it needs to read is located next to each other on disk. However, as we
-## dont enforce order on write operations, this demands that resize/split
-## operations do the resorting, which, if we cant do it all in memory is
-## very expensive. ANY will not do any reordering. Timestamp will enforce
-## timestamp order, which is fairly close to order that will normally be
-## written anyway, so it should be cheap to reorder even if we cant do it
-## all in memory. This might be useful if you often visit subsets based on
-## time. RGID uses reverse GID, which stores data from one location
-## together. This is useful if you want to visit data from one user from
-## buckets that have many users often. This is a much more expensive sort
-## though, and should only be done if we have enough memory.
-## DEPRECATED: see stor-memfilepersistence config instead
-body_block_order enum { ANY, TIMESTAMP, RGID } default=ANY restart
-
-## If set to true, we will refuse to do reordering of memory unless we have
-## enough memory to do it all in memory. See body_block_order comments.
-## DEPRECATED: see stor-memfilepersistence config instead
-only_reorder_body_in_memory bool default=true restart
-
-## Whether or not we should verify checksums of all read data during regular
-## operations like put, get & remove. Note that some operations, like merge
-## and bucket integrity verify operations will still check checksums even if
-## this is set false.
-## DEPRECATED: see stor-memfilepersistence config instead
-verify_checksum_on_regular_load bool default=true restart
-
-## For streaming search, visiting is very performance critical. Thus you can
-## specifically disable checksum verification for visiting.
-## DEPRECATED: see stor-memfilepersistence config instead
-verify_checksum_on_visit bool default=true restart
-
-## Maximum size of index buffer that will be allowed to stay in memory and
-## not being reduced back to this size after we no longer need it.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_sustainable_index_buffer_size int default=1044480 restart
-
-## Maximum size of input buffer that will be allowed to stay in memory and
-## not being reduced back to this size after we no longer need it.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_sustainable_input_buffer_size int default=1044480 restart
-
-## Maximum size of output buffer that will be allowed to stay in memory and
-## not being reduced back to this size after we no longer need it.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_sustainable_output_buffer_size int default=1044480 restart
-
-## Whether to downsize index buffer immediately after usage if its above the
-## maximum size. If not, it will not be resized down until someone requests
-## to use it that needs less data in the buffer. Index buffer is used all
-## the time so there should be little reason for immediately downsizing it.
-## DEPRECATED: see stor-memfilepersistence config instead
-downsize_index_buffer_immediately_after_use bool default=false restart
-
-## Whether to downsize input buffer immediately after usage if its above the
-## maximum size. If not, it will not be resized down until someone requests
-## to use it that needs less data in the buffer. Input buffer is not used
-## that often, so downsizing it immediately might save some memory.
-## DEPRECATED: see stor-memfilepersistence config instead
-downsize_input_buffer_immediately_after_use bool default=true restart
-
-## Whether to downsize output buffer immediately after usage if its above
-## the maximum size. If not, it will not be resized down until someone
-## requests to use it that needs less data in the buffer. Input buffer is
-## not used that often, so downsizing it immediately might save some memory.
-## DEPRECATED: see stor-memfilepersistence config instead
-downsize_output_buffer_immediately_after_use bool default=true restart
-
-## Minimum size of buffer used to write a continuous file. If maximum amount
-## of memory is not available. At least this amount will be allocated.
-## DEPRECATED: see stor-memfilepersistence config instead
-minimum_continuous_file_write_buffer_size int default=1044480 restart
-
-## Maximum size of buffer used to write a continuous file. If set above max
-## file size we will always write new files in one go, which probably makes
-## for the least chance of getting fragmentation, but will also consume the
-## most memory. Default of writing a MB at a time, should make for little
-## performance loss because of disk seek time, and hopefully get little
-## fragmentation while keeping memory usage down.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_continuous_file_write_buffer_size int default=1044480 restart
-
-## Minimum amount of memory allocated to read source data during join. This
-## amount of memory will be forced allocated.
-## DEPRECATED: see stor-memfilepersistence config instead
-minimum_join_source_body_read_buffer_size int default=1044480 restart
-
-## This sets the maximum size of the buffer used in join to read source
-## data. Join uses the least IO if this buffer is as big as the body block
-## of the source file. Due to the memory manager, each join might get that
-## much memory though.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_join_source_body_read_buffer_size int default=16773120 restart
-
-## Minimum amount of memory allocated to read source data during export. This
-## amount of memory will be forced allocated.
-## DEPRECATED: see stor-memfilepersistence config instead
-minimum_export_source_body_read_buffer_size int default=1044480 restart
-
-## This sets the maximum size of the buffer used in export to read source
-## data. Export uses the least IO if this buffer is as big as the body block
-## of the source file. In addition, reordering of body block might not be
-## feasibly unless the buffer is big enough to include the whole body block.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_export_source_body_read_buffer_size int default=33550336 restart
-
-## Minimum size of buffer used to read data during defragmentation.
-## DEPRECATED: see stor-memfilepersistence config instead
-minimum_defrag_source_body_read_buffer_size int default=1044480 restart
-
-## This sets the maximum size of the buffer used in defragmentation to read
-## source data. Defragmentation uses the least IO if this buffer is as big
-## as the body block of the source file, but this might consume some memory.
-## Defragmentation is not enabled by default.
-## DEPRECATED: see stor-memfilepersistence config instead
-maximum_defrag_source_body_read_buffer_size int default=1044480 restart
-
-## When merging, if we find more than this number of documents that exist on all
-## of the same copies, send a separate apply bucket diff with these entries
-## to an optimized merge chain that guarantuees minimum data transfer.
-common_merge_chain_optimalization_minimum_size int default=64 restart
-
## When merging, it is possible to send more metadata than needed in order to
## let local nodes in merge decide which entries fits best to add this time
## based on disk location. Toggle this option on to use it. Note that memory
@@ -344,100 +46,7 @@ common_merge_chain_optimalization_minimum_size int default=64 restart
## fill all.
enable_merge_local_node_choose_docs_optimalization bool default=true restart
-## If set, when we need to cache the entire header, and we already have cached
-## all the metadata, read the metadata to find max header position, and only
-## read the part containing information.
-## DEPRECATED: see stor-memfilepersistence config instead
-read_only_used_header_part_when_possible bool default=true restart
-
-## Enable the slotfile read cache. The cache holds recent metadata and header
-## blocks read from disks, and even if small, is very useful for localized
-## access.
-## DEPRECATED: see stor-memfilepersistence config instead
-enable_slotfile_cache bool default=true restart
-
-## Let the slotfile cache the whole header on a header only operation not
-## needing the entire header, if this amount of header only accesses needing
-## part of the header has already happened.
-##
-## Set very high to begin with to never reduce performance. If you have heavy
-## header only access to some files, you may get better performance by tuning
-## this value.
-## DEPRECATED: see stor-memfilepersistence config instead
-slotfile_precache_header_after_access_count int default=512 restart
-
## Whether or not to enable the multibit split optimalization. This is useful
## if splitting is expensive, but listing document identifiers is fairly cheap.
## This is true for memfile persistence layer, but not for vespa search.
enable_multibit_split_optimalization bool default=true restart
-
-## STORAGE SPACE vs IO/CPU PERFORMANCE OPTIONS
-
-## If true, use direct IO, bypassing OS caches for disk access. This is very
-## useful as VDS does a random distribution dividing load, so it is unlikely
-## that the OS cache will ever hit, and thus it is a huge performance drain.
-## DEPRECATED: see stor-memfilepersistence config instead
-use_direct_io bool default=true restart
-
-## All IO operations will be aligned to this amount of bytes if direct IO is
-## enabled.
-## DEPRECATED: see stor-memfilepersistence config instead
-block_alignment_size int default=512 restart
-
-## When a disk is fuller than this factor, we will not allow more data to be
-## written to the system, unless this data is written in order to reduce
-## storage consumption, such as resizing files to become smaller or add
-## meta entries to write remove entries into. This value is set high as
-## default as we expect a lot of users to have formatted their disks to
-## already reserve 8% of the data to root user which is often default. We
-## suggest using 0% reserved for root, and rather set this parameter lower
-## to reserve space. That way, VDS have more space available in worst case
-## in order to resize files to become smaller.
-## DEPRECATED: see stor-memfilepersistence config instead
-disk_full_factor double default=0.98 restart
-
-## The grow factor sets how much free space to add to a file when we resize
-## it, whether we are making it smaller or larger. If the space we need
-## after the operation triggering the resize is X, then the file will be
-## resized to be of size X * growfactor. In a growing system with no
-## deletes, a growfactor of 2 will make the files have 25% free space on
-## average. Reducing it to 1.5 will reduce the average free space to 16.7%.
-## DEPRECATED: see stor-memfilepersistence config instead
-grow_factor double default=2.0
-
-## If files fall below this fill rate, resize file to become smaller.
-## Note that this parameter is tightly coupled with the growfactor. For
-## instance, with a growfactor of 2.0, a file will contain 50 % free space
-## after a resize. If the min fill rate then is 0.50, that means that if a
-## single doc is deleted from this file, we need to resize it to
-## become smaller straight away.
-## DEPRECATED: see stor-memfilepersistence config instead
-min_fill_rate double default=0.1
-
-## Minimum part of defragmented space one need to reclaim to allow
-## defragmentation of file. This value is given as a ratio of reclaimed
-## space compared to the total size of the data block.
-## Example: A body block of 100 MB, has 15 MB free space, with largest
-## continuos free space of 5 MB. Gain of defragmentation will then be 0.1.
-## DEPRECATED: see stor-memfilepersistence config instead
-defrag_minimum_gain double default=1.0 restart
-
-## When creating/resizing slotfiles, one uses average document sizes to
-## decide how much free space to add to metadata, header and body portions.
-## This option can be used to allocate extra free space to meta data in
-## order to reduce the chance of the file needing resize due to lack of free
-## meta data entries.
-## DEPRECATED: see stor-memfilepersistence config instead
-overrepresent_meta_data_factor double default=1.2
-
-## When creating/resizing slotfiles, one uses average document sizes to
-## decide how much free space to add to metadata, header and body portions.
-## This option can be used to allocate extra free space to header data in
-## order to reduce the chance of the file needing resize due to lack of
-## header block space.
-## DEPRECATED: see stor-memfilepersistence config instead
-overrepresent_header_block_factor double default=1.1
-
-## Load types to use cache for. If empty, cache for all.
-## DEPRECATED: see stor-memfilepersistence config instead
-load_types_to_cache[] string restart
diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp
index 296a9a3cc0f..f7da854be68 100644
--- a/storage/src/tests/common/testhelper.cpp
+++ b/storage/src/tests/common/testhelper.cpp
@@ -9,23 +9,6 @@ LOG_SETUP(".testhelper");
namespace storage {
-namespace {
- bool useNewStorageCore() {
- if ( // Unit test directory
- vespalib::fileExists("use_new_storage_core") ||
- // src/cpp directory
- vespalib::fileExists("../use_new_storage_core") ||
- // Top build directory where storage-HEAD remains
- vespalib::fileExists("../../../../use_new_storage_core"))
- {
- std::cerr << "Using new storage core for unit tests\n";
- return true;
- }
- return false;
- }
- bool newStorageCore(useNewStorageCore());
-}
-
void addStorageDistributionConfig(vdstestlib::DirConfig& dc)
{
vdstestlib::DirConfig::Config* config;
@@ -124,18 +107,14 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & ro
config->set("minimum_file_meta_slots", "2");
config->set("minimum_file_header_block_size", "368");
config->set("minimum_file_size", "4096");
- config->set("threads[1]");
- config->set("threads[0].lowestpri 255");
+ config->set("num_threads", "1");
config->set("dir_spread", "4");
config->set("dir_levels", "0");
- config->set("use_new_core", newStorageCore ? "true" : "false");
config->set("maximum_versions_of_single_document_stored", "0");
//config->set("enable_slotfile_cache", "false");
// Unit tests typically use fake low time values, so don't complain
// about them or compact/delete them by default. Override in tests testing that
// behavior
- config->set("time_future_limit", "5");
- config->set("time_past_limit", "2000000000");
config->set("keep_remove_time_period", "2000000000");
config->set("revert_time_period", "2000000000");
// Don't want test to call exit()
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 8d03c9de54c..26bfeb41b42 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -7,17 +7,10 @@
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/storageserver/statemanager.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
-#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/storage/persistence/persistencethread.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
#include <vespa/document/update/assignvalueupdate.h>
-#include <vespa/document/datatype/datatype.h>
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/datatype/documenttype.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/fieldvalue/rawfieldvalue.h>
-#include <vespa/document/fieldvalue/stringfieldvalue.h>
#include <vespa/document/select/parser.h>
#include <vespa/vdslib/state/random.h>
#include <vespa/storageapi/message/bucketsplitting.h>
@@ -76,8 +69,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
void testFlush();
void testRemapSplit();
void testHandlerPriority();
- void testHandlerPriorityBlocking();
- void testHandlerPriorityPreempt();
void testHandlerMulti();
void testHandlerTimeout();
void testHandlerPause();
@@ -102,7 +93,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
void testMergeBucketImplicitCreateBucket();
void testNewlyCreatedBucketIsReady();
void testCreateBucketSetsActiveFlagInDatabaseAndReply();
- void testFileStorThreadLockingStressTest();
void testStateChange();
void testRepairNotifiesDistributorOnChange();
void testDiskMove();
@@ -113,8 +103,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testFlush);
CPPUNIT_TEST(testRemapSplit);
CPPUNIT_TEST(testHandlerPriority);
- CPPUNIT_TEST(testHandlerPriorityBlocking);
- CPPUNIT_TEST(testHandlerPriorityPreempt);
CPPUNIT_TEST(testHandlerMulti);
CPPUNIT_TEST(testHandlerTimeout);
CPPUNIT_TEST(testHandlerPause);
@@ -146,21 +134,17 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
void createBucket(document::BucketId bid, uint16_t disk)
{
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
- _node->getPersistenceProvider().createBucket(
- makeSpiBucket(bid, spi::PartitionId(disk)), context);
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
+ _node->getPersistenceProvider().createBucket(makeSpiBucket(bid, spi::PartitionId(disk)), context);
StorBucketDatabase::WrappedEntry entry(
- _node->getStorageBucketDatabase().get(bid, "foo",
- StorBucketDatabase::CREATE_IF_NONEXISTING));
+ _node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING));
entry->disk = disk;
entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false);
entry.write();
}
- document::Document::UP createDocument(
- const std::string& content, const std::string& id)
+ document::Document::UP createDocument(const std::string& content, const std::string& id)
{
return _node->getTestDocMan().createDocument(content, id);
}
@@ -265,15 +249,14 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
- uint16_t deviceIndex,
- uint8_t lowestPriority)
+ uint16_t deviceIndex)
{
(void) config;
std::unique_ptr<DiskThread> disk;
disk.reset(new PersistenceThread(
node.getComponentRegister(), config.getConfigId(), provider,
filestorHandler, metrics,
- deviceIndex, lowestPriority));
+ deviceIndex));
return disk;
}
@@ -629,39 +612,31 @@ FileStorManagerTest::testHandlerPriority()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
- Document::SP doc(createDocument(
- content, "userdoc:footype:1234:bar").release());
+ Document::SP doc(createDocument(content, "userdoc:footype:1234:bar").release());
document::BucketIdFactory factory;
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(doc->getId()).getRawId());
// Populate bucket with the given data
for (uint32_t i = 1; i < 6; i++) {
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100);
+ auto address = std::make_shared<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(i * 15);
filestorHandler.schedule(cmd, 0);
}
- CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 20).second->getPriority());
- CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 20).second.get() == NULL);
- CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 50).second->getPriority());
- CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0, 50).second->getPriority());
- CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 50).second.get() == NULL);
- CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
- CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0).second->getPriority());
}
class MessagePusherThread : public document::Runnable
@@ -678,11 +653,9 @@ public:
void run() override {
while (!_done) {
document::BucketIdFactory factory;
- document::BucketId bucket(16, factory.getBucketId(
- _doc->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), _doc, 100));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), _doc, 100);
_handler.schedule(cmd, 0);
FastOS_Thread::Sleep(1);
}
@@ -694,7 +667,7 @@ public:
MessagePusherThread::MessagePusherThread(FileStorHandler& handler, Document::SP doc)
: _handler(handler), _doc(doc), _done(false), _threadDone(false)
{}
-MessagePusherThread::~MessagePusherThread() {}
+MessagePusherThread::~MessagePusherThread() = default;
class MessageFetchingThread : public document::Runnable {
public:
@@ -711,7 +684,7 @@ public:
void run() override {
while (!_done) {
- FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage msg = _handler.getNextMessage(0);
if (msg.second.get()) {
uint32_t originalConfig = _config.load();
_fetchedCount++;
@@ -747,8 +720,7 @@ FileStorManagerTest::testHandlerPausedMultiThread()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -799,8 +771,7 @@ FileStorManagerTest::testHandlerPause()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -810,29 +781,26 @@ FileStorManagerTest::testHandlerPause()
document::BucketIdFactory factory;
document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
+ doc->getId()).getRawId());
// Populate bucket with the given data
for (uint32_t i = 1; i < 6; i++) {
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100);
+ auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(i * 15);
filestorHandler.schedule(cmd, 0);
}
- CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority());
{
ResumeGuard guard = filestorHandler.pause();
(void)guard;
- CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 255).second.get() == NULL);
+ CPPUNIT_ASSERT(filestorHandler.getNextMessage(0).second.get() == NULL);
}
- CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority());
}
namespace {
@@ -855,8 +823,7 @@ FileStorManagerTest::testRemapSplit()
// Setup a filestorthread to test
DummyStorageLink top;
DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
+ top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
top.open();
ForwardingMessageSender messageSender(*dummyManager);
// Since we fake time with small numbers, we need to make sure we dont
@@ -867,7 +834,7 @@ FileStorManagerTest::testRemapSplit()
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -882,10 +849,8 @@ FileStorManagerTest::testRemapSplit()
// Populate bucket with the given data
for (uint32_t i = 1; i < 4; i++) {
- filestorHandler.schedule(
- api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket1), doc1, i)), 0);
- filestorHandler.schedule(
- api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket2), doc2, i + 10)), 0);
+ filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket1), doc1, i), 0);
+ filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket2), doc2, i + 10), 0);
}
CPPUNIT_ASSERT_EQUAL(std::string("BucketId(0x40000000000004d2): Put(BucketId(0x40000000000004d2), userdoc:footype:1234:bar, timestamp 1, size 108) (priority: 127)\n"
@@ -933,7 +898,7 @@ FileStorManagerTest::testHandlerMulti()
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -943,10 +908,8 @@ FileStorManagerTest::testHandlerMulti()
Document::SP doc2(createDocument(content, "userdoc:footype:4567:bar").release());
document::BucketIdFactory factory;
- document::BucketId bucket1(16, factory.getBucketId(
- doc1->getId()).getRawId());
- document::BucketId bucket2(16, factory.getBucketId(
- doc2->getId()).getRawId());
+ document::BucketId bucket1(16, factory.getBucketId(doc1->getId()).getRawId());
+ document::BucketId bucket2(16, factory.getBucketId(doc2->getId()).getRawId());
// Populate bucket with the given data
for (uint32_t i = 1; i < 10; i++) {
@@ -957,21 +920,21 @@ FileStorManagerTest::testHandlerMulti()
}
{
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL((uint64_t)1, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock, 255);
+ lock = filestorHandler.getNextMessage(0, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)2, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock, 255);
+ lock = filestorHandler.getNextMessage(0, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)3, getPutTime(lock.second));
}
{
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL((uint64_t)11, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock, 255);
+ lock = filestorHandler.getNextMessage(0, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)12, getPutTime(lock.second));
}
}
@@ -984,8 +947,7 @@ FileStorManagerTest::testHandlerTimeout()
// Setup a filestorthread to test
DummyStorageLink top;
DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
+ top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
top.open();
ForwardingMessageSender messageSender(*dummyManager);
@@ -997,7 +959,7 @@ FileStorManagerTest::testHandlerTimeout()
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -1036,7 +998,7 @@ FileStorManagerTest::testHandlerTimeout()
FastOS_Thread::Sleep(51);
for (;;) {
- auto lock = filestorHandler.getNextMessage(0, 255);
+ auto lock = filestorHandler.getNextMessage(0);
if (lock.first.get()) {
CPPUNIT_ASSERT_EQUAL(uint8_t(200), lock.second->getPriority());
break;
@@ -1050,208 +1012,6 @@ FileStorManagerTest::testHandlerTimeout()
}
void
-FileStorManagerTest::testHandlerPriorityBlocking()
-{
- TestName testName("testHandlerPriorityBlocking");
- // Setup a filestorthread to test
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
- // Since we fake time with small numbers, we need to make sure we dont
- // compact them away, as they will seem to be from 1970
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 21, 21);
- filestorHandler.setGetNextMessageTimeout(50);
-
- std::string content("Here is some content which is in all documents");
- std::ostringstream uri;
-
- document::BucketIdFactory factory;
-
- // Populate bucket with the given data
- for (uint32_t i = 1; i < 6; i++) {
- Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d",i)).release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(i * 15);
- filestorHandler.schedule(cmd, 0);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20);
- CPPUNIT_ASSERT_EQUAL(15, (int)lock1.second->getPriority());
-
- LOG(debug, "Waiting for request that should time out");
- FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 30);
- LOG(debug, "Got request that should time out");
- CPPUNIT_ASSERT(lock2.second.get() == NULL);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 40);
- CPPUNIT_ASSERT_EQUAL(30, (int)lock1.second->getPriority());
-
- // New high-pri message comes in
- Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d", 100)).release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(15);
- filestorHandler.schedule(cmd, 0);
-
- FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 20);
- CPPUNIT_ASSERT_EQUAL(15, (int)lock2.second->getPriority());
-
- LOG(debug, "Waiting for request that should time out");
- FileStorHandler::LockedMessage lock3 = filestorHandler.getNextMessage(0, 255);
- LOG(debug, "Got request that should time out");
- CPPUNIT_ASSERT(lock3.second.get() == NULL);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 255);
- CPPUNIT_ASSERT_EQUAL(45, (int)lock1.second->getPriority());
-
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255);
- CPPUNIT_ASSERT_EQUAL(60, (int)lock.second->getPriority());
- }
- LOG(debug, "Test done");
-}
-
-class PausedThread : public document::Runnable {
-private:
- FileStorHandler& _handler;
-
-public:
- bool pause;
- bool done;
- bool gotoperation;
-
- PausedThread(FileStorHandler& handler)
- : _handler(handler), pause(false), done(false), gotoperation(false) {}
-
- void run() override {
- FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255);
- gotoperation = true;
-
- while (!done) {
- if (pause) {
- _handler.pause(0, msg.second->getPriority());
- pause = false;
- }
- FastOS_Thread::Sleep(10);
- }
-
- done = false;
- };
-};
-
-void
-FileStorManagerTest::testHandlerPriorityPreempt()
-{
- TestName testName("testHandlerPriorityPreempt");
- // Setup a filestorthread to test
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
- // Since we fake time with small numbers, we need to make sure we dont
- // compact them away, as they will seem to be from 1970
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 21, 21);
- filestorHandler.setGetNextMessageTimeout(50);
-
- std::string content("Here is some content which is in all documents");
- std::ostringstream uri;
-
- document::BucketIdFactory factory;
-
- {
- Document::SP doc(createDocument(content, "doc:foo:1").release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(60);
- filestorHandler.schedule(cmd, 0);
- }
-
- PausedThread thread(filestorHandler);
- FastOS_ThreadPool pool(512 * 1024);
- thread.start(pool);
-
- while (!thread.gotoperation) {
- FastOS_Thread::Sleep(10);
- }
-
- {
- Document::SP doc(createDocument(content, "doc:foo:2").release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(20);
- filestorHandler.schedule(cmd, 0);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20);
- CPPUNIT_ASSERT_EQUAL(20, (int)lock1.second->getPriority());
-
- thread.pause = true;
-
- for (uint32_t i = 0; i < 10; i++) {
- CPPUNIT_ASSERT(thread.pause);
- FastOS_Thread::Sleep(100);
- }
- }
-
- while (thread.pause) {
- FastOS_Thread::Sleep(10);
- }
-
- thread.done = true;
-
- while (thread.done) {
- FastOS_Thread::Sleep(10);
- }
-}
-
-void
FileStorManagerTest::testPriority()
{
TestName testName("testPriority");
@@ -1269,14 +1029,13 @@ FileStorManagerTest::testPriority()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 2);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 25));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
std::unique_ptr<DiskThread> thread2(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[1], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[1], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
@@ -1284,8 +1043,7 @@ FileStorManagerTest::testPriority()
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
- uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)
- << ":mydoc-" << i;
+ uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)<< ":mydoc-" << i;
Document::SP doc(createDocument(content, uri.str()).release());
documents.push_back(doc);
}
@@ -1294,20 +1052,16 @@ FileStorManagerTest::testPriority()
// Create buckets in separate, initial pass to avoid races with puts
for (uint32_t i=0; i<documents.size(); ++i) {
- document::BucketId bucket(16, factory.getBucketId(
- documents[i]->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId());
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
- _node->getPersistenceProvider().createBucket(
- makeSpiBucket(bucket), context);
+ _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context);
}
// Populate bucket with the given data
for (uint32_t i=0; i<documents.size(); ++i) {
- document::BucketId bucket(16, factory.getBucketId(
- documents[i]->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId());
std::shared_ptr<api::PutCommand> cmd(
new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i));
@@ -1342,9 +1096,8 @@ FileStorManagerTest::testPriority()
CPPUNIT_ASSERT_EQUAL(uint64_t(documents.size()),
metrics.disks[0]->threads[0]->operations.getValue()
+ metrics.disks[0]->threads[1]->operations.getValue());
- CPPUNIT_ASSERT(metrics.disks[0]->threads[0]->operations.getValue() <= 13);
- // Closing file stor handler before threads are deleted, such that
- // file stor threads getNextMessage calls returns.
+ // Closing file stor handler before threads are deleted, such that
+ // file stor threads getNextMessage calls returns.
filestorHandler.close();
}
@@ -1363,11 +1116,10 @@ FileStorManagerTest::testSplit1()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -1532,10 +1284,8 @@ FileStorManagerTest::testSplitSingleGroup()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
for (uint32_t j=0; j<1; ++j) {
// Test this twice, once where all the data ends up in file with
// splitbit set, and once where all the data ends up in file with
@@ -1544,7 +1294,7 @@ FileStorManagerTest::testSplitSingleGroup()
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -1673,11 +1423,10 @@ FileStorManagerTest::testSplitEmptyTargetWithRemappedOps()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
document::BucketId source(16, 0x10001);
@@ -1751,11 +1500,10 @@ FileStorManagerTest::testNotifyOnSplitSourceOwnershipChanged()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
document::BucketId source(getFirstBucketNotOwnedByDistributor(0));
createBucket(source, 0);
@@ -1801,21 +1549,18 @@ FileStorManagerTest::testJoin()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
- uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)
- << ":mydoc-" << i;
- Document::SP doc(createDocument(
- content, uri.str()).release());
+ uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) << ":mydoc-" << i;
+ Document::SP doc(createDocument(content, uri.str()).release());
documents.push_back(doc);
}
document::BucketIdFactory factory;
@@ -1826,36 +1571,26 @@ FileStorManagerTest::testJoin()
{
// Populate bucket with the given data
for (uint32_t i=0; i<documents.size(); ++i) {
- document::BucketId bucket(17, factory.getBucketId(
- documents[i]->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ document::BucketId bucket(17, factory.getBucketId(documents[i]->getId()).getRawId());
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i);
+ auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
filestorHandler.schedule(cmd, 0);
filestorHandler.flush(true);
CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies());
- std::shared_ptr<api::PutReply> reply(
- std::dynamic_pointer_cast<api::PutReply>(
- top.getReply(0)));
+ auto reply = std::dynamic_pointer_cast<api::PutReply>(top.getReply(0));
CPPUNIT_ASSERT(reply.get());
- CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK),
- reply->getResult());
+ CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), reply->getResult());
top.reset();
// Delete every 5th document to have delete entries in file too
if (i % 5 == 0) {
- std::shared_ptr<api::RemoveCommand> rcmd(
- new api::RemoveCommand(
- makeDocumentBucket(bucket), documents[i]->getId(), 1000000 + 100 + i));
+ auto rcmd = std::make_shared<api::RemoveCommand>(makeDocumentBucket(bucket),
+ documents[i]->getId(), 1000000 + 100 + i);
rcmd->setAddress(*address);
filestorHandler.schedule(rcmd, 0);
filestorHandler.flush(true);
CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies());
- std::shared_ptr<api::RemoveReply> rreply(
- std::dynamic_pointer_cast<api::RemoveReply>(
- top.getReply(0)));
+ auto rreply = std::dynamic_pointer_cast<api::RemoveReply>(top.getReply(0));
CPPUNIT_ASSERT_MSG(top.getReply(0)->getType().toString(),
rreply.get());
CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK),
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 95642ac6215..64ef48b5719 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -43,10 +43,8 @@ public:
_deleteBucketInvocations(0)
{}
- spi::Result put(const spi::Bucket& bucket,
- spi::Timestamp timestamp,
- const document::Document::SP& doc,
- spi::Context& context) override
+ spi::Result put(const spi::Bucket& bucket, spi::Timestamp timestamp,
+ const document::Document::SP& doc, spi::Context& context) override
{
(void) bucket;
(void) timestamp;
@@ -90,26 +88,20 @@ public:
void setupDisks(uint32_t diskCount, uint32_t queueBarrierThreads) {
FileStorTestFixture::setupDisks(diskCount);
- _dummyProvider.reset(new spi::dummy::DummyPersistence(
- _node->getTypeRepo(), diskCount));
+ _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), diskCount));
_queueBarrier.reset(new vespalib::Barrier(queueBarrierThreads));
_completionBarrier.reset(new vespalib::Barrier(2));
- _blockingProvider = new BlockingMockProvider(*_dummyProvider,
- *_queueBarrier, *_completionBarrier);
-
- _node->setPersistenceProvider(
- spi::PersistenceProvider::UP(_blockingProvider));
+ _blockingProvider = new BlockingMockProvider(*_dummyProvider, *_queueBarrier, *_completionBarrier);
+ _node->setPersistenceProvider(spi::PersistenceProvider::UP(_blockingProvider));
}
- void validateReplies(DummyStorageLink& link,
- size_t repliesTotal,
+ void validateReplies(DummyStorageLink& link, size_t repliesTotal,
const std::vector<document::BucketId>& okReplies,
const std::vector<document::BucketId>& abortedGetDiffs);
- void doTestSpecificOperationsNotAborted(
- const char* testName,
- const std::vector<api::StorageMessage::SP>& msgs,
- bool shouldCreateBucketInitially);
+ void doTestSpecificOperationsNotAborted(const char* testName,
+ const std::vector<api::StorageMessage::SP>& msgs,
+ bool shouldCreateBucketInitially);
api::BucketInfo getBucketInfoFromDB(const document::BucketId&) const;
@@ -138,8 +130,7 @@ namespace {
template <typename T, typename Collection>
bool
existsIn(const T& elem, const Collection& collection) {
- return (std::find(collection.begin(), collection.end(), elem)
- != collection.end());
+ return (std::find(collection.begin(), collection.end(), elem) != collection.end());
}
}
@@ -150,18 +141,15 @@ OperationAbortingTest::setUp()
}
void
-OperationAbortingTest::validateReplies(
- DummyStorageLink& link,
- size_t repliesTotal,
- const std::vector<document::BucketId>& okReplies,
- const std::vector<document::BucketId>& abortedGetDiffs)
+OperationAbortingTest::validateReplies(DummyStorageLink& link, size_t repliesTotal,
+ const std::vector<document::BucketId>& okReplies,
+ const std::vector<document::BucketId>& abortedGetDiffs)
{
link.waitForMessages(repliesTotal, MSG_WAIT_TIME);
CPPUNIT_ASSERT_EQUAL(repliesTotal, link.getNumReplies());
for (uint32_t i = 0; i < repliesTotal; ++i) {
- api::StorageReply& reply(
- dynamic_cast<api::StorageReply&>(*link.getReply(i)));
+ api::StorageReply& reply(dynamic_cast<api::StorageReply&>(*link.getReply(i)));
LOG(info, "Checking reply %s", reply.toString(true).c_str());
switch (static_cast<uint32_t>(reply.getType().getId())) {
case api::MessageType::PUT_REPLY_ID:
@@ -222,11 +210,8 @@ template <typename Container>
AbortBucketOperationsCommand::SP
makeAbortCmd(const Container& buckets)
{
- std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate> pred(
- new ExplicitBucketSetPredicate(buckets.begin(), buckets.end()));
- AbortBucketOperationsCommand::SP cmd(
- new AbortBucketOperationsCommand(std::move(pred)));
- return cmd;
+ auto pred = std::make_unique<ExplicitBucketSetPredicate>(buckets.begin(), buckets.end());
+ return std::make_shared<AbortBucketOperationsCommand>(std::move(pred));
}
}
@@ -320,8 +305,7 @@ public:
void
OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket()
{
- uint32_t queueBarrierThreads = 3;
- setupDisks(1, queueBarrierThreads);
+ setupDisks(1, 3);
TestFileStorComponents c(*this, "testWaitForCurrentOperationCompletionForAbortedBucket");
document::BucketId bucket(16, 1);
@@ -350,10 +334,8 @@ OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket()
// reply, as it must finish processing fully before the abort returns.
c.top.waitForMessages(2, MSG_WAIT_TIME);
CPPUNIT_ASSERT_EQUAL(size_t(2), c.top.getNumReplies());
- CPPUNIT_ASSERT_EQUAL(api::MessageType::PUT_REPLY,
- c.top.getReply(0)->getType());
- CPPUNIT_ASSERT_EQUAL(api::MessageType::INTERNAL_REPLY,
- c.top.getReply(1)->getType());
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::PUT_REPLY, c.top.getReply(0)->getType());
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::INTERNAL_REPLY, c.top.getReply(1)->getType());
}
void
@@ -364,10 +346,7 @@ OperationAbortingTest::testDoNotAbortCreateBucketCommands()
msgs.push_back(api::StorageMessage::SP(new api::CreateBucketCommand(makeDocumentBucket(bucket))));
bool shouldCreateBucketInitially(false);
- doTestSpecificOperationsNotAborted(
- "testDoNotAbortCreateBucketCommands",
- msgs,
- shouldCreateBucketInitially);
+ doTestSpecificOperationsNotAborted("testDoNotAbortCreateBucketCommands", msgs, shouldCreateBucketInitially);
}
void
@@ -378,18 +357,14 @@ OperationAbortingTest::testDoNotAbortRecheckBucketCommands()
msgs.push_back(api::StorageMessage::SP(new RecheckBucketInfoCommand(makeDocumentBucket(bucket))));
bool shouldCreateBucketInitially(true);
- doTestSpecificOperationsNotAborted(
- "testDoNotAbortRecheckBucketCommands",
- msgs,
- shouldCreateBucketInitially);
+ doTestSpecificOperationsNotAborted("testDoNotAbortRecheckBucketCommands", msgs, shouldCreateBucketInitially);
}
api::BucketInfo
OperationAbortingTest::getBucketInfoFromDB(const document::BucketId& id) const
{
StorBucketDatabase::WrappedEntry entry(
- _node->getStorageBucketDatabase().get(id, "foo",
- StorBucketDatabase::CREATE_IF_NONEXISTING));
+ _node->getStorageBucketDatabase().get(id, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING));
CPPUNIT_ASSERT(entry.exist());
return entry->info;
}
@@ -403,20 +378,15 @@ OperationAbortingTest::testDoNotAbortDeleteBucketCommands()
msgs.push_back(cmd);
bool shouldCreateBucketInitially(true);
- doTestSpecificOperationsNotAborted(
- "testDoNotAbortRecheckBucketCommands",
- msgs,
- shouldCreateBucketInitially);
+ doTestSpecificOperationsNotAborted("testDoNotAbortRecheckBucketCommands", msgs, shouldCreateBucketInitially);
}
void
-OperationAbortingTest::doTestSpecificOperationsNotAborted(
- const char* testName,
- const std::vector<api::StorageMessage::SP>& msgs,
- bool shouldCreateBucketInitially)
+OperationAbortingTest::doTestSpecificOperationsNotAborted(const char* testName,
+ const std::vector<api::StorageMessage::SP>& msgs,
+ bool shouldCreateBucketInitially)
{
- uint32_t queueBarrierThreads = 2;
- setupDisks(1, queueBarrierThreads);
+ setupDisks(1, 2);
TestFileStorComponents c(*this, testName);
document::BucketId bucket(16, 1);
document::BucketId blockerBucket(16, 2);
@@ -443,8 +413,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(
break;
case api::MessageType::DELETEBUCKET_ID:
{
- api::DeleteBucketCommand& delCmd(
- dynamic_cast<api::DeleteBucketCommand&>(*msgs[i]));
+ api::DeleteBucketCommand& delCmd(dynamic_cast<api::DeleteBucketCommand&>(*msgs[i]));
delCmd.setBucketInfo(getBucketInfoFromDB(delCmd.getBucketId()));
}
++expectedDeleteBuckets;
@@ -473,8 +442,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(
c.sendDummyGet(blockerBucket);
// put+abort+get + any other creates/deletes/rechecks
- size_t expectedMsgs(3 + expectedCreateBuckets + expectedDeleteBuckets
- + expectedRecheckReplies);
+ size_t expectedMsgs(3 + expectedCreateBuckets + expectedDeleteBuckets + expectedRecheckReplies);
LOG(info, "barrier passed, waiting for %zu replies", expectedMsgs);
std::vector<document::BucketId> okReplies;
@@ -483,13 +451,10 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(
std::vector<document::BucketId> abortedGetDiffs;
validateReplies(c.top, expectedMsgs, okReplies, abortedGetDiffs);
- CPPUNIT_ASSERT_EQUAL(expectedBucketInfoInvocations,
- _blockingProvider->_bucketInfoInvocations);
+ CPPUNIT_ASSERT_EQUAL(expectedBucketInfoInvocations, _blockingProvider->_bucketInfoInvocations);
CPPUNIT_ASSERT_EQUAL(expectedCreateBuckets + (shouldCreateBucketInitially ? 2 : 1),
_blockingProvider->_createBucketInvocations);
- CPPUNIT_ASSERT_EQUAL(expectedDeleteBuckets,
- _blockingProvider->_deleteBucketInvocations);
+ CPPUNIT_ASSERT_EQUAL(expectedDeleteBuckets, _blockingProvider->_deleteBucketInvocations);
}
-
-
+
} // storage
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 77e0ed6b71c..f5f190ad718 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -77,9 +77,7 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
metrics.initDiskMetrics(_node->getPartitions().size(),
loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics,
- _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
// Send 2 puts, 2 to the first bucket, 1 to the second. Calling
// getNextMessage 2 times should then return a lock on the first bucket,
@@ -89,17 +87,15 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
filestorHandler.schedule(createPut(1234, 1), 0);
filestorHandler.schedule(createPut(5432, 0), 0);
- auto lock0 = filestorHandler.getNextMessage(0, 255);
+ auto lock0 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT(lock0.first.get());
- CPPUNIT_ASSERT_EQUAL(
- document::BucketId(16, 1234),
- dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234),
+ dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
- auto lock1 = filestorHandler.getNextMessage(0, 255);
+ auto lock1 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT(lock1.first.get());
- CPPUNIT_ASSERT_EQUAL(
- document::BucketId(16, 5432),
- dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432),
+ dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
}
} // namespace storage
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 9d3a3d5f008..d46708b3aaa 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -52,21 +52,13 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const
_node.setupDummyPersistence();
_metrics.initDiskMetrics(
numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1);
- _handler.reset(new FileStorHandler(
- _messageKeeper, _metrics,
- _node.getPersistenceProvider().getPartitionStates().getList(),
- _node.getComponentRegister(), 255, 0));
+ _handler.reset(new FileStorHandler(_messageKeeper, _metrics,
+ _node.getPersistenceProvider().getPartitionStates().getList(),
+ _node.getComponentRegister()));
for (uint32_t i = 0; i < numDisks; i++) {
_diskEnvs.push_back(
- std::unique_ptr<PersistenceUtil>(
- new PersistenceUtil(
- _config.getConfigId(),
- _node.getComponentRegister(),
- *_handler,
- *_metrics.disks[i]->threads[0],
- i,
- 255,
- _node.getPersistenceProvider())));
+ std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler,
+ *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider()));
}
}
@@ -79,8 +71,7 @@ PersistenceTestUtils::~PersistenceTestUtils()
}
std::string
-PersistenceTestUtils::dumpBucket(const document::BucketId& bid,
- uint16_t disk) {
+PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) {
return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(makeSpiBucket(bid, spi::PartitionId(disk)));
}
@@ -92,14 +83,9 @@ PersistenceTestUtils::setupDisks(uint32_t numDisks) {
std::unique_ptr<PersistenceThread>
PersistenceTestUtils::createPersistenceThread(uint32_t disk)
{
- return std::unique_ptr<PersistenceThread>(
- new PersistenceThread(_env->_node.getComponentRegister(),
- _env->_config.getConfigId(),
- getPersistenceProvider(),
- getEnv()._fileStorHandler,
- getEnv()._metrics,
- disk,
- 255));
+ return std::make_unique<PersistenceThread>(_env->_node.getComponentRegister(), _env->_config.getConfigId(),
+ getPersistenceProvider(), getEnv()._fileStorHandler,
+ getEnv()._metrics, disk);
}
document::Document::SP
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index 8636628c7cc..b53c8c270f2 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -14,21 +14,16 @@ using namespace storage::api;
namespace storage {
-StorageLink::~StorageLink()
-{
-}
+StorageLink::~StorageLink() = default;
void StorageLink::push_back(StorageLink::UP link)
{
if (_state != CREATED) {
- LOG(error, "Attempted to alter chain by adding link %s after link %s "
- "while state is %s",
- link->toString().c_str(),
- toString().c_str(),
- stateToString(_state));
+ LOG(error, "Attempted to alter chain by adding link %s after link %s while state is %s",
+ link->toString().c_str(), toString().c_str(), stateToString(_state));
assert(false);
}
- assert(link.get());
+ assert(link);
if (isBottom()) {
link->_up = this;
_down = std::move(link);
@@ -39,26 +34,24 @@ void StorageLink::push_back(StorageLink::UP link)
void StorageLink::open()
{
- // First tag all states as opened, as components are allowed to send
- // messages both ways in onOpen call, in case any component send message
- // up, the link receiving them should have their state as opened.
+ // First tag all states as opened, as components are allowed to send
+ // messages both ways in onOpen call, in case any component send message
+ // up, the link receiving them should have their state as opened.
StorageLink* link = this;
while (true) {
if (link->_state != CREATED) {
- LOG(error, "During open(), link %s should be in CREATED state, "
- "not in state %s.",
- toString().c_str(),
- stateToString(link->_state));
+ LOG(error, "During open(), link %s should be in CREATED state, not in state %s.",
+ toString().c_str(), stateToString(link->_state));
assert(false);
}
link->_state = OPENED;
if (link->_down.get() == 0) break;
link = link->_down.get();
}
- // When give all links an onOpen call, bottoms up. Do it bottoms up, as
- // links are more likely to send messages down in their onOpen() call
- // than up. Thus, chances are best that the component is ready to
- // receive messages sent during onOpen().
+ // When give all links an onOpen call, bottoms up. Do it bottoms up, as
+ // links are more likely to send messages down in their onOpen() call
+ // than up. Thus, chances are best that the component is ready to
+ // receive messages sent during onOpen().
while (link != 0) {
link->onOpen();
link = link->_up;
@@ -91,34 +84,31 @@ void StorageLink::closeNextLink() {
void StorageLink::flush()
{
if (_state != CLOSING) {
- LOG(error, "During flush(), link %s should be in CLOSING state, "
- "not in state %s.",
- toString().c_str(),
- stateToString(_state));
+ LOG(error, "During flush(), link %s should be in CLOSING state, not in state %s.",
+ toString().c_str(), stateToString(_state));
assert(false);
}
- // First flush down to get all requests out of the system.
+ // First flush down to get all requests out of the system.
_state = FLUSHINGDOWN;
LOG(debug, "Flushing link %s on the way down.", toString().c_str());
onFlush(true);
LOG(debug, "Flushed link %s on the way down.", toString().c_str());
if (!isBottom()) {
_down->flush();
- // Then flush up to get replies out of the system
+ // Then flush up to get replies out of the system
LOG(debug, "Flushing link %s on the way back up.", toString().c_str());
_state = FLUSHINGUP;
onFlush(false);
LOG(debug, "Flushed link %s on the way back up.", toString().c_str());
} else {
- // Then flush up to get replies out of the system
+ // Then flush up to get replies out of the system
LOG(debug, "Flushing link %s on the way back up.", toString().c_str());
_state = FLUSHINGUP;
onFlush(false);
LOG(debug, "Flushed link %s on the way back up.", toString().c_str());
}
_state = CLOSED;
- LOG(debug, "Link %s is now closed and should do nothing more.",
- toString().c_str());
+ LOG(debug, "Link %s is now closed and should do nothing more.", toString().c_str());
}
void StorageLink::sendDown(const StorageMessage::SP& msg)
@@ -130,58 +120,31 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
case FLUSHINGDOWN:
break;
default:
- LOG(error,
- "Link %s trying to send %s down while in state %s",
- toString().c_str(),
- msg->toString().c_str(),
- stateToString(_state));
+ LOG(error, "Link %s trying to send %s down while in state %s",
+ toString().c_str(), msg->toString().c_str(), stateToString(_state));
assert(false);
}
assert(msg.get());
- LOG(spam, "Storage Link %s to handle %s",
- toString().c_str(), msg->toString().c_str());
+ LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str());
if (isBottom()) {
- LOG(spam, "Storage link %s at bottom of chain got message %s.",
- toString().c_str(), msg->toString().c_str());
- /*
- if (isFlush(msg)) {
+ LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str());
+ ostringstream ost;
+ ost << "Unhandled message at bottom of chain " << *msg << " (message type "
+ << msg->getType().getName() << "). " << vespalib::getStackTrace(0);
+ if (!msg->getType().isReply()) {
+ LOGBP(warning, "%s", ost.str().c_str());
StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
shared_ptr<StorageReply> reply(cmd.makeReply().release());
if (reply.get()) {
+ reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName()));
sendUp(reply);
}
} else {
- */
- ostringstream ost;
- ost << "Unhandled message at bottom of chain "
- << *msg << " (message type "
- << msg->getType().getName()
- << "). "
- << vespalib::getStackTrace(0);
- if (!msg->getType().isReply()) {
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
- shared_ptr<StorageReply> reply(cmd.makeReply().release());
-
- if (reply.get()) {
- reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED,
- msg->getType().getName()));
- sendUp(reply);
- }
- } else {
- ost << " Return code: "
- << static_cast<StorageReply&>(*msg).getResult();
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- }
- //}
+ ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult();
+ LOGBP(warning, "%s", ost.str().c_str());
+ }
} else if (!_down->onDown(msg)) {
- //LOG(spam, "Storage link %s forwarding message %s.",
- // toString().c_str(), msg->toString().c_str());
_down->sendDown(msg);
} else {
LOG(spam, "Storage link %s handled message %s.",
@@ -191,7 +154,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
{
- // Verify acceptable state to send messages up
+ // Verify acceptable state to send messages up
switch(_state) {
case OPENED:
case CLOSING:
@@ -199,48 +162,28 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
case FLUSHINGUP:
break;
default:
- LOG(error,
- "Link %s trying to send %s up while in state %s",
- toString().c_str(),
- msg->toString(true).c_str(),
- stateToString(_state));
+ LOG(error, "Link %s trying to send %s up while in state %s",
+ toString().c_str(), msg->toString(true).c_str(), stateToString(_state));
assert(false);
}
assert(msg.get());
if (isTop()) {
- /*
- if (isFlush(msg)) {
+ ostringstream ost;
+ ost << "Unhandled message at top of chain " << *msg << ".";
+ ost << vespalib::getStackTrace(0);
+ if (!msg->getType().isReply()) {
+ LOGBP(warning, "%s", ost.str().c_str());
StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
shared_ptr<StorageReply> reply(cmd.makeReply().release());
if (reply.get()) {
+ reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName()));
sendDown(reply);
}
} else {
- */
- ostringstream ost;
- ost << "Unhandled message at top of chain " << *msg << ".";
- ost << vespalib::getStackTrace(0);
- if (!msg->getType().isReply()) {
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
- shared_ptr<StorageReply> reply(cmd.makeReply().release());
-
- if (reply.get()) {
- reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED,
- msg->getType().getName()));
- sendDown(reply);
- }
- } else {
- ost << " Return code: "
- << static_cast<StorageReply&>(*msg).getResult();
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- }
- //}
+ ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult();
+ LOGBP(warning, "%s", ost.str().c_str());
+ }
} else if (!_up->onUp(msg)) {
_up->sendUp(msg);
}
@@ -261,19 +204,7 @@ void StorageLink::printChain(std::ostream& out, std::string indent) const {
bool StorageLink::onDown(const shared_ptr<StorageMessage> & msg)
{
- //LOG(spam, "Checking if storage link %s handles %s.",
- // toString().c_str(), msg->toString().c_str());
- bool result = msg->callHandler(*this, msg);
- /*
- if (result) {
- LOG(spam, "Storage link %s handled message %s.",
- toString().c_str(), msg->toString().c_str());
- } else {
- LOG(spam, "Storage link %s did not handle message %s.",
- toString().c_str(), msg->toString().c_str());
- }
- */
- return result;
+ return msg->callHandler(*this, msg);
}
bool StorageLink::onUp(const shared_ptr<StorageMessage> & msg)
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index e23bfda192c..949ceb901e1 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -7,12 +7,8 @@ namespace storage {
FileStorHandler::FileStorHandler(MessageSender& sender,
FileStorMetrics& metrics,
const spi::PartitionStateList& partitions,
- ServiceLayerComponentRegister& compReg,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking)
- : _impl(new FileStorHandlerImpl(
- sender, metrics, partitions, compReg,
- maxPriorityToBlock, minPriorityToBeBlocking))
+ ServiceLayerComponentRegister& compReg)
+ : _impl(new FileStorHandlerImpl(sender, metrics, partitions, compReg))
{
}
@@ -57,23 +53,16 @@ FileStorHandler::schedule(const api::StorageMessage::SP& msg, uint16_t thread)
return _impl->schedule(msg, thread);
}
-void
-FileStorHandler::pause(uint16_t disk, uint8_t priority) const {
- return _impl->pause(disk, priority);
-}
-
FileStorHandler::LockedMessage
-FileStorHandler::getNextMessage(uint16_t thread, uint8_t lowestPriority)
+FileStorHandler::getNextMessage(uint16_t thread)
{
- return _impl->getNextMessage(thread, lowestPriority);
+ return _impl->getNextMessage(thread);
}
FileStorHandler::LockedMessage &
-FileStorHandler::getNextMessage(uint16_t thread,
- LockedMessage& lck,
- uint8_t lowestPriority)
+FileStorHandler::getNextMessage(uint16_t thread, LockedMessage& lck)
{
- return _impl->getNextMessage(thread, lck, lowestPriority);
+ return _impl->getNextMessage(thread, lck);
}
FileStorHandler::BucketLockInterface::SP
@@ -89,30 +78,23 @@ FileStorHandler::remapQueueAfterDiskMove(
{
RemapInfo target(bucket, targetDisk);
- _impl->remapQueue(RemapInfo(bucket, sourceDisk), target,
- FileStorHandlerImpl::MOVE);
+ _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, FileStorHandlerImpl::MOVE);
}
void
-FileStorHandler::remapQueueAfterJoin(
- const RemapInfo& source,
- RemapInfo& target)
+FileStorHandler::remapQueueAfterJoin(const RemapInfo& source,RemapInfo& target)
{
_impl->remapQueue(source, target, FileStorHandlerImpl::JOIN);
}
void
-FileStorHandler::remapQueueAfterSplit(
- const RemapInfo& source,
- RemapInfo& target1,
- RemapInfo& target2)
+FileStorHandler::remapQueueAfterSplit(const RemapInfo& source,RemapInfo& target1, RemapInfo& target2)
{
_impl->remapQueue(source, target1, target2, FileStorHandlerImpl::SPLIT);
}
void
-FileStorHandler::failOperations(const document::Bucket &bucket,
- uint16_t fromDisk, const api::ReturnCode& err)
+FileStorHandler::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err)
{
_impl->failOperations(bucket, fromDisk, err);
}
@@ -130,8 +112,7 @@ FileStorHandler::sendReply(const api::StorageReply::SP& msg)
}
void
-FileStorHandler::getStatus(std::ostream& out,
- const framework::HttpUrlPath& path) const
+FileStorHandler::getStatus(std::ostream& out, const framework::HttpUrlPath& path) const
{
_impl->getStatus(out, path);
}
@@ -149,8 +130,7 @@ FileStorHandler::getQueueSize(uint16_t disk) const
}
void
-FileStorHandler::addMergeStatus(const document::Bucket& bucket,
- MergeStatus::SP ms)
+FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms)
{
return _impl->addMergeStatus(bucket, ms);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 6e2d6a0fc07..2cfc97ca71b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -15,7 +15,6 @@
#include "mergestatus.h"
#include <vespa/document/bucket/bucket.h>
-#include <ostream>
#include <vespa/storage/storageutil/resumeguard.h>
#include <vespa/storage/common/messagesender.h>
@@ -72,12 +71,7 @@ public:
CLOSED
};
- FileStorHandler(MessageSender&,
- FileStorMetrics&,
- const spi::PartitionStateList&,
- ServiceLayerComponentRegister&,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking);
+ FileStorHandler(MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&);
~FileStorHandler();
// Commands used by file stor manager
@@ -115,32 +109,19 @@ public:
* Schedule a storage message to be processed by the given disk
* @return True if we maanged to schedule operation. False if not
*/
- bool schedule(const std::shared_ptr<api::StorageMessage>&,
- uint16_t disk);
-
- // Commands used by file stor threads
-
- /**
- * When called, checks if any running operations have "preempting"
- * priority. If so, and the given priority is less than that, this call
- * will hang until the other operation is done.
- */
- void pause(uint16_t disk, uint8_t priority) const;
+ bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
/**
* Used by file stor threads to get their next message to process.
*
* @param disk The disk to get messages for
- * @param lowestPriority The lowest priority of operation we should return
*/
- LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority);
+ LockedMessage getNextMessage(uint16_t disk);
/**
* Returns the next message for the same bucket.
*/
- LockedMessage & getNextMessage(uint16_t disk,
- LockedMessage& lock,
- uint8_t lowestPriority);
+ LockedMessage & getNextMessage(uint16_t disk, LockedMessage& lock);
/**
* Lock a bucket. By default, each file stor thread has the locks of all
@@ -219,8 +200,7 @@ public:
* Fail all operations towards a single bucket currently queued to the
* given thread with the given error code.
*/
- void failOperations(const document::Bucket&, uint16_t fromDisk,
- const api::ReturnCode&);
+ void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&);
/**
* Add a new merge state to the registry.
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index c8b5c71ee2e..5eb168a9a42 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -23,20 +23,14 @@ using document::BucketSpace;
namespace storage {
-FileStorHandlerImpl::FileStorHandlerImpl(
- MessageSender& sender,
- FileStorMetrics& metrics,
- const spi::PartitionStateList& partitions,
- ServiceLayerComponentRegister& compReg,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking)
+FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
+ const spi::PartitionStateList& partitions,
+ ServiceLayerComponentRegister& compReg)
: _partitions(partitions),
_component(compReg, "filestorhandlerimpl"),
_diskInfo(_component.getDiskCount()),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
- _maxPriorityToBlock(maxPriorityToBlock),
- _minPriorityToBeBlocking(minPriorityToBeBlocking),
_getNextMessageTimeout(100),
_paused(false)
{
@@ -46,23 +40,21 @@ FileStorHandlerImpl::FileStorHandlerImpl(
}
if (_diskInfo.size() == 0) {
- throw vespalib::IllegalArgumentException(
- "No disks configured", VESPA_STRLOC);
+ throw vespalib::IllegalArgumentException("No disks configured", VESPA_STRLOC);
}
// Add update hook, so we will get callbacks each 5 seconds to update
// metrics.
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
}
-FileStorHandlerImpl::~FileStorHandlerImpl() { }
+FileStorHandlerImpl::~FileStorHandlerImpl() = default;
void
FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status)
{
vespalib::LockGuard mlock(_mergeStatesLock);
if (_mergeStates.find(bucket) != _mergeStates.end()) {;
- LOG(warning, "A merge status already existed for %s. Overwriting it.",
- bucket.toString().c_str());
+ LOG(warning, "A merge status already existed for %s. Overwriting it.", bucket.toString().c_str());
}
_mergeStates[bucket] = status;
}
@@ -73,8 +65,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
vespalib::LockGuard mlock(_mergeStatesLock);
MergeStatus::SP status = _mergeStates[bucket];
if (status.get() == 0) {
- throw vespalib::IllegalStateException(
- "No merge state exist for " + bucket.toString(), VESPA_STRLOC);
+ throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC);
}
return *status;
}
@@ -94,8 +85,7 @@ FileStorHandlerImpl::getNumActiveMerges() const
}
void
-FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket,
- const api::ReturnCode* code)
+FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode* code)
{
vespalib::LockGuard mlock(_mergeStatesLock);
auto it = _mergeStates.find(bucket);
@@ -153,10 +143,9 @@ FileStorHandlerImpl::flush(bool killPendingMerges)
if (killPendingMerges) {
api::ReturnCode code(api::ReturnCode::ABORTED,
"Storage node is shutting down");
- for (std::map<document::Bucket, MergeStatus::SP>::iterator it
- = _mergeStates.begin(); it != _mergeStates.end(); ++it)
+ for (auto & entry : _mergeStates)
{
- MergeStatus& s(*it->second);
+ MergeStatus& s(*entry.second);
if (s.pendingGetDiff.get() != 0) {
s.pendingGetDiff->setResult(code);
_messageSender.sendReply(s.pendingGetDiff);
@@ -182,11 +171,9 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg,
std::shared_ptr<api::StorageReply> rep(
static_cast<api::StorageCommand&>(msg).makeReply().release());
if (state == FileStorHandler::DISABLED) {
- rep->setResult(api::ReturnCode(
- api::ReturnCode::DISK_FAILURE, "Disk disabled"));
+ rep->setResult(api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled"));
} else {
- rep->setResult(api::ReturnCode(
- api::ReturnCode::ABORTED, "Shutting down storage node."));
+ rep->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."));
}
_messageSender.sendReply(rep);
}
@@ -270,29 +257,6 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg,
return true;
}
-void
-FileStorHandlerImpl::pause(uint16_t disk, uint8_t priority) const {
- if (priority < _maxPriorityToBlock) {
- return;
- }
-
- assert(disk < _diskInfo.size());
- const Disk& t(_diskInfo[disk]);
- vespalib::MonitorGuard lockGuard(t.lock);
-
- bool paused = true;
- while (paused) {
- paused = false;
- for (auto& lockedBucket : t.lockedBuckets) {
- if (lockedBucket.second.priority <= _minPriorityToBeBlocking) {
- paused = true;
- lockGuard.wait();
- break;
- }
- }
- }
-}
-
bool
FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const
{
@@ -395,18 +359,6 @@ FileStorHandlerImpl::abortQueuedOperations(
}
}
-bool
-FileStorHandlerImpl::hasBlockingOperations(const Disk& t) const
-{
- for (auto& lockedBucket : t.lockedBuckets) {
- if (lockedBucket.second.priority <= _minPriorityToBeBlocking) {
- return true;
- }
- }
-
- return false;
-}
-
void
FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
{
@@ -419,16 +371,11 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
}
FileStorHandler::LockedMessage &
-FileStorHandlerImpl::getNextMessage(uint16_t disk,
- FileStorHandler::LockedMessage& lck,
- uint8_t maxPriority)
+FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lck)
{
document::Bucket bucket(lck.first->getBucket());
- LOG(spam,
- "Disk %d retrieving message for buffered bucket %s",
- disk,
- bucket.getBucketId().toString().c_str());
+ LOG(spam, "Disk %d retrieving message for buffered bucket %s", disk, bucket.getBucketId().toString().c_str());
assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
@@ -451,22 +398,12 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
api::StorageMessage & m(*range.first->_command);
mbus::Trace& trace = m.getTrace();
- // Priority is too low, not buffering any more.
- if (m.getPriority() > maxPriority || m.getPriority() >= _maxPriorityToBlock) {
- lck.second.reset();
- return lck;
- }
+ MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread looking for more requests to active bucket.");
- MBUS_TRACE(trace, 9,
- "FileStorHandler: Message identified by disk thread looking for "
- "more requests to active bucket.");
+ uint64_t waitTime(const_cast<metrics::MetricTimer&>(range.first->_timer).stop(
+ t.metrics->averageQueueWaitingTime[m.getLoadType()]));
- uint64_t waitTime(
- const_cast<metrics::MetricTimer&>(range.first->_timer).stop(
- t.metrics->averageQueueWaitingTime[m.getLoadType()]));
-
- LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), "
- "timeout %d",
+ LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), timeout %d",
m.toString().c_str(), waitTime, bucket.getBucketId().toString().c_str(),
static_cast<api::StorageCommand&>(m).getTimeout());
@@ -480,15 +417,11 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
lockGuard.unlock();
return lck;
} else {
- std::shared_ptr<api::StorageReply> msgReply(
- static_cast<api::StorageCommand&>(m)
- .makeReply().release());
+ std::shared_ptr<api::StorageReply> msgReply(static_cast<api::StorageCommand&>(m).makeReply().release());
idx.erase(range.first);
lockGuard.broadcast();
lockGuard.unlock();
- msgReply->setResult(api::ReturnCode(
- api::ReturnCode::TIMEOUT,
- "Message waited too long in storage queue"));
+ msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue"));
_messageSender.sendReply(msgReply);
lck.second.reset();
@@ -517,23 +450,13 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const
}
bool
-FileStorHandlerImpl::operationBlockedByHigherPriorityThread(
- const api::StorageMessage& msg,
- const Disk& disk) const
-{
- return ((msg.getPriority() >= _maxPriorityToBlock)
- && hasBlockingOperations(disk));
-}
-
-bool
FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg,
uint64_t waitTime) const
{
if (msg.getType().isReply()) {
return false; // Replies must always be processed and cannot time out.
}
- return (waitTime >= static_cast<const api::StorageCommand&>(
- msg).getTimeout());
+ return (waitTime >= static_cast<const api::StorageCommand&>(msg).getTimeout());
}
std::unique_ptr<FileStorHandler::BucketLockInterface>
@@ -543,8 +466,7 @@ FileStorHandlerImpl::takeDiskBucketLockOwnership(
const document::Bucket &bucket,
const api::StorageMessage& msg)
{
- return std::unique_ptr<FileStorHandler::BucketLockInterface>(
- new BucketLock(guard, disk, bucket, msg.getPriority(), msg.getSummary()));
+ return std::make_unique<BucketLock>(guard, disk, bucket, msg.getPriority(), msg.getSummary());
}
std::unique_ptr<api::StorageReply>
@@ -564,23 +486,10 @@ namespace {
bucketIsLockedOnDisk(const document::Bucket &id, const FileStorHandlerImpl::Disk &t) {
return (id.getBucketId().getRawId() != 0 && t.isLocked(id));
}
-
- /**
- * Return whether msg has sufficiently high priority that a thread with
- * a configured priority threshold of maxPriority can even run in.
- * Often, operations such as streaming searches will have dedicated threads
- * that refuse lower priority operations such as Puts etc.
- */
- bool
- operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority)
- {
- // NOTE: priority integral value 0 is considered highest pri.
- return (msg.getPriority() <= maxPriority);
- }
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
+FileStorHandlerImpl::getNextMessage(uint16_t disk)
{
assert(disk < _diskInfo.size());
if (!tryHandlePause(disk)) {
@@ -602,12 +511,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
iter++;
}
if (iter != end) {
- api::StorageMessage &m(*iter->_command);
-
- if (operationHasHighEnoughPriorityToBeRun(m, maxPriority)
- && ! operationBlockedByHigherPriorityThread(m, t)
- && ! isPaused())
- {
+ if (! isPaused()) {
return getMessage(lockGuard, t, idx, iter);
}
}
@@ -655,22 +559,16 @@ FileStorHandlerImpl::lock(const document::Bucket &bucket, uint16_t disk)
assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
- LOG(spam,
- "Acquiring filestor lock for %s on disk %d",
- bucket.getBucketId().toString().c_str(),
- disk);
+ LOG(spam, "Acquiring filestor lock for %s on disk %d", bucket.getBucketId().toString().c_str(), disk);
vespalib::MonitorGuard lockGuard(t.lock);
while (bucket.getBucketId().getRawId() != 0 && t.isLocked(bucket)) {
- LOG(spam,
- "Contending for filestor lock for %s",
- bucket.getBucketId().toString().c_str());
+ LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str());
lockGuard.wait(100);
}
- std::shared_ptr<FileStorHandler::BucketLockInterface> locker(
- new BucketLock(lockGuard, t, bucket, 255, "External lock"));
+ auto locker = std::make_shared<BucketLock>(lockGuard, t, bucket, 255, "External lock");
lockGuard.broadcast();
return locker;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 56ac9ea0577..6b6d154e149 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -142,12 +142,8 @@ public:
document::Bucket _bucket;
};
- FileStorHandlerImpl(MessageSender&,
- FileStorMetrics&,
- const spi::PartitionStateList&,
- ServiceLayerComponentRegister&,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking);
+ FileStorHandlerImpl(MessageSender&, FileStorMetrics&,
+ const spi::PartitionStateList&, ServiceLayerComponentRegister&);
~FileStorHandlerImpl();
void setGetNextMessageTimeout(uint32_t timeout) { _getNextMessageTimeout = timeout; }
@@ -158,12 +154,10 @@ public:
void close();
bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
- void pause(uint16_t disk, uint8_t priority) const;
- FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority);
+ FileStorHandler::LockedMessage getNextMessage(uint16_t disk);
FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter);
- FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock,
- uint8_t lowestPriority);
+ FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock);
enum Operation { MOVE, SPLIT, JOIN };
void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op);
@@ -204,8 +198,6 @@ private:
std::map<document::Bucket, MergeStatus::SP> _mergeStates;
- uint8_t _maxPriorityToBlock;
- uint8_t _minPriorityToBeBlocking;
uint32_t _getNextMessageTimeout;
vespalib::Monitor _pauseMonitor;
@@ -237,12 +229,6 @@ private:
bool diskIsClosed(uint16_t disk) const;
/**
- * Return whether an already running high priority operation pre-empts
- * (blocks) the operation in msg from even starting in the current thread.
- */
- bool operationBlockedByHigherPriorityThread(const api::StorageMessage& msg, const Disk& disk) const;
-
- /**
* Return whether msg has timed out based on waitTime and the message's
* specified timeout.
*/
@@ -262,7 +248,6 @@ private:
*/
std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const;
bool messageMayBeAborted(const api::StorageMessage& msg) const;
- bool hasBlockingOperations(const Disk& t) const;
void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
bool diskHasActiveOperationForAbortedBucket(const Disk& disk, const AbortBucketOperationsCommand& cmd) const;
void waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
@@ -287,4 +272,3 @@ private:
};
} // storage
-
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 6644d09da7f..b4735c2961a 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -29,10 +29,8 @@ using document::BucketSpace;
namespace storage {
FileStorManager::
-FileStorManager(const config::ConfigUri & configUri,
- const spi::PartitionStateList& partitions,
- spi::PersistenceProvider& provider,
- ServiceLayerComponentRegister& compReg)
+FileStorManager(const config::ConfigUri & configUri, const spi::PartitionStateList& partitions,
+ spi::PersistenceProvider& provider, ServiceLayerComponentRegister& compReg)
: StorageLinkQueued("File store manager", compReg),
framework::HtmlStatusReporter("filestorman", "File store manager"),
_compReg(compReg),
@@ -83,8 +81,7 @@ FileStorManager::~FileStorManager()
}
}
}
- LOG(debug, "Closing all filestor queues, answering queued messages. "
- "New messages will be refused.");
+ LOG(debug, "Closing all filestor queues, answering queued messages. New messages will be refused.");
_filestorHandler->close();
LOG(debug, "Deleting filestor threads. Waiting for their current operation "
"to finish. Stop their threads and delete objects.");
@@ -116,38 +113,16 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
_disks.resize(_component.getDiskCount());
- _metrics->initDiskMetrics(
- _disks.size(),
- _component.getLoadTypes()->getMetricLoadTypes(),
- (_config->threads.size() > 0) ? (_config->threads.size()) : 6);
+ size_t numThreads = _config->numThreads;
+ _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numThreads);
- _filestorHandler.reset(new FileStorHandler(
- *this, *_metrics, _partitions, _compReg,
- _config->maxPriorityToBlock, _config->minPriorityToBeBlocking));
+ _filestorHandler.reset(new FileStorHandler(*this, *_metrics, _partitions, _compReg));
for (uint32_t i=0; i<_component.getDiskCount(); ++i) {
if (_partitions[i].isUp()) {
- if (_config->threads.size() == 0) {
- LOG(spam, "Setting up disk %u", i);
- for (uint32_t j = 0; j < 4; j++) {
- _disks[i].push_back(DiskThread::SP(
- new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i, 255)));
-
- }
- for (uint32_t j = 4; j < 6; j++) {
- _disks[i].push_back(DiskThread::SP(
- new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i, 100)));
- }
- }
-
- for (uint16_t j = 0; j < _config->threads.size(); j++) {
- LOG(spam, "Setting up disk %u, thread %u with priority %d",
- i, j, _config->threads[j].lowestpri);
- _disks[i].push_back(DiskThread::SP(
- new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i, _config->threads[j].lowestpri)));
-
+ LOG(spam, "Setting up disk %u", i);
+ for (uint32_t j = 0; j < numThreads; j++) {
+ _disks[i].push_back(std::make_shared<PersistenceThread>(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i));
}
} else {
_filestorHandler->disable(i);
@@ -157,36 +132,28 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
}
void
-FileStorManager::replyDroppedOperation(api::StorageMessage& msg,
- const document::Bucket& bucket,
- api::ReturnCode::Result returnCode,
- vespalib::stringref reason)
+FileStorManager::replyDroppedOperation(api::StorageMessage& msg, const document::Bucket& bucket,
+ api::ReturnCode::Result returnCode, vespalib::stringref reason)
{
std::ostringstream error;
error << "Dropping " << msg.getType() << " to bucket "
<< bucket.toString() << ". Reason: " << reason;
LOGBT(debug, bucket.toString(), "%s", error.str().c_str());
if (!msg.getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(msg).makeReply();
reply->setResult(api::ReturnCode(returnCode, error.str()));
sendUp(reply);
}
}
void
-FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg,
- const document::Bucket& bucket)
+FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg, const document::Bucket& bucket)
{
- replyDroppedOperation(msg,
- bucket,
- api::ReturnCode::BUCKET_NOT_FOUND,
- "bucket does not exist");
+ replyDroppedOperation(msg, bucket, api::ReturnCode::BUCKET_NOT_FOUND, "bucket does not exist");
}
StorBucketDatabase::WrappedEntry
-FileStorManager::mapOperationToDisk(api::StorageMessage& msg,
- const document::Bucket& bucket)
+FileStorManager::mapOperationToDisk(api::StorageMessage& msg, const document::Bucket& bucket)
{
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "FileStorManager::mapOperationToDisk"));
@@ -197,8 +164,7 @@ FileStorManager::mapOperationToDisk(api::StorageMessage& msg,
}
StorBucketDatabase::WrappedEntry
-FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
- const document::DocumentId* docId)
+FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const document::DocumentId* docId)
{
StorBucketDatabase &database = _component.getBucketDatabase(cmd.getBucket().getBucketSpace());
StorBucketDatabase::WrappedEntry entry(database.get(
@@ -208,17 +174,12 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
if (docId) {
specific = _bucketIdFactory.getBucketId(*docId);
}
- typedef std::map<document::BucketId,
- StorBucketDatabase::WrappedEntry> BucketMap;
+ typedef std::map<document::BucketId, StorBucketDatabase::WrappedEntry> BucketMap;
std::shared_ptr<api::StorageReply> reply;
{
- BucketMap results(
- database.getContained(
- specific, "FileStorManager::mapOperationToBucketAndDisk-2"));
+ BucketMap results( database.getContained( specific, "FileStorManager::mapOperationToBucketAndDisk-2"));
if (results.size() == 1) {
- LOG(debug,
- "Remapping %s operation to specific %s versus "
- "non-existing %s to %s.",
+ LOG(debug, "Remapping %s operation to specific %s versus non-existing %s to %s.",
cmd.toString().c_str(), specific.toString().c_str(),
cmd.getBucketId().toString().c_str(),
results.begin()->first.toString().c_str());
@@ -243,10 +204,8 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
}
LOGBT(debug, cmd.getBucketId().toString(), "%s", error.str().c_str());
- reply.reset(static_cast<api::StorageCommand&>(cmd).makeReply().release());
- reply->setResult(
- api::ReturnCode(
- api::ReturnCode::BUCKET_NOT_FOUND, error.str()));
+ reply = static_cast<api::StorageCommand&>(cmd).makeReply();
+ reply->setResult( api::ReturnCode( api::ReturnCode::BUCKET_NOT_FOUND, error.str()));
}
sendUp(reply);
}
@@ -254,18 +213,15 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
}
bool
-FileStorManager::handlePersistenceMessage(
- const shared_ptr<api::StorageMessage>& msg, uint16_t disk)
+FileStorManager::handlePersistenceMessage( const shared_ptr<api::StorageMessage>& msg, uint16_t disk)
{
api::ReturnCode errorCode(api::ReturnCode::OK);
do {
- LOG(spam, "Received %s. Attempting to queue it to disk %u.",
- msg->getType().getName().c_str(), disk);
+ LOG(spam, "Received %s. Attempting to queue it to disk %u.", msg->getType().getName().c_str(), disk);
LOG_BUCKET_OPERATION_NO_LOCK(
getStorageMessageBucket(*msg).getBucketId(),
- vespalib::make_string("Attempting to queue %s to disk %u",
- msg->toString().c_str(), disk));
+ vespalib::make_string("Attempting to queue %s to disk %u", msg->toString().c_str(), disk));
if (_filestorHandler->schedule(msg, disk)) {
@@ -275,12 +231,10 @@ FileStorManager::handlePersistenceMessage(
}
switch (_filestorHandler->getDiskState(disk)) {
case FileStorHandler::DISABLED:
- errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE,
- "Disk disabled");
+ errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled");
break;
case FileStorHandler::CLOSED:
- errorCode = api::ReturnCode(api::ReturnCode::ABORTED,
- "Shutting down storage node.");
+ errorCode = api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.");
break;
case FileStorHandler::AVAILABLE:
assert(false);
@@ -289,8 +243,7 @@ FileStorManager::handlePersistenceMessage(
// If we get here, we failed to schedule message. errorCode says why
// We need to reply to message (while not having bucket lock)
if (!msg->getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(*msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(*msg).makeReply();
reply->setResult(errorCode);
LOG(spam, "Received persistence message %s. Returning reply: %s",
msg->getType().getName().c_str(), errorCode.toString().c_str());
@@ -303,7 +256,7 @@ bool
FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd)
{
if (cmd->getTimestamp() == 0) {
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ shared_ptr<api::StorageReply> reply = cmd->makeReply();
std::string msg("Put command received without timestamp set. "
"Distributor need to set timestamp to ensure equal "
"timestamps between storage nodes. Rejecting.");
@@ -311,8 +264,7 @@ FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd)
sendUp(reply);
return true;
}
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -323,7 +275,7 @@ bool
FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd)
{
if (cmd->getTimestamp() == 0) {
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ shared_ptr<api::StorageReply> reply = cmd->makeReply();
std::string msg("Update command received without timestamp set. "
"Distributor need to set timestamp to ensure equal "
"timestamps between storage nodes. Rejecting.");
@@ -331,8 +283,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd)
sendUp(reply);
return true;
}
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -342,8 +293,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd)
bool
FileStorManager::onGet(const shared_ptr<api::GetCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -354,7 +304,7 @@ bool
FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd)
{
if (cmd->getTimestamp() == 0) {
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ shared_ptr<api::StorageReply> reply = cmd->makeReply();
std::string msg("Remove command received without timestamp set. "
"Distributor need to set timestamp to ensure equal "
"timestamps between storage nodes. Rejecting.");
@@ -362,8 +312,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd)
sendUp(reply);
return true;
}
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -373,8 +322,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd)
bool
FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, 0));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -423,12 +371,10 @@ FileStorManager::onCreateBucket(
bucket.getBucketId(), "FileStorManager::onCreateBucket",
StorBucketDatabase::CREATE_IF_NONEXISTING));
if (entry.preExisted()) {
- LOG(debug,
- "Got create bucket request for %s which already exists: %s",
+ LOG(debug, "Got create bucket request for %s which already exists: %s",
cmd->getBucketId().toString().c_str(),
entry->getBucketInfo().toString().c_str());
- code = api::ReturnCode(api::ReturnCode::EXISTS,
- "Bucket already exist");
+ code = api::ReturnCode(api::ReturnCode::EXISTS, "Bucket already exist");
} else {
entry->disk = _component.getIdealPartition(cmd->getBucket());
if (_partitions[entry->disk].isUp()) {
@@ -445,12 +391,9 @@ FileStorManager::onCreateBucket(
return true;
} else {
entry.remove();
- code = api::ReturnCode(
- api::ReturnCode::IO_FAILURE,
- vespalib::make_string(
- "Trying to create bucket %s on disabled disk %d",
- cmd->getBucketId().toString().c_str(),
- entry->disk));
+ code = api::ReturnCode(api::ReturnCode::IO_FAILURE,
+ vespalib::make_string("Trying to create bucket %s on disabled disk %d",
+ cmd->getBucketId().toString().c_str(), entry->disk));
}
}
}
@@ -471,7 +414,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
"FileStorManager::onDeleteBucket"));
if (!entry.exist()) {
LOG(debug, "%s was already deleted", cmd->getBucketId().toString().c_str());
- std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = cmd->makeReply();
sendUp(reply);
return true;
}
@@ -493,7 +436,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
<< entry->getBucketInfo().toString();
LOG(debug, "Rejecting bucket delete: %s", ost.str().c_str());
- std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = cmd->makeReply();
static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(entry->getBucketInfo());
reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str()));
entry.unlock();
@@ -543,8 +486,7 @@ FileStorManager::ensureConsistentBucket(
bool
FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd,
- "FileStorManager::onMergeBucket"));
+ StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onMergeBucket"));
if (!entry.exist()) {
return true;
}
@@ -567,7 +509,7 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
cmd->toString().c_str(), entry->disk, cmd->getClusterStateVersion(),
_component.getStateUpdater().getClusterStateBundle()->getVersion()));
LOGBT(debug, cmd->getBucketId().toString(), "%s", code.getMessage().c_str());
- api::MergeBucketReply::SP reply(new api::MergeBucketReply(*cmd));
+ auto reply = std::make_shared<api::MergeBucketReply>(*cmd);
reply->setResult(code);
sendUp(reply);
return true;
@@ -582,39 +524,30 @@ bool
FileStorManager::onGetBucketDiff(
const shared_ptr<api::GetBucketDiffCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(
- ensureConsistentBucket(cmd->getBucket(),
- *cmd,
- "FileStorManager::onGetBucketDiff"));
+ StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onGetBucketDiff"));
if (!entry.exist()) {
return true;
}
if (!entry.preExisted()) {
entry->disk = _component.getIdealPartition(cmd->getBucket());
if (_partitions[entry->disk].isUp()) {
- LOG(debug, "Created bucket %s on disk %d (node index is %d) due "
- "to get bucket diff being received.",
- cmd->getBucketId().toString().c_str(),
- entry->disk, _component.getIndex());
+ LOG(debug, "Created bucket %s on disk %d (node index is %d) due to get bucket diff being received.",
+ cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex());
entry->info.setTotalDocumentSize(0);
entry->info.setUsedFileSize(0);
entry->info.setReady(true);
- // Call before writing bucket entry as we need to have bucket
- // lock while calling
+ // Call before writing bucket entry as we need to have bucket
+ // lock while calling
handlePersistenceMessage(cmd, entry->disk);
entry.write();
} else {
entry.remove();
api::ReturnCode code(api::ReturnCode::IO_FAILURE,
vespalib::make_string(
- "Trying to merge non-existing bucket %s, which "
- "can't be created because target disk %d is down",
- cmd->getBucketId().toString().c_str(),
- entry->disk));
- LOGBT(warning, cmd->getBucketId().toString(),
- "%s", code.getMessage().c_str());
- api::GetBucketDiffReply::SP reply(
- new api::GetBucketDiffReply(*cmd));
+ "Trying to merge non-existing bucket %s, which can't be created because target disk %d is down",
+ cmd->getBucketId().toString().c_str(), entry->disk));
+ LOGBT(warning, cmd->getBucketId().toString(), "%s", code.getMessage().c_str());
+ auto reply = std::make_shared<api::GetBucketDiffReply>(*cmd);
reply->setResult(code);
sendUp(reply);
return true;
@@ -634,8 +567,7 @@ FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const
BucketSpace bucketSpace(msg.getBucket().getBucketSpace());
if (!_component.getBucketDatabase(bucketSpace).isConsistent(entry)) {
document::Bucket bucket(bucketSpace, entry.getBucketId());
- replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED,
- "bucket became inconsistent during merging");
+ replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket became inconsistent during merging");
return false;
}
return true;
@@ -715,8 +647,7 @@ FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& c
}
bool
-FileStorManager::onSetBucketState(
- const std::shared_ptr<api::SetBucketStateCommand>& cmd)
+FileStorManager::onSetBucketState(const std::shared_ptr<api::SetBucketStateCommand>& cmd)
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
@@ -821,7 +752,7 @@ void
FileStorManager::handleAbortBucketOperations(const shared_ptr<AbortBucketOperationsCommand>& cmd)
{
_filestorHandler->abortQueuedOperations(*cmd);
- sendReply(api::StorageReply::SP(cmd->makeReply().release()));
+ sendReply(api::StorageReply::SP(cmd->makeReply()));
}
bool
@@ -913,8 +844,7 @@ void FileStorManager::onFlush(bool downwards)
}
void
-FileStorManager::reportHtmlStatus(std::ostream& out,
- const framework::HttpUrlPath& path) const
+FileStorManager::reportHtmlStatus(std::ostream& out, const framework::HttpUrlPath& path) const
{
bool showStatus = !path.hasAttribute("thread");
bool verbose = path.hasAttribute("verbose");
@@ -962,7 +892,7 @@ FileStorManager::updateState()
bool nodeUp = state->getNodeState(node).getState().oneOf("uir");
LOG(debug, "FileStorManager received cluster state '%s'", state->toString().c_str());
- // If edge where we go down
+ // If edge where we go down
if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) {
LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database");
Deactivator deactivator;
diff --git a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h b/storage/src/vespa/storage/persistence/filestorage/pausehandler.h
deleted file mode 100644
index 59c543b0067..00000000000
--- a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * @class PauseHandler
- * @ingroup persistence
- *
- * @brief Object that can be used to possibly pause running operation
- */
-#pragma once
-
-#include <vespa/storage/persistence/filestorage/filestorhandler.h>
-
-namespace storage {
-
-class PauseHandler {
- FileStorHandler* _handler;
- uint16_t _disk;
- uint8_t _priority;
-
-public:
- PauseHandler() : _handler(0), _disk(0), _priority(0) {}
- PauseHandler(FileStorHandler& handler, uint16_t disk)
- : _handler(&handler),
- _disk(disk),
- _priority(0)
- {
- }
-
- void setPriority(uint8_t priority) { _priority = priority; }
-
- void pause() const { if (_handler != 0) _handler->pause(_disk, _priority); }
-};
-
-} // storage
-
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 6c4cd7b64d2..be6f9577642 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -9,7 +9,6 @@
#include <vespa/document/fieldset/fieldsetrepo.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/exceptions.h>
-#include <algorithm>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".persistence.thread");
@@ -21,9 +20,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
- uint16_t deviceIndex,
- uint8_t lowestPriority)
- : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, lowestPriority, provider),
+ uint16_t deviceIndex)
+ : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider),
_warnOnSlowOperations(5000),
_spi(provider),
_processAllHandler(_env, provider),
@@ -106,19 +104,14 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd,
MessageTracker::UP
PersistenceThread::handlePut(api::PutCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.put[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.put[cmd.getLoadType()],_env._component.getClock());
if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
return tracker;
}
- spi::Result response =
- _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()),
- cmd.getDocument(),
- _context);
+ spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
+ spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), _context);
checkForError(response, *tracker);
return tracker;
}
@@ -126,22 +119,16 @@ PersistenceThread::handlePut(api::PutCommand& cmd)
MessageTracker::UP
PersistenceThread::handleRemove(api::RemoveCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.remove[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.remove[cmd.getLoadType()],_env._component.getClock());
if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
return tracker;
}
- spi::RemoveResult response =
- _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()),
- cmd.getDocumentId(), _context);
+ spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
+ spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), _context);
if (checkForError(response, *tracker)) {
- api::RemoveReply* reply(new api::RemoveReply(
- cmd, response.wasFound() ? cmd.getTimestamp() : 0));
- tracker->setReply(api::StorageReply::SP(reply));
+ tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
}
if (!response.wasFound()) {
++_env._metrics.remove[cmd.getLoadType()].notFound;
@@ -152,22 +139,18 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd)
MessageTracker::UP
PersistenceThread::handleUpdate(api::UpdateCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.update[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.update[cmd.getLoadType()],_env._component.getClock());
if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
return tracker;
}
- spi::UpdateResult response =
- _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()),
- cmd.getUpdate(), _context);
+ spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
+ spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), _context);
if (checkForError(response, *tracker)) {
- api::UpdateReply* reply = new api::UpdateReply(cmd);
+ auto reply = std::make_shared<api::UpdateReply>(cmd);
reply->setOldTimestamp(response.getExistingTimestamp());
- tracker->setReply(api::StorageReply::SP(reply));
+ tracker->setReply(std::move(reply));
}
return tracker;
}
@@ -175,30 +158,18 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd)
MessageTracker::UP
PersistenceThread::handleGet(api::GetCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.get[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.get[cmd.getLoadType()],_env._component.getClock());
document::FieldSetRepo repo;
- document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(),
- cmd.getFieldSet());
+ document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet());
spi::GetResult result =
- _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- *fieldSet,
- cmd.getDocumentId(),
- _context);
+ _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context);
if (checkForError(result, *tracker)) {
if (!result.hasDocument()) {
++_env._metrics.get[cmd.getLoadType()].notFound;
}
-
- api::GetReply::UP reply(
- new api::GetReply(cmd,
- Document::SP(result.getDocumentPtr()),
- result.getTimestamp()));
-
- tracker->setReply(api::StorageReply::SP(reply.release()));
+ tracker->setReply(std::make_shared<api::GetReply>(cmd, result.getDocumentPtr(), result.getTimestamp()));
}
return tracker;
@@ -207,19 +178,14 @@ PersistenceThread::handleGet(api::GetCommand& cmd)
MessageTracker::UP
PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.repairs,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.repairs,_env._component.getClock());
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
- LOG(debug, "Repair(%s): %s",
- cmd.getBucketId().toString().c_str(),
+ LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(),
(cmd.verifyBody() ? "Verifying body" : "Not verifying body"));
api::BucketInfo before = _env.getBucketInfo(cmd.getBucket());
spi::Result result =
- _spi.maintain(spi::Bucket(cmd.getBucket(),
- spi::PartitionId(_env._partition)),
- cmd.verifyBody() ?
- spi::HIGH : spi::LOW);
+ _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
+ cmd.verifyBody() ? spi::HIGH : spi::LOW);
if (checkForError(result, *tracker)) {
api::BucketInfo after = _env.getBucketInfo(cmd.getBucket());
@@ -239,15 +205,11 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
MessageTracker::UP
PersistenceThread::handleRevert(api::RevertCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.revert[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock());
spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- const std::vector<api::Timestamp> tokens = cmd.getRevertTokens();
- for (uint32_t i = 0; i < tokens.size(); ++i) {
- spi::Result result = _spi.removeEntry(b,
- spi::Timestamp(tokens[i]),
- _context);
+ const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens();
+ for (const api::Timestamp & token : tokens) {
+ spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), _context);
}
return tracker;
}
@@ -255,9 +217,7 @@ PersistenceThread::handleRevert(api::RevertCommand& cmd)
MessageTracker::UP
PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.createBuckets,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock());
LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
LOG(warning, "Bucket %s was merging at create time. Unexpected.",
@@ -299,8 +259,7 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket,
result.getErrorMessage().c_str());
return false;
}
- api::BucketInfo providerInfo(
- _env.convertBucketInfo(result.getBucketInfo()));
+ api::BucketInfo providerInfo(_env.convertBucketInfo(result.getBucketInfo()));
// Don't check meta fields or active/ready fields since these are not
// that important and ready may change under the hood in a race with
// getModifiedBuckets(). If bucket is empty it means it has already
@@ -323,15 +282,12 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket,
MessageTracker::UP
PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.deleteBuckets,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock());
LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()");
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
_env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
- api::ReturnCode(api::ReturnCode::ABORTED,
- "Bucket was deleted during the merge"));
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
}
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
@@ -340,8 +296,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
_spi.deleteBucket(bucket, _context);
StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
{
- StorBucketDatabase::WrappedEntry entry(db.get(
- cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
+ StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
if (entry.exist() && entry->getMetaCount() > 0) {
LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
"active operation when delete bucket was queued. "
@@ -365,11 +320,8 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
MessageTracker::UP
PersistenceThread::handleGetIter(GetIterCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.visit[cmd.getLoadType()],
- _env._component.getClock()));
- spi::IterateResult result(_spi.iterate(cmd.getIteratorId(),
- cmd.getMaxByteSize(), _context));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock());
+ spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), _context));
if (checkForError(result, *tracker)) {
GetIterReply::SP reply(new GetIterReply(cmd));
reply->getEntries() = result.steal_entries();
@@ -386,13 +338,11 @@ PersistenceThread::handleGetIter(GetIterCommand& cmd)
MessageTracker::UP
PersistenceThread::handleReadBucketList(ReadBucketList& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.readBucketList,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketList,_env._component.getClock());
spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace(), cmd.getPartition()));
if (checkForError(result, *tracker)) {
- ReadBucketListReply::SP reply(new ReadBucketListReply(cmd));
+ auto reply = std::make_shared<ReadBucketListReply>(cmd);
result.getList().swap(reply->getBuckets());
tracker->setReply(reply);
}
@@ -403,36 +353,24 @@ PersistenceThread::handleReadBucketList(ReadBucketList& cmd)
MessageTracker::UP
PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.readBucketInfo,
- _env._component.getClock()));
-
- _env.updateBucketDatabase(cmd.getBucket(),
- _env.getBucketInfo(cmd.getBucket()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketInfo,_env._component.getClock());
+ _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket()));
return tracker;
}
MessageTracker::UP
PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.createIterator,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock());
document::FieldSetRepo repo;
- document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(),
- cmd.getFields());
+ document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields());
// _context is reset per command, so it's safe to modify it like this.
_context.setReadConsistency(cmd.getReadConsistency());
spi::CreateIteratorResult result(_spi.createIterator(
spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
- *fieldSet,
- cmd.getSelection(),
- cmd.getIncludedVersions(),
- _context));
+ *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), _context));
if (checkForError(result, *tracker)) {
- tracker->setReply(CreateIteratorReply::SP(
- new CreateIteratorReply(
- cmd, spi::IteratorId(result.getIteratorId()))));
+ tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId())));
}
return tracker;
}
@@ -447,10 +385,8 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
// Calculate the various bucket ids involved.
if (cmd.getBucketId().getUsedBits() >= 58) {
- tracker->fail(
- api::ReturnCode::ILLEGAL_PARAMETERS,
- "Can't split anymore since maximum split bits "
- "is already reached");
+ tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS,
+ "Can't split anymore since maximum split bits is already reached");
return tracker;
}
if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) {
@@ -470,13 +406,11 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) {
document::BucketId src(cmd.getBucketId());
document::BucketId target1(src.getUsedBits() + 1, src.getId());
- document::BucketId target2(src.getUsedBits() + 1, src.getId()
- | (uint64_t(1) << src.getUsedBits()));
+ document::BucketId target2(src.getUsedBits() + 1, src.getId() | (uint64_t(1) << src.getUsedBits()));
targetInfo = SplitBitDetector::Result(target1, target2, false);
}
if (targetInfo.failed()) {
- tracker->fail(api::ReturnCode::INTERNAL_FAILURE,
- targetInfo.getReason());
+ tracker->fail(api::ReturnCode::INTERNAL_FAILURE, targetInfo.getReason());
return tracker;
}
// If we get here, we're splitting data in two.
@@ -519,8 +453,9 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
// Ensure to take them in rising order.
StorBucketDatabase::WrappedEntry sourceEntry(_env.getBucketDatabase(spiBucket.getBucket().getBucketSpace()).get(
cmd.getBucketId(), "PersistenceThread::handleSplitBucket-source"));
- api::SplitBucketReply* splitReply(new api::SplitBucketReply(cmd));
- tracker->setReply(api::StorageReply::SP(splitReply));
+ auto reply = std::make_shared<api::SplitBucketReply>(cmd);
+ api::SplitBucketReply & splitReply = *reply;
+ tracker->setReply(std::move(reply));
typedef std::pair<StorBucketDatabase::WrappedEntry,
FileStorHandler::RemapInfo> TargetInfo;
@@ -581,7 +516,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
createTarget.toString().c_str());
_spi.createBucket(createTarget, _context);
}
- splitReply->getSplitInfo().push_back(
+ splitReply.getSplitInfo().push_back(
api::SplitBucketReply::Entry(
targets[i].second.bucket.getBucketId(),
targets[i].first->getBucketInfo()));
@@ -953,28 +888,23 @@ PersistenceThread::processMessage(api::StorageMessage& msg)
++_env._metrics.operations;
if (msg.getType().isReply()) {
try{
- _env._pauseHandler.setPriority(msg.getPriority());
LOG(debug, "Handling reply: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
handleReply(static_cast<api::StorageReply&>(msg));
} catch (std::exception& e) {
// It's a reply, so nothing we can do.
- LOG(debug, "Caught exception for %s: %s",
- msg.toString().c_str(),
- e.what());
+ LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
}
} else {
api::StorageCommand& initiatingCommand =
static_cast<api::StorageCommand&>(msg);
try {
- int64_t startTime(
- _component->getClock().getTimeInMillis().getTime());
+ int64_t startTime(_component->getClock().getTimeInMillis().getTime());
LOG(debug, "Handling command: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
- std::unique_ptr<MessageTracker> tracker(
- handleCommand(initiatingCommand));
+ auto tracker(handleCommand(initiatingCommand));
if (!tracker.get()) {
LOG(debug, "Received unsupported command %s",
msg.getType().getName().c_str());
@@ -1080,23 +1010,18 @@ PersistenceThread::flushAllReplies(
if (errorCode != 0) {
for (uint32_t i = 0; i < replies.size(); ++i) {
replies[i]->getReply()->setResult(
- api::ReturnCode(
- (api::ReturnCode::Result)errorCode,
- result.getErrorMessage()));
+ api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage()));
}
}
} catch (std::exception& e) {
for (uint32_t i = 0; i < replies.size(); ++i) {
- replies[i]->getReply()->setResult(api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE, e.what()));
+ replies[i]->getReply()->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what()));
}
}
for (uint32_t i = 0; i < replies.size(); ++i) {
- LOG(spam,
- "Sending reply up (batched): %s %zu",
- replies[i]->getReply()->toString().c_str(),
- replies[i]->getReply()->getMsgId());
+ LOG(spam, "Sending reply up (batched): %s %zu",
+ replies[i]->getReply()->toString().c_str(), replies[i]->getReply()->getMsgId());
_env._fileStorHandler.sendReply(replies[i]->getReply());
}
@@ -1108,7 +1033,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
std::vector<MessageTracker::UP> trackers;
document::Bucket bucket = lock.first->getBucket();
- while (lock.second.get() != 0) {
+ while (lock.second) {
LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p",
_env._partition, _env._nodeIndex, lock.second.get());
std::shared_ptr<api::StorageMessage> msg(lock.second);
@@ -1121,7 +1046,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
}
std::unique_ptr<MessageTracker> tracker = processMessage(*msg);
- if (!tracker.get() || !tracker->getReply().get()) {
+ if (!tracker || !tracker->getReply()) {
// Was a reply
break;
}
@@ -1133,24 +1058,18 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
}
if (batchable) {
LOG(spam, "Adding reply %s to batch for bucket %s",
- tracker->getReply()->toString().c_str(),
- bucket.getBucketId().toString().c_str());
+ tracker->getReply()->toString().c_str(), bucket.getBucketId().toString().c_str());
trackers.push_back(std::move(tracker));
if (trackers.back()->getReply()->getResult().success()) {
- _env._fileStorHandler.getNextMessage(
- _env._partition,
- lock,
- _env._lowestPriority);
+ _env._fileStorHandler.getNextMessage(_env._partition, lock);
} else {
break;
}
} else {
- LOG(spam,
- "Sending reply up: %s %zu",
- tracker->getReply()->toString().c_str(),
- tracker->getReply()->getMsgId());
+ LOG(spam, "Sending reply up: %s %zu",
+ tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId());
_env._fileStorHandler.sendReply(tracker->getReply());
break;
@@ -1165,16 +1084,12 @@ PersistenceThread::run(framework::ThreadHandle& thread)
{
LOG(debug, "Started persistence thread with pid %d", getpid());
- while (!thread.interrupted()
- && !_env._fileStorHandler.closed(_env._partition))
- {
+ while (!thread.interrupted() && !_env._fileStorHandler.closed(_env._partition)) {
thread.registerTick();
- FileStorHandler::LockedMessage lock(
- _env._fileStorHandler.getNextMessage(
- _env._partition, _env._lowestPriority));
+ FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition));
- if (lock.first.get()) {
+ if (lock.first) {
processMessages(lock);
}
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 07fadb875cc..640614f7edf 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -21,7 +21,7 @@ class PersistenceThread final : public DiskThread, public Types
public:
PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri,
spi::PersistenceProvider& provider, FileStorHandler& filestorHandler,
- FileStorThreadMetrics& metrics, uint16_t deviceIndex, uint8_t lowestPriority);
+ FileStorThreadMetrics& metrics, uint16_t deviceIndex);
~PersistenceThread();
/** Waits for current operation to be finished. */
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 8e96c05a66a..c2dcb8e2a29 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -66,7 +66,6 @@ PersistenceUtil::PersistenceUtil(
FileStorHandler& fileStorHandler,
FileStorThreadMetrics& metrics,
uint16_t partition,
- uint8_t lowestPriority,
spi::PersistenceProvider& provider)
: _config(*config::ConfigGetter<vespa::config::content::StorFilestorConfig>::getConfig(configUri.getConfigId(), configUri.getContext())),
_compReg(compReg),
@@ -77,24 +76,18 @@ PersistenceUtil::PersistenceUtil(
_metrics(metrics),
_bucketFactory(_component.getBucketIdFactory()),
_repo(_component.getTypeRepo()),
- _lowestPriority(lowestPriority),
- _pauseHandler(),
_spi(provider)
{
}
-PersistenceUtil::~PersistenceUtil()
-{
-}
+PersistenceUtil::~PersistenceUtil() { }
void
-PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket,
- const api::BucketInfo& i)
+PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i)
{
// Update bucket database
- StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(
- bucket.getBucketId(),
- "env::updatebucketdb"));
+ StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(),
+ "env::updatebucketdb"));
if (entry.exist()) {
api::BucketInfo info = i;
@@ -106,9 +99,7 @@ PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket,
entry->setBucketInfo(info);
entry.write();
} else {
- LOG(debug,
- "Bucket(%s).getBucketInfo: Bucket does not exist.",
- bucket.getBucketId().toString().c_str());
+ LOG(debug, "Bucket(%s).getBucketInfo: Bucket does not exist.", bucket.getBucketId().toString().c_str());
}
}
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 5126931bab6..bf347ccb10a 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -1,12 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "types.h"
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/storage/common/servicelayercomponent.h>
#include <vespa/storage/persistence/filestorage/filestorhandler.h>
#include <vespa/storage/persistence/filestorage/filestormetrics.h>
-#include <vespa/storage/persistence/filestorage/pausehandler.h>
-#include <vespa/storage/persistence/types.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/storage/storageutil/utils.h>
#include <vespa/config-stor-filestor.h>
@@ -70,8 +69,6 @@ struct PersistenceUtil {
FileStorThreadMetrics& _metrics;
const document::BucketIdFactory& _bucketFactory;
const std::shared_ptr<document::DocumentTypeRepo> _repo;
- uint8_t _lowestPriority;
- PauseHandler _pauseHandler;
spi::PersistenceProvider& _spi;
PersistenceUtil(
@@ -80,7 +77,6 @@ struct PersistenceUtil {
FileStorHandler& fileStorHandler,
FileStorThreadMetrics& metrics,
uint16_t partition,
- uint8_t lowestPriority,
spi::PersistenceProvider& provider);
~PersistenceUtil();
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 38a48ac2eed..c8739442c13 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -115,20 +115,20 @@ private:
bool _attemptedStopped;
vespalib::string _pidFile;
- // First components that doesn't depend on others
+    // First, components that don't depend on others
std::unique_ptr<StatusWebServer> _statusWebServer;
std::shared_ptr<StorageMetricSet> _metrics;
std::unique_ptr<metrics::MetricManager> _metricManager;
- // Depends on bucket databases and stop() functionality
+ // Depends on bucket databases and stop() functionality
std::unique_ptr<DeadLockDetector> _deadLockDetector;
- // Depends on metric manager
+ // Depends on metric manager
std::unique_ptr<StatusMetricConsumer> _statusMetrics;
- // Depends on metric manager
+ // Depends on metric manager
std::unique_ptr<StateReporter> _stateReporter;
std::unique_ptr<StateManager> _stateManager;
- // The storage chain can depend on anything.
+ // The storage chain can depend on anything.
std::unique_ptr<StorageLink> _chain;
/** Implementation of config callbacks. */
diff --git a/storageserver/src/tests/storageservertest.cpp b/storageserver/src/tests/storageservertest.cpp
index 36d72bdc2b4..03b4dfd80da 100644
--- a/storageserver/src/tests/storageservertest.cpp
+++ b/storageserver/src/tests/storageservertest.cpp
@@ -222,8 +222,7 @@ StorageServerTest::setUp()
storConfig.reset(new vdstestlib::DirConfig(getStandardConfig(true)));
addSlobrokConfig(*distConfig, *slobrok);
addSlobrokConfig(*storConfig, *slobrok);
- storConfig->getConfig("stor-filestor")
- .set("fail_disk_after_error_count", "1");
+ storConfig->getConfig("stor-filestor").set("fail_disk_after_error_count", "1");
system("mkdir -p vdsroot/disks/d0");
system("mkdir -p vdsroot.distributor");
slobrokMirror.reset(new SlobrokMirror(slobrok->config()));
diff --git a/storageserver/src/tests/testhelper.cpp b/storageserver/src/tests/testhelper.cpp
index bcc5967215b..b245a6500bd 100644
--- a/storageserver/src/tests/testhelper.cpp
+++ b/storageserver/src/tests/testhelper.cpp
@@ -7,23 +7,6 @@ LOG_SETUP(".testhelper");
namespace storage {
-namespace {
- bool useNewStorageCore() {
- if ( // Unit test directory
- vespalib::fileExists("use_new_storage_core") ||
- // src/cpp directory
- vespalib::fileExists("../use_new_storage_core") ||
- // Top build directory where storage-HEAD remains
- vespalib::fileExists("../../../../../use_new_storage_core"))
- {
- std::cerr << "Using new storage core for unit tests\n";
- return true;
- }
- return false;
- }
- bool newStorageCore(useNewStorageCore());
-}
-
void addStorageDistributionConfig(vdstestlib::DirConfig& dc)
{
vdstestlib::DirConfig::Config* config;
@@ -72,18 +55,13 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode) {
config->set("threads[0].lowestpri 255");
config->set("dir_spread", "4");
config->set("dir_levels", "0");
- config->set("use_new_core", newStorageCore ? "true" : "false");
config->set("maximum_versions_of_single_document_stored", "0");
//config->set("enable_slotfile_cache", "false");
// Unit tests typically use fake low time values, so don't complain
// about them or compact/delete them by default. Override in tests testing that
// behavior
- config->set("time_future_limit", "5");
- config->set("time_past_limit", "2000000000");
config->set("keep_remove_time_period", "2000000000");
config->set("revert_time_period", "2000000000");
- // Don't want test to call exit()
- config->set("fail_disk_after_error_count", "0");
config = &dc.addConfig("stor-memfilepersistence");
// Easier to see what goes wrong with only 1 thread per disk.
config->set("minimum_file_meta_slots", "2");
@@ -94,6 +72,7 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode) {
config = &dc.addConfig("persistence");
config->set("keep_remove_time_period", "2000000000");
config->set("revert_time_period", "2000000000");
+ config->set("fail_disk_after_error_count", "0");
config = &dc.addConfig("stor-bouncer");
config = &dc.addConfig("stor-integritychecker");
config = &dc.addConfig("stor-bucketmover");