diff --git a/src/graph/mod.rs b/src/graph/mod.rs
index e9a61c2..8305e42 100644
--- a/src/graph/mod.rs
+++ b/src/graph/mod.rs
@@ -1,15 +1,12 @@
 use std::{
     cmp::{min, Ordering},
     collections::{HashMap, HashSet},
-    sync::{
-        mpsc, Arc,
-    },
+    sync::{mpsc, Arc},
 };
 
 use chrono::Local;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
-use split::{SplitNode, SplitNodeRunner};
 use tokio::sync::mpsc::Sender;
 
 use crate::graph::derive::DeriveNodeRunner;
@@ -24,15 +21,14 @@ use {
 };
 
 mod derive;
+mod dynamic;
 mod filter;
 mod node;
 mod pull_from_db;
-mod split;
+mod reduction;
 mod sql;
 mod sql_rule;
 mod upload_to_db;
-mod dynamic;
-mod reduction;
 
 #[derive(Serialize, Deserialize, Clone, JsonSchema)]
 pub enum NodeConfiguration {
@@ -44,7 +40,6 @@ pub enum NodeConfiguration {
     UploadNode(UploadNode),
     SQLNode(SQLNode),
     Dynamic(DynamicNode),
-    SplitNode(SplitNode),
     ReductionNode(ReductionNode),
 }
 
@@ -147,8 +142,9 @@ fn get_runnable_node(node: Node) -> Box<dyn RunnableNode> {
         NodeConfiguration::UploadNode(upload_node) => Box::new(UploadNodeRunner { upload_node }),
         NodeConfiguration::SQLNode(sql_node) => Box::new(SQLNodeRunner { sql_node }),
         NodeConfiguration::Dynamic(dynamic_node) => Box::new(DynamicNodeRunner { dynamic_node }),
-        NodeConfiguration::SplitNode(split_node) => Box::new(SplitNodeRunner { split_node }),
-        NodeConfiguration::ReductionNode(reduction_node) => Box::new(ReductionNodeRunner { reduction_node }),
+        NodeConfiguration::ReductionNode(reduction_node) => {
+            Box::new(ReductionNodeRunner { reduction_node })
+        }
     }
 }
 
@@ -175,7 +171,11 @@ impl RunnableGraph {
         RunnableGraph { graph }
     }
 
-    pub async fn run_default_tasks<F>(&self, num_threads: usize, status_changed: F) -> anyhow::Result<()>
+    pub async fn run_default_tasks<F>(
+        &self,
+        num_threads: usize,
+        status_changed: F,
+    ) -> anyhow::Result<()>
     where
         F: Fn(i64, NodeStatus),
     {
@@ -183,7 +183,8 @@ impl RunnableGraph {
             num_threads,
             Box::new(|node| get_runnable_node(node)),
             status_changed,
-        ).await
+        )
+        .await
     }
 
     pub async fn run<'a, F, StatusChanged>(
