//
// Fulcrum - A fast & nimble SPV Server for Bitcoin Cash
// Copyright (C) 2019-2020 Calin A. Culianu <calin.culianu@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program (see LICENSE.txt). If not, see
// <https://www.gnu.org/licenses/>.
//
#include "BlockProc.h"
#include "BTC.h"
#include "Controller.h"
#include "Mempool.h"
#include "Merkle.h"
#include "SubsMgr.h"
#include "TXO.h"
#include "bitcoin/transaction.h"
#include "robin_hood/robin_hood.h"
#include <algorithm>
#include <cassert>
#include <iterator>
#include <list>
#include <map>
Controller::Controller(const std::shared_ptr<Options> &o)
: Mgr(nullptr), polltimeMS(int(o->pollTimeSecs * 1e3)), options(o)
{
setObjectName("Controller");
_thread.setObjectName(objectName());
}
Controller::~Controller() { Debug() << __func__; cleanup(); }
void Controller::startup()
{
/// Note: we tried doing this using AsyncOnObject rather than a signal/slot connection, but on Linux at least those
/// events arrive AFTER the signal/slot events do. So in order to make sure putBlock arrives BEFORE the
/// DownloadBlocksTask completes, we have to do this. On Windows and MacOS this was not an issue, just on Linux.
conns += connect(this, &Controller::putBlock, this, &Controller::on_putBlock);
stopFlag = false;
storage = std::make_shared<Storage>(options);
storage->startup(); // may throw here
bitcoindmgr = std::make_shared<BitcoinDMgr>(options->bitcoind.first, options->bitcoind.second, options->rpcuser, options->rpcpassword);
{
auto constexpr waitTimer = "wait4bitcoind", callProcessTimer = "callProcess";
int constexpr msgPeriod = 10000, // 10sec
smallDelay = 100;
// some setup code that waits for bitcoind to be ready before kicking off our "process" method
auto waitForBitcoinD = [this] {
lostConn = true;
stopTimer(pollTimerName);
stopTimer(callProcessTimer);
callOnTimerSoon(msgPeriod, waitTimer, []{ Log("Waiting for bitcoind..."); return true; }, false, Qt::TimerType::VeryCoarseTimer);
};
waitForBitcoinD();
conns += connect(bitcoindmgr.get(), &BitcoinDMgr::allConnectionsLost, this, waitForBitcoinD);
conns += connect(bitcoindmgr.get(), &BitcoinDMgr::gotFirstGoodConnection, this, [this](quint64 id) {
// connection to kick off our 'process' method once the first auth is received
if (lostConn) {
lostConn = false;
stopTimer(waitTimer);
Debug() << "Auth recvd from bicoind with id: " << id << ", proceeding with processing ...";
callOnTimerSoonNoRepeat(smallDelay, callProcessTimer, [this]{process();}, true);
}
});
conns += connect(bitcoindmgr.get(), &BitcoinDMgr::inWarmUp, this, [last = -1.0](const QString &msg) mutable {
// just print a message to the log as to why we keep dropping conn. -- if bitcoind is still warming up
auto now = Util::getTimeSecs();
if (now-last >= 1.0) { // throttled to not spam log
last = now;
Log() << "bitcoind is still warming up: " << msg;
}
});
}
bitcoindmgr->startup(); // may throw
// We defer listening for connections until we hit the "upToDate" state at least once, to prevent problems
// for clients.
auto connPtr = std::make_shared<QMetaObject::Connection>();
*connPtr = connect(this, &Controller::upToDate, this, [this, connPtr] {
// the below code runs precisely once after the first upToDate signal
if (connPtr) disconnect(*connPtr);
if (!srvmgr) {
if (!origThread) {
Fatal() << "INTERNAL ERROR: Controller's creation thread is null; cannot start SrvMgr, exiting!";
return;
}
masterNotifySubsFlag = true; // permanently latch this to true. notifications enabled.
srvmgr = std::make_unique<SrvMgr>(options, storage, bitcoindmgr);
// this object will live on our creation thread (normally the main thread)
srvmgr->moveToThread(origThread);
// now, start it up on our creation thread (normally the main thread)
Util::VoidFuncOnObjectNoThrow(srvmgr.get(), [this]{
// creation thread (normally the main thread)
try {
srvmgr->startup(); // may throw Exception, waits for servers to bind
} catch (const Exception & e) {
// exit app on bind/listen failure.
Fatal() << e.what();
}
}); // wait for srvmgr's thread (usually the main thread)
// connect the header subscribe signal
conns += connect(this, &Controller::newHeader, srvmgr.get(), &SrvMgr::newHeader);
}
}, Qt::QueuedConnection);
{
// logging/stats timers stuff
constexpr const char * mempoolLogTimer = "mempoolLogTimer";
constexpr int mempoolLogTimerTimeout = 10000; // 10 secs (the actual printing happens at most once per minute, and only if the mempool changed)
// set up the mempool status log timer
conns += connect(this, &Controller::upToDate, this, [this]{
callOnTimerSoon(mempoolLogTimerTimeout, mempoolLogTimer, [this]{
printMempoolStatusToLog();
return true;
}, false, Qt::TimerType::VeryCoarseTimer);
});
conns += connect(this, &Controller::synchronizing, this, [this]{ stopTimer(mempoolLogTimer);});
conns += connect(bitcoindmgr.get(), &BitcoinDMgr::allConnectionsLost, this, [this]{ stopTimer(mempoolLogTimer);});
}
{
// set up periodic refresh of mempool fee histogram
constexpr const char *feeHistogramTimer = "feeHistogramTimer";
constexpr int feeHistogramTimerInterval = 30 * 1000; // every 30 seconds
conns += connect(this, &Controller::upToDate, this, [this] {
callOnTimerSoon(feeHistogramTimerInterval, feeHistogramTimer, [this]{
storage->refreshMempoolHistogram();
return true;
}, false, Qt::TimerType::VeryCoarseTimer);
});
// disable the timer if downloading blocks and restart it later when up-to-date
conns += connect(this, &Controller::synchronizing, this, [this]{ stopTimer(feeHistogramTimer); });
}
start(); // start our thread
}
void Controller::cleanup()
{
stopFlag = true;
stop();
tasks.clear(); // deletes all tasks asap
if (srvmgr) { Log("Stopping SrvMgr ... "); srvmgr->cleanup(); srvmgr.reset(); }
if (bitcoindmgr) { Log("Stopping BitcoinDMgr ... "); bitcoindmgr->cleanup(); bitcoindmgr.reset(); }
if (storage) { Log("Closing storage ..."); storage->cleanup(); storage.reset(); }
sm.reset();
}
/// Encapsulates basically the data returned from bitcoind by the getblockchaininfo RPC method.
/// This has been separated out into its own struct for future use to detect blockchain changes.
/// TODO: Refactor this out to storage, etc to detect when blockchain changed.
struct ChainInfo {
QString toString() const;
QString chain = "";
int blocks = 0, headers = -1;
QByteArray bestBlockhash; ///< decoded bytes
double difficulty = 0.0;
int64_t mtp = 0;
double verificationProgress = 0.0;
bool initialBlockDownload = false;
QByteArray chainWork; ///< decoded bytes
size_t sizeOnDisk = 0;
bool pruned = false;
QString warnings;
};
struct GetChainInfoTask : public CtlTask
{
GetChainInfoTask(Controller *ctl_) : CtlTask(ctl_, "Task.GetChainInfo") {}
~GetChainInfoTask() override { stop(); } // paranoia
void process() override;
ChainInfo info;
};
void GetChainInfoTask::process()
{
submitRequest("getblockchaininfo", {}, [this](const RPC::Message & resp){
const auto Err = [this, id=resp.id.toInt()](const QString &thing) {
const auto msg = QString("Failed to parse %1").arg(thing);
errorCode = id;
errorMessage = msg;
throw Exception(msg);
};
try {
bool ok = false;
const auto map = resp.result().toMap();
if (map.isEmpty()) Err("response; expected map");
info.blocks = map.value("blocks").toInt(&ok);
if (!ok || info.blocks < 0) Err("blocks"); // enforce a non-negative block count
info.chain = map.value("chain").toString();
if (info.chain.isEmpty()) Err("chain");
info.headers = map.value("headers").toInt(); // error ignored here
info.bestBlockhash = Util::ParseHexFast(map.value("bestblockhash").toByteArray());
if (info.bestBlockhash.size() != HashLen) Err("bestblockhash");
info.difficulty = map.value("difficulty").toDouble(); // error ignored here
info.mtp = map.value("mediantime").toLongLong(); // error ok
info.verificationProgress = map.value("verificationprogress").toDouble(); // error ok
if (auto v = map.value("initialblockdownload"); v.canConvert<bool>())
info.initialBlockDownload = v.toBool();
else
Err("initialblockdownload");
info.chainWork = Util::ParseHexFast(map.value("chainwork").toByteArray()); // error ok
info.sizeOnDisk = map.value("size_on_disk").toULongLong(); // error ok
info.pruned = map.value("pruned").toBool(); // error ok
info.warnings = map.value("warnings").toString(); // error ok
if (Trace::isEnabled()) Trace() << info.toString();
emit success();
} catch (const Exception & e) {
Error() << "INTERNAL ERROR: " << e.what();
emit errored();
}
});
}
QString ChainInfo::toString() const
{
QString ret;
{
QTextStream ts(&ret, QIODevice::WriteOnly|QIODevice::Truncate);
ts << "(ChainInfo"
<< " chain: \"" << chain << "\""
<< " blocks: " << blocks
<< " headers: " << headers
<< " bestBlockHash: " << bestBlockhash.toHex()
<< " difficulty: " << QString::number(difficulty, 'f', 9)
<< " mtp: " << mtp
<< " verificationProgress: " << QString::number(verificationProgress, 'f', 6)
<< " ibd: " << initialBlockDownload
<< " chainWork: " << chainWork.toHex()
<< " sizeOnDisk: " << sizeOnDisk
<< " pruned: " << pruned
<< " warnings: \"" << warnings << "\""
<< ")";
}
return ret;
}
struct DownloadBlocksTask : public CtlTask
{
DownloadBlocksTask(unsigned from, unsigned to, unsigned stride, Controller *ctl);
~DownloadBlocksTask() override { stop(); } // paranoia
void process() override;
const unsigned from = 0, to = 0, stride = 1, expectedCt = 1;
unsigned next = 0;
std::atomic_uint goodCt = 0;
bool maybeDone = false;
const bool TRACE = Trace::isEnabled();
int q_ct = 0;
static constexpr int max_q = /*16;*/BitcoinDMgr::N_CLIENTS+1; // todo: tune this
static const int HEADER_SIZE;
std::atomic<size_t> nTx = 0, nIns = 0, nOuts = 0;
void do_get(unsigned height);
// Computes expectedCt; used only by the c'tor as a utility function. Use the expectedCt member for the actual expected count.
static size_t nToDL(unsigned from, unsigned to, unsigned stride) { return size_t( (((to-from)+1) + stride-1) / qMax(stride, 1U) ); }
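// Worked example (illustrative): nToDL(100, 109, 3) == (((109-100)+1) + 3-1) / 3 == 12/3 == 4,
// i.e. such a task would fetch heights 100, 103, 106 and 109.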
// thread safe, this is a rough estimate and not 100% accurate
size_t nSoFar(double prog=-1) const { if (prog<0.) prog = lastProgress; return size_t(qRound(expectedCt * prog)); }
// given a position in the headers array, return the height
size_t index2Height(size_t index) { return size_t( from + (index * stride) ); }
// given a block height, return the index into our array
size_t height2Index(size_t h) { return size_t( ((h-from) + stride-1) / stride ); }
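// Round-trip example (illustrative): with from=100 and stride=3, index2Height(2) == 106 and
// height2Index(106) == ((106-100) + 3-1) / 3 == 8/3 == 2, so the two mappings invert each other for on-stride heights.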
};
/*static*/ const int DownloadBlocksTask::HEADER_SIZE = BTC::GetBlockHeaderSize();
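// (Bitcoin-family block headers serialize to 80 bytes, so HEADER_SIZE is expected to be 80 here.)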
DownloadBlocksTask::DownloadBlocksTask(unsigned from, unsigned to, unsigned stride, Controller *ctl_)
: CtlTask(ctl_, QString("Task.DL %1 -> %2").arg(from).arg(to)), from(from), to(to), stride(stride), expectedCt(unsigned(nToDL(from, to, stride)))
{
FatalAssert( (to >= from) && (ctl_) && (stride > 0)) << "Invalid params to DownloadBlocksTask c'tor, FIXME!";
next = from;
}
void DownloadBlocksTask::process()
{
if (next > to) {
if (maybeDone) {
if (goodCt >= expectedCt)
emit success();
else {
errorCode = int(expectedCt - goodCt);
errorMessage = QString("missing %1 blocks").arg(errorCode);
emit errored();
}
}
return;
}
do_get(next);
next += stride;
}
void DownloadBlocksTask::do_get(unsigned int bnum)
{
if (ctl->isStopping()) return; // short-circuit early return if controller is stopping
if (unsigned msec = ctl->downloadTaskRecommendedThrottleTimeMsec(bnum); msec > 0) {
// Controller told us to back off because it is backlogged.
// Schedule ourselves to run again soon and return.
Util::AsyncOnObject(this, [this, bnum]{
do_get(bnum);
}, msec, Qt::TimerType::PreciseTimer);
return;
}
submitRequest("getblockhash", {bnum}, [this, bnum](const RPC::Message & resp){
QVariant var = resp.result();
const auto hash = Util::ParseHexFast(var.toByteArray());
if (hash.length() == HashLen) {
submitRequest("getblock", {var, false}, [this, bnum, hash](const RPC::Message & resp){
QVariant var = resp.result();
const auto rawblock = Util::ParseHexFast(var.toByteArray());
const auto header = rawblock.left(HEADER_SIZE); // we need a deep copy of this anyway so might as well take it now.
QByteArray chkHash;
if (bool sizeOk = header.length() == HEADER_SIZE; sizeOk && (chkHash = BTC::HashRev(header)) == hash) {
auto ppb = PreProcessedBlock::makeShared(bnum, size_t(rawblock.size()), BTC::Deserialize<bitcoin::CBlock>(rawblock)); // this is here to test performance
if (TRACE) Trace() << "block " << bnum << " size: " << rawblock.size() << " nTx: " << ppb->txInfos.size();
// update some stats for /stats endpoint
nTx += ppb->txInfos.size();
nOuts += ppb->outputs.size();
nIns += ppb->inputs.size();
const size_t index = height2Index(bnum);
++goodCt;
q_ct = qMax(q_ct-1, 0);
lastProgress = double(index) / double(expectedCt);
if (!(bnum % 1000) && bnum) {
emit progress(lastProgress);
}
if (TRACE) Trace() << resp.method << ": header for height: " << bnum << " len: " << header.length();
emit ctl->putBlock(this, ppb); // send the block off to the Controller thread for further processing and for save to db
if (goodCt >= expectedCt) {
// flag state to maybeDone to do checks when process() called again
maybeDone = true;
AGAIN();
return;
}
while (goodCt + unsigned(q_ct) < expectedCt && q_ct < max_q) {
// queue multiple at once
AGAIN();
++q_ct;
}
} else if (!sizeOk) {
Warning() << resp.method << ": at height " << bnum << " header not valid (decoded size: " << header.length() << ")";
errorCode = int(bnum);
errorMessage = QString("bad size for height %1").arg(bnum);
emit errored();
} else {
Warning() << resp.method << ": at height " << bnum << " header not valid (expected hash: " << hash.toHex() << ", got hash: " << chkHash.toHex() << ")";
errorCode = int(bnum);
errorMessage = QString("hash mismatch for height %1").arg(bnum);
emit errored();
}
});
} else {
Warning() << resp.method << ": at height " << bnum << " hash not valid (decoded size: " << hash.length() << ")";
errorCode = int(bnum);
errorMessage = QString("invalid hash for height %1").arg(bnum);
emit errored();
}
});
}
/// We use the "getrawmempool true" (verbose) call to get the initial list of mempool tx's. In pathological cases where
/// the mempool is extremely full this 1. wastes lots of CPU cycles parsing all that JSON and 2. may hit the limit on
/// Qt's ability to parse JSON (which is 128MB for the source text). See: https://bugreports.qt.io/browse/QTBUG-47629
/// For now we will continue to use that call because it provides useful info for detecting reorgs (such as height),
/// but we may want to think about not using that call and doing it the hard way from the non-verbose version (which
/// just returns a txhash list).
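/// As a rough sketch, the non-verbose variant would be a submitRequest("getrawmempool", {false}, ...) call whose
/// result is just a JSON array of txid hex strings; the per-tx metadata that doGetRawMempool() relies on below
/// (size, time, height, ancestor/descendant counts) would then have to be fetched separately for each tx.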
struct SynchMempoolTask : public CtlTask
{
SynchMempoolTask(Controller *ctl_, std::shared_ptr<Storage> storage)
: CtlTask(ctl_, "SynchMempool"), storage(storage)
{ scriptHashesAffected.reserve(SubsMgr::kRecommendedPendingNotificationsReserveSize); }
~SynchMempoolTask() override;
void process() override;
std::shared_ptr<Storage> storage;
bool isdlingtxs = false;
Mempool::TxMap txsNeedingDownload, txsWaitingForResponse;
using DldTxsMap = robin_hood::unordered_flat_map<TxHash, std::pair<Mempool::TxRef, bitcoin::CTransactionRef>, HashHasher>;
DldTxsMap txsDownloaded;
unsigned expectedNumTxsDownloaded = 0;
const bool TRACE = Trace::isEnabled(); // set this to true to print more debug
/// The scriptHashes that were affected by this refresh/synch cycle. Used for notifications.
std::unordered_set<HashX, HashHasher> scriptHashesAffected;
void clear() {
isdlingtxs = false;
txsNeedingDownload.clear(); txsWaitingForResponse.clear();
txsDownloaded.clear();
expectedNumTxsDownloaded = 0;
}
void doGetRawMempool();
void doDLNextTx();
void processResults();
};
SynchMempoolTask::~SynchMempoolTask() { stop(); } // paranoia
void SynchMempoolTask::process()
{
if (ctl->isStopping())
return; // short-circuit early return if controller is stopping
if (!isdlingtxs)
doGetRawMempool();
else if (!txsNeedingDownload.empty()) {
doDLNextTx();
} else if (txsWaitingForResponse.empty()) {
try {
processResults();
} catch (const std::exception & e) {
Error() << "Caught exception when processing mempool tx's: " << e.what();
emit errored();
return;
}
} else {
Error() << "Unexpected state in " << __PRETTY_FUNCTION__ << ". FIXME!";
emit errored();
return;
}
}
/// Takes locks; prints to Log() at most once per minute if there were changes.
void Controller::printMempoolStatusToLog() const
{
if (storage) {
size_t newSize, numAddresses;
{
auto [mempool, lock] = storage->mempool();
newSize = mempool.txs.size();
numAddresses = mempool.hashXTxs.size();
} // release mempool lock
printMempoolStatusToLog(newSize, numAddresses, false);
}
}
// static
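// Example log line (illustrative): "250 mempool txs involving 1000 addresses"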
void Controller::printMempoolStatusToLog(size_t newSize, size_t numAddresses, bool isDebug, bool force)
{
static std::atomic_size_t oldSize = 0, oldNumAddresses = 0;
static std::atomic<double> lastTS = 0.;
static std::mutex mut;
constexpr double interval = 60.; // print once per minute if changed. (TODO: make this configurable?)
double now = Util::getTimeSecs();
std::lock_guard g(mut);
if (force || (newSize > 0 && (oldSize != newSize || oldNumAddresses != numAddresses) && now - lastTS >= interval)) {
std::unique_ptr<Log> logger(isDebug ? new Debug : new Log);
Log & log(*logger);
log << newSize << Util::Pluralize(" mempool tx", newSize) << " involving " << numAddresses
<< Util::Pluralize(" address", numAddresses);
if (!force) {
oldSize = newSize;
oldNumAddresses = numAddresses;
lastTS = now;
}
}
}
void SynchMempoolTask::processResults()
{
if (txsDownloaded.size() != expectedNumTxsDownloaded) {
Error() << __PRETTY_FUNCTION__ << ": Expected to download " << expectedNumTxsDownloaded << ", instead got " << txsDownloaded.size() << ". FIXME!";
emit errored();
return;
}
size_t oldSize = 0, newSize = 0, oldNumAddresses = 0, newNumAddresses = 0;
{
auto [mempool, lock] = storage->mutableMempool(); // grab mempool struct exclusively
oldSize = mempool.txs.size();
oldNumAddresses = mempool.hashXTxs.size();
// first, do new outputs for all tx's, and put the new tx's in the mempool struct
for (auto & [hash, pair] : txsDownloaded) {
auto & [tx, ctx] = pair;
assert(hash == tx->hash);
mempool.txs[tx->hash] = tx; // save tx right now to map, since we need to find it later for possible spends, etc if subsequent tx's refer to this tx.
IONum n = 0;
const auto numTxo = ctx->vout.size();
if (LIKELY(tx->txos.size() != numTxo)) {
// we do it this way (reserve then resize) to avoid the automatic 2^N prealloc of normal vector .resize()
tx->txos.reserve(numTxo);
tx->txos.resize(numTxo);
}
for (const auto & out : ctx->vout) {
const auto & script = out.scriptPubKey;
if (!BTC::IsOpReturn(script)) {
// UTXO only if it's not OP_RETURN -- can't do 'continue' here as that would throw off the 'n' counter
HashX sh = BTC::HashXFromCScript(out.scriptPubKey);
// the below is a hack to save memory by re-using the same shallow copy of 'sh' each time
auto hxit = mempool.hashXTxs.find(sh);
if (hxit != mempool.hashXTxs.end()) {
// found existing, re-use sh as a shallow copy
sh = hxit->first;
} else {
// new entry, insert, update hxit
auto pair = mempool.hashXTxs.insert({sh, decltype(hxit->second)()});
hxit = pair.first;
}
// end memory saving hack
TXOInfo &txoInfo = tx->txos[n];
txoInfo = TXOInfo{out.nValue, sh, {}, {}};
tx->hashXs[sh].utxo.insert(n);
hxit->second.push_back(tx); // save tx to hashx -> tx vector (amortized constant time insert at end -- we will sort and uniqueify this at end of this function)
scriptHashesAffected.insert(sh);
assert(txoInfo.isValid());
}
tx->fee -= out.nValue; // update fee (fee = ins - outs, so we "add" the outs as a negative)
++n;
}
assert(n == numTxo);
// . <-- at this point the .txos vec is built, with everything isValid() except for the OP_RETURN outs, which are all !isValid()
}
// next, do new inputs for all tx's, debiting/crediting either a mempool tx or querying db for the relevant utxo
for (auto & [hash, pair] : txsDownloaded) {
auto & [tx, ctx] = pair;
assert(hash == tx->hash);
IONum inNum = 0;
for (const auto & in : ctx->vin) {
const IONum prevN = IONum(in.prevout.GetN());
const TxHash prevTxId = BTC::Hash2ByteArrayRev(in.prevout.GetTxId());
const TXO prevTXO{prevTxId, prevN};
TXOInfo prevInfo;
QByteArray sh; // shallow copy of prevInfo.hashX
if (auto it = mempool.txs.find(prevTxId); it != mempool.txs.end()) {
// prev is a mempool tx
auto prevTxRef = it->second;
assert(bool(prevTxRef));
if (prevN >= prevTxRef->txos.size()
|| !(prevInfo = prevTxRef->txos[prevN]).isValid())
// defensive programming paranoia
throw InternalError(QString("FAILED TO FIND A VALID PREVIOUS TXOUTN %1 IN MEMPOOL for TxHash: %2")
.arg(prevN).arg(QString(prevTxId.toHex())));
sh = prevInfo.hashX;
tx->hashXs[sh].unconfirmedSpends[prevTXO] = prevInfo;
prevTxRef->hashXs[sh].utxo.erase(prevN); // remove this spend from utxo set for prevTx in mempool
if (TRACE) Debug() << hash.toHex() << " unconfirmed spend: " << prevTXO.toString() << " " << prevInfo.amount.ToString().c_str();
} else {
// prev is a confirmed tx
const auto optTXOInfo = storage->utxoGetFromDB(prevTXO, false); // this may also throw on low-level db error
if (UNLIKELY(!optTXOInfo.has_value())) {
// Uh oh. If it wasn't in the mempool or in the db.. something is very wrong with our code.
// We will throw if missing, and the synch process aborts and hopefully we recover with a reorg
// or a new block or somesuch.
throw InternalError(QString("FAILED TO FIND PREVIOUS TX %1 IN EITHER MEMPOOL OR DB for TxHash: %2 (input %3)")
.arg(prevTXO.toString()).arg(QString(prevTxId.toHex())).arg(inNum));
}
prevInfo = optTXOInfo.value();
sh = prevInfo.hashX;
// hack to save memory by re-using existing sh QByteArray and/or forcing a shallow-copy
auto hxit = tx->hashXs.find(sh);
if (hxit != tx->hashXs.end()) {
// existing found, re-use the same underlying QByteArray memory for sh
sh = prevInfo.hashX = hxit->first;
} else {
// new entry, insert, update hxit
auto pair = tx->hashXs.insert({sh, decltype(hxit->second)()});
hxit = pair.first;
}
// end memory saving hack
hxit->second.confirmedSpends[prevTXO] = prevInfo;
if (TRACE) Debug() << hash.toHex() << " confirmed spend: " << prevTXO.toString() << " " << prevInfo.amount.ToString().c_str();
}
tx->fee += prevInfo.amount;
assert(sh == prevInfo.hashX);
mempool.hashXTxs[sh].push_back(tx); // mark this hashX as having been "touched" because of this input (note we push dupes here out of order but sort and uniqueify at the end)
scriptHashesAffected.insert(sh);
++inNum;
}
// Now, compactify some data structures to take up less memory by rehashing their unordered_maps/unordered_sets..
// we do this once for each new tx we see.. and it can end up saving tons of space. Note the below structures
// are either fixed in size or will only ever shrink as the mempool evolves so this is a good time to do this.
tx->hashXs.rehash(tx->hashXs.size());
for (auto & [sh, ioinfo] : tx->hashXs) {
ioinfo.confirmedSpends.rehash(ioinfo.confirmedSpends.size()); // this is fixed once built
ioinfo.unconfirmedSpends.rehash(ioinfo.unconfirmedSpends.size()); // this is fixed once built
ioinfo.utxo.rehash(ioinfo.utxo.size()); // this may shrink but we rehash it once now to the largest size it will ever have
}
}
// now, sort and uniqueify data structures made temporarily inconsistent above (have dupes, are out-of-order)
for (const auto & sh : scriptHashesAffected) {
if (auto it = mempool.hashXTxs.find(sh); LIKELY(it != mempool.hashXTxs.end()))
Util::sortAndUniqueify<Mempool::TxRefOrdering>(it->second);
else
throw InternalError(QString("Unable to find sh %1 in hashXTXs map! FIXME!").arg(QString(sh.toHex())));
}
newSize = mempool.txs.size();
newNumAddresses = mempool.hashXTxs.size();
} // release mempool lock
if (oldSize != newSize && Debug::isEnabled()) {
Controller::printMempoolStatusToLog(newSize, newNumAddresses, true, true);
}
emit success();
}
void SynchMempoolTask::doDLNextTx()
{
Mempool::TxRef tx;
if (auto it = txsNeedingDownload.begin(); it == txsNeedingDownload.end()) {
Error() << "FIXME -- txsNeedingDownload is empty in " << __FUNCTION__;
emit errored();
return;
} else {
tx = it->second;
it = txsNeedingDownload.erase(it); // pop it off the front
}
assert(bool(tx));
const auto hashHex = Util::ToHexFast(tx->hash);
txsWaitingForResponse[tx->hash] = tx;
submitRequest("getrawtransaction", {hashHex, false}, [this, hashHex, tx](const RPC::Message & resp){
QByteArray txdata = resp.result().toString().toUtf8();
const int expectedLen = txdata.length() / 2;
txdata = Util::ParseHexFast(txdata);
if (txdata.length() != expectedLen) {
Error() << "Received tx data is of the wrong length -- bad hex? FIXME";
emit errored();
return;
} else if (BTC::HashRev(txdata) != tx->hash) {
Error() << "Received tx data appears to not match requested tx! FIXME!!";
emit errored();
return;
}
if (TRACE)
Debug() << "got reply for tx: " << hashHex << " " << txdata.length() << " bytes";
{
// tmp mutable object will be moved into CTransactionRef below via a move constructor
bitcoin::CMutableTransaction ctx = BTC::Deserialize<bitcoin::CMutableTransaction>(txdata);
txsDownloaded[tx->hash] = {tx, bitcoin::MakeTransactionRef(std::move(ctx)) };
}
txsWaitingForResponse.erase(tx->hash);
AGAIN();
});
}
void SynchMempoolTask::doGetRawMempool()
{
submitRequest("getrawmempool", {true}, [this](const RPC::Message & resp){
const int tipHeight = storage->latestTip().first;
int newCt = 0;
const QVariantMap vm = resp.result().toMap();
auto [mempool, lock] = storage->mutableMempool(); // grab the mempool data struct and lock it exclusively
const auto oldCt = mempool.txs.size();
auto droppedTxs = Util::keySet<std::unordered_set<TxHash, HashHasher>>(mempool.txs);
for (auto it = vm.begin(); it != vm.end(); ++it) {
const TxHash hash = Util::ParseHexFast(it.key().toUtf8());
if (hash.length() != HashLen) {
Error() << resp.method << ": got an empty tx hash";
emit errored();
return;
}
droppedTxs.erase(hash); // mark this tx as "not dropped"
const QVariantMap m = it.value().toMap();
if (m.isEmpty()) {
Error() << resp.method << ": got an empty dict for tx hash " << hash.toHex();
emit errored();
return;
}
Mempool::TxRef tx;
static const QVariantList EmptyList; // avoid constructing this for each iteration
if (auto it = mempool.txs.find(hash); it != mempool.txs.end()) {
tx = it->second;
if (TRACE) Debug() << "Existing mempool tx: " << hash.toHex();
} else {
if (TRACE) Debug() << "New mempool tx: " << hash.toHex();
++newCt;
tx = std::make_shared<Mempool::Tx>();
tx->hashXs.max_load_factor(1.0); // hopefully this will save some memory by explicitly setting it to 1.0
tx->hash = hash;
tx->ordinal = mempool.nextOrdinal++;
tx->sizeBytes = m.value("size", 0).toUInt();
// Note: we end up calculating the fee ourselves since I don't trust doubles here. I wish bitcoind would have returned sats.. :(
//tx->fee = int64_t(m.value("fee", 0.0).toDouble() * (bitcoin::COIN / bitcoin::Amount::satoshi())) * bitcoin::Amount::satoshi();
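// (Concretely: processResults() accumulates fee = sum(prevout amounts) - sum(output amounts); each output's
// nValue is subtracted as the outputs are walked, and each input's previous TXO amount is added back in.)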
tx->time = int64_t(m.value("time", 0).toULongLong());
tx->height = m.value("height", 0).toUInt();
// Note mempool tx's may have any height in the past because they may not confirm when new blocks arrive...
if (tx->height > unsigned(tipHeight)) {
Debug() << resp.method << ": tx height " << tx->height << " > current height " << tipHeight << ", assuming a new block has arrived, aborting mempool synch ...";
mempool.clear();
emit retryRecommended(); // this is an exit point for this task
return;
}
// save ancestor count exactly once. this should never change unless there is a reorg, at which point
// our in-memory mempool is wiped anyway.
tx->ancestorCount = m.value("ancestorcount", 0).toUInt();
if (!tx->ancestorCount) {
Error() << resp.method << ": failed to parse ancestor count for tx " << hash.toHex();
emit errored();
return;
}
txsNeedingDownload[hash] = tx;
}
// at this point we have a valid tx ptr
// update descendantCount since it may change as new tx's appear in mempool
tx->descendantCount = m.value("descendantcount", 0).toUInt();
if (!tx->descendantCount) {
Error() << resp.method << ": failed to parse descendant count for tx " << hash.toHex();
emit errored();
return;
}
}
if (UNLIKELY(!droppedTxs.empty())) {
const bool recommendFullRetry = oldCt >= 2 && droppedTxs.size() >= oldCt/2; // more than 50% of the mempool tx's dropped out. something is funny. likely a new block arrived.
// TODO here: keep track of notifications?
Debug() << droppedTxs.size() << " txs dropped from mempool, resetting mempool and trying again ...";
mempool.clear();
if (recommendFullRetry) {
emit retryRecommended(); // this is an exit point for this task
return;
}
clear();
AGAIN();
return;
}
if (newCt)
Debug() << resp.method << ": got reply with " << vm.size() << " items, " << newCt << " new";
isdlingtxs = true;
expectedNumTxsDownloaded = unsigned(newCt);
// TX data will be downloaded now, if needed
AGAIN();
});
}
struct Controller::StateMachine
{
enum State {
Begin=0, WaitingForChainInfo, GetBlocks, DownloadingBlocks, FinishedDL, End, Failure, IBD, Retry,
SynchMempool, SynchingMempool, SynchMempoolFinished
};
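// Typical happy-path flow, as driven by process() below: Begin -> WaitingForChainInfo -> GetBlocks ->
// DownloadingBlocks -> FinishedDL, after which the state machine is reset back to Begin; when the tip is
// already up-to-date, WaitingForChainInfo goes straight to SynchMempool instead.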
State state = Begin;
int ht = -1; ///< the latest height bitcoind told us this run
bool isMainNet = false;
robin_hood::unordered_flat_map<unsigned, PreProcessedBlockPtr> ppBlocks; // mapping of height -> PreProcessedBlock (we use an unordered_flat_map because it's faster for frequent updates)
unsigned startheight = 0, ///< the height we started at
endHeight = 0; ///< the final (inclusive) block height we expect to receive to pronounce the synch done
std::atomic<unsigned> ppBlkHtNext = 0; ///< the next unprocessed block height we need to process in series
// todo: tune this
const size_t DL_CONCURRENCY = qMax(Util::getNPhysicalProcessors()-1, 1U);//size_t(qMin(qMax(int(Util::getNPhysicalProcessors())-BitcoinDMgr::N_CLIENTS, BitcoinDMgr::N_CLIENTS), 32));
size_t nTx = 0, nIns = 0, nOuts = 0;
const char * stateStr() const {
static constexpr const char *stateStrings[] = { "Begin", "WaitingForChainInfo", "GetBlocks", "DownloadingBlocks",
"FinishedDL", "End",
"Failure", "IBD", "Retry",
"SynchMempool", "SynchingMempool", "SynchMempoolFinished",
"Unknown" /* this should always be last */ };
auto idx = qMin(size_t(state), std::size(stateStrings)-1);
return stateStrings[idx];
}
static constexpr unsigned progressIntervalBlocks = 1000;
size_t nProgBlocks = 0, nProgIOs = 0, nProgTx = 0;
double lastProgTs = 0., waitingTs = 0.;
static constexpr double simpleTaskTookTooLongSecs = 30.;
/// this pointer should *not* be dereferenced (which is why it's void *), but rather is just used to filter out
/// old/stale GetChainInfoTask responses in Controller::process()
void * mostRecentGetChainInfoTask = nullptr;
};
unsigned Controller::downloadTaskRecommendedThrottleTimeMsec(unsigned bnum) const
{
std::shared_lock g(smLock); // this lock guarantees that 'sm' won't be deleted from underneath us
if (sm) {
int maxBackLog = 1000; // <--- TODO: have this be a more dynamic value based on current average blocksize.
if (sm->isMainNet) {
// mainnet
if (bnum > 550000) // beyond this height we may start to see 32MB blocks in the future (check the larger threshold first so this branch is reachable)
maxBackLog = 100;
else if (bnum > 150000) // beyond this height the blocks are starting to be big enough that we want to not eat memory.
maxBackLog = 250;
} else {
// testnet
if (bnum > 1300000) // beyond this height 32MB blocks may be common, esp. in the future
maxBackLog = 100;
}
const int diff = int(bnum) - int(sm->ppBlkHtNext.load()); // note: ppBlkHtNext is not guarded by the lock but it is an atomic value, so that's fine.
if ( diff > maxBackLog ) {
// Make the backoff time be from 10ms to 50ms, depending on how far in the future this block height is from
// what we are processing. The hope is that this enforces some order on future block arrivals and also
// prevents excessive polling for blocks that are too far ahead of us.
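// Worked example (illustrative): with maxBackLog == 250 and diff == 255 the backoff is
// std::min(10u + 5u*(255 - 250 - 1), 50u) == std::min(30u, 50u) == 30 msec; the 50 msec cap is reached once
// diff - maxBackLog >= 9.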
return std::min(10u + 5*unsigned(diff - maxBackLog - 1), 50u); // TODO: also have this be tuneable.
}
}
return 0u;
}
void Controller::rmTask(CtlTask *t)
{
if (auto it = tasks.find(t); it != tasks.end()) {
tasks.erase(it); // will delete object immediately
return;
}
Error() << __FUNCTION__ << ": Task '" << t->objectName() << "' not found! FIXME!";
}
bool Controller::isTaskDeleted(CtlTask *t) const { return tasks.count(t) == 0; }
void Controller::add_DLHeaderTask(unsigned int from, unsigned int to, size_t nTasks)
{
DownloadBlocksTask *t = newTask<DownloadBlocksTask>(false, unsigned(from), unsigned(to), unsigned(nTasks), this);
connect(t, &CtlTask::success, this, [t, this]{
if (UNLIKELY(!sm || isTaskDeleted(t))) return; // task was stopped from underneath us, this is stale.. abort.
sm->nTx += t->nTx;
sm->nIns += t->nIns;
sm->nOuts += t->nOuts;
Debug() << "Got all blocks from: " << t->objectName() << " blockCt: " << t->goodCt
<< " nTx,nInp,nOutp: " << t->nTx << "," << t->nIns << "," << t->nOuts << " totals: "
<< sm->nTx << "," << sm->nIns << "," << sm->nOuts;
});
connect(t, &CtlTask::errored, this, [t, this]{
if (UNLIKELY(!sm || isTaskDeleted(t))) return; // task was stopped from underneath us, this is stale.. abort.
if (sm->state == StateMachine::State::Failure) return; // silently ignore if we are already in failure
Error() << "Task errored: " << t->objectName() << ", error: " << t->errorMessage;
genericTaskErrored();
});
}
void Controller::genericTaskErrored()
{
if (sm && sm->state != StateMachine::State::Failure) {
if (LIKELY(sm))
sm->state = StateMachine::State::Failure;
AGAIN();
}
}
template <typename CtlTaskT, typename ...Args, typename /* enable_if... */>
CtlTaskT *Controller::newTask(bool connectErroredSignal, Args && ...args)
{
CtlTaskT *task = new CtlTaskT(std::forward<Args>(args)...);
tasks.emplace(task, task);
if (connectErroredSignal)
connect(task, &CtlTask::errored, this, &Controller::genericTaskErrored);
connect(task, &CtlTask::retryRecommended, this, [this]{ // only the SynchMempoolTask ever emits this
if (LIKELY(sm))
sm->state = StateMachine::State::Retry;
AGAIN();
});
Util::AsyncOnObject(this, [task, this] { // schedule start when we return to our event loop
if (!isTaskDeleted(task))
task->start();
});
return task;
}
void Controller::process(bool beSilentIfUpToDate)
{
if (stopFlag) return;
bool enablePollTimer = false;
auto polltimeout = polltimeMS;
stopTimer(pollTimerName);
//Debug() << "Process called...";
if (!sm) {
std::lock_guard g(smLock);
sm = std::make_unique<StateMachine>();
}
using State = StateMachine::State;
if (sm->state == State::Begin) {
auto task = newTask<GetChainInfoTask>(true, this);
task->threadObjectDebugLifecycle = Trace::isEnabled(); // suppress debug prints here unless we are in trace mode
sm->mostRecentGetChainInfoTask = task; // reentrancy defense mechanism for ignoring all but the most recent getchaininfo reply from bitcoind
sm->waitingTs = Util::getTimeSecs();
sm->state = State::WaitingForChainInfo; // more reentrancy prevention paranoia -- in case we get a spurious call to process() in the future
connect(task, &CtlTask::success, this, [this, task, beSilentIfUpToDate]{
if (UNLIKELY(!sm || task != sm->mostRecentGetChainInfoTask || isTaskDeleted(task) || sm->state != State::WaitingForChainInfo))
// task was stopped from underneath us and/or this response is stale.. so return and ignore
return;
sm->mostRecentGetChainInfoTask = nullptr;
if (task->info.initialBlockDownload) {
sm->state = State::IBD;
AGAIN();
return;
}
if (const auto dbchain = storage->getChain(); dbchain.isEmpty() && !task->info.chain.isEmpty()) {
storage->setChain(task->info.chain);
} else if (dbchain != task->info.chain) {
Fatal() << "Bitcoind reports chain: \"" << task->info.chain << "\", which differs from our database: \""
<< dbchain << "\". You may have connected to the wrong bitcoind. To fix this issue either "
<< "connect to a different bitcoind or delete this program's datadir to resynch.";
return;
}
sm->isMainNet = task->info.chain == "main";
QByteArray tipHeader;
// TODO: detect reorgs here -- to be implemented later after we figure out data model more, etc.
const auto [tip, tipHash] = storage->latestTip(&tipHeader);
sm->ht = task->info.blocks;
if (tip == sm->ht) {
if (task->info.bestBlockhash == tipHash) { // no reorg
if (!beSilentIfUpToDate) {
storage->updateMerkleCache(unsigned(tip));
Log() << "Block height " << tip << ", up-to-date";
emit upToDate();
emit newHeader(unsigned(tip), tipHeader);
}
sm->state = State::SynchMempool; // now, move on to synch mempool
} else {
// height ok, but best block hash mismatch.. reorg
Warning() << "We have bestBlock " << tipHash.toHex() << ", but bitcoind reports bestBlock " << task->info.bestBlockhash.toHex() << "."
<< " Possible reorg, will rewind back 1 block and try again ...";
process_DoUndoAndRetry(); // attempt to undo 1 block and try again.
return;
}
} else if (tip > sm->ht) {
Warning() << "We have height " << tip << ", but bitcoind reports height " << sm->ht << "."
<< " Possible reorg, will rewind back 1 block and try again ...";
process_DoUndoAndRetry(); // attempt to undo 1 block and try again.
return;
} else {
Log() << "Block height " << sm->ht << ", downloading new blocks ...";
emit synchronizing();
sm->state = State::GetBlocks;
}
AGAIN();
});
} else if (sm->state == State::WaitingForChainInfo) {
// This branch is very unlikely -- I couldn't get it to happen in normal testing, but it is here in case there are
// spurious calls to process(), or in case bitcoind goes out to lunch and our process() timer fires while it
// does so.
if (Util::getTimeSecs() - sm->waitingTs > sm->simpleTaskTookTooLongSecs) {
// this is very unlikely but is here in case bitcoind goes out to lunch so we can reset things and try again.
Warning() << "GetChainInfo task took longer than " << sm->simpleTaskTookTooLongSecs << " seconds to return a response. Trying again ...";
genericTaskErrored();
} else { Debug() << "Spurious Controller::process() call while waiting for the chain info task to complete, ignoring"; }
} else if (sm->state == State::GetBlocks) {
FatalAssert(sm->ht >= 0) << "Inconsistent state -- sm->ht cannot be negative in State::GetBlocks! FIXME!"; // paranoia
const size_t base = size_t(storage->latestTip().first+1);
const size_t num = size_t(sm->ht+1) - base;
FatalAssert(num > 0) << "Cannot download 0 blocks! FIXME!"; // more paranoia
const size_t nTasks = qMin(num, sm->DL_CONCURRENCY);
sm->lastProgTs = Util::getTimeSecs();
sm->ppBlkHtNext = sm->startheight = unsigned(base);
sm->endHeight = unsigned(sm->ht);
for (size_t i = 0; i < nTasks; ++i) {
add_DLHeaderTask(unsigned(base + i), unsigned(sm->ht), nTasks);
}
sm->state = State::DownloadingBlocks; // advance state now. we will be called back by download task in on_putBlock()
} else if (sm->state == State::DownloadingBlocks) {
process_DownloadingBlocks();
} else if (sm->state == State::FinishedDL) {
size_t N = sm->endHeight - sm->startheight + 1;
Log() << "Processed " << N << " new " << Util::Pluralize("block", N) << " with " << sm->nTx << " " << Util::Pluralize("tx", sm->nTx)
<< " (" << sm->nIns << " " << Util::Pluralize("input", sm->nIns) << " & " << sm->nOuts << " " << Util::Pluralize("output", sm->nOuts) << ")"
<< ", verified ok.";
{
std::lock_guard g(smLock);
sm.reset(); // go back to "Begin" state to check if any new headers arrived in the meantime
}