diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 46aa3a845..eb69df328 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -57,11 +57,11 @@ test:other-integration:
     - pip install tox-travis
     - tox -e other

-test:json-api:
-  stage: test
-  script:
-    - make install tools
-    - HOME=/tmp coverage run -p --source=lbry scripts/generate_json_api.py
+#test:json-api:
+#  stage: test
+#  script:
+#    - make install tools
+#    - HOME=/tmp coverage run -p --source=lbry scripts/generate_json_api.py

diff --git a/lbry/wallet/checkpoints.py b/lbry/wallet/checkpoints.py
index 3b33bd5fb..623369317 100644
--- a/lbry/wallet/checkpoints.py
+++ b/lbry/wallet/checkpoints.py
@@ -740,4 +740,145 @@ HASHES = {
     738000: 'aebdf15b23eb7a37600f67d45bf6586b1d5bff3d5f3459adc2f6211ab3dd0bcb',
     739000: '3f5a894ac42f95f7d54ce25c42ea0baf1a05b2da0e9406978de0dc53484d8b04',
     740000: '55debc22f995d844eafa0a90296c9f4f433e2b7f38456fff45dd3c66cef04e37',
+    741000: '927b47fc909b4b55c067bbd75d8638af1400fac076cb642e9500a747d849e458',
+    742000: '97fa3d83eb94114496e418c118f549ebfb8f6d123d0b40a12ecb093239557646',
+    743000: '482b66d8d5084703079c28e3ae69e5dee735f762d6fcf9743e75f04e139fd181',
+    744000: 'f406890d5c70808a58fb14429bad812a3185bdb9dace1aa57de76663f92b5013',
+    745000: '2bd0802cbb8aa4441a159104d39515a4ff6fc8dfe616bc83e88197847c78bcff',
+    746000: '24d090a7b6359db3d5d714a69ddc9a6f2e8ff8f044b723220a8ba32df785fd54',
+    747000: '07c4ce9ce5310ee472cf753ddb03c39c5fee6c910d491daffd38615205411633',
+    748000: 'ea913798c0f09d0a27eae7c852954c2c88b8c3b7f23f8fba26b68a3952d0ffde',
+    749000: '23f256adebfe35d49ba84ad49f3f71fc67f7745091c91f22e65f1cc2e23b8f2c',
+    750000: '96db12ee3a295f3d5c56d244e6e7493f58c08d3427e379940e5d4f891a41ec26',
+    751000: 'cedaf12415dac1314942e58ced80830b92fbfabc41f42a0b0f054f0672ef9822',
+    752000: '293606bcd9fbbee5584724301b2cf86bb69204820023e1fb46c238ddfbc660ab',
+    753000: 'f4d43cbb38b7d97919dedc0f5a6dc8007896c4f443b76f3e5693e25bc46760cf',
+    754000: 'fcaad22fd815311280fe451086516375d1d9d92b2990c7c351407df5aa19011e',
+    755000: 'b9276f10d1844cb5b0308766c8db960490ac34a73c4653d0a91202789a6ccb9b',
+    756000: '2fe5581f1110c1c8dcea46cad647551bd6bd640cb37738d863e189bd8f368347',
+    757000: 'b9d915f366f0b010429a52245b0fb02774157eb9fd8f66bce32dcd3acc71c2a1',
+    758000: '62d1854fc15db56b5d0e05ceeb54c1297966bf9dc7f7a0a14b42c059fc485d1b',
+    759000: 'f4ca9f69d16d092f4a0ea5102e6343b21204c4ea9cd9b22cddd77dbb5d68ade3',
+    760000: 'df3bb86641330d8cc7f55a2fd0da28251219e95babe960a308b18e08a7d88fc8',
+    761000: 'a93029475de4bc7569b6ae802d658cd91c84cc253772712a279f140a6c3b91b1',
+    762000: '307e289dc6ec8bcd62ca8831e4159d5edd780f2fae55ba55dd446225450f46f8',
+    763000: '293f73514abca24f374473bd0394179812952a04ea13dc60ef5ada5331fa274f',
+    764000: 'dd8b082db9281e3d9bacf15d6b352fda186d2d2923c7731844d0d4764dd71db8',
+    765000: '201239e562d2571bf47347b3522fff89632aecea3b2d8cef05151f88b2b0bcdb',
+    766000: '4a55a538b51b5650979e64521998cd5c5ad055ba9f3ac0e3e2a28febc6cc2798',
+    767000: '3916666f2adbb05ea98ec1961f9546b9afa0f6910ec95e42ce37267f2ae4f79c',
+    768000: 'dc0ad881eedcb5fd4954238f462080d6e7636b058d481698ed1c077e0ce2207e',
+    769000: 'eaf10a1e1ec6e129289b8479a05df03e0808f1f0946f1995de6524e9ebe7a461',
+    770000: '7200c64f22e32de7f999583361c933680fc9a2ffcb9a5ab73d3076fd49ec7537',
+    771000: 'd883111a2eeacff80ce31df35ab6c943805b9e48877b413fccf371e5dbfa7fb2',
+    772000: '3977d3c60edb9c80c97bb2b759b1659cbb650ad2d3a6f61d2caec83f1b2ae84c',
+    773000: '9c7175fb8646a1a82383b4c534fd01bcf92d65c43d87ae854d51a784b04dc77e',
+    774000: 'e0e92485f86e5fffa87b3497424e43b02a37710517d9d3f272392e8cdc56e5e9',
+    775000: '6395229113d3aa2105afbaeb8b59621a536fc61fe272314b2fc3bdda98dd66cc',
+    776000: 'b4b00207328b5f032bd4f0b634f91323ff520ada8c8bfec241b23c8e4bfd5a4e',
+    777000: '14cdc6f5f7b4bd5bad745dfe6fcd114e9194026412a2e1b3f345be2eef433d16',
+    778000: 'd3cd7b68be504c32117b670d38d59d44b02dcf3d65811efc2ca5531d902623cc',
+    779000: 'afcd220e4040cb5f92d4b38fc204e59822df2218f767f2c4b33597b238a35f77',
+    780000: '78252a9cfc289a70192ed8dd3dddeb1b9a4f9b8eff9a5d0ac259b3254472cf68',
+    781000: '02ebc3f17d947481a311b4771c254f1e002b6a9198d4a5258ce6c13165aadddc',
+    782000: '8dd9f1f372ee6d688a0bcdc3b342c77804ba5a646a218be4bc2aa02d846206c0',
+    783000: 'e46b0d02ec2ef488fae455665e107520e1bd2b4f35ca52af7ad8addd2f72fa73',
+    784000: '9ee8a8de94231e3ae3a610b82fdbca48dc14d9b80791d20af6c365a31822df6f',
+    785000: '21e1cc12def8173a50158b2833bd91a62140c61646f5e08aecaee3e6da20735e',
+    786000: 'b3e659f84d73de42888cc0f2b69bae71dd5fa6756a437a4b21958b182faa316e',
+    787000: 'a9be7ba00ea6a9ea6bd03d8412ec014ca7e8cda6bdc33382f165e702811b8836',
+    788000: 'a4c14729f8a68c03f5a0ccd890ac6a92b39c143f1f752fe81ad051eb52d8dce0',
+    789000: '5cf66d224e5645097efc9c3c0392b51c8ca8ea1295151921a7912a2f04ee1274',
+    790000: '676769ade71c33bc102bce416e66eb2c6794b03d7b8f5a590c87c380da463775',
+    791000: '0228e074451797bf6bfbc941bcafcbadc972d32e4e1e0c5da015513f65714217',
+    792000: '0fa3d00a1f19c5ac060e10a410cf7cea18eac5f89018d79ce51ac3fc66bbb365',
+    793000: '5f68d0868b424e32f5ce3d8e7d9f18979da7b831b8ef4e3974d62fb20ff53a97',
+    794000: '34508c56423739c00a837801b654b07decb274d02b383eff396d23c4d64bc0e9',
+    795000: '7f70910c855d1fd88cd7f9be8a3b94314ee408a31a2da6301404bf8deb07c12c',
+    796000: 'b74ab8813b1d2a0967fea0e66597572e5f0b5a285e21f5150fcc9d5f757de130',
+    797000: 'bba27b1491d907ab1baa456cb651dc5b071231b1b6ad27b62d351ca12c25dbfd',
+    798000: 'e75dcb15b2fc91f02e75e600dde9f6f46c09672533bc82a5d6916c4a2cd8613a',
+    799000: 'adf62c826a3e0b33af439a7881918ae4ce19c5fb2ca37d21243415f7d716aa65',
+    800000: 'd8f0ca13a8c8a19c254a3a6ba15150a34711dca96f2d877162cc44aa2acfb268',
+    801000: '2a8c7104c4040a2bc31913ae25e9361df5bac9477368c708f86c1ca640480887',
+    802000: '1f3b09d3561c4a8a056b263289bd492dc6c0d604c3fa195935e735d1c0ddc40e',
+    803000: '037769628c40a701fdb4b16d79084b8fbb319fde79770a7ac842f3cdc813099e',
+    804000: 'a0c6a089e5fa1e3589ca282085fe7201a5705776d81b257ffd252b2947fa6428',
+    805000: 'b2ac99bfc4a488e7b7624b31ee061991a6dd0881bb005cd13f3dd2e66a08fe19',
+    806000: 'ffe63cb999a278280b80a667d2dcb60c40e43a53f733914d8bec808b694ebf83',
+    807000: 'eddb09fc6c4869a59b520d0befb1fb6ac952333f3cc5de086539c85ea8558778',
+    808000: '0f4fb3f9172e52897ea992d9f3a2024126c4d2e63e9888739f11fb1f5e4c1f46',
+    809000: '9641dd720d23ced2f1cb6e5cf46ac4e547afb9f56263c4cf58e3b19d407cf401',
+    810000: 'de6dc953acd7e5ef213b3aaf1c4a9ee1d5b756bfce5525ee105214647e243a85',
+    811000: 'c52c83712ca12b24b2db1b4a575e7f352b1d560cbf702e121a03bdca9e8be23d',
+    812000: '83143734bb965318a53a38a7e403dcdb3e3fadedb01ab12c370417fc2a0655c0',
+    813000: 'e480deff10c5a84fc957e3aed936690e24b74dd08fa8858a8a953c2f7383b914',
+    814000: '810d33afcee07b9abe16c6cdc3a041038daa131c476b0daf48a080007f08b490',
+    815000: 'b4aeb9e16fddd27844b2d56bc2b221134039bb5642c9e9ba88372afbdeac3972',
+    816000: '86e73b67aae3d248011b8f66ed414cb8a9ba4b2a3cf7e32773cfbff055d719b7',
+    817000: '3ebb8b83752b48242016cb682f0f6bd14e15371bf1163a5933193eaa0edeb351',
+    818000: '4d925e17f642f220bbf317d3d5355d2f41fbce325f190f8c3b32dc0b337d24d6',
+    819000: 'b9cc126d620f6b99d90a00d35957b0e428aaaa7c986bc9e816a60e4334572961',
+    820000: '9c2f8c142bed1f94dca29276f7c83958be8cfe11773bb9b56c808fbcf7d3b1f8',
+    821000: 'e5509eb98895cfa12a8da5d54c1df3f52472ffcbdf707adbf84a4a9c5d356203',
+    822000: '764aada4802ebfe4ef935ab50af06a4f83aa556c49fdde3d9e12e1abd230c16b',
+    823000: '1dbd745c2e96a365d865f990d109137d32d42977f503af55d8c00b109d31d3c3',
+    824000: '954304a0b0c8f549c3bffd5ff46b5b8f05b0f0fde2a36f24fd5af9d774fb3079',
+    825000: '17808b14f2056c1a5d46cb7617e9de9be6a1a6084edbc1bdb778586467a72297',
+    826000: '3ca1167d4cac8b187829b23001b438617c43704b42462c4eb001b0d434cb9651',
+    827000: '246d1607245e4a202f420393ac2e30e9cbf5eb5570dc997073b897f6d8643023',
+    828000: '1764730a8dc3e89d02d168ff6bb54e8c903820b74711af6ff27bd0c8545577e7',
+    829000: 'd9f3ab0cd823c6305bd8b95a96188bb4f2ca90b4d66c5d12293e8b6192bac0f2',
+    830000: 'd4ff51f0092b04aedf8d39937680d8e8309b1be21d36e7833ed36f8e30aad6ea',
+    831000: '3e92e76721b962396dce52993fa7606552f0907b38f7b2bd7b21ada98c145f47',
+    832000: 'df12fcdb4cbe53ba627ace6de898298de175f8671d3d90170732d110fcdc34b8',
+    833000: '25167ff38ae4a5964b618cabe0a12d4de62ac7a4c47448cdb4499e09e108d5b9',
+    834000: 'd31f5309ea179a1e386e835fc372e47dcda6871a3a239abfba50c4f368994f13',
+    835000: 'aff7e8dd3e55ea807fcbe284014075f420b3a23f1b0eb47bacdc1c91d2899813',
+    836000: '3b5ac6d64c470739bb17d1544a285affb40f2d33e92687e5ba7c5ac602e0d72a',
+    837000: 'd5619cbfe4f27c55f2bf9351b4891636cf64fef88212a5eeeae7bd3de47fe0bd',
+    838000: '1f9102a49c6ac470cb5d0050e5300b1443840d6d65719b835e3bea484aafb2ec',
+    839000: '3f63e391f0fbc5787fbe4ace3bada3816261294ea1c6ee435001801023682f90',
+    840000: '777894fd12bd0d6dee7bcde2995c68e55e7094e3122da38571e4b6c4304b75e0',
+    841000: 'ceb0c598c788e25e43e25aa4beff5c7377035824844cf1675eaea537074df028',
+    842000: '8661cf2065dc713d2ba043f0b81f0effcc940eeb3e91906a21ff22c210561dcd',
+    843000: '0dc2766f90415009d0c86bedffee6ebcf58042eb08262c0c67c4e9ed86b2aec8',
+    844000: '26d072da864cab268a12794977b04ec44fb69ef3978e2342e82225974dac54dd',
+    845000: '95e93bb60be8d5f07a1f4d26290c914957a82fc9d26ae8a3f20082eda27406ff',
+    846000: 'f1bdc39af7705e58ab8b6c31dc70dce1e115db1cfd8cc9b037949dfbec82a59a',
+    847000: 'f5f10f06396ecf2765d8a081141d489737c1d8d57c281f28f57c4cb2f90db883',
+    848000: '331b8ef08605bae8d749893af9ed54f0df4f07a5a002108a2a0aea82d0360979',
+    849000: '75b5f6233ab9a1bbc3c8b2893e5b22a0aa98e7ea635261255dc3c281f67d2260',
+    850000: '5d7e6fe83e0ea1910a54a00090704737671d6f44df4228e21440ad1fc15e595f',
+    851000: '7822db25d3ff0f6695ee38bad91edf317b5c6611673d28f1d22053110bb558be',
+    852000: '2f0effad83a3561fc1a2806a562786a641d9ddb18d16bb9308006e7d324a21e9',
+    853000: 'f603b2eaff11d5296377d990651317d40a1b2599ad2c5250eab131090f4b9458',
+    854000: '34d59b26a50f18a9f250736d0f2e69d28b7e196fbef9b8a26c6b0b75c16aa194',
+    855000: '76dd1ffff3946c0878969886fcf177ce5ab5560df19ddf006f9bcb02ae3e4e4f',
+    856000: '74ff0b6f64e9dd5802fec2aac1d3ae194d28b9264114adaf0a882b46c8c918fe',
+    857000: '7b5badfa2e4f40aa597a504d7ebe83c3705a2c6169a8c168ce293db223bc2d32',
+    858000: '2bb0767a0f72b20d45ecfc3e34517dbda16d85758e040cf0e147f4cbd0cc57ac',
+    859000: '3d741b9c365a91ed76f85824b94d19ec19b608d232660840ba59c7aa4b2cb67f',
+    860000: 'd481a5a117878c0e3acd1f5844e150fb30e617577947d9846b1d214d703b71b0',
+    861000: '54033424e488a3f1ad6946d4a6d9acb48465d6b1dbe8e1c2504a54cc84d7cad4',
+    862000: '464bc3820a8cc8844dc9e26c388009e9982c656d46ef4b4fd0a2cb0e4eea0aaa',
+    863000: 'd1aa94be2174f66780c4f226b9da3f6712b0f37af8dec33360bea83ca261b342',
+    864000: '8c16008f11de5bc395d88cd802514ff647450f1bc136724b9aaf2ccce10a494f',
+    865000: '3dae86012e97a201e2e1a47c899001ac00f78dc108026ed7c4194858c6c6dd5a',
+    866000: 'afe5b0ccab995e1a1fa25fbc24c1d4b1a92c43042d03395f8743dcd806e72fd8',
+    867000: 'c83716ac171aa9ab0d414833db340fa30e82bfda6cc616d3038529caab9b5600',
+    868000: '8c409fe03cd35ef2d8e366818788b40eaeb4c8f6ae91450d75f4a66ca5f69cad',
+    869000: '1d47909ceba790b8e1ce2e9902ee2775ea99e58efdb95668f9803a8ccf95f286',
+    870000: '9adf5da1476388f053aa42de636da169d1cf1c9652cdf7cd9ad4fb18a0eb3388',
+    871000: '8ad57fb1e74bcba0b5614fbac003be2bb32275dd85b38f2d28a0585005a99cfc',
+    872000: '84a32e92012a356106e9657da8dab1a5491ea588fc29d411c69b20680c666420',
+    873000: 'adf5921bbbfaa43929f67e6a070975313b77b456e262c700a27be611fceb17ae',
+    874000: '09eaa7c4b18c79a46a2895190333f72336826223d5c986849a06f5153f49f2a5',
+    875000: '235d7e4f31966507312149ea4c5e294aa84c695cf840117f0ef5963be7a0bda1',
+    876000: '9aa9cb806ccbec0475ac330b496c5b2edeba38ba3f1e13ddd54a01457634a288',
+    877000: 'c1e7f9b2b20bb1c4c0deadbc786d31fdf36f262325342aa23d1a66e2846b22bc',
+    878000: 'ee0d2b20ac28ce23ab38698a57c6beff14f12b7af9d027c05cc92f652695f46b',
+    879000: '0eb0810f4b81d1845b0a88f05449408df2e45715c9210a656f45278c5fdf7956',
+    880000: 'e7d613027e3b4ca38d09bbef07998b57db237c6d67f1e8ea50024d2e0d9a1a72',
+    881000: '21af4d355d8756b8bf0369b2d79b5c824148ae069026ba5c14f9dd6b7555e1db',
 }
diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py
index aef2c6811..fb6c33648 100644
--- a/lbry/wallet/database.py
+++ b/lbry/wallet/database.py
@@ -430,7 +430,7 @@ class SQLiteMixin:
                 return
             await self.db.executescript('\n'.join(
                 f"DROP TABLE {table};" for table in tables
-            ))
+            ) + '\n' + 'PRAGMA WAL_CHECKPOINT(FULL);' + '\n' + 'VACUUM;')
         await self.db.execute(self.CREATE_VERSION_TABLE)
         await self.db.execute("INSERT INTO version VALUES (?)", (self.SCHEMA_VERSION,))
         await self.db.executescript(self.CREATE_TABLES_QUERY)
@@ -574,7 +574,7 @@ def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: L

 class Database(SQLiteMixin):

-    SCHEMA_VERSION = "1.4"
+    SCHEMA_VERSION = "1.5"

     PRAGMAS = """
         pragma journal_mode=WAL;
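The database.py hunk above appends `PRAGMA WAL_CHECKPOINT(FULL)` and `VACUUM` to the same script that drops the old tables, so the write-ahead log is flushed into the main database file and the freed pages are reclaimed before the schema is recreated; bumping `SCHEMA_VERSION` to "1.5" is what triggers that drop-and-rebuild path on the next startup. A minimal sketch of the script shape, using plain `sqlite3` on a throwaway database with hypothetical table names:

```python
import sqlite3

conn = sqlite3.connect("wallet-sketch.db")
conn.executescript("pragma journal_mode=WAL;")

tables = ["tx", "txo", "txi"]  # hypothetical names, for illustration only
for table in tables:
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (id INTEGER)")

# Same shape as the migration's executescript: drop everything, flush the
# WAL into the main file, then VACUUM to compact the database on disk.
conn.executescript(
    "\n".join(f"DROP TABLE {table};" for table in tables)
    + "\n" + "PRAGMA WAL_CHECKPOINT(FULL);"
    + "\n" + "VACUUM;"
)
conn.close()
```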
diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py
index d30d018e7..5f646d826 100644
--- a/lbry/wallet/ledger.py
+++ b/lbry/wallet/ledger.py
@@ -123,7 +123,6 @@ class Ledger(metaclass=LedgerRegistry):
         self.network: Network = self.config.get('network') or Network(self)
         self.network.on_header.listen(self.receive_header)
         self.network.on_status.listen(self.process_status_update)
-        self.network.on_connected.listen(self.join_network)

         self.accounts = []
         self.fee_per_byte: int = self.config.get('fee_per_byte', self.default_fee_per_byte)
@@ -156,7 +155,7 @@ class Ledger(metaclass=LedgerRegistry):
         self._on_ready_controller = StreamController()
         self.on_ready = self._on_ready_controller.stream

-        self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 10_000))
+        self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 1024))
         self._update_tasks = TaskGroup()
         self._other_tasks = TaskGroup()  # that we dont need to start
         self._utxo_reservation_lock = asyncio.Lock()
@@ -329,6 +328,8 @@ class Ledger(metaclass=LedgerRegistry):
             await self.network.on_connected.first
         async with self._header_processing_lock:
             await self._update_tasks.add(self.initial_headers_sync())
+        self.network.on_connected.listen(self.join_network)
+        asyncio.ensure_future(self.join_network())
         await fully_synced
         await self.db.release_all_outputs()
         await asyncio.gather(*(a.maybe_migrate_certificates() for a in self.accounts))
@@ -414,6 +415,7 @@ class Ledger(metaclass=LedgerRegistry):
                     "Blockchain Reorganization: attempting rewind to height %s from starting height %s",
                     height, height+rewound
                 )
+                self._tx_cache.clear()
             else:
                 raise IndexError(f"headers.connect() returned negative number ({added})")

@@ -512,10 +514,9 @@ class Ledger(metaclass=LedgerRegistry):
             )
             return True

-        acquire_lock_tasks = []
+        synced_txs = []
         to_request = {}
         pending_synced_history = {}
-        updated_cached_items = {}
         already_synced = set()

         already_synced_offset = 0
@@ -525,24 +526,6 @@ class Ledger(metaclass=LedgerRegistry):
                 already_synced.add((txid, remote_height))
                 already_synced_offset += 1
                 continue
-            cache_item = self._tx_cache.get(txid)
-            if cache_item is None:
-                cache_item = TransactionCacheItem()
-                self._tx_cache[txid] = cache_item
-
-        unsynced_offset = already_synced_offset
-        for txid, remote_height in remote_history[already_synced_offset:]:
-            cache_item = self._tx_cache[txid]
-            if cache_item.tx is not None and cache_item.tx.height >= remote_height \
-                    and (not cache_item.tx.is_verified or remote_height < 1):
-                pending_synced_history[unsynced_offset] = f'{txid}:{cache_item.tx.height}:'
-                already_synced.add((txid, cache_item.tx.height))
-            else:
-                acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))
-            unsynced_offset += 1
-
-        if acquire_lock_tasks:
-            await asyncio.wait(acquire_lock_tasks)

         tx_indexes = {}

@@ -550,22 +533,25 @@ class Ledger(metaclass=LedgerRegistry):
             tx_indexes[txid] = i
             if (txid, remote_height) in already_synced:
                 continue
-            cache_item = self._tx_cache.get(txid)
-            cache_item.pending_verifications += 1
-            updated_cached_items[txid] = cache_item
-
-            assert cache_item is not None, 'cache item is none'
-            assert cache_item.lock.locked(), 'cache lock is not held?'
-
             to_request[i] = (txid, remote_height)

         log.debug(
-            "request %i transactions, %i/%i for %s are already synced", len(to_request),
-            len(pending_synced_history), len(remote_history), address
+            "request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs),
+            len(remote_history), address
         )
-        requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address)
-        for tx in requested_txes:
+        remote_history_txids = set(txid for txid, _ in remote_history)
+        async for tx in self.request_synced_transactions(to_request, remote_history_txids, address):
             pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
+            synced_txs.append(tx)
+            if len(synced_txs) >= 100:
+                log.info("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request))
+                await self.db.save_transaction_io_batch(
+                    synced_txs, address, self.address_to_hash160(address), ""
+                )
+                while synced_txs:
+                    tx = synced_txs.pop()
+                    self._on_transaction_controller.add(TransactionEvent(address, tx))
+        log.info("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request))

         assert len(pending_synced_history) == len(remote_history), \
             f"{len(pending_synced_history)} vs {len(remote_history)}"
@@ -576,21 +562,11 @@ class Ledger(metaclass=LedgerRegistry):
                 if f"{txid}:{height}:" != pending_synced_history[i]:
                     log.warning("history mismatch: %s vs %s", remote_history[remote_i], pending_synced_history[i])
             synced_history += pending_synced_history[i]
-
-        cache_size = self.config.get("tx_cache_size", 10_000)
-        for txid, cache_item in updated_cached_items.items():
-            cache_item.pending_verifications -= 1
-            if cache_item.pending_verifications < 0:
-                log.warning("config value tx cache size %i needs to be increased", cache_size)
-                cache_item.pending_verifications = 0
-            try:
-                cache_item.lock.release()
-            except RuntimeError:
-                log.warning("lock was already released?")
-
         await self.db.save_transaction_io_batch(
-            [], address, self.address_to_hash160(address), synced_history
+            synced_txs, address, self.address_to_hash160(address), synced_history
         )
+        while synced_txs:
+            self._on_transaction_controller.add(TransactionEvent(address, synced_txs.pop()))

         if address_manager is None:
             address_manager = await self.get_address_manager_for_address(address)
@@ -598,13 +574,6 @@ class Ledger(metaclass=LedgerRegistry):
         if address_manager is not None:
             await address_manager.ensure_address_gap()

-        for txid, cache_item in updated_cached_items.items():
-            if self._tx_cache.get(txid) is not cache_item:
-                log.warning("tx cache corrupted while syncing %s, reattempt sync=%s", address, reattempt_update)
-                if reattempt_update:
-                    return await self.update_history(address, remote_status, address_manager, False)
-                return False
-
         local_status, local_history = \
             await self.get_local_status_and_history(address, synced_history)

@@ -638,151 +607,88 @@ class Ledger(metaclass=LedgerRegistry):

     async def maybe_verify_transaction(self, tx, remote_height, merkle=None):
         tx.height = remote_height
-        cached = self._tx_cache.get(tx.id)
-        if not cached:
-            # cache txs looked up by transaction_show too
-            cached = TransactionCacheItem()
-            self._tx_cache[tx.id] = cached
-        cached.tx = tx
-        if 0 < remote_height < len(self.headers) and cached.pending_verifications <= 1:
+        if 0 < remote_height < len(self.headers):
             # can't be tx.pending_verifications == 1 because we have to handle the transaction_show case
             if not merkle:
                 merkle = await self.network.retriable_call(self.network.get_merkle, tx.id, remote_height)
+            if 'merkle' not in merkle:
+                return
             merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)
             header = await self.headers.get(remote_height)
             tx.position = merkle['pos']
             tx.is_verified = merkle_root == header['merkle_root']

-    async def _single_batch(self, batch, remote_heights, header_cache, transactions):
-        batch_result = await self.network.retriable_call(
-            self.network.get_transaction_batch, batch
-        )
-        for txid, (raw, merkle) in batch_result.items():
-            remote_height = remote_heights[txid]
-            merkle_height = merkle['block_height']
-            cache_item = self._tx_cache.get(txid)
-            if cache_item is None:
-                cache_item = TransactionCacheItem()
-                self._tx_cache[txid] = cache_item
-            tx = cache_item.tx or Transaction(bytes.fromhex(raw.decode() if isinstance(raw, bytes) else raw),
-                                              height=remote_height)
-            tx.height = remote_height
-            cache_item.tx = tx
-            if 'merkle' in merkle and remote_heights[txid] > 0:
-                merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)
-                try:
-                    header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height))
-                except IndexError:
-                    log.warning("failed to verify %s at height %i", tx.id, merkle_height)
-                else:
-                    header_cache[remote_heights[txid]] = header
-                    tx.position = merkle['pos']
-                    tx.is_verified = merkle_root == header['merkle_root']
-            transactions.append(tx)
-        return transactions
-
-    async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...]):
-        header_cache = {}
+    async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...], cached=False):
         batches = [[]]
         remote_heights = {}
-        transactions = []
-        heights_in_batch = 0
-        last_height = 0

         for txid, height in sorted(to_request, key=lambda x: x[1]):
+            if cached:
+                if txid in self._tx_cache:
+                    if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified:
+                        yield self._tx_cache[txid].tx
+                        continue
+                else:
+                    self._tx_cache[txid] = TransactionCacheItem()
             remote_heights[txid] = height
-            if height != last_height:
-                heights_in_batch += 1
-                last_height = height
-            if len(batches[-1]) == 100 or heights_in_batch == 20:
+            if len(batches[-1]) == 100:
                 batches.append([])
-                heights_in_batch = 1
             batches[-1].append(txid)
         if not batches[-1]:
             batches.pop()

         for batch in batches:
-            await self._single_batch(batch, remote_heights, header_cache, transactions)
-        return transactions
+            async for tx in self._single_batch(batch, remote_heights):
+                if cached:
+                    self._tx_cache[tx.id].tx = tx
+                yield tx

-    async def _request_transaction_batch(self, to_request, remote_history_size, address):
-        header_cache = {}
-        batches = [[]]
-        remote_heights = {}
-        synced_txs = []
-        heights_in_batch = 0
-        last_height = 0
-        for idx in sorted(to_request):
-            txid = to_request[idx][0]
-            height = to_request[idx][1]
-            remote_heights[txid] = height
-            if height != last_height:
-                heights_in_batch += 1
-                last_height = height
-            if len(batches[-1]) == 100 or heights_in_batch == 20:
-                batches.append([])
-                heights_in_batch = 1
-            batches[-1].append(txid)
-        if not batches[-1]:
-            batches.pop()
+    async def request_synced_transactions(self, to_request, remote_history, address):
+        pending_sync = {}
+        async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())):
+            pending_sync[tx.id] = tx
+        for f in asyncio.as_completed([self._sync(tx, remote_history, pending_sync) for tx in pending_sync.values()]):
+            yield await f

-        last_showed_synced_count = 0
+    async def _single_batch(self, batch, remote_heights):
+        heights = {remote_heights[txid] for txid in batch}
+        unrestricted = 0 < min(heights) < max(heights) < max(self.headers.checkpoints or [0])
+        batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch, not unrestricted)
+        for txid, (raw, merkle) in batch_result.items():
+            remote_height = remote_heights[txid]
+            tx = Transaction(unhexlify(raw), height=remote_height)
+            await self.maybe_verify_transaction(tx, remote_height, merkle)
+            yield tx

-        async def _single_batch(batch):
-            transactions = await self._single_batch(batch, remote_heights, header_cache, [])
-            this_batch_synced = []
+    async def _sync(self, tx, remote_history, pending_txs):
+        check_db_for_txos = {}
+        for txi in tx.inputs:
+            if txi.txo_ref.txo is not None:
+                continue
+            wanted_txid = txi.txo_ref.tx_ref.id
+            if wanted_txid not in remote_history:
+                continue
+            if wanted_txid in pending_txs:
+                txi.txo_ref = pending_txs[wanted_txid].outputs[txi.txo_ref.position].ref
+            else:
+                check_db_for_txos[txi] = txi.txo_ref.id

-            for tx in transactions:
-                check_db_for_txos = []
-
-                for txi in tx.inputs:
-                    if txi.txo_ref.txo is not None:
-                        continue
-                    cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id)
-                    if cache_item is not None:
-                        if cache_item.tx is not None:
-                            txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref
-                    else:
-                        check_db_for_txos.append(txi.txo_ref.id)
-
-                referenced_txos = {} if not check_db_for_txos else {
-                    txo.id: txo for txo in await self.db.get_txos(
-                        txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True
-                    )
-                }
-
-                for txi in tx.inputs:
-                    if txi.txo_ref.txo is not None:
-                        continue
-                    referenced_txo = referenced_txos.get(txi.txo_ref.id)
-                    if referenced_txo is not None:
-                        txi.txo_ref = referenced_txo.ref
-                        continue
-                    cache_item = self._tx_cache.get(txi.txo_ref.id)
-                    if cache_item is None:
-                        cache_item = self._tx_cache[txi.txo_ref.id] = TransactionCacheItem()
-                    if cache_item.tx is not None:
-                        txi.txo_ref = cache_item.tx.ref
-
-                synced_txs.append(tx)
-                this_batch_synced.append(tx)
-
-            await self.db.save_transaction_io_batch(
-                this_batch_synced, address, self.address_to_hash160(address), ""
+        referenced_txos = {} if not check_db_for_txos else {
+            txo.id: txo for txo in await self.db.get_txos(
+                txoid__in=list(check_db_for_txos.values()), order_by='txo.txoid', no_tx=True
             )
-            await asyncio.wait([
-                self._on_transaction_controller.add(TransactionEvent(address, tx))
-                for tx in this_batch_synced
-            ])
-            nonlocal last_showed_synced_count
-            if last_showed_synced_count + 100 < len(synced_txs):
-                log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address)
-                last_showed_synced_count = len(synced_txs)
+        }

-        for batch in batches:
-            await _single_batch(batch)
-
-        return synced_txs
+        for txi in check_db_for_txos:
+            if txi.txo_ref.id in referenced_txos:
+                txi.txo_ref = referenced_txos[txi.txo_ref.id].ref
+            else:
+                tx_from_db = await self.db.get_transaction(txid=txi.txo_ref.tx_ref.id)
+                if tx_from_db is None:
+                    log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id)
+                else:
+                    txi.txo_ref = tx_from_db.outputs[txi.txo_ref.position].ref
+        return tx
@@ -850,10 +756,10 @@ class Ledger(metaclass=LedgerRegistry):
                                  include_received_tips=False) -> Tuple[List[Output], dict, int, int]:
         encoded_outputs = await query
         outputs = Outputs.from_base64(encoded_outputs or b'')  # TODO: why is the server returning None?
-        txs = []
+        txs: List[Transaction] = []
         if len(outputs.txs) > 0:
-            txs: List[Transaction] = []
-            txs.extend((await self.request_transactions_for_inflate(tuple(outputs.txs))))
+            async for tx in self.request_transactions(tuple(outputs.txs), cached=True):
+                txs.append(tx)

         _txos, blocked = outputs.inflate(txs)

@@ -1071,7 +977,7 @@ class Ledger(metaclass=LedgerRegistry):
         return self.db.get_channel_count(**constraints)

     async def resolve_collection(self, collection, offset=0, page_size=1):
-        claim_ids = collection.claim.collection.claims.ids[offset:page_size+offset]
+        claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset]
         try:
             resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
         except Exception as err:
@@ -1120,7 +1026,7 @@ class Ledger(metaclass=LedgerRegistry):
             'txid': tx.id,
             'timestamp': ts,
             'date': datetime.fromtimestamp(ts).isoformat(' ')[:-3] if tx.height > 0 else None,
-            'confirmations': (headers.height+1) - tx.height if tx.height > 0 else 0,
+            'confirmations': (headers.height + 1) - tx.height if tx.height > 0 else 0,
             'claim_info': [],
             'update_info': [],
             'support_info': [],
@@ -1130,7 +1036,7 @@ class Ledger(metaclass=LedgerRegistry):
         is_my_inputs = all([txi.is_my_input for txi in tx.inputs])
         if is_my_inputs:
             # fees only matter if we are the ones paying them
-            item['value'] = dewies_to_lbc(tx.net_account_balance+tx.fee)
+            item['value'] = dewies_to_lbc(tx.net_account_balance + tx.fee)
             item['fee'] = dewies_to_lbc(-tx.fee)
         else:
             # someone else paid the fees
@@ -1153,13 +1059,13 @@ class Ledger(metaclass=LedgerRegistry):
                     if txi.txo_ref.txo is not None:
                         other_txo = txi.txo_ref.txo
                         if (other_txo.is_claim or other_txo.script.is_support_claim) \
-                            and other_txo.claim_id == txo.claim_id:
+                                and other_txo.claim_id == txo.claim_id:
                             previous = other_txo
                             break
                 if previous is not None:
                     item['update_info'].append({
                         'address': txo.get_address(self),
-                        'balance_delta': dewies_to_lbc(previous.amount-txo.amount),
+                        'balance_delta': dewies_to_lbc(previous.amount - txo.amount),
                         'amount': dewies_to_lbc(txo.amount),
                         'claim_id': txo.claim_id,
                         'claim_name': txo.claim_name,
@@ -1237,7 +1143,7 @@ class Ledger(metaclass=LedgerRegistry):
         for account in accounts:
             balance = self._balance_cache.get(account.id)
             if not balance:
-                balance = self._balance_cache[account.id] =\
+                balance = self._balance_cache[account.id] = \
                     await account.get_detailed_balance(confirmations, reserved_subtotals=True)
             for key, value in balance.items():
                 if key == 'reserved_subtotals':
@@ -1247,7 +1153,6 @@ class Ledger(metaclass=LedgerRegistry):
                 result[key] += value
         return result

-
 class TestNetLedger(Ledger):
     network_name = 'testnet'
     pubkey_address_prefix = bytes((111,))
@@ -1256,7 +1161,6 @@ class TestNetLedger(Ledger):
     extended_private_key_prefix = unhexlify('04358394')
     checkpoints = {}

-
 class RegTestLedger(Ledger):
     network_name = 'regtest'
     headers_class = UnvalidatedHeaders
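The ledger.py changes above replace the per-txid `TransactionCacheItem` locks and `pending_verifications` counters with a streaming design: `request_transactions` is now an async generator that sorts txids by height, groups them into batches of 100, and yields each verified transaction as it arrives, while `update_history` flushes to the database every 100 synced transactions instead of accumulating everything first. A self-contained sketch of that batch-and-yield pattern (illustrative names only, not the lbry API):

```python
import asyncio
from typing import AsyncIterator, Dict, List, Tuple

async def fetch_batch(batch: List[str]) -> Dict[str, str]:
    # Stand-in for network.get_transaction_batch: one round trip per batch.
    await asyncio.sleep(0)
    return {txid: f"raw:{txid}" for txid in batch}

async def request_items(to_request: Tuple[Tuple[str, int], ...]) -> AsyncIterator[str]:
    batches: List[List[str]] = [[]]
    # Sort by height so each batch spans a narrow height range, as in the diff.
    for txid, _height in sorted(to_request, key=lambda x: x[1]):
        if len(batches[-1]) == 100:
            batches.append([])
        batches[-1].append(txid)
    if not batches[-1]:
        batches.pop()
    for batch in batches:
        for raw in (await fetch_batch(batch)).values():
            yield raw  # consumers can checkpoint every N items, as update_history does

async def main():
    async for raw in request_items((("b" * 64, 2), ("a" * 64, 1))):
        print(raw)

asyncio.run(main())
```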
diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py
index 939a7c880..19e7166c1 100644
--- a/lbry/wallet/network.py
+++ b/lbry/wallet/network.py
@@ -260,9 +260,9 @@ class Network:
         restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10
         return self.rpc('blockchain.transaction.get', [tx_hash], restricted)

-    def get_transaction_batch(self, txids, restricted=True, session=None):
+    def get_transaction_batch(self, txids, restricted=True):
         # use any server if its old, otherwise restrict to who gave us the history
-        return self.rpc('blockchain.transaction.get_batch', txids, restricted, session)
+        return self.rpc('blockchain.transaction.get_batch', txids, restricted)

     def get_transaction_and_merkle(self, tx_hash, known_height=None):
         # use any server if its old, otherwise restrict to who gave us the history
diff --git a/tests/unit/wallet/test_ledger.py b/tests/unit/wallet/test_ledger.py
index e1822e2b6..276c5fa15 100644
--- a/tests/unit/wallet/test_ledger.py
+++ b/tests/unit/wallet/test_ledger.py
@@ -40,7 +40,7 @@ class MockNetwork:
         merkle = await self.get_merkle(tx_hash, known_height)
         return tx, merkle

-    async def get_transaction_batch(self, txids):
+    async def get_transaction_batch(self, txids, restricted):
         return {
             txid: await self.get_transaction_and_merkle(txid)
             for txid in txids
@@ -126,9 +126,7 @@ class TestSynchronization(LedgerTestCase):
         self.ledger.network.get_history_called = []
         self.ledger.network.get_transaction_called = []

-        self.assertFalse(self.ledger._tx_cache[txid1].tx.is_verified)
-        self.assertFalse(self.ledger._tx_cache[txid2].tx.is_verified)
-        self.assertFalse(self.ledger._tx_cache[txid3].tx.is_verified)
+        self.assertEqual(0, len(self.ledger._tx_cache))
         await self.ledger.update_history(address, '')
         self.assertListEqual(self.ledger.network.get_history_called, [address])
         self.assertListEqual(self.ledger.network.get_transaction_called, [])
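With the cache no longer pre-populated during sync, the updated test asserts that `_tx_cache` starts empty; transactions now enter it only through `request_transactions(..., cached=True)`. The cache itself is a `pylru.lrucache`, capped at 1024 entries by this diff and cleared on reorganization, so stale entries simply age out instead of being lock-managed. A small sketch of the eviction behavior this relies on (a tiny cap of 2 here, purely for illustration):

```python
import pylru

cache = pylru.lrucache(2)   # the ledger uses 1024 in the diff
cache["tx1"] = "raw1"
cache["tx2"] = "raw2"
cache["tx3"] = "raw3"       # evicts "tx1", the least recently used entry
assert "tx1" not in cache
assert len(cache) == 2
cache.clear()               # mirrors self._tx_cache.clear() on reorganization
assert len(cache) == 0
```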