summaryrefslogtreecommitdiffstats
path: root/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java
blob: 05009f484c8313e321d77b547dbff3c598de9400 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.docproc.jdisc;

import com.yahoo.collections.Pair;
import com.yahoo.component.ComponentId;
import com.yahoo.component.provider.ComponentRegistry;
import com.yahoo.container.core.document.ContainerDocumentConfig;
import com.yahoo.container.jdisc.messagebus.MbusServerProvider;
import com.yahoo.container.jdisc.messagebus.SessionCache;
import com.yahoo.docproc.CallStack;
import com.yahoo.docproc.DocprocService;
import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext;

import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.messagebus.loadtypes.LoadType;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.jdisc.AbstractResource;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.application.ContainerBuilder;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.jdisc.MbusClient;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ServerTestDriver;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.shared.SharedSourceSession;
import org.junit.After;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;

/**
 * @author Einar M R Rosenvinge
 */
public abstract class DocumentProcessingHandlerTestBase {

    protected DocumentProcessingHandler handler;
    protected ServerTestDriver driver;
    protected RemoteServer remoteServer;
    protected DocumentTypeManager documentTypeManager = new DocumentTypeManager();
    SessionCache sessionCache;
    private final List<MbusServerProvider> serviceProviders = new ArrayList<>();

    @Before
    public void createHandler() {
        documentTypeManager.register(getType());

        Protocol protocol = new DocumentProtocol(documentTypeManager);

        driver = ServerTestDriver.newInactiveInstanceWithProtocol(protocol, true);

        sessionCache =
                new SessionCache("raw:", driver.client().slobrokId(), "test", "raw:", null, "raw:", documentTypeManager);

        ContainerBuilder builder = driver.parent().newContainerBuilder();
        ComponentRegistry<DocprocService> registry = new ComponentRegistry<>();

        handler = new DocumentProcessingHandler(registry,
                new ComponentRegistry<>(),
                new ComponentRegistry<>(),
                new DocumentProcessingHandlerParameters().
                        setDocumentTypeManager(documentTypeManager).
                        setContainerDocumentConfig(new ContainerDocumentConfig(new ContainerDocumentConfig.Builder())));
        builder.serverBindings().bind("mbus://*/*", handler);

        ReferencedResource<SharedSourceSession> sessionRef = sessionCache.retainSource(new SourceSessionParams());
        MbusClient sourceClient = new MbusClient(sessionRef.getResource());
        builder.clientBindings().bind("mbus://*/source", sourceClient);
        builder.clientBindings().bind("mbus://*/" + MbusRequestContext.internalNoThrottledSource, sourceClient);
        sourceClient.start();

        List<Pair<String, CallStack>> callStacks = getCallStacks();
        List<AbstractResource> resources = new ArrayList<>();
        for (Pair<String, CallStack> callStackPair : callStacks) {
            DocprocService service = new DocprocService(callStackPair.getFirst());
            service.setCallStack(callStackPair.getSecond());
            service.setInService(true);

            ComponentId serviceId = new ComponentId(service.getName());
            registry.register(serviceId, service);

            ComponentId sessionName = ComponentId.fromString("chain." + serviceId);
            MbusServerProvider serviceProvider = new MbusServerProvider(sessionName, sessionCache, driver.parent());
            serviceProvider.get().start();

            serviceProviders.add(serviceProvider);

            MbusClient intermediateClient = new MbusClient(serviceProvider.getSession());
            builder.clientBindings().bind("mbus://*/" + sessionName.stringValue(), intermediateClient);
            intermediateClient.start();
            resources.add(intermediateClient);
        }

        driver.parent().activateContainer(builder);
        sessionRef.getReference().close();
        sourceClient.release();

        for (AbstractResource resource : resources) {
            resource.release();
        }

        remoteServer = RemoteServer.newInstance(driver.client().slobrokId(), "foobar", protocol);
    }

    @After
    public void destroy() {
        for (MbusServerProvider serviceProvider : serviceProviders) {
            serviceProvider.deconstruct();
        }
        driver.close();
        remoteServer.close();
    }

    protected abstract List<Pair<String, CallStack>> getCallStacks();

    protected abstract DocumentType getType();

    public boolean sendMessage(String destinationChainName, DocumentMessage msg) {
        msg.setRoute(Route.parse("test/chain." + destinationChainName + " " + remoteServer.connectionSpec()));
        msg.setPriority(DocumentProtocol.Priority.HIGH_1);
        msg.setLoadType(LoadType.DEFAULT);
        msg.getTrace().setLevel(9);
        msg.setTimeRemaining(60 * 1000);
        return driver.client().sendMessage(msg).isAccepted();
    }
}