Add multithreading to overhead allocation, add optional storage of from departments

This commit is contained in:
Michael Pivato
2023-03-13 06:03:18 +00:00
parent 8a6a94a11e
commit 933e9b33ca
2 changed files with 169 additions and 107 deletions

View File

@@ -66,7 +66,7 @@ enum Commands {
#[arg(short = 'f', long)] #[arg(short = 'f', long)]
show_from: bool, show_from: bool,
#[arg(short, long, default_value = "0.000001")] #[arg(short, long, default_value = "0.00000000000000001")]
zero_threshold: f64, zero_threshold: f64,
#[arg(short, long, value_name = "FILE")] #[arg(short, long, value_name = "FILE")]

View File

@@ -5,7 +5,7 @@ use std::{
use csv::Writer; use csv::Writer;
use itertools::Itertools; use itertools::Itertools;
use nalgebra::{DMatrix, Dynamic, LU}; use nalgebra::{zero, DMatrix, Dynamic, LU};
use rayon::prelude::{IntoParallelRefIterator, ParallelIterator}; use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -64,7 +64,7 @@ pub struct AccountCost {
summed_department_costs: Vec<TotalDepartmentCost>, summed_department_costs: Vec<TotalDepartmentCost>,
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize, Deserialize)]
struct MovedAmount { struct MovedAmount {
account: String, account: String,
cost_centre: String, cost_centre: String,
@@ -94,6 +94,9 @@ pub fn reciprocal_allocation<Lines, Account, AllocationStatistic, Area, CostCent
allocation_statistics: &mut csv::Reader<AllocationStatistic>, allocation_statistics: &mut csv::Reader<AllocationStatistic>,
areas: &mut csv::Reader<Area>, areas: &mut csv::Reader<Area>,
cost_centres: &mut csv::Reader<CostCentre>, cost_centres: &mut csv::Reader<CostCentre>,
// TODO: Receiver method rather than this writer that can accept
// the raw float results, so we can write in an alternate format
// that more accurately represents the values on disk
output: &mut csv::Writer<Output>, output: &mut csv::Writer<Output>,
use_numeric_accounts: bool, use_numeric_accounts: bool,
exclude_negative_allocation_statistics: bool, exclude_negative_allocation_statistics: bool,
@@ -559,7 +562,7 @@ fn get_rules_indexes(
.collect() .collect()
} }
fn do_solve_reciprocal<T: ReciprocalAllocationSolver>( fn do_solve_reciprocal<T: ReciprocalAllocationSolver + Sync + Send>(
solver: T, solver: T,
account_costs: Vec<AccountCost>, account_costs: Vec<AccountCost>,
overhead_department_mappings: HashMap<String, usize>, overhead_department_mappings: HashMap<String, usize>,
@@ -587,118 +590,176 @@ fn do_solve_reciprocal<T: ReciprocalAllocationSolver>(
overhead_department_mappings.len(), overhead_department_mappings.len(),
operating_overhead_mappings, operating_overhead_mappings,
); );
let mut final_account_costs: Vec<AccountCost> = Vec::with_capacity(account_costs.len());
let mut temp_writer = temp_writer; let mut temp_writer = temp_writer;
for total_costs in account_costs { if let Some(temp_writer) = temp_writer.as_mut() {
// TODO: There has to be a cleaner way to do this, perhaps by presorting things? solve_reciprocal_with_from(
let mut overhead_slice_costs = vec![0.; overhead_department_mappings.len()]; solver,
for cost in total_costs.summed_department_costs.iter() { account_costs,
if overhead_department_mappings.contains_key(&cost.department) { overhead_department_mappings,
overhead_slice_costs[*overhead_department_mappings.get(&cost.department).unwrap()] = operating_department_mappings,
cost.value operating_overhead_mappings_mat,
temp_writer,
zero_threshold,
)?;
Ok(vec![])
} else {
Ok(solve_reciprocal_no_from(
solver,
account_costs,
overhead_department_mappings,
operating_department_mappings,
operating_overhead_mappings_mat,
))
}
}
fn solve_reciprocal_no_from(
solver: impl ReciprocalAllocationSolver + Sync + Send,
account_costs: Vec<AccountCost>,
overhead_department_mappings: HashMap<String, usize>,
operating_department_mappings: HashMap<String, usize>,
operating_overhead_mappings_mat: DMatrix<f64>,
) -> Vec<AccountCost> {
account_costs
.par_iter()
// .filter(|cost| cost.account == "A480200")
.map(|total_costs| {
// To get the from/to ccs like ppm does, we ignore the initial totals. Then for each overhead cc,
// we zero out all the calculated overheads except for this cc and do
// operating_overhead_mappings * calculated_overheads (basically the first part of the normal calculation)
// TODO: There has to be a cleaner way to do this, perhaps by presorting things?
let mut overhead_slice_costs = vec![0.; overhead_department_mappings.len()];
for cost in total_costs.summed_department_costs.iter() {
if overhead_department_mappings.contains_key(&cost.department) {
overhead_slice_costs
[*overhead_department_mappings.get(&cost.department).unwrap()] = cost.value
}
} }
} let overhead_costs_vec: DMatrix<f64> = DMatrix::from_row_slice(
let overhead_costs_vec: DMatrix<f64> = overhead_department_mappings.len(),
DMatrix::from_row_slice(overhead_department_mappings.len(), 1, &overhead_slice_costs); 1,
let calculated_overheads = solver.solve(&overhead_costs_vec); &overhead_slice_costs,
);
let calculated_overheads = solver.solve(&overhead_costs_vec);
let mut operating_slice_costs = vec![0.; operating_department_mappings.len()]; let mut operating_slice_costs = vec![0.; operating_department_mappings.len()];
for cost in &total_costs.summed_department_costs { for cost in &total_costs.summed_department_costs {
if operating_department_mappings.contains_key(&cost.department) { if operating_department_mappings.contains_key(&cost.department) {
let elem = &mut operating_slice_costs let elem = &mut operating_slice_costs
[*operating_department_mappings.get(&cost.department).unwrap()]; [*operating_department_mappings.get(&cost.department).unwrap()];
*elem = cost.value; *elem = cost.value;
}
} }
} let operating_costs_vec: DMatrix<f64> = DMatrix::from_row_slice(
let operating_costs_vec: DMatrix<f64> = DMatrix::from_row_slice( operating_department_mappings.len(),
operating_department_mappings.len(), 1,
1, &operating_slice_costs,
&operating_slice_costs, );
);
// Borrow so we don't move between loops // // Borrow so we don't move between loops
let operating_overhead_mappings = &operating_overhead_mappings_mat; let operating_overhead_mappings = &operating_overhead_mappings_mat;
let calculated_overheads = &calculated_overheads; let calculated_overheads = &calculated_overheads;
// To get the from/to ccs like ppm does, we ignore the initial totals. Then for each overhead cc, // Calculation: operating_overhead_usage . calculated_overheads + initial_totals
// we zero out all the calculated overheads except for this cc and do // Where operating_overhead_usage is the direct mapping from overhead -> operating department, calculated overheads is the
// operating_overhead_mappings * calculated_overheads (basically the first part of the normal calculation) // solved overheads usages after taking into account usage between departments, and initial_totals is the initial values
if let Some(temp_writer) = temp_writer.as_mut() { // for the operating departments.
// TODO: A performance improvement will be to create another hashmap for index -> department, then just let calculated =
// iterate over the actual indexes instead (will have preloading) operating_overhead_mappings * calculated_overheads + operating_costs_vec;
for (overhead_department, index) in overhead_department_mappings.iter() {
// TODO: Check this filter is actually working correctly by summing the costs and comparing to the non show_from setting let converted_result: Vec<TotalDepartmentCost> = operating_department_mappings
// (the sums should match up) .iter()
// Thinking intuitively, if the cost truely didn't exist, then it never would have been included in the totals .map(|(department, index)| TotalDepartmentCost {
// in the first place, department: department.clone(),
let initial_amount = total_costs value: *calculated.get(*index).unwrap(),
.summed_department_costs })
.collect();
// Redistribute floating point errors (only for ccs we actually allocated from/to)
// Considered removing this since redistribution should be done in cost driver calculations, however since that usually
// does nothing, we may as well keep this just in case.
// TODO: Not sure we actually need this, would probably be better to have a better storage format than
// csv/string conversions
// let initial_cost: f64 = total_costs
// .summed_department_costs
// .iter()
// .filter(|cost| {
// operating_department_mappings.contains_key(&cost.department)
// || overhead_department_mappings.contains_key(&cost.department)
// })
// .map(|cost| cost.value)
// .sum();
// let new_cost: f64 = converted_result.iter().map(|cost| cost.value).sum();
// let diff = initial_cost - new_cost;
AccountCost {
account: total_costs.account.clone(),
summed_department_costs: converted_result
.into_iter()
.map(|cost| TotalDepartmentCost {
department: cost.department,
value: cost.value, // + if new_cost == 0_f64 || diff == 0_f64 {
// 0_f64
// } else {
// cost.value / new_cost * diff
// },
})
.collect(),
}
})
.collect()
}
fn solve_reciprocal_with_from<T: ReciprocalAllocationSolver + Sync + Send>(
solver: T,
total_costs: Vec<AccountCost>,
overhead_department_mappings: HashMap<String, usize>,
operating_department_mappings: HashMap<String, usize>,
operating_overhead_mappings: DMatrix<f64>,
temp_writer: &mut Writer<impl Write>,
zero_threshold: f64,
) -> anyhow::Result<()> {
for total_costs in total_costs {
let moved_amounts: Vec<MovedAmount> = total_costs
.summed_department_costs
.par_iter()
.filter(|overhead_department_cost| {
overhead_department_mappings.contains_key(&overhead_department_cost.department)
&& overhead_department_cost.value != 0_f64
})
.flat_map(|overhead_department_cost| {
let mut overhead_slice_costs = vec![0.; overhead_department_mappings.len()];
overhead_slice_costs[*overhead_department_mappings
.get(&overhead_department_cost.department)
.unwrap()] = overhead_department_cost.value;
let overhead_costs_vec: DMatrix<f64> = DMatrix::from_row_slice(
overhead_department_mappings.len(),
1,
&overhead_slice_costs,
);
let calculated_overheads = solver.solve(&overhead_costs_vec);
let calculated = &operating_overhead_mappings * calculated_overheads;
operating_department_mappings
.iter() .iter()
.filter(|cost| cost.department == *overhead_department) .map(|(department, index)| (department, *calculated.get(*index).unwrap()))
.next(); .map(|(department, value)| MovedAmount {
if initial_amount.is_none() { account: total_costs.account.clone(),
continue; cost_centre: department.clone(),
} value: value,
from_cost_centre: department.clone(),
let calculated_amount = calculated_overheads[*index]; })
// Calculate each movement individually .collect::<Vec<MovedAmount>>()
let calculated = operating_overhead_mappings.column(*index) * calculated_amount;
for (department, index) in &operating_department_mappings {
let value = *calculated.get(*index).unwrap();
if value > zero_threshold || value < -1. * zero_threshold {
temp_writer.serialize(MovedAmount {
account: total_costs.account.clone(),
cost_centre: department.clone(),
value,
from_cost_centre: overhead_department.clone(),
})?;
}
}
}
// Don't bother performing the second calculation, it's redundant
continue;
}
// Calculation: operating_overhead_usage . calculated_overheads + initial_totals
// Where operating_overhead_usage is the direct mapping from overhead -> operating department, calculated overheads is the
// solved overheads usages after taking into account usage between departments, and initial_totals is the initial values
// for the operating departments.
let calculated = operating_overhead_mappings * calculated_overheads + operating_costs_vec;
let converted_result: Vec<TotalDepartmentCost> = operating_department_mappings
.iter()
.map(|(department, index)| TotalDepartmentCost {
department: department.clone(),
value: *calculated.get(*index).unwrap(),
}) })
.collect(); .collect();
// Redistribute floating point errors (only for ccs we actually allocated from/to)
// Considered removing this since redistribution should be done in cost driver calculations, however since that usually for moved_amount in moved_amounts {
// does nothing, we may as well keep this just in case. temp_writer.serialize(moved_amount)?;
let initial_cost: f64 = total_costs }
.summed_department_costs
.iter()
.filter(|cost| {
operating_department_mappings.contains_key(&cost.department)
|| overhead_department_mappings.contains_key(&cost.department)
})
.map(|cost| cost.value)
.sum();
let new_cost: f64 = converted_result.iter().map(|cost| cost.value).sum();
let diff = initial_cost - new_cost;
final_account_costs.push(AccountCost {
account: total_costs.account,
summed_department_costs: converted_result
.into_iter()
.map(|cost| TotalDepartmentCost {
department: cost.department,
value: cost.value + cost.value / new_cost * diff,
})
.collect(),
});
break;
} }
Ok(final_account_costs) temp_writer.flush().unwrap();
Ok(())
} }
#[cfg(test)] #[cfg(test)]
@@ -712,6 +773,7 @@ mod tests {
use crate::TotalDepartmentCost; use crate::TotalDepartmentCost;
use super::reciprocal_allocation_impl; use super::reciprocal_allocation_impl;
use super::MovedAmount;
#[test] #[test]
fn test_basic() { fn test_basic() {
@@ -832,7 +894,7 @@ mod tests {
true, true,
"E".to_owned(), "E".to_owned(),
true, true,
0.00001, 0.001,
); );
assert!(result.is_ok()) assert!(result.is_ok())
} }