diff --git a/Cargo.lock b/Cargo.lock
index 7a41139..72af913 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4146,7 +4146,7 @@ checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10"
 
 [[package]]
 name = "websurfx"
-version = "1.9.3"
+version = "1.9.4"
 dependencies = [
  "actix-cors",
  "actix-files",
diff --git a/Cargo.toml b/Cargo.toml
index aff946b..fdfbe32 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "websurfx"
-version = "1.9.3"
+version = "1.9.4"
 edition = "2021"
 description = "An open-source alternative to Searx that provides clean, ad-free, and organic results with incredible speed while keeping privacy and security in mind."
 repository = "https://github.com/neon-mmd/websurfx"
diff --git a/src/cache/cacher.rs b/src/cache/cacher.rs
index d1144e3..f323395 100644
--- a/src/cache/cacher.rs
+++ b/src/cache/cacher.rs
@@ -4,6 +4,7 @@
 use error_stack::Report;
 #[cfg(feature = "memory-cache")]
 use mini_moka::sync::Cache as MokaCache;
+use mini_moka::sync::ConcurrentCacheExt;
 #[cfg(feature = "memory-cache")]
 use std::time::Duration;
 
@@ -61,8 +62,8 @@ pub trait Cacher: Send + Sync {
     /// failure.
     async fn cache_results(
         &mut self,
-        search_results: &SearchResults,
-        url: &str,
+        search_results: &[SearchResults],
+        urls: &[String],
     ) -> Result<(), Report<CacheError>>;
 
     /// A helper function which computes the hash of the url and formats and returns it as string.
@@ -332,14 +333,33 @@ impl Cacher for RedisCache {
     async fn cache_results(
         &mut self,
-        search_results: &SearchResults,
-        url: &str,
+        search_results: &[SearchResults],
+        urls: &[String],
     ) -> Result<(), Report<CacheError>> {
         use base64::Engine;
-        let bytes = self.pre_process_search_results(search_results)?;
-        let base64_string = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes);
-        let hashed_url_string = self.hash_url(url);
-        self.cache_json(&base64_string, &hashed_url_string).await
+
+        // size of search_results is expected to be equal to size of urls -> key/value pairs for cache;
+        let search_results_len = search_results.len();
+
+        let mut bytes = Vec::with_capacity(search_results_len);
+
+        for result in search_results {
+            let processed = self.pre_process_search_results(result)?;
+            bytes.push(processed);
+        }
+
+        let base64_strings = bytes
+            .iter()
+            .map(|bytes_vec| base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes_vec));
+
+        let mut hashed_url_strings = Vec::with_capacity(search_results_len);
+
+        for url in urls {
+            let hash = self.hash_url(url);
+            hashed_url_strings.push(hash);
+        }
+        self.cache_json(base64_strings, hashed_url_strings.into_iter())
+            .await
     }
 }
 
 /// TryInto implementation for SearchResults from Vec<u8>
@@ -391,12 +411,16 @@ impl Cacher for InMemoryCache {
     async fn cache_results(
         &mut self,
-        search_results: &SearchResults,
-        url: &str,
+        search_results: &[SearchResults],
+        urls: &[String],
     ) -> Result<(), Report<CacheError>> {
-        let hashed_url_string = self.hash_url(url);
-        let bytes = self.pre_process_search_results(search_results)?;
-        self.cache.insert(hashed_url_string, bytes);
+        for (url, search_result) in urls.iter().zip(search_results.iter()) {
+            let hashed_url_string = self.hash_url(url);
+            let bytes = self.pre_process_search_results(search_result)?;
+            self.cache.insert(hashed_url_string, bytes);
+        }
+
+        self.cache.sync();
         Ok(())
     }
 }
@@ -434,11 +458,13 @@ impl Cacher for HybridCache {
     async fn cache_results(
         &mut self,
-        search_results: &SearchResults,
-        url: &str,
+        search_results: &[SearchResults],
+        urls: &[String],
     ) -> Result<(), Report<CacheError>> {
-        self.redis_cache.cache_results(search_results, url).await?;
-        self.memory_cache.cache_results(search_results, url).await?;
+        self.redis_cache.cache_results(search_results, urls).await?;
+        self.memory_cache
+            .cache_results(search_results, urls)
+            .await?;
 
         Ok(())
     }
@@ -460,8 +486,8 @@ impl Cacher for DisabledCache {
     async fn cache_results(
         &mut self,
-        _search_results: &SearchResults,
-        _url: &str,
+        _search_results: &[SearchResults],
+        _urls: &[String],
     ) -> Result<(), Report<CacheError>> {
         Ok(())
     }
 }
@@ -519,11 +545,11 @@ impl SharedCache {
     /// on a failure.
     pub async fn cache_results(
         &self,
-        search_results: &SearchResults,
-        url: &str,
+        search_results: &[SearchResults],
+        urls: &[String],
     ) -> Result<(), Report<CacheError>> {
         let mut mut_cache = self.cache.lock().await;
-        mut_cache.cache_results(search_results, url).await
+        mut_cache.cache_results(search_results, urls).await
     }
 }
diff --git a/src/cache/redis_cacher.rs b/src/cache/redis_cacher.rs
index cfb2a2e..c775963 100644
--- a/src/cache/redis_cacher.rs
+++ b/src/cache/redis_cacher.rs
@@ -118,14 +118,18 @@ impl RedisCache {
     /// on a failure.
     pub async fn cache_json(
         &mut self,
-        json_results: &str,
-        key: &str,
+        json_results: impl Iterator<Item = String>,
+        keys: impl Iterator<Item = String>,
     ) -> Result<(), Report<CacheError>> {
         self.current_connection = Default::default();
+        let mut pipeline = redis::Pipeline::with_capacity(3);
 
-        let mut result: Result<(), RedisError> = self.connection_pool
-            [self.current_connection as usize]
-            .set_ex(key, json_results, self.cache_ttl.into())
+        for (key, json_result) in keys.zip(json_results) {
+            pipeline.set_ex(key, json_result, self.cache_ttl.into());
+        }
+
+        let mut result: Result<(), RedisError> = pipeline
+            .query_async(&mut self.connection_pool[self.current_connection as usize])
             .await;
 
         // Code to check whether the current connection being used is dropped with connection error
@@ -145,8 +149,10 @@
                         CacheError::PoolExhaustionWithConnectionDropError,
                     ));
                 }
-                result = self.connection_pool[self.current_connection as usize]
-                    .set_ex(key, json_results, 60)
+                result = pipeline
+                    .query_async(
+                        &mut self.connection_pool[self.current_connection as usize],
+                    )
                     .await;
                 continue;
             }
diff --git a/src/server/routes/search.rs b/src/server/routes/search.rs
index a1c2cf3..16cfa28 100644
--- a/src/server/routes/search.rs
+++ b/src/server/routes/search.rs
@@ -40,6 +40,7 @@ pub async fn search(
     config: web::Data<Config>,
     cache: web::Data<SharedCache>,
 ) -> Result<HttpResponse, Box<dyn std::error::Error>> {
+    use std::sync::Arc;
     let params = web::Query::<SearchParams>::from_query(req.query_string())?;
     match &params.q {
         Some(query) => {
@@ -79,12 +80,50 @@ pub async fn search(
             // .max(1) makes sure that the page >= 0.
             let page = params.page.unwrap_or(1).max(1) - 1;
+            let previous_page = page.saturating_sub(1);
+            let next_page = page + 1;
 
-            let (_, results, _) = join!(
-                get_results(page.saturating_sub(1)),
-                get_results(page),
-                get_results(page + 1)
-            );
+            let mut results = Arc::new((SearchResults::default(), String::default()));
+            if page != previous_page {
+                let (previous_results, current_results, next_results) = join!(
+                    get_results(previous_page),
+                    get_results(page),
+                    get_results(next_page)
+                );
+                let (parsed_previous_results, parsed_next_results) =
+                    (previous_results?, next_results?);
+
+                results = Arc::new(current_results?);
+
+                let (cache_keys, results_list) = (
+                    [
+                        parsed_previous_results.1,
+                        results.1.clone(),
+                        parsed_next_results.1,
+                    ],
+                    [
+                        parsed_previous_results.0,
+                        results.0.clone(),
+                        parsed_next_results.0,
+                    ],
+                );
+
+                tokio::spawn(async move { cache.cache_results(&results_list, &cache_keys).await });
+            } else {
+                let (current_results, next_results) =
+                    join!(get_results(page), get_results(page + 1));
+
+                let parsed_next_results = next_results?;
+
+                results = Arc::new(current_results?);
+
+                let (cache_keys, results_list) = (
+                    [results.1.clone(), parsed_next_results.1.clone()],
+                    [results.0.clone(), parsed_next_results.0],
+                );
+
+                tokio::spawn(async move { cache.cache_results(&results_list, &cache_keys).await });
+            }
 
             Ok(HttpResponse::Ok().content_type(ContentType::html()).body(
                 crate::templates::views::search::search(
@@ -92,7 +131,7 @@
                     &config.style.theme,
                     &config.style.animation,
                     query,
-                    &results?,
+                    &results.0,
                 )
                 .0,
             ))
@@ -124,7 +163,7 @@ async fn results(
     query: &str,
     page: u32,
     search_settings: &server_models::Cookie<'_>,
-) -> Result<SearchResults, Box<dyn std::error::Error>> {
+) -> Result<(SearchResults, String), Box<dyn std::error::Error>> {
     // eagerly parse cookie value to evaluate safe search level
     let safe_search_level = search_settings.safe_search_level;
 
@@ -143,7 +182,7 @@
     // check if fetched cache results was indeed fetched or it was an error and if so
     // handle the data accordingly.
     match cached_results {
-        Ok(results) => Ok(results),
+        Ok(results) => Ok((results, cache_key)),
         Err(_) => {
             if safe_search_level == 4 {
                 let mut results: SearchResults = SearchResults::default();
@@ -153,9 +192,11 @@
                 // Return early when query contains disallowed words,
                 if flag {
                     results.set_disallowed();
-                    cache.cache_results(&results, &cache_key).await?;
+                    cache
+                        .cache_results(&[results.clone()], &[cache_key.clone()])
+                        .await?;
                     results.set_safe_search_level(safe_search_level);
-                    return Ok(results);
+                    return Ok((results, cache_key));
                 }
             }
 
@@ -173,7 +214,7 @@
                 &search_settings
                     .engines
                     .iter()
-                    .filter_map(|engine| EngineHandler::new(&engine).ok())
+                    .filter_map(|engine| EngineHandler::new(engine).ok())
                     .collect::<Vec<EngineHandler>>(),
                 config.request_timeout,
                 safe_search_level,
@@ -192,9 +233,11 @@
             {
                 results.set_filtered();
             }
-            cache.cache_results(&results, &cache_key).await?;
+            cache
+                .cache_results(&[results.clone()], &[cache_key.clone()])
+                .await?;
             results.set_safe_search_level(safe_search_level);
-            Ok(results)
+            Ok((results, cache_key))
         }
     }
 }
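
A note on the reworked `Cacher::cache_results` contract: `search_results` and `urls` are parallel slices, paired positionally, as the in-memory implementation's `urls.iter().zip(search_results.iter())` loop shows. Below is a toy sketch of that pairing contract only; a plain `HashMap` and a placeholder hash stand in for mini-moka and the real `hash_url`, and none of it is websurfx code:

```rust
use std::collections::HashMap;

/// Toy stand-in for the slice-based cache_results contract:
/// search_results[i] is stored under the key derived from urls[i].
struct MemCache {
    map: HashMap<String, String>,
}

impl MemCache {
    fn cache_results(&mut self, search_results: &[String], urls: &[String]) {
        // zip() stops at the shorter slice, so a length mismatch silently
        // drops entries; callers must keep the two slices parallel.
        for (url, result) in urls.iter().zip(search_results.iter()) {
            self.map.insert(hash_url(url), result.clone());
        }
    }
}

/// Placeholder for the real hash_url, which hashes the URL into a cache key.
fn hash_url(url: &str) -> String {
    format!("hashed:{url}")
}

fn main() {
    let mut cache = MemCache { map: HashMap::new() };
    cache.cache_results(
        &["results page 1".into(), "results page 2".into()],
        &["https://example.com?page=1".into(), "https://example.com?page=2".into()],
    );
    assert_eq!(cache.map.len(), 2);
}
```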
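On the `redis_cacher.rs` side, the change is that `cache_json` no longer issues one `SETEX` round trip per result: it queues every `SETEX` into a `redis::Pipeline` and flushes the batch with a single `query_async` call. A minimal sketch of that pattern, assuming the `redis` crate with its tokio/async support enabled; the function, connection handling, and entries below are illustrative rather than the websurfx code, and the exact type of `set_ex`'s TTL argument varies between `redis` crate versions:

```rust
use redis::{aio::MultiplexedConnection, RedisError};

/// Write several key/value pairs with a TTL in one network round trip.
async fn cache_batch(
    conn: &mut MultiplexedConnection,
    entries: &[(String, String)],
    ttl_seconds: u64, // assumed u64 here; older redis crate versions take usize
) -> Result<(), RedisError> {
    // Commands are only queued locally at this point; nothing is sent yet.
    let mut pipeline = redis::Pipeline::with_capacity(entries.len());
    for (key, value) in entries {
        pipeline.set_ex(key, value, ttl_seconds);
    }
    // One request/response cycle flushes the whole batch.
    pipeline.query_async(conn).await
}
```

Because `query_async` borrows the pipeline immutably, the same queued batch can be re-issued against another pooled connection, which is what the retry path in the second `redis_cacher.rs` hunk does. The diff's `with_capacity(3)` matches the route's worst case of three pages (previous, current, next) per call; the capacity is only an allocation hint.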
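The `search.rs` change pairs that batched write with prefetching: the route fetches the previous, current, and next pages concurrently, responds with the current page immediately, and hands the neighbours to a detached task for caching. The following self-contained sketch shows only the shape of that flow; `PageResults`, `Cache`, `fetch`, and `handle` are hypothetical stand-ins for websurfx's `SearchResults`, `SharedCache`, aggregation, and route handler, assuming tokio with its `full` feature set:

```rust
use std::sync::Arc;

// Hypothetical stand-in for websurfx's SearchResults.
struct PageResults(String);

// Hypothetical stand-in for websurfx's SharedCache.
struct Cache;

impl Cache {
    async fn cache_results(&self, results: &[PageResults], keys: &[String]) {
        // A real implementation would serialize these and batch-write them.
        println!("caching {} page(s) under {:?}", results.len(), keys);
    }
}

// Stand-in for the upstream aggregation; returns (results, cache key).
async fn fetch(page: u32) -> (PageResults, String) {
    (
        PageResults(format!("results for page {page}")),
        format!("key:{page}"),
    )
}

async fn handle(page: u32, cache: Arc<Cache>) -> Arc<PageResults> {
    // Fetch the requested page and both neighbours concurrently.
    let (prev, current, next) = tokio::join!(
        fetch(page.saturating_sub(1)),
        fetch(page),
        fetch(page + 1)
    );

    let current = Arc::new(current.0);

    // Hand the neighbours to a detached task: the response is not delayed
    // by the cache writes, and a cache failure cannot fail the request.
    let (keys, values) = ([prev.1, next.1], [prev.0, next.0]);
    tokio::spawn(async move { cache.cache_results(&values, &keys).await });

    current
}

#[tokio::main]
async fn main() {
    let results = handle(3, Arc::new(Cache)).await;
    println!("{}", results.0);
    // Give the detached cache task a moment to run before exiting.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```

The diff additionally special-cases page 0, where `previous_page == page`, so the first page is not fetched twice, and it includes the current page's key and results in the cached batch alongside the neighbours.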