diff --git a/src/graph/mod.rs b/src/graph/mod.rs index c10fab2..7b5f85e 100644 --- a/src/graph/mod.rs +++ b/src/graph/mod.rs @@ -228,13 +228,11 @@ impl RunnableGraph { let node_fn = Arc::new(get_node_fn); for n in 0..num_threads { let finish_task = finish_task.clone(); - // let finish_task = finish_task.clone(); let (tx, mut rx) = tokio::sync::mpsc::channel(32); senders.push(tx); let node_fn = node_fn.clone(); - // TODO: Think this needs to be all reworked to be more inline with async let handle = tokio::spawn(async move { - for node in rx.recv().await { + while let Some(node) = rx.recv().await { let status = match node_fn(node.clone()).run().await { Ok(_) => NodeStatus::Completed, Err(err) => NodeStatus::Failed(err), diff --git a/src/graph/upload_to_db.rs b/src/graph/upload_to_db.rs index f284bea..9ee09fa 100644 --- a/src/graph/upload_to_db.rs +++ b/src/graph/upload_to_db.rs @@ -2,13 +2,11 @@ use std::collections::HashMap; use anyhow::bail; use async_trait::async_trait; -use futures::executor; use itertools::Itertools; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use sqlx::{AnyPool}; +use sqlx::AnyPool; use tiberius::Config; -use tokio::{ task}; use tokio_util::compat::TokioAsyncWriteCompatExt; use super::{node::RunnableNode, sql::QueryExecutor}; @@ -18,6 +16,7 @@ const BIND_LIMIT: usize = 65535; pub async fn upload_file_bulk( executor: &mut impl QueryExecutor, upload_node: &UploadNode, + bind_limit: usize, ) -> anyhow::Result { let mut rows_affected = None; if upload_node.column_mappings.is_none() { @@ -69,7 +68,7 @@ pub async fn upload_file_bulk( let mut values = result.iter().map(|value| value.to_owned()).collect_vec(); params.append(&mut values); num_params += csv_columns.len(); - if num_params == BIND_LIMIT { + if num_params == bind_limit { running_row_total += executor.execute_query(&query_template, ¶ms).await?; insert_query = "".to_owned(); params = vec![]; @@ -122,26 +121,23 @@ impl RunnableNode for UploadNodeRunner { async fn run(&self) -> anyhow::Result<()> { let upload_node = self.upload_node.clone(); if upload_node.db_type == DBType::Mssql { - let mut config = Config::from_jdbc_string(&upload_node.connection_string); - if let Ok(mut config) = config { + let config = Config::from_jdbc_string(&upload_node.connection_string); + if let Ok(config) = config { let tcp = tokio::net::TcpStream::connect(config.get_addr()).await; if let Ok(tcp) = tcp { - tcp.set_nodelay(true); + tcp.set_nodelay(true)?; let client = tiberius::Client::connect(config, tcp.compat_write()).await; if let Ok(mut client) = client { - upload_file_bulk(&mut client, &upload_node).await; + upload_file_bulk(&mut client, &upload_node, BIND_LIMIT).await?; } } } }else { - let mut pool = AnyPool::connect(&upload_node.connection_string).await; + let pool = AnyPool::connect(&upload_node.connection_string).await; if let Ok(mut pool) = pool { - upload_file_bulk(&mut pool, &upload_node).await; + upload_file_bulk(&mut pool, &upload_node, BIND_LIMIT).await?; } } - // TODO: Message to listen for task completing since join handle doesn't include this - // Alternative is to make run signature async, though that may add more complexity - // to graph mode. Ok(()) } }