Files
ingey/src/products/create_products.rs
2023-10-11 21:41:08 +10:30

287 lines
11 KiB
Rust

use std::{
collections::HashMap,
io::{Read, Write},
};
use chrono::NaiveDateTime;
use csv::Position;
// including dsl works better for completion with rust analyzer
use polars::lazy::dsl::*;
use polars::prelude::*;
use serde::Serialize;
use super::csv::{read_definitions, BuildFrom, Component, ConstraintType, Definition, SourceType};
// TODO: Polars suggests this, but docs suggest it doesn't have very good platform support
//use jemallocator::Jemalloc;
// #[global_allocator]
// static GLOBAL: Jemalloc = Jemalloc;
/// One product row as handed to the output CSV writer.
///
/// Only the two `*_date_time` bounds are mandatory; every other field is optional.
/// NOTE(review): serde presumably emits the fields in declaration order, which would make
/// this order the output column order — confirm before reordering fields.
#[derive(Debug, Serialize, Default)]
struct Product {
// Parse datetime from string: https://rust-lang-nursery.github.io/rust-cookbook/datetime/parse.html#parse-string-into-datetime-struct
// TODO: Serialisers.
start_date_time: NaiveDateTime,
end_date_time: NaiveDateTime,
encounter_start_date_time: Option<NaiveDateTime>,
// NOTE(review): the next three look like identifiers of the source records — confirm.
encounter: Option<String>,
service: Option<String>,
transfer: Option<String>,
quantity: Option<f64>,
duration: Option<f64>,
actual_charge: Option<f64>,
standard_cost: Option<f64>,
// TODO: Enum this?
// Stored as `u8`, so values above 255 cannot be represented.
day_of_stay: Option<u8>,
source_allocated_amount: Option<f64>,
}
/// The set of CSV readers consumed by [`create_products`], one per input file.
///
/// Every reader has its own type parameter, so each input may be backed by a
/// different kind of [`Read`] source.
pub struct CreateProductInputs<E: Read, S: Read, T: Read, P: Read, Di: Read> {
    /// Reader over the encounters input.
    pub encounters: csv::Reader<E>,
    /// Reader over the services input.
    pub services: csv::Reader<S>,
    /// Reader over the transfers input.
    pub transfers: csv::Reader<T>,
    /// Reader over the procedures input.
    pub procedures: csv::Reader<P>,
    /// Reader over the diagnoses input.
    pub diagnoses: csv::Reader<Di>,
}
/// Builds products for every definition in the definitions file using the polars pipeline.
///
/// The definitions CSV at `definitions_path` is parsed with `read_definitions`, and each
/// definition is then dispatched on its `build_from` kind. Only `BuildFrom::Encounter` is
/// implemented at the moment; it is handled by `build_encounters_polars`, which receives
/// its own copies of the encounters, patients and output paths.
///
/// `services_path`, `transfers_path`, `procedures_path` and `diagnoses_path` are accepted
/// but not used yet.
///
/// # Errors
///
/// Returns an error if the definitions file cannot be opened or parsed, or if building
/// the products for a definition fails.
///
/// # Panics
///
/// Panics (via `todo!()`) as soon as a definition with any build-from kind other than
/// `BuildFrom::Encounter` is encountered.
pub fn create_products_polars(
    definitions_path: String,
    patients_path: String,
    encounters_path: String,
    services_path: String,
    transfers_path: String,
    procedures_path: String,
    diagnoses_path: String,
    output_path: String,
) -> anyhow::Result<()> {
    let mut definitions_reader = csv::Reader::from_path(definitions_path)?;
    let all_definitions: HashMap<String, Definition> = read_definitions(&mut definitions_reader)?;

    // The definition codes (map keys) are not needed here, only the definitions themselves.
    for definition in all_definitions.into_values() {
        match definition.build_from {
            BuildFrom::Encounter => {
                build_encounters_polars(
                    definition,
                    encounters_path.clone(),
                    patients_path.clone(),
                    output_path.clone(),
                )?;
            }
            // Not implemented yet; listed explicitly so that a newly added variant
            // still triggers a non-exhaustive match error.
            BuildFrom::Service
            | BuildFrom::Transfer
            | BuildFrom::CodingProcedure
            | BuildFrom::CodingDiagnosis
            | BuildFrom::LinkedDataset
            | BuildFrom::Revenue => todo!(),
        }
    }

    Ok(())
}
// TODO: Build from linked dataset is pretty hard, it potentially requires knowing everything about the previous year's
// costing run (BSCO, Dataset_Encounter_Cache, etc).
/// Builds products by streaming through the input CSV readers and writing rows to `output`.
///
/// Definitions are read from `definitions` and grouped by their `build_from` kind so that
/// all rules for one input can be evaluated in a single pass over that input. Only the
/// encounters input is processed so far: for every encounter record its reader position
/// is remembered (keyed by the `EncounterNumber` column), the encounter definitions are
/// evaluated against it, and one placeholder `Product::default()` row is written to
/// `output`.
///
/// `batch_size` is currently unused, as are the services, transfers, procedures and
/// diagnoses readers in `product_inputs`.
///
/// # Errors
///
/// Returns an error if the definitions or an encounter record cannot be read or
/// deserialised, if an encounter record has no position or no `EncounterNumber` column,
/// or if writing to `output` fails.
pub fn create_products<D, E, S, T, P, Di, O>(
    definitions: &mut csv::Reader<D>,
    product_inputs: CreateProductInputs<E, S, T, P, Di>,
    // TODO: Looks kind of bad, any other way around it? I'd rather not have to depend on crossbeam as well
    output: &mut csv::Writer<O>,
    // TODO: Default to 10 million or something sane
    batch_size: usize,
) -> anyhow::Result<()>
where
    D: Read,
    E: Read,
    S: Read,
    T: Read,
    P: Read,
    Di: Read,
    // TODO: Looks kind of bad, any other way around it? I'd rather not have to depend on crossbeam as well
    O: Write + Send + 'static,
{
    let all_definitions: HashMap<String, Definition> = read_definitions(definitions)?;

    // Partition the rules by the build-from type, so that we run all the rules at once for a
    // particular file, which should be much faster than opening files and scanning one at a
    // time. Could also do batches in files.
    let mut mapped_definitions: HashMap<BuildFrom, Vec<Definition>> = HashMap::new();
    for definition in all_definitions.into_values() {
        mapped_definitions
            .entry(definition.build_from)
            .or_default()
            .push(definition);
    }

    // Looked up once, outside the record loop. A definitions file without any encounter
    // definitions is valid input, so it yields an empty slice rather than a panic.
    let encounter_definitions: &[Definition] = mapped_definitions
        .get(&BuildFrom::Encounter)
        .map(Vec::as_slice)
        .unwrap_or_default();

    // Whenever we want to produce a built service, just write it to `output`.
    // Note that rust csv can seek to a certain position, so we can read in a batch from a
    // reader, then seek to that position in the reader (or position 0) if we couldn't find a
    // particular record. Alternatively, we could store an index of all records (e.g. encounter
    // numbers) that map to their position in the reader, so we can quickly seek to the
    // appropriate index and read the record.
    // https://docs.rs/csv/latest/csv/struct.Reader.html#method.seek
    //
    // Store encounter positions in the file, so that later when we read through
    // transfers/whatever we can seek to the correct position quickly in case of a cache miss.
    let mut encounter_positions: HashMap<String, Position> = HashMap::new();

    // TODO: Consider just using polars for this, can use streaming so don't need to worry about
    // running out of memory or being super efficient, can just focus on the code and let the
    // query optimiser sort things out (main reason I don't like this though is it may be
    // indeterminate in runtime speed, will just need to see how it performs I guess, so maybe
    // just make a separate implementation).
    // TODO: Alternative to storing encounter positions would be to sort portions of the file
    // bits at a time (I think it's called a merge sort?).
    // TODO: Try with and without rayon, should be able to help I think as we're going through
    // so much data sequentially, although we're still likely to be bottlenecked by write speed.
    let mut encounters = product_inputs.encounters;
    let headers = encounters.headers()?.clone();
    for record in encounters.records() {
        let record = record?;
        let position = record
            .position()
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("encounter record has no position information"))?;
        let encounter: HashMap<String, String> = record.deserialize(Some(&headers))?;
        let encounter_number = encounter.get("EncounterNumber").ok_or_else(|| {
            anyhow::anyhow!("encounters input is missing the EncounterNumber column")
        })?;
        encounter_positions.insert(encounter_number.clone(), position);

        // TODO: For each encounter definition, check this fits the filter criteria/constraints.
        for definition in encounter_definitions {
            // A definition without filters accepts every record; otherwise at least one
            // filter has to match. A filter on a field the record lacks never matches.
            // NOTE(review): `build_encounters_polars` combines filters/constraints with AND,
            // whereas this uses `any` (OR) — confirm which semantics is intended.
            let passes_filters = definition.filters.is_empty()
                || definition.filters.iter().any(|filter| {
                    encounter
                        .get(filter.field.as_str())
                        .map_or(false, |field| (filter.value == *field) == filter.equal)
                });

            // Same shape for constraints. Only equality is evaluated so far; every other
            // constraint type is treated as not matching.
            // TODO: Is the source type just number/datetime? Should probably be an enum?
            // It's not, seems to be E in the test data.
            let passes_constraints = definition.constraints.is_empty()
                || definition.constraints.iter().any(|constraint| {
                    encounter
                        .get(constraint.field.as_str())
                        .map_or(false, |field| match constraint.constraint_type {
                            ConstraintType::Equal => *field == constraint.value,
                            _ => false,
                        })
                });

            if passes_filters && passes_constraints {
                // TODO: Generate the service code
            }
        }

        // TODO: Generate the built service
        output.serialize(Product::default())?;
    }

    // Now do the same with transfers, services, etc, referencing the encounter reader by using
    // the indexes in encounter_positions.
    Ok(())
}
// TODO: This will iterate over the file multiple times, which could technically be
// slower than just going through the file once, since reading from disk is slower
// than reading from memory. However, reading from ... (this note was left unfinished)
/// Selects the encounters matching `definition` with polars and prepares the output file.
///
/// The encounters CSV at `encounters_path` is scanned lazily. If any constraint, filter or
/// field component of the definition refers to the patient source, the patients CSV at
/// `patients_path` is inner-joined onto it. All filters and constraints are then combined
/// with AND and applied as a single predicate; a definition with neither keeps every row.
/// Finally the file at `output_path` is created (truncating any existing file).
///
/// Turning the selected rows into built products and writing them out is not implemented
/// yet, so the output file is currently left empty.
///
/// # Errors
///
/// Returns an error if either CSV cannot be scanned, if the query fails to execute, or if
/// the output file cannot be created.
pub fn build_encounters_polars(
    definition: Definition,
    encounters_path: String,
    patients_path: String,
    output_path: String,
) -> anyhow::Result<()> {
    // 1. Apply filters/constraints to limit encounters
    let filter_exprs = definition.filters.iter().map(|filter| {
        // TODO: Filter field depends on type, as extra/classification need to append
        // extra/classification, but how do we check this?
        let column = col(&filter.field);
        let value = lit(filter.value.clone());
        if filter.equal {
            column.eq(value)
        } else {
            column.neq(value)
        }
    });
    let constraint_exprs = definition.constraints.iter().map(|constraint| {
        let column = col(&constraint.field);
        // TODO: Might need to do a cast here
        let value = lit(constraint.value.clone());
        match constraint.constraint_type {
            ConstraintType::Equal => column.eq(value),
            ConstraintType::GreaterThan => column.gt(value),
            ConstraintType::GreaterThanOrEqualTo => column.gt_eq(value),
            ConstraintType::LessThan => column.lt(value),
            ConstraintType::LessThanOrEqualTo => column.lt_eq(value),
            ConstraintType::NotEqualTo => column.neq(value),
        }
    });
    // `None` when the definition has neither filters nor constraints, in which case no
    // predicate is applied at all (instead of panicking on an empty `reduce`).
    let predicate = filter_exprs
        .chain(constraint_exprs)
        .reduce(|acc, next| acc.and(next));

    let mut reader = LazyCsvReader::new(encounters_path)
        .has_header(true)
        .finish()?;

    // If constraints, filters or components reference a patient field, then we need to join
    // onto the patient file.
    // TODO: Refactor me
    let needs_patients = definition
        .constraints
        .iter()
        .any(|c| c.source_type == SourceType::Patient)
        || definition
            .filters
            .iter()
            .any(|f| f.source_type == SourceType::Patient)
        || definition
            .components
            .iter()
            .any(|c| matches!(c, Component::Field(source, _) if source == "P"));
    if needs_patients {
        let patient_reader = LazyCsvReader::new(patients_path)
            .has_header(true)
            .finish()?;
        // NOTE(review): this joins encounters."Id" to patients."Patient_Id" — confirm the
        // key columns are not the wrong way around.
        reader = reader.join(
            patient_reader,
            [col("Id")],
            [col("Patient_Id")],
            JoinArgs::new(JoinType::Inner),
        );
    }

    let selected = match predicate {
        Some(predicate) => reader.filter(predicate),
        None => reader,
    };
    // Collected even though the rows are not used yet, so that query errors still surface.
    let _filtered = selected.with_streaming(true).collect()?;

    // TODO: Now for each of the filtered records, create a new record that is the built
    // record, based on the components, quantity, etc. from the definition.

    // 2. Create the built services and write to disk
    // Uses the caller-supplied path and propagates I/O failures to the caller.
    let mut file = std::fs::File::create(&output_path)?;
    // TODO: Kind of sucks we need to collect into memory, see if there's a better way to do
    // this later (otherwise can't use polars). Nothing is written through the writer yet.
    let _writer = CsvWriter::new(&mut file);

    Ok(())
}