From 5acee8c889d5c0e12e406ec056bb0e5891885548 Mon Sep 17 00:00:00 2001 From: vato007 Date: Sun, 28 Jul 2024 16:41:49 +0930 Subject: [PATCH] Add custom graph executor and implement filter node to test it (#2) Reviewed-on: https://git.michaelpivato.dev/vato007/coster-rs/pulls/2 --- Cargo.lock | 816 ++++++++++++++++++++++---------- Cargo.toml | 2 +- src/bin/agent2/main.rs | 14 +- src/derive.rs | 75 +++ src/filter.rs | 157 +++++- src/graph.rs | 310 +++++++----- src/io.rs | 111 ++--- src/lib.rs | 29 +- src/link.rs | 4 +- src/node.rs | 6 + src/products/create_products.rs | 2 +- src/upload_to_db.rs | 97 ++-- 12 files changed, 1123 insertions(+), 500 deletions(-) create mode 100644 src/derive.rs create mode 100644 src/node.rs diff --git a/Cargo.lock b/Cargo.lock index 491fe0c..86aefda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,25 +4,15 @@ version = 3 [[package]] name = "ahash" -version = "0.7.6" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" -dependencies = [ - "getrandom", - "once_cell", - "version_check", -] - -[[package]] -name = "ahash" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", "getrandom", "once_cell", "version_check", + "zerocopy", ] [[package]] @@ -101,7 +91,7 @@ version = "0.17.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59c468daea140b747d781a1da9f7db5f0a8e6636d4af20cc539e43d05b0604fa" dependencies = [ - "ahash 0.8.3", + "ahash", "arrow-format", "bytemuck", "chrono", @@ -132,16 +122,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", -] - -[[package]] -name = "atoi" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" -dependencies = [ - "num-traits", + "syn 2.0.72", ] [[package]] @@ -161,9 +142,21 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "base64" -version = "0.21.0" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "bitflags" @@ -176,6 +169,9 @@ name = "bitflags" version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +dependencies = [ + "serde", +] [[package]] name = "block-buffer" @@ -221,7 +217,7 @@ checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.72", ] [[package]] @@ -287,11 +283,11 @@ version = "4.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44bec8e5c9d09e439c4335b1af0abaab56dcf3b94999a936e1bb47b9134288f0" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro-error", "proc-macro2", "quote", - "syn 1.0.95", + "syn 1.0.109", ] [[package]] @@ -325,6 +321,21 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "core-foundation-sys" version = "0.8.3" @@ -437,7 +448,7 @@ dependencies = [ "crossterm_winapi", "libc", "mio", - "parking_lot 0.12.1", + "parking_lot", "signal-hook", "signal-hook-mio", "winapi", @@ -508,7 +519,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn 1.0.95", + "syn 1.0.109", ] [[package]] @@ -525,7 +536,18 @@ checksum = "086c685979a698443656e5cf7856c95c642295a38599f12fb1ff76fb28d19892" dependencies = [ "proc-macro2", "quote", - "syn 1.0.95", + "syn 1.0.109", +] + +[[package]] +name = "der" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", ] [[package]] @@ -535,7 +557,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" dependencies = [ "block-buffer", + "const-oid", "crypto-common", + "subtle", ] [[package]] @@ -555,14 +579,8 @@ name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" - -[[package]] -name = "encoding_rs" -version = "0.8.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394" dependencies = [ - "cfg-if", + "serde", ] [[package]] @@ -574,7 +592,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.72", ] [[package]] @@ -614,6 +632,17 @@ dependencies = [ "libc", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "ethnum" version = "1.4.0" @@ -622,9 +651,14 @@ checksum = "6c8ff382b2fa527fb7fb06eeebfc5bbb3f17e3cc6b9d70b006c41daa8824adac" [[package]] name = "event-listener" -version = "2.5.3" +version = "5.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] [[package]] name = "fast-float" @@ -638,6 +672,17 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "spin 0.9.8", +] + [[package]] name = "foreign_vec" version = "0.1.0" @@ -697,13 +742,13 @@ dependencies = [ [[package]] name = "futures-intrusive" -version = "0.4.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" dependencies = [ "futures-core", "lock_api", - "parking_lot 0.11.2", + "parking_lot", ] [[package]] @@ -720,7 +765,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.72", ] [[package]] @@ -790,31 +835,22 @@ checksum = "74721d007512d0cb3338cd20f0654ac913920061a4c4d0d8708edb3f2a698c0c" [[package]] name = "hashbrown" -version = "0.12.3" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ - "ahash 0.7.6", -] - -[[package]] -name = "hashbrown" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" -dependencies = [ - "ahash 0.8.3", + "ahash", "allocator-api2", "rayon", ] [[package]] name = "hashlink" -version = "0.8.1" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" dependencies = [ - "hashbrown 0.12.3", + "hashbrown", ] [[package]] @@ -822,9 +858,12 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -dependencies = [ - "unicode-segmentation", -] + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" @@ -847,6 +886,24 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "home" version = "0.5.5" @@ -890,16 +947,6 @@ dependencies = [ "unicode-normalization", ] -[[package]] -name = "indexmap" -version = "1.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", -] - [[package]] name = "indexmap" version = "2.0.2" @@ -907,16 +954,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.1", -] - -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if", + "hashbrown", ] [[package]] @@ -985,6 +1023,9 @@ name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +dependencies = [ + "spin 0.5.2", +] [[package]] name = "lexical" @@ -1061,9 +1102,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.151" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "libm" @@ -1071,6 +1112,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libsqlite3-sys" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "link-cplusplus" version = "1.0.8" @@ -1104,12 +1156,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.17" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" -dependencies = [ - "cfg-if", -] +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "lz4" @@ -1140,6 +1189,15 @@ dependencies = [ "rawpointer", ] +[[package]] +name = "md-5" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" +dependencies = [ + "digest", +] + [[package]] name = "memchr" version = "2.6.4" @@ -1200,7 +1258,7 @@ checksum = "26a83d8500ed06d68877e9de1dde76c1dbb83885dcdbda4ef44ccbc3fbda2ac8" dependencies = [ "proc-macro2", "quote", - "syn 1.0.95", + "syn 1.0.109", "target-features", ] @@ -1228,7 +1286,7 @@ checksum = "01fcc0b8149b4632adc89ac3b7b31a12fb6099a0317a4eb2ebff574ef7de7218" dependencies = [ "proc-macro2", "quote", - "syn 1.0.95", + "syn 1.0.109", ] [[package]] @@ -1259,6 +1317,23 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint-dig" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +dependencies = [ + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand", + "smallvec", + "zeroize", +] + [[package]] name = "num-complex" version = "0.4.1" @@ -1278,6 +1353,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d869c01cc0c455284163fd0092f1f93835385ccab5a98a0dcc497b2f8bf055a9" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-rational" version = "0.4.1" @@ -1322,15 +1408,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa" [[package]] -name = "parking_lot" -version = "0.11.2" +name = "parking" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.6", -] +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" [[package]] name = "parking_lot" @@ -1339,21 +1420,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.7", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall 0.2.16", - "smallvec", - "winapi", + "parking_lot_core", ] [[package]] @@ -1375,6 +1442,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.2.0" @@ -1383,9 +1459,9 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -1393,6 +1469,27 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der", + "pkcs8", + "spki", +] + +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.27" @@ -1431,7 +1528,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f967c901fa5da4ca7f64e813d1268488ba97e9b3004cefc579ff851c197a1138" dependencies = [ "arrow2", - "hashbrown 0.14.1", + "hashbrown", "multiversion", "num-traits", "polars-error", @@ -1445,14 +1542,14 @@ version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b24f92fc5b167f668ff85ab9607dfa72e2c09664cacef59297ee8601dee60126" dependencies = [ - "ahash 0.8.3", + "ahash", "arrow2", "bitflags 2.4.0", "chrono", "comfy-table", "either", - "hashbrown 0.14.1", - "indexmap 2.0.2", + "hashbrown", + "indexmap", "num-traits", "once_cell", "polars-arrow", @@ -1486,7 +1583,7 @@ version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92cab0df9f2a35702fa5aec99edfaabf9ae8e9cdd0acf69e143ad2d132f34f9c" dependencies = [ - "ahash 0.8.3", + "ahash", "arrow2", "async-trait", "bytes", @@ -1517,7 +1614,7 @@ version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c33762ec2a55e01c9f8776b34db86257c70a0a3b3929bd4eb91a52aacf61456" dependencies = [ - "ahash 0.8.3", + "ahash", "bitflags 2.4.0", "glob", "once_cell", @@ -1543,7 +1640,7 @@ dependencies = [ "argminmax", "arrow2", "either", - "indexmap 2.0.2", + "indexmap", "memchr", "polars-arrow", "polars-core", @@ -1561,7 +1658,7 @@ dependencies = [ "crossbeam-channel", "crossbeam-queue", "enum_dispatch", - "hashbrown 0.14.1", + "hashbrown", "num-traits", "polars-arrow", "polars-core", @@ -1581,7 +1678,7 @@ version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb67b014f0295e8e9dbb84404a91d666d477b3bc248a2ed51bc442833b16da35" dependencies = [ - "ahash 0.8.3", + "ahash", "arrow2", "once_cell", "polars-arrow", @@ -1630,7 +1727,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53f42d2632f5971c9575041d33cbcfb1f996900c40bbf58bc6eb0a0c5efbecea" dependencies = [ "arrow2", - "atoi 2.0.0", + "atoi", "chrono", "now", "once_cell", @@ -1648,8 +1745,8 @@ version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c326708a370d71dc6e11a8f4bbc10a8479e1c314dc048ba73543b815cd0bf339" dependencies = [ - "ahash 0.8.3", - "hashbrown 0.14.1", + "ahash", + "hashbrown", "num-traits", "once_cell", "polars-error", @@ -1674,7 +1771,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn 1.0.95", + "syn 1.0.109", "version_check", ] @@ -1691,18 +1788,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.33" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -1836,17 +1933,16 @@ checksum = "56d84fdd47036b038fc80dd333d10b6aab10d5d31f4a366e20014def75328d33" [[package]] name = "ring" -version = "0.16.20" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +checksum = "9babe80d5c16becf6594aa32ad2be8fe08498e7ae60b77de8df700e67f191d7e" dependencies = [ "cc", + "getrandom", "libc", - "once_cell", - "spin", + "spin 0.9.8", "untrusted", - "web-sys", - "winapi", + "windows-sys 0.48.0", ] [[package]] @@ -1871,6 +1967,26 @@ dependencies = [ "serde", ] +[[package]] +name = "rsa" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc" +dependencies = [ + "const-oid", + "digest", + "num-bigint-dig", + "num-integer", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core", + "signature", + "spki", + "subtle", + "zeroize", +] + [[package]] name = "rustc_version" version = "0.4.0" @@ -1896,9 +2012,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.28" +version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ "bitflags 2.4.0", "errno 0.3.8", @@ -1909,23 +2025,32 @@ dependencies = [ [[package]] name = "rustls" -version = "0.20.8" +version = "0.21.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ - "log", "ring", + "rustls-webpki", "sct", - "webpki", ] [[package]] name = "rustls-pemfile" -version = "1.0.2" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.7", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", ] [[package]] @@ -1963,9 +2088,9 @@ checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" [[package]] name = "sct" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ "ring", "untrusted", @@ -1994,7 +2119,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.72", ] [[package]] @@ -2008,6 +2133,29 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa 1.0.6", + "ryu", + "serde", +] + +[[package]] +name = "sha1" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.6" @@ -2049,6 +2197,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest", + "rand_core", +] + [[package]] name = "simba" version = "0.7.1" @@ -2082,6 +2240,9 @@ name = "smallvec" version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +dependencies = [ + "serde", +] [[package]] name = "smartstring" @@ -2110,6 +2271,25 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "sqlformat" version = "0.2.1" @@ -2132,88 +2312,199 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.6.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9249290c05928352f71c077cc44a464d880c63f26f7534728cca008e135c0428" +checksum = "27144619c6e5802f1380337a209d2ac1c431002dd74c6e60aebff3c506dc4f0c" dependencies = [ "sqlx-core", "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", ] [[package]] name = "sqlx-core" -version = "0.6.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcbc16ddba161afc99e14d1713a453747a2b07fc097d2009f4c300ec99286105" +checksum = "a999083c1af5b5d6c071d34a708a19ba3e02106ad82ef7bbd69f5e48266b613b" dependencies = [ - "ahash 0.7.6", - "atoi 1.0.0", - "bitflags 1.3.2", + "atoi", "byteorder", "bytes", "crc", "crossbeam-queue", - "dotenvy", "either", - "encoding_rs", "event-listener", "futures-channel", "futures-core", "futures-intrusive", + "futures-io", "futures-util", + "hashbrown", "hashlink", "hex", - "indexmap 1.9.2", - "itoa 1.0.6", - "libc", + "indexmap", "log", "memchr", "once_cell", "paste", "percent-encoding", - "regex", "rustls", "rustls-pemfile", + "serde", + "serde_json", "sha2", "smallvec", "sqlformat", - "sqlx-rt", - "stringprep", "thiserror", + "tokio", "tokio-stream", + "tracing", "url", - "uuid", "webpki-roots", ] [[package]] name = "sqlx-macros" -version = "0.6.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b850fa514dc11f2ee85be9d055c512aa866746adfacd1cb42d867d68e6a5b0d9" +checksum = "a23217eb7d86c584b8cbe0337b9eacf12ab76fe7673c513141ec42565698bb88" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 2.0.72", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a099220ae541c5db479c6424bdf1b200987934033c2584f79a0e1693601e776" dependencies = [ "dotenvy", "either", - "heck", + "heck 0.5.0", + "hex", "once_cell", "proc-macro2", "quote", + "serde", + "serde_json", "sha2", "sqlx-core", - "sqlx-rt", - "syn 1.0.95", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "syn 2.0.72", + "tempfile", + "tokio", "url", ] [[package]] -name = "sqlx-rt" -version = "0.6.2" +name = "sqlx-mysql" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24c5b2d25fa654cc5f841750b8e1cdedbe21189bf9a9382ee90bfa9dd3562396" +checksum = "5afe4c38a9b417b6a9a5eeffe7235d0a106716495536e7727d1c7f4b1ff3eba6" dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.4.0", + "byteorder", + "bytes", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa 1.0.6", + "log", + "md-5", + "memchr", "once_cell", - "tokio", - "tokio-rustls", + "percent-encoding", + "rand", + "rsa", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1dbb157e65f10dbe01f729339c06d239120221c9ad9fa0ba8408c4cc18ecf21" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.4.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa 1.0.6", + "log", + "md-5", + "memchr", + "once_cell", + "rand", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b2cdd83c008a622d94499c0006d8ee5f821f36c89b7d625c900e5dc30b5c5ee" +dependencies = [ + "atoi", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "tracing", + "url", ] [[package]] @@ -2256,11 +2547,11 @@ version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "rustversion", - "syn 1.0.95", + "syn 1.0.109", ] [[package]] @@ -2269,18 +2560,24 @@ version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "rustversion", - "syn 2.0.38", + "syn 2.0.72", ] [[package]] -name = "syn" -version = "1.0.95" +name = "subtle" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", "quote", @@ -2289,9 +2586,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.38" +version = "2.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" +checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af" dependencies = [ "proc-macro2", "quote", @@ -2320,15 +2617,14 @@ checksum = "cfb5fa503293557c5158bd215fdc225695e567a77e453f5d4452a50a193969bd" [[package]] name = "tempfile" -version = "3.8.1" +version = "3.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", "fastrand", - "redox_syscall 0.4.1", - "rustix 0.38.28", - "windows-sys 0.48.0", + "rustix 0.38.34", + "windows-sys 0.52.0", ] [[package]] @@ -2357,7 +2653,7 @@ checksum = "5420d42e90af0c38c3290abcca25b9b3bdf379fc9f55c528f53a269d9c9a267e" dependencies = [ "proc-macro2", "quote", - "syn 1.0.95", + "syn 1.0.109", ] [[package]] @@ -2387,7 +2683,7 @@ dependencies = [ "memchr", "mio", "num_cpus", - "parking_lot 0.12.1", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -2403,18 +2699,7 @@ checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2", "quote", - "syn 1.0.95", -] - -[[package]] -name = "tokio-rustls" -version = "0.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" -dependencies = [ - "rustls", - "tokio", - "webpki", + "syn 1.0.109", ] [[package]] @@ -2428,6 +2713,38 @@ dependencies = [ "tokio", ] +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "log", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + [[package]] name = "typenum" version = "1.15.0" @@ -2455,12 +2772,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-segmentation" -version = "1.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" - [[package]] name = "unicode-width" version = "0.1.10" @@ -2475,9 +2786,9 @@ checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" [[package]] name = "untrusted" -version = "0.7.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" @@ -2491,10 +2802,10 @@ dependencies = [ ] [[package]] -name = "uuid" -version = "1.3.0" +name = "vcpkg" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" [[package]] name = "version_check" @@ -2508,6 +2819,12 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.84" @@ -2529,7 +2846,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 1.0.95", + "syn 1.0.109", "wasm-bindgen-shared", ] @@ -2551,7 +2868,7 @@ checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" dependencies = [ "proc-macro2", "quote", - "syn 1.0.95", + "syn 1.0.109", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2562,33 +2879,20 @@ version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" -[[package]] -name = "web-sys" -version = "0.3.61" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33b99f4b23ba3eec1a53ac264e35a755f00e966e0065077d6027c0f575b0b97" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki-roots" -version = "0.22.6" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + +[[package]] +name = "whoami" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9" dependencies = [ - "webpki", + "redox_syscall 0.4.1", + "wasite", ] [[package]] @@ -2836,6 +3140,32 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9828b178da53440fa9c766a3d2f73f7cf5d0ac1fe3980c1e5018d899fd19e07b" +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + +[[package]] +name = "zeroize" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + [[package]] name = "zstd" version = "0.12.4" diff --git a/Cargo.toml b/Cargo.toml index 499dbdb..a0ce2dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ chrono = {version = "0.4.31", features = ["default", "serde"]} rayon = "1.6.0" tokio = { version = "1.26.0", features = ["full"] } -sqlx = { version = "0.6", features = [ "runtime-tokio-rustls", "mssql", "any" ] } +sqlx = { version = "0.8", features = [ "runtime-tokio-rustls", "any" ] } rmp-serde = "1.1.1" tempfile = "3.7.0" polars = {version = "0.32.1", features = ["lazy", "performant", "streaming", "cse", "dtype-datetime"]} diff --git a/src/bin/agent2/main.rs b/src/bin/agent2/main.rs index 7fba9bb..2bd6983 100644 --- a/src/bin/agent2/main.rs +++ b/src/bin/agent2/main.rs @@ -1,5 +1,4 @@ -use coster_rs::upload_to_db; -use sqlx::{any::AnyPoolOptions, mssql::MssqlPoolOptions}; +use sqlx::any::AnyPoolOptions; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -7,13 +6,14 @@ async fn main() -> anyhow::Result<()> { let password = ""; let host = ""; let database = ""; - // USing sqlx: https://github.com/launchbadge/sqlx - let connection_string = format!("mssq://{}:{}@{}/{}", user, password, host, database); - let pool = AnyPoolOptions::new() + let database_type = ""; + let connection_string = format!( + "{}://{}:{}@{}/{}", + database_type, user, password, host, database + ); + let _ = AnyPoolOptions::new() .max_connections(20) .connect(&connection_string) .await?; - - // upload_to_db::upload_file_bulk(&pool, &"".to_owned(), &"".to_owned(), None, "".to_owned()).await?; Ok(()) } diff --git a/src/derive.rs b/src/derive.rs new file mode 100644 index 0000000..4679df0 --- /dev/null +++ b/src/derive.rs @@ -0,0 +1,75 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone)] +pub enum DeriveColumnType { + Column(String), + Constant(String), +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct MapOperation { + pub mapped_value: String, +} + +#[derive(Serialize, Deserialize, Clone)] +pub enum DatePart { + Year, + Month, + Week, + Day, + Hour, + Minute, + Second, +} + +#[derive(Serialize, Deserialize, Clone)] +pub enum SplitType { + DateTime(String, DatePart), + Numeric(String, isize), +} + +#[derive(Serialize, Deserialize, Clone)] +pub enum MatchComparisonType { + Equal, + GreaterThan, + LessThan, + NotEqual, +} + +#[derive(Serialize, Deserialize, Clone)] +pub enum DeriveOperation { + Concat(Vec), + Add(Vec), + Multiply(Vec), + Subtract(DeriveColumnType, DeriveColumnType), + Divide(DeriveColumnType, DeriveColumnType), + Map(String, Vec), + Split(String, SplitType), +} + +#[derive(Serialize, Deserialize, Clone)] +pub enum ValueType { + String, + Integer, + Float, + Boolean, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct DeriveFilter { + pub column_name: String, + pub comparator: MatchComparisonType, + pub match_value: String, + pub value_type: ValueType, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct DeriveRule { + pub operations: Vec, + pub filters: Vec, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct DeriveNode { + pub rules: Vec, +} diff --git a/src/filter.rs b/src/filter.rs index 68bfadd..13f0048 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,6 +1,12 @@ -use std::{collections::HashMap, io::Read, str::FromStr}; +use std::{collections::BTreeMap, str::FromStr}; -use crate::{graph::RunnableNode, io::RecordSerializer}; +use serde::{Deserialize, Serialize}; + +use crate::{ + derive::{DeriveFilter, MatchComparisonType}, + io::{RecordDeserializer, RecordSerializer}, + node::RunnableNode, +}; pub enum Comparator { Equal(T), @@ -56,30 +62,143 @@ impl DataValidator for FilterRule { * that don't satisfy the filter criteria */ pub fn filter_file( - rules: Vec<&dyn DataValidator>, - // TODO: Custom serialisers/deserialisers so we don't rely on csv only - input: &mut csv::Reader, + rules: &Vec>, + input: &mut impl RecordDeserializer, output: &mut impl RecordSerializer, ) -> anyhow::Result<()> { - for line in input.deserialize() { - let line: HashMap = line?; - if rules.iter().all(|rule| { - line.get(&rule.get_field_name()).map_or(true, |value| { - if value.trim().is_empty() { - true - } else { - rule.is_valid(value) - } - }) - }) { - output.serialize(line)?; + if let Some(line) = input.deserialize()? { + let line: BTreeMap = line; + output.write_header(&line)?; + output.write_record(&line)?; + + while let Some(line) = input.deserialize()? { + let line: BTreeMap = line; + if rules.iter().all(|rule| { + line.get(&rule.get_field_name()).map_or(true, |value| { + if value.trim().is_empty() { + true + } else { + rule.is_valid(value) + } + }) + }) { + output.write_record(&line)?; + } } + output.flush()?; } Ok(()) } -pub struct FilterNodeRunner {} +#[derive(Serialize, Deserialize, Clone)] +pub struct FilterNode { + pub filters: Vec, + pub input_file_path: String, + pub output_file_path: String, +} + +impl FilterNode { + fn to_filter_rules(&self) -> anyhow::Result>> { + self.filters + .iter() + // For some reason inlining to_filter_rules causes a compiler error, so leaving + // in a separate function (it is cleaner at least) + .map(|filter| to_filter_rule(filter)) + .collect() + } +} + +fn to_filter_rule(filter: &DeriveFilter) -> anyhow::Result> { + let value = filter.match_value.clone(); + match filter.value_type { + crate::derive::ValueType::String => Ok(Box::new(get_filter_rule(filter, value))), + crate::derive::ValueType::Integer => { + Ok(Box::new(get_filter_rule(filter, value.parse::()?))) + } + crate::derive::ValueType::Float => { + Ok(Box::new(get_filter_rule(filter, value.parse::()?))) + } + crate::derive::ValueType::Boolean => { + Ok(Box::new(get_filter_rule(filter, value.parse::()?))) + } + } +} + +fn get_filter_rule(filter: &DeriveFilter, value: T) -> FilterRule { + FilterRule { + column_name: filter.column_name.clone(), + comparator: match filter.comparator { + MatchComparisonType::Equal => Comparator::Equal(value), + MatchComparisonType::GreaterThan => Comparator::GreaterThan(value), + MatchComparisonType::LessThan => Comparator::LessThan(value), + MatchComparisonType::NotEqual => Comparator::NotEqual(value), + }, + } +} + +pub struct FilterNodeRunner { + pub filter_node: FilterNode, +} impl RunnableNode for FilterNodeRunner { - fn run(&self) {} + fn run(&self) -> anyhow::Result<()> { + let mut reader = csv::Reader::from_path(&self.filter_node.input_file_path)?; + let mut writer = csv::Writer::from_path(&self.filter_node.output_file_path)?; + let rules = self.filter_node.to_filter_rules()?; + filter_file(&rules, &mut reader, &mut writer) + } +} + +#[cfg(test)] +mod tests { + use crate::filter::FilterRule; + + use super::filter_file; + + #[test] + fn no_filters_passes_through() -> anyhow::Result<()> { + let records = "Column1,Column2 +Value1,Value2 +Value3,Value4 +"; + let mut reader: csv::Reader<&[u8]> = csv::Reader::from_reader(records.as_bytes()); + let mut writer = csv::Writer::from_writer(vec![]); + filter_file(&vec![], &mut reader, &mut writer)?; + let result = String::from_utf8(writer.into_inner()?)?; + assert_eq!( + records, result, + "Should not modify input when no filters are defined" + ); + Ok(()) + } + + #[test] + fn filters_data() -> anyhow::Result<()> { + let records = "Column1,Column2 +Value1,Value2 +Value3,Value4 +"; + let mut reader: csv::Reader<&[u8]> = csv::Reader::from_reader(records.as_bytes()); + let mut writer = csv::Writer::from_writer(vec![]); + filter_file( + &vec![Box::new(FilterRule { + column_name: "Column1".to_owned(), + comparator: crate::filter::Comparator::NotEqual("Value3".to_owned()), + })], + &mut reader, + &mut writer, + )?; + let result = String::from_utf8(writer.into_inner()?)?; + assert_eq!( + "Column1,Column2 +Value1,Value2 +", + result, + "Should filter out second record due to filter rules" + ); + Ok(()) + } + + #[test] + fn should_print_header_when_no_rules_pass() {} } diff --git a/src/graph.rs b/src/graph.rs index 3040caf..ccdb098 100644 --- a/src/graph.rs +++ b/src/graph.rs @@ -1,9 +1,23 @@ -use itertools::Itertools; +use std::{ + cmp::{min, Ordering}, + collections::{HashMap, HashSet}, + sync::{ + mpsc::{self, Sender}, + Arc, + }, + thread, +}; + +use chrono::Local; use serde::{Deserialize, Serialize}; -use crate::filter::FilterNodeRunner; +use crate::{ + derive::DeriveNode, + filter::{FilterNode, FilterNodeRunner}, + node::RunnableNode, + upload_to_db::{UploadNode, UploadNodeRunner}, +}; -// TODO: Break all this up into separate files in the graph module #[derive(Serialize, Deserialize, Clone)] pub enum NodeConfiguration { FileNode, @@ -12,6 +26,14 @@ pub enum NodeConfiguration { DeriveNode(DeriveNode), CodeRuleNode(CodeRuleNode), FilterNode(FilterNode), + UploadNode(UploadNode), + Dynamic, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct DynamicConfiguration { + pub node_type: String, + pub parameters: HashMap, } #[derive(Serialize, Deserialize, Clone)] @@ -19,6 +41,7 @@ pub struct NodeInfo { pub name: String, pub output_files: Vec, pub configuration: NodeConfiguration, + pub dynamic_configuration: Option, } #[derive(Serialize, Deserialize, Clone)] @@ -65,70 +88,6 @@ pub struct MergeNode { pub joins: Vec, } -#[derive(Serialize, Deserialize, Clone)] -pub enum DeriveColumnType { - Column(String), - Constant(String), -} - -#[derive(Serialize, Deserialize, Clone)] -pub struct MapOperation { - pub mapped_value: String, -} - -#[derive(Serialize, Deserialize, Clone)] -pub enum DatePart { - Year, - Month, - Week, - Day, - Hour, - Minute, - Second, -} - -#[derive(Serialize, Deserialize, Clone)] -pub enum SplitType { - DateTime(String, DatePart), - Numeric(String, isize), -} - -#[derive(Serialize, Deserialize, Clone)] -pub enum MatchComparisonType { - Equal, - GreaterThan, - LessThan, -} - -#[derive(Serialize, Deserialize, Clone)] -pub enum DeriveOperation { - Concat(Vec), - Add(Vec), - Multiply(Vec), - Subtract(DeriveColumnType, DeriveColumnType), - Divide(DeriveColumnType, DeriveColumnType), - Map(String, Vec), - Split(String, SplitType), -} - -#[derive(Serialize, Deserialize, Clone)] -pub struct DeriveFilter { - pub column_name: String, - pub comparator: MatchComparisonType, - pub match_value: String, -} - -#[derive(Serialize, Deserialize, Clone)] -pub struct DeriveRule { - pub operations: Vec, - pub filters: Vec, -} - -#[derive(Serialize, Deserialize, Clone)] -pub struct DeriveNode { - pub rules: Vec, -} - #[derive(Serialize, Deserialize, Clone)] pub enum CodeRuleLanguage { Javascript, @@ -142,16 +101,16 @@ pub struct CodeRuleNode { pub text: String, } -#[derive(Serialize, Deserialize, Clone)] -pub struct FilterNode { - pub filters: Vec, -} - #[derive(Serialize, Deserialize, Clone)] pub struct Node { pub id: i64, pub info: NodeInfo, pub dependent_node_ids: Vec, + // Lets us work out whether a task should be rerun, by + // inspecting the files at the output paths and check if + // their timestamp is before this + // TODO: Could just be seconds since unix epoch? + pub last_modified: chrono::DateTime, } impl Node { @@ -160,21 +119,16 @@ impl Node { } } -impl Into for Node { - fn into(self) -> RunnableGraphNode { - RunnableGraphNode { - runnable_node: Box::new(FilterNodeRunner {}), - // TODO: Construct node objects - // runnable_node: match &self.info.configuration { - // NodeConfiguration::FileNode => todo!(), - // NodeConfiguration::MoveMoneyNode(_) => todo!(), - // NodeConfiguration::MergeNode(_) => todo!(), - // NodeConfiguration::DeriveNode(_) => todo!(), - // NodeConfiguration::CodeRuleNode(_) => todo!(), - // NodeConfiguration::FilterNode(_) => todo!(), - // }, - node: self, - } +fn get_runnable_node(node: Node) -> Box { + match node.info.configuration { + NodeConfiguration::FileNode => todo!(), + NodeConfiguration::MoveMoneyNode(_) => todo!(), + NodeConfiguration::MergeNode(_) => todo!(), + NodeConfiguration::DeriveNode(_) => todo!(), + NodeConfiguration::CodeRuleNode(_) => todo!(), + NodeConfiguration::FilterNode(filter_node) => Box::new(FilterNodeRunner { filter_node }), + NodeConfiguration::UploadNode(upload_node) => Box::new(UploadNodeRunner { upload_node }), + NodeConfiguration::Dynamic => todo!(), } } @@ -184,32 +138,176 @@ pub struct Graph { pub nodes: Vec, } -pub trait RunnableNode { - fn run(&self); -} - -pub struct RunnableGraphNode { - pub runnable_node: Box, - pub node: Node, +pub enum NodeStatus { + Completed, + Running, + // TODO: Error code? + Failed, } pub struct RunnableGraph { - pub name: String, - pub nodes: Vec, + pub graph: Graph, + pub node_statuses: HashMap, } impl RunnableGraph { - pub fn from_graph(graph: &Graph) -> RunnableGraph { + pub fn from_graph(graph: Graph) -> RunnableGraph { RunnableGraph { - name: graph.name.clone(), - nodes: graph - .nodes - .iter() - .map(|node| { - let runnable_graph_node: RunnableGraphNode = node.clone().into(); - runnable_graph_node - }) - .collect_vec(), + graph, + node_statuses: HashMap::new(), } } + + pub fn run_default_tasks(&mut self, num_threads: usize) -> anyhow::Result<()> { + self.run(num_threads, Box::new(|node| get_runnable_node(node))) + } + + // Make this not mutable, emit node status when required in a function or some other message + pub fn run( + &mut self, + num_threads: usize, + get_node_fn: Box Box + Send + Sync>, + ) -> anyhow::Result<()> { + let mut nodes = self.graph.nodes.clone(); + // 1. nodes the nodes based on dependencies (i.e. nodes without dependencies go first) + nodes.sort_by(|a, b| { + if b.dependent_node_ids.contains(&a.id) { + return Ordering::Greater; + } + if a.dependent_node_ids.contains(&b.id) { + return Ordering::Less; + } + Ordering::Equal + }); + + let num_threads = min(num_threads, nodes.len()); + + // Sync version + if num_threads < 2 { + for node in &nodes { + self.node_statuses.insert(node.id, NodeStatus::Running); + match get_node_fn(node.clone()).run() { + Ok(_) => self.node_statuses.insert(node.id, NodeStatus::Completed), + Err(_) => self.node_statuses.insert(node.id, NodeStatus::Failed), + }; + } + return Ok(()); + } + + let mut running_nodes = HashSet::new(); + let mut completed_nodes = HashSet::new(); + + let mut senders: Vec> = vec![]; + let mut handles = vec![]; + let (finish_task, listen_finish_task) = mpsc::sync_channel(num_threads); + let node_fn = Arc::new(get_node_fn); + for n in 0..num_threads { + let finish_task = finish_task.clone(); + let (tx, rx) = mpsc::channel(); + senders.push(tx); + let node_fn = node_fn.clone(); + let handle = thread::spawn(move || { + for node in rx { + node_fn(node.clone()).run(); + finish_task.send((n, node)); + } + println!("Thread {} finished", n); + }); + handles.push(handle); + } + + let mut running_threads = HashSet::new(); + // Run nodes without dependencies. There'll always be at least one + + for i in 0..nodes.len() { + // Ensure we don't overload threads + if i >= num_threads { + break; + } + if !nodes[i].has_dependent_nodes() { + running_threads.insert(i); + // Run all nodes that have no dependencies + let node = nodes.remove(i); + self.node_statuses.insert(node.id, NodeStatus::Running); + running_nodes.insert(node.id); + senders[i % senders.len()].send(node); + } + } + + // Run each dependent node after a graph above finishes. + for (n, node) in listen_finish_task { + running_threads.remove(&n); + // TODO: Add error check here + self.node_statuses.insert(node.id, NodeStatus::Completed); + running_nodes.remove(&node.id); + completed_nodes.insert(node.id); + // Run all the nodes that can be run and aren't in completed + let mut i = 0; + while running_threads.len() < num_threads && i < nodes.len() { + if !running_nodes.contains(&nodes[i].id) + && nodes[i] + .dependent_node_ids + .iter() + .all(|id| completed_nodes.contains(id)) + { + let node = nodes.remove(i); + for i in 0..num_threads { + if !running_threads.contains(&i) { + senders[i].send(node); + break; + } + } + } + i += 1; + } + if nodes.is_empty() { + break; + } + } + for sender in senders { + drop(sender); + } + + for handle in handles { + handle.join().expect("Failed to join thread"); + } + println!("Process finished"); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use chrono::Local; + + use super::{NodeConfiguration, RunnableGraph}; + + #[test] + fn test_basic() -> anyhow::Result<()> { + let mut graph = RunnableGraph { + node_statuses: HashMap::new(), + graph: super::Graph { + name: "Test".to_owned(), + nodes: vec![super::Node { + id: 1, + dependent_node_ids: vec![], + info: super::NodeInfo { + name: "Hello".to_owned(), + configuration: NodeConfiguration::FilterNode(super::FilterNode { + filters: vec![], + input_file_path: "".to_owned(), + output_file_path: "".to_owned(), + }), + output_files: vec![], + dynamic_configuration: None, + }, + last_modified: Local::now(), + }], + }, + }; + graph.run_default_tasks(2)?; + Ok(()) + } } diff --git a/src/io.rs b/src/io.rs index d493c58..0480bf5 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,12 +1,21 @@ -use std::io::{Read, Seek, Write}; +use std::{ + collections::BTreeMap, + io::{Read, Seek, Write}, +}; use anyhow::bail; -use csv::Position; use rmp_serde::{decode::ReadReader, Deserializer, Serializer}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; pub trait RecordSerializer { fn serialize(&mut self, record: impl Serialize) -> anyhow::Result<()>; + + // For when serde serialization can't be used. Forcing BTreeMap to ensure keys/values are + // sorted consistently + fn write_header(&mut self, record: &BTreeMap) -> anyhow::Result<()>; + fn write_record(&mut self, record: &BTreeMap) -> anyhow::Result<()>; + + fn flush(&mut self) -> anyhow::Result<()>; } impl RecordSerializer for csv::Writer { @@ -14,6 +23,21 @@ impl RecordSerializer for csv::Writer { self.serialize(record)?; Ok(()) } + + fn flush(&mut self) -> anyhow::Result<()> { + self.flush()?; + Ok(()) + } + + fn write_header(&mut self, record: &BTreeMap) -> anyhow::Result<()> { + self.write_record(record.keys())?; + Ok(()) + } + + fn write_record(&mut self, record: &BTreeMap) -> anyhow::Result<()> { + self.write_record(record.values())?; + Ok(()) + } } impl RecordSerializer for Serializer { @@ -21,34 +45,28 @@ impl RecordSerializer for Serializer { record.serialize(self)?; Ok(()) } -} -// TODO: I still don't like this api, should split deserialize and position at the least, -// and we need a way to get the current position (otherwise it's left to consumers to track current) -// position -pub trait RecordDeserializer

{ - fn deserialize(&mut self) -> Result, anyhow::Error>; + fn flush(&mut self) -> anyhow::Result<()> { + Ok(()) + } - // Move the deserializer to the specified position in the underlying reader - fn position(&mut self, record: P) -> anyhow::Result<()>; -} + fn write_header(&mut self, _: &BTreeMap) -> anyhow::Result<()> { + Ok(()) + } -struct CsvMessagePackDeserializer { - reader: csv::Reader, -} - -impl CsvMessagePackDeserializer { - fn new(reader: R) -> CsvMessagePackDeserializer { - CsvMessagePackDeserializer { - reader: csv::Reader::from_reader(reader), - } + fn write_record(&mut self, record: &BTreeMap) -> anyhow::Result<()> { + self.serialize(record)?; + Ok(()) } } -impl RecordDeserializer for CsvMessagePackDeserializer { +pub trait RecordDeserializer { + fn deserialize(&mut self) -> Result, anyhow::Error>; +} + +impl RecordDeserializer for csv::Reader { fn deserialize(&mut self) -> Result, anyhow::Error> { - // TODO: This isn't great, need to somehow maintain the state/position - match self.reader.deserialize().next() { + match self.deserialize().next() { None => Ok(Option::None), Some(result) => match result { Ok(ok) => Ok(Option::Some(ok)), @@ -56,56 +74,13 @@ impl RecordDeserializer for CsvMessagePackDeserializer }, } } - - fn position(&mut self, record: Position) -> anyhow::Result<()> { - self.reader.seek(record)?; - Ok(()) - } } -struct MessagePackDeserializer { - reader: Deserializer>, - record_positions: Vec, -} - -impl MessagePackDeserializer { - fn new(reader: R) -> MessagePackDeserializer { - MessagePackDeserializer { - reader: Deserializer::new(reader), - record_positions: vec![], - } - } -} - -// TODO: These need tests -impl RecordDeserializer for MessagePackDeserializer { +impl RecordDeserializer for Deserializer> { fn deserialize(&mut self) -> Result, anyhow::Error> { - // Keep track of byte position of each record, in case we want to go back later - let current_position = self.reader.get_mut().stream_position()?; - if self - .record_positions - .last() - .map_or(true, |position| *position < current_position) - { - self.record_positions.push(current_position); - } - match Deserialize::deserialize(&mut self.reader) { + match Deserialize::deserialize(self) { Ok(value) => Ok(value), Err(value) => Err(anyhow::Error::from(value)), } } - - fn position(&mut self, record: usize) -> anyhow::Result<()> { - let reader = self.reader.get_mut(); - // Unsigned so can't be less than 0 - if self.record_positions.len() > record { - // Go to position in reader - let position = self.record_positions[record]; - reader.seek(std::io::SeekFrom::Start(position))?; - } else { - // read through the reader until we get to the correct record - bail!("Record hasn't been read yet, please use deserialize to find the record") - } - Ok(()) - } } diff --git a/src/lib.rs b/src/lib.rs index f4962a2..87fbb19 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,10 +12,12 @@ pub use self::products::csv::SourceType; mod shared_models; pub use self::shared_models::*; pub mod code_rule; +pub mod derive; pub mod filter; mod graph; mod io; pub mod link; +pub mod node; pub mod upload_to_db; #[no_mangle] @@ -63,22 +65,21 @@ pub extern "C" fn move_money_from_file( output_path: *const c_char, use_numeric_accounts: bool, ) { - let mut output_writer = csv::Writer::from_writer(vec![]); let safe_rules = unwrap_c_char(rules_file); let safe_lines = unwrap_c_char(lines); let safe_accounts = unwrap_c_char(accounts); let safe_cost_centres = unwrap_c_char(cost_centres); - // move_money_2() - // move_money( - // , - // &mut csv::Reader::from_reader(safe_lines.to_str().unwrap()), - // &mut csv::Reader::from_reader(safe_accounts.to_bytes()), - // &mut csv::Reader::from_reader(safe_cost_centres.to_bytes()), - // &mut output_writer, - // use_numeric_accounts, - // false, - // ) - // .expect("Failed to move money"); + let output_path = unwrap_c_char(output_path); + move_money( + &mut csv::Reader::from_reader(safe_rules.to_bytes()), + &mut csv::Reader::from_reader(safe_lines.to_bytes()), + &mut csv::Reader::from_reader(safe_accounts.to_bytes()), + &mut csv::Reader::from_reader(safe_cost_centres.to_bytes()), + &mut csv::Writer::from_path(output_path.to_str().unwrap()).unwrap(), + use_numeric_accounts, + false, + ) + .expect("Failed to move money"); } #[no_mangle] @@ -87,7 +88,7 @@ pub unsafe extern "C" fn move_money_from_text_free(s: *mut c_char) { if s.is_null() { return; } - CString::from_raw(s) + let _ = CString::from_raw(s); }; } @@ -181,6 +182,6 @@ pub unsafe extern "C" fn allocate_overheads_from_text_free(s: *mut c_char) { if s.is_null() { return; } - CString::from_raw(s) + let _ = CString::from_raw(s); }; } diff --git a/src/link.rs b/src/link.rs index 1dbca32..de76ae0 100644 --- a/src/link.rs +++ b/src/link.rs @@ -63,7 +63,7 @@ pub fn link( // TODO: Check this filters out correctly, as it's filtering on a reference, not a value .unique() .collect(); - let mut source_date_columns: Vec<&String> = linking_rule + let source_date_columns: Vec<&String> = linking_rule .linking_rules .iter() .flat_map(|rule| { @@ -86,7 +86,7 @@ pub fn link( // TODO: Merge with source_indexes? // Also store the actual date value rather than string, so we // don't need to convert as much later - let mut source_dates: Vec>>; + let source_dates: Vec>>; for source_record in source_reader.deserialize() { let source_record: HashMap = source_record?; let current_idx = source_ids.len(); diff --git a/src/node.rs b/src/node.rs new file mode 100644 index 0000000..4e41188 --- /dev/null +++ b/src/node.rs @@ -0,0 +1,6 @@ +pub trait RunnableNode { + // TODO: Get inputs/outputs to determine whether we can skip running this task + + // TODO: Status + fn run(&self) -> anyhow::Result<()>; +} diff --git a/src/products/create_products.rs b/src/products/create_products.rs index 93bb0b8..f31d4bb 100644 --- a/src/products/create_products.rs +++ b/src/products/create_products.rs @@ -11,7 +11,7 @@ use polars::lazy::dsl::*; use polars::prelude::*; use serde::Serialize; -use super::csv::{read_definitions, Component, Definition, FileJoin, SourceType}; +use super::csv::{read_definitions, Component, Definition}; // TODO: Polars suggests this, but docs suggest it doesn't have very good platform support //use jemallocator::Jemalloc; diff --git a/src/upload_to_db.rs b/src/upload_to_db.rs index 65324f8..034b7df 100644 --- a/src/upload_to_db.rs +++ b/src/upload_to_db.rs @@ -1,7 +1,12 @@ -use std::{collections::HashMap, io::Read}; +use std::collections::HashMap; -use csv::Reader; -use sqlx::{query, query_builder, Any, Mssql, Pool, QueryBuilder}; +use anyhow::bail; +use serde::{Deserialize, Serialize}; +use sqlx::{Any, Pool, QueryBuilder}; + +use crate::node::RunnableNode; + +const BIND_LIMIT: usize = 65535; // Note: right now this is set to mssql only, since sqlx 0.7 is requried to use the Any // type for sqlx 0.6 and earlier due to a query_builder lifetime issue, @@ -12,44 +17,32 @@ use sqlx::{query, query_builder, Any, Mssql, Pool, QueryBuilder}; // TODO: Add bulk insert options for non-mssql dbs // TODO: Add fallback insert when bulk insert fails (e.g. due to // permission errors) -pub async fn upload_file_bulk( - pool: &Pool, - file_name: &String, - table_name: &String, - // Mappings from column in file -> column in db - column_mappings: Option>, - post_script: Option, -) -> anyhow::Result { - // TODO: Test if the table already exists. If it doesn't, try creating the table - - // First try a bulk insert command - // let result = match pool.any_kind() { - // sqlx::any::AnyKind::Mssql => { - let result = sqlx::query(&format!("BULK INSERT {} FROM {}", table_name, file_name)) +pub async fn upload_file_bulk(pool: &Pool, upload_node: &UploadNode) -> anyhow::Result { + let mut rows_affected = None; + if upload_node.column_mappings.is_none() { + let insert_from_file_query = match pool.connect_options().database_url.scheme() { + "postgres" => Some(format!("COPY {} FROM $1", upload_node.table_name)), + "mysql" => Some(format!( + "LOAD DATA INFILE ? INTO {}", + upload_node.table_name, + )), + _ => None, + }; + if let Some(insert_from_file_query) = insert_from_file_query { + let result = sqlx::query(&insert_from_file_query) + .bind(&upload_node.file_path) .execute(pool) .await?; - // } - // }; + rows_affected = Some(result.rows_affected()); + } + } - let mut rows_affected = result.rows_affected(); - - - // let mut rows_affected = match &result { - // Result::Ok(result) => result.rows_affected(), - // // TODO: Log error - // Err(error) => 0_u64, - // }; - - // TODO: Adjust for various dbmss - if rows_affected == 0 { + if rows_affected == None { let rows: Vec> = vec![]; - let BIND_LIMIT: usize = 65535; - // TODO: Use csv to read from file - - // TODO: When bulk insert fails, Fall back to sql batched insert // TODO: Columns to insert... needs some kind of mapping from file column name <-> db column - let mut query_builder = QueryBuilder::new(format!("INSERT INTO {}({}) ", table_name, "")); + let mut query_builder = + QueryBuilder::new(format!("INSERT INTO {}({}) ", upload_node.table_name, "")); // TODO: Iterate over all values in file, not the limit query_builder.push_values(&rows[0..BIND_LIMIT], |mut b, row| { b.push_bind(row.get("s")); @@ -58,14 +51,40 @@ pub async fn upload_file_bulk( // TODO: Looks like this issue: https://github.com/launchbadge/sqlx/issues/1978 // Turns out we need v0.7 for this to not bug out, however mssql is only supported in versions before v0.7, so right now can't use sqlx // to use this, unless we explicity specified mssql only, not Any as the db type... + // Can probably work around this by specifying an actual implementation? let query = query_builder.build(); let result = query.execute(pool).await?; - rows_affected = result.rows_affected(); + rows_affected = Some(result.rows_affected()); } - if let Some(post_script) = post_script { + if let Some(post_script) = &upload_node.post_script { sqlx::query(&post_script).execute(pool).await?; } - Ok(rows_affected) -} \ No newline at end of file + match rows_affected { + Some(rows_affected) => Ok(rows_affected), + None => bail!( + "Invalid state, rows_affected must be populated, or an error should have occurred" + ), + } +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct UploadNode { + file_path: String, + table_name: String, + // Mappings from column in file -> column in db + column_mappings: Option>, + post_script: Option, +} + +pub struct UploadNodeRunner { + pub upload_node: UploadNode, +} + +impl RunnableNode for UploadNodeRunner { + fn run(&self) -> anyhow::Result<()> { + // TODO: Get db connection from some kind of property manager/context + todo!() + } +}