Add most of the dynamic node host implementations
All checks were successful
test / test (push) Successful in 14m4s

This commit is contained in:
2025-01-23 07:48:59 +10:30
parent 9f6fa04dcf
commit 70248de192
11 changed files with 427 additions and 216 deletions

419
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -17,7 +17,7 @@ serde = { version = "1", features = ["derive"] }
clap = { version = "4", features = ["derive"] }
anyhow = "1"
itertools = "0.13.0"
itertools = "0.14.0"
chrono = { version = "0.4.39", features = ["default", "serde"] }
rayon = "1.10.0"
@@ -25,8 +25,8 @@ tokio = { version = "1.42.0", features = ["full"] }
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "any"] }
rmp-serde = "1.1"
tempfile = "3.7"
polars = { version = "0.43.1", features = ["lazy", "performant", "parquet", "streaming", "cse", "dtype-datetime"] }
polars-sql = "0.43.1"
polars = { version = "0.45.1", features = ["lazy", "performant", "parquet", "streaming", "cse", "dtype-datetime"] }
polars-sql = "0.45.1"
serde_json = "1.0.122"
num_cpus = "1.16.0"
schemars = { version = "0.8.21", features = ["chrono"] }
@@ -38,7 +38,7 @@ futures = "0.3.31"
tokio-util = { version = "0.7.13", features = ["compat"] }
async-trait = "0.1.83"
testcontainers = "0.23.1"
wasmtime = "28.0.0"
wasmtime = "29.0.1"
# More info on targets: https://doc.rust-lang.org/cargo/reference/cargo-targets.html#configuring-a-target
[lib]

View File

