diff --git a/src/io.rs b/src/io.rs index cc08db2..4c97a1e 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,7 +1,15 @@ -use std::io::{Read, Write}; +use std::{ + io::{Read, Seek, Write}, + thread::current, +}; -use rmp_serde::{Deserializer, Serializer}; -use serde::{Deserialize, Serialize}; +use anyhow::bail; +use csv::Position; +use rmp_serde::{ + decode::{ReadReader, ReadRefReader, ReadSlice}, + from_read, Deserializer, Serializer, +}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; pub trait RecordSerializer { fn serialize(&mut self, record: impl Serialize) -> anyhow::Result<()>; @@ -21,68 +29,89 @@ impl RecordSerializer for Serializer { } } -// // pub struct RecordDeserializerIterWrapper< -// // D: DeserializeOwned, -// // I: Iterator>, -// // > { -// // pub inner: I, -// // } +// TODO: I still don't like this api, should split deserialize and position at the least, +// and we need a way to get the current position (otherwise it's left to consumers to track current) +// position +pub trait RecordDeserializer

{ + fn deserialize(&mut self) -> Result, anyhow::Error>; -// // impl>> Iterator -// // for RecordDeserializerIterWrapper -// // { -// // type Item = Result; + // Move the deserializer to the specified position in the underlying reader + fn position(&mut self, record: P) -> anyhow::Result<()>; +} -// // fn next(&mut self) -> Option { -// // self.inner.next() -// // } -// // } +struct CsvMessagePackDeserializer { + reader: csv::Reader, +} -// // This person has the same issue as me (it's exactly the same, just with different serde implementations) -// // https://stackoverflow.com/questions/69691366/using-serde-for-two-deserialization-formats +impl CsvMessagePackDeserializer { + fn new(reader: R) -> CsvMessagePackDeserializer { + CsvMessagePackDeserializer { + reader: csv::Reader::from_reader(reader), + } + } +} -// // impl +impl RecordDeserializer for CsvMessagePackDeserializer { + fn deserialize(&mut self) -> Result, anyhow::Error> { + // TODO: This isn't great, need to somehow maintain the state/position + match self.reader.deserialize().next() { + None => Ok(Option::None), + Some(result) => match result { + Ok(ok) => Ok(Option::Some(ok)), + Err(err) => bail!(err), + }, + } + } -// // TODO: Want to be able to deserialise with serde over a reader like we currently do with the writer (rather than forcing to deserialize to a hashmap) -// // However, this doesn't really work when returning an arbitrary iterator, so the iterator should be specific, like what's done in + fn position(&mut self, record: Position) -> anyhow::Result<()> { + self.reader.seek(record)?; + Ok(()) + } +} -// // This from rust messagepack is basically what I'm looking for? Would prefer to be an iterator but might still work? -// // https://github.com/3Hren/msgpack-rust/blob/master/rmp-serde/src/decode.rs#L1017 -// // I think what might be possible is to have a struct for messagepack that wraps the deserializer, implements the iter trait (for result deserializeowned), -// // then just always return that struct (avoids the issues of returning an arbitrary trait). Really something like I did with RecordSerializer would be the best -// // But the problem is I can't easily return arbitrary iterators without boxing (then specifying what I'm returning anyway), really in java the implementing -// // class could return whatever implementation it wants... -// // Maybe I just create a custom deserializer that implements Deserializer for both csv and msgpack? Then just return the struct? Seems like -// // the opposite of recordserializer though which is kind of annoying. +struct MessagePackDeserializer { + reader: Deserializer>, + record_positions: Vec, +} -// // Really what I would like is to be able to just pass a deserializer, but the issues is csv doesn't export its deserializer (though it does have an implementation) -// // Actually it's even more complex than that, as I want to also be able to seek in the underlying reader somehow, I think this'll only be supported by -// // csv though? For now just leave it and stick with csv reading instead +impl MessagePackDeserializer { + fn new(reader: R) -> MessagePackDeserializer { + MessagePackDeserializer { + reader: Deserializer::new(reader), + record_positions: vec![], + } + } +} -// pub trait RecordDeserializer { -// // TODO: Don't do this, deserializer is for messagepack only, want it to easily support other deserializers -// fn deserializeT>(&mut self) -> Result; -// } +// TODO: These need tests +impl RecordDeserializer for MessagePackDeserializer { + fn deserialize(&mut self) -> Result, anyhow::Error> { + // Keep track of byte position of each record, in case we want to go back later + let current_position = self.reader.get_mut().stream_position()?; + if self + .record_positions + .last() + .map_or(true, |position| *position < current_position) + { + self.record_positions.push(current_position); + } + match Deserialize::deserialize(&mut self.reader) { + Ok(value) => Ok(value), + Err(value) => Err(anyhow::Error::from(value)), + } + } -// // impl RecordDeserializer for csv::Reader { -// // fn deserialize( -// // &mut self, -// // ) -> Box>> { -// // Box::new(self.deserialize().into_iter().map(|r| match r { -// // Ok(ok) => Ok(ok), -// // Err(err) => bail!(err), -// // })) -// // } -// // } - -// impl RecordDeserializer for Deserializer { -// fn deserializeT<'de, D>(&mut self) -> Result -// where -// D: Deserialize<'de>, -// { -// match Deserialize::deserialize(self) { -// Ok(ok) => Ok(ok), -// Err(e) => Err(anyhow::Error::from(e)), -// } -// } -// } + fn position(&mut self, record: usize) -> anyhow::Result<()> { + let reader = self.reader.get_mut(); + // Unsigned so can't be less than 0 + if self.record_positions.len() > record { + // Go to position in reader + let position = self.record_positions[record]; + reader.seek(std::io::SeekFrom::Start(position))?; + } else { + // read through the reader until we get to the correct record + bail!("Record hasn't been read yet, please use deserialize to find the record") + } + Ok(()) + } +}