Files
ingey/src/io.rs

100 lines
2.8 KiB
Rust

use anyhow::bail;
use rmp_serde::{decode::ReadReader, Deserializer, Serializer};
use schemars::JsonSchema;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::path::PathBuf;
use std::{
collections::BTreeMap,
io::{Read, Seek, Write},
};
/// File format a [`DataSource`] is declared to be in.
///
/// No serde rename attribute is applied, so the derived implementations use
/// the variant identifiers as written; renaming a variant would change the
/// serialized representation.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
pub enum SourceType {
    /// Presumably comma-separated text; only the name is visible here.
    CSV,
    /// Presumably Apache Parquet; only the name is visible here.
    PARQUET,
}
/// Describes one input: where it lives and which format it is in.
///
/// Serializable, deserializable and schema-describable via the derives below.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
pub struct DataSource {
    /// Location of the source (presumably a file on disk; verify against callers).
    pub path: PathBuf,
    /// Declared format of the data found at `path`.
    pub source_type: SourceType,
}
/// A sink that records are written to one at a time.
///
/// Records can be written either through serde ([`RecordSerializer::serialize`])
/// or, for when serde serialization can't be used, as explicit string maps
/// ([`RecordSerializer::write_header`] / [`RecordSerializer::write_record`]).
pub trait RecordSerializer {
    /// Emits whatever header the format needs for the keys of `record`.
    ///
    /// A `BTreeMap` is required on purpose: it guarantees keys (and therefore
    /// the matching values) are always visited in the same sorted order.
    /// Implementations are free to write nothing if the format has no header.
    fn write_header(&mut self, record: &BTreeMap<String, String>) -> anyhow::Result<()>;

    /// Writes one record given as a string map, in the map's sorted key order.
    fn write_record(&mut self, record: &BTreeMap<String, String>) -> anyhow::Result<()>;

    /// Writes one record using its serde `Serialize` implementation.
    fn serialize(&mut self, record: impl Serialize) -> anyhow::Result<()>;

    /// Flushes the sink.
    fn flush(&mut self) -> anyhow::Result<()>;
}
impl<W: Write> RecordSerializer for csv::Writer<W> {
fn serialize(&mut self, record: impl Serialize) -> anyhow::Result<()> {
self.serialize(record)?;
Ok(())
}
fn flush(&mut self) -> anyhow::Result<()> {
self.flush()?;
Ok(())
}
fn write_header(&mut self, record: &BTreeMap<String, String>) -> anyhow::Result<()> {
self.write_record(record.keys())?;
Ok(())
}
fn write_record(&mut self, record: &BTreeMap<String, String>) -> anyhow::Result<()> {
self.write_record(record.values())?;
Ok(())
}
}
impl<W: Write> RecordSerializer for Serializer<W> {
fn serialize(&mut self, record: impl Serialize) -> anyhow::Result<()> {
record.serialize(self)?;
Ok(())
}
fn flush(&mut self) -> anyhow::Result<()> {
Ok(())
}
fn write_header(&mut self, _: &BTreeMap<String, String>) -> anyhow::Result<()> {
Ok(())
}
fn write_record(&mut self, record: &BTreeMap<String, String>) -> anyhow::Result<()> {
self.serialize(record)?;
Ok(())
}
}
/// A source that yields records one at a time.
pub trait RecordDeserializer {
    /// Reads the next record and decodes it into `D`.
    ///
    /// Returns `Ok(Some(record))` when a record was decoded, `Ok(None)` when
    /// the implementation reports that there is nothing to return, and `Err`
    /// when reading or decoding fails.
    fn deserialize<D>(&mut self) -> anyhow::Result<Option<D>>
    where
        D: DeserializeOwned;
}
impl<R: Read> RecordDeserializer for csv::Reader<R> {
fn deserialize<D: DeserializeOwned>(&mut self) -> Result<Option<D>, anyhow::Error> {
match self.deserialize().next() {
None => Ok(Option::None),
Some(result) => match result {
Ok(ok) => Ok(Option::Some(ok)),
Err(err) => bail!(err),
},
}
}
}
impl<R: Read + Seek> RecordDeserializer for Deserializer<ReadReader<R>> {
fn deserialize<D: DeserializeOwned>(&mut self) -> Result<Option<D>, anyhow::Error> {
match Deserialize::deserialize(self) {
Ok(value) => Ok(value),
Err(value) => Err(anyhow::Error::from(value)),
}
}
}