aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorDestinationSession.java
blob: 05578d7d35fa7fbd77200ca8b380718f904cd17f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus;

import com.yahoo.documentapi.AckToken;
import com.yahoo.documentapi.VisitorDestinationParameters;
import com.yahoo.documentapi.VisitorDestinationSession;
import com.yahoo.documentapi.VisitorResponse;
import com.yahoo.documentapi.messagebus.protocol.*;
import java.util.logging.Level;
import com.yahoo.messagebus.*;

import java.util.logging.Logger;

/**
 * A visitor destination session for receiving data from a visitor using a
 * messagebus destination session. The default behaviour of the visitor session
 * is to control visiting and receive the data. As an alternative, you may set
 * up one or more visitor destination sessions and tell the visitor to send
 * data to the remote destination(s). This is convenient if you want to receive
 * data decoupled from controlling the visitor, but also to avoid a single data
 * destination becoming a bottleneck.
 * <p>
 * Create the visitor destination session by calling the
 * <code>MessageBusDocumentAccess.createVisitorDestinationSession</code>
 * method. The visitor must be started by calling the
 * <code>MessageBusDocumentAccess.createVisitorSession</code> method and
 * progress tracked through the resulting visitor session.
 *
 * @author Thomas Gundersen
 */
public class MessageBusVisitorDestinationSession implements VisitorDestinationSession, MessageHandler
{
    private static final Logger log = Logger.getLogger(MessageBusVisitorDestinationSession.class.getName());

    /** The underlying messagebus session; set to null once this session has been destroyed. */
    private DestinationSession session;

    /** The parameters this session was created with; supplies the data handler. */
    private final VisitorDestinationParameters params;

    /**
     * Creates a message bus visitor destination session.
     * <p>
     * Registers this object as the message handler of a new destination
     * session on the given bus, and then hands itself to the data handler
     * found in the parameters.
     *
     * @param params the parameters for the visitor destination session
     * @param bus the message bus to use
     */
    public MessageBusVisitorDestinationSession(VisitorDestinationParameters params, MessageBus bus) {
        this.params = params;
        session = bus.createDestinationSession(params.getSessionName(), true, this);
        params.getDataHandler().setSession(this);
    }

    /**
     * Handles an incoming message by creating its reply, swapping state
     * between the message and that reply, and passing the message on to the
     * data handler together with an ack token wrapping the reply.
     *
     * @param message the incoming message; it is cast to {@link DocumentMessage}
     */
    public void handleMessage(Message message) {
        Reply reply = ((DocumentMessage)message).createReply();
        message.swapState(reply);

        params.getDataHandler().onMessage(message, new AckToken(reply));
    }

    /**
     * Acknowledges a previously received message by sending the reply carried
     * in the given token through the destination session.
     * <p>
     * This is best effort: any exception raised while replying is logged at
     * warning level and not propagated to the caller.
     *
     * @param token the token handed out together with the message being acknowledged
     */
    public void ack(AckToken token) {
        try {
            log.log(Level.FINE, () -> "Sending ack " + token.ackObject);
            session.reply((Reply) token.ackObject);
        } catch (Exception e) {
            // Route the failure through the logger rather than stderr so it ends up
            // alongside the rest of this class's log output, with the stack trace attached.
            log.log(Level.WARNING, "Failed to send ack for visitor destination session", e);
        }
    }

    /**
     * Destroys the underlying destination session and drops the reference to it.
     * Calling this more than once has no further effect.
     */
    public void destroy() {
        if (session == null) {
            return; // already destroyed, e.g. abort() followed by destroy()
        }
        session.destroy();
        session = null;
    }

    /**
     * Aborts this session. This simply delegates to {@link #destroy()}.
     */
    public void abort() {
        destroy();
    }

    /**
     * Returns the next response as supplied by the data handler.
     *
     * @return whatever the data handler's {@code getNext()} returns
     */
    public VisitorResponse getNext() {
        return params.getDataHandler().getNext();
    }

    /**
     * Returns the next response as supplied by the data handler, passing the
     * given timeout on to it.
     *
     * @param timeoutMilliseconds the timeout, in milliseconds, forwarded to the data handler
     * @return whatever the data handler's {@code getNext(int)} returns
     * @throws InterruptedException if the data handler's call is interrupted
     */
    public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException {
        return params.getDataHandler().getNext(timeoutMilliseconds);
    }

}