aboutsummaryrefslogtreecommitdiffstats
path: root/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolImplTest.java
blob: 3d5375a9740f138c0754dd92f56d5f34fd1428fd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.handler.threadpool;

import ai.vespa.metrics.ContainerMetrics;
import com.yahoo.collections.Tuple2;
import com.yahoo.concurrent.Receiver;
import com.yahoo.container.protect.ProcessTerminator;
import com.yahoo.container.test.MetricMock;
import com.yahoo.jdisc.Metric;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

/**
 * @author Steinar Knutsen
 * @author bjorncs
 */
public class ContainerThreadPoolImplTest {

    private static final int CPUS = 16;

    /**
     * Verifies that a task handed to the pool's executor is run and reports {@code TRUE},
     * and that the same executor rejects new tasks once the pool has been closed.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the task's reply
     */
    @Test
    final void testThreadPool() throws InterruptedException {
        Metric metrics = new MetricMock();
        ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder().maxThreads(1));
        ContainerThreadPool threadPool = new ContainerThreadpoolImpl(config, metrics);
        Executor exec = threadPool.executor();
        try {
            FlipIt command = new FlipIt();
            for (boolean done = false; !done; ) {
                try {
                    exec.execute(command);
                    done = true;
                } catch (RejectedExecutionException e) {
                    // just try again
                }
            }
            Tuple2<Receiver.MessageState, Boolean> reply = command.didItRun.get(5 * 60 * 1000);
            if (reply.first != Receiver.MessageState.VALID) {
                fail("Executor task probably timed out, five minutes should be enough to flip a boolean.");
            }
            // Compare by value rather than by reference; the reply carries a boxed Boolean.
            if (!Boolean.TRUE.equals(reply.second)) {
                fail("Executor task seemed to run, but did not get correct value.");
            }
        } finally {
            // Close the pool even when an assertion above fails, so a failing test does not leak it.
            threadPool.close();
        }
        try {
            exec.execute(new FlipIt());
        } catch (final RejectedExecutionException e) {
            // this is what should happen
            return;
        }
        fail("Pool did not reject tasks after shutdown.");
    }

    /**
     * Convenience overload for tests that do not inspect metrics: creates the pool
     * with a fresh {@link MetricMock} and otherwise delegates to the three-argument overload.
     */
    private ThreadPoolExecutor createPool(int maxThreads, int queueSize) {
        Metric throwawayMetric = new MetricMock();
        return createPool(throwawayMetric, maxThreads, queueSize);
    }
    /**
     * Creates a {@link ContainerThreadpoolImpl} whose min and max thread settings are both
     * {@code maxThreads}, using a {@link MockProcessTerminator} and the fixed {@link #CPUS} count,
     * and returns the executor that the pool's executor wrapper delegates to.
     *
     * @param metric     metric sink handed to the pool
     * @param maxThreads value configured as both maxThreads and minThreads
     * @param queueSize  value configured as queueSize
     * @return the unwrapped executor
     */
    private ThreadPoolExecutor createPool(Metric metric, int maxThreads, int queueSize) {
        ContainerThreadpoolConfig poolConfig = new ContainerThreadpoolConfig(
                new ContainerThreadpoolConfig.Builder()
                        .maxThreads(maxThreads)
                        .minThreads(maxThreads)
                        .queueSize(queueSize));
        ContainerThreadPool pool =
                new ContainerThreadpoolImpl(poolConfig, metric, new MockProcessTerminator(), CPUS);
        // Unwrap in two explicit steps: first the wrapper, then the executor it delegates to.
        Executor wrapped = pool.executor();
        Object delegate = ((ExecutorServiceWrapper) wrapped).delegate();
        return (WorkerCompletionTimingThreadPoolExecutor) delegate;
    }

    /**
     * Verifies that explicitly configured thread and queue sizes are applied to the executor
     * and show up in the metrics recorded on the {@link MetricMock}.
     */
    @Test
    void testThatThreadPoolSizeFollowsConfig() {
        MetricMock metrics = new MetricMock();
        ThreadPoolExecutor executor = createPool(metrics, 3, 1200);
        try {
            assertEquals(3, executor.getMaximumPoolSize());
            assertEquals(1200, executor.getQueue().remainingCapacity());
            assertEquals(7, metrics.innvocations().size());
            assertEquals(3L, metrics.innvocations().get("serverThreadPoolSize").val);
            assertEquals(3L, metrics.innvocations().get(ContainerMetrics.JDISC_THREAD_POOL_MAX_ALLOWED_SIZE.baseName()).val);
            assertEquals(0L, metrics.innvocations().get("serverActiveThreads").val);
            assertEquals(1200L, metrics.innvocations().get(ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_CAPACITY.baseName()).val);
            assertEquals(0L, metrics.innvocations().get(ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_SIZE.baseName()).val);
        } finally {
            // Release any worker threads the pool may have started; no tasks were submitted.
            executor.shutdownNow();
        }
    }

    /**
     * Verifies that a configured thread count of 0 yields a pool sized to four threads per CPU,
     * and that the recorded metrics reflect that size.
     */
    @Test
    void testThatThreadPoolSizeAutoDetected() {
        MetricMock metrics = new MetricMock();
        ThreadPoolExecutor executor = createPool(metrics, 0, 0);
        try {
            // Derive the expectation from CPUS so the metric checks stay in sync with the pool-size check.
            int expectedThreads = CPUS * 4;
            long expectedThreadsMetric = expectedThreads;
            assertEquals(expectedThreads, executor.getMaximumPoolSize());
            assertEquals(0, executor.getQueue().remainingCapacity());
            assertEquals(7, metrics.innvocations().size());
            assertEquals(expectedThreadsMetric, metrics.innvocations().get("serverThreadPoolSize").val);
            assertEquals(expectedThreadsMetric, metrics.innvocations().get(ContainerMetrics.JDISC_THREAD_POOL_MAX_ALLOWED_SIZE.baseName()).val);
            assertEquals(0L, metrics.innvocations().get("serverActiveThreads").val);
            // NOTE(review): the reported capacity is 64 although the queue itself has no remaining
            // capacity; kept as a literal because its relation to the thread count is not visible here.
            assertEquals(64L, metrics.innvocations().get(ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_CAPACITY.baseName()).val);
            assertEquals(0L, metrics.innvocations().get(ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_SIZE.baseName()).val);
        } finally {
            // Release any worker threads the pool may have started; no tasks were submitted.
            executor.shutdownNow();
        }
    }

    /**
     * Verifies that a negative queue size of -50 with 24 threads yields a queue
     * with capacity for 50 entries per thread.
     */
    @Test
    void testThatQueueSizeAutoDetected() {
        ThreadPoolExecutor executor = createPool(24, -50);
        try {
            assertEquals(24, executor.getMaximumPoolSize());
            assertEquals(24 * 50, executor.getQueue().remainingCapacity());
        } finally {
            // Release any worker threads the pool may have started; no tasks were submitted.
            executor.shutdownNow();
        }
    }

    /**
     * Verifies that a thread count of 0 combined with a queue size of -100 yields four threads
     * per CPU and a queue with capacity for 100 entries per thread.
     */
    @Test
    void testThatThreadPoolSizeAndQueueSizeAutoDetected() {
        ThreadPoolExecutor executor = createPool(0, -100);
        try {
            assertEquals(CPUS * 4, executor.getMaximumPoolSize());
            assertEquals(CPUS * 4 * 100, executor.getQueue().remainingCapacity());
        } finally {
            // Release any worker threads the pool may have started; no tasks were submitted.
            executor.shutdownNow();
        }
    }

    private class FlipIt implements Runnable {
        public final Receiver<Boolean> didItRun = new Receiver<>();

        @Override
        public void run() {
            didItRun.put(Boolean.TRUE);
        }
    }

    /**
     * Verifies that the process terminator is asked to die only after both pool threads have been
     * occupied for longer than the configured max thread execution time (1 second here), and not
     * when tasks are rejected before that time has passed.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping
     */
    @Test
    @Disabled("Depends on the system time and so is unstable on factory")
    void testThreadPoolTerminationOnBreakdown() throws InterruptedException {
        ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(
                new ContainerThreadpoolConfig.Builder()
                        .maxThreads(2)
                        .maxThreadExecutionTimeSeconds(1));
        MockProcessTerminator terminator = new MockProcessTerminator();
        Metric metrics = new MetricMock();
        ContainerThreadPool threadPool = new ContainerThreadpoolImpl(config, metrics, terminator);
        try {
            // No dying when threads hang shorter than max thread execution time
            threadPool.executor().execute(new Hang(500));
            threadPool.executor().execute(new Hang(500));
            assertEquals(0, terminator.dieRequests);
            assertRejected(threadPool, new Hang(500)); // no more threads
            assertEquals(0, terminator.dieRequests); // ... but not for long enough yet
            // Interrupts propagate instead of being swallowed: a shortened sleep would make
            // the timing-based assertions below meaningless.
            Thread.sleep(1500);
            threadPool.executor().execute(new Hang(1));
            assertEquals(0, terminator.dieRequests);
            Thread.sleep(50); // Make sure both threads are available

            // Dying when hanging both thread pool threads for longer than max thread execution time
            threadPool.executor().execute(new Hang(2000));
            threadPool.executor().execute(new Hang(2000));
            assertEquals(0, terminator.dieRequests);
            assertRejected(threadPool, new Hang(2000)); // no more threads
            assertEquals(0, terminator.dieRequests); // ... but not for long enough yet
            Thread.sleep(1500);
            assertRejected(threadPool, new Hang(2000)); // no more threads
            assertEquals(1, terminator.dieRequests); // ... for longer than maxThreadExecutionTime
        } finally {
            // Close the pool whether or not the assertions passed, so the test does not leak it.
            threadPool.close();
        }
    }

    /**
     * Submits {@code task} to the pool's executor and fails the test unless the submission
     * is refused with a {@link RejectedExecutionException}.
     */
    private void assertRejected(ContainerThreadPool threadPool, Runnable task) {
        boolean wasRejected = false;
        try {
            threadPool.executor().execute(task);
        } catch (final RejectedExecutionException expected) {
            wasRejected = true;
        }
        if (!wasRejected) {
            fail("Expected execution rejected");
        }
    }

    private class Hang implements Runnable {

        private final long hangMillis;

        public Hang(int hangMillis) {
            this.hangMillis = hangMillis;
        }

        @Override
        public void run() {
            try { Thread.sleep(hangMillis); } catch (InterruptedException e) {}
        }

    }

    /**
     * {@link ProcessTerminator} whose {@code logAndDie} only counts how often it was called;
     * the override does not call the superclass implementation.
     */
    private static class MockProcessTerminator extends ProcessTerminator {

        /** Number of {@code logAndDie} calls so far; volatile so a read from another thread sees the latest value. */
        public volatile int dieRequests = 0;

        /**
         * Records one die request. Synchronized because {@code ++} on a volatile field is a
         * non-atomic read-modify-write, so concurrent calls could otherwise lose an increment.
         */
        @Override
        public synchronized void logAndDie(String message, boolean dumpThreads) {
            dieRequests++;
        }

    }

}