Merge branch 'overhead_allocation_multi_threading' into 'main'

Add multithreading to overhead allocation, add optional storage of from

See merge request vato007/coster-rs!3
This commit is contained in:
Michael Pivato
2023-03-13 06:03:18 +00:00
2 changed files with 169 additions and 107 deletions

View File

@@ -66,7 +66,7 @@ enum Commands {
#[arg(short = 'f', long)] #[arg(short = 'f', long)]
show_from: bool, show_from: bool,
#[arg(short, long, default_value = "0.000001")] #[arg(short, long, default_value = "0.00000000000000001")]
zero_threshold: f64, zero_threshold: f64,
#[arg(short, long, value_name = "FILE")] #[arg(short, long, value_name = "FILE")]

View File

@@ -5,7 +5,7 @@ use std::{
use csv::Writer; use csv::Writer;
use itertools::Itertools; use itertools::Itertools;
use nalgebra::{DMatrix, Dynamic, LU}; use nalgebra::{zero, DMatrix, Dynamic, LU};
use rayon::prelude::{IntoParallelRefIterator, ParallelIterator}; use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -64,7 +64,7 @@ pub struct AccountCost {
summed_department_costs: Vec<TotalDepartmentCost>, summed_department_costs: Vec<TotalDepartmentCost>,
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize, Deserialize)]
struct MovedAmount { struct MovedAmount {
account: String, account: String,
cost_centre: String, cost_centre: String,
@@ -94,6 +94,9 @@ pub fn reciprocal_allocation<Lines, Account, AllocationStatistic, Area, CostCent
allocation_statistics: &mut csv::Reader<AllocationStatistic>, allocation_statistics: &mut csv::Reader<AllocationStatistic>,
areas: &mut csv::Reader<Area>, areas: &mut csv::Reader<Area>,
cost_centres: &mut csv::Reader<CostCentre>, cost_centres: &mut csv::Reader<CostCentre>,
// TODO: Receiver method rather than this writer that can accept
// the raw float results, so we can write in an alternate format
// that more accurately represents the values on disk
output: &mut csv::Writer<Output>, output: &mut csv::Writer<Output>,
use_numeric_accounts: bool, use_numeric_accounts: bool,
exclude_negative_allocation_statistics: bool, exclude_negative_allocation_statistics: bool,
@@ -559,7 +562,7 @@ fn get_rules_indexes(
.collect() .collect()
} }
fn do_solve_reciprocal<T: ReciprocalAllocationSolver>( fn do_solve_reciprocal<T: ReciprocalAllocationSolver + Sync + Send>(
solver: T, solver: T,
account_costs: Vec<AccountCost>, account_costs: Vec<AccountCost>,
overhead_department_mappings: HashMap<String, usize>, overhead_department_mappings: HashMap<String, usize>,
@@ -587,19 +590,57 @@ fn do_solve_reciprocal<T: ReciprocalAllocationSolver>(
overhead_department_mappings.len(), overhead_department_mappings.len(),
operating_overhead_mappings, operating_overhead_mappings,
); );
let mut final_account_costs: Vec<AccountCost> = Vec::with_capacity(account_costs.len());
let mut temp_writer = temp_writer; let mut temp_writer = temp_writer;
for total_costs in account_costs { if let Some(temp_writer) = temp_writer.as_mut() {
solve_reciprocal_with_from(
solver,
account_costs,
overhead_department_mappings,
operating_department_mappings,
operating_overhead_mappings_mat,
temp_writer,
zero_threshold,
)?;
Ok(vec![])
} else {
Ok(solve_reciprocal_no_from(
solver,
account_costs,
overhead_department_mappings,
operating_department_mappings,
operating_overhead_mappings_mat,
))
}
}
fn solve_reciprocal_no_from(
solver: impl ReciprocalAllocationSolver + Sync + Send,
account_costs: Vec<AccountCost>,
overhead_department_mappings: HashMap<String, usize>,
operating_department_mappings: HashMap<String, usize>,
operating_overhead_mappings_mat: DMatrix<f64>,
) -> Vec<AccountCost> {
account_costs
.par_iter()
// .filter(|cost| cost.account == "A480200")
.map(|total_costs| {
// To get the from/to ccs like ppm does, we ignore the initial totals. Then for each overhead cc,
// we zero out all the calculated overheads except for this cc and do
// operating_overhead_mappings * calculated_overheads (basically the first part of the normal calculation)
// TODO: There has to be a cleaner way to do this, perhaps by presorting things? // TODO: There has to be a cleaner way to do this, perhaps by presorting things?
let mut overhead_slice_costs = vec![0.; overhead_department_mappings.len()]; let mut overhead_slice_costs = vec![0.; overhead_department_mappings.len()];
for cost in total_costs.summed_department_costs.iter() { for cost in total_costs.summed_department_costs.iter() {
if overhead_department_mappings.contains_key(&cost.department) { if overhead_department_mappings.contains_key(&cost.department) {
overhead_slice_costs[*overhead_department_mappings.get(&cost.department).unwrap()] = overhead_slice_costs
cost.value [*overhead_department_mappings.get(&cost.department).unwrap()] = cost.value
} }
} }
let overhead_costs_vec: DMatrix<f64> = let overhead_costs_vec: DMatrix<f64> = DMatrix::from_row_slice(
DMatrix::from_row_slice(overhead_department_mappings.len(), 1, &overhead_slice_costs); overhead_department_mappings.len(),
1,
&overhead_slice_costs,
);
let calculated_overheads = solver.solve(&overhead_costs_vec); let calculated_overheads = solver.solve(&overhead_costs_vec);
let mut operating_slice_costs = vec![0.; operating_department_mappings.len()]; let mut operating_slice_costs = vec![0.; operating_department_mappings.len()];
@@ -616,54 +657,16 @@ fn do_solve_reciprocal<T: ReciprocalAllocationSolver>(
&operating_slice_costs, &operating_slice_costs,
); );
// Borrow so we don't move between loops // // Borrow so we don't move between loops
let operating_overhead_mappings = &operating_overhead_mappings_mat; let operating_overhead_mappings = &operating_overhead_mappings_mat;
let calculated_overheads = &calculated_overheads; let calculated_overheads = &calculated_overheads;
// To get the from/to ccs like ppm does, we ignore the initial totals. Then for each overhead cc,
// we zero out all the calculated overheads except for this cc and do
// operating_overhead_mappings * calculated_overheads (basically the first part of the normal calculation)
if let Some(temp_writer) = temp_writer.as_mut() {
// TODO: A performance improvement will be to create another hashmap for index -> department, then just
// iterate over the actual indexes instead (will have preloading)
for (overhead_department, index) in overhead_department_mappings.iter() {
// TODO: Check this filter is actually working correctly by summing the costs and comparing to the non show_from setting
// (the sums should match up)
// Thinking intuitively, if the cost truely didn't exist, then it never would have been included in the totals
// in the first place,
let initial_amount = total_costs
.summed_department_costs
.iter()
.filter(|cost| cost.department == *overhead_department)
.next();
if initial_amount.is_none() {
continue;
}
let calculated_amount = calculated_overheads[*index];
// Calculate each movement individually
let calculated = operating_overhead_mappings.column(*index) * calculated_amount;
for (department, index) in &operating_department_mappings {
let value = *calculated.get(*index).unwrap();
if value > zero_threshold || value < -1. * zero_threshold {
temp_writer.serialize(MovedAmount {
account: total_costs.account.clone(),
cost_centre: department.clone(),
value,
from_cost_centre: overhead_department.clone(),
})?;
}
}
}
// Don't bother performing the second calculation, it's redundant
continue;
}
// Calculation: operating_overhead_usage . calculated_overheads + initial_totals // Calculation: operating_overhead_usage . calculated_overheads + initial_totals
// Where operating_overhead_usage is the direct mapping from overhead -> operating department, calculated overheads is the // Where operating_overhead_usage is the direct mapping from overhead -> operating department, calculated overheads is the
// solved overheads usages after taking into account usage between departments, and initial_totals is the initial values // solved overheads usages after taking into account usage between departments, and initial_totals is the initial values
// for the operating departments. // for the operating departments.
let calculated = operating_overhead_mappings * calculated_overheads + operating_costs_vec; let calculated =
operating_overhead_mappings * calculated_overheads + operating_costs_vec;
let converted_result: Vec<TotalDepartmentCost> = operating_department_mappings let converted_result: Vec<TotalDepartmentCost> = operating_department_mappings
.iter() .iter()
@@ -675,30 +678,88 @@ fn do_solve_reciprocal<T: ReciprocalAllocationSolver>(
// Redistribute floating point errors (only for ccs we actually allocated from/to) // Redistribute floating point errors (only for ccs we actually allocated from/to)
// Considered removing this since redistribution should be done in cost driver calculations, however since that usually // Considered removing this since redistribution should be done in cost driver calculations, however since that usually
// does nothing, we may as well keep this just in case. // does nothing, we may as well keep this just in case.
let initial_cost: f64 = total_costs
.summed_department_costs // TODO: Not sure we actually need this, would probably be better to have a better storage format than
.iter() // csv/string conversions
.filter(|cost| { // let initial_cost: f64 = total_costs
operating_department_mappings.contains_key(&cost.department) // .summed_department_costs
|| overhead_department_mappings.contains_key(&cost.department) // .iter()
}) // .filter(|cost| {
.map(|cost| cost.value) // operating_department_mappings.contains_key(&cost.department)
.sum(); // || overhead_department_mappings.contains_key(&cost.department)
let new_cost: f64 = converted_result.iter().map(|cost| cost.value).sum(); // })
let diff = initial_cost - new_cost; // .map(|cost| cost.value)
final_account_costs.push(AccountCost { // .sum();
account: total_costs.account, // let new_cost: f64 = converted_result.iter().map(|cost| cost.value).sum();
// let diff = initial_cost - new_cost;
AccountCost {
account: total_costs.account.clone(),
summed_department_costs: converted_result summed_department_costs: converted_result
.into_iter() .into_iter()
.map(|cost| TotalDepartmentCost { .map(|cost| TotalDepartmentCost {
department: cost.department, department: cost.department,
value: cost.value + cost.value / new_cost * diff, value: cost.value, // + if new_cost == 0_f64 || diff == 0_f64 {
// 0_f64
// } else {
// cost.value / new_cost * diff
// },
}) })
.collect(), .collect(),
});
break;
} }
Ok(final_account_costs) })
.collect()
}
fn solve_reciprocal_with_from<T: ReciprocalAllocationSolver + Sync + Send>(
solver: T,
total_costs: Vec<AccountCost>,
overhead_department_mappings: HashMap<String, usize>,
operating_department_mappings: HashMap<String, usize>,
operating_overhead_mappings: DMatrix<f64>,
temp_writer: &mut Writer<impl Write>,
zero_threshold: f64,
) -> anyhow::Result<()> {
for total_costs in total_costs {
let moved_amounts: Vec<MovedAmount> = total_costs
.summed_department_costs
.par_iter()
.filter(|overhead_department_cost| {
overhead_department_mappings.contains_key(&overhead_department_cost.department)
&& overhead_department_cost.value != 0_f64
})
.flat_map(|overhead_department_cost| {
let mut overhead_slice_costs = vec![0.; overhead_department_mappings.len()];
overhead_slice_costs[*overhead_department_mappings
.get(&overhead_department_cost.department)
.unwrap()] = overhead_department_cost.value;
let overhead_costs_vec: DMatrix<f64> = DMatrix::from_row_slice(
overhead_department_mappings.len(),
1,
&overhead_slice_costs,
);
let calculated_overheads = solver.solve(&overhead_costs_vec);
let calculated = &operating_overhead_mappings * calculated_overheads;
operating_department_mappings
.iter()
.map(|(department, index)| (department, *calculated.get(*index).unwrap()))
.map(|(department, value)| MovedAmount {
account: total_costs.account.clone(),
cost_centre: department.clone(),
value: value,
from_cost_centre: department.clone(),
})
.collect::<Vec<MovedAmount>>()
})
.collect();
for moved_amount in moved_amounts {
temp_writer.serialize(moved_amount)?;
}
}
temp_writer.flush().unwrap();
Ok(())
} }
#[cfg(test)] #[cfg(test)]
@@ -712,6 +773,7 @@ mod tests {
use crate::TotalDepartmentCost; use crate::TotalDepartmentCost;
use super::reciprocal_allocation_impl; use super::reciprocal_allocation_impl;
use super::MovedAmount;
#[test] #[test]
fn test_basic() { fn test_basic() {
@@ -832,7 +894,7 @@ mod tests {
true, true,
"E".to_owned(), "E".to_owned(),
true, true,
0.00001, 0.001,
); );
assert!(result.is_ok()) assert!(result.is_ok())
} }