diff --git a/src/graph/derive.rs b/src/graph/derive.rs index dbc4808..0fde62d 100644 --- a/src/graph/derive.rs +++ b/src/graph/derive.rs @@ -296,7 +296,7 @@ fn derive( } pub struct DeriveNodeRunner { - derive_node: DeriveNode, + pub derive_node: DeriveNode, } #[async_trait] diff --git a/src/graph/mod.rs b/src/graph/mod.rs index 7b5f85e..a802c05 100644 --- a/src/graph/mod.rs +++ b/src/graph/mod.rs @@ -2,7 +2,7 @@ use std::{ cmp::{min, Ordering}, collections::{HashMap, HashSet}, sync::{ - mpsc, Arc + mpsc, Arc, }, }; @@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize}; use split::{SplitNode, SplitNodeRunner}; use tokio::sync::mpsc::Sender; +use crate::graph::derive::DeriveNodeRunner; use { derive::DeriveNode, filter::{FilterNode, FilterNodeRunner}, @@ -138,7 +139,7 @@ fn get_runnable_node(node: Node) -> Box { NodeConfiguration::FileNode => todo!(), NodeConfiguration::MoveMoneyNode(_) => todo!(), NodeConfiguration::MergeNode(_) => todo!(), - NodeConfiguration::DeriveNode(_) => todo!(), + NodeConfiguration::DeriveNode(derive_node) => Box::new(DeriveNodeRunner { derive_node }), NodeConfiguration::CodeRuleNode(_) => todo!(), NodeConfiguration::FilterNode(filter_node) => Box::new(FilterNodeRunner { filter_node }), NodeConfiguration::UploadNode(upload_node) => Box::new(UploadNodeRunner { upload_node }), @@ -182,7 +183,6 @@ impl RunnableGraph { ).await } - // Make this not mutable, emit node status when required in a function or some other message pub async fn run<'a, F, StatusChanged>( &self, num_threads: usize, @@ -211,7 +211,7 @@ impl RunnableGraph { if num_threads < 2 { for node in &nodes { node_status_changed_fn(node.id, NodeStatus::Running); - match get_node_fn(node.clone()).run().await { + match get_node_fn(node.clone()).run().await { Ok(_) => node_status_changed_fn(node.id, NodeStatus::Completed), Err(err) => node_status_changed_fn(node.id, NodeStatus::Failed(err)), }; @@ -276,9 +276,9 @@ impl RunnableGraph { while running_threads.len() < num_threads && i < nodes.len() { if !running_nodes.contains(&nodes[i].id) && nodes[i] - .dependent_node_ids - .iter() - .all(|id| completed_nodes.contains(id)) + .dependent_node_ids + .iter() + .all(|id| completed_nodes.contains(id)) { let node = nodes.remove(i); for i in 0..num_threads { @@ -308,7 +308,6 @@ impl RunnableGraph { #[cfg(test)] mod tests { - use chrono::Local; use super::{NodeConfiguration, RunnableGraph};