@@ -11,7 +11,7 @@ use std::str::FromStr;
11
11
use std:: sync:: Arc ;
12
12
use std:: time:: Duration ;
13
13
use tokio:: sync:: broadcast;
14
- use tokio:: time:: sleep;
14
+ use tokio:: time:: { sleep, timeout } ;
15
15
16
16
use crate :: config:: env:: ev;
17
17
use crate :: error:: Error ;
@@ -31,7 +31,7 @@ pub async fn initialize_pangea_indexer(
31
31
let store = match trading_engine. get_store ( & config. symbol ) {
32
32
Some ( s) => s,
33
33
None => {
34
- log :: error!( "No CandleStore found for symbol {}" , config. symbol) ;
34
+ error ! ( "No CandleStore found for symbol {}" , config. symbol) ;
35
35
continue ;
36
36
}
37
37
} ;
@@ -41,10 +41,10 @@ pub async fn initialize_pangea_indexer(
41
41
42
42
tokio:: select! {
43
43
_ = shutdown. recv( ) => {
44
- log :: info!( "Shutdown signal received in indexer." ) ;
44
+ info!( "Shutdown signal received in indexer." ) ;
45
45
}
46
46
_ = futures:: future:: join_all( tasks) => {
47
- log :: info!( "All indexer tasks completed." ) ;
47
+ info!( "All indexer tasks completed." ) ;
48
48
}
49
49
}
50
50
@@ -56,7 +56,6 @@ async fn process_events_for_pair(
56
56
store : Arc < CandleStore > ,
57
57
) -> Result < ( ) , Error > {
58
58
let client = create_pangea_client ( ) . await ?;
59
-
60
59
let contract_h256 = H256 :: from_str ( & config. contract_id ) ?;
61
60
62
61
let last_processed_block = fetch_historical_data (
@@ -68,22 +67,12 @@ async fn process_events_for_pair(
68
67
)
69
68
. await ?;
70
69
71
- log :: info!(
70
+ info ! (
72
71
"Completed historical data fetch for {}. Last processed block: {}" ,
73
- config. symbol,
74
- last_processed_block
72
+ config. symbol, last_processed_block
75
73
) ;
76
74
77
- listen_for_new_deltas (
78
- & client,
79
- & store,
80
- last_processed_block,
81
- contract_h256,
82
- config. symbol ,
83
- )
84
- . await ?;
85
-
86
- Ok ( ( ) )
75
+ listen_for_new_deltas ( & store, last_processed_block, contract_h256, config. symbol ) . await
87
76
}
88
77
89
78
async fn create_pangea_client ( ) -> Result < Client < WsProvider > , Error > {
@@ -127,38 +116,44 @@ async fn fetch_historical_data(
127
116
..Default :: default ( )
128
117
} ;
129
118
130
- let stream = client
131
- . get_fuel_spark_orders_by_format ( request, Format :: JsonStream , false )
132
- . await ?;
133
-
119
+ let stream = client. get_fuel_spark_orders_by_format ( request, Format :: JsonStream , false ) . await ?;
134
120
pangea_client:: futures:: pin_mut!( stream) ;
135
121
136
122
while let Some ( data) = stream. next ( ) . await {
137
- match data {
138
- Ok ( data) => {
139
- let order: PangeaOrderEvent = serde_json:: from_slice ( & data) ?;
123
+ if let Ok ( data) = data {
124
+ if let Ok ( order) = serde_json:: from_slice :: < PangeaOrderEvent > ( & data) {
140
125
handle_order_event ( candle_store. clone ( ) , order, symbol. clone ( ) ) . await ;
126
+ } else {
127
+ error ! ( "Failed to deserialize order event" ) ;
141
128
}
142
- Err ( e) => {
143
- error ! ( "Error processing historical data stream: {}" , e) ;
144
- break ;
145
- }
129
+ } else {
130
+ error ! ( "Stream error while processing historical data" ) ;
146
131
}
147
132
}
148
133
149
134
Ok ( target_latest_block)
150
135
}
151
136
152
137
async fn listen_for_new_deltas (
153
- client : & Client < WsProvider > ,
154
138
candle_store : & Arc < CandleStore > ,
155
139
mut last_processed_block : i64 ,
156
140
contract_h256 : H256 ,
157
141
symbol : String ,
158
142
) -> Result < ( ) , Error > {
159
143
let mut retry_delay = Duration :: from_secs ( 1 ) ;
144
+ let max_backoff = Duration :: from_secs ( 60 ) ;
160
145
161
146
loop {
147
+ let client = match create_pangea_client ( ) . await {
148
+ Ok ( c) => c,
149
+ Err ( e) => {
150
+ error ! ( "Failed to create Pangea client: {}" , e) ;
151
+ sleep ( retry_delay) . await ;
152
+ retry_delay = ( retry_delay * 2 ) . min ( max_backoff) ;
153
+ continue ;
154
+ }
155
+ } ;
156
+
162
157
let fuel_chain = match ev ( "CHAIN" ) ?. as_str ( ) {
163
158
"FUEL" => ChainId :: FUEL ,
164
159
_ => ChainId :: FUELTESTNET ,
@@ -172,38 +167,29 @@ async fn listen_for_new_deltas(
172
167
..Default :: default ( )
173
168
} ;
174
169
175
- match client
176
- . get_fuel_spark_orders_by_format ( request, Format :: JsonStream , true )
177
- . await
178
- {
179
- Ok ( stream) => {
170
+ match timeout ( Duration :: from_secs ( 10 ) , client. get_fuel_spark_orders_by_format ( request, Format :: JsonStream , true ) ) . await {
171
+ Ok ( Ok ( stream) ) => {
180
172
pangea_client:: futures:: pin_mut!( stream) ;
181
173
retry_delay = Duration :: from_secs ( 1 ) ;
182
-
183
174
while let Some ( data) = stream. next ( ) . await {
184
- match data {
185
- Ok ( data) => {
186
- let order_event: PangeaOrderEvent = serde_json:: from_slice ( & data) ?;
175
+ if let Ok ( data) = data {
176
+ if let Ok ( order_event) = serde_json:: from_slice :: < PangeaOrderEvent > ( & data) {
187
177
last_processed_block = order_event. block_number ;
188
- handle_order_event ( candle_store. clone ( ) , order_event, symbol. clone ( ) )
189
- . await ;
190
- }
191
- Err ( e) => {
192
- error ! ( "Stream error: {}" , e) ;
193
- break ;
178
+ handle_order_event ( candle_store. clone ( ) , order_event, symbol. clone ( ) ) . await ;
179
+ } else {
180
+ error ! ( "Failed to deserialize order event" ) ;
194
181
}
195
182
}
196
183
}
197
184
}
198
- Err ( e) => {
199
- error ! ( "Failed to subscribe to new deltas: {}" , e) ;
200
- sleep ( retry_delay) . await ;
201
- retry_delay = ( retry_delay * 2 ) . min ( Duration :: from_secs ( 60 ) ) ;
202
- }
185
+ _ => error ! ( "Failed to subscribe to new deltas, retrying..." ) ,
203
186
}
187
+ sleep ( retry_delay) . await ;
188
+ retry_delay = ( retry_delay * 2 ) . min ( max_backoff) ;
204
189
}
205
190
}
206
191
192
+
207
193
async fn get_latest_block ( chain_id : ChainId ) -> Result < i64 , Error > {
208
194
let provider_url = match chain_id {
209
195
ChainId :: FUEL => "mainnet.fuel.network" ,
0 commit comments