Add initial reader record reader trait and implementations
This commit is contained in:
147
src/io.rs
147
src/io.rs
@@ -1,7 +1,15 @@
|
||||
use std::io::{Read, Write};
|
||||
use std::{
|
||||
io::{Read, Seek, Write},
|
||||
thread::current,
|
||||
};
|
||||
|
||||
use rmp_serde::{Deserializer, Serializer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use anyhow::bail;
|
||||
use csv::Position;
|
||||
use rmp_serde::{
|
||||
decode::{ReadReader, ReadRefReader, ReadSlice},
|
||||
from_read, Deserializer, Serializer,
|
||||
};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
|
||||
pub trait RecordSerializer {
|
||||
fn serialize(&mut self, record: impl Serialize) -> anyhow::Result<()>;
|
||||
@@ -21,68 +29,89 @@ impl<W: Write> RecordSerializer for Serializer<W> {
|
||||
}
|
||||
}
|
||||
|
||||
// // pub struct RecordDeserializerIterWrapper<
|
||||
// // D: DeserializeOwned,
|
||||
// // I: Iterator<Item = Result<D, anyhow::Error>>,
|
||||
// // > {
|
||||
// // pub inner: I,
|
||||
// // }
|
||||
// TODO: I still don't like this api, should split deserialize and position at the least,
|
||||
// and we need a way to get the current position (otherwise it's left to consumers to track current)
|
||||
// position
|
||||
pub trait RecordDeserializer<P> {
|
||||
fn deserialize<D: DeserializeOwned>(&mut self) -> Result<Option<D>, anyhow::Error>;
|
||||
|
||||
// // impl<D: DeserializeOwned, I: Iterator<Item = Result<D, anyhow::Error>>> Iterator
|
||||
// // for RecordDeserializerIterWrapper<D, I>
|
||||
// // {
|
||||
// // type Item = Result<D, anyhow::Error>;
|
||||
// Move the deserializer to the specified position in the underlying reader
|
||||
fn position(&mut self, record: P) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
// // fn next(&mut self) -> Option<Self::Item> {
|
||||
// // self.inner.next()
|
||||
// // }
|
||||
// // }
|
||||
struct CsvMessagePackDeserializer<R> {
|
||||
reader: csv::Reader<R>,
|
||||
}
|
||||
|
||||
// // This person has the same issue as me (it's exactly the same, just with different serde implementations)
|
||||
// // https://stackoverflow.com/questions/69691366/using-serde-for-two-deserialization-formats
|
||||
impl<R: Read> CsvMessagePackDeserializer<R> {
|
||||
fn new(reader: R) -> CsvMessagePackDeserializer<R> {
|
||||
CsvMessagePackDeserializer {
|
||||
reader: csv::Reader::from_reader(reader),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// // impl <I: Iterator<Item = Iterator for RecordDeserializerIterWrapper<>
|
||||
impl<R: Read + Seek> RecordDeserializer<Position> for CsvMessagePackDeserializer<R> {
|
||||
fn deserialize<D: DeserializeOwned>(&mut self) -> Result<Option<D>, anyhow::Error> {
|
||||
// TODO: This isn't great, need to somehow maintain the state/position
|
||||
match self.reader.deserialize().next() {
|
||||
None => Ok(Option::None),
|
||||
Some(result) => match result {
|
||||
Ok(ok) => Ok(Option::Some(ok)),
|
||||
Err(err) => bail!(err),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// // TODO: Want to be able to deserialise with serde over a reader like we currently do with the writer (rather than forcing to deserialize to a hashmap)
|
||||
// // However, this doesn't really work when returning an arbitrary iterator, so the iterator should be specific, like what's done in
|
||||
fn position(&mut self, record: Position) -> anyhow::Result<()> {
|
||||
self.reader.seek(record)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// // This from rust messagepack is basically what I'm looking for? Would prefer to be an iterator but might still work?
|
||||
// // https://github.com/3Hren/msgpack-rust/blob/master/rmp-serde/src/decode.rs#L1017
|
||||
// // I think what might be possible is to have a struct for messagepack that wraps the deserializer, implements the iter trait (for result deserializeowned),
|
||||
// // then just always return that struct (avoids the issues of returning an arbitrary trait). Really something like I did with RecordSerializer would be the best
|
||||
// // But the problem is I can't easily return arbitrary iterators without boxing (then specifying what I'm returning anyway), really in java the implementing
|
||||
// // class could return whatever implementation it wants...
|
||||
// // Maybe I just create a custom deserializer that implements Deserializer for both csv and msgpack? Then just return the struct? Seems like
|
||||
// // the opposite of recordserializer though which is kind of annoying.
|
||||
struct MessagePackDeserializer<R: Read> {
|
||||
reader: Deserializer<ReadReader<R>>,
|
||||
record_positions: Vec<u64>,
|
||||
}
|
||||
|
||||
// // Really what I would like is to be able to just pass a deserializer, but the issues is csv doesn't export its deserializer (though it does have an implementation)
|
||||
// // Actually it's even more complex than that, as I want to also be able to seek in the underlying reader somehow, I think this'll only be supported by
|
||||
// // csv though? For now just leave it and stick with csv reading instead
|
||||
impl<R: Read + Seek> MessagePackDeserializer<R> {
|
||||
fn new(reader: R) -> MessagePackDeserializer<R> {
|
||||
MessagePackDeserializer {
|
||||
reader: Deserializer::new(reader),
|
||||
record_positions: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pub trait RecordDeserializer {
|
||||
// // TODO: Don't do this, deserializer is for messagepack only, want it to easily support other deserializers
|
||||
// fn deserializeT<D: Deserialize<'de>>(&mut self) -> Result<D, anyhow::Error>;
|
||||
// }
|
||||
// TODO: These need tests
|
||||
impl<R: Read + Seek> RecordDeserializer<usize> for MessagePackDeserializer<R> {
|
||||
fn deserialize<D: DeserializeOwned>(&mut self) -> Result<Option<D>, anyhow::Error> {
|
||||
// Keep track of byte position of each record, in case we want to go back later
|
||||
let current_position = self.reader.get_mut().stream_position()?;
|
||||
if self
|
||||
.record_positions
|
||||
.last()
|
||||
.map_or(true, |position| *position < current_position)
|
||||
{
|
||||
self.record_positions.push(current_position);
|
||||
}
|
||||
match Deserialize::deserialize(&mut self.reader) {
|
||||
Ok(value) => Ok(value),
|
||||
Err(value) => Err(anyhow::Error::from(value)),
|
||||
}
|
||||
}
|
||||
|
||||
// // impl<R: Read> RecordDeserializer for csv::Reader<R> {
|
||||
// // fn deserialize<D: DeserializeOwned>(
|
||||
// // &mut self,
|
||||
// // ) -> Box<dyn Iterator<Item = Result<D, anyhow::Error>>> {
|
||||
// // Box::new(self.deserialize().into_iter().map(|r| match r {
|
||||
// // Ok(ok) => Ok(ok),
|
||||
// // Err(err) => bail!(err),
|
||||
// // }))
|
||||
// // }
|
||||
// // }
|
||||
|
||||
// impl<R: Read> RecordDeserializer for Deserializer<R> {
|
||||
// fn deserializeT<'de, D>(&mut self) -> Result<D, anyhow::Error>
|
||||
// where
|
||||
// D: Deserialize<'de>,
|
||||
// {
|
||||
// match Deserialize::deserialize(self) {
|
||||
// Ok(ok) => Ok(ok),
|
||||
// Err(e) => Err(anyhow::Error::from(e)),
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
fn position(&mut self, record: usize) -> anyhow::Result<()> {
|
||||
let reader = self.reader.get_mut();
|
||||
// Unsigned so can't be less than 0
|
||||
if self.record_positions.len() > record {
|
||||
// Go to position in reader
|
||||
let position = self.record_positions[record];
|
||||
reader.seek(std::io::SeekFrom::Start(position))?;
|
||||
} else {
|
||||
// read through the reader until we get to the correct record
|
||||
bail!("Record hasn't been read yet, please use deserialize to find the record")
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user