diff --git a/client/core/account.go b/client/core/account.go index fc96efdccb..3bd103dadd 100644 --- a/client/core/account.go +++ b/client/core/account.go @@ -68,7 +68,7 @@ func (c *Core) ToggleAccountStatus(pw []byte, host string, disable bool) error { if disable { // Check active orders or bonds. - if dc.hasActiveOrders() { + if c.hasActiveOrders(host) { return errors.New("cannot disable account with active orders") } @@ -364,7 +364,7 @@ func (c *Core) UpdateDEXHost(oldHost, newHost string, appPW []byte, certI any) ( return nil, err } - if oldDc.hasActiveOrders() { + if c.hasActiveOrders(oldHost) { return nil, fmt.Errorf("cannot update host while dex has active orders") } diff --git a/client/core/account_test.go b/client/core/account_test.go index 62f20bb4a6..e5f07e3dce 100644 --- a/client/core/account_test.go +++ b/client/core/account_test.go @@ -61,7 +61,7 @@ func TestAccountExport(t *testing.T) { func TestToggleAccountStatus(t *testing.T) { activeTrades := map[order.OrderID]*trackedTrade{ - {}: {metaData: &db.OrderMetaData{Status: order.OrderStatusBooked}}, + {}: {metaData: &db.OrderMetaData{Status: order.OrderStatusBooked}, dc: &dexConnection{acct: &dexAccount{host: tDexHost}}}, } tests := []struct { @@ -124,7 +124,7 @@ func TestToggleAccountStatus(t *testing.T) { rig.db.disabledHost = nil rig.db.disableAccountErr = test.disableAcctErr tCore.connMtx.Lock() - tCore.conns[tDexHost].trades = test.activeTrades + tCore.trades = test.activeTrades if test.loseConns { // Lose the dexConnection diff --git a/client/core/bookie.go b/client/core/bookie.go index da944c6c1f..0d58e9a50a 100644 --- a/client/core/bookie.go +++ b/client/core/bookie.go @@ -739,8 +739,8 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) // Revoke all active orders of the suspended market for the dex. c.log.Warnf("Revoking all active orders for market %s at %s.", sp.MarketID, dc.acct.host) updatedAssets := make(assetMap) - dc.tradeMtx.RLock() - for _, tracker := range dc.trades { + c.tradeMtx.RLock() + for _, tracker := range c.trades { if tracker.Order.Base() == mkt.Base && tracker.Order.Quote() == mkt.Quote && tracker.metaData.Host == dc.acct.host && tracker.status() == order.OrderStatusBooked { // Locally revoke the purged book order. @@ -750,7 +750,7 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) updatedAssets.count(tracker.fromAssetID) } } - dc.tradeMtx.RUnlock() + c.tradeMtx.RUnlock() // Clear the book. book.send(&BookUpdate{ diff --git a/client/core/core.go b/client/core/core.go index 185ded7b8e..409fa82633 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -165,21 +165,6 @@ type dexConnection struct { booksMtx sync.RWMutex books map[string]*bookie - // tradeMtx is used to synchronize access to the trades map. - tradeMtx sync.RWMutex - // trades tracks outstanding orders issued by this client. - trades map[order.OrderID]*trackedTrade - // inFlightOrders tracks orders issued by this client that have not been - // processed by a dex server. - inFlightOrders map[uint64]*InFlightOrder - - // A map linking cancel order IDs to trade order IDs. - cancelsMtx sync.RWMutex - cancels map[order.OrderID]order.OrderID - - blindCancelsMtx sync.Mutex - blindCancels map[order.OrderID]order.Preimage - epochMtx sync.RWMutex epoch map[string]uint64 // resolvedEpoch differs from epoch in that an epoch is not considered @@ -257,22 +242,22 @@ func (dc *dexConnection) bondAssets() (map[uint32]*BondAsset, uint64) { return bondAssets, cfg.BondExpiry } -func (dc *dexConnection) registerCancelLink(cid, oid order.OrderID) { - dc.cancelsMtx.Lock() - dc.cancels[cid] = oid - dc.cancelsMtx.Unlock() +func (c *Core) registerCancelLink(cid, oid order.OrderID) { + c.cancelsMtx.Lock() + c.cancels[cid] = oid + c.cancelsMtx.Unlock() } -func (dc *dexConnection) deleteCancelLink(cid order.OrderID) { - dc.cancelsMtx.Lock() - delete(dc.cancels, cid) - dc.cancelsMtx.Unlock() +func (c *Core) deleteCancelLink(cid order.OrderID) { + c.cancelsMtx.Lock() + delete(c.cancels, cid) + c.cancelsMtx.Unlock() } -func (dc *dexConnection) cancelTradeID(cid order.OrderID) (order.OrderID, bool) { - dc.cancelsMtx.RLock() - defer dc.cancelsMtx.RUnlock() - oid, found := dc.cancels[cid] +func (c *Core) cancelTradeID(cid order.OrderID) (order.OrderID, bool) { + c.cancelsMtx.RLock() + defer c.cancelsMtx.RUnlock() + oid, found := c.cancels[cid] return oid, found } @@ -292,7 +277,7 @@ func (dc *dexConnection) assetConfig(assetID uint32) *dex.Asset { // marketMap creates a map of this DEX's *Market keyed by name/ID, // [base]_[quote]. -func (dc *dexConnection) marketMap() map[string]*Market { +func (c *Core) marketMap(dc *dexConnection) map[string]*Market { dc.cfgMtx.RLock() cfg := dc.cfg dc.cfgMtx.RUnlock() @@ -303,7 +288,7 @@ func (dc *dexConnection) marketMap() map[string]*Market { marketMap := make(map[string]*Market, len(mktConfigs)) for _, msgMkt := range mktConfigs { - mkt := coreMarketFromMsgMarket(dc, msgMkt) + mkt := c.coreMarketFromMsgMarket(dc, msgMkt) marketMap[mkt.marketName()] = mkt } @@ -319,7 +304,7 @@ func (dc *dexConnection) marketMap() map[string]*Market { // marketMap creates a map of this DEX's *Market keyed by name/ID, // [base]_[quote]. -func (dc *dexConnection) coreMarket(mktName string) *Market { +func (c *Core) coreMarket(dc *dexConnection, mktName string) *Market { dc.cfgMtx.RLock() cfg := dc.cfg dc.cfgMtx.RUnlock() @@ -329,7 +314,7 @@ func (dc *dexConnection) coreMarket(mktName string) *Market { var mkt *Market for _, m := range cfg.Markets { if m.Name == mktName { - mkt = coreMarketFromMsgMarket(dc, m) + mkt = c.coreMarketFromMsgMarket(dc, m) break } } @@ -345,7 +330,7 @@ func (dc *dexConnection) coreMarket(mktName string) *Market { return mkt } -func coreMarketFromMsgMarket(dc *dexConnection, msgMkt *msgjson.Market) *Market { +func (c *Core) coreMarketFromMsgMarket(dc *dexConnection, msgMkt *msgjson.Market) *Market { // The presence of the asset for every market was already verified when the // dexConnection was created in connectDEX. dc.assetsMtx.RLock() @@ -370,7 +355,7 @@ func coreMarketFromMsgMarket(dc *dexConnection, msgMkt *msgjson.Market) *Market MinimumRate: dc.minimumMarketRate(quote, msgMkt.LotSize), } - trades, inFlight := dc.marketTrades(mkt.marketName()) + trades, inFlight := c.marketTrades(dc.acct.host, mkt.marketName()) mkt.InFlightOrders = inFlight for _, trade := range trades { @@ -394,28 +379,28 @@ func (dc *dexConnection) minimumMarketRate(q *dex.Asset, lotSize uint64) uint64 var temporaryOrderIDCounter uint64 // storeInFlightOrder stores an inflight order and returns a generated ID. -func (dc *dexConnection) storeInFlightOrder(ord *Order) uint64 { +func (c *Core) storeInFlightOrder(ord *Order) uint64 { tempID := atomic.AddUint64(&temporaryOrderIDCounter, 1) - dc.tradeMtx.Lock() - dc.inFlightOrders[tempID] = &InFlightOrder{ + c.tradeMtx.Lock() + c.inFlightOrders[tempID] = &InFlightOrder{ Order: ord, TemporaryID: tempID, } - dc.tradeMtx.Unlock() + c.tradeMtx.Unlock() return tempID } -func (dc *dexConnection) deleteInFlightOrder(tempID uint64) { - dc.tradeMtx.Lock() - delete(dc.inFlightOrders, tempID) - dc.tradeMtx.Unlock() +func (c *Core) deleteInFlightOrder(tempID uint64) { + c.tradeMtx.Lock() + delete(c.inFlightOrders, tempID) + c.tradeMtx.Unlock() } -func (dc *dexConnection) trackedTrades() []*trackedTrade { - dc.tradeMtx.RLock() - defer dc.tradeMtx.RUnlock() - allTrades := make([]*trackedTrade, 0, len(dc.trades)) - for _, trade := range dc.trades { +func (c *Core) trackedTrades() []*trackedTrade { + c.tradeMtx.RLock() + defer c.tradeMtx.RUnlock() + allTrades := make([]*trackedTrade, 0, len(c.trades)) + for _, trade := range c.trades { allTrades = append(allTrades, trade) } return allTrades @@ -423,9 +408,9 @@ func (dc *dexConnection) trackedTrades() []*trackedTrade { // marketTrades returns a slice of active trades in the trades map and a slice // of inflight orders in the inFlightOrders map. -func (dc *dexConnection) marketTrades(mktID string) ([]*trackedTrade, []*InFlightOrder) { +func (c *Core) marketTrades(host string, mktID string) ([]*trackedTrade, []*InFlightOrder) { // Copy trades to avoid locking both tradeMtx and trackedTrade.mtx. - allTrades := dc.trackedTrades() + allTrades := c.trackedTrades() trades := make([]*trackedTrade, 0, len(allTrades)) // may over-allocate for _, trade := range allTrades { if trade.mktID == mktID && trade.isActive() { @@ -434,14 +419,14 @@ func (dc *dexConnection) marketTrades(mktID string) ([]*trackedTrade, []*InFligh // Retiring inactive orders is presently the responsibility of ticker. } - dc.tradeMtx.RLock() - inFlight := make([]*InFlightOrder, 0, len(dc.inFlightOrders)) // may over-allocate - for _, ord := range dc.inFlightOrders { - if ord.MarketID == mktID { + c.tradeMtx.RLock() + inFlight := make([]*InFlightOrder, 0, len(c.inFlightOrders)) // may over-allocate + for _, ord := range c.inFlightOrders { + if ord.MarketID == mktID && ord.Host == host { inFlight = append(inFlight, ord) } } - dc.tradeMtx.RUnlock() + c.tradeMtx.RUnlock() return trades, inFlight } @@ -510,7 +495,7 @@ func (c *Core) exchangeInfo(dc *dexConnection) *Exchange { return &Exchange{ Host: dc.acct.host, AcctID: acctID, - Markets: dc.marketMap(), + Markets: c.marketMap(dc), Assets: assets, BondExpiry: cfg.BondExpiry, BondAssets: bondAssets, @@ -546,17 +531,17 @@ func assetFamily(assetID uint32) map[uint32]bool { // hasActiveAssetOrders checks whether there are any active orders or negotiating // matches for the specified asset. -func (dc *dexConnection) hasActiveAssetOrders(assetID uint32) bool { +func (c *Core) hasActiveAssetOrders(assetID uint32) bool { familial := assetFamily(assetID) - dc.tradeMtx.RLock() - defer dc.tradeMtx.RUnlock() - for _, inFlight := range dc.inFlightOrders { + c.tradeMtx.RLock() + defer c.tradeMtx.RUnlock() + for _, inFlight := range c.inFlightOrders { if familial[inFlight.BaseID] || familial[inFlight.QuoteID] { return true } } - for _, trade := range dc.trades { + for _, trade := range c.trades { if (familial[trade.Base()] || familial[trade.Quote()]) && trade.isActive() { return true @@ -567,36 +552,37 @@ func (dc *dexConnection) hasActiveAssetOrders(assetID uint32) bool { } // hasActiveOrders checks whether there are any active orders for the dexConnection. -func (dc *dexConnection) hasActiveOrders() bool { - dc.tradeMtx.RLock() - defer dc.tradeMtx.RUnlock() +func (c *Core) hasActiveOrders(host string) bool { + c.tradeMtx.RLock() + defer c.tradeMtx.RUnlock() - if len(dc.inFlightOrders) > 0 { + if len(c.inFlightOrders) > 0 { return true } - for _, trade := range dc.trades { - if trade.isActive() { + for _, trade := range c.trades { + if trade.dc.acct.host == host && trade.isActive() { return true } } return false } -// activeOrders returns a slice of active orders and inflight orders. -func (dc *dexConnection) activeOrders() ([]*Order, []*InFlightOrder) { - dc.tradeMtx.RLock() - defer dc.tradeMtx.RUnlock() +// activeOrdersForDEX returns a slice of active orders and inflight orders for +// a DEX. +func (c *Core) activeOrdersForDEX(host string) ([]*Order, []*InFlightOrder) { + c.tradeMtx.RLock() + defer c.tradeMtx.RUnlock() var activeOrders []*Order - for _, trade := range dc.trades { - if trade.isActive() { + for _, trade := range c.trades { + if trade.dc.acct.host == host && trade.isActive() { activeOrders = append(activeOrders, trade.coreOrder()) } } var inflightOrders []*InFlightOrder - for _, ord := range dc.inFlightOrders { + for _, ord := range c.inFlightOrders { inflightOrders = append(inflightOrders, ord) } @@ -605,19 +591,19 @@ func (dc *dexConnection) activeOrders() ([]*Order, []*InFlightOrder) { // findOrder returns the tracker and preimage for an order ID, and a boolean // indicating whether this is a cancel order. -func (dc *dexConnection) findOrder(oid order.OrderID) (tracker *trackedTrade, isCancel bool) { - dc.tradeMtx.RLock() - defer dc.tradeMtx.RUnlock() +func (c *Core) findOrder(oid order.OrderID) (tracker *trackedTrade, isCancel bool) { + c.tradeMtx.RLock() + defer c.tradeMtx.RUnlock() // Try to find the order as a trade. - if tracker, found := dc.trades[oid]; found { + if tracker, found := c.trades[oid]; found { return tracker, false } - if tid, found := dc.cancelTradeID(oid); found { - if tracker, found := dc.trades[tid]; found { + if tid, found := c.cancelTradeID(oid); found { + if tracker, found := c.trades[tid]; found { return tracker, true } else { - dc.log.Errorf("Did not find trade for cancel order ID %s", oid) + c.log.Errorf("Did not find trade for cancel order ID %s", oid) } } return @@ -675,7 +661,7 @@ func (c *Core) sendCancelOrder(dc *dexConnection, oid order.OrderID, base, quote // tryCancel will look for an order with the specified order ID, and attempt to // cancel the order. It is not an error if the order is not found. func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err error) { - tracker, _ := dc.findOrder(oid) + tracker, _ := c.findOrder(oid) if tracker == nil { return // false, nil } @@ -704,7 +690,7 @@ func (c *Core) tryCancelTrade(dc *dexConnection, tracker *trackedTrade) error { if tracker.cancel != nil { // Existing cancel might be stale. Deleting it now allows this // cancel attempt to proceed. - tracker.deleteStaleCancelOrder() + c.deleteStaleCancelOrder(tracker) if tracker.cancel != nil { return fmt.Errorf("order %s - only one cancel order can be submitted per order per epoch. "+ @@ -720,7 +706,7 @@ func (c *Core) tryCancelTrade(dc *dexConnection, tracker *trackedTrade) error { defer close(commitSig) // Store the cancel order with the tracker. - err = tracker.cancelTrade(co, preImg, mktConf.EpochLen) + err = c.cancelTrade(tracker, co, preImg, mktConf.EpochLen) if err != nil { return fmt.Errorf("error storing cancel order info %s: %w", co.ID(), err) } @@ -794,21 +780,21 @@ type serverMatches struct { // parseMatches sorts the list of matches and associates them with a trade. This // may be called from handleMatchRoute on receipt of a new 'match' request, or // by authDEX with the list of active matches returned by the 'connect' request. -func (dc *dexConnection) parseMatches(msgMatches []*msgjson.Match, checkSigs bool) (map[order.OrderID]*serverMatches, []msgjson.Acknowledgement, error) { +func (c *Core) parseMatches(dc *dexConnection, msgMatches []*msgjson.Match, checkSigs bool) (map[order.OrderID]*serverMatches, []msgjson.Acknowledgement, error) { var acks []msgjson.Acknowledgement matches := make(map[order.OrderID]*serverMatches) var errs []string for _, msgMatch := range msgMatches { var oid order.OrderID copy(oid[:], msgMatch.OrderID) - tracker, isCancel := dc.findOrder(oid) + tracker, isCancel := c.findOrder(oid) if tracker == nil { - dc.blindCancelsMtx.Lock() - _, found := dc.blindCancels[oid] - delete(dc.blindCancels, oid) - dc.blindCancelsMtx.Unlock() + c.blindCancelsMtx.Lock() + _, found := c.blindCancels[oid] + delete(c.blindCancels, oid) + c.blindCancelsMtx.Unlock() if found { // We're done. The targeted order isn't tracked, and we don't need to ack. - dc.log.Infof("Blind cancel order %v matched.", oid) + c.log.Infof("Blind cancel order %v matched.", oid) continue } errs = append(errs, "order "+oid.String()+" not found") @@ -896,7 +882,7 @@ type matchStatusConflict struct { // Reported matches with missing trackers are already checked by parseMatches, // but we also must check for incomplete matches that the server is not // reporting. -func (dc *dexConnection) compareServerMatches(srvMatches map[order.OrderID]*serverMatches) ( +func (c *Core) compareServerMatches(dc *dexConnection, srvMatches map[order.OrderID]*serverMatches) ( exceptions map[order.OrderID]*matchDiscreps, statusConflicts map[order.OrderID]*matchStatusConflict) { exceptions = make(map[order.OrderID]*matchDiscreps) @@ -957,9 +943,9 @@ func (dc *dexConnection) compareServerMatches(srvMatches map[order.OrderID]*serv } // Identify active matches that are missing from server's response. - dc.tradeMtx.RLock() - defer dc.tradeMtx.RUnlock() - for oid, trade := range dc.trades { + c.tradeMtx.RLock() + defer c.tradeMtx.RUnlock() + for oid, trade := range c.trades { var activeMatches []*matchTracker for _, m := range trade.activeMatches() { // Server is not expected to report matches that have been fully @@ -997,7 +983,7 @@ func (dc *dexConnection) compareServerMatches(srvMatches map[order.OrderID]*serv // updateOrderStatus updates the order's status, cleaning up any associated // cancel orders, unlocking funding coins and refund/redemption reserves, and // updating the order in the DB. The trackedTrade's mutex must be write locked. -func (dc *dexConnection) updateOrderStatus(trade *trackedTrade, newStatus order.OrderStatus) { +func (c *Core) updateOrderStatus(dc *dexConnection, trade *trackedTrade, newStatus order.OrderStatus) { oid := trade.ID() previousStatus := trade.metaData.Status if previousStatus == newStatus { // may be expected if no srvOrderStatuses provided @@ -1011,7 +997,7 @@ func (dc *dexConnection) updateOrderStatus(trade *trackedTrade, newStatus order. // canceled, that indicates we submitted a cancel order preimage but // missed the match notification, so the cancel order is executed. if newStatus != order.OrderStatusCanceled { - trade.deleteCancelOrder() + c.deleteCancelOrder(trade) } else if trade.cancel != nil { cid := trade.cancel.ID() err := trade.db.UpdateOrderStatus(cid, order.OrderStatusExecuted) @@ -1046,7 +1032,7 @@ func (dc *dexConnection) updateOrderStatus(trade *trackedTrade, newStatus order. } // syncOrderStatuses requests and updates the status for each of the trades. -func (dc *dexConnection) syncOrderStatuses(orders []*trackedTrade) (reconciledOrdersCount int) { +func (c *Core) syncOrderStatuses(dc *dexConnection, orders []*trackedTrade) (reconciledOrdersCount int) { orderStatusRequests := make([]*msgjson.OrderStatusRequest, len(orders)) tradeMap := make(map[order.OrderID]*trackedTrade, len(orders)) for i, trade := range orders { @@ -1086,7 +1072,7 @@ func (dc *dexConnection) syncOrderStatuses(orders []*trackedTrade) (reconciledOr } reconciledOrdersCount++ trade.mtx.Lock() - dc.updateOrderStatus(trade, order.OrderStatus(srvOrderStatus.Status)) + c.updateOrderStatus(dc, trade, order.OrderStatus(srvOrderStatus.Status)) trade.mtx.Unlock() } @@ -1105,7 +1091,7 @@ reqsLoop: trade := tradeMap[oid] reconciledOrdersCount++ trade.mtx.Lock() - dc.updateOrderStatus(trade, order.OrderStatusRevoked) + c.updateOrderStatus(dc, trade, order.OrderStatusRevoked) trade.mtx.Unlock() } @@ -1129,8 +1115,8 @@ reqsLoop: // Also purges "stale" cancel orders if the targeted order is returned in the // server's `connect` response. See *trackedTrade.deleteStaleCancelOrder for // the definition of a stale cancel order. -func (dc *dexConnection) reconcileTrades(srvOrderStatuses []*msgjson.OrderStatus) (unknownOrders []order.OrderID, reconciledOrdersCount int) { - dc.tradeMtx.RLock() +func (c *Core) reconcileTrades(dc *dexConnection, srvOrderStatuses []*msgjson.OrderStatus) (unknownOrders []order.OrderID, reconciledOrdersCount int) { + c.tradeMtx.RLock() // Check for unknown orders reported as active by the server. If such // exists, could be that they were known to the client but were thought // to be inactive and thus were not loaded from db or were retired. @@ -1138,7 +1124,7 @@ func (dc *dexConnection) reconcileTrades(srvOrderStatuses []*msgjson.OrderStatus for _, srvOrderStatus := range srvOrderStatuses { var oid order.OrderID copy(oid[:], srvOrderStatus.ID) - if _, tracked := dc.trades[oid]; tracked { + if _, tracked := c.trades[oid]; tracked { srvActiveOrderStatuses[oid] = srvOrderStatus } else { dc.log.Warnf("Unknown order %v reported by DEX %s as active", oid, dc.acct.host) @@ -1146,7 +1132,7 @@ func (dc *dexConnection) reconcileTrades(srvOrderStatuses []*msgjson.OrderStatus } } knownActiveTrades := make(map[order.OrderID]*trackedTrade) - for oid, trade := range dc.trades { + for oid, trade := range c.trades { status := trade.status() if status == order.OrderStatusEpoch || status == order.OrderStatusBooked { knownActiveTrades[oid] = trade @@ -1156,7 +1142,7 @@ func (dc *dexConnection) reconcileTrades(srvOrderStatuses []*msgjson.OrderStatus oid, status, dc.acct.host, order.OrderStatus(srvOrderStatus.Status)) } } - dc.tradeMtx.RUnlock() + c.tradeMtx.RUnlock() // Compare the status reported by the server for each known active trade. // Orders for which the server did not return a status are no longer active @@ -1176,7 +1162,7 @@ func (dc *dexConnection) reconcileTrades(srvOrderStatuses []*msgjson.OrderStatus // Server reports this order as active. Delete any associated cancel // order if the cancel order's epoch has passed. - trade.deleteStaleCancelOrder() // could be too soon, so we'll have to check in tick too + c.deleteStaleCancelOrder(trade) // could be too soon, so we'll have to check in tick too ourStatus := trade.metaData.Status serverStatus := order.OrderStatus(srvOrderStatus.Status) @@ -1188,7 +1174,7 @@ func (dc *dexConnection) reconcileTrades(srvOrderStatuses []*msgjson.OrderStatus // happened in the client's absence (maybe a missed nomatch message). if lo, ok := trade.Order.(*order.LimitOrder); ok && lo.Force == order.StandingTiF { reconciledOrdersCount++ - dc.updateOrderStatus(trade, serverStatus) + c.updateOrderStatus(dc, trade, serverStatus) } else { dc.log.Warnf("Incorrect status %q reported for non-standing order %v by DEX %s, client status = %q", serverStatus, oid, dc.acct.host, ourStatus) @@ -1202,7 +1188,7 @@ func (dc *dexConnection) reconcileTrades(srvOrderStatuses []*msgjson.OrderStatus } if len(mysteryOrders) > 0 { - reconciledOrdersCount += dc.syncOrderStatuses(mysteryOrders) + reconciledOrdersCount += c.syncOrderStatuses(dc, mysteryOrders) } return @@ -1210,14 +1196,14 @@ func (dc *dexConnection) reconcileTrades(srvOrderStatuses []*msgjson.OrderStatus // tickAsset checks open matches related to a specific asset for needed action. func (c *Core) tickAsset(dc *dexConnection, assetID uint32) assetMap { - dc.tradeMtx.RLock() - assetTrades := make([]*trackedTrade, 0, len(dc.trades)) - for _, trade := range dc.trades { + c.tradeMtx.RLock() + assetTrades := make([]*trackedTrade, 0, len(c.trades)) + for _, trade := range c.trades { if trade.Base() == assetID || trade.Quote() == assetID { assetTrades = append(assetTrades, trade) } } - dc.tradeMtx.RUnlock() + c.tradeMtx.RUnlock() updated := make(assetMap) updateChan := make(chan assetMap) @@ -1512,6 +1498,21 @@ type Core struct { walletMtx sync.RWMutex wallets map[uint32]*xcWallet + // tradeMtx is used to synchronize access to the trades map. + tradeMtx sync.RWMutex + // trades tracks outstanding orders issued by this client. + trades map[order.OrderID]*trackedTrade + // inFlightOrders tracks orders issued by this client that have not been + // processed by a dex server. + inFlightOrders map[uint64]*InFlightOrder + + // A map linking cancel order IDs to trade order IDs. + cancelsMtx sync.RWMutex + cancels map[order.OrderID]order.OrderID + + blindCancelsMtx sync.Mutex + blindCancels map[order.OrderID]order.Preimage + waiterMtx sync.RWMutex blockWaiters map[string]*blockWaiter @@ -1646,20 +1647,24 @@ func New(cfg *Config) (*Core, error) { } c := &Core{ - cfg: cfg, - credentials: creds, - ready: make(chan struct{}), - rotate: make(chan struct{}, 1), - log: cfg.Logger, - db: boltDB, - conns: make(map[string]*dexConnection), - wallets: make(map[uint32]*xcWallet), - net: cfg.Net, - lockTimeTaker: dex.LockTimeTaker(cfg.Net), - lockTimeMaker: dex.LockTimeMaker(cfg.Net), - blockWaiters: make(map[string]*blockWaiter), - sentCommits: make(map[order.Commitment]chan struct{}), - tickSched: make(map[order.OrderID]*time.Timer), + cfg: cfg, + credentials: creds, + ready: make(chan struct{}), + rotate: make(chan struct{}, 1), + log: cfg.Logger, + db: boltDB, + conns: make(map[string]*dexConnection), + wallets: make(map[uint32]*xcWallet), + trades: make(map[order.OrderID]*trackedTrade), + cancels: make(map[order.OrderID]order.OrderID), + inFlightOrders: make(map[uint64]*InFlightOrder), + blindCancels: make(map[order.OrderID]order.Preimage), + net: cfg.Net, + lockTimeTaker: dex.LockTimeTaker(cfg.Net), + lockTimeMaker: dex.LockTimeMaker(cfg.Net), + blockWaiters: make(map[string]*blockWaiter), + sentCommits: make(map[order.Commitment]chan struct{}), + tickSched: make(map[order.OrderID]*time.Timer), // Allowing to change the constructor makes testing a lot easier. wsConstructor: comms.NewWsConn, newCrypter: encrypt.NewCrypter, @@ -1794,29 +1799,29 @@ fetchers: // shutting down on context cancellation (the listen goroutines have already // returned however). Warn about specific active orders, and unlock any // locked coins for inactive orders that are not yet retired. + // Note active orders, and unlock any coins locked by inactive orders. + c.tradeMtx.Lock() + for _, trade := range c.trades { + oid := trade.ID() + if trade.isActive() { + c.log.Warnf("Shutting down with active order %v in status %v.", oid, trade.metaData.Status) + continue + } + c.log.Debugf("Retiring inactive order %v. Unlocking coins = %v", + oid, trade.coinsLocked || trade.changeLocked) + delete(c.trades, oid) // for inspection/debugging + trade.returnCoins() + // Do not bother with OrderNote/SubjectOrderRetired and BalanceNote + // notes since any web/rpc servers should be down by now. Go + // consumers can check orders on restart. + } + c.tradeMtx.Unlock() + for _, dc := range c.dexConnections() { // context is already canceled, allowing just a Wait(), but just in case // use Disconnect otherwise it could hang forever. dc.connMaster.Disconnect() dc.acct.lock() - - // Note active orders, and unlock any coins locked by inactive orders. - dc.tradeMtx.Lock() - for _, trade := range dc.trades { - oid := trade.ID() - if trade.isActive() { - c.log.Warnf("Shutting down with active order %v in status %v.", oid, trade.metaData.Status) - continue - } - c.log.Debugf("Retiring inactive order %v. Unlocking coins = %v", - oid, trade.coinsLocked || trade.changeLocked) - delete(dc.trades, oid) // for inspection/debugging - trade.returnCoins() - // Do not bother with OrderNote/SubjectOrderRetired and BalanceNote - // notes since any web/rpc servers should be down by now. Go - // consumers can check orders on restart. - } - dc.tradeMtx.Unlock() } // Lock and disconnect the wallets. @@ -1984,7 +1989,7 @@ func (c *Core) ExchangeMarket(host string, baseID, quoteID uint32) (*Market, err return nil, err } - mkt := dc.coreMarket(marketName(baseID, quoteID)) + mkt := c.coreMarket(dc, marketName(baseID, quoteID)) if mkt == nil { return nil, fmt.Errorf("no market found for %s-%s at %s", unbip(baseID), unbip(quoteID), host) } @@ -2283,7 +2288,7 @@ func (c *Core) storeAndSendWalletBalance(wallet *xcWallet, walletBal *WalletBala func (c *Core) lockedAmounts(assetID uint32) (contractLocked, orderLocked, bondLocked uint64) { for _, dc := range c.dexConnections() { bondLocked, _ = dc.bondTotal(assetID) - for _, tracker := range dc.trackedTrades() { + for _, tracker := range c.trackedTrades() { if tracker.fromAssetID == assetID { tracker.mtx.RLock() contractLocked += tracker.unspentContractAmounts() @@ -3058,10 +3063,8 @@ func (c *Core) WalletTraits(assetID uint32) (asset.WalletTrait, error) { // assetHasActiveOrders checks whether there are any active orders or // negotiating matches for the specified asset. func (c *Core) assetHasActiveOrders(assetID uint32) bool { - for _, dc := range c.dexConnections() { - if dc.hasActiveAssetOrders(assetID) { - return true - } + if c.hasActiveAssetOrders(assetID) { + return true } return false } @@ -3296,10 +3299,8 @@ func (c *Core) RecoverWallet(assetID uint32, appPW []byte, force bool) error { defer crypter.Close() if !force { - for _, dc := range c.dexConnections() { - if dc.hasActiveAssetOrders(assetID) { - return newError(activeOrdersErr, "active orders for %v", unbip(assetID)) - } + if c.hasActiveAssetOrders(assetID) { + return newError(activeOrdersErr, "active orders for %v", unbip(assetID)) } } @@ -3624,25 +3625,23 @@ func (c *Core) ReconfigureWallet(appPW, newWalletPW []byte, form *WalletForm) er } clearTickGovernors := func() { - for _, dc := range c.dexConnections() { - for _, t := range dc.trackedTrades() { - if t.Base() != assetID && t.Quote() != assetID { - continue - } - isFromAsset := t.wallets.fromWallet.AssetID == assetID - t.mtx.RLock() - for _, m := range t.matches { // maybe range t.activeMatches() - m.exceptionMtx.Lock() - if m.tickGovernor != nil && - ((m.suspectSwap && isFromAsset) || (m.suspectRedeem && !isFromAsset)) { - - m.tickGovernor.Stop() - m.tickGovernor = nil - } - m.exceptionMtx.Unlock() + for _, t := range c.trackedTrades() { + if t.Base() != assetID && t.Quote() != assetID { + continue + } + isFromAsset := t.wallets.fromWallet.AssetID == assetID + t.mtx.RLock() + for _, m := range t.matches { // maybe range t.activeMatches() + m.exceptionMtx.Lock() + if m.tickGovernor != nil && + ((m.suspectSwap && isFromAsset) || (m.suspectRedeem && !isFromAsset)) { + + m.tickGovernor.Stop() + m.tickGovernor = nil } - t.mtx.RUnlock() + m.exceptionMtx.Unlock() } + t.mtx.RUnlock() } } @@ -3882,10 +3881,8 @@ func (c *Core) updateAssetWalletRefs(newWallet *xcWallet) { } } - for _, dc := range c.dexConnections() { - for _, tracker := range dc.trackedTrades() { - updateWalletSet(tracker) - } + for _, tracker := range c.trackedTrades() { + updateWalletSet(tracker) } c.updateWallet(assetID, newWallet) @@ -4860,7 +4857,7 @@ func (c *Core) ActiveOrders() (map[string][]*Order, map[string][]*InFlightOrder, dexActiveOrders := make(map[string][]*Order) for _, dc := range c.dexConnections() { if loggedIn { - orders, inflight := dc.activeOrders() + orders, inflight := c.activeOrdersForDEX(dc.acct.host) dexActiveOrders[dc.acct.host] = append(dexActiveOrders[dc.acct.host], orders...) dexInflightOrders[dc.acct.host] = append(dexInflightOrders[dc.acct.host], inflight...) continue @@ -4884,7 +4881,7 @@ func (c *Core) ActiveOrders() (map[string][]*Order, map[string][]*InFlightOrder, // accounts. This includes booked orders and trades that are settling. func (c *Core) Active() bool { for _, dc := range c.dexConnections() { - if dc.hasActiveOrders() { + if c.hasActiveOrders(dc.acct.host) { return true } } @@ -5011,11 +5008,9 @@ func (c *Core) Order(oidB dex.Bytes) (*Order, error) { return nil, err } // See if it's an active order first. - for _, dc := range c.dexConnections() { - tracker, _ := dc.findOrder(oid) - if tracker != nil { - return tracker.coreOrder(), nil - } + tracker, _ := c.findOrder(oid) + if tracker != nil { + return tracker.coreOrder(), nil } // Must not be an active order. Get it from the database. mOrd, err := c.db.Order(oid) @@ -5986,7 +5981,7 @@ func (c *Core) TradeAsync(pw []byte, form *TradeForm) (*InFlightOrder, error) { // Prepare and store the inflight order. corder := coreOrderFromTrade(req.dbOrder.Order, req.dbOrder.MetaData) corder.ReadyToTick = true - tempID := req.dc.storeInFlightOrder(corder) + tempID := c.storeInFlightOrder(corder) req.tempID = tempID // Send silent note for the async order. This improves the UI/UX, so @@ -5997,7 +5992,7 @@ func (c *Core) TradeAsync(pw []byte, form *TradeForm) (*InFlightOrder, error) { go func() { // so core does not shut down while processing this order. defer func() { // Cleanup when the inflight order has been processed. - req.dc.deleteInFlightOrder(tempID) + c.deleteInFlightOrder(tempID) c.wg.Done() }() @@ -6595,9 +6590,9 @@ func (c *Core) sendTradeRequest(tr *tradeRequest) (*Order, error) { tracker.changeLocked = true } - dc.tradeMtx.Lock() - dc.trades[tracker.ID()] = tracker - dc.tradeMtx.Unlock() + c.tradeMtx.Lock() + c.trades[tracker.ID()] = tracker + c.tradeMtx.Unlock() // Send a low-priority notification. corder := tracker.coreOrder() @@ -7109,12 +7104,12 @@ func (c *Core) authDEX(dc *dexConnection) error { } // Associate the matches with known trades. - matches, _, err := dc.parseMatches(result.ActiveMatches, false) + matches, _, err := c.parseMatches(dc, result.ActiveMatches, false) if err != nil { c.log.Error(err) } - exceptions, matchConflicts := dc.compareServerMatches(matches) + exceptions, matchConflicts := c.compareServerMatches(dc, matches) for oid, matchAnomalies := range exceptions { trade := matchAnomalies.trade missing, extras := matchAnomalies.missing, matchAnomalies.extra @@ -7188,7 +7183,7 @@ func (c *Core) authDEX(dc *dexConnection) error { // the trade statuses where necessary. This is done after processing the // connect resp matches so that where possible, available match data can be // used to properly set order statuses and filled amount. - unknownOrders, reconciledOrdersCount := dc.reconcileTrades(result.ActiveOrderStatuses) + unknownOrders, reconciledOrdersCount := c.reconcileTrades(dc, result.ActiveOrderStatuses) if len(unknownOrders) > 0 { subject, details := c.formatDetails(TopicUnknownOrders, len(unknownOrders), dc.acct.host) c.notify(newDEXAuthNote(TopicUnknownOrders, subject, dc.acct.host, false, details, db.Poke)) @@ -7211,8 +7206,8 @@ func (c *Core) authDEX(dc *dexConnection) error { // but without funding coins for new matches. This should be done after the // order status resolution done above. var brokenTrades []*trackedTrade - dc.tradeMtx.RLock() - for _, trade := range dc.trades { + c.tradeMtx.RLock() + for _, trade := range c.trades { if lo, ok := trade.Order.(*order.LimitOrder); !ok || lo.Force != order.StandingTiF { continue // only standing limit orders need to be canceled } @@ -7224,7 +7219,7 @@ func (c *Core) authDEX(dc *dexConnection) error { } trade.mtx.RUnlock() } - dc.tradeMtx.RUnlock() + c.tradeMtx.RUnlock() for _, trade := range brokenTrades { c.log.Warnf("Canceling unfunded standing limit order %v", trade.ID()) if err = c.tryCancelTrade(dc, trade); err != nil { @@ -7258,9 +7253,9 @@ func (c *Core) authDEX(dc *dexConnection) error { continue } c.log.Warnf("Sent request to cancel unknown order %v, cancel order ID %v", oid, co.ID()) - dc.blindCancelsMtx.Lock() - dc.blindCancels[co.ID()] = preImg - dc.blindCancelsMtx.Unlock() + c.blindCancelsMtx.Lock() + c.blindCancels[co.ID()] = preImg + c.blindCancelsMtx.Unlock() close(commitSig) // ready to handle the preimage request } @@ -7559,7 +7554,7 @@ func (c *Core) dbTrackers(dc *dexConnection) (map[order.OrderID]*trackedTrade, e } var pimg order.Preimage copy(pimg[:], metaCancel.MetaData.Proof.Preimage) - err = tracker.cancelTrade(co, pimg, epochDur) // set tracker.cancel and link + err = c.cancelTrade(tracker, co, pimg, epochDur) // set tracker.cancel and link if err != nil { c.log.Errorf("Error setting cancel order info %s: %v", co.ID(), err) } else { @@ -7621,13 +7616,13 @@ func (c *Core) loadDBTrades(dc *dexConnection) error { // Every trade in the trades map must have wallets set. tracker.wallets = walletSet - dc.tradeMtx.Lock() - if _, found := dc.trades[tracker.ID()]; found { - dc.tradeMtx.Unlock() + c.tradeMtx.Lock() + if _, found := c.trades[tracker.ID()]; found { + c.tradeMtx.Unlock() continue } - dc.trades[tracker.ID()] = tracker - dc.tradeMtx.Unlock() + c.trades[tracker.ID()] = tracker + c.tradeMtx.Unlock() mktConf := dc.marketConfig(tracker.mktID) if tracker.metaData.EpochDur == 0 { // upgraded with live orders... smart :/ @@ -7932,24 +7927,22 @@ func (c *Core) resumeTrades(crypter encrypt.Crypter) { failed := make(map[uint32]bool) relocks := make(assetMap) - for _, dc := range c.dexConnections() { - for _, tracker := range dc.trackedTrades() { - tracker.mtx.RLock() - if tracker.readyToTick { - tracker.mtx.RUnlock() - continue - } + for _, tracker := range c.trackedTrades() { + tracker.mtx.RLock() + if tracker.readyToTick { tracker.mtx.RUnlock() + continue + } + tracker.mtx.RUnlock() - if c.resumeTrade(tracker, crypter, failed, relocks) { - c.notify(newOrderNote(TopicOrderLoaded, "", "", db.Data, tracker.coreOrder())) - } else { - tracker.mtx.RLock() - err := fmt.Errorf("failed to connect and unlock wallets for trade ID %s", tracker.ID()) - tracker.mtx.RUnlock() - subject, details := c.formatDetails(TopicOrderResumeFailure, err) - c.notify(newOrderNote(TopicOrderResumeFailure, subject, details, db.ErrorLevel, nil)) - } + if c.resumeTrade(tracker, crypter, failed, relocks) { + c.notify(newOrderNote(TopicOrderLoaded, "", "", db.Data, tracker.coreOrder())) + } else { + tracker.mtx.RLock() + err := fmt.Errorf("failed to connect and unlock wallets for trade ID %s", tracker.ID()) + tracker.mtx.RUnlock() + subject, details := c.formatDetails(TopicOrderResumeFailure, err) + c.notify(newOrderNote(TopicOrderResumeFailure, subject, details, db.ErrorLevel, nil)) } } @@ -7971,143 +7964,141 @@ func (c *Core) reReserveFunding(w *xcWallet) { c.updateBondReserves(w.AssetID) - for _, dc := range c.dexConnections() { - for _, tracker := range dc.trackedTrades() { - // TODO: Consider tokens - if tracker.Base() != w.AssetID && tracker.Quote() != w.AssetID { - continue - } + for _, tracker := range c.trackedTrades() { + // TODO: Consider tokens + if tracker.Base() != w.AssetID && tracker.Quote() != w.AssetID { + continue + } - notifyErr := func(topic Topic, args ...any) { - subject, detail := c.formatDetails(topic, args...) - c.notify(newOrderNote(topic, subject, detail, db.ErrorLevel, tracker.coreOrderInternal())) - } + notifyErr := func(topic Topic, args ...any) { + subject, detail := c.formatDetails(topic, args...) + c.notify(newOrderNote(topic, subject, detail, db.ErrorLevel, tracker.coreOrderInternal())) + } - trade := tracker.Trade() + trade := tracker.Trade() - fromID := tracker.Quote() - if trade.Sell { - fromID = tracker.Base() - } + fromID := tracker.Quote() + if trade.Sell { + fromID = tracker.Base() + } - denom, marketMult, limitMult := lcm(uint64(len(tracker.matches)), trade.Quantity) - var refundNum, redeemNum uint64 + denom, marketMult, limitMult := lcm(uint64(len(tracker.matches)), trade.Quantity) + var refundNum, redeemNum uint64 - addMatchRedemption := func(match *matchTracker) { - if tracker.isMarketBuy() { - redeemNum += marketMult // * 1 - } else { - redeemNum += match.Quantity * limitMult - } + addMatchRedemption := func(match *matchTracker) { + if tracker.isMarketBuy() { + redeemNum += marketMult // * 1 + } else { + redeemNum += match.Quantity * limitMult } + } - addMatchRefund := func(match *matchTracker) { - if tracker.isMarketBuy() { - refundNum += marketMult // * 1 - } else { - refundNum += match.Quantity * limitMult - } + addMatchRefund := func(match *matchTracker) { + if tracker.isMarketBuy() { + refundNum += marketMult // * 1 + } else { + refundNum += match.Quantity * limitMult } + } - isActive := tracker.metaData.Status == order.OrderStatusBooked || tracker.metaData.Status == order.OrderStatusEpoch - var matchesNeedingCoins []*matchTracker - for _, match := range tracker.matches { - if match.Side == order.Maker { - if match.Status < order.MakerSwapCast { - matchesNeedingCoins = append(matchesNeedingCoins, match) - } - if match.Status < order.MakerRedeemed { - addMatchRedemption(match) - addMatchRefund(match) - } - } else { // Taker - if match.Status < order.TakerSwapCast { - matchesNeedingCoins = append(matchesNeedingCoins, match) - } - if match.Status < order.MakerRedeemed { - addMatchRefund(match) - } - if match.Status < order.MatchComplete { - addMatchRedemption(match) - } + isActive := tracker.metaData.Status == order.OrderStatusBooked || tracker.metaData.Status == order.OrderStatusEpoch + var matchesNeedingCoins []*matchTracker + for _, match := range tracker.matches { + if match.Side == order.Maker { + if match.Status < order.MakerSwapCast { + matchesNeedingCoins = append(matchesNeedingCoins, match) } - } - - if c.ctx.Err() != nil { - return - } - - // Prepare funding coins, but don't update tracker until the mutex - // is locked. - needsCoins := len(matchesNeedingCoins) > 0 - // nil coins = no locking required, empty coins = something went - // wrong, non-empty means locking required. - var coins asset.Coins - if fromID == w.AssetID && (isActive || needsCoins) { - coins = []asset.Coin{} // should already be - coinIDs := trade.Coins - if len(tracker.metaData.ChangeCoin) != 0 { - coinIDs = []order.CoinID{tracker.metaData.ChangeCoin} + if match.Status < order.MakerRedeemed { + addMatchRedemption(match) + addMatchRefund(match) } - if len(coinIDs) == 0 { - notifyErr(TopicOrderCoinError, tracker.token()) - markUnfunded(tracker, matchesNeedingCoins) // bug - no user resolution - } else { - byteIDs := make([]dex.Bytes, 0, len(coinIDs)) - for _, cid := range coinIDs { - byteIDs = append(byteIDs, []byte(cid)) - } - var err error - coins, err = w.FundingCoins(byteIDs) - if err != nil || len(coins) == 0 { - notifyErr(TopicOrderCoinFetchError, tracker.token(), unbip(fromID), err) - c.log.Warnf("(re-reserve) Check the status of your %s wallet and the coins logged above! "+ - "Resolve the wallet issue if possible and restart Bison Wallet.", - strings.ToUpper(unbip(fromID))) - c.log.Warnf("(re-reserve) Unfunded order %v will be revoked if %d active matches don't get funding coins!", - tracker.ID(), len(matchesNeedingCoins)) - } + } else { // Taker + if match.Status < order.TakerSwapCast { + matchesNeedingCoins = append(matchesNeedingCoins, match) + } + if match.Status < order.MakerRedeemed { + addMatchRefund(match) + } + if match.Status < order.MatchComplete { + addMatchRedemption(match) } } + } - tracker.mtx.Lock() + if c.ctx.Err() != nil { + return + } - // Refund and redemption reserves for active matches. Doing this - // under mutex lock, but noting that the underlying calls to - // ReReserveRedemption and ReReserveRefund could potentially involve - // long-running RPC calls. - if fromID == w.AssetID { - tracker.refundLocked = 0 - if refundNum != 0 { - tracker.lockRefundFraction(refundNum, denom) - } + // Prepare funding coins, but don't update tracker until the mutex + // is locked. + needsCoins := len(matchesNeedingCoins) > 0 + // nil coins = no locking required, empty coins = something went + // wrong, non-empty means locking required. + var coins asset.Coins + if fromID == w.AssetID && (isActive || needsCoins) { + coins = []asset.Coin{} // should already be + coinIDs := trade.Coins + if len(tracker.metaData.ChangeCoin) != 0 { + coinIDs = []order.CoinID{tracker.metaData.ChangeCoin} + } + if len(coinIDs) == 0 { + notifyErr(TopicOrderCoinError, tracker.token()) + markUnfunded(tracker, matchesNeedingCoins) // bug - no user resolution } else { - tracker.redemptionLocked = 0 - if redeemNum != 0 { - tracker.lockRedemptionFraction(redeemNum, denom) + byteIDs := make([]dex.Bytes, 0, len(coinIDs)) + for _, cid := range coinIDs { + byteIDs = append(byteIDs, []byte(cid)) + } + var err error + coins, err = w.FundingCoins(byteIDs) + if err != nil || len(coins) == 0 { + notifyErr(TopicOrderCoinFetchError, tracker.token(), unbip(fromID), err) + c.log.Warnf("(re-reserve) Check the status of your %s wallet and the coins logged above! "+ + "Resolve the wallet issue if possible and restart Bison Wallet.", + strings.ToUpper(unbip(fromID))) + c.log.Warnf("(re-reserve) Unfunded order %v will be revoked if %d active matches don't get funding coins!", + tracker.ID(), len(matchesNeedingCoins)) } } + } + + tracker.mtx.Lock() - // Funding coins - if coins != nil { - tracker.coinsLocked = len(coins) > 0 - tracker.coins = mapifyCoins(coins) + // Refund and redemption reserves for active matches. Doing this + // under mutex lock, but noting that the underlying calls to + // ReReserveRedemption and ReReserveRefund could potentially involve + // long-running RPC calls. + if fromID == w.AssetID { + tracker.refundLocked = 0 + if refundNum != 0 { + tracker.lockRefundFraction(refundNum, denom) + } + } else { + tracker.redemptionLocked = 0 + if redeemNum != 0 { + tracker.lockRedemptionFraction(redeemNum, denom) } + } - // Refund and redemption reserves for booked orders. + // Funding coins + if coins != nil { + tracker.coinsLocked = len(coins) > 0 + tracker.coins = mapifyCoins(coins) + } - tracker.recalcFilled() // Make sure Remaining is accurate. + // Refund and redemption reserves for booked orders. - if isActive { - if fromID == w.AssetID { - tracker.lockRefundFraction(trade.Remaining(), trade.Quantity) - } else { - tracker.lockRedemptionFraction(trade.Remaining(), trade.Quantity) - } - } + tracker.recalcFilled() // Make sure Remaining is accurate. - tracker.mtx.Unlock() + if isActive { + if fromID == w.AssetID { + tracker.lockRefundFraction(trade.Remaining(), trade.Quantity) + } else { + tracker.lockRedemptionFraction(trade.Remaining(), trade.Quantity) + } } + + tracker.mtx.Unlock() } } @@ -8292,10 +8283,6 @@ func (c *Core) newDEXConnection(acctInfo *db.AccountInfo, flag connectDEXFlag) ( notify: c.notify, ticker: newDexTicker(defaultTickInterval), // updated when server config obtained books: make(map[string]*bookie), - trades: make(map[order.OrderID]*trackedTrade), - cancels: make(map[order.OrderID]order.OrderID), - inFlightOrders: make(map[uint64]*InFlightOrder), - blindCancels: make(map[order.OrderID]order.Preimage), apiVer: -1, reportingConnects: reporting, spots: make(map[string]*msgjson.Spot), @@ -8483,7 +8470,7 @@ func (c *Core) handleReconnect(host string) { } // Update the orders' selfGoverned flag according to the configured markets. - for _, trade := range dc.trackedTrades() { + for _, trade := range c.trackedTrades() { // If the server's market is gone, we're on our own, otherwise we are // now free to swap for this order. auto := mkts[trade.mktID] == nil @@ -8527,7 +8514,7 @@ func (c *Core) handleReconnect(host string) { // status orders that should be re-checked in the next epoch because we may // have missed the preimage request while disconnected. epochOrders := make(map[string][]*trackedTrade) - for _, trade := range dc.trackedTrades() { + for _, trade := range c.trackedTrades() { if trade.status() == order.OrderStatusEpoch { epochOrders[trade.mktID] = append(epochOrders[trade.mktID], trade) } @@ -8547,7 +8534,7 @@ func (c *Core) handleReconnect(host string) { } } if len(stillEpochOrders) > 0 { - dc.syncOrderStatuses(stillEpochOrders) + c.syncOrderStatuses(dc, stillEpochOrders) } }, ) @@ -8627,7 +8614,7 @@ func (c *Core) handleConnectEvent(dc *dexConnection, status comms.ConnectionStat atomic.StoreUint32(&dc.anomaliesCount, 0) } - for _, tracker := range dc.trackedTrades() { + for _, tracker := range c.trackedTrades() { tracker.setSelfGoverned(true) // reconnect handles unflagging based on fresh market config tracker.mtx.RLock() @@ -8674,7 +8661,7 @@ func handleMatchProofMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error } // Validate match_proof commitment checksum for client orders in this epoch. - for _, trade := range dc.trackedTrades() { + for _, trade := range c.trackedTrades() { if note.MarketID != trade.mktID { continue } @@ -8706,7 +8693,7 @@ func handleRevokeOrderMsg(c *Core, dc *dexConnection, msg *msgjson.Message) erro var oid order.OrderID copy(oid[:], revocation.OrderID) - tracker, isCancel := dc.findOrder(oid) + tracker, isCancel := c.findOrder(oid) if tracker == nil { return fmt.Errorf("no order found with id %s", oid.String()) } @@ -8715,7 +8702,7 @@ func handleRevokeOrderMsg(c *Core, dc *dexConnection, msg *msgjson.Message) erro // Cancel order revoked (e.g. we missed the preimage request). Don't // revoke the targeted order, just unlink the cancel order. c.log.Warnf("Deleting failed cancel order %v that targeted trade order %v", oid, tracker.ID()) - tracker.deleteCancelOrder() + c.deleteCancelOrder(tracker) subject, details := c.formatDetails(TopicFailedCancel, tracker.token()) c.notify(newOrderNote(TopicFailedCancel, subject, details, db.WarningLevel, tracker.coreOrder())) return nil @@ -8748,7 +8735,7 @@ func handleRevokeMatchMsg(c *Core, dc *dexConnection, msg *msgjson.Message) erro var oid order.OrderID copy(oid[:], revocation.OrderID) - tracker, _ := dc.findOrder(oid) + tracker, _ := c.findOrder(oid) if tracker == nil { return fmt.Errorf("no order found with id %s (not an error if you've completed your side of the swap)", oid.String()) } @@ -8969,7 +8956,7 @@ func (c *Core) listen(dc *dexConnection) { // NOTE: Don't lock tradeMtx while also locking a trackedTrade's mtx // since we risk blocking access to the trades map if there is lock // contention for even one trade. - for _, trade := range dc.trackedTrades() { + for _, trade := range c.trackedTrades() { if trade.isActive() { activeTrades = append(activeTrades, trade) continue @@ -8978,7 +8965,7 @@ func (c *Core) listen(dc *dexConnection) { } if len(doneTrades) > 0 { - dc.tradeMtx.Lock() + c.tradeMtx.Lock() for _, trade := range doneTrades { // Log an error if redemption funds are still reserved. @@ -8994,9 +8981,9 @@ func (c *Core) listen(dc *dexConnection) { } c.notify(newOrderNote(TopicOrderRetired, "", "", db.Data, trade.coreOrder())) - delete(dc.trades, trade.ID()) + delete(c.trades, trade.ID()) } - dc.tradeMtx.Unlock() + c.tradeMtx.Unlock() } // Unlock funding coins for retired orders for good measure, in case @@ -9147,13 +9134,13 @@ func handlePreimageRequest(c *Core, dc *dexConnection, msg *msgjson.Message) err } func processPreimageRequest(c *Core, dc *dexConnection, reqID uint64, oid order.OrderID, commitChecksum dex.Bytes) error { - tracker, isCancel := dc.findOrder(oid) + tracker, isCancel := c.findOrder(oid) var preImg order.Preimage if tracker == nil { var found bool - dc.blindCancelsMtx.Lock() - preImg, found = dc.blindCancels[oid] - dc.blindCancelsMtx.Unlock() + c.blindCancelsMtx.Lock() + preImg, found = c.blindCancels[oid] + c.blindCancelsMtx.Unlock() if !found { return fmt.Errorf("no active order found for preimage request for %s", oid) } // delete the entry in match/nomatch @@ -9239,7 +9226,7 @@ func handleMatchRoute(c *Core, dc *dexConnection, msg *msgjson.Message) error { // request handling. // Acknowledgements MUST be in the same orders as the msgjson.Matches. - matches, acks, err := dc.parseMatches(msgMatches, true) + matches, acks, err := c.parseMatches(dc, msgMatches, true) if err != nil { // Even one failed match fails them all since the server requires acks // for them all, and in the same order. TODO: consider lifting this @@ -9301,12 +9288,12 @@ func handleNoMatchRoute(c *Core, dc *dexConnection, msg *msgjson.Message) error var oid order.OrderID copy(oid[:], nomatchMsg.OrderID) - tracker, _ := dc.findOrder(oid) + tracker, _ := c.findOrder(oid) if tracker == nil { - dc.blindCancelsMtx.Lock() - _, found := dc.blindCancels[oid] - delete(dc.blindCancels, oid) - dc.blindCancelsMtx.Unlock() + c.blindCancelsMtx.Lock() + _, found := c.blindCancels[oid] + delete(c.blindCancels, oid) + c.blindCancelsMtx.Unlock() if found { // if it didn't match, the targeted order isn't booked and we're done c.log.Infof("Blind cancel order %v did not match. Its targeted order is assumed to be unbooked.", oid) return nil @@ -9314,7 +9301,7 @@ func handleNoMatchRoute(c *Core, dc *dexConnection, msg *msgjson.Message) error return newError(unknownOrderErr, "nomatch request received for unknown order %v from %s", oid, dc.acct.host) } - updatedAssets, err := tracker.nomatch(oid) + updatedAssets, err := c.nomatch(tracker, oid) if len(updatedAssets) > 0 { c.updateBalances(updatedAssets) } @@ -9373,7 +9360,7 @@ func handleAuditRoute(c *Core, dc *dexConnection, msg *msgjson.Message) error { var oid order.OrderID copy(oid[:], audit.OrderID) - tracker, _ := dc.findOrder(oid) + tracker, _ := c.findOrder(oid) if tracker == nil { return fmt.Errorf("audit request received for unknown order: %s", string(msg.Payload)) } @@ -9398,7 +9385,7 @@ func handleRedemptionRoute(c *Core, dc *dexConnection, msg *msgjson.Message) err var oid order.OrderID copy(oid[:], redemption.OrderID) - tracker, isCancel := dc.findOrder(oid) + tracker, isCancel := c.findOrder(oid) if tracker != nil { if isCancel { return fmt.Errorf("redemption request received for cancel order %v, match %v (you ok server?)", @@ -10480,11 +10467,9 @@ func (c *Core) RemoveWalletPeer(assetID uint32, address string) error { // findActiveOrder will search the dex connections for an active order by order // id. An error is returned if it cannot be found. func (c *Core) findActiveOrder(oid order.OrderID) (*trackedTrade, error) { - for _, dc := range c.dexConnections() { - tracker, _ := dc.findOrder(oid) - if tracker != nil { - return tracker, nil - } + tracker, _ := c.findOrder(oid) + if tracker != nil { + return tracker, nil } return nil, fmt.Errorf("could not find active order with order id: %s", oid) } @@ -10866,13 +10851,7 @@ func (c *Core) handleRetryRedemptionAction(actionB []byte) error { } var oid order.OrderID copy(oid[:], req.OrderID) - var tracker *trackedTrade - for _, dc := range c.dexConnections() { - tracker, _ = dc.findOrder(oid) - if tracker != nil { - break - } - } + tracker, _ := c.findOrder(oid) if tracker == nil { return fmt.Errorf("order %s not known", oid) } @@ -10975,7 +10954,7 @@ func (c *Core) checkEpochResolution(host string, mktID string) { return } - ts, inFlights := dc.marketTrades(mktID) + ts, inFlights := c.marketTrades(dc.acct.host, mktID) for _, ord := range inFlights { if ord.Epoch == lastEpoch { return @@ -11112,7 +11091,7 @@ func (c *Core) TradingLimits(host string) (userParcels, parcelLimit uint32, err mkts[mkt.Name] = mkt } mktTrades := make(map[string][]*trackedTrade) - for _, t := range dc.trackedTrades() { + for _, t := range c.trackedTrades() { mktTrades[t.mktID] = append(mktTrades[t.mktID], t) } diff --git a/client/core/core_test.go b/client/core/core_test.go index 61b92b3a82..07b966101f 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -271,9 +271,6 @@ func testDexConnection(ctx context.Context, crypter *tCrypter) (*dexConnection, BinSizes: []string{"1h", "24h"}, }, notify: func(Notification) {}, - trades: make(map[order.OrderID]*trackedTrade), - cancels: make(map[order.OrderID]order.OrderID), - inFlightOrders: make(map[uint64]*InFlightOrder), epoch: map[string]uint64{tDcrBtcMktName: 0}, resolvedEpoch: map[string]uint64{tDcrBtcMktName: 0}, apiVer: serverdex.PreAPIVersion, @@ -1341,12 +1338,15 @@ func newTestRig() *testRig { conns: map[string]*dexConnection{ tDexHost: dc, }, - lockTimeTaker: dex.LockTimeTaker(dex.Testnet), - lockTimeMaker: dex.LockTimeMaker(dex.Testnet), - wallets: make(map[uint32]*xcWallet), - blockWaiters: make(map[string]*blockWaiter), - sentCommits: make(map[order.Commitment]chan struct{}), - tickSched: make(map[order.OrderID]*time.Timer), + trades: make(map[order.OrderID]*trackedTrade), + cancels: make(map[order.OrderID]order.OrderID), + inFlightOrders: make(map[uint64]*InFlightOrder), + lockTimeTaker: dex.LockTimeTaker(dex.Testnet), + lockTimeMaker: dex.LockTimeMaker(dex.Testnet), + wallets: make(map[uint32]*xcWallet), + blockWaiters: make(map[string]*blockWaiter), + sentCommits: make(map[order.Commitment]chan struct{}), + tickSched: make(map[order.OrderID]*time.Timer), wsConstructor: func(*comms.WsCfg) (comms.WsConn, error) { // This is not very realistic since it doesn't start a fresh // one, and (*Core).connectDEX always gets the same TWebsocket, @@ -2458,11 +2458,11 @@ func TestLogin(t *testing.T) { return nil }) - dc.trades = map[order.OrderID]*trackedTrade{ + tCore = rig.core + tCore.trades = map[order.OrderID]*trackedTrade{ oid: tracker, } - tCore = rig.core rig.queueConnect(nil, []*msgjson.Match{knownMsgMatch /* missing missingMatch! */, extraMsgMatch}, nil) rig.queueCancel(nil) // for the unfunded order that gets canceled in authDEX // Login>authDEX will do 4 match DB updates for these two matches: @@ -3297,7 +3297,7 @@ func TestRefundReserves(t *testing.T) { tracker := newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, nil, rig.core.notify, rig.core.formatDetails) - dc.trades[loid] = tracker + tCore.trades[loid] = tracker preImgC := newPreimage() co := &order.CancelOrder{ P: order.Prefix{ @@ -3432,10 +3432,10 @@ func TestRefundReserves(t *testing.T) { tracker = newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, nil, rig.core.notify, rig.core.formatDetails) - dc.trades = map[order.OrderID]*trackedTrade{moid: tracker} + tCore.trades = map[order.OrderID]*trackedTrade{moid: tracker} test("nomatch", reserves, func() { - tracker.nomatch(moid) + tCore.nomatch(tracker, moid) }) test("partial market sell match", reserves/3, func() { @@ -3550,7 +3550,7 @@ func TestRedemptionReserves(t *testing.T) { tracker := newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, nil, rig.core.notify, rig.core.formatDetails) - dc.trades[loid] = tracker + tCore.trades[loid] = tracker preImgC := newPreimage() co := &order.CancelOrder{ P: order.Prefix{ @@ -3639,10 +3639,10 @@ func TestRedemptionReserves(t *testing.T) { tracker = newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, nil, rig.core.notify, rig.core.formatDetails) - dc.trades = map[order.OrderID]*trackedTrade{moid: tracker} + tCore.trades = map[order.OrderID]*trackedTrade{moid: tracker} test("nomatch", reserves, func() { - tracker.nomatch(moid) + tCore.nomatch(tracker, moid) }) test("partial market sell match", reserves/3, func() { @@ -3737,7 +3737,7 @@ func TestCancel(t *testing.T) { oid := lo.ID() tracker := newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, nil, nil, rig.core.notify, rig.core.formatDetails) - dc.trades[oid] = tracker + rig.core.trades[oid] = tracker rig.queueCancel(nil) err := rig.core.Cancel(oid[:]) @@ -3776,10 +3776,10 @@ func TestCancel(t *testing.T) { oid = ogID // Order not found - delete(dc.trades, oid) + delete(rig.core.trades, oid) ensureErr("no order") ensureNilCancel("no order") - dc.trades[oid] = tracker + rig.core.trades[oid] = tracker // Send error rig.ws.reqErr = tErr @@ -3821,7 +3821,7 @@ func TestHandlePreimageRequest(t *testing.T) { tracker.csumMtx.Unlock() } - rig.dc.trades[oid] = tracker + rig.core.trades[oid] = tracker err := handlePreimageRequest(rig.core, rig.dc, reqNoCommit) if err == nil { t.Fatalf("handlePreimageRequest succeeded with no commitment in the request") @@ -3847,7 +3847,7 @@ func TestHandlePreimageRequest(t *testing.T) { notes := rig.core.NotificationFeed() - rig.dc.trades[oid] = tracker + rig.core.trades[oid] = tracker err = handlePreimageRequest(rig.core, rig.dc, reqCommit) if err != nil { t.Fatalf("handlePreimageRequest error: %v", err) @@ -3928,7 +3928,7 @@ func TestHandlePreimageRequest(t *testing.T) { notes := rig.core.NotificationFeed() - rig.dc.trades[oid] = tracker + rig.core.trades[oid] = tracker err := handlePreimageRequest(rig.core, rig.dc, reqCommit) if err != nil { t.Fatalf("handlePreimageRequest error: %v", err) @@ -4003,7 +4003,7 @@ func TestHandlePreimageRequest(t *testing.T) { rig.ws.sendMsgErrChan = make(chan *msgjson.Error, 1) defer func() { rig.ws.sendMsgErrChan = nil }() - rig.dc.trades[oid] = tracker + rig.core.trades[oid] = tracker err := handlePreimageRequest(rig.core, rig.dc, reqCommit) if err != nil { t.Fatalf("handlePreimageRequest error: %v", err) @@ -4074,7 +4074,7 @@ func TestHandlePreimageRequest(t *testing.T) { notes := rig.core.NotificationFeed() - rig.dc.trades[oid] = tracker + rig.core.trades[oid] = tracker err := handlePreimageRequest(rig.core, rig.dc, reqCommit) if err != nil { t.Fatalf("handlePreimageRequest error: %v", err) @@ -4157,8 +4157,8 @@ func TestHandlePreimageRequest(t *testing.T) { notes := rig.core.NotificationFeed() - rig.dc.trades[oid] = tracker - rig.dc.registerCancelLink(cid, oid) + rig.core.trades[oid] = tracker + rig.core.registerCancelLink(cid, oid) err := handlePreimageRequest(rig.core, rig.dc, reqCommit) if err != nil { t.Fatalf("handlePreimageRequest error: %v", err) @@ -4248,8 +4248,8 @@ func TestHandlePreimageRequest(t *testing.T) { rig.ws.sendMsgErrChan = make(chan *msgjson.Error, 1) defer func() { rig.ws.sendMsgErrChan = nil }() - rig.dc.trades[oid] = tracker - rig.dc.registerCancelLink(cid, oid) + rig.core.trades[oid] = tracker + rig.core.registerCancelLink(cid, oid) err := handlePreimageRequest(rig.core, rig.dc, reqCommit) if err != nil { t.Fatalf("handlePreimageRequest error: %v", err) @@ -4333,8 +4333,8 @@ func TestHandlePreimageRequest(t *testing.T) { notes := rig.core.NotificationFeed() - rig.dc.trades[oid] = tracker - rig.dc.registerCancelLink(cid, oid) + rig.core.trades[oid] = tracker + rig.core.registerCancelLink(cid, oid) err := handlePreimageRequest(rig.core, rig.dc, reqCommit) if err != nil { t.Fatalf("handlePreimageRequest error: %v", err) @@ -4429,8 +4429,8 @@ func TestHandleRevokeOrderMsg(t *testing.T) { } tracker.cancel = &trackedCancel{CancelOrder: *co} coid := co.ID() - rig.dc.trades[oid] = tracker - rig.dc.registerCancelLink(coid, oid) + tCore.trades[oid] = tracker + tCore.registerCancelLink(coid, oid) orderNotes, feedDone := orderNoteFeed(tCore) defer feedDone() @@ -4529,7 +4529,7 @@ func TestHandleRevokeMatchMsg(t *testing.T) { t.Fatal("[handleRevokeMatchMsg] expected a non-existent order") } - rig.dc.trades[oid] = tracker + tCore.trades[oid] = tracker // Success err = handleRevokeMatchMsg(rig.core, rig.dc, req) @@ -4580,7 +4580,7 @@ func TestTradeTracking(t *testing.T) { fundingCoins := asset.Coins{&tCoin{id: fundCoinDcrID}} tracker := newTrackedTrade(dbOrder, preImgL, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, fundingCoins, rig.core.notify, rig.core.formatDetails) - rig.dc.trades[tracker.ID()] = tracker + tCore.trades[tracker.ID()] = tracker var match *matchTracker checkStatus := func(tag string, wantStatus order.MatchStatus) { t.Helper() @@ -5056,7 +5056,7 @@ func TestTradeTracking(t *testing.T) { } tracker.cancel = &trackedCancel{CancelOrder: *co, epochLen: mkt.EpochLen} coid := co.ID() - rig.dc.registerCancelLink(coid, tracker.ID()) + tCore.registerCancelLink(coid, tracker.ID()) m1 := &msgjson.Match{ OrderID: loid[:], MatchID: mid[:], @@ -5274,16 +5274,16 @@ func TestReconcileTrades(t *testing.T) { for _, tt := range tests { // Track client orders in dc.trades. - dc.tradeMtx.Lock() + rig.core.tradeMtx.Lock() var pendingCancel *trackedTrade - dc.trades = make(map[order.OrderID]*trackedTrade) + rig.core.trades = make(map[order.OrderID]*trackedTrade) for _, tracker := range tt.clientOrders { - dc.trades[tracker.ID()] = tracker + rig.core.trades[tracker.ID()] = tracker if tracker.cancel != nil { pendingCancel = tracker } } - dc.tradeMtx.Unlock() + rig.core.tradeMtx.Unlock() // Queue order_status response if required for reconciliation. if len(tt.orderStatusRes) > 0 { @@ -5295,14 +5295,14 @@ func TestReconcileTrades(t *testing.T) { } // Reconcile tracked orders with server orders. - dc.reconcileTrades(tt.serverOrders) + rig.core.reconcileTrades(dc, tt.serverOrders) - dc.tradeMtx.RLock() - if len(dc.trades) != len(tt.expectOrderStatuses) { + rig.core.tradeMtx.RLock() + if len(rig.core.trades) != len(tt.expectOrderStatuses) { t.Fatalf("%s: post-reconcileTrades order count mismatch. expected %d, got %d", - tt.name, len(tt.expectOrderStatuses), len(dc.trades)) + tt.name, len(tt.expectOrderStatuses), len(rig.core.trades)) } - for oid, tracker := range dc.trades { + for oid, tracker := range rig.core.trades { expectedStatus, expected := tt.expectOrderStatuses[oid] if !expected { t.Fatalf("%s: unexpected order %v tracked by client", tt.name, oid) @@ -5314,7 +5314,7 @@ func TestReconcileTrades(t *testing.T) { } tracker.mtx.RUnlock() } - dc.tradeMtx.RUnlock() + rig.core.tradeMtx.RUnlock() // Check if a previously canceled order existed; if the order is still // active (Epoch/Booked status) and the cancel order is deleted, having @@ -5435,7 +5435,7 @@ func TestRefunds(t *testing.T) { tEthWallet.fundRedeemScripts = []dex.Bytes{nil} tracker := newTrackedTrade(dbOrder, preImgL, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, fundCoinsETH, rig.core.notify, rig.core.formatDetails) - rig.dc.trades[tracker.ID()] = tracker + tCore.trades[tracker.ID()] = tracker // MAKER REFUND, INVALID TAKER COUNTERSWAP // @@ -5831,7 +5831,7 @@ func TestResolveActiveTrades(t *testing.T) { tEthWallet.reservedRedemption = 0 tEthWallet.reservedRefund = 0 - rig.dc.trades = make(map[order.OrderID]*trackedTrade) + tCore.trades = make(map[order.OrderID]*trackedTrade) } // Ensure the order is good, and reset the state. @@ -5849,9 +5849,9 @@ func TestResolveActiveTrades(t *testing.T) { t.Fatalf("%s: login error: %v", description, err) } - trade, found := rig.dc.trades[lo.ID()] + trade, found := tCore.trades[lo.ID()] if expAddedToTradesMap != found { - t.Fatalf("%s: expected added to trades map = %v, but got %v. len(trades) = %d", description, expAddedToTradesMap, found, len(rig.dc.trades)) + t.Fatalf("%s: expected added to trades map = %v, but got %v. len(trades) = %d", description, expAddedToTradesMap, found, len(tCore.trades)) } if !expAddedToTradesMap { return @@ -6017,7 +6017,7 @@ func TestReReserveFunding(t *testing.T) { refundReserves: refundReserves, } - rig.dc.trades = map[order.OrderID]*trackedTrade{ + tCore.trades = map[order.OrderID]*trackedTrade{ oid: tracker, } @@ -6186,12 +6186,12 @@ func TestCompareServerMatches(t *testing.T) { // oidMissing not included (missing!) } - dc.trades = map[order.OrderID]*trackedTrade{ + rig.core.trades = map[order.OrderID]*trackedTrade{ oid: tracker, oidMissing: trackerMissing, } - exceptions, _ := dc.compareServerMatches(srvMatches) + exceptions, _ := rig.core.compareServerMatches(dc, srvMatches) if len(exceptions) != 2 { t.Fatalf("exceptions did not include both trades, just %d", len(exceptions)) } @@ -6451,10 +6451,9 @@ func TestHandleMatchProofMsg(t *testing.T) { } func Test_marketTrades(t *testing.T) { + rig := newTestRig() + defer rig.shutdown() mktID := "dcr_btc" - dc := &dexConnection{ - trades: make(map[order.OrderID]*trackedTrade), - } preImg := newPreimage() activeOrd := &order.LimitOrder{P: order.Prefix{ @@ -6465,14 +6464,14 @@ func Test_marketTrades(t *testing.T) { Order: activeOrd, preImg: preImg, mktID: mktID, - dc: dc, + dc: rig.dc, metaData: &db.OrderMetaData{ Status: order.OrderStatusBooked, }, matches: make(map[order.MatchID]*matchTracker), } - dc.trades[activeTracker.ID()] = activeTracker + rig.core.trades[activeTracker.ID()] = activeTracker preImg = newPreimage() // different oid inactiveOrd := &order.LimitOrder{P: order.Prefix{ @@ -6483,16 +6482,16 @@ func Test_marketTrades(t *testing.T) { Order: inactiveOrd, preImg: preImg, mktID: mktID, - dc: dc, + dc: rig.dc, metaData: &db.OrderMetaData{ Status: order.OrderStatusExecuted, }, matches: make(map[order.MatchID]*matchTracker), // no matches } - dc.trades[inactiveTracker.ID()] = inactiveTracker + rig.core.trades[inactiveTracker.ID()] = inactiveTracker - trades, _ := dc.marketTrades(mktID) + trades, _ := rig.core.marketTrades(rig.dc.acct.host, mktID) if len(trades) != 1 { t.Fatalf("Expected only one trade from marketTrades, found %v", len(trades)) } @@ -6522,7 +6521,7 @@ func TestLogout(t *testing.T) { }, matches: make(map[order.MatchID]*matchTracker), } - rig.dc.trades[ord.ID()] = tracker + tCore.trades[ord.ID()] = tracker ensureErr := func(tag string) { t.Helper() @@ -6560,7 +6559,7 @@ func TestLogout(t *testing.T) { } // Active orders with matches error. ensureErr("active orders matches") - rig.dc.trades = nil + tCore.trades = nil } func TestSetEpoch(t *testing.T) { @@ -6782,7 +6781,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { oid := lo.ID() tracker := newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, coins, rig.core.notify, rig.core.formatDetails) - dc.trades[oid] = tracker + tCore.trades[oid] = tracker return tracker } @@ -6834,14 +6833,14 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { // Check that the funding coin was returned. Use the tradeMtx for // synchronization. - dc.tradeMtx.Lock() + tCore.tradeMtx.Lock() if len(tDcrWallet.returnedCoins) != 1 || !bytes.Equal(tDcrWallet.returnedCoins[0].ID(), fundCoinDcrID) { t.Fatalf("funding coin not returned") } - dc.tradeMtx.Unlock() + tCore.tradeMtx.Unlock() // Make sure the change coin is returned for a trade with a change coin. - delete(dc.trades, freshTracker.ID()) + delete(tCore.trades, freshTracker.ID()) swappedTracker := addTracker(nil) changeCoinID := encode.RandomBytes(36) swappedTracker.change = &tCoin{id: changeCoinID} @@ -6858,12 +6857,12 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) } // Check that the funding coin was returned. - dc.tradeMtx.Lock() + tCore.tradeMtx.Lock() if len(tDcrWallet.returnedCoins) != 1 || !bytes.Equal(tDcrWallet.returnedCoins[0].ID(), changeCoinID) { t.Fatalf("change coin not returned") } tDcrWallet.returnedCoins = nil - dc.tradeMtx.Unlock() + tCore.tradeMtx.Unlock() // Make sure the coin isn't returned if there are unswapped matches. mid := ordertest.RandomMatchID() @@ -6883,11 +6882,11 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { if err != nil { t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) } - dc.tradeMtx.Lock() + tCore.tradeMtx.Lock() if tDcrWallet.returnedCoins != nil { t.Fatalf("change coin returned with active matches") } - dc.tradeMtx.Unlock() + tCore.tradeMtx.Unlock() // Ensure trades for a suspended market generate an error. form := &TradeForm{ @@ -7078,7 +7077,7 @@ func TestHandleNomatch(t *testing.T) { immediateOID := loImmediate.ID() immediateTracker := newTrackedTrade(dbOrder, preImgL, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, fundingCoins, rig.core.notify, rig.core.formatDetails) - dc.trades[immediateOID] = immediateTracker + tCore.trades[immediateOID] = immediateTracker // 2. Standing limit order loStanding, dbOrder, preImgL, _ := makeLimitOrder(dc, true, dcrBtcLotSize*100, dcrBtcRateStep) @@ -7086,7 +7085,7 @@ func TestHandleNomatch(t *testing.T) { standingOID := loStanding.ID() standingTracker := newTrackedTrade(dbOrder, preImgL, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, fundingCoins, rig.core.notify, rig.core.formatDetails) - dc.trades[standingOID] = standingTracker + tCore.trades[standingOID] = standingTracker // 3. Cancel order. cancelOrder := &order.CancelOrder{ @@ -7098,7 +7097,7 @@ func TestHandleNomatch(t *testing.T) { standingTracker.cancel = &trackedCancel{ CancelOrder: *cancelOrder, } - dc.registerCancelLink(cancelOID, standingOID) + tCore.registerCancelLink(cancelOID, standingOID) // 4. Market order. loWillBeMarket, dbOrder, preImgL, _ := makeLimitOrder(dc, true, dcrBtcLotSize*100, dcrBtcRateStep) @@ -7110,10 +7109,10 @@ func TestHandleNomatch(t *testing.T) { marketOID := mktOrder.ID() marketTracker := newTrackedTrade(dbOrder, preImgL, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, fundingCoins, rig.core.notify, rig.core.formatDetails) - dc.trades[marketOID] = marketTracker + tCore.trades[marketOID] = marketTracker runNomatch := func(tag string, oid order.OrderID) { - tracker, _ := dc.findOrder(oid) + tracker, _ := tCore.findOrder(oid) if tracker == nil { t.Fatalf("%s: order ID not found", tag) } @@ -7126,7 +7125,7 @@ func TestHandleNomatch(t *testing.T) { } checkTradeStatus := func(tag string, oid order.OrderID, expStatus order.OrderStatus) { - tracker, _ := dc.findOrder(oid) + tracker, _ := tCore.findOrder(oid) if tracker.metaData.Status != expStatus { t.Fatalf("%s: wrong status. expected %s, got %s", tag, expStatus, tracker.metaData.Status) } @@ -7404,8 +7403,8 @@ func TestReconfigureWallet(t *testing.T) { }, }, } - tCore.conns[tDexHost].tradeMtx.Lock() - tCore.conns[tDexHost].trades[order.OrderID{}] = &trackedTrade{ + tCore.tradeMtx.Lock() + tCore.trades[order.OrderID{}] = &trackedTrade{ Order: &order.LimitOrder{ P: order.Prefix{ BaseAsset: assetID, @@ -7425,7 +7424,7 @@ func TestReconfigureWallet(t *testing.T) { dc: rig.dc, readyToTick: true, // prevent resume path } - tCore.conns[tDexHost].tradeMtx.Unlock() + tCore.tradeMtx.Unlock() // Error checking if wallet owns address. tXyzWallet.ownsAddressErr = tErr @@ -8035,7 +8034,7 @@ func TestAccelerateOrder(t *testing.T) { } trade := newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, nil, rig.core.notify, rig.core.formatDetails) - dc.trades[trade.ID()] = trade + tCore.trades[trade.ID()] = trade trade.Trade().AddFill(test.orderFilled) trade.metaData.ChangeCoin = encode.RandomBytes(32) @@ -8286,7 +8285,7 @@ func TestMatchStatusResolution(t *testing.T) { trade := newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, nil, rig.core.notify, rig.core.formatDetails) - dc.trades[trade.ID()] = trade + tCore.trades[trade.ID()] = trade matchID := ordertest.RandomMatchID() matchTime := time.Now() match := &matchTracker{ @@ -8773,7 +8772,7 @@ func TestConfirmRedemption(t *testing.T) { oid := lo.ID() tracker := newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, nil, rig.core.notify, rig.core.formatDetails) - dc.trades[oid] = tracker + tCore.trades[oid] = tracker tBytes := encode.RandomBytes(2) tCoinID := encode.RandomBytes(36) @@ -9172,7 +9171,7 @@ func TestMaxSwapsRedeemsInTx(t *testing.T) { oid := lo.ID() tracker := newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, nil, rig.core.notify, rig.core.formatDetails) - dc.trades[oid] = tracker + tCore.trades[oid] = tracker newMatch := func(side order.MatchSide, status order.MatchStatus) *matchTracker { return &matchTracker{ @@ -9298,7 +9297,7 @@ func TestSuspectTrades(t *testing.T) { oid := lo.ID() tracker := newTrackedTrade(dbOrder, preImg, dc, rig.core.lockTimeTaker, rig.core.lockTimeMaker, rig.db, rig.queue, walletSet, nil, rig.core.notify, rig.core.formatDetails) - dc.trades[oid] = tracker + tCore.trades[oid] = tracker newMatch := func(side order.MatchSide, status order.MatchStatus) *matchTracker { return &matchTracker{ @@ -11141,7 +11140,7 @@ func TestTradingLimits(t *testing.T) { Status: order.OrderStatusEpoch, }, } - rig.dc.trades[oids[0]] = tracker + rig.core.trades[oids[0]] = tracker checkTradingLimits(2, 20) // Add another epoch order, 2 lots, likely taker, so 2x @@ -11163,7 +11162,7 @@ func TestTradingLimits(t *testing.T) { Status: order.OrderStatusEpoch, }, } - rig.dc.trades[oids[1]] = tracker + rig.core.trades[oids[1]] = tracker checkTradingLimits(6, 20) // Add partially filled booked order @@ -11185,7 +11184,7 @@ func TestTradingLimits(t *testing.T) { Status: order.OrderStatusBooked, }, } - rig.dc.trades[oids[2]] = tracker + rig.core.trades[oids[2]] = tracker checkTradingLimits(7, 20) // Add settling match to the booked order @@ -11248,7 +11247,7 @@ func TestTakeAction(t *testing.T) { var oid order.OrderID copy(oid[:], encode.RandomBytes(32)) - rig.dc.trades[oid] = tracker + rig.core.trades[oid] = tracker requestData := []byte(fmt.Sprintf(`{"orderID":"abcd","coinID":"%s","retry":true}`, dex.Bytes(coinID))) diff --git a/client/core/trade.go b/client/core/trade.go index 04da11654f..eecdfbe709 100644 --- a/client/core/trade.go +++ b/client/core/trade.go @@ -740,9 +740,9 @@ func (t *trackedTrade) token() string { // clearCancel clears the unmatched cancel and deletes the cancel checksum and // link to the trade in the dexConnection. clearCancel must be called with the // trackedTrade.mtx locked. -func (t *trackedTrade) clearCancel(preImg order.Preimage) { +func (c *Core) clearCancel(t *trackedTrade, preImg order.Preimage) { if t.cancel != nil { - t.dc.deleteCancelLink(t.cancel.ID()) + c.deleteCancelLink(t.cancel.ID()) t.cancel = nil } t.csumMtx.Lock() @@ -753,15 +753,15 @@ func (t *trackedTrade) clearCancel(preImg order.Preimage) { // cancelTrade sets the cancellation data with the order and its preimage. // cancelTrade must be called with the mtx write-locked. -func (t *trackedTrade) cancelTrade(co *order.CancelOrder, preImg order.Preimage, epochLen uint64) error { - t.clearCancel(preImg) +func (c *Core) cancelTrade(t *trackedTrade, co *order.CancelOrder, preImg order.Preimage, epochLen uint64) error { + c.clearCancel(t, preImg) t.cancel = &trackedCancel{ CancelOrder: *co, epochLen: epochLen, } cid := co.ID() oid := t.ID() - t.dc.registerCancelLink(cid, oid) + c.registerCancelLink(cid, oid) err := t.db.LinkOrder(oid, cid) if err != nil { return fmt.Errorf("error linking cancel order %s for trade %s: %w", cid, oid, err) @@ -771,7 +771,7 @@ func (t *trackedTrade) cancelTrade(co *order.CancelOrder, preImg order.Preimage, } // nomatch sets the appropriate order status and returns funding coins. -func (t *trackedTrade) nomatch(oid order.OrderID) (assetMap, error) { +func (c *Core) nomatch(t *trackedTrade, oid order.OrderID) (assetMap, error) { assets := make(assetMap) // Check if this is the cancel order. t.mtx.Lock() @@ -789,7 +789,7 @@ func (t *trackedTrade) nomatch(oid order.OrderID) (assetMap, error) { t.dc.log.Errorf("DB error unlinking cancel order %s for trade %s: %v", oid, t.ID(), err) } // Clearing the trackedCancel allows this order to be canceled again. - t.clearCancel(order.Preimage{}) + c.clearCancel(t, order.Preimage{}) t.metaData.LinkedOrder = order.OrderID{} subject, details := t.formatDetails(TopicMissedCancel, makeOrderToken(t.token())) @@ -1195,7 +1195,7 @@ func (t *trackedTrade) counterPartyConfirms(ctx context.Context, match *matchTra // orders detected as "stale" with the two-epochs-old heuristic use this. // // This method MUST be called with the trackedTrade mutex lock held for writes. -func (t *trackedTrade) deleteCancelOrder() { +func (c *Core) deleteCancelOrder(t *trackedTrade) { if t.cancel == nil { return } @@ -1205,7 +1205,7 @@ func (t *trackedTrade) deleteCancelOrder() { t.dc.log.Errorf("Error updating status in db for cancel order %v to revoked: %v", cid, err) } // Unlink the cancel order from the trade. - t.clearCancel(order.Preimage{}) + c.clearCancel(t, order.Preimage{}) t.metaData.LinkedOrder = order.OrderID{} // NOTE: caller may wish to update the trades's DB entry } @@ -1234,7 +1234,7 @@ func (t *trackedTrade) hasStaleCancelOrder() bool { // check and return status of cancel orders. // // This method MUST be called with the trackedTrade mutex lock held for writes. -func (t *trackedTrade) deleteStaleCancelOrder() { +func (c *Core) deleteStaleCancelOrder(t *trackedTrade) { if !t.hasStaleCancelOrder() { return } @@ -1245,7 +1245,7 @@ func (t *trackedTrade) deleteStaleCancelOrder() { // Clear the trackedCancel, allowing this order to be canceled again, and // set the cancel order's status as revoked. cancelOrd := t.cancel - t.deleteCancelOrder() + c.deleteCancelOrder(t) err := t.db.LinkOrder(t.ID(), order.OrderID{}) if err != nil { t.dc.log.Errorf("DB error unlinking cancel order %s for trade %s: %v", cancelOrd.ID(), t.ID(), err) @@ -2103,7 +2103,7 @@ func (c *Core) tick(t *trackedTrade) (assetMap, error) { defer t.mtx.Unlock() if rmCancel { - t.deleteStaleCancelOrder() + c.deleteStaleCancelOrder(t) } for _, match := range revokes {