Refactor product creator, remove threading for writing to disk

This commit is contained in:
piv
2023-03-11 10:55:41 +10:30
parent 363c972b71
commit 7cd893cbf8
5 changed files with 250 additions and 256 deletions

View File

@@ -8,8 +8,8 @@ pub use self::move_money::*;
mod overhead_allocation;
pub use self::overhead_allocation::*;
mod create_products;
pub use self::create_products::*;
mod products;
pub use self::products::create_products;
mod shared_models;
pub use self::shared_models::*;

View File

@@ -66,6 +66,28 @@ enum Commands {
#[arg(short, long, value_name = "FILE")]
output: Option<PathBuf>,
},
CreateProducts {
#[arg(short, long, value_name = "FILE")]
definitions: PathBuf,
#[arg(short, long, value_name = "FILE")]
encounters: PathBuf,
#[arg(short, long, value_name = "FILE")]
services: PathBuf,
#[arg(short, long, value_name = "FILE")]
transfers: PathBuf,
#[arg(short, long, value_name = "FILE")]
procedures: PathBuf,
#[arg(short, long, value_name = "FILE")]
diagnoses: PathBuf,
#[arg(short, long, value_name = "FILE")]
output: PathBuf,
},
}
fn main() -> anyhow::Result<()> {
@@ -80,12 +102,12 @@ fn main() -> anyhow::Result<()> {
output,
use_numeric_accounts,
flush_pass,
} => move_money(
rules,
lines,
accounts,
cost_centres,
output,
} => coster_rs::move_money(
&mut csv::Reader::from_path(rules)?,
&mut csv::Reader::from_path(lines)?,
&mut csv::Reader::from_path(accounts)?,
&mut csv::Reader::from_path(cost_centres)?,
&mut csv::Writer::from_path(output.unwrap_or(PathBuf::from("output.csv")))?,
use_numeric_accounts,
flush_pass,
),
@@ -99,61 +121,35 @@ fn main() -> anyhow::Result<()> {
account_type,
exclude_negative_allocation_statistics,
output,
} => allocate_overheads(
lines,
accounts,
allocation_statistics,
areas,
cost_centres,
} => coster_rs::reciprocal_allocation(
csv::Reader::from_path(lines)?,
csv::Reader::from_path(accounts)?,
csv::Reader::from_path(allocation_statistics)?,
csv::Reader::from_path(areas)?,
csv::Reader::from_path(cost_centres)?,
&mut csv::Writer::from_path(output.unwrap_or(PathBuf::from("alloc_output.csv")))?,
use_numeric_accounts,
account_type,
exclude_negative_allocation_statistics,
true,
account_type,
),
Commands::CreateProducts {
definitions,
encounters,
services,
transfers,
procedures,
diagnoses,
output,
} => coster_rs::create_products(
&mut csv::Reader::from_path(definitions)?,
&mut csv::Reader::from_path(encounters)?,
&mut csv::Reader::from_path(services)?,
&mut csv::Reader::from_path(transfers)?,
&mut csv::Reader::from_path(procedures)?,
&mut csv::Reader::from_path(diagnoses)?,
&mut csv::Writer::from_path(output)?,
1000000,
),
}
}
fn move_money(
rules: PathBuf,
lines: PathBuf,
accounts: PathBuf,
cost_centres: PathBuf,
output: Option<PathBuf>,
use_numeric_accounts: bool,
flush_pass: bool,
) -> anyhow::Result<()> {
coster_rs::move_money(
&mut csv::Reader::from_path(rules)?,
&mut csv::Reader::from_path(lines)?,
&mut csv::Reader::from_path(accounts)?,
&mut csv::Reader::from_path(cost_centres)?,
&mut csv::Writer::from_path(output.unwrap_or(PathBuf::from("output.csv")))?,
use_numeric_accounts,
flush_pass,
)
}
fn allocate_overheads(
lines: PathBuf,
accounts: PathBuf,
allocation_statistics: PathBuf,
areas: PathBuf,
cost_centres: PathBuf,
use_numeric_accounts: bool,
account_type: String,
exclude_negative_allocation_statistics: bool,
output: Option<PathBuf>,
) -> anyhow::Result<()> {
coster_rs::reciprocal_allocation(
csv::Reader::from_path(lines)?,
csv::Reader::from_path(accounts)?,
csv::Reader::from_path(allocation_statistics)?,
csv::Reader::from_path(areas)?,
csv::Reader::from_path(cost_centres)?,
&mut csv::Writer::from_path(output.unwrap_or(PathBuf::from("alloc_output.csv")))?,
use_numeric_accounts,
exclude_negative_allocation_statistics,
true,
account_type,
)
}

