Delete the split node. It won't be useful in most circumstances, and complex use cases that involve row splitting can be served by wasm or sql nodes instead (the splitting kernel is sketched below).
All checks were successful
test / test (push) Successful in 5m39s

2025-02-11 07:58:55 +10:30
parent 9578bd6965
commit c8fd8734eb
2 changed files with 24 additions and 194 deletions
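
For reference, the kernel of what the split node attempted (deriving a bucket key by truncating a timestamp column to a date part) is small enough to live in a wasm node when it is actually needed. A minimal sketch under that assumption: `split_key` is a hypothetical helper, not from this codebase; the `BTreeMap<String, String>` row shape and the `%Y-%m-%d %H-%M-%S` format come from the deleted file; `NaiveDateTime` is used because that format carries no timezone offset.

use std::collections::BTreeMap;

use chrono::NaiveDateTime;

// Hypothetical helper: bucket a row by truncating the timestamp in `column`
// to a month. The format string carries no offset, so `NaiveDateTime`
// (rather than `DateTime`) is the chrono type that can parse it.
fn split_key(row: &BTreeMap<String, String>, column: &str) -> Option<String> {
    let value = row.get(column)?;
    let ts = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H-%M-%S").ok()?;
    Some(ts.format("%Y-%m").to_string())
}

fn main() {
    let mut row = BTreeMap::new();
    row.insert("ts".to_owned(), "2024-03-05 10-30-00".to_owned());
    // Rows that share a key would be written to the same output partition.
    assert_eq!(split_key(&row, "ts").as_deref(), Some("2024-03"));
}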


@@ -1,15 +1,12 @@
 use std::{
     cmp::{min, Ordering},
     collections::{HashMap, HashSet},
-    sync::{
-        mpsc, Arc,
-    },
+    sync::{mpsc, Arc},
 };

 use chrono::Local;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
-use split::{SplitNode, SplitNodeRunner};
 use tokio::sync::mpsc::Sender;

 use crate::graph::derive::DeriveNodeRunner;
@@ -24,15 +21,14 @@ use {
 };

 mod derive;
-mod dynamic;
 mod filter;
 mod node;
 mod pull_from_db;
-mod split;
-mod reduction;
 mod sql;
 mod sql_rule;
 mod upload_to_db;
+mod dynamic;
+mod reduction;

 #[derive(Serialize, Deserialize, Clone, JsonSchema)]
 pub enum NodeConfiguration {
@@ -44,7 +40,6 @@ pub enum NodeConfiguration {
     UploadNode(UploadNode),
     SQLNode(SQLNode),
     Dynamic(DynamicNode),
-    SplitNode(SplitNode),
     ReductionNode(ReductionNode),
 }

@@ -147,8 +142,9 @@ fn get_runnable_node(node: Node) -> Box<dyn RunnableNode + Send> {
         NodeConfiguration::UploadNode(upload_node) => Box::new(UploadNodeRunner { upload_node }),
         NodeConfiguration::SQLNode(sql_node) => Box::new(SQLNodeRunner { sql_node }),
         NodeConfiguration::Dynamic(dynamic_node) => Box::new(DynamicNodeRunner { dynamic_node }),
-        NodeConfiguration::SplitNode(split_node) => Box::new(SplitNodeRunner { split_node }),
-        NodeConfiguration::ReductionNode(reduction_node) => Box::new(ReductionNodeRunner { reduction_node }),
+        NodeConfiguration::ReductionNode(reduction_node) => {
+            Box::new(ReductionNodeRunner { reduction_node })
+        }
     }
 }

@@ -175,7 +171,11 @@ impl RunnableGraph {
         RunnableGraph { graph }
     }

-    pub async fn run_default_tasks<F>(&self, num_threads: usize, status_changed: F) -> anyhow::Result<()>
+    pub async fn run_default_tasks<F>(
+        &self,
+        num_threads: usize,
+        status_changed: F,
+    ) -> anyhow::Result<()>
     where
         F: Fn(i64, NodeStatus),
     {
@@ -183,7 +183,8 @@ impl RunnableGraph {
             num_threads,
             Box::new(|node| get_runnable_node(node)),
             status_changed,
-        ).await
+        )
+        .await
     }

     pub async fn run<'a, F, StatusChanged>(
@@ -279,9 +280,9 @@ impl RunnableGraph {
         while running_threads.len() < num_threads && i < nodes.len() {
             if !running_nodes.contains(&nodes[i].id)
                 && nodes[i]
-                .dependent_node_ids
-                .iter()
-                .all(|id| completed_nodes.contains(id))
+                    .dependent_node_ids
+                    .iter()
+                    .all(|id| completed_nodes.contains(id))
             {
                 let node = nodes.remove(i);
                 for i in 0..num_threads {
@@ -328,8 +329,14 @@ mod tests {
             name: "Hello".to_owned(),
             configuration: NodeConfiguration::FilterNode(super::FilterNode {
                 filters: vec![],
-                input_data_source: DataSource { path: PathBuf::from(""), source_type: SourceType::CSV },
-                output_data_source: DataSource { path: PathBuf::from(""), source_type: SourceType::CSV },
+                input_data_source: DataSource {
+                    path: PathBuf::from(""),
+                    source_type: SourceType::CSV,
+                },
+                output_data_source: DataSource {
+                    path: PathBuf::from(""),
+                    source_type: SourceType::CSV,
+                },
             }),
             output_files: vec![],
             dynamic_configuration: None,


@@ -1,177 +0,0 @@
use async_trait::async_trait;
use chrono::DateTime;
// use polars::io::SerReader;
// use polars::prelude::ParquetReader;
use polars::{
    io::SerWriter,
    prelude::{CsvWriter, LazyCsvReader, LazyFileListReader},
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
// use std::fs::File;
use tempfile::tempfile;

use crate::io::RecordSerializer;

use super::{
    derive::{self, DataValidator, DeriveFilter},
    node::RunnableNode,
};

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub enum DatePart {
    Year,
    Month,
    Week,
    Day,
    Hour,
    Minute,
    Second,
}

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub enum SplitType {
    // Column, frequency
    DateTime(DatePart),
    Numeric(isize),
}

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub struct SplitOnChangeInColumn {
    id_column: String,
    change_column: String,
    limit: Option<u64>,
}

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub struct Split {
    column: String,
    split_type: SplitType,
    // If specified, a split will also be made when the change column changes for the id column
    change_in_column: Option<SplitOnChangeInColumn>,
}

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub struct SplitRule {
    pub filters: Vec<DeriveFilter>,
    pub splits: Vec<Split>,
}

impl SplitRule {
    fn to_runnable_rule(&self) -> anyhow::Result<RunnableSplitRule> {
        let filters = derive::to_filter_rules(&self.filters)?;
        Ok(RunnableSplitRule {
            filters,
            splits: self.splits.clone(),
        })
    }
}

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub struct SplitNode {
    pub filters: Vec<DeriveFilter>,
    pub rules: Vec<SplitRule>,
    pub input_file_path: String,
    pub output_file_path: String,
}

pub struct RunnableSplitRule {
    pub filters: Vec<Box<dyn DataValidator>>,
    pub splits: Vec<Split>,
}

pub struct SplitNodeRunner {
    pub split_node: SplitNode,
}

fn split_line(
    line: BTreeMap<String, String>,
    rules: &Vec<RunnableSplitRule>,
    output: &mut impl RecordSerializer,
    date_format: &str,
    last_split_value: Option<(String, String)>,
) -> anyhow::Result<Option<(String, String)>> {
    let mut result_lines = vec![];
    for rule in rules {
        if !derive::is_line_valid(&line, &rule.filters) {
            continue;
        }
        for split in &rule.splits {
            let value = line.get(&split.column);
            if let Some(value) = value {
                // Parse the value in the column for the rule
                match &split.split_type {
                    SplitType::DateTime(frequency) => {
                        let date_time = DateTime::parse_from_str(&value, &date_format)?;
                        // TODO: Now split the row up based on the frequency in the rule
                    }
                    SplitType::Numeric(frequency) => {
                        // TODO: Just skip unparsable values, log out it's incorrect?
                        let number = value.parse::<f64>()?;
                    }
                }
            }
        }
    }
    result_lines.push(line);
    for line in result_lines {
        output.serialize(line)?;
    }
    Ok(None)
}

fn split(
    rules: &Vec<RunnableSplitRule>,
    input: &String,
    output: &mut impl RecordSerializer,
    date_format: &str,
) -> anyhow::Result<()> {
    // First sort the input file into the output file
    let mut temp_path = tempfile()?;
    // This needs to be done for each split rule with a change column specified
    // TODO: Add parquet support (both read and write)
    // let file = File::open(input)?;
    // let df = ParquetReader::new(file).finish()?;
    let df = LazyCsvReader::new(input).finish()?;
    // TODO: Needs sorting
    let df = df.sort(["", ""], Default::default());
    CsvWriter::new(&mut temp_path).finish(&mut df.collect()?)?;
    // Then read from the temporary file (since it's sorted) and do the standard split over each row
    let mut input = csv::Reader::from_reader(temp_path);
    if let Some(line) = input.deserialize().next() {
        let line: BTreeMap<String, String> = line?;
        output.write_header(&line)?;
        let mut last_split_value = split_line(line, rules, output, &date_format, None)?;
        for line in input.deserialize() {
            let line: BTreeMap<String, String> = line?;
            last_split_value = split_line(line, rules, output, &date_format, last_split_value)?;
        }
    }
    Ok(())
}

#[async_trait]
impl RunnableNode for SplitNodeRunner {
    async fn run(&self) -> anyhow::Result<()> {
        let mut output = csv::Writer::from_path(&self.split_node.output_file_path)?;
        let rules: anyhow::Result<Vec<RunnableSplitRule>> = self
            .split_node
            .rules
            .iter()
            .map(|rule| rule.to_runnable_rule())
            .collect();
        let rules = rules?;
        split(
            &rules,
            &self.split_node.input_file_path,
            &mut output,
            "%Y-%m-%d %H-%M-%S",
        )
    }
}
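
The deleted `split` above sorts the input into a temporary file and then scans it row by row; `SplitOnChangeInColumn` was the unfinished half of that scan: per its comment, a split is also made when the change column's value changes for an id. A self-contained sketch of that scan over already-sorted rows, with all names illustrative rather than taken from the codebase:

// Given (id, watched value) pairs already sorted by id, report the indexes
// where a new segment begins.
fn segment_starts(rows: &[(String, String)]) -> Vec<usize> {
    let mut starts = Vec::new();
    let mut last: Option<(&str, &str)> = None;
    for (i, (id, value)) in rows.iter().enumerate() {
        // A segment starts on the first row, on a new id, or when the watched
        // value changes within the same id.
        let changed = match last {
            Some((last_id, last_value)) => last_id != id.as_str() || last_value != value.as_str(),
            None => true,
        };
        if changed {
            starts.push(i);
        }
        last = Some((id.as_str(), value.as_str()));
    }
    starts
}

fn main() {
    let rows = vec![
        ("a".to_owned(), "x".to_owned()),
        ("a".to_owned(), "x".to_owned()),
        ("a".to_owned(), "y".to_owned()), // value changed for id "a": new segment
        ("b".to_owned(), "y".to_owned()), // new id: new segment
    ];
    assert_eq!(segment_starts(&rows), vec![0, 2, 3]);
}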