diff --git a/src/create_products.rs b/src/create_products.rs
index 8a361ab..a88493c 100644
--- a/src/create_products.rs
+++ b/src/create_products.rs
@@ -1,19 +1,25 @@
+use core::panic;
 use std::{
     collections::HashMap,
     io::{Read, Write},
-    sync::{mpsc, Arc, Mutex},
+    sync::mpsc,
     thread,
 };
 
 use chrono::NaiveDateTime;
-use rayon::prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator};
+use csv::Position;
+use itertools::Itertools;
 use serde::Serialize;
 
+#[derive(Hash, PartialEq, PartialOrd, Ord, Eq)]
 struct Filter {
     // Equal/not equal
     equal: bool,
     field: String,
     value: String,
+    // TODO: Probably want to enum this. Source type determines things like filtering
+    // on encounter/patient fields when using something like a transfer
+    source_type: String,
 }
 
 enum ConstraintType {
@@ -25,6 +31,20 @@
     NotEqualTo,
 }
 
+impl From<&String> for ConstraintType {
+    fn from(string: &String) -> Self {
+        match string.as_str() {
+            "=" => ConstraintType::Equal,
+            ">" => ConstraintType::GreaterThan,
+            ">=" => ConstraintType::GreaterThanOrEqualTo,
+            "<" => ConstraintType::LessThan,
+            "<=" => ConstraintType::LessThanOrEqualTo,
+            "!=" => ConstraintType::NotEqualTo,
+            _ => panic!("unknown constraint type: {}", string),
+        }
+    }
+}
+
 struct Constraint {
     source_type: String,
     field: String,
@@ -35,6 +55,7 @@
 enum Component {
     Constant(String),
     // Even extras are allowed here, just specify the field type (encounter, service, etc) and the field name (incl Extra: or Classification: as appropriate)
+    // TODO: This first string should also be some kind of source type enum, probably shared with source types on filter/constraint
     Field(String, String),
 }
 
@@ -123,6 +144,7 @@
             "N" => RoundingMode::None,
             "D" => RoundingMode::DownToClosestWhole,
             "T" => RoundingMode::ToClosestWhole,
+            // TODO: Just use none when unknown?
             _ => panic!(),
         }
     }
@@ -152,8 +174,22 @@ enum Quantity {
     // Name of the extra
     Extra(String),
     SourceQuantity,
-    Hours(RoundingMode),
-    Days(RoundingMode),
+    Hours,
+    Days,
+    AdmissionWeight,
+    Age,
+    ExpectedLengthOfStay,
+    ICUHours,
+    LengthOfStay,
+    MechVentHours,
+    Revenue,
+    WeightedSeparation,
+}
+
+// TODO: Pretty sure rounding mode can be used with all quantities, but check this
+struct BuiltQuantity {
+    quantity: Quantity,
+    rounding_mode: RoundingMode,
 }
 
 enum DurationFallback {
@@ -169,7 +205,7 @@ struct Definition {
     constraints: Vec<Constraint>,
     build_from: BuildFrom,
     frequency: Frequency,
-    quantity: Quantity,
+    quantity: BuiltQuantity,
     duration_fallback: DurationFallback,
 }
 
@@ -228,31 +264,98 @@ where
         let record_type = record.get("Type").unwrap();
         match record_type.as_str() {
             "Definition" => {
-                let build_quantity = all_definitions.insert(
-                    record.get("Name").unwrap().to_owned(),
-                    Definition {
-                        name: record.get("Name").unwrap().to_owned(),
-                        components: vec![],
-                        filters: vec![],
-                        constraints: vec![],
-                        build_from: BuildFrom::from(record.get("BuildFrom").unwrap()),
-                        frequency: Frequency::from(record.get("Frequency").unwrap()),
-                        quantity: Quantity::Constant(1.),
-                        duration_fallback: DurationFallback::BuiltService,
-                    },
-                );
+                let quantity_type = record.get("BuiltQuantity").unwrap();
+                let rounding_mode =
+                    RoundingMode::from(record.get("BuiltQuantityRounding").unwrap());
+                let quantity = match quantity_type.as_str() {
+                    "S" => Quantity::SourceQuantity,
+                    "C" => Quantity::Constant(
+                        record
+                            .get("BuiltQuantityConstant")
+                            .unwrap()
+                            .parse()
+                            .unwrap(),
+                    ),
+                    "H" => Quantity::Hours,
+                    // Above 3 are all that's needed for now
+                    _ => panic!("unhandled BuiltQuantity type: {}", quantity_type),
+                };
+                let built_quantity = BuiltQuantity {
+                    quantity,
+                    rounding_mode,
+                };
+                all_definitions
+                    .insert(
+                        record.get("Name").unwrap().clone(),
+                        Definition {
+                            name: record.get("Name").unwrap().to_owned(),
+                            components: vec![],
+                            filters: vec![],
+                            constraints: vec![],
+                            build_from: BuildFrom::from(record.get("BuildFrom").unwrap()),
+                            frequency: Frequency::from(record.get("Frequency").unwrap()),
+                            quantity: built_quantity,
+                            // TODO: Figure this out
+                            // Not even in use, can ignore, or will BuiltService always be the default?
+                            duration_fallback: DurationFallback::BuiltService,
+                        },
+                    )
+                    ; // NOTE: insert returns None for a new key, so unwrapping here would panic on every fresh Definition
             }
+            "Filter" => {
+                let new_filter = Filter {
+                    equal: record.get("FilterNotIn").unwrap() == "", // empty FilterNotIn => equality filter
+                    field: record.get("FilterField").unwrap().clone(),
+                    value: record.get("FilterValue").unwrap().clone(),
+                    source_type: record.get("FilterSourceType").unwrap().clone(),
+                };
+                let all_filters = &mut all_definitions
+                    .get_mut(record.get("Name").unwrap())
+                    .unwrap()
+                    .filters;
+                all_filters.push(new_filter);
+            }
+            "Component" => {
+                let component = match record.get("ComponentSource").unwrap().as_str() {
+                    "C" => {
+                        Component::Constant(record.get("ComponentValueOrField").unwrap().to_owned())
+                    }
+                    source => Component::Field(
+                        // TODO: Parse into source type enum
+                        source.to_owned(),
+                        record.get("ComponentValueOrField").unwrap().to_owned(),
+                    ),
+                };
+                let all_components = &mut all_definitions
+                    .get_mut(record.get("Name").unwrap())
+                    .unwrap()
+                    .components;
+                all_components.push(component);
+            }
+            "Constraint" => {
+                let constraint = Constraint {
+                    source_type: record.get("ConstraintSourceType").unwrap().to_owned(),
+                    field: record.get("ConstraintColumn").unwrap().to_owned(),
+                    constraint_type: ConstraintType::from(record.get("ConstraintType").unwrap()),
+                    value: record.get("ConstraintValue").unwrap().to_owned(),
+                };
+                let all_constraints = &mut all_definitions
+                    .get_mut(record.get("Name").unwrap())
+                    .unwrap()
+                    .constraints;
+                all_constraints.push(constraint);
+            }
-            "Filter" => {}
-            "Component" => {}
-            "Constraint" => {}
             unknown => println!("Invalid type found: {}", unknown),
         }
     }
 
