Add data model and structure for Swift graph executor, rework create products to be more general

This commit is contained in:
Michael Pivato
2024-05-09 22:50:35 +09:30
parent 51fc216c82
commit 98d38d47a3
17 changed files with 432 additions and 50 deletions

View File

@@ -14,6 +14,6 @@ async fn main() -> anyhow::Result<()> {
.connect(&connection_string)
.await?;
upload_to_db::upload_file_bulk(&pool, &"".to_owned(), &"".to_owned(), None, "".to_owned()).await?;
// upload_to_db::upload_file_bulk(&pool, &"".to_owned(), &"".to_owned(), None, "".to_owned()).await?;
Ok(())
}

View File

@@ -59,6 +59,33 @@ pub extern "C" fn move_money_from_text(
// This looks like exactly what I'm doing too: https://mozilla.github.io/firefox-browser-architecture/experiments/2017-09-06-rust-on-ios.html
}
#[no_mangle]
pub extern "C" fn move_money_from_file(
rules_file: *const c_char,
lines: *const c_char,
accounts: *const c_char,
cost_centres: *const c_char,
output_path: *const c_char,
use_numeric_accounts: bool,
) {
let mut output_writer = csv::Writer::from_writer(vec![]);
let safe_rules = unwrap_c_char(rules_file);
let safe_lines = unwrap_c_char(lines);
let safe_accounts = unwrap_c_char(accounts);
let safe_cost_centres = unwrap_c_char(cost_centres);
move_money_2()
// move_money(
// ,
// &mut csv::Reader::from_reader(safe_lines.to_str().unwrap()),
// &mut csv::Reader::from_reader(safe_accounts.to_bytes()),
// &mut csv::Reader::from_reader(safe_cost_centres.to_bytes()),
// &mut output_writer,
// use_numeric_accounts,
// false,
// )
// .expect("Failed to move money");
}
#[no_mangle]
pub unsafe extern "C" fn move_money_from_text_free(s: *mut c_char) {
unsafe {

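The three unwrap_c_char calls above rely on a helper defined elsewhere in this file. A minimal sketch of what such a helper typically looks like for this FFI boundary, following the Rust-on-iOS pattern linked in the comment (illustrative reimplementation, not the file's actual code):

use std::ffi::CStr;
use std::os::raw::c_char;

// Convert a raw C string pointer received from the Swift side into a borrowed CStr.
// The caller must pass a valid, NUL-terminated string that outlives the borrow.
fn unwrap_c_char<'a>(ptr: *const c_char) -> &'a CStr {
    assert!(!ptr.is_null(), "null pointer passed across the FFI boundary");
    unsafe { CStr::from_ptr(ptr) }
}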
View File

@@ -242,7 +242,7 @@ fn main() -> anyhow::Result<()> {
date_order_column: None,
},
);
coster_rs::create_products::create_products_polars(definitions, inputs, output)
coster_rs::create_products::create_products_polars(definitions, vec![], output)
}
}
}

View File

@@ -39,14 +39,14 @@ struct Product {
pub struct InputFile {
pub file_path: PathBuf,
pub joins: HashMap<SourceType, String>,
pub joins: HashMap<PathBuf, String>,
// if not specified, then don't allow change in type builds, as there's no way to detect changes over time
pub date_order_column: Option<String>,
}
pub fn create_products_polars(
definitions_path: PathBuf,
inputs: HashMap<SourceType, InputFile>,
inputs: Vec<InputFile>,
output_path: PathBuf,
) -> anyhow::Result<()> {
let definitions = read_definitions(&mut csv::Reader::from_path(definitions_path)?)?;
@@ -59,7 +59,7 @@ pub fn create_products_polars(
pub fn build_polars(
definition: &Definition,
inputs: &HashMap<SourceType, InputFile>,
inputs: &Vec<InputFile>,
output_path: &PathBuf,
) -> anyhow::Result<()> {
// 1. Apply filters to limit encounters
@@ -81,8 +81,7 @@ pub fn build_polars(
})
.reduce(|prev, next| prev.and(next));
let input_file = inputs
.get(&definition.source_type)
let input_file = inputs.iter().find(|input| input.file_path == definition.source)
.ok_or(anyhow!("Failed to find valid file"))?;
let mut reader = LazyCsvReader::new(&input_file.file_path)
.has_header(true)
@@ -98,9 +97,9 @@ pub fn build_polars(
}
for source_type in required_files {
// TODO: Better error messages
if source_type != &definition.source_type {
let source_file = inputs
.get(&source_type)
if source_type != &definition.source {
let source_file = inputs.iter()
.find(|input| input.file_path == definition.source)
.ok_or(anyhow!("Input file was not specified for source type"))?;
// TODO: Alias the joined columns so they don't potentially clash with the current column
let join_reader = LazyCsvReader::new(source_file.file_path.clone()).finish()?;
@@ -110,27 +109,32 @@ pub fn build_polars(
.ok_or(anyhow!("Failed to get left join column"))?;
let right_column = source_file
.joins
.get(&definition.source_type)
.get(&definition.source)
.ok_or(anyhow!("Failed to get right join column"))?;
reader = reader.inner_join(join_reader, col(&left_column), col(&right_column));
}
}
// TODO: Also work out how to expand rows, so that transfers can have stuff like daily or change in x expanded into multiple rows
// Since it's related to time it is probably related to this: https://docs.pola.rs/user-guide/transformations/time-series/parsing/
// I'm guessing upsampling is what I'm looking for: https://docs.pola.rs/user-guide/transformations/time-series/resampling/#upsampling-to-a-higher-frequency
// Can use different strategies to break the time period down, range can be calculated by using start/end datetime
// Wonder if this can be done more generally (e.g. splitting up based on a number?)
// Note: This must occur before creating the components, since we'll need to create one for every upsampled row
let mut built_expression = lit("");
// Create component columns
for component in &definition.components {
match component {
Component::Constant(constant) => {
built_expression = built_expression + lit(constant.clone())
}
// TODO: Do we need to worry about the source type? There might be clashing column names we need to think about earlier, then address here.
// TODO: What I really want is to not use source type at all; instead I want to refer to a file, translating the source type
// to an actual filename. The definition shouldn't be limited by a concept of 'source type'; it should treat everything
// the same and just translate the imported csv format into the files and columns expected as input.
Component::Field(source_type, column) => {
built_expression = built_expression + col(&column)
} // TODO: Also work out how to expand rows, so that transfers can have stuff like daily or change in x expanded into multiple rows
// Since it's related to time it is probably related to this: https://docs.pola.rs/user-guide/transformations/time-series/parsing/
// I'm guessing upsampling is what I'm looking for: https://docs.pola.rs/user-guide/transformations/time-series/resampling/#upsampling-to-a-higher-frequency
// Can use different strategies to break the time period down, range can be calculated by using start/end datetime
// Wonder if this can be done more generally (e.g. splitting up based on a number?)
}
}
}
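The row-expansion TODO above amounts to turning one row that covers a time span into one row per interval before the component columns are created (the polars upsampling linked in the comment is the columnar equivalent). A minimal sketch of that idea, assuming chrono NaiveDateTime start/end values and a fixed step:

use chrono::{Duration, NaiveDateTime};

// Expand a [start, end) span into one timestamp per step (e.g. one per day).
// Each timestamp would become its own row, which is why this has to happen
// before the component columns are built.
fn expand_span(start: NaiveDateTime, end: NaiveDateTime, step: Duration) -> Vec<NaiveDateTime> {
    let mut out = Vec::new();
    let mut t = start;
    while t < end {
        out.push(t);
        t = t + step;
    }
    out
}

The same shape covers the "splitting up based on a number" case: swap the Duration step for a count and divide the quantity across the generated rows.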

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, io::Read};
use std::{collections::HashMap, io::Read, path::PathBuf};
use anyhow::bail;
use chrono::NaiveDateTime;
@@ -6,7 +6,7 @@ use chrono::NaiveDateTime;
#[derive(Hash, PartialEq, PartialOrd)]
pub struct Filter {
pub filter_type: FilterType,
pub file: SourceType,
pub file: PathBuf,
pub field: String,
pub value: String,
}
@@ -16,7 +16,8 @@ pub enum SourceType {
CodingDiagnosis,
CodingProcedure,
Encounter,
Incident,
// TODO: Incident isn't used right now
// Incident,
Patient,
Revenue,
Service,
@@ -31,7 +32,6 @@ impl TryFrom<&String> for SourceType {
"CD" => Ok(SourceType::CodingDiagnosis),
"CP" => Ok(SourceType::CodingProcedure),
"E" => Ok(SourceType::Encounter),
"I" => Ok(SourceType::Incident),
"P" => Ok(SourceType::Patient),
"R" => Ok(SourceType::Revenue),
"S" => Ok(SourceType::Service),
@@ -62,6 +62,18 @@ impl SourceType {
_ => bail!("Invalid ComponentSourceType found: {}", value),
}
}
fn to_file_path(&self) -> String {
match self {
SourceType::CodingDiagnosis => "coding_diagnoses.csv".to_owned(),
SourceType::CodingProcedure => "coding_procedures.csv".to_owned(),
SourceType::Encounter => "encounters.csv".to_owned(),
SourceType::Patient => "patients.csv".to_owned(),
SourceType::Revenue => "revenues.csv".to_owned(),
SourceType::Service => "services.csv".to_owned(),
SourceType::Transfer => "transfers.csv".to_owned(),
}
}
}
#[derive(Hash, PartialEq, PartialOrd)]
@@ -104,7 +116,7 @@ pub enum ExtraType {
pub enum Component {
Constant(String),
// File, column_name
Field(SourceType, String),
Field(PathBuf, String),
}
// Frequency per type:
@@ -233,7 +245,7 @@ pub struct Definition {
pub name: String,
pub components: Vec<Component>,
pub filters: Vec<Filter>,
pub source_type: SourceType,
pub source: PathBuf,
pub frequency: Frequency,
pub quantity: BuiltQuantity,
pub duration_fallback: DurationFallback,
@@ -284,7 +296,7 @@ where
name: record.get("Name").unwrap().to_owned(),
components: vec![],
filters: vec![],
source_type: build_from,
source: build_from.to_file_path().into(),
frequency,
quantity: built_quantity,
// TODO: Figure this out
@@ -307,7 +319,7 @@ where
// TODO: extra/classification types need to append Extra:/Classification: to the start of the field
field: record.get("FilterField").unwrap().clone(),
value: record.get("FilterValue").unwrap().clone(),
file: source_type,
file: source_type.to_file_path().into(),
}
};
let all_filters = &mut all_definitions
@@ -327,9 +339,7 @@ where
source => {
let component_source_type = SourceType::from_component_source_type(source)?;
Component::Field(
// TODO: Figure this out, should be determined from the source type
component_source_type,
// TODO: Field probably needs to be enumed to match onto the correct column in input files
component_source_type.to_file_path().into(),
record.get("ComponentValueOrField").unwrap().to_owned(),
)
}
@@ -349,7 +359,7 @@ where
field: record.get("ConstraintColumn").unwrap().to_owned(),
filter_type,
value: record.get("ConstraintValue").unwrap().to_owned(),
file: source_type,
file: source_type.to_file_path().into(),
}
};
let all_filters = &mut all_definitions

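With this change, Filter, Component, and Definition name the concrete CSV they read from rather than a SourceType. A short illustration of the two-step translation the hunk performs within this module (short code from the definitions CSV into a SourceType, then into a PathBuf; illustrative only, and assuming the TryFrom impl's error type converts into anyhow::Error as the rest of this module does):

use std::path::PathBuf;

// "E" -> SourceType::Encounter -> PathBuf::from("encounters.csv"),
// i.e. the value that now ends up in Definition::source and Filter::file.
fn file_for_code(code: &String) -> anyhow::Result<PathBuf> {
    let source_type = SourceType::try_from(code)?;
    Ok(source_type.to_file_path().into())
}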
View File

@@ -1,7 +1,11 @@
use std::{collections::HashMap, io::Read};
use csv::Reader;
use sqlx::{Mssql, Pool, QueryBuilder};
use sqlx::{query, query_builder, Any, Mssql, Pool, QueryBuilder};
// Note: right now this is pinned to mssql only: sqlx 0.7 is required to use the Any
// type with QueryBuilder (sqlx 0.6 and earlier hit a query_builder lifetime issue),
// but sqlx >= 0.7 currently doesn't support mssql.
// Upload data in a file to a db table, with an optional post-script to run,
// such as to move data from the upload table into other tables
@@ -9,7 +13,7 @@ use sqlx::{Mssql, Pool, QueryBuilder};
// TODO: Add fallback insert when bulk insert fails (e.g. due to
// permission errors)
pub async fn upload_file_bulk(
pool: &Pool<sqlx::Any>,
pool: &Pool<sqlx::Mssql>,
file_name: &String,
table_name: &String,
// Mappings from column in file -> column in db
@@ -19,34 +23,42 @@ pub async fn upload_file_bulk(
// TODO: Test if the table already exists. If it doesn't, try creating the table
// First try a bulk insert command
let result = match pool.any_kind() {
sqlx::any::AnyKind::Mssql => {
sqlx::query(&format!("BULK INSERT {} FROM {}", table_name, file_name))
// let result = match pool.any_kind() {
// sqlx::any::AnyKind::Mssql => {
let result = sqlx::query(&format!("BULK INSERT {} FROM {}", table_name, file_name))
.execute(pool)
.await
}
};
.await?;
// }
// };
let mut rows_affected = match &result {
Result::Ok(result) => result.rows_affected(),
// TODO: Log error
Err(error) => 0_u64,
};
let mut rows_affected = result.rows_affected();
// let mut rows_affected = match &result {
// Result::Ok(result) => result.rows_affected(),
// // TODO: Log error
// Err(error) => 0_u64,
// };
// TODO: Adjust for various dbmss
if let Err(_) = result {
if rows_affected == 0 {
let rows: Vec<HashMap<String, String>> = vec![];
let BIND_LIMIT: usize = 65535;
// TODO: Use csv to read from file
// TODO: When bulk insert fails, Fall back to sql batched insert
// TODO: Columns to insert... needs some kind of mapping from file column name <-> db column
let mut query_builder = QueryBuilder::new(format!("INSERT INTO {}({}) ", table_name, ""));
query_builder.push_values(rows, |mut b, row| {
// TODO: Iterate over all values in file, not the limit
query_builder.push_values(&rows[0..BIND_LIMIT], |mut b, row| {
b.push_bind(row.get("s"));
});
let mut query = query_builder.build();
let mut query_builder = query_builder;
// TODO: Looks like this issue: https://github.com/launchbadge/sqlx/issues/1978
// Turns out we need v0.7 for this to not bug out, however mssql is only supported in versions before v0.7, so right now
// we can't use sqlx for this unless we explicitly specify mssql only, not Any, as the db type...
let query = query_builder.build();
let result = query.execute(pool).await?;
rows_affected = result.rows_affected();
}
@@ -56,4 +68,4 @@ pub async fn upload_file_bulk(
}
Ok(rows_affected)
}
}
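The fallback path above still carries TODOs around reading the file, mapping columns, and the bind limit. A rough sketch of how the batched insert could respect the limit once real rows exist (the single "value" column and the &[String] row shape are placeholders, not the real file-column to db-column mapping):

use sqlx::{Mssql, Pool, QueryBuilder};

// Batch rows so each INSERT stays under the parameter limit; with one bind
// per row the chunk size is simply the limit itself.
async fn fallback_insert(
    pool: &Pool<Mssql>,
    table_name: &str,
    rows: &[String],
) -> anyhow::Result<u64> {
    const BIND_LIMIT: usize = 65_535;
    let mut rows_affected = 0_u64;
    for chunk in rows.chunks(BIND_LIMIT) {
        let mut query_builder =
            QueryBuilder::<Mssql>::new(format!("INSERT INTO {}(value) ", table_name));
        query_builder.push_values(chunk, |mut b, row| {
            b.push_bind(row.clone());
        });
        rows_affected += query_builder.build().execute(pool).await?.rows_affected();
    }
    Ok(rows_affected)
}

Pinning the builder to the concrete Mssql type sidesteps the Any-related lifetime issue mentioned in the comments above.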