@@ -1,36 +1,124 @@
use super::{csv_row::CsvRow, dynamic_state::{vato007::ingey::types::HostCsvReader, DynamicState, ReadMapData}};
use csv::{Reader, StringRecord};
use polars::datatypes::AnyValue;
use polars::prelude::{col, lit, LazyCsvReader, LazyFileListReader};
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::Read;
pub struct CsvReader {
path: String,
reader: Reader<File>,
}
impl CsvReader {
pub fn new(path: String) -> Self {
let reader = Reader::from_path(&path).expect("Failed to create csv reader");
CsvReader {
path,
reader,
}
}
}
impl HostCsvReader for DynamicState {
fn columns(&mut self,self_:wasmtime::component::Resource<CsvReader>,) -> wasmtime::component::__internal::Vec<wasmtime::component::__internal::String> {
todo!()
fn columns(&mut self, self_: wasmtime::component::Resource<CsvReader>) -> Vec<String> {
let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
if resource.reader.has_headers() {
resource.reader.headers().expect("Reader says it has headers but doesn't").iter().map(|element| element.to_owned()).collect()
} else {
vec![]
}
}
fn next(&mut self,self_:wasmtime::component::Resource<CsvReader>,) -> wasmtime::component::Resource<CsvRow> {
todo!()
// TODO: These next methods needs to be cleaned up badly
fn next(&mut self, self_: wasmtime::component::Resource<CsvReader>) -> Result<wasmtime::component::Resource<CsvRow>, String> {
let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
let mut buf = StringRecord::new();
let result = resource.reader.read_record(&mut buf);
match result {
Ok(read) => {
if read {
let mut record_map = BTreeMap::new();
if resource.reader.has_headers() {
resource.reader.headers().expect("Reader says it has headers but doesn't").iter().enumerate().for_each(|(i, name)| {
record_map.insert(name.to_owned(), buf.get(i).unwrap().to_owned());
});
}
let result = self.resources.push(CsvRow { values: record_map }).expect("");
Ok(result)
} else {
Err("No more records available to read".to_owned())
}
}
Err(err) => {
Err(err.to_string())
}
}
}
fn has_next(&mut self,self_:wasmtime::component::Resource<CsvReader>,) -> bool {
todo!()
fn next_into_map(&mut self, self_: wasmtime::component::Resource<CsvReader>) -> wasmtime::component::Resource<ReadMapData> {
let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
let mut buf = StringRecord::new();
let result = resource.reader.read_record(&mut buf);
let record_map = match result {
Ok(read) => {
if read {
let mut record_map = HashMap::new();
if resource.reader.has_headers() {
resource.reader.headers().expect("Reader says it has headers but doesn't").iter().enumerate().for_each(|(i, name)| {
record_map.insert(name.to_owned(), buf.get(i).unwrap().to_owned());
});
}
record_map
} else {
HashMap::new()
}
}
Err(err) => {
HashMap::new()
}
};
self.resources.push(ReadMapData { data: record_map }).expect("")
}
fn has_next(&mut self, self_: wasmtime::component::Resource<CsvReader>) -> bool {
let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
!resource.reader.is_done()
}
// TODO: Clean this up as well
#[doc = " Get a row by values in one or more columns"]
fn query(&mut self,self_:wasmtime::component::Resource<CsvReader>,values:wasmtime::component::__internal::Vec<(wasmtime::component::__internal::String,wasmtime::component::__internal::String,)>,) -> wasmtime::component::Resource<CsvRow> {
todo!()
fn query(&mut self, self_: wasmtime::component::Resource<CsvReader>, values: Vec<(String, String,)>) -> wasmtime::component::Resource<CsvRow> {
let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
let mut df = LazyCsvReader::new(&resource.path).finish().expect("Failed to open file");
for (key, value) in values {
df = df.filter(col(key).eq(lit(value)));
}
let result = df.collect().expect("Failed to filter file");
let mut record_map = BTreeMap::new();
if let Ok(row) = result.get_row(0) {
for field in result.fields() {
let column_index = result.get_column_index(field.name()).unwrap();
let value: &AnyValue = row.0.get(column_index).unwrap();
record_map.insert(field.name.to_string(), value.to_string());
}
}
self.resources.push(CsvRow { values: record_map }).expect("Failed to create csv row")
}
fn read_into_string(&mut self,self_:wasmtime::component::Resource<CsvReader>,) -> wasmtime::component::__internal::String {
todo!()
}
fn read_into_map(&mut self,self_:wasmtime::component::Resource<CsvReader>,) -> wasmtime::component::Resource<ReadMapData> {
todo!()
fn read_into_string(&mut self, self_: wasmtime::component::Resource<CsvReader>) -> String {
let resource = self.resources.get(&self_).expect("Failed to find resource");
let mut file = File::open(&resource.path).unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).unwrap_or(0);
contents
}
fn drop(&mut self, rep: wasmtime::component::Resource<CsvReader>) -> wasmtime::Result<()> {
todo!()
self.resources.delete(rep)?;
Ok(())
}
}

View File

@@ -1,15 +1,26 @@
use super::{csv_reader::CsvReader, dynamic_state::{vato007::ingey::types::HostCsvReaders, DynamicState}};
use std::collections::HashMap;
use wasmtime::component::Resource;
pub struct CsvReadersData {
// Map name of reader to path
readers: HashMap<String, String>,
}
impl HostCsvReaders for DynamicState {
fn get_reader(&mut self,self_:wasmtime::component::Resource<CsvReadersData>,name:wasmtime::component::__internal::String,) -> Option<wasmtime::component::Resource<CsvReader>> {
todo!()
fn get_reader(&mut self, self_: Resource<CsvReadersData>, name: String) -> Option<Resource<CsvReader>> {
let resource = self.resources.get(&self_).expect("Failed to find own resource");
let file_path = resource.readers.get(&name);
if let Some(path) = file_path.cloned() {
let csv_reader = CsvReader::new(path);
self.resources.push(csv_reader).ok()
} else {
None
}
}
fn drop(&mut self,rep:wasmtime::component::Resource<CsvReadersData>) -> wasmtime::Result<()> {
todo!()
fn drop(&mut self, rep: Resource<CsvReadersData>) -> wasmtime::Result<()> {
self.resources.delete(rep)?;
Ok(())
}
}

View File

@@ -3,26 +3,26 @@ use std::collections::BTreeMap;
use super::dynamic_state::{vato007::ingey::types::HostCsvRow, DynamicState};
pub struct CsvRow {
values: BTreeMap<String, String>,
pub values: BTreeMap<String, String>,
}
impl HostCsvRow for DynamicState {
fn columns(&mut self,self_:wasmtime::component::Resource<CsvRow>,) -> wasmtime::component::__internal::Vec<wasmtime::component::__internal::String> {
fn columns(&mut self, self_: wasmtime::component::Resource<CsvRow>) -> Vec<String> {
let resource = self.resources.get(&self_).expect("Failed to find the required resource");
resource.values.keys().cloned().collect()
}
fn values(&mut self,self_:wasmtime::component::Resource<CsvRow>,) -> wasmtime::component::__internal::Vec<wasmtime::component::__internal::String> {
fn values(&mut self, self_: wasmtime::component::Resource<CsvRow>) -> Vec<String> {
let resource = self.resources.get(&self_).expect("Failed to find the required resource");
resource.values.values().cloned().collect()
}
fn entries(&mut self,self_:wasmtime::component::Resource<CsvRow>,) -> wasmtime::component::__internal::Vec<(wasmtime::component::__internal::String,wasmtime::component::__internal::String,)> {
fn entries(&mut self, self_: wasmtime::component::Resource<CsvRow>) -> Vec<(String, String,)> {
let resource = self.resources.get(&self_).expect("Failed to find the required resource");
resource.values.keys().map(|key| (key.clone(), resource.values.get(key).unwrap().clone())).collect()
}
fn value(&mut self,self_:wasmtime::component::Resource<CsvRow>,name:wasmtime::component::__internal::String,) -> Option<wasmtime::component::__internal::String> {
fn value(&mut self, self_: wasmtime::component::Resource<CsvRow>, name: String) -> Option<String> {
let resource = self.resources.get(&self_).expect("Failed to find the required resource");
resource.values.get(&name).cloned()
}

View File

@@ -1,16 +1,14 @@
use super::{dynamic_state::{vato007::ingey::types::HostCsvWriter, DynamicState}, write_map::WriteMap};
pub struct CsvWriterData {
}
pub struct CsvWriterData {}
impl HostCsvWriter for DynamicState {
fn write_map(&mut self,self_:wasmtime::component::Resource<CsvWriterData>,row:wasmtime::component::Resource<WriteMap>,) -> () {
fn write_map(&mut self, self_: wasmtime::component::Resource<CsvWriterData>, row: wasmtime::component::Resource<WriteMap>) -> () {
todo!()
}
fn write_row(&mut self,self_:wasmtime::component::Resource<CsvWriterData>,row:wasmtime::component::__internal::Vec<(wasmtime::component::__internal::String,wasmtime::component::__internal::String,)>,) -> () {
fn write_row(&mut self, self_: wasmtime::component::Resource<CsvWriterData>, row: Vec<(String, String,)>) -> () {
todo!()
}

View File

@@ -1,11 +1,11 @@
use vato007::ingey::types::Host;
use wasmtime::component::{bindgen, ResourceTable};
pub use super::csv_row::CsvRow;
pub use super::csv_reader::CsvReader;
pub use super::write_map::WriteMap;
pub use super::csv_readers::CsvReadersData;
pub use super::csv_row::CsvRow;
pub use super::csv_writer::CsvWriterData;
pub use super::read_map::ReadMapData;
pub use super::write_map::WriteMap;
use vato007::ingey::types::Host;
use wasmtime::component::{bindgen, ResourceTable};
bindgen!({
with: {
@@ -28,6 +28,4 @@ impl DynamicState {
}
}
impl Host for DynamicState {
}
impl Host for DynamicState {}

View File

@@ -38,7 +38,7 @@ impl RunnableNode for DynamicNodeRunner {
Dynamic::add_to_linker(&mut linker, |state: &mut DynamicState| state)?;
let mut store = Store::new(
&engine,
DynamicState::new()
DynamicState::new(),
);
let bindings = Dynamic::instantiate(&mut store, &component, &linker)?;
// TODO: Instantiate readers

View File

@@ -1,15 +1,18 @@
use std::collections::HashMap;
use super::dynamic_state::{vato007::ingey::types::HostReadMap, DynamicState};
pub struct ReadMapData {
pub data: HashMap<String, String>,
}
impl HostReadMap for DynamicState {
fn get(&mut self,self_:wasmtime::component::Resource<ReadMapData>,key:wasmtime::component::__internal::String,) -> wasmtime::component::__internal::String {
todo!()
fn get(&mut self, self_: wasmtime::component::Resource<ReadMapData>, key: String) -> Option<String> {
self.resources.get(&self_).ok().map(|data| data.data.get(&key).cloned()).flatten()
}
fn drop(&mut self, rep: wasmtime::component::Resource<ReadMapData>) -> wasmtime::Result<()> {
todo!()
self.resources.delete(rep)?;
Ok(())
}
}

View File

@@ -1,19 +1,23 @@
use super::dynamic_state::{vato007::ingey::types::HostWriteMap, DynamicState};
use itertools::Itertools;
use std::collections::HashMap;
pub struct WriteMap {
pub data: HashMap<String, String>,
}
impl HostWriteMap for DynamicState {
fn keys(&mut self,self_:wasmtime::component::Resource<WriteMap>,) -> wasmtime::component::__internal::Vec<wasmtime::component::__internal::String> {
todo!()
fn keys(&mut self, self_: wasmtime::component::Resource<WriteMap>) -> Vec<String> {
self.resources.get(&self_).map(|data| data.data.keys()).expect("Failed to find resource").cloned().collect_vec()
}
fn put(&mut self,self_:wasmtime::component::Resource<WriteMap>,name:wasmtime::component::__internal::String,value:wasmtime::component::__internal::String,) -> () {
todo!()
fn put(&mut self, self_: wasmtime::component::Resource<WriteMap>, name: String, value: String) -> () {
let resource = self.resources.get_mut(&self_).expect("Failed to find resource");
resource.data.insert(name, value);
}
fn drop(&mut self, rep: wasmtime::component::Resource<WriteMap>) -> wasmtime::Result<()> {
todo!()
self.resources.delete(rep)?;
Ok(())
}
}

View File

@@ -11,14 +11,14 @@ interface types {
resource csv-reader {
columns: func() -> list<string>;
next: func() -> csv-row;
next: func() -> result<csv-row, string>;
next-into-map: func() -> read-map;
has-next: func() -> bool;
// Get a row by values in one or more columns
query: func(values: list<tuple<string, string>>) -> csv-row;
read-into-string: func() -> string;
read-into-map: func() -> read-map;
}
resource write-map {
@@ -36,7 +36,7 @@ interface types {
}
resource read-map {
get: func(key: string) -> string;
get: func(key: string) -> option<string>;
}
}