Start adding messagepack serialization by adding custom serialize trait to handle csv and messagepack serialization

This is required as csv writer doesn't expose the serde::Serializer implementation, so we can't just use that trait in place of the output writer
This commit is contained in:
piv
2023-03-15 21:48:49 +10:30
parent 83a4fc28cc
commit 7949a0a07b
4 changed files with 53 additions and 14 deletions

23
Cargo.lock generated
View File

@@ -210,6 +210,7 @@ dependencies = [
"itertools",
"nalgebra",
"rayon",
"rmp-serde",
"serde",
"sqlx",
"tokio",
@@ -1036,6 +1037,28 @@ dependencies = [
"winapi",
]
[[package]]
name = "rmp"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44519172358fd6d58656c86ab8e7fbc9e1490c3e8f14d35ed78ca0dd07403c9f"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5b13be192e0220b8afb7222aa5813cb62cc269ebb5cac346ca6487681d2913e"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]]
name = "rustix"
version = "0.36.8"

View File

@@ -23,6 +23,7 @@ chrono = {version = "0.4.23", features = ["default", "serde"]}
rayon = "1.6.0"
tokio = { version = "1.26.0", features = ["full"] }
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls", "mssql" ] }
rmp-serde = "1.1.1"
# More info on targets: https://doc.rust-lang.org/cargo/reference/cargo-targets.html#configuring-a-target
[lib]

View File

@@ -1,10 +1,9 @@
mod move_money;
pub use self::move_money::*;
use std::ffi::c_char;
use std::ffi::CStr;
use std::ffi::CString;
pub use self::move_money::*;
mod overhead_allocation;
pub use self::overhead_allocation::*;

View File

@@ -5,8 +5,9 @@ use std::{
use csv::Writer;
use itertools::Itertools;
use nalgebra::{zero, DMatrix, Dynamic, LU};
use nalgebra::{DMatrix, Dynamic, LU};
use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
use rmp_serde::Serializer;
use serde::{Deserialize, Serialize};
use crate::{CsvAccount, CsvCost};
@@ -88,7 +89,7 @@ impl ReciprocalAllocationSolver for DMatrix<f64> {
}
}
pub fn reciprocal_allocation<Lines, Account, AllocationStatistic, Area, CostCentre, Output>(
pub fn reciprocal_allocation<Lines, Account, AllocationStatistic, Area, CostCentre>(
lines: &mut csv::Reader<Lines>,
accounts: &mut csv::Reader<Account>,
allocation_statistics: &mut csv::Reader<AllocationStatistic>,
@@ -97,7 +98,7 @@ pub fn reciprocal_allocation<Lines, Account, AllocationStatistic, Area, CostCent
// TODO: Receiver method rather than this writer that can accept
// the raw float results, so we can write in an alternate format
// that more accurately represents the values on disk
output: &mut csv::Writer<Output>,
output: &mut impl RecordSerializer,
use_numeric_accounts: bool,
exclude_negative_allocation_statistics: bool,
any_limit_criteria: bool,
@@ -111,7 +112,6 @@ where
AllocationStatistic: Read,
Area: Read,
CostCentre: Read,
Output: std::io::Write,
{
let lines = lines
.deserialize()
@@ -485,10 +485,10 @@ fn remove_quote_and_padding(s: &str) -> String {
// Perform the reciprocal allocation (matrix) method to allocate servicing departments (indirect) costs
// to functional departments. Basically just a matrix solve, uses regression (moore-penrose pseudoinverse) when
// matrix is singular
fn reciprocal_allocation_impl<W: Write>(
fn reciprocal_allocation_impl(
allocations: Vec<OverheadAllocationRule>,
account_costs: Vec<AccountCost>,
movement_writer: Option<&mut csv::Writer<W>>,
movement_writer: Option<&mut impl RecordSerializer>,
zero_threshold: f64,
) -> anyhow::Result<Vec<AccountCost>> {
let overhead_department_mappings = get_rules_indexes(&allocations, DepartmentType::Overhead);
@@ -567,7 +567,7 @@ fn do_solve_reciprocal<T: ReciprocalAllocationSolver + Sync + Send>(
account_costs: Vec<AccountCost>,
overhead_department_mappings: HashMap<String, usize>,
allocations: Vec<OverheadAllocationRule>,
temp_writer: Option<&mut Writer<impl Write>>,
temp_writer: Option<&mut impl RecordSerializer>,
zero_threshold: f64,
) -> anyhow::Result<Vec<AccountCost>> {
let operating_department_mappings = get_rules_indexes(&allocations, DepartmentType::Operating);
@@ -590,8 +590,7 @@ fn do_solve_reciprocal<T: ReciprocalAllocationSolver + Sync + Send>(
overhead_department_mappings.len(),
operating_overhead_mappings,
);
let mut temp_writer = temp_writer;
if let Some(temp_writer) = temp_writer.as_mut() {
if let Some(temp_writer) = temp_writer {
solve_reciprocal_with_from(
solver,
account_costs,
@@ -717,7 +716,7 @@ fn solve_reciprocal_with_from<T: ReciprocalAllocationSolver + Sync + Send>(
overhead_department_mappings: HashMap<String, usize>,
operating_department_mappings: HashMap<String, usize>,
operating_overhead_mappings: DMatrix<f64>,
temp_writer: &mut Writer<impl Write>,
temp_writer: &mut impl RecordSerializer,
zero_threshold: f64,
) -> anyhow::Result<()> {
for total_costs in total_costs {
@@ -758,10 +757,27 @@ fn solve_reciprocal_with_from<T: ReciprocalAllocationSolver + Sync + Send>(
temp_writer.serialize(moved_amount)?;
}
}
temp_writer.flush().unwrap();
Ok(())
}
pub trait RecordSerializer {
fn serialize(&mut self, record: impl Serialize) -> anyhow::Result<()>;
}
impl<W: Write> RecordSerializer for csv::Writer<W> {
fn serialize(&mut self, record: impl Serialize) -> anyhow::Result<()> {
self.serialize(record)?;
Ok(())
}
}
impl<W: Write> RecordSerializer for Serializer<W> {
fn serialize(&mut self, record: impl Serialize) -> anyhow::Result<()> {
record.serialize(self)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::fs::File;
@@ -851,7 +867,7 @@ mod tests {
}];
let mut movement_writer = csv::Writer::from_path("test_output.csv").unwrap();
let result = reciprocal_allocation_impl::<File>(
let result = reciprocal_allocation_impl(
allocation_rules,
initial_totals,
Some(&mut movement_writer),