0
0
mirror of https://github.com/neon-mmd/websurfx.git synced 2024-11-23 06:28:23 -05:00

perf: initialize redis pipeline struct once with the default size of 3 (#486)

This commit is contained in:
neon_arch 2024-02-10 21:57:24 +03:00
parent d78fd75979
commit 5f0edde549

View File

@ -6,6 +6,9 @@ use error_stack::Report;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError}; use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};
/// A constant holding the redis pipeline size.
const REDIS_PIPELINE_SIZE: usize = 3;
/// A named struct which stores the redis Connection url address to which the client will /// A named struct which stores the redis Connection url address to which the client will
/// connect to. /// connect to.
#[derive(Clone)] #[derive(Clone)]
@ -19,6 +22,8 @@ pub struct RedisCache {
current_connection: u8, current_connection: u8,
/// It stores the max TTL for keys. /// It stores the max TTL for keys.
cache_ttl: u16, cache_ttl: u16,
/// It stores the redis pipeline struct of size 3.
pipeline: redis::Pipeline,
} }
impl RedisCache { impl RedisCache {
@ -29,6 +34,8 @@ impl RedisCache {
/// * `redis_connection_url` - It takes the redis Connection url address. /// * `redis_connection_url` - It takes the redis Connection url address.
/// * `pool_size` - It takes the size of the connection pool (in other words the number of /// * `pool_size` - It takes the size of the connection pool (in other words the number of
/// connections that should be stored in the pool). /// connections that should be stored in the pool).
/// * `cache_ttl` - It takes the the time to live for cached results to live in the redis
/// server.
/// ///
/// # Error /// # Error
/// ///
@ -45,7 +52,7 @@ impl RedisCache {
for _ in 0..pool_size { for _ in 0..pool_size {
let client_partially_cloned = client.clone(); let client_partially_cloned = client.clone();
tasks.push(tokio::spawn(async move { tasks.push(tokio::spawn(async move {
client_partially_cloned.get_tokio_connection_manager().await client_partially_cloned.get_connection_manager().await
})); }));
} }
@ -59,7 +66,9 @@ impl RedisCache {
pool_size, pool_size,
current_connection: Default::default(), current_connection: Default::default(),
cache_ttl, cache_ttl,
pipeline: redis::Pipeline::with_capacity(REDIS_PIPELINE_SIZE),
}; };
Ok(redis_cache) Ok(redis_cache)
} }
@ -129,13 +138,14 @@ impl RedisCache {
keys: impl Iterator<Item = String>, keys: impl Iterator<Item = String>,
) -> Result<(), Report<CacheError>> { ) -> Result<(), Report<CacheError>> {
self.current_connection = Default::default(); self.current_connection = Default::default();
let mut pipeline = redis::Pipeline::with_capacity(3);
for (key, json_result) in keys.zip(json_results) { for (key, json_result) in keys.zip(json_results) {
pipeline.set_ex(key, json_result, self.cache_ttl.into()); self.pipeline
.set_ex(key, json_result, self.cache_ttl.into());
} }
let mut result: Result<(), RedisError> = pipeline let mut result: Result<(), RedisError> = self
.pipeline
.query_async(&mut self.connection_pool[self.current_connection as usize]) .query_async(&mut self.connection_pool[self.current_connection as usize])
.await; .await;
@ -156,7 +166,8 @@ impl RedisCache {
CacheError::PoolExhaustionWithConnectionDropError, CacheError::PoolExhaustionWithConnectionDropError,
)); ));
} }
result = pipeline result = self
.pipeline
.query_async( .query_async(
&mut self.connection_pool[self.current_connection as usize], &mut self.connection_pool[self.current_connection as usize],
) )