summaryrefslogtreecommitdiffstats
path: root/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFDispatch.java
blob: 6956f288d1a1e468c92b7693b4726e547e03dedb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.fastsearch.test.fs4mock;


import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.yahoo.prelude.ConfigurationException;
import com.yahoo.prelude.fastsearch.test.DocsumDefinitionTestCase;


/**
 * A server which replies to any request with the same canned query result after
 * a configurable delay, with a configurable slowness (delay between each byte).
 *
 * <p>Each accepted connection is served by its own thread. A connection is closed
 * when the peer closes it, when an I/O error occurs, when the serving thread is
 * interrupted, or when more than 5 seconds have passed since the last request
 * was read by the time the reply has been written.</p>
 *
 * @author bratseth
 */
public class MockFDispatch {

    /** Number of currently open connections; guarded by the class monitor. */
    private static int connectionCount = 0;

    private static final Logger log = Logger.getLogger(MockFDispatch.class.getName());

    /** The port we accept incoming requests at */
    private int listenPort = 0;

    /** Milliseconds to wait after reading a request before replying. */
    private long replyDelay;

    /** Milliseconds to wait after each reply byte; 0 or less writes the reply in one go. */
    private long byteDelay;

    /** Optional monitor which is notified each time we are about to accept a connection. */
    private Object barrier;

    private static final byte[] queryResultPacketData = new byte[] {
        0, 0, 0, 64, 0, 0,
        0, 214 - 256, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 5, 0, 0, 0,
        25, 0, 0, 0, 111, 0, 0, 0, 97, 0, 0, 0, 3, 0, 0, 0, 23, 0, 0, 0, 7, 0, 0,
        0, 36, 0, 0, 0, 4, 0, 0, 0, 21, 0, 0, 0, 8, 0, 0, 0, 37};

    private static final byte[] docsumData = DocsumDefinitionTestCase.makeDocsum();

    private static final byte[] docsumHeadPacketData = new byte[] {
        0, 0, 3, 39, 0, 0,
        0, 205 - 256, 0, 0, 0, 1, 0, 0, 0, 0};

    private static final byte[] eolPacketData = new byte[] {
        0, 0, 0, 8, 0, 0, 0,
        200 - 256, 0, 0, 0, 1 };

    /** The connection threads started so far. Only touched by the thread executing {@link #run()}. */
    private final Set<ConnectionThread> connectionThreads = new HashSet<>();

    /**
     * Creates a mock dispatcher. Nothing is bound until {@link #run()} is called.
     *
     * @param listenPort the port to listen at
     * @param replyDelay milliseconds to wait before each reply
     * @param byteDelay  milliseconds to wait between each reply byte, 0 to write the reply at once
     */
    public MockFDispatch(int listenPort, long replyDelay, long byteDelay) {
        this.replyDelay = replyDelay;
        this.byteDelay = byteDelay;
        this.listenPort = listenPort;
    }

    /**
     * Sets an object which is notified (under its own monitor) every time this server
     * is about to block in accept. May be null, in which case nobody is notified.
     */
    public void setBarrier(Object barrier) {
        this.barrier = barrier;
    }

    /** Sets the port to listen at. Only has effect if called before {@link #run()}. */
    public void setListenPort(int listenPort) {
        this.listenPort = listenPort;
    }

