diff --git a/build.sbt b/build.sbt index 16e90b5636d..17ae1505435 100644 --- a/build.sbt +++ b/build.sbt @@ -17,8 +17,8 @@ lazy val projectSettings = Seq( version := "0.1.0-SNAPSHOT", resolvers ++= Resolver.sonatypeOssRepos("releases") ++ - Resolver.sonatypeOssRepos("snapshots") ++ - Seq("jitpack" at "https://jitpack.io"), + Resolver.sonatypeOssRepos("snapshots") ++ + Seq("jitpack" at "https://jitpack.io"), wartremoverExcluded += sourceManaged.value, Compile / compile / wartremoverErrors ++= Warts.allBut( // those we want @@ -76,7 +76,7 @@ lazy val projectSettings = Seq( Seq( Compile / packageDoc / publishArtifact := false, packageDoc / publishArtifact := false, - Compile / doc / sources := Seq.empty, + Compile / doc / sources := Seq.empty ) } @@ -379,7 +379,7 @@ lazy val node = (project in file("node")) rpmUrl := Some("https://rchain.coop"), rpmLicense := Some("Apache 2.0"), Rpm / packageArchitecture := "noarch", - Rpm / maintainerScripts := maintainerScriptsAppendFromFile((Rpm/maintainerScripts).value)( + Rpm / maintainerScripts := maintainerScriptsAppendFromFile((Rpm / maintainerScripts).value)( RpmConstants.Post -> (sourceDirectory.value / "rpm" / "scriptlets" / "post") ), rpmPrerequisites := Seq( @@ -410,7 +410,7 @@ lazy val rholang = (project in file("rholang")) "-Xfatal-warnings", "-Xlint:_,-missing-interpolator" // disable "possible missing interpolator" warning ), - Compile / packageDoc/ publishArtifact := false, + Compile / packageDoc / publishArtifact := false, packageDoc / publishArtifact := false, Compile / doc / sources := Seq.empty, libraryDependencies ++= commonDependencies ++ Seq( diff --git a/node/src/main/scala/coop/rchain/node/benchmark/ConcurrencyReplayBench.scala b/node/src/main/scala/coop/rchain/node/benchmark/ConcurrencyReplayBench.scala new file mode 100644 index 00000000000..85490ea4bb7 --- /dev/null +++ b/node/src/main/scala/coop/rchain/node/benchmark/ConcurrencyReplayBench.scala @@ -0,0 +1,192 @@ +package io.rhonix.node.benchmark + +import cats.Parallel +import cats.effect.concurrent.Ref +import cats.effect.{Blocker, Concurrent, ContextShift} +import cats.syntax.all._ +import fs2.Stream +import io.circe.syntax._ +import io.rhonix.casper.genesis.Genesis +import io.rhonix.casper.genesis.contracts.Validator +import io.rhonix.casper.protocol.{CommEvent, ConsumeEvent, ProduceEvent} +import io.rhonix.casper.rholang.{BlockRandomSeed, RuntimeManager} +import io.rhonix.casper.{StatefulExecutionTracker, ValidatorIdentity} +import io.rhonix.crypto.signatures.Secp256k1 +import io.rhonix.metrics.Metrics.Source +import io.rhonix.metrics.{Metrics, Span} +import io.rhonix.node.benchmark.utils.GenesisParams.genesisParameters +import io.rhonix.node.benchmark.utils.LeaderfulSimulation.ValidatorWithPayments +import io.rhonix.node.benchmark.utils.{Payment, StateTransition, User} +import io.rhonix.rspace.syntax._ +import io.rhonix.shared.syntax._ +import io.rhonix.shared.{Log, Stopwatch, Time} +import io.rhonix.store.InMemoryStoreManager +import monix.eval.Task +import monix.execution.Scheduler + +import java.nio.file.Path + +/** Benchmark for concurrent state transitions. This is equivalent to concurrent block validation. */ +object ConcurrentReplayBench { + + def main(args: Array[String]): Unit = { + implicit val time = Time.fromTimer[Task] + implicit val c = Concurrent[Task] + implicit val scheduler: Scheduler = Scheduler.Implicits.global + implicit val log = Log.log + + go[Task](100, Path.of("bench")).runSyncUnsafe() + } + + def go[F[_]: Concurrent: Parallel: Time: Log: ContextShift]( + stateTransitionsMax: Int, + dataDir: Path + )(implicit scheduler: Scheduler): F[Unit] = { + implicit val m = new Metrics.MetricsNOP[F] + + // run everything in memory + val rnodeStoreManager = new InMemoryStoreManager + + for { + rSpaceStores <- rnodeStoreManager.rSpaceStores + // extract all performance data gathered by Span trait usage across codebase + statsRef <- Ref.of[F, Map[String, (Long, Int)]](Map.empty) + profiler = new Span[F] { + override def trace[A](source: Source)(block: F[A]): F[A] = + for { + v <- Stopwatch.durationRaw(block) + (r, time) = v + _ <- statsRef.update { s => + val (currTimeAcc, currCallsAcc) = s.getOrElse(source, (0L, 0)) + val newV = (currTimeAcc + time.toNanos, currCallsAcc + 1) + s.updated(source, newV) + } + } yield r + + // do not need these one + override def mark(name: String): F[Unit] = ().pure[F] + override def withMarks[A](label: String)(block: F[A]): F[A] = block + } + + mergeStore <- RuntimeManager.mergeableStore(rnodeStoreManager) + // Block execution tracker + executionTracker <- StatefulExecutionTracker[F] + runtimeManager <- { + implicit val span = profiler + RuntimeManager( + rSpaceStores, + mergeStore, + BlockRandomSeed.nonNegativeMergeableTagName("dummy"), + executionTracker + ) + } + + _ <- Log[F].info(s"Preparing genesis block...") + users = User.random.take(stateTransitionsMax).toList + validatorsKeys = (1 to stateTransitionsMax) + .map(_ => Secp256k1.newKeyPair) + .map { case (_, pk) => pk } + .toList + genesisVaults = users.map(_.pk) + bondedValidators = validatorsKeys.zipWithIndex.map { + case (v, i) => + Validator(v, 2L * i.toLong + 1L) + } + genesis <- { + implicit val a = runtimeManager + Genesis.createGenesisBlock( + ValidatorIdentity(Secp256k1.newKeyPair._1), + genesisParameters(bondedValidators, genesisVaults) + ) + } + _ <- Log[F].info(s"Genesis done.") + + r <- { + implicit val rm = runtimeManager + implicit val blocker = + Blocker.liftExecutionContext(scala.concurrent.ExecutionContext.global) + + // first is a warm up for JVM + (Stream(1, 1, 2, 3, 4, 5, 7) ++ Stream.range(10, stateTransitionsMax + 5, 5)) + .evalMap { networkSize => + val validators = validatorsKeys.take(networkSize) + val payments = Payment.random(users, 1, 10).take(networkSize).toList + val transitions = validators + .zip(payments) + .map { case (v, p) => ValidatorWithPayments(v, Seq(p)) } + + val test = Stream + .emits(transitions) + .parEvalMapProcBounded { v => + StateTransition.make( + genesis.postStateHash, + v.validator, + 0, + 1, + v.payments.toList + ) + } + .compile + .toList + + for { + _ <- Log[F].info(s"Running ${transitions.size} concurrent STs.") + r <- Stopwatch.durationRaw(test) + (results, time) = r + timeStr = Stopwatch.showTime(time) + avgDeploysPerST = results.flatMap(_.processedDeploys).size.toFloat / results.size + ps = results + .flatMap(_.processedDeploys.flatMap(_.deployLog)) + .collect { case c: ProduceEvent => c } + .size + cs = results + .flatMap(_.processedDeploys.flatMap(_.deployLog)) + .collect { case c: ConsumeEvent => c } + .size + comms = results + .flatMap(_.processedDeploys.flatMap(_.deployLog)) + .collect { case c: CommEvent => c } + .size + psSys = results + .flatMap(_.processedSysDeploys.flatMap(_.eventList)) + .collect { case c: ProduceEvent => c } + .size + csSys = results + .flatMap(_.processedSysDeploys.flatMap(_.eventList)) + .collect { case c: ConsumeEvent => c } + .size + commsSys = results + .flatMap(_.processedSysDeploys.flatMap(_.eventList)) + .collect { case c: CommEvent => c } + .size + cps = comms.toFloat / time.toNanos * 1e9 + stats <- statsRef.get + logMsg = stats.toList + .sortBy { case (_, (v, _)) => v } + .reverse + .foldLeft( + s"\nDONE: ${results.size} State transitions (avg $avgDeploysPerST TX per ST), " + + s"user events: $ps P, $cs C $comms COMM, " + + s"sys events : $psSys P, $csSys C $commsSys COMM, " + + s"time: ${timeStr}. COMMS per sec: ${cps}" + ) { + case (acc, (metric, (totalTime, totalCalls))) => + val timeS = totalTime.toFloat / 1e9 + acc + f"\n$metric%60s: avg ${timeS / totalCalls}%.7f s, total $timeS%.7f s, calls ${totalCalls}, " + } + _ <- Log[F].info(logMsg) + _ <- statsRef.set(Map.empty) // reset stats + r = stats.map { + case (metric, (total, qty)) => + metric -> ("size" -> networkSize, "total" -> total, "calls" -> qty) + }.asJson + } yield r + } + .flatMap(v => Stream.fromIterator(v.show.getBytes.iterator)) + .through(fs2.io.file.writeAll(dataDir.resolve("out.json"), blocker)) + .compile + .lastOrError + } + } yield () + } +} diff --git a/node/src/main/scala/coop/rchain/node/benchmark/utils/GenesisParams.scala b/node/src/main/scala/coop/rchain/node/benchmark/utils/GenesisParams.scala new file mode 100644 index 00000000000..6edb89ed3e1 --- /dev/null +++ b/node/src/main/scala/coop/rchain/node/benchmark/utils/GenesisParams.scala @@ -0,0 +1,51 @@ +package io.rhonix.node.benchmark.utils + +import cats.syntax.all._ +import io.rhonix.casper.genesis.Genesis +import io.rhonix.casper.genesis.contracts.{ProofOfStake, Registry, Validator, Vault} +import io.rhonix.crypto.PublicKey +import io.rhonix.rholang.interpreter.util.RevAddress + +object GenesisParams { + + val predefinedVaultsAmt = 900000000L + val posVK = + "04eccad1d78ea16046f4787ffba9b36bec5ef151aba14fa46aeca8e14b6e604812d7d1deb50e0931fa0dbac63dbe0f86bf61c3c93a69c17071427e1580260cbb8e" + + def genesisParameters( + bondedValidators: Seq[Validator], + genesisVaults: List[PublicKey], + epochLength: Int = 1000 + ): Genesis = { + def predefinedVault(pub: PublicKey): Vault = + Vault(RevAddress.fromPublicKey(pub).get, predefinedVaultsAmt) + + Genesis( + shardId = "root", + proofOfStake = ProofOfStake( + minimumBond = 0L, + maximumBond = Long.MaxValue, + // Epoch length is set to large number to prevent trigger of epoch change + // in PoS close block method, which causes block merge conflicts + // - epoch change can be set as a parameter in Rholang tests (e.g. PoSSpec) + epochLength = epochLength, + quarantineLength = 50000, + numberOfActiveValidators = 100, + validators = bondedValidators, + posMultiSigPublicKeys = List(posVK), + posMultiSigQuorum = 1, + posVaultPubKey = posVK + ), + vaults = genesisVaults.map(predefinedVault) ++ + bondedValidators.toList.map { + case Validator(pk, _) => + // Initial validator vaults contain 0 Rev + RevAddress.fromPublicKey(pk).map(Vault(_, 0)) + }.flattenOption, + blockNumber = 0, + sender = genesisVaults.head, + registry = Registry(posVK) + ) + } + +} diff --git a/node/src/main/scala/coop/rchain/node/benchmark/utils/LeaderfulSimulation.scala b/node/src/main/scala/coop/rchain/node/benchmark/utils/LeaderfulSimulation.scala new file mode 100644 index 00000000000..c8912691c0c --- /dev/null +++ b/node/src/main/scala/coop/rchain/node/benchmark/utils/LeaderfulSimulation.scala @@ -0,0 +1,467 @@ +package io.rhonix.node.benchmark.utils + +import io.rhonix.crypto.PublicKey + +//import cats.effect.Concurrent +//import cats.syntax.all._ +//import com.google.protobuf.ByteString +//import io.rhonix.blockstorage.dag.BlockDagStorage +//import io.rhonix.casper.merging.DeployIndex.sysCloseBlockId +//import io.rhonix.casper.merging.{BlockIndex, DagMerger, DeployIndex} +//import io.rhonix.casper.protocol.{ +// BlockMessage, +// Body, +// Bond, +// CommEvent, +// Header, +// ProcessedDeploy, +// ProcessedSystemDeploy, +// RChainState, +// RejectedDeploy +//} +//import io.rhonix.casper.util.rholang.RuntimeManager +//import io.rhonix.crypto.PublicKey +//import io.rhonix.crypto.codec.Base16 +//import io.rhonix.metrics.Metrics +//import io.rhonix.models.block.StateHash.StateHash +//import io.rhonix.node.benchmark.utils.Payment.{ +// conflictsPresent, +// verifyBalances, +// BalanceSheet, +// BlockWithPayments +//} +//import io.rhonix.node.benchmark.utils.StateTransition.StateTransitionResult +//import io.rhonix.rspace.hashing.Blake2b256Hash +//import io.rhonix.shared.syntax._ +//import io.rhonix.shared.{Log, Stopwatch, Time} +//import fs2.Stream +//import io.rhonix.casper.rholang.RuntimeManager +// +//import scala.collection.Seq +//import scala.concurrent.duration.{FiniteDuration, NANOSECONDS} + +object LeaderfulSimulation { +// +// def mkBlocks[F[_]: Concurrent: Time]( +// validatorsWithPayments: List[ValidatorWithPayments], +// preStateHash: StateHash, +// seqNum: Int, +// blockNum: Long, +// deploysToReject: Seq[ByteString] = List.empty +// )( +// implicit runtimeManager: RuntimeManager[F] +// ): List[Stream[F, (BlockMessage, Seq[Charged[PaymentDeploy]])]] = { +// def packBlock( +// sender: PublicKey, +// postStateHash: StateHash, +// processed: Seq[ProcessedDeploy], +// processedSystem: Seq[ProcessedSystemDeploy] +// ): BlockMessage = BlockMessage( +// blockHash = ByteString.copyFrom(Array.fill(32)((scala.util.Random.nextInt(256) - 128).toByte)), +// header = Header( +// parentsHashList = List.empty, +// timestamp = processed.head.deploy.data.timestamp, +// version = 1 +// ), +// body = Body( +// state = RChainState( +// preStateHash = preStateHash, +// postStateHash = postStateHash, +// bonds = validatorsWithPayments +// .map(_.validator) +// .map(pk => Bond(ByteString.copyFrom(pk.bytes), 1)), +// blockNumber = blockNum +// ), +// deploys = processed.toList, +// systemDeploys = processedSystem.toList, +// rejectedDeploys = deploysToReject.map(RejectedDeploy(_)).toList +// ), +// justifications = List.empty, +// sender = ByteString.copyFrom(sender.bytes), +// seqNum = seqNum, +// sig = ByteString.copyFrom(Array.fill(32)((scala.util.Random.nextInt(256) - 128).toByte)), +// sigAlgorithm = "", +// shardId = "shardId" +// ) +// +// validatorsWithPayments.map { +// case ValidatorWithPayments(validatorPk, payments) => +// Stream +// .eval( +// StateTransition +// .make( +// preStateHash, +// validatorPk, +// seqNum, +// blockNum, +// payments.toList +// ) +// ) +// .map { +// case StateTransitionResult( +// postStateHash, +// chargedDeploysWithMeta, +// processedDeploys, +// processedSystem +// ) => +// ( +// packBlock(validatorPk, postStateHash, processedDeploys, processedSystem), +// chargedDeploysWithMeta +// ) +// } +// } +// } +// +// final case class LayerResult( +// mergeBlock: BlockWithPayments, +// mergedBlocks: List[BlockWithPayments], +// rejected: List[RejectedDeploy], +// commsAccepted: Long, +// commsrejected: Long, +// mergeTime: String +// ) +// def mkLayer[F[_]: Concurrent: Time: Log]( +// validatorsWithPayments: List[ValidatorWithPayments], +// baseBlock: BlockMessage, +// dagStore: BlockDagStorage[F] +// )( +// implicit runtimeManager: RuntimeManager[F] +// ): F[LayerResult] = { +// +// val baseState = baseBlock.body.state.postStateHash +// val seqNum = baseBlock.seqNum + 1 +// val mergingBlocksNum = (baseBlock.body.state.blockNumber + 1).toLong +// val mergerBlockNum = (baseBlock.body.state.blockNumber + 2).toLong +// val validatorsNum = validatorsWithPayments.size +// +// val mkBlocksToMerge = +// Log[F].info( +// s"${validatorsNum - 1} validators create blocks concurrently. (x3: play, replay, validation)" +// ) *> +// Stream +// .emits( +// mkBlocks[F](validatorsWithPayments, baseState, seqNum, mergingBlocksNum).dropRight(1) +// ) +// .parJoinProcBounded +// .map { +// case (b, payment) => +// ( +// b.copy(header = b.header.copy(parentsHashList = List(baseBlock.blockHash))), +// payment +// ) +// } +// .evalTap { case (b, _) => dagStore.insert(b, false) } +// .compile +// .toList +// +// for { +// // create children blocks +// v <- Stopwatch.duration(mkBlocksToMerge) // play +// _ <- mkBlocksToMerge // replay +// _ <- mkBlocksToMerge // validation +// (t, tailStateTransitionTime) = v +// +// _ <- Log[F].info(s"Done in ${tailStateTransitionTime}") +// (toMerge, mergedPayments) = t.unzip +// +// _ <- Log[F].info("Indexing blocks...") +// // merge children blocks +// indices <- (baseBlock +: toMerge) +// .traverse( +// b => +// BlockIndex( +// b.blockHash, +// b.body.deploys, +// b.body.systemDeploys, +// Blake2b256Hash.fromByteString(b.body.state.preStateHash), +// Blake2b256Hash.fromByteString(b.body.state.postStateHash), +// runtimeManager.getHistoryRepo +// ).map(b.blockHash -> _) +// ) +// .map(_.toMap) +// dag <- dagStore.getRepresentation +// +// _ <- Log[F].info("Preparing merged state... (x3: play, replay, validation)") +// merge = DagMerger.merge[F]( +// dag, +// baseBlock.blockHash, +// Blake2b256Hash.fromByteString(baseState), +// indices(_).deployChains.pure, +// runtimeManager.getHistoryRepo, +// DagMerger.costOptimalRejectionAlg +// ) +// v <- Stopwatch.duration(merge) //play +// _ <- merge // replay +// _ <- merge // validation +// ((postState, rejectedDeploys), mergeTime) = v +// mergedState = ByteString.copyFrom(postState.bytes.toArray) +// +// // create next base block (merge block) +// _ <- Log[F].info("Creating merge block... (x3: play, replay, validation)") +// mkMergeBlock = mkBlocks[F]( +// validatorsWithPayments, +// mergedState, +// seqNum, +// mergerBlockNum, +// rejectedDeploys +// ).last +// .map { +// case (b, balancesDiff) => +// ( +// b.copy(header = b.header.copy(parentsHashList = toMerge.map(_.blockHash))), +// balancesDiff +// ) +// } +// .compile +// .lastOrError +// r <- mkMergeBlock // play +// _ <- mkMergeBlock // replay +// _ <- mkMergeBlock // validation +// (nextBaseBlock, leaderPayments) = r +// _ <- dagStore.insert(nextBaseBlock, false) +// _ <- dagStore.recordDirectlyFinalized(nextBaseBlock.blockHash, _ => ().pure[F]) +// (rejectedLogs, acceptedLogs) = (nextBaseBlock +: toMerge) +// .flatMap(_.body.deploys) +// .map(d => (d, rejectedDeploys.contains(d.deploy.sig))) +// .partition { case (_, rejected) => rejected } +// commsAccepted = acceptedLogs +// .flatMap(_._1.deployLog) +// .collect { case c: CommEvent => c } +// .size +// .toLong +// commsRejected = rejectedLogs +// .flatMap(_._1.deployLog) +// .collect { case c: CommEvent => c } +// .size +// .toLong +// } yield LayerResult( +// BlockWithPayments(nextBaseBlock, leaderPayments), +// toMerge.zip(mergedPayments).map(BlockWithPayments.tupled), +// rejectedDeploys.map(RejectedDeploy(_)).toList, +// commsAccepted, +// commsRejected, +// mergeTime +// ) +// } + + final case class ValidatorWithPayments(validator: PublicKey, payments: Seq[Payment]) +// def go[F[_]: Concurrent: Time: RuntimeManager: BlockDagStorage: Log: Metrics]( +// genesis: BlockMessage, +// layers: Iterator[Seq[ValidatorWithPayments]], +// initBalances: BalanceSheet, +// epochLength: Int, +// mergesNum: Int = 1 +// ): Stream[F, (List[BlockMessage], BlockMessage)] = { +// val usersToTrack = initBalances +// .filterNot { +// // do not check per validator vaults +// case (User(_, pk, _), _) => +// layers.next().map(_.validator).contains(pk) +// } +// +// def newLayer( +// validatorsWithPayments: Seq[ValidatorWithPayments], +// baseBlock: BlockMessage, +// balanceSheetAcc: BalanceSheet, +// acceptedCommsAcc: Long, +// rejectedCommsAcc: Long, +// durAcc: Long, +// layerNum: Int +// ): F[(BlockMessage, List[BlockMessage], Int, BalanceSheet, Long, Long, Long)] = +// for { +// v <- Stopwatch.durationNano( +// mkLayer(validatorsWithPayments.toList, baseBlock, BlockDagStorage[F]) +// ) +// ( +// LayerResult( +// mergeBlockWithPayments, +// mergedBlocksWithPayments, +// rejectedOnMerge, +// commsAccepted, +// commsRejected, +// mergeTime +// ), +// dur +// ) = v +// +// // 1. initial check +// _ <- new Exception("State is not changed.").raiseError.whenA( +// baseBlock.body.state.postStateHash == mergeBlockWithPayments.b.body.state.postStateHash +// ) +// isEpochMerge = mergedBlocksWithPayments.head.b.body.state.blockNumber % epochLength == 0 +// paymentsOfferedToMerge = mergedBlocksWithPayments.map(_.payments) +// conflictingPaymentsPresent = conflictsPresent( +// paymentsOfferedToMerge.map(_.map(_.v.payment)) +// ) +// closeBlocksOfferedToMerge = mergedBlocksWithPayments +// .map(_.b) +// .map(DeployIndex.sysCloseBlockId) +// rejections = rejectedOnMerge.map(_.sig) +// // a. when merging epoch all but one closeBlocks should be rejected +// badEpochMerge = isEpochMerge && (closeBlocksOfferedToMerge diff rejections).size != 1 +// _ <- new Exception( +// "More then one close block left unrejected when merging blocks with epoch change" +// ).raiseError.whenA(badEpochMerge) +// // b. if conflicting payments present - there have to be rejected deploys +// // this check for epoch merge is problematic because close block on epoch depends on all payments +// // so all deploys are rejected +// wrongPaymentRejection = !isEpochMerge && rejectedOnMerge.nonEmpty != conflictingPaymentsPresent +// _ <- new Exception( +// "Transfers are conflicting but no rejections or vice versa." +// ).raiseError.whenA(wrongPaymentRejection) +// +// // 2. balances check +// paymentsMerged = paymentsOfferedToMerge.flatten.map { dwp => +// if (rejectedOnMerge.map(_.sig).contains(dwp.v.d.sig)) { +// dwp.copy( +// charge = dwp.charge.copy(rejected = true), +// v = dwp.v.copy(payment = dwp.v.payment.copy(rejected = true)) +// ) +// } else dwp +// } +// paymentsInMerge = mergeBlockWithPayments.payments +// newBalanceSheet = (paymentsInMerge ++ paymentsMerged) +// .flatMap { dwp => +// val v = List(dwp.charge, dwp.v.payment) +// assert( +// dwp.charge.rejected == dwp.v.payment.rejected, +// s"charge rejected ${dwp.charge.rejected}, payment rejected: ${dwp.v.payment.rejected}" +// ) +// v +// } +// .foldLeft(balanceSheetAcc) { +// case (acc, p) => +// acc + +// (p.source -> { +// val (currB, currPs) = +// acc.getOrElse(p.source, (0L, List.empty[Payment])) +// ( +// if (p.rejected || p.source == p.dest) currB +// else currB - p.amt, +// p +: currPs +// ) +// }) + +// (p.dest -> { +// val (currB, currPs) = +// acc.getOrElse(p.dest, (0L, List.empty[Payment])) +// ( +// if (p.rejected || p.source == p.dest) currB +// else currB + p.amt, +// p +: currPs +// ) +// }) +// } +// // a. verify new balance sheet - each tx has charge so number of TX should be even +// paymentsRejected = newBalanceSheet.flatMap(_._2._2).toSet.count(_.rejected) +// paymentsIn = newBalanceSheet.flatMap(_._2._2).toSet.count(v => !v.rejected) +// _ <- new Exception( +// s"Rejected payments number is odd ($paymentsRejected) (are you including charge?)" +// ).raiseError.whenA(paymentsRejected % 2 != 0) +// _ <- new Exception( +// s"Accepted payments number is odd ($paymentsIn) (are you including charge?)" +// ).raiseError.whenA(paymentsIn % 2 != 0) +// // b. verify state balances +// toVerify = newBalanceSheet.filterKeys(usersToTrack.contains) +// usersMadeTx = toVerify.filter { case (_, (_, txs)) => txs.nonEmpty } +// _ <- Log[F].info(s"Verifying ${toVerify.size} vaults balances at state ${Base16 +// .encode(mergeBlockWithPayments.b.body.state.postStateHash.toByteArray)}") +// _ <- verifyBalances( +// toVerify.iterator, +// mergeBlockWithPayments.b.body.state.postStateHash +// ) +// +// // Log +// txRejected = paymentsRejected / 2 +// txIn = paymentsIn / 2 +// txTotal = txIn + txRejected +// newAcceptedCommsAcc = acceptedCommsAcc + commsAccepted +// newRejectedCommsAcc = rejectedCommsAcc + commsRejected +// newDurAcc = durAcc + dur +// nextLayerNum = layerNum + 1 +// +// _ <- Log[F].info( +// s""" +// |Layer ${nextLayerNum} accomplished. Layer stats: +// | Time spent: ${Stopwatch.showTime(FiniteDuration(dur, NANOSECONDS))} +// | Time spent on merge: $mergeTime +// | COMM events accepted: ${commsAccepted} +// | COMM events rejected: ${commsRejected} +// |Network stats: +// | Validators num: ${validatorsWithPayments.size} +// | Users total: ${newBalanceSheet.size - validatorsWithPayments.size} +// | Users involved in TX: ${usersMadeTx.size} +// | Payments accepted: ${txIn} (${txIn.toFloat / txTotal * 100} %) +// | Payments rejected: ${txRejected} (${txRejected.toFloat / txTotal * 100} %) +// | Avg payments per block: ${txTotal.toFloat / nextLayerNum / validatorsWithPayments.size} +// | COMMs accepted total: $newAcceptedCommsAcc +// | COMMs rejected total: $newRejectedCommsAcc +// | Time spent: ${Stopwatch +// .showTime(FiniteDuration(newDurAcc, NANOSECONDS))} $newDurAcc +// | COMM EVENTS PER SEC (include rejected): ${(newAcceptedCommsAcc + newRejectedCommsAcc) +// .floatValue() / (newDurAcc.floatValue() / 1e9)} +// | COMM EVENTS PER SEC (real): ${newAcceptedCommsAcc +// .floatValue() / (newDurAcc.floatValue() / 1e9)} +// |""".stripMargin +// ) +// } yield ( +// mergeBlockWithPayments.b, +// mergedBlocksWithPayments.map(_.b), +// nextLayerNum, +// newBalanceSheet, +// newAcceptedCommsAcc, +// newRejectedCommsAcc, +// newDurAcc +// ) +// +// Stream +// .unfoldLoopEval((genesis, 0, initBalances, 0L, 0L, 0L)) { +// case ( +// baseBlock, +// layerNum, +// balanceSheetAcc, +// acceptedCommsAcc, +// rejectedCommsAcc, +// durAcc +// ) => +// val validatorsWithPayments = layers.next() +// +// for { +// _ <- Log[F].info("Verifying vaults balances at genesis...") +// _ <- verifyBalances( +// initBalances.filterKeys(usersToTrack.contains).iterator, +// genesis.body.state.postStateHash +// ).whenA(layerNum == 0) +// _ <- Log[F].info(s"Done. OK for ${usersToTrack.size} vaults at state ${Base16 +// .encode(genesis.body.state.postStateHash.toByteArray)}.") +// r <- newLayer( +// validatorsWithPayments, +// baseBlock, +// balanceSheetAcc, +// acceptedCommsAcc, +// rejectedCommsAcc, +// durAcc, +// layerNum +// ) +// ( +// b, +// merged, +// nextLayerNum, +// newBalanceSheet, +// newAcceptedCommsAcc, +// newRejectedCommsAcc, +// newDurAcc +// ) = r +// +// out = (merged, b) +// next = ( +// b, +// nextLayerNum, +// newBalanceSheet, +// newAcceptedCommsAcc, +// newRejectedCommsAcc, +// newDurAcc +// ) +// } yield (out, (nextLayerNum < mergesNum).guard[Option].as(next)) +// } +// } +} diff --git a/node/src/main/scala/coop/rchain/node/benchmark/utils/Payment.scala b/node/src/main/scala/coop/rchain/node/benchmark/utils/Payment.scala new file mode 100644 index 00000000000..50aa9e6cbf7 --- /dev/null +++ b/node/src/main/scala/coop/rchain/node/benchmark/utils/Payment.scala @@ -0,0 +1,157 @@ +package io.rhonix.node.benchmark.utils + +import cats.Functor +import cats.effect.Concurrent +import cats.syntax.all._ +import fs2.Stream +import io.rhonix.casper.protocol.{BlockMessage, DeployData} +import io.rhonix.casper.rholang.RuntimeManager +import io.rhonix.casper.util.ConstructDeploy +import io.rhonix.crypto.signatures.Signed +import io.rhonix.models.block.StateHash.StateHash +import io.rhonix.node.benchmark.utils +import io.rhonix.node.revvaultexport.VaultBalanceGetter +import io.rhonix.rspace.hashing.Blake2b256Hash +import io.rhonix.shared.{Base16, Time} +import io.rhonix.shared.syntax._ + +import scala.util.Random + +final case class Payment( + source: User, + dest: User, + amt: Long, + rejected: Boolean = false, + timestamp: Long = Random.nextLong() // this is to make all payments different +) +final case class PaymentDeploy(d: Signed[DeployData], payment: Payment) +final case class Charged[A](v: A, charge: Payment) + +object Payment { + + final case class BlockWithPayments(b: BlockMessage, payments: Seq[Charged[PaymentDeploy]]) + + type BalanceSheet = Map[User, (Long, Seq[Payment])] + + val rnd = new Random(System.currentTimeMillis()) + + def random(users: Seq[User], minTxAmt: Int, maxTxAmt: Int): Iterator[Payment] = + Iterator.continually({ + val s = users(rnd.nextInt(users.length)) + val t = users(rnd.nextInt(users.length)) + val amt = minTxAmt + rnd.nextInt((maxTxAmt - minTxAmt) + 1).toLong + utils.Payment(s, t, amt) + }) + + def randomBatches( + users: Seq[User], + minTxAmt: Int, + maxTxAmt: Int, + maxSize: Int + ): Iterator[Seq[Payment]] = { + require(maxSize > 0, "randomBatches accepts only positive maxSize") + random(users, minTxAmt, maxTxAmt) + .grouped(maxSize) + .map(l => l.take(1 + rnd.nextInt(l.size))) + } + + def conflictsPresent(payments: List[Seq[Payment]]): Boolean = + payments + .combinations(2) + .filter { + case List(l, r) => + // conflict present if the same sources or destinations are used + ((l.map(v => v.dest) ++ l.map(v => v.source)) intersect (r.map(v => v.dest) ++ r.map( + v => v.source + ))).nonEmpty + } + .take(1) + .nonEmpty + + def mkTxDeploy[F[_]: Functor: Time]( + payment: Payment, + printDebug: Boolean = true + ): F[PaymentDeploy] = { + def txRho(payer: String, payee: String, amt: Long) = { + val tx = + if (printDebug) s"""@vault!("transfer", to, amount, *key, *resultCh) | + | for (@r <- resultCh) { + | stdout!(("${payer} -> ${payee}", "${amt}", sender, r)) + | } + |""" + else s"""@vault!("transfer", to, amount, *key, *resultCh)""" + + s""" + |new + | rl(`rho:registry:lookup`), stdout(`rho:io:stdout`), revVaultCh, log, getBlockData(`rho:block:data`), blockDataCh + |in { + | rl!(`rho:rchain:revVault`, *revVaultCh) | + | getBlockData!(*blockDataCh) | + | for (@(_, revVault) <- revVaultCh; _, _, @sender <- blockDataCh) { + | match ("${payer}", "${payee}", ${amt}) { + | (from, to, amount) => { + | new vaultCh, revVaultKeyCh, deployerId(`rho:rchain:deployerId`) in { + | @revVault!("findOrCreate", from, *vaultCh) | + | @revVault!("deployerAuthKey", *deployerId, *revVaultKeyCh) | + | for (@(true, vault) <- vaultCh; key <- revVaultKeyCh) { + | new resultCh, r in { + | ${tx} + | } + | } + | } + | } + | } + | } + |}""".stripMargin + } + + val payerKey = payment.source.sk + val payerAddr = payment.source.addr + val payeeAddr = payment.dest.addr + val amt = payment.amt + ConstructDeploy + .sourceDeployNowF[F](txRho(payerAddr, payeeAddr, amt), sec = payerKey) + .map(PaymentDeploy(_, payment)) + } + + def verifyBalances[F[_]: Concurrent]( + balances: Iterator[(User, (Long, Seq[Payment]))], + state: StateHash + )( + implicit runtimeManager: RuntimeManager[F] + ): F[Unit] = { + + def getVault(addr: String): String = + s"""new return, rl(`rho:registry:lookup`), RevVaultCh, vaultCh, balanceCh in { + | rl!(`rho:rchain:revVault`, *RevVaultCh) | + | for (@(_, RevVault) <- RevVaultCh) { + | @RevVault!("findOrCreate", "${addr}", *vaultCh) | + | for (@(true, vault) <- vaultCh) { + | return!(vault) + | } + | } + |} + |""".stripMargin + + Stream + .fromIterator(balances) + .parEvalMapProcBounded { + case (User(_, _, addr), (paperBalance, txs)) => + for { + vaultPar <- runtimeManager.playExploratoryDeploy(getVault(addr), state) + runtime <- runtimeManager.spawnRuntime + _ <- runtime.reset(Blake2b256Hash.fromByteString(state)) + realBalance <- VaultBalanceGetter.getBalanceFromVaultPar(vaultPar.head, runtime) + errMsg = s""" + |Balance verification for ${addr} failed. + |State balance = ${realBalance.get} (${Base16.encode(state.toByteArray)} ), paper balance = $paperBalance. + |Transfers list: + |${txs.mkString("\n")} + |""".stripMargin + _ <- new Exception(errMsg).raiseError.unlessA(realBalance.contains(paperBalance)) + } yield () + } + .compile + .lastOrError + } +} diff --git a/node/src/main/scala/coop/rchain/node/benchmark/utils/StateTransition.scala b/node/src/main/scala/coop/rchain/node/benchmark/utils/StateTransition.scala new file mode 100644 index 00000000000..74e6cbc2f36 --- /dev/null +++ b/node/src/main/scala/coop/rchain/node/benchmark/utils/StateTransition.scala @@ -0,0 +1,76 @@ +package io.rhonix.node.benchmark.utils + +import cats.effect.Concurrent +import cats.syntax.all._ +import com.google.protobuf.ByteString +import io.rhonix.casper.protocol.{DeployData, ProcessedDeploy, ProcessedSystemDeploy} +import io.rhonix.casper.rholang.sysdeploys.CloseBlockDeploy +import io.rhonix.casper.rholang.{BlockRandomSeed, RuntimeManager} +import io.rhonix.crypto.signatures.Signed +import io.rhonix.crypto.{PrivateKey, PublicKey} +import io.rhonix.models.block.StateHash.StateHash +import io.rhonix.models.syntax._ +import io.rhonix.node.benchmark.utils +import io.rhonix.rholang.interpreter.SystemProcesses.BlockData +import io.rhonix.rholang.interpreter.util.RevAddress +import io.rhonix.shared.{Base16, Time} + +object StateTransition { + + final case class StateTransitionResult( + finalState: StateHash, + paymentsDone: Seq[Charged[PaymentDeploy]], + processedDeploys: Seq[ProcessedDeploy], + processedSysDeploys: Seq[ProcessedSystemDeploy] + ) + + /** Make state transition */ + def make[F[_]: Concurrent: RuntimeManager: Time]( + baseState: StateHash, + validator: PublicKey, + seqNum: Int, + blockNum: Long, + payments: List[Payment] + ): F[StateTransitionResult] = { + val perValidatorVault = + User(PrivateKey(ByteString.EMPTY), validator, Base16.encode(validator.bytes)) + + def computeState( + userDeploys: Seq[Signed[DeployData]] + ): F[(StateHash, Seq[ProcessedDeploy], Seq[ProcessedSystemDeploy])] = { + val rand = + BlockRandomSeed.randomGenerator("shardId", blockNum, validator, baseState.toBlake2b256Hash) + val cbRandomSeed = rand.splitByte(userDeploys.size.toByte) + assert(userDeploys.nonEmpty, "Attempting to compute state without user deploys.") + RuntimeManager[F] + .computeState(baseState)( + terms = userDeploys.distinct, + systemDeploys = CloseBlockDeploy(cbRandomSeed) :: Nil, + blockData = BlockData(blockNum, validator, seqNum.toLong), + rand = rand + ) + } + for { + paymentDeploys <- payments.traverse(Payment.mkTxDeploy(_, printDebug = false)) + r <- computeState(paymentDeploys.map(_.d)).map { + case (s, processedDeploys, sp) => + assert( + !processedDeploys.exists(_.isFailed), + "Failed deploys found. Check if you users have enough REV to continue payments." + ) + val charged = + processedDeploys.map { d => + val payerAddr = RevAddress.fromPublicKey(d.deploy.pk).get.address.toBase58 + val charge = utils.Payment( + // TODO key not avail here, but not needed actually + User(PrivateKey(ByteString.EMPTY), d.deploy.pk, payerAddr), + perValidatorVault, + d.cost.cost + ) + Charged(paymentDeploys.find(_.d.sig == d.deploy.sig).get, charge) + } + StateTransitionResult(s, charged, processedDeploys, sp) + } + } yield r + } +} diff --git a/node/src/main/scala/coop/rchain/node/benchmark/utils/User.scala b/node/src/main/scala/coop/rchain/node/benchmark/utils/User.scala new file mode 100644 index 00000000000..f9b4861eae4 --- /dev/null +++ b/node/src/main/scala/coop/rchain/node/benchmark/utils/User.scala @@ -0,0 +1,20 @@ +package io.rhonix.node.benchmark.utils + +import io.rhonix.crypto.signatures.Secp256k1 +import io.rhonix.crypto.{PrivateKey, PublicKey} +import io.rhonix.rholang.interpreter.util.RevAddress + +final case class User(sk: PrivateKey, pk: PublicKey, addr: String) { + override def equals(obj: Any): Boolean = obj match { + case User(_, _, a) => a == addr + case _ => false + } + override def hashCode(): Int = addr.hashCode() +} + +object User { + def random: Iterator[User] = + Iterator.continually(Secp256k1.newKeyPair).map { + case (sk, pk) => User(sk, pk, RevAddress.fromPublicKey(pk).get.address.toBase58) + } +}