Skip to content

Commit

Permalink
Changed refresh_connections to release the connections container lock…
Browse files Browse the repository at this point in the history
… while creating a new connection
  • Loading branch information
barshaul committed Sep 13, 2024
1 parent c54a463 commit 35c4775
Showing 1 changed file with 51 additions and 42 deletions.
93 changes: 51 additions & 42 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,50 +1303,59 @@ where
check_existing_conn: bool,
) {
info!("Started refreshing connections to {:?}", addresses);
let connections_container = inner.conn_lock.read().await;
let cluster_params = &inner.cluster_params;
let subscriptions_by_address = &inner.subscriptions_by_address;
let glide_connection_optons = &inner.glide_connection_options;
let mut tasks = FuturesUnordered::new();
let inner = inner.clone();

stream::iter(addresses.into_iter())
.fold(
&*connections_container,
|connections_container, address| async move {
let node_option = if check_existing_conn {
connections_container.remove_node(&address)
} else {
None
};
for address in addresses.into_iter() {
let inner = inner.clone();

// override subscriptions for this connection
let mut cluster_params = cluster_params.clone();
let subs_guard = subscriptions_by_address.read().await;
cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned();
drop(subs_guard);
let node = get_or_create_conn(
&address,
node_option,
&cluster_params,
conn_type,
glide_connection_optons.clone(),
)
.await;
match node {
Ok(node) => {
connections_container
.replace_or_add_connection_for_address(address, node);
}
Err(err) => {
warn!(
"Failed to refresh connection for node {}. Error: `{:?}`",
address, err
);
}
}
connections_container
},
)
.await;
tasks.push(async move {
let connections_container = inner.conn_lock.read().await;
let cluster_params = &inner.cluster_params;
let subscriptions_by_address = &inner.subscriptions_by_address;
let glide_connection_options = &inner.glide_connection_options;

let node_option = if check_existing_conn {
connections_container.remove_node(&address)
} else {
None
};

// Override subscriptions for this connection
let mut cluster_params = cluster_params.clone();
let subs_guard = subscriptions_by_address.read().await;
cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned();
drop(subs_guard);
drop(connections_container);

let node = get_or_create_conn(
&address,
node_option,
&cluster_params,
conn_type,
glide_connection_options.clone(),
)
.await;

(address, node)
});
}

// Poll connection tasks as soon as each one finishes
while let Some(result) = tasks.next().await {
match result {
(address, Ok(node)) => {
let connections_container = inner.conn_lock.read().await;
connections_container.replace_or_add_connection_for_address(address, node);
}
(address, Err(err)) => {
warn!(
"Failed to refresh connection for node {}. Error: `{:?}`",
address, err
);
}
}
}
info!("refresh connections completed");
}

Expand Down

0 comments on commit 35c4775

Please sign in to comment.