Refactor filters for derive node, stub derive node implementation, fix first line check in filter node
This commit is contained in:
174
src/derive.rs
174
src/derive.rs
@@ -1,5 +1,12 @@
|
|||||||
|
use std::{collections::BTreeMap, str::FromStr};
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
io::{RecordDeserializer, RecordSerializer},
|
||||||
|
node::RunnableNode,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone)]
|
||||||
pub enum DeriveColumnType {
|
pub enum DeriveColumnType {
|
||||||
Column(String),
|
Column(String),
|
||||||
@@ -63,13 +70,178 @@ pub struct DeriveFilter {
|
|||||||
pub value_type: ValueType,
|
pub value_type: ValueType,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A comparison against one or more reference values of type `T`.
///
/// The payload of each variant is the reference; the candidate value is
/// supplied later through [`Comparator::is_valid`].
///
/// No trait bound is placed on the type itself; the ordering requirement
/// lives on the `impl` block that actually compares values.
#[derive(Debug, Clone, PartialEq)]
pub enum Comparator<T> {
    /// The candidate must be equal to the reference.
    Equal(T),
    /// The candidate must differ from the reference.
    NotEqual(T),
    /// The candidate must be strictly greater than the reference.
    GreaterThan(T),
    /// The candidate must be strictly less than the reference.
    LessThan(T),
    /// The candidate must be equal to at least one of the references.
    In(Vec<T>),
    /// The candidate must not be equal to any of the references.
    NotIn(Vec<T>),
}

impl<T: PartialOrd> Comparator<T> {
    /// Returns `true` when `value` satisfies this comparison.
    ///
    /// Comparisons follow `PartialEq`/`PartialOrd`, so a value that is not
    /// comparable to the reference (for example a floating-point NaN) fails
    /// `Equal`, `GreaterThan`, `LessThan` and `In`, and passes `NotEqual`
    /// and `NotIn`.
    pub fn is_valid(&self, value: T) -> bool {
        match self {
            Self::Equal(reference) => value == *reference,
            Self::NotEqual(reference) => value != *reference,
            Self::GreaterThan(reference) => value > *reference,
            Self::LessThan(reference) => value < *reference,
            Self::In(references) => references.contains(&value),
            Self::NotIn(references) => !references.contains(&value),
        }
    }
}
|
||||||
|
|
||||||
|
/// Implemented by validators that are bound to a single named field of a record.
pub trait FieldName {
    /// Name of the field this validator should work on.
    fn get_field_name(&self) -> String;
}
|
||||||
|
|
||||||
|
/// A list of boxed, dynamically dispatched [`DataValidator`]s.
pub type DataValidators = Vec<Box<dyn DataValidator>>;
|
||||||
|
|
||||||
|
/// A check applied to the textual value of one field (see [`FieldName`]).
pub trait DataValidator: FieldName {
    /// Whether the given value is valid for the validator.
    fn is_valid(&self, s: &str) -> bool;
}
|
||||||
|
|
||||||
|
/// A typed filter: a column name paired with the comparison to run against
/// that column's value.
pub struct FilterRule<T: PartialOrd> {
    /// Name of the column whose value is checked.
    pub column_name: String,
    /// Comparison (and reference value) the column value is tested with.
    pub comparator: Comparator<T>,
}
|
||||||
|
|
||||||
|
impl<T> FieldName for FilterRule<T>
where
    T: PartialOrd,
{
    /// Returns an owned copy of the column name this rule is bound to.
    fn get_field_name(&self) -> String {
        let name = &self.column_name;
        name.clone()
    }
}
|
||||||
|
|
||||||
|
impl<T> DataValidator for FilterRule<T>
where
    T: FromStr + PartialOrd,
{
    /// Parses `s` as a `T` and runs the rule's comparator on the result.
    ///
    /// Text that cannot be parsed as `T` is reported as not valid.
    fn is_valid(&self, s: &str) -> bool {
        match s.parse::<T>() {
            Ok(parsed) => self.comparator.is_valid(parsed),
            Err(_) => false,
        }
    }
}
|
||||||
|
|
||||||
|
pub fn to_filter_rules(filters: &Vec<DeriveFilter>) -> anyhow::Result<Vec<Box<dyn DataValidator>>> {
|
||||||
|
filters
|
||||||
|
.iter()
|
||||||
|
// For some reason inlining to_filter_rules causes a compiler error, so leaving
|
||||||
|
// in a separate function (it is cleaner at least)
|
||||||
|
.map(|filter| to_filter_rule(filter))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_filter_rule(filter: &DeriveFilter) -> anyhow::Result<Box<dyn DataValidator>> {
|
||||||
|
let value = filter.match_value.clone();
|
||||||
|
match filter.value_type {
|
||||||
|
crate::derive::ValueType::String => Ok(Box::new(get_filter_rule(filter, value))),
|
||||||
|
crate::derive::ValueType::Integer => {
|
||||||
|
Ok(Box::new(get_filter_rule(filter, value.parse::<i64>()?)))
|
||||||
|
}
|
||||||
|
crate::derive::ValueType::Float => {
|
||||||
|
Ok(Box::new(get_filter_rule(filter, value.parse::<f64>()?)))
|
||||||
|
}
|
||||||
|
crate::derive::ValueType::Boolean => {
|
||||||
|
Ok(Box::new(get_filter_rule(filter, value.parse::<bool>()?)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_filter_rule<T: PartialOrd>(filter: &DeriveFilter, value: T) -> FilterRule<T> {
|
||||||
|
FilterRule {
|
||||||
|
column_name: filter.column_name.clone(),
|
||||||
|
comparator: match filter.comparator {
|
||||||
|
MatchComparisonType::Equal => Comparator::Equal(value),
|
||||||
|
MatchComparisonType::GreaterThan => Comparator::GreaterThan(value),
|
||||||
|
MatchComparisonType::LessThan => Comparator::LessThan(value),
|
||||||
|
MatchComparisonType::NotEqual => Comparator::NotEqual(value),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An operation together with the column it is associated with.
#[derive(Serialize, Deserialize, Clone)]
pub struct DeriveColumnOperation {
    /// Column name for this operation.
    // NOTE(review): presumably the column the operation's result is stored
    // under — confirm once `derive_line` is implemented.
    pub column_name: String,
    /// The operation to perform.
    pub operation: DeriveOperation,
}
|
||||||
|
|
||||||
/// Serializable description of one derive rule: a set of column operations
/// plus the filters attached to the rule.
#[derive(Serialize, Deserialize, Clone)]
pub struct DeriveRule {
    /// Column operations belonging to this rule.
    pub operations: Vec<DeriveColumnOperation>,
    /// Filter definitions attached to this rule; converted to validators by
    /// `to_runnable_rule`.
    pub filters: Vec<DeriveFilter>,
}
|
||||||
|
|
||||||
/// Serializable configuration of a derive node.
#[derive(Serialize, Deserialize, Clone)]
pub struct DeriveNode {
    /// Rules to run for this node.
    pub rules: Vec<DeriveRule>,
    /// Path of the CSV file the node reads from.
    pub input_file_path: String,
    /// Path of the CSV file the node writes to.
    pub output_file_path: String,
}
|
||||||
|
|
||||||
|
/// A [`DeriveRule`] whose filter definitions have been converted into
/// runnable validators.
pub struct RunnableDeriveRule {
    /// Column operations copied from the originating rule.
    pub operations: Vec<DeriveColumnOperation>,
    /// Validators built from the originating rule's filters.
    pub filters: Vec<Box<dyn DataValidator>>,
}
|
||||||
|
|
||||||
|
impl DeriveRule {
|
||||||
|
fn to_runnable_rule(&self) -> anyhow::Result<RunnableDeriveRule> {
|
||||||
|
let filters = to_filter_rules(&self.filters)?;
|
||||||
|
Ok(RunnableDeriveRule {
|
||||||
|
operations: self.operations.clone(),
|
||||||
|
filters,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn derive(
|
||||||
|
rules: &Vec<RunnableDeriveRule>,
|
||||||
|
input: &mut impl RecordDeserializer,
|
||||||
|
output: &mut impl RecordSerializer,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
if let Some(line) = input.deserialize()? {
|
||||||
|
let line: BTreeMap<String, String> = line;
|
||||||
|
output.write_header(&line)?;
|
||||||
|
derive_line(line, rules, output)?;
|
||||||
|
|
||||||
|
while let Some(line) = input.deserialize()? {
|
||||||
|
let line: BTreeMap<String, String> = line;
|
||||||
|
derive_line(line, rules, output)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn derive_line(
|
||||||
|
line: BTreeMap<String, String>,
|
||||||
|
rules: &Vec<RunnableDeriveRule>,
|
||||||
|
output: &mut impl RecordSerializer,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
for rule in rules {
|
||||||
|
// First check the filter works. If there are no filters, the rule applies to all rows
|
||||||
|
for filter in &rule.filters {}
|
||||||
|
// TODO: Split operations should be processed separately, after all the other operations have been applied
|
||||||
|
// Apply all operations individually, adding as a column to the record map
|
||||||
|
for operation in &rule.operations {}
|
||||||
|
}
|
||||||
|
// for line in line {
|
||||||
|
output.serialize(line)
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs a [`DeriveNode`]; see its `RunnableNode` implementation.
pub struct DeriveNodeRunner {
    /// Configuration of the node to run.
    derive_node: DeriveNode,
}
|
||||||
|
|
||||||
|
impl RunnableNode for DeriveNodeRunner {
|
||||||
|
fn run(&self) -> anyhow::Result<()> {
|
||||||
|
let mut reader = csv::Reader::from_path(&self.derive_node.input_file_path)?;
|
||||||
|
let mut writer = csv::Writer::from_path(&self.derive_node.output_file_path)?;
|
||||||
|
let rules: anyhow::Result<Vec<RunnableDeriveRule>> = self
|
||||||
|
.derive_node
|
||||||
|
.rules
|
||||||
|
.iter()
|
||||||
|
.map(|rule| rule.to_runnable_rule())
|
||||||
|
.collect();
|
||||||
|
let rules = rules?;
|
||||||
|
derive(&rules, &mut reader, &mut writer)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
142
src/filter.rs
142
src/filter.rs
@@ -1,79 +1,15 @@
|
|||||||
use std::{collections::BTreeMap, str::FromStr};
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
derive::{DeriveFilter, MatchComparisonType},
|
derive::{to_filter_rules, DataValidator, DataValidators, DeriveFilter},
|
||||||
io::{RecordDeserializer, RecordSerializer},
|
io::{RecordDeserializer, RecordSerializer},
|
||||||
node::RunnableNode,
|
node::RunnableNode,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub enum Comparator<T: PartialOrd> {
|
fn is_line_valid(line: &BTreeMap<String, String>, rules: &DataValidators) -> bool {
|
||||||
Equal(T),
|
rules.iter().all(|rule| {
|
||||||
NotEqual(T),
|
|
||||||
GreaterThan(T),
|
|
||||||
LessThan(T),
|
|
||||||
In(Vec<T>),
|
|
||||||
NotIn(Vec<T>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: PartialOrd> Comparator<T> {
|
|
||||||
pub fn is_valid(&self, value: T) -> bool {
|
|
||||||
match self {
|
|
||||||
Comparator::Equal(v) => value == *v,
|
|
||||||
Comparator::NotEqual(v) => value != *v,
|
|
||||||
Comparator::GreaterThan(v) => value > *v,
|
|
||||||
Comparator::LessThan(v) => value < *v,
|
|
||||||
Comparator::In(v) => v.contains(&value),
|
|
||||||
Comparator::NotIn(v) => !v.contains(&value),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait FieldName {
|
|
||||||
// Name of the field this validator should work on
|
|
||||||
fn get_field_name(&self) -> String;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait DataValidator: FieldName {
|
|
||||||
// Whether the given value is valid for the validator
|
|
||||||
fn is_valid(&self, s: &str) -> bool;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct FilterRule<T: PartialOrd> {
|
|
||||||
column_name: String,
|
|
||||||
comparator: Comparator<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: PartialOrd> FieldName for FilterRule<T> {
|
|
||||||
fn get_field_name(&self) -> String {
|
|
||||||
self.column_name.clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: FromStr + PartialOrd> DataValidator for FilterRule<T> {
|
|
||||||
fn is_valid(&self, s: &str) -> bool {
|
|
||||||
s.parse().map_or(false, |f| self.comparator.is_valid(f))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Write all lines from the input file to the output file, skipping records
|
|
||||||
* that don't satisfy the filter criteria
|
|
||||||
*/
|
|
||||||
pub fn filter_file(
|
|
||||||
rules: &Vec<Box<dyn DataValidator>>,
|
|
||||||
input: &mut impl RecordDeserializer,
|
|
||||||
output: &mut impl RecordSerializer,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
if let Some(line) = input.deserialize()? {
|
|
||||||
let line: BTreeMap<String, String> = line;
|
|
||||||
output.write_header(&line)?;
|
|
||||||
output.write_record(&line)?;
|
|
||||||
|
|
||||||
while let Some(line) = input.deserialize()? {
|
|
||||||
let line: BTreeMap<String, String> = line;
|
|
||||||
if rules.iter().all(|rule| {
|
|
||||||
line.get(&rule.get_field_name()).map_or(true, |value| {
|
line.get(&rule.get_field_name()).map_or(true, |value| {
|
||||||
if value.trim().is_empty() {
|
if value.trim().is_empty() {
|
||||||
true
|
true
|
||||||
@@ -81,7 +17,29 @@ pub fn filter_file(
|
|||||||
rule.is_valid(value)
|
rule.is_valid(value)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}) {
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write all lines from the input file to the output file, skipping records
|
||||||
|
* that don't satisfy the filter criteria
|
||||||
|
*/
|
||||||
|
pub fn filter_file(
|
||||||
|
rules: &DataValidators,
|
||||||
|
input: &mut impl RecordDeserializer,
|
||||||
|
output: &mut impl RecordSerializer,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
if let Some(line) = input.deserialize()? {
|
||||||
|
let line: BTreeMap<String, String> = line;
|
||||||
|
output.write_header(&line)?;
|
||||||
|
|
||||||
|
if (is_line_valid(&line, &rules)) {
|
||||||
|
output.write_record(&line)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
while let Some(line) = input.deserialize()? {
|
||||||
|
let line: BTreeMap<String, String> = line;
|
||||||
|
if is_line_valid(&line, rules) {
|
||||||
output.write_record(&line)?;
|
output.write_record(&line)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -97,45 +55,6 @@ pub struct FilterNode {
|
|||||||
pub output_file_path: String,
|
pub output_file_path: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FilterNode {
|
|
||||||
fn to_filter_rules(&self) -> anyhow::Result<Vec<Box<dyn DataValidator>>> {
|
|
||||||
self.filters
|
|
||||||
.iter()
|
|
||||||
// For some reason inlining to_filter_rules causes a compiler error, so leaving
|
|
||||||
// in a separate function (it is cleaner at least)
|
|
||||||
.map(|filter| to_filter_rule(filter))
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn to_filter_rule(filter: &DeriveFilter) -> anyhow::Result<Box<dyn DataValidator>> {
|
|
||||||
let value = filter.match_value.clone();
|
|
||||||
match filter.value_type {
|
|
||||||
crate::derive::ValueType::String => Ok(Box::new(get_filter_rule(filter, value))),
|
|
||||||
crate::derive::ValueType::Integer => {
|
|
||||||
Ok(Box::new(get_filter_rule(filter, value.parse::<i64>()?)))
|
|
||||||
}
|
|
||||||
crate::derive::ValueType::Float => {
|
|
||||||
Ok(Box::new(get_filter_rule(filter, value.parse::<f64>()?)))
|
|
||||||
}
|
|
||||||
crate::derive::ValueType::Boolean => {
|
|
||||||
Ok(Box::new(get_filter_rule(filter, value.parse::<bool>()?)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_filter_rule<T: PartialOrd>(filter: &DeriveFilter, value: T) -> FilterRule<T> {
|
|
||||||
FilterRule {
|
|
||||||
column_name: filter.column_name.clone(),
|
|
||||||
comparator: match filter.comparator {
|
|
||||||
MatchComparisonType::Equal => Comparator::Equal(value),
|
|
||||||
MatchComparisonType::GreaterThan => Comparator::GreaterThan(value),
|
|
||||||
MatchComparisonType::LessThan => Comparator::LessThan(value),
|
|
||||||
MatchComparisonType::NotEqual => Comparator::NotEqual(value),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct FilterNodeRunner {
|
pub struct FilterNodeRunner {
|
||||||
pub filter_node: FilterNode,
|
pub filter_node: FilterNode,
|
||||||
}
|
}
|
||||||
@@ -144,14 +63,15 @@ impl RunnableNode for FilterNodeRunner {
|
|||||||
fn run(&self) -> anyhow::Result<()> {
|
fn run(&self) -> anyhow::Result<()> {
|
||||||
let mut reader = csv::Reader::from_path(&self.filter_node.input_file_path)?;
|
let mut reader = csv::Reader::from_path(&self.filter_node.input_file_path)?;
|
||||||
let mut writer = csv::Writer::from_path(&self.filter_node.output_file_path)?;
|
let mut writer = csv::Writer::from_path(&self.filter_node.output_file_path)?;
|
||||||
let rules = self.filter_node.to_filter_rules()?;
|
let rules = to_filter_rules(&self.filter_node.filters)?;
|
||||||
filter_file(&rules, &mut reader, &mut writer)
|
filter_file(&rules, &mut reader, &mut writer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::filter::FilterRule;
|
|
||||||
|
use crate::derive::{Comparator, FilterRule};
|
||||||
|
|
||||||
use super::filter_file;
|
use super::filter_file;
|
||||||
|
|
||||||
@@ -183,7 +103,7 @@ Value3,Value4
|
|||||||
filter_file(
|
filter_file(
|
||||||
&vec![Box::new(FilterRule {
|
&vec![Box::new(FilterRule {
|
||||||
column_name: "Column1".to_owned(),
|
column_name: "Column1".to_owned(),
|
||||||
comparator: crate::filter::Comparator::NotEqual("Value3".to_owned()),
|
comparator: Comparator::NotEqual("Value3".to_owned()),
|
||||||
})],
|
})],
|
||||||
&mut reader,
|
&mut reader,
|
||||||
&mut writer,
|
&mut writer,
|
||||||
|
|||||||
Reference in New Issue
Block a user