use std::collections::{HashMap, HashSet}; use itertools::Itertools; use serde::{Deserialize, Serialize, Serializer}; use crate::CsvAccount; #[derive(Debug, Deserialize)] struct CsvMovementRule { #[serde(rename = "CostCentreSourceFrom", default)] source_from_department: String, #[serde(rename = "CostCentreSourceTo", default)] source_to_department: String, #[serde(rename = "CostCentreDestFrom", default)] dest_from_department: String, #[serde(rename = "CostCentreDestTo", default)] dest_to_department: String, #[serde(rename = "AccountSourceFrom", default)] source_from_account: String, #[serde(rename = "AccountSourceTo", default)] source_to_account: String, #[serde(rename = "AccountDestFrom", default)] dest_from_account: String, #[serde(rename = "AccountDestTo", default)] dest_to_account: String, #[serde(rename = "AmountValue")] amount: Option, #[serde(rename = "AmountType", default)] is_percent: String, #[serde(rename = "Apply", default)] apply: String, #[serde(rename = "CostOutput")] cost_output: Option, } #[derive(Deserialize)] pub struct PartialCostCentre { #[serde(rename = "Code")] pub code: String, #[serde(rename = "Area")] pub area: Option, } #[derive(Default)] pub struct MovementRule { // If the vectors are empty, then it means 'all' pub from_departments: HashSet, pub to_departments: HashSet, pub all_from_departments: bool, pub all_to_departments: bool, pub from_accounts: HashSet, pub to_accounts: HashSet, pub all_from_accounts: bool, pub all_to_accounts: bool, pub amount: f64, pub is_percent: bool, pub is_separator: bool, } impl MovementRule { pub fn pass_break() -> MovementRule { MovementRule { is_separator: true, ..MovementRule::default() } } pub fn validate(&self) -> bool { if self.from_departments.is_empty() && self.to_departments.is_empty() { // Would be nice to have a decent message/error here as well return false; } if self.is_percent && (self.amount < 0.0 || self.amount > 100.0) { return false; } true } } #[derive(Hash, Clone, Default, PartialEq, Eq)] pub struct Unit { pub department: String, pub account: String, } #[derive(Debug, Serialize, Deserialize)] pub struct CsvCost { #[serde(rename = "ACCOUNT")] pub account: String, #[serde(rename = "COSTCENTRE")] pub department: String, #[serde(serialize_with = "round_serialize")] pub value: f64, pub pass: Option, } fn round_serialize(x: &f64, s: S) -> Result where S: Serializer, { s.serialize_f64((x * 100000.).round() / 100000.) } pub fn move_money( rules_reader: &mut csv::Reader, lines_reader: &mut csv::Reader, accounts_reader: &mut csv::Reader, cost_centres_reader: &mut csv::Reader, output: &mut csv::Writer, use_numeric_accounts: bool, flush_pass: bool, ) -> anyhow::Result<()> where R: std::io::Read, L: std::io::Read, A: std::io::Read, C: std::io::Read, O: std::io::Write, { let headers = lines_reader.headers()?; let mut account_index = 0; let mut department_index = 0; let mut account_type_index = 0; for (index, field) in headers.iter().enumerate() { if field.eq_ignore_ascii_case("account") { account_index = index; } else if field.eq_ignore_ascii_case("costcentre") { department_index = index; } else if field.eq_ignore_ascii_case("accounttype") { account_type_index = index; } } let lines: HashMap = lines_reader .records() .map(|record| { let record = record.unwrap(); let account = record.get(account_index).unwrap(); let department = record.get(department_index).unwrap(); let sum = record .iter() .enumerate() .filter(|(i, _)| { *i != account_index && *i != department_index && *i != account_type_index }) .map(|(_, f)| f.parse::().unwrap_or(0.)) .sum(); ( Unit { account: account.into(), department: department.into(), }, sum, ) }) .collect(); let accounts_reader = accounts_reader; let all_accounts = accounts_reader .deserialize() .collect::, csv::Error>>()?; let account_mappings: HashMap = all_accounts .into_iter() .map(|account| (account.code.clone(), account)) .collect(); let all_accounts_sorted = if use_numeric_accounts { lines .keys() .map(|key| key.account.clone().parse::().unwrap()) .unique() .sorted() .map(|account| account.to_string()) .collect() } else { lines .keys() .map(|key| key.account.clone()) .unique() .sorted() .collect() }; let all_departments_sorted = cost_centres_reader .deserialize::() .map(|cc| cc.unwrap().code) .unique() .sorted() .collect(); let mut rules: Vec = vec![]; for movement_rule in rules_reader.deserialize() { // TODO: Consider reclass rule group, how does that even work? let movement_rule: CsvMovementRule = movement_rule?; let is_separator = movement_rule.apply == "-DIVIDER-"; let from_accounts = if is_separator { HashSet::new() } else { if movement_rule.cost_output.is_some() { account_mappings .iter() .filter(|(_, account)| { account.cost_output.is_some() && account.cost_output.clone().unwrap() == movement_rule.cost_output.clone().unwrap() }) .map(|(code, _)| code.clone()) .collect() } else { extract_range( movement_rule.source_from_account, movement_rule.source_to_account, &all_accounts_sorted, ) } }; let to_accounts = if is_separator { HashSet::new() } else { if movement_rule.cost_output.is_some() { account_mappings .iter() .filter(|(_, account)| { account.cost_output.is_some() && account.cost_output.clone().unwrap() == movement_rule.cost_output.clone().unwrap() }) .map(|(code, _)| code.clone()) .collect() } else { extract_range( movement_rule.dest_from_account, movement_rule.dest_to_account, &all_accounts_sorted, ) } }; let from_departments = if is_separator { HashSet::new() } else { extract_range( movement_rule.source_from_department, movement_rule.source_to_department, &all_departments_sorted, ) }; let to_departments = if is_separator { HashSet::new() } else { extract_range( movement_rule.dest_from_department, movement_rule.dest_to_department, &all_departments_sorted, ) }; rules.push(MovementRule { all_from_departments: from_departments.is_empty(), all_to_departments: to_departments.is_empty(), from_departments, to_departments, all_from_accounts: from_accounts.is_empty(), all_to_accounts: to_accounts.is_empty(), from_accounts, to_accounts, amount: movement_rule.amount.unwrap_or(0.) * (if movement_rule.is_percent == "%" { 0.01 } else { 1. }), is_percent: movement_rule.is_percent == "%", is_separator, }) } // Then run move_money let moved = move_money_2(lines, &rules, flush_pass); // Ouput the list moved moneys for money in moved { for (unit, value) in money.totals { output.serialize(CsvCost { account: unit.account, department: unit.department, value, pass: if flush_pass { Some(money.pass) } else { None }, })?; } } Ok(()) } fn extract_range(from: String, to: String, options: &Vec) -> HashSet { if from.is_empty() && to.is_empty() { return HashSet::new(); } let start_index = options .iter() .enumerate() .find(|option| option.1 == &from) .map(|start| start.0); let end_index = options .iter() .enumerate() .find(|option| option.1 == &to) .map(|end| end.0); if let Some(start) = start_index { if let Some(end) = end_index { return Vec::from(&options[start..end + 1]).into_iter().collect(); } else { return vec![options[start].clone()].into_iter().collect(); } } else if let Some(end) = end_index { return vec![options[end].clone()].into_iter().collect(); } return HashSet::new(); } // Approach 1: // Use math (linear algebra) to move between departments. Memory/computationally it's equivalent // to the worst case of approach one, however can take advantage of auto parallelisation/simd // to perform fast, particularly on larger datasets. // This basically just involves smushing all the rules, then doing a matrix multiple and matrix addition // on the initial set. Can't record passes, but can record the smushed rules if only the data changes later // Advantage of this approach is it can be easily extended to run on the gpu. pub fn move_money_1() {} pub struct MoveMoneyResult { pass: i32, // TODO: We want the from account and the to account totals: HashMap, } // Approach 2: // Traditinoal/naive, total for each department is stored in an initial map (department -> total amount). // Another map is built up for each rule, and each rule is processed based on the amount in the current total // map. // Upon a pass break (separator), the temp map will assign the values into the total map. // Once done, do a final assignment back to the total, and return that. // Advantage of this is the required code is tiny, and no third-party math library is required. // Note that the movement happens on a line-by-line level. So we can stream the data from disk, and potentially apply this // to every. It's also much more memory efficient than approach 1. // TODO: Time both approaches to seee which is faster depending on the size of the input data/number of rules // Verdict: This is already pretty fast, at least much faster than ppm for BigDataset pub fn move_money_2( initial_totals: HashMap, rules: &Vec, flush_pass: bool, ) -> Vec { // Note: It's potentially a bit more intensive to use cloned totals (rather than just update temp_total per rule), // but it's much simpler code and, and since we're only working line-by-line, it isn't really that much memory in practice // TODO: This is initial totals is problematic, as we may move into a cc that wasn't in the list of lines. In which case we'd need // to add a new line let mut running_total = HashMap::from(initial_totals); let mut temp_total = running_total.clone(); let mut move_money_result: Vec = vec![]; let mut current_pass = 0; if flush_pass { move_money_result.push(MoveMoneyResult { pass: current_pass, totals: running_total.clone(), }) } for rule in rules { if rule.is_separator { if flush_pass { current_pass += 1; // Flush the totals at the end of this pass (more specifically the change) move_money_result.push(MoveMoneyResult { pass: current_pass, totals: temp_total .iter() .map(|(unit, value)| { (unit.clone(), value - running_total.get(unit).unwrap_or(&0.)) }) .filter(|(_, value)| *value != 0.) .collect(), }); } running_total = temp_total.clone(); } else { for (unit, amount) in &running_total { if (rule.all_from_departments || rule.from_departments.contains(&unit.department)) && (rule.all_from_accounts || rule.from_accounts.contains(&unit.account)) { let previous_temp = amount; let added_amount = if rule.is_percent { previous_temp * rule.amount } else { rule.amount }; // TODO: Track the from department/account, maintained in a separate list if flush_pass is set *temp_total.get_mut(&unit).unwrap() -= added_amount; let department = if rule.to_departments.len() == 1 { rule.to_departments.iter().next().unwrap().clone() } else { unit.department.clone() }; let account = if rule.to_accounts.len() == 1 { rule.to_accounts.iter().next().unwrap().clone() } else { unit.account.clone() }; *temp_total .entry(Unit { department, account, }) .or_insert(0.) += added_amount; } } } } move_money_result.push(MoveMoneyResult { pass: current_pass, totals: temp_total, }); move_money_result } #[cfg(test)] mod tests { #[test] fn move_money() { super::move_money( &mut csv::Reader::from_path("reclassrule.csv").unwrap(), &mut csv::Reader::from_path("line.csv").unwrap(), &mut csv::Reader::from_path("account.csv").unwrap(), &mut csv::Reader::from_path("costcentres.csv").unwrap(), &mut csv::Writer::from_path("output.csv").unwrap(), false, true, ) .unwrap(); } }