summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-22 16:24:08 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-23 08:58:37 +0200
commit257527e34e0cb37936350d6aa55b38d478b2c6bf (patch)
tree16b2a8e8b8f5c120b7922259acd592e481636691
parent69944390cf45f617fb2250a95e27d08e355da78d (diff)
Use atomic types
-rw-r--r--client/go/internal/vespa/document/circuit_breaker.go26
-rw-r--r--client/go/internal/vespa/document/http.go14
-rw-r--r--client/go/internal/vespa/document/http_test.go8
3 files changed, 23 insertions, 25 deletions
diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go
index 17fc595d58f..f7f0f4360df 100644
--- a/client/go/internal/vespa/document/circuit_breaker.go
+++ b/client/go/internal/vespa/document/circuit_breaker.go
@@ -27,38 +27,38 @@ type timeCircuitBreaker struct {
graceDuration time.Duration
doomDuration time.Duration
- failingSinceMillis int64
+ failingSinceMillis atomic.Int64
lastError atomic.Value
- halfOpen atomic.Value
- open atomic.Value
+ halfOpen atomic.Bool
+ open atomic.Bool
now func() time.Time
}
func (b *timeCircuitBreaker) Success() {
- atomic.StoreInt64(&b.failingSinceMillis, math.MaxInt64)
- if !b.open.Load().(bool) {
+ b.failingSinceMillis.Store(math.MaxInt64)
+ if !b.open.Load() {
b.halfOpen.CompareAndSwap(true, false)
}
}
func (b *timeCircuitBreaker) Error(err error) {
- if atomic.CompareAndSwapInt64(&b.failingSinceMillis, math.MaxInt64, b.now().UnixMilli()) {
+ if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) {
b.lastError.Store(err)
}
}
func (b *timeCircuitBreaker) State() CircuitState {
- failingDuration := b.now().Sub(time.UnixMilli(atomic.LoadInt64(&b.failingSinceMillis)))
+ failingDuration := b.now().Sub(time.UnixMilli(b.failingSinceMillis.Load()))
if failingDuration > b.graceDuration {
b.halfOpen.CompareAndSwap(false, true)
}
if b.doomDuration > 0 && failingDuration > b.doomDuration {
b.open.CompareAndSwap(false, true)
}
- if b.open.Load().(bool) {
+ if b.open.Load() {
return CircuitOpen
- } else if b.halfOpen.Load().(bool) {
+ } else if b.halfOpen.Load() {
return CircuitHalfOpen
}
return CircuitClosed
@@ -66,11 +66,11 @@ func (b *timeCircuitBreaker) State() CircuitState {
func NewCircuitBreaker(graceDuration, doomDuration time.Duration) *timeCircuitBreaker {
b := &timeCircuitBreaker{
- graceDuration: graceDuration,
- doomDuration: doomDuration,
- now: time.Now,
- failingSinceMillis: math.MaxInt64,
+ graceDuration: graceDuration,
+ doomDuration: doomDuration,
+ now: time.Now,
}
+ b.failingSinceMillis.Store(math.MaxInt64)
b.open.Store(false)
b.halfOpen.Store(false)
return b
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index ce57ac55f03..e083f017c4a 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -46,7 +46,7 @@ type Client struct {
options ClientOptions
httpClients []countingHTTPClient
now func() time.Time
- sendCount int32
+ sendCount atomic.Int32
gzippers sync.Pool
buffers sync.Pool
pending chan *pendingDocument
@@ -65,13 +65,11 @@ type ClientOptions struct {
type countingHTTPClient struct {
client util.HTTPClient
- inflight int64
+ inflight atomic.Int64
}
-func (c *countingHTTPClient) addInflight(n int64) { atomic.AddInt64(&c.inflight, n) }
-
func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http.Response, error) {
- defer c.addInflight(-1)
+ defer c.inflight.Add(-1)
return c.client.Do(req, timeout)
}
@@ -186,18 +184,18 @@ func (c *Client) methodAndURL(d Document, sb *bytes.Buffer) (string, string) {
func (c *Client) leastBusyClient() *countingHTTPClient {
leastBusy := c.httpClients[0]
min := int64(math.MaxInt64)
- next := atomic.AddInt32(&c.sendCount, 1)
+ next := c.sendCount.Add(1)
start := int(next) % len(c.httpClients)
for i := range c.httpClients {
j := (i + start) % len(c.httpClients)
client := c.httpClients[j]
- inflight := atomic.LoadInt64(&client.inflight)
+ inflight := client.inflight.Load()
if inflight < min {
leastBusy = client
min = inflight
}
}
- leastBusy.addInflight(1)
+ leastBusy.inflight.Add(1)
return &leastBusy
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 6eda5f04fd6..1bc3a6c9f39 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -36,13 +36,13 @@ func TestLeastBusyClient(t *testing.T) {
httpClients = append(httpClients, &mockHTTPClient{i, &httpClient})
}
client, _ := NewClient(ClientOptions{}, httpClients)
- client.httpClients[0].addInflight(1)
- client.httpClients[1].addInflight(1)
+ client.httpClients[0].inflight.Add(1)
+ client.httpClients[1].inflight.Add(1)
assertLeastBusy(t, 2, client)
assertLeastBusy(t, 2, client)
assertLeastBusy(t, 3, client)
- client.httpClients[3].addInflight(1)
- client.httpClients[1].addInflight(-1)
+ client.httpClients[3].inflight.Add(1)
+ client.httpClients[1].inflight.Add(-1)
assertLeastBusy(t, 1, client)
}