diff --git a/src/search_results_handler/aggregator.rs b/src/search_results_handler/aggregator.rs
index cba266c..18cd143 100644
--- a/src/search_results_handler/aggregator.rs
+++ b/src/search_results_handler/aggregator.rs
@@ -3,15 +3,20 @@
 use std::{collections::HashMap, time::Duration};
 
+use error_stack::Report;
 use rand::Rng;
-use tokio::join;
+use tokio::task::JoinHandle;
 
 use super::{
     aggregation_models::{RawSearchResult, SearchResult, SearchResults},
     user_agent::random_user_agent,
 };
-use crate::engines::{duckduckgo, searx};
+use crate::engines::{
+    duckduckgo,
+    engine_models::{EngineError, SearchEngine},
+    searx,
+};
 
 /// A function that aggregates all the scraped results from the above upstream engines and
 /// then removes duplicate results and if two results are found to be from two or more engines
@@ -37,10 +42,11 @@ use crate::engines::{duckduckgo, searx};
 /// function in either `searx` or `duckduckgo` or both otherwise returns a `SearchResults struct`
 /// containing appropriate values.
 pub async fn aggregate(
-    query: &str,
+    query: String,
     page: u32,
     random_delay: bool,
     debug: bool,
+    upstream_search_engines: Vec<String>,
 ) -> Result<SearchResults, Box<dyn std::error::Error>> {
     let user_agent: String = random_user_agent();
     let mut result_map: HashMap<String, RawSearchResult> = HashMap::new();
@@ -53,41 +59,86 @@ pub async fn aggregate(
     }
 
     // fetch results from upstream search engines simultaneously/concurrently.
-    let (ddg_map_results, searx_map_results) = join!(
-        duckduckgo::results(query, page, &user_agent),
-        searx::results(query, page, &user_agent)
-    );
+    let search_engines: Vec<Box<dyn SearchEngine + Send + Sync>> = upstream_search_engines
+        .iter()
+        .map(|engine| match engine.to_lowercase().as_str() {
+            "duckduckgo" => Box::new(duckduckgo::DuckDuckGo) as Box<dyn SearchEngine + Send + Sync>,
+            "searx" => Box::new(searx::Searx) as Box<dyn SearchEngine + Send + Sync>,
+            &_ => panic!("Config Error: Incorrect config file option provided"),
+        })
+        .collect();
 
-    let ddg_map_results = ddg_map_results.unwrap_or_else(|e| {
-        if debug {
-            log::error!("Error fetching results from DuckDuckGo: {:?}", e);
-        }
-        HashMap::new()
-    });
-
-    let searx_map_results = searx_map_results.unwrap_or_else(|e| {
-        if debug {
-            log::error!("Error fetching results from Searx: {:?}", e);
-        }
-        HashMap::new()
-    });
-
-    result_map.extend(ddg_map_results);
-
-    searx_map_results.into_iter().for_each(|(key, value)| {
-        result_map
-            .entry(key)
-            .and_modify(|result| {
-                result.add_engines(value.clone().engine());
+    let tasks: Vec<JoinHandle<Result<HashMap<String, RawSearchResult>, Report<EngineError>>>> =
+        search_engines
+            .iter()
+            .map(|search_engine| {
+                tokio::spawn(search_engine.results(query.clone(), page, user_agent.clone()))
             })
-            .or_insert_with(|| -> RawSearchResult {
-                RawSearchResult::new(
-                    value.title.clone(),
-                    value.visiting_url.clone(),
-                    value.description.clone(),
-                    value.engine.clone(),
-                )
-            });
+            .collect();
+
+    let mut outputs = Vec::with_capacity(search_engines.len());
+
+    for task in tasks {
+        outputs.push(task.await.ok())
+    }
+
+    let mut initial: bool = true;
+    let mut counter: usize = 0;
+    outputs.iter().for_each(|results| {
+        if initial {
+            match results {
+                Some(result) => {
+                    let new_result = result.clone();
+                    result_map.extend(new_result.as_ref().unwrap().clone());
+                    counter += 1;
+                    initial = false
+                }
+                None => {
+                    if debug {
+                        log::error!(
+                            "Error fetching results from {}",
+                            upstream_search_engines[counter]
+                        );
+                    };
+                    counter += 1
+                }
+            }
+        } else {
+            match results {
+                Some(result) => {
+                    let new_result = result.clone();
+                    new_result
+                        .as_ref()
+                        .unwrap()
+                        .clone()
+                        .into_iter()
+                        .for_each(|(key, value)| {
+                            result_map
+                                .entry(key)
+                                .and_modify(|result| {
+                                    result.add_engines(value.clone().engine());
+                                })
+                                .or_insert_with(|| -> RawSearchResult {
+                                    RawSearchResult::new(
+                                        value.title.clone(),
+                                        value.visiting_url.clone(),
+                                        value.description.clone(),
+                                        value.engine.clone(),
+                                    )
+                                });
+                        });
+                    counter += 1
+                }
+                None => {
+                    if debug {
+                        log::error!(
+                            "Error fetching results from {}",
+                            upstream_search_engines[counter]
+                        );
+                    };
+                    counter += 1
+                }
+            }
+        }
     });
 
     Ok(SearchResults::new(
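For reference, a minimal caller sketch (not part of the diff) showing how the changed signature might be invoked: the handler function, module path, query string, and flag values below are assumptions for illustration; only the `aggregate` parameter list comes from the change above.

use crate::search_results_handler::aggregator::aggregate;

// Hypothetical caller: engine names now arrive as an owned Vec<String>, and the
// query is taken by value so it can be cloned into each spawned engine task.
async fn handler_example() -> Result<(), Box<dyn std::error::Error>> {
    let upstream_search_engines: Vec<String> =
        vec!["duckduckgo".to_string(), "searx".to_string()];

    // page = 1, random_delay = true, debug = false (illustrative values only).
    let _results = aggregate(
        "rust async runtime".to_string(),
        1,
        true,
        false,
        upstream_search_engines,
    )
    .await?;

    Ok(())
}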