Add necessary joins

src/main.rs (14 changed lines)

@@ -190,7 +190,7 @@ fn main() -> anyhow::Result<()> {
         SourceType::Encounter,
         InputFile {
             file_path: encounters,
-            joins: vec![],
+            joins: HashMap::new(),
             date_order_column: Some("StartDateTime".to_owned()),
         },
     );
@@ -198,7 +198,7 @@ fn main() -> anyhow::Result<()> {
         SourceType::Service,
         InputFile {
             file_path: services,
-            joins: vec![],
+            joins: HashMap::new(),
             date_order_column: Some("StartDateTime".to_owned()),
         },
     );
@@ -206,7 +206,7 @@ fn main() -> anyhow::Result<()> {
         SourceType::Transfer,
         InputFile {
             file_path: transfers,
-            joins: vec![],
+            joins: HashMap::new(),
             date_order_column: Some("StartDateTime".to_owned()),
         },
     );
@@ -214,7 +214,7 @@ fn main() -> anyhow::Result<()> {
         SourceType::CodingProcedure,
         InputFile {
             file_path: procedures,
-            joins: vec![],
+            joins: HashMap::new(),
             date_order_column: Some("ProcedureDateTime".to_owned()),
         },
     );
@@ -222,7 +222,7 @@ fn main() -> anyhow::Result<()> {
         SourceType::CodingDiagnosis,
         InputFile {
             file_path: diagnoses,
-            joins: vec![],
+            joins: HashMap::new(),
            date_order_column: None,
         },
     );
@@ -230,7 +230,7 @@ fn main() -> anyhow::Result<()> {
         SourceType::Patient,
         InputFile {
             file_path: patients,
-            joins: vec![],
+            joins: HashMap::new(),
            date_order_column: None,
         },
     );
@@ -238,7 +238,7 @@ fn main() -> anyhow::Result<()> {
         SourceType::Revenue,
         InputFile {
             file_path: revenues,
-            joins: vec![],
+            joins: HashMap::new(),
            date_order_column: None,
         },
     );
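Across these hunks the commit swaps each `joins: vec![]` for `joins: HashMap::new()`, matching the field's new type `HashMap<SourceType, String>` in the struct change further down: each input file now maps a related source type to the column in that file to join on. A minimal sketch of what a populated entry might look like, assuming a simplified `SourceType` and a hypothetical "EncounterID" column (the commit itself leaves every map empty):

use std::{collections::HashMap, path::PathBuf};

// Simplified mirror of the commit's types; the real enum has more variants.
#[derive(Hash, PartialEq, Eq)]
pub enum SourceType {
    Encounter,
}

pub struct InputFile {
    pub file_path: PathBuf,
    // Related source type -> name of the column in *this* file to join on.
    pub joins: HashMap<SourceType, String>,
    pub date_order_column: Option<String>,
}

fn main() {
    let mut joins = HashMap::new();
    // Hypothetical: the services file joins back to encounters on "EncounterID".
    joins.insert(SourceType::Encounter, "EncounterID".to_owned());
    let services = InputFile {
        file_path: PathBuf::from("services.csv"),
        joins,
        date_order_column: Some("StartDateTime".to_owned()),
    };
    assert_eq!(services.joins.len(), 1);
}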
@@ -1,4 +1,7 @@
-use std::{collections::HashMap, path::PathBuf};
+use std::{
+    collections::{HashMap, HashSet},
+    path::PathBuf,
+};
 
 use anyhow::anyhow;
 use chrono::NaiveDateTime;
@@ -8,7 +11,7 @@ use polars::lazy::dsl::*;
 use polars::prelude::*;
 use serde::Serialize;
 
-use super::csv::{read_definitions, Definition, FileJoin, SourceType};
+use super::csv::{read_definitions, Component, Definition, FileJoin, SourceType};
 
 // TODO: Polars suggests this, but docs suggest it doesn't have very good platform support
 //use jemallocator::Jemalloc;
@@ -36,7 +39,7 @@ struct Product {
 
 pub struct InputFile {
     pub file_path: PathBuf,
-    pub joins: Vec<FileJoin>,
+    pub joins: HashMap<SourceType, String>,
     // if not specified, then don't allow change in type builds, as there's no way to detect changes over time
     pub date_order_column: Option<String>,
 }
@@ -54,12 +57,6 @@ pub fn create_products_polars(
     Ok(())
 }
 
-//TODO: This will iterate over the file multiple times, which could technically be
-// slower than just going through the file once since reading from disk is slower
-// than reading from memory. However, reading from
-// Also, we can use a custom definition format that is translated from the
-// ppm format, so things like constraints/filters are one thing, and way more generic
-// (i.e. filter can be based on a join between files).
 pub fn build_polars(
     definition: &Definition,
     inputs: &HashMap<SourceType, InputFile>,
@@ -87,11 +84,36 @@ pub fn build_polars(
     let input_file = inputs
         .get(&definition.source_type)
         .ok_or(anyhow!("Failed to find valid file"))?;
-    let reader = LazyCsvReader::new(&input_file.file_path)
+    let mut reader = LazyCsvReader::new(&input_file.file_path)
         .has_header(true)
         .finish()?;
-    // TODO: Do joins based on usage in definitions components and filters. Ideally just join the columns that are actually wanted.
-    // Can do this by first going over each component/filter, and
+    let mut required_files = HashSet::new();
+    for component in &definition.components {
+        if let Component::Field(file, field) = component {
+            required_files.insert(file);
+        }
+    }
+    for filter in &definition.filters {
+        required_files.insert(&filter.file);
+    }
+    for source_type in required_files {
+        // TODO: Better error messages
+        if source_type != &definition.source_type {
+            let source_file = inputs
+                .get(&source_type)
+                .ok_or(anyhow!("Input file was not specified for source type"))?;
+            let join_reader = LazyCsvReader::new(source_file.file_path.clone()).finish()?;
+            let left_column = input_file
+                .joins
+                .get(source_type)
+                .ok_or(anyhow!("Failed to get left join column"))?;
+            let right_column = source_file
+                .joins
+                .get(&definition.source_type)
+                .ok_or(anyhow!("Failed to get right join column"))?;
+            reader = reader.inner_join(join_reader, col(&left_column), col(&right_column));
+        }
+    }
 
     let mut filtered = match filter {
         Some(filter) => reader.filter(filter),
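The new loop derives the set of secondary files from the definition's components and filters, then inner-joins each one onto the primary reader. Note the symmetric lookup: the left column comes from the primary file's `joins` map keyed by the secondary source type, and the right column from the secondary file's map keyed by the primary type, so both `InputFile`s must declare a join column toward each other. A polars-free sketch of that resolution, assuming simplified types and hypothetical column names:

use std::collections::HashMap;

#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug)]
enum SourceType {
    Encounter,
    Service,
}

// Per-file join declarations: source type -> (related type -> join column).
type Joins = HashMap<SourceType, HashMap<SourceType, String>>;

// Mirrors the two lookups in build_polars; None here corresponds to the
// "Failed to get left/right join column" errors.
fn join_columns(primary: SourceType, secondary: SourceType, joins: &Joins) -> Option<(&str, &str)> {
    let left = joins.get(&primary)?.get(&secondary)?;
    let right = joins.get(&secondary)?.get(&primary)?;
    Some((left.as_str(), right.as_str()))
}

fn main() {
    let mut joins: Joins = HashMap::new();
    // Hypothetical columns: services carry an "EncounterID" foreign key,
    // encounters expose their primary key as "ID".
    joins.insert(
        SourceType::Service,
        HashMap::from([(SourceType::Encounter, "EncounterID".to_owned())]),
    );
    joins.insert(
        SourceType::Encounter,
        HashMap::from([(SourceType::Service, "ID".to_owned())]),
    );
    assert_eq!(
        join_columns(SourceType::Service, SourceType::Encounter, &joins),
        Some(("EncounterID", "ID"))
    );
}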
@@ -6,12 +6,12 @@ use chrono::NaiveDateTime;
 #[derive(Hash, PartialEq, PartialOrd)]
 pub struct Filter {
     pub filter_type: FilterType,
-    pub file: String,
+    pub file: SourceType,
     pub field: String,
     pub value: String,
 }
 
-#[derive(Hash, PartialEq, PartialOrd, Eq, Ord)]
+#[derive(Hash, PartialEq, PartialOrd, Eq, Ord, Clone)]
 pub enum SourceType {
     CodingDiagnosis,
     CodingProcedure,
@@ -298,7 +298,7 @@ where
                 let source_type =
                     SourceType::try_from(record.get("FilterSourceType").unwrap())?;
                 Filter {
-                    // TODO: This all looks wrong
+                    // TODO: This looks wrong
                    filter_type: if record.get("FilterNotIn").unwrap() != "" {
                         FilterType::Equal
                     } else {
@@ -307,12 +307,7 @@ where
                     // TODO: extra/classification types need to append Extra:/Classification: to the start of the field
                     field: record.get("FilterField").unwrap().clone(),
                     value: record.get("FilterValue").unwrap().clone(),
-                    // TODO: Work out a way to handle this
-                    file: match source_type {
-                        SourceType::CodingDiagnosis => "",
-                        _ => "",
-                    }
-                    .to_owned(),
+                    file: source_type,
                 }
             };
             let all_filters = &mut all_definitions
@@ -354,8 +349,7 @@ where
                     field: record.get("ConstraintColumn").unwrap().to_owned(),
                     filter_type,
                     value: record.get("ConstraintValue").unwrap().to_owned(),
-                    // TODO: Figure this out, should be determined from the source type
-                    file: "".to_owned(),
+                    file: source_type,
                 }
             };
             let all_filters = &mut all_definitions
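With `Filter.file` now a `SourceType` (and `SourceType` made `Clone` so the parsed value can be stored directly in the struct), filters feed straight into the `required_files` set built in `build_polars`. A compact variation on that collection step, again assuming simplified types:

use std::collections::HashSet;

#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug)]
enum SourceType {
    Encounter,
    Revenue,
}

struct Filter {
    file: SourceType,
}

// Collect every secondary source type referenced by a definition's filters;
// the commit's loop does the same, excluding the primary source type.
fn required_files(primary: SourceType, filters: &[Filter]) -> HashSet<SourceType> {
    filters
        .iter()
        .map(|f| f.file)
        .filter(|file| *file != primary)
        .collect()
}

fn main() {
    let filters = vec![
        Filter { file: SourceType::Revenue },
        Filter { file: SourceType::Encounter }, // same as primary: excluded
    ];
    let required = required_files(SourceType::Encounter, &filters);
    assert!(required.contains(&SourceType::Revenue));
    assert_eq!(required.len(), 1);
}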