diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-05-22 16:24:08 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-05-23 08:58:37 +0200 |
commit | 257527e34e0cb37936350d6aa55b38d478b2c6bf (patch) | |
tree | 16b2a8e8b8f5c120b7922259acd592e481636691 | |
parent | 69944390cf45f617fb2250a95e27d08e355da78d (diff) |
Use atomic types
-rw-r--r-- | client/go/internal/vespa/document/circuit_breaker.go | 26 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 14 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 8 |
3 files changed, 23 insertions, 25 deletions
diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go index 17fc595d58f..f7f0f4360df 100644 --- a/client/go/internal/vespa/document/circuit_breaker.go +++ b/client/go/internal/vespa/document/circuit_breaker.go @@ -27,38 +27,38 @@ type timeCircuitBreaker struct { graceDuration time.Duration doomDuration time.Duration - failingSinceMillis int64 + failingSinceMillis atomic.Int64 lastError atomic.Value - halfOpen atomic.Value - open atomic.Value + halfOpen atomic.Bool + open atomic.Bool now func() time.Time } func (b *timeCircuitBreaker) Success() { - atomic.StoreInt64(&b.failingSinceMillis, math.MaxInt64) - if !b.open.Load().(bool) { + b.failingSinceMillis.Store(math.MaxInt64) + if !b.open.Load() { b.halfOpen.CompareAndSwap(true, false) } } func (b *timeCircuitBreaker) Error(err error) { - if atomic.CompareAndSwapInt64(&b.failingSinceMillis, math.MaxInt64, b.now().UnixMilli()) { + if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) { b.lastError.Store(err) } } func (b *timeCircuitBreaker) State() CircuitState { - failingDuration := b.now().Sub(time.UnixMilli(atomic.LoadInt64(&b.failingSinceMillis))) + failingDuration := b.now().Sub(time.UnixMilli(b.failingSinceMillis.Load())) if failingDuration > b.graceDuration { b.halfOpen.CompareAndSwap(false, true) } if b.doomDuration > 0 && failingDuration > b.doomDuration { b.open.CompareAndSwap(false, true) } - if b.open.Load().(bool) { + if b.open.Load() { return CircuitOpen - } else if b.halfOpen.Load().(bool) { + } else if b.halfOpen.Load() { return CircuitHalfOpen } return CircuitClosed @@ -66,11 +66,11 @@ func (b *timeCircuitBreaker) State() CircuitState { func NewCircuitBreaker(graceDuration, doomDuration time.Duration) *timeCircuitBreaker { b := &timeCircuitBreaker{ - graceDuration: graceDuration, - doomDuration: doomDuration, - now: time.Now, - failingSinceMillis: math.MaxInt64, + graceDuration: graceDuration, + doomDuration: doomDuration, + now: time.Now, } + b.failingSinceMillis.Store(math.MaxInt64) b.open.Store(false) b.halfOpen.Store(false) return b diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index ce57ac55f03..e083f017c4a 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -46,7 +46,7 @@ type Client struct { options ClientOptions httpClients []countingHTTPClient now func() time.Time - sendCount int32 + sendCount atomic.Int32 gzippers sync.Pool buffers sync.Pool pending chan *pendingDocument @@ -65,13 +65,11 @@ type ClientOptions struct { type countingHTTPClient struct { client util.HTTPClient - inflight int64 + inflight atomic.Int64 } -func (c *countingHTTPClient) addInflight(n int64) { atomic.AddInt64(&c.inflight, n) } - func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http.Response, error) { - defer c.addInflight(-1) + defer c.inflight.Add(-1) return c.client.Do(req, timeout) } @@ -186,18 +184,18 @@ func (c *Client) methodAndURL(d Document, sb *bytes.Buffer) (string, string) { func (c *Client) leastBusyClient() *countingHTTPClient { leastBusy := c.httpClients[0] min := int64(math.MaxInt64) - next := atomic.AddInt32(&c.sendCount, 1) + next := c.sendCount.Add(1) start := int(next) % len(c.httpClients) for i := range c.httpClients { j := (i + start) % len(c.httpClients) client := c.httpClients[j] - inflight := atomic.LoadInt64(&client.inflight) + inflight := client.inflight.Load() if inflight < min { leastBusy = client min = inflight } } - leastBusy.addInflight(1) + leastBusy.inflight.Add(1) return &leastBusy } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 6eda5f04fd6..1bc3a6c9f39 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -36,13 +36,13 @@ func TestLeastBusyClient(t *testing.T) { httpClients = append(httpClients, &mockHTTPClient{i, &httpClient}) } client, _ := NewClient(ClientOptions{}, httpClients) - client.httpClients[0].addInflight(1) - client.httpClients[1].addInflight(1) + client.httpClients[0].inflight.Add(1) + client.httpClients[1].inflight.Add(1) assertLeastBusy(t, 2, client) assertLeastBusy(t, 2, client) assertLeastBusy(t, 3, client) - client.httpClients[3].addInflight(1) - client.httpClients[1].addInflight(-1) + client.httpClients[3].inflight.Add(1) + client.httpClients[1].inflight.Add(-1) assertLeastBusy(t, 1, client) } |