diff --git a/doc/discv5.md b/doc/discv5.md index b446111b..c448f4f1 100644 --- a/doc/discv5.md +++ b/doc/discv5.md @@ -13,6 +13,8 @@ and `rlp`. ## How to use +### Standard usage + ```Nim let rng = keys.newRng @@ -36,6 +38,18 @@ d = newProtocol(privKey, ip, tcpPort, udpPort, d.open() # Start listening and add bootstrap nodes to the routing table. ``` +### Discovery v5 with shared transport + +```Nim + d1 = newDiscoveryV5WithTransport(privKey1, enrIp = Opt.some(ip), + enrTcpPort = Opt.some(port), enrUdpPort = Opt.some(port), transp = transport, bindAddress) + d2 = newDiscoveryV5WithTransport(privKey2, enrIp = Opt.some(ip), + enrTcpPort = Opt.some(port), enrUdpPort = Opt.some(port), transp = transport, bindAddress) + d1.openWithTransport() # Start listening with shared transport + d2.openWithTransport() # Start listening with shared transport +``` +This allows multiple Discovery v5 nodes to share the same UDP socket, useful for running multiple protocols on the same port. + Next there are two ways to run the protocol. One can call `d.start()` and two loops will be started: diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 1f23164d..9e9cd500 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -467,83 +467,89 @@ proc banNode*(d: Protocol, n: Node, banPeriod: chronos.Duration) = proc isBanned*(d: Protocol, nodeId: NodeId): bool = d.banNodes and d.routingTable.isBanned(nodeId) -proc receive*(d: Protocol, a: Address, packet: openArray[byte]) = +proc receive*(d: Protocol, a: Address, packet: openArray[byte]) : DiscResult[void] = discv5_network_bytes.inc(packet.len.int64, labelValues = [$Direction.In]) - let decoded = d.codec.decodePacket(a, packet) - if decoded.isOk: - let packet = decoded[] - case packet.flag - of OrdinaryMessage: - if d.isBanned(packet.srcId): - trace "Ignoring received OrdinaryMessage from banned node", nodeId = packet.srcId - return - - if packet.messageOpt.isSome(): - let message = packet.messageOpt.get() - trace "Received message packet", srcId = packet.srcId, address = a, - kind = message.kind - d.handleMessage(packet.srcId, a, message) - else: - trace "Not decryptable message packet received", - srcId = packet.srcId, address = a - d.sendWhoareyou(packet.srcId, a, packet.requestNonce, - d.getNode(packet.srcId)) - - of Flag.Whoareyou: - trace "Received whoareyou packet", address = a - var pr: PendingRequest - if d.pendingRequests.take(packet.whoareyou.requestNonce, pr): - let toNode = pr.node - # This is a node we previously contacted and thus must have an address. - doAssert(toNode.address.isSome()) - let address = toNode.address.get() - let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id, - address, pr.message, packet.whoareyou, toNode.pubkey) - - # Finished setting up the session on our side, so store the ENR of the - # peer in the session cache. - d.codec.sessions.setEnr(toNode.id, address, toNode.record) - - trace "Send handshake message packet", dstId = toNode.id, address - d.send(toNode, data) - else: - debug "Timed out or unrequested whoareyou packet", address = a - of HandshakeMessage: - if d.isBanned(packet.srcIdHs): - trace "Ignoring received HandshakeMessage from banned node", nodeId = packet.srcIdHs - return - - trace "Received handshake message packet", srcId = packet.srcIdHs, - address = a, kind = packet.message.kind - - # For a handshake message it is possible that we received an newer ENR. - # In that case we can add/update it to the routing table. - if packet.node.isSome(): - let node = packet.node.get() - # Lets not add nodes without correct IP in the ENR to the routing table. - # The ENR could contain bogus IPs and although they would get removed - # on the next revalidation, one could spam these as the handshake - # message occurs on (first) incoming messages. - if node.address.isSome() and a == node.address.get(): - if d.addNode(node): - trace "Added new node to routing table after handshake", node - - # Received an ENR in the handshake, add it to the session that was just - # created in the session cache. - d.codec.sessions.setEnr(packet.srcIdHs, a, node.record) - else: - # Did not receive an ENR in the handshake, this means that the ENR used - # is up to date. Get it from the routing table which should normally - # be there unless the request was started manually (E.g. from a JSON-RPC call). - let node = d.getNode(packet.srcIdHs) - if node.isSome(): - d.codec.sessions.setEnr(packet.srcIdHs, a, node.value().record) - - # The handling of the message needs to be done after adding the ENR. - d.handleMessage(packet.srcIdHs, a, packet.message, packet.node) - else: - trace "Packet decoding error", error = decoded.error, address = a + let decodedPacket = d.codec.decodePacket(a, packet).valueOr: + trace "Packet decoding error", error = error, address = a + return err("Failed to decode packet") + + case decodedPacket.flag + of OrdinaryMessage: + if d.isBanned(decodedPacket.srcId): + trace "Ignoring received OrdinaryMessage from banned node", nodeId = decodedPacket.srcId + return ok() + + if decodedPacket.messageOpt.isSome(): + let message = decodedPacket.messageOpt.get() + trace "Received message packet", srcId = decodedPacket.srcId, address = a, + kind = message.kind + d.handleMessage(decodedPacket.srcId, a, message) + else: + trace "Not decryptable message packet received", + srcId = decodedPacket.srcId, address = a + d.sendWhoareyou(decodedPacket.srcId, a, decodedPacket.requestNonce, + d.getNode(decodedPacket.srcId)) + + return ok() + + of Flag.Whoareyou: + trace "Received whoareyou packet", address = a + var pr: PendingRequest + if d.pendingRequests.take(decodedPacket.whoareyou.requestNonce, pr): + let toNode = pr.node + # This is a node we previously contacted and thus must have an address. + doAssert(toNode.address.isSome()) + let address = toNode.address.get() + let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id, + address, pr.message, decodedPacket.whoareyou, toNode.pubkey) + + # Finished setting up the session on our side, so store the ENR of the + # peer in the session cache. + d.codec.sessions.setEnr(toNode.id, address, toNode.record) + + trace "Send handshake message packet", dstId = toNode.id, address + d.send(toNode, data) + else: + debug "Timed out or unrequested whoareyou packet", address = a + + return ok() + + of HandshakeMessage: + if d.isBanned(decodedPacket.srcIdHs): + trace "Ignoring received HandshakeMessage from banned node", nodeId = decodedPacket.srcIdHs + return ok() + + trace "Received handshake message packet", srcId = decodedPacket.srcIdHs, + address = a, kind = decodedPacket.message.kind + + # For a handshake message it is possible that we received an newer ENR. + # In that case we can add/update it to the routing table. + if decodedPacket.node.isSome(): + let node = decodedPacket.node.get() + # Lets not add nodes without correct IP in the ENR to the routing table. + # The ENR could contain bogus IPs and although they would get removed + # on the next revalidation, one could spam these as the handshake + # message occurs on (first) incoming messages. + if node.address.isSome() and a == node.address.get(): + if d.addNode(node): + trace "Added new node to routing table after handshake", node + + # Received an ENR in the handshake, add it to the session that was just + # created in the session cache. + d.codec.sessions.setEnr(decodedPacket.srcIdHs, a, node.record) + else: + # Did not receive an ENR in the handshake, this means that the ENR used + # is up to date. Get it from the routing table which should normally + # be there unless the request was started manually (E.g. from a JSON-RPC call). + let node = d.getNode(decodedPacket.srcIdHs) + if node.isSome(): + d.codec.sessions.setEnr(decodedPacket.srcIdHs, a, node.value().record) + + # The handling of the message needs to be done after adding the ENR. + d.handleMessage(decodedPacket.srcIdHs, a, decodedPacket.message, decodedPacket.node) + + return ok() proc processClient(transp: DatagramTransport, raddr: TransportAddress): Future[void] {.async: (raises: []).} = @@ -556,7 +562,7 @@ proc processClient(transp: DatagramTransport, raddr: TransportAddress): warn "Transport getMessage", exception = e.name, msg = e.msg return - proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf) + discard proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf) # TODO: This could be improved to do the clean-up immediately in case a non # whoareyou response does arrive, but we would need to store the AuthTag @@ -1180,6 +1186,73 @@ proc newProtocol*( responseTimeout: config.responseTimeout, rng: rng) +# Alias for clarity +type + DiscoveryV5Protocol* = Protocol + +proc new*(T: type DiscoveryV5Protocol, + privKey: PrivateKey, + enrIp: Opt[IpAddress], + enrTcpPort: Opt[Port], + enrUdpPort: Opt[Port], + localEnrFields: openArray[(string, seq[byte])] = [], + bootstrapRecords: openArray[Record] = [], + previousRecord = Opt.none(enr.Record), + bindPort: Port, + bindIp = IPv4_any(), + enrAutoUpdate = false, + banNodes = false, + config = defaultDiscoveryConfig, + rng = newRng() + ): DiscoveryV5Protocol = + + let protocol = newProtocol( + privKey = privKey, + enrIp = enrIp, + enrTcpPort = enrTcpPort, + enrUdpPort = enrUdpPort, + localEnrFields = localEnrFields, + bootstrapRecords = bootstrapRecords, + previousRecord = previousRecord, + bindPort = bindPort, + bindIp = bindIp, + enrAutoUpdate = enrAutoUpdate, + banNodes = banNodes, + config = config, + rng = rng + ) + + return protocol + +proc newDiscoveryV5WithTransport*( + privKey: PrivateKey, + enrIp: Opt[IpAddress], + enrTcpPort: Opt[Port], + enrUdpPort: Opt[Port], + bootstrapRecords: openArray[enr.Record] = [], + transp: DatagramTransport, + bindAddress: OptAddress, + enrAutoUpdate = true, + rng = newRng(), +): Protocol = + ## Create a new Discovery v5 protocol instance with an existing transport + ## This allows sharing the same UDP transport between multiple protocols + let protocol = newProtocol( + privKey = privKey, + enrIp = enrIp, + enrTcpPort = enrTcpPort, + enrUdpPort = enrUdpPort, + bootstrapRecords = bootstrapRecords, + bindPort = bindAddress.port, + bindIp = bindAddress.ip, + enrAutoUpdate = enrAutoUpdate, + rng = rng + ) + + protocol.transp = transp + + return protocol + proc `$`*(a: OptAddress): string = if a.ip.isNone(): "*:" & $a.port @@ -1228,3 +1301,24 @@ proc closeWait*(d: Protocol) {.async: (raises: []).} = proc close*(d: Protocol) {.deprecated: "Please use closeWait() instead".} = asyncSpawn d.closeWait() + +proc openWithTransport*(d: Protocol) = + ## Start the discovery protocol using an externally managed transport. + ## Does not create or bind a new transport. + info "Starting discovery node (with external transport)", node = d.localNode, + bindAddress = d.bindAddress + d.seedTable() + +proc closeWithTransport*(d: Protocol) {.async: (raises: []).} = + ## Stop the discovery protocol, but do not close the external transport. + doAssert(not d.transp.closed) + debug "Closing discovery node (with external transport)", node = d.localNode + var futures: seq[Future[void]] + if not d.revalidateLoop.isNil: + futures.add(d.revalidateLoop.cancelAndWait()) + if not d.refreshLoop.isNil: + futures.add(d.refreshLoop.cancelAndWait()) + if not d.ipMajorityLoop.isNil: + futures.add(d.ipMajorityLoop.cancelAndWait()) + await noCancel(allFutures(futures)) + # Do not close d.transp here, as it is managed externally diff --git a/tests/p2p/test_discoveryv5.nim b/tests/p2p/test_discoveryv5.nim index 8f484895..ab9911d5 100644 --- a/tests/p2p/test_discoveryv5.nim +++ b/tests/p2p/test_discoveryv5.nim @@ -701,7 +701,7 @@ suite "Discovery v5.1 Tests": let (packet, _) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id, receiveNode.localNode.address.get(), @[]) - receiveNode.receive(a, packet) + check receiveNode.receive(a, packet).isOk() # Checking different nodeIds but same address check receiveNode.codec.handshakes.len == 5 @@ -731,7 +731,7 @@ suite "Discovery v5.1 Tests": let a = localAddress(20303 + i) let (packet, _) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id, receiveNode.localNode.address.get(), @[]) - receiveNode.receive(a, packet) + check receiveNode.receive(a, packet).isOk() # Checking different nodeIds but same address check receiveNode.codec.handshakes.len == 5 @@ -763,7 +763,7 @@ suite "Discovery v5.1 Tests": for i in 0 ..< 5: let (packet, requestNonce) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id, receiveNode.localNode.address.get(), @[]) - receiveNode.receive(a, packet) + check receiveNode.receive(a, packet).isOk() if i == 0: firstRequestNonce = requestNonce