Move move_money into a wasm module, add dynamic node test that uses move_money
Some checks failed
test / test (push) Failing after 1m11s
Some checks failed
test / test (push) Failing after 1m11s
This commit is contained in:
47
move-money-dynamic/src/lib.rs
Normal file
47
move-money-dynamic/src/lib.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
use crate::bindings::{CsvReaders, CsvWriter, Guest, ReadMap};
|
||||
use crate::move_money::move_money;
|
||||
|
||||
#[allow(warnings)]
|
||||
mod bindings;
|
||||
pub mod move_money;
|
||||
|
||||
struct Component;
|
||||
|
||||
impl Guest for Component {
|
||||
fn evaluate(properties: ReadMap, readers: CsvReaders, writer: CsvWriter) -> () {
|
||||
let accounts_reader = readers.read_into_string("Account");
|
||||
let cost_centres_reader = readers.read_into_string("CostCentre");
|
||||
let lines = readers.read_into_string("Line");
|
||||
let rules = readers.read_into_string("Rule");
|
||||
|
||||
let use_numeric_accounts = properties
|
||||
.get("use_numeric_accounts")
|
||||
.map(|param| param == "true")
|
||||
.unwrap_or(false);
|
||||
let flush_pass = properties
|
||||
.get("flush_pass")
|
||||
.map(|param| param == "true")
|
||||
.unwrap_or(false);
|
||||
|
||||
let mut output_writer = csv::Writer::from_writer(vec![]);
|
||||
let result = move_money(
|
||||
&mut csv::Reader::from_reader(rules.as_bytes()),
|
||||
&mut csv::Reader::from_reader(lines.as_bytes()),
|
||||
&mut csv::Reader::from_reader(accounts_reader.as_bytes()),
|
||||
&mut csv::Reader::from_reader(cost_centres_reader.as_bytes()),
|
||||
&mut output_writer,
|
||||
use_numeric_accounts,
|
||||
flush_pass,
|
||||
);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
let inner = output_writer.into_inner().unwrap();
|
||||
let wrapped = String::from_utf8(inner).unwrap();
|
||||
writer.write_string(wrapped.as_str());
|
||||
}
|
||||
Err(e) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bindings::export!(Component with_types_in bindings);
|
||||
448
move-money-dynamic/src/move_money.rs
Normal file
448
move-money-dynamic/src/move_money.rs
Normal file
@@ -0,0 +1,448 @@
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
#[derive(Deserialize)]
|
||||
pub struct CsvAccount {
|
||||
#[serde(rename = "Code")]
|
||||
pub code: String,
|
||||
#[serde(rename = "Description")]
|
||||
pub description: Option<String>,
|
||||
#[serde(rename = "Type")]
|
||||
pub account_type: String,
|
||||
#[serde(rename = "CostOutput")]
|
||||
pub cost_output: Option<String>,
|
||||
#[serde(rename = "PercentFixed")]
|
||||
pub percent_fixed: f64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct CsvMovementRule {
|
||||
#[serde(rename = "CostCentreSourceFrom", default)]
|
||||
source_from_department: String,
|
||||
#[serde(rename = "CostCentreSourceTo", default)]
|
||||
source_to_department: String,
|
||||
#[serde(rename = "CostCentreDestFrom", default)]
|
||||
dest_from_department: String,
|
||||
#[serde(rename = "CostCentreDestTo", default)]
|
||||
dest_to_department: String,
|
||||
#[serde(rename = "AccountSourceFrom", default)]
|
||||
source_from_account: String,
|
||||
#[serde(rename = "AccountSourceTo", default)]
|
||||
source_to_account: String,
|
||||
#[serde(rename = "AccountDestFrom", default)]
|
||||
dest_from_account: String,
|
||||
#[serde(rename = "AccountDestTo", default)]
|
||||
dest_to_account: String,
|
||||
#[serde(rename = "AmountValue")]
|
||||
amount: Option<f64>,
|
||||
#[serde(rename = "AmountType", default)]
|
||||
is_percent: String,
|
||||
#[serde(rename = "Apply", default)]
|
||||
apply: String,
|
||||
#[serde(rename = "CostOutput")]
|
||||
cost_output: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct PartialCostCentre {
|
||||
#[serde(rename = "Code")]
|
||||
pub code: String,
|
||||
#[serde(rename = "Area")]
|
||||
pub area: Option<String>,
|
||||
}
|
||||
|
||||
/// A movement rule with its department/account ranges already expanded into
/// explicit sets of codes.
#[derive(Debug, Clone, Default)]
pub struct MovementRule {
    /// Source departments. `move_money` sets `all_from_departments` when this
    /// set is empty, i.e. an empty set means "all".
    pub from_departments: HashSet<String>,
    /// Destination departments; empty means "all" (see `all_to_departments`).
    pub to_departments: HashSet<String>,
    /// Whether the rule applies to every source department.
    pub all_from_departments: bool,
    /// Whether the rule targets every destination department.
    pub all_to_departments: bool,
    /// Source accounts; empty means "all" (see `all_from_accounts`).
    pub from_accounts: HashSet<String>,
    /// Destination accounts; empty means "all" (see `all_to_accounts`).
    pub to_accounts: HashSet<String>,
    /// Whether the rule applies to every source account.
    pub all_from_accounts: bool,
    /// Whether the rule targets every destination account.
    pub all_to_accounts: bool,
    /// Amount to move; a multiplier of the source total when `is_percent` is set.
    pub amount: f64,
    /// Whether `amount` is relative to the source total rather than absolute.
    pub is_percent: bool,
    /// Whether this entry is a pass break rather than an actual movement.
    pub is_separator: bool,
}

impl MovementRule {
    /// Builds a pass-break marker: every field is defaulted except
    /// `is_separator`, which is `true`.
    pub fn pass_break() -> MovementRule {
        MovementRule {
            is_separator: true,
            ..MovementRule::default()
        }
    }

    /// Returns `true` when the rule passes basic sanity checks.
    ///
    /// A rule is rejected when both department sets are empty, or when it is a
    /// percentage rule whose amount lies outside `0..=100`.
    ///
    /// NOTE(review): `move_money` stores percentage amounts pre-multiplied by
    /// `0.01`, so the `0..=100` bound looks like it expects the unscaled value —
    /// confirm which scale is intended before relying on this check.
    pub fn validate(&self) -> bool {
        // Would be nice to have a decent message/error here as well
        let has_department_filter =
            !(self.from_departments.is_empty() && self.to_departments.is_empty());
        // Written as two comparisons (not a range check) so a NaN amount is
        // treated exactly as before: it is not flagged as out of range.
        let percent_out_of_range =
            self.is_percent && (self.amount < 0.0 || self.amount > 100.0);
        has_department_filter && !percent_out_of_range
    }
}
|
||||
|
||||
/// A (department, account) pair; the key under which totals are tracked.
#[derive(Debug, Hash, Clone, Default, PartialEq, Eq)]
pub struct Unit {
    /// Cost-centre (department) code.
    pub department: String,
    /// Account code.
    pub account: String,
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct CsvCost {
|
||||
#[serde(rename = "ACCOUNT")]
|
||||
pub account: String,
|
||||
#[serde(rename = "COSTCENTRE")]
|
||||
pub department: String,
|
||||
#[serde(serialize_with = "round_serialize")]
|
||||
pub value: f64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct MovedMoneyAmount {
|
||||
pub account: String,
|
||||
pub department: String,
|
||||
pub value: f64,
|
||||
pub from_account: String,
|
||||
pub from_department: String,
|
||||
pub pass: i32,
|
||||
}
|
||||
|
||||
fn round_serialize<S>(x: &f64, s: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
s.serialize_f64((x * 100000.).round() / 100000.)
|
||||
}
|
||||
|
||||
pub fn move_money<R, L, A, C, O>(
|
||||
rules_reader: &mut csv::Reader<R>,
|
||||
lines_reader: &mut csv::Reader<L>,
|
||||
accounts_reader: &mut csv::Reader<A>,
|
||||
cost_centres_reader: &mut csv::Reader<C>,
|
||||
output: &mut csv::Writer<O>,
|
||||
use_numeric_accounts: bool,
|
||||
flush_pass: bool,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
R: std::io::Read,
|
||||
L: std::io::Read,
|
||||
A: std::io::Read,
|
||||
C: std::io::Read,
|
||||
O: std::io::Write,
|
||||
{
|
||||
let headers = lines_reader.headers()?;
|
||||
let mut account_index = 0;
|
||||
let mut department_index = 0;
|
||||
let mut account_type_index = 0;
|
||||
for (index, field) in headers.iter().enumerate() {
|
||||
if field.eq_ignore_ascii_case("account") {
|
||||
account_index = index;
|
||||
} else if field.eq_ignore_ascii_case("costcentre") {
|
||||
department_index = index;
|
||||
} else if field.eq_ignore_ascii_case("accounttype") {
|
||||
account_type_index = index;
|
||||
}
|
||||
}
|
||||
|
||||
let lines: HashMap<Unit, f64> = lines_reader
|
||||
.records()
|
||||
.map(|record| {
|
||||
let record = record.unwrap();
|
||||
let account = record.get(account_index).unwrap();
|
||||
let department = record.get(department_index).unwrap();
|
||||
let sum = record
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(i, _)| {
|
||||
*i != account_index && *i != department_index && *i != account_type_index
|
||||
})
|
||||
.map(|(_, f)| f.parse::<f64>().unwrap_or(0.))
|
||||
.sum();
|
||||
(
|
||||
Unit {
|
||||
account: account.into(),
|
||||
department: department.into(),
|
||||
},
|
||||
sum,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
let accounts_reader = accounts_reader;
|
||||
let all_accounts = accounts_reader
|
||||
.deserialize()
|
||||
.collect::<Result<Vec<CsvAccount>, csv::Error>>()?;
|
||||
let account_mappings: HashMap<String, CsvAccount> = all_accounts
|
||||
.into_iter()
|
||||
.map(|account| (account.code.clone(), account))
|
||||
.collect();
|
||||
|
||||
let all_accounts_sorted = if use_numeric_accounts {
|
||||
lines
|
||||
.keys()
|
||||
.map(|key| key.account.clone().parse::<i32>().unwrap())
|
||||
.unique()
|
||||
.sorted()
|
||||
.map(|account| account.to_string())
|
||||
.collect()
|
||||
} else {
|
||||
lines
|
||||
.keys()
|
||||
.map(|key| key.account.clone())
|
||||
.unique()
|
||||
.sorted()
|
||||
.collect()
|
||||
};
|
||||
let all_departments_sorted = cost_centres_reader
|
||||
.deserialize::<PartialCostCentre>()
|
||||
.map(|cc| cc.unwrap().code)
|
||||
.unique()
|
||||
.sorted()
|
||||
.collect();
|
||||
let mut rules: Vec<MovementRule> = vec![];
|
||||
for movement_rule in rules_reader.deserialize() {
|
||||
// TODO: Consider reclass rule group, how does that even work?
|
||||
let movement_rule: CsvMovementRule = movement_rule?;
|
||||
let is_separator = movement_rule.apply == "-DIVIDER-";
|
||||
let from_accounts = if is_separator {
|
||||
HashSet::new()
|
||||
} else if movement_rule.cost_output.is_some() {
|
||||
account_mappings
|
||||
.iter()
|
||||
.filter(|(_, account)| {
|
||||
account.cost_output.is_some()
|
||||
&& account.cost_output.clone().unwrap()
|
||||
== movement_rule.cost_output.clone().unwrap()
|
||||
})
|
||||
.map(|(code, _)| code.clone())
|
||||
.collect()
|
||||
} else {
|
||||
extract_range(
|
||||
movement_rule.source_from_account,
|
||||
movement_rule.source_to_account,
|
||||
&all_accounts_sorted,
|
||||
)
|
||||
};
|
||||
let to_accounts = if is_separator {
|
||||
HashSet::new()
|
||||
} else if movement_rule.cost_output.is_some() {
|
||||
account_mappings
|
||||
.iter()
|
||||
.filter(|(_, account)| {
|
||||
account.cost_output.is_some()
|
||||
&& account.cost_output.clone().unwrap()
|
||||
== movement_rule.cost_output.clone().unwrap()
|
||||
})
|
||||
.map(|(code, _)| code.clone())
|
||||
.collect()
|
||||
} else {
|
||||
extract_range(
|
||||
movement_rule.dest_from_account,
|
||||
movement_rule.dest_to_account,
|
||||
&all_accounts_sorted,
|
||||
)
|
||||
};
|
||||
let from_departments = if is_separator {
|
||||
HashSet::new()
|
||||
} else {
|
||||
extract_range(
|
||||
movement_rule.source_from_department,
|
||||
movement_rule.source_to_department,
|
||||
&all_departments_sorted,
|
||||
)
|
||||
};
|
||||
let to_departments = if is_separator {
|
||||
HashSet::new()
|
||||
} else {
|
||||
extract_range(
|
||||
movement_rule.dest_from_department,
|
||||
movement_rule.dest_to_department,
|
||||
&all_departments_sorted,
|
||||
)
|
||||
};
|
||||
rules.push(MovementRule {
|
||||
all_from_departments: from_departments.is_empty(),
|
||||
all_to_departments: to_departments.is_empty(),
|
||||
from_departments,
|
||||
to_departments,
|
||||
all_from_accounts: from_accounts.is_empty(),
|
||||
all_to_accounts: to_accounts.is_empty(),
|
||||
from_accounts,
|
||||
to_accounts,
|
||||
amount: movement_rule.amount.unwrap_or(0.)
|
||||
* (if movement_rule.is_percent == "%" {
|
||||
0.01
|
||||
} else {
|
||||
1.
|
||||
}),
|
||||
is_percent: movement_rule.is_percent == "%",
|
||||
is_separator,
|
||||
})
|
||||
}
|
||||
|
||||
// Then run move_money
|
||||
let moved = move_money_2(lines, &rules, flush_pass, output);
|
||||
|
||||
if !flush_pass {
|
||||
// Ouput the final moneys
|
||||
for (unit, value) in moved {
|
||||
output.serialize(CsvCost {
|
||||
account: unit.account,
|
||||
department: unit.department,
|
||||
value,
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Resolves an inclusive `from`..=`to` range of codes against `options`, the
/// ordered list of all known codes.
///
/// * Both bounds empty: returns an empty set (callers treat that as "all").
/// * Both bounds found: returns every option between them, inclusive. The
///   bounds may be given in either order; a reversed range is normalised rather
///   than panicking on an invalid slice.
/// * Only one bound found: returns just that option.
/// * Neither bound found: returns an empty set.
///
/// The parameter stays `&Vec<String>` on purpose: `move_money` relies on this
/// signature to infer the type of the lists it passes in.
#[allow(clippy::ptr_arg)]
fn extract_range(from: String, to: String, options: &Vec<String>) -> HashSet<String> {
    if from.is_empty() && to.is_empty() {
        return HashSet::new();
    }

    let start_index = options.iter().position(|option| *option == from);
    let end_index = options.iter().position(|option| *option == to);

    match (start_index, end_index) {
        (Some(start), Some(end)) => {
            let (low, high) = if start <= end {
                (start, end)
            } else {
                (end, start)
            };
            options[low..=high].iter().cloned().collect()
        }
        (Some(only), None) | (None, Some(only)) => {
            std::iter::once(options[only].clone()).collect()
        }
        (None, None) => HashSet::new(),
    }
}
|
||||
|
||||
// Approach 1:
// Use math (linear algebra) to move money between departments. In memory and
// computation it is equivalent to the worst case of the naive approach
// (approach 2), but it can take advantage of auto-parallelisation/SIMD to
// perform fast, particularly on larger datasets.
// This basically just involves smushing all the rules together, then doing a
// matrix multiplication and a matrix addition on the initial set. It can't
// record passes, but the smushed rules can be recorded and reused if only the
// data changes later.
// An advantage of this approach is that it can easily be extended to run on
// the GPU.
/// Placeholder for approach 1; currently does nothing.
pub fn move_money_1() {}
|
||||
|
||||
// Approach 2:
|
||||
// Traditinoal/naive, total for each department is stored in an initial map (department -> total amount).
|
||||
// Another map is built up for each rule, and each rule is processed based on the amount in the current total
|
||||
// map.
|
||||
// Upon a pass break (separator), the temp map will assign the values into the total map.
|
||||
// Once done, do a final assignment back to the total, and return that.
|
||||
// Advantage of this is the required code is tiny, and no third-party math library is required.
|
||||
// Note that the movement happens on a line-by-line level. So we can stream the data from disk, and potentially apply this
|
||||
// to every. It's also much more memory efficient than approach 1.
|
||||
// TODO: Time both approaches to seee which is faster depending on the size of the input data/number of rules
|
||||
// Verdict: This is already pretty fast, at least much faster than ppm for BigDataset
|
||||
pub fn move_money_2<W>(
|
||||
initial_totals: HashMap<Unit, f64>,
|
||||
rules: &Vec<MovementRule>,
|
||||
flush_pass: bool,
|
||||
output: &mut csv::Writer<W>,
|
||||
) -> HashMap<Unit, f64>
|
||||
where
|
||||
W: std::io::Write,
|
||||
{
|
||||
// Note: It's potentially a bit more intensive to use cloned totals (rather than just update temp_total per rule),
|
||||
// but it's much simpler code and, and since we're only working line-by-line, it isn't really that much memory in practice
|
||||
// TODO: This is initial totals is problematic, as we may move into a cc that wasn't in the list of lines. In which case we'd need
|
||||
// to add a new line
|
||||
let mut running_total = HashMap::from(initial_totals);
|
||||
let mut temp_total = running_total.clone();
|
||||
let mut current_pass = 0;
|
||||
if flush_pass {
|
||||
for (unit, value) in running_total.iter() {
|
||||
output
|
||||
.serialize(MovedMoneyAmount {
|
||||
account: unit.account.clone(),
|
||||
department: unit.department.clone(),
|
||||
value: *value,
|
||||
from_account: unit.account.clone(),
|
||||
from_department: unit.department.clone(),
|
||||
pass: current_pass,
|
||||
})
|
||||
.expect("Failed to write moved amount")
|
||||
}
|
||||
}
|
||||
for rule in rules {
|
||||
if rule.is_separator {
|
||||
current_pass += 1;
|
||||
running_total = temp_total.clone();
|
||||
} else {
|
||||
for (unit, amount) in &running_total {
|
||||
if (rule.all_from_departments || rule.from_departments.contains(&unit.department))
|
||||
&& (rule.all_from_accounts || rule.from_accounts.contains(&unit.account))
|
||||
{
|
||||
let previous_temp = amount;
|
||||
let added_amount = if rule.is_percent {
|
||||
previous_temp * rule.amount
|
||||
} else {
|
||||
rule.amount
|
||||
};
|
||||
// TODO: Track the from department/account, maintained in a separate list if flush_pass is set
|
||||
*temp_total.get_mut(&unit).unwrap() -= added_amount;
|
||||
let department = if rule.to_departments.len() == 1 {
|
||||
rule.to_departments.iter().next().unwrap().clone()
|
||||
} else {
|
||||
unit.department.clone()
|
||||
};
|
||||
let account = if rule.to_accounts.len() == 1 {
|
||||
rule.to_accounts.iter().next().unwrap().clone()
|
||||
} else {
|
||||
unit.account.clone()
|
||||
};
|
||||
if flush_pass {
|
||||
output
|
||||
.serialize(MovedMoneyAmount {
|
||||
department: department.clone(),
|
||||
account: department.clone(),
|
||||
value: added_amount,
|
||||
from_department: unit.department.clone(),
|
||||
from_account: unit.account.clone(),
|
||||
pass: current_pass,
|
||||
})
|
||||
.expect("Failed to write moved amount");
|
||||
}
|
||||
*temp_total
|
||||
.entry(Unit {
|
||||
department,
|
||||
account,
|
||||
})
|
||||
.or_insert(0.) += added_amount;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
temp_total
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    /// Smoke test: runs the whole pipeline over the fixture CSVs with per-pass
    /// output enabled and checks only that it completes without an error.
    #[test]
    fn move_money() {
        let mut rules =
            csv::Reader::from_path("../testing/input/move_money/reclassrule.csv").unwrap();
        let mut lines = csv::Reader::from_path("../testing/input/move_money/line.csv").unwrap();
        let mut accounts = csv::Reader::from_path("../testing/input/account.csv").unwrap();
        let mut cost_centres = csv::Reader::from_path("../testing/input/costcentre.csv").unwrap();
        let mut output = csv::Writer::from_path("../testing/output/output.csv").unwrap();

        let outcome = super::move_money(
            &mut rules,
            &mut lines,
            &mut accounts,
            &mut cost_centres,
            &mut output,
            false,
            true,
        );
        outcome.expect("Failed to move money");
    }
}
|
||||
Reference in New Issue
Block a user