Update dependencies, add some todos/notes for future changes

This commit is contained in:
2024-09-24 11:35:07 +09:30
parent bc5497ea16
commit 375e1f9638
5 changed files with 266 additions and 87 deletions

View File

@@ -174,7 +174,7 @@ impl Cli {
let reader = BufReader::new(file);
let graph = serde_json::from_reader(reader)?;
let graph = RunnableGraph::from_graph(graph);
// TODO: Possible to await here?
// TODO: Is it possible to await here? This call actually needs to be awaited to work
graph.run_default_tasks(threads, |id, status| {
info!("Node with id {} finished with status {:?}", id, status)
});

View File

@@ -1,13 +1,15 @@
use std::collections::BTreeMap;
use async_trait::async_trait;
use chrono::DateTime;
// use polars::io::SerReader;
// use polars::prelude::ParquetReader;
use polars::{
io::SerWriter,
prelude::{CsvWriter, LazyCsvReader, LazyFileListReader},
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
// use std::fs::File;
use tempfile::tempfile;
use crate::io::RecordSerializer;
@@ -131,7 +133,11 @@ fn split(
let mut temp_path = tempfile()?;
// This needs to be done for each split rule with a change column specified
// TODO: Add parquet support (both read and write)
// let file = File::open(input)?;
// let df = ParquetReader::new(file).finish()?;
let df = LazyCsvReader::new(input).finish()?;
// TODO: Needs sorting (the sort column names below are still empty)
let df = df.sort(["", ""], Default::default());
CsvWriter::new(&mut temp_path).finish(&mut df.collect()?)?;

View File

@@ -114,7 +114,7 @@ pub fn build_polars(
.joins
.get(&definition.source)
.ok_or(anyhow!("Failed to get right join column"))?;
reader = reader.inner_join(join_reader, col(&left_column), col(&right_column));
reader = reader.inner_join(join_reader, col(left_column), col(right_column));
}
}
// TODO: Also work out how to expand rows, so that transfers can have stuff like daily or change in x expanded into multiple rows
@@ -135,7 +135,7 @@ pub fn build_polars(
// to an actual filename. I don't want to be limited by a concept of 'sourcetype' at all, instead the definition should treat everything
// the same, and just translate the imported csv format to the necessary files and columns in files that are expected to be input.
Component::Field(source_type, column) => {
built_expression = built_expression + col(&column)
built_expression = built_expression + col(column)
}
}
}