diff --git a/src/graph/dynamic/csv_readers.rs b/src/graph/dynamic/csv_readers.rs index 8bc45e5..c429251 100644 --- a/src/graph/dynamic/csv_readers.rs +++ b/src/graph/dynamic/csv_readers.rs @@ -4,7 +4,7 @@ use wasmtime::component::Resource; pub struct CsvReadersData { // Map name of reader to path - readers: HashMap, + pub readers: HashMap, } impl HostCsvReaders for DynamicState { diff --git a/src/graph/dynamic/csv_writer.rs b/src/graph/dynamic/csv_writer.rs index 8b09e78..045eef7 100644 --- a/src/graph/dynamic/csv_writer.rs +++ b/src/graph/dynamic/csv_writer.rs @@ -1,18 +1,38 @@ -use super::{dynamic_state::{vato007::ingey::types::HostCsvWriter, DynamicState}, write_map::WriteMap}; +use super::dynamic_state::{vato007::ingey::types::HostCsvWriter, DynamicState}; +use crate::io::RecordSerializer; +use csv::Writer; +use std::collections::BTreeMap; +use std::fs::File; -pub struct CsvWriterData {} +pub struct CsvWriterData { + writer: Writer, + wrote_header: bool, +} + +impl CsvWriterData { + pub fn new(path: String) -> anyhow::Result { + let writer = Writer::from_path(path)?; + Ok(CsvWriterData { + writer, + wrote_header: false, + }) + } +} impl HostCsvWriter for DynamicState { - fn write_map(&mut self, self_: wasmtime::component::Resource, row: wasmtime::component::Resource) -> () { - todo!() - } - - fn write_row(&mut self, self_: wasmtime::component::Resource, row: Vec<(String, String,)>) -> () { - todo!() + fn write_row(&mut self, self_: wasmtime::component::Resource, row: Vec<(String, String)>) -> () { + let resource = self.resources.get_mut(&self_).expect("Failed to find resource"); + let write_data: BTreeMap = row.into_iter().collect(); + if !resource.wrote_header { + resource.writer.write_header(&write_data).expect("Failed to write header"); + resource.wrote_header = true; + } + resource.writer.write_record(write_data.values()).expect("Failed to write row"); } fn drop(&mut self, rep: wasmtime::component::Resource) -> wasmtime::Result<()> { - todo!() + self.resources.delete(rep)?; + Ok(()) } } \ No newline at end of file diff --git a/src/graph/dynamic/dynamic_state.rs b/src/graph/dynamic/dynamic_state.rs index 5585ba9..07fe2e9 100644 --- a/src/graph/dynamic/dynamic_state.rs +++ b/src/graph/dynamic/dynamic_state.rs @@ -3,7 +3,6 @@ pub use super::csv_readers::CsvReadersData; pub use super::csv_row::CsvRow; pub use super::csv_writer::CsvWriterData; pub use super::read_map::ReadMapData; -pub use super::write_map::WriteMap; use vato007::ingey::types::Host; use wasmtime::component::{bindgen, ResourceTable}; @@ -11,7 +10,6 @@ bindgen!({ with: { "vato007:ingey/types/csv-row": CsvRow, "vato007:ingey/types/csv-reader": CsvReader, - "vato007:ingey/types/write-map": WriteMap, "vato007:ingey/types/csv-readers": CsvReadersData, "vato007:ingey/types/csv-writer": CsvWriterData, "vato007:ingey/types/read-map": ReadMapData, diff --git a/src/graph/dynamic/mod.rs b/src/graph/dynamic/mod.rs index fdfacc3..535dd32 100644 --- a/src/graph/dynamic/mod.rs +++ b/src/graph/dynamic/mod.rs @@ -2,15 +2,19 @@ use crate::graph::node::RunnableNode; use async_trait::async_trait; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use wasmtime::component::{Component, Linker}; use wasmtime::{Config, Engine, Store}; mod dynamic_state; +use crate::graph::dynamic::csv_readers::CsvReadersData; +use crate::graph::dynamic::csv_writer::CsvWriterData; +use crate::graph::dynamic::dynamic_state::ReadMap; +use crate::graph::dynamic::read_map::ReadMapData; use dynamic_state::{Dynamic, DynamicState}; mod csv_row; mod csv_reader; -mod write_map; mod csv_readers; mod csv_writer; mod read_map; @@ -41,8 +45,11 @@ impl RunnableNode for DynamicNodeRunner { DynamicState::new(), ); let bindings = Dynamic::instantiate(&mut store, &component, &linker)?; - // TODO: Instantiate readers - // bindings.call_evaluate(&mut store, ReadersMap{readers: vec![]}, &ReadMap {}, &Writer {})?; + let read_map = store.data_mut().resources.push(ReadMapData { data: HashMap::new() })?; + let readers = store.data_mut().resources.push(CsvReadersData { readers: HashMap::new() })?; + let writer = CsvWriterData::new(self.dynamic_node.output_file.clone())?; + let writer = store.data_mut().resources.push(writer)?; + bindings.call_evaluate(&mut store, read_map, readers, writer)?; Ok(()) } } \ No newline at end of file diff --git a/src/graph/dynamic/write_map.rs b/src/graph/dynamic/write_map.rs deleted file mode 100644 index 8f9d779..0000000 --- a/src/graph/dynamic/write_map.rs +++ /dev/null @@ -1,23 +0,0 @@ -use super::dynamic_state::{vato007::ingey::types::HostWriteMap, DynamicState}; -use itertools::Itertools; -use std::collections::HashMap; - -pub struct WriteMap { - pub data: HashMap, -} - -impl HostWriteMap for DynamicState { - fn keys(&mut self, self_: wasmtime::component::Resource) -> Vec { - self.resources.get(&self_).map(|data| data.data.keys()).expect("Failed to find resource").cloned().collect_vec() - } - - fn put(&mut self, self_: wasmtime::component::Resource, name: String, value: String) -> () { - let resource = self.resources.get_mut(&self_).expect("Failed to find resource"); - resource.data.insert(name, value); - } - - fn drop(&mut self, rep: wasmtime::component::Resource) -> wasmtime::Result<()> { - self.resources.delete(rep)?; - Ok(()) - } -} \ No newline at end of file diff --git a/wit/dynamic_node.wit b/wit/dynamic_node.wit index f806ea0..95510d4 100644 --- a/wit/dynamic_node.wit +++ b/wit/dynamic_node.wit @@ -21,17 +21,11 @@ interface types { read-into-string: func() -> string; } - resource write-map { - keys: func() -> list; - put: func(name: string, value: string); - } - resource csv-readers { get-reader: func(name: string) -> option; } resource csv-writer { - write-map: func(row: write-map); write-row: func(row: list>); }