From 9a5a89d6831e531f0326de4dd59d487df586cf42 Mon Sep 17 00:00:00 2001
From: Michael Pivato
Date: Wed, 11 Oct 2023 21:41:08 +1030
Subject: [PATCH] Add filtering to encounter product building

---
 src/products/create_products.rs | 141 +++++++++++++++++++++++++++++++-
 src/products/csv.rs             |  32 +++++++-
 2 files changed, 170 insertions(+), 3 deletions(-)

diff --git a/src/products/create_products.rs b/src/products/create_products.rs
index f868aa8..c519a60 100644
--- a/src/products/create_products.rs
+++ b/src/products/create_products.rs
@@ -5,9 +5,17 @@ use std::{
 
 use chrono::NaiveDateTime;
 use csv::Position;
+// including dsl works better for completion with rust analyzer
+use polars::lazy::dsl::*;
+use polars::prelude::*;
 use serde::Serialize;
 
-use super::csv::{read_definitions, BuildFrom, ConstraintType, Definition};
+use super::csv::{read_definitions, BuildFrom, Component, ConstraintType, Definition, SourceType};
+
+// TODO: Polars suggests this, but docs suggest it doesn't have very good platform support
+//use jemallocator::Jemalloc;
+// #[global_allocator]
+// static GLOBAL: Jemalloc = Jemalloc;
 
 #[derive(Debug, Serialize, Default)]
 struct Product {
@@ -43,6 +51,39 @@ where
     pub diagnoses: csv::Reader,
 }
 
+pub fn create_products_polars(
+    definitions_path: String,
+    patients_path: String,
+    encounters_path: String,
+    services_path: String,
+    transfers_path: String,
+    procedures_path: String,
+    diagnoses_path: String,
+    output_path: String,
+) -> anyhow::Result<()> {
+    let all_definitions: HashMap<_, _> =
+        read_definitions(&mut csv::Reader::from_path(definitions_path)?)?;
+
+    for (_key, definition) in all_definitions {
+        match definition.build_from {
+            BuildFrom::Encounter => build_encounters_polars(
+                definition,
+                encounters_path.clone(),
+                patients_path.clone(),
+                output_path.clone(),
+            ),
+            BuildFrom::Service => todo!(),
+            BuildFrom::Transfer => todo!(),
+            BuildFrom::CodingProcedure => todo!(),
+            BuildFrom::CodingDiagnosis => todo!(),
+            BuildFrom::LinkedDataset => todo!(),
+            BuildFrom::Revenue => todo!(),
+        }?;
+    }
+
+    Ok(())
+}
+
 // TODO: Build from linked dataset is pretty hard, it potentially requires knowing everything about the previous year's
 // costing run (BSCO, Dataset_Encounter_Cache, etc).
 pub fn create_products(
@@ -85,6 +126,10 @@ where
     // Store encounter positions in file, so that later when we read through transfers/whatever we can easily
     // seek to the correct position quickly in case we have a cache miss
     let mut encounter_positions: HashMap = HashMap::new();
+    // TODO: Consider just using polars for this, can use streaming so don't need to worry about
+    // running out of memory or being super efficient, can just focus on the code and let the query
+    // optimiser sort things out (main reason I don't like this though is it may be indeterminate in
+    // runtime speed, will just need to see how it performs I guess, so maybe just make a separate implementation)
 
     // TODO: Alternative to storing encounter positions would be to sort portions of the file bits at a time (I think it's called a merge sort?).
 
@@ -145,3 +190,97 @@
     // indexes in encounter_positions
     Ok(())
 }
+
+//TODO: This will iterate over the file multiple times, which could technically be
+// slower than just going through the file once since reading from disk is slower
+// than reading from memory. However, reading from disk in passes means we don't have to hold everything in memory at once.
+pub fn build_encounters_polars(
+    definition: Definition,
+    encounters_path: String,
+    patients_path: String,
+    output_path: String,
+) -> anyhow::Result<()> {
+    // 1. Apply filters/constraints to limit encounters
+    let filter = definition
+        .filters
+        .iter()
+        .map(|filter| {
+            // TODO: Filter field depends on type, as extra/classification need to append extra/classification
+            // but how do we check this?
+            let col = col(&filter.field);
+            if filter.equal {
+                col.eq(lit(filter.value.clone()))
+            } else {
+                col.neq(lit(filter.value.clone()))
+            }
+        })
+        .reduce(|prev, next| prev.and(next))
+        .unwrap();
+
+    let constraint = definition
+        .constraints
+        .iter()
+        .map(|constraint| {
+            let col = col(&constraint.field);
+
+            // TODO: Might need to do a cast here
+            match constraint.constraint_type {
+                ConstraintType::Equal => col.eq(lit(constraint.value.clone())),
+                ConstraintType::GreaterThan => col.gt(lit(constraint.value.clone())),
+                ConstraintType::GreaterThanOrEqualTo => col.gt_eq(lit(constraint.value.clone())),
+                ConstraintType::LessThan => col.lt(lit(constraint.value.clone())),
+                ConstraintType::LessThanOrEqualTo => col.lt_eq(lit(constraint.value.clone())),
+                ConstraintType::NotEqualTo => col.neq(lit(constraint.value.clone())),
+            }
+        })
+        .reduce(|prev, next| prev.and(next))
+        .unwrap();
+
+    // TODO: If constraints or components include patient field, then we need to join onto
+    // the patient file.
+    let mut reader = LazyCsvReader::new(encounters_path)
+        .has_header(true)
+        .finish()?;
+    // TODO: Refactor me
+    if definition
+        .constraints
+        .iter()
+        .any(|c| c.source_type == SourceType::Patient)
+        || definition
+            .filters
+            .iter()
+            .any(|f| f.source_type == SourceType::Patient)
+        || definition.components.iter().any(|c| match c {
+            Component::Field(source, _) => source == "P",
+            _ => false,
+        })
+    {
+        let patient_reader = LazyCsvReader::new(patients_path)
+            .has_header(true)
+            .finish()?;
+        reader = reader.join(
+            patient_reader,
+            [col("Id")],
+            [col("Patient_Id")],
+            JoinArgs::new(JoinType::Inner),
+        );
+    }
+    let filtered = reader
+        .filter(filter.and(constraint))
+        .with_streaming(true)
+        .collect()?;
+
+    // TODO: Now for each of the filtered records, create a new record that is the built record, based on the components,
+    // quantity, etc. from the definition
+
+    let mut file = std::fs::File::create(&output_path).unwrap();
+    let mut writer = CsvWriter::new(&mut file);
+
+    // 2. Create the built services and write to disk
+    // TODO: Kind of sucks we need to collect into memory, see if there's a better way to do this later (otherwise can't use polars)
+    // writer
+    //     .finish(&mut reader.with_streaming(true).collect()?)
+    //     .unwrap();
+
+    Ok(())
+}
diff --git a/src/products/csv.rs b/src/products/csv.rs
index 07c92a4..bda2efa 100644
--- a/src/products/csv.rs
+++ b/src/products/csv.rs
@@ -8,7 +8,35 @@ pub struct Filter {
     pub value: String,
     // TODO: Probably want to enum this. Source type determines things like filtering
     // on encounter/patient fields when using something like a transfer
-    pub source_type: String,
+    pub source_type: SourceType,
+}
+
+#[derive(Hash, PartialEq, PartialOrd, Ord, Eq)]
+pub enum SourceType {
+    Patient,
+    Encounter,
+    Service,
+    Transfer,
+    CodingDiagnosis,
+    CodingProcedure,
+    Revenue,
+    Incident,
+}
+
+impl From<&String> for SourceType {
+    fn from(value: &String) -> Self {
+        match value.as_str() {
+            "P" => SourceType::Patient,
+            "CP" => SourceType::CodingProcedure,
+            "CD" => SourceType::CodingDiagnosis,
+            "E" => SourceType::Encounter,
+            "I" => SourceType::Incident,
+            "S" => SourceType::Service,
+            "R" => SourceType::Revenue,
+            "T" => SourceType::Transfer,
+            other => panic!("unknown source type: {}", other),
+        }
+    }
 }
 
 pub enum ConstraintType {
@@ -35,7 +63,7 @@ impl From<&String> for ConstraintType {
 }
 
 pub struct Constraint {
-    pub source_type: String,
+    pub source_type: SourceType,
     pub field: String,
     pub constraint_type: ConstraintType,
     pub value: String,