Finish off basic dynamic node
Some checks are pending
test / test (push) Waiting to run

This commit is contained in:
2025-01-23 13:01:43 +10:30
parent 70248de192
commit e2604d4e70
6 changed files with 40 additions and 44 deletions

View File

@@ -4,7 +4,7 @@ use wasmtime::component::Resource;
pub struct CsvReadersData {
// Map name of reader to path
readers: HashMap<String, String>,
pub readers: HashMap<String, String>,
}
impl HostCsvReaders for DynamicState {

View File

@@ -1,18 +1,38 @@
use super::{dynamic_state::{vato007::ingey::types::HostCsvWriter, DynamicState}, write_map::WriteMap};
use super::dynamic_state::{vato007::ingey::types::HostCsvWriter, DynamicState};
use crate::io::RecordSerializer;
use csv::Writer;
use std::collections::BTreeMap;
use std::fs::File;
pub struct CsvWriterData {}
pub struct CsvWriterData {
writer: Writer<File>,
wrote_header: bool,
}
impl CsvWriterData {
pub fn new(path: String) -> anyhow::Result<Self> {
let writer = Writer::from_path(path)?;
Ok(CsvWriterData {
writer,
wrote_header: false,
})
}
}
impl HostCsvWriter for DynamicState {
fn write_map(&mut self, self_: wasmtime::component::Resource<CsvWriterData>, row: wasmtime::component::Resource<WriteMap>) -> () {
todo!()
}
fn write_row(&mut self, self_: wasmtime::component::Resource<CsvWriterData>, row: Vec<(String, String,)>) -> () {
todo!()
fn write_row(&mut self, self_: wasmtime::component::Resource<CsvWriterData>, row: Vec<(String, String)>) -> () {
let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
let write_data: BTreeMap<String, String> = row.into_iter().collect();
if !resource.wrote_header {
resource.writer.write_header(&write_data).expect("Failed to write header");
resource.wrote_header = true;
}
resource.writer.write_record(write_data.values()).expect("Failed to write row");
}
fn drop(&mut self, rep: wasmtime::component::Resource<CsvWriterData>) -> wasmtime::Result<()> {
todo!()
self.resources.delete(rep)?;
Ok(())
}
}

View File

@@ -3,7 +3,6 @@ pub use super::csv_readers::CsvReadersData;
pub use super::csv_row::CsvRow;
pub use super::csv_writer::CsvWriterData;
pub use super::read_map::ReadMapData;
pub use super::write_map::WriteMap;
use vato007::ingey::types::Host;
use wasmtime::component::{bindgen, ResourceTable};
@@ -11,7 +10,6 @@ bindgen!({
with: {
"vato007:ingey/types/csv-row": CsvRow,
"vato007:ingey/types/csv-reader": CsvReader,
"vato007:ingey/types/write-map": WriteMap,
"vato007:ingey/types/csv-readers": CsvReadersData,
"vato007:ingey/types/csv-writer": CsvWriterData,
"vato007:ingey/types/read-map": ReadMapData,

View File

@@ -2,15 +2,19 @@ use crate::graph::node::RunnableNode;
use async_trait::async_trait;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use wasmtime::component::{Component, Linker};
use wasmtime::{Config, Engine, Store};
mod dynamic_state;
use crate::graph::dynamic::csv_readers::CsvReadersData;
use crate::graph::dynamic::csv_writer::CsvWriterData;
use crate::graph::dynamic::dynamic_state::ReadMap;
use crate::graph::dynamic::read_map::ReadMapData;
use dynamic_state::{Dynamic, DynamicState};
mod csv_row;
mod csv_reader;
mod write_map;
mod csv_readers;
mod csv_writer;
mod read_map;
@@ -41,8 +45,11 @@ impl RunnableNode for DynamicNodeRunner {
DynamicState::new(),
);
let bindings = Dynamic::instantiate(&mut store, &component, &linker)?;
// TODO: Instantiate readers
// bindings.call_evaluate(&mut store, ReadersMap{readers: vec![]}, &ReadMap {}, &Writer {})?;
let read_map = store.data_mut().resources.push(ReadMapData { data: HashMap::new() })?;
let readers = store.data_mut().resources.push(CsvReadersData { readers: HashMap::new() })?;
let writer = CsvWriterData::new(self.dynamic_node.output_file.clone())?;
let writer = store.data_mut().resources.push(writer)?;
bindings.call_evaluate(&mut store, read_map, readers, writer)?;
Ok(())
}
}

View File

@@ -1,23 +0,0 @@
use super::dynamic_state::{vato007::ingey::types::HostWriteMap, DynamicState};
use itertools::Itertools;
use std::collections::HashMap;
pub struct WriteMap {
pub data: HashMap<String, String>,
}
impl HostWriteMap for DynamicState {
fn keys(&mut self, self_: wasmtime::component::Resource<WriteMap>) -> Vec<String> {
self.resources.get(&self_).map(|data| data.data.keys()).expect("Failed to find resource").cloned().collect_vec()
}
fn put(&mut self, self_: wasmtime::component::Resource<WriteMap>, name: String, value: String) -> () {
let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
resource.data.insert(name, value);
}
fn drop(&mut self, rep: wasmtime::component::Resource<WriteMap>) -> wasmtime::Result<()> {
self.resources.delete(rep)?;
Ok(())
}
}