Start adding db upload function, component mapping in product creator

Michael Pivato
2024-02-28 22:18:54 +10:30
parent e88d2a6319
commit 51fc216c82
5 changed files with 91 additions and 9 deletions

Cargo.toml

@@ -22,7 +22,7 @@ chrono = {version = "0.4.31", features = ["default", "serde"]}
rayon = "1.6.0" rayon = "1.6.0"
tokio = { version = "1.26.0", features = ["full"] } tokio = { version = "1.26.0", features = ["full"] }
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls", "mssql" ] } sqlx = { version = "0.6", features = [ "runtime-tokio-rustls", "mssql", "any" ] }
rmp-serde = "1.1.1" rmp-serde = "1.1.1"
tempfile = "3.7.0" tempfile = "3.7.0"
polars = {version = "0.32.1", features = ["lazy", "performant", "streaming", "cse", "dtype-datetime"]} polars = {version = "0.32.1", features = ["lazy", "performant", "streaming", "cse", "dtype-datetime"]}
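
The new "any" feature turns on sqlx's database-agnostic Any driver, which picks the concrete backend at runtime from the connection-string scheme. A minimal sketch of what that enables (sqlx 0.6; credentials, host, and the function name are placeholders):

use sqlx::any::AnyPoolOptions;

// The scheme ("mssql://") selects the driver at runtime; pointing the same
// code at another enabled backend would only mean changing the URL.
async fn connect_any() -> anyhow::Result<sqlx::Pool<sqlx::Any>> {
    let pool = AnyPoolOptions::new()
        .max_connections(5)
        .connect("mssql://user:password@localhost/master")
        .await?;
    Ok(pool)
}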

src/main.rs

@@ -1,4 +1,5 @@
-use sqlx::mssql::MssqlPoolOptions;
+use coster_rs::upload_to_db;
+use sqlx::{any::AnyPoolOptions, mssql::MssqlPoolOptions};
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
@@ -8,11 +9,11 @@ async fn main() -> anyhow::Result<()> {
let database = ""; let database = "";
// USing sqlx: https://github.com/launchbadge/sqlx // USing sqlx: https://github.com/launchbadge/sqlx
let connection_string = format!("mssq://{}:{}@{}/{}", user, password, host, database); let connection_string = format!("mssq://{}:{}@{}/{}", user, password, host, database);
let pool = MssqlPoolOptions::new() let pool = AnyPoolOptions::new()
.max_connections(20) .max_connections(20)
.connect(&connection_string) .connect(&connection_string)
.await?; .await?;
// sqlx::query_as("")
// connection. upload_to_db::upload_file_bulk(&pool, &"".to_owned(), &"".to_owned(), None, "".to_owned()).await?;
Ok(()) Ok(())
} }
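
For reference, a filled-in version of the new upload_file_bulk call might look like the following; the file name, table name, and post-script are hypothetical placeholders:

upload_to_db::upload_file_bulk(
    &pool,
    &"exports/products.csv".to_owned(), // hypothetical CSV produced by the product creator
    &"staging_products".to_owned(),     // hypothetical upload/staging table
    None,                               // no column mappings: file columns match the table
    Some("INSERT INTO products SELECT * FROM staging_products;".to_owned()),
)
.await?;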

src/lib.rs

@@ -19,6 +19,8 @@ pub mod link;
 pub mod filter;
+pub mod upload_to_db;
+
 mod io;
 
 #[no_mangle]

(product creator source file)

@@ -102,6 +102,7 @@ pub fn build_polars(
     let source_file = inputs
         .get(&source_type)
         .ok_or(anyhow!("Input file was not specified for source type"))?;
+    // TODO: Alias the joined columns so they don't potentially clash with the current column
     let join_reader = LazyCsvReader::new(source_file.file_path.clone()).finish()?;
     let left_column = input_file
         .joins
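
A possible shape for the alias TODO above, assuming polars 0.32's JoinBuilder API (right_column and the "_joined" suffix are illustrative; reader, join_reader, and left_column mirror the surrounding code):

// Sketch only: suffix the right-hand columns so a clashing name like
// "name" arrives as "name_joined" instead of colliding.
let joined = reader.join_builder()
    .with(join_reader)
    .left_on([col(&left_column)])
    .right_on([col(&right_column)])
    .how(JoinType::Left)
    .suffix("_joined")
    .finish();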
@@ -115,18 +116,37 @@ pub fn build_polars(
         }
     }
+    let mut built_expression = lit("");
+    // Create the component columns
+    for component in &definition.components {
+        match component {
+            Component::Constant(constant) => {
+                built_expression = built_expression + lit(constant.clone())
+            }
+            // TODO: Do we need to worry about the source type? There might be clashing
+            // column names we need to think about earlier and then address here.
+            Component::Field(source_type, column) => {
+                built_expression = built_expression + col(column)
+            }
+            // TODO: Also work out how to expand rows, so that transfers can have things like
+            // "daily" or "change in x" expanded into multiple rows.
+            // Since it's related to time, it is probably covered here: https://docs.pola.rs/user-guide/transformations/time-series/parsing/
+            // I'm guessing upsampling is what I'm looking for: https://docs.pola.rs/user-guide/transformations/time-series/resampling/#upsampling-to-a-higher-frequency
+            // Different strategies can be used to break the time period down; the range can be
+            // calculated from the start/end datetimes.
+            // Wonder if this can be done more generally (e.g. splitting up based on a number?)
+        }
+    }
+    // TODO: Build out the rest of the product definition, depending on the input definition
+    let select_columns = [built_expression];
+    // Filter and select the required data in one step, so the optimiser can speed things up if necessary
     let mut filtered = match filter {
         Some(filter) => reader.filter(filter),
         None => reader,
     }
+    .select(select_columns)
     .with_streaming(true)
     .collect()?;
+    // TODO: For each of the filtered records, create a new record that is the built record,
+    // based on the components, quantity, etc. from the definition
     let mut file = std::fs::File::create(output_path).unwrap();
+    // TODO: Don't write `filtered`, but the result that contains the created product columns
     CsvWriter::new(&mut file).finish(&mut filtered)?;
     Ok(())
 }
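
A note on the component loop above: in polars, adding Utf8 expressions concatenates them, so built_expression evaluates to a single string column. A minimal sketch of the expected behaviour (the column name, values, and function name are illustrative):

use polars::prelude::*;

// Equivalent to a definition with components [Constant("PROD-"), Field(_, "code")]
fn demo_components() -> PolarsResult<DataFrame> {
    let built_expression = lit("") + lit("PROD-") + col("code");
    df!("code" => ["A1", "B2"])?
        .lazy()
        .select([built_expression.alias("product")])
        .collect()
    // "product" column: ["PROD-A1", "PROD-B2"]
}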

src/upload_to_db.rs (new file)

@@ -0,0 +1,59 @@
+use std::collections::HashMap;
+
+use sqlx::{Pool, QueryBuilder};
+
+// Upload data in a file to a db table, with an optional post-script to run,
+// such as to move data from the upload table into other tables.
+// TODO: Add bulk insert options for non-mssql dbs
+// TODO: Add fallback insert when bulk insert fails (e.g. due to
+// permission errors)
+pub async fn upload_file_bulk(
+    pool: &Pool<sqlx::Any>,
+    file_name: &String,
+    table_name: &String,
+    // Mappings from column in file -> column in db
+    column_mappings: Option<HashMap<String, String>>,
+    post_script: Option<String>,
+) -> anyhow::Result<u64> {
+    // TODO: Test if the table already exists. If it doesn't, try creating the table.
+    // First try a bulk insert command
+    let result = match pool.any_kind() {
+        sqlx::any::AnyKind::Mssql => {
+            // BULK INSERT expects the data file path to be quoted
+            sqlx::query(&format!("BULK INSERT {} FROM '{}'", table_name, file_name))
+                .execute(pool)
+                .await
+        }
+    };
+    let mut rows_affected = match &result {
+        Ok(result) => result.rows_affected(),
+        // TODO: Log the error
+        Err(_error) => 0_u64,
+    };
+    // TODO: Adjust for the various DBMSs
+    if result.is_err() {
+        // When the bulk insert fails, fall back to a batched SQL insert
+        let rows: Vec<HashMap<String, String>> = vec![];
+        const BIND_LIMIT: usize = 65535;
+        // TODO: Use csv to read the rows from the file
+        let mut query_builder = QueryBuilder::new(format!("INSERT INTO {}({}) ", table_name, ""));
+        query_builder.push_values(rows, |mut b, row| {
+            // Clone out of the row so the bound value outlives the closure
+            b.push_bind(row.get("s").cloned());
+        });
+        let query = query_builder.build();
+        let result = query.execute(pool).await?;
+        rows_affected = result.rows_affected();
+    }
+    if let Some(post_script) = post_script {
+        sqlx::query(&post_script).execute(pool).await?;
+    }
+    Ok(rows_affected)
+}
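
A sketch of where the TODO-marked fallback could go once the csv reading is wired up, assuming sqlx 0.6's QueryBuilder::push_values and that every field can be bound as text (the function name and batching choices are hypothetical, not part of the commit):

use csv::Reader;
use sqlx::{Pool, QueryBuilder};

async fn insert_batched(
    pool: &Pool<sqlx::Any>,
    file_name: &str,
    table_name: &str,
) -> anyhow::Result<u64> {
    const BIND_LIMIT: usize = 65535;
    let mut reader = Reader::from_path(file_name)?;
    let headers = reader.headers()?.clone();
    let columns = headers.iter().collect::<Vec<_>>().join(", ");
    let rows = reader.records().collect::<Result<Vec<_>, _>>()?;
    let mut total = 0_u64;
    // Each row binds one parameter per column, so size the chunks to
    // stay under the driver's bind-parameter limit.
    let rows_per_batch = BIND_LIMIT / headers.len().max(1);
    for chunk in rows.chunks(rows_per_batch) {
        let mut query_builder =
            QueryBuilder::new(format!("INSERT INTO {} ({}) ", table_name, columns));
        query_builder.push_values(chunk, |mut b, row| {
            for field in row.iter() {
                b.push_bind(field.to_owned());
            }
        });
        total += query_builder.build().execute(pool).await?.rows_affected();
    }
    Ok(total)
}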