-    let mut mapped_definitions: HashMap<BuildFrom, Definition> = all_definitions
-        .into_values()
-        .map(|value| (value.build_from, value))
-        .collect();
+    let mut mapped_definitions: HashMap<BuildFrom, Vec<Definition>> = HashMap::new();
+    for definition in all_definitions.into_values() {
+        mapped_definitions
+            .entry(definition.build_from)
+            .or_default()
+            .push(definition);
+    }
 
     // Then read through each file type line by line if there are definitions for that type, and process all records (read into memory the batch size)
     // Probably output to a separate thread (or maybe some kind of limited queue?) to write to disk. Try on same thread to start, then if it's too slow
@@ -269,23 +372,76 @@
     // Now whenever we want to produce a built service, just write it to tx.
 
+    // Note that rust csv can seek to a certain position, so we can read in a batch from a reader, then
+    // seek to that position in the reader (or position 0) if we couldn't find a particular record.
+    // Alternatively, we could store an index of all records (e.g. encounter numbers) that map to their position in the reader,
+    // so we can quickly seek to the appropriate index and read the record.
+    // https://docs.rs/csv/latest/csv/struct.Reader.html#method.seek
+    // Store encounter positions in file, so that later when we read through transfers/whatever we can easily
+    // seak to the correct position quickly in case we have a cache miss
+    let mut encounter_positions: HashMap<String, Position> = HashMap::new();
+
+    // TODO: Alternative to storing encounter positions would be to sort portions of the file bits at a time (I think it's called a merge sort?).
+
     // TODO: Try with and without rayon, should be able to help I think as we're going through so much data sequentially,
     // although we're still likely to be bottlenecked by just write-speed
     let mut encounters = encounters;
-    encounters
-        .deserialize::<HashMap<String, String>>()
-        .map(|encounter| encounter.unwrap())
-        //TODO: Rayon can't be used with csv, consider just batching reads perhaps?
-        // .par_bridge()
-        .for_each(|encounter| {
-            // TODO: Calculate quantitty for this encounter
-            tx.send(Product::default()).unwrap();
-        });
-    let encounters: Vec<Encounter> = vec![];
-    encounters.par_iter().for_each(|encounter| {
-        println!("{:?}", encounter);
+    let headers = encounters.headers()?.clone();
+
+    for encounter in encounters.records() {
+        let encounter = encounter?;
+        let position = encounter.position().unwrap();
+        let encounter: HashMap<String, String> = encounter.deserialize(Some(&headers))?;
+        encounter_positions.insert(
+            encounter.get("EncounterNumber").unwrap().to_string(),
+            position.clone(),
+        );
+        // TODO: For each encounter definition, check this fits the filter criteria/constraints,
+        // and
+        let definitions = mapped_definitions.get(&BuildFrom::Encounter).unwrap();
+        for definition in definitions {
+            let matching_filter = (definition.filters.is_empty()
+                || definition.filters.iter().any(|filter| {
+                    let field = encounter.get(filter.field.as_str());
+                    if field.is_none() {
+                        return false;
+                    }
+                    let field = field.unwrap();
+                    if filter.equal {
+                        return filter.value == *field;
+                    } else {
+                        return filter.value != *field;
+                    }
+                }))
+                && (definition.constraints.is_empty()
+                    || definition.constraints.iter().any(|constraint| {
+                        let field = encounter.get(constraint.field.as_str());
+                        if field.is_none() {
+                            return false;
+                        }
+                        let field = field.unwrap();
+                        // TODO: Is this just number/datetime? Should probably be an enum? It's not, seems to be E in the test data
+                        let field_type = &constraint.source_type;
+                        match constraint.constraint_type {
+                            ConstraintType::Equal => *field == constraint.value,
+                            _ => false,
+                        }
+                    }));
+            if matching_filter {
+                // Generate the service code
+            }
+        }
+
+        // TODO: Generate the built service
         tx.send(Product::default()).unwrap();
-    });
+    }
+
+    // Now do the same with transfers, services, etc, referencing the encounter reader by using the
+    // indexes in encounter_positions
+
+    // Have to drop the tx, which will cause the write thread to finish up so that it can be joined before
+    // the function ends
+    drop(tx);
 
     write_thread.join().unwrap();
 
     Ok(())