View File

@@ -0,0 +1,139 @@
use core::panic;
use std::{
collections::HashMap,
io::{Read, Write},
sync::mpsc,
thread,
};
use chrono::NaiveDateTime;
use csv::Position;
use serde::Serialize;
use super::csv::{read_definitions, BuildFrom, ConstraintType, Definition};
#[derive(Debug, Serialize, Default)]
struct Product {
// Parse datetime from string: https://rust-lang-nursery.github.io/rust-cookbook/datetime/parse.html#parse-string-into-datetime-struct
// TODO: Serialisers.
start_date_time: NaiveDateTime,
end_date_time: NaiveDateTime,
encounter_start_date_time: Option<NaiveDateTime>,
encounter: Option<String>,
service: Option<String>,
transfer: Option<String>,
quantity: Option<f64>,
duration: Option<f64>,
actual_charge: Option<f64>,
standard_cost: Option<f64>,
// TODO: Enum this?
day_of_stay: Option<u8>,
source_allocated_amount: Option<f64>,
}
// TODO: Build from linked dataset is pretty hard, it potentially requires knowing everything abuot the previous year's
// cosing run (BSCO, Dataset_Encounter_Cache, etc).
pub fn create_products<D, E, S, T, P, Di, O>(
definitions: &mut csv::Reader<D>,
encounters: &mut csv::Reader<E>,
services: &mut csv::Reader<S>,
transfers: &mut csv::Reader<T>,
procedures: &mut csv::Reader<P>,
diagnoses: &mut csv::Reader<Di>,
// TODO: Looks kind of bad, any other way around it? I'd rather not have to depend on crossbeam as well
output: &mut csv::Writer<O>,
// TODO: Default to 10 million or something sane
batch_size: usize,
) -> anyhow::Result<()>
where
D: Read,
E: Read,
S: Read,
T: Read,
P: Read,
Di: Read,
// TODO: Looks kind of bad, any other way around it? I'd rather not have to depend on crossbeam as well
O: Write + Send + 'static,
{
let mut all_definitions: HashMap<String, Definition> = read_definitions(definitions)?;
// Partition the rules by the build from type, so that we'll run all the rules at once for a particular file, which should be much faster
// then opening files and scanning one at a time. Could also do batches in files
let mut mapped_definitions: HashMap<BuildFrom, Vec<Definition>> = HashMap::new();
for (_, definition) in all_definitions {
mapped_definitions
.entry(definition.build_from)
.or_insert(vec![])
.push(definition);
}
// Now whenever we want to produce a built service, just write it to tx.
// Note that rust csv can seek to a certain position, so we can read in a batch from a reader, then
// seek to that position in the reader (or position 0) if we couldn't find a particular record.
// Alternatively, we could store an index of all records (e.g. encounter numbers) that map to their position in the reader,
// so we can quickly seek to the appropriate index and read the record.
// https://docs.rs/csv/latest/csv/struct.Reader.html#method.seek
// Store encounter positions in file, so that later when we read through transfers/whatever we can easily
// seak to the correct position quickly in case we have a cache miss
let mut encounter_positions: HashMap<String, Position> = HashMap::new();
// TODO: Alternative to storing encounter positions would be to sort portions of the file bits at a time (I think it's called a merge sort?).
// TODO: Try with and without rayon, should be able to help I think as we're going through so much data sequentially,
// although we're still likely to be bottlenecked by just write-speed
let mut encounters = encounters;
let headers = encounters.headers()?.clone();
for encounter in encounters.records() {
let encounter = encounter?;
let position = encounter.position().unwrap();
let encounter: HashMap<String, String> = encounter.deserialize(Some(&headers))?;
encounter_positions.insert(
encounter.get("EncounterNumber").unwrap().to_string(),
position.clone(),
);
// TODO: For each encounter definition, check this fits the filter criteria/constraints,
// and
let definitions = mapped_definitions.get(&BuildFrom::Encounter).unwrap();
for definition in definitions {
let matching_filter = (definition.filters.is_empty()
|| definition.filters.iter().any(|filter| {
let field = encounter.get(filter.field.as_str());
if field.is_none() {
return false;
}
let field = field.unwrap();
if filter.equal {
return filter.value == *field;
} else {
return filter.value != *field;
}
}))
&& (definition.constraints.is_empty()
|| definition.constraints.iter().any(|constraint| {
let field = encounter.get(constraint.field.as_str());
if field.is_none() {
return false;
}
let field = field.unwrap();
// TODO: Is this just number/datetime? Should probably be an enum? It's not, seems to be E in the test data
let field_type = &constraint.source_type;
match constraint.constraint_type {
ConstraintType::Equal => *field == constraint.value,
_ => false,
}
}));
if matching_filter {
// Generate the service code
}
}
// TODO: Generate the built service
output.serialize(Product::default());
}
// Now do the same with transfers, services, etc, referencing the encounter reader by using the
// indexes in encounter_positions
Ok(())
}

