From 8b5218adc68f2e4571744e22dad0b22641f721b1 Mon Sep 17 00:00:00 2001 From: gerceboss Date: Thu, 31 Jul 2025 13:07:44 +0530 Subject: [PATCH 1/2] update error handling --- eth/p2p/discoveryv5/protocol.nim | 213 ++++++++++++++++++++----------- tests/p2p/test_discoveryv5.nim | 6 +- 2 files changed, 142 insertions(+), 77 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 1f23164d..deb3916b 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -467,83 +467,91 @@ proc banNode*(d: Protocol, n: Node, banPeriod: chronos.Duration) = proc isBanned*(d: Protocol, nodeId: NodeId): bool = d.banNodes and d.routingTable.isBanned(nodeId) -proc receive*(d: Protocol, a: Address, packet: openArray[byte]) = +proc receive*(d: Protocol, a: Address, packet: openArray[byte]) : DiscResult[void] = discv5_network_bytes.inc(packet.len.int64, labelValues = [$Direction.In]) let decoded = d.codec.decodePacket(a, packet) - if decoded.isOk: - let packet = decoded[] - case packet.flag - of OrdinaryMessage: - if d.isBanned(packet.srcId): - trace "Ignoring received OrdinaryMessage from banned node", nodeId = packet.srcId - return - - if packet.messageOpt.isSome(): - let message = packet.messageOpt.get() - trace "Received message packet", srcId = packet.srcId, address = a, - kind = message.kind - d.handleMessage(packet.srcId, a, message) - else: - trace "Not decryptable message packet received", - srcId = packet.srcId, address = a - d.sendWhoareyou(packet.srcId, a, packet.requestNonce, - d.getNode(packet.srcId)) - - of Flag.Whoareyou: - trace "Received whoareyou packet", address = a - var pr: PendingRequest - if d.pendingRequests.take(packet.whoareyou.requestNonce, pr): - let toNode = pr.node - # This is a node we previously contacted and thus must have an address. - doAssert(toNode.address.isSome()) - let address = toNode.address.get() - let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id, - address, pr.message, packet.whoareyou, toNode.pubkey) - - # Finished setting up the session on our side, so store the ENR of the - # peer in the session cache. - d.codec.sessions.setEnr(toNode.id, address, toNode.record) - - trace "Send handshake message packet", dstId = toNode.id, address - d.send(toNode, data) - else: - debug "Timed out or unrequested whoareyou packet", address = a - of HandshakeMessage: - if d.isBanned(packet.srcIdHs): - trace "Ignoring received HandshakeMessage from banned node", nodeId = packet.srcIdHs - return - - trace "Received handshake message packet", srcId = packet.srcIdHs, - address = a, kind = packet.message.kind - - # For a handshake message it is possible that we received an newer ENR. - # In that case we can add/update it to the routing table. - if packet.node.isSome(): - let node = packet.node.get() - # Lets not add nodes without correct IP in the ENR to the routing table. - # The ENR could contain bogus IPs and although they would get removed - # on the next revalidation, one could spam these as the handshake - # message occurs on (first) incoming messages. - if node.address.isSome() and a == node.address.get(): - if d.addNode(node): - trace "Added new node to routing table after handshake", node - - # Received an ENR in the handshake, add it to the session that was just - # created in the session cache. - d.codec.sessions.setEnr(packet.srcIdHs, a, node.record) - else: - # Did not receive an ENR in the handshake, this means that the ENR used - # is up to date. Get it from the routing table which should normally - # be there unless the request was started manually (E.g. from a JSON-RPC call). - let node = d.getNode(packet.srcIdHs) - if node.isSome(): - d.codec.sessions.setEnr(packet.srcIdHs, a, node.value().record) - - # The handling of the message needs to be done after adding the ENR. - d.handleMessage(packet.srcIdHs, a, packet.message, packet.node) - else: + if decoded.isErr: trace "Packet decoding error", error = decoded.error, address = a + return err("Failed to decode packet") + + let decodedPacket = decoded[] + case decodedPacket.flag + of OrdinaryMessage: + if d.isBanned(decodedPacket.srcId): + trace "Ignoring received OrdinaryMessage from banned node", nodeId = decodedPacket.srcId + return ok() + + if decodedPacket.messageOpt.isSome(): + let message = decodedPacket.messageOpt.get() + trace "Received message packet", srcId = decodedPacket.srcId, address = a, + kind = message.kind + d.handleMessage(decodedPacket.srcId, a, message) + else: + trace "Not decryptable message packet received", + srcId = decodedPacket.srcId, address = a + d.sendWhoareyou(decodedPacket.srcId, a, decodedPacket.requestNonce, + d.getNode(decodedPacket.srcId)) + + return ok() + + of Flag.Whoareyou: + trace "Received whoareyou packet", address = a + var pr: PendingRequest + if d.pendingRequests.take(decodedPacket.whoareyou.requestNonce, pr): + let toNode = pr.node + # This is a node we previously contacted and thus must have an address. + doAssert(toNode.address.isSome()) + let address = toNode.address.get() + let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id, + address, pr.message, decodedPacket.whoareyou, toNode.pubkey) + + # Finished setting up the session on our side, so store the ENR of the + # peer in the session cache. + d.codec.sessions.setEnr(toNode.id, address, toNode.record) + + trace "Send handshake message packet", dstId = toNode.id, address + d.send(toNode, data) + else: + debug "Timed out or unrequested whoareyou packet", address = a + + return ok() + + of HandshakeMessage: + if d.isBanned(decodedPacket.srcIdHs): + trace "Ignoring received HandshakeMessage from banned node", nodeId = decodedPacket.srcIdHs + return ok() + + trace "Received handshake message packet", srcId = decodedPacket.srcIdHs, + address = a, kind = decodedPacket.message.kind + + # For a handshake message it is possible that we received an newer ENR. + # In that case we can add/update it to the routing table. + if decodedPacket.node.isSome(): + let node = decodedPacket.node.get() + # Lets not add nodes without correct IP in the ENR to the routing table. + # The ENR could contain bogus IPs and although they would get removed + # on the next revalidation, one could spam these as the handshake + # message occurs on (first) incoming messages. + if node.address.isSome() and a == node.address.get(): + if d.addNode(node): + trace "Added new node to routing table after handshake", node + + # Received an ENR in the handshake, add it to the session that was just + # created in the session cache. + d.codec.sessions.setEnr(decodedPacket.srcIdHs, a, node.record) + else: + # Did not receive an ENR in the handshake, this means that the ENR used + # is up to date. Get it from the routing table which should normally + # be there unless the request was started manually (E.g. from a JSON-RPC call). + let node = d.getNode(decodedPacket.srcIdHs) + if node.isSome(): + d.codec.sessions.setEnr(decodedPacket.srcIdHs, a, node.value().record) + + # The handling of the message needs to be done after adding the ENR. + d.handleMessage(decodedPacket.srcIdHs, a, decodedPacket.message, decodedPacket.node) + + return ok() proc processClient(transp: DatagramTransport, raddr: TransportAddress): Future[void] {.async: (raises: []).} = @@ -556,7 +564,9 @@ proc processClient(transp: DatagramTransport, raddr: TransportAddress): warn "Transport getMessage", exception = e.name, msg = e.msg return - proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf) + let res = proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf) + if res.isErr: + debug "Failed to process received packet", error = res.error # TODO: This could be improved to do the clean-up immediately in case a non # whoareyou response does arrive, but we would need to store the AuthTag @@ -1180,6 +1190,61 @@ proc newProtocol*( responseTimeout: config.responseTimeout, rng: rng) +proc newDiscoveryV5*( + privKey: PrivateKey, + enrIp: Opt[IpAddress], + enrTcpPort: Opt[Port], + enrUdpPort: Opt[Port], + bootstrapRecords: openArray[enr.Record] = [], + bindPort: Port, + bindIp = IPv6_any(), + enrAutoUpdate = true, + rng = newRng(), +): Protocol = + ## Create a new Discovery v5 protocol instance + let protocol = newProtocol( + privKey = privKey, + enrIp = enrIp, + enrTcpPort = enrTcpPort, + enrUdpPort = enrUdpPort, + bootstrapRecords = bootstrapRecords, + bindPort = bindPort, + bindIp = bindIp, + enrAutoUpdate = enrAutoUpdate, + rng = rng + ) + protocol.seedTable() + return protocol + +proc newDiscoveryV5WithTransport*( + privKey: PrivateKey, + enrIp: Opt[IpAddress], + enrTcpPort: Opt[Port], + enrUdpPort: Opt[Port], + bootstrapRecords: openArray[enr.Record] = [], + transp: DatagramTransport, + bindAddress: OptAddress, + enrAutoUpdate = true, + rng = newRng(), +): Protocol = + ## Create a new Discovery v5 protocol instance with an existing transport + ## This allows sharing the same UDP transport between multiple protocols + let protocol = newProtocol( + privKey = privKey, + enrIp = enrIp, + enrTcpPort = enrTcpPort, + enrUdpPort = enrUdpPort, + bootstrapRecords = bootstrapRecords, + bindPort = bindAddress.port, + bindIp = bindAddress.ip, + enrAutoUpdate = enrAutoUpdate, + rng = rng + ) + + protocol.transp = transp + protocol.seedTable() + return protocol + proc `$`*(a: OptAddress): string = if a.ip.isNone(): "*:" & $a.port diff --git a/tests/p2p/test_discoveryv5.nim b/tests/p2p/test_discoveryv5.nim index 8f484895..ab9911d5 100644 --- a/tests/p2p/test_discoveryv5.nim +++ b/tests/p2p/test_discoveryv5.nim @@ -701,7 +701,7 @@ suite "Discovery v5.1 Tests": let (packet, _) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id, receiveNode.localNode.address.get(), @[]) - receiveNode.receive(a, packet) + check receiveNode.receive(a, packet).isOk() # Checking different nodeIds but same address check receiveNode.codec.handshakes.len == 5 @@ -731,7 +731,7 @@ suite "Discovery v5.1 Tests": let a = localAddress(20303 + i) let (packet, _) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id, receiveNode.localNode.address.get(), @[]) - receiveNode.receive(a, packet) + check receiveNode.receive(a, packet).isOk() # Checking different nodeIds but same address check receiveNode.codec.handshakes.len == 5 @@ -763,7 +763,7 @@ suite "Discovery v5.1 Tests": for i in 0 ..< 5: let (packet, requestNonce) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id, receiveNode.localNode.address.get(), @[]) - receiveNode.receive(a, packet) + check receiveNode.receive(a, packet).isOk() if i == 0: firstRequestNonce = requestNonce From ed5cde15f3fc73f409cfe4202ed10403a679b7c1 Mon Sep 17 00:00:00 2001 From: gerceboss Date: Wed, 6 Aug 2025 17:44:57 +0530 Subject: [PATCH 2/2] update suggested changes --- doc/discv5.md | 14 ++++++++ eth/p2p/discoveryv5/protocol.nim | 61 +++++++++++++++++++++++--------- 2 files changed, 59 insertions(+), 16 deletions(-) diff --git a/doc/discv5.md b/doc/discv5.md index b446111b..c448f4f1 100644 --- a/doc/discv5.md +++ b/doc/discv5.md @@ -13,6 +13,8 @@ and `rlp`. ## How to use +### Standard usage + ```Nim let rng = keys.newRng @@ -36,6 +38,18 @@ d = newProtocol(privKey, ip, tcpPort, udpPort, d.open() # Start listening and add bootstrap nodes to the routing table. ``` +### Discovery v5 with shared transport + +```Nim + d1 = newDiscoveryV5WithTransport(privKey1, enrIp = Opt.some(ip), + enrTcpPort = Opt.some(port), enrUdpPort = Opt.some(port), transp = transport, bindAddress) + d2 = newDiscoveryV5WithTransport(privKey2, enrIp = Opt.some(ip), + enrTcpPort = Opt.some(port), enrUdpPort = Opt.some(port), transp = transport, bindAddress) + d1.openWithTransport() # Start listening with shared transport + d2.openWithTransport() # Start listening with shared transport +``` +This allows multiple Discovery v5 nodes to share the same UDP socket, useful for running multiple protocols on the same port. + Next there are two ways to run the protocol. One can call `d.start()` and two loops will be started: diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index deb3916b..9e9cd500 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -470,12 +470,10 @@ proc isBanned*(d: Protocol, nodeId: NodeId): bool = proc receive*(d: Protocol, a: Address, packet: openArray[byte]) : DiscResult[void] = discv5_network_bytes.inc(packet.len.int64, labelValues = [$Direction.In]) - let decoded = d.codec.decodePacket(a, packet) - if decoded.isErr: - trace "Packet decoding error", error = decoded.error, address = a + let decodedPacket = d.codec.decodePacket(a, packet).valueOr: + trace "Packet decoding error", error = error, address = a return err("Failed to decode packet") - let decodedPacket = decoded[] case decodedPacket.flag of OrdinaryMessage: if d.isBanned(decodedPacket.srcId): @@ -564,9 +562,7 @@ proc processClient(transp: DatagramTransport, raddr: TransportAddress): warn "Transport getMessage", exception = e.name, msg = e.msg return - let res = proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf) - if res.isErr: - debug "Failed to process received packet", error = res.error + discard proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf) # TODO: This could be improved to do the clean-up immediately in case a non # whoareyou response does arrive, but we would need to store the AuthTag @@ -1190,30 +1186,42 @@ proc newProtocol*( responseTimeout: config.responseTimeout, rng: rng) -proc newDiscoveryV5*( +# Alias for clarity +type + DiscoveryV5Protocol* = Protocol + +proc new*(T: type DiscoveryV5Protocol, privKey: PrivateKey, enrIp: Opt[IpAddress], enrTcpPort: Opt[Port], enrUdpPort: Opt[Port], - bootstrapRecords: openArray[enr.Record] = [], + localEnrFields: openArray[(string, seq[byte])] = [], + bootstrapRecords: openArray[Record] = [], + previousRecord = Opt.none(enr.Record), bindPort: Port, - bindIp = IPv6_any(), - enrAutoUpdate = true, - rng = newRng(), -): Protocol = - ## Create a new Discovery v5 protocol instance + bindIp = IPv4_any(), + enrAutoUpdate = false, + banNodes = false, + config = defaultDiscoveryConfig, + rng = newRng() + ): DiscoveryV5Protocol = + let protocol = newProtocol( privKey = privKey, enrIp = enrIp, enrTcpPort = enrTcpPort, enrUdpPort = enrUdpPort, + localEnrFields = localEnrFields, bootstrapRecords = bootstrapRecords, + previousRecord = previousRecord, bindPort = bindPort, bindIp = bindIp, enrAutoUpdate = enrAutoUpdate, + banNodes = banNodes, + config = config, rng = rng ) - protocol.seedTable() + return protocol proc newDiscoveryV5WithTransport*( @@ -1242,7 +1250,7 @@ proc newDiscoveryV5WithTransport*( ) protocol.transp = transp - protocol.seedTable() + return protocol proc `$`*(a: OptAddress): string = @@ -1293,3 +1301,24 @@ proc closeWait*(d: Protocol) {.async: (raises: []).} = proc close*(d: Protocol) {.deprecated: "Please use closeWait() instead".} = asyncSpawn d.closeWait() + +proc openWithTransport*(d: Protocol) = + ## Start the discovery protocol using an externally managed transport. + ## Does not create or bind a new transport. + info "Starting discovery node (with external transport)", node = d.localNode, + bindAddress = d.bindAddress + d.seedTable() + +proc closeWithTransport*(d: Protocol) {.async: (raises: []).} = + ## Stop the discovery protocol, but do not close the external transport. + doAssert(not d.transp.closed) + debug "Closing discovery node (with external transport)", node = d.localNode + var futures: seq[Future[void]] + if not d.revalidateLoop.isNil: + futures.add(d.revalidateLoop.cancelAndWait()) + if not d.refreshLoop.isNil: + futures.add(d.refreshLoop.cancelAndWait()) + if not d.ipMajorityLoop.isNil: + futures.add(d.ipMajorityLoop.cancelAndWait()) + await noCancel(allFutures(futures)) + # Do not close d.transp here, as it is managed externally