Fix file formatting, add tests for bulk upload to db in supported databases
Some checks failed: test / test (push) failing after 15m3s

This commit is contained in:
2025-02-14 16:21:09 +10:30
parent c8fd8734eb
commit d9f69ff298
17 changed files with 408 additions and 140 deletions

Cargo.lock generated
View File

@@ -624,7 +624,6 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"sqlx", "sqlx",
"tempfile",
"testcontainers", "testcontainers",
"tiberius", "tiberius",
"tokio", "tokio",

View File

@@ -6,25 +6,17 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 [dependencies]
-# https://nalgebra.org/docs/user_guide/getting_started
 nalgebra = "0.33"
-# https://docs.rs/csv/1.1.6/csv/
 csv = "1"
 serde = { version = "1", features = ["derive"] }
-# num = "0.4"
 clap = { version = "4", features = ["derive"] }
 anyhow = "1"
 itertools = "0.14.0"
 chrono = { version = "0.4.39", features = ["default", "serde"] }
 rayon = "1.10.0"
 tokio = { version = "1.42.0", features = ["full"] }
-sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "any"] }
+sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "any", "mysql", "postgres", "sqlite"] }
 rmp-serde = "1.1"
-tempfile = "3.7"
 polars = { version = "0.45.1", features = ["lazy", "performant", "parquet", "streaming", "cse", "dtype-datetime"] }
 polars-sql = "0.45.1"
 serde_json = "1.0.122"

View File

@@ -1,4 +1,7 @@
-use super::{csv_row::CsvRow, dynamic_state::{vato007::ingey::types::HostCsvReader, DynamicState, ReadMapData}};
+use super::{
+    csv_row::CsvRow,
+    dynamic_state::{vato007::ingey::types::HostCsvReader, DynamicState, ReadMapData},
+};
 use csv::{Reader, StringRecord};
 use polars::datatypes::AnyValue;
 use polars::prelude::{col, lit, LazyCsvReader, LazyFileListReader};
@@ -14,26 +17,38 @@ pub struct CsvReader {
 impl CsvReader {
     pub fn new(path: String) -> Self {
         let reader = Reader::from_path(&path).expect("Failed to create csv reader");
-        CsvReader {
-            path,
-            reader,
-        }
+        CsvReader { path, reader }
     }
 }
 impl HostCsvReader for DynamicState {
     fn columns(&mut self, self_: wasmtime::component::Resource<CsvReader>) -> Vec<String> {
-        let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
+        let resource = self
+            .resources
+            .get_mut(&self_)
+            .expect("Failed to find resource");
         if resource.reader.has_headers() {
-            resource.reader.headers().expect("Reader says it has headers but doesn't").iter().map(|element| element.to_owned()).collect()
+            resource
+                .reader
+                .headers()
+                .expect("Reader says it has headers but doesn't")
+                .iter()
+                .map(|element| element.to_owned())
+                .collect()
         } else {
             vec![]
         }
     }
     // TODO: These next methods needs to be cleaned up badly
-    fn next(&mut self, self_: wasmtime::component::Resource<CsvReader>) -> Result<wasmtime::component::Resource<CsvRow>, String> {
-        let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
+    fn next(
+        &mut self,
+        self_: wasmtime::component::Resource<CsvReader>,
+    ) -> Result<wasmtime::component::Resource<CsvRow>, String> {
+        let resource = self
+            .resources
+            .get_mut(&self_)
+            .expect("Failed to find resource");
         let mut buf = StringRecord::new();
         let result = resource.reader.read_record(&mut buf);
         match result {
@@ -41,25 +56,37 @@ impl HostCsvReader for DynamicState {
                 if read {
                     let mut record_map = BTreeMap::new();
                     if resource.reader.has_headers() {
-                        resource.reader.headers().expect("Reader says it has headers but doesn't").iter().enumerate().for_each(|(i, name)| {
+                        resource
+                            .reader
+                            .headers()
+                            .expect("Reader says it has headers but doesn't")
+                            .iter()
+                            .enumerate()
+                            .for_each(|(i, name)| {
                                 record_map.insert(name.to_owned(), buf.get(i).unwrap().to_owned());
                             });
                     }
-                    let result = self.resources.push(CsvRow { values: record_map }).expect("");
+                    let result = self
+                        .resources
+                        .push(CsvRow { values: record_map })
+                        .expect("");
                     Ok(result)
                 } else {
                     Err("No more records available to read".to_owned())
                 }
             }
-            Err(err) => {
-                Err(err.to_string())
-            }
+            Err(err) => Err(err.to_string()),
         }
     }
-    fn next_into_map(&mut self, self_: wasmtime::component::Resource<CsvReader>) -> wasmtime::component::Resource<ReadMapData> {
-        let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
+    fn next_into_map(
+        &mut self,
+        self_: wasmtime::component::Resource<CsvReader>,
+    ) -> wasmtime::component::Resource<ReadMapData> {
+        let resource = self
+            .resources
+            .get_mut(&self_)
+            .expect("Failed to find resource");
         let mut buf = StringRecord::new();
         let result = resource.reader.read_record(&mut buf);
         let record_map = match result {
@@ -67,7 +94,13 @@ impl HostCsvReader for DynamicState {
                 if read {
                     let mut record_map = HashMap::new();
                     if resource.reader.has_headers() {
-                        resource.reader.headers().expect("Reader says it has headers but doesn't").iter().enumerate().for_each(|(i, name)| {
+                        resource
+                            .reader
+                            .headers()
+                            .expect("Reader says it has headers but doesn't")
+                            .iter()
+                            .enumerate()
+                            .for_each(|(i, name)| {
                                 record_map.insert(name.to_owned(), buf.get(i).unwrap().to_owned());
                             });
                     }
@@ -76,23 +109,35 @@ impl HostCsvReader for DynamicState {
                     HashMap::new()
                 }
             }
-            Err(_) => {
-                HashMap::new()
-            }
+            Err(_) => HashMap::new(),
         };
-        self.resources.push(ReadMapData { data: record_map }).expect("")
+        self.resources
+            .push(ReadMapData { data: record_map })
+            .expect("")
    }
    fn has_next(&mut self, self_: wasmtime::component::Resource<CsvReader>) -> bool {
-        let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
+        let resource = self
+            .resources
+            .get_mut(&self_)
+            .expect("Failed to find resource");
         !resource.reader.is_done()
     }
     // TODO: Clean this up as well
     #[doc = " Get a row by values in one or more columns"]
-    fn query(&mut self, self_: wasmtime::component::Resource<CsvReader>, values: Vec<(String, String,)>) -> wasmtime::component::Resource<CsvRow> {
-        let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
-        let mut df = LazyCsvReader::new(&resource.path).finish().expect("Failed to open file");
+    fn query(
+        &mut self,
+        self_: wasmtime::component::Resource<CsvReader>,
+        values: Vec<(String, String)>,
+    ) -> wasmtime::component::Resource<CsvRow> {
+        let resource = self
+            .resources
+            .get_mut(&self_)
+            .expect("Failed to find resource");
+        let mut df = LazyCsvReader::new(&resource.path)
+            .finish()
+            .expect("Failed to open file");
         for (key, value) in values {
             df = df.filter(col(key).eq(lit(value)));
         }
@@ -106,7 +151,9 @@ impl HostCsvReader for DynamicState {
                 record_map.insert(field.name.to_string(), value.to_string());
             }
         }
-        self.resources.push(CsvRow { values: record_map }).expect("Failed to create csv row")
+        self.resources
+            .push(CsvRow { values: record_map })
+            .expect("Failed to create csv row")
     }
     fn read_into_string(&mut self, self_: wasmtime::component::Resource<CsvReader>) -> String {

View File

@@ -1,4 +1,7 @@
-use super::{csv_reader::CsvReader, dynamic_state::{vato007::ingey::types::HostCsvReaders, DynamicState}};
+use super::{
+    csv_reader::CsvReader,
+    dynamic_state::{vato007::ingey::types::HostCsvReaders, DynamicState},
+};
 use std::collections::HashMap;
 use wasmtime::component::Resource;
@@ -8,8 +11,15 @@ pub struct CsvReadersData {
 }
 impl HostCsvReaders for DynamicState {
-    fn get_reader(&mut self, self_: Resource<CsvReadersData>, name: String) -> Option<Resource<CsvReader>> {
-        let resource = self.resources.get(&self_).expect("Failed to find own resource");
+    fn get_reader(
+        &mut self,
+        self_: Resource<CsvReadersData>,
+        name: String,
+    ) -> Option<Resource<CsvReader>> {
+        let resource = self
+            .resources
+            .get(&self_)
+            .expect("Failed to find own resource");
         let file_path = resource.readers.get(&name);
         if let Some(path) = file_path.cloned() {
             let csv_reader = CsvReader::new(path);

View File

@@ -8,22 +8,42 @@ pub struct CsvRow {
 impl HostCsvRow for DynamicState {
     fn columns(&mut self, self_: wasmtime::component::Resource<CsvRow>) -> Vec<String> {
-        let resource = self.resources.get(&self_).expect("Failed to find the required resource");
+        let resource = self
+            .resources
+            .get(&self_)
+            .expect("Failed to find the required resource");
         resource.values.keys().cloned().collect()
     }
     fn values(&mut self, self_: wasmtime::component::Resource<CsvRow>) -> Vec<String> {
-        let resource = self.resources.get(&self_).expect("Failed to find the required resource");
+        let resource = self
+            .resources
+            .get(&self_)
+            .expect("Failed to find the required resource");
         resource.values.values().cloned().collect()
     }
-    fn entries(&mut self, self_: wasmtime::component::Resource<CsvRow>) -> Vec<(String, String,)> {
-        let resource = self.resources.get(&self_).expect("Failed to find the required resource");
-        resource.values.keys().map(|key| (key.clone(), resource.values.get(key).unwrap().clone())).collect()
+    fn entries(&mut self, self_: wasmtime::component::Resource<CsvRow>) -> Vec<(String, String)> {
+        let resource = self
+            .resources
+            .get(&self_)
+            .expect("Failed to find the required resource");
+        resource
+            .values
+            .keys()
+            .map(|key| (key.clone(), resource.values.get(key).unwrap().clone()))
+            .collect()
     }
-    fn value(&mut self, self_: wasmtime::component::Resource<CsvRow>, name: String) -> Option<String> {
-        let resource = self.resources.get(&self_).expect("Failed to find the required resource");
+    fn value(
+        &mut self,
+        self_: wasmtime::component::Resource<CsvRow>,
+        name: String,
+    ) -> Option<String> {
+        let resource = self
+            .resources
+            .get(&self_)
+            .expect("Failed to find the required resource");
         resource.values.get(&name).cloned()
     }

View File

@@ -19,16 +19,28 @@ impl CsvWriterData {
     }
 }
 impl HostCsvWriter for DynamicState {
-    fn write_row(&mut self, self_: wasmtime::component::Resource<CsvWriterData>, row: Vec<(String, String)>) -> () {
-        let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
+    fn write_row(
+        &mut self,
+        self_: wasmtime::component::Resource<CsvWriterData>,
+        row: Vec<(String, String)>,
+    ) -> () {
+        let resource = self
+            .resources
+            .get_mut(&self_)
+            .expect("Failed to find resource");
         let write_data: BTreeMap<String, String> = row.into_iter().collect();
         if !resource.wrote_header {
-            resource.writer.write_header(&write_data).expect("Failed to write header");
+            resource
+                .writer
+                .write_header(&write_data)
+                .expect("Failed to write header");
             resource.wrote_header = true;
         }
-        resource.writer.write_record(write_data.values()).expect("Failed to write row");
+        resource
+            .writer
+            .write_record(write_data.values())
+            .expect("Failed to write row");
     }
     fn drop(&mut self, rep: wasmtime::component::Resource<CsvWriterData>) -> wasmtime::Result<()> {

View File

@@ -22,7 +22,9 @@ pub struct DynamicState {
 impl DynamicState {
     pub fn new() -> DynamicState {
-        DynamicState { resources: ResourceTable::new() }
+        DynamicState {
+            resources: ResourceTable::new(),
+        }
     }
 }

View File

@@ -12,13 +12,12 @@ use crate::graph::dynamic::csv_writer::CsvWriterData;
 use crate::graph::dynamic::read_map::ReadMapData;
 use dynamic_state::{Dynamic, DynamicState};
-mod csv_row;
 mod csv_reader;
 mod csv_readers;
+mod csv_row;
 mod csv_writer;
 mod read_map;
 #[derive(Serialize, Deserialize, Clone, JsonSchema)]
 pub struct DynamicNode {
     pub wasm_file_path: String,
@@ -39,13 +38,14 @@ impl RunnableNode for DynamicNodeRunner {
         let component = Component::from_file(&engine, &self.dynamic_node.wasm_file_path)?;
         let mut linker = Linker::new(&engine);
         Dynamic::add_to_linker(&mut linker, |state: &mut DynamicState| state)?;
-        let mut store = Store::new(
-            &engine,
-            DynamicState::new(),
-        );
+        let mut store = Store::new(&engine, DynamicState::new());
         let bindings = Dynamic::instantiate(&mut store, &component, &linker)?;
-        let read_map = store.data_mut().resources.push(ReadMapData { data: HashMap::new() })?;
-        let readers = store.data_mut().resources.push(CsvReadersData { readers: self.dynamic_node.input_file_paths.clone() })?;
+        let read_map = store.data_mut().resources.push(ReadMapData {
+            data: HashMap::new(),
+        })?;
+        let readers = store.data_mut().resources.push(CsvReadersData {
+            readers: self.dynamic_node.input_file_paths.clone(),
+        })?;
         let writer = CsvWriterData::new(self.dynamic_node.output_file.clone())?;
         let writer = store.data_mut().resources.push(writer)?;
         bindings.call_evaluate(&mut store, read_map, readers, writer)?;

View File

@@ -7,8 +7,16 @@ pub struct ReadMapData {
 }
 impl HostReadMap for DynamicState {
-    fn get(&mut self, self_: wasmtime::component::Resource<ReadMapData>, key: String) -> Option<String> {
-        self.resources.get(&self_).ok().map(|data| data.data.get(&key).cloned()).flatten()
+    fn get(
+        &mut self,
+        self_: wasmtime::component::Resource<ReadMapData>,
+        key: String,
+    ) -> Option<String> {
+        self.resources
+            .get(&self_)
+            .ok()
+            .map(|data| data.data.get(&key).cloned())
+            .flatten()
     }
     fn drop(&mut self, rep: wasmtime::component::Resource<ReadMapData>) -> wasmtime::Result<()> {

View File

@@ -12,17 +12,22 @@ use tokio_util::compat::TokioAsyncWriteCompatExt;
 /**
  * Pull data from a db using a db query into a csv file that can be used by another node
  */
-async fn pull_from_db(executor: &mut impl QueryExecutor, node: &PullFromDBNode) -> anyhow::Result<()> {
+async fn pull_from_db(
+    executor: &mut impl QueryExecutor,
+    node: &PullFromDBNode,
+) -> anyhow::Result<()> {
     let mut output_file = csv::Writer::from_path(node.output_data_source.path.clone())?;
     let mut first_row = true;
-    executor.get_rows(&node.query, &node.parameters, &mut move |row| {
+    executor
+        .get_rows(&node.query, &node.parameters, &mut move |row| {
             if first_row {
                 output_file.write_header(&row)?;
                 first_row = false;
             }
             output_file.write_record(row.values())?;
             Ok(())
-    }).await?;
+        })
+        .await?;
     Ok(())
 }
@@ -81,14 +86,20 @@ mod tests {
     async fn test_sql_server() -> anyhow::Result<()> {
         let container = GenericImage::new("mcr.microsoft.com/mssql/server", "2022-latest")
             .with_exposed_port(1433.tcp())
-            .with_wait_for(WaitFor::message_on_stdout("Recovery is complete.".to_owned()))
+            .with_wait_for(WaitFor::message_on_stdout(
+                "Recovery is complete.".to_owned(),
+            ))
             .with_env_var("ACCEPT_EULA", "Y")
             .with_env_var("MSSQL_SA_PASSWORD", "TestOnlyContainer123")
             .start()
             .await?;
         let host = container.get_host().await?;
         let port = container.get_host_port_ipv4(1433).await?;
-        let connection_string = format!("jdbc:sqlserver://{}:{};username=sa;password=TestOnlyContainer123", host, port).to_owned();
+        let connection_string = format!(
+            "jdbc:sqlserver://{}:{};username=sa;password=TestOnlyContainer123",
+            host, port
+        )
+        .to_owned();
         let runner = PullFromDBNodeRunner {
             pull_from_db_node: PullFromDBNode {
@@ -100,7 +111,7 @@ mod tests {
                     path: PathBuf::from("test_pull.csv"),
                     source_type: CSV,
                 },
-            }
+            },
         };
         runner.run().await?;
         let mut result_contents = String::new();
@@ -109,8 +120,7 @@ mod tests {
             "Test
 1
 ",
-            result_contents
-            ,
+            result_contents,
             "Should pull the correct data from sql"
         );
         Ok(())

View File

@@ -9,13 +9,27 @@ use polars::prelude::{col, lit, CsvWriter, Expr, LazyCsvReader, LazyFileListRead
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
-fn reduce(grouping_nodes: &Vec<String>, operations: &Vec<ReductionOperation>, input: &DataSource, output: &DataSource) -> anyhow::Result<()> {
+fn reduce(
+    grouping_nodes: &Vec<String>,
+    operations: &Vec<ReductionOperation>,
+    input: &DataSource,
+    output: &DataSource,
+) -> anyhow::Result<()> {
     let df = LazyCsvReader::new(&input.path).finish()?;
     let mut df = df
-        .group_by(grouping_nodes.iter().map(|column| col(column)).collect_vec())
-        .agg(&operations.iter().map(|operation| operation.to_aggregate_function()).collect_vec())
+        .group_by(
+            grouping_nodes
+                .iter()
+                .map(|column| col(column))
+                .collect_vec(),
+        )
+        .agg(
+            &operations
+                .iter()
+                .map(|operation| operation.to_aggregate_function())
+                .collect_vec(),
+        )
         .collect()?;
     let mut file = File::create(&output.path)?;
     CsvWriter::new(&mut file).finish(&mut df)?;
@@ -58,7 +72,16 @@ impl ReductionOperation {
             ReductionOperationType::Min => col(&self.column_name).min(),
             ReductionOperationType::Average => col(&self.column_name).mean(),
             ReductionOperationType::Count => col(&self.column_name).count(),
-            ReductionOperationType::Concat(concat_properties) => lit(concat_properties.prefix.clone()).append(col(&self.column_name).list().join(lit(concat_properties.separator.clone()), true), false).append(lit(concat_properties.suffix.clone()), false),
+            ReductionOperationType::Concat(concat_properties) => {
+                lit(concat_properties.prefix.clone())
+                    .append(
+                        col(&self.column_name)
+                            .list()
+                            .join(lit(concat_properties.separator.clone()), true),
+                        false,
+                    )
+                    .append(lit(concat_properties.suffix.clone()), false)
+            }
         }
     }
 }
@@ -77,7 +100,12 @@ pub struct ReductionNodeRunner {
 #[async_trait]
 impl RunnableNode for ReductionNodeRunner {
     async fn run(&self) -> anyhow::Result<()> {
-        reduce(&self.reduction_node.grouping_columns, &self.reduction_node.operations, &self.reduction_node.input_file, &self.reduction_node.output_file)?;
+        reduce(
+            &self.reduction_node.grouping_columns,
+            &self.reduction_node.operations,
+            &self.reduction_node.input_file,
+            &self.reduction_node.output_file,
+        )?;
         Ok(())
     }
 }

View File

@@ -1,13 +1,12 @@
 use futures::TryStreamExt;
 use futures_io::{AsyncRead, AsyncWrite};
-use sqlx::{Any, Column, Pool, Row};
+use sqlx::{Any, Column, Executor, Pool, Row};
 use std::borrow::Borrow;
 use std::collections::BTreeMap;
 use tiberius::{Client, Query};
 pub trait QueryExecutor {
-    // Retrieve data from a database
+    /// Retrieve data from a database
     async fn get_rows(
         &mut self,
         query: &str,
@@ -16,8 +15,11 @@ pub trait QueryExecutor {
         row_consumer: &mut impl FnMut(BTreeMap<String, String>) -> anyhow::Result<()>,
     ) -> anyhow::Result<()>;
-    // Run a query that returns no results (e.g. bulk insert, insert)
+    /// Run a query that returns no results (e.g. bulk insert, insert)
     async fn execute_query(&mut self, query: &str, params: &[String]) -> anyhow::Result<u64>;
+    /// Execute an unprepared query. Avoid where possible as sql injection is possible if not used carefully
+    async fn execute_unchecked(&mut self, query: &str) -> anyhow::Result<u64>;
 }
 impl<S: AsyncRead + AsyncWrite + Unpin + Send> QueryExecutor for Client<S> {
@@ -38,8 +40,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> QueryExecutor for Client<S> {
                 let mut returned_row = BTreeMap::new();
                 // TODO: Check how empty columns are handled by tiberius
                 for column in row.columns().into_iter() {
-                    returned_row.insert(column.name().to_owned(), row.get(column.name()).unwrap_or_else(|| "")
-                        .to_owned());
+                    returned_row.insert(
+                        column.name().to_owned(),
+                        row.get(column.name()).unwrap_or_else(|| "").to_owned(),
+                    );
                 }
                 row_consumer(returned_row)?;
             }
@@ -57,6 +61,11 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> QueryExecutor for Client<S> {
         }
         Ok(result.rows_affected()[0])
     }
+    async fn execute_unchecked(&mut self, query: &str) -> anyhow::Result<u64> {
+        let result = self.execute(query, &[]).await?;
+        Ok(result.rows_affected()[0])
+    }
 }
 impl QueryExecutor for Pool<Any> {
@@ -90,4 +99,9 @@ impl QueryExecutor for Pool<Any> {
         let result = query.execute(self.borrow()).await?;
         Ok(result.rows_affected())
     }
+    async fn execute_unchecked(&mut self, query: &str) -> anyhow::Result<u64> {
+        let result = self.execute(query).await?;
+        Ok(result.rows_affected())
+    }
 }
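The new execute_unchecked sends the statement as-is rather than as a prepared query with bound parameters, which is what the bulk-load statements later in this commit rely on since they embed the file path directly. A hypothetical caller might look like the sketch below; the helper and table name are made up for illustration, and per the doc comment the interpolated identifier must come from trusted configuration, never user input:

// Hypothetical helper, not part of this commit.
async fn clear_staging_table(
    executor: &mut impl QueryExecutor,
    table_name: &str,
) -> anyhow::Result<u64> {
    // table_name is spliced straight into the SQL text, so it must be trusted.
    let statement = format!("DELETE FROM {}", table_name);
    let rows_affected = executor.execute_unchecked(&statement).await?;
    Ok(rows_affected)
}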

View File

@@ -75,7 +75,7 @@ mod tests {
                 data_source: DataSource {
                     source_type: SourceType::CSV,
                     path: PathBuf::from("./testing/test.csv"),
-                }
+                },
             }],
             &output_path,
             &"SELECT * FROM Account WHERE Code = 'A195950'".to_owned(),

View File

@@ -4,7 +4,8 @@ use itertools::Itertools;
 use log::{log, Level};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
-use sqlx::AnyPool;
+use sqlx::any::install_default_drivers;
+use sqlx::{AnyConnection, AnyPool};
 use std::{collections::HashMap, fmt::format};
 use tiberius::{Config, EncryptionLevel};
 use tokio_util::compat::TokioAsyncWriteCompatExt;
@@ -22,25 +23,35 @@ pub async fn upload_file_bulk(
     let mut rows_affected = None;
     if upload_node.column_mappings.is_none() {
         let insert_from_file_query = match upload_node.db_type {
-            DBType::Postgres => Some(format!("COPY {} FROM $1", upload_node.table_name)),
-            DBType::Mysql => Some(format!(
-                "LOAD DATA INFILE ? INTO {}",
-                upload_node.table_name,
-            )),
-            DBType::Mssql => Some(format!("DECLARE @insertStatement VARCHAR(MAX) = 'BULK INSERT [{}] FROM ''' + @P1 + ''' WITH ( FORMAT = ''CSV'', FIRSTROW = 2 )'; EXEC(@insertStatement);", upload_node.table_name)),
+            DBType::Postgres => Some(format!(
+                r#"COPY "{}" FROM '{}' DELIMITERS ',' CSV HEADER QUOTE '"';"#,
+                upload_node.table_name, upload_node.file_path
+            )),
+            // TODO: Revisit this when sqlx lets this work, currently mysql won't allow this
+            // to execute since sqlx forces it to be inside of a prepare, which
+            // isn't allowed
+            DBType::Mysql => Some(format!(
+                "LOAD DATA INFILE '{}' INTO TABLE `{}`
+                FIELDS TERMINATED BY ',' ENCLOSED BY '\"'
+                LINES TERMINATED BY '\n'
+                IGNORE 1 LINES;",
+                upload_node.file_path, upload_node.table_name,
+            )),
+            DBType::Mssql => Some(format!(
+                "BULK INSERT [{}] FROM '{}' WITH ( FORMAT = 'CSV', FIRSTROW = 2 );",
+                upload_node.table_name, upload_node.file_path
+            )),
             _ => None,
         };
         if let Some(insert_from_file_query) = insert_from_file_query {
-            let result = executor
-                .execute_query(
-                    &insert_from_file_query,
-                    &[upload_node.file_path.clone()],
-                )
-                .await;
+            let result = executor.execute_unchecked(&insert_from_file_query).await;
             if let Ok(result) = result {
                 rows_affected = Some(result);
             } else {
-                log!(Level::Debug, "Failed to bulk insert, trying sql insert instead");
+                log!(
+                    Level::Debug,
+                    "Failed to bulk insert, trying sql insert instead"
+                );
             }
         }
     }
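For reference, with the values the new tests below use (table My Test Table, CSV copied to /upload_to_db/test.csv inside the container), the Postgres and SQL Server format strings above render to statements like the following. The path is read by the database server, not the client, which is why these go through execute_unchecked and why the tests copy the CSV into the container first:

COPY "My Test Table" FROM '/upload_to_db/test.csv' DELIMITERS ',' CSV HEADER QUOTE '"';
BULK INSERT [My Test Table] FROM '/upload_to_db/test.csv' WITH ( FORMAT = 'CSV', FIRSTROW = 2 );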
@@ -73,7 +84,15 @@ pub async fn upload_file_bulk(
         for result in file_reader.records() {
             let result = result?;
             insert_query = insert_query
-                + format!("VALUES ({})", result.iter().enumerate().map(|(index, _)| db_type.get_param_name(index)).join(",")).as_str();
+                + format!(
+                    "VALUES ({})",
+                    result
+                        .iter()
+                        .enumerate()
+                        .map(|(index, _)| db_type.get_param_name(index))
+                        .join(",")
+                )
+                .as_str();
             let mut values = result.iter().map(|value| value.to_owned()).collect_vec();
             params.append(&mut values);
             num_params += csv_columns.len();
@@ -149,6 +168,7 @@ impl RunnableNode for UploadNodeRunner {
             let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?;
             upload_file_bulk(&mut client, &upload_node, BIND_LIMIT, &upload_node.db_type).await?;
         } else {
+            install_default_drivers();
             let mut pool = AnyPool::connect(&upload_node.connection_string).await?;
             upload_file_bulk(&mut pool, &upload_node, BIND_LIMIT, &upload_node.db_type).await?;
         }
@@ -158,11 +178,11 @@ impl RunnableNode for UploadNodeRunner {
 #[cfg(test)]
 mod tests {
-    use std::path::{self, PathBuf};
     use crate::graph::node::RunnableNode;
     use crate::graph::upload_to_db::{DBType, UploadNode, UploadNodeRunner};
-    use testcontainers::core::{IntoContainerPort, Mount, WaitFor};
+    use sqlx::{AnyPool, Row};
+    use std::path::PathBuf;
+    use testcontainers::core::{IntoContainerPort, WaitFor};
     use testcontainers::runners::AsyncRunner;
     use testcontainers::{GenericImage, ImageExt};
     use tiberius::{Config, EncryptionLevel};
@@ -170,12 +190,20 @@ mod tests {
     #[tokio::test]
     pub async fn check_bulk_upload_mssql() -> anyhow::Result<()> {
-        let container = GenericImage::new("gitea.michaelpivato.dev/vato007/ingey-test-db-mssql", "latest")
+        let container = GenericImage::new(
+            "gitea.michaelpivato.dev/vato007/ingey-test-db-mssql",
+            "latest",
+        )
             .with_exposed_port(1433.tcp())
-            .with_wait_for(WaitFor::message_on_stdout("Recovery is complete.".to_owned()))
+            .with_wait_for(WaitFor::message_on_stdout(
+                "Recovery is complete.".to_owned(),
+            ))
             .with_env_var("ACCEPT_EULA", "Y")
             .with_env_var("MSSQL_SA_PASSWORD", "TestOnlyContainer123")
-            .with_copy_to("/upload_to_db/test.csv", PathBuf::from("./testing/input/upload_to_db/test.csv"))
+            .with_copy_to(
+                "/upload_to_db/test.csv",
+                PathBuf::from("./testing/input/upload_to_db/test.csv"),
+            )
             .start()
             .await?;
         let host = container.get_host().await?;
@@ -191,7 +219,7 @@ mod tests {
                 post_script: None,
                 db_type: DBType::Mssql,
                 connection_string,
-            }
+            },
         };
         upload_node.run().await?;
         let mut config = Config::from_jdbc_string(&upload_node.upload_node.connection_string)?;
@@ -209,4 +237,102 @@ mod tests {
         Ok(())
     }
#[tokio::test]
pub async fn check_bulk_upload_postgres() -> anyhow::Result<()> {
let container = GenericImage::new(
"gitea.michaelpivato.dev/vato007/ingey-test-db-postgres",
"latest",
)
.with_exposed_port(5432.tcp())
.with_wait_for(WaitFor::message_on_stderr(
"database system is ready to accept connections",
))
.with_env_var("POSTGRES_PASSWORD", "TestOnlyContainer123")
.with_copy_to(
"/upload_to_db/test.csv",
PathBuf::from("./testing/input/upload_to_db/test.csv"),
)
.start()
.await?;
let host = container.get_host().await?;
let port = container.get_host_port_ipv4(5432).await?;
let connection_string = format!(
"postgres://postgres:TestOnlyContainer123@{}:{}/testingeydatabase",
host, port
)
.to_owned();
let file = "/upload_to_db/test.csv".to_owned();
let table_name = "My Test Table".to_string();
let upload_node = UploadNodeRunner {
upload_node: UploadNode {
file_path: file.to_owned(),
table_name: table_name.clone(),
column_mappings: None,
post_script: None,
db_type: DBType::Postgres,
connection_string,
},
};
upload_node.run().await?;
let pool = AnyPool::connect(&upload_node.upload_node.connection_string).await?;
let result = sqlx::query(r#"SELECT * FROM "My Test Table""#)
.fetch_one(&pool)
.await?;
let column1: i32 = result.try_get("column1")?;
let column2: &str = result.try_get("column2")?;
assert_eq!(1, column1);
assert_eq!("Hello", column2);
Ok(())
}
#[tokio::test]
pub async fn check_bulk_upload_mysql() -> anyhow::Result<()> {
let container = GenericImage::new(
"gitea.michaelpivato.dev/vato007/ingey-test-db-mysql",
"latest",
)
.with_exposed_port(3306.tcp())
.with_wait_for(WaitFor::message_on_stderr("ready for connections."))
.with_env_var("MYSQL_ROOT_PASSWORD", "TestOnlyContainer123")
.with_copy_to(
"/upload_to_db/test.csv",
PathBuf::from("./testing/input/upload_to_db/test.csv"),
)
// https://dev.mysql.com/doc/refman/8.4/en/server-system-variables.html#sysvar_secure_file_priv
.with_cmd(&["--secure-file-priv=/upload_to_db".to_owned()])
.start()
.await?;
let host = container.get_host().await?;
let port = container.get_host_port_ipv4(3306).await?;
let connection_string = format!(
"mysql://root:TestOnlyContainer123@{}:{}/TestIngeyDatabase",
host, port
)
.to_owned();
let file = "/upload_to_db/test.csv".to_owned();
let table_name = "My Test Table".to_string();
let upload_node = UploadNodeRunner {
upload_node: UploadNode {
file_path: file.to_owned(),
table_name: table_name.clone(),
column_mappings: None,
post_script: None,
db_type: DBType::Mysql,
connection_string,
},
};
upload_node.run().await?;
let pool = AnyPool::connect(&upload_node.upload_node.connection_string).await?;
let result = sqlx::query(r#"SELECT * FROM `My Test Table`"#)
.fetch_one(&pool)
.await?;
let column1: i32 = result.try_get("column1")?;
let column2: &str = result.try_get("column2")?;
assert_eq!(1, column1);
assert_eq!("Hello", column2);
Ok(())
}
}

View File

@@ -892,8 +892,7 @@ mod tests {
             assert_eq!(*expected_department_a, *final_department);
         } else if final_department.department == expected_department_b.department {
             assert_eq!(*expected_department_b, *final_department);
-        }
-        else {
+        } else {
             panic!("Unknown department found!");
         }
     }

View File

@@ -134,9 +134,7 @@ pub fn build_polars(
             // TODO: What I really want to do is not use source type, instead I want to be referring to a file, which we translate from the sourcetype
             // to an actual filename. I don't want to be limited by a concept of 'sourcetype' at all, instead the definition should treat everything
             // the same, and just translate the imported csv format to the necessary files and columns in files that are expected to be input.
-            Component::Field(_, column) => {
-                built_expression = built_expression + col(column)
-            }
+            Component::Field(_, column) => built_expression = built_expression + col(column),
         }
     }

View File

@@ -381,7 +381,10 @@ mod tests {
     #[test]
     fn test_read_definitions() {
         let definitions = read_definitions(
-            &mut csv::Reader::from_path("testing/input/create_products/service_builder_definitions.csv").unwrap(),
+            &mut csv::Reader::from_path(
+                "testing/input/create_products/service_builder_definitions.csv",
+            )
+            .unwrap(),
         );
         if let Err(error) = &definitions {
             println!("{}", error)