1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
package sql
import (
"log"
"sync"
"github.com/mpolden/zdns/cache"
)
// Operation codes carried in query.op. readQueue switches on these to decide
// which client call to make.
const (
// setOp packs the query's value and writes it under the query's key.
setOp = iota
// removeOp removes the value stored under the query's key.
removeOp
// resetOp truncates the whole cache.
resetOp
)
// query is a single pending operation placed on the Cache queue.
type query struct {
// op is one of setOp, removeOp or resetOp.
op int
// key is the cache key the operation applies to. Reset leaves it unset.
key uint32
// value is the value to persist. Only Set populates it.
value cache.Value
}
// Cache is a persistent cache. Entries are written to a SQL database.
//
// Set, Evict and Reset do not call the client themselves: they place a query on
// queue, and the readQueue goroutine started by NewCache applies it.
type Cache struct {
// wg counts queued operations that readQueue has not finished handling.
wg sync.WaitGroup
// queue holds pending operations until readQueue consumes them.
queue chan query
// client performs the actual reads and writes of persisted entries.
client *Client
// logger receives errors from reads and from queued operations.
logger *log.Logger
}
// NewCache returns a Cache that persists its entries through client and
// reports failures to logger. Before returning it starts the goroutine that
// applies queued operations.
func NewCache(client *Client, logger *log.Logger) *Cache {
	// Number of operations that can be buffered before enqueue blocks.
	const queueSize = 100

	pc := new(Cache)
	pc.queue = make(chan query, queueSize)
	pc.client = client
	pc.logger = logger

	go pc.readQueue()
	return pc
}
// Close blocks until every operation queued so far has been handled by the
// queue reader. It always returns nil.
func (c *Cache) Close() error {
	pending := &c.wg
	pending.Wait() // returns once readQueue has marked each queued operation as done
	return nil
}
// Set queues a write that stores v under key. The write itself is carried out
// later by the queue reader.
func (c *Cache) Set(key uint32, v cache.Value) {
	write := query{
		op:    setOp,
		key:   key,
		value: v,
	}
	c.enqueue(write)
}
// Evict queues the removal of whatever value is stored under key.
func (c *Cache) Evict(key uint32) {
	removal := query{
		op:  removeOp,
		key: key,
	}
	c.enqueue(removal)
}
// Reset queues an operation that truncates the cache, removing all entries.
func (c *Cache) Reset() {
	truncate := query{op: resetOp}
	c.enqueue(truncate)
}
// Read waits for the queued operations to be handled, then fetches the
// persisted entries through the client and returns them unpacked.
//
// If the client fails to read, the error is logged and nil is returned. An
// entry whose data cannot be unpacked causes a panic.
func (c *Cache) Read() []cache.Value {
	// Wait until the queue reader has handled everything enqueued so far.
	c.wg.Wait()

	rows, err := c.client.readCache()
	if err != nil {
		c.logger.Print(err)
		return nil
	}

	result := make([]cache.Value, 0, len(rows))
	for _, row := range rows {
		value, err := cache.Unpack(row.Data)
		if err != nil {
			panic(err) // Should never happen
		}
		result = append(result, value)
	}
	return result
}
// enqueue registers q as pending on the wait group and hands it to the queue
// reader. The send blocks while the queue buffer is full.
func (c *Cache) enqueue(q query) {
	// Count the operation before sending it; readQueue decrements the counter
	// once it has handled q.
	c.wg.Add(1)
	c.queue <- q
}
// readQueue applies queued operations one at a time, in the order they were
// sent on the queue, until the queue channel is closed. NewCache runs it in its
// own goroutine.
//
// Failed writes, removals and truncations are logged and otherwise ignored, as
// is an unknown operation code. A value that cannot be packed terminates the
// process through logger.Fatalf. Every handled query decrements the wait group
// exactly once, which is what unblocks Close and Read.
func (c *Cache) readQueue() {
	for q := range c.queue {
		switch q.op {
		case setOp:
			packed, err := q.value.Pack()
			if err != nil {
				// Errors are formatted with %v: %w is only understood by
				// fmt.Errorf and would be rendered as "%!w(...)" by the logger.
				c.logger.Fatalf("failed to pack value: %v", err)
			}
			if err := c.client.writeCacheValue(q.key, packed); err != nil {
				c.logger.Printf("failed to write key=%d data=%q: %v", q.key, packed, err)
			}
		case removeOp:
			if err := c.client.removeCacheValue(q.key); err != nil {
				c.logger.Printf("failed to remove key=%d: %v", q.key, err)
			}
		case resetOp:
			if err := c.client.truncateCache(); err != nil {
				c.logger.Printf("failed to truncate cache: %v", err)
			}
		default:
			c.logger.Printf("unhandled operation %d", q.op)
		}
		// Mark the operation as handled, whether or not it succeeded.
		c.wg.Done()
	}
}
|