More product creator implementation
This commit is contained in:
@@ -1,19 +1,25 @@
|
|||||||
|
use core::panic;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
io::{Read, Write},
|
io::{Read, Write},
|
||||||
sync::{mpsc, Arc, Mutex},
|
sync::mpsc,
|
||||||
thread,
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
use chrono::NaiveDateTime;
|
use chrono::NaiveDateTime;
|
||||||
use rayon::prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator};
|
use csv::Position;
|
||||||
|
use itertools::Itertools;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
|
#[derive(Hash, PartialEq, PartialOrd, Ord, Eq)]
|
||||||
struct Filter {
|
struct Filter {
|
||||||
// Equal/not equal
|
// Equal/not equal
|
||||||
equal: bool,
|
equal: bool,
|
||||||
field: String,
|
field: String,
|
||||||
value: String,
|
value: String,
|
||||||
|
// TODO: Probably want to enum this. Source type determines things like filtering
|
||||||
|
// on encounter/patient fields when using something like a transfer
|
||||||
|
source_type: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum ConstraintType {
|
enum ConstraintType {
|
||||||
@@ -25,6 +31,20 @@ enum ConstraintType {
|
|||||||
NotEqualTo,
|
NotEqualTo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<&String> for ConstraintType {
|
||||||
|
fn from(string: &String) -> Self {
|
||||||
|
match string.as_str() {
|
||||||
|
"=" => ConstraintType::Equal,
|
||||||
|
">" => ConstraintType::GreaterThan,
|
||||||
|
">=" => ConstraintType::GreaterThanOrEqualTo,
|
||||||
|
"<" => ConstraintType::LessThan,
|
||||||
|
"<=" => ConstraintType::LessThanOrEqualTo,
|
||||||
|
"!=" => ConstraintType::NotEqualTo,
|
||||||
|
_ => panic!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct Constraint {
|
struct Constraint {
|
||||||
source_type: String,
|
source_type: String,
|
||||||
field: String,
|
field: String,
|
||||||
@@ -35,6 +55,7 @@ struct Constraint {
|
|||||||
enum Component {
|
enum Component {
|
||||||
Constant(String),
|
Constant(String),
|
||||||
// Even extras are allowed here, just specify the field type (encounter, service, etc) and the field name (incl Extra: or Classification: as appropriate)
|
// Even extras are allowed here, just specify the field type (encounter, service, etc) and the field name (incl Extra: or Classification: as appropriate)
|
||||||
|
// TODO: This first string should also be some kind of source type enum, probably shared with source types on filter/constraint
|
||||||
Field(String, String),
|
Field(String, String),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -123,6 +144,7 @@ impl From<&String> for RoundingMode {
|
|||||||
"N" => RoundingMode::None,
|
"N" => RoundingMode::None,
|
||||||
"D" => RoundingMode::DownToClosestWhole,
|
"D" => RoundingMode::DownToClosestWhole,
|
||||||
"T" => RoundingMode::ToClosestWhole,
|
"T" => RoundingMode::ToClosestWhole,
|
||||||
|
// TODO: Just use none when unknown?
|
||||||
_ => panic!(),
|
_ => panic!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -152,8 +174,22 @@ enum Quantity {
|
|||||||
// Name of the extra
|
// Name of the extra
|
||||||
Extra(String),
|
Extra(String),
|
||||||
SourceQuantity,
|
SourceQuantity,
|
||||||
Hours(RoundingMode),
|
Hours,
|
||||||
Days(RoundingMode),
|
Days,
|
||||||
|
AdmissionWeight,
|
||||||
|
Age,
|
||||||
|
ExpectedLengthOfStay,
|
||||||
|
ICUHours,
|
||||||
|
LengthOfStay,
|
||||||
|
MechVentHours,
|
||||||
|
Revenue,
|
||||||
|
WeightedSeparation,
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Pretty sure rounding mode can be used with all quantities, but check this
|
||||||
|
struct BuiltQuantity {
|
||||||
|
quantity: Quantity,
|
||||||
|
rounding_mode: RoundingMode,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum DurationFallback {
|
enum DurationFallback {
|
||||||
@@ -169,7 +205,7 @@ struct Definition {
|
|||||||
constraints: Vec<Constraint>,
|
constraints: Vec<Constraint>,
|
||||||
build_from: BuildFrom,
|
build_from: BuildFrom,
|
||||||
frequency: Frequency,
|
frequency: Frequency,
|
||||||
quantity: Quantity,
|
quantity: BuiltQuantity,
|
||||||
duration_fallback: DurationFallback,
|
duration_fallback: DurationFallback,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -228,8 +264,29 @@ where
|
|||||||
let record_type = record.get("Type").unwrap();
|
let record_type = record.get("Type").unwrap();
|
||||||
match record_type.as_str() {
|
match record_type.as_str() {
|
||||||
"Definition" => {
|
"Definition" => {
|
||||||
let build_quantity = all_definitions.insert(
|
let quantity_type = record.get("BuiltQuantity").unwrap();
|
||||||
record.get("Name").unwrap().to_owned(),
|
let rounding_mode =
|
||||||
|
RoundingMode::from(record.get("BuiltQuantityRounding").unwrap());
|
||||||
|
let quantity = match quantity_type.as_str() {
|
||||||
|
"S" => Quantity::SourceQuantity,
|
||||||
|
"C" => Quantity::Constant(
|
||||||
|
record
|
||||||
|
.get("BuiltQuantityConstant")
|
||||||
|
.unwrap()
|
||||||
|
.parse()
|
||||||
|
.unwrap(),
|
||||||
|
),
|
||||||
|
"H" => Quantity::Hours,
|
||||||
|
// Above 3 are all that's needed for now
|
||||||
|
_ => panic![],
|
||||||
|
};
|
||||||
|
let built_quantity = BuiltQuantity {
|
||||||
|
quantity,
|
||||||
|
rounding_mode,
|
||||||
|
};
|
||||||
|
all_definitions
|
||||||
|
.insert(
|
||||||
|
record.get("Name").unwrap().clone(),
|
||||||
Definition {
|
Definition {
|
||||||
name: record.get("Name").unwrap().to_owned(),
|
name: record.get("Name").unwrap().to_owned(),
|
||||||
components: vec![],
|
components: vec![],
|
||||||
@@ -237,22 +294,68 @@ where
|
|||||||
constraints: vec![],
|
constraints: vec![],
|
||||||
build_from: BuildFrom::from(record.get("BuildFrom").unwrap()),
|
build_from: BuildFrom::from(record.get("BuildFrom").unwrap()),
|
||||||
frequency: Frequency::from(record.get("Frequency").unwrap()),
|
frequency: Frequency::from(record.get("Frequency").unwrap()),
|
||||||
quantity: Quantity::Constant(1.),
|
quantity: built_quantity,
|
||||||
|
// TODO: Figure this out
|
||||||
|
// Not even in use, can ignore, or will BuiltService always be the default?
|
||||||
duration_fallback: DurationFallback::BuiltService,
|
duration_fallback: DurationFallback::BuiltService,
|
||||||
},
|
},
|
||||||
);
|
)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
"Filter" => {
|
||||||
|
let new_filter = Filter {
|
||||||
|
equal: record.get("FilterNotIn").unwrap() != "",
|
||||||
|
field: record.get("FilterField").unwrap().clone(),
|
||||||
|
value: record.get("FilterValue").unwrap().clone(),
|
||||||
|
source_type: record.get("FilterSourceType").unwrap().clone(),
|
||||||
|
};
|
||||||
|
let all_filters = &mut all_definitions
|
||||||
|
.get_mut(record.get("Name").unwrap())
|
||||||
|
.unwrap()
|
||||||
|
.filters;
|
||||||
|
all_filters.push(new_filter);
|
||||||
|
}
|
||||||
|
"Component" => {
|
||||||
|
let component = match record.get("ComponentSource").unwrap().as_str() {
|
||||||
|
"C" => {
|
||||||
|
Component::Constant(record.get("ComponentValueOrField").unwrap().to_owned())
|
||||||
|
}
|
||||||
|
source => Component::Field(
|
||||||
|
// TODO: Parse into source type enum
|
||||||
|
source.to_owned(),
|
||||||
|
record.get("ComponentValueOrField").unwrap().to_owned(),
|
||||||
|
),
|
||||||
|
};
|
||||||
|
let all_components = &mut all_definitions
|
||||||
|
.get_mut(record.get("Name").unwrap())
|
||||||
|
.unwrap()
|
||||||
|
.components;
|
||||||
|
all_components.push(component);
|
||||||
|
}
|
||||||
|
"Constraint" => {
|
||||||
|
let constraint = Constraint {
|
||||||
|
source_type: record.get("ConstraintSourceType").unwrap().to_owned(),
|
||||||
|
field: record.get("ConstraintColumn").unwrap().to_owned(),
|
||||||
|
constraint_type: ConstraintType::from(record.get("ConstraintType").unwrap()),
|
||||||
|
value: record.get("ConstraintValue").unwrap().to_owned(),
|
||||||
|
};
|
||||||
|
let all_constraints = &mut all_definitions
|
||||||
|
.get_mut(record.get("Name").unwrap())
|
||||||
|
.unwrap()
|
||||||
|
.constraints;
|
||||||
|
all_constraints.push(constraint);
|
||||||
}
|
}
|
||||||
"Filter" => {}
|
|
||||||
"Component" => {}
|
|
||||||
"Constraint" => {}
|
|
||||||
unknown => println!("Invalid type found: {}", unknown),
|
unknown => println!("Invalid type found: {}", unknown),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut mapped_definitions: HashMap<BuildFrom, Definition> = all_definitions
|
let mut mapped_definitions: HashMap<BuildFrom, Vec<Definition>> = HashMap::new();
|
||||||
.into_values()
|
for (_, definition) in all_definitions {
|
||||||
.map(|value| (value.build_from, value))
|
mapped_definitions
|
||||||
.collect();
|
.entry(definition.build_from)
|
||||||
|
.or_insert(vec![])
|
||||||
|
.push(definition);
|
||||||
|
}
|
||||||
|
|
||||||
// Then read through each file type line by line if there are definitions for that type, and process all records (read into memory the batch size)
|
// Then read through each file type line by line if there are definitions for that type, and process all records (read into memory the batch size)
|
||||||
// Probably output to a separate thread (or maybe some kind of limited queue?) to write to disk. Try on same thread to start, then if it's too slow
|
// Probably output to a separate thread (or maybe some kind of limited queue?) to write to disk. Try on same thread to start, then if it's too slow
|
||||||
@@ -269,23 +372,76 @@ where
|
|||||||
|
|
||||||
// Now whenever we want to produce a built service, just write it to tx.
|
// Now whenever we want to produce a built service, just write it to tx.
|
||||||
|
|
||||||
|
// Note that rust csv can seek to a certain position, so we can read in a batch from a reader, then
|
||||||
|
// seek to that position in the reader (or position 0) if we couldn't find a particular record.
|
||||||
|
// Alternatively, we could store an index of all records (e.g. encounter numbers) that map to their position in the reader,
|
||||||
|
// so we can quickly seek to the appropriate index and read the record.
|
||||||
|
// https://docs.rs/csv/latest/csv/struct.Reader.html#method.seek
|
||||||
|
// Store encounter positions in file, so that later when we read through transfers/whatever we can easily
|
||||||
|
// seek to the correct position quickly in case we have a cache miss
|
||||||
|
let mut encounter_positions: HashMap<String, Position> = HashMap::new();
|
||||||
|
|
||||||
|
// TODO: An alternative to storing encounter positions would be to sort the file in chunks and merge the sorted chunks (an external merge sort).
|
||||||
|
|
||||||
// TODO: Try with and without rayon, should be able to help I think as we're going through so much data sequentially,
|
// TODO: Try with and without rayon, should be able to help I think as we're going through so much data sequentially,
|
||||||
// although we're still likely to be bottlenecked by just write-speed
|
// although we're still likely to be bottlenecked by just write-speed
|
||||||
let mut encounters = encounters;
|
let mut encounters = encounters;
|
||||||
encounters
|
let headers = encounters.headers()?.clone();
|
||||||
.deserialize::<HashMap<String, String>>()
|
|
||||||
.map(|encounter| encounter.unwrap())
|
for encounter in encounters.records() {
|
||||||
//TODO: Rayon can't be used with csv, consider just batching reads perhaps?
|
let encounter = encounter?;
|
||||||
// .par_bridge()
|
let position = encounter.position().unwrap();
|
||||||
.for_each(|encounter| {
|
let encounter: HashMap<String, String> = encounter.deserialize(Some(&headers))?;
|
||||||
// TODO: Calculate quantity for this encounter
|
encounter_positions.insert(
|
||||||
|
encounter.get("EncounterNumber").unwrap().to_string(),
|
||||||
|
position.clone(),
|
||||||
|
);
|
||||||
|
// TODO: For each encounter definition, check this fits the filter criteria/constraints,
|
||||||
|
// and generate the service code if it does
|
||||||
|
let definitions = mapped_definitions.get(&BuildFrom::Encounter).unwrap();
|
||||||
|
for definition in definitions {
|
||||||
|
let matching_filter = (definition.filters.is_empty()
|
||||||
|
|| definition.filters.iter().any(|filter| {
|
||||||
|
let field = encounter.get(filter.field.as_str());
|
||||||
|
if field.is_none() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
let field = field.unwrap();
|
||||||
|
if filter.equal {
|
||||||
|
return filter.value == *field;
|
||||||
|
} else {
|
||||||
|
return filter.value != *field;
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
&& (definition.constraints.is_empty()
|
||||||
|
|| definition.constraints.iter().any(|constraint| {
|
||||||
|
let field = encounter.get(constraint.field.as_str());
|
||||||
|
if field.is_none() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
let field = field.unwrap();
|
||||||
|
// TODO: Is this just number/datetime? Should probably be an enum? It's not, seems to be E in the test data
|
||||||
|
let field_type = &constraint.source_type;
|
||||||
|
match constraint.constraint_type {
|
||||||
|
ConstraintType::Equal => *field == constraint.value,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
if matching_filter {
|
||||||
|
// Generate the service code
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Generate the built service
|
||||||
tx.send(Product::default()).unwrap();
|
tx.send(Product::default()).unwrap();
|
||||||
});
|
}
|
||||||
let encounters: Vec<Product> = vec![];
|
|
||||||
encounters.par_iter().for_each(|encounter| {
|
// Now do the same with transfers, services, etc, referencing the encounter reader by using the
|
||||||
println!("{:?}", encounter);
|
// indexes in encounter_positions
|
||||||
tx.send(Product::default()).unwrap();
|
|
||||||
});
|
// Have to drop the tx, which will cause the write thread to finish up so that it can be joined before
|
||||||
|
// the function ends
|
||||||
|
drop(tx);
|
||||||
|
|
||||||
write_thread.join().unwrap();
|
write_thread.join().unwrap();
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
Reference in New Issue
Block a user