From 5dd5f37d6c343584f4cd651ae1d906272ffda1f2 Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Tue, 22 Aug 2017 12:21:42 -0300 Subject: [PATCH 01/10] Fix encoding of ECDSA signatures and public keys We were using the Bitcoin format when encoding signatures, with the recovery id (v) at the first byte. Also change decode_public_key() to accept only 64-bytes long public keys and encode_raw_public_key() to generate the 64-bytes version as well (without the \x04 prefix) as that's what we must use when generating addresses. --- evm/ecc/backends/coincurve.py | 55 +++--------- evm/utils/address.py | 6 +- evm/utils/ecdsa.py | 14 ++-- evm/utils/secp256k1.py | 9 +- tests/__init__.py | 0 .../test_coincurve_ecc_backend.py | 72 ---------------- tests/core/ecdsa-utils/test_ecdsa_utils.py | 84 ++++--------------- .../test_public_key_conversion.py | 18 ++-- tests/ecc/test_backends.py | 62 ++++++++++++++ tests/ecdsa_fixtures.py | 51 +++++++++++ 10 files changed, 164 insertions(+), 207 deletions(-) create mode 100644 tests/__init__.py delete mode 100644 tests/coincurve-ecc/test_coincurve_ecc_backend.py create mode 100644 tests/ecc/test_backends.py create mode 100644 tests/ecdsa_fixtures.py diff --git a/evm/ecc/backends/coincurve.py b/evm/ecc/backends/coincurve.py index a4c3ccf300..333341fc2a 100644 --- a/evm/ecc/backends/coincurve.py +++ b/evm/ecc/backends/coincurve.py @@ -1,26 +1,15 @@ from .base import BaseECCBackend from evm.utils.ecdsa import ( + decode_signature, encode_signature, ) -from evm.utils.numeric import ( - big_endian_to_int, - safe_ord -) - -from evm.constants import ( - NULL_BYTE, -) - from evm.utils.keccak import ( keccak, ) -from evm.utils.secp256k1 import ( - decode_public_key, - encode_raw_public_key, -) +from evm.utils.secp256k1 import decode_public_key class CoinCurveECCBackend(BaseECCBackend): @@ -40,42 +29,20 @@ def ecdsa_sign(self, msg, private_key): def ecdsa_raw_sign(self, msg_hash, private_key): signature = self.keys.PrivateKey(private_key).sign_recoverable(msg_hash, hasher=None) - v = safe_ord(signature[64]) + 27 - r = big_endian_to_int(signature[0:32]) - s = big_endian_to_int(signature[32:64]) - return v, r, s + return decode_signature(signature) def ecdsa_verify(self, msg, signature, public_key): - signature = signature[1:] + NULL_BYTE - signature = self.__recoverable_to_normal(signature) - return self.keys.PublicKey(public_key).verify(signature, msg, hasher=keccak) + return self.ecdsa_recover(msg, signature) == public_key def ecdsa_raw_verify(self, msg_hash, vrs, raw_public_key): - v, r, s = vrs - signature = encode_signature(v, r, s)[1:] + NULL_BYTE - signature = self.__recoverable_to_normal(signature) - public_key = encode_raw_public_key(raw_public_key) - return self.keys.PublicKey(public_key).verify(signature, msg_hash, hasher=None) + return self.ecdsa_raw_recover(msg_hash, vrs) == raw_public_key def ecdsa_recover(self, msg, signature): - signature = signature[1:] + NULL_BYTE - return self.keys.PublicKey.from_signature_and_message(signature, - msg, - hasher=keccak - ).format(compressed=False) + return self.keys.PublicKey.from_signature_and_message( + signature, msg, hasher=keccak).format(compressed=False)[1:] def ecdsa_raw_recover(self, msg_hash, vrs): - v, r, s = vrs - signature = encode_signature(v, r, s)[1:] + NULL_BYTE - raw_public_key = self.keys.PublicKey.from_signature_and_message(signature, - msg_hash, - hasher=None - ).format(compressed=False) - return decode_public_key(raw_public_key) - - def __recoverable_to_normal(self, signature): - return self.ecdsa.cdata_to_der( - self.ecdsa.recoverable_convert( - self.ecdsa.deserialize_recoverable(signature) - ) - ) + signature = encode_signature(*vrs) + public_key = self.keys.PublicKey.from_signature_and_message( + signature, msg_hash, hasher=None).format(compressed=False)[1:] + return decode_public_key(public_key) diff --git a/evm/utils/address.py b/evm/utils/address.py index 5b5cb9c401..2cf9d71d09 100644 --- a/evm/utils/address.py +++ b/evm/utils/address.py @@ -27,4 +27,8 @@ def private_key_to_address(private_key): def public_key_to_address(public_key): - return keccak(public_key[1:])[-20:] + if len(public_key) != 64: + raise ValueError( + "Unexpected public key format: {}. Public keys must be 64 bytes long and must not " + "include the fixed \x04 prefix".format(public_key)) + return keccak(public_key)[-20:] diff --git a/evm/utils/ecdsa.py b/evm/utils/ecdsa.py index 591fb3da91..63d31f2a27 100644 --- a/evm/utils/ecdsa.py +++ b/evm/utils/ecdsa.py @@ -49,11 +49,11 @@ class BadSignature(ValueError): def encode_signature(v, r, s): - vb = int_to_byte(v) + vb = int_to_byte(v - 27) rb = pad32(int_to_big_endian(r)) sb = pad32(int_to_big_endian(s)) - return b''.join((vb, rb, sb)) + return b''.join((rb, sb, vb)) def decode_signature(signature): @@ -64,13 +64,9 @@ def decode_signature(signature): "signature must be exactly 65 bytes in length: got {0}".format(len(signature)) ) - rb = signature[1:33] - sb = signature[33:65] - - v = signature[0] - r = big_endian_to_int(rb) - s = big_endian_to_int(sb) - + r = big_endian_to_int(signature[0:32]) + s = big_endian_to_int(signature[32:64]) + v = signature[64] + 27 return v, r, s diff --git a/evm/utils/secp256k1.py b/evm/utils/secp256k1.py index 25b54983c5..af8b5f911a 100644 --- a/evm/utils/secp256k1.py +++ b/evm/utils/secp256k1.py @@ -19,15 +19,18 @@ def decode_public_key(public_key): - left = big_endian_to_int(public_key[1:33]) - right = big_endian_to_int(public_key[33:65]) + if len(public_key) != 64: + raise ValueError( + "Unexpected public key format: {}. Public keys must be 64 bytes long and must not " + "include the fixed \x04 prefix".format(public_key)) + left = big_endian_to_int(public_key[0:32]) + right = big_endian_to_int(public_key[32:64]) return left, right def encode_raw_public_key(raw_public_key): left, right = raw_public_key return b''.join(( - b'\x04', pad32(int_to_big_endian(left)), pad32(int_to_big_endian(right)), )) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/coincurve-ecc/test_coincurve_ecc_backend.py b/tests/coincurve-ecc/test_coincurve_ecc_backend.py deleted file mode 100644 index c3306695dd..0000000000 --- a/tests/coincurve-ecc/test_coincurve_ecc_backend.py +++ /dev/null @@ -1,72 +0,0 @@ -from __future__ import absolute_import - -import pytest - -from evm.ecc.backends.pure_python import PurePythonECCBackend -from evm.ecc import ( - get_ecc_backend, -) -from evm.utils.secp256k1 import ( - decode_public_key, -) - - -PRIVATE_KEY = ( - b'E\xa9\x15\xe4\xd0`\x14\x9e\xb46Y`\xe6\xa7\xa4_3C\x93\t0a\x11k\x19~2@\x06_\xf2\xd8' -) -PUBLIC_KEY = ( - b'\x04:QAvFo\xa8\x15\xedH\x1f\xfa\xd0\x91\x10\xa2\xd3D\xf6\xc9\xb7\x8c\x1d\x14\xaf\xc3Q\xc3\xa5\x1b\xe3=\x80r\xe7y9\xdc\x03\xbaDy\x07y\xb7\xa1\x02\x5b\xaf0\x03\xf6s$0\xe2\x0c\xd9\xb7m\x953\x91\xb3' # noqa: E501 -) -RAW_PUBLIC_KEY = decode_public_key(PUBLIC_KEY) - -MSG = b'my message' -MSG_HASH = b'#tpO\xbbmDaqK\xcb\xab\xebj\x16\x0c"E\x9ex\x1b\x08\\\x83lI\x08JG\x0e\xd6\xa4' - -SIGNATURE = ( - b'\x1bw\x84\xe4V\x19\x85\xaf\xeaj\xa9q\x9b\xcf\xc2\xbf\x17\x0c\x8c\xd7\xc3\xd5\xc0\x11\xbd\x80A\xc8\xdbJ\x97\x83\x07\x5b\xb1\xdaD\x00\xe1\xd9#W\x83\rF\xa5gx\xc9"\xdem\xbfu\xa2\x19\xfe\xc6\x83IM\xc7o\xfd\x7f' # noqa: E501 -) - -V = 27 -R = 54060028713369731575288880898058519584012347418583874062392262086259746767623 -S = 41474707565615897636207177895621376369577110960831782659442889110043833138559 - -pytest.importorskip('coincurve') -purePython = PurePythonECCBackend() - - -@pytest.fixture(autouse=True) -def with_coincurve_ecc_backend(monkeypatch): - monkeypatch.setenv( - 'CHAIN_ECC_BACKEND_CLASS', - 'evm.ecc.backends.coincurve.CoinCurveECCBackend', - ) - - -def test_ecdsa_sign(): - signature = get_ecc_backend().ecdsa_sign(MSG, PRIVATE_KEY) - assert signature == purePython.ecdsa_sign(MSG, PRIVATE_KEY) - - -def test_ecdsa_raw_sign(): - raw_signature = get_ecc_backend().ecdsa_raw_sign(MSG_HASH, PRIVATE_KEY) - assert raw_signature == purePython.ecdsa_raw_sign(MSG_HASH, PRIVATE_KEY) - - -def test_ecdsa_verify(): - is_valid = get_ecc_backend().ecdsa_verify(MSG, SIGNATURE, PUBLIC_KEY) - assert is_valid is purePython.ecdsa_verify(MSG, SIGNATURE, PUBLIC_KEY) - - -def test_ecdsa_raw_verify(): - is_valid = get_ecc_backend().ecdsa_raw_verify(MSG_HASH, (V, R, S), RAW_PUBLIC_KEY) - assert is_valid is purePython.ecdsa_raw_verify(MSG_HASH, (V, R, S), RAW_PUBLIC_KEY) - - -def test_ecdsa_recover(): - public_key = get_ecc_backend().ecdsa_recover(MSG, SIGNATURE) - assert public_key == purePython.ecdsa_recover(MSG, SIGNATURE) - - -def test_ecdsa_raw_recover(): - raw_public_key = get_ecc_backend().ecdsa_raw_recover(MSG_HASH, (V, R, S)) - assert raw_public_key == purePython.ecdsa_raw_recover(MSG_HASH, (V, R, S)) diff --git a/tests/core/ecdsa-utils/test_ecdsa_utils.py b/tests/core/ecdsa-utils/test_ecdsa_utils.py index 8a4610da77..dd6994abc3 100644 --- a/tests/core/ecdsa-utils/test_ecdsa_utils.py +++ b/tests/core/ecdsa-utils/test_ecdsa_utils.py @@ -1,6 +1,6 @@ -from evm.utils.keccak import ( - keccak, -) +import pytest + +from evm.utils.keccak import keccak from evm.utils.secp256k1 import ( decode_public_key, encode_raw_public_key, @@ -8,75 +8,27 @@ from evm.utils.ecdsa import ( encode_signature, decode_signature, - ecdsa_sign, - ecdsa_raw_sign, - ecdsa_verify, - ecdsa_raw_verify, ecdsa_verify_address, - ecdsa_recover, - ecdsa_raw_recover, -) - - -PRIVATE_KEY = ( - b'E\xa9\x15\xe4\xd0`\x14\x9e\xb46Y`\xe6\xa7\xa4_3C\x93\t0a\x11k\x19~2@\x06_\xf2\xd8' ) -PUBLIC_KEY = ( - b'\x04:QAvFo\xa8\x15\xedH\x1f\xfa\xd0\x91\x10\xa2\xd3D\xf6\xc9\xb7\x8c\x1d\x14\xaf\xc3Q\xc3\xa5\x1b\xe3=\x80r\xe7y9\xdc\x03\xbaDy\x07y\xb7\xa1\x02\x5b\xaf0\x03\xf6s$0\xe2\x0c\xd9\xb7m\x953\x91\xb3' # noqa: E501 +from tests.ecdsa_fixtures import ( + MSG, + SECRETS, ) -RAW_PUBLIC_KEY = decode_public_key(PUBLIC_KEY) -ADDRESS = ( - b'\xa9OSt\xfc\xe5\xed\xbc\x8e*\x86\x97\xc1S1g~n\xbf\x0b' -) - -MSG = b'my message' -MSG_HASH = b'#tpO\xbbmDaqK\xcb\xab\xebj\x16\x0c"E\x9ex\x1b\x08\\\x83lI\x08JG\x0e\xd6\xa4' - -V = 27 -R = 54060028713369731575288880898058519584012347418583874062392262086259746767623 -S = 41474707565615897636207177895621376369577110960831782659442889110043833138559 - - -assert keccak(MSG) == MSG_HASH - - -assert encode_raw_public_key(decode_public_key(PUBLIC_KEY)) == PUBLIC_KEY - - -def test_raw_signing(): - v, r, s = ecdsa_raw_sign(MSG_HASH, PRIVATE_KEY) - assert ecdsa_raw_verify(MSG_HASH, (v, r, s), RAW_PUBLIC_KEY) - - -def test_raw_recover(): - raw_public_key = ecdsa_raw_recover(MSG_HASH, (V, R, S)) - recovered_public_key = encode_raw_public_key(raw_public_key) - assert recovered_public_key == PUBLIC_KEY - - -def test_raw_verify(): - assert ecdsa_raw_verify(MSG_HASH, (V, R, S), RAW_PUBLIC_KEY) - - -def test_signature_encoding_and_decoding(): - signature = encode_signature(V, R, S) - v, r, s, = decode_signature(signature) - assert v == V - assert r == R - assert s == S -def test_signing_and_verifying_with_public_key(): - signature = ecdsa_sign(MSG, PRIVATE_KEY) - assert ecdsa_verify(MSG, signature, PUBLIC_KEY) +@pytest.mark.parametrize("label, d", sorted(SECRETS.items())) +def test_encode_decode_raw_public_key(label, d): + assert encode_raw_public_key(decode_public_key(d['pubkey'])) == d['pubkey'] -def test_signing_and_verifying_with_address(): - signature = ecdsa_sign(MSG, PRIVATE_KEY) - assert ecdsa_verify_address(MSG, signature, ADDRESS) +@pytest.mark.parametrize("label, d", sorted(SECRETS.items())) +def test_signature_encoding_and_decoding(label, d): + v, r, s, = decode_signature(d['sig']) + assert (v, r, s) == d['raw_sig'] + assert encode_signature(v, r, s) == d['sig'] -def test_recovering_public_key(): - signature = ecdsa_sign(MSG, PRIVATE_KEY) - recovered_public_key = ecdsa_recover(MSG, signature) - assert recovered_public_key == PUBLIC_KEY +@pytest.mark.parametrize("label, d", sorted(SECRETS.items())) +def test_verify_address(label, d): + addr = keccak(d['pubkey'])[-20:] + assert ecdsa_verify_address(MSG, d['sig'], addr) diff --git a/tests/core/secp256k1-utils/test_public_key_conversion.py b/tests/core/secp256k1-utils/test_public_key_conversion.py index a5fadb5ebf..d61af5a16e 100644 --- a/tests/core/secp256k1-utils/test_public_key_conversion.py +++ b/tests/core/secp256k1-utils/test_public_key_conversion.py @@ -4,16 +4,10 @@ private_key_to_public_key, ) +from tests.ecdsa_fixtures import SECRETS -@pytest.mark.parametrize( - 'private_key,expected', - ( - ( - b"\xe9\x87=y\xc6\xd8}\xc0\xfbjWxc3\x89\xf4E2\x130=\xa6\x1f \xbdg\xfc#:\xa32b", - b"\x04X\x8d *\xfc\xc1\xeeJ\xb5%LxG\xec%\xb9\xa15\xbb\xda\x0f+\xc6\x9e\xe1\xa7\x14t\x9f\xd7}\xc9\xf8\x8f\xf2\xa0\r~u-D\xcb\xe1n\x1e\xbc\xf0\x89\x0bv\xec|x\x88a\t\xde\xe7l\xcf\xc8DT$", # noqa: E501 - ), - ), -) -def test_private_key_to_public_key(private_key, expected): - actual = private_key_to_public_key(private_key) - assert actual == expected + +@pytest.mark.parametrize('label, d', sorted(SECRETS.items())) +def test_private_key_to_public_key(label, d): + actual = private_key_to_public_key(d['privkey']) + assert actual == d['pubkey'] diff --git a/tests/ecc/test_backends.py b/tests/ecc/test_backends.py new file mode 100644 index 0000000000..424249dd70 --- /dev/null +++ b/tests/ecc/test_backends.py @@ -0,0 +1,62 @@ +import pytest + +from rlp.utils import decode_hex + +from evm.ecc.backends.pure_python import PurePythonECCBackend +from evm.ecc.backends.coincurve import CoinCurveECCBackend +from evm.utils.ecdsa import decode_signature +from evm.utils.secp256k1 import ( + decode_public_key, + encode_raw_public_key, +) + +from tests.ecdsa_fixtures import ( + MSG, + MSGHASH, + SECRETS, +) + + +backends = [PurePythonECCBackend()] +try: + backends.append(CoinCurveECCBackend()) +except ImportError: + pass + + +@pytest.mark.parametrize("backend", backends) +@pytest.mark.parametrize("label, d", sorted(SECRETS.items())) +def test_ecdsa_sign(backend, label, d): + assert backend.ecdsa_sign(MSG, d['privkey']) == d['sig'] + + +@pytest.mark.parametrize("backend", backends) +@pytest.mark.parametrize("label, d", sorted(SECRETS.items())) +def test_ecdsa_raw_sign(backend, label, d): + assert backend.ecdsa_raw_sign(MSGHASH, d['privkey']) == d['raw_sig'] + + +@pytest.mark.parametrize("backend", backends) +@pytest.mark.parametrize("label, d", sorted(SECRETS.items())) +def test_ecdsa_verify(backend, label, d): + assert backend.ecdsa_verify(MSG, d['sig'], d['pubkey']) + + +@pytest.mark.parametrize("backend", backends) +@pytest.mark.parametrize("label, d", sorted(SECRETS.items())) +def test_ecdsa_raw_verify(backend, label, d): + assert backend.ecdsa_raw_verify(MSGHASH, decode_signature(d['sig']), decode_public_key(d['pubkey'])) + + +@pytest.mark.parametrize("backend", backends) +@pytest.mark.parametrize("label, d", sorted(SECRETS.items())) +def test_ecdsa_recover(backend, label, d): + pubkey = backend.ecdsa_recover(MSG, d['sig']) + assert pubkey == d['pubkey'] + + +@pytest.mark.parametrize("backend", backends) +@pytest.mark.parametrize("label, d", sorted(SECRETS.items())) +def test_ecdsa_raw_recover(backend, label, d): + raw_public_key = backend.ecdsa_raw_recover(MSGHASH, d['raw_sig']) + assert encode_raw_public_key(raw_public_key) == d['pubkey'] diff --git a/tests/ecdsa_fixtures.py b/tests/ecdsa_fixtures.py new file mode 100644 index 0000000000..7c00b6ded5 --- /dev/null +++ b/tests/ecdsa_fixtures.py @@ -0,0 +1,51 @@ +from rlp.utils import decode_hex + +from evm.utils.keccak import keccak + + +MSG = b'message' +MSGHASH = keccak(MSG) + +# This is a sample of signatures generated with a known-good implementation of the ECDSA +# algorithm, which we use to test our ECC backends. If necessary, it can be generated from scratch +# with the following code: +""" +from devp2p import crypto +from rlp.utils import encode_hex +msg = b'message' +msghash = crypto.sha3(b'message') +for secret in ['alice', 'bob', 'eve']: + print("'{}': dict(".format(secret)) + privkey = crypto.mk_privkey(secret) + pubkey = crypto.privtopub(privkey) + print(" privkey='{}',".format(encode_hex(privkey))) + print(" pubkey='{}',".format(encode_hex(crypto.privtopub(privkey)))) + ecc = crypto.ECCx(raw_privkey=privkey) + sig = ecc.sign(msghash) + print(" sig='{}',".format(encode_hex(sig))) + print(" raw_sig='{}')".format(crypto._decode_sig(sig))) + assert crypto.ecdsa_recover(msghash, sig) == pubkey +""" +SECRETS = { + "alice": dict( + privkey=decode_hex(b'9c0257114eb9399a2985f8e75dad7600c5d89fe3824ffa99ec1c3eb8bf3b0501'), + pubkey=decode_hex(b'5eed5fa3a67696c334762bb4823e585e2ee579aba3558d9955296d6c04541b426078dbd48d74af1fd0c72aa1a05147cf17be6b60bdbed6ba19b08ec28445b0ca'), + sig=decode_hex(b'b20e2ea5d3cbaa83c1e0372f110cf12535648613b479b64c1a8c1a20c5021f380434d07ec5795e3f789794351658e80b7faf47a46328f41e019d7b853745cdfd01'), + raw_sig=(28, + 80536744857756143861726945576089915884233437828013729338039544043241440681784, + 1902566422691403459035240420865094128779958320521066670269403689808757640701)), + "bob": dict( + privkey=decode_hex(b'38e47a7b719dce63662aeaf43440326f551b8a7ee198cee35cb5d517f2d296a2'), + pubkey=decode_hex(b'347746ccb908e583927285fa4bd202f08e2f82f09c920233d89c47c79e48f937d049130e3d1c14cf7b21afefc057f71da73dec8e8ff74ff47dc6a574ccd5d570'), + sig=decode_hex(b'5c48ea4f0f2257fa23bd25e6fcb0b75bbe2ff9bbda0167118dab2bb6e31ba76e691dbdaf2a231fc9958cd8edd99507121f8184042e075cf10f98ba88abff1f3601'), + raw_sig=(28, + 41741612198399299636429810387160790514780876799439767175315078161978521003886, + 47545396818609319588074484786899049290652725314938191835667190243225814114102)), + "eve": dict( + privkey=decode_hex(b'876be0999ed9b7fc26f1b270903ef7b0c35291f89407903270fea611c85f515c'), + pubkey=decode_hex(b'c06641f0d04f64dba13eac9e52999f2d10a1ff0ca68975716b6583dee0318d91e7c2aed363ed22edeba2215b03f6237184833fd7d4ad65f75c2c1d5ea0abecc0'), + sig=decode_hex(b'babeefc5082d3ca2e0bc80532ab38f9cfb196fb9977401b2f6a98061f15ed603603d0af084bf906b2cdf6cdde8b2e1c3e51a41af5e9adec7f3643b3f1aa2aadf00'), + raw_sig=(27, + 84467545608142925331782333363288012579669270632210954476013542647119929595395, + 43529886636775750164425297556346136250671451061152161143648812009114516499167)) + } From da167396c894c4b8c9e1ba0ddb3420850c393ccd Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Fri, 18 Aug 2017 12:10:40 -0300 Subject: [PATCH 02/10] WIP: P2P discovery implementation --- evm/p2p/discovery.py | 583 +++++++++++++++++++++++++++++++++++++++++++ evm/p2p/kademlia.py | 508 +++++++++++++++++++++++++++++++++++++ evm/p2p/p2p.py | 148 +++++++++++ evm/p2p/protocol.py | 132 ++++++++++ evm/p2p/upnp.py | 40 +++ 5 files changed, 1411 insertions(+) create mode 100644 evm/p2p/discovery.py create mode 100644 evm/p2p/kademlia.py create mode 100644 evm/p2p/p2p.py create mode 100644 evm/p2p/protocol.py create mode 100644 evm/p2p/upnp.py diff --git a/evm/p2p/discovery.py b/evm/p2p/discovery.py new file mode 100644 index 0000000000..311d86f053 --- /dev/null +++ b/evm/p2p/discovery.py @@ -0,0 +1,583 @@ +import asyncio +import ipaddress +import struct +import time + +import rlp +from rlp.utils import decode_hex, encode_hex, is_integer, str_to_bytes, safe_ord + +from repoze.lru import LRUCache + +from evm.p2p import kademlia +from evm.p2p.p2p import host_port_pubkey_to_uri, host_port_pubkey_from_uri +from evm.p2p.upnp import add_portmap, remove_portmap +from evm.utils.secp256k1 import private_key_to_public_key +from evm.utils.keccak import keccak +from evm.utils.numeric import ( + big_endian_to_int, + int_to_big_endian, +) + +# XXX: Most of this code was lifted from pydevp2p + + +class DefectiveMessage(Exception): + pass + + +class WrongMAC(DefectiveMessage): + pass + + +class PacketExpired(DefectiveMessage): + pass + + +def int_to_big_endian4(integer): + ''' 4 bytes big endian integer''' + return struct.pack('>I', integer) + + +def enc_port(p): + return int_to_big_endian4(p)[-2:] + + +class Address(object): + + def __init__(self, ip, udp_port, tcp_port=0, from_binary=False): + tcp_port = tcp_port or udp_port + if from_binary: + self.udp_port = big_endian_to_int(udp_port) + self.tcp_port = big_endian_to_int(tcp_port) + else: + assert is_integer(udp_port) + assert is_integer(tcp_port) + self.udp_port = udp_port + self.tcp_port = tcp_port + try: + self._ip = ipaddress.ip_address(ip) + except ipaddress.AddressValueError as e: + log.debug("failed to parse ip", error=e, ip=ip) + raise e + + @property + def ip(self): + return str(self._ip) + + def update(self, addr): + if not self.tcp_port: + self.tcp_port = addr.tcp_port + + def __eq__(self, other): + # addresses equal if they share ip and udp_port + return (self.ip, self.udp_port) == (other.ip, other.udp_port) + + def __repr__(self): + return 'Address(%s:%s)' % (self.ip, self.udp_port) + + def to_dict(self): + return dict(ip=self.ip, udp_port=self.udp_port, tcp_port=self.tcp_port) + + def to_binary(self): + """ + struct Endpoint + unsigned address; // BE encoded 32-bit or 128-bit unsigned + (layer3 address; size determins ipv4 vs ipv6) + unsigned udpPort; // BE encoded 16-bit unsigned + unsigned tcpPort; // BE encoded 16-bit unsigned } + """ + return list((self._ip.packed, enc_port(self.udp_port), enc_port(self.tcp_port))) + to_endpoint = to_binary + + @classmethod + def from_binary(cls, ip, udp_port, tcp_port='\x00\x00'): + return cls(ip, udp_port, tcp_port, from_binary=True) + from_endpoint = from_binary + + +class Node(kademlia.Node): + + def __init__(self, pubkey, address=None): + super(Node, self).__init__(pubkey) + assert address is None or isinstance(address, Address) + self.address = address + self.reputation = 0 + self.rlpx_version = 0 + + @classmethod + def from_uri(cls, uri): + ip, port, pubkey = host_port_pubkey_from_uri(uri) + return cls(pubkey, Address(ip.decode(), int(port))) + + def to_uri(self): + return host_port_pubkey_to_uri( + str_to_bytes(self.address.ip), self.address.udp_port, self.pubkey) + + def __repr__(self): + return '' % (encode_hex(self.pubkey[:4]), self.address.ip) + + +""" +# Node Discovery Protocol + +**Node**: an entity on the network +**Node ID**: 512 bit public key of node + +The Node Discovery protocol provides a way to find RLPx nodes +that can be connected to. It uses a Kademlia-like protocol to maintain a +distributed database of the IDs and endpoints of all listening nodes. + +Each node keeps a node table as described in the Kademlia paper +[[Maymounkov, Mazières 2002][kad-paper]]. The node table is configured +with a bucket size of 16 (denoted `k` in Kademlia), concurrency of 3 +(denoted `α` in Kademlia), and 8 bits per hop (denoted `b` in +Kademlia) for routing. The eviction check interval is 75 milliseconds, +and the idle bucket-refresh interval is +3600 seconds. + +In order to maintain a well-formed network, RLPx nodes should try to connect +to an unspecified number of close nodes. To increase resilience against Sybil attacks, +nodes should also connect to randomly chosen, non-close nodes. + +Each node runs the UDP-based RPC protocol defined below. The +`FIND_DATA` and `STORE` requests from the Kademlia paper are not part +of the protocol since the Node Discovery Protocol does not provide DHT +functionality. + +[kad-paper]: http://www.cs.rice.edu/Conferences/IPTPS02/109.pdf + +## Joining the network + +When joining the network, fills its node table by perfoming a +recursive Find Node operation with its own ID as the `Target`. The +initial Find Node request is sent to one or more bootstrap nodes. + +## RPC Protocol + +RLPx nodes that want to accept incoming connections should listen on +the same port number for UDP packets (Node Discovery Protocol) and +TCP connections (RLPx protocol). + +All requests time out after are 300ms. Requests are not re-sent. + +""" + + +def devp2p_ecdsa_sign(msghash, privkey): + from secp256k1 import PrivateKey + from rlp.utils import ascii_chr + assert len(msghash) == 32 + pk = PrivateKey(privkey, raw=True) + signature = pk.ecdsa_recoverable_serialize( + pk.ecdsa_sign_recoverable( + msghash, raw=True)) + new = signature[0] + ascii_chr(signature[1]) + return new + + +def devp2p_ecdsa_recover(message, signature): + assert len(signature) == 65 + from secp256k1 import PublicKey, ALL_FLAGS + pk = PublicKey(flags=ALL_FLAGS) + pk.public_key = pk.ecdsa_recover( + message, + pk.ecdsa_recoverable_deserialize( + signature[:64], + safe_ord(signature[64])), + raw=True + ) + return pk.serialize(compressed=False)[1:] + + +class DiscoveryProtocol(asyncio.DatagramProtocol): + + """ + ## Packet Data + All packets contain an `Expiration` date to guard against replay attacks. + The date should be interpreted as a UNIX timestamp. + The receiver should discard any packet whose `Expiration` value is in the past. + """ + version = 4 + expiration = 60 # let messages expire after N secondes + cmd_id_map = dict(ping=1, pong=2, find_node=3, neighbours=4) + rev_cmd_id_map = dict((v, k) for k, v in cmd_id_map.items()) + + # number of required top-level list elements for each cmd_id. + # elements beyond this length are trimmed. + cmd_elem_count_map = dict(ping=4, pong=3, find_node=2, neighbours=2) + + encoders = dict(cmd_id=chr, + expiration=rlp.sedes.big_endian_int.serialize) + + decoders = dict(cmd_id=safe_ord, + expiration=rlp.sedes.big_endian_int.deserialize) + + def __init__(self, transport): + self.transport = transport + self.privkey = decode_hex(config['privkey_hex']) + self.pubkey = private_key_to_public_key(self.privkey) + self.nodes = LRUCache(2048) # nodeid->Node, fixme should be loaded + self.this_node = Node(self.pubkey, self.transport.address) + self.kademlia = kademlia.KademliaProtocol(self.this_node, wire=self) + this_enode = host_port_pubkey_to_uri( + config['listen_host'], config['listen_port'], self.pubkey) + log.info('starting discovery proto', this_enode=this_enode) + + def get_node(self, nodeid, address=None): + "return node or create new, update address if supplied" + assert isinstance(nodeid, bytes) + assert len(nodeid) == 512 // 8 + assert address or self.nodes.get(nodeid) + if not self.nodes.get(nodeid): + self.nodes.put(nodeid, Node(nodeid, address)) + node = self.nodes.get(nodeid) + if address: + assert isinstance(address, Address) + node.address = address + assert node.address + return node + + def pack(self, cmd_id, payload): + """ + UDP packets are structured as follows: + + hash || signature || packet-type || packet-data + packet-type: single byte < 2**7 // valid values are [1,4] + packet-data: RLP encoded list. Packet properties are serialized in the order in + which they're defined. See packet-data below. + + Offset | + 0 | MDC | Ensures integrity of packet, + 65 | signature | Ensures authenticity of sender, `SIGN(sender-privkey, MDC)` + 97 | type | Single byte in range [1, 4] that determines the structure of Data + 98 | data | RLP encoded, see section Packet Data + + The packets are signed and authenticated. The sender's Node ID is determined by + recovering the public key from the signature. + + sender-pubkey = ECRECOVER(Signature) + + The integrity of the packet can then be verified by computing the + expected MDC of the packet as: + + MDC = SHA3(sender-pubkey || type || data) + + As an optimization, implementations may look up the public key by + the UDP sending address and compute MDC before recovering the sender ID. + If the MDC values do not match, the packet can be dropped. + """ + assert cmd_id in self.cmd_id_map.values() + assert isinstance(payload, list) + + cmd_id = str_to_bytes(self.encoders['cmd_id'](cmd_id)) + expiration = self.encoders['expiration'](int(time.time() + self.expiration)) + encoded_data = cmd_id + rlp.encode(payload + [expiration]) + # XXX (gsalgado): Need to figure out why we need to use devp2p_ecdsa_sign + signature = devp2p_ecdsa_sign(keccak(encoded_data), self.privkey) + # signature = ecdsa_sign(encoded_data, self.privkey) + assert len(signature) == 65 + mdc = keccak(signature + encoded_data) + assert len(mdc) == 32 + return mdc + signature + encoded_data + + def unpack(self, message): + """ + macSize = 256 / 8 = 32 + sigSize = 520 / 8 = 65 + headSize = macSize + sigSize = 97 + hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:] + shouldhash := keccak(buf[macSize:]) + """ + mdc = message[:32] + if mdc != keccak(message[32:]): + log.debug('packet with wrong mcd') + raise WrongMAC() + signature = message[32:97] + assert len(signature) == 65 + signed_data = keccak(message[97:]) + remote_pubkey = devp2p_ecdsa_recover(signed_data, signature) + assert len(remote_pubkey) == 512 / 8 + cmd_id = self.decoders['cmd_id'](message[97]) + cmd = self.rev_cmd_id_map[cmd_id] + payload = rlp.decode(message[98:], strict=False) + assert isinstance(payload, list) + # ignore excessive list elements as required by EIP-8. + payload = payload[:self.cmd_elem_count_map.get(cmd, len(payload))] + return remote_pubkey, cmd_id, payload, mdc + + @asyncio.coroutine + def receive(self, address, message): + assert isinstance(address, Address) + try: + remote_pubkey, cmd_id, payload, mdc = self.unpack(message) + # Note: as of discovery version 4, expiration is the last element for all + # packets. This might not be the case for a later version, but just popping + # the last element is good enough for now. + expiration = self.decoders['expiration'](payload.pop()) + if time.time() > expiration: + raise PacketExpired() + except DefectiveMessage: + return + cmd = getattr(self, 'recv_' + self.rev_cmd_id_map[cmd_id]) + nodeid = remote_pubkey + remote = self.get_node(nodeid, address) + log.debug("Dispatching received message", local=self.this_node, remoteid=remote, + cmd=self.rev_cmd_id_map[cmd_id]) + yield from cmd(nodeid, payload, mdc) + + @asyncio.coroutine + def send(self, node, message): + assert node.address + yield self.transport.send(node.address, message) + + @asyncio.coroutine + def send_ping(self, node): + """ + ### Ping (type 0x01) + + Ping packets can be sent and received at any time. The receiver should + reply with a Pong packet and update the IP/Port of the sender in its + node table. + + PingNode packet-type: 0x01 + + PingNode packet-type: 0x01 + struct PingNode <= 59 bytes + { + h256 version = 0x3; <= 1 + Endpoint from; <= 23 + Endpoint to; <= 23 + unsigned expiration; <= 9 + }; + + struct Endpoint <= 24 == [17,3,3] + { + unsigned address; // BE encoded 32-bit or 128-bit unsigned + (layer3 address; size determins ipv4 vs ipv6) + unsigned udpPort; // BE encoded 16-bit unsigned + unsigned tcpPort; // BE encoded 16-bit unsigned + } + """ + assert isinstance(node, type(self.this_node)) and node != self.this_node + log.debug('>>> ping', remoteid=node) + version = rlp.sedes.big_endian_int.serialize(self.version) + ip = config['listen_host'] + udp_port = config['listen_port'] + tcp_port = config['p2p_listen_port'] + payload = [version, + Address(ip, udp_port, tcp_port).to_endpoint(), + node.address.to_endpoint()] + assert len(payload) == 3 + message = self.pack(self.cmd_id_map['ping'], payload) + yield from self.send(node, message) + return message[:32] # return the MDC to identify pongs + + @asyncio.coroutine + def recv_pong(self, nodeid, payload, mdc): + if not len(payload) == 2: + log.error('invalid pong payload', payload=payload) + return + assert len(payload[0]) == 3, payload + + # Verify address is valid + Address.from_endpoint(*payload[0]) + echoed = payload[1] + if self.nodes.get(nodeid): + node = self.get_node(nodeid) + yield from self.kademlia.recv_pong(node, echoed) + else: + log.debug('<<< unexpected pong from unkown node') + + @asyncio.coroutine + def send_find_node(self, node, target_node_id): + """ + ### Find Node (type 0x03) + + Find Node packets are sent to locate nodes close to a given target ID. + The receiver should reply with a Neighbors packet containing the `k` + nodes closest to target that it knows about. + + FindNode packet-type: 0x03 + struct FindNode <= 76 bytes + { + NodeId target; // Id of a node. The responding node will send back nodes closest + to the target. + unsigned expiration; + }; + """ + assert is_integer(target_node_id) + target_node_id = int_to_big_endian( + target_node_id).rjust(kademlia.k_pubkey_size // 8, b'\0') + assert len(target_node_id) == kademlia.k_pubkey_size // 8 + log.debug('>>> find_node', remoteid=node) + message = self.pack(self.cmd_id_map['find_node'], [target_node_id]) + yield from self.send(node, message) + + @asyncio.coroutine + def recv_neighbours(self, nodeid, payload, mdc): + remote = self.get_node(nodeid) + assert len(payload) == 1 + neighbours_lst = payload[0] + assert isinstance(neighbours_lst, list) + + neighbours = [] + for n in neighbours_lst: + nodeid = n.pop() + address = Address.from_endpoint(*n) + node = self.get_node(nodeid, address) + assert node.address + neighbours.append(node) + + yield from self.kademlia.recv_neighbours(remote, neighbours) + + # NOTE(gsalgado): Does a light client need to listen/reply to those messages? Need to find out + @asyncio.coroutine + def recv_ping(self, nodeid, payload, mdc): + """ + update ip, port in node table + Addresses can only be learned by ping messages + """ + if not len(payload) == 3: + log.error('invalid ping payload', payload=payload) + return + node = self.get_node(nodeid) + remote_address = Address.from_endpoint(*payload[1]) # from address + # my_address = Address.from_endpoint(*payload[2]) # my address + self.get_node(nodeid).address.update(remote_address) + yield from self.kademlia.recv_ping(node, echo=mdc) + + @asyncio.coroutine + def send_pong(self, node, token): + """ + ### Pong (type 0x02) + + Pong is the reply to a Ping packet. + + Pong packet-type: 0x02 + struct Pong <= 66 bytes + { + Endpoint to; + h256 echo; + unsigned expiration; + }; + """ + log.debug('>>> pong', remoteid=node) + payload = [node.address.to_endpoint(), token] + assert len(payload[0][0]) in (4, 16), payload + message = self.pack(self.cmd_id_map['pong'], payload) + yield from self.send(node, message) + + @asyncio.coroutine + def recv_find_node(self, nodeid, payload, mdc): + node = self.get_node(nodeid) + log.debug('<<< find_node', remoteid=node) + assert len(payload[0]) == kademlia.k_pubkey_size / 8 + target = big_endian_to_int(payload[0]) + yield from self.kademlia.recv_find_node(node, target) + + @asyncio.coroutine + def send_neighbours(self, node, neighbours): + """ + ### Neighbors (type 0x04) + + Neighbors is the reply to Find Node. It contains up to `k` nodes that + the sender knows which are closest to the requested `Target`. + + Neighbors packet-type: 0x04 + struct Neighbours <= 1423 + { + list nodes: struct Neighbour <= 88: 1411; 76: 1219 + { + inline Endpoint endpoint; + NodeId node; + }; + + unsigned expiration; + }; + """ + assert isinstance(neighbours, list) + assert not neighbours or isinstance(neighbours[0], Node) + nodes = [] + neighbours = sorted(neighbours) + for n in neighbours: + l = n.address.to_endpoint() + [n.pubkey] + nodes.append(l) + log.debug('>>> neighbours', remoteid=node, count=len(nodes), local=self.this_node, + neighbours=neighbours) + # FIXME: don't brake udp packet size / chunk message / also when receiving + message = self.pack(self.cmd_id_map['neighbours'], [nodes[:12]]) # FIXME + yield from self.send(node, message) + + +class NodeDiscovery(): + bootstrapped = False + + def __init__(self): + self.address = Address(config['listen_host'], config['listen_port']) + self.protocol = DiscoveryProtocol(transport=self) + self.nat_upnp = add_portmap(config['listen_port'], 'UDP', 'Ethereum DEVP2P Discovery') + + def connection_made(self, transport): + log.info('connection_made()') + self.transport = transport + if not self.bootstrapped: + # XXX: Is this the right place to run the bootstrap? + self.bootstrapped = True + nodes = [Node.from_uri(x) for x in config['bootstrap_nodes']] + if nodes: + asyncio.ensure_future(self.protocol.kademlia.bootstrap(nodes)) + + def send(self, address, message): + self.transport.sendto(message, (address.ip, address.udp_port)) + + def datagram_received(self, data, addr): + asyncio.ensure_future(self.protocol.receive( + Address(ip=addr[0], udp_port=addr[1]), data)) + + def stop(self): + log.info('stopping discovery') + self.transport.close() + remove_portmap(self.nat_upnp, config['listen_port'], 'UDP') + + +config = { + 'privkey_hex': '65462b0520ef7d3df61b9992ed3bea0c56ead753be7c8b3614e0ce01e4cac41b', + # 'privkey_hex': '45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8', + 'listen_host': '0.0.0.0', + 'listen_port': 30303, + 'p2p_listen_port': 30303, + 'bootstrap_nodes': [ + b'enode://6ce05930c72abc632c58e2e4324f7c7ea478cec0ed4fa2528982cf34483094e9cbc9216e7aa349691242576d552a2a56aaeae426c5303ded677ce455ba1acd9d@13.84.180.240:30303', # noqa: E501 + # b'enode://20c9ad97c081d63397d7b685a412227a40e23c8bdc6688c6f37e97cfbc22d2b4d1db1510d8f61e6a8866ad7f0e17c02b14182d37ea7c3c8b9c2683aeb6b733a1@52.169.14.227:30303', # noqa: E501 + # b'enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303', # noqa: E501 + # b'enode://3f1d12044546b76342d59d4a05532c14b85aa669704bfe1f864fe079415aa2c02d743e03218e57a33fb94523adb54032871a6c51b2cc5514cb7c7e35b3ed0a99@13.93.211.84:30303', # noqa: E501 + # b'enode://78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d@191.235.84.50:30303', # noqa: E501 + # b'enode://158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6@13.75.154.138:30303', # noqa: E501 + # b'enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303', # noqa: E501 + ], +} + + +if __name__ == "__main__": + import logging + logging.getLogger('asyncio').setLevel(logging.DEBUG) + + loop = asyncio.get_event_loop() + loop.set_debug(True) + + from structlog import get_logger + log = get_logger() + + discovery = NodeDiscovery() + ip = config['listen_host'] + port = config['listen_port'] + listen = loop.create_datagram_endpoint(lambda: discovery, local_addr=(ip, port)) + _, _ = loop.run_until_complete(listen) + + try: + loop.run_forever() + except KeyboardInterrupt: + pass + log.info("Pending tasks at exit: {}".format(asyncio.Task.all_tasks(loop))) + discovery.stop() + loop.close() diff --git a/evm/p2p/kademlia.py b/evm/p2p/kademlia.py new file mode 100644 index 0000000000..b5b16fb61d --- /dev/null +++ b/evm/p2p/kademlia.py @@ -0,0 +1,508 @@ +""" +Node discovery and network formation are implemented via a kademlia-like protocol. +The major differences are that packets are signed, node ids are the public keys, and +DHT-related features are excluded. The FIND_VALUE and STORE packets are not implemented. +The parameters necessary to implement the protocol are a +bucket size of 16 (denoted k in Kademlia), +concurrency of 3 (denoted alpha in Kademlia), +and 8 bits per hop (denoted b in Kademlia) for routing. +The eviction check interval is 75 milliseconds, +request timeouts are 300ms, and +the idle bucket-refresh interval is 3600 seconds. + +Aside from the previously described exclusions, node discovery closely follows system +and protocol described by Maymounkov and Mazieres. +""" +import asyncio +import operator +import random +import time +from functools import total_ordering + +from rlp.utils import is_integer, str_to_bytes + +from evm.utils.keccak import keccak +from evm.utils.numeric import big_endian_to_int + +# XXX: Most of this code was lifted from pydevp2p +from structlog import get_logger +log = get_logger() + +k_b = 8 # 8 bits per hop + +k_bucket_size = 16 +k_request_timeout = 3 * 300 / 1000. # timeout of message round trips +k_idle_bucket_refresh_interval = 3600 # ping all nodes in bucket if bucket was idle +k_find_concurrency = 3 # parallel find node lookups +k_pubkey_size = 512 +k_id_size = 256 +k_max_node_id = 2 ** k_id_size - 1 + + +def random_nodeid(): + return random.randint(0, k_max_node_id) + + +@total_ordering +class Node(object): + + def __init__(self, pubkey): + self.pubkey = pubkey + if k_id_size == 512: + self.id = big_endian_to_int(pubkey) + else: + assert k_id_size == 256 + self.id = big_endian_to_int(keccak(pubkey)) + + def distance(self, other): + return self.id ^ other.id + + def id_distance(self, id): + return self.id ^ id + + def __lt__(self, other): + if not isinstance(other, self.__class__): + return super(Node, self).__lt__(other) + return self.id < other.id + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return super(Node, self).__eq__(other) + return self.pubkey == other.pubkey + + def __ne__(self, other): + return not self == other + + def __hash__(self): + return hash(self.pubkey) + + +class KBucket(object): + + """ + Each k-bucket is kept sorted by time last seen—least-recently seen node at the head, + most-recently seen at the tail. For small values of i, the k-buckets will generally + be empty (as no appro- priate nodes will exist). For large values of i, the lists can + grow up to size k, where k is a system-wide replication parameter. + k is chosen such that any given k nodes are very unlikely to fail within an hour of + each other (for example k = 20). + """ + k = k_bucket_size + + def __init__(self, start, end): + self.start = start + self.end = end + self.nodes = [] + self.replacement_cache = [] + self.last_updated = time.time() + + @property + def range(self): + return self.start, self.end + + @property + def midpoint(self): + return self.start + (self.end - self.start) // 2 + + def distance(self, node): + return self.midpoint ^ node.id + + def id_distance(self, id): + return self.midpoint ^ id + + def nodes_by_id_distance(self, id): + assert is_integer(id) + return sorted(self.nodes, key=operator.methodcaller('id_distance', id)) + + @property + def should_split(self): + depth = self.depth + return self.is_full and (depth % k_b != 0 and depth != k_id_size) + + def split(self): + "split at the median id" + + splitid = self.midpoint + lower = KBucket(self.start, splitid) + upper = KBucket(splitid + 1, self.end) + # distribute nodes + for node in self.nodes: + bucket = lower if node.id <= splitid else upper + bucket.add_node(node) + # distribute replacement nodes + for node in self.replacement_cache: + bucket = lower if node.id <= splitid else upper + bucket.replacement_cache.append(node) + + return lower, upper + + def remove_node(self, node): + if node not in self.nodes: + return + self.nodes.remove(node) + + def in_range(self, node): + return self.start <= node.id <= self.end + + @property + def is_full(self): + return len(self) == k_bucket_size + + def add_node(self, node): + """ + If the sending node already exists in the recipient’s k- bucket, + the recipient moves it to the tail of the list. + + If the node is not already in the appropriate k-bucket + and the bucket has fewer than k entries, + then the recipient just inserts the new sender at the tail of the list. + + If the appropriate k-bucket is full, however, + then the recipient pings the k-bucket’s least-recently seen node to decide what to do. + + on success: return None + on bucket full: return least recently seen Node for eviction check + + """ + self.last_updated = time.time() + if node in self.nodes: # already exists + self.nodes.remove(node) + self.nodes.append(node) + elif len(self) < self.k: # add if fewer than k entries + self.nodes.append(node) + else: # bucket is full + return self.head + + @property + def head(self): + "least recently seen" + return self.nodes[0] + + @property + def tail(self): + "last recently seen" + return self.nodes[-1] + + @property + def depth(self): + """ + depth is the prefix shared by all nodes in bucket + i.e. the number of shared leading bits + """ + def to_binary(x): # left padded bit representation + b = bin(x)[2:] + return '0' * (k_id_size - len(b)) + b + + if len(self.nodes) < 2: + return k_id_size + + bits = [to_binary(n.id) for n in self.nodes] + for i in range(k_id_size): + if len(set(b[:i] for b in bits)) != 1: + return i - 1 + raise Exception + + def __contains__(self, node): + return node in self.nodes + + def __len__(self): + return len(self.nodes) + + +class RoutingTable(object): + + def __init__(self, node): + self.this_node = node + self.buckets = [KBucket(0, k_max_node_id)] + + def split_bucket(self, bucket): + a, b = bucket.split() + index = self.buckets.index(bucket) + self.buckets[index] = a + self.buckets.insert(index + 1, b) + + @property + def idle_buckets(self): + one_hour_ago = time.time() - k_idle_bucket_refresh_interval + return [b for b in self.buckets if b.last_updated < one_hour_ago] + + @property + def not_full_buckets(self): + return [b for b in self.buckets if len(b) < k_bucket_size] + + def remove_node(self, node): + self.bucket_by_node(node).remove_node(node) + + def add_node(self, node): + assert node != self.this_node + bucket = self.bucket_by_node(node) + eviction_candidate = bucket.add_node(node) + if eviction_candidate: # bucket is full + # split if the bucket has the local node in its range + # or if the depth is not congruent to 0 mod k_b + depth = bucket.depth + if bucket.in_range(self.this_node) or (depth % k_b != 0 and depth != k_id_size): + self.split_bucket(bucket) + return self.add_node(node) # retry + # nothing added, ping eviction_candidate + return eviction_candidate + return None # successfully added to not full bucket + + def bucket_by_node(self, node): + for bucket in self.buckets: + if node.id < bucket.end: + assert node.id >= bucket.start + return bucket + raise Exception + + def buckets_by_id_distance(self, id): + assert is_integer(id) + return sorted(self.buckets, key=operator.methodcaller('id_distance', id)) + + def buckets_by_distance(self, node): + assert isinstance(node, Node) + return self.buckets_by_id_distance(node.id) + + def __contains__(self, node): + return node in self.bucket_by_node(node) + + def __len__(self): + return sum(len(b) for b in self.buckets) + + def __iter__(self): + for b in self.buckets: + for n in b.nodes: + yield n + + def neighbours(self, node, k=k_bucket_size): + """ + sorting by bucket.midpoint does not work in edge cases + build a short list of k * 2 nodes and sort and shorten it + """ + assert isinstance(node, Node) or is_integer(node) + if isinstance(node, Node): + node = node.id + nodes = [] + for bucket in self.buckets_by_id_distance(node): + for n in bucket.nodes_by_id_distance(node): + if n is not node: + nodes.append(n) + if len(nodes) == k * 2: + break + return sorted(nodes, key=operator.methodcaller('id_distance', node))[:k] + + def neighbours_within_distance(self, id, distance): + """ + naive correct version simply compares all nodes + """ + assert is_integer(id) + nodes = list(n for n in self if n.id_distance(id) <= distance) + return sorted(nodes, key=operator.methodcaller('id_distance', id)) + + +# XXX: Should this be a asyncio.DatagramProtocol? +class KademliaProtocol(object): + + def __init__(self, node, wire): + assert isinstance(node, Node) # the local node + self.this_node = node + self.wire = wire + self.routing = RoutingTable(node) + self._expected_pongs = dict() # pingid -> (timeout, node, replacement_node) + self._find_requests = dict() # nodeid -> timeout + self._deleted_pingids = set() + + @asyncio.coroutine + def bootstrap(self, nodes): + assert isinstance(nodes, list) + for node in nodes: + if node == self.this_node: + continue + self.routing.add_node(node) + yield from self.find_node(self.this_node.id, via_node=node) + + @asyncio.coroutine + def update(self, node, pingid=None): + """ + When a Kademlia node receives any message (request or reply) from another node, + it updates the appropriate k-bucket for the sender’s node ID. + + If the sending node already exists in the recipient’s k- bucket, + the recipient moves it to the tail of the list. + + If the node is not already in the appropriate k-bucket + and the bucket has fewer than k entries, + then the recipient just inserts the new sender at the tail of the list. + + If the appropriate k-bucket is full, however, + then the recipient pings the k-bucket’s least-recently seen node to decide what to do. + + If the least-recently seen node fails to respond, + it is evicted from the k-bucket and the new sender inserted at the tail. + + Otherwise, if the least-recently seen node responds, + it is moved to the tail of the list, and the new sender’s contact is discarded. + + k-buckets effectively implement a least-recently seen eviction policy, + except that live nodes are never removed from the list. + """ + assert isinstance(node, Node) + + if node == self.this_node: + return + + def _expected_pongs(): + return set(v[1] for v in self._expected_pongs.values()) + + if pingid and (pingid not in self._expected_pongs): + assert pingid not in self._expected_pongs + return + + # check for timed out pings and eventually evict them + for _pingid, (timeout, _node, replacement) in list(self._expected_pongs.items()): + if time.time() > timeout: + self._deleted_pingids.add(_pingid) # FIXME this is for testing + del self._expected_pongs[_pingid] + self.routing.remove_node(_node) + if replacement: + self.update(replacement) + return + if _node == node: # prevent node from being added later + return + + # if we had registered this node for eviction test + if pingid in self._expected_pongs: + timeout, _node, replacement = self._expected_pongs[pingid] + if replacement: + self.routing.bucket_by_node(replacement).replacement_cache.append(replacement) + del self._expected_pongs[pingid] + + # add node + eviction_candidate = self.routing.add_node(node) + if eviction_candidate: + # protocol should ping bucket head and evict if there is no response + yield from self.ping(eviction_candidate, replacement=node) + + # check for not full buckets and ping replacements + for bucket in self.routing.not_full_buckets: + for node in bucket.replacement_cache: + yield from self.ping(node) + + # check idle buckets + """ + idle bucket refresh: + for each bucket which hasn't been touched in 3600 seconds + pick a random value in the range of the bucket and perform discovery for that value + """ + for bucket in self.routing.idle_buckets: + rid = random.randint(bucket.start, bucket.end) + yield from self.find_node(rid) + + # check and removed timed out find requests + self._find_requests = { + nodeid: timeout + for nodeid, timeout in self._find_requests.items() + if time.time() <= timeout + } + + def _mkpingid(self, echoed, node): + pid = str_to_bytes(echoed) + node.pubkey + return pid + + @asyncio.coroutine + def ping(self, node, replacement=None): + """ + successful pings should lead to an update + if bucket is not full + elif least recently seen, does not respond in time + """ + assert isinstance(node, Node) + assert node != self.this_node + echoed = yield from self.wire.send_ping(node) + pingid = self._mkpingid(echoed, node) + assert pingid + timeout = time.time() + k_request_timeout + self._expected_pongs[pingid] = (timeout, node, replacement) + + @asyncio.coroutine + def recv_ping(self, remote, echo): + "udp addresses determined by socket address of revd Ping packets" # ok + "tcp addresses determined by contents of Ping packet" # not yet + log.debug('<<< ping', node=remote) + assert isinstance(remote, Node) + if remote == self.this_node: + return + yield from self.update(remote) + yield from self.wire.send_pong(remote, echo) + + @asyncio.coroutine + def recv_pong(self, remote, echoed): + "tcp addresses are only updated upon receipt of Pong packet" + log.debug('<<< pong', remoteid=remote) + assert remote != self.this_node + pingid = self._mkpingid(echoed, remote) + # update address (clumsy fixme) + if hasattr(remote, 'address'): # not available in tests + nnodes = self.routing.neighbours(remote) + if nnodes and nnodes[0] == remote: + nnodes[0].address = remote.address # updated tcp address + # update rest + yield from self.update(remote, pingid) + + @asyncio.coroutine + def _query_neighbours(self, targetid): + for n in self.routing.neighbours(targetid)[:k_find_concurrency]: + yield from self.wire.send_find_node(n, targetid) + + @asyncio.coroutine + def find_node(self, targetid, via_node=None): + # FIXME, amplification attack (need to ping pong ping pong first) + assert is_integer(targetid) + assert not via_node or isinstance(via_node, Node) + self._find_requests[targetid] = time.time() + k_request_timeout + if via_node: + yield from self.wire.send_find_node(via_node, targetid) + else: + yield from self._query_neighbours(targetid) + # FIXME, should we return the closest node (allow callbacks on find_request) + + @asyncio.coroutine + def recv_neighbours(self, remote, neighbours): + """ + if one of the neighbours is closer than the closest known neighbour + if not timed out + query closest node for neighbours + add all nodes to the list + """ + log.debug('<<< neighbours', remoteid=remote, neighbours=neighbours) + assert isinstance(neighbours, list) + neighbours = [n for n in neighbours if n != self.this_node] + neighbours = [n for n in neighbours if n not in self.routing] + + # we don't map requests to responses, thus forwarding to all FIXME + for nodeid, timeout in self._find_requests.items(): + assert is_integer(nodeid) + closest = sorted(neighbours, key=operator.methodcaller('id_distance', nodeid)) + if time.time() < timeout: + closest_known = self.routing.neighbours(nodeid) + closest_known = closest_known[0] if closest_known else None + assert closest_known != self.this_node + # send find_node requests to k_find_concurrency closests + for close_node in closest[:k_find_concurrency]: + if not closest_known or \ + close_node.id_distance(nodeid) < closest_known.id_distance(nodeid): + yield from self.wire.send_find_node(close_node, nodeid) + + # add all nodes to the list + for node in neighbours: + if node != self.this_node: + yield from self.ping(node) + + @asyncio.coroutine + def recv_find_node(self, remote, targetid): + # FIXME, amplification attack (need to ping pong ping pong first) + assert isinstance(remote, Node) + assert is_integer(targetid) + yield from self.update(remote) + found = self.routing.neighbours(targetid) + yield from self.wire.send_neighbours(remote, found) diff --git a/evm/p2p/p2p.py b/evm/p2p/p2p.py new file mode 100644 index 0000000000..2212fe0c21 --- /dev/null +++ b/evm/p2p/p2p.py @@ -0,0 +1,148 @@ +import rlp + +from rlp.utils import ( + bytes_to_str, + decode_hex, + encode_hex, + str_to_bytes, +) + +from evm.p2p.protocol import ( + BaseProtocol, + Command, + Packet, +) + + +class P2PProtocol(BaseProtocol): + + """ + DEV P2P Wire Protocol + https://github.com/ethereum/wiki/wiki/%C3%90%CE%9EVp2p-Wire-Protocol + """ + protocol_id = 0 + name = b'p2p' + version = 4 + max_cmd_id = 15 + + def __init__(self, peer, service): + # required by P2PProtocol + self.config = peer.config + assert hasattr(peer, 'capabilities') + assert callable(peer.stop) + assert callable(peer.receive_hello) + super(P2PProtocol, self).__init__(peer, service) + + @classmethod + def get_hello_packet(cls, peer): + "special: we need this packet before the protocol can be initalized" + res = dict(version=cls.version, + client_version_string=peer.config['client_version_string'], + capabilities=peer.capabilities, + listen_port=peer.config['p2p']['listen_port'], + remote_pubkey=peer.config['node']['id']) + payload = cls.hello.encode_payload(res) + return Packet(cls.protocol_id, cls.hello.cmd_id, payload=payload) + + +class ping(Command): + cmd_id = 2 + + def receive(self, proto, data): + proto.send_pong() + + +class pong(Command): + cmd_id = 3 + + +class hello(Command): + cmd_id = 0 + structure = [ + ('version', rlp.sedes.big_endian_int), + ('client_version_string', rlp.sedes.binary), + ('capabilities', rlp.sedes.CountableList( + rlp.sedes.List([rlp.sedes.binary, rlp.sedes.big_endian_int]))), + ('listen_port', rlp.sedes.big_endian_int), + ('remote_pubkey', rlp.sedes.binary) + ] + # don't throw for additional list elements as + # mandated by EIP-8. + decode_strict = False + + def create(self, proto): + return dict(version=proto.version, + client_version_string=proto.config['client_version_string'], + capabilities=proto.peer.capabilities, + listen_port=proto.config['p2p']['listen_port'], + remote_pubkey=proto.config['node']['id'], + ) + + def receive(self, proto, data): + reasons = proto.disconnect.reason + if data['remote_pubkey'] == proto.config['node']['id']: + return proto.send_disconnect(reason=reasons.connected_to_self) + + proto.peer.receive_hello(proto, **data) + # super(hello, self).receive(proto, data) + Command.receive(self, proto, data) + + +class disconnect(Command): + cmd_id = 1 + structure = [('reason', rlp.sedes.big_endian_int)] + + class reason(object): + disconnect_requested = 0 + tcp_sub_system_error = 1 + bad_protocol = 2 # e.g. a malformed message, bad RLP, incorrect magic number + useless_peer = 3 + too_many_peers = 4 + already_connected = 5 + incompatibel_p2p_version = 6 + null_node_identity_received = 7 + client_quitting = 8 + unexpected_identity = 9 # i.e. a different identity to a previous connection or + # what a trusted peer told us + connected_to_self = 10 + timeout = 11 # i.e. nothing received since sending last ping + subprotocol_error = 12 + other = 16 # Some other reason specific to a subprotocol + + def reason_name(self, _id): + d = dict((_id, name) for name, _id in self.reason.__dict__.items()) + return d.get(_id, 'unknown (id:{})'.format(_id)) + + def create(self, proto, reason=reason.client_quitting): + assert self.reason_name(reason) + proto.peer.report_error('sending disconnect %s' % self.reason_name(reason)) + # Defer disconnect until message is sent out. + # gevent.spawn_later(0.5, proto.peer.stop) + return dict(reason=reason) + + def receive(self, proto, data): + proto.peer.report_error('disconnected %s' % self.reason_name(data['reason'])) + proto.peer.stop() + + +node_uri_scheme = 'enode://' + + +def host_port_pubkey_from_uri(uri): + b_node_uri_scheme = str_to_bytes(node_uri_scheme) + assert uri.startswith(b_node_uri_scheme) and \ + b'@' in uri and b':' in uri, uri + pubkey_hex, ip_port = uri[len(b_node_uri_scheme):].split(b'@') + # XXX(gsalgado): commented out because our pubkeys have len == 65 + # assert len(pubkey_hex) == 2 * 512 // 8 + ip, port = ip_port.split(b':') + return ip, port, decode_hex(pubkey_hex) + + +def host_port_pubkey_to_uri(host, port, pubkey): + # XXX(gsalgado): commented out because our pubkeys have len == 65 + # assert len(pubkey) == 512 // 8 + uri = '{}{}@{}:{}'.format(node_uri_scheme, + bytes_to_str(encode_hex(pubkey)), + str(host), port) + return str_to_bytes(uri) diff --git a/evm/p2p/protocol.py b/evm/p2p/protocol.py new file mode 100644 index 0000000000..dfda85c19c --- /dev/null +++ b/evm/p2p/protocol.py @@ -0,0 +1,132 @@ +import asyncio + +import rlp + + +class BaseProtocol(asyncio.Protocol): + protocol_id = 0 + name = '' + version = 0 + max_cmd_id = 0 # reserved cmd space + + def __init__(self): + self._setup() + + def _setup(self): + + def create_methods(klass): + instance = klass() + + def receive(packet): + "decode rlp, create dict, call receive" + assert isinstance(packet, Packet) + instance.receive(proto=self, data=klass.decode_payload(packet.payload)) + + def create(*args, **kargs): + "get data, rlp encode, return packet" + res = instance.create(self, *args, **kargs) + payload = klass.encode_payload(res) + return Packet(self.protocol_id, klass.cmd_id, payload=payload) + + def send(*args, **kargs): + "create and send packet" + packet = create(*args, **kargs) + self.send_packet(packet) + + return receive, create, send, instance.receive_callbacks + + for klass in self._commands: + receive, create, send, receive_callbacks = create_methods(klass) + setattr(self, '_receive_' + klass.__name__, receive) + setattr(self, 'receive_' + klass.__name__ + '_callbacks', receive_callbacks) + setattr(self, 'create_' + klass.__name__, create) + setattr(self, 'send_' + klass.__name__, send) + + self.cmd_by_id = dict((klass.cmd_id, klass.__name__) for klass in self._commands) + + def receive_packet(self, packet): + cmd_name = self.cmd_by_id[packet.cmd_id] + handler = getattr(self, '_receive_' + cmd_name) + handler(packet) + + def send_packet(self, packet): + self.peer.send_packet(packet) + + +class Command(): + cmd_id = 0 + structure = [] + + def create(self, proto, *args, **kargs): + "optionally implement create" + assert isinstance(proto, BaseProtocol) + assert not (kargs and isinstance(self.structure, rlp.sedes.CountableList)) + return kargs or args + + def receive(self, proto, data): + "optionally implement receive" + for cb in self.receive_callbacks: + if isinstance(self.structure, rlp.sedes.CountableList): + cb(proto, data) + else: + cb(proto, **data) + + def __init__(self): + assert isinstance(self.structure, (list, rlp.sedes.CountableList)) + self.receive_callbacks = [] + + @classmethod + def encode_payload(cls, data): + if isinstance(data, dict): # convert dict to ordered list + assert isinstance(cls.structure, list) + data = [data[x[0]] for x in cls.structure] + if isinstance(cls.structure, rlp.sedes.CountableList): + return rlp.encode(data, cls.structure) + else: + assert len(data) == len(cls.structure) + return rlp.encode(data, sedes=rlp.sedes.List([x[1] for x in cls.structure])) + + @classmethod + def decode_payload(cls, rlp_data): + if isinstance(cls.structure, rlp.sedes.CountableList): + decoder = cls.structure + else: + decoder = rlp.sedes.List([x[1] for x in cls.structure]) + try: + data = rlp.decode(rlp_data, sedes=decoder) + except (AssertionError, rlp.RLPException, TypeError) as e: + print(repr(rlp.decode(rlp_data))) + raise e + if isinstance(cls.structure, rlp.sedes.CountableList): + return data + else: # convert to dict + return dict((cls.structure[i][0], v) for i, v in enumerate(data)) + + +class Packet(object): + + """ + Packets are emitted and received by subprotocols + """ + + def __init__(self, protocol_id=0, cmd_id=0, payload=b'', prioritize=False): + self.protocol_id = protocol_id + self.cmd_id = cmd_id + self.payload = payload + self.prioritize = prioritize + + def __repr__(self): + return 'Packet(%r)' % dict(protocol_id=self.protocol_id, + cmd_id=self.cmd_id, + payload_len=len(self.payload), + prioritize=self.prioritize) + + def __eq__(self, other): + s = dict(self.__dict__) + s.pop('prioritize') + o = dict(other.__dict__) + o.pop('prioritize') + return s == o + + def __len__(self): + return len(self.payload) diff --git a/evm/p2p/upnp.py b/evm/p2p/upnp.py new file mode 100644 index 0000000000..f128bab223 --- /dev/null +++ b/evm/p2p/upnp.py @@ -0,0 +1,40 @@ +import miniupnpc + + +def add_portmap(port, proto, label=''): + u = miniupnpc.UPnP() + u.discoverdelay = 200 + try: + # select an igd + u.selectigd() + # find a free port for the redirection + eport = port + r = u.getspecificportmapping(eport, proto) + while r is not None and eport < 65536: + eport = eport + 1 + r = u.getspecificportmapping(eport, proto) + b = u.addportmapping(eport, proto, u.lanaddr, port, label, '') + if b: + return u + else: + # TODO: log + pass + except Exception as e: + # TODO: log + pass + + +def remove_portmap(u, port, proto): + if not u: + return + try: + b = u.deleteportmapping(port, proto) + if b: + # TODO: log + pass + else: + # TODO: log + pass + except Exception as e: + # TODO: log + pass From 93aab6522b306cfd2ffadbc94822b027ade5b49e Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Mon, 21 Aug 2017 10:13:34 -0300 Subject: [PATCH 03/10] Use coincurve for the ecdsa_* methods in p2p/discovery.py --- evm/p2p/discovery.py | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/evm/p2p/discovery.py b/evm/p2p/discovery.py index 311d86f053..0d9804ba93 100644 --- a/evm/p2p/discovery.py +++ b/evm/p2p/discovery.py @@ -164,29 +164,16 @@ def __repr__(self): def devp2p_ecdsa_sign(msghash, privkey): - from secp256k1 import PrivateKey - from rlp.utils import ascii_chr - assert len(msghash) == 32 - pk = PrivateKey(privkey, raw=True) - signature = pk.ecdsa_recoverable_serialize( - pk.ecdsa_sign_recoverable( - msghash, raw=True)) - new = signature[0] + ascii_chr(signature[1]) - return new + from coincurve import PrivateKey + pk = PrivateKey(privkey) + return pk.sign_recoverable(msghash, hasher=None) def devp2p_ecdsa_recover(message, signature): + from coincurve import PublicKey assert len(signature) == 65 - from secp256k1 import PublicKey, ALL_FLAGS - pk = PublicKey(flags=ALL_FLAGS) - pk.public_key = pk.ecdsa_recover( - message, - pk.ecdsa_recoverable_deserialize( - signature[:64], - safe_ord(signature[64])), - raw=True - ) - return pk.serialize(compressed=False)[1:] + pk = PublicKey.from_signature_and_message(signature, message, hasher=None) + return pk.format(compressed=False)[1:] class DiscoveryProtocol(asyncio.DatagramProtocol): From 8907a964afdbf1c31a62c00dfc43798c57cf4afa Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Mon, 21 Aug 2017 10:15:00 -0300 Subject: [PATCH 04/10] Remove some unused code from p2p/kademlia.py --- evm/p2p/kademlia.py | 35 +---------------------------------- 1 file changed, 1 insertion(+), 34 deletions(-) diff --git a/evm/p2p/kademlia.py b/evm/p2p/kademlia.py index b5b16fb61d..ce29ea0024 100644 --- a/evm/p2p/kademlia.py +++ b/evm/p2p/kademlia.py @@ -54,9 +54,6 @@ def __init__(self, pubkey): assert k_id_size == 256 self.id = big_endian_to_int(keccak(pubkey)) - def distance(self, other): - return self.id ^ other.id - def id_distance(self, id): return self.id ^ id @@ -96,17 +93,10 @@ def __init__(self, start, end): self.replacement_cache = [] self.last_updated = time.time() - @property - def range(self): - return self.start, self.end - @property def midpoint(self): return self.start + (self.end - self.start) // 2 - def distance(self, node): - return self.midpoint ^ node.id - def id_distance(self, id): return self.midpoint ^ id @@ -114,11 +104,6 @@ def nodes_by_id_distance(self, id): assert is_integer(id) return sorted(self.nodes, key=operator.methodcaller('id_distance', id)) - @property - def should_split(self): - depth = self.depth - return self.is_full and (depth % k_b != 0 and depth != k_id_size) - def split(self): "split at the median id" @@ -178,11 +163,6 @@ def head(self): "least recently seen" return self.nodes[0] - @property - def tail(self): - "last recently seen" - return self.nodes[-1] - @property def depth(self): """ @@ -259,10 +239,6 @@ def buckets_by_id_distance(self, id): assert is_integer(id) return sorted(self.buckets, key=operator.methodcaller('id_distance', id)) - def buckets_by_distance(self, node): - assert isinstance(node, Node) - return self.buckets_by_id_distance(node.id) - def __contains__(self, node): return node in self.bucket_by_node(node) @@ -291,17 +267,8 @@ def neighbours(self, node, k=k_bucket_size): break return sorted(nodes, key=operator.methodcaller('id_distance', node))[:k] - def neighbours_within_distance(self, id, distance): - """ - naive correct version simply compares all nodes - """ - assert is_integer(id) - nodes = list(n for n in self if n.id_distance(id) <= distance) - return sorted(nodes, key=operator.methodcaller('id_distance', id)) - -# XXX: Should this be a asyncio.DatagramProtocol? -class KademliaProtocol(object): +class KademliaProtocol(): def __init__(self, node, wire): assert isinstance(node, Node) # the local node From 9ff7d9f2ff4949440ced087246afa4036eb598a1 Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Mon, 21 Aug 2017 10:16:33 -0300 Subject: [PATCH 05/10] Merge NodeDiscovery and DiscoveryProtocol into a single class --- evm/p2p/discovery.py | 101 ++++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 44 deletions(-) diff --git a/evm/p2p/discovery.py b/evm/p2p/discovery.py index 0d9804ba93..b2a5dcf0e8 100644 --- a/evm/p2p/discovery.py +++ b/evm/p2p/discovery.py @@ -101,8 +101,6 @@ def __init__(self, pubkey, address=None): super(Node, self).__init__(pubkey) assert address is None or isinstance(address, Address) self.address = address - self.reputation = 0 - self.rlpx_version = 0 @classmethod def from_uri(cls, uri): @@ -176,6 +174,14 @@ def devp2p_ecdsa_recover(message, signature): return pk.format(compressed=False)[1:] +# TODO(gsalgado): draw a dependency diagram of those things and see how it can be improved; +# currently: +# DiscoveryProtocol contains a KademliaProtocol (.kademlia) +# -> also has a reference to NodeDiscovery (.transport) +# -> all recv_* methods here delegate to KademliaProtocol +# KademliaProtocol contains a DiscoveryProtocol (.wire) +# -> .ping(), .recv_ping(), find_node(), recv_neighbours(), recv_find_node() +# and _query_neighbours() delegate to DiscoveryProtocol class DiscoveryProtocol(asyncio.DatagramProtocol): """ @@ -184,6 +190,7 @@ class DiscoveryProtocol(asyncio.DatagramProtocol): The date should be interpreted as a UNIX timestamp. The receiver should discard any packet whose `Expiration` value is in the past. """ + bootstrapped = False version = 4 expiration = 60 # let messages expire after N secondes cmd_id_map = dict(ping=1, pong=2, find_node=3, neighbours=4) @@ -199,17 +206,44 @@ class DiscoveryProtocol(asyncio.DatagramProtocol): decoders = dict(cmd_id=safe_ord, expiration=rlp.sedes.big_endian_int.deserialize) - def __init__(self, transport): - self.transport = transport + def __init__(self): self.privkey = decode_hex(config['privkey_hex']) self.pubkey = private_key_to_public_key(self.privkey) self.nodes = LRUCache(2048) # nodeid->Node, fixme should be loaded - self.this_node = Node(self.pubkey, self.transport.address) + self.this_node = Node(self.pubkey, Address(config['listen_host'], config['listen_port'])) self.kademlia = kademlia.KademliaProtocol(self.this_node, wire=self) this_enode = host_port_pubkey_to_uri( config['listen_host'], config['listen_port'], self.pubkey) + self.nat_upnp = add_portmap(config['listen_port'], 'UDP', 'Ethereum DEVP2P Discovery') log.info('starting discovery proto', this_enode=this_enode) + def connection_made(self, transport): + log.info('connection_made()') + self.transport = transport + if not self.bootstrapped: + # XXX: Is this the right place to run the bootstrap? + self.bootstrapped = True + nodes = [Node.from_uri(x) for x in config['bootstrap_nodes']] + if nodes: + asyncio.ensure_future(self.kademlia.bootstrap(nodes)) + + def datagram_received(self, data, addr): + asyncio.ensure_future(self.receive( + Address(ip=addr[0], udp_port=addr[1]), data)) + + def error_received(self, exc): + log.warn('error received', err=exc) + + @asyncio.coroutine + def send(self, node, message): + assert node.address + self.transport.sendto(message, (node.address.ip, node.address.udp_port)) + + def stop(self): + log.info('stopping discovery') + self.transport.close() + remove_portmap(self.nat_upnp, config['listen_port'], 'UDP') + def get_node(self, nodeid, address=None): "return node or create new, update address if supplied" assert isinstance(nodeid, bytes) @@ -312,11 +346,6 @@ def receive(self, address, message): cmd=self.rev_cmd_id_map[cmd_id]) yield from cmd(nodeid, payload, mdc) - @asyncio.coroutine - def send(self, node, message): - assert node.address - yield self.transport.send(node.address, message) - @asyncio.coroutine def send_ping(self, node): """ @@ -496,37 +525,6 @@ def send_neighbours(self, node, neighbours): yield from self.send(node, message) -class NodeDiscovery(): - bootstrapped = False - - def __init__(self): - self.address = Address(config['listen_host'], config['listen_port']) - self.protocol = DiscoveryProtocol(transport=self) - self.nat_upnp = add_portmap(config['listen_port'], 'UDP', 'Ethereum DEVP2P Discovery') - - def connection_made(self, transport): - log.info('connection_made()') - self.transport = transport - if not self.bootstrapped: - # XXX: Is this the right place to run the bootstrap? - self.bootstrapped = True - nodes = [Node.from_uri(x) for x in config['bootstrap_nodes']] - if nodes: - asyncio.ensure_future(self.protocol.kademlia.bootstrap(nodes)) - - def send(self, address, message): - self.transport.sendto(message, (address.ip, address.udp_port)) - - def datagram_received(self, data, addr): - asyncio.ensure_future(self.protocol.receive( - Address(ip=addr[0], udp_port=addr[1]), data)) - - def stop(self): - log.info('stopping discovery') - self.transport.close() - remove_portmap(self.nat_upnp, config['listen_port'], 'UDP') - - config = { 'privkey_hex': '65462b0520ef7d3df61b9992ed3bea0c56ead753be7c8b3614e0ce01e4cac41b', # 'privkey_hex': '45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8', @@ -545,17 +543,31 @@ def stop(self): } +@asyncio.coroutine +def show_tasks(): + while True: + tasks = [] + for task in asyncio.Task.all_tasks(): + if task._coro.__name__ != "show_tasks": + tasks.append(task._coro.__name__) + if tasks: + log.debug("Active tasks: {}".format(tasks)) + yield from asyncio.sleep(1) + + if __name__ == "__main__": import logging - logging.getLogger('asyncio').setLevel(logging.DEBUG) + logging.basicConfig(level=logging.DEBUG) + # logging.getLogger('asyncio').setLevel(logging.DEBUG) loop = asyncio.get_event_loop() loop.set_debug(True) + task_monitor = asyncio.ensure_future(show_tasks()) from structlog import get_logger log = get_logger() - discovery = NodeDiscovery() + discovery = DiscoveryProtocol() ip = config['listen_host'] port = config['listen_port'] listen = loop.create_datagram_endpoint(lambda: discovery, local_addr=(ip, port)) @@ -565,6 +577,7 @@ def stop(self): loop.run_forever() except KeyboardInterrupt: pass - log.info("Pending tasks at exit: {}".format(asyncio.Task.all_tasks(loop))) + task_monitor.set_result(None) discovery.stop() + # log.info("Pending tasks at exit: {}".format(asyncio.Task.all_tasks(loop))) loop.close() From a11f0519364211ded50cf446b9e160536fcbf9cc Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Mon, 21 Aug 2017 13:10:18 -0300 Subject: [PATCH 06/10] Drop unnecessary @coroutine and 'yield from' decorators --- evm/p2p/discovery.py | 43 +++++++++++++++++-------------------------- evm/p2p/kademlia.py | 40 +++++++++++++++------------------------- 2 files changed, 32 insertions(+), 51 deletions(-) diff --git a/evm/p2p/discovery.py b/evm/p2p/discovery.py index b2a5dcf0e8..a01d028e73 100644 --- a/evm/p2p/discovery.py +++ b/evm/p2p/discovery.py @@ -225,16 +225,14 @@ def connection_made(self, transport): self.bootstrapped = True nodes = [Node.from_uri(x) for x in config['bootstrap_nodes']] if nodes: - asyncio.ensure_future(self.kademlia.bootstrap(nodes)) + self.kademlia.bootstrap(nodes) def datagram_received(self, data, addr): - asyncio.ensure_future(self.receive( - Address(ip=addr[0], udp_port=addr[1]), data)) + self.receive(Address(ip=addr[0], udp_port=addr[1]), data) def error_received(self, exc): log.warn('error received', err=exc) - @asyncio.coroutine def send(self, node, message): assert node.address self.transport.sendto(message, (node.address.ip, node.address.udp_port)) @@ -326,7 +324,6 @@ def unpack(self, message): payload = payload[:self.cmd_elem_count_map.get(cmd, len(payload))] return remote_pubkey, cmd_id, payload, mdc - @asyncio.coroutine def receive(self, address, message): assert isinstance(address, Address) try: @@ -344,9 +341,8 @@ def receive(self, address, message): remote = self.get_node(nodeid, address) log.debug("Dispatching received message", local=self.this_node, remoteid=remote, cmd=self.rev_cmd_id_map[cmd_id]) - yield from cmd(nodeid, payload, mdc) + cmd(nodeid, payload, mdc) - @asyncio.coroutine def send_ping(self, node): """ ### Ping (type 0x01) @@ -385,10 +381,9 @@ def send_ping(self, node): node.address.to_endpoint()] assert len(payload) == 3 message = self.pack(self.cmd_id_map['ping'], payload) - yield from self.send(node, message) + self.send(node, message) return message[:32] # return the MDC to identify pongs - @asyncio.coroutine def recv_pong(self, nodeid, payload, mdc): if not len(payload) == 2: log.error('invalid pong payload', payload=payload) @@ -400,11 +395,10 @@ def recv_pong(self, nodeid, payload, mdc): echoed = payload[1] if self.nodes.get(nodeid): node = self.get_node(nodeid) - yield from self.kademlia.recv_pong(node, echoed) + self.kademlia.recv_pong(node, echoed) else: log.debug('<<< unexpected pong from unkown node') - @asyncio.coroutine def send_find_node(self, node, target_node_id): """ ### Find Node (type 0x03) @@ -427,9 +421,8 @@ def send_find_node(self, node, target_node_id): assert len(target_node_id) == kademlia.k_pubkey_size // 8 log.debug('>>> find_node', remoteid=node) message = self.pack(self.cmd_id_map['find_node'], [target_node_id]) - yield from self.send(node, message) + self.send(node, message) - @asyncio.coroutine def recv_neighbours(self, nodeid, payload, mdc): remote = self.get_node(nodeid) assert len(payload) == 1 @@ -444,10 +437,9 @@ def recv_neighbours(self, nodeid, payload, mdc): assert node.address neighbours.append(node) - yield from self.kademlia.recv_neighbours(remote, neighbours) + self.kademlia.recv_neighbours(remote, neighbours) # NOTE(gsalgado): Does a light client need to listen/reply to those messages? Need to find out - @asyncio.coroutine def recv_ping(self, nodeid, payload, mdc): """ update ip, port in node table @@ -460,9 +452,8 @@ def recv_ping(self, nodeid, payload, mdc): remote_address = Address.from_endpoint(*payload[1]) # from address # my_address = Address.from_endpoint(*payload[2]) # my address self.get_node(nodeid).address.update(remote_address) - yield from self.kademlia.recv_ping(node, echo=mdc) + self.kademlia.recv_ping(node, echo=mdc) - @asyncio.coroutine def send_pong(self, node, token): """ ### Pong (type 0x02) @@ -481,17 +472,15 @@ def send_pong(self, node, token): payload = [node.address.to_endpoint(), token] assert len(payload[0][0]) in (4, 16), payload message = self.pack(self.cmd_id_map['pong'], payload) - yield from self.send(node, message) + self.send(node, message) - @asyncio.coroutine def recv_find_node(self, nodeid, payload, mdc): node = self.get_node(nodeid) log.debug('<<< find_node', remoteid=node) assert len(payload[0]) == kademlia.k_pubkey_size / 8 target = big_endian_to_int(payload[0]) - yield from self.kademlia.recv_find_node(node, target) + self.kademlia.recv_find_node(node, target) - @asyncio.coroutine def send_neighbours(self, node, neighbours): """ ### Neighbors (type 0x04) @@ -522,7 +511,7 @@ def send_neighbours(self, node, neighbours): neighbours=neighbours) # FIXME: don't brake udp packet size / chunk message / also when receiving message = self.pack(self.cmd_id_map['neighbours'], [nodes[:12]]) # FIXME - yield from self.send(node, message) + self.send(node, message) config = { @@ -552,17 +541,15 @@ def show_tasks(): tasks.append(task._coro.__name__) if tasks: log.debug("Active tasks: {}".format(tasks)) - yield from asyncio.sleep(1) + yield from asyncio.sleep(3) if __name__ == "__main__": import logging logging.basicConfig(level=logging.DEBUG) - # logging.getLogger('asyncio').setLevel(logging.DEBUG) loop = asyncio.get_event_loop() loop.set_debug(True) - task_monitor = asyncio.ensure_future(show_tasks()) from structlog import get_logger log = get_logger() @@ -570,9 +557,13 @@ def show_tasks(): discovery = DiscoveryProtocol() ip = config['listen_host'] port = config['listen_port'] + + # This will cause DiscoveryProtocol to start listening locally *and* also initiate the + # discovery bootstrap process (via the connection_made() method). listen = loop.create_datagram_endpoint(lambda: discovery, local_addr=(ip, port)) - _, _ = loop.run_until_complete(listen) + loop.run_until_complete(listen) + task_monitor = asyncio.ensure_future(show_tasks()) try: loop.run_forever() except KeyboardInterrupt: diff --git a/evm/p2p/kademlia.py b/evm/p2p/kademlia.py index ce29ea0024..e145a6270e 100644 --- a/evm/p2p/kademlia.py +++ b/evm/p2p/kademlia.py @@ -13,7 +13,6 @@ Aside from the previously described exclusions, node discovery closely follows system and protocol described by Maymounkov and Mazieres. """ -import asyncio import operator import random import time @@ -279,16 +278,14 @@ def __init__(self, node, wire): self._find_requests = dict() # nodeid -> timeout self._deleted_pingids = set() - @asyncio.coroutine def bootstrap(self, nodes): assert isinstance(nodes, list) for node in nodes: if node == self.this_node: continue self.routing.add_node(node) - yield from self.find_node(self.this_node.id, via_node=node) + self.find_node(self.this_node.id, via_node=node) - @asyncio.coroutine def update(self, node, pingid=None): """ When a Kademlia node receives any message (request or reply) from another node, @@ -348,12 +345,12 @@ def _expected_pongs(): eviction_candidate = self.routing.add_node(node) if eviction_candidate: # protocol should ping bucket head and evict if there is no response - yield from self.ping(eviction_candidate, replacement=node) + self.ping(eviction_candidate, replacement=node) # check for not full buckets and ping replacements for bucket in self.routing.not_full_buckets: for node in bucket.replacement_cache: - yield from self.ping(node) + self.ping(node) # check idle buckets """ @@ -363,7 +360,7 @@ def _expected_pongs(): """ for bucket in self.routing.idle_buckets: rid = random.randint(bucket.start, bucket.end) - yield from self.find_node(rid) + self.find_node(rid) # check and removed timed out find requests self._find_requests = { @@ -376,7 +373,6 @@ def _mkpingid(self, echoed, node): pid = str_to_bytes(echoed) + node.pubkey return pid - @asyncio.coroutine def ping(self, node, replacement=None): """ successful pings should lead to an update @@ -385,13 +381,12 @@ def ping(self, node, replacement=None): """ assert isinstance(node, Node) assert node != self.this_node - echoed = yield from self.wire.send_ping(node) + echoed = self.wire.send_ping(node) pingid = self._mkpingid(echoed, node) assert pingid timeout = time.time() + k_request_timeout self._expected_pongs[pingid] = (timeout, node, replacement) - @asyncio.coroutine def recv_ping(self, remote, echo): "udp addresses determined by socket address of revd Ping packets" # ok "tcp addresses determined by contents of Ping packet" # not yet @@ -399,10 +394,9 @@ def recv_ping(self, remote, echo): assert isinstance(remote, Node) if remote == self.this_node: return - yield from self.update(remote) - yield from self.wire.send_pong(remote, echo) + self.update(remote) + self.wire.send_pong(remote, echo) - @asyncio.coroutine def recv_pong(self, remote, echoed): "tcp addresses are only updated upon receipt of Pong packet" log.debug('<<< pong', remoteid=remote) @@ -414,26 +408,23 @@ def recv_pong(self, remote, echoed): if nnodes and nnodes[0] == remote: nnodes[0].address = remote.address # updated tcp address # update rest - yield from self.update(remote, pingid) + self.update(remote, pingid) - @asyncio.coroutine def _query_neighbours(self, targetid): for n in self.routing.neighbours(targetid)[:k_find_concurrency]: - yield from self.wire.send_find_node(n, targetid) + self.wire.send_find_node(n, targetid) - @asyncio.coroutine def find_node(self, targetid, via_node=None): # FIXME, amplification attack (need to ping pong ping pong first) assert is_integer(targetid) assert not via_node or isinstance(via_node, Node) self._find_requests[targetid] = time.time() + k_request_timeout if via_node: - yield from self.wire.send_find_node(via_node, targetid) + self.wire.send_find_node(via_node, targetid) else: - yield from self._query_neighbours(targetid) + self._query_neighbours(targetid) # FIXME, should we return the closest node (allow callbacks on find_request) - @asyncio.coroutine def recv_neighbours(self, remote, neighbours): """ if one of the neighbours is closer than the closest known neighbour @@ -458,18 +449,17 @@ def recv_neighbours(self, remote, neighbours): for close_node in closest[:k_find_concurrency]: if not closest_known or \ close_node.id_distance(nodeid) < closest_known.id_distance(nodeid): - yield from self.wire.send_find_node(close_node, nodeid) + self.wire.send_find_node(close_node, nodeid) # add all nodes to the list for node in neighbours: if node != self.this_node: - yield from self.ping(node) + self.ping(node) - @asyncio.coroutine def recv_find_node(self, remote, targetid): # FIXME, amplification attack (need to ping pong ping pong first) assert isinstance(remote, Node) assert is_integer(targetid) - yield from self.update(remote) + self.update(remote) found = self.routing.neighbours(targetid) - yield from self.wire.send_neighbours(remote, found) + self.wire.send_neighbours(remote, found) From 47ee713ea2e696bc0af175de25bf2a2d028f82be Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Tue, 22 Aug 2017 12:53:59 -0300 Subject: [PATCH 07/10] switch to py-evm's ecdsa functions --- evm/p2p/discovery.py | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/evm/p2p/discovery.py b/evm/p2p/discovery.py index a01d028e73..2228a61721 100644 --- a/evm/p2p/discovery.py +++ b/evm/p2p/discovery.py @@ -11,6 +11,10 @@ from evm.p2p import kademlia from evm.p2p.p2p import host_port_pubkey_to_uri, host_port_pubkey_from_uri from evm.p2p.upnp import add_portmap, remove_portmap +from evm.utils.ecdsa import ( + ecdsa_recover, + ecdsa_sign, +) from evm.utils.secp256k1 import private_key_to_public_key from evm.utils.keccak import keccak from evm.utils.numeric import ( @@ -161,19 +165,6 @@ def __repr__(self): """ -def devp2p_ecdsa_sign(msghash, privkey): - from coincurve import PrivateKey - pk = PrivateKey(privkey) - return pk.sign_recoverable(msghash, hasher=None) - - -def devp2p_ecdsa_recover(message, signature): - from coincurve import PublicKey - assert len(signature) == 65 - pk = PublicKey.from_signature_and_message(signature, message, hasher=None) - return pk.format(compressed=False)[1:] - - # TODO(gsalgado): draw a dependency diagram of those things and see how it can be improved; # currently: # DiscoveryProtocol contains a KademliaProtocol (.kademlia) @@ -225,6 +216,11 @@ def connection_made(self, transport): self.bootstrapped = True nodes = [Node.from_uri(x) for x in config['bootstrap_nodes']] if nodes: + # XXX: geth will not process a find_node packet unless a bond exists between the + # nodes (introduced in de7af720d6bb10b93d716fb0c6cf3ee0e51dc71a), and to create a + # node a node must ping the other, so as a quick hack I ping all nodes before + # starting the bootstrap. + # list(map(self.send_ping, nodes)) self.kademlia.bootstrap(nodes) def datagram_received(self, data, addr): @@ -291,9 +287,7 @@ def pack(self, cmd_id, payload): cmd_id = str_to_bytes(self.encoders['cmd_id'](cmd_id)) expiration = self.encoders['expiration'](int(time.time() + self.expiration)) encoded_data = cmd_id + rlp.encode(payload + [expiration]) - # XXX (gsalgado): Need to figure out why we need to use devp2p_ecdsa_sign - signature = devp2p_ecdsa_sign(keccak(encoded_data), self.privkey) - # signature = ecdsa_sign(encoded_data, self.privkey) + signature = ecdsa_sign(encoded_data, self.privkey) assert len(signature) == 65 mdc = keccak(signature + encoded_data) assert len(mdc) == 32 @@ -314,7 +308,7 @@ def unpack(self, message): signature = message[32:97] assert len(signature) == 65 signed_data = keccak(message[97:]) - remote_pubkey = devp2p_ecdsa_recover(signed_data, signature) + remote_pubkey = ecdsa_recover(signed_data, signature) assert len(remote_pubkey) == 512 / 8 cmd_id = self.decoders['cmd_id'](message[97]) cmd = self.rev_cmd_id_map[cmd_id] @@ -521,8 +515,12 @@ def send_neighbours(self, node, neighbours): 'listen_port': 30303, 'p2p_listen_port': 30303, 'bootstrap_nodes': [ - b'enode://6ce05930c72abc632c58e2e4324f7c7ea478cec0ed4fa2528982cf34483094e9cbc9216e7aa349691242576d552a2a56aaeae426c5303ded677ce455ba1acd9d@13.84.180.240:30303', # noqa: E501 + # Local geth bootnodes + b'enode://3a514176466fa815ed481ffad09110a2d344f6c9b78c1d14afc351c3a51be33d8072e77939dc03ba44790779b7a1025baf3003f6732430e20cd9b76d953391b3@127.0.0.1:30301', # noqa: E501 + # Testnet bootnodes + # b'enode://6ce05930c72abc632c58e2e4324f7c7ea478cec0ed4fa2528982cf34483094e9cbc9216e7aa349691242576d552a2a56aaeae426c5303ded677ce455ba1acd9d@13.84.180.240:30303', # noqa: E501 # b'enode://20c9ad97c081d63397d7b685a412227a40e23c8bdc6688c6f37e97cfbc22d2b4d1db1510d8f61e6a8866ad7f0e17c02b14182d37ea7c3c8b9c2683aeb6b733a1@52.169.14.227:30303', # noqa: E501 + # Mainnet bootnodes # b'enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303', # noqa: E501 # b'enode://3f1d12044546b76342d59d4a05532c14b85aa669704bfe1f864fe079415aa2c02d743e03218e57a33fb94523adb54032871a6c51b2cc5514cb7c7e35b3ed0a99@13.93.211.84:30303', # noqa: E501 # b'enode://78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d@191.235.84.50:30303', # noqa: E501 From 972ac0f9bda7ef994c6b78d458849bad8b64570c Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Wed, 23 Aug 2017 13:57:46 -0300 Subject: [PATCH 08/10] Some small refactorings/improvements in DiscoveryProtocol --- evm/p2p/discovery.py | 74 ++++++++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/evm/p2p/discovery.py b/evm/p2p/discovery.py index 2228a61721..b981d754ef 100644 --- a/evm/p2p/discovery.py +++ b/evm/p2p/discovery.py @@ -22,8 +22,6 @@ int_to_big_endian, ) -# XXX: Most of this code was lifted from pydevp2p - class DefectiveMessage(Exception): pass @@ -182,6 +180,7 @@ class DiscoveryProtocol(asyncio.DatagramProtocol): The receiver should discard any packet whose `Expiration` value is in the past. """ bootstrapped = False + transport = None version = 4 expiration = 60 # let messages expire after N secondes cmd_id_map = dict(ping=1, pong=2, find_node=3, neighbours=4) @@ -197,31 +196,37 @@ class DiscoveryProtocol(asyncio.DatagramProtocol): decoders = dict(cmd_id=safe_ord, expiration=rlp.sedes.big_endian_int.deserialize) - def __init__(self): - self.privkey = decode_hex(config['privkey_hex']) + def __init__(self, privkey, address, bootstrap_nodes): + self.privkey = privkey self.pubkey = private_key_to_public_key(self.privkey) + self.address = address + self.bootstrap_nodes = bootstrap_nodes + self.this_node = Node(self.pubkey, address) self.nodes = LRUCache(2048) # nodeid->Node, fixme should be loaded - self.this_node = Node(self.pubkey, Address(config['listen_host'], config['listen_port'])) self.kademlia = kademlia.KademliaProtocol(self.this_node, wire=self) - this_enode = host_port_pubkey_to_uri( - config['listen_host'], config['listen_port'], self.pubkey) - self.nat_upnp = add_portmap(config['listen_port'], 'UDP', 'Ethereum DEVP2P Discovery') - log.info('starting discovery proto', this_enode=this_enode) + self.nat_upnp = add_portmap(address.udp_port, 'UDP', 'Ethereum DEVP2P Discovery') + log.info('starting discovery proto on', this=self.this_node) + + def listen(self, loop): + return loop.create_datagram_endpoint( + lambda: self, local_addr=(self.address.ip, self.address.udp_port)) def connection_made(self, transport): - log.info('connection_made()') self.transport = transport - if not self.bootstrapped: - # XXX: Is this the right place to run the bootstrap? - self.bootstrapped = True - nodes = [Node.from_uri(x) for x in config['bootstrap_nodes']] - if nodes: - # XXX: geth will not process a find_node packet unless a bond exists between the - # nodes (introduced in de7af720d6bb10b93d716fb0c6cf3ee0e51dc71a), and to create a - # node a node must ping the other, so as a quick hack I ping all nodes before - # starting the bootstrap. - # list(map(self.send_ping, nodes)) - self.kademlia.bootstrap(nodes) + + @asyncio.coroutine + def bootstrap(self): + if self.bootstrapped or len(self.bootstrap_nodes) == 0: + return + while self.transport is None: + yield from asyncio.sleep(1) + self.bootstrapped = True + # XXX: geth will not process a find_node packet unless a bond exists between the + # nodes (introduced in de7af720d6bb10b93d716fb0c6cf3ee0e51dc71a), and to create a + # node a node must ping the other, so as a quick hack I ping all nodes before + # starting the bootstrap. + list(map(self.send_ping, self.bootstrap_nodes)) + self.kademlia.bootstrap(self.bootstrap_nodes) def datagram_received(self, data, addr): self.receive(Address(ip=addr[0], udp_port=addr[1]), data) @@ -236,7 +241,7 @@ def send(self, node, message): def stop(self): log.info('stopping discovery') self.transport.close() - remove_portmap(self.nat_upnp, config['listen_port'], 'UDP') + remove_portmap(self.nat_upnp, self.address.udp_port, 'UDP') def get_node(self, nodeid, address=None): "return node or create new, update address if supplied" @@ -367,13 +372,7 @@ def send_ping(self, node): assert isinstance(node, type(self.this_node)) and node != self.this_node log.debug('>>> ping', remoteid=node) version = rlp.sedes.big_endian_int.serialize(self.version) - ip = config['listen_host'] - udp_port = config['listen_port'] - tcp_port = config['p2p_listen_port'] - payload = [version, - Address(ip, udp_port, tcp_port).to_endpoint(), - node.address.to_endpoint()] - assert len(payload) == 3 + payload = [version, self.address.to_endpoint(), node.address.to_endpoint()] message = self.pack(self.cmd_id_map['ping'], payload) self.send(node, message) return message[:32] # return the MDC to identify pongs @@ -516,12 +515,12 @@ def send_neighbours(self, node, neighbours): 'p2p_listen_port': 30303, 'bootstrap_nodes': [ # Local geth bootnodes - b'enode://3a514176466fa815ed481ffad09110a2d344f6c9b78c1d14afc351c3a51be33d8072e77939dc03ba44790779b7a1025baf3003f6732430e20cd9b76d953391b3@127.0.0.1:30301', # noqa: E501 + # b'enode://3a514176466fa815ed481ffad09110a2d344f6c9b78c1d14afc351c3a51be33d8072e77939dc03ba44790779b7a1025baf3003f6732430e20cd9b76d953391b3@127.0.0.1:30301', # noqa: E501 # Testnet bootnodes # b'enode://6ce05930c72abc632c58e2e4324f7c7ea478cec0ed4fa2528982cf34483094e9cbc9216e7aa349691242576d552a2a56aaeae426c5303ded677ce455ba1acd9d@13.84.180.240:30303', # noqa: E501 # b'enode://20c9ad97c081d63397d7b685a412227a40e23c8bdc6688c6f37e97cfbc22d2b4d1db1510d8f61e6a8866ad7f0e17c02b14182d37ea7c3c8b9c2683aeb6b733a1@52.169.14.227:30303', # noqa: E501 # Mainnet bootnodes - # b'enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303', # noqa: E501 + b'enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303', # noqa: E501 # b'enode://3f1d12044546b76342d59d4a05532c14b85aa669704bfe1f864fe079415aa2c02d743e03218e57a33fb94523adb54032871a6c51b2cc5514cb7c7e35b3ed0a99@13.93.211.84:30303', # noqa: E501 # b'enode://78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d@191.235.84.50:30303', # noqa: E501 # b'enode://158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6@13.75.154.138:30303', # noqa: E501 @@ -552,14 +551,15 @@ def show_tasks(): from structlog import get_logger log = get_logger() - discovery = DiscoveryProtocol() - ip = config['listen_host'] - port = config['listen_port'] - + privkey = decode_hex(config['privkey_hex']) + addr = Address(config['listen_host'], config['listen_port'], config['p2p_listen_port']) + bootstrap_nodes = [Node.from_uri(x) for x in config['bootstrap_nodes']] + discovery = DiscoveryProtocol(privkey, addr, bootstrap_nodes) # This will cause DiscoveryProtocol to start listening locally *and* also initiate the # discovery bootstrap process (via the connection_made() method). - listen = loop.create_datagram_endpoint(lambda: discovery, local_addr=(ip, port)) - loop.run_until_complete(listen) + loop.run_until_complete(discovery.listen(loop)) + + asyncio.ensure_future(discovery.bootstrap()) task_monitor = asyncio.ensure_future(show_tasks()) try: From 40391d460fb084b783e791113490f94164a073f7 Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Thu, 24 Aug 2017 17:41:06 -0300 Subject: [PATCH 09/10] Some cleanups on kademlia.py and start writing tests for it --- evm/p2p/discovery.py | 100 +++++++++++++++++++++++-------------- evm/p2p/kademlia.py | 104 +++++++++++++++++---------------------- evm/p2p/p2p.py | 30 ----------- evm/p2p/test_kademlia.py | 76 ++++++++++++++++++++++++++++ 4 files changed, 184 insertions(+), 126 deletions(-) create mode 100644 evm/p2p/test_kademlia.py diff --git a/evm/p2p/discovery.py b/evm/p2p/discovery.py index b981d754ef..305b251240 100644 --- a/evm/p2p/discovery.py +++ b/evm/p2p/discovery.py @@ -4,12 +4,18 @@ import time import rlp -from rlp.utils import decode_hex, encode_hex, is_integer, str_to_bytes, safe_ord +from rlp.utils import ( + bytes_to_str, + decode_hex, + encode_hex, + is_integer, + str_to_bytes, + safe_ord, +) from repoze.lru import LRUCache from evm.p2p import kademlia -from evm.p2p.p2p import host_port_pubkey_to_uri, host_port_pubkey_from_uri from evm.p2p.upnp import add_portmap, remove_portmap from evm.utils.ecdsa import ( ecdsa_recover, @@ -507,41 +513,60 @@ def send_neighbours(self, node, neighbours): self.send(node, message) -config = { - 'privkey_hex': '65462b0520ef7d3df61b9992ed3bea0c56ead753be7c8b3614e0ce01e4cac41b', - # 'privkey_hex': '45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8', - 'listen_host': '0.0.0.0', - 'listen_port': 30303, - 'p2p_listen_port': 30303, - 'bootstrap_nodes': [ - # Local geth bootnodes - # b'enode://3a514176466fa815ed481ffad09110a2d344f6c9b78c1d14afc351c3a51be33d8072e77939dc03ba44790779b7a1025baf3003f6732430e20cd9b76d953391b3@127.0.0.1:30301', # noqa: E501 - # Testnet bootnodes - # b'enode://6ce05930c72abc632c58e2e4324f7c7ea478cec0ed4fa2528982cf34483094e9cbc9216e7aa349691242576d552a2a56aaeae426c5303ded677ce455ba1acd9d@13.84.180.240:30303', # noqa: E501 - # b'enode://20c9ad97c081d63397d7b685a412227a40e23c8bdc6688c6f37e97cfbc22d2b4d1db1510d8f61e6a8866ad7f0e17c02b14182d37ea7c3c8b9c2683aeb6b733a1@52.169.14.227:30303', # noqa: E501 - # Mainnet bootnodes - b'enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303', # noqa: E501 - # b'enode://3f1d12044546b76342d59d4a05532c14b85aa669704bfe1f864fe079415aa2c02d743e03218e57a33fb94523adb54032871a6c51b2cc5514cb7c7e35b3ed0a99@13.93.211.84:30303', # noqa: E501 - # b'enode://78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d@191.235.84.50:30303', # noqa: E501 - # b'enode://158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6@13.75.154.138:30303', # noqa: E501 - # b'enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303', # noqa: E501 - ], -} - - -@asyncio.coroutine -def show_tasks(): - while True: - tasks = [] - for task in asyncio.Task.all_tasks(): - if task._coro.__name__ != "show_tasks": - tasks.append(task._coro.__name__) - if tasks: - log.debug("Active tasks: {}".format(tasks)) - yield from asyncio.sleep(3) +node_uri_scheme = 'enode://' + + +def host_port_pubkey_from_uri(uri): + b_node_uri_scheme = str_to_bytes(node_uri_scheme) + assert uri.startswith(b_node_uri_scheme) and \ + b'@' in uri and b':' in uri, uri + pubkey_hex, ip_port = uri[len(b_node_uri_scheme):].split(b'@') + assert len(pubkey_hex) == 2 * 512 // 8 + ip, port = ip_port.split(b':') + return ip, port, decode_hex(pubkey_hex) + + +def host_port_pubkey_to_uri(host, port, pubkey): + assert len(pubkey) == 512 // 8 + uri = '{}{}@{}:{}'.format(node_uri_scheme, + bytes_to_str(encode_hex(pubkey)), + str(host), port) + return str_to_bytes(uri) if __name__ == "__main__": + @asyncio.coroutine + def show_tasks(): + while True: + tasks = [] + for task in asyncio.Task.all_tasks(): + if task._coro.__name__ != "show_tasks": + tasks.append(task._coro.__name__) + if tasks: + log.debug("Active tasks: {}".format(tasks)) + yield from asyncio.sleep(3) + + config = { + 'privkey_hex': '65462b0520ef7d3df61b9992ed3bea0c56ead753be7c8b3614e0ce01e4cac41b', + # 'privkey_hex': '45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8', + 'listen_host': '0.0.0.0', + 'listen_port': 30303, + 'p2p_listen_port': 30303, + 'bootstrap_nodes': [ + # Local geth bootnodes + # b'enode://3a514176466fa815ed481ffad09110a2d344f6c9b78c1d14afc351c3a51be33d8072e77939dc03ba44790779b7a1025baf3003f6732430e20cd9b76d953391b3@127.0.0.1:30301', # noqa: E501 + # Testnet bootnodes + # b'enode://6ce05930c72abc632c58e2e4324f7c7ea478cec0ed4fa2528982cf34483094e9cbc9216e7aa349691242576d552a2a56aaeae426c5303ded677ce455ba1acd9d@13.84.180.240:30303', # noqa: E501 + # b'enode://20c9ad97c081d63397d7b685a412227a40e23c8bdc6688c6f37e97cfbc22d2b4d1db1510d8f61e6a8866ad7f0e17c02b14182d37ea7c3c8b9c2683aeb6b733a1@52.169.14.227:30303', # noqa: E501 + # Mainnet bootnodes + b'enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303', # noqa: E501 + # b'enode://3f1d12044546b76342d59d4a05532c14b85aa669704bfe1f864fe079415aa2c02d743e03218e57a33fb94523adb54032871a6c51b2cc5514cb7c7e35b3ed0a99@13.93.211.84:30303', # noqa: E501 + # b'enode://78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d@191.235.84.50:30303', # noqa: E501 + # b'enode://158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6@13.75.154.138:30303', # noqa: E501 + # b'enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303', # noqa: E501 + ], + } + import logging logging.basicConfig(level=logging.DEBUG) @@ -561,12 +586,15 @@ def show_tasks(): asyncio.ensure_future(discovery.bootstrap()) - task_monitor = asyncio.ensure_future(show_tasks()) + # This helps when debugging asyncio issues. + # task_monitor = asyncio.ensure_future(show_tasks()) + try: loop.run_forever() except KeyboardInterrupt: pass - task_monitor.set_result(None) + + # task_monitor.set_result(None) discovery.stop() # log.info("Pending tasks at exit: {}".format(asyncio.Task.all_tasks(loop))) loop.close() diff --git a/evm/p2p/kademlia.py b/evm/p2p/kademlia.py index e145a6270e..339b7ba8ec 100644 --- a/evm/p2p/kademlia.py +++ b/evm/p2p/kademlia.py @@ -74,11 +74,10 @@ def __hash__(self): class KBucket(object): - """ Each k-bucket is kept sorted by time last seen—least-recently seen node at the head, most-recently seen at the tail. For small values of i, the k-buckets will generally - be empty (as no appro- priate nodes will exist). For large values of i, the lists can + be empty (as no appropriate nodes will exist). For large values of i, the lists can grow up to size k, where k is a system-wide replication parameter. k is chosen such that any given k nodes are very unlikely to fail within an hour of each other (for example k = 20). @@ -100,24 +99,19 @@ def id_distance(self, id): return self.midpoint ^ id def nodes_by_id_distance(self, id): - assert is_integer(id) return sorted(self.nodes, key=operator.methodcaller('id_distance', id)) def split(self): - "split at the median id" - + """Split at the median id""" splitid = self.midpoint lower = KBucket(self.start, splitid) upper = KBucket(splitid + 1, self.end) - # distribute nodes for node in self.nodes: bucket = lower if node.id <= splitid else upper bucket.add_node(node) - # distribute replacement nodes for node in self.replacement_cache: bucket = lower if node.id <= splitid else upper bucket.replacement_cache.append(node) - return lower, upper def remove_node(self, node): @@ -130,23 +124,21 @@ def in_range(self, node): @property def is_full(self): - return len(self) == k_bucket_size + return len(self) == self.k def add_node(self, node): """ - If the sending node already exists in the recipient’s k- bucket, + If the sending node already exists in the recipient’s k-bucket, the recipient moves it to the tail of the list. - If the node is not already in the appropriate k-bucket - and the bucket has fewer than k entries, - then the recipient just inserts the new sender at the tail of the list. + If the node is not already in the appropriate k-bucket and the bucket has fewer than k + entries, then the recipient just inserts the new sender at the tail of the list. - If the appropriate k-bucket is full, however, - then the recipient pings the k-bucket’s least-recently seen node to decide what to do. + If the appropriate k-bucket is full, however, then the recipient pings the k-bucket’s + least-recently seen node to decide what to do. on success: return None on bucket full: return least recently seen Node for eviction check - """ self.last_updated = time.time() if node in self.nodes: # already exists @@ -159,14 +151,14 @@ def add_node(self, node): @property def head(self): - "least recently seen" + """Least recently seen""" return self.nodes[0] @property def depth(self): - """ - depth is the prefix shared by all nodes in bucket - i.e. the number of shared leading bits + """Depth is the prefix shared by all nodes in the bucket. + + i.e. The number of shared leading bits. """ def to_binary(x): # left padded bit representation b = bin(x)[2:] @@ -176,10 +168,12 @@ def to_binary(x): # left padded bit representation return k_id_size bits = [to_binary(n.id) for n in self.nodes] - for i in range(k_id_size): + for i in range(1, k_id_size + 1): if len(set(b[:i] for b in bits)) != 1: return i - 1 - raise Exception + # This means we have at least two nodes with the same ID, so raise an AssertionError + # because we don't want it to be caught accidentally. + raise AssertionError("Unable to calculate depth") def __contains__(self, node): return node in self.nodes @@ -188,7 +182,7 @@ def __len__(self): return len(self.nodes) -class RoutingTable(object): +class RoutingTable(): def __init__(self, node): self.this_node = node @@ -223,7 +217,7 @@ def add_node(self, node): if bucket.in_range(self.this_node) or (depth % k_b != 0 and depth != k_id_size): self.split_bucket(bucket) return self.add_node(node) # retry - # nothing added, ping eviction_candidate + # Nothing added, ping eviction_candidate return eviction_candidate return None # successfully added to not full bucket @@ -232,6 +226,7 @@ def bucket_by_node(self, node): if node.id < bucket.end: assert node.id >= bucket.start return bucket + # TODO (gsalgado): Use a meaningful exception. raise Exception def buckets_by_id_distance(self, id): @@ -270,16 +265,13 @@ def neighbours(self, node, k=k_bucket_size): class KademliaProtocol(): def __init__(self, node, wire): - assert isinstance(node, Node) # the local node self.this_node = node self.wire = wire self.routing = RoutingTable(node) self._expected_pongs = dict() # pingid -> (timeout, node, replacement_node) self._find_requests = dict() # nodeid -> timeout - self._deleted_pingids = set() def bootstrap(self, nodes): - assert isinstance(nodes, list) for node in nodes: if node == self.this_node: continue @@ -291,78 +283,70 @@ def update(self, node, pingid=None): When a Kademlia node receives any message (request or reply) from another node, it updates the appropriate k-bucket for the sender’s node ID. - If the sending node already exists in the recipient’s k- bucket, - the recipient moves it to the tail of the list. + If the sending node already exists in the recipient’s k-bucket, the recipient moves it to + the tail of the list. - If the node is not already in the appropriate k-bucket - and the bucket has fewer than k entries, - then the recipient just inserts the new sender at the tail of the list. + If the node is not already in the appropriate k-bucket and the bucket has fewer than k + entries, then the recipient just inserts the new sender at the tail of the list. - If the appropriate k-bucket is full, however, - then the recipient pings the k-bucket’s least-recently seen node to decide what to do. + If the appropriate k-bucket is full, however, then the recipient pings the k-bucket’s + least-recently seen node to decide what to do. - If the least-recently seen node fails to respond, - it is evicted from the k-bucket and the new sender inserted at the tail. + If the least-recently seen node fails to respond, it is evicted from the k-bucket and the + new sender inserted at the tail. - Otherwise, if the least-recently seen node responds, - it is moved to the tail of the list, and the new sender’s contact is discarded. + Otherwise, if the least-recently seen node responds, it is moved to the tail of the list, + and the new sender’s contact is discarded. - k-buckets effectively implement a least-recently seen eviction policy, - except that live nodes are never removed from the list. + k-buckets effectively implement a least-recently seen eviction policy, except that live + nodes are never removed from the list. """ - assert isinstance(node, Node) - if node == self.this_node: return - def _expected_pongs(): - return set(v[1] for v in self._expected_pongs.values()) - - if pingid and (pingid not in self._expected_pongs): - assert pingid not in self._expected_pongs + if pingid is not None and (pingid not in self._expected_pongs): return - # check for timed out pings and eventually evict them + # Check for timed out pings and eventually evict them for _pingid, (timeout, _node, replacement) in list(self._expected_pongs.items()): if time.time() > timeout: - self._deleted_pingids.add(_pingid) # FIXME this is for testing + log.debug('evicting expected pong', remote=_node) del self._expected_pongs[_pingid] self.routing.remove_node(_node) if replacement: self.update(replacement) + # XXX (gsalgado): Not sure it's correct to return here? return - if _node == node: # prevent node from being added later + if _node == node: + # Prevent node from being added later. + # XXX (gsalgado): Not sure it's correct to return here? return # if we had registered this node for eviction test if pingid in self._expected_pongs: timeout, _node, replacement = self._expected_pongs[pingid] if replacement: + # FIXME (gsalgado): Instead of directly accessing the bucket's replacement cache + # we should have an API for that. self.routing.bucket_by_node(replacement).replacement_cache.append(replacement) del self._expected_pongs[pingid] - # add node eviction_candidate = self.routing.add_node(node) if eviction_candidate: - # protocol should ping bucket head and evict if there is no response self.ping(eviction_candidate, replacement=node) - # check for not full buckets and ping replacements + # Check for not full buckets and ping replacements for bucket in self.routing.not_full_buckets: for node in bucket.replacement_cache: self.ping(node) - # check idle buckets - """ - idle bucket refresh: - for each bucket which hasn't been touched in 3600 seconds - pick a random value in the range of the bucket and perform discovery for that value - """ + # For buckets that haven't been touched in 3600 seconds, pick a random value in the bucket's + # range and perform discovery for that value. for bucket in self.routing.idle_buckets: rid = random.randint(bucket.start, bucket.end) self.find_node(rid) - # check and removed timed out find requests + # Check and removed timed out find requests self._find_requests = { nodeid: timeout for nodeid, timeout in self._find_requests.items() diff --git a/evm/p2p/p2p.py b/evm/p2p/p2p.py index 2212fe0c21..6fed787f3d 100644 --- a/evm/p2p/p2p.py +++ b/evm/p2p/p2p.py @@ -1,12 +1,5 @@ import rlp -from rlp.utils import ( - bytes_to_str, - decode_hex, - encode_hex, - str_to_bytes, -) - from evm.p2p.protocol import ( BaseProtocol, Command, @@ -123,26 +116,3 @@ def create(self, proto, reason=reason.client_quitting): def receive(self, proto, data): proto.peer.report_error('disconnected %s' % self.reason_name(data['reason'])) proto.peer.stop() - - -node_uri_scheme = 'enode://' - - -def host_port_pubkey_from_uri(uri): - b_node_uri_scheme = str_to_bytes(node_uri_scheme) - assert uri.startswith(b_node_uri_scheme) and \ - b'@' in uri and b':' in uri, uri - pubkey_hex, ip_port = uri[len(b_node_uri_scheme):].split(b'@') - # XXX(gsalgado): commented out because our pubkeys have len == 65 - # assert len(pubkey_hex) == 2 * 512 // 8 - ip, port = ip_port.split(b':') - return ip, port, decode_hex(pubkey_hex) - - -def host_port_pubkey_to_uri(host, port, pubkey): - # XXX(gsalgado): commented out because our pubkeys have len == 65 - # assert len(pubkey) == 512 // 8 - uri = '{}{}@{}:{}'.format(node_uri_scheme, - bytes_to_str(encode_hex(pubkey)), - str(host), port) - return str_to_bytes(uri) diff --git a/evm/p2p/test_kademlia.py b/evm/p2p/test_kademlia.py new file mode 100644 index 0000000000..4010087006 --- /dev/null +++ b/evm/p2p/test_kademlia.py @@ -0,0 +1,76 @@ +import random + +from evm.p2p import kademlia +from evm.utils.numeric import int_to_big_endian + + +def test_kbucket_add_node(): + bucket = kademlia.KBucket(0, 100) + node = random_node() + assert bucket.add_node(node) is None + assert bucket.nodes == [node] + + node2 = random_node() + assert bucket.add_node(node2) is None + assert bucket.nodes == [node, node2] + assert bucket.head == node + + assert bucket.add_node(node) is None + assert bucket.nodes == [node2, node] + assert bucket.head == node2 + + bucket.k = 2 + node3 = random_node() + assert bucket.add_node(node3) == node2 + assert bucket.nodes == [node2, node] + assert bucket.head == node2 + + +def test_kbucket_split(): + bucket = kademlia.KBucket(0, 100) + for i in range(1, bucket.k + 1): + node = random_node() + # Set the IDs of half the nodes below the midpoint, so when we split we should end up with + # two buckets containing k/2 nodes. + if i % 2 == 0: + node.id = bucket.midpoint + i + else: + node.id = bucket.midpoint - i + bucket.add_node(node) + assert bucket.is_full + bucket1, bucket2 = bucket.split() + assert bucket1.start == 0 + assert bucket1.end == 50 + assert bucket2.start == 51 + assert bucket2.end == 100 + assert len(bucket1) == bucket.k / 2 + assert len(bucket2) == bucket.k / 2 + + +def test_kbucket_depth(): + bucket = kademlia.KBucket(0, 100) + + # For buckets with less than 2 nodes, the depth is k_id_size. + assert bucket.depth == kademlia.k_id_size + assert bucket.add_node(random_node()) is None + assert bucket.depth == kademlia.k_id_size + + # Otherwise the depth is the number of leading bits (in the left-padded binary representation) + # shared by all node IDs. + assert bucket.add_node(random_node()) is None + bucket.nodes[0].id = int('0b1', 2) + bucket.nodes[1].id = int('0b0', 2) + assert bucket.depth == kademlia.k_id_size - 1 + + bucket.nodes[0].id = int('0b010', 2) + bucket.nodes[1].id = int('0b110', 2) + assert bucket.depth == kademlia.k_id_size - 3 + + +def random_pubkey(): + pk = int_to_big_endian(random.getrandbits(kademlia.k_pubkey_size)) + return b'\x00' * (kademlia.k_pubkey_size // 8 - len(pk)) + pk + + +def random_node(): + return kademlia.Node(random_pubkey()) From d58db6410de18c2f78b2dbfc66a3398bc8d57e8e Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Fri, 25 Aug 2017 08:57:47 -0300 Subject: [PATCH 10/10] Some cleanups and tests for RoutingTable --- evm/p2p/kademlia.py | 41 ++++++++++++++---------------- evm/p2p/test_kademlia.py | 54 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 71 insertions(+), 24 deletions(-) diff --git a/evm/p2p/kademlia.py b/evm/p2p/kademlia.py index 339b7ba8ec..d1e657550d 100644 --- a/evm/p2p/kademlia.py +++ b/evm/p2p/kademlia.py @@ -23,7 +23,7 @@ from evm.utils.keccak import keccak from evm.utils.numeric import big_endian_to_int -# XXX: Most of this code was lifted from pydevp2p +# TODO: Setup a logger using the standard logging module. from structlog import get_logger log = get_logger() @@ -207,12 +207,13 @@ def remove_node(self, node): self.bucket_by_node(node).remove_node(node) def add_node(self, node): - assert node != self.this_node + if node == self.this_node: + raise ValueError("Cannot add this_node to routing table") bucket = self.bucket_by_node(node) eviction_candidate = bucket.add_node(node) - if eviction_candidate: # bucket is full - # split if the bucket has the local node in its range - # or if the depth is not congruent to 0 mod k_b + if eviction_candidate is not None: # bucket is full + # Split if the bucket has the local node in its range or if the depth is not congruent + # to 0 mod k_b depth = bucket.depth if bucket.in_range(self.this_node) or (depth % k_b != 0 and depth != k_id_size): self.split_bucket(bucket) @@ -226,11 +227,9 @@ def bucket_by_node(self, node): if node.id < bucket.end: assert node.id >= bucket.start return bucket - # TODO (gsalgado): Use a meaningful exception. - raise Exception + raise ValueError("No bucket found for node with id {}".format(node.id)) def buckets_by_id_distance(self, id): - assert is_integer(id) return sorted(self.buckets, key=operator.methodcaller('id_distance', id)) def __contains__(self, node): @@ -244,22 +243,18 @@ def __iter__(self): for n in b.nodes: yield n - def neighbours(self, node, k=k_bucket_size): - """ - sorting by bucket.midpoint does not work in edge cases - build a short list of k * 2 nodes and sort and shorten it - """ - assert isinstance(node, Node) or is_integer(node) - if isinstance(node, Node): - node = node.id + def neighbours(self, nodeid, k=k_bucket_size): + """Return up to k neighbours of the given node.""" nodes = [] - for bucket in self.buckets_by_id_distance(node): - for n in bucket.nodes_by_id_distance(node): - if n is not node: + # Sorting by bucket.midpoint does not work in edge cases, so build a short list of k * 2 + # nodes and sort it by id_distance. + for bucket in self.buckets_by_id_distance(nodeid): + for n in bucket.nodes_by_id_distance(nodeid): + if n is not nodeid: nodes.append(n) if len(nodes) == k * 2: break - return sorted(nodes, key=operator.methodcaller('id_distance', node))[:k] + return sorted(nodes, key=operator.methodcaller('id_distance', nodeid))[:k] class KademliaProtocol(): @@ -386,8 +381,10 @@ def recv_pong(self, remote, echoed): log.debug('<<< pong', remoteid=remote) assert remote != self.this_node pingid = self._mkpingid(echoed, remote) - # update address (clumsy fixme) - if hasattr(remote, 'address'): # not available in tests + # FIXME: This method is called from DiscoveryProtocol, so it is usually passed a + # discovery.Node instance, and this is ensuring that is really the case before attempting + # to update the node's address. Maybe we should add the .address field to kademlia.Node + if hasattr(remote, 'address'): nnodes = self.routing.neighbours(remote) if nnodes and nnodes[0] == remote: nnodes[0].address = remote.address # updated tcp address diff --git a/evm/p2p/test_kademlia.py b/evm/p2p/test_kademlia.py index 4010087006..4fe6612d84 100644 --- a/evm/p2p/test_kademlia.py +++ b/evm/p2p/test_kademlia.py @@ -1,9 +1,56 @@ import random +import pytest + from evm.p2p import kademlia from evm.utils.numeric import int_to_big_endian +def test_routingtable_split_bucket(): + table = kademlia.RoutingTable(random_node()) + assert len(table.buckets) == 1 + bucket = table.buckets[0] + table.split_bucket(bucket) + assert len(table.buckets) == 2 + assert bucket not in table.buckets + + +def test_routingtable_add_node(): + table = kademlia.RoutingTable(random_node()) + for i in range(table.buckets[0].k): + # As long as the bucket is not full, the new node is added to the bucket and None is + # returned. + assert table.add_node(random_node()) is None + assert len(table.buckets) == 1 + assert len(table) == i + 1 + assert table.buckets[0].is_full + # Now that the bucket is full, an add_node() should cause it to be split. + assert table.add_node(random_node()) is None + + +def test_routingtable_add_node_error(): + table = kademlia.RoutingTable(random_node()) + with pytest.raises(ValueError): + table.add_node(random_node(kademlia.k_max_node_id + 1)) + + +def test_routingtable_neighbours(): + table = kademlia.RoutingTable(random_node()) + for i in range(1000): + assert table.add_node(random_node()) is None + assert i == len(table) - 1 + + for i in range(100): + node = random_node() + nearest_bucket = table.buckets_by_id_distance(node.id)[0] + if not nearest_bucket.nodes: + continue + # Change nodeid to something in this bucket. + node_a = nearest_bucket.nodes[0] + node_b = random_node(node_a.id + 1) + assert node_a == table.neighbours(node_b.id)[0] + + def test_kbucket_add_node(): bucket = kademlia.KBucket(0, 100) node = random_node() @@ -72,5 +119,8 @@ def random_pubkey(): return b'\x00' * (kademlia.k_pubkey_size // 8 - len(pk)) + pk -def random_node(): - return kademlia.Node(random_pubkey()) +def random_node(nodeid=None): + node = kademlia.Node(random_pubkey()) + if nodeid is not None: + node.id = nodeid + return node