diff --git a/src/lib.rs b/src/lib.rs
index 4695db3..244c4dd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,4 @@
+// TODO: Module API can probably use a cleanup
 mod move_money;
 pub use self::move_money::*;
 use std::ffi::c_char;
@@ -9,7 +10,7 @@ pub use self::overhead_allocation::*;
 
 mod products;
 pub use self::products::create_products;
-pub use self::products::CreateProductInputs;
+pub use self::products::csv::SourceType;
 
 mod shared_models;
 pub use self::shared_models::*;
diff --git a/src/main.rs b/src/main.rs
index cd1664e..7711973 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,7 @@
-use std::{fs::File, io::BufWriter, path::PathBuf};
+use std::{collections::HashMap, fs::File, io::BufWriter, path::PathBuf};
 
 use clap::{Parser, Subcommand};
-use coster_rs::CreateProductInputs;
+use coster_rs::{create_products::InputFile, SourceType};
 
 #[derive(Parser)]
 #[command(name = "coster-rs")]
@@ -95,6 +95,12 @@ enum Commands {
         #[arg(short, long, value_name = "FILE")]
         diagnoses: PathBuf,
 
+        #[arg(short, long, value_name = "FILE")]
+        patients: PathBuf,
+
+        #[arg(short, long, value_name = "FILE")]
+        revenues: PathBuf,
+
         #[arg(short, long, value_name = "FILE")]
         output: PathBuf,
     },
@@ -175,18 +181,61 @@ fn main() -> anyhow::Result<()> {
             transfers,
             procedures,
             diagnoses,
+            patients,
+            revenues,
             output,
-        } => coster_rs::create_products(
-            &mut csv::Reader::from_path(definitions)?,
-            CreateProductInputs {
-                encounters: csv::Reader::from_path(encounters)?,
-                services: csv::Reader::from_path(services)?,
-                transfers: csv::Reader::from_path(transfers)?,
-                procedures: csv::Reader::from_path(procedures)?,
-                diagnoses: csv::Reader::from_path(diagnoses)?,
-            },
-            &mut csv::Writer::from_path(output)?,
-            1000000,
-        ),
+        } => {
+            let mut inputs = HashMap::new();
+            inputs.insert(
+                SourceType::Encounter,
+                InputFile {
+                    file_path: encounters,
+                    joins: vec![],
+                },
+            );
+            inputs.insert(
+                SourceType::Service,
+                InputFile {
+                    file_path: services,
+                    joins: vec![],
+                },
+            );
+            inputs.insert(
+                SourceType::Transfer,
+                InputFile {
+                    file_path: transfers,
+                    joins: vec![],
+                },
+            );
+            inputs.insert(
+                SourceType::CodingProcedure,
+                InputFile {
+                    file_path: procedures,
+                    joins: vec![],
+                },
+            );
+            inputs.insert(
+                SourceType::CodingDiagnosis,
+                InputFile {
+                    file_path: diagnoses,
+                    joins: vec![],
+                },
+            );
+            inputs.insert(
+                SourceType::Patient,
+                InputFile {
+                    file_path: patients,
+                    joins: vec![],
+                },
+            );
+            inputs.insert(
+                SourceType::Revenue,
+                InputFile {
+                    file_path: revenues,
+                    joins: vec![],
+                },
+            );
+            coster_rs::create_products::create_products_polars(definitions, inputs, output)
+        }
     }
 }
diff --git a/src/products/create_products.rs b/src/products/create_products.rs
index 40c5036..95d3f36 100644
--- a/src/products/create_products.rs
+++ b/src/products/create_products.rs
@@ -1,20 +1,14 @@
-use std::{
-    collections::HashMap,
-    io::{Read, Write},
-};
+use std::{collections::HashMap, path::PathBuf};
 
-use anyhow::bail;
+use anyhow::anyhow;
 use chrono::NaiveDateTime;
-use csv::Position;
+use itertools::Itertools;
 // inluding dsl works better for completion with rust analyzer
 use polars::lazy::dsl::*;
 use polars::prelude::*;
 use serde::Serialize;
 
-use super::csv::{
-    read_definitions, BuildFrom, Component, ComponentSourceType, ConstraintType, Definition,
-    SourceType,
-};
+use super::csv::{read_definitions, Definition, FileJoin, SourceType};
 
 // TODO: Polars suggests this, but docs suggest it doesn't have very good platform support
 //use jemallocator::Jemalloc;
@@ -40,276 +34,76 @@ struct Product {
     source_allocated_amount: Option<f64>,
 }
 
-pub struct CreateProductInputs<E, S, T, P, Di>
-where
-    E: Read,
-    S: Read,
-    T: Read,
-    P: Read,
-    Di: Read,
-{
-    pub encounters: csv::Reader<E>,
-    pub services: csv::Reader<S>,
-    pub transfers: csv::Reader<T>,
-    pub procedures: csv::Reader<P>,
-    pub diagnoses: csv::Reader<Di>,
+pub struct InputFile {
+    pub file_path: PathBuf,
+    pub joins: Vec<FileJoin>,
 }
 
 pub fn create_products_polars(
-    definitions_path: String,
-    patients_path: String,
-    encounters_path: String,
-    services_path: String,
-    transfers_path: String,
-    procedures_path: String,
-    diagnoses_path: String,
-    output_path: String,
+    definitions_path: PathBuf,
+    inputs: HashMap<SourceType, InputFile>,
+    output_path: PathBuf,
 ) -> anyhow::Result<()> {
-    let mut all_definitions: HashMap<String, Definition> =
-        read_definitions(&mut csv::Reader::from_path(definitions_path)?)?;
-
-    for (key, definition) in all_definitions {
-        match definition.build_from {
-            BuildFrom::Encounter => build_encounters_polars(
-                definition,
-                encounters_path.clone(),
-                patients_path.clone(),
-                output_path.clone(),
-            ),
-            BuildFrom::Service => todo!(),
-            BuildFrom::Transfer => todo!(),
-            BuildFrom::CodingProcedure => todo!(),
-            BuildFrom::CodingDiagnosis => todo!(),
-            BuildFrom::LinkedDataset => todo!(),
-            BuildFrom::Revenue => todo!(),
-        }?;
+    let definitions = read_definitions(&mut csv::Reader::from_path(definitions_path)?)?;
+    let definitions = definitions.values().collect_vec();
+    for definition in definitions {
+        build_polars(definition, &inputs, &output_path)?;
     }
 
     Ok(())
 }
 
-// TODO: Build from linked dataset is pretty hard, it potentially requires knowing everything abuot the previous year's
-// cosing run (BSCO, Dataset_Encounter_Cache, etc).
-pub fn create_products<D, E, S, T, P, Di, O>(
-    definitions: &mut csv::Reader<D>,
-    product_inputs: CreateProductInputs<E, S, T, P, Di>,
-    // TODO: Looks kind of bad, any other way around it? I'd rather not have to depend on crossbeam as well
-    output: &mut csv::Writer<O>,
-    // TODO: Default to 10 million or something sane
-    batch_size: usize,
-) -> anyhow::Result<()>
-where
-    D: Read,
-    E: Read,
-    S: Read,
-    T: Read,
-    P: Read,
-    Di: Read,
-    O: Write + Send,
-{
-    let mut all_definitions: HashMap<String, Definition> = read_definitions(definitions)?;
-    // Partition the rules by the build from type, so that we'll run all the rules at once for a particular file, which should be much faster
-    // then opening files and scanning one at a time. Could also do batches in files
-
-    let mut mapped_definitions: HashMap<BuildFrom, Vec<Definition>> = HashMap::new();
-    for (_, definition) in all_definitions {
-        mapped_definitions
-            .entry(definition.build_from)
-            .or_insert(vec![])
-            .push(definition);
-    }
-
-    // Now whenever we want to produce a built service, just write it to tx.
-
-    // Note that rust csv can seek to a certain position, so we can read in a batch from a reader, then
-    // seek to that position in the reader (or position 0) if we couldn't find a particular record.
-    // Alternatively, we could store an index of all records (e.g. encounter numbers) that map to their position in the reader,
-    // so we can quickly seek to the appropriate index and read the record.
-    // https://docs.rs/csv/latest/csv/struct.Reader.html#method.seek
-    // Store encounter positions in file, so that later when we read through transfers/whatever we can easily
-    // seak to the correct position quickly in case we have a cache miss
-    let mut encounter_positions: HashMap<String, Position> = HashMap::new();
-    // TODO: Consider just using polars for this, can use streaming so don't need to worry about
-    // running out of memory or being super efficient, can just focus on the code and let the query
-    // optimiser sort things out (main reason I don't like this though is it may be inderterminate in
-    // runtime speed, will just need to see how it performs I guess, so maybe just make a seperate implementation)
-
-    // TODO: Alternative to storing encounter positions would be to sort portions of the file bits at a time (I think it's called a merge sort?).
-
-    // TODO: Try with and without rayon, should be able to help I think as we're going through so much data sequentially,
-    // although we're still likely to be bottlenecked by just write-speed
-    let mut encounters = product_inputs.encounters;
-    let headers = encounters.headers()?.clone();
-
-    for encounter in encounters.records() {
-        let encounter = encounter?;
-        let position = encounter.position();
-        if position.is_none() {
-            bail!("Position in encounter file not found")
-        }
-        let position = position.unwrap();
-        let encounter: HashMap<String, String> = encounter.deserialize(Some(&headers))?;
-        encounter_positions.insert(
-            encounter.get("EncounterNumber").unwrap().to_string(),
-            position.clone(),
-        );
-        // TODO: For each encounter definition, check this fits the filter criteria/constraints,
-        // and
-        let definitions = mapped_definitions.get(&BuildFrom::Encounter).unwrap();
-        for definition in definitions {
-            let matching_filter = (definition.filters.is_empty()
-                || definition.filters.iter().any(|filter| {
-                    let field = encounter.get(filter.field.as_str());
-                    if field.is_none() {
-                        return false;
-                    }
-                    let field = field.unwrap();
-                    if filter.equal {
-                        filter.value == *field
-                    } else {
-                        filter.value != *field
-                    }
-                }))
-                && (definition.constraints.is_empty()
-                    || definition.constraints.iter().any(|constraint| {
-                        let field = encounter.get(constraint.field.as_str());
-                        if field.is_none() {
-                            return false;
-                        }
-                        let field = field.unwrap();
-                        // TODO: Is this just number/datetime? Should probably be an enum? It's not, seems to be E in the test data
-                        let field_type = &constraint.source_type;
-                        match constraint.constraint_type {
-                            ConstraintType::Equal => *field == constraint.value,
-                            _ => false,
-                        }
-                    }));
-            if matching_filter {
-                // Generate the service code
-            }
-        }
-
-        // TODO: Generate the built service
-        output.serialize(Product::default())?;
-    }
-
-    // Now do the same with transfers, services, etc, referencing the encounter reader by using the
-    // indexes in encounter_positions
-    Ok(())
-}
-
 //TODO: This will iterate over the file multiple times, which could technically be
 // slower than just going through the file once since reading from disk is slower
 // than reading from memory. However, reading from
 // Also, we can use a custom definition format that is translated from the
 // ppm format, so things like constraints/filters are one thing, and way more generic
 // (i.e. filter can be based on a join between files).
-pub fn build_encounters_polars(
-    definition: Definition,
-    encounters_path: String,
-    patients_path: String,
-    output_path: String,
+pub fn build_polars(
+    definition: &Definition,
+    inputs: &HashMap<SourceType, InputFile>,
+    output_path: &PathBuf,
 ) -> anyhow::Result<()> {
-    // 1. Apply filters/constraints to limit encounters
+    // 1. Apply filters to limit encounters
     let filter = definition
         .filters
         .iter()
         .map(|filter| {
-            // TODO: Filter field depends on type, as extra/classificiation need to append extra/classification
-            // but how do we check this?
            let col = col(&filter.field);
-            match filter.source_type {
-                SourceType::CodingDiagnosis => todo!(),
-                SourceType::CodingProcedure => todo!(),
-                SourceType::Encounter => todo!(),
-                SourceType::Incident => todo!(),
-                SourceType::Patient => todo!(),
-                SourceType::Revenue => todo!(),
-                SourceType::Service => todo!(),
-                SourceType::Transfer => todo!(),
-            }
-            if filter.equal {
-                col.eq(lit(filter.value.clone()))
-            } else {
-                col.neq(lit(filter.value.clone()))
+            match filter.filter_type {
+                super::csv::FilterType::Equal => col.eq(lit(filter.value.clone())),
+                super::csv::FilterType::GreaterThan => col.gt(lit(filter.value.clone())),
+                super::csv::FilterType::GreaterThanOrEqualTo => {
+                    col.gt_eq(lit(filter.value.clone()))
+                }
+                super::csv::FilterType::LessThan => col.lt(lit(filter.value.clone())),
+                super::csv::FilterType::LessThanOrEqualTo => col.lt_eq(lit(filter.value.clone())),
+                super::csv::FilterType::NotEqualTo => col.neq(lit(filter.value.clone())),
            }
        })
        .reduce(|prev, next| prev.and(next));
 
-    let constraint = definition
-        .constraints
-        .iter()
-        .map(|constraint| {
-            let col = col(&constraint.field);
-
-            // TODO: Might need to do a cast here
-            match constraint.constraint_type {
-                ConstraintType::Equal => col.eq(lit(constraint.value.clone())),
-                ConstraintType::GreaterThan => col.gt(lit(constraint.value.clone())),
-                ConstraintType::GreaterThanOrEqualTo => col.gt_eq(lit(constraint.value.clone())),
-                ConstraintType::LessThan => col.lt(lit(constraint.value.clone())),
-                ConstraintType::LessThanOrEqualTo => col.lt_eq(lit(constraint.value.clone())),
-                ConstraintType::NotEqualTo => col.neq(lit(constraint.value.clone())),
-            }
-        })
-        .reduce(|prev, next| prev.and(next));
-
-    // TODO: If constraints or components include patient field, then we need to join onto
-    // the patient file.
-    let mut reader = LazyCsvReader::new(encounters_path)
+    let input_file = inputs
+        .get(&definition.source_type)
+        .ok_or(anyhow!("Failed to find valid file"))?;
+    let reader = LazyCsvReader::new(&input_file.file_path)
        .has_header(true)
        .finish()?;
 
-    // TODO: Refactor me
-    // TODO: Could also make this really generic. Instead of patient/whatever type field, we make it some
-    // file and specify the join field in the file. Then could add arbitrary files when building.
-    // This would require reworking the constraint/filter to specify a filename rather than a sourcetype,
-    // and would then have some common filename for ppm's sourcetype or whatever.
-    if definition
-        .constraints
-        .iter()
-        .any(|c| c.source_type == SourceType::Patient)
-        || definition
-            .filters
-            .iter()
-            .any(|f| f.source_type == SourceType::Patient)
-        || definition.components.iter().any(|c| match c {
-            Component::Field(source, _) => *source == ComponentSourceType::Patient,
-            _ => false,
-        })
-    {
-        let patient_reader = LazyCsvReader::new(patients_path)
-            .has_header(true)
-            .finish()?;
-        reader = reader.join(
-            patient_reader,
-            [col("Id")],
-            [col("Patient_Id")],
-            JoinArgs::new(JoinType::Inner),
-        );
-    }
-
-    let filter = match constraint {
-        Some(constraint) => filter.map(|f| f.and(constraint)),
-        None => filter,
-    };
+    // TODO: Do joins based on usage in the definition's components and filters. Ideally just join the columns that are actually wanted.
+    // Can do this by first going over each component/filter and collecting the files and columns they reference.
 
-    let filtered = (match filter {
+    let mut filtered = match filter {
        Some(filter) => reader.filter(filter),
        None => reader,
-    })
+    }
    .with_streaming(true)
    .collect()?;
 
     // TODO: Now for each of the filtered records, create a new record that is the built record, based on the components
     // quantity, etc. from the definition
-    let mut file = std::fs::File::create("output_path").unwrap();
-    let mut writer = CsvWriter::new(&mut file);
-
-    // 2. Create the built services and write to disk
-    // TODO: Kind of sucks we need to colelct into memory, see if there's a better way to do this later (otherwise can't use polars)
-    // writer
-    //     .finish(&mut reader.with_streaming(true).collect()?)
-    //     .unwrap();
-
+    let mut file = std::fs::File::create(output_path).unwrap();
+    // TODO: Don't write `filtered` directly, but the result that contains the created product columns
+    CsvWriter::new(&mut file).finish(&mut filtered)?;
     Ok(())
 }
diff --git a/src/products/csv.rs b/src/products/csv.rs
index a1e2dec..6bccfad 100644
--- a/src/products/csv.rs
+++ b/src/products/csv.rs
@@ -5,16 +5,13 @@ use chrono::NaiveDateTime;
 
 #[derive(Hash, PartialEq, PartialOrd)]
 pub struct Filter {
-    // Equal/not equal
-    pub equal: bool,
+    pub filter_type: FilterType,
+    pub file: String,
     pub field: String,
     pub value: String,
-    // TODO: Probably want to enum this. Source type determines things like filtering
-    // on encounter/patient fields when using something like a transfer
-    pub source_type: SourceType,
 }
 
-#[derive(Hash, PartialEq, PartialOrd)]
+#[derive(Hash, PartialEq, PartialOrd, Eq, Ord)]
 pub enum SourceType {
     CodingDiagnosis,
     CodingProcedure,
@@ -44,7 +41,8 @@ impl TryFrom<&String> for SourceType {
     }
 }
 
-pub enum ConstraintType {
+#[derive(Hash, PartialEq, PartialOrd)]
+pub enum FilterType {
     Equal,
     GreaterThan,
     GreaterThanOrEqualTo,
@@ -53,29 +51,22 @@
     LessThan,
     LessThanOrEqualTo,
     NotEqualTo,
 }
 
-impl TryFrom<&String> for ConstraintType {
+impl TryFrom<&String> for FilterType {
     type Error = anyhow::Error;
 
     fn try_from(value: &String) -> Result<Self, Self::Error> {
         match value.as_str() {
-            "=" => Ok(ConstraintType::Equal),
-            ">" => Ok(ConstraintType::GreaterThan),
-            ">=" => Ok(ConstraintType::GreaterThanOrEqualTo),
-            "<" => Ok(ConstraintType::LessThan),
-            "<=" => Ok(ConstraintType::LessThanOrEqualTo),
-            "!=" => Ok(ConstraintType::NotEqualTo),
-            _ => bail!("Invalid ConstraintType found: {}", value),
+            "=" => Ok(FilterType::Equal),
+            ">" => Ok(FilterType::GreaterThan),
+            ">=" => Ok(FilterType::GreaterThanOrEqualTo),
+            "<" => Ok(FilterType::LessThan),
+            "<=" => Ok(FilterType::LessThanOrEqualTo),
+            "!=" => Ok(FilterType::NotEqualTo),
+            _ => bail!("Invalid FilterType found: {}", value),
         }
     }
 }
 
-pub struct Constraint {
-    pub source_type: SourceType,
-    pub field: String,
-    pub constraint_type: ConstraintType,
-    pub value: String,
-}
-
 #[derive(PartialEq)]
 pub enum ExtraType {
     CodingDiagnosis,
@@ -127,39 +118,8 @@ pub enum Component {
     Constant(String),
-    MultiConstant(String),
-    // Even extras are allowed here, just specify the field type (encounter, service, etc) and the field name (incl Extra: or Classification: as appropriate)
-    // TODO: This first string should also be some kind of source type enum, probably shared with source types on filter/constraint
-    Field(ComponentSourceType, String),
-}
-
-#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone)]
-pub enum BuildFrom {
-    Service,
-    Transfer,
-    Encounter,
-    CodingProcedure,
-    CodingDiagnosis,
-    // TODO: This is hard/expensive, ignore for now as we don't have test data
-    LinkedDataset,
-    Revenue,
-}
-
-impl TryFrom<&String> for BuildFrom {
-    type Error = anyhow::Error;
-
-    fn try_from(string: &String) -> Result<Self, Self::Error> {
-        match string.as_str() {
-            "S" => Ok(BuildFrom::Service),
-            "E" => Ok(BuildFrom::Encounter),
-            "CP" => Ok(BuildFrom::CodingProcedure),
-            "CD" => Ok(BuildFrom::CodingDiagnosis),
-            "T" => Ok(BuildFrom::Transfer),
-            "BS" => Ok(BuildFrom::LinkedDataset),
-            "R" => Ok(BuildFrom::Revenue),
-            _ => bail!("Invalid BuildFrom value: {}", string),
-        }
-    }
+    // File, column_name
+    Field(String, String),
 }
 
 // Frequency per type:
@@ -278,15 +238,22 @@ pub enum DurationFallback {
     Service,
 }
 
+pub struct FileJoin {
+    join_column: String,
+    file: String,
+    file_join_column: String,
+}
+
 pub struct Definition {
     pub name: String,
     pub components: Vec<Component>,
     pub filters: Vec<Filter>,
-    pub constraints: Vec<Constraint>,
-    pub build_from: BuildFrom,
+    pub source_type: SourceType,
     pub frequency: Frequency,
     pub quantity: BuiltQuantity,
     pub duration_fallback: DurationFallback,
+    // TODO: Need a way to define joins between different files. Or it might be better to put that at some higher level.
+    // At the very least we still need a source/file type, and there should be one file supplied for each type
 }
 
 pub fn read_definitions(
@@ -324,7 +291,7 @@ where
                     quantity,
                     rounding_mode,
                 };
-                let build_from = BuildFrom::try_from(record.get("BuildFrom").unwrap())?;
+                let build_from = SourceType::try_from(record.get("BuildFrom").unwrap())?;
                 let frequency = Frequency::try_from(record.get("Frequency").unwrap())?;
                 all_definitions.insert(
                     record.get("Name").unwrap().to_owned(),
@@ -332,8 +299,7 @@ where
                         name: record.get("Name").unwrap().to_owned(),
                         components: vec![],
                         filters: vec![],
-                        constraints: vec![],
-                        build_from,
+                        source_type: build_from,
                         frequency,
                         quantity: built_quantity,
                         // TODO: Figure this out
@@ -348,10 +314,20 @@ where
                        SourceType::try_from(record.get("FilterSourceType").unwrap())?;
                    Filter {
                        // TODO: This all looks wrong
-                        equal: record.get("FilterNotIn").unwrap() != "",
+                        filter_type: if record.get("FilterNotIn").unwrap() != "" {
+                            FilterType::Equal
+                        } else {
+                            FilterType::NotEqualTo
+                        },
+                        // TODO: extra/classification types need to append Extra:/Classification: to the start of the field
                        field: record.get("FilterField").unwrap().clone(),
                        value: record.get("FilterValue").unwrap().clone(),
-                        source_type,
+                        // TODO: Work out a way to handle this
+                        file: match source_type {
+                            SourceType::CodingDiagnosis => "",
+                            _ => "",
+                        }
+                        .to_owned(),
                    }
                };
                let all_filters = &mut all_definitions
@@ -365,13 +341,15 @@
                    "C" => {
                        Component::Constant(record.get("ComponentValueOrField").unwrap().to_owned())
                    }
-                    "MC" => Component::MultiConstant(
-                        record.get("ComponentValueOrField").unwrap().to_owned(),
-                    ),
+                    "MC" => {
+                        Component::Constant(record.get("ComponentValueOrField").unwrap().to_owned())
+                    }
                    source => {
                        let component_source_type = ComponentSourceType::try_from(source)?;
                        Component::Field(
-                            component_source_type,
+                            // TODO: Figure this out, should be determined from the source type
+                            "".to_owned(),
+                            // TODO: Field probably needs to be an enum to match onto the correct column in input files
                            record.get("ComponentValueOrField").unwrap().to_owned(),
                        )
                    }
@@ -384,22 +362,22 @@
                }
                "Constraint" => {
                    let constraint = {
-                        let constraint_type =
-                            ConstraintType::try_from(record.get("ConstraintType").unwrap())?;
+                        let filter_type = FilterType::try_from(record.get("FilterType").unwrap())?;
                        let source_type =
                            SourceType::try_from(record.get("ConstraintSourceType").unwrap())?;
-                        Constraint {
-                            source_type,
+                        Filter {
                            field: record.get("ConstraintColumn").unwrap().to_owned(),
-                            constraint_type,
+                            filter_type,
                            value: record.get("ConstraintValue").unwrap().to_owned(),
+                            // TODO: Figure this out, should be determined from the source type
+                            file: "".to_owned(),
                        }
                    };
-                    let all_constraints = &mut all_definitions
+                    let all_filters = &mut all_definitions
                        .get_mut(record.get("Name").unwrap())
                        .unwrap()
-                        .constraints;
-                    all_constraints.push(constraint);
+                        .filters;
+                    all_filters.push(constraint);
                }
                unknown => println!("Invalid type found: {}", unknown),
            }
diff --git a/src/products/mod.rs b/src/products/mod.rs
index 8d738ef..185d247 100644
--- a/src/products/mod.rs
+++ b/src/products/mod.rs
@@ -1,5 +1,4 @@
-mod create_products;
-pub use create_products::*;
+pub mod create_products;
 
 // Don't re-export anything in csv atm, it's only used for internal processing
-mod csv;
+pub mod csv;
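
The `joins` field on `InputFile` is introduced here but not yet consumed by `build_polars` (see the TODO about joining based on the definition's components and filters). Below is a minimal sketch of how the declared joins could be folded into the base `LazyFrame` before the filter is applied. It is an assumption, not part of this patch: it presumes each `FileJoin` has already been resolved to a `LazyFrame` (mapping `FileJoin::file` back to an `InputFile` is still an open TODO), that `FileJoin`'s fields are accessible from `create_products.rs` (e.g. made `pub`), and the helper name `apply_joins` plus the left-join choice are illustrative only.

```rust
use polars::lazy::dsl::*;
use polars::prelude::*;

use super::csv::FileJoin;

// Hypothetical helper (not part of the patch): fold each declared join into the
// base LazyFrame before the definition's filter expression is applied.
fn apply_joins(base: LazyFrame, joins: &[(FileJoin, LazyFrame)]) -> LazyFrame {
    joins.iter().fold(base, |frame, (join, other)| {
        frame.join(
            other.clone(),
            // Column in the source file the definition is built from
            [col(&join.join_column)],
            // Matching column in the joined file
            [col(&join.file_join_column)],
            // A left join keeps every source row even when the joined file has
            // no match; the patch does not pin the join type down.
            JoinArgs::new(JoinType::Left),
        )
    })
}
```

The call shape mirrors the patient join removed from `build_encounters_polars` in this patch, so the polars API usage (`LazyFrame::join` with `JoinArgs`) matches the version already in use; the result would then flow into the existing `filter`/`with_streaming`/`collect` chain.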