From d78fd759791fc47846ffd09421a24f9051477bc3 Mon Sep 17 00:00:00 2001 From: neon_arch Date: Sat, 10 Feb 2024 21:38:40 +0300 Subject: [PATCH] :zap: perf: initialize new async connections parallely using tokio spawn tasks (#486) --- src/cache/redis_cacher.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/cache/redis_cacher.rs b/src/cache/redis_cacher.rs index c775963..cff4606 100644 --- a/src/cache/redis_cacher.rs +++ b/src/cache/redis_cacher.rs @@ -1,11 +1,10 @@ //! This module provides the functionality to cache the aggregated results fetched and aggregated //! from the upstream search engines in a json format. -use error_stack::Report; -use futures::future::try_join_all; -use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError}; - use super::error::CacheError; +use error_stack::Report; +use futures::stream::FuturesUnordered; +use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError}; /// A named struct which stores the redis Connection url address to which the client will /// connect to. @@ -41,14 +40,22 @@ impl RedisCache { cache_ttl: u16, ) -> Result> { let client = Client::open(redis_connection_url)?; - let mut tasks: Vec<_> = Vec::new(); + let tasks: FuturesUnordered<_> = FuturesUnordered::new(); for _ in 0..pool_size { - tasks.push(client.get_connection_manager()); + let client_partially_cloned = client.clone(); + tasks.push(tokio::spawn(async move { + client_partially_cloned.get_tokio_connection_manager().await + })); + } + + let mut outputs = Vec::new(); + for task in tasks { + outputs.push(task.await??); } let redis_cache = RedisCache { - connection_pool: try_join_all(tasks).await?, + connection_pool: outputs, pool_size, current_connection: Default::default(), cache_ttl,