Make duck db optional, add beginning of ingey agent
All checks were successful
test / test (push) Successful in 29m57s

This commit is contained in:
2026-04-01 21:16:00 +10:30
parent 4da6d64988
commit 8e6a9f5e19
6 changed files with 341 additions and 72 deletions

278
Cargo.lock generated
View File

@@ -2,22 +2,13 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 4 version = 4
[[package]]
name = "addr2line"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678"
dependencies = [
"gimli 0.29.0",
]
[[package]] [[package]]
name = "addr2line" name = "addr2line"
version = "0.24.2" version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
dependencies = [ dependencies = [
"gimli 0.31.1", "gimli",
] ]
[[package]] [[package]]
@@ -431,18 +422,55 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]] [[package]]
name = "backtrace" name = "axum"
version = "0.3.73" version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8"
dependencies = [ dependencies = [
"addr2line 0.22.0", "axum-core",
"cc", "bytes",
"cfg-if", "form_urlencoded",
"libc", "futures-util",
"miniz_oxide", "http",
"object", "http-body",
"rustc-demangle", "http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde_core",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-core"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
] ]
[[package]] [[package]]
@@ -871,6 +899,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"axum",
"chrono", "chrono",
"clap", "clap",
"csv", "csv",
@@ -948,7 +977,7 @@ dependencies = [
"cranelift-control", "cranelift-control",
"cranelift-entity", "cranelift-entity",
"cranelift-isle", "cranelift-isle",
"gimli 0.31.1", "gimli",
"hashbrown 0.14.5", "hashbrown 0.14.5",
"log", "log",
"regalloc2", "regalloc2",
@@ -1672,12 +1701,6 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "gimli"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]] [[package]]
name = "gimli" name = "gimli"
version = "0.31.1" version = "0.31.1"
@@ -1917,7 +1940,7 @@ dependencies = [
"http-body", "http-body",
"hyper", "hyper",
"pin-project-lite", "pin-project-lite",
"socket2", "socket2 0.5.7",
"tokio", "tokio",
"tower-service", "tower-service",
"tracing", "tracing",
@@ -2160,9 +2183,9 @@ dependencies = [
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.169" version = "0.2.182"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112"
[[package]] [[package]]
name = "libduckdb-sys" name = "libduckdb-sys"
@@ -2259,6 +2282,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]] [[package]]
name = "matrixmultiply" name = "matrixmultiply"
version = "0.3.9" version = "0.3.9"
@@ -2309,6 +2338,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "mime"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]] [[package]]
name = "minimal-lexical" name = "minimal-lexical"
version = "0.2.1" version = "0.2.1"
@@ -3915,18 +3950,28 @@ dependencies = [
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.217" version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
name = "serde_core"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.217" version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -3956,6 +4001,17 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "serde_path_to_error"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457"
dependencies = [
"itoa",
"serde",
"serde_core",
]
[[package]] [[package]]
name = "serde_repr" name = "serde_repr"
version = "0.1.19" version = "0.1.19"
@@ -4127,6 +4183,16 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "socket2"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0"
dependencies = [
"libc",
"windows-sys 0.60.2",
]
[[package]] [[package]]
name = "spin" name = "spin"
version = "0.9.8" version = "0.9.8"
@@ -4523,6 +4589,12 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "sync_wrapper"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
[[package]] [[package]]
name = "sysinfo" name = "sysinfo"
version = "0.32.1" version = "0.32.1"
@@ -4739,27 +4811,26 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.42.0" version = "1.49.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86"
dependencies = [ dependencies = [
"backtrace",
"bytes", "bytes",
"libc", "libc",
"mio", "mio",
"parking_lot", "parking_lot",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2 0.6.2",
"tokio-macros", "tokio-macros",
"windows-sys 0.52.0", "windows-sys 0.61.2",
] ]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.4.0" version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -4850,6 +4921,28 @@ dependencies = [
"winnow", "winnow",
] ]
[[package]]
name = "tower"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper",
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]] [[package]]
name = "tower-service" name = "tower-service"
version = "0.3.3" version = "0.3.3"
@@ -5164,7 +5257,7 @@ version = "29.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11976a250672556d1c4c04c6d5d7656ac9192ac9edc42a4587d6c21460010e69" checksum = "11976a250672556d1c4c04c6d5d7656ac9192ac9edc42a4587d6c21460010e69"
dependencies = [ dependencies = [
"addr2line 0.24.2", "addr2line",
"anyhow", "anyhow",
"async-trait", "async-trait",
"bitflags 2.6.0", "bitflags 2.6.0",
@@ -5173,7 +5266,7 @@ dependencies = [
"cfg-if", "cfg-if",
"encoding_rs", "encoding_rs",
"fxprof-processed-profile", "fxprof-processed-profile",
"gimli 0.31.1", "gimli",
"hashbrown 0.14.5", "hashbrown 0.14.5",
"indexmap 2.7.0", "indexmap 2.7.0",
"ittapi", "ittapi",
@@ -5279,7 +5372,7 @@ dependencies = [
"cranelift-entity", "cranelift-entity",
"cranelift-frontend", "cranelift-frontend",
"cranelift-native", "cranelift-native",
"gimli 0.31.1", "gimli",
"itertools 0.12.1", "itertools 0.12.1",
"log", "log",
"object", "object",
@@ -5301,7 +5394,7 @@ dependencies = [
"cpp_demangle", "cpp_demangle",
"cranelift-bitset", "cranelift-bitset",
"cranelift-entity", "cranelift-entity",
"gimli 0.31.1", "gimli",
"indexmap 2.7.0", "indexmap 2.7.0",
"log", "log",
"object", "object",
@@ -5390,7 +5483,7 @@ checksum = "fdbabfb8f20502d5e1d81092b9ead3682ae59988487aafcd7567387b7a43cf8f"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"cranelift-codegen", "cranelift-codegen",
"gimli 0.31.1", "gimli",
"object", "object",
"target-lexicon", "target-lexicon",
"wasmparser 0.221.2", "wasmparser 0.221.2",
@@ -5511,7 +5604,7 @@ checksum = "2f849ef2c5f46cb0a20af4b4487aaa239846e52e2c03f13fa3c784684552859c"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"cranelift-codegen", "cranelift-codegen",
"gimli 0.31.1", "gimli",
"regalloc2", "regalloc2",
"smallvec", "smallvec",
"target-lexicon", "target-lexicon",
@@ -5574,6 +5667,12 @@ dependencies = [
"syn 2.0.87", "syn 2.0.87",
] ]
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]] [[package]]
name = "windows-result" name = "windows-result"
version = "0.1.2" version = "0.1.2"
@@ -5610,6 +5709,24 @@ dependencies = [
"windows-targets 0.52.6", "windows-targets 0.52.6",
] ]
[[package]]
name = "windows-sys"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb"
dependencies = [
"windows-targets 0.53.5",
]
[[package]]
name = "windows-sys"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc"
dependencies = [
"windows-link",
]
[[package]] [[package]]
name = "windows-targets" name = "windows-targets"
version = "0.48.5" version = "0.48.5"
@@ -5634,13 +5751,30 @@ dependencies = [
"windows_aarch64_gnullvm 0.52.6", "windows_aarch64_gnullvm 0.52.6",
"windows_aarch64_msvc 0.52.6", "windows_aarch64_msvc 0.52.6",
"windows_i686_gnu 0.52.6", "windows_i686_gnu 0.52.6",
"windows_i686_gnullvm", "windows_i686_gnullvm 0.52.6",
"windows_i686_msvc 0.52.6", "windows_i686_msvc 0.52.6",
"windows_x86_64_gnu 0.52.6", "windows_x86_64_gnu 0.52.6",
"windows_x86_64_gnullvm 0.52.6", "windows_x86_64_gnullvm 0.52.6",
"windows_x86_64_msvc 0.52.6", "windows_x86_64_msvc 0.52.6",
] ]
[[package]]
name = "windows-targets"
version = "0.53.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3"
dependencies = [
"windows-link",
"windows_aarch64_gnullvm 0.53.1",
"windows_aarch64_msvc 0.53.1",
"windows_i686_gnu 0.53.1",
"windows_i686_gnullvm 0.53.1",
"windows_i686_msvc 0.53.1",
"windows_x86_64_gnu 0.53.1",
"windows_x86_64_gnullvm 0.53.1",
"windows_x86_64_msvc 0.53.1",
]
[[package]] [[package]]
name = "windows_aarch64_gnullvm" name = "windows_aarch64_gnullvm"
version = "0.48.5" version = "0.48.5"
@@ -5653,6 +5787,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.48.5" version = "0.48.5"
@@ -5665,6 +5805,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_aarch64_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.48.5" version = "0.48.5"
@@ -5677,12 +5823,24 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnu"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3"
[[package]] [[package]]
name = "windows_i686_gnullvm" name = "windows_i686_gnullvm"
version = "0.52.6" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.48.5" version = "0.48.5"
@@ -5695,6 +5853,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_i686_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.48.5" version = "0.48.5"
@@ -5707,6 +5871,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnu"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499"
[[package]] [[package]]
name = "windows_x86_64_gnullvm" name = "windows_x86_64_gnullvm"
version = "0.48.5" version = "0.48.5"
@@ -5719,6 +5889,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.48.5" version = "0.48.5"
@@ -5731,6 +5907,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "windows_x86_64_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
[[package]] [[package]]
name = "winnow" name = "winnow"
version = "0.6.22" version = "0.6.22"

