Address various linting issues in graph and db uploader
This commit is contained in:
@@ -228,13 +228,11 @@ impl RunnableGraph {
|
|||||||
let node_fn = Arc::new(get_node_fn);
|
let node_fn = Arc::new(get_node_fn);
|
||||||
for n in 0..num_threads {
|
for n in 0..num_threads {
|
||||||
let finish_task = finish_task.clone();
|
let finish_task = finish_task.clone();
|
||||||
// let finish_task = finish_task.clone();
|
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::channel(32);
|
let (tx, mut rx) = tokio::sync::mpsc::channel(32);
|
||||||
senders.push(tx);
|
senders.push(tx);
|
||||||
let node_fn = node_fn.clone();
|
let node_fn = node_fn.clone();
|
||||||
// TODO: Think this needs to be all reworked to be more inline with async
|
|
||||||
let handle = tokio::spawn(async move {
|
let handle = tokio::spawn(async move {
|
||||||
for node in rx.recv().await {
|
while let Some(node) = rx.recv().await {
|
||||||
let status = match node_fn(node.clone()).run().await {
|
let status = match node_fn(node.clone()).run().await {
|
||||||
Ok(_) => NodeStatus::Completed,
|
Ok(_) => NodeStatus::Completed,
|
||||||
Err(err) => NodeStatus::Failed(err),
|
Err(err) => NodeStatus::Failed(err),
|
||||||
|
|||||||
@@ -2,13 +2,11 @@ use std::collections::HashMap;
|
|||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::bail;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::executor;
|
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use schemars::JsonSchema;
|
use schemars::JsonSchema;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sqlx::{AnyPool};
|
use sqlx::AnyPool;
|
||||||
use tiberius::Config;
|
use tiberius::Config;
|
||||||
use tokio::{ task};
|
|
||||||
use tokio_util::compat::TokioAsyncWriteCompatExt;
|
use tokio_util::compat::TokioAsyncWriteCompatExt;
|
||||||
|
|
||||||
use super::{node::RunnableNode, sql::QueryExecutor};
|
use super::{node::RunnableNode, sql::QueryExecutor};
|
||||||
@@ -18,6 +16,7 @@ const BIND_LIMIT: usize = 65535;
|
|||||||
pub async fn upload_file_bulk(
|
pub async fn upload_file_bulk(
|
||||||
executor: &mut impl QueryExecutor,
|
executor: &mut impl QueryExecutor,
|
||||||
upload_node: &UploadNode,
|
upload_node: &UploadNode,
|
||||||
|
bind_limit: usize,
|
||||||
) -> anyhow::Result<u64> {
|
) -> anyhow::Result<u64> {
|
||||||
let mut rows_affected = None;
|
let mut rows_affected = None;
|
||||||
if upload_node.column_mappings.is_none() {
|
if upload_node.column_mappings.is_none() {
|
||||||
@@ -69,7 +68,7 @@ pub async fn upload_file_bulk(
|
|||||||
let mut values = result.iter().map(|value| value.to_owned()).collect_vec();
|
let mut values = result.iter().map(|value| value.to_owned()).collect_vec();
|
||||||
params.append(&mut values);
|
params.append(&mut values);
|
||||||
num_params += csv_columns.len();
|
num_params += csv_columns.len();
|
||||||
if num_params == BIND_LIMIT {
|
if num_params == bind_limit {
|
||||||
running_row_total += executor.execute_query(&query_template, ¶ms).await?;
|
running_row_total += executor.execute_query(&query_template, ¶ms).await?;
|
||||||
insert_query = "".to_owned();
|
insert_query = "".to_owned();
|
||||||
params = vec![];
|
params = vec![];
|
||||||
@@ -122,26 +121,23 @@ impl RunnableNode for UploadNodeRunner {
|
|||||||
async fn run(&self) -> anyhow::Result<()> {
|
async fn run(&self) -> anyhow::Result<()> {
|
||||||
let upload_node = self.upload_node.clone();
|
let upload_node = self.upload_node.clone();
|
||||||
if upload_node.db_type == DBType::Mssql {
|
if upload_node.db_type == DBType::Mssql {
|
||||||
let mut config = Config::from_jdbc_string(&upload_node.connection_string);
|
let config = Config::from_jdbc_string(&upload_node.connection_string);
|
||||||
if let Ok(mut config) = config {
|
if let Ok(config) = config {
|
||||||
let tcp = tokio::net::TcpStream::connect(config.get_addr()).await;
|
let tcp = tokio::net::TcpStream::connect(config.get_addr()).await;
|
||||||
if let Ok(tcp) = tcp {
|
if let Ok(tcp) = tcp {
|
||||||
tcp.set_nodelay(true);
|
tcp.set_nodelay(true)?;
|
||||||
let client = tiberius::Client::connect(config, tcp.compat_write()).await;
|
let client = tiberius::Client::connect(config, tcp.compat_write()).await;
|
||||||
if let Ok(mut client) = client {
|
if let Ok(mut client) = client {
|
||||||
upload_file_bulk(&mut client, &upload_node).await;
|
upload_file_bulk(&mut client, &upload_node, BIND_LIMIT).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}else {
|
}else {
|
||||||
let mut pool = AnyPool::connect(&upload_node.connection_string).await;
|
let pool = AnyPool::connect(&upload_node.connection_string).await;
|
||||||
if let Ok(mut pool) = pool {
|
if let Ok(mut pool) = pool {
|
||||||
upload_file_bulk(&mut pool, &upload_node).await;
|
upload_file_bulk(&mut pool, &upload_node, BIND_LIMIT).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO: Message to listen for task completing since join handle doesn't include this
|
|
||||||
// Alternative is to make run signature async, though that may add more complexity
|
|
||||||
// to graph mode.
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user