aboutsummaryrefslogtreecommitdiffstats
path: root/jrt/src/com/yahoo/jrt/Scheduler.java
blob: b70575bdf4bba48214b8a0c9e40b34c3bf4bca5d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;


class Scheduler {
    private static final int TICK  = 100;
    private static final int SLOTS = 512;
    private static final int MASK  = 511;
    private static final int SHIFT = 9;

    private Task[] slots    = new Task[SLOTS + 1];
    private int[]  counts   = new int[SLOTS + 1];
    private Queue  queue    = new Queue(TICK);
    private int    currIter = 0;
    private int    currSlot = 0;
    private long   nextTick;

    private static boolean isActive(Task task) {
        return (task.next() != null);
    }

    private void linkIn(Task task) {
        Task head = slots[task.slot()];
        if (head == null) {
            task.next(task);
            task.prev(task);
            slots[task.slot()] = task;
        } else {
            task.next(head);
            task.prev(head.prev());
            head.prev().next(task);
            head.prev(task);
        }
        ++counts[task.slot()];
    }

    private void linkOut(Task task) {
        Task head = slots[task.slot()];
        if (task.next() == task) {
            slots[task.slot()] = null;
        } else {
            task.prev().next(task.next());
            task.next().prev(task.prev());
            if (head == task) {
                slots[task.slot()] = task.next();
            }
        }
        task.next(null);
        task.prev(null);
        --counts[task.slot()];
    }

    public Scheduler(long now) {
        nextTick = now + TICK;
    }

    public synchronized void schedule(Task task, double seconds) {
        if (task.isKilled()) {
            return;
        }
        if (seconds < 0.0) {
            throw new IllegalArgumentException("cannot schedule a Task in the past");
        }
        int ticks = 2 + (int) Math.ceil(seconds * (1000.0 / TICK));
        if (isActive(task)) {
            linkOut(task);
        }
        task.slot((ticks + currSlot) & MASK);
        task.iter(currIter + ((ticks + currSlot) >> SHIFT));
        linkIn(task);
    }

    public synchronized void scheduleNow(Task task) {
        if (task.isKilled()) {
            return;
        }
        if (isActive(task)) {
            linkOut(task);
        }
        task.slot(SLOTS);
        task.iter(0);
        linkIn(task);
    }

    public synchronized boolean unschedule(Task task) {
        if (isActive(task)) {
            linkOut(task);
            return true;
        }
        return false;
    }

    public synchronized boolean kill(Task task) {
        task.setKilled();
        if (isActive(task)) {
            linkOut(task);
            return true;
        }
        return false;
    }

    private void queueTasks(int slot, int iter) {
        int cnt = counts[slot];
        Task task = slots[slot];
        for (int i = 0; i < cnt; i++) {
            Task next = task.next();
            if (task.iter() == iter) {
                linkOut(task);
                queue.enqueue(task);
            }
            task = next;
        }
    }

    public void checkTasks(long now) {
        if (slots[SLOTS] == null && now < nextTick) {
            return;
        }
        synchronized (this) {
            queueTasks(SLOTS, 0);
            for (int i = 0; now >= nextTick; i++, nextTick += TICK) {
                if (i < 3) {
                    if (++currSlot >= SLOTS) {
                        currSlot = 0;
                        currIter++;
                    }
                    queueTasks(currSlot, currIter);
                }
            }
        }
        while (!queue.isEmpty()) {
            Task task = (Task) queue.dequeue();
            task.perform();
        }
    }
}