@@ -279,9 +280,9 @@ impl RunnableGraph {
         while running_threads.len() < num_threads && i < nodes.len() {
             if !running_nodes.contains(&nodes[i].id)
                 && nodes[i]
-                .dependent_node_ids
-                .iter()
-                .all(|id| completed_nodes.contains(id))
+                    .dependent_node_ids
+                    .iter()
+                    .all(|id| completed_nodes.contains(id))
             {
                 let node = nodes.remove(i);
                 for i in 0..num_threads {
@@ -328,8 +329,14 @@ mod tests {
             name: "Hello".to_owned(),
             configuration: NodeConfiguration::FilterNode(super::FilterNode {
                 filters: vec![],
-                input_data_source: DataSource { path: PathBuf::from(""), source_type: SourceType::CSV },
-                output_data_source: DataSource { path: PathBuf::from(""), source_type: SourceType::CSV },
+                input_data_source: DataSource {
+                    path: PathBuf::from(""),
+                    source_type: SourceType::CSV,
+                },
+                output_data_source: DataSource {
+                    path: PathBuf::from(""),
+                    source_type: SourceType::CSV,
+                },
             }),
             output_files: vec![],
             dynamic_configuration: None,
diff --git a/src/graph/split.rs b/src/graph/split.rs
deleted file mode 100644
index 2a63604..0000000
--- a/src/graph/split.rs
+++ /dev/null
@@ -1,177 +0,0 @@
-use async_trait::async_trait;
-use chrono::DateTime;
-// use polars::io::SerReader;
-// use polars::prelude::ParquetReader;
-use polars::{
-    io::SerWriter,
-    prelude::{CsvWriter, LazyCsvReader, LazyFileListReader},
-};
-use schemars::JsonSchema;
-use serde::{Deserialize, Serialize};
-use std::collections::BTreeMap;
-// use std::fs::File;
-use tempfile::tempfile;
-
-use crate::io::RecordSerializer;
-
-use super::{
-    derive::{self, DataValidator, DeriveFilter},
-    node::RunnableNode,
-};
-
-#[derive(Serialize, Deserialize, Clone, JsonSchema)]
-pub enum DatePart {
-    Year,
-    Month,
-    Week,
-    Day,
-    Hour,
-    Minute,
-    Second,
-}
-
-#[derive(Serialize, Deserialize, Clone, JsonSchema)]
-pub enum SplitType {
-    // Column, frequency
-    DateTime(DatePart),
-    Numeric(isize),
-}
-
-#[derive(Serialize, Deserialize, Clone, JsonSchema)]
-pub struct SplitOnChangeInColumn {
-    id_column: String,
-    change_column: String,
-    limit: Option<usize>,
-}
-
-#[derive(Serialize, Deserialize, Clone, JsonSchema)]
-pub struct Split {
-    column: String,
-    split_type: SplitType,
-    // If specified, a split will also be made when the change column changes for the id column
-    change_in_column: Option<SplitOnChangeInColumn>,
-}
-
-#[derive(Serialize, Deserialize, Clone, JsonSchema)]
-pub struct SplitRule {
-    pub filters: Vec<DeriveFilter>,
-    pub splits: Vec<Split>,
-}
-
-impl SplitRule {
-    fn to_runnable_rule(&self) -> anyhow::Result<RunnableSplitRule> {
-        let filters = derive::to_filter_rules(&self.filters)?;
-        Ok(RunnableSplitRule {
-            filters,
-            splits: self.splits.clone(),
-        })
-    }
-}
-
-#[derive(Serialize, Deserialize, Clone, JsonSchema)]
-pub struct SplitNode {
-    pub filters: Vec<DeriveFilter>,
-    pub rules: Vec<SplitRule>,
-    pub input_file_path: String,
-    pub output_file_path: String,
-}
-
-pub struct RunnableSplitRule {
-    pub filters: Vec<Box<dyn DataValidator>>,
-    pub splits: Vec<Split>,
-}
-
-pub struct SplitNodeRunner {
-    pub split_node: SplitNode,
-}
-
-fn split_line(
-    line: BTreeMap<String, String>,
-    rules: &Vec<RunnableSplitRule>,
-    output: &mut impl RecordSerializer,
-    date_format: &str,
-    last_split_value: Option<(String, String)>,
-) -> anyhow::Result<Option<(String, String)>> {
-    let mut result_lines = vec![];
-    for rule in rules {
-        if !derive::is_line_valid(&line, &rule.filters) {
-            continue;
-        }
-        for split in &rule.splits {
-            let value = line.get(&split.column);
-            if let Some(value) = value {
-                // Parse the value in the column for the rule
-                match &split.split_type {
-                    SplitType::DateTime(frequency) => {
-                        let date_time = DateTime::parse_from_str(&value, &date_format)?;
-                        // TODO: Now split the row up based on the frequency in the rule
-                    }
-                    SplitType::Numeric(frequency) => {
-                        // TODO: Just skip unparsable values, log out it's incorrect?
-                        let number = value.parse::<isize>()?;
-                    }
-                }
-            }
-        }
-    }
-
-    result_lines.push(line);
-    for line in result_lines {
-        output.serialize(line)?;
-    }
-    Ok(None)
-}
-
-fn split(
-    rules: &Vec<RunnableSplitRule>,
-    input: &String,
-    output: &mut impl RecordSerializer,
-    date_format: &str,
-) -> anyhow::Result<()> {
-    // First sort the input file into the output file
-
-    let mut temp_path = tempfile()?;
-
-    // This needs to be done for each split rule with a change column specified
-    // TODO: Add parquet support (both read and write)
-    // let file = File::open(input)?;
-    // let df = ParquetReader::new(file).finish()?;
-    let df = LazyCsvReader::new(input).finish()?;
-    // TODO: Needs sorting
-    let df = df.sort(["", ""], Default::default());
-    CsvWriter::new(&mut temp_path).finish(&mut df.collect()?)?;
-
-    // Then read from the temporary file (since it's sorted) and do the standard split over each row
-    let mut input = csv::Reader::from_reader(temp_path);
-    if let Some(line) = input.deserialize().next() {
-        let line: BTreeMap<String, String> = line?;
-        output.write_header(&line)?;
-        let mut last_split_value = split_line(line, rules, output, &date_format, None)?;
-
-        for line in input.deserialize() {
-            let line: BTreeMap<String, String> = line?;
-            last_split_value = split_line(line, rules, output, &date_format, last_split_value)?;
-        }
-    }
-    Ok(())
-}
-
-#[async_trait]
-impl RunnableNode for SplitNodeRunner {
-    async fn run(&self) -> anyhow::Result<()> {
-        let mut output = csv::Writer::from_path(&self.split_node.output_file_path)?;
-        let rules: anyhow::Result<Vec<RunnableSplitRule>> = self
-            .split_node
-            .rules
-            .iter()
-            .map(|rule| rule.to_runnable_rule())
-            .collect();
-        let rules = rules?;
-        split(
-            &rules,
-            &self.split_node.input_file_path,
-            &mut output,
-            "%Y-%m-%d %H-%M-%S",
-        )
-    }
-}