View File

@@ -1,27 +1,21 @@
use core::panic;
use std::{
collections::HashMap,
io::{Read, Write},
sync::mpsc,
thread,
};
use std::{collections::HashMap, io::Read};
use chrono::NaiveDateTime;
use csv::Position;
use serde::Serialize;
#[derive(Hash, PartialEq, PartialOrd, Ord, Eq)]
struct Filter {
pub struct Filter {
// Equal/not equal
equal: bool,
field: String,
value: String,
pub equal: bool,
pub field: String,
pub value: String,
// TODO: Probably want to enum this. Source type determines things like filtering
// on encounter/patient fields when using something like a transfer
source_type: String,
pub source_type: String,
}
enum ConstraintType {
pub enum ConstraintType {
Equal,
GreaterThan,
GreaterThanOrEqualTo,
@@ -44,14 +38,14 @@ impl From<&String> for ConstraintType {
}
}
struct Constraint {
source_type: String,
field: String,
constraint_type: ConstraintType,
value: String,
pub struct Constraint {
pub source_type: String,
pub field: String,
pub constraint_type: ConstraintType,
pub value: String,
}
enum Component {
pub enum Component {
Constant(String),
// Even extras are allowed here, just specify the field type (encounter, service, etc) and the field name (incl Extra: or Classification: as appropriate)
// TODO: This first string should also be some kind of source type enum, probably shared with source types on filter/constraint
@@ -59,7 +53,7 @@ enum Component {
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone)]
enum BuildFrom {
pub enum BuildFrom {
Service,
Transfer,
Encounter,
@@ -89,7 +83,7 @@ impl From<&String> for BuildFrom {
// Linked Dataset: One Per Area, One per Built Service code, One per source item
// Coding Diagnosis, Coding Procedure, Encounter, Revenue, Service: One per source item
// Transfer: Change in bed number, change in clinic, change in consultant, change in consultant specialty, change in consultant specialty rollup, change in unit, change in ward, daily, daily or Change in bed number, daily or change in clinic, daily or change in consultant, daily or change in consultant specialty, cdaily or hange in consultant specialty rollup, daily or change in unit, daily or change in ward, daily except on admission day, daily except on admission day (incl same day), daily except on discharge day, daily except on discharge day (incl same day), Only admission location, only discharge location, one per source item
enum Frequency {
pub enum Frequency {
ChangeInBedNumber,
ChangeInClinic,
ChangeInConsultant,
@@ -129,7 +123,7 @@ impl From<&String> for Frequency {
}
}
enum RoundingMode {
pub enum RoundingMode {
ToClosestWhole,
UpToClosestWhole,
DownToClosestWhole,
@@ -168,7 +162,7 @@ impl From<&String> for RoundingMode {
// Revenue: Constant, Extra, SourceQuantity
// Service: Constant, Extra, SourceQuantity
// Transfer: Constant, Days, Extra, Hours
enum Quantity {
pub enum Quantity {
Constant(f64),
// Name of the extra
Extra(String),
@@ -186,77 +180,35 @@ enum Quantity {
}
// TODO: Pretty sure rounding mode can be used with all quantities, but check this
struct BuiltQuantity {
quantity: Quantity,
rounding_mode: RoundingMode,
pub struct BuiltQuantity {
pub quantity: Quantity,
pub rounding_mode: RoundingMode,
}
enum DurationFallback {
pub enum DurationFallback {
BuiltService,
Encounter,
Service,
}
struct Definition {
name: String,
components: Vec<Component>,
filters: Vec<Filter>,
constraints: Vec<Constraint>,
build_from: BuildFrom,
frequency: Frequency,
quantity: BuiltQuantity,
duration_fallback: DurationFallback,
pub struct Definition {
pub name: String,
pub components: Vec<Component>,
pub filters: Vec<Filter>,
pub constraints: Vec<Constraint>,
pub build_from: BuildFrom,
pub frequency: Frequency,
pub quantity: BuiltQuantity,
pub duration_fallback: DurationFallback,
}
#[derive(Debug, Serialize, Default)]
struct Product {
// Parse datetime from string: https://rust-lang-nursery.github.io/rust-cookbook/datetime/parse.html#parse-string-into-datetime-struct
// TODO: Serialisers.
start_date_time: NaiveDateTime,
end_date_time: NaiveDateTime,
encounter_start_date_time: Option<NaiveDateTime>,
encounter: Option<String>,
service: Option<String>,
transfer: Option<String>,
quantity: Option<f64>,
duration: Option<f64>,
actual_charge: Option<f64>,
standard_cost: Option<f64>,
// TODO: Enum this?
day_of_stay: Option<u8>,
source_allocated_amount: Option<f64>,
}
// TODO: Build from linked dataset is pretty hard, it potentially requires knowing everything abuot the previous year's
// cosing run (BSCO, Dataset_Encounter_Cache, etc).
fn create_products<D, E, S, T, P, Di, O>(
definitions: csv::Reader<D>,
encounters: csv::Reader<E>,
services: csv::Reader<S>,
transfers: csv::Reader<T>,
procedures: csv::Reader<P>,
diagnoses: csv::Reader<Di>,
// TODO: Looks kind of bad, any other way around it? I'd rather not have to depend on crossbeam as well
output: &'static mut csv::Writer<O>,
// TODO: Default to 10 million or something sane
batch_size: usize,
) -> anyhow::Result<()>
pub fn read_definitions<R>(
definitions: &mut csv::Reader<R>,
) -> anyhow::Result<HashMap<String, Definition>>
where
D: Read,
E: Read,
S: Read,
T: Read,
P: Read,
Di: Read,
// TODO: Looks kind of bad, any other way around it? I'd rather not have to depend on crossbeam as well
O: Write + Send + 'static,
R: Read,
{
let mut all_definitions: HashMap<String, Definition> = HashMap::new();
// Partition the rules by the build from type, so that we'll run all the rules at once for a particular file, which should be much faster
// then opening files and scanning one at a time. Could also do batches in files
let mut definitions = definitions;
// First read in all the definitions. Read them into a map <build from type> -> definition
for record in definitions.deserialize::<HashMap<String, String>>() {
let record = record?;
// Get the type, then switch based on that, as that's how we determine whether we've got a definition/filter/component/constraint (definition should always come first)
@@ -283,23 +235,21 @@ where
quantity,
rounding_mode,
};
all_definitions
.insert(
record.get("Name").unwrap().clone(),
Definition {
name: record.get("Name").unwrap().to_owned(),
components: vec![],
filters: vec![],
constraints: vec![],
build_from: BuildFrom::from(record.get("BuildFrom").unwrap()),
frequency: Frequency::from(record.get("Frequency").unwrap()),
quantity: built_quantity,
// TODO: Figure this out
// Not even in use, can ignore, or will BuiltService always be the default?
duration_fallback: DurationFallback::BuiltService,
},
)
.unwrap();
all_definitions.insert(
record.get("Name").unwrap().to_owned(),
Definition {
name: record.get("Name").unwrap().to_owned(),
components: vec![],
filters: vec![],
constraints: vec![],
build_from: BuildFrom::from(record.get("BuildFrom").unwrap()),
frequency: Frequency::from(record.get("Frequency").unwrap()),
quantity: built_quantity,
// TODO: Figure this out
// Not even in use, can ignore, or will BuiltService always be the default?
duration_fallback: DurationFallback::BuiltService,
},
);
}
"Filter" => {
let new_filter = Filter {
@@ -347,101 +297,5 @@ where
unknown => println!("Invalid type found: {}", unknown),
}
}
let mut mapped_definitions: HashMap<BuildFrom, Vec<Definition>> = HashMap::new();
for (_, definition) in all_definitions {
mapped_definitions
.entry(definition.build_from)
.or_insert(vec![])
.push(definition);
}
// Then read through each file type line by line if there are definitions for that type, and process all records (read into memory the batch size)
// Probably output to a separate thread (or maybe some kind of limited queue?) to write to disk. Try on same thread to start, then if it's too slow
// or I want to experiment, split to a separate thread. Probably want to use sync_sender to limit number of messages to a max size in case
// writing to disk takes longer than reading (unlikely but possible): https://doc.rust-lang.org/std/sync/mpsc/fn.sync_channel.html
let (tx, rx) = mpsc::sync_channel::<Product>(batch_size);
let write_thread = thread::spawn(move || {
let mut output = output;
while let Ok(product) = rx.recv() {
output.serialize(product).unwrap();
}
});
// Now whenever we want to produce a built service, just write it to tx.
// Note that rust csv can seek to a certain position, so we can read in a batch from a reader, then
// seek to that position in the reader (or position 0) if we couldn't find a particular record.
// Alternatively, we could store an index of all records (e.g. encounter numbers) that map to their position in the reader,
// so we can quickly seek to the appropriate index and read the record.
// https://docs.rs/csv/latest/csv/struct.Reader.html#method.seek
// Store encounter positions in file, so that later when we read through transfers/whatever we can easily
// seak to the correct position quickly in case we have a cache miss
let mut encounter_positions: HashMap<String, Position> = HashMap::new();
// TODO: Alternative to storing encounter positions would be to sort portions of the file bits at a time (I think it's called a merge sort?).
// TODO: Try with and without rayon, should be able to help I think as we're going through so much data sequentially,
// although we're still likely to be bottlenecked by just write-speed
let mut encounters = encounters;
let headers = encounters.headers()?.clone();
for encounter in encounters.records() {
let encounter = encounter?;
let position = encounter.position().unwrap();
let encounter: HashMap<String, String> = encounter.deserialize(Some(&headers))?;
encounter_positions.insert(
encounter.get("EncounterNumber").unwrap().to_string(),
position.clone(),
);
// TODO: For each encounter definition, check this fits the filter criteria/constraints,
// and
let definitions = mapped_definitions.get(&BuildFrom::Encounter).unwrap();
for definition in definitions {
let matching_filter = (definition.filters.is_empty()
|| definition.filters.iter().any(|filter| {
let field = encounter.get(filter.field.as_str());
if field.is_none() {
return false;
}
let field = field.unwrap();
if filter.equal {
return filter.value == *field;
} else {
return filter.value != *field;
}
}))
&& (definition.constraints.is_empty()
|| definition.constraints.iter().any(|constraint| {
let field = encounter.get(constraint.field.as_str());
if field.is_none() {
return false;
}
let field = field.unwrap();
// TODO: Is this just number/datetime? Should probably be an enum? It's not, seems to be E in the test data
let field_type = &constraint.source_type;
match constraint.constraint_type {
ConstraintType::Equal => *field == constraint.value,
_ => false,
}
}));
if matching_filter {
// Generate the service code
}
}
// TODO: Generate the built service
tx.send(Product::default()).unwrap();
}
// Now do the same with transfers, services, etc, referencing the encounter reader by using the
// indexes in encounter_positions
// Have to drop the tx, which will cause the write thread to finish up so that it can be joined before
// the function ends
drop(tx);
write_thread.join().unwrap();
Ok(())
Ok(all_definitions)
}

5
src/products/mod.rs Normal file
View File

@@ -0,0 +1,5 @@
mod create_products;
pub use create_products::*;
// Don't re-export anything in csv atm, it's only used for internal processing
mod csv;