diff --git a/Cargo.lock b/Cargo.lock
index 01178b8..fdaa249 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -446,6 +446,12 @@ version = "3.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
 
+[[package]]
+name = "bytecount"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c"
+
 [[package]]
 name = "byteorder"
 version = "1.4.3"
@@ -478,6 +484,37 @@ dependencies = [
  "bytes 1.5.0",
 ]
 
+[[package]]
+name = "camino"
+version = "1.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c"
+dependencies = [
+ "serde",
+]
+
+[[package]]
+name = "cargo-platform"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2cfa25e60aea747ec7e1124f238816749faa93759c6ff5b31f1ccdda137f4479"
+dependencies = [
+ "serde",
+]
+
+[[package]]
+name = "cargo_metadata"
+version = "0.14.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
+dependencies = [
+ "camino",
+ "cargo-platform",
+ "semver 1.0.18",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "cast"
 version = "0.3.0"
@@ -973,6 +1010,15 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "error-chain"
+version = "0.12.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
+dependencies = [
+ "version_check",
+]
+
 [[package]]
 name = "error-stack"
 version = "0.4.1"
@@ -1256,6 +1302,12 @@ version = "0.28.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
 
+[[package]]
+name = "glob"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
+
 [[package]]
 name = "governor"
 version = "0.5.1"
@@ -1834,6 +1886,21 @@ dependencies = [
  "unicase",
 ]
 
+[[package]]
+name = "mini-moka"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "23e0b72e7c9042467008b10279fc732326bd605459ae03bda88825909dd19b56"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-utils 0.8.16",
+ "dashmap",
+ "skeptic",
+ "smallvec 1.11.0",
+ "tagptr",
+ "triomphe",
+]
+
 [[package]]
 name = "miniz_oxide"
 version = "0.7.1"
@@ -2383,6 +2450,17 @@ dependencies = [
  "url 2.4.1",
 ]
 
+[[package]]
+name = "pulldown-cmark"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998"
+dependencies = [
+ "bitflags 1.3.2",
+ "memchr",
+ "unicase",
+]
+
 [[package]]
 name = "quanta"
 version = "0.9.3"
@@ -2900,6 +2978,9 @@ name = "semver"
 version = "1.0.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918"
+dependencies = [
+ "serde",
+]
 
 [[package]]
 name = "semver-parser"
@@ -3020,6 +3101,21 @@ version = "0.3.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
 
+[[package]]
+name = "skeptic"
+version = "0.13.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
+dependencies = [
+ "bytecount",
+ "cargo_metadata",
+ "error-chain",
+ "glob",
+ "pulldown-cmark",
+ "tempfile",
+ "walkdir",
+]
+
 [[package]]
 name = "slab"
 version = "0.4.9"
@@ -3197,6 +3293,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "tagptr"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
+
 [[package]]
 name = "tempfile"
 version = "3.8.0"
@@ -3553,6 +3655,12 @@ dependencies = [
  "once_cell",
 ]
 
+[[package]]
+name = "triomphe"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f"
+
 [[package]]
 name = "try-lock"
 version = "0.2.4"
@@ -3815,6 +3923,7 @@ dependencies = [
  "log",
  "md5",
  "mimalloc",
+ "mini-moka",
 "mlua",
 "once_cell",
 "rand 0.8.5",
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.9" @@ -3197,6 +3293,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tempfile" version = "3.8.0" @@ -3553,6 +3655,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "triomphe" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f" + [[package]] name = "try-lock" version = "0.2.4" @@ -3815,6 +3923,7 @@ dependencies = [ "log", "md5", "mimalloc", + "mini-moka", "mlua", "once_cell", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index d30b325..73eda46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ fake-useragent = {version="0.1.3"} env_logger = {version="0.10.0"} log = {version="0.4.20"} mlua = {version="0.8.10", features=["luajit"]} -redis = {version="0.23.3", features=["tokio-comp","connection-manager"]} +redis = {version="0.23.3", features=["tokio-comp","connection-manager"], optional = true} md5 = {version="0.7.0"} rand={version="0.8.5"} once_cell = {version="1.18.0"} @@ -33,6 +33,7 @@ dhat = {version="0.3.2", optional = true} mimalloc = { version = "0.1.38", default-features = false } async-once-cell = {version="0.5.3"} actix-governor = {version="0.4.1"} +mini-moka = { version="0.10", optional = true} [dev-dependencies] rusty-hook = "^0.11.2" @@ -66,4 +67,8 @@ rpath = false strip = "debuginfo" [features] +default = ["memory-cache"] dhat-heap = ["dep:dhat"] +memory-cache = ["dep:mini-moka"] +redis-cache = ["dep:redis"] +hybrid-cache = ["memory-cache", "redis-cache"] diff --git a/src/bin/websurfx.rs b/src/bin/websurfx.rs index bc8e7ce..d80c8e0 100644 --- a/src/bin/websurfx.rs +++ b/src/bin/websurfx.rs @@ -5,7 +5,7 @@ use mimalloc::MiMalloc; use std::net::TcpListener; -use websurfx::{config::parser::Config, run}; +use websurfx::{cache::cacher::Cache, config::parser::Config, run}; /// A dhat heap memory profiler #[cfg(feature = "dhat-heap")] @@ -31,6 +31,8 @@ async fn main() -> std::io::Result<()> { // Initialize the parsed config file. let config = Config::parse(false).unwrap(); + let cache = Cache::build(&config).await; + log::info!( "started server on port {} and IP {}", config.port, @@ -44,5 +46,5 @@ async fn main() -> std::io::Result<()> { let listener = TcpListener::bind((config.binding_ip.clone(), config.port))?; - run(listener, config)?.await + run(listener, config, cache)?.await } diff --git a/src/cache/cacher.rs b/src/cache/cacher.rs index 57351cd..c1d9096 100644 --- a/src/cache/cacher.rs +++ b/src/cache/cacher.rs @@ -2,107 +2,95 @@ //! from the upstream search engines in a json format. 
diff --git a/src/bin/websurfx.rs b/src/bin/websurfx.rs
index bc8e7ce..d80c8e0 100644
--- a/src/bin/websurfx.rs
+++ b/src/bin/websurfx.rs
@@ -5,7 +5,7 @@
 
 use mimalloc::MiMalloc;
 use std::net::TcpListener;
-use websurfx::{config::parser::Config, run};
+use websurfx::{cache::cacher::Cache, config::parser::Config, run};
 
 /// A dhat heap memory profiler
 #[cfg(feature = "dhat-heap")]
@@ -31,6 +31,8 @@ async fn main() -> std::io::Result<()> {
     // Initialize the parsed config file.
     let config = Config::parse(false).unwrap();
 
+    let cache = Cache::build(&config).await;
+
     log::info!(
         "started server on port {} and IP {}",
         config.port,
@@ -44,5 +46,5 @@ async fn main() -> std::io::Result<()> {
 
     let listener = TcpListener::bind((config.binding_ip.clone(), config.port))?;
 
-    run(listener, config)?.await
+    run(listener, config, cache)?.await
 }
diff --git a/src/cache/cacher.rs b/src/cache/cacher.rs
index 57351cd..c1d9096 100644
--- a/src/cache/cacher.rs
+++ b/src/cache/cacher.rs
@@ -2,107 +2,95 @@
 //! from the upstream search engines in a json format.
 
 use error_stack::Report;
-use futures::future::try_join_all;
-use md5::compute;
-use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};
+#[cfg(feature = "memory-cache")]
+use mini_moka::sync::Cache as MokaCache;
+#[cfg(feature = "memory-cache")]
+use std::time::Duration;
+use tokio::sync::Mutex;
+
+use crate::{config::parser::Config, models::aggregation_models::SearchResults};
 
 use super::error::PoolError;
+#[cfg(feature = "redis-cache")]
+use super::redis_cacher::RedisCache;
 
-/// A named struct which stores the redis Connection url address to which the client will
-/// connect to.
+/// Different implementations for caching; currently it is possible to cache in-memory or in Redis.
 #[derive(Clone)]
-pub struct RedisCache {
-    /// It stores a pool of connections ready to be used.
-    connection_pool: Vec<ConnectionManager>,
-    /// It stores the size of the connection pool (in other words the number of
-    /// connections that should be stored in the pool).
-    pool_size: u8,
-    /// It stores the index of which connection is being used at the moment.
-    current_connection: u8,
+pub enum Cache {
+    /// Caching is disabled.
+    Disabled,
+    #[cfg(feature = "redis-cache")]
+    /// Encapsulates the Redis based cache.
+    Redis(RedisCache),
+    #[cfg(feature = "memory-cache")]
+    /// Contains the in-memory cache.
+    InMemory(MokaCache<String, SearchResults>),
 }
 
-impl RedisCache {
-    /// Constructs a new `SearchResult` with the given arguments needed for the struct.
-    ///
-    /// # Arguments
-    ///
-    /// * `redis_connection_url` - It takes the redis Connection url address.
-    /// * `pool_size` - It takes the size of the connection pool (in other words the number of
-    /// connections that should be stored in the pool).
-    pub async fn new(
-        redis_connection_url: &str,
-        pool_size: u8,
-    ) -> Result<Self, Report<RedisError>> {
-        let client = Client::open(redis_connection_url)?;
-        let mut tasks: Vec<_> = Vec::new();
-
-        for _ in 0..pool_size {
-            tasks.push(client.get_tokio_connection_manager());
+impl Cache {
+    /// Builds the cache from the given configuration.
+    pub async fn build(_config: &Config) -> Self {
+        #[cfg(feature = "redis-cache")]
+        if let Some(url) = &_config.redis_url {
+            log::info!("Using Redis running at {} for caching", &url);
+            return Cache::new(
+                RedisCache::new(url, 5)
+                    .await
+                    .expect("Redis cache configured"),
+            );
+        }
+        #[cfg(feature = "memory-cache")]
+        {
+            log::info!("Using an in-memory cache");
+            return Cache::new_in_memory();
+        }
+        #[cfg(not(feature = "memory-cache"))]
+        {
+            log::info!("Caching is disabled");
+            Cache::Disabled
         }
-
-        let redis_cache = RedisCache {
-            connection_pool: try_join_all(tasks).await?,
-            pool_size,
-            current_connection: Default::default(),
-        };
-        Ok(redis_cache)
     }
 
-    /// A helper function which computes the hash of the url and formats and returns it as string.
-    ///
-    /// # Arguments
-    ///
-    /// * `url` - It takes an url as string.
-    fn hash_url(&self, url: &str) -> String {
-        format!("{:?}", compute(url))
+    /// Creates a new cache, which wraps the given RedisCache.
+    #[cfg(feature = "redis-cache")]
+    pub fn new(redis_cache: RedisCache) -> Self {
+        Cache::Redis(redis_cache)
     }
 
-    /// A function which fetches the cached json results as json string from the redis server.
+    /// Creates an in-memory cache.
+    #[cfg(feature = "memory-cache")]
+    pub fn new_in_memory() -> Self {
+        let cache = MokaCache::builder()
+            .max_capacity(1000)
+            .time_to_live(Duration::from_secs(60))
+            .build();
+        Cache::InMemory(cache)
+    }
+
+    /// A function which fetches the cached search results for the given url.
     ///
     /// # Arguments
    ///
     /// * `url` - It takes an url as a string.
-    pub async fn cached_json(&mut self, url: &str) -> Result<String, Report<PoolError>> {
-        self.current_connection = Default::default();
-        let hashed_url_string: &str = &self.hash_url(url);
-
-        let mut result: Result<String, RedisError> = self.connection_pool
-            [self.current_connection as usize]
-            .get(hashed_url_string)
-            .await;
-
-        // Code to check whether the current connection being used is dropped with connection error
-        // or not. if it drops with the connection error then the current connection is replaced
-        // with a new connection from the pool which is then used to run the redis command then
-        // that connection is also checked whether it is dropped or not if it is not then the
-        // result is passed as a `Result` or else the same process repeats again and if all of the
-        // connections in the pool result in connection drop error then a custom pool error is
-        // returned.
-        loop {
-            match result {
-                Err(error) => match error.is_connection_dropped() {
-                    true => {
-                        self.current_connection += 1;
-                        if self.current_connection == self.pool_size {
-                            return Err(Report::new(
-                                PoolError::PoolExhaustionWithConnectionDropError,
-                            ));
-                        }
-                        result = self.connection_pool[self.current_connection as usize]
-                            .get(hashed_url_string)
-                            .await;
-                        continue;
-                    }
-                    false => return Err(Report::new(PoolError::RedisError(error))),
-                },
-                Ok(res) => return Ok(res),
+    pub async fn cached_json(&mut self, url: &str) -> Result<SearchResults, Report<PoolError>> {
+        match self {
+            Cache::Disabled => Err(Report::new(PoolError::MissingValue)),
+            #[cfg(feature = "redis-cache")]
+            Cache::Redis(redis_cache) => {
+                let json = redis_cache.cached_json(url).await?;
+                Ok(serde_json::from_str::<SearchResults>(&json)
+                    .map_err(|_| PoolError::SerializationError)?)
             }
+            #[cfg(feature = "memory-cache")]
+            Cache::InMemory(in_memory) => match in_memory.get(&url.to_string()) {
+                Some(res) => Ok(res),
+                None => Err(Report::new(PoolError::MissingValue)),
+            },
         }
     }
 
-    /// A function which caches the results by using the hashed `url` as the key and
-    /// `json results` as the value and stores it in redis server with ttl(time to live)
-    /// set to 60 seconds.
+    /// A function which caches the results by using the `url` as the key and
+    /// `json results` as the value and stores it in the cache.
     ///
     /// # Arguments
     ///
@@ -110,43 +98,54 @@ impl RedisCache {
     /// * `json_results` - It takes the json results string as an argument.
     /// * `url` - It takes the url as a String.
     pub async fn cache_results(
         &mut self,
-        json_results: &str,
+        search_results: &SearchResults,
         url: &str,
     ) -> Result<(), Report<PoolError>> {
-        self.current_connection = Default::default();
-        let hashed_url_string: &str = &self.hash_url(url);
-
-        let mut result: Result<(), RedisError> = self.connection_pool
-            [self.current_connection as usize]
-            .set_ex(hashed_url_string, json_results, 60)
-            .await;
-
-        // Code to check whether the current connection being used is dropped with connection error
-        // or not. if it drops with the connection error then the current connection is replaced
-        // with a new connection from the pool which is then used to run the redis command then
-        // that connection is also checked whether it is dropped or not if it is not then the
-        // result is passed as a `Result` or else the same process repeats again and if all of the
-        // connections in the pool result in connection drop error then a custom pool error is
-        // returned.
-        loop {
-            match result {
-                Err(error) => match error.is_connection_dropped() {
-                    true => {
-                        self.current_connection += 1;
-                        if self.current_connection == self.pool_size {
-                            return Err(Report::new(
-                                PoolError::PoolExhaustionWithConnectionDropError,
-                            ));
-                        }
-                        result = self.connection_pool[self.current_connection as usize]
-                            .set_ex(hashed_url_string, json_results, 60)
-                            .await;
-                        continue;
-                    }
-                    false => return Err(Report::new(PoolError::RedisError(error))),
-                },
-                Ok(_) => return Ok(()),
+        match self {
+            Cache::Disabled => Ok(()),
+            #[cfg(feature = "redis-cache")]
+            Cache::Redis(redis_cache) => {
+                let json = serde_json::to_string(search_results)
+                    .map_err(|_| PoolError::SerializationError)?;
+                redis_cache.cache_results(&json, url).await
+            }
+            #[cfg(feature = "memory-cache")]
+            Cache::InMemory(cache) => {
+                cache.insert(url.to_string(), search_results.clone());
+                Ok(())
+            }
         }
     }
 }
+
+/// A structure to efficiently share the cache between threads; access is protected by a Mutex.
+pub struct SharedCache {
+    /// The internal cache protected from concurrent access by a mutex.
+    cache: Mutex<Cache>,
+}
+
+impl SharedCache {
+    /// Creates a new SharedCache from a Cache implementation.
+    pub fn new(cache: Cache) -> Self {
+        Self {
+            cache: Mutex::new(cache),
+        }
+    }
+
+    /// A function which retrieves the cached SearchResults from the internal cache.
+    pub async fn cached_json(&self, url: &str) -> Result<SearchResults, Report<PoolError>> {
+        let mut mut_cache = self.cache.lock().await;
+        mut_cache.cached_json(url).await
+    }
+
+    /// A function which caches the results by using the `url` as the key and
+    /// `SearchResults` as the value.
+    pub async fn cache_results(
+        &self,
+        search_results: &SearchResults,
+        url: &str,
+    ) -> Result<(), Report<PoolError>> {
+        let mut mut_cache = self.cache.lock().await;
+        mut_cache.cache_results(search_results, url).await
+    }
+}
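For reference, a minimal sketch of how the new `Cache`/`SharedCache` API is exercised end to end. It assumes the default `memory-cache` feature and that `SearchResults` is publicly reachable at the path this diff uses; the URL key and the `Default` results are placeholder values for illustration only:

```rust
use websurfx::cache::cacher::{Cache, SharedCache};
use websurfx::models::aggregation_models::SearchResults;

#[tokio::main]
async fn main() {
    // Wrap the in-memory backend so it can be shared across actix workers.
    let cache = SharedCache::new(Cache::new_in_memory());
    let results = SearchResults::default();

    // Entries are keyed by the full request URL and live for the 60-second
    // TTL configured in `Cache::new_in_memory`.
    cache
        .cache_results(&results, "http://127.0.0.1:8080/search?q=rust")
        .await
        .expect("in-memory writes should not fail");

    // A hit within the TTL hands back the `SearchResults` value directly,
    // with no JSON round trip.
    assert!(cache
        .cached_json("http://127.0.0.1:8080/search?q=rust")
        .await
        .is_ok());
}
```

Note that `SharedCache` funnels every reader and writer through a single `tokio::sync::Mutex`, so all workers contend on one lock; that is the price of keeping the `Cache` enum itself free of any locking.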
diff --git a/src/cache/error.rs b/src/cache/error.rs
index 8bdb977..9efda32 100644
--- a/src/cache/error.rs
+++ b/src/cache/error.rs
@@ -2,21 +2,28 @@
 //! the redis server using an async connection pool.
 
 use std::fmt;
 
+#[cfg(feature = "redis-cache")]
 use redis::RedisError;
 
 /// A custom error type used for handling redis async pool associated errors.
 #[derive(Debug)]
 pub enum PoolError {
     /// This variant handles all errors related to `RedisError`,
+    #[cfg(feature = "redis-cache")]
     RedisError(RedisError),
     /// This variant handles the errors which occurs when all the connections
     /// in the connection pool return a connection dropped redis error.
     PoolExhaustionWithConnectionDropError,
+    /// Whenever serialization or deserialization fails during communication with the cache.
+    SerializationError,
+    /// Returned when the requested value is missing from the cache.
+    MissingValue,
 }
 
 impl fmt::Display for PoolError {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
+            #[cfg(feature = "redis-cache")]
             PoolError::RedisError(redis_error) => {
                 if let Some(detail) = redis_error.detail() {
                     write!(f, "{}", detail)
@@ -30,6 +37,12 @@ impl fmt::Display for PoolError {
                     "Error all connections from the pool dropped with connection error"
                 )
             }
+            PoolError::MissingValue => {
+                write!(f, "The value is missing from the cache")
+            }
+            PoolError::SerializationError => {
+                write!(f, "Unable to serialize or deserialize data from the cache")
+            }
         }
     }
 }
diff --git a/src/cache/mod.rs b/src/cache/mod.rs
index f40369f..887f119 100644
--- a/src/cache/mod.rs
+++ b/src/cache/mod.rs
@@ -3,3 +3,5 @@
 
 pub mod cacher;
 pub mod error;
+#[cfg(feature = "redis-cache")]
+pub mod redis_cacher;
diff --git a/src/cache/redis_cacher.rs b/src/cache/redis_cacher.rs
new file mode 100644
index 0000000..9e4cbab
--- /dev/null
+++ b/src/cache/redis_cacher.rs
@@ -0,0 +1,152 @@
+//! This module provides the functionality to cache the aggregated results fetched and aggregated
+//! from the upstream search engines in a json format.
+
+use error_stack::Report;
+use futures::future::try_join_all;
+use md5::compute;
+use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};
+
+use super::error::PoolError;
+
+/// A named struct which stores the redis Connection url address to which the client will
+/// connect to.
+#[derive(Clone)]
+pub struct RedisCache {
+    /// It stores a pool of connections ready to be used.
+    connection_pool: Vec<ConnectionManager>,
+    /// It stores the size of the connection pool (in other words the number of
+    /// connections that should be stored in the pool).
+    pool_size: u8,
+    /// It stores the index of which connection is being used at the moment.
+    current_connection: u8,
+}
+
+impl RedisCache {
+    /// Constructs a new `RedisCache` struct with the given arguments.
+    ///
+    /// # Arguments
+    ///
+    /// * `redis_connection_url` - It takes the redis Connection url address.
+    /// * `pool_size` - It takes the size of the connection pool (in other words the number of
+    /// connections that should be stored in the pool).
+    pub async fn new(
+        redis_connection_url: &str,
+        pool_size: u8,
+    ) -> Result<Self, Report<RedisError>> {
+        let client = Client::open(redis_connection_url)?;
+        let mut tasks: Vec<_> = Vec::new();
+
+        for _ in 0..pool_size {
+            tasks.push(client.get_tokio_connection_manager());
+        }
+
+        let redis_cache = RedisCache {
+            connection_pool: try_join_all(tasks).await?,
+            pool_size,
+            current_connection: Default::default(),
+        };
+        Ok(redis_cache)
+    }
+
+    /// A helper function which computes the hash of the url, formats it, and returns it as a string.
+    ///
+    /// # Arguments
+    ///
+    /// * `url` - It takes a url as a string.
+    fn hash_url(&self, url: &str) -> String {
+        format!("{:?}", compute(url))
+    }
+
+    /// A function which fetches the cached json results as json string from the redis server.
+    ///
+    /// # Arguments
+    ///
+    /// * `url` - It takes a url as a string.
+    pub async fn cached_json(&mut self, url: &str) -> Result<String, Report<PoolError>> {
+        self.current_connection = Default::default();
+        let hashed_url_string: &str = &self.hash_url(url);
+
+        let mut result: Result<String, RedisError> = self.connection_pool
+            [self.current_connection as usize]
+            .get(hashed_url_string)
+            .await;
+
+        // Check whether the current connection was dropped with a connection error.
+        // If it was, the current connection is replaced with the next connection from
+        // the pool, which is then used to rerun the redis command, and that connection
+        // is checked in the same way. If a connection succeeds, its result is returned
+        // as a `Result`; otherwise the process repeats with the remaining connections.
+        // If every connection in the pool fails with a connection drop error, a custom
+        // pool error is returned.
+        loop {
+            match result {
+                Err(error) => match error.is_connection_dropped() {
+                    true => {
+                        self.current_connection += 1;
+                        if self.current_connection == self.pool_size {
+                            return Err(Report::new(
+                                PoolError::PoolExhaustionWithConnectionDropError,
+                            ));
+                        }
+                        result = self.connection_pool[self.current_connection as usize]
+                            .get(hashed_url_string)
+                            .await;
+                        continue;
+                    }
+                    false => return Err(Report::new(PoolError::RedisError(error))),
+                },
+                Ok(res) => return Ok(res),
+            }
+        }
+    }
+
+    /// A function which caches the results by using the hashed `url` as the key and
+    /// `json results` as the value and stores it in redis server with ttl(time to live)
+    /// set to 60 seconds.
+    ///
+    /// # Arguments
+    ///
+    /// * `json_results` - It takes the json results string as an argument.
+    /// * `url` - It takes the url as a String.
+    pub async fn cache_results(
+        &mut self,
+        json_results: &str,
+        url: &str,
+    ) -> Result<(), Report<PoolError>> {
+        self.current_connection = Default::default();
+        let hashed_url_string: &str = &self.hash_url(url);
+
+        let mut result: Result<(), RedisError> = self.connection_pool
+            [self.current_connection as usize]
+            .set_ex(hashed_url_string, json_results, 60)
+            .await;
+
+        // Check whether the current connection was dropped with a connection error.
+        // If it was, the current connection is replaced with the next connection from
+        // the pool, which is then used to rerun the redis command, and that connection
+        // is checked in the same way. If a connection succeeds, its result is returned
+        // as a `Result`; otherwise the process repeats with the remaining connections.
+        // If every connection in the pool fails with a connection drop error, a custom
+        // pool error is returned.
+        loop {
+            match result {
+                Err(error) => match error.is_connection_dropped() {
+                    true => {
+                        self.current_connection += 1;
+                        if self.current_connection == self.pool_size {
+                            return Err(Report::new(
+                                PoolError::PoolExhaustionWithConnectionDropError,
+                            ));
+                        }
+                        result = self.connection_pool[self.current_connection as usize]
+                            .set_ex(hashed_url_string, json_results, 60)
+                            .await;
+                        continue;
+                    }
+                    false => return Err(Report::new(PoolError::RedisError(error))),
+                },
+                Ok(_) => return Ok(()),
+            }
+        }
+    }
+}
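A corresponding sketch for the Redis backend in isolation; it assumes a build with the `redis-cache` feature and a Redis server reachable at the example URL (both are assumptions for illustration):

```rust
use websurfx::cache::redis_cacher::RedisCache;

#[tokio::main]
async fn main() {
    // Open a pool of five connection managers, mirroring what `Cache::build` does.
    let mut cache = RedisCache::new("redis://127.0.0.1:6379", 5)
        .await
        .expect("failed to open the Redis connection pool");

    // Keys are md5 digests of the url; `cache_results` stores the value with a
    // 60-second TTL via SETEX.
    cache
        .cache_results("{\"results\":[]}", "http://127.0.0.1:8080/search?q=rust")
        .await
        .expect("failed to cache the results");

    let json = cache
        .cached_json("http://127.0.0.1:8080/search?q=rust")
        .await
        .expect("the entry should still be within its TTL");
    assert_eq!(json, "{\"results\":[]}");
}
```

On a dropped connection, both methods walk through the remaining managers in the pool and only give up with `PoolError::PoolExhaustionWithConnectionDropError` once every connection has failed.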
diff --git a/src/config/parser.rs b/src/config/parser.rs
index 782b026..6d84374 100644
--- a/src/config/parser.rs
+++ b/src/config/parser.rs
@@ -19,7 +19,7 @@ pub struct Config {
     pub style: Style,
     /// It stores the redis connection url address on which the redis
     /// client should connect.
-    pub redis_url: String,
+    pub redis_url: Option<String>,
     /// It stores the option to whether enable or disable production use.
     pub aggregator: AggregatorConfig,
     /// It stores the option to whether enable or disable logs.
@@ -99,7 +99,7 @@ impl Config {
                 globals.get::<_, String>("theme")?,
                 globals.get::<_, String>("colorscheme")?,
             ),
-            redis_url: globals.get::<_, String>("redis_url")?,
+            redis_url: globals.get::<_, String>("redis_url").ok(),
             aggregator: AggregatorConfig {
                 random_delay: globals.get::<_, bool>("production_use")?,
             },
diff --git a/src/lib.rs b/src/lib.rs
index 8c74e6a..73e9364 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,6 +21,7 @@ use actix_cors::Cors;
 use actix_files as fs;
 use actix_governor::{Governor, GovernorConfigBuilder};
 use actix_web::{dev::Server, http::header, middleware::Logger, web, App, HttpServer};
+use cache::cacher::{Cache, SharedCache};
 use config::parser::Config;
 use handlebars::Handlebars;
 use handler::paths::{file_path, FileType};
@@ -39,13 +40,14 @@ use handler::paths::{file_path, FileType};
 ///
 /// ```rust
 /// use std::net::TcpListener;
-/// use websurfx::{config::parser::Config, run};
+/// use websurfx::{config::parser::Config, run, cache::cacher::Cache};
 ///
 /// let config = Config::parse(true).unwrap();
 /// let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind address");
-/// let server = run(listener,config).expect("Failed to start server");
+/// let cache = Cache::new_in_memory();
+/// let server = run(listener,config,cache).expect("Failed to start server");
 /// ```
-pub fn run(listener: TcpListener, config: Config) -> std::io::Result<Server> {
+pub fn run(listener: TcpListener, config: Config, cache: Cache) -> std::io::Result<Server> {
     let mut handlebars: Handlebars<'_> = Handlebars::new();
 
     let public_folder_path: &str = file_path(FileType::Theme)?;
@@ -58,6 +60,8 @@ pub fn run(listener: TcpListener, config: Config) -> std::io::Result<Server> {
 
     let cloned_config_threads_opt: u8 = config.threads;
 
+    let cache = web::Data::new(SharedCache::new(cache));
+
     let server = HttpServer::new(move || {
         let cors: Cors = Cors::default()
             .allow_any_origin()
@@ -73,6 +77,7 @@ pub fn run(listener: TcpListener, config: Config) -> std::io::Result<Server> {
             .wrap(Logger::default()) // added logging middleware for logging.
             .app_data(handlebars_ref.clone())
             .app_data(web::Data::new(config.clone()))
+            .app_data(cache.clone())
             .wrap(cors)
             .wrap(Governor::new(
                 &GovernorConfigBuilder::default()
diff --git a/src/models/aggregation_models.rs b/src/models/aggregation_models.rs
index ea4a914..656297f 100644
--- a/src/models/aggregation_models.rs
+++ b/src/models/aggregation_models.rs
@@ -102,7 +102,7 @@ impl EngineErrorInfo {
 /// A named struct to store, serialize, deserialize the all the search results scraped and
 /// aggregated from the upstream search engines.
 /// `SearchResult` structs.
-#[derive(Serialize, Deserialize, Default)]
+#[derive(Serialize, Deserialize, Default, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct SearchResults {
     /// Stores the individual serializable `SearchResult` struct into a vector of
diff --git a/src/server/routes/search.rs b/src/server/routes/search.rs
index 254c038..1c1cff1 100644
--- a/src/server/routes/search.rs
+++ b/src/server/routes/search.rs
@@ -1,7 +1,7 @@
 //! This module handles the search route of the search engine website.
 
 use crate::{
-    cache::cacher::RedisCache,
+    cache::cacher::SharedCache,
     config::parser::Config,
     handler::paths::{file_path, FileType},
     models::{aggregation_models::SearchResults, engine_models::EngineHandler},
@@ -17,10 +17,6 @@ use std::{
 };
 use tokio::join;
 
-// ---- Constants ----
-/// Initialize redis cache connection once and store it on the heap.
-static REDIS_CACHE: async_once_cell::OnceCell<RedisCache> = async_once_cell::OnceCell::new();
-
 /// A named struct which deserializes all the user provided search parameters and stores them.
 #[derive(Deserialize)]
 pub struct SearchParams {
@@ -89,6 +85,7 @@ pub async fn search(
     hbs: web::Data<Handlebars<'_>>,
     req: HttpRequest,
     config: web::Data<Config>,
+    cache: web::Data<SharedCache>,
 ) -> Result<HttpResponse, Box<dyn std::error::Error>> {
     let params = web::Query::<SearchParams>::from_query(req.query_string())?;
     match &params.q {
@@ -125,6 +122,7 @@
                         safe_search
                     ),
                     &config,
+                    &cache,
                     query,
                     page - 1,
                     req.clone(),
@@ -136,6 +134,7 @@
                         config.binding_ip, config.port, query, page, safe_search
                     ),
                     &config,
+                    &cache,
                     query,
                     page,
                     req.clone(),
@@ -151,6 +150,7 @@
                         safe_search
                     ),
                     &config,
+                    &cache,
                     query,
                     page + 1,
                     req.clone(),
@@ -185,26 +185,18 @@ pub async fn search(
 async fn results(
     url: String,
     config: &Config,
+    cache: &web::Data<SharedCache>,
     query: &str,
     page: u32,
     req: HttpRequest,
     safe_search: u8,
 ) -> Result<SearchResults, Box<dyn std::error::Error>> {
-    // Initialize redis cache connection struct
-    let mut redis_cache: RedisCache = REDIS_CACHE
-        .get_or_init(async {
-            // Initialize redis cache connection pool only one and store it in the heap.
-            RedisCache::new(&config.redis_url, 5).await.unwrap()
-        })
-        .await
-        .clone();
     // fetch the cached results json.
-    let cached_results_json: Result<String, error_stack::Report<PoolError>> =
-        redis_cache.clone().cached_json(&url).await;
+    let cached_results = cache.cached_json(&url).await;
     // check if fetched cache results was indeed fetched or it was an error and if so
     // handle the data accordingly.
-    match cached_results_json {
-        Ok(results) => Ok(serde_json::from_str::<SearchResults>(&results)?),
+    match cached_results {
+        Ok(results) => Ok(results),
         Err(_) => {
             if safe_search == 4 {
                 let mut results: SearchResults = SearchResults::default();
@@ -216,9 +208,7 @@ async fn results(
                 results.set_disallowed();
                 results.add_style(&config.style);
                 results.set_page_query(query);
-                redis_cache
-                    .cache_results(&serde_json::to_string(&results)?, &url)
-                    .await?;
+                cache.cache_results(&results, &url).await?;
                 return Ok(results);
             }
         }
@@ -266,9 +256,7 @@ async fn results(
                 results.set_filtered();
             }
             results.add_style(&config.style);
-            redis_cache
-                .cache_results(&serde_json::to_string(&results)?, &url)
-                .await?;
+            cache.cache_results(&results, &url).await?;
             Ok(results)
         }
     }
 }
diff --git a/tests/index.rs b/tests/index.rs
index 080ad27..ab56e1d 100644
--- a/tests/index.rs
+++ b/tests/index.rs
@@ -9,7 +9,12 @@ fn spawn_app() -> String {
     let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");
     let port = listener.local_addr().unwrap().port();
     let config = Config::parse(false).unwrap();
-    let server = run(listener, config).expect("Failed to bind address");
+    let server = run(
+        listener,
+        config,
+        websurfx::cache::cacher::Cache::new_in_memory(),
+    )
+    .expect("Failed to bind address");
     tokio::spawn(server);
     format!("http://127.0.0.1:{}/", port)
 }
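Since `spawn_app` now constructs `Cache::new_in_memory()` explicitly, the integration tests run against the in-memory backend and no longer need a running Redis server; only builds with `redis-cache` (or `hybrid-cache`) and a configured `redis_url` exercise the Redis path.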