0
0
mirror of https://github.com/neon-mmd/websurfx.git synced 2024-11-26 07:58:20 -05:00

perf: use async crates & methods & make functions async (#486)

This commit is contained in:
neon_arch 2024-03-10 13:29:13 +03:00
parent 6aa99922a6
commit 55dbd06af7
6 changed files with 85 additions and 67 deletions

6
Cargo.lock generated
View File

@ -348,9 +348,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
[[package]]
name = "async-compression"
version = "0.4.5"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5"
checksum = "a116f46a969224200a0a97f29cfd4c50e7534e4b4826bd23ea2c3c533039c82c"
dependencies = [
"brotli",
"flate2",
@ -4168,11 +4168,11 @@ dependencies = [
"actix-files",
"actix-governor",
"actix-web",
"async-compression",
"async-once-cell",
"async-trait",
"base64 0.21.5",
"blake3",
"brotli",
"cfg-if 1.0.0",
"chacha20",
"chacha20poly1305",

View File

@ -14,7 +14,7 @@ path = "src/bin/websurfx.rs"
[dependencies]
reqwest = {version="0.11.24", default-features=false, features=["rustls-tls","brotli", "gzip"]}
tokio = {version="1.32.0",features=["rt-multi-thread","macros"], default-features = false}
tokio = {version="1.32.0",features=["rt-multi-thread","macros", "fs", "io-util"], default-features = false}
serde = {version="1.0.196", default-features=false, features=["derive"]}
serde_json = {version="1.0.109", default-features=false}
maud = {version="0.25.0", default-features=false, features=["actix-web"]}
@ -38,7 +38,7 @@ mimalloc = { version = "0.1.38", default-features = false }
async-once-cell = {version="0.5.3", default-features=false}
actix-governor = {version="0.5.0", default-features=false}
mini-moka = { version="0.10", optional = true, default-features=false, features=["sync"]}
brotli = { version = "3.4.0", default-features = false, features=["std"], optional=true}
async-compression = { version = "0.4.6", default-features = false, features=["brotli","tokio"], optional=true}
chacha20poly1305={version="0.10.1", default-features=false, features=["alloc","getrandom"], optional=true}
chacha20 = {version="0.9.1", default-features=false, optional=true}
base64 = {version="0.21.5", default-features=false, features=["std"], optional=true}
@ -84,7 +84,7 @@ default = ["memory-cache"]
dhat-heap = ["dep:dhat"]
memory-cache = ["dep:mini-moka"]
redis-cache = ["dep:redis","dep:base64"]
compress-cache-results = ["dep:brotli","dep:cfg-if"]
compress-cache-results = ["dep:async-compression","dep:cfg-if"]
encrypt-cache-results = ["dep:chacha20poly1305","dep:chacha20"]
cec-cache-results = ["compress-cache-results","encrypt-cache-results"]

68
src/cache/cacher.rs vendored
View File

@ -93,7 +93,7 @@ pub trait Cacher: Send + Sync {
feature = "encrypt-cache-results",
feature = "cec-cache-results"
))]
fn encrypt_or_decrypt_results(
async fn encrypt_or_decrypt_results(
&mut self,
mut bytes: Vec<u8>,
encrypt: bool,
@ -137,11 +137,19 @@ pub trait Cacher: Send + Sync {
/// Returns the compressed bytes on success otherwise it returns a CacheError
/// on failure.
#[cfg(any(feature = "compress-cache-results", feature = "cec-cache-results"))]
fn compress_results(&mut self, mut bytes: Vec<u8>) -> Result<Vec<u8>, Report<CacheError>> {
use std::io::Write;
let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 11, 22);
async fn compress_results(
&mut self,
mut bytes: Vec<u8>,
) -> Result<Vec<u8>, Report<CacheError>> {
use tokio::io::AsyncWriteExt;
let mut writer = async_compression::tokio::write::BrotliEncoder::new(Vec::new());
writer
.write_all(&bytes)
.await
.map_err(|_| CacheError::CompressionError)?;
writer
.shutdown()
.await
.map_err(|_| CacheError::CompressionError)?;
bytes = writer.into_inner();
Ok(bytes)
@ -159,17 +167,17 @@ pub trait Cacher: Send + Sync {
/// Returns the compressed and encrypted bytes on success otherwise it returns a CacheError
/// on failure.
#[cfg(feature = "cec-cache-results")]
fn compress_encrypt_compress_results(
async fn compress_encrypt_compress_results(
&mut self,
mut bytes: Vec<u8>,
) -> Result<Vec<u8>, Report<CacheError>> {
// compress first
bytes = self.compress_results(bytes)?;
bytes = self.compress_results(bytes).await?;
// encrypt
bytes = self.encrypt_or_decrypt_results(bytes, true)?;
bytes = self.encrypt_or_decrypt_results(bytes, true).await?;
// compress again;
bytes = self.compress_results(bytes)?;
bytes = self.compress_results(bytes).await?;
Ok(bytes)
}
@ -187,11 +195,11 @@ pub trait Cacher: Send + Sync {
/// on failure.
#[cfg(any(feature = "compress-cache-results", feature = "cec-cache-results"))]
fn decompress_results(&mut self, bytes: &[u8]) -> Result<Vec<u8>, Report<CacheError>> {
async fn decompress_results(&mut self, bytes: &[u8]) -> Result<Vec<u8>, Report<CacheError>> {
cfg_if::cfg_if! {
if #[cfg(feature = "compress-cache-results")]
{
decompress_util(bytes)
decompress_util(bytes).await
}
else if #[cfg(feature = "cec-cache-results")]
@ -199,7 +207,7 @@ pub trait Cacher: Send + Sync {
let decompressed = decompress_util(bytes)?;
let decrypted = self.encrypt_or_decrypt_results(decompressed, false)?;
decompress_util(&decrypted)
decompress_util(&decrypted).await
}
}
@ -216,7 +224,7 @@ pub trait Cacher: Send + Sync {
/// # Error
/// Returns a Vec of compressed or encrypted bytes on success otherwise it returns a CacheError
/// on failure.
fn pre_process_search_results(
async fn pre_process_search_results(
&mut self,
search_results: &SearchResults,
) -> Result<Vec<u8>, Report<CacheError>> {
@ -224,19 +232,20 @@ pub trait Cacher: Send + Sync {
let mut bytes: Vec<u8> = search_results.try_into()?;
#[cfg(feature = "compress-cache-results")]
{
let compressed = self.compress_results(bytes)?;
let compressed = self.compress_results(bytes).await?;
bytes = compressed;
}
#[cfg(feature = "encrypt-cache-results")]
{
let encrypted = self.encrypt_or_decrypt_results(bytes, true)?;
let encrypted = self.encrypt_or_decrypt_results(bytes, true).await?;
bytes = encrypted;
}
#[cfg(feature = "cec-cache-results")]
{
let compressed_encrypted_compressed = self.compress_encrypt_compress_results(bytes)?;
let compressed_encrypted_compressed =
self.compress_encrypt_compress_results(bytes).await?;
bytes = compressed_encrypted_compressed;
}
@ -256,25 +265,25 @@ pub trait Cacher: Send + Sync {
/// on failure.
#[allow(unused_mut)] // needs to be mutable when any of the features is enabled
fn post_process_search_results(
async fn post_process_search_results(
&mut self,
mut bytes: Vec<u8>,
) -> Result<SearchResults, Report<CacheError>> {
#[cfg(feature = "compress-cache-results")]
{
let decompressed = self.decompress_results(&bytes)?;
let decompressed = self.decompress_results(&bytes).await?;
bytes = decompressed
}
#[cfg(feature = "encrypt-cache-results")]
{
let decrypted = self.encrypt_or_decrypt_results(bytes, false)?;
let decrypted = self.encrypt_or_decrypt_results(bytes, false).await?;
bytes = decrypted
}
#[cfg(feature = "cec-cache-results")]
{
let decompressed_decrypted = self.decompress_results(&bytes)?;
let decompressed_decrypted = self.decompress_results(&bytes).await?;
bytes = decompressed_decrypted;
}
@ -295,16 +304,19 @@ pub trait Cacher: Send + Sync {
/// on failure.
#[cfg(any(feature = "compress-cache-results", feature = "cec-cache-results"))]
fn decompress_util(input: &[u8]) -> Result<Vec<u8>, Report<CacheError>> {
use std::io::Write;
let mut writer = brotli::DecompressorWriter::new(Vec::new(), 4096);
async fn decompress_util(input: &[u8]) -> Result<Vec<u8>, Report<CacheError>> {
use tokio::io::AsyncWriteExt;
let mut writer = async_compression::tokio::write::BrotliDecoder::new(Vec::new());
writer
.write_all(input)
.await
.map_err(|_| CacheError::CompressionError)?;
let bytes = writer
.into_inner()
writer
.shutdown()
.await
.map_err(|_| CacheError::CompressionError)?;
let bytes = writer.into_inner();
Ok(bytes)
}
@ -329,7 +341,7 @@ impl Cacher for RedisCache {
let bytes = base64::engine::general_purpose::STANDARD_NO_PAD
.decode(base64_string)
.map_err(|_| CacheError::Base64DecodingOrEncodingError)?;
self.post_process_search_results(bytes)
self.post_process_search_results(bytes).await
}
async fn cache_results(
@ -345,7 +357,7 @@ impl Cacher for RedisCache {
let mut bytes = Vec::with_capacity(search_results_len);
for result in search_results {
let processed = self.pre_process_search_results(result)?;
let processed = self.pre_process_search_results(result).await?;
bytes.push(processed);
}
@ -405,7 +417,7 @@ impl Cacher for InMemoryCache {
async fn cached_results(&mut self, url: &str) -> Result<SearchResults, Report<CacheError>> {
let hashed_url_string = self.hash_url(url);
match self.cache.get(&hashed_url_string) {
Some(res) => self.post_process_search_results(res),
Some(res) => self.post_process_search_results(res).await,
None => Err(Report::new(CacheError::MissingValue)),
}
}
@ -417,7 +429,7 @@ impl Cacher for InMemoryCache {
) -> Result<(), Report<CacheError>> {
for (url, search_result) in urls.iter().zip(search_results.iter()) {
let hashed_url_string = self.hash_url(url);
let bytes = self.pre_process_search_results(search_result)?;
let bytes = self.pre_process_search_results(search_result).await?;
self.cache.insert(hashed_url_string, bytes);
}

View File

@ -14,12 +14,12 @@ use regex::Regex;
use reqwest::{Client, ClientBuilder};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{fs::File, io::BufRead};
use std::{
io::{BufReader, Read},
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
task::JoinHandle,
time::Duration,
};
use tokio::task::JoinHandle;
/// A constant for holding the prebuilt Client globally in the app.
static CLIENT: std::sync::OnceLock<Client> = std::sync::OnceLock::new();
@ -169,13 +169,15 @@ pub async fn aggregate(
&mut result_map,
&mut blacklist_map,
file_path(FileType::BlockList)?,
)?;
)
.await?;
filter_with_lists(
&mut blacklist_map,
&mut result_map,
file_path(FileType::AllowList)?,
)?;
)
.await?;
drop(blacklist_map);
}
@ -196,15 +198,16 @@ pub async fn aggregate(
/// # Errors
///
/// Returns an error if the file at `file_path` cannot be opened or read, or if a regex pattern is invalid.
pub fn filter_with_lists(
pub async fn filter_with_lists(
map_to_be_filtered: &mut Vec<(String, SearchResult)>,
resultant_map: &mut Vec<(String, SearchResult)>,
file_path: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let mut reader = BufReader::new(File::open(file_path)?);
let reader = BufReader::new(File::open(file_path).await?);
let mut lines = reader.lines();
for line in reader.by_ref().lines() {
let re = Regex::new(line?.trim())?;
while let Some(line) = lines.next_line().await? {
let re = Regex::new(line.trim())?;
let mut length = map_to_be_filtered.len();
let mut idx: usize = Default::default();
@ -236,8 +239,8 @@ mod tests {
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn test_filter_with_lists() -> Result<(), Box<dyn std::error::Error>> {
#[tokio::test]
async fn test_filter_with_lists() -> Result<(), Box<dyn std::error::Error>> {
// Create a map of search results to filter
let mut map_to_be_filtered = Vec::new();
map_to_be_filtered.push((
@ -271,7 +274,8 @@ mod tests {
&mut map_to_be_filtered,
&mut resultant_map,
file.path().to_str().unwrap(),
)?;
)
.await?;
assert_eq!(resultant_map.len(), 2);
assert!(resultant_map
@ -285,8 +289,8 @@ mod tests {
Ok(())
}
#[test]
fn test_filter_with_lists_wildcard() -> Result<(), Box<dyn std::error::Error>> {
#[tokio::test]
async fn test_filter_with_lists_wildcard() -> Result<(), Box<dyn std::error::Error>> {
let mut map_to_be_filtered = Vec::new();
map_to_be_filtered.push((
"https://www.example.com".to_owned(),
@ -319,7 +323,8 @@ mod tests {
&mut map_to_be_filtered,
&mut resultant_map,
file.path().to_str().unwrap(),
)?;
)
.await?;
assert_eq!(resultant_map.len(), 1);
assert!(resultant_map
@ -333,8 +338,8 @@ mod tests {
Ok(())
}
#[test]
fn test_filter_with_lists_file_not_found() {
#[tokio::test]
async fn test_filter_with_lists_file_not_found() {
let mut map_to_be_filtered = Vec::new();
let mut resultant_map = Vec::new();
@ -346,11 +351,11 @@ mod tests {
"non-existent-file.txt",
);
assert!(result.is_err());
assert!(result.await.is_err());
}
#[test]
fn test_filter_with_lists_invalid_regex() {
#[tokio::test]
async fn test_filter_with_lists_invalid_regex() {
let mut map_to_be_filtered = Vec::new();
map_to_be_filtered.push((
"https://www.example.com".to_owned(),
@ -376,6 +381,6 @@ mod tests {
file.path().to_str().unwrap(),
);
assert!(result.is_err());
assert!(result.await.is_err());
}
}

View File

@ -7,7 +7,7 @@ use crate::{
handler::{file_path, FileType},
};
use actix_web::{get, http::header::ContentType, web, HttpRequest, HttpResponse};
use std::fs::read_to_string;
use tokio::fs::read_to_string;
/// Handles the route of index page or main page of the `websurfx` meta search engine website.
#[get("/")]
@ -43,7 +43,7 @@ pub async fn not_found(
#[get("/robots.txt")]
pub async fn robots_data(_req: HttpRequest) -> Result<HttpResponse, Box<dyn std::error::Error>> {
let page_content: String =
read_to_string(format!("{}/robots.txt", file_path(FileType::Theme)?))?;
read_to_string(format!("{}/robots.txt", file_path(FileType::Theme)?)).await?;
Ok(HttpResponse::Ok()
.content_type(ContentType::plaintext())
.body(page_content))

View File

@ -13,12 +13,12 @@ use crate::{
};
use actix_web::{get, http::header::ContentType, web, HttpRequest, HttpResponse};
use regex::Regex;
use std::{
borrow::Cow,
use std::borrow::Cow;
use tokio::{
fs::File,
io::{BufRead, BufReader, Read},
io::{AsyncBufReadExt, BufReader},
join,
};
use tokio::join;
/// Handles the route of search page of the `websurfx` meta search engine website and it takes
/// two search url parameters `q` and `page` where `page` parameter is optional.
@ -188,7 +188,7 @@ async fn results(
let mut results: SearchResults = SearchResults::default();
let flag: bool =
!is_match_from_filter_list(file_path(FileType::BlockList)?, query)?;
!is_match_from_filter_list(file_path(FileType::BlockList)?, query).await?;
// Return early when query contains disallowed words,
if flag {
results.set_disallowed();
@ -252,13 +252,14 @@ async fn results(
///
/// Returns a bool indicating whether the results were found in the list or not on success
/// otherwise returns a standard error type on a failure.
fn is_match_from_filter_list(
async fn is_match_from_filter_list(
file_path: &str,
query: &str,
) -> Result<bool, Box<dyn std::error::Error>> {
let mut reader = BufReader::new(File::open(file_path)?);
for line in reader.by_ref().lines() {
let re = Regex::new(&line?)?;
let reader = BufReader::new(File::open(file_path).await?);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
let re = Regex::new(&line)?;
if re.is_match(query) {
return Ok(true);
}