    /**
     * Binds the listen port and accepts connections until the calling thread is
     * interrupted or the server channel is closed. Blocks the calling thread.
     *
     * @throws ConfigurationException if the server socket cannot be set up or closed
     */
    public void run() {
        // try-with-resources closes the server channel on every exit path;
        // closing an already closed channel is a no-op
        try (ServerSocketChannel channel = createServerSocket(listenPort)) {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // notify those waiting at the barrier that they
                    // can now proceed and talk to us
                    notifyBarrier();
                    SocketChannel socketChannel = channel.accept();

                    ConnectionThread connection = new ConnectionThread(socketChannel);
                    connectionThreads.add(connection);
                    connection.start();
                } catch (ClosedByInterruptException e) {
                    // The interrupt status is set when this is thrown, so the loop condition ends the loop
                } catch (ClosedChannelException e) {
                    return;
                } catch (Exception e) {
                    log.log(Level.WARNING, "Unexpected error reading request", e);
                }
            }
        } catch (IOException e) {
            throw new ConfigurationException("Socket channel failure", e);
        }
    }

    /** Wakes up one thread waiting on the barrier, if a barrier is set. */
    private void notifyBarrier() {
        // Read the field once so the null check and the lock use the same object
        Object currentBarrier = barrier;
        if (currentBarrier == null) {
            return;
        }
        synchronized (currentBarrier) {
            currentBarrier.notify();
        }
    }

    /**
     * Opens a server channel bound to the local host address at the given port.
     *
     * @throws IOException if the channel cannot be opened or bound
     */
    private ServerSocketChannel createServerSocket(int listenPort)
        throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        try {
            ServerSocket socket = channel.socket();

            // SO_REUSEADDR must be set before binding; setting it afterwards has undefined effect
            socket.setReuseAddress(true);
            socket.bind(
                    new InetSocketAddress(InetAddress.getLocalHost(), listenPort));
            String host = socket.getInetAddress().getHostName();

            log.fine("Accepting dfispatch requests at " + host + ":" + listenPort);
            return channel;
        } catch (IOException e) {
            // Do not leak the channel when binding fails
            try {
                channel.close();
            } catch (IOException closeFailure) {
                log.log(Level.FINE, "Could not close server channel after failure", closeFailure);
            }
            throw e;
        }
    }

    /** Claims the next connection number and counts the connection as open. */
    private static synchronized int registerConnection() {
        return connectionCount++;
    }

    /** Counts one connection as closed. */
    private static synchronized void unregisterConnection() {
        connectionCount--;
    }

    /**
     * Runs a mock dispatcher at port 7890.
     *
     * @param args the reply delay and the byte delay, both in milliseconds
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: MockFDispatch <replyDelayMs> <byteDelayMs>");
            return;
        }
        log.setLevel(Level.FINE);
        MockFDispatch m = new MockFDispatch(7890, Integer.parseInt(args[0]),
                Integer.parseInt(args[1]));

        m.run();
    }

    /** Serves one client connection: reads a request, then writes the canned reply, repeatedly. */
    private class ConnectionThread extends Thread {

        /** Holds the canned reply, ready for reading from position 0 up to its limit. */
        private final ByteBuffer writeBuffer = ByteBuffer.allocate(2000);

        /** Scratch space for incoming requests; the content is never inspected. */
        private final ByteBuffer readBuffer = ByteBuffer.allocate(2000);

        private int connectionNr = 0;

        private final SocketChannel channel;

        /** Creates a thread serving the given channel. The creator must call {@link #start()}. */
        public ConnectionThread(SocketChannel channel) {
            this.channel = channel;
            fillBuffer(writeBuffer);
        }

        /** Fills the buffer with the complete reply and prepares it for being written out. */
        private void fillBuffer(ByteBuffer buffer) {
            buffer.clear();
            buffer.put(queryResultPacketData);
            buffer.put(docsumHeadPacketData);
            buffer.put(docsumData);
            buffer.put(docsumHeadPacketData);
            buffer.put(docsumData);
            buffer.put(eolPacketData);
            // Flip once here: limit now marks the end of the reply, and each
            // reply only needs to rewind the position
            buffer.flip();
        }

        @Override
        public void run() {
            connectionNr = registerConnection();
            log.fine("Opened connection " + connectionNr);

            try {
                long lastRequest = System.currentTimeMillis();

                while ((System.currentTimeMillis() - lastRequest) <= 5000
                        && (!isInterrupted())) {
                    readBuffer.clear();
                    if (channel.read(readBuffer) < 0) {
                        // End of stream: the peer is gone, so there is nobody to reply to
                        log.fine("Peer closed connection " + connectionNr);
                        return;
                    }
                    lastRequest = System.currentTimeMillis();
                    delay(replyDelay);

                    if (byteDelay > 0) {
                        writeSlow(writeBuffer);
                    } else {
                        write(writeBuffer);
                    }
                    log.fine(
                            "Replied in "
                                    + (System.currentTimeMillis() - lastRequest)
                                    + " ms");
                }

                log.fine("Closing timed out connection " + connectionNr);
            } catch (IOException e) {
                log.log(Level.FINE, "I/O failure on connection " + connectionNr, e);
            } finally {
                // Always release the socket and the connection slot, also on failure
                unregisterConnection();
                try {
                    channel.close();
                } catch (IOException e) {
                    log.log(Level.FINE, "Could not close connection " + connectionNr, e);
                }
            }
        }

        /** Writes the whole reply in the given buffer to the channel. */
        private void write(ByteBuffer writeBuffer) throws IOException {
            writeBuffer.rewind();
            while (writeBuffer.hasRemaining()) {
                channel.write(writeBuffer);
            }
        }

        /** Writes the reply in the given buffer one byte at a time, pausing byteDelay ms after each byte. */
        private void writeSlow(ByteBuffer writeBuffer) throws IOException {
            writeBuffer.rewind();
            int dataSize = writeBuffer.limit();

            try {
                for (int i = 0; i < dataSize; i++) {
                    // Expose exactly byte i to the channel
                    writeBuffer.limit(i + 1);
                    writeBuffer.position(i);
                    channel.write(writeBuffer);
                    delay(byteDelay);
                }
            } finally {
                // Restore the full reply length for the next reply
                writeBuffer.limit(dataSize);
            }
        }

        /** Sleeps for the given number of milliseconds, keeping the interrupt status if interrupted. */
        private void delay(long delay) {

            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                // Restore the flag so the serving loop notices the interrupt and stops
                Thread.currentThread().interrupt();
            }
        }

    }

}