Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/asset/btc/livetest/livetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ func Run(t *testing.T, cfg *Config) {
}
}

var blockReported uint32
var blockReported atomic.Uint32
blkFunc := func(name string) {
atomic.StoreUint32(&blockReported, 1)
blockReported.Store(1)
tLogger.Infof("%s has reported a new block", name)
}

Expand Down Expand Up @@ -500,7 +500,7 @@ func Run(t *testing.T, cfg *Config) {

// Mine a block and find the redemption again.
mine()
if atomic.LoadUint32(&blockReported) == 0 {
if blockReported.Load() == 0 {
t.Fatalf("no block reported")
}
// Check that there is 1 confirmation on the swap
Expand Down
10 changes: 5 additions & 5 deletions client/comms/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ type WsCfg struct {
type wsConn struct {
// 64-bit atomic variables first. See
// https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
rID uint64
rID atomic.Uint64
cancel context.CancelFunc
wg sync.WaitGroup
log dex.Logger
Expand All @@ -168,7 +168,7 @@ type wsConn struct {
writeMtx sync.Mutex
ws *websocket.Conn

connectionStatus uint32 // atomic
connectionStatus atomic.Uint32 // atomic

reqMtx sync.RWMutex
respHandlers map[uint64]*responseHandler
Expand Down Expand Up @@ -229,13 +229,13 @@ func (conn *wsConn) url() string {

// IsDown indicates if the connection is known to be down.
func (conn *wsConn) IsDown() bool {
return atomic.LoadUint32(&conn.connectionStatus) != uint32(Connected)
return conn.connectionStatus.Load() != uint32(Connected)
}

// setConnectionStatus updates the connection's status and runs the
// ConnectEventFunc in case of a change.
func (conn *wsConn) setConnectionStatus(status ConnectionStatus) {
oldStatus := atomic.SwapUint32(&conn.connectionStatus, uint32(status))
oldStatus := conn.connectionStatus.Swap(uint32(status))
statusChange := oldStatus != uint32(status)
if statusChange && conn.cfg.ConnectEventFunc != nil {
conn.cfg.ConnectEventFunc(status)
Expand Down Expand Up @@ -544,7 +544,7 @@ func (conn *wsConn) keepAlive(ctx context.Context) {

// NextID returns the next request id.
func (conn *wsConn) NextID() uint64 {
return atomic.AddUint64(&conn.rID, 1)
return conn.rID.Add(1)
}

// Connect connects the client. Any error encountered during the initial
Expand Down
4 changes: 2 additions & 2 deletions client/comms/wsconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ func TestWsConn(t *testing.T) {
clientMtx.Unlock()
}()

var id uint64
var id atomic.Uint64
// server's "/ws" handler
handler := func(w http.ResponseWriter, r *http.Request) {
t.Helper()
id := atomic.AddUint64(&id, 1) // shadow id
id := id.Add(1) // shadow id
hCtx, hCancel := context.WithCancel(ctx)

c, err := upgrader.Upgrade(w, r, nil)
Expand Down
16 changes: 8 additions & 8 deletions client/orderbook/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ type OrderBook struct {
// feeRates is at the top to account for atomic field alignment in
// 32-bit systems. See also https://golang.org/pkg/sync/atomic/#pkg-note-BUG
feeRates struct {
base uint64
quote uint64
base atomic.Uint64
quote atomic.Uint64
}

log dex.Logger
Expand Down Expand Up @@ -119,12 +119,12 @@ func NewOrderBook(logger dex.Logger) *OrderBook {

// BaseFeeRate is the last reported base asset fee rate.
func (ob *OrderBook) BaseFeeRate() uint64 {
return atomic.LoadUint64(&ob.feeRates.base)
return ob.feeRates.base.Load()
}

// QuoteFeeRate is the last reported quote asset fee rate.
func (ob *OrderBook) QuoteFeeRate() uint64 {
return atomic.LoadUint64(&ob.feeRates.quote)
return ob.feeRates.quote.Load()
}

// setSynced sets the synced state of the order book.
Expand Down Expand Up @@ -243,8 +243,8 @@ func (ob *OrderBook) Reset(snapshot *msgjson.OrderBook) error {
ob.seq = snapshot.Seq
ob.seqMtx.Unlock()

atomic.StoreUint64(&ob.feeRates.base, snapshot.BaseFeeRate)
atomic.StoreUint64(&ob.feeRates.quote, snapshot.QuoteFeeRate)
ob.feeRates.base.Store(snapshot.BaseFeeRate)
ob.feeRates.quote.Store(snapshot.QuoteFeeRate)

ob.marketID = snapshot.MarketID

Expand Down Expand Up @@ -427,8 +427,8 @@ func (ob *OrderBook) UpdateRemaining(note *msgjson.UpdateRemainingNote) error {
// the future.
func (ob *OrderBook) LogEpochReport(note *msgjson.EpochReportNote) error {
// TODO: update future candlestick charts.
atomic.StoreUint64(&ob.feeRates.base, note.BaseFeeRate)
atomic.StoreUint64(&ob.feeRates.quote, note.QuoteFeeRate)
ob.feeRates.base.Store(note.BaseFeeRate)
ob.feeRates.quote.Store(note.QuoteFeeRate)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions client/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
// to facilitate testing.
pingPeriod = (pongWait * 9) / 10
// A client id counter.
cidCounter int32
cidCounter atomic.Int32
)

type bookFeed struct {
Expand All @@ -50,7 +50,7 @@ type wsClient struct {
func newWSClient(addr string, conn ws.Connection, hndlr func(msg *msgjson.Message) *msgjson.Error, logger dex.Logger) *wsClient {
return &wsClient{
WSLink: ws.NewWSLink(addr, conn, pingPeriod, hndlr, logger),
cid: atomic.AddInt32(&cidCounter, 1),
cid: cidCounter.Add(1),
}
}

Expand Down
8 changes: 4 additions & 4 deletions dex/ws/wslink.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type WSLink struct {
conn Connection
// on is used internally to prevent multiple Close calls on the underlying
// connections.
on uint32
on atomic.Uint32
// quit is used to cancel the Context.
quit context.CancelFunc
// stopped is closed when quit is called.
Expand Down Expand Up @@ -172,7 +172,7 @@ func (c *WSLink) Connect(ctx context.Context) (*sync.WaitGroup, error) {
// started. The pong handler will set subsequent read deadlines. 2x ping
// period is a very generous initial pong wait; the readWait provided to
// NewConnection could be stored and used here (once) instead.
if !atomic.CompareAndSwapUint32(&c.on, 0, 1) {
if !c.on.CompareAndSwap(0, 1) {
return nil, fmt.Errorf("attempted to Start a running WSLink")
}
linkCtx, quit := context.WithCancel(ctx)
Expand All @@ -197,7 +197,7 @@ func (c *WSLink) Connect(ctx context.Context) (*sync.WaitGroup, error) {

func (c *WSLink) stop() {
// Flip the switch into the off position and cancel the context.
if !atomic.CompareAndSwapUint32(&c.on, 1, 0) {
if !c.on.CompareAndSwap(1, 0) {
return
}
// Signal to senders we are done.
Expand Down Expand Up @@ -497,7 +497,7 @@ out:

// Off will return true if the link has disconnected.
func (c *WSLink) Off() bool {
return atomic.LoadUint32(&c.on) == 0
return c.on.Load() == 0
}

// Addr returns the string-encoded IP address.
Expand Down
8 changes: 4 additions & 4 deletions dex/ws/wslink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ var tLogger = dex.StdOutLogger("ws_TEST", dex.LevelTrace)
type ConnStub struct {
inMsg chan []byte
inErr chan error
closed int32
closed atomic.Int32
}

func (c *ConnStub) Close() error {
// make ReadMessage return with a close error
atomic.StoreInt32(&c.closed, 1)
c.closed.Store(1)
c.inErr <- &websocket.CloseError{
Code: websocket.CloseNormalClosure,
Text: "bye",
Expand All @@ -45,7 +45,7 @@ func (c *ConnStub) SetWriteDeadline(t time.Time) error {
return nil
}
func (c *ConnStub) ReadMessage() (int, []byte, error) {
if atomic.LoadInt32(&c.closed) == 1 {
if c.closed.Load() == 1 {
return 0, nil, &websocket.CloseError{
Code: websocket.CloseAbnormalClosure,
Text: io.ErrUnexpectedEOF.Error(),
Expand All @@ -71,7 +71,7 @@ func microSecDelay(stdDev float64, min int64) time.Duration {
var lastID int64 = -1 // first msg.ID should be 0

func (c *ConnStub) WriteMessage(_ int, b []byte) error {
if atomic.LoadInt32(&c.closed) == 1 {
if c.closed.Load() == 1 {
return websocket.ErrCloseSent
}
msg, err := msgjson.DecodeMessage(b)
Expand Down
10 changes: 5 additions & 5 deletions server/admin/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type TCore struct {
epochOrdersErr error
marketMatches []*dexsrv.MatchData
marketMatchesErr error
dataEnabled uint32
dataEnabled atomic.Uint32
}

func (c *TCore) ConfigMsg() json.RawMessage { return nil }
Expand Down Expand Up @@ -179,7 +179,7 @@ func (c *TCore) EnableDataAPI(yes bool) {
if yes {
v = 1
}
atomic.StoreUint32(&c.dataEnabled, v)
c.dataEnabled.Store(v)
}

type tResponseWriter struct {
Expand Down Expand Up @@ -1367,11 +1367,11 @@ func TestEnableDataAPI(t *testing.T) {
t.Fatalf("%q: apiEnableDataAPI returned code %d, expected %d", test.name, w.Code, test.wantCode)
}

if test.wantEnabled != atomic.LoadUint32(&core.dataEnabled) {
t.Fatalf("%q: apiEnableDataAPI expected dataEnabled = %d, got %d", test.name, test.wantEnabled, atomic.LoadUint32(&core.dataEnabled))
if test.wantEnabled != core.dataEnabled.Load() {
t.Fatalf("%q: apiEnableDataAPI expected dataEnabled = %d, got %d", test.name, test.wantEnabled, core.dataEnabled.Load())
}

atomic.StoreUint32(&core.dataEnabled, 0)
core.dataEnabled.Store(0)
}

}
4 changes: 2 additions & 2 deletions server/apidata/apidata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var (
// Our internal millisecond representation of the bin sizes.
binSizes []uint64
started uint32
started atomic.Uint32
)

// DBSource is a source of persistent data. DBSource is used to prime the
Expand Down Expand Up @@ -72,7 +72,7 @@ func NewDataAPI(dbSrc DBSource, registerHTTP func(route string, handler comms.HT
marketCaches: make(map[string]map[uint64]*cacheWithStoredTime),
}

if atomic.CompareAndSwapUint32(&started, 0, 1) {
if started.CompareAndSwap(0, 1) {
registerHTTP(msgjson.SpotsRoute, s.handleSpots)
registerHTTP(msgjson.CandlesRoute, s.handleCandles)
registerHTTP(msgjson.OrderBookRoute, s.handleOrderBook)
Expand Down
8 changes: 4 additions & 4 deletions server/auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ func (s *TStorage) AddMatchOutcome(ctx context.Context, user account.AccountID,
return nil, nil
}

var dbIDCounter int64
var dbIDCounter atomic.Int64

func nextDBID() int64 {
return atomic.AddInt64(&dbIDCounter, 1)
return dbIDCounter.Add(1)
}

func (s *TStorage) AddOrderOutcome(ctx context.Context, user account.AccountID, oid order.OrderID, canceled bool) (*db.OrderOutcome, error) {
Expand Down Expand Up @@ -203,7 +203,7 @@ type TRPCClient struct {
banished bool
sends []*msgjson.Message
reqs []*tReq
on uint32
on atomic.Uint32
closed chan struct{}
}

Expand Down Expand Up @@ -243,7 +243,7 @@ func (c *TRPCClient) Done() <-chan struct{} {
return c.closed
}
func (c *TRPCClient) Disconnect() {
if atomic.CompareAndSwapUint32(&c.on, 0, 1) {
if c.on.CompareAndSwap(0, 1) {
close(c.closed)
}
}
Expand Down
18 changes: 9 additions & 9 deletions server/comms/comms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,9 @@ func TestClientRequests(t *testing.T) {
c.Banish()
return nil
})
var httpSeen uint32
var httpSeen atomic.Uint32
server.RegisterHTTP("httproute", func(thing any) (any, error) {
atomic.StoreUint32(&httpSeen, 1)
httpSeen.Store(1)
srvChan <- nil
return struct{}{}, nil
})
Expand Down Expand Up @@ -496,7 +496,7 @@ func TestClientRequests(t *testing.T) {
conn.addChan()
sendToServer("httproute", "{}")
readChannel(t, "httproute", srvChan)
if !atomic.CompareAndSwapUint32(&httpSeen, 1, 0) {
if !httpSeen.CompareAndSwap(1, 0) {
t.Fatalf("HTTP route not hit")
}
conn.wait(t, "http route success")
Expand All @@ -508,15 +508,15 @@ func TestClientRequests(t *testing.T) {
if resp.Error == nil || resp.Error.Code != msgjson.TooManyRequestsError {
t.Fatalf("no or incorrect error for disabled HTTP route: %v", resp.Error)
}
if atomic.CompareAndSwapUint32(&httpSeen, 1, 0) {
if httpSeen.CompareAndSwap(1, 0) {
t.Fatalf("disabled HTTP route hit")
}

// Make the route a critical route
criticalRoutes["httproute"] = true
sendToServer("httproute", "{}")
readChannel(t, "httproute", srvChan)
if !atomic.CompareAndSwapUint32(&httpSeen, 1, 0) {
if !httpSeen.CompareAndSwap(1, 0) {
t.Fatalf("critical HTTP route not hit")
}
conn.wait(t, "critical http route success")
Expand Down Expand Up @@ -908,11 +908,11 @@ func TestParseListeners(t *testing.T) {
}

type tHTTPHandler struct {
count uint32
count atomic.Uint32
}

func (h *tHTTPHandler) ServeHTTP(http.ResponseWriter, *http.Request) {
atomic.AddUint32(&h.count, 1)
h.count.Add(1)
}

func TestHTTPRateLimiter(t *testing.T) {
Expand All @@ -928,7 +928,7 @@ func TestHTTPRateLimiter(t *testing.T) {
}
time.Sleep(100 * time.Millisecond)
f.ServeHTTP(recorder, req)
successes := atomic.LoadUint32(&tHandler.count)
successes := tHandler.count.Load()
if successes != uint32(DefaultIPBurstSize) {
t.Fatalf("expected %d requests. got %d", DefaultIPBurstSize, successes)
}
Expand Down Expand Up @@ -994,7 +994,7 @@ func TestXForwardedForIgnored(t *testing.T) {
f.ServeHTTP(httptest.NewRecorder(), req)
}

successes := atomic.LoadUint32(&tHandler.count)
successes := tHandler.count.Load()
if successes != uint32(DefaultIPBurstSize) {
t.Fatalf("expected %d successes (rate limited by real IP), got %d", DefaultIPBurstSize, successes)
}
Expand Down
4 changes: 2 additions & 2 deletions server/comms/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
pingPeriod = (pongWait * 9) / 10 // i.e. 18 sec
)

var idCounter uint64
var idCounter atomic.Uint64

// ipRateLimiter is used to track an IPs HTTP request rate.
type ipRateLimiter struct {
Expand Down Expand Up @@ -113,7 +113,7 @@ func (s *Server) getIPLimiter(ip dex.IPKey) *ipRateLimiter {

// NextID returns a unique ID to identify a request-type message.
func NextID() uint64 {
return atomic.AddUint64(&idCounter, 1)
return idCounter.Add(1)
}

// MsgHandler describes a handler for a specific message route.
Expand Down
Loading