Add derive node to graph

This commit is contained in:
2024-09-07 14:07:32 +09:30
parent 3a0bfd6ad0
commit 3d8ce9c498
2 changed files with 8 additions and 9 deletions

View File

@@ -296,7 +296,7 @@ fn derive(
} }
pub struct DeriveNodeRunner { pub struct DeriveNodeRunner {
derive_node: DeriveNode, pub derive_node: DeriveNode,
} }
#[async_trait] #[async_trait]

View File

@@ -2,7 +2,7 @@ use std::{
cmp::{min, Ordering}, cmp::{min, Ordering},
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
sync::{ sync::{
mpsc, Arc mpsc, Arc,
}, },
}; };
@@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
use split::{SplitNode, SplitNodeRunner}; use split::{SplitNode, SplitNodeRunner};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use crate::graph::derive::DeriveNodeRunner;
use { use {
derive::DeriveNode, derive::DeriveNode,
filter::{FilterNode, FilterNodeRunner}, filter::{FilterNode, FilterNodeRunner},
@@ -138,7 +139,7 @@ fn get_runnable_node(node: Node) -> Box<dyn RunnableNode + Send> {
NodeConfiguration::FileNode => todo!(), NodeConfiguration::FileNode => todo!(),
NodeConfiguration::MoveMoneyNode(_) => todo!(), NodeConfiguration::MoveMoneyNode(_) => todo!(),
NodeConfiguration::MergeNode(_) => todo!(), NodeConfiguration::MergeNode(_) => todo!(),
NodeConfiguration::DeriveNode(_) => todo!(), NodeConfiguration::DeriveNode(derive_node) => Box::new(DeriveNodeRunner { derive_node }),
NodeConfiguration::CodeRuleNode(_) => todo!(), NodeConfiguration::CodeRuleNode(_) => todo!(),
NodeConfiguration::FilterNode(filter_node) => Box::new(FilterNodeRunner { filter_node }), NodeConfiguration::FilterNode(filter_node) => Box::new(FilterNodeRunner { filter_node }),
NodeConfiguration::UploadNode(upload_node) => Box::new(UploadNodeRunner { upload_node }), NodeConfiguration::UploadNode(upload_node) => Box::new(UploadNodeRunner { upload_node }),
@@ -182,7 +183,6 @@ impl RunnableGraph {
).await ).await
} }
// Make this not mutable, emit node status when required in a function or some other message
pub async fn run<'a, F, StatusChanged>( pub async fn run<'a, F, StatusChanged>(
&self, &self,
num_threads: usize, num_threads: usize,
@@ -211,7 +211,7 @@ impl RunnableGraph {
if num_threads < 2 { if num_threads < 2 {
for node in &nodes { for node in &nodes {
node_status_changed_fn(node.id, NodeStatus::Running); node_status_changed_fn(node.id, NodeStatus::Running);
match get_node_fn(node.clone()).run().await { match get_node_fn(node.clone()).run().await {
Ok(_) => node_status_changed_fn(node.id, NodeStatus::Completed), Ok(_) => node_status_changed_fn(node.id, NodeStatus::Completed),
Err(err) => node_status_changed_fn(node.id, NodeStatus::Failed(err)), Err(err) => node_status_changed_fn(node.id, NodeStatus::Failed(err)),
}; };
@@ -276,9 +276,9 @@ impl RunnableGraph {
while running_threads.len() < num_threads && i < nodes.len() { while running_threads.len() < num_threads && i < nodes.len() {
if !running_nodes.contains(&nodes[i].id) if !running_nodes.contains(&nodes[i].id)
&& nodes[i] && nodes[i]
.dependent_node_ids .dependent_node_ids
.iter() .iter()
.all(|id| completed_nodes.contains(id)) .all(|id| completed_nodes.contains(id))
{ {
let node = nodes.remove(i); let node = nodes.remove(i);
for i in 0..num_threads { for i in 0..num_threads {
@@ -308,7 +308,6 @@ impl RunnableGraph {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use chrono::Local; use chrono::Local;
use super::{NodeConfiguration, RunnableGraph}; use super::{NodeConfiguration, RunnableGraph};