From 996ff84c5be6d0dc5f28d9ecd81b9d5f44ad2395 Mon Sep 17 00:00:00 2001 From: Zsombor Gegesy Date: Sat, 9 Sep 2023 18:17:29 +0200 Subject: [PATCH] Cache refactor - add an in-memory cache, so redis is not needed --- Cargo.toml | 1 + src/bin/websurfx.rs | 14 ++- src/cache/cacher.rs | 185 +++++++++++++----------------------- src/cache/error.rs | 4 + src/cache/mod.rs | 1 + src/cache/redis_cacher.rs | 152 +++++++++++++++++++++++++++++ src/config/parser.rs | 4 +- src/lib.rs | 6 +- src/server/routes/search.rs | 35 +++---- tests/index.rs | 7 +- 10 files changed, 263 insertions(+), 146 deletions(-) create mode 100644 src/cache/redis_cacher.rs diff --git a/Cargo.toml b/Cargo.toml index d30b325..6a6d07f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ dhat = {version="0.3.2", optional = true} mimalloc = { version = "0.1.38", default-features = false } async-once-cell = {version="0.5.3"} actix-governor = {version="0.4.1"} +mini-moka = "0.10" [dev-dependencies] rusty-hook = "^0.11.2" diff --git a/src/bin/websurfx.rs b/src/bin/websurfx.rs index bc8e7ce..4a1dc49 100644 --- a/src/bin/websurfx.rs +++ b/src/bin/websurfx.rs @@ -5,7 +5,9 @@ use mimalloc::MiMalloc; use std::net::TcpListener; -use websurfx::{config::parser::Config, run}; +use websurfx::{ + cache::cacher::Cache, cache::redis_cacher::RedisCache, config::parser::Config, run, +}; /// A dhat heap memory profiler #[cfg(feature = "dhat-heap")] @@ -30,6 +32,14 @@ async fn main() -> std::io::Result<()> { // Initialize the parsed config file. 
let config = Config::parse(false).unwrap(); + let cache = match &config.redis_url { + Some(url) => Cache::new( + RedisCache::new(url, 5) + .await + .expect("Redis cache configured"), + ), + None => Cache::new_in_memory(), + }; log::info!( "started server on port {} and IP {}", @@ -44,5 +54,5 @@ async fn main() -> std::io::Result<()> { let listener = TcpListener::bind((config.binding_ip.clone(), config.port))?; - run(listener, config)?.await + run(listener, config, cache)?.await } diff --git a/src/cache/cacher.rs b/src/cache/cacher.rs index 57351cd..810ced4 100644 --- a/src/cache/cacher.rs +++ b/src/cache/cacher.rs @@ -2,107 +2,53 @@ //! from the upstream search engines in a json format. use error_stack::Report; -use futures::future::try_join_all; -use md5::compute; -use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError}; +use mini_moka::sync::Cache as MokaCache; +use std::time::Duration; +use tokio::sync::Mutex; -use super::error::PoolError; +use super::{error::PoolError, redis_cacher::RedisCache}; -/// A named struct which stores the redis Connection url address to which the client will -/// connect to. +/// Different implementations for caching, currently it is possible to cache in-memory or in Redis. #[derive(Clone)] -pub struct RedisCache { - /// It stores a pool of connections ready to be used. - connection_pool: Vec, - /// It stores the size of the connection pool (in other words the number of - /// connections that should be stored in the pool). - pool_size: u8, - /// It stores the index of which connection is being used at the moment. - current_connection: u8, +pub enum Cache { + /// Encapsulates the Redis based cache + Redis(RedisCache), + /// Contains the in-memory cache. + InMemory(MokaCache), } -impl RedisCache { - /// Constructs a new `SearchResult` with the given arguments needed for the struct. - /// - /// # Arguments - /// - /// * `redis_connection_url` - It takes the redis Connection url address. 
- /// * `pool_size` - It takes the size of the connection pool (in other words the number of - /// connections that should be stored in the pool). - pub async fn new( - redis_connection_url: &str, - pool_size: u8, - ) -> Result> { - let client = Client::open(redis_connection_url)?; - let mut tasks: Vec<_> = Vec::new(); - - for _ in 0..pool_size { - tasks.push(client.get_tokio_connection_manager()); - } - - let redis_cache = RedisCache { - connection_pool: try_join_all(tasks).await?, - pool_size, - current_connection: Default::default(), - }; - Ok(redis_cache) +impl Cache { + /// Creates a new cache, which wraps the given RedisCache. + pub fn new(redis_cache: RedisCache) -> Self { + Cache::Redis(redis_cache) } - /// A helper function which computes the hash of the url and formats and returns it as string. - /// - /// # Arguments - /// - /// * `url` - It takes an url as string. - fn hash_url(&self, url: &str) -> String { - format!("{:?}", compute(url)) + /// Creates an in-memory cache + pub fn new_in_memory() -> Self { + let cache = MokaCache::builder() + .max_capacity(1000) + .time_to_live(Duration::from_secs(60)) + .build(); + Cache::InMemory(cache) } - /// A function which fetches the cached json results as json string from the redis server. + /// A function which fetches the cached json results as json string. /// /// # Arguments /// /// * `url` - It takes an url as a string. pub async fn cached_json(&mut self, url: &str) -> Result> { - self.current_connection = Default::default(); - let hashed_url_string: &str = &self.hash_url(url); - - let mut result: Result = self.connection_pool - [self.current_connection as usize] - .get(hashed_url_string) - .await; - - // Code to check whether the current connection being used is dropped with connection error - // or not. 
if it drops with the connection error then the current connection is replaced - // with a new connection from the pool which is then used to run the redis command then - // that connection is also checked whether it is dropped or not if it is not then the - // result is passed as a `Result` or else the same process repeats again and if all of the - // connections in the pool result in connection drop error then a custom pool error is - // returned. - loop { - match result { - Err(error) => match error.is_connection_dropped() { - true => { - self.current_connection += 1; - if self.current_connection == self.pool_size { - return Err(Report::new( - PoolError::PoolExhaustionWithConnectionDropError, - )); - } - result = self.connection_pool[self.current_connection as usize] - .get(hashed_url_string) - .await; - continue; - } - false => return Err(Report::new(PoolError::RedisError(error))), - }, - Ok(res) => return Ok(res), - } + match self { + Cache::Redis(redis_cache) => redis_cache.cached_json(url).await, + Cache::InMemory(in_memory) => match in_memory.get(&url.to_string()) { + Some(res) => Ok(res), + None => Err(Report::new(PoolError::MissingValue)), + }, } } - /// A function which caches the results by using the hashed `url` as the key and - /// `json results` as the value and stores it in redis server with ttl(time to live) - /// set to 60 seconds. + /// A function which caches the results by using the `url` as the key and + /// `json results` as the value and stores it in the cache /// /// # Arguments /// @@ -110,43 +56,46 @@ impl RedisCache { /// * `url` - It takes the url as a String. 
pub async fn cache_results( &mut self, - json_results: &str, + json_results: String, url: &str, ) -> Result<(), Report> { - self.current_connection = Default::default(); - let hashed_url_string: &str = &self.hash_url(url); - - let mut result: Result<(), RedisError> = self.connection_pool - [self.current_connection as usize] - .set_ex(hashed_url_string, json_results, 60) - .await; - - // Code to check whether the current connection being used is dropped with connection error - // or not. if it drops with the connection error then the current connection is replaced - // with a new connection from the pool which is then used to run the redis command then - // that connection is also checked whether it is dropped or not if it is not then the - // result is passed as a `Result` or else the same process repeats again and if all of the - // connections in the pool result in connection drop error then a custom pool error is - // returned. - loop { - match result { - Err(error) => match error.is_connection_dropped() { - true => { - self.current_connection += 1; - if self.current_connection == self.pool_size { - return Err(Report::new( - PoolError::PoolExhaustionWithConnectionDropError, - )); - } - result = self.connection_pool[self.current_connection as usize] - .set_ex(hashed_url_string, json_results, 60) - .await; - continue; - } - false => return Err(Report::new(PoolError::RedisError(error))), - }, - Ok(_) => return Ok(()), + match self { + Cache::Redis(redis_cache) => redis_cache.cache_results(&json_results, url).await, + Cache::InMemory(cache) => { + cache.insert(url.to_string(), json_results); + Ok(()) } } } } + +/// A structure to efficiently share the cache between threads - as it is protected by a Mutex. 
+pub struct SharedCache { + cache: Mutex, +} + +impl SharedCache { + /// Creates a new SharedCache from a Cache implementation + pub fn new(cache: Cache) -> Self { + Self { + cache: Mutex::new(cache), + } + } + + /// A function which fetches the cached json results as json string. + pub async fn cached_json(&self, url: &str) -> Result> { + let mut mut_cache = self.cache.lock().await; + mut_cache.cached_json(url).await + } + + /// A function which caches the results by using the `url` as the key and + /// `json results` as the value and stores it in the cache + pub async fn cache_results( + &self, + json_results: String, + url: &str, + ) -> Result<(), Report> { + let mut mut_cache = self.cache.lock().await; + mut_cache.cache_results(json_results, url).await + } +} diff --git a/src/cache/error.rs b/src/cache/error.rs index 8bdb977..5f5d0e0 100644 --- a/src/cache/error.rs +++ b/src/cache/error.rs @@ -12,6 +12,7 @@ pub enum PoolError { /// This variant handles the errors which occurs when all the connections /// in the connection pool return a connection dropped redis error. PoolExhaustionWithConnectionDropError, + MissingValue, } impl fmt::Display for PoolError { @@ -30,6 +31,9 @@ impl fmt::Display for PoolError { "Error all connections from the pool dropped with connection error" ) } + PoolError::MissingValue => { + write!(f, "The value is missing from the cache") + } } } } diff --git a/src/cache/mod.rs b/src/cache/mod.rs index f40369f..80d1a99 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -3,3 +3,4 @@ pub mod cacher; pub mod error; +pub mod redis_cacher; diff --git a/src/cache/redis_cacher.rs b/src/cache/redis_cacher.rs new file mode 100644 index 0000000..9e4cbab --- /dev/null +++ b/src/cache/redis_cacher.rs @@ -0,0 +1,152 @@ +//! This module provides the functionality to cache the aggregated results fetched and aggregated +//! from the upstream search engines in a json format. 
+ +use error_stack::Report; +use futures::future::try_join_all; +use md5::compute; +use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError}; + +use super::error::PoolError; + +/// A named struct which stores the redis Connection url address to which the client will +/// connect to. +#[derive(Clone)] +pub struct RedisCache { + /// It stores a pool of connections ready to be used. + connection_pool: Vec, + /// It stores the size of the connection pool (in other words the number of + /// connections that should be stored in the pool). + pool_size: u8, + /// It stores the index of which connection is being used at the moment. + current_connection: u8, +} + +impl RedisCache { + /// Constructs a new `RedisCache` with a pool of connections to the given redis server. + /// + /// # Arguments + /// + /// * `redis_connection_url` - It takes the redis Connection url address. + /// * `pool_size` - It takes the size of the connection pool (in other words the number of + /// connections that should be stored in the pool). + pub async fn new( + redis_connection_url: &str, + pool_size: u8, + ) -> Result> { + let client = Client::open(redis_connection_url)?; + let mut tasks: Vec<_> = Vec::new(); + + for _ in 0..pool_size { + tasks.push(client.get_tokio_connection_manager()); + } + + let redis_cache = RedisCache { + connection_pool: try_join_all(tasks).await?, + pool_size, + current_connection: Default::default(), + }; + Ok(redis_cache) + } + + /// A helper function which computes the hash of the url and formats and returns it as string. + /// + /// # Arguments + /// + /// * `url` - It takes an url as string. + fn hash_url(&self, url: &str) -> String { + format!("{:?}", compute(url)) + } + + /// A function which fetches the cached json results as json string from the redis server. + /// + /// # Arguments + /// + /// * `url` - It takes an url as a string. 
+ pub async fn cached_json(&mut self, url: &str) -> Result> { + self.current_connection = Default::default(); + let hashed_url_string: &str = &self.hash_url(url); + + let mut result: Result = self.connection_pool + [self.current_connection as usize] + .get(hashed_url_string) + .await; + + // Code to check whether the current connection being used is dropped with connection error + // or not. if it drops with the connection error then the current connection is replaced + // with a new connection from the pool which is then used to run the redis command then + // that connection is also checked whether it is dropped or not if it is not then the + // result is passed as a `Result` or else the same process repeats again and if all of the + // connections in the pool result in connection drop error then a custom pool error is + // returned. + loop { + match result { + Err(error) => match error.is_connection_dropped() { + true => { + self.current_connection += 1; + if self.current_connection == self.pool_size { + return Err(Report::new( + PoolError::PoolExhaustionWithConnectionDropError, + )); + } + result = self.connection_pool[self.current_connection as usize] + .get(hashed_url_string) + .await; + continue; + } + false => return Err(Report::new(PoolError::RedisError(error))), + }, + Ok(res) => return Ok(res), + } + } + } + + /// A function which caches the results by using the hashed `url` as the key and + /// `json results` as the value and stores it in redis server with ttl(time to live) + /// set to 60 seconds. + /// + /// # Arguments + /// + /// * `json_results` - It takes the json results string as an argument. + /// * `url` - It takes the url as a String. 
+ pub async fn cache_results( + &mut self, + json_results: &str, + url: &str, + ) -> Result<(), Report> { + self.current_connection = Default::default(); + let hashed_url_string: &str = &self.hash_url(url); + + let mut result: Result<(), RedisError> = self.connection_pool + [self.current_connection as usize] + .set_ex(hashed_url_string, json_results, 60) + .await; + + // Code to check whether the current connection being used is dropped with connection error + // or not. if it drops with the connection error then the current connection is replaced + // with a new connection from the pool which is then used to run the redis command then + // that connection is also checked whether it is dropped or not if it is not then the + // result is passed as a `Result` or else the same process repeats again and if all of the + // connections in the pool result in connection drop error then a custom pool error is + // returned. + loop { + match result { + Err(error) => match error.is_connection_dropped() { + true => { + self.current_connection += 1; + if self.current_connection == self.pool_size { + return Err(Report::new( + PoolError::PoolExhaustionWithConnectionDropError, + )); + } + result = self.connection_pool[self.current_connection as usize] + .set_ex(hashed_url_string, json_results, 60) + .await; + continue; + } + false => return Err(Report::new(PoolError::RedisError(error))), + }, + Ok(_) => return Ok(()), + } + } + } +} diff --git a/src/config/parser.rs b/src/config/parser.rs index 782b026..6d84374 100644 --- a/src/config/parser.rs +++ b/src/config/parser.rs @@ -19,7 +19,7 @@ pub struct Config { pub style: Style, /// It stores the redis connection url address on which the redis /// client should connect. - pub redis_url: String, + pub redis_url: Option, /// It stores the option to whether enable or disable production use. pub aggregator: AggregatorConfig, /// It stores the option to whether enable or disable logs. 
@@ -99,7 +99,7 @@ impl Config { globals.get::<_, String>("theme")?, globals.get::<_, String>("colorscheme")?, ), - redis_url: globals.get::<_, String>("redis_url")?, + redis_url: globals.get::<_, String>("redis_url").ok(), aggregator: AggregatorConfig { random_delay: globals.get::<_, bool>("production_use")?, }, diff --git a/src/lib.rs b/src/lib.rs index 8c74e6a..68f6e01 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ use actix_cors::Cors; use actix_files as fs; use actix_governor::{Governor, GovernorConfigBuilder}; use actix_web::{dev::Server, http::header, middleware::Logger, web, App, HttpServer}; +use cache::cacher::{Cache, SharedCache}; use config::parser::Config; use handlebars::Handlebars; use handler::paths::{file_path, FileType}; @@ -45,7 +46,7 @@ use handler::paths::{file_path, FileType}; /// let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind address"); /// let server = run(listener,config).expect("Failed to start server"); /// ``` -pub fn run(listener: TcpListener, config: Config) -> std::io::Result { +pub fn run(listener: TcpListener, config: Config, cache: Cache) -> std::io::Result { let mut handlebars: Handlebars<'_> = Handlebars::new(); let public_folder_path: &str = file_path(FileType::Theme)?; @@ -58,6 +59,8 @@ pub fn run(listener: TcpListener, config: Config) -> std::io::Result { let cloned_config_threads_opt: u8 = config.threads; + let cache = web::Data::new(SharedCache::new(cache)); + let server = HttpServer::new(move || { let cors: Cors = Cors::default() .allow_any_origin() @@ -73,6 +76,7 @@ pub fn run(listener: TcpListener, config: Config) -> std::io::Result { .wrap(Logger::default()) // added logging middleware for logging. 
.app_data(handlebars_ref.clone()) .app_data(web::Data::new(config.clone())) + .app_data(cache.clone()) .wrap(cors) .wrap(Governor::new( &GovernorConfigBuilder::default() diff --git a/src/server/routes/search.rs b/src/server/routes/search.rs index 254c038..ea78034 100644 --- a/src/server/routes/search.rs +++ b/src/server/routes/search.rs @@ -1,7 +1,7 @@ //! This module handles the search route of the search engine website. use crate::{ - cache::cacher::RedisCache, + cache::cacher::SharedCache, config::parser::Config, handler::paths::{file_path, FileType}, models::{aggregation_models::SearchResults, engine_models::EngineHandler}, @@ -17,10 +17,6 @@ use std::{ }; use tokio::join; -// ---- Constants ---- -/// Initialize redis cache connection once and store it on the heap. -static REDIS_CACHE: async_once_cell::OnceCell = async_once_cell::OnceCell::new(); - /// A named struct which deserializes all the user provided search parameters and stores them. #[derive(Deserialize)] pub struct SearchParams { @@ -89,6 +85,7 @@ pub async fn search( hbs: web::Data>, req: HttpRequest, config: web::Data, + cache: web::Data, ) -> Result> { let params = web::Query::::from_query(req.query_string())?; match &params.q { @@ -125,6 +122,7 @@ pub async fn search( safe_search ), &config, + &cache, query, page - 1, req.clone(), @@ -136,6 +134,7 @@ pub async fn search( config.binding_ip, config.port, query, page, safe_search ), &config, + &cache, query, page, req.clone(), @@ -151,6 +150,7 @@ pub async fn search( safe_search ), &config, + &cache, query, page + 1, req.clone(), @@ -185,27 +185,19 @@ pub async fn search( async fn results( url: String, config: &Config, + cache: &web::Data, query: &str, page: u32, req: HttpRequest, safe_search: u8, ) -> Result> { - // Initialize redis cache connection struct - let mut redis_cache: RedisCache = REDIS_CACHE - .get_or_init(async { - // Initialize redis cache connection pool only one and store it in the heap. 
- RedisCache::new(&config.redis_url, 5).await.unwrap() - }) - .await - .clone(); // fetch the cached results json. - let cached_results_json: Result> = - redis_cache.clone().cached_json(&url).await; + let cached_results_json = cache.cached_json(&url).await.ok(); // check if fetched cache results was indeed fetched or it was an error and if so // handle the data accordingly. match cached_results_json { - Ok(results) => Ok(serde_json::from_str::(&results)?), - Err(_) => { + Some(results) => Ok(serde_json::from_str::(&results)?), + None => { if safe_search == 4 { let mut results: SearchResults = SearchResults::default(); let mut _flag: bool = @@ -216,8 +208,8 @@ async fn results( results.set_disallowed(); results.add_style(&config.style); results.set_page_query(query); - redis_cache - .cache_results(&serde_json::to_string(&results)?, &url) + cache + .cache_results(serde_json::to_string(&results)?, &url) .await?; return Ok(results); } @@ -266,9 +258,8 @@ async fn results( results.set_filtered(); } results.add_style(&config.style); - redis_cache - .cache_results(&serde_json::to_string(&results)?, &url) - .await?; + let json_results = serde_json::to_string(&results)?; + cache.cache_results(json_results, &url).await?; Ok(results) } } diff --git a/tests/index.rs b/tests/index.rs index 080ad27..ab56e1d 100644 --- a/tests/index.rs +++ b/tests/index.rs @@ -9,7 +9,12 @@ fn spawn_app() -> String { let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port"); let port = listener.local_addr().unwrap().port(); let config = Config::parse(false).unwrap(); - let server = run(listener, config).expect("Failed to bind address"); + let server = run( + listener, + config, + websurfx::cache::cacher::Cache::new_in_memory(), + ) + .expect("Failed to bind address"); tokio::spawn(server); format!("http://127.0.0.1:{}/", port)