From db93c316034d4dab2caab26a4c5a0adbe122a2c0 Mon Sep 17 00:00:00 2001 From: neon_arch Date: Sun, 27 Aug 2023 20:50:42 +0300 Subject: [PATCH] =?UTF-8?q?=E2=9A=99=EF=B8=8F=20=20refactor:=20implement?= =?UTF-8?q?=20async=20pooling=20for=20redis=20connections=20(#180)(#178)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cache/cacher.rs | 126 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 103 insertions(+), 23 deletions(-) diff --git a/src/cache/cacher.rs b/src/cache/cacher.rs index 44d0710..6932dea 100644 --- a/src/cache/cacher.rs +++ b/src/cache/cacher.rs @@ -1,17 +1,26 @@ //! This module provides the functionality to cache the aggregated results fetched and aggregated //! from the upstream search engines in a json format. +use error_stack::Report; +use futures::future::try_join_all; use md5::compute; -use redis::{Client, Commands, Connection}; +use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError}; + +use super::error::PoolError; /// A named struct which stores the redis Connection url address to which the client will /// connect to. /// /// # Fields /// -/// * `redis_connection_url` - It stores the redis Connection url address. +/// * `connection_pool` - It stores a pool of connections ready to be used. +/// * `pool_size` - It stores the size of the connection pool (in other words the number of +/// connections that should be stored in the pool). +/// * `current_connection` - It stores the index of which connection is being used at the moment. pub struct RedisCache { - connection: Connection, + connection_pool: Vec, + pool_size: u8, + current_connection: u8, } impl RedisCache { @@ -19,11 +28,25 @@ impl RedisCache { /// /// # Arguments /// - /// * `redis_connection_url` - It stores the redis Connection url address. - pub fn new(redis_connection_url: String) -> Result> { + /// * `redis_connection_url` - It takes the redis Connection url address. + /// * `pool_size` - It takes the size of the connection pool (in other words the number of + /// connections that should be stored in the pool). + pub async fn new( + redis_connection_url: &str, + pool_size: u8, + ) -> Result> { let client = Client::open(redis_connection_url)?; - let connection = client.get_connection()?; - let redis_cache = RedisCache { connection }; + let mut tasks: Vec<_> = Vec::new(); + + for _ in 0..pool_size { + tasks.push(client.get_tokio_connection_manager()); + } + + let redis_cache = RedisCache { + connection_pool: try_join_all(tasks).await?, + pool_size, + current_connection: Default::default(), + }; Ok(redis_cache) } @@ -32,7 +55,7 @@ impl RedisCache { /// # Arguments /// /// * `url` - It takes an url as string. - fn hash_url(url: &str) -> String { + fn hash_url(&self, url: &str) -> String { format!("{:?}", compute(url)) } @@ -41,9 +64,42 @@ impl RedisCache { /// # Arguments /// /// * `url` - It takes an url as a string. - pub fn cached_json(&mut self, url: &str) -> Result> { - let hashed_url_string = Self::hash_url(url); - Ok(self.connection.get(hashed_url_string)?) + pub async fn cached_json(&mut self, url: &str) -> Result> { + self.current_connection = Default::default(); + let hashed_url_string: &str = &self.hash_url(url); + + let mut result: Result = self.connection_pool + [self.current_connection as usize] + .get(hashed_url_string) + .await; + + // Code to check whether the current connection being used is dropped with connection error + // or not. if it drops with the connection error then the current connection is replaced + // with a new connection from the pool which is then used to run the redis command then + // that connection is also checked whether it is dropped or not if it is not then the + // result is passed as a `Result` or else the same process repeats again and if all of the + // connections in the pool result in connection drop error then a custom pool error is + // returned. + loop { + match result { + Err(error) => match error.is_connection_dropped() { + true => { + self.current_connection += 1; + if self.current_connection == self.pool_size { + return Err(Report::new( + PoolError::PoolExhaustionWithConnectionDropError, + )); + } + result = self.connection_pool[self.current_connection as usize] + .get(hashed_url_string) + .await; + continue; + } + false => return Err(Report::new(PoolError::RedisError(error))), + }, + Ok(res) => return Ok(res), + } + } } /// A function which caches the results by using the hashed `url` as the key and @@ -54,21 +110,45 @@ impl RedisCache { /// /// * `json_results` - It takes the json results string as an argument. /// * `url` - It takes the url as a String. - pub fn cache_results( + pub async fn cache_results( &mut self, - json_results: String, + json_results: &str, url: &str, - ) -> Result<(), Box> { - let hashed_url_string = Self::hash_url(url); + ) -> Result<(), Report> { + self.current_connection = Default::default(); + let hashed_url_string: &str = &self.hash_url(url); - // put results_json into cache - self.connection.set(&hashed_url_string, json_results)?; + let mut result: Result<(), RedisError> = self.connection_pool + [self.current_connection as usize] + .set_ex(hashed_url_string, json_results, 60) + .await; - // Set the TTL for the key to 60 seconds - self.connection - .expire::(hashed_url_string, 60) - .unwrap(); - - Ok(()) + // Code to check whether the current connection being used is dropped with connection error + // or not. if it drops with the connection error then the current connection is replaced + // with a new connection from the pool which is then used to run the redis command then + // that connection is also checked whether it is dropped or not if it is not then the + // result is passed as a `Result` or else the same process repeats again and if all of the + // connections in the pool result in connection drop error then a custom pool error is + // returned. + loop { + match result { + Err(error) => match error.is_connection_dropped() { + true => { + self.current_connection += 1; + if self.current_connection == self.pool_size { + return Err(Report::new( + PoolError::PoolExhaustionWithConnectionDropError, + )); + } + result = self.connection_pool[self.current_connection as usize] + .set_ex(hashed_url_string, json_results, 60) + .await; + continue; + } + false => return Err(Report::new(PoolError::RedisError(error))), + }, + Ok(_) => return Ok(()), + } + } } }