diff --git a/src/cache/redis_cacher.rs b/src/cache/redis_cacher.rs
index cff4606..a334996 100644
--- a/src/cache/redis_cacher.rs
+++ b/src/cache/redis_cacher.rs
@@ -6,6 +6,9 @@ use error_stack::Report;
 use futures::stream::FuturesUnordered;
 use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};
 
+/// A constant holding the redis pipeline size.
+const REDIS_PIPELINE_SIZE: usize = 3;
+
 /// A named struct which stores the redis Connection url address to which the client will
 /// connect to.
 #[derive(Clone)]
@@ -19,6 +22,8 @@ pub struct RedisCache {
     current_connection: u8,
     /// It stores the max TTL for keys.
     cache_ttl: u16,
+    /// It stores the redis pipeline struct of size `REDIS_PIPELINE_SIZE`.
+    pipeline: redis::Pipeline,
 }
 
 impl RedisCache {
@@ -29,6 +34,8 @@ impl RedisCache {
     /// * `redis_connection_url` - It takes the redis Connection url address.
     /// * `pool_size` - It takes the size of the connection pool (in other words the number of
     /// connections that should be stored in the pool).
+    /// * `cache_ttl` - It takes the time to live for the cached results in the redis
+    ///   server.
     ///
     /// # Error
     ///
@@ -45,7 +52,7 @@ impl RedisCache {
         for _ in 0..pool_size {
             let client_partially_cloned = client.clone();
             tasks.push(tokio::spawn(async move {
-                client_partially_cloned.get_tokio_connection_manager().await
+                client_partially_cloned.get_connection_manager().await
             }));
         }
 
@@ -59,7 +66,9 @@ impl RedisCache {
             pool_size,
             current_connection: Default::default(),
             cache_ttl,
+            pipeline: redis::Pipeline::with_capacity(REDIS_PIPELINE_SIZE),
         };
+
         Ok(redis_cache)
     }
 
@@ -129,13 +138,14 @@ impl RedisCache {
         keys: impl Iterator<Item = String>,
     ) -> Result<(), Report<CacheError>> {
         self.current_connection = Default::default();
-        let mut pipeline = redis::Pipeline::with_capacity(3);
 
         for (key, json_result) in keys.zip(json_results) {
-            pipeline.set_ex(key, json_result, self.cache_ttl.into());
+            self.pipeline
+                .set_ex(key, json_result, self.cache_ttl.into());
         }
 
-        let mut result: Result<(), RedisError> = pipeline
+        let mut result: Result<(), RedisError> = self
+            .pipeline
             .query_async(&mut self.connection_pool[self.current_connection as usize])
             .await;
 
@@ -156,7 +166,8 @@ impl RedisCache {
                                 CacheError::PoolExhaustionWithConnectionDropError,
                             ));
                         }
-                        result = self
+                            .pipeline
                             .query_async(
                                 &mut self.connection_pool[self.current_connection as usize],
                             )
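
For context, the hunks above make RedisCache hold one long-lived redis::Pipeline (sized by REDIS_PIPELINE_SIZE) instead of allocating a new pipeline on every cache_json call, and they switch the pooled async connections over to the newer get_connection_manager name. The code below is a minimal, self-contained sketch of that reusable-pipeline pattern, not the project's actual code: it assumes the redis crate with the tokio-comp and connection-manager features plus tokio, a server reachable at redis://127.0.0.1/, and the names PipelinedCache and cache_batch are illustrative only. Unlike the patch, the sketch keeps a single connection and calls Pipeline::clear() after each flush so queued commands do not carry over between batches.

use redis::{aio::ConnectionManager, Client, RedisError};

/// Matches the constant introduced by the patch.
const REDIS_PIPELINE_SIZE: usize = 3;

/// Illustrative stand-in for RedisCache; not the project's type.
struct PipelinedCache {
    connection: ConnectionManager,
    cache_ttl: u16,
    /// Reused across calls instead of being rebuilt for every batch.
    pipeline: redis::Pipeline,
}

impl PipelinedCache {
    async fn new(url: &str, cache_ttl: u16) -> Result<Self, RedisError> {
        let client = Client::open(url)?;
        Ok(Self {
            connection: client.get_connection_manager().await?,
            cache_ttl,
            pipeline: redis::Pipeline::with_capacity(REDIS_PIPELINE_SIZE),
        })
    }

    /// Queues one SETEX per (key, value) pair and flushes them in a single round trip.
    async fn cache_batch(
        &mut self,
        entries: impl Iterator<Item = (String, String)>,
    ) -> Result<(), RedisError> {
        for (key, value) in entries {
            self.pipeline.set_ex(key, value, self.cache_ttl.into());
        }
        let result: Result<(), RedisError> =
            self.pipeline.query_async(&mut self.connection).await;
        // Reset the reused pipeline so the next batch starts empty
        // (the patch itself does not do this).
        self.pipeline.clear();
        result
    }
}

#[tokio::main]
async fn main() -> Result<(), RedisError> {
    let mut cache = PipelinedCache::new("redis://127.0.0.1/", 60).await?;
    cache
        .cache_batch([("key".to_owned(), "value".to_owned())].into_iter())
        .await?;
    Ok(())
}

The sketch deliberately omits the patch's retry-across-the-connection-pool loop to keep the example short; the point it illustrates is only the storage and reuse of the pipeline between calls.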