Refactor definition builds to use the same build function per source file type
@@ -1,20 +1,14 @@
-use std::{
-    collections::HashMap,
-    io::{Read, Write},
-};
+use std::{collections::HashMap, path::PathBuf};

-use anyhow::bail;
+use anyhow::anyhow;
 use chrono::NaiveDateTime;
-use csv::Position;
 use itertools::Itertools;
 // including dsl works better for completion with rust analyzer
 use polars::lazy::dsl::*;
 use polars::prelude::*;
 use serde::Serialize;

-use super::csv::{
-    read_definitions, BuildFrom, Component, ComponentSourceType, ConstraintType, Definition,
-    SourceType,
-};
+use super::csv::{read_definitions, Definition, FileJoin, SourceType};

 // TODO: Polars suggests this, but docs suggest it doesn't have very good platform support
 //use jemallocator::Jemalloc;
@@ -40,276 +34,76 @@ struct Product {
     source_allocated_amount: Option<f64>,
 }

-pub struct CreateProductInputs<E, S, T, P, Di>
-where
-    E: Read,
-    S: Read,
-    T: Read,
-    P: Read,
-    Di: Read,
-{
-    pub encounters: csv::Reader<E>,
-    pub services: csv::Reader<S>,
-    pub transfers: csv::Reader<T>,
-    pub procedures: csv::Reader<P>,
-    pub diagnoses: csv::Reader<Di>,
+pub struct InputFile {
+    pub file_path: PathBuf,
+    pub joins: Vec<FileJoin>,
 }

 pub fn create_products_polars(
-    definitions_path: String,
-    patients_path: String,
-    encounters_path: String,
-    services_path: String,
-    transfers_path: String,
-    procedures_path: String,
-    diagnoses_path: String,
-    output_path: String,
+    definitions_path: PathBuf,
+    inputs: HashMap<SourceType, InputFile>,
+    output_path: PathBuf,
 ) -> anyhow::Result<()> {
-    let mut all_definitions: HashMap<String, Definition> =
-        read_definitions(&mut csv::Reader::from_path(definitions_path)?)?;
-
-    for (key, definition) in all_definitions {
-        match definition.build_from {
-            BuildFrom::Encounter => build_encounters_polars(
-                definition,
-                encounters_path.clone(),
-                patients_path.clone(),
-                output_path.clone(),
-            ),
-            BuildFrom::Service => todo!(),
-            BuildFrom::Transfer => todo!(),
-            BuildFrom::CodingProcedure => todo!(),
-            BuildFrom::CodingDiagnosis => todo!(),
-            BuildFrom::LinkedDataset => todo!(),
-            BuildFrom::Revenue => todo!(),
-        }?;
+    let definitions = read_definitions(&mut csv::Reader::from_path(definitions_path)?)?;
+    let definitions = definitions.values().collect_vec();
+    for definition in definitions {
+        build_polars(definition, &inputs, &output_path)?;
     }

     Ok(())
}
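
// Illustrative usage sketch, not part of the diff above: one way a caller might assemble the
// new inputs map and invoke create_products_polars. The file paths are placeholders, and the
// SourceType variants are assumed from super::csv.
fn example_create_products() -> anyhow::Result<()> {
    let mut inputs: HashMap<SourceType, InputFile> = HashMap::new();
    inputs.insert(
        SourceType::Encounter,
        InputFile {
            file_path: PathBuf::from("data/encounters.csv"),
            joins: vec![],
        },
    );
    inputs.insert(
        SourceType::Patient,
        InputFile {
            file_path: PathBuf::from("data/patients.csv"),
            joins: vec![],
        },
    );
    create_products_polars(
        PathBuf::from("data/definitions.csv"),
        inputs,
        PathBuf::from("output/products.csv"),
    )
}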

-// TODO: Build from linked dataset is pretty hard, it potentially requires knowing everything about the previous year's
-// costing run (BSCO, Dataset_Encounter_Cache, etc).
-pub fn create_products<D, E, S, T, P, Di, O>(
-    definitions: &mut csv::Reader<D>,
-    product_inputs: CreateProductInputs<E, S, T, P, Di>,
-    // TODO: Looks kind of bad, any other way around it? I'd rather not have to depend on crossbeam as well
-    output: &mut csv::Writer<O>,
-    // TODO: Default to 10 million or something sane
-    batch_size: usize,
-) -> anyhow::Result<()>
-where
-    D: Read,
-    E: Read,
-    S: Read,
-    T: Read,
-    P: Read,
-    Di: Read,
-    O: Write + Send,
-{
-    let mut all_definitions: HashMap<String, Definition> = read_definitions(definitions)?;
-    // Partition the rules by the build_from type, so that we'll run all the rules at once for a particular file, which should be much faster
-    // than opening files and scanning one at a time. Could also do batches within files.
-
-    let mut mapped_definitions: HashMap<BuildFrom, Vec<Definition>> = HashMap::new();
-    for (_, definition) in all_definitions {
-        mapped_definitions
-            .entry(definition.build_from)
-            .or_insert(vec![])
-            .push(definition);
-    }
-
-    // Now whenever we want to produce a built service, just write it to tx.
-
-    // Note that rust csv can seek to a certain position, so we can read in a batch from a reader, then
-    // seek to that position in the reader (or position 0) if we couldn't find a particular record.
-    // Alternatively, we could store an index of all records (e.g. encounter numbers) that map to their position in the reader,
-    // so we can quickly seek to the appropriate index and read the record.
-    // https://docs.rs/csv/latest/csv/struct.Reader.html#method.seek
-    // Store encounter positions in the file, so that later when we read through transfers/whatever we can easily
-    // seek to the correct position quickly in case we have a cache miss.
-    let mut encounter_positions: HashMap<String, Position> = HashMap::new();
-    // TODO: Consider just using polars for this, can use streaming so don't need to worry about
-    // running out of memory or being super efficient, can just focus on the code and let the query
-    // optimiser sort things out (main reason I don't like this though is it may be indeterminate in
-    // runtime speed, will just need to see how it performs I guess, so maybe just make a separate implementation)
-
-    // TODO: An alternative to storing encounter positions would be to sort portions of the file a chunk at a time (I think it's called a merge sort?).
-
-    // TODO: Try with and without rayon, should be able to help I think as we're going through so much data sequentially,
-    // although we're still likely to be bottlenecked by just write-speed
-    let mut encounters = product_inputs.encounters;
-    let headers = encounters.headers()?.clone();
-
-    for encounter in encounters.records() {
-        let encounter = encounter?;
-        let position = encounter.position();
-        if position.is_none() {
-            bail!("Position in encounter file not found")
-        }
-        let position = position.unwrap();
-        let encounter: HashMap<String, String> = encounter.deserialize(Some(&headers))?;
-        encounter_positions.insert(
-            encounter.get("EncounterNumber").unwrap().to_string(),
-            position.clone(),
-        );
-        // TODO: For each encounter definition, check whether this record fits the filter criteria/constraints,
-        // and if it does, generate the service code.
-        let definitions = mapped_definitions.get(&BuildFrom::Encounter).unwrap();
-        for definition in definitions {
-            let matching_filter = (definition.filters.is_empty()
-                || definition.filters.iter().any(|filter| {
-                    let field = encounter.get(filter.field.as_str());
-                    if field.is_none() {
-                        return false;
-                    }
-                    let field = field.unwrap();
-                    if filter.equal {
-                        filter.value == *field
-                    } else {
-                        filter.value != *field
-                    }
-                }))
-                && (definition.constraints.is_empty()
-                    || definition.constraints.iter().any(|constraint| {
-                        let field = encounter.get(constraint.field.as_str());
-                        if field.is_none() {
-                            return false;
-                        }
-                        let field = field.unwrap();
-                        // TODO: Is this just number/datetime? Should probably be an enum? It's not, it seems to be E in the test data
-                        let field_type = &constraint.source_type;
-                        match constraint.constraint_type {
-                            ConstraintType::Equal => *field == constraint.value,
-                            _ => false,
-                        }
-                    }));
-            if matching_filter {
-                // Generate the service code
-            }
-        }
-
-        // TODO: Generate the built service
-        output.serialize(Product::default())?;
-    }
-
-    // Now do the same with transfers, services, etc, referencing the encounter reader by using the
-    // indexes in encounter_positions
-    Ok(())
}
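
// Illustrative sketch only, not part of the diff: how the seek approach described in the comments
// above could be used when a later pass (transfers, services, ...) misses an encounter in memory.
// Assumes the reader wraps a File so the underlying stream is seekable.
fn read_encounter_at(
    encounters: &mut csv::Reader<std::fs::File>,
    position: &csv::Position,
) -> anyhow::Result<csv::StringRecord> {
    // Jump straight to the stored position instead of rescanning the whole file.
    encounters.seek(position.clone())?;
    let mut record = csv::StringRecord::new();
    // read_record returns false at end of file, i.e. the stored position was stale.
    if !encounters.read_record(&mut record)? {
        anyhow::bail!("No encounter record found at the stored position");
    }
    Ok(record)
}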

 //TODO: This will iterate over the file multiple times, which could technically be
 // slower than just going through the file once, since reading from disk is slower
 // than reading from memory. However, reading from
 // Also, we can use a custom definition format that is translated from the
 // ppm format, so things like constraints/filters are one thing, and way more generic
 // (i.e. a filter can be based on a join between files).
-pub fn build_encounters_polars(
-    definition: Definition,
-    encounters_path: String,
-    patients_path: String,
-    output_path: String,
+pub fn build_polars(
+    definition: &Definition,
+    inputs: &HashMap<SourceType, InputFile>,
+    output_path: &PathBuf,
 ) -> anyhow::Result<()> {
-    // 1. Apply filters/constraints to limit encounters
+    // 1. Apply filters to limit encounters
     let filter = definition
         .filters
         .iter()
         .map(|filter| {
             // TODO: The filter field depends on the type, as extra/classification need to append extra/classification,
             // but how do we check this?
             let col = col(&filter.field);
-            match filter.source_type {
-                SourceType::CodingDiagnosis => todo!(),
-                SourceType::CodingProcedure => todo!(),
-                SourceType::Encounter => todo!(),
-                SourceType::Incident => todo!(),
-                SourceType::Patient => todo!(),
-                SourceType::Revenue => todo!(),
-                SourceType::Service => todo!(),
-                SourceType::Transfer => todo!(),
-            }
-            if filter.equal {
-                col.eq(lit(filter.value.clone()))
-            } else {
-                col.neq(lit(filter.value.clone()))
+            match filter.filter_type {
+                super::csv::FilterType::Equal => col.eq(lit(filter.value.clone())),
+                super::csv::FilterType::GreaterThan => col.gt(lit(filter.value.clone())),
+                super::csv::FilterType::GreaterThanOrEqualTo => {
+                    col.gt_eq(lit(filter.value.clone()))
+                }
+                super::csv::FilterType::LessThan => col.lt(lit(filter.value.clone())),
+                super::csv::FilterType::LessThanOrEqualTo => col.lt_eq(lit(filter.value.clone())),
+                super::csv::FilterType::NotEqualTo => col.neq(lit(filter.value.clone())),
             }
         })
         .reduce(|prev, next| prev.and(next));

     let constraint = definition
         .constraints
         .iter()
         .map(|constraint| {
             let col = col(&constraint.field);

             // TODO: Might need to do a cast here
             match constraint.constraint_type {
                 ConstraintType::Equal => col.eq(lit(constraint.value.clone())),
                 ConstraintType::GreaterThan => col.gt(lit(constraint.value.clone())),
                 ConstraintType::GreaterThanOrEqualTo => col.gt_eq(lit(constraint.value.clone())),
                 ConstraintType::LessThan => col.lt(lit(constraint.value.clone())),
                 ConstraintType::LessThanOrEqualTo => col.lt_eq(lit(constraint.value.clone())),
                 ConstraintType::NotEqualTo => col.neq(lit(constraint.value.clone())),
             }
         })
.reduce(|prev, next| prev.and(next));
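
    // Illustrative sketch only, not part of the diff: the kind of cast the TODO above hints at.
    // Comparing raw CSV strings makes "9" > "10", so a numeric constraint would cast both the
    // column and the literal first. Float64 as the target dtype is an assumption.
    fn numeric_constraint_expr(field: &str, value: &str) -> Expr {
        col(field)
            .cast(DataType::Float64)
            .gt(lit(value.to_string()).cast(DataType::Float64))
    }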

     // TODO: If constraints or components include a patient field, then we need to join onto
     // the patient file.
-    let mut reader = LazyCsvReader::new(encounters_path)
+    let input_file = inputs
+        .get(&definition.source_type)
+        .ok_or(anyhow!("Failed to find valid file"))?;
+    let reader = LazyCsvReader::new(&input_file.file_path)
         .has_header(true)
         .finish()?;
     // TODO: Refactor me
     // TODO: Could also make this really generic. Instead of a patient/whatever type field, we make it some
     // file and specify the join field in the file. Then we could add arbitrary files when building.
     // This would require reworking the constraint/filter to specify a filename rather than a source type,
     // and would then have some common filename for ppm's source type or whatever.
-    if definition
-        .constraints
-        .iter()
-        .any(|c| c.source_type == SourceType::Patient)
-        || definition
-            .filters
-            .iter()
-            .any(|f| f.source_type == SourceType::Patient)
-        || definition.components.iter().any(|c| match c {
-            Component::Field(source, _) => *source == ComponentSourceType::Patient,
-            _ => false,
-        })
-    {
-        let patient_reader = LazyCsvReader::new(patients_path)
-            .has_header(true)
-            .finish()?;
-        reader = reader.join(
-            patient_reader,
-            [col("Id")],
-            [col("Patient_Id")],
-            JoinArgs::new(JoinType::Inner),
-        );
}
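
    // Illustrative sketch only, not part of the diff: how the joins declared on InputFile might be
    // applied generically, in the spirit of the "make this really generic" TODO above, instead of the
    // hard-coded patient join. The FileJoin field names used here (other, left_on, right_on) are
    // assumptions; the real struct lives in super::csv and may differ.
    fn apply_joins(
        mut frame: LazyFrame,
        joins: &[FileJoin],
        inputs: &HashMap<SourceType, InputFile>,
    ) -> anyhow::Result<LazyFrame> {
        for join in joins {
            let other_input = inputs
                .get(&join.other)
                .ok_or(anyhow!("No input file registered for join target"))?;
            let other = LazyCsvReader::new(&other_input.file_path)
                .has_header(true)
                .finish()?;
            // Same shape as the patient join above, but driven by configuration.
            frame = frame.join(
                other,
                [col(&join.left_on)],
                [col(&join.right_on)],
                JoinArgs::new(JoinType::Inner),
            );
        }
        Ok(frame)
    }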
     let filter = match constraint {
         Some(constraint) => filter.map(|f| f.and(constraint)),
         None => filter,
     };
     // TODO: Do joins based on usage in the definition's components and filters. Ideally just join the columns that are actually wanted.
     // Can do this by first going over each component/filter, and

-    let filtered = (match filter {
+    let mut filtered = match filter {
         Some(filter) => reader.filter(filter),
         None => reader,
-    })
+    }
     .with_streaming(true)
     .collect()?;

     // TODO: Now for each of the filtered records, create a new record that is the built record, based on the components,
     // quantity, etc. from the definition

-    let mut file = std::fs::File::create("output_path").unwrap();
-    let mut writer = CsvWriter::new(&mut file);

     // 2. Create the built services and write to disk
     // TODO: Kind of sucks we need to collect into memory, see if there's a better way to do this later (otherwise we can't use polars)
     // writer
     //     .finish(&mut reader.with_streaming(true).collect()?)
     //     .unwrap();

+    let mut file = std::fs::File::create(output_path).unwrap();
+    // TODO: Don't write `filtered` directly, but the result that contains the created product columns
+    CsvWriter::new(&mut file).finish(&mut filtered)?;
     Ok(())
}
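
// Illustrative sketch only, not part of the diff: one way the definition's components could be turned
// into output columns before writing, instead of writing the filtered input columns directly (the
// "Don't write `filtered` directly" TODO above). It assumes the second value in Component::Field is the
// source column name; the other Component variants are left out because their shape isn't shown here.
fn product_column_exprs(definition: &Definition) -> Vec<Expr> {
    definition
        .components
        .iter()
        .filter_map(|component| match component {
            // Carry a source field through to the built product under its own name.
            Component::Field(_, field) => Some(col(field)),
            // Literal/derived component kinds would be handled here once their shape is known.
            _ => None,
        })
        .collect()
}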