From 51fc216c82b4450c71ac73be79c1ef8648413ca9 Mon Sep 17 00:00:00 2001
From: Michael Pivato
Date: Wed, 28 Feb 2024 22:18:54 +1030
Subject: [PATCH] Start adding db upload function, component mapping in
 product creator

---
 Cargo.toml                      |  2 +-
 src/bin/agent2/main.rs          |  9 ++---
 src/lib.rs                      |  2 ++
 src/products/create_products.rs | 28 +++++++++++++---
 src/upload_to_db.rs             | 59 +++++++++++++++++++++++++++++++++
 5 files changed, 91 insertions(+), 9 deletions(-)
 create mode 100644 src/upload_to_db.rs

diff --git a/Cargo.toml b/Cargo.toml
index 0d69249..499dbdb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,7 +22,7 @@
 chrono = {version = "0.4.31", features = ["default", "serde"]}
 rayon = "1.6.0"
 tokio = { version = "1.26.0", features = ["full"] }
-sqlx = { version = "0.6", features = [ "runtime-tokio-rustls", "mssql" ] }
+sqlx = { version = "0.6", features = [ "runtime-tokio-rustls", "mssql", "any" ] }
 rmp-serde = "1.1.1"
 tempfile = "3.7.0"
 polars = {version = "0.32.1", features = ["lazy", "performant", "streaming", "cse", "dtype-datetime"]}

diff --git a/src/bin/agent2/main.rs b/src/bin/agent2/main.rs
index dd515ae..0cc49ef 100644
--- a/src/bin/agent2/main.rs
+++ b/src/bin/agent2/main.rs
@@ -1,4 +1,5 @@
-use sqlx::mssql::MssqlPoolOptions;
+use coster_rs::upload_to_db;
+use sqlx::any::AnyPoolOptions;
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
@@ -8,11 +9,11 @@
     let database = "";
     // Using sqlx: https://github.com/launchbadge/sqlx
     let connection_string = format!("mssql://{}:{}@{}/{}", user, password, host, database);
-    let pool = MssqlPoolOptions::new()
+    let pool = AnyPoolOptions::new()
         .max_connections(20)
         .connect(&connection_string)
         .await?;
-    // sqlx::query_as("")
-    // connection.
+
+    upload_to_db::upload_file_bulk(&pool, "", "", None, None).await?;
     Ok(())
 }

diff --git a/src/lib.rs b/src/lib.rs
index 244c4dd..4304f1f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -19,6 +19,8 @@ pub mod link;
 
 pub mod filter;
 
+pub mod upload_to_db;
+
 mod io;
 
 #[no_mangle]

diff --git a/src/products/create_products.rs b/src/products/create_products.rs
index 3c9268a..07f78a8 100644
--- a/src/products/create_products.rs
+++ b/src/products/create_products.rs
@@ -102,6 +102,7 @@ pub fn build_polars(
     let source_file = inputs
         .get(&source_type)
         .ok_or(anyhow!("Input file was not specified for source type"))?;
+    // TODO: Alias the joined columns so they don't clash with existing column names
     let join_reader = LazyCsvReader::new(source_file.file_path.clone()).finish()?;
     let left_column = input_file
         .joins
@@ -115,18 +116,37 @@
         }
     }
 
+    // Concatenate the components into a single expression for the product column
+    let mut built_expression = lit("");
+    for component in &definition.components {
+        match component {
+            Component::Constant(constant) => {
+                built_expression = built_expression + lit(constant.clone())
+            }
+            // TODO: Do we need to worry about the source type? There may be clashing
+            // column names that we need to think about earlier, then address here.
+            Component::Field(_source_type, column) => {
+                built_expression = built_expression + col(column)
+            }
+            // TODO: Also work out how to expand rows, so that transfers can have things
+            // like "daily" or "change in x" expanded into multiple rows (see the sketch
+            // after this diff). Since it's time-related, this is probably relevant:
+            // https://docs.pola.rs/user-guide/transformations/time-series/parsing/
+            // Upsampling looks like the right tool:
+            // https://docs.pola.rs/user-guide/transformations/time-series/resampling/#upsampling-to-a-higher-frequency
+            // Different strategies can break the time period down; the range can be
+            // calculated from the start/end datetimes. This might generalise further
+            // (e.g. splitting up based on a number?).
+        }
+    }
+
+    // TODO: Build out the rest of the product definition, depending on the input definition
+    let select_columns = [built_expression];
+
+    // Filter and select the required data in one step, so the optimiser can speed things up
     let mut filtered = match filter {
         Some(filter) => reader.filter(filter),
         None => reader,
     }
+    .select(select_columns)
     .with_streaming(true)
     .collect()?;
-    // TODO: Now for each of the filtered records, create a new record that is the built record, based on the components
-    // quantity, etc. from the definition
     let mut file = std::fs::File::create(output_path).unwrap();
-    // TODO: Don't use filtered, but the results that outputs created product columns
     CsvWriter::new(&mut file).finish(&mut filtered)?;
     Ok(())
 }
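The row-expansion TODO above (turning one transfer row into a row per day) maps onto polars upsampling. Below is a minimal sketch, not part of the patch: the expand_daily function and the "start" column name are assumptions, the frame must already be sorted by that column, and DataFrame::upsample sits behind polars' dynamic_groupby feature.

use polars::prelude::*;

// Expand each row into one row per day, assuming the frame is sorted by a
// datetime column (here hypothetically named "start").
fn expand_daily(df: DataFrame) -> PolarsResult<DataFrame> {
    // No grouping columns; generate one row per day along the "start" column.
    let upsampled = df.upsample::<[String; 0]>(
        [],
        "start",
        Duration::parse("1d"),
        Duration::parse("0"),
    )?;
    // The generated rows are null everywhere except the time column, so
    // forward-fill to carry each source row's values across its days.
    upsampled.fill_null(FillNullStrategy::Forward(None))
}

The same mechanism should cover "daily" style expansion with a different `every` interval; splitting on a plain number would need another approach (e.g. repeating rows by a count column).
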
diff --git a/src/upload_to_db.rs b/src/upload_to_db.rs
new file mode 100644
index 0000000..4ab0faa
--- /dev/null
+++ b/src/upload_to_db.rs
@@ -0,0 +1,59 @@
+use std::collections::HashMap;
+
+use sqlx::{any::AnyKind, Any, Pool, QueryBuilder};
+
+/// Upload data in a file to a db table, with an optional post-script to run,
+/// such as to move data from the upload table into other tables.
+// TODO: Add bulk insert options for non-mssql dbs
+// TODO: Add fallback insert when bulk insert fails (e.g. due to
+// permission errors)
+pub async fn upload_file_bulk(
+    pool: &Pool<Any>,
+    file_name: &str,
+    table_name: &str,
+    // Mappings from column in file -> column in db
+    _column_mappings: Option<HashMap<String, String>>,
+    post_script: Option<String>,
+) -> anyhow::Result<u64> {
+    // TODO: Test if the table already exists. If it doesn't, try creating the table
+
+    // First try a bulk insert command
+    let result = match pool.any_kind() {
+        AnyKind::Mssql => {
+            sqlx::query(&format!("BULK INSERT {} FROM '{}'", table_name, file_name))
+                .execute(pool)
+                .await
+        }
+    };
+
+    let mut rows_affected = match &result {
+        Ok(result) => result.rows_affected(),
+        // TODO: Log the error
+        Err(_error) => 0_u64,
+    };
+
+    // TODO: Adjust for the various DBMSs
+
+    if result.is_err() {
+        // TODO: When the bulk insert fails, fall back to a batched SQL insert
+        // (see the sketch after this patch). Use the csv crate to read the rows
+        // from the file, and fill in the column list below.
+        const BIND_LIMIT: usize = 65535;
+        let rows: Vec<HashMap<String, String>> = vec![];
+
+        let mut query_builder =
+            QueryBuilder::<Any>::new(format!("INSERT INTO {}({}) ", table_name, ""));
+        query_builder.push_values(rows, |mut b, row| {
+            b.push_bind(row.get("s").cloned());
+        });
+        let query = query_builder.build();
+        let result = query.execute(pool).await?;
+        rows_affected = result.rows_affected();
+    }
+
+    if let Some(post_script) = post_script {
+        sqlx::query(&post_script).execute(pool).await?;
+    }
+
+    Ok(rows_affected)
+}
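
The fallback branch above is still stubbed out. Below is a sketch of the batched insert the TODOs describe, not part of the patch: fallback_insert is a hypothetical helper, every CSV field is bound as a string, column mappings are ignored, and the batch size is chosen only to keep each statement under the bind-parameter limit.

use sqlx::{Any, Pool, QueryBuilder};

async fn fallback_insert(
    pool: &Pool<Any>,
    file_name: &str,
    table_name: &str,
) -> anyhow::Result<u64> {
    const BIND_LIMIT: usize = 65535;

    // Read the whole CSV up front; the header row supplies the column list.
    let mut reader = csv::Reader::from_path(file_name)?;
    let headers = reader.headers()?.clone();
    let columns = headers.iter().collect::<Vec<_>>().join(", ");
    let rows: Vec<csv::StringRecord> = reader.records().collect::<Result<_, _>>()?;

    // Each row binds one parameter per column, so size the batches to stay
    // under the driver's bind limit.
    let chunk_size = (BIND_LIMIT / headers.len().max(1)).max(1);

    let mut rows_affected = 0_u64;
    for chunk in rows.chunks(chunk_size) {
        let mut builder =
            QueryBuilder::<Any>::new(format!("INSERT INTO {} ({}) ", table_name, columns));
        builder.push_values(chunk, |mut b, row| {
            for field in row.iter() {
                b.push_bind(field.to_owned());
            }
        });
        rows_affected += builder.build().execute(pool).await?.rows_affected();
    }
    Ok(rows_affected)
}

Batching also keeps a single bad row from failing the entire upload once per-chunk error handling is added; wrapping the loop in a transaction would restore all-or-nothing semantics if that matters.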