diff --git a/client/asset/btc/livetest/livetest.go b/client/asset/btc/livetest/livetest.go index d6199a7d72..f58978a0eb 100644 --- a/client/asset/btc/livetest/livetest.go +++ b/client/asset/btc/livetest/livetest.go @@ -188,9 +188,9 @@ func Run(t *testing.T, cfg *Config) { } } - var blockReported uint32 + var blockReported atomic.Uint32 blkFunc := func(name string) { - atomic.StoreUint32(&blockReported, 1) + blockReported.Store(1) tLogger.Infof("%s has reported a new block", name) } @@ -500,7 +500,7 @@ func Run(t *testing.T, cfg *Config) { // Mine a block and find the redemption again. mine() - if atomic.LoadUint32(&blockReported) == 0 { + if blockReported.Load() == 0 { t.Fatalf("no block reported") } // Check that there is 1 confirmation on the swap diff --git a/client/comms/wsconn.go b/client/comms/wsconn.go index 925778fad3..924b0ece88 100644 --- a/client/comms/wsconn.go +++ b/client/comms/wsconn.go @@ -155,7 +155,7 @@ type WsCfg struct { type wsConn struct { // 64-bit atomic variables first. See // https://golang.org/pkg/sync/atomic/#pkg-note-BUG. - rID uint64 + rID atomic.Uint64 cancel context.CancelFunc wg sync.WaitGroup log dex.Logger @@ -168,7 +168,7 @@ type wsConn struct { writeMtx sync.Mutex ws *websocket.Conn - connectionStatus uint32 // atomic + connectionStatus atomic.Uint32 // atomic reqMtx sync.RWMutex respHandlers map[uint64]*responseHandler @@ -229,13 +229,13 @@ func (conn *wsConn) url() string { // IsDown indicates if the connection is known to be down. func (conn *wsConn) IsDown() bool { - return atomic.LoadUint32(&conn.connectionStatus) != uint32(Connected) + return conn.connectionStatus.Load() != uint32(Connected) } // setConnectionStatus updates the connection's status and runs the // ConnectEventFunc in case of a change. func (conn *wsConn) setConnectionStatus(status ConnectionStatus) { - oldStatus := atomic.SwapUint32(&conn.connectionStatus, uint32(status)) + oldStatus := conn.connectionStatus.Swap(uint32(status)) statusChange := oldStatus != uint32(status) if statusChange && conn.cfg.ConnectEventFunc != nil { conn.cfg.ConnectEventFunc(status) @@ -544,7 +544,7 @@ func (conn *wsConn) keepAlive(ctx context.Context) { // NextID returns the next request id. func (conn *wsConn) NextID() uint64 { - return atomic.AddUint64(&conn.rID, 1) + return conn.rID.Add(1) } // Connect connects the client. Any error encountered during the initial diff --git a/client/comms/wsconn_test.go b/client/comms/wsconn_test.go index 0912f25fb7..7e3ac4bf1a 100644 --- a/client/comms/wsconn_test.go +++ b/client/comms/wsconn_test.go @@ -85,11 +85,11 @@ func TestWsConn(t *testing.T) { clientMtx.Unlock() }() - var id uint64 + var id atomic.Uint64 // server's "/ws" handler handler := func(w http.ResponseWriter, r *http.Request) { t.Helper() - id := atomic.AddUint64(&id, 1) // shadow id + id := id.Add(1) // shadow id hCtx, hCancel := context.WithCancel(ctx) c, err := upgrader.Upgrade(w, r, nil) diff --git a/client/orderbook/orderbook.go b/client/orderbook/orderbook.go index 463e58d3cd..0645cc44fd 100644 --- a/client/orderbook/orderbook.go +++ b/client/orderbook/orderbook.go @@ -73,8 +73,8 @@ type OrderBook struct { // feeRates is at the top to account for atomic field alignment in // 32-bit systems. See also https://golang.org/pkg/sync/atomic/#pkg-note-BUG feeRates struct { - base uint64 - quote uint64 + base atomic.Uint64 + quote atomic.Uint64 } log dex.Logger @@ -119,12 +119,12 @@ func NewOrderBook(logger dex.Logger) *OrderBook { // BaseFeeRate is the last reported base asset fee rate. func (ob *OrderBook) BaseFeeRate() uint64 { - return atomic.LoadUint64(&ob.feeRates.base) + return ob.feeRates.base.Load() } // QuoteFeeRate is the last reported quote asset fee rate. func (ob *OrderBook) QuoteFeeRate() uint64 { - return atomic.LoadUint64(&ob.feeRates.quote) + return ob.feeRates.quote.Load() } // setSynced sets the synced state of the order book. @@ -243,8 +243,8 @@ func (ob *OrderBook) Reset(snapshot *msgjson.OrderBook) error { ob.seq = snapshot.Seq ob.seqMtx.Unlock() - atomic.StoreUint64(&ob.feeRates.base, snapshot.BaseFeeRate) - atomic.StoreUint64(&ob.feeRates.quote, snapshot.QuoteFeeRate) + ob.feeRates.base.Store(snapshot.BaseFeeRate) + ob.feeRates.quote.Store(snapshot.QuoteFeeRate) ob.marketID = snapshot.MarketID @@ -427,8 +427,8 @@ func (ob *OrderBook) UpdateRemaining(note *msgjson.UpdateRemainingNote) error { // the future. func (ob *OrderBook) LogEpochReport(note *msgjson.EpochReportNote) error { // TODO: update future candlestick charts. - atomic.StoreUint64(&ob.feeRates.base, note.BaseFeeRate) - atomic.StoreUint64(&ob.feeRates.quote, note.QuoteFeeRate) + ob.feeRates.base.Store(note.BaseFeeRate) + ob.feeRates.quote.Store(note.QuoteFeeRate) return nil } diff --git a/client/websocket/websocket.go b/client/websocket/websocket.go index 6a5528e948..b747e69857 100644 --- a/client/websocket/websocket.go +++ b/client/websocket/websocket.go @@ -28,7 +28,7 @@ var ( // to facilitate testing. pingPeriod = (pongWait * 9) / 10 // A client id counter. - cidCounter int32 + cidCounter atomic.Int32 ) type bookFeed struct { @@ -50,7 +50,7 @@ type wsClient struct { func newWSClient(addr string, conn ws.Connection, hndlr func(msg *msgjson.Message) *msgjson.Error, logger dex.Logger) *wsClient { return &wsClient{ WSLink: ws.NewWSLink(addr, conn, pingPeriod, hndlr, logger), - cid: atomic.AddInt32(&cidCounter, 1), + cid: cidCounter.Add(1), } } diff --git a/dex/ws/wslink.go b/dex/ws/wslink.go index 14ddee8f7e..d70e0df913 100644 --- a/dex/ws/wslink.go +++ b/dex/ws/wslink.go @@ -63,7 +63,7 @@ type WSLink struct { conn Connection // on is used internally to prevent multiple Close calls on the underlying // connections. - on uint32 + on atomic.Uint32 // quit is used to cancel the Context. quit context.CancelFunc // stopped is closed when quit is called. @@ -172,7 +172,7 @@ func (c *WSLink) Connect(ctx context.Context) (*sync.WaitGroup, error) { // started. The pong handler will set subsequent read deadlines. 2x ping // period is a very generous initial pong wait; the readWait provided to // NewConnection could be stored and used here (once) instead. - if !atomic.CompareAndSwapUint32(&c.on, 0, 1) { + if !c.on.CompareAndSwap(0, 1) { return nil, fmt.Errorf("attempted to Start a running WSLink") } linkCtx, quit := context.WithCancel(ctx) @@ -197,7 +197,7 @@ func (c *WSLink) Connect(ctx context.Context) (*sync.WaitGroup, error) { func (c *WSLink) stop() { // Flip the switch into the off position and cancel the context. - if !atomic.CompareAndSwapUint32(&c.on, 1, 0) { + if !c.on.CompareAndSwap(1, 0) { return } // Signal to senders we are done. @@ -497,7 +497,7 @@ out: // Off will return true if the link has disconnected. func (c *WSLink) Off() bool { - return atomic.LoadUint32(&c.on) == 0 + return c.on.Load() == 0 } // Addr returns the string-encoded IP address. diff --git a/dex/ws/wslink_test.go b/dex/ws/wslink_test.go index 41d0079295..4aaaadd407 100644 --- a/dex/ws/wslink_test.go +++ b/dex/ws/wslink_test.go @@ -25,12 +25,12 @@ var tLogger = dex.StdOutLogger("ws_TEST", dex.LevelTrace) type ConnStub struct { inMsg chan []byte inErr chan error - closed int32 + closed atomic.Int32 } func (c *ConnStub) Close() error { // make ReadMessage return with a close error - atomic.StoreInt32(&c.closed, 1) + c.closed.Store(1) c.inErr <- &websocket.CloseError{ Code: websocket.CloseNormalClosure, Text: "bye", @@ -45,7 +45,7 @@ func (c *ConnStub) SetWriteDeadline(t time.Time) error { return nil } func (c *ConnStub) ReadMessage() (int, []byte, error) { - if atomic.LoadInt32(&c.closed) == 1 { + if c.closed.Load() == 1 { return 0, nil, &websocket.CloseError{ Code: websocket.CloseAbnormalClosure, Text: io.ErrUnexpectedEOF.Error(), @@ -71,7 +71,7 @@ func microSecDelay(stdDev float64, min int64) time.Duration { var lastID int64 = -1 // first msg.ID should be 0 func (c *ConnStub) WriteMessage(_ int, b []byte) error { - if atomic.LoadInt32(&c.closed) == 1 { + if c.closed.Load() == 1 { return websocket.ErrCloseSent } msg, err := msgjson.DecodeMessage(b) diff --git a/server/admin/server_test.go b/server/admin/server_test.go index 9745bea9d1..f1ff2a8977 100644 --- a/server/admin/server_test.go +++ b/server/admin/server_test.go @@ -67,7 +67,7 @@ type TCore struct { epochOrdersErr error marketMatches []*dexsrv.MatchData marketMatchesErr error - dataEnabled uint32 + dataEnabled atomic.Uint32 } func (c *TCore) ConfigMsg() json.RawMessage { return nil } @@ -179,7 +179,7 @@ func (c *TCore) EnableDataAPI(yes bool) { if yes { v = 1 } - atomic.StoreUint32(&c.dataEnabled, v) + c.dataEnabled.Store(v) } type tResponseWriter struct { @@ -1367,11 +1367,11 @@ func TestEnableDataAPI(t *testing.T) { t.Fatalf("%q: apiEnableDataAPI returned code %d, expected %d", test.name, w.Code, test.wantCode) } - if test.wantEnabled != atomic.LoadUint32(&core.dataEnabled) { - t.Fatalf("%q: apiEnableDataAPI expected dataEnabled = %d, got %d", test.name, test.wantEnabled, atomic.LoadUint32(&core.dataEnabled)) + if test.wantEnabled != core.dataEnabled.Load() { + t.Fatalf("%q: apiEnableDataAPI expected dataEnabled = %d, got %d", test.name, test.wantEnabled, core.dataEnabled.Load()) } - atomic.StoreUint32(&core.dataEnabled, 0) + core.dataEnabled.Store(0) } } diff --git a/server/apidata/apidata.go b/server/apidata/apidata.go index b2e5c611c3..16e6182456 100644 --- a/server/apidata/apidata.go +++ b/server/apidata/apidata.go @@ -20,7 +20,7 @@ import ( var ( // Our internal millisecond representation of the bin sizes. binSizes []uint64 - started uint32 + started atomic.Uint32 ) // DBSource is a source of persistent data. DBSource is used to prime the @@ -72,7 +72,7 @@ func NewDataAPI(dbSrc DBSource, registerHTTP func(route string, handler comms.HT marketCaches: make(map[string]map[uint64]*cacheWithStoredTime), } - if atomic.CompareAndSwapUint32(&started, 0, 1) { + if started.CompareAndSwap(0, 1) { registerHTTP(msgjson.SpotsRoute, s.handleSpots) registerHTTP(msgjson.CandlesRoute, s.handleCandles) registerHTTP(msgjson.OrderBookRoute, s.handleOrderBook) diff --git a/server/auth/auth_test.go b/server/auth/auth_test.go index c79dcfceb3..dd577f7169 100644 --- a/server/auth/auth_test.go +++ b/server/auth/auth_test.go @@ -148,10 +148,10 @@ func (s *TStorage) AddMatchOutcome(ctx context.Context, user account.AccountID, return nil, nil } -var dbIDCounter int64 +var dbIDCounter atomic.Int64 func nextDBID() int64 { - return atomic.AddInt64(&dbIDCounter, 1) + return dbIDCounter.Add(1) } func (s *TStorage) AddOrderOutcome(ctx context.Context, user account.AccountID, oid order.OrderID, canceled bool) (*db.OrderOutcome, error) { @@ -203,7 +203,7 @@ type TRPCClient struct { banished bool sends []*msgjson.Message reqs []*tReq - on uint32 + on atomic.Uint32 closed chan struct{} } @@ -243,7 +243,7 @@ func (c *TRPCClient) Done() <-chan struct{} { return c.closed } func (c *TRPCClient) Disconnect() { - if atomic.CompareAndSwapUint32(&c.on, 0, 1) { + if c.on.CompareAndSwap(0, 1) { close(c.closed) } } diff --git a/server/comms/comms_test.go b/server/comms/comms_test.go index 8382193493..c0b9e6cd34 100644 --- a/server/comms/comms_test.go +++ b/server/comms/comms_test.go @@ -383,9 +383,9 @@ func TestClientRequests(t *testing.T) { c.Banish() return nil }) - var httpSeen uint32 + var httpSeen atomic.Uint32 server.RegisterHTTP("httproute", func(thing any) (any, error) { - atomic.StoreUint32(&httpSeen, 1) + httpSeen.Store(1) srvChan <- nil return struct{}{}, nil }) @@ -496,7 +496,7 @@ func TestClientRequests(t *testing.T) { conn.addChan() sendToServer("httproute", "{}") readChannel(t, "httproute", srvChan) - if !atomic.CompareAndSwapUint32(&httpSeen, 1, 0) { + if !httpSeen.CompareAndSwap(1, 0) { t.Fatalf("HTTP route not hit") } conn.wait(t, "http route success") @@ -508,7 +508,7 @@ func TestClientRequests(t *testing.T) { if resp.Error == nil || resp.Error.Code != msgjson.TooManyRequestsError { t.Fatalf("no or incorrect error for disabled HTTP route: %v", resp.Error) } - if atomic.CompareAndSwapUint32(&httpSeen, 1, 0) { + if httpSeen.CompareAndSwap(1, 0) { t.Fatalf("disabled HTTP route hit") } @@ -516,7 +516,7 @@ func TestClientRequests(t *testing.T) { criticalRoutes["httproute"] = true sendToServer("httproute", "{}") readChannel(t, "httproute", srvChan) - if !atomic.CompareAndSwapUint32(&httpSeen, 1, 0) { + if !httpSeen.CompareAndSwap(1, 0) { t.Fatalf("critical HTTP route not hit") } conn.wait(t, "critical http route success") @@ -908,11 +908,11 @@ func TestParseListeners(t *testing.T) { } type tHTTPHandler struct { - count uint32 + count atomic.Uint32 } func (h *tHTTPHandler) ServeHTTP(http.ResponseWriter, *http.Request) { - atomic.AddUint32(&h.count, 1) + h.count.Add(1) } func TestHTTPRateLimiter(t *testing.T) { @@ -928,7 +928,7 @@ func TestHTTPRateLimiter(t *testing.T) { } time.Sleep(100 * time.Millisecond) f.ServeHTTP(recorder, req) - successes := atomic.LoadUint32(&tHandler.count) + successes := tHandler.count.Load() if successes != uint32(DefaultIPBurstSize) { t.Fatalf("expected %d requests. got %d", DefaultIPBurstSize, successes) } @@ -994,7 +994,7 @@ func TestXForwardedForIgnored(t *testing.T) { f.ServeHTTP(httptest.NewRecorder(), req) } - successes := atomic.LoadUint32(&tHandler.count) + successes := tHandler.count.Load() if successes != uint32(DefaultIPBurstSize) { t.Fatalf("expected %d successes (rate limited by real IP), got %d", DefaultIPBurstSize, successes) } diff --git a/server/comms/server.go b/server/comms/server.go index 127c313d64..2bad34ce50 100644 --- a/server/comms/server.go +++ b/server/comms/server.go @@ -71,7 +71,7 @@ var ( pingPeriod = (pongWait * 9) / 10 // i.e. 18 sec ) -var idCounter uint64 +var idCounter atomic.Uint64 // ipRateLimiter is used to track an IPs HTTP request rate. type ipRateLimiter struct { @@ -113,7 +113,7 @@ func (s *Server) getIPLimiter(ip dex.IPKey) *ipRateLimiter { // NextID returns a unique ID to identify a request-type message. func NextID() uint64 { - return atomic.AddUint64(&idCounter, 1) + return idCounter.Add(1) } // MsgHandler describes a handler for a specific message route. diff --git a/server/market/market.go b/server/market/market.go index 265f7117e7..28dd4905f3 100644 --- a/server/market/market.go +++ b/server/market/market.go @@ -134,7 +134,7 @@ type Market struct { runMtx sync.RWMutex running chan struct{} // closed when running (accepting new orders) - up uint32 // Run is called, either waiting for first epoch or running + up atomic.Uint32 // Run is called, either waiting for first epoch or running bookMtx sync.Mutex // guards book and bookEpochIdx book *book.Book @@ -1360,11 +1360,11 @@ func (m *Market) lazy(do func()) { // function using sendToFeeds. func (m *Market) Run(ctx context.Context) { // Prevent multiple incantations of Run. - if !atomic.CompareAndSwapUint32(&m.up, 0, 1) { + if !m.up.CompareAndSwap(0, 1) { log.Errorf("Run: Market not stopped!") return } - defer atomic.StoreUint32(&m.up, 0) + defer m.up.Store(0) var running bool ctxRun, cancel := context.WithCancel(ctx) diff --git a/server/market/routers_test.go b/server/market/routers_test.go index abf5067c1e..a538abc88d 100644 --- a/server/market/routers_test.go +++ b/server/market/routers_test.go @@ -1504,7 +1504,7 @@ type TLink struct { sendErr error sendTrigger chan struct{} banished bool - on uint32 + on atomic.Uint32 closed chan struct{} sendRawErr error } @@ -1592,7 +1592,7 @@ func (conn *TLink) Done() <-chan struct{} { return conn.closed } func (conn *TLink) Disconnect() { - if atomic.CompareAndSwapUint32(&conn.on, 0, 1) { + if conn.on.CompareAndSwap(0, 1) { close(conn.closed) } } diff --git a/server/noderelay/cmd/sourcenode/main.go b/server/noderelay/cmd/sourcenode/main.go index 830094d4a2..c1f6c1a1b7 100644 --- a/server/noderelay/cmd/sourcenode/main.go +++ b/server/noderelay/cmd/sourcenode/main.go @@ -136,10 +136,10 @@ func mainErr() (err error) { // Keep track of some basic stats. var stats struct { - requests uint32 - errors uint32 - received uint64 - sent uint64 + requests atomic.Uint32 + errors atomic.Uint32 + received atomic.Uint64 + sent atomic.Uint64 } // Periodically print the node usage statistics. @@ -152,8 +152,8 @@ func mainErr() (err error) { return } log.Infof("%d requests, %.4g MB received, %.4g MB sent, %d errors in %s", - atomic.LoadUint32(&stats.requests), float64(atomic.LoadUint64(&stats.received))/1e6, - float64(atomic.LoadUint64(&stats.sent))/1e6, atomic.LoadUint32(&stats.errors), + stats.requests.Load(), float64(stats.received.Load())/1e6, + float64(stats.sent.Load())/1e6, stats.errors.Load(), time.Since(start)) } }() @@ -209,12 +209,12 @@ func mainErr() (err error) { ConnectEventFunc: func(s comms.ConnectionStatus) {}, Logger: dex.StdOutLogger("CL", dex.LevelDebug), RawHandler: func(b []byte) { - atomic.AddUint64(&stats.received, uint64(len(b))) - atomic.AddUint32(&stats.requests, 1) + stats.received.Add(uint64(len(b))) + stats.requests.Add(1) // Request received from server. var msg noderelay.RelayedMessage if err := json.Unmarshal(b, &msg); err != nil { - atomic.AddUint32(&stats.errors, 1) + stats.errors.Add(1) log.Errorf("json unmarshal error: %v", err) return } @@ -223,7 +223,7 @@ func mainErr() (err error) { defer cancel() req, err := http.NewRequestWithContext(ctx, msg.Method, localNodeURL, bytes.NewReader(msg.Body)) if err != nil { - atomic.AddUint32(&stats.errors, 1) + stats.errors.Add(1) log.Errorf("Error constructing request: %v", err) return } @@ -231,7 +231,7 @@ func mainErr() (err error) { // Send request to local service. resp, err := httpClient.Do(req) if err != nil { - atomic.AddUint32(&stats.errors, 1) + stats.errors.Add(1) log.Errorf("error processing request: %v", err) return } @@ -239,11 +239,11 @@ func mainErr() (err error) { b, err = io.ReadAll(resp.Body) resp.Body.Close() if err != nil { - atomic.AddUint32(&stats.errors, 1) + stats.errors.Add(1) log.Errorf("Error reading response: %v", err) return } - atomic.AddUint64(&stats.sent, uint64(len(b))) + stats.sent.Add(uint64(len(b))) encResp, err := json.Marshal(&noderelay.RelayedMessage{ MessageID: msg.MessageID, Body: b, diff --git a/server/swap/swap.go b/server/swap/swap.go index 06e0a35455..965a8de6e5 100644 --- a/server/swap/swap.go +++ b/server/swap/swap.go @@ -82,8 +82,8 @@ type swapStatus struct { swapAsset uint32 redeemAsset uint32 - swapSearching uint32 // atomic - redeemSearching uint32 // atomic + swapSearching atomic.Uint32 // atomic + redeemSearching atomic.Uint32 // atomic mtx sync.RWMutex // The time that the swap coordinator sees the transaction. @@ -105,19 +105,19 @@ func (ss *swapStatus) String() string { } func (ss *swapStatus) startSwapSearch() bool { - return atomic.CompareAndSwapUint32(&ss.swapSearching, 0, 1) + return ss.swapSearching.CompareAndSwap(0, 1) } func (ss *swapStatus) endSwapSearch() { - atomic.StoreUint32(&ss.swapSearching, 0) + ss.swapSearching.Store(0) } func (ss *swapStatus) startRedeemSearch() bool { - return atomic.CompareAndSwapUint32(&ss.redeemSearching, 0, 1) + return ss.redeemSearching.CompareAndSwap(0, 1) } func (ss *swapStatus) endRedeemSearch() { - atomic.StoreUint32(&ss.redeemSearching, 0) + ss.redeemSearching.Store(0) } func (ss *swapStatus) swapConfTime() time.Time { diff --git a/server/swap/swap_test.go b/server/swap/swap_test.go index 53c2dc2fdd..1db21accff 100644 --- a/server/swap/swap_test.go +++ b/server/swap/swap_test.go @@ -551,10 +551,10 @@ func TNewAsset(backend asset.Backend, assetID uint32) *asset.BackedAsset { } } -var testMsgID uint64 +var testMsgID atomic.Uint64 func nextID() uint64 { - return atomic.AddUint64(&testMsgID, 1) + return testMsgID.Add(1) } func tNewResponse(id uint64, resp []byte) *msgjson.Message {