95 lines
2.6 KiB
Rust
95 lines
2.6 KiB
Rust
use std::fs::File;
|
|
|
|
use super::node::RunnableNode;
|
|
use crate::io::{DataSource, SourceType};
|
|
use async_trait::async_trait;
|
|
use polars::prelude::{LazyFrame, ScanArgsParquet};
|
|
use polars::{
|
|
io::SerWriter,
|
|
prelude::{CsvWriter, LazyCsvReader, LazyFileListReader},
|
|
};
|
|
use polars_sql::SQLContext;
|
|
use schemars::JsonSchema;
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
#[derive(Serialize, Deserialize, Clone, JsonSchema)]
|
|
pub struct SqlFile {
|
|
pub name: String,
|
|
pub data_source: DataSource,
|
|
}
|
|
|
|
/**
|
|
* Run SQL over files using polars, export results to output file
|
|
*/
|
|
fn run_sql(files: &Vec<SqlFile>, output_path: &String, query: &String) -> anyhow::Result<()> {
|
|
let mut ctx = SQLContext::new();
|
|
for file in files {
|
|
let df = match file.data_source.source_type {
|
|
SourceType::CSV => LazyCsvReader::new(&file.data_source.path).finish()?,
|
|
SourceType::PARQUET => {
|
|
LazyFrame::scan_parquet(&file.data_source.path, ScanArgsParquet::default())?
|
|
}
|
|
};
|
|
ctx.register(&file.name, df);
|
|
}
|
|
let result = ctx.execute(&query)?;
|
|
let mut file = File::create(output_path)?;
|
|
CsvWriter::new(&mut file).finish(&mut result.collect()?)?;
|
|
Ok(())
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize, Clone, JsonSchema)]
|
|
pub struct SQLNode {
|
|
pub files: Vec<SqlFile>,
|
|
pub output_file: String,
|
|
pub query: String,
|
|
}
|
|
|
|
pub struct SQLNodeRunner {
|
|
pub sql_node: SQLNode,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl RunnableNode for SQLNodeRunner {
|
|
async fn run(&self) -> anyhow::Result<()> {
|
|
run_sql(
|
|
&self.sql_node.files,
|
|
&self.sql_node.output_file,
|
|
&self.sql_node.query,
|
|
)
|
|
}
|
|
}
|
|
#[cfg(test)]
mod tests {
    use super::{run_sql, SqlFile};
    use crate::io::{DataSource, SourceType};
    use std::fs;
    use std::path::PathBuf;

    /// Filters the test CSV down to a single account code and checks the exact
    /// CSV text that ends up in the output file.
    #[test]
    fn basic_query_works() -> anyhow::Result<()> {
        let output_path = "./testing/output/output.csv".to_owned();
        let query = "SELECT * FROM Account WHERE Code = 'A195950'".to_owned();
        let inputs = vec![SqlFile {
            name: "Account".to_owned(),
            data_source: DataSource {
                source_type: SourceType::CSV,
                path: PathBuf::from("./testing/test.csv"),
            },
        }];

        run_sql(&inputs, &output_path, &query)?;

        // Read back what was written and compare it verbatim.
        let written = fs::read_to_string(&output_path)?;
        assert_eq!(
            written,
            "Code,Description,Type,CostOutput,PercentFixed
A195950,A195950 Staff Related Other,E,GS,100.0
"
        );
        Ok(())
    }
}
|