summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-01-18 22:01:06 +0100
committerGitHub <noreply@github.com>2022-01-18 22:01:06 +0100
commitba063a6ca30deee66590efe4ae6e4e9f6ac83b5c (patch)
treef8ffa9a2c5cb83fcda57a14d61dbf7829372c21a
parent04e894ae96dc13b44762f600dc6a4b3f97df13c7 (diff)
parentaeb3ab8725ee058c65036d582155edc20eff654b (diff)
Merge pull request #20857 from vespa-engine/balder/wire-in-control-of-unbound-q
Wire in control of whether taskLimit is hard.
-rw-r--r--searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h8
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp2
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp36
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp17
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h1
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp2
12 files changed, 72 insertions, 26 deletions
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
index a59b3e9bc6f..fc8bd474813 100644
--- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
@@ -14,11 +14,11 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
struct Fixture {
ProtonConfig cfg;
- Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, uint32_t task_limit = 500)
+ Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, int32_t task_limit = 500)
: cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit))
{
}
- ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, uint32_t task_limit) {
+ ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, int32_t task_limit) {
ProtonConfigBuilder builder;
builder.indexing.threads = baseLineIndexingThreads;
builder.indexing.tasklimit = task_limit;
@@ -56,6 +56,15 @@ TEST_F("require that task limits are set", Fixture)
auto tcfg = f.make(24);
EXPECT_EQUAL(2000u, tcfg.master_task_limit());
EXPECT_EQUAL(500u, tcfg.defaultTaskLimit());
+ EXPECT_TRUE(tcfg.is_task_limit_hard());
+}
+
+TEST_F("require that negative task limit makes it soft", Fixture(2, 3000, -700))
+{
+ auto tcfg = f.make(24);
+ EXPECT_EQUAL(3000u, tcfg.master_task_limit());
+ EXPECT_EQUAL(700u, tcfg.defaultTaskLimit());
+ EXPECT_FALSE(tcfg.is_task_limit_hard());
}
namespace {
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 36c8070f140..47eaef2b6b5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -72,7 +72,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) {
_field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit());
_attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark());
+ cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
}
@@ -82,7 +82,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
} else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) {
_field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark());
+ cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();}));
}
@@ -98,7 +98,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
_indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
_indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
_attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark());
+ cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
index e32cd6f5f4e..5a3eaa02d3b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
@@ -28,6 +28,7 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi
_field_writer = vespalib::SequencedTaskExecutor::create(proton_field_writer_executor,
fw_cfg.indexingThreads() * 3,
fw_cfg.defaultTaskLimit(),
+ fw_cfg.is_task_limit_hard(),
fw_cfg.optimize(),
fw_cfg.kindOfwatermark());
if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index 7982e8a8414..335d5bab8d0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -12,14 +12,15 @@ using OptimizeFor = vespalib::Executor::OptimizeFor;
ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
uint32_t master_task_limit_,
- uint32_t defaultTaskLimit_,
+ int32_t defaultTaskLimit_,
OptimizeFor optimize_,
uint32_t kindOfWatermark_,
vespalib::duration reactionTime_,
SharedFieldWriterExecutor shared_field_writer_)
: _indexingThreads(indexingThreads_),
_master_task_limit(master_task_limit_),
- _defaultTaskLimit(defaultTaskLimit_),
+ _defaultTaskLimit(std::abs(defaultTaskLimit_)),
+ _is_task_limit_hard(defaultTaskLimit_ >= 0),
_optimize(optimize_),
_kindOfWatermark(kindOfWatermark_),
_reactionTime(reactionTime_),
@@ -81,6 +82,7 @@ ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const
return _indexingThreads == rhs._indexingThreads &&
_master_task_limit == rhs._master_task_limit &&
_defaultTaskLimit == rhs._defaultTaskLimit &&
+ _is_task_limit_hard == rhs._is_task_limit_hard &&
_optimize == rhs._optimize &&
_kindOfWatermark == rhs._kindOfWatermark &&
_reactionTime == rhs._reactionTime &&
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index f1a4f0525d1..a54c0674263 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -23,15 +23,16 @@ private:
uint32_t _indexingThreads;
uint32_t _master_task_limit;
uint32_t _defaultTaskLimit;
+ bool _is_task_limit_hard;
OptimizeFor _optimize;
uint32_t _kindOfWatermark;
vespalib::duration _reactionTime; // Maximum reaction time to new tasks
SharedFieldWriterExecutor _shared_field_writer;
private:
- ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, uint32_t defaultTaskLimit_,
- OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_,
- SharedFieldWriterExecutor shared_field_writer_);
+ ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, int32_t defaultTaskLimit_,
+ OptimizeFor optimize_, uint32_t kindOfWatermark_,
+ vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_);
public:
static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
@@ -40,6 +41,7 @@ public:
uint32_t indexingThreads() const { return _indexingThreads; }
uint32_t master_task_limit() const { return _master_task_limit; }
uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
+ bool is_task_limit_hard() const { return _is_task_limit_hard; }
OptimizeFor optimize() const { return _optimize; }
uint32_t kindOfwatermark() const { return _kindOfWatermark; }
vespalib::duration reactionTime() const { return _reactionTime; }
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
index 3528cf74040..c609c538977 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
@@ -60,7 +60,7 @@ int main(int argc, char **argv) {
auto optimize = optimize_for_throughput
? vespalib::Executor::OptimizeFor::THROUGHPUT
: vespalib::Executor::OptimizeFor::LATENCY;
- executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, optimize);
+ executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, true, optimize);
}
vespalib::Timer timer;
for (size_t task_id = 0; task_id < num_tasks; ++task_id) {
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 243935d4013..705d6346e8c 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -2,7 +2,8 @@
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/adaptive_sequenced_executor.h>
-
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/singleexecutor.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/test/insertion_operators.h>
@@ -22,7 +23,10 @@ class Fixture
public:
std::unique_ptr<ISequencedTaskExecutor> _threads;
- Fixture() : _threads(SequencedTaskExecutor::create(sequenced_executor, 2)) { }
+ Fixture(bool is_task_limit_hard = true) :
+ _threads(SequencedTaskExecutor::create(sequenced_executor, 2, 1000, is_task_limit_hard,
+ Executor::OptimizeFor::LATENCY))
+ { }
};
@@ -258,6 +262,28 @@ TEST("require that you get correct number of executors") {
EXPECT_EQUAL(7u, seven->getNumExecutors());
}
+void verifyHardLimitForLatency(bool expect_hard) {
+ auto sequenced = SequencedTaskExecutor::create(sequenced_executor, 1, 100, expect_hard, Executor::OptimizeFor::LATENCY);
+ const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*sequenced);
+ EXPECT_EQUAL(expect_hard,nullptr != dynamic_cast<const BlockingThreadStackExecutor *>(seq.first_executor()));
+}
+
+void verifyHardLimitForThroughput(bool expect_hard) {
+ auto sequenced = SequencedTaskExecutor::create(sequenced_executor, 1, 100, expect_hard, Executor::OptimizeFor::THROUGHPUT);
+ const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*sequenced);
+ const SingleExecutor * first = dynamic_cast<const SingleExecutor *>(seq.first_executor());
+ EXPECT_TRUE(first != nullptr);
+ EXPECT_EQUAL(expect_hard, first->isBlocking());
+}
+
+TEST("require that you can get executor with both hard and soft limit") {
+ verifyHardLimitForLatency(true);
+ verifyHardLimitForLatency(false);
+ verifyHardLimitForThroughput(true);
+ verifyHardLimitForThroughput(false);
+}
+
+
TEST("require that you distribute well") {
auto seven = SequencedTaskExecutor::create(sequenced_executor, 7);
const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven);
@@ -307,15 +333,15 @@ TEST("Test creation of different types") {
auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::LATENCY);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::LATENCY);
seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::THROUGHPUT);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::THROUGHPUT);
seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::ADAPTIVE, 17);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::ADAPTIVE, 17);
auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get());
ASSERT_TRUE(aseq != nullptr);
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 58ae862f7c6..88a679b4cdb 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -3,6 +3,7 @@
#include "sequencedtaskexecutor.h"
#include "adaptive_sequenced_executor.h"
#include "singleexecutor.h"
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/stllike/hashtable.h>
@@ -46,17 +47,17 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads) {
std::unique_ptr<ISequencedTaskExecutor>
SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit) {
- return create(func, threads, taskLimit, OptimizeFor::LATENCY);
+ return create(func, threads, taskLimit, true, OptimizeFor::LATENCY);
}
std::unique_ptr<ISequencedTaskExecutor>
-SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, OptimizeFor optimize) {
- return create(func, threads, taskLimit, optimize, 0);
+SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, bool is_task_limit_hard, OptimizeFor optimize) {
+ return create(func, threads, taskLimit, is_task_limit_hard, optimize, 0);
}
std::unique_ptr<ISequencedTaskExecutor>
SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit,
- OptimizeFor optimize, uint32_t kindOfWatermark)
+ bool is_task_limit_hard, OptimizeFor optimize, uint32_t kindOfWatermark)
{
if (optimize == OptimizeFor::ADAPTIVE) {
size_t num_strands = std::min(taskLimit, threads*32);
@@ -67,9 +68,13 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark;
- executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms));
+ executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, is_task_limit_hard, watermark, 100ms));
} else {
- executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
+ if (is_task_limit_hard) {
+ executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
+ } else {
+ executors.push_back(std::make_unique<ThreadStackExecutor>(1, stackSize, func));
+ }
}
}
return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 91304a6a2e3..a4b1b82aacf 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -35,10 +35,10 @@ public:
static std::unique_ptr<ISequencedTaskExecutor>
create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit);
static std::unique_ptr<ISequencedTaskExecutor>
- create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, OptimizeFor optimize);
+ create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, bool is_task_limit_hard, OptimizeFor optimize);
static std::unique_ptr<ISequencedTaskExecutor>
create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit,
- OptimizeFor optimize, uint32_t kindOfWatermark);
+ bool is_task_limit_hard, OptimizeFor optimize, uint32_t kindOfWatermark);
/**
* For testing only
*/
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 4fdc217e701..dd755a76302 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -33,6 +33,7 @@ public:
duration get_reaction_time() const { return _reactionTime; }
ExecutorStats getStats() override;
SingleExecutor & shutdown() override;
+ bool isBlocking() const { return !_overflow; }
private:
using Lock = std::unique_lock<std::mutex>;
void drain(Lock & lock);
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index c163f6de024..c35be2789da 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -127,7 +127,7 @@ VESPA_THREAD_STACK_TAG(test_executor)
void
PersistenceTestUtils::setupExecutor(uint32_t numThreads) {
- _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(test_executor, numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE);
+ _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(test_executor, numThreads, 1000, true, vespalib::Executor::OptimizeFor::ADAPTIVE);
}
StorBucketDatabase::WrappedEntry
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 4a215c7a348..530d96dfe3f 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -218,7 +218,7 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
_compReg, std::move(operation_throttler));
uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads);
_sequencedExecutor = vespalib::SequencedTaskExecutor::create(response_executor, numResponseThreads, 10000,
- selectSequencer(_config->responseSequencerType));
+ true, selectSequencer(_config->responseSequencerType));
assert(_sequencedExecutor);
LOG(spam, "Setting up the disk");
for (uint32_t i = 0; i < numThreads; i++) {