diff --git a/src/main/java/org/qortal/api/resource/TransactionsResource.java b/src/main/java/org/qortal/api/resource/TransactionsResource.java index f6f82153c..964d38d6e 100644 --- a/src/main/java/org/qortal/api/resource/TransactionsResource.java +++ b/src/main/java/org/qortal/api/resource/TransactionsResource.java @@ -832,6 +832,11 @@ public String processTransaction(String rawBytes58, @HeaderParam(ApiService.API_ if (transactionData == null) throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA); + // general chat transactions are invalid + if( transactionData.getType() == TransactionType.CHAT && transactionData.getTxGroupId() == 0 && transactionData.getRecipient() == null) { + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA); + } + try (final Repository repository = RepositoryManager.getRepository()) { Transaction transaction = Transaction.fromData(repository, transactionData); diff --git a/src/main/java/org/qortal/api/websocket/ChatMessagesWebSocket.java b/src/main/java/org/qortal/api/websocket/ChatMessagesWebSocket.java index 4340ad581..abfecfd32 100644 --- a/src/main/java/org/qortal/api/websocket/ChatMessagesWebSocket.java +++ b/src/main/java/org/qortal/api/websocket/ChatMessagesWebSocket.java @@ -45,6 +45,12 @@ public void onWebSocketConnect(Session session) { if (txGroupIds != null && txGroupIds.size() == 1) { int txGroupId = Integer.parseInt(txGroupIds.get(0)); + // reject general chat + if( txGroupId == 0 ) { + session.close(4001, "invalid criteria"); + return; + } + try (final Repository repository = RepositoryManager.getRepository()) { List chatMessages = repository.getChatRepository().getMessagesMatchingCriteria( null, diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 3f72ed9be..c5af3afbc 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -37,6 +37,7 @@ import org.qortal.network.PeerAddress; import org.qortal.network.message.*; import org.qortal.repository.*; +import org.qortal.repository.hsqldb.HSQLDBCacheUtils; import org.qortal.repository.hsqldb.HSQLDBRepositoryFactory; import org.qortal.settings.Settings; import org.qortal.transaction.Transaction; @@ -423,7 +424,7 @@ public static void main(String[] args) { } if( Settings.getInstance().isDbCacheEnabled() ) { - LOGGER.info("Db Cache Starting ..."); + LOGGER.info("Starting Db Cache..."); HSQLDBDataCacheManager hsqldbDataCacheManager = new HSQLDBDataCacheManager(); hsqldbDataCacheManager.start(); } @@ -431,7 +432,7 @@ public static void main(String[] args) { LOGGER.info("Db Cache Disabled"); } - LOGGER.info("Arbitrary Indexing Starting ..."); + LOGGER.info("Starting Arbitrary Indexing..."); ArbitraryIndexUtils.startCaching( Settings.getInstance().getArbitraryIndexingPriority(), Settings.getInstance().getArbitraryIndexingFrequency() @@ -441,7 +442,7 @@ public static void main(String[] args) { Optional recorder = HSQLDBBalanceRecorder.getInstance(); if( recorder.isPresent() ) { - LOGGER.info("Balance Recorder Starting ..."); + LOGGER.info("Starting Balance Recorder..."); recorder.get().start(); } else { @@ -449,7 +450,7 @@ public static void main(String[] args) { } } else { - LOGGER.info("Balance Recorder Disabled"); + LOGGER.info("Balance Recorder disabled"); } } catch (DataException e) { // If exception has no cause or message then repository is in use by some other process. @@ -469,17 +470,17 @@ public static void main(String[] args) { // Rebuild Names table and check database integrity (if enabled) NamesDatabaseIntegrityCheck namesDatabaseIntegrityCheck = new NamesDatabaseIntegrityCheck(); + LOGGER.info("Rebuilding all names..."); namesDatabaseIntegrityCheck.rebuildAllNames(); if (Settings.getInstance().isNamesIntegrityCheckEnabled()) { + LOGGER.info("Running database integrity check..."); namesDatabaseIntegrityCheck.runIntegrityCheck(); } - - LOGGER.info("Validating blockchain"); + LOGGER.info("Validating blockchain..."); try { BlockChain.validate(); - Controller.getInstance().refillLatestBlocksCache(); - LOGGER.info(String.format("Our chain height at start-up: %d", Controller.getInstance().getChainHeight())); + LOGGER.info("Chain height at start-up: {}", Controller.getInstance().getChainHeight()); } catch (DataException e) { LOGGER.error("Couldn't validate blockchain", e); Gui.getInstance().fatalError("Blockchain validation issue", e); @@ -565,7 +566,6 @@ public void run() { ); } - LOGGER.info("Starting online accounts manager"); OnlineAccountsManager.getInstance().start(); @@ -599,7 +599,7 @@ public void run() { } if (Settings.getInstance().isGatewayEnabled()) { - LOGGER.info(String.format("Starting gateway service on port %d", Settings.getInstance().getGatewayPort())); + LOGGER.info("Starting gateway service on port {}", Settings.getInstance().getGatewayPort()); try { GatewayService gatewayService = GatewayService.getInstance(); gatewayService.start(); @@ -612,7 +612,7 @@ public void run() { } if (Settings.getInstance().isDomainMapEnabled()) { - LOGGER.info(String.format("Starting domain map service on port %d", Settings.getInstance().getDomainMapPort())); + LOGGER.info("Starting domain map service on port {}", Settings.getInstance().getDomainMapPort()); try { DomainMapService domainMapService = DomainMapService.getInstance(); domainMapService.start(); @@ -782,12 +782,12 @@ public void run() { if (ntpTime != null) { if (ntpTime != now) // Only log if non-zero offset - LOGGER.info(String.format("Adjusting system time by NTP offset: %dms", ntpTime - now)); + LOGGER.info("Adjusting system time by NTP offset: {}ms", ntpTime - now); ntpCheckTimestamp = now + NTP_POST_SYNC_CHECK_PERIOD; requestSysTrayUpdate = true; } else { - LOGGER.info(String.format("No NTP offset yet")); + LOGGER.info("No NTP offset yet"); ntpCheckTimestamp = now + NTP_PRE_SYNC_CHECK_PERIOD; // We can't do much without a valid NTP time continue; @@ -1120,6 +1120,11 @@ public void shutdown() { LOGGER.info("Shutting down synchronizer"); Synchronizer.getInstance().shutdown(); + try { + Synchronizer.getInstance().join(); + } catch (InterruptedException e) { + // We were interrupted while waiting for thread to join + } LOGGER.info("Shutting down API"); ApiService.getInstance().stop(); @@ -1177,6 +1182,17 @@ public void shutdown() { // We were interrupted while waiting for thread to join } + LOGGER.info("Shutting down TradeBot"); + TradeBot.getInstance().shutdown(); + + // Shutdown database cache timers before closing repository + LOGGER.info("Shutting down database cache timers"); + HSQLDBCacheUtils.shutdown(); + + // Shutdown arbitrary metadata manager scheduler before closing repository + LOGGER.info("Shutting down arbitrary metadata manager"); + ArbitraryMetadataManager.getInstance().shutdown(); + // Make sure we're the only thread modifying the blockchain when shutting down the repository ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); try { diff --git a/src/main/java/org/qortal/controller/ForeignFeesManager.java b/src/main/java/org/qortal/controller/ForeignFeesManager.java index 3763b7b30..71fb7ca7f 100644 --- a/src/main/java/org/qortal/controller/ForeignFeesManager.java +++ b/src/main/java/org/qortal/controller/ForeignFeesManager.java @@ -415,6 +415,9 @@ private void processForeignFeesImportQueue() { ATData atData = repository.getATRepository().fromATAddress(atAddress); + // if AT data is not available, then continue on to the next AT + if( atData == null ) continue; + LOGGER.debug("verify signer for atAddress = " + atAddress); // determine if the creator authorized the foreign fee diff --git a/src/main/java/org/qortal/controller/TransactionImporter.java b/src/main/java/org/qortal/controller/TransactionImporter.java index 6f9fa1774..80f9402dd 100644 --- a/src/main/java/org/qortal/controller/TransactionImporter.java +++ b/src/main/java/org/qortal/controller/TransactionImporter.java @@ -92,6 +92,27 @@ public void run() { public void shutdown() { isStopping = true; this.interrupt(); + + // Shutdown all schedulers + LOGGER.info("Shutting down TransactionImporter schedulers"); + try { + getTransactionMessageScheduler.shutdownNow(); + getUnconfirmedTransactionsMessageScheduler.shutdownNow(); + signatureMessageScheduler.shutdownNow(); + + if (!getTransactionMessageScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("getTransactionMessageScheduler did not terminate in time"); + } + if (!getUnconfirmedTransactionsMessageScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("getUnconfirmedTransactionsMessageScheduler did not terminate in time"); + } + if (!signatureMessageScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("signatureMessageScheduler did not terminate in time"); + } + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while waiting for TransactionImporter schedulers to terminate", e); + Thread.currentThread().interrupt(); + } } @@ -245,6 +266,13 @@ private void importTransactionsInQueue() { return; } + // discard general chat transactions, chat transactions with no group and no recipient + sigValidTransactions.removeIf( + transactionData -> transactionData.getType() == Transaction.TransactionType.CHAT && + transactionData.getTxGroupId() == 0 && + transactionData.getRecipient() == null + ); + if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) { // Prioritize syncing, and don't attempt to lock return; @@ -397,6 +425,9 @@ public void onNetworkGetTransactionMessage(Peer peer, Message message) { } private void processNetworkGetTransactionMessages() { + if (Controller.isStopping()) { + return; + } try { List messagesToProcess; @@ -490,7 +521,6 @@ private static void sendTransactionMessage(String signature58, TransactionData d // Scheduled executor service to process messages every second private final ScheduledExecutorService getUnconfirmedTransactionsMessageScheduler = Executors.newScheduledThreadPool(1); - public void onNetworkGetUnconfirmedTransactionsMessage(Peer peer, Message message) { synchronized (getUnconfirmedTransactionsMessageLock) { getUnconfirmedTransactionsMessageList.add(new PeerMessage(peer, message)); @@ -498,6 +528,9 @@ public void onNetworkGetUnconfirmedTransactionsMessage(Peer peer, Message messag } private void processNetworkGetUnconfirmedTransactionsMessages() { + if (Controller.isStopping()) { + return; + } List messagesToProcess; synchronized (getUnconfirmedTransactionsMessageLock) { @@ -542,6 +575,9 @@ public void onNetworkTransactionSignaturesMessage(Peer peer, Message message) { } public void processNetworkTransactionSignaturesMessage() { + if (Controller.isStopping()) { + return; + } try { List messagesToProcess; diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java index 9accd9c71..b9c78d199 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java @@ -63,6 +63,13 @@ public void run() { // Process queue processResourceQueue(); + } catch (InterruptedException e) { + // Check if we're shutting down + if (Controller.isStopping()) { + LOGGER.info("Arbitrary Data Cache Manager shutting down"); + break; + } + LOGGER.warn("Arbitrary Data Cache Manager interrupted, retrying...", e); } catch (Exception e) { LOGGER.error(e.getMessage(), e); Thread.sleep(600_000L); // wait 10 minutes to continue @@ -71,6 +78,10 @@ public void run() { // Clear queue before terminating thread processResourceQueue(); + } catch (InterruptedException e) { + if (!Controller.isStopping()) { + LOGGER.error("Arbitrary Data Cache Manager interrupted unexpectedly", e); + } } catch (Exception e) { LOGGER.error(e.getMessage(), e); } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index 8f0bf7081..f4b9d752a 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -449,6 +449,10 @@ private void fetchAllMetadata() throws InterruptedException { ); } catch (DataException e) { LOGGER.error("Repository issue when fetching arbitrary transaction data", e); + } catch (InterruptedException e) { + // Thread interrupted during shutdown - restore interrupt status and exit + Thread.currentThread().interrupt(); + return; } catch (Exception e) { LOGGER.error(e.getMessage(), e); } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java index 76e448955..45a9a4e98 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java @@ -398,6 +398,11 @@ public void onNetworkGetArbitraryMetadataMessage(Peer peer, Message message) { private void processNetworkGetArbitraryMetadataMessage() { + // Exit gracefully if shutting down + if (Controller.isStopping()) { + return; + } + try { List messagesToProcess; synchronized (lock) { @@ -531,4 +536,29 @@ private void processNetworkGetArbitraryMetadataMessage() { LOGGER.error(e.getMessage(), e); } } + + /** + * Shutdown the scheduler + * + * Stops the scheduled executor service to allow clean shutdown + */ + public void shutdown() { + LOGGER.info("Shutting down ArbitraryMetadataManager scheduler..."); + + if (!scheduler.isShutdown()) { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + LOGGER.debug("Scheduler forced shutdown"); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + LOGGER.debug("Scheduler shutdown complete"); + } + + LOGGER.info("ArbitraryMetadataManager scheduler shutdown complete"); + } } diff --git a/src/main/java/org/qortal/controller/arbitrary/Follower.java b/src/main/java/org/qortal/controller/arbitrary/Follower.java index 228640f4c..6a06cc5d8 100644 --- a/src/main/java/org/qortal/controller/arbitrary/Follower.java +++ b/src/main/java/org/qortal/controller/arbitrary/Follower.java @@ -68,7 +68,8 @@ private void fetch(OptionalInt limit) { final int blockHeightThreshold = repository.getBlockRepository().getBlockchainHeight() - limit.getAsInt(); transactionsInReverseOrder - = latestArbitraryTransactionsByName.stream().filter(tx -> tx.getBlockHeight() > blockHeightThreshold) + = latestArbitraryTransactionsByName.stream() + .filter(tx -> tx.getBlockHeight() != null && tx.getBlockHeight() > blockHeightThreshold) .collect(Collectors.toList()); } else { transactionsInReverseOrder = latestArbitraryTransactionsByName; diff --git a/src/main/java/org/qortal/controller/hsqldb/HSQLDBBalanceRecorder.java b/src/main/java/org/qortal/controller/hsqldb/HSQLDBBalanceRecorder.java index 43e7c5425..1c7f89ae2 100644 --- a/src/main/java/org/qortal/controller/hsqldb/HSQLDBBalanceRecorder.java +++ b/src/main/java/org/qortal/controller/hsqldb/HSQLDBBalanceRecorder.java @@ -45,18 +45,12 @@ private HSQLDBBalanceRecorder( int priorityRequested, int frequency, int capacit public static Optional getInstance() { if( SINGLETON == null ) { - SINGLETON = new HSQLDBBalanceRecorder( Settings.getInstance().getBalanceRecorderPriority(), Settings.getInstance().getBalanceRecorderFrequency(), Settings.getInstance().getBalanceRecorderCapacity() ); - - } - else if( SINGLETON == null ) { - - return Optional.empty(); } return Optional.of(SINGLETON); @@ -72,13 +66,11 @@ public void run() { public List getLatestDynamics(int limit, long offset) { - List latest = this.balanceDynamics.stream() + return this.balanceDynamics.stream() .sorted(BalanceRecorderUtils.BLOCK_HEIGHT_RANGE_ADDRESS_AMOUNTS_COMPARATOR.reversed()) .skip(offset) .limit(limit) .collect(Collectors.toList()); - - return latest; } public List getRanges(Integer offset, Integer limit, Boolean reverse) { diff --git a/src/main/java/org/qortal/controller/repository/BlockArchiver.java b/src/main/java/org/qortal/controller/repository/BlockArchiver.java index 01cf40edd..60d875378 100644 --- a/src/main/java/org/qortal/controller/repository/BlockArchiver.java +++ b/src/main/java/org/qortal/controller/repository/BlockArchiver.java @@ -45,6 +45,13 @@ public void run() { repository.discardChanges(); return; } + } catch (InterruptedException e) { + if (Controller.isStopping()) { + LOGGER.info("Block Archiving shutting down"); + } else { + LOGGER.error("Block Archiving interrupted during initialization. Restart ASAP. Report this error immediately to the developers.", e); + } + return; } catch (Exception e) { LOGGER.error("Block Archiving is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e); return; diff --git a/src/main/java/org/qortal/controller/repository/NamesDatabaseIntegrityCheck.java b/src/main/java/org/qortal/controller/repository/NamesDatabaseIntegrityCheck.java index 359819764..67b985deb 100644 --- a/src/main/java/org/qortal/controller/repository/NamesDatabaseIntegrityCheck.java +++ b/src/main/java/org/qortal/controller/repository/NamesDatabaseIntegrityCheck.java @@ -30,6 +30,7 @@ public class NamesDatabaseIntegrityCheck { ); private List nameTransactions = new ArrayList<>(); + private Map> transactionsByNameCache = null; public int rebuildName(String name, Repository repository) { @@ -130,18 +131,55 @@ public int rebuildName(String name, Repository repository) { public int rebuildAllNames() { int modificationCount = 0; + long startTime = System.currentTimeMillis(); + try (final Repository repository = RepositoryManager.getRepository()) { - List names = this.fetchAllNames(repository); // TODO: de-duplicate, to speed up this process + // Build cache of all transactions by name to avoid repeated database queries + long cacheStartTime = System.currentTimeMillis(); + this.buildTransactionsByNameCache(repository); + long cacheEndTime = System.currentTimeMillis(); + LOGGER.info("Cache built in {} ms", cacheEndTime - cacheStartTime); + + List names = this.fetchAllNames(repository); + int totalNames = names.size(); + LOGGER.info("Rebuilding {} names...", totalNames); + + int processedCount = 0; + int logInterval = Math.max(1, totalNames / 10); // Log every 10% + for (String name : names) { modificationCount += this.rebuildName(name, repository); + processedCount++; + + // Log progress every logInterval names + if (processedCount % logInterval == 0 || processedCount == totalNames) { + long elapsedTime = System.currentTimeMillis() - startTime; + double percentComplete = (processedCount * 100.0) / totalNames; + long estimatedTotalTime = (long) (elapsedTime / (processedCount / (double) totalNames)); + long estimatedTimeRemaining = estimatedTotalTime - elapsedTime; + + LOGGER.info(String.format("Progress: %d/%d names (%.1f%%) - Elapsed: %d ms - Est. remaining: %d ms", + processedCount, totalNames, percentComplete, + elapsedTime, estimatedTimeRemaining)); + } } + + long saveStartTime = System.currentTimeMillis(); repository.saveChanges(); + long saveEndTime = System.currentTimeMillis(); + LOGGER.info("Changes saved in {} ms", saveEndTime - saveStartTime); + + // Clear cache after use + this.transactionsByNameCache = null; + + long totalTime = System.currentTimeMillis() - startTime; + LOGGER.info("Rebuild completed: {} modifications in {} ms ({} seconds)", + modificationCount, totalTime, totalTime / 1000.0); } catch (DataException e) { LOGGER.info("Error when running integrity check for all names: {}", e.getMessage()); } - //LOGGER.info("modificationCount: {}", modificationCount); return modificationCount; } @@ -307,7 +345,86 @@ private void fetchAllNameTransactions(Repository repository) throws DataExceptio this.nameTransactions = nameTransactions; } + private void buildTransactionsByNameCache(Repository repository) throws DataException { + LOGGER.info("Building transaction cache for all names..."); + this.transactionsByNameCache = new HashMap<>(); + + // Fetch all name transactions if not already fetched + if (this.nameTransactions.isEmpty()) { + this.fetchAllNameTransactions(repository); + } + + // Group all transactions by the names they involve + for (TransactionData transactionData : this.nameTransactions) { + // Filter out unconfirmed transactions + if (transactionData.getBlockHeight() == null || transactionData.getBlockHeight() <= 0) { + continue; + } + + Set involvedNames = new HashSet<>(); + + if (transactionData instanceof RegisterNameTransactionData) { + RegisterNameTransactionData registerNameTransactionData = (RegisterNameTransactionData) transactionData; + involvedNames.add(registerNameTransactionData.getName()); + String reducedName = Unicode.sanitize(registerNameTransactionData.getName()); + if (reducedName != null && !reducedName.equals(registerNameTransactionData.getName())) { + involvedNames.add(reducedName); + } + } + else if (transactionData instanceof UpdateNameTransactionData) { + UpdateNameTransactionData updateNameTransactionData = (UpdateNameTransactionData) transactionData; + involvedNames.add(updateNameTransactionData.getName()); + if (updateNameTransactionData.getNewName() != null) { + involvedNames.add(updateNameTransactionData.getNewName()); + String reducedNewName = Unicode.sanitize(updateNameTransactionData.getNewName()); + if (reducedNewName != null && !reducedNewName.isEmpty()) { + involvedNames.add(reducedNewName); + } + } + } + else if (transactionData instanceof BuyNameTransactionData) { + BuyNameTransactionData buyNameTransactionData = (BuyNameTransactionData) transactionData; + involvedNames.add(buyNameTransactionData.getName()); + } + else if (transactionData instanceof SellNameTransactionData) { + SellNameTransactionData sellNameTransactionData = (SellNameTransactionData) transactionData; + involvedNames.add(sellNameTransactionData.getName()); + } + else if (transactionData instanceof CancelSellNameTransactionData) { + CancelSellNameTransactionData cancelSellNameTransactionData = (CancelSellNameTransactionData) transactionData; + involvedNames.add(cancelSellNameTransactionData.getName()); + } + + // Add this transaction to all involved names + for (String involvedName : involvedNames) { + if (involvedName == null || involvedName.isEmpty()) { + continue; + } + this.transactionsByNameCache.computeIfAbsent(involvedName, k -> new ArrayList<>()).add(transactionData); + } + } + + // Sort all transaction lists by block height and timestamp + for (List transactions : this.transactionsByNameCache.values()) { + sortTransactions(transactions); + } + + LOGGER.info("Transaction cache built for {} unique names", this.transactionsByNameCache.size()); + } + public List fetchAllTransactionsInvolvingName(String name, Repository repository) throws DataException { + // Use cache if available + if (this.transactionsByNameCache != null) { + List cachedTransactions = this.transactionsByNameCache.get(name); + if (cachedTransactions != null) { + // Return a copy to avoid external modifications + return new ArrayList<>(cachedTransactions); + } + // If not in cache, return empty list (all names should be in cache when it's built) + return new ArrayList<>(); + } + + // Fall back to database queries if cache not available List signatures = new ArrayList<>(); String reducedName = Unicode.sanitize(name); @@ -361,7 +478,7 @@ private TransactionData fetchLatestModificationTransactionInvolvingName(String r } private List fetchAllNames(Repository repository) throws DataException { - List names = new ArrayList<>(); + Set namesSet = new HashSet<>(); // Fetch all the confirmed name transactions if (this.nameTransactions.isEmpty()) { @@ -372,46 +489,39 @@ private List fetchAllNames(Repository repository) throws DataException { if ((transactionData instanceof RegisterNameTransactionData)) { RegisterNameTransactionData registerNameTransactionData = (RegisterNameTransactionData) transactionData; - if (!names.contains(registerNameTransactionData.getName())) { - names.add(registerNameTransactionData.getName()); - } + namesSet.add(registerNameTransactionData.getName()); } if ((transactionData instanceof UpdateNameTransactionData)) { UpdateNameTransactionData updateNameTransactionData = (UpdateNameTransactionData) transactionData; - if (!names.contains(updateNameTransactionData.getName())) { - names.add(updateNameTransactionData.getName()); - } - if (!names.contains(updateNameTransactionData.getNewName())) { - names.add(updateNameTransactionData.getNewName()); + namesSet.add(updateNameTransactionData.getName()); + if (updateNameTransactionData.getNewName() != null) { + namesSet.add(updateNameTransactionData.getNewName()); } } if ((transactionData instanceof BuyNameTransactionData)) { BuyNameTransactionData buyNameTransactionData = (BuyNameTransactionData) transactionData; - if (!names.contains(buyNameTransactionData.getName())) { - names.add(buyNameTransactionData.getName()); - } + namesSet.add(buyNameTransactionData.getName()); } if ((transactionData instanceof SellNameTransactionData)) { SellNameTransactionData sellNameTransactionData = (SellNameTransactionData) transactionData; - if (!names.contains(sellNameTransactionData.getName())) { - names.add(sellNameTransactionData.getName()); - } + namesSet.add(sellNameTransactionData.getName()); } if ((transactionData instanceof CancelSellNameTransactionData)) { CancelSellNameTransactionData cancelSellNameTransactionData = (CancelSellNameTransactionData) transactionData; - if (!names.contains(cancelSellNameTransactionData.getName())) { - names.add(cancelSellNameTransactionData.getName()); - } + namesSet.add(cancelSellNameTransactionData.getName()); } } - return names; + return new ArrayList<>(namesSet); } private int addAdditionalTransactionsRelatingToName(List transactions, String name, Repository repository) throws DataException { int added = 0; + // Use a HashSet for O(1) lookups when checking for existing transactions + Set existingTransactions = new HashSet<>(transactions); + // If this name has been updated at any point, we need to add transactions from the other names to the sequence - List otherNames = new ArrayList<>(); + Set otherNames = new HashSet<>(); List updateNameTransactions = transactions.stream().filter(t -> t.getType() == TransactionType.UPDATE_NAME).collect(Collectors.toList()); for (TransactionData transactionData : updateNameTransactions) { UpdateNameTransactionData updateNameTransactionData = (UpdateNameTransactionData) transactionData; @@ -431,9 +541,10 @@ private int addAdditionalTransactionsRelatingToName(List transa for (String otherName : otherNames) { List otherNameTransactions = this.fetchAllTransactionsInvolvingName(otherName, repository); for (TransactionData otherNameTransactionData : otherNameTransactions) { - if (!transactions.contains(otherNameTransactionData)) { + if (!existingTransactions.contains(otherNameTransactionData)) { // Add new transaction relating to other name transactions.add(otherNameTransactionData); + existingTransactions.add(otherNameTransactionData); added++; } } diff --git a/src/main/java/org/qortal/controller/repository/OnlineAccountsSignaturesTrimmer.java b/src/main/java/org/qortal/controller/repository/OnlineAccountsSignaturesTrimmer.java index c2d37e14f..22893088d 100644 --- a/src/main/java/org/qortal/controller/repository/OnlineAccountsSignaturesTrimmer.java +++ b/src/main/java/org/qortal/controller/repository/OnlineAccountsSignaturesTrimmer.java @@ -35,6 +35,13 @@ public void run() { Thread.sleep(INITIAL_SLEEP_PERIOD); trimStartHeight = repository.getBlockRepository().getOnlineAccountsSignaturesTrimHeight(); + } catch (InterruptedException e) { + if (Controller.isStopping()) { + LOGGER.info("Online Accounts Signatures Trimming shutting down"); + } else { + LOGGER.error("Online Accounts Signatures Trimming interrupted during initialization. Restart ASAP. Report this error immediately to the developers.", e); + } + return; } catch (Exception e) { LOGGER.error("Online Accounts Signatures Trimming is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e); return; diff --git a/src/main/java/org/qortal/controller/tradebot/TradeBot.java b/src/main/java/org/qortal/controller/tradebot/TradeBot.java index c17e57589..cecbca3e9 100644 --- a/src/main/java/org/qortal/controller/tradebot/TradeBot.java +++ b/src/main/java/org/qortal/controller/tradebot/TradeBot.java @@ -137,6 +137,19 @@ public static synchronized TradeBot getInstance() { return instance; } + public void shutdown() { + try { + LOGGER.info("Shutting down TradeBot scheduler"); + tradePresenceMessageScheduler.shutdownNow(); + if (!tradePresenceMessageScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("TradeBot scheduler did not terminate in time"); + } + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while waiting for TradeBot scheduler to terminate", e); + Thread.currentThread().interrupt(); + } + } + public ACCT getAcctUsingAtData(ATData atData) { byte[] codeHash = atData.getCodeHash(); if (codeHash == null) diff --git a/src/main/java/org/qortal/crosschain/ChainableServerConnectionRecorder.java b/src/main/java/org/qortal/crosschain/ChainableServerConnectionRecorder.java index 23697c125..47dc9ff9e 100644 --- a/src/main/java/org/qortal/crosschain/ChainableServerConnectionRecorder.java +++ b/src/main/java/org/qortal/crosschain/ChainableServerConnectionRecorder.java @@ -21,13 +21,18 @@ public ChainableServerConnection recordConnection( ChainableServerConnection connection = new ChainableServerConnection(server, requestedBy, open, success, System.currentTimeMillis(), notes); - connections.add(connection); - - if( connections.size() > limit) { - ChainableServerConnection firstConnection - = connections.stream().sorted(Comparator.comparing(ChainableServerConnection::getCurrentTimeMillis)) - .findFirst().get(); - connections.remove(firstConnection); + synchronized (connections) { + connections.add(connection); + + if (connections.size() > limit) { + ChainableServerConnection firstConnection = connections.get(0); + for (ChainableServerConnection candidate : connections) { + if (candidate.getCurrentTimeMillis() < firstConnection.getCurrentTimeMillis()) { + firstConnection = candidate; + } + } + connections.remove(firstConnection); + } } return connection; } diff --git a/src/main/java/org/qortal/crosschain/Digibyte.java b/src/main/java/org/qortal/crosschain/Digibyte.java index 9ee1f06a7..67ac4f108 100644 --- a/src/main/java/org/qortal/crosschain/Digibyte.java +++ b/src/main/java/org/qortal/crosschain/Digibyte.java @@ -47,6 +47,7 @@ public Collection getServers() { // Servers chosen on NO BASIS WHATSOEVER from various sources! // Status verified at https://1209k.com/bitcoin-eye/ele.php?chain=dgb new Server("electrum.qortal.link", Server.ConnectionType.SSL, 55002), + new Server("electrum.cipig.net", Server.ConnectionType.SSL, 20059), new Server("electrum1.cipig.net", Server.ConnectionType.SSL, 20059), new Server("electrum2.cipig.net", Server.ConnectionType.SSL, 20059), new Server("electrum3.cipig.net", Server.ConnectionType.SSL, 20059) diff --git a/src/main/java/org/qortal/crosschain/ElectrumServer.java b/src/main/java/org/qortal/crosschain/ElectrumServer.java index bcf399983..cbb40e9b9 100644 --- a/src/main/java/org/qortal/crosschain/ElectrumServer.java +++ b/src/main/java/org/qortal/crosschain/ElectrumServer.java @@ -2,6 +2,7 @@ import org.qortal.crypto.TrustlessSSLSocketFactory; +import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; import java.io.IOException; import java.net.Socket; @@ -18,6 +19,7 @@ public class ElectrumServer { private Socket socket; private Scanner scanner; private int nextId = 1; + private String clientName; private ChainableServerConnectionRecorder recorder; @@ -44,6 +46,10 @@ private void init(ChainableServer server, SocketAddress endpoint, int timeout, C if (this.server.getConnectionType() == ElectrumX.Server.ConnectionType.SSL) { SSLSocketFactory factory = TrustlessSSLSocketFactory.getSocketFactory(); this.socket = factory.createSocket(this.socket, server.getHostName(), server.getPort(), true); + this.socket.setSoTimeout(timeout); + this.socket.setTcpNoDelay(true); + this.socket.getOutputStream().flush(); + ((SSLSocket) this.socket).startHandshake(); } this.scanner = new Scanner(this.socket.getInputStream()); @@ -60,6 +66,14 @@ public int incrementNextId() { return nextId++; } + public String getClientName() { + return this.clientName; + } + + public void setClientName(String clientName) { + this.clientName = clientName; + } + public String write(byte[] bytes, String id) throws IOException { synchronized (this.serverLock) { diff --git a/src/main/java/org/qortal/crosschain/ElectrumX.java b/src/main/java/org/qortal/crosschain/ElectrumX.java index aedc93eb8..53523345a 100644 --- a/src/main/java/org/qortal/crosschain/ElectrumX.java +++ b/src/main/java/org/qortal/crosschain/ElectrumX.java @@ -9,6 +9,7 @@ import org.json.simple.JSONObject; import org.json.simple.JSONValue; import org.qortal.api.resource.CrossChainUtils; +import org.qortal.controller.Controller; import org.qortal.crypto.Crypto; import org.qortal.utils.BitTwiddling; @@ -19,10 +20,13 @@ import java.text.DecimalFormat; import java.util.*; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -37,7 +41,12 @@ public class ElectrumX extends BitcoinyBlockchainProvider { // See: https://electrumx.readthedocs.io/en/latest/protocol-changes.html private static final double MIN_PROTOCOL_VERSION = 1.2; private static final double MAX_PROTOCOL_VERSION = 2.0; // Higher than current latest, for hopeful future-proofing - private static final String CLIENT_NAME = "Qortal"; + private static final int MIN_TARGET_CONNECTIONS = 2; + private static final int DEFAULT_TARGET_CONNECTIONS = 3; + private static final double TARGET_CONNECTIONS_FRACTION = 0.75d; + private static final int PROBE_TIMEOUT_MS = 2000; + private static final long PROBE_RETRY_MS = 5 * 60 * 1000L; + private static final long FAILURE_PENALTY_MS = 5000L; private static final int BLOCK_HEADER_LENGTH = 80; @@ -49,15 +58,16 @@ public class ElectrumX extends BitcoinyBlockchainProvider { private static final int RESPONSE_TIME_READINGS = 5; private static final long MAX_AVG_RESPONSE_TIME = 2000L; // ms + private static final long UNKNOWN_RESPONSE_PENALTY_MS = MAX_AVG_RESPONSE_TIME * 5; public static final String MISSING_FEATURES_ERROR = "MISSING FEATURES ERROR"; public static final String EXPECTED_GENESIS_ERROR = "EXPECTED GENESIS ERROR"; - public static final int MINIMUM_CONNECTIONS = 30; - public static final int MAXIMUM_CONNECTIONS = 50; + private static final long IDLE_DISCONNECT_MS = 2 * 60 * 1000L; private ChainableServerConnectionRecorder recorder = new ChainableServerConnectionRecorder(100); // the minimum number of connections targeted for this foreign blockchain private int minimumConnections; + private int maximumConnections; public static class Server implements ChainableServer { String hostname; @@ -83,11 +93,15 @@ public void addResponseTime(long responseTime) { @Override public long averageResponseTime() { - if (this.responseTimes.size() < RESPONSE_TIME_READINGS) { + List snapshot; + synchronized (this.responseTimes) { + snapshot = new ArrayList<>(this.responseTimes); + } + if (snapshot.size() < RESPONSE_TIME_READINGS) { // Not enough readings yet return 0L; } - OptionalDouble average = this.responseTimes.stream().mapToDouble(a -> a).average(); + OptionalDouble average = snapshot.stream().filter(Objects::nonNull).mapToDouble(a -> a).average(); if (average.isPresent()) { return Double.valueOf(average.getAsDouble()).longValue(); } @@ -165,6 +179,16 @@ public boolean removeEldestEntry(Map.Entry eldest) // Scheduled executor service to monitor connections private final ScheduledExecutorService scheduleMonitorConnections = Executors.newScheduledThreadPool(1); + private final Object connectionManagementLock = new Object(); + private final Object connectionListLock = new Object(); + private volatile boolean connectionManagementStarted = false; + private volatile long lastRpcTimeMs = 0L; + private final AtomicInteger inFlightRpcCount = new AtomicInteger(0); + private final Map serverFailureCounts = new ConcurrentHashMap<>(); + private final Map serverLastProbeTime = new ConcurrentHashMap<>(); + private volatile boolean initialProbeCompleted = false; + private volatile String lastScoreExtremesDigest = ""; + // Constructors public ElectrumX(String netId, String genesisHash, Collection initialServerList, Map defaultPorts) { @@ -173,12 +197,7 @@ public ElectrumX(String netId, String genesisHash, Collection initialSer this.servers.addAll(initialServerList); this.defaultPorts.putAll(defaultPorts); - // the minimum is set to roughly 10% of the initial count - this.minimumConnections = (initialServerList.size() / 10) + 1; - - scheduleMakeConnections.scheduleWithFixedDelay(this::makeConnections, 1, 3600, TimeUnit.SECONDS); - scheduleRecoverConnections.scheduleWithFixedDelay(this::recoverConnections, 120, 10, TimeUnit.SECONDS); - scheduleMonitorConnections.scheduleWithFixedDelay(this::monitorConnections, 1, 10, TimeUnit.MINUTES); + updateConnectionTargets(initialServerList.size()); } // Methods for use by other classes @@ -740,6 +759,198 @@ private void releaseServer( ElectrumServer server ) { this.availableConnections.add(server); } + private boolean isIdle() { + if (this.inFlightRpcCount.get() > 0) { + return false; + } + if (this.lastRpcTimeMs <= 0L) { + return true; + } + return System.currentTimeMillis() - this.lastRpcTimeMs > IDLE_DISCONNECT_MS; + } + + private void closeAllConnections(String reason) { + synchronized (this.connectionListLock) { + for (ElectrumServer server : new HashSet<>(this.connections)) { + this.connections.remove(server); + server.closeServer(this.getClass().getSimpleName(), reason); + } + this.availableConnections.clear(); + this.remainingServers.clear(); + } + } + + private long averageConnectedResponseTime() { + long total = 0L; + int count = 0; + synchronized (this.connections) { + for (ElectrumServer server : this.connections) { + long responseTime = server.averageResponseTime(); + if (responseTime > 0) { + total += responseTime; + count++; + } + } + } + return count == 0 ? 0L : total / count; + } + + private void updateConnectionTargets(int listSize) { + if (listSize <= 0) { + this.maximumConnections = 0; + this.minimumConnections = 0; + LOGGER.info("{} has no ElectrumX servers configured", this.blockchain == null ? "ElectrumX" : this.blockchain.getCurrencyCode()); + return; + } + + int targetConnections = (int) Math.ceil(listSize * TARGET_CONNECTIONS_FRACTION); + if (listSize > 30) { + targetConnections = 30; + } + targetConnections = Math.max(targetConnections, DEFAULT_TARGET_CONNECTIONS); + int minTarget = Math.min(MIN_TARGET_CONNECTIONS, listSize); + targetConnections = clamp(targetConnections, minTarget, listSize); + this.maximumConnections = targetConnections; + this.minimumConnections = Math.max(1, Math.min(listSize, Math.max(1, targetConnections / 2))); + + LOGGER.info("{} targets {} connections (min {}), listSize {}, avgResponse {}ms", this.blockchain == null ? "ElectrumX" : this.blockchain.getCurrencyCode(), this.maximumConnections, this.minimumConnections, listSize, averageConnectedResponseTime()); + } + + private long scoreServer(ChainableServer server) { + long averageResponse = server.averageResponseTime(); + long latencyScore = averageResponse > 0 ? averageResponse : UNKNOWN_RESPONSE_PENALTY_MS; + int failures = this.serverFailureCounts.getOrDefault(server, 0); + return latencyScore + (failures * FAILURE_PENALTY_MS); + } + + private List selectPreferredServers(int maxServers) { + List snapshot; + synchronized (this.connectionListLock) { + snapshot = new ArrayList<>(this.servers); + } + if (snapshot.isEmpty() || maxServers <= 0) { + return Collections.emptyList(); + } + snapshot.sort(Comparator.comparingLong(this::scoreServer)); + logScoreExtremes(snapshot); + int limit = Math.min(maxServers, snapshot.size()); + return new ArrayList<>(snapshot.subList(0, limit)); + } + + private void logScoreExtremes(List sortedServers) { + int limit = Math.min(3, sortedServers.size()); + if (limit == 0) { + return; + } + + StringBuilder best = new StringBuilder(); + StringBuilder worst = new StringBuilder(); + for (int i = 0; i < limit; i++) { + if (i > 0) { + best.append(", "); + } + ChainableServer server = sortedServers.get(i); + best.append(server).append(":").append(scoreServer(server)).append("ms"); + } + for (int i = sortedServers.size() - limit; i < sortedServers.size(); i++) { + if (i > sortedServers.size() - limit) { + worst.append(", "); + } + ChainableServer server = sortedServers.get(i); + worst.append(server).append(":").append(scoreServer(server)).append("ms"); + } + + String digest = best.toString() + "|" + worst.toString() + "|" + this.connections.size(); + if (digest.equals(this.lastScoreExtremesDigest)) { + return; + } + this.lastScoreExtremesDigest = digest; + + LOGGER.info("{} top {} ElectrumX servers: {}", this.blockchain == null ? "ElectrumX" : this.blockchain.getCurrencyCode(), limit, best); + LOGGER.info("{} bottom {} ElectrumX servers: {}", this.blockchain == null ? "ElectrumX" : this.blockchain.getCurrencyCode(), limit, worst); + } + + private void recordFailure(ChainableServer server) { + this.serverFailureCounts.merge(server, 1, Integer::sum); + } + + private void recordSuccess(ChainableServer server) { + this.serverFailureCounts.put(server, 0); + } + + private void probeServers(Collection servers) { + long now = System.currentTimeMillis(); + for (ChainableServer server : servers) { + Long lastProbe = this.serverLastProbeTime.get(server); + if (lastProbe != null && now - lastProbe < PROBE_RETRY_MS) { + continue; + } + this.serverLastProbeTime.put(server, now); + probeServer(server); + } + } + + private void probeServer(ChainableServer server) { + ElectrumServer electrumServer = null; + try { + SocketAddress endpoint = new InetSocketAddress(server.getHostName(), server.getPort()); + electrumServer = ElectrumServer.createInstance(server, endpoint, PROBE_TIMEOUT_MS, this.recorder); + electrumServer.setClientName(randomClientName()); + + Object response = connectedRpc(electrumServer, "server.version"); + if (response != null) { + recordSuccess(server); + } else { + recordFailure(server); + } + } catch (IOException | ForeignBlockchainException | ClassCastException | NullPointerException e) { + recordFailure(server); + } finally { + if (electrumServer != null) { + electrumServer.closeServer(this.getClass().getSimpleName(), "probe"); + } + } + } + + /** + * Ensure the connection maintenance threads are running and initial connections exist. + */ + private void ensureConnectionManagementStarted() { + if (this.connectionManagementStarted) { + return; + } + + boolean shouldInit = false; + synchronized (this.connectionManagementLock) { + if (!this.connectionManagementStarted) { + this.connectionManagementStarted = true; + shouldInit = true; + } + } + + if (!shouldInit) { + return; + } + + if (!this.initialProbeCompleted) { + List serversSnapshot; + synchronized (this.connectionListLock) { + serversSnapshot = new ArrayList<>(this.servers); + } + if (!serversSnapshot.isEmpty()) { + LOGGER.info("{} probing {} ElectrumX servers for initial scoring", this.blockchain == null ? "ElectrumX" : this.blockchain.getCurrencyCode(), serversSnapshot.size()); + probeServers(serversSnapshot); + } + this.initialProbeCompleted = true; + } + + startMakingConnections(); + + scheduleMakeConnections.scheduleWithFixedDelay(this::makeConnections, 1, 3600, TimeUnit.SECONDS); + scheduleRecoverConnections.scheduleWithFixedDelay(this::recoverConnections, 120, 10, TimeUnit.SECONDS); + scheduleMonitorConnections.scheduleWithFixedDelay(this::monitorConnections, 1, 10, TimeUnit.MINUTES); + } + /** *

Performs RPC call, with automatic reconnection to different server if needed. *

@@ -749,6 +960,14 @@ private void releaseServer( ElectrumServer server ) { * @throws ForeignBlockchainException if server returns error or something goes wrong */ private ElectrumServerResponse rpc(String method, Object...params) throws ForeignBlockchainException { + this.inFlightRpcCount.incrementAndGet(); + this.lastRpcTimeMs = System.currentTimeMillis(); + try { + ensureConnectionManagementStarted(); + if (this.availableConnections.isEmpty()) { + LOGGER.debug("{} no available ElectrumX connections; starting connections on demand", this.blockchain.getCurrencyCode()); + startMakingConnections(); + } ElectrumServer electrumServer = acquireServer(); @@ -771,6 +990,7 @@ private ElectrumServerResponse rpc(String method, Object...params) throws Foreig if (response != null) { releaseServer(electrumServer); + this.lastRpcTimeMs = System.currentTimeMillis(); return new ElectrumServerResponse(electrumServer, response); } @@ -785,6 +1005,9 @@ private ElectrumServerResponse rpc(String method, Object...params) throws Foreig // Failed to perform RPC - maybe lack of servers? LOGGER.info("Error: No connected Electrum servers when trying to make RPC call"); throw new ForeignBlockchainException.NetworkException(String.format("Failed to perform ElectrumX RPC %s", method)); + } finally { + this.inFlightRpcCount.decrementAndGet(); + } } /** @@ -794,12 +1017,18 @@ private ElectrumServerResponse rpc(String method, Object...params) throws Foreig */ private void monitorConnections() { + if (this.isIdle() && !this.connections.isEmpty()) { + LOGGER.info("{} idle; closing {} ElectrumX connections", this.blockchain.getCurrencyCode(), this.connections.size()); + this.closeAllConnections("idle timeout"); + } + LOGGER.info( - "{} {} available connections, {} total servers, {} total connections, {} useless servers", + "{} {} available connections, {} total servers, {} total connections (target {}), {} useless servers", this.blockchain.getCurrencyCode(), this.availableConnections.size(), this.servers.size(), this.connections.size(), + this.maximumConnections, this.uselessServers.size() ); } @@ -812,6 +1041,9 @@ private void monitorConnections() { private void makeConnections() { try { + if (this.isIdle()) { + return; + } if( this.connections.isEmpty() ) { startMakingConnections(); } @@ -830,10 +1062,18 @@ private void makeConnections() { private void recoverConnections() { try { + if (this.isIdle()) { + return; + } if( this.connections.size() < this.minimumConnections ) { - LOGGER.debug("{} recovering connections", this.blockchain.currencyCode); + LOGGER.debug("{} recovering connections", this.blockchain == null ? "ElectrumX" : this.blockchain.getCurrencyCode()); + List serversSnapshot; + synchronized (this.connectionListLock) { + serversSnapshot = new ArrayList<>(this.servers); + } + probeServers(serversSnapshot); startMakingConnections(); - LOGGER.debug("{} recovered {} connections", this.blockchain.currencyCode, this.connections.size()); + LOGGER.debug("{} recovered {} connections", this.blockchain == null ? "ElectrumX" : this.blockchain.getCurrencyCode(), this.connections.size()); } } catch (Exception e) { LOGGER.error(e.getMessage(), e); @@ -846,8 +1086,13 @@ private void recoverConnections() { private void startMakingConnections() { // assume there are no server to get peers from, so we must start from the base list - this.remainingServers.clear(); - this.remainingServers.addAll(this.servers); + synchronized (this.connectionListLock) { + this.remainingServers.clear(); + updateConnectionTargets(this.servers.size()); + List preferredServers = selectPreferredServers(this.maximumConnections); + LOGGER.info("{} selecting {} of {} ElectrumX servers by score", this.blockchain == null ? "ElectrumX" : this.blockchain.getCurrencyCode(), preferredServers.size(), this.servers.size()); + this.remainingServers.addAll(preferredServers); + } connectRemainingServers(); } @@ -860,19 +1105,33 @@ private void startMakingConnections() { private void makeMoreConnections() { // if we need more connections - if(this.connections.size() < MINIMUM_CONNECTIONS) { + if(this.connections.size() < this.maximumConnections) { // Ask for more servers Set moreServers = serverPeersSubscribe(); + List newlyAdded = new ArrayList<>(); - // Add all servers to base list - this.servers.addAll(moreServers); + synchronized (this.connectionListLock) { + for (Server server : moreServers) { + if (!this.servers.contains(server)) { + newlyAdded.add(server); + } + } + this.servers.addAll(moreServers); + } - // add base list to remaining list - this.remainingServers.addAll(this.servers); + if (!newlyAdded.isEmpty()) { + LOGGER.info("{} probing {} newly discovered ElectrumX servers", this.blockchain == null ? "ElectrumX" : this.blockchain.getCurrencyCode(), newlyAdded.size()); + probeServers(newlyAdded); + } - // remove servers that this node is already connected to - this.remainingServers.removeAll(this.connections.stream().map(ElectrumServer::getServer).collect(Collectors.toList())); + synchronized (this.connectionListLock) { + updateConnectionTargets(this.servers.size()); + List preferredServers = selectPreferredServers(this.maximumConnections); + this.remainingServers.clear(); + this.remainingServers.addAll(preferredServers); + this.remainingServers.removeAll(this.connections.stream().map(ElectrumServer::getServer).collect(Collectors.toList())); + } // try connecting the remaining servers connectRemainingServers(); @@ -881,13 +1140,39 @@ private void makeMoreConnections() { private void connectRemainingServers() { // while there are remaining servers and less than the maximum connections - while( !this.remainingServers.isEmpty() && this.connections.size() < MAXIMUM_CONNECTIONS ) { - ChainableServer server = this.remainingServers.remove(RANDOM.nextInt(this.remainingServers.size())); + while (true) { + ChainableServer server; + synchronized (this.connectionListLock) { + if (this.remainingServers.isEmpty() || this.connections.size() >= this.maximumConnections) { + return; + } + server = this.remainingServers.remove(RANDOM.nextInt(this.remainingServers.size())); + } makeConnection(server, this.getClass().getSimpleName()); } } + private static int clamp(int value, int min, int max) { + if (value < min) { + return min; + } + if (value > max) { + return max; + } + return value; + } + + private static String randomClientName() { + final String alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + StringBuilder name = new StringBuilder(12); + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < 12; i++) { + name.append(alphabet.charAt(random.nextInt(alphabet.length()))); + } + return name.toString(); + } + private Optional makeConnection(ChainableServer server, String requestedBy) { LOGGER.debug(() -> String.format("Connecting to %s %s", server, this.blockchain.currencyCode)); @@ -896,6 +1181,7 @@ private Optional makeConnection(ChainableServer serve int timeout = 5000; // ms ElectrumServer electrumServer = ElectrumServer.createInstance(server, endpoint, timeout, this.recorder); + electrumServer.setClientName(randomClientName()); // All connections need to start with a version negotiation this.connectedRpc(electrumServer, "server.version"); @@ -903,29 +1189,39 @@ private Optional makeConnection(ChainableServer serve // Check connection is suitable by asking for server features, including genesis block hash JSONObject featuresJson = (JSONObject) this.connectedRpc(electrumServer, "server.features"); - if (featuresJson == null ) + if (featuresJson == null ) { + recordFailure(server); return Optional.of( recorder.recordConnection(server, requestedBy, true, false, MISSING_FEATURES_ERROR) ); + } try { double protocol_min = CrossChainUtils.getVersionDecimal(featuresJson, "protocol_min"); - if (protocol_min < MIN_PROTOCOL_VERSION) + if (protocol_min < MIN_PROTOCOL_VERSION) { + recordFailure(server); return Optional.of( recorder.recordConnection(server, requestedBy, true, false, "old version: protocol_min = " + protocol_min + " < MIN_PROTOCOL_VERSION = " + MIN_PROTOCOL_VERSION) ); + } } catch (NumberFormatException e) { + recordFailure(server); return Optional.of( recorder.recordConnection(server, requestedBy,true, false,featuresJson.get("protocol_min").toString() + " is not a valid version")); } catch (NullPointerException e) { + recordFailure(server); return Optional.of( recorder.recordConnection(server, requestedBy,true, false,"server version not available: protocol_min")); } - if (this.expectedGenesisHash != null && !((String) featuresJson.get("genesis_hash")).equals(this.expectedGenesisHash)) + if (this.expectedGenesisHash != null && !((String) featuresJson.get("genesis_hash")).equals(this.expectedGenesisHash)) { + recordFailure(server); return Optional.of( recorder.recordConnection(server, requestedBy, true, false, EXPECTED_GENESIS_ERROR) ); + } + recordSuccess(server); LOGGER.debug(() -> String.format("Connected to %s %s", server, this.blockchain.currencyCode)); this.connections.add(electrumServer); this.availableConnections.add(electrumServer); return Optional.of( this.recorder.recordConnection( server, requestedBy, true, true, EMPTY) ); } catch (IOException | ForeignBlockchainException | ClassCastException | NullPointerException e) { // Didn't work, try another server... + recordFailure(server); return Optional.of( this.recorder.recordConnection( server, requestedBy, true, false, CrossChainUtils.getNotes(e))); } catch( Exception e ) { LOGGER.error(e.getMessage(), e); @@ -954,7 +1250,12 @@ private Object connectedRpc(ElectrumServer server, String method, Object...param // server.version needs additional params to negotiate a version if (method.equals("server.version")) { - requestParams.add(CLIENT_NAME); + String clientName = server.getClientName(); + if (clientName == null) { + clientName = randomClientName(); + server.setClientName(clientName); + } + requestParams.add(clientName); List versions = new ArrayList<>(); DecimalFormat df = new DecimalFormat("#.#"); versions.add(df.format(MIN_PROTOCOL_VERSION)); diff --git a/src/main/java/org/qortal/crosschain/Litecoin.java b/src/main/java/org/qortal/crosschain/Litecoin.java index b27e77f18..5a40e4145 100644 --- a/src/main/java/org/qortal/crosschain/Litecoin.java +++ b/src/main/java/org/qortal/crosschain/Litecoin.java @@ -53,13 +53,24 @@ public Collection getServers() { // Servers chosen on NO BASIS WHATSOEVER from various sources! // Status verified at https://1209k.com/bitcoin-eye/ele.php?chain=ltc new Server("backup.electrum-ltc.org", Server.ConnectionType.SSL, 443), + new Server("backup.electrum-ltc.org", Server.ConnectionType.SSL, 50002), new Server("electrum.ltc.xurious.com", Server.ConnectionType.SSL, 50002), - new Server("electrum.qortal.link", Server.ConnectionType.SSL, 50002), + new Server("electrum.jochen-hoenicke.de", Server.ConnectionType.SSL, 50091), new Server("electrum-ltc.petrkr.net", Server.ConnectionType.SSL, 60002), - new Server("electrum1.cipig.net", Server.ConnectionType.SSL, 20063), - new Server("electrum2.cipig.net", Server.ConnectionType.SSL, 20063), - new Server("electrum3.cipig.net", Server.ConnectionType.SSL, 20063), - new Server("ltc.rentonrisk.com", Server.ConnectionType.SSL, 50002) + new Server("electrum.petrkr.net", Server.ConnectionType.SSL, 60002), + new Server("electrum-ltc.bysh.me", Server.ConnectionType.SSL, 50002), + new Server("fury.fiatfaucet.com", Server.ConnectionType.SSL, 50002), + new Server("ltc-electrum.cakewallet.com", Server.ConnectionType.SSL, 50002), + new Server("litecoin.stackwallet.com", Server.ConnectionType.SSL, 20063), + new Server("ltc.aftrek.org", Server.ConnectionType.SSL, 50002), + new Server("137.184.250.112", Server.ConnectionType.SSL, 50002), + new Server("146.190.15.65", Server.ConnectionType.SSL, 50002), + new Server("157.230.64.188", Server.ConnectionType.SSL, 50002), + new Server("209.38.53.75", Server.ConnectionType.SSL, 50002), + new Server("24.199.78.132", Server.ConnectionType.SSL, 50002), + new Server("5.78.97.174", Server.ConnectionType.SSL, 50002), + new Server("188.166.208.106", Server.ConnectionType.SSL, 50002), + new Server("5.161.216.180", Server.ConnectionType.SSL, 50002) ); } diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBCacheUtils.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBCacheUtils.java index d0621da09..2819ac6d5 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBCacheUtils.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBCacheUtils.java @@ -62,6 +62,9 @@ public int compare(ArbitraryResourceData data1, ArbitraryResourceData data2) { }; private static final String DEFAULT_IDENTIFIER = "default"; private static final int ZERO = 0; + private static Timer dbCacheTimer; + private static Timer balanceRecorderTimer; + public static final String DB_CACHE_TIMER = "DB Cache Timer"; public static final String DB_CACHE_TIMER_TASK = "DB Cache Timer Task"; public static final String BALANCE_RECORDER_TIMER = "Balance Recorder Timer"; @@ -464,7 +467,7 @@ private static boolean passQuery(Predicate predicate, ArbitraryResourceD */ public static void startCaching(int priorityRequested, int frequency) { - Timer timer = buildTimer(DB_CACHE_TIMER, priorityRequested); + dbCacheTimer = buildTimer(DB_CACHE_TIMER, priorityRequested); TimerTask task = new TimerTask() { @Override @@ -472,6 +475,11 @@ public void run() { Thread.currentThread().setName(DB_CACHE_TIMER_TASK); + // Exit gracefully if shutting down + if (Controller.isStopping()) { + return; + } + try (final Repository respository = RepositoryManager.getRepository()) { fillCache(ArbitraryResourceCache.getInstance(), respository); } @@ -482,7 +490,7 @@ public void run() { }; // delay 1 second - timer.scheduleAtFixedRate(task, 1000, frequency * 1000); + dbCacheTimer.scheduleAtFixedRate(task, 1000, frequency * 1000); } /** @@ -501,7 +509,7 @@ public static void startRecordingBalances( int frequency, int capacity) { - Timer timer = buildTimer(BALANCE_RECORDER_TIMER, priorityRequested); + balanceRecorderTimer = buildTimer(BALANCE_RECORDER_TIMER, priorityRequested); TimerTask task = new TimerTask() { @Override @@ -509,6 +517,11 @@ public void run() { Thread.currentThread().setName(BALANCE_RECORDER_TIMER_TASK); + // Exit gracefully if shutting down + if (Controller.isStopping()) { + return; + } + int currentHeight = recordCurrentBalances(balancesByHeight); LOGGER.debug("recorded balances: height = " + currentHeight); @@ -546,7 +559,7 @@ public void run() { }; // wait 5 minutes - timer.scheduleAtFixedRate(task, 300_000, frequency * 60_000); + balanceRecorderTimer.scheduleAtFixedRate(task, 300_000, frequency * 60_000); } private static void produceBalanceDynamics(int currentHeight, Optional priorHeight, boolean isRewardDistribution, ConcurrentHashMap> balancesByHeight, CopyOnWriteArrayList balanceDynamics, int capacity) { @@ -864,4 +877,29 @@ public static List getAccountBalances(Repository repository) return data; } + + /** + * Shutdown all timers + * + * Cancels all running Timer tasks to allow clean shutdown + */ + public static void shutdown() { + LOGGER.info("Shutting down HSQLDBCacheUtils timers..."); + + if (dbCacheTimer != null) { + dbCacheTimer.cancel(); + dbCacheTimer.purge(); + dbCacheTimer = null; + LOGGER.debug("DB Cache Timer shutdown"); + } + + if (balanceRecorderTimer != null) { + balanceRecorderTimer.cancel(); + balanceRecorderTimer.purge(); + balanceRecorderTimer = null; + LOGGER.debug("Balance Recorder Timer shutdown"); + } + + LOGGER.info("HSQLDBCacheUtils timers shutdown complete"); + } } \ No newline at end of file diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBChatRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBChatRepository.java index 66b2447a2..ad2fc04df 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBChatRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBChatRepository.java @@ -7,6 +7,7 @@ import org.qortal.data.chat.ActiveChats.DirectChat; import org.qortal.data.chat.ActiveChats.GroupChat; import org.qortal.data.chat.ChatMessage; +import org.qortal.data.group.GroupMemberData; import org.qortal.data.transaction.ChatTransactionData; import org.qortal.repository.ChatRepository; import org.qortal.repository.DataException; @@ -16,6 +17,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import static org.qortal.data.chat.ChatMessage.Encoding; @@ -149,6 +151,17 @@ else if (hasChatReference != null && !hasChatReference) { chatMessages.add(chatMessage); } while (resultSet.next()); + // if this is a group chat, then ensure that the sender is in the group + if( txGroupId != null && txGroupId > 0 ) { + List members + = this.repository.getGroupRepository() + .getGroupMembers(txGroupId).stream() + .map(GroupMemberData::getMember) + .collect(Collectors.toList()); + + chatMessages.removeIf( data -> !members.contains(data.getSender()) ); + } + return chatMessages; } catch (SQLException e) { throw new DataException("Unable to fetch matching chat transactions from repository", e); diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java index 2bf88657d..a4972bcb2 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java @@ -574,7 +574,7 @@ private PreparedStatement cachePreparedStatement(String sql) throws SQLException * which we never close, which means HSQLDB also caches a parsed, * prepared statement that can be reused for subsequent * calls to HSQLDB.prepareStatement(sql). - * + * * See org.hsqldb.StatementManager for more details. */ PreparedStatement preparedStatement = this.preparedStatementCache.get(sql); @@ -586,9 +586,19 @@ private PreparedStatement cachePreparedStatement(String sql) throws SQLException preparedStatement = this.connection.prepareStatement(sql); this.preparedStatementCache.put(sql, preparedStatement); } else { - // Clean up ready for reuse - preparedStatement.clearBatch(); - preparedStatement.clearParameters(); + try { + // Clean up ready for reuse + preparedStatement.clearBatch(); + preparedStatement.clearParameters(); + } catch (SQLException e) { + // Connection may have been closed, try to recreate the statement + if (this.connection == null || this.connection.isClosed()) { + throw new SQLException("Connection is closed", e); + } + // Statement is closed but connection is still open, recreate + preparedStatement = this.connection.prepareStatement(sql); + this.preparedStatementCache.put(sql, preparedStatement); + } } return preparedStatement; @@ -964,6 +974,18 @@ public SQLException examineException(SQLException e) { } private void assertEmptyTransaction(String context) throws DataException { + // If connection is already closed, skip this check + try { + if (this.connection == null || this.connection.isClosed()) { + LOGGER.debug(() -> String.format("Skipping transaction check after %s - connection already closed", context)); + return; + } + } catch (SQLException e) { + // If we can't check if connection is closed, assume it is and skip + LOGGER.debug(() -> String.format("Unable to check connection status after %s, skipping transaction check", context)); + return; + } + String sql = "SELECT transaction, transaction_size FROM information_schema.system_sessions WHERE session_id = ?"; try { @@ -993,6 +1015,16 @@ private void assertEmptyTransaction(String context) throws DataException { } } } catch (SQLException e) { + // During shutdown, the connection might be closed by another thread + // Check if this is the case and log appropriately + try { + if (this.connection == null || this.connection.isClosed()) { + LOGGER.debug(() -> String.format("Connection closed while checking repository status after %s", context)); + return; + } + } catch (SQLException ignored) { + // Ignore - we'll throw the original exception below + } throw new DataException("Error checking repository status after " + context, e); } } diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepositoryFactory.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepositoryFactory.java index 2ddabf8d1..6c7f9de30 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepositoryFactory.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepositoryFactory.java @@ -43,8 +43,10 @@ public HSQLDBRepositoryFactory(String connectionUrl) throws DataException { this.connectionUrl = connectionUrl; // Check no-one else is accessing database + LOGGER.info("Opening database connection (this may take a while if replaying transaction logs)..."); try (Connection connection = DriverManager.getConnection(this.connectionUrl)) { // We only need to check we can obtain connection. It will be auto-closed. + LOGGER.info("Database connection established"); } catch (SQLException e) { Throwable cause = e.getCause(); if (!(cause instanceof HsqlException)) diff --git a/testnet/settings-test.json b/testnet/settings-test.json index ef00f162a..f454b82f0 100755 --- a/testnet/settings-test.json +++ b/testnet/settings-test.json @@ -32,4 +32,4 @@ "0.0.0.0/0", "::/0" ] -} +} \ No newline at end of file diff --git a/testnet/testchain.json b/testnet/testchain.json index 06f5df122..99c4e2311 100644 --- a/testnet/testchain.json +++ b/testnet/testchain.json @@ -106,7 +106,16 @@ "disableRewardshareHeight": 8450, "enableRewardshareHeight": 11400, "onlyMintWithNameHeight": 8500, - "groupMemberCheckHeight": 11200 + "groupMemberCheckHeight": 11200, + "decreaseOnlineAccountsDifficultyTimestamp": 9999999999999, + "removeOnlyMintWithNameHeight": 9999999999999, + "fixBatchRewardHeight": 9999999999999, + "adminsReplaceFoundersHeight": 0, + "nullGroupMembershipHeight": 0, + "ignoreLevelForRewardShareHeight": 0, + "adminQueryFixHeight": 0, + "multipleNamesPerAccountHeight": 0, + "mintedBlocksAdjustmentRemovalHeight": 0 }, "genesisInfo": { "version": 4, @@ -2678,4 +2687,4 @@ { "type": "GENESIS", "recipient": "QU7EUWDZz7qJVPih3wL9RKTHRfPFy4ASHC", "amount": 10 } ] } -} +} \ No newline at end of file diff --git a/tools/auto-update-scripts/generate-release-notes.sh b/tools/auto-update-scripts/generate-release-notes.sh index c0f44f390..1e4e21c8e 100755 --- a/tools/auto-update-scripts/generate-release-notes.sh +++ b/tools/auto-update-scripts/generate-release-notes.sh @@ -126,7 +126,7 @@ fi # Get changelog between previous and current commit echo "Generating changelog between ${PREV_BUMP_COMMIT} and ${CURRENT_BUMP_COMMIT}..." -CHANGELOG=$(curl -s "https://api.github.com/repos/${REPO}/compare/${PREV_BUMP_COMMIT}...${CURRENT_BUMP_COMMIT}" | jq -r '.commits[] | "- " + .sha[0:7] + " " + .commit.message') +CHANGELOG=$(curl -s "https://api.github.com/repos/${REPO}/compare/${PREV_BUMP_COMMIT}...${CURRENT_BUMP_COMMIT}" | jq -r '.commits[] | .sha[0:7] as $sha | (.commit.message | split("\n")) as $lines | ($lines[0]) as $title | ($lines[1:] | map(select(length > 0))) as $body | "- " + $title + "\n - " + $sha + (if ($body | length) > 0 then "\n " + ($body | join("\n ")) else "" end)') # Fetch latest commit timestamp from GitHub API for final file timestamping COMMIT_API_URL="https://api.github.com/repos/${REPO}/commits?sha=${BRANCH}&per_page=1" @@ -235,4 +235,3 @@ Packed with \`7z a -r -tzip qortal.zip qortal/\` EOF echo "Release notes generated: release-notes.txt" -