Add initial reduction node implementation, start adding dynamic node
All checks were successful
test / test (push) Successful in 15m58s

2025-01-09 13:55:01 +10:30
parent 43db0dc308
commit 65d1e9fec4
6 changed files with 1039 additions and 60 deletions

Cargo.lock (generated, 930 changed lines)

Diff suppressed because it is too large.

Cargo.toml

@@ -38,6 +38,7 @@ futures = "0.3.31"
 tokio-util = { version = "0.7.13", features = ["compat"] }
 async-trait = "0.1.83"
 testcontainers = "0.23.1"
+wasmtime = "28.0.0"
 # More info on targets: https://doc.rust-lang.org/cargo/reference/cargo-targets.html#configuring-a-target

 [lib]

src/graph/dynamic.rs (new file, 38 lines)

@@ -0,0 +1,38 @@
use crate::graph::node::RunnableNode;
use async_trait::async_trait;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use wasmtime::component::{bindgen, Component};
use wasmtime::{Config, Engine, Linker, Store};

// Generates host bindings from the WIT world in the crate's wit/ directory
// (wit/dynamic_node.wit, added later in this diff).
bindgen!();

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub struct DynamicNode {
    pub wasm_file: String,
    pub file_paths: Vec<String>,
    pub output_file: String,
}

pub struct DynamicNodeRunner {
    pub dynamic_node: DynamicNode,
}

#[async_trait]
impl RunnableNode for DynamicNodeRunner {
    async fn run(&self) -> anyhow::Result<()> {
        let mut config = Config::new();
        config.wasm_component_model(true);
        let engine = Engine::new(&config)?;
        // let component = Component::from_file(&engine, self.dynamic_node.wasm_file.to_owned())?;
        // let mut linker = Linker::new(&engine);
        // ::add_to_linker(&mut linker, |state: &mut TestState| state)?;
        let mut store = Store::new(&engine, &self.dynamic_node);
        // let (bindings, _) = Dynamic::instantiate(&mut store, &component, &linker)?;
        // bindings.call_greet(&mut store)?;
        Ok(())
    }
}
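The body is mostly scaffolding so far; only the engine setup is live. Read alongside the dynamic world in wit/dynamic_node.wit at the end of this diff, the commented-out lines point at a flow roughly like the sketch below. The wasmtime calls are real API; everything depending on the bindgen!() output (the Dynamic type, add_to_linker, call_evaluate) is an assumption about the generated bindings, not code from this commit:

// Hypothetical completion of DynamicNodeRunner::run. Lines that depend on
// generated bindings are left as comments, mirroring the commit's own style.
use wasmtime::component::{Component, Linker};
use wasmtime::{Config, Engine, Store};

fn run_dynamic(wasm_file: &str) -> anyhow::Result<()> {
    let mut config = Config::new();
    config.wasm_component_model(true);
    let engine = Engine::new(&config)?;

    // Compile the guest component from the node's configured .wasm path.
    let component = Component::from_file(&engine, wasm_file)?;

    // Host state would hold the reader/writer resource implementations.
    let mut linker: Linker<()> = Linker::new(&engine);
    // Dynamic::add_to_linker(&mut linker, |state| state)?;  // assumed generated fn

    let mut store = Store::new(&engine, ());
    // let bindings = Dynamic::instantiate(&mut store, &component, &linker)?;
    // bindings.call_evaluate(&mut store, readers, properties, writer)?;  // assumed
    let _ = (component, linker, &mut store);
    Ok(())
}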

src/graph/mod.rs

@@ -13,6 +13,8 @@ use split::{SplitNode, SplitNodeRunner};
 use tokio::sync::mpsc::Sender;

 use crate::graph::derive::DeriveNodeRunner;
+use crate::graph::dynamic::{DynamicNode, DynamicNodeRunner};
+use crate::graph::reduction::{ReductionNode, ReductionNodeRunner};
 use {
     derive::DeriveNode,
     filter::{FilterNode, FilterNodeRunner},

@@ -29,10 +31,11 @@ mod split;
 mod sql;
 mod sql_rule;
 mod upload_to_db;
+mod dynamic;
+mod reduction;

 #[derive(Serialize, Deserialize, Clone, JsonSchema)]
 pub enum NodeConfiguration {
-    FileNode,
     MoveMoneyNode(MoveMoneyNode),
     MergeNode(MergeNode),
     DeriveNode(DeriveNode),

@@ -40,8 +43,9 @@ pub enum NodeConfiguration {
     FilterNode(FilterNode),
     UploadNode(UploadNode),
     SQLNode(SQLNode),
-    Dynamic,
+    Dynamic(DynamicNode),
     SplitNode(SplitNode),
+    ReductionNode(ReductionNode),
 }

 #[derive(Serialize, Deserialize, Clone, JsonSchema)]

@@ -135,7 +139,6 @@ impl Node {

 fn get_runnable_node(node: Node) -> Box<dyn RunnableNode + Send> {
     match node.info.configuration {
-        NodeConfiguration::FileNode => todo!(),
         NodeConfiguration::MoveMoneyNode(_) => todo!(),
         NodeConfiguration::MergeNode(_) => todo!(),
         NodeConfiguration::DeriveNode(derive_node) => Box::new(DeriveNodeRunner { derive_node }),

@@ -143,8 +146,9 @@ fn get_runnable_node(node: Node) -> Box<dyn RunnableNode + Send> {
         NodeConfiguration::FilterNode(filter_node) => Box::new(FilterNodeRunner { filter_node }),
         NodeConfiguration::UploadNode(upload_node) => Box::new(UploadNodeRunner { upload_node }),
         NodeConfiguration::SQLNode(sql_node) => Box::new(SQLNodeRunner { sql_node }),
-        NodeConfiguration::Dynamic => todo!(),
+        NodeConfiguration::Dynamic(dynamic_node) => Box::new(DynamicNodeRunner { dynamic_node }),
         NodeConfiguration::SplitNode(split_node) => Box::new(SplitNodeRunner { split_node }),
+        NodeConfiguration::ReductionNode(reduction_node) => Box::new(ReductionNodeRunner { reduction_node }),
     }
 }
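A quick way to sanity-check the new variants' wire format: since NodeConfiguration derives Serialize/Deserialize with no tag attribute in sight, serde's default externally tagged enum encoding applies. A dynamic node in a graph definition would then look like the snippet below; paths are illustrative, and serde_json is assumed to be available:

let config = NodeConfiguration::Dynamic(DynamicNode {
    wasm_file: "nodes/example.wasm".into(),  // illustrative path
    file_paths: vec!["data/in.csv".into()],
    output_file: "data/out.csv".into(),
});
// Externally tagged: the variant name wraps the payload, e.g.
// {"Dynamic":{"wasm_file":"nodes/example.wasm","file_paths":["data/in.csv"],"output_file":"data/out.csv"}}
println!("{}", serde_json::to_string(&config).unwrap());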

src/graph/reduction.rs (new file, 83 lines)

@@ -0,0 +1,83 @@
use std::fs::File;

use crate::graph::node::RunnableNode;
use crate::io::DataSource;
use async_trait::async_trait;
use itertools::Itertools;
use polars::io::SerWriter;
use polars::prelude::{col, lit, CsvWriter, Expr, LazyCsvReader, LazyFileListReader};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

fn reduce(
    grouping_columns: &[String],
    operations: &[ReductionOperation],
    input: &DataSource,
    output: &DataSource,
) -> anyhow::Result<()> {
    let df = LazyCsvReader::new(&input.path).finish()?;
    let mut df = df
        .group_by(grouping_columns.iter().map(|column| col(column)).collect_vec())
        .agg(&operations.iter().map(|operation| operation.to_aggregate_function()).collect_vec())
        .collect()?;
    let mut file = File::create(&output.path)?;
    CsvWriter::new(&mut file).finish(&mut df)?;
    Ok(())
}

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub enum ReductionOperationType {
    Sum,
    Multiply,
    Max,
    Min,
    Average,
    Count,
    Concat(ConcatProperties),
}

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub struct ConcatProperties {
    pub prefix: String,
    pub suffix: String,
    pub separator: String,
}

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub struct ReductionOperation {
    pub column_name: String,
    pub reducer: ReductionOperationType,
    // TODO: Consider adding filtering on another column, so we can get e.g. the sum for males while grouping on some other column
    // TODO: Also consider adding an alias here
}

impl ReductionOperation {
    fn to_aggregate_function(&self) -> Expr {
        match &self.reducer {
            ReductionOperationType::Sum => col(&self.column_name).sum(),
            // TODO: Check this, docs are a bit flaky on whether this'll work
            ReductionOperationType::Multiply => col(&self.column_name).product(),
            ReductionOperationType::Max => col(&self.column_name).max(),
            ReductionOperationType::Min => col(&self.column_name).min(),
            ReductionOperationType::Average => col(&self.column_name).mean(),
            ReductionOperationType::Count => col(&self.column_name).count(),
            ReductionOperationType::Concat(concat_properties) => lit(concat_properties.prefix.clone())
                .append(
                    col(&self.column_name)
                        .list()
                        .join(lit(concat_properties.separator.clone()), true),
                    false,
                )
                .append(lit(concat_properties.suffix.clone()), false),
        }
    }
}

#[derive(Serialize, Deserialize, Clone, JsonSchema)]
pub struct ReductionNode {
    pub grouping_columns: Vec<String>,
    pub operations: Vec<ReductionOperation>,
    pub input_file: DataSource,
    pub output_file: DataSource,
}

pub struct ReductionNodeRunner {
    pub reduction_node: ReductionNode,
}

#[async_trait]
impl RunnableNode for ReductionNodeRunner {
    async fn run(&self) -> anyhow::Result<()> {
        reduce(
            &self.reduction_node.grouping_columns,
            &self.reduction_node.operations,
            &self.reduction_node.input_file,
            &self.reduction_node.output_file,
        )?;
        Ok(())
    }
}
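For a sense of how this is meant to be used: grouping on one column and summing another would be wired up roughly as below, inside an async context. DataSource comes from crate::io and isn't part of this diff, so the two sources here stand in for however it is constructed; file and column names are illustrative:

// Sum salary per city: people.csv -> salary_by_city.csv.
let node = ReductionNode {
    grouping_columns: vec!["city".to_string()],
    operations: vec![ReductionOperation {
        column_name: "salary".to_string(),
        reducer: ReductionOperationType::Sum,
    }],
    input_file: people_csv,          // a DataSource pointing at people.csv
    output_file: salary_by_city_csv, // a DataSource for the output path
};
ReductionNodeRunner { reduction_node: node }.run().await?;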

wit/dynamic_node.wit (new file, 35 lines)

@@ -0,0 +1,35 @@
package vato007:ingey;

world dynamic {
    resource row {
        get: func(name: string) -> string;
    }

    resource reader {
        columns: func() -> list<string>;
        next: func() -> row;
        has-next: func() -> bool;

        // Get a row by values in one or more columns
        query: func(values: list<tuple<string, string>>) -> row;
    }

    resource read-map {
        get: func(key: string) -> string;
    }

    resource write-map {
        add: func(name: string, value: string);
    }

    resource readers {
        get-reader: func(name: string) -> option<reader>;
    }

    resource writer {
        write: func(row: write-map);
    }

    export evaluate: func(readers: readers, properties: read-map, writer: writer);
}
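On the guest side, a component targeting this world could be built with wit-bindgen. The sketch below assumes the usual generate!/export! flow and that the world's resources surface as Readers, ReadMap, Writer, and Row types with the methods declared above; note the world doesn't yet declare a constructor for write-map, so the write path is left as a comment:

// Guest-side sketch (generated names are assumptions based on the WIT above).
wit_bindgen::generate!({ world: "dynamic" });

struct MyNode;

impl Guest for MyNode {
    fn evaluate(readers: Readers, properties: ReadMap, writer: Writer) {
        // Look up the input reader and the column this node should pass through.
        if let Some(reader) = readers.get_reader("input") {
            let column = properties.get("column");
            while reader.has_next() {
                let row = reader.next();
                let _value = row.get(&column);
                // writer.write(out_row) once the world gains a write-map constructor.
            }
        }
        let _ = writer;
    }
}

export!(MyNode);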