0
0
mirror of https://github.com/neon-mmd/websurfx.git synced 2024-10-18 06:22:53 -04:00

♻️ Refactor cache system (#399)

* ♻️ Refactor cache system

* 🐛 Fix cache not getting set

This patch also makes it that cookies are eagerly evaluated. This is
done to figure out the safe search level set by the user. The
performance hit wouldn't be much of a deal as the cookie is a small
json string

* 🔖 chore: bump the app version (#399)

* 🔖 chore: bump the app version (#399)

---------

Co-authored-by: alamin655 <mdalamin655@outlook.com>
This commit is contained in:
Ashwin Vinod 2023-11-28 11:47:35 +05:30 committed by GitHub
parent 90f010359d
commit e704c26ed3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 252 additions and 240 deletions

2
Cargo.lock generated
View File

@ -4066,7 +4066,7 @@ checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10"
[[package]]
name = "websurfx"
version = "1.2.30"
version = "1.2.34"
dependencies = [
"actix-cors",
"actix-files",

View File

@ -1,6 +1,6 @@
[package]
name = "websurfx"
version = "1.2.30"
version = "1.2.34"
edition = "2021"
description = "An open-source alternative to Searx that provides clean, ad-free, and organic results with incredible speed while keeping privacy and security in mind."
repository = "https://github.com/neon-mmd/websurfx"

View File

@ -5,7 +5,7 @@
use mimalloc::MiMalloc;
use std::net::TcpListener;
use websurfx::{cache::cacher::Cache, config::parser::Config, run};
use websurfx::{cache::cacher::create_cache, config::parser::Config, run};
/// A dhat heap memory profiler
#[cfg(feature = "dhat-heap")]
@ -31,7 +31,7 @@ async fn main() -> std::io::Result<()> {
// Initialize the parsed config file.
let config = Config::parse(false).unwrap();
let cache = Cache::build(&config).await;
let cache = create_cache(&config).await;
log::info!(
"started server on port {} and IP {}",

334
src/cache/cacher.rs vendored
View File

@ -14,24 +14,10 @@ use super::error::CacheError;
#[cfg(feature = "redis-cache")]
use super::redis_cacher::RedisCache;
/// Different implementations for caching, currently it is possible to cache in-memory or in Redis.
#[derive(Clone)]
pub enum Cache {
/// Caching is disabled
Disabled,
#[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
/// Encapsulates the Redis based cache
Redis(RedisCache),
#[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
/// Contains the in-memory cache.
InMemory(MokaCache<String, SearchResults>),
#[cfg(all(feature = "redis-cache", feature = "memory-cache"))]
/// Contains both the in-memory cache and Redis based cache
Hybrid(RedisCache, MokaCache<String, SearchResults>),
}
impl Cache {
/// A function that builds the cache from the given configuration.
/// Abstraction trait for common methods provided by a cache backend.
#[async_trait::async_trait]
pub trait Cacher: Send + Sync {
// A function that builds the cache from the given configuration.
///
/// # Arguments
///
@ -39,89 +25,10 @@ impl Cache {
///
/// # Returns
///
/// It returns a newly initialized variant based on the feature enabled by the user.
pub async fn build(_config: &Config) -> Self {
#[cfg(all(feature = "redis-cache", feature = "memory-cache"))]
{
log::info!("Using a hybrid cache");
Cache::new_hybrid(
RedisCache::new(&_config.redis_url, 5)
.await
.expect("Redis cache configured"),
)
}
#[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
{
log::info!("Listening redis server on {}", &_config.redis_url);
Cache::new(
RedisCache::new(&_config.redis_url, 5)
.await
.expect("Redis cache configured"),
)
}
#[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
{
log::info!("Using an in-memory cache");
Cache::new_in_memory()
}
#[cfg(not(any(feature = "memory-cache", feature = "redis-cache")))]
{
log::info!("Caching is disabled");
Cache::Disabled
}
}
/// A function that initializes a new connection pool struct.
///
/// # Arguments
///
/// * `redis_cache` - It takes the newly initialized connection pool struct as an argument.
///
/// # Returns
///
/// It returns a `Redis` variant with the newly initialized connection pool struct.
#[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
pub fn new(redis_cache: RedisCache) -> Self {
Cache::Redis(redis_cache)
}
/// A function that initializes the `in memory` cache which is used to cache the results in
/// memory with the search engine thus improving performance by making retrieval and caching of
/// results faster.
///
/// # Returns
///
/// It returns a `InMemory` variant with the newly initialized in memory cache type.
#[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
pub fn new_in_memory() -> Self {
let cache = MokaCache::builder()
.max_capacity(1000)
.time_to_live(Duration::from_secs(60))
.build();
Cache::InMemory(cache)
}
/// A function that initializes both in memory cache and redis client connection for being used
/// for managing hybrid cache which increases resiliancy of the search engine by allowing the
/// cache to switch to `in memory` caching if the `redis` cache server is temporarily
/// unavailable.
///
/// # Arguments
///
/// * `redis_cache` - It takes `redis` client connection struct as an argument.
///
/// # Returns
///
/// It returns a tuple variant `Hybrid` storing both the in-memory cache type and the `redis`
/// client connection struct.
#[cfg(all(feature = "redis-cache", feature = "memory-cache"))]
pub fn new_hybrid(redis_cache: RedisCache) -> Self {
let cache = MokaCache::builder()
.max_capacity(1000)
.time_to_live(Duration::from_secs(60))
.build();
Cache::Hybrid(redis_cache, cache)
}
/// It returns a newly initialized backend based on the feature enabled by the user.
async fn build(config: &Config) -> Self
where
Self: Sized;
/// A function which fetches the cached json results as json string.
///
@ -133,31 +40,7 @@ impl Cache {
///
/// Returns the `SearchResults` from the cache if the program executes normally otherwise
/// returns a `CacheError` if the results cannot be retrieved from the cache.
pub async fn cached_json(&mut self, _url: &str) -> Result<SearchResults, Report<CacheError>> {
match self {
Cache::Disabled => Err(Report::new(CacheError::MissingValue)),
#[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
Cache::Redis(redis_cache) => {
let json = redis_cache.cached_json(_url).await?;
Ok(serde_json::from_str::<SearchResults>(&json)
.map_err(|_| CacheError::SerializationError)?)
}
#[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
Cache::InMemory(in_memory) => match in_memory.get(&_url.to_string()) {
Some(res) => Ok(res),
None => Err(Report::new(CacheError::MissingValue)),
},
#[cfg(all(feature = "redis-cache", feature = "memory-cache"))]
Cache::Hybrid(redis_cache, in_memory) => match redis_cache.cached_json(_url).await {
Ok(res) => Ok(serde_json::from_str::<SearchResults>(&res)
.map_err(|_| CacheError::SerializationError)?),
Err(_) => match in_memory.get(&_url.to_string()) {
Some(res) => Ok(res),
None => Err(Report::new(CacheError::MissingValue)),
},
},
}
}
async fn cached_results(&mut self, url: &str) -> Result<SearchResults, Report<CacheError>>;
/// A function which caches the results by using the `url` as the key and
/// `json results` as the value and stores it in the cache
@ -172,44 +55,164 @@ impl Cache {
/// Returns a unit type if the program caches the given search results without a failure
/// otherwise it returns a `CacheError` if the search results cannot be cached due to a
/// failure.
pub async fn cache_results(
async fn cache_results(
&mut self,
search_results: &SearchResults,
url: &str,
) -> Result<(), Report<CacheError>>;
/// A helper function which computes the hash of the url and formats and returns it as string.
///
/// # Arguments
///
/// * `url` - It takes an url as string.
fn hash_url(&self, url: &str) -> String {
blake3::hash(url.as_bytes()).to_string()
}
}
#[cfg(feature = "redis-cache")]
#[async_trait::async_trait]
impl Cacher for RedisCache {
async fn build(config: &Config) -> Self {
log::info!(
"Initialising redis cache. Listening to {}",
&config.redis_url
);
RedisCache::new(&config.redis_url, 5)
.await
.expect("Redis cache configured")
}
async fn cached_results(&mut self, url: &str) -> Result<SearchResults, Report<CacheError>> {
let hashed_url_string: &str = &self.hash_url(url);
let json = self.cached_json(hashed_url_string).await?;
Ok(serde_json::from_str::<SearchResults>(&json)
.map_err(|_| CacheError::SerializationError)?)
}
async fn cache_results(
&mut self,
search_results: &SearchResults,
url: &str,
) -> Result<(), Report<CacheError>> {
let json =
serde_json::to_string(search_results).map_err(|_| CacheError::SerializationError)?;
let hashed_url_string = self.hash_url(url);
self.cache_json(&json, &hashed_url_string).await
}
}
/// Memory based cache backend.
#[cfg(feature = "memory-cache")]
pub struct InMemoryCache {
/// The backend cache which stores data.
cache: MokaCache<String, SearchResults>,
}
#[cfg(feature = "memory-cache")]
#[async_trait::async_trait]
impl Cacher for InMemoryCache {
async fn build(_config: &Config) -> Self {
log::info!("Initialising in-memory cache");
InMemoryCache {
cache: MokaCache::builder()
.max_capacity(1000)
.time_to_live(Duration::from_secs(60))
.build(),
}
}
async fn cached_results(&mut self, url: &str) -> Result<SearchResults, Report<CacheError>> {
let hashed_url_string = self.hash_url(url);
match self.cache.get(&hashed_url_string) {
Some(res) => Ok(res),
None => Err(Report::new(CacheError::MissingValue)),
}
}
async fn cache_results(
&mut self,
search_results: &SearchResults,
url: &str,
) -> Result<(), Report<CacheError>> {
let hashed_url_string = self.hash_url(url);
self.cache.insert(hashed_url_string, search_results.clone());
Ok(())
}
}
/// Cache backend which utilises both memory and redis based caches.
///
/// The hybrid cache system uses both the types of cache to ensure maximum availability.
/// The set method sets the key, value pair in both the caches. Therefore in a case where redis
/// cache becomes unavailable, the backend will retreive the value from in-memory cache.
#[cfg(all(feature = "memory-cache", feature = "redis-cache"))]
pub struct HybridCache {
/// The in-memory backend cache which stores data.
memory_cache: InMemoryCache,
/// The redis backend cache which stores data.
redis_cache: RedisCache,
}
#[cfg(all(feature = "memory-cache", feature = "redis-cache"))]
#[async_trait::async_trait]
impl Cacher for HybridCache {
async fn build(config: &Config) -> Self {
log::info!("Initialising hybrid cache");
HybridCache {
memory_cache: InMemoryCache::build(config).await,
redis_cache: RedisCache::build(config).await,
}
}
async fn cached_results(&mut self, url: &str) -> Result<SearchResults, Report<CacheError>> {
match self.redis_cache.cached_results(url).await {
Ok(res) => Ok(res),
Err(_) => self.memory_cache.cached_results(url).await,
}
}
async fn cache_results(
&mut self,
search_results: &SearchResults,
url: &str,
) -> Result<(), Report<CacheError>> {
self.redis_cache.cache_results(search_results, url).await?;
self.memory_cache.cache_results(search_results, url).await?;
Ok(())
}
}
/// Dummy cache backend
pub struct DisabledCache;
#[async_trait::async_trait]
impl Cacher for DisabledCache {
async fn build(_config: &Config) -> Self {
log::info!("Caching is disabled");
DisabledCache
}
async fn cached_results(&mut self, _url: &str) -> Result<SearchResults, Report<CacheError>> {
Err(Report::new(CacheError::MissingValue))
}
async fn cache_results(
&mut self,
_search_results: &SearchResults,
_url: &str,
) -> Result<(), Report<CacheError>> {
match self {
Cache::Disabled => Ok(()),
#[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
Cache::Redis(redis_cache) => {
let json = serde_json::to_string(_search_results)
.map_err(|_| CacheError::SerializationError)?;
redis_cache.cache_results(&json, _url).await
}
#[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
Cache::InMemory(cache) => {
cache.insert(_url.to_string(), _search_results.clone());
Ok(())
}
#[cfg(all(feature = "memory-cache", feature = "redis-cache"))]
Cache::Hybrid(redis_cache, cache) => {
let json = serde_json::to_string(_search_results)
.map_err(|_| CacheError::SerializationError)?;
match redis_cache.cache_results(&json, _url).await {
Ok(_) => Ok(()),
Err(_) => {
cache.insert(_url.to_string(), _search_results.clone());
Ok(())
}
}
}
}
Ok(())
}
}
/// A structure to efficiently share the cache between threads - as it is protected by a Mutex.
pub struct SharedCache {
/// The internal cache protected from concurrent access by a mutex
cache: Mutex<Cache>,
cache: Mutex<Box<dyn Cacher>>,
}
impl SharedCache {
@ -220,9 +223,9 @@ impl SharedCache {
/// * `cache` - It takes the `Cache` enum variant as an argument with the prefered cache type.
///
/// Returns a newly constructed `SharedCache` struct.
pub fn new(cache: Cache) -> Self {
pub fn new(cache: impl Cacher + 'static) -> Self {
Self {
cache: Mutex::new(cache),
cache: Mutex::new(Box::new(cache)),
}
}
@ -237,9 +240,9 @@ impl SharedCache {
///
/// Returns a `SearchResults` struct containing the search results from the cache if nothing
/// goes wrong otherwise returns a `CacheError`.
pub async fn cached_json(&self, url: &str) -> Result<SearchResults, Report<CacheError>> {
pub async fn cached_results(&self, url: &str) -> Result<SearchResults, Report<CacheError>> {
let mut mut_cache = self.cache.lock().await;
mut_cache.cached_json(url).await
mut_cache.cached_results(url).await
}
/// A setter function which caches the results by using the `url` as the key and
@ -265,3 +268,18 @@ impl SharedCache {
mut_cache.cache_results(search_results, url).await
}
}
/// A function to initialise the cache backend.
pub async fn create_cache(config: &Config) -> impl Cacher {
#[cfg(all(feature = "redis-cache", feature = "memory-cache"))]
return HybridCache::build(config).await;
#[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
return InMemoryCache::build(config).await;
#[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
return RedisCache::build(config).await;
#[cfg(not(any(feature = "memory-cache", feature = "redis-cache")))]
return DisabledCache::build(config).await;
}

View File

@ -1,7 +1,6 @@
//! This module provides the functionality to cache the aggregated results fetched and aggregated
//! from the upstream search engines in a json format.
use blake3::hash;
use error_stack::Report;
use futures::future::try_join_all;
use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};
@ -53,32 +52,22 @@ impl RedisCache {
Ok(redis_cache)
}
/// A helper function which computes the hash of the url and formats and returns it as string.
/// A function which fetches the cached json as json string from the redis server.
///
/// # Arguments
///
/// * `url` - It takes an url as string.
fn hash_url(&self, url: &str) -> String {
format!("{:?}", blake3::hash(url.as_bytes()))
}
/// A function which fetches the cached json results as json string from the redis server.
///
/// # Arguments
///
/// * `url` - It takes an url as a string.
/// * `key` - It takes a string as key.
///
/// # Error
///
/// Returns the results as a String from the cache on success otherwise returns a `CacheError`
/// Returns the json as a String from the cache on success otherwise returns a `CacheError`
/// on a failure.
pub async fn cached_json(&mut self, url: &str) -> Result<String, Report<CacheError>> {
pub async fn cached_json(&mut self, key: &str) -> Result<String, Report<CacheError>> {
self.current_connection = Default::default();
let hashed_url_string: &str = &self.hash_url(url);
let mut result: Result<String, RedisError> = self.connection_pool
[self.current_connection as usize]
.get(hashed_url_string)
.get(key)
.await;
// Code to check whether the current connection being used is dropped with connection error
@ -99,7 +88,7 @@ impl RedisCache {
));
}
result = self.connection_pool[self.current_connection as usize]
.get(hashed_url_string)
.get(key)
.await;
continue;
}
@ -110,30 +99,29 @@ impl RedisCache {
}
}
/// A function which caches the results by using the hashed `url` as the key and
/// A function which caches the json by using the key and
/// `json results` as the value and stores it in redis server with ttl(time to live)
/// set to 60 seconds.
///
/// # Arguments
///
/// * `json_results` - It takes the json results string as an argument.
/// * `url` - It takes the url as a String.
/// * `key` - It takes the key as a String.
///
/// # Error
///
/// Returns an unit type if the results are cached succesfully otherwise returns a `CacheError`
/// on a failure.
pub async fn cache_results(
pub async fn cache_json(
&mut self,
json_results: &str,
url: &str,
key: &str,
) -> Result<(), Report<CacheError>> {
self.current_connection = Default::default();
let hashed_url_string: &str = &self.hash_url(url);
let mut result: Result<(), RedisError> = self.connection_pool
[self.current_connection as usize]
.set_ex(hashed_url_string, json_results, 60)
.set_ex(key, json_results, 600)
.await;
// Code to check whether the current connection being used is dropped with connection error
@ -154,7 +142,7 @@ impl RedisCache {
));
}
result = self.connection_pool[self.current_connection as usize]
.set_ex(hashed_url_string, json_results, 60)
.set_ex(key, json_results, 60)
.await;
continue;
}

View File

@ -22,7 +22,7 @@ use actix_cors::Cors;
use actix_files as fs;
use actix_governor::{Governor, GovernorConfigBuilder};
use actix_web::{dev::Server, http::header, middleware::Logger, web, App, HttpServer};
use cache::cacher::{Cache, SharedCache};
use cache::cacher::{Cacher, SharedCache};
use config::parser::Config;
use handler::{file_path, FileType};
@ -40,14 +40,21 @@ use handler::{file_path, FileType};
///
/// ```rust
/// use std::net::TcpListener;
/// use websurfx::{config::parser::Config, run, cache::cacher::Cache};
/// use websurfx::{config::parser::Config, run, cache::cacher::create_cache};
///
/// let config = Config::parse(true).unwrap();
/// let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind address");
/// let cache = Cache::new_in_memory();
/// let server = run(listener,config,cache).expect("Failed to start server");
/// #[tokio::main]
/// async fn main(){
/// let config = Config::parse(true).unwrap();
/// let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind address");
/// let cache = create_cache(&config).await;
/// let server = run(listener,config,cache).expect("Failed to start server");
/// }
/// ```
pub fn run(listener: TcpListener, config: Config, cache: Cache) -> std::io::Result<Server> {
pub fn run(
listener: TcpListener,
config: Config,
cache: impl Cacher + 'static,
) -> std::io::Result<Server> {
let public_folder_path: &str = file_path(FileType::Theme)?;
let cloned_config_threads_opt: u8 = config.threads;

View File

@ -107,41 +107,40 @@ async fn results(
req: HttpRequest,
safe_search: &Option<u8>,
) -> Result<SearchResults, Box<dyn std::error::Error>> {
let url = format!(
"http://{}:{}/search?q={}&page={}&safesearch=",
config.binding_ip,
config.port,
query,
page - 1,
// eagerly parse cookie value to evaluate safe search level
let cookie_value = req.cookie("appCookie");
let cookie_value: Option<Cookie<'_>> = cookie_value
.as_ref()
.and_then(|cv| serde_json::from_str(cv.name_value().1).ok());
let safe_search_level = get_safesearch_level(
safe_search,
&cookie_value.as_ref().map(|cv| cv.safe_search_level),
config.safe_search,
);
let cache_key = format!(
"http://{}:{}/search?q={}&page={}&safesearch={}",
config.binding_ip, config.port, query, page, safe_search_level
);
// fetch the cached results json.
let cached_results = cache.cached_json(&url).await;
let cached_results = cache.cached_results(&cache_key).await;
// check if fetched cache results was indeed fetched or it was an error and if so
// handle the data accordingly.
match cached_results {
Ok(results) => Ok(results),
Err(_) => {
let mut safe_search_level: u8 = match config.safe_search {
3..=4 => config.safe_search,
_ => match safe_search {
Some(safesearch) => match safesearch {
0..=2 => *safesearch,
_ => config.safe_search,
},
None => config.safe_search,
},
};
if safe_search_level == 4 {
let mut results: SearchResults = SearchResults::default();
let flag: bool =
!is_match_from_filter_list(file_path(FileType::BlockList)?, query)?;
// Return early when query contains disallowed words,
if flag {
results.set_disallowed();
cache.cache_results(&results, &url).await?;
cache.cache_results(&results, &cache_key).await?;
results.set_safe_search_level(safe_search_level);
return Ok(results);
}
@ -151,28 +150,14 @@ async fn results(
// default selected upstream search engines from the config file otherwise
// parse the non-empty cookie and grab the user selected engines from the
// UI and use that.
let mut results: SearchResults = match req.cookie("appCookie") {
let mut results: SearchResults = match cookie_value {
Some(cookie_value) => {
let cookie_value: Cookie<'_> =
serde_json::from_str(cookie_value.name_value().1)?;
let engines: Vec<EngineHandler> = cookie_value
.engines
.iter()
.filter_map(|name| EngineHandler::new(name).ok())
.collect();
safe_search_level = match config.safe_search {
3..=4 => config.safe_search,
_ => match safe_search {
Some(safesearch) => match safesearch {
0..=2 => *safesearch,
_ => config.safe_search,
},
None => cookie_value.safe_search_level,
},
};
match engines.is_empty() {
false => {
aggregate(
@ -217,9 +202,7 @@ async fn results(
{
results.set_filtered();
}
cache
.cache_results(&results, &(format!("{url}{safe_search_level}")))
.await?;
cache.cache_results(&results, &cache_key).await?;
results.set_safe_search_level(safe_search_level);
Ok(results)
}
@ -252,3 +235,24 @@ fn is_match_from_filter_list(
Ok(false)
}
/// A helper function which returns the safe search level based on the url params
/// and cookie value.
///
/// # Argurments
///
/// * `safe_search` - Safe search level from the url.
/// * `cookie` - User's cookie
/// * `default` - Safe search level to fall back to
fn get_safesearch_level(safe_search: &Option<u8>, cookie: &Option<u8>, default: u8) -> u8 {
match safe_search {
Some(ss) => {
if *ss >= 3 {
default
} else {
*ss
}
}
None => cookie.unwrap_or(default),
}
}

View File

@ -3,18 +3,13 @@ use std::net::TcpListener;
use websurfx::{config::parser::Config, run, templates::views};
// Starts a new instance of the HTTP server, bound to a random available port
fn spawn_app() -> String {
async fn spawn_app() -> String {
// Binding to port 0 will trigger the OS to assign a port for us.
let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");
let port = listener.local_addr().unwrap().port();
let config = Config::parse(false).unwrap();
let server = run(
listener,
config,
#[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
websurfx::cache::cacher::Cache::new_in_memory(),
)
.expect("Failed to bind address");
let cache = websurfx::cache::cacher::create_cache(&config).await;
let server = run(listener, config, cache).expect("Failed to bind address");
tokio::spawn(server);
format!("http://127.0.0.1:{}/", port)
@ -22,7 +17,7 @@ fn spawn_app() -> String {
#[tokio::test]
async fn test_index() {
let address = spawn_app();
let address = spawn_app().await;
let client = reqwest::Client::new();
let res = client.get(address).send().await.unwrap();