View File

@@ -31,7 +31,11 @@ tokio-util = { version = "0.7.13", features = ["compat"] }
async-trait = "0.1.83" async-trait = "0.1.83"
testcontainers = "0.23.1" testcontainers = "0.23.1"
wasmtime = "29.0.1" wasmtime = "29.0.1"
duckdb = { version = "1.2.0", features = ["bundled"] } duckdb = { version = "1.2.0", features = ["bundled"], optional = true }
axum = "0.8.8"
[features]
duckdb = ["dep:duckdb"]
# More info on targets: https://doc.rust-lang.org/cargo/reference/cargo-targets.html#configuring-a-target # More info on targets: https://doc.rust-lang.org/cargo/reference/cargo-targets.html#configuring-a-target
[lib] [lib]

View File

@@ -1,19 +0,0 @@
use sqlx::any::AnyPoolOptions;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let user = "";
let password = "";
let host = "";
let database = "";
let database_type = "";
let connection_string = format!(
"{}://{}:{}@{}/{}",
database_type, user, password, host, database
);
let _ = AnyPoolOptions::new()
.max_connections(20)
.connect(&connection_string)
.await?;
Ok(())
}

View File

@@ -0,0 +1,85 @@
use std::sync::mpsc;
use axum::{
http::StatusCode,
routing::{get, post},
Json, Router,
};
use clap::Parser;
use coster_rs::graph::{Graph, RunnableGraph};
use log::info;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// TODO: Proper logging to file
env_logger::init();
// TODO: Tracing?
// tracing_subscriber::fmt::init();
let agent = Agent::parse();
agent.run().await?;
Ok(())
}
#[derive(Parser)]
#[command(name = "ingey-agent")]
#[command(name = "Pivato M. <contact@michaelpivato.devv")]
#[command(version = "0.0.1")]
#[command(about="Simple, fast, efficient data manipulation tool", long_about=None)]
pub struct Agent {
#[arg(short = 'p', long, value_name = "PORT", default_value_t = 8080)]
web_server_port: u16,
#[arg(short = 'b', long, default_value = "localhost")]
web_server_bind_host: String,
}
impl Agent {
pub async fn run(&self) -> anyhow::Result<()> {
// TODO: kind of a want an actual queue so that we can see where in the queue we're up to in another method
// Maybe what we do instead is add to the queue in the request, then when we finish here we remove from the queue.
// Can do this with more message passing as well
let (tx, rx) = mpsc::channel::<RunnableGraph>();
tokio::spawn(async move {
// Thread to receive requests to run a task
for graph in rx {
let result = graph
.run_default_tasks(1, |id, status| {
info!("Node with id {} finished with status {:?}", id, status)
})
.await;
// If the result is bad, mark this as failed somewhere
}
});
let app = Router::new()
.route(
"/run",
post({
let sender = tx.clone();
move |body| run_graph(body, sender)
}),
)
.route("/", get(async || "Hello World"));
let listener = tokio::net::TcpListener::bind(format!(
"{}:{}",
self.web_server_bind_host, self.web_server_port
))
.await?;
info!(
"Successfully started the web server on address {}:{}",
self.web_server_bind_host, self.web_server_port
);
axum::serve(listener, app).await?;
info!("Web server has shut down");
Ok(())
}
}
async fn run_graph(Json(payload): Json<Graph>, sender: mpsc::Sender<RunnableGraph>) -> StatusCode {
if let Ok(response) = sender.send(RunnableGraph::from_graph(payload)) {
StatusCode::ACCEPTED
} else {
StatusCode::INTERNAL_SERVER_ERROR
}
}

View File

@@ -1,3 +1,4 @@
#[cfg(feature = "duckdb")]
use duckdb::{params, params_from_iter, Connection}; use duckdb::{params, params_from_iter, Connection};
use futures::TryStreamExt; use futures::TryStreamExt;
use futures_io::{AsyncRead, AsyncWrite}; use futures_io::{AsyncRead, AsyncWrite};
@@ -119,6 +120,7 @@ impl QueryExecutor for Pool<Any> {
} }
} }
#[cfg(feature = "duckdb")]
impl QueryExecutor for Connection { impl QueryExecutor for Connection {
async fn get_rows( async fn get_rows(
&mut self, &mut self,

View File

@@ -1,6 +1,5 @@
use anyhow::bail; use anyhow::bail;
use async_trait::async_trait; use async_trait::async_trait;
use duckdb::Connection;
use itertools::Itertools; use itertools::Itertools;
use log::{log, Level}; use log::{log, Level};
use schemars::JsonSchema; use schemars::JsonSchema;
@@ -11,6 +10,9 @@ use std::{collections::HashMap, fmt::format};
use tiberius::{Config, EncryptionLevel}; use tiberius::{Config, EncryptionLevel};
use tokio_util::compat::TokioAsyncWriteCompatExt; use tokio_util::compat::TokioAsyncWriteCompatExt;
#[cfg(feature = "duckdb")]
use duckdb::Connection;
use super::{node::RunnableNode, sql::QueryExecutor}; use super::{node::RunnableNode, sql::QueryExecutor};
const BIND_LIMIT: usize = 65535; const BIND_LIMIT: usize = 65535;
@@ -45,6 +47,7 @@ pub async fn upload_file_bulk(
upload_node.table_name.replace("]", ""), upload_node.table_name.replace("]", ""),
upload_node.file_path.replace("'", "''") upload_node.file_path.replace("'", "''")
)), )),
#[cfg(feature = "duckdb")]
DBType::Duckdb => Some(format!( DBType::Duckdb => Some(format!(
r#"COPY "{}" FROM '{}' WITH DELIMITER ',' CSV HEADER QUOTE '"';"#, r#"COPY "{}" FROM '{}' WITH DELIMITER ',' CSV HEADER QUOTE '"';"#,
upload_node.table_name.replace("\"", ""), upload_node.table_name.replace("\"", ""),
@@ -148,6 +151,7 @@ pub enum DBType {
Mysql, Mysql,
Postgres, Postgres,
Mssql, Mssql,
#[cfg(feature = "duckdb")]
Duckdb, Duckdb,
} }
@@ -158,6 +162,7 @@ impl DBType {
DBType::Postgres => format!(r#""{}""#, quoted.replace("\"", "")), DBType::Postgres => format!(r#""{}""#, quoted.replace("\"", "")),
DBType::Mysql => format!("`{}`", quoted.replace("`", "")), DBType::Mysql => format!("`{}`", quoted.replace("`", "")),
DBType::Mssql => format!("[{}]", quoted.replace("]", "")), DBType::Mssql => format!("[{}]", quoted.replace("]", "")),
#[cfg(feature = "duckdb")]
DBType::Duckdb => format!(r#""{}""#, quoted.replace("\"", "")), DBType::Duckdb => format!(r#""{}""#, quoted.replace("\"", "")),
} }
} }
@@ -182,6 +187,15 @@ pub struct UploadNodeRunner {
impl RunnableNode for UploadNodeRunner { impl RunnableNode for UploadNodeRunner {
async fn run(&self) -> anyhow::Result<()> { async fn run(&self) -> anyhow::Result<()> {
let upload_node = self.upload_node.clone(); let upload_node = self.upload_node.clone();
#[cfg(feature = "duckdb")]
{
if upload_node.db_type == DBType::Duckdb {
let mut conn = Connection::open(&upload_node.connection_string)?;
upload_file_bulk(&mut conn, &upload_node, BIND_LIMIT, &upload_node.db_type).await?;
Ok(())
}
}
if upload_node.db_type == DBType::Mssql { if upload_node.db_type == DBType::Mssql {
let mut config = Config::from_jdbc_string(&upload_node.connection_string)?; let mut config = Config::from_jdbc_string(&upload_node.connection_string)?;
// TODO: Restore encryption for remote hosts, doesn't work on localhost without encryption. // TODO: Restore encryption for remote hosts, doesn't work on localhost without encryption.
@@ -190,9 +204,6 @@ impl RunnableNode for UploadNodeRunner {
tcp.set_nodelay(true)?; tcp.set_nodelay(true)?;
let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?; let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?;
upload_file_bulk(&mut client, &upload_node, BIND_LIMIT, &upload_node.db_type).await?; upload_file_bulk(&mut client, &upload_node, BIND_LIMIT, &upload_node.db_type).await?;
} else if upload_node.db_type == DBType::Duckdb {
let mut conn = Connection::open(&upload_node.connection_string)?;
upload_file_bulk(&mut conn, &upload_node, BIND_LIMIT, &upload_node.db_type).await?;
} else { } else {
install_default_drivers(); install_default_drivers();
let mut pool = AnyPool::connect(&upload_node.connection_string).await?; let mut pool = AnyPool::connect(&upload_node.connection_string).await?;
@@ -206,6 +217,7 @@ impl RunnableNode for UploadNodeRunner {
mod tests { mod tests {
use crate::graph::node::RunnableNode; use crate::graph::node::RunnableNode;
use crate::graph::upload_to_db::{DBType, UploadNode, UploadNodeRunner}; use crate::graph::upload_to_db::{DBType, UploadNode, UploadNodeRunner};
#[cfg(feature = "duckdb")]
use duckdb::{params, Connection}; use duckdb::{params, Connection};
use sqlx::{AnyPool, Row}; use sqlx::{AnyPool, Row};
use std::path::PathBuf; use std::path::PathBuf;
@@ -217,6 +229,7 @@ mod tests {
use tokio_util::compat::TokioAsyncWriteCompatExt; use tokio_util::compat::TokioAsyncWriteCompatExt;
#[tokio::test] #[tokio::test]
#[ignore]
pub async fn check_bulk_upload_mssql() -> anyhow::Result<()> { pub async fn check_bulk_upload_mssql() -> anyhow::Result<()> {
let container = GenericImage::new( let container = GenericImage::new(
"gitea.michaelpivato.dev/vato007/ingey-test-db-mssql", "gitea.michaelpivato.dev/vato007/ingey-test-db-mssql",
@@ -316,6 +329,7 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
#[cfg(feature = "duckdb")]
pub async fn check_bulk_upload_duckdb() -> anyhow::Result<()> { pub async fn check_bulk_upload_duckdb() -> anyhow::Result<()> {
let connection_string = "./testing/output/test.duckdb".to_owned(); let connection_string = "./testing/output/test.duckdb".to_owned();
fs::remove_file(&connection_string); fs::remove_file(&connection_string);
@@ -406,6 +420,7 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
#[ignore]
pub async fn check_insert_mssql() -> anyhow::Result<()> { pub async fn check_insert_mssql() -> anyhow::Result<()> {
let container = GenericImage::new( let container = GenericImage::new(
"gitea.michaelpivato.dev/vato007/ingey-test-db-mssql", "gitea.michaelpivato.dev/vato007/ingey-test-db-mssql",