aboutsummaryrefslogtreecommitdiffstats
path: root/sql/cache.go
blob: 82259bc9daed40c9624b99da52d4f2071ac646b4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package sql

import (
	"log"
	"sync"

	"github.com/mpolden/zdns/cache"
)

// Operations that can be placed on the cache's write queue.
const (
	// setOp writes a key and its packed value to the database.
	setOp = iota
	// removeOp deletes the value stored for a single key.
	removeOp
	// resetOp truncates the cache, removing every entry.
	resetOp
)

// query is a single operation queued for the background goroutine that
// writes to the database.
type query struct {
	// op is one of setOp, removeOp or resetOp.
	op    int
	// key is the cache key the operation applies to. It is not used by resetOp.
	key   uint32
	// value is the value to persist. It is only populated for setOp.
	value cache.Value
}

// Cache is a persistent cache. Entries are written to a SQL database.
//
// Set, Evict and Reset do not touch the database directly: they queue an
// operation that a background goroutine, started by NewCache, applies later.
// Read and Close wait until all queued operations have been applied.
type Cache struct {
	// wg counts queued operations that have not yet been applied.
	wg     sync.WaitGroup
	// queue carries pending operations to the background goroutine.
	queue  chan query
	// client performs the actual database reads and writes.
	client *Client
	// logger receives errors that occur while reading or writing entries.
	logger *log.Logger
}

// NewCache returns a Cache that persists its entries through client and
// reports errors to logger. It also starts the background goroutine that
// applies queued operations to the database.
func NewCache(client *Client, logger *log.Logger) *Cache {
	// Number of operations that can be queued before callers block.
	const queueCapacity = 100

	c := new(Cache)
	c.queue = make(chan query, queueCapacity)
	c.client = client
	c.logger = logger

	go c.readQueue()
	return c
}

// Close blocks until every queued operation has been processed by the
// background goroutine. It always returns nil.
//
// NOTE(review): the queue channel is not closed here, so the goroutine started
// by NewCache appears to keep running after Close returns. Confirm whether
// that is intended.
func (c *Cache) Close() error {
	c.wg.Wait()
	return nil
}

// Set queues a write that stores v under key. The write is applied
// asynchronously by the background goroutine.
func (c *Cache) Set(key uint32, v cache.Value) {
	q := query{op: setOp, key: key, value: v}
	c.enqueue(q)
}

// Evict queues the removal of whatever value is stored under key. The removal
// is applied asynchronously by the background goroutine.
func (c *Cache) Evict(key uint32) {
	q := query{op: removeOp, key: key}
	c.enqueue(q)
}

// Reset queues the removal of every entry in the cache. The removal is applied
// asynchronously by the background goroutine.
func (c *Cache) Reset() {
	q := query{op: resetOp}
	c.enqueue(q)
}

// Read returns all entries in the cache.
//
// It first waits for queued operations to be applied, so the result reflects
// every Set, Evict and Reset that was enqueued before the call. If the entries
// cannot be read from the database, the error is logged and nil is returned.
// An entry whose data cannot be unpacked is logged and left out of the result.
func (c *Cache) Read() []cache.Value {
	c.wg.Wait()
	entries, err := c.client.readCache()
	if err != nil {
		c.logger.Print(err)
		return nil
	}
	values := make([]cache.Value, 0, len(entries))
	for _, entry := range entries {
		unpacked, err := cache.Unpack(entry.Data)
		if err != nil {
			// Persisted data is outside this process's control, so a bad
			// row is reported and skipped rather than crashing the caller.
			// This matches how a failed readCache is handled above.
			c.logger.Printf("failed to unpack cache entry: %v", err)
			continue
		}
		values = append(values, unpacked)
	}
	return values
}

// enqueue registers q as pending and hands it to the background goroutine.
// The send blocks while the queue's buffer is full.
func (c *Cache) enqueue(q query) {
	// Increment before sending so the counter is never decremented ahead of
	// its matching increment.
	c.wg.Add(1)
	c.queue <- q
}

// readQueue applies queued operations to the database one at a time, in the
// order they were enqueued. It runs until the queue channel is closed.
//
// Database errors are logged and the failing operation is dropped. A value
// that cannot be packed is reported with Fatalf, which logs the message and
// then exits the process.
func (c *Cache) readQueue() {
	for q := range c.queue {
		// Errors are formatted with %v: the %w verb is only understood by
		// fmt.Errorf and would be printed as a bad verb by the logger.
		switch q.op {
		case setOp:
			packed, err := q.value.Pack()
			if err != nil {
				c.logger.Fatalf("failed to pack value: %v", err)
			}
			if err := c.client.writeCacheValue(q.key, packed); err != nil {
				c.logger.Printf("failed to write key=%d data=%q: %v", q.key, packed, err)
			}
		case removeOp:
			if err := c.client.removeCacheValue(q.key); err != nil {
				c.logger.Printf("failed to remove key=%d: %v", q.key, err)
			}
		case resetOp:
			if err := c.client.truncateCache(); err != nil {
				c.logger.Printf("failed to truncate cache: %v", err)
			}
		default:
			c.logger.Printf("unhandled operation %d", q.op)
		}
		// Mark the operation as applied so that Read and Close can proceed.
		c.wg.Done()
	}
}