aboutsummaryrefslogtreecommitdiffstats
path: root/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java
blob: 204e481af76d823b820dbb8e1ab3b6f8ad1a80c8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.docproc.jdisc;

import com.yahoo.collections.Pair;
import com.yahoo.component.ComponentId;
import com.yahoo.component.provider.ComponentRegistry;
import com.yahoo.container.core.document.ContainerDocumentConfig;
import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.container.jdisc.messagebus.MbusServerProvider;
import com.yahoo.container.jdisc.messagebus.NetworkMultiplexerProvider;
import com.yahoo.container.jdisc.messagebus.SessionCache;
import com.yahoo.docproc.CallStack;
import com.yahoo.docproc.impl.DocprocService;
import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext;

import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.jdisc.AbstractResource;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.application.ContainerBuilder;
import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.jdisc.MbusClient;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ServerTestDriver;
import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.shared.SharedSourceSession;
import org.junit.After;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;

/**
 * @author Einar M R Rosenvinge
 */
public abstract class DocumentProcessingHandlerTestBase {

    protected DocumentProcessingHandler handler;
    protected ServerTestDriver driver;
    protected RemoteServer remoteServer;
    protected DocumentTypeManager documentTypeManager = new DocumentTypeManager();
    SessionCache sessionCache;
    private final List<MbusServerProvider> serviceProviders = new ArrayList<>();

    @Before
    public void createHandler() {
        documentTypeManager.register(getType());

        Protocol protocol = new DocumentProtocol(documentTypeManager);

        driver = ServerTestDriver.newInactiveInstanceWithProtocol(protocol, true);

        RPCNetwork net = new RPCNetwork(NetworkMultiplexerProvider.asParameters(new ContainerMbusConfig.Builder().build(),
                                                                                driver.client().slobroksConfig(),
                                                                                "test"));
        sessionCache = new SessionCache(() -> NetworkMultiplexer.dedicated(net),
                                        new ContainerMbusConfig.Builder().build(),
                                        new MessagebusConfig.Builder().build(),
                                        protocol);

        ContainerBuilder builder = driver.parent().newContainerBuilder();
        ComponentRegistry<DocprocService> registry = new ComponentRegistry<>();

        handler = new DocumentProcessingHandler(registry,
                new ComponentRegistry<>(),
                new ComponentRegistry<>(),
                new DocumentProcessingHandlerParameters().
                        setDocumentTypeManager(documentTypeManager).
                        setContainerDocumentConfig(new ContainerDocumentConfig(new ContainerDocumentConfig.Builder())));
        builder.serverBindings().bind("mbus://*/*", handler);

        ReferencedResource<SharedSourceSession> sessionRef = sessionCache.retainSource(new SourceSessionParams());
        MbusClient sourceClient = new MbusClient(sessionRef.getResource());
        builder.clientBindings().bind("mbus://*/source", sourceClient);
        builder.clientBindings().bind("mbus://*/" + MbusRequestContext.internalNoThrottledSource, sourceClient);
        sourceClient.start();

        List<Pair<String, CallStack>> callStacks = getCallStacks();
        List<AbstractResource> resources = new ArrayList<>();
        for (Pair<String, CallStack> callStackPair : callStacks) {
            DocprocService service = new DocprocService(callStackPair.getFirst());
            service.setCallStack(callStackPair.getSecond());
            service.setInService(true);

            ComponentId serviceId = new ComponentId(service.getName());
            registry.register(serviceId, service);

            ComponentId sessionName = ComponentId.fromString("chain." + serviceId);
            MbusServerProvider serviceProvider = new MbusServerProvider(sessionName, sessionCache, driver.parent());
            serviceProvider.get().start();

            serviceProviders.add(serviceProvider);

            MbusClient intermediateClient = new MbusClient(serviceProvider.getSession());
            builder.clientBindings().bind("mbus://*/" + sessionName.stringValue(), intermediateClient);
            intermediateClient.start();
            resources.add(intermediateClient);
        }

        driver.parent().activateContainer(builder);
        sessionRef.getReference().close();
        sourceClient.release();

        for (AbstractResource resource : resources) {
            resource.release();
        }

        remoteServer = RemoteServer.newInstance("foobar", protocol);
    }

    @After
    public void destroy() {
        for (MbusServerProvider serviceProvider : serviceProviders) {
            serviceProvider.deconstruct();
        }
        driver.close();
        remoteServer.close();
    }

    protected abstract List<Pair<String, CallStack>> getCallStacks();

    protected abstract DocumentType getType();

    public boolean sendMessage(String destinationChainName, DocumentMessage msg) {
        msg.setRoute(Route.parse("test/chain." + destinationChainName + " " + remoteServer.connectionSpec()));
        msg.getTrace().setLevel(9);
        msg.setTimeRemaining(60 * 1000);
        return driver.client().sendMessage(msg).isAccepted();
    }
}