diff --git a/Cargo.lock b/Cargo.lock index 18e1b8b7f723f..41f9e546f589a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,8 +233,7 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4754a624e5ae42081f464514be454b39711daae0458906dacde5f4c632f33a8" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-arith", "arrow-array", @@ -256,8 +255,7 @@ dependencies = [ [[package]] name = "arrow-arith" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b3141e0ec5145a22d8694ea8b6d6f69305971c4fa1c1a13ef0195aef2d678b" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-array", "arrow-buffer", @@ -270,8 +268,7 @@ dependencies = [ [[package]] name = "arrow-array" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8955af33b25f3b175ee10af580577280b4bd01f7e823d94c7cdef7cf8c9aef" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "ahash", "arrow-buffer", @@ -289,8 +286,7 @@ dependencies = [ [[package]] name = "arrow-buffer" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c697ddca96183182f35b3a18e50b9110b11e916d7b7799cbfd4d34662f2c56c2" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "bytes", "half", @@ -301,8 +297,7 @@ dependencies = [ [[package]] name = "arrow-cast" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "646bbb821e86fd57189c10b4fcdaa941deaf4181924917b0daa92735baa6ada5" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-array", "arrow-buffer", @@ -323,8 +318,7 @@ dependencies = [ [[package]] name = "arrow-csv" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8da746f4180004e3ce7b83c977daf6394d768332349d3d913998b10a120b790a" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-array", "arrow-cast", @@ -338,8 +332,7 @@ dependencies = [ [[package]] name = "arrow-data" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fdd994a9d28e6365aa78e15da3f3950c0fdcea6b963a12fa1c391afb637b304" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-buffer", "arrow-schema", @@ -351,8 +344,7 @@ dependencies = [ [[package]] name = "arrow-flight" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58c5b083668e6230eae3eab2fc4b5fb989974c845d0aa538dde61a4327c78675" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-arith", "arrow-array", @@ -379,8 +371,7 @@ dependencies = [ [[package]] name = "arrow-ipc" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abf7df950701ab528bf7c0cf7eeadc0445d03ef5d6ffc151eaae6b38a58feff1" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-array", "arrow-buffer", @@ -395,8 +386,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ff8357658bedc49792b13e2e862b80df908171275f8e6e075c460da5ee4bf86" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-array", "arrow-buffer", @@ -419,8 +409,7 @@ dependencies = [ [[package]] name = "arrow-ord" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d8f1870e03d4cbed632959498bcc84083b5a24bded52905ae1695bd29da45b" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-array", "arrow-buffer", @@ -432,8 +421,7 @@ dependencies = [ [[package]] name = "arrow-row" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18228633bad92bff92a95746bbeb16e5fc318e8382b75619dec26db79e4de4c0" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-array", "arrow-buffer", @@ -445,8 +433,7 @@ dependencies = [ [[package]] name = "arrow-schema" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c872d36b7bf2a6a6a2b40de9156265f0242910791db366a2c17476ba8330d68" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "bitflags", "serde", @@ -457,8 +444,7 @@ dependencies = [ [[package]] name = "arrow-select" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68bf3e3efbd1278f770d67e5dc410257300b161b93baedb3aae836144edcaf4b" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "ahash", "arrow-array", @@ -471,8 +457,7 @@ dependencies = [ [[package]] name = "arrow-string" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e968097061b3c0e9fe3079cf2e703e487890700546b5b0647f60fca1b5a8d8" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "arrow-array", "arrow-buffer", @@ -601,9 +586,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-config" -version = "1.8.13" +version = "1.8.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c456581cb3c77fafcc8c67204a70680d40b61112d6da78c77bd31d945b65f1b5" +checksum = "8a8fc176d53d6fe85017f230405e3255cedb4a02221cb55ed6d76dccbbb099b2" dependencies = [ "aws-credential-types", "aws-runtime", @@ -631,9 +616,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.11" +version = "1.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cd362783681b15d136480ad555a099e82ecd8e2d10a841e14dfd0078d67fee3" +checksum = "e26bbf46abc608f2dc61fd6cb3b7b0665497cc259a21520151ed98f8b37d2c79" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -665,9 +650,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c635c2dc792cb4a11ce1a4f392a925340d1bdf499289b5ec1ec6810954eb43f5" +checksum = "b0f92058d22a46adf53ec57a6a96f34447daf02bff52e8fb956c66bcd5c6ac12" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -678,6 +663,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", + "bytes-utils", "fastrand", "http 1.4.0", "http-body 1.0.1", @@ -689,9 +675,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.93.0" +version = "1.94.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dcb38bb33fc0a11f1ffc3e3e85669e0a11a37690b86f77e75306d8f369146a0" +checksum = "699da1961a289b23842d88fe2984c6ff68735fdf9bdcbc69ceaeb2491c9bf434" dependencies = [ "aws-credential-types", "aws-runtime", @@ -713,9 +699,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.95.0" +version = "1.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ada8ffbea7bd1be1f53df1dadb0f8fdb04badb13185b3321b929d1ee3caad09" +checksum = "e3e3a4cb3b124833eafea9afd1a6cc5f8ddf3efefffc6651ef76a03cbc6b4981" dependencies = [ "aws-credential-types", "aws-runtime", @@ -737,9 +723,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.97.0" +version = "1.98.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6443ccadc777095d5ed13e21f5c364878c9f5bad4e35187a6cdbd863b0afcad" +checksum = "89c4f19655ab0856375e169865c91264de965bd74c407c7f1e403184b1049409" dependencies = [ "aws-credential-types", "aws-runtime", @@ -762,9 +748,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.8" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efa49f3c607b92daae0c078d48a4571f599f966dce3caee5f1ea55c4d9073f99" +checksum = "68f6ae9b71597dc5fd115d52849d7a5556ad9265885ad3492ea8d73b93bbc46e" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -784,9 +770,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.11" +version = "1.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52eec3db979d18cb807fc1070961cc51d87d069abe9ab57917769687368a8c6c" +checksum = "3cba48474f1d6807384d06fec085b909f5807e16653c5af5c45dfe89539f0b70" dependencies = [ "futures-util", "pin-project-lite", @@ -795,9 +781,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.63.3" +version = "0.63.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630e67f2a31094ffa51b210ae030855cb8f3b7ee1329bdd8d085aaf61e8b97fc" +checksum = "af4a8a5fe3e4ac7ee871237c340bbce13e982d37543b65700f4419e039f5d78e" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -816,9 +802,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.1.9" +version = "1.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12fb0abf49ff0cab20fd31ac1215ed7ce0ea92286ba09e2854b42ba5cabe7525" +checksum = "0709f0083aa19b704132684bc26d3c868e06bd428ccc4373b0b55c3e8748a58b" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -840,27 +826,27 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.62.3" +version = "0.62.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb96aa208d62ee94104645f7b2ecaf77bf27edf161590b6224bfbac2832f979" +checksum = "27b3a779093e18cad88bbae08dc4261e1d95018c4c5b9356a52bcae7c0b6e9bb" dependencies = [ "aws-smithy-types", ] [[package]] name = "aws-smithy-observability" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0a46543fbc94621080b3cf553eb4cbbdc41dd9780a30c4756400f0139440a1d" +checksum = "4d3f39d5bb871aaf461d59144557f16d5927a5248a983a40654d9cf3b9ba183b" dependencies = [ "aws-smithy-runtime-api", ] [[package]] name = "aws-smithy-query" -version = "0.60.13" +version = "0.60.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cebbddb6f3a5bd81553643e9c7daf3cc3dc5b0b5f398ac668630e8a84e6fff0" +checksum = "05f76a580e3d8f8961e5d48763214025a2af65c2fa4cd1fb7f270a0e107a71b0" dependencies = [ "aws-smithy-types", "urlencoding", @@ -868,9 +854,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.10.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3df87c14f0127a0d77eb261c3bc45d5b4833e2a1f63583ebfb728e4852134ee" +checksum = "8fd3dfc18c1ce097cf81fced7192731e63809829c6cbf933c1ec47452d08e1aa" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -893,9 +879,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.11.3" +version = "1.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49952c52f7eebb72ce2a754d3866cc0f87b97d2a46146b79f80f3a93fb2b3716" +checksum = "8c55e0837e9b8526f49e0b9bfa9ee18ddee70e853f5bc09c5d11ebceddcb0fec" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -910,9 +896,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.3" +version = "1.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3a26048eeab0ddeba4b4f9d51654c79af8c3b32357dc5f336cee85ab331c33" +checksum = "576b0d6991c9c32bc14fc340582ef148311f924d41815f641a308b5d11e8e7cd" dependencies = [ "base64-simd", "bytes", @@ -933,18 +919,18 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.13" +version = "0.60.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b2f670422ff42bf7065031e72b45bc52a3508bd089f743ea90731ca2b6ea57" +checksum = "b53543b4b86ed43f051644f704a98c7291b3618b67adf057ee77a366fa52fcaa" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "1.3.11" +version = "1.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d980627d2dd7bfc32a3c025685a033eeab8d365cc840c631ef59d1b8f428164" +checksum = "6c50f3cdf47caa8d01f2be4a6663ea02418e892f9bbfd82c7b9a3a37eaccdd3a" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -1324,9 +1310,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.57" +version = "4.5.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6899ea499e3fb9305a65d5ebf6e3d2248c5fab291f300ad0a704fbe142eae31a" +checksum = "63be97961acde393029492ce0be7a1af7e323e6bae9511ebfac33751be5e6806" dependencies = [ "clap_builder", "clap_derive", @@ -1334,9 +1320,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.57" +version = "4.5.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b12c8b680195a62a8364d16b8447b01b6c2c8f9aaf68bee653be34d4245e238" +checksum = "7f13174bda5dfd69d7e947827e5af4b0f2f94a4a3ee92912fba07a66150f21e2" dependencies = [ "anstream", "anstyle", @@ -1358,9 +1344,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.6" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" +checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" [[package]] name = "clipboard-win" @@ -1538,9 +1524,9 @@ dependencies = [ [[package]] name = "criterion" -version = "0.8.2" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "950046b2aa2492f9a536f5f4f9a3de7b9e2476e575e05bd6c333371add4d98f3" +checksum = "4d883447757bb0ee46f233e9dc22eb84d93a9508c9b868687b274fc431d886bf" dependencies = [ "alloca", "anes", @@ -1565,9 +1551,9 @@ dependencies = [ [[package]] name = "criterion-plot" -version = "0.8.2" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8d80a2f4f5b554395e47b5d8305bc3d27813bacb73493eb1001e8f76dae29ea" +checksum = "ed943f81ea2faa8dcecbbfa50164acf95d555afec96a27871663b300e387b2e4" dependencies = [ "cast", "itertools 0.13.0", @@ -3268,6 +3254,8 @@ dependencies = [ "cfg-if", "crunchy", "num-traits", + "rand 0.9.2", + "rand_distr", "zerocopy", ] @@ -4236,9 +4224,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.12.4" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c1be0c6c22ec0817cdc77d3842f721a17fd30ab6965001415b5402a74e6b740" +checksum = "c2858065e55c148d294a9f3aae3b0fa9458edadb41a108397094566f4e3c0dfb" dependencies = [ "async-trait", "base64 0.22.1", @@ -4258,7 +4246,7 @@ dependencies = [ "rand 0.9.2", "reqwest", "ring", - "rustls-pemfile", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", @@ -4358,13 +4346,11 @@ dependencies = [ [[package]] name = "parquet" version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee96b29972a257b855ff2341b37e61af5f12d6af1158b6dcdb5b31ea07bb3cb" +source = "git+https://github.com/apache/arrow-rs.git?branch=main#d6168e526aae79d6fbafe8c11062b5f834021052" dependencies = [ "ahash", "arrow-array", "arrow-buffer", - "arrow-cast", "arrow-data", "arrow-ipc", "arrow-schema", @@ -5050,9 +5036,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.12.3" +version = "1.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" dependencies = [ "aho-corasick", "memchr", @@ -5920,9 +5906,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.25.0" +version = "3.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" +checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" dependencies = [ "fastrand", "getrandom 0.3.4", diff --git a/Cargo.toml b/Cargo.toml index 123054e87c640..1c399584bdd7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -164,7 +164,7 @@ itertools = "0.14" liblzma = { version = "0.4.4", features = ["static"] } log = "^0.4" num-traits = { version = "0.2" } -object_store = { version = "0.12.4", default-features = false } +object_store = { version = "0.13.1", default-features = false } parking_lot = "0.12" parquet = { version = "57.3.0", default-features = false, features = [ "arrow", @@ -277,3 +277,32 @@ incremental = false inherits = "release" debug = true strip = false + +## Temporary arrow-rs patch until 58 is released + +[patch.crates-io] +arrow = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +arrow-array = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +arrow-cast = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +arrow-data = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +arrow-ipc = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +arrow-schema = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +arrow-select = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +arrow-string = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +arrow-ord = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +arrow-flight = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } +parquet = { git = "https://github.com/apache/arrow-rs.git", branch = "main" } + +#arrow = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow" } +#arrow-array = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow-array" } +#arrow-buffer = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow-buffer" } +#arrow-cast = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow-cast" } +#arrow-data = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow-data" } +#arrow-ipc = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow-ipc" } +#arrow-schema = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow-schema" } +#arrow-select = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow-select" } +#arrow-string = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow-string" } +#arrow-ord = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow-ord" } +#arrow-flight = { path= "/Users/andrewlamb/Software/arrow-rs2/arrow-flight" } +#parquet = { path= "/Users/andrewlamb/Software/arrow-rs2/parquet" } diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 94bd8ee2c4f9d..0cb21e751c74d 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -521,6 +521,7 @@ mod tests { use datafusion::common::plan_err; use datafusion::prelude::SessionContext; + use datafusion_common::assert_contains; use url::Url; async fn create_external_table_test(location: &str, sql: &str) -> Result<()> { @@ -714,7 +715,7 @@ mod tests { let err = create_external_table_test(location, &sql) .await .unwrap_err(); - assert!(err.to_string().contains("os error 2")); + assert_contains!(err.to_string(), "os error 2"); // for service_account_key let sql = format!( @@ -722,9 +723,8 @@ mod tests { ); let err = create_external_table_test(location, &sql) .await - .unwrap_err() - .to_string(); - assert!(err.contains("No RSA key found in pem file"), "{err}"); + .unwrap_err(); + assert_contains!(err.to_string(), "Error reading pem file: no items found"); // for application_credentials_path let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET @@ -732,7 +732,7 @@ mod tests { let err = create_external_table_test(location, &sql) .await .unwrap_err(); - assert!(err.to_string().contains("os error 2")); + assert_contains!(err.to_string(), "os error 2"); Ok(()) } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 9e53260e42773..84d04e087193e 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -617,8 +617,8 @@ mod tests { | filename | file_size_bytes | metadata_size_bytes | hits | extra | +-----------------------------------+-----------------+---------------------+------+------------------+ | alltypes_plain.parquet | 1851 | 8882 | 2 | page_index=false | - | alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true | - | lz4_raw_compressed_larger.parquet | 380836 | 1347 | 2 | page_index=false | + | alltypes_tiny_pages.parquet | 454233 | 269074 | 2 | page_index=true | + | lz4_raw_compressed_larger.parquet | 380836 | 1339 | 2 | page_index=false | +-----------------------------------+-----------------+---------------------+------+------------------+ "); @@ -648,8 +648,8 @@ mod tests { | filename | file_size_bytes | metadata_size_bytes | hits | extra | +-----------------------------------+-----------------+---------------------+------+------------------+ | alltypes_plain.parquet | 1851 | 8882 | 5 | page_index=false | - | alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true | - | lz4_raw_compressed_larger.parquet | 380836 | 1347 | 3 | page_index=false | + | alltypes_tiny_pages.parquet | 454233 | 269074 | 2 | page_index=true | + | lz4_raw_compressed_larger.parquet | 380836 | 1339 | 3 | page_index=false | +-----------------------------------+-----------------+---------------------+------+------------------+ "); diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index 0d5e9dc2c5a84..5a00ebae324e8 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -36,10 +36,11 @@ use datafusion::{ execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, }; use futures::stream::{BoxStream, Stream}; +use futures::{StreamExt, TryStreamExt}; use object_store::{ - GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, - path::Path, + CopyOptions, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, + ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, + PutResult, Result, path::Path, }; use parking_lot::{Mutex, RwLock}; use url::Url; @@ -230,16 +231,26 @@ impl InstrumentedObjectStore { let timestamp = Utc::now(); let range = options.range.clone(); + let head = options.head; let start = Instant::now(); let ret = self.inner.get_opts(location, options).await?; let elapsed = start.elapsed(); + let (op, size) = if head { + (Operation::Head, None) + } else { + ( + Operation::Get, + Some((ret.range.end - ret.range.start) as usize), + ) + }; + self.requests.lock().push(RequestDetails { - op: Operation::Get, + op, path: location.clone(), timestamp, duration: Some(elapsed), - size: Some((ret.range.end - ret.range.start) as usize), + size, range, extra_display: None, }); @@ -247,23 +258,30 @@ impl InstrumentedObjectStore { Ok(ret) } - async fn instrumented_delete(&self, location: &Path) -> Result<()> { + fn instrumented_delete_stream( + &self, + locations: BoxStream<'static, Result>, + ) -> BoxStream<'static, Result> { + let requests_captured = Arc::clone(&self.requests); + let timestamp = Utc::now(); let start = Instant::now(); - self.inner.delete(location).await?; - let elapsed = start.elapsed(); - - self.requests.lock().push(RequestDetails { - op: Operation::Delete, - path: location.clone(), - timestamp, - duration: Some(elapsed), - size: None, - range: None, - extra_display: None, - }); - - Ok(()) + self.inner + .delete_stream(locations) + .and_then(move |location| { + let elapsed = start.elapsed(); + requests_captured.lock().push(RequestDetails { + op: Operation::Delete, + path: location.clone(), + timestamp, + duration: Some(elapsed), + size: None, + range: None, + extra_display: None, + }); + futures::future::ok(location) + }) + .boxed() } fn instrumented_list( @@ -361,25 +379,6 @@ impl InstrumentedObjectStore { Ok(()) } - - async fn instrumented_head(&self, location: &Path) -> Result { - let timestamp = Utc::now(); - let start = Instant::now(); - let ret = self.inner.head(location).await?; - let elapsed = start.elapsed(); - - self.requests.lock().push(RequestDetails { - op: Operation::Head, - path: location.clone(), - timestamp, - duration: Some(elapsed), - size: None, - range: None, - extra_display: None, - }); - - Ok(ret) - } } impl fmt::Display for InstrumentedObjectStore { @@ -429,12 +428,15 @@ impl ObjectStore for InstrumentedObjectStore { self.inner.get_opts(location, options).await } - async fn delete(&self, location: &Path) -> Result<()> { + fn delete_stream( + &self, + locations: BoxStream<'static, Result>, + ) -> BoxStream<'static, Result> { if self.enabled() { - return self.instrumented_delete(location).await; + return self.instrumented_delete_stream(locations); } - self.inner.delete(location).await + self.inner.delete_stream(locations) } fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { @@ -453,28 +455,24 @@ impl ObjectStore for InstrumentedObjectStore { self.inner.list_with_delimiter(prefix).await } - async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - if self.enabled() { - return self.instrumented_copy(from, to).await; - } - - self.inner.copy(from, to).await - } - - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - if self.enabled() { - return self.instrumented_copy_if_not_exists(from, to).await; - } - - self.inner.copy_if_not_exists(from, to).await - } - - async fn head(&self, location: &Path) -> Result { + async fn copy_opts( + &self, + from: &Path, + to: &Path, + options: CopyOptions, + ) -> Result<()> { if self.enabled() { - return self.instrumented_head(location).await; + return match options.mode { + object_store::CopyMode::Create => { + self.instrumented_copy_if_not_exists(from, to).await + } + object_store::CopyMode::Overwrite => { + self.instrumented_copy(from, to).await + } + }; } - self.inner.head(location).await + self.inner.copy_opts(from, to, options).await } } diff --git a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs index f19d628fa8bee..a2cd187fee067 100644 --- a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs +++ b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs @@ -69,7 +69,7 @@ use datafusion_proto::protobuf::{ }; use object_store::memory::InMemory; use object_store::path::Path; -use object_store::{ObjectStore, PutPayload}; +use object_store::{ObjectStore, ObjectStoreExt, PutPayload}; use serde::{Deserialize, Serialize}; /// Example showing how to preserve custom adapter information during plan serialization. diff --git a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs index fc1130313e00c..35f36ea8bc0ce 100644 --- a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs +++ b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs @@ -36,7 +36,7 @@ use datafusion::{ use datafusion::datasource::physical_plan::FileScanConfigBuilder; use datafusion_examples::utils::datasets::ExampleDataset; use futures::StreamExt; -use object_store::{ObjectStore, local::LocalFileSystem, memory::InMemory}; +use object_store::{ObjectStoreExt, local::LocalFileSystem, memory::InMemory}; /// This example demonstrates using the low level [`FileStream`] / [`FileOpener`] APIs to directly /// read data from (CSV/JSON) into Arrow RecordBatches. diff --git a/datafusion-examples/examples/custom_data_source/custom_file_casts.rs b/datafusion-examples/examples/custom_data_source/custom_file_casts.rs index 36cc936332065..f4a1ccb8dcfbd 100644 --- a/datafusion-examples/examples/custom_data_source/custom_file_casts.rs +++ b/datafusion-examples/examples/custom_data_source/custom_file_casts.rs @@ -40,7 +40,7 @@ use datafusion_physical_expr_adapter::{ }; use object_store::memory::InMemory; use object_store::path::Path; -use object_store::{ObjectStore, PutPayload}; +use object_store::{ObjectStore, ObjectStoreExt, PutPayload}; // Example showing how to implement custom casting rules to adapt file schemas. // This example enforces that casts must be strictly widening: if the file type is Int64 and the table type is Int32, it will error diff --git a/datafusion-examples/examples/custom_data_source/default_column_values.rs b/datafusion-examples/examples/custom_data_source/default_column_values.rs index d7171542d5186..40c8836c1f822 100644 --- a/datafusion-examples/examples/custom_data_source/default_column_values.rs +++ b/datafusion-examples/examples/custom_data_source/default_column_values.rs @@ -48,7 +48,7 @@ use datafusion_physical_expr_adapter::{ use futures::StreamExt; use object_store::memory::InMemory; use object_store::path::Path; -use object_store::{ObjectStore, PutPayload}; +use object_store::{ObjectStore, ObjectStoreExt, PutPayload}; // Metadata key for storing default values in field metadata const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value"; @@ -79,7 +79,7 @@ pub async fn default_column_values() -> Result<()> { let mut buf = vec![]; let props = WriterProperties::builder() - .set_max_row_group_size(2) + .set_max_row_group_row_count(Some(2)) .build(); let mut writer = diff --git a/datafusion-examples/examples/data_io/json_shredding.rs b/datafusion-examples/examples/data_io/json_shredding.rs index 77dba5a98ac6f..ca1513f626245 100644 --- a/datafusion-examples/examples/data_io/json_shredding.rs +++ b/datafusion-examples/examples/data_io/json_shredding.rs @@ -47,7 +47,7 @@ use datafusion_physical_expr_adapter::{ }; use object_store::memory::InMemory; use object_store::path::Path; -use object_store::{ObjectStore, PutPayload}; +use object_store::{ObjectStoreExt, PutPayload}; // Example showing how to implement custom filter rewriting for JSON shredding. // @@ -76,7 +76,7 @@ pub async fn json_shredding() -> Result<()> { let mut buf = vec![]; let props = WriterProperties::builder() - .set_max_row_group_size(2) + .set_max_row_group_row_count(Some(2)) .build(); let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)) diff --git a/datafusion-examples/examples/data_io/parquet_advanced_index.rs b/datafusion-examples/examples/data_io/parquet_advanced_index.rs index 3f4ebe7a92055..f02b01354b784 100644 --- a/datafusion-examples/examples/data_io/parquet_advanced_index.rs +++ b/datafusion-examples/examples/data_io/parquet_advanced_index.rs @@ -43,7 +43,7 @@ use datafusion::parquet::arrow::arrow_reader::{ ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, }; use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; -use datafusion::parquet::file::metadata::ParquetMetaData; +use datafusion::parquet::file::metadata::{PageIndexPolicy, ParquetMetaData}; use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}; use datafusion::parquet::schema::types::ColumnPath; use datafusion::physical_expr::PhysicalExpr; @@ -410,7 +410,7 @@ impl IndexedFile { let options = ArrowReaderOptions::new() // Load the page index when reading metadata to cache // so it is available to interpret row selections - .with_page_index(true); + .with_page_index_policy(PageIndexPolicy::Required); let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?; let metadata = reader.metadata().clone(); @@ -567,7 +567,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { .object_meta .location .parts() - .last() + .next_back() .expect("No path in location") .as_ref() .to_string(); @@ -659,7 +659,7 @@ fn make_demo_file(path: impl AsRef, value_range: Range) -> Result<()> // enable page statistics for the tag column, // for everything else. let props = WriterProperties::builder() - .set_max_row_group_size(100) + .set_max_row_group_row_count(Some(100)) // compute column chunk (per row group) statistics by default .set_statistics_enabled(EnabledStatistics::Chunk) // compute column page statistics for the tag column diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index c7374949ecef5..5d2abd23172ed 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -84,7 +84,7 @@ mod tests { .build(); // Verify the expected options propagated down to parquet crate WriterProperties struct - assert_eq!(properties.max_row_group_size(), 123); + assert_eq!(properties.max_row_group_row_count(), Some(123)); assert_eq!(properties.data_page_size_limit(), 123); assert_eq!(properties.write_batch_size(), 123); assert_eq!(properties.writer_version(), WriterVersion::PARQUET_2_0); diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index f6608d16c1022..a7a1fc6d0bb66 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -222,7 +222,7 @@ impl ParquetOptions { .and_then(|s| parse_statistics_string(s).ok()) .unwrap_or(DEFAULT_STATISTICS_ENABLED), ) - .set_max_row_group_size(*max_row_group_size) + .set_max_row_group_row_count(Some(*max_row_group_size)) .set_created_by(created_by.clone()) .set_column_index_truncate_length(*column_index_truncate_length) .set_statistics_truncate_length(*statistics_truncate_length) @@ -393,7 +393,7 @@ mod tests { use parquet::basic::Compression; use parquet::file::properties::{ BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV, - EnabledStatistics, + DEFAULT_MAX_ROW_GROUP_ROW_COUNT, EnabledStatistics, }; use std::collections::HashMap; @@ -536,7 +536,9 @@ mod tests { write_batch_size: props.write_batch_size(), writer_version: props.writer_version().into(), dictionary_page_size_limit: props.dictionary_page_size_limit(), - max_row_group_size: props.max_row_group_size(), + max_row_group_size: props + .max_row_group_row_count() + .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT), created_by: props.created_by().to_string(), column_index_truncate_length: props.column_index_truncate_length(), statistics_truncate_length: props.statistics_truncate_length(), diff --git a/datafusion/core/benches/parquet_query_sql.rs b/datafusion/core/benches/parquet_query_sql.rs index e44524127bf18..f099137973592 100644 --- a/datafusion/core/benches/parquet_query_sql.rs +++ b/datafusion/core/benches/parquet_query_sql.rs @@ -45,7 +45,7 @@ const NUM_BATCHES: usize = 2048; /// The number of rows in each record batch to write const WRITE_RECORD_BATCH_SIZE: usize = 1024; /// The number of rows in a row group -const ROW_GROUP_SIZE: usize = 1024 * 1024; +const ROW_GROUP_ROW_COUNT: usize = 1024 * 1024; /// The number of row groups expected const EXPECTED_ROW_GROUPS: usize = 2; @@ -154,7 +154,7 @@ fn generate_file() -> NamedTempFile { let properties = WriterProperties::builder() .set_writer_version(WriterVersion::PARQUET_2_0) - .set_max_row_group_size(ROW_GROUP_SIZE) + .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT)) .build(); let mut writer = diff --git a/datafusion/core/benches/parquet_struct_query.rs b/datafusion/core/benches/parquet_struct_query.rs index 17ba17e02ba80..e7e91f0dd0e1e 100644 --- a/datafusion/core/benches/parquet_struct_query.rs +++ b/datafusion/core/benches/parquet_struct_query.rs @@ -40,7 +40,7 @@ const NUM_BATCHES: usize = 128; /// The number of rows in each record batch to write const WRITE_RECORD_BATCH_SIZE: usize = 4096; /// The number of rows in a row group -const ROW_GROUP_SIZE: usize = 65536; +const ROW_GROUP_ROW_COUNT: usize = 65536; /// The number of row groups expected const EXPECTED_ROW_GROUPS: usize = 8; /// The range for random string lengths @@ -114,7 +114,7 @@ fn generate_file() -> NamedTempFile { let properties = WriterProperties::builder() .set_writer_version(WriterVersion::PARQUET_2_0) - .set_max_row_group_size(ROW_GROUP_SIZE) + .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT)) .build(); let mut writer = diff --git a/datafusion/core/benches/push_down_filter.rs b/datafusion/core/benches/push_down_filter.rs index 3c2199c708de6..d41085907dbc8 100644 --- a/datafusion/core/benches/push_down_filter.rs +++ b/datafusion/core/benches/push_down_filter.rs @@ -25,9 +25,9 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; use datafusion_physical_plan::ExecutionPlan; -use object_store::ObjectStore; use object_store::memory::InMemory; use object_store::path::Path; +use object_store::{ObjectStore, ObjectStoreExt}; use parquet::arrow::ArrowWriter; use std::sync::Arc; diff --git a/datafusion/core/benches/sql_query_with_io.rs b/datafusion/core/benches/sql_query_with_io.rs index 0c188f7ba1047..fc8caf31acd11 100644 --- a/datafusion/core/benches/sql_query_with_io.rs +++ b/datafusion/core/benches/sql_query_with_io.rs @@ -31,7 +31,7 @@ use datafusion::{ use datafusion_execution::runtime_env::RuntimeEnv; use itertools::Itertools; use object_store::{ - ObjectStore, + ObjectStore, ObjectStoreExt, memory::InMemory, path::Path, throttle::{ThrottleConfig, ThrottledStore}, diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index aa226144a4af1..51d799a5b65c1 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -65,7 +65,8 @@ mod tests { use object_store::path::Path; use object_store::{ Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, - ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, + ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, + PutPayload, PutResult, }; use regex::Regex; use rstest::*; @@ -104,10 +105,6 @@ mod tests { unimplemented!() } - async fn get(&self, location: &Path) -> object_store::Result { - self.get_opts(location, GetOptions::default()).await - } - async fn get_opts( &self, location: &Path, @@ -147,14 +144,6 @@ mod tests { unimplemented!() } - async fn head(&self, _location: &Path) -> object_store::Result { - unimplemented!() - } - - async fn delete(&self, _location: &Path) -> object_store::Result<()> { - unimplemented!() - } - fn list( &self, _prefix: Option<&Path>, @@ -169,17 +158,21 @@ mod tests { unimplemented!() } - async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { - unimplemented!() - } - - async fn copy_if_not_exists( + async fn copy_opts( &self, _from: &Path, _to: &Path, + _options: object_store::CopyOptions, ) -> object_store::Result<()> { unimplemented!() } + + fn delete_stream( + &self, + _locations: BoxStream<'static, object_store::Result>, + ) -> BoxStream<'static, object_store::Result> { + unimplemented!() + } } impl VariableStream { diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index def3c0f35f9b3..6a8f7ab999757 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -156,8 +156,8 @@ mod tests { use futures::StreamExt; use futures::stream::BoxStream; use insta::assert_snapshot; - use object_store::ObjectMeta; use object_store::local::LocalFileSystem; + use object_store::{CopyOptions, ObjectMeta}; use object_store::{ GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, path::Path, @@ -165,7 +165,8 @@ mod tests { use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::file::metadata::{ - KeyValue, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex, + KeyValue, PageIndexPolicy, ParquetColumnIndex, ParquetMetaData, + ParquetOffsetIndex, }; use parquet::file::page_index::column_index::ColumnIndexMetaData; use tokio::fs::File; @@ -310,7 +311,7 @@ mod tests { _payload: PutPayload, _opts: PutOptions, ) -> object_store::Result { - Err(object_store::Error::NotImplemented) + unimplemented!() } async fn put_multipart_opts( @@ -318,7 +319,7 @@ mod tests { _location: &Path, _opts: PutMultipartOptions, ) -> object_store::Result> { - Err(object_store::Error::NotImplemented) + unimplemented!() } async fn get_opts( @@ -330,40 +331,34 @@ mod tests { self.inner.get_opts(location, options).await } - async fn head(&self, _location: &Path) -> object_store::Result { - Err(object_store::Error::NotImplemented) - } - - async fn delete(&self, _location: &Path) -> object_store::Result<()> { - Err(object_store::Error::NotImplemented) + fn delete_stream( + &self, + _locations: BoxStream<'static, object_store::Result>, + ) -> BoxStream<'static, object_store::Result> { + unimplemented!() } fn list( &self, _prefix: Option<&Path>, ) -> BoxStream<'static, object_store::Result> { - Box::pin(futures::stream::once(async { - Err(object_store::Error::NotImplemented) - })) + unimplemented!() } async fn list_with_delimiter( &self, _prefix: Option<&Path>, ) -> object_store::Result { - Err(object_store::Error::NotImplemented) - } - - async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { - Err(object_store::Error::NotImplemented) + unimplemented!() } - async fn copy_if_not_exists( + async fn copy_opts( &self, _from: &Path, _to: &Path, + _options: CopyOptions, ) -> object_store::Result<()> { - Err(object_store::Error::NotImplemented) + unimplemented!() } } @@ -1105,7 +1100,8 @@ mod tests { let testdata = datafusion_common::test_util::parquet_test_data(); let path = format!("{testdata}/alltypes_tiny_pages.parquet"); let file = File::open(path).await?; - let options = ArrowReaderOptions::new().with_page_index(true); + let options = + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required); let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone()) .await? diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 0e40ed2df2066..82c47b6c7281c 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -31,7 +31,7 @@ mod tests { use datafusion_datasource::TableSchema; use datafusion_datasource_csv::CsvFormat; - use object_store::ObjectStore; + use object_store::{ObjectStore, ObjectStoreExt}; use crate::datasource::file_format::FileFormat; use crate::prelude::CsvReadOptions; diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs index a0438e3d74ab2..62c6699f8fcd1 100644 --- a/datafusion/core/src/test/object_store.rs +++ b/datafusion/core/src/test/object_store.rs @@ -27,6 +27,7 @@ use crate::{ prelude::SessionContext, }; use futures::{FutureExt, stream::BoxStream}; +use object_store::{CopyOptions, ObjectStoreExt}; use std::{ fmt::{Debug, Display, Formatter}, sync::Arc, @@ -130,39 +131,40 @@ impl ObjectStore for BlockingObjectStore { location: &Path, options: GetOptions, ) -> object_store::Result { - self.inner.get_opts(location, options).await - } - - async fn head(&self, location: &Path) -> object_store::Result { - println!( - "{} received head call for {location}", - BlockingObjectStore::NAME - ); - // Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests. - let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await; - match wait_result { - Ok(_) => println!( - "{} barrier reached for {location}", + if options.head { + println!( + "{} received head call for {location}", BlockingObjectStore::NAME - ), - Err(_) => { - let error_message = format!( - "{} barrier wait timed out for {location}", + ); + // Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests. + let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await; + match wait_result { + Ok(_) => println!( + "{} barrier reached for {location}", BlockingObjectStore::NAME - ); - log::error!("{error_message}"); - return Err(Error::Generic { - store: BlockingObjectStore::NAME, - source: error_message.into(), - }); + ), + Err(_) => { + let error_message = format!( + "{} barrier wait timed out for {location}", + BlockingObjectStore::NAME + ); + log::error!("{error_message}"); + return Err(Error::Generic { + store: BlockingObjectStore::NAME, + source: error_message.into(), + }); + } } } + // Forward the call to the inner object store. - self.inner.head(location).await + self.inner.get_opts(location, options).await } - - async fn delete(&self, location: &Path) -> object_store::Result<()> { - self.inner.delete(location).await + fn delete_stream( + &self, + locations: BoxStream<'static, object_store::Result>, + ) -> BoxStream<'static, object_store::Result> { + self.inner.delete_stream(locations) } fn list( @@ -179,15 +181,12 @@ impl ObjectStore for BlockingObjectStore { self.inner.list_with_delimiter(prefix).await } - async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { - self.inner.copy(from, to).await - } - - async fn copy_if_not_exists( + async fn copy_opts( &self, from: &Path, to: &Path, + options: CopyOptions, ) -> object_store::Result<()> { - self.inner.copy_if_not_exists(from, to).await + self.inner.copy_opts(from, to, options).await } } diff --git a/datafusion/core/tests/catalog_listing/pruned_partition_list.rs b/datafusion/core/tests/catalog_listing/pruned_partition_list.rs index f4782ee13c24d..8f93dc17dbad2 100644 --- a/datafusion/core/tests/catalog_listing/pruned_partition_list.rs +++ b/datafusion/core/tests/catalog_listing/pruned_partition_list.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use arrow_schema::DataType; use futures::{FutureExt, StreamExt as _, TryStreamExt as _}; -use object_store::{ObjectStore as _, memory::InMemory, path::Path}; +use object_store::{ObjectStoreExt, memory::InMemory, path::Path}; use datafusion::execution::SessionStateBuilder; use datafusion_catalog_listing::helpers::{ diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 6c0452a99bccc..c94ab10a9e72f 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -6534,7 +6534,7 @@ async fn test_fill_null_all_columns() -> Result<()> { async fn test_insert_into_casting_support() -> Result<()> { // Testing case1: // Inserting query schema mismatch: Expected table field 'a' with type Float16, but got 'a' with type Utf8. - // And the cast is not supported from Utf8 to Float16. + // And the cast is not supported from Binary to Float16. // Create a new schema with one field called "a" of type Float16, and setting nullable to false let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float16, false)])); @@ -6545,7 +6545,10 @@ async fn test_insert_into_casting_support() -> Result<()> { let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?); session_ctx.register_table("t", initial_table.clone())?; - let mut write_df = session_ctx.sql("values ('a123'), ('b456')").await.unwrap(); + let mut write_df = session_ctx + .sql("values (x'a123'), (x'b456')") + .await + .unwrap(); write_df = write_df .clone() @@ -6559,7 +6562,7 @@ async fn test_insert_into_casting_support() -> Result<()> { assert_contains!( e.to_string(), - "Inserting query schema mismatch: Expected table field 'a' with type Float16, but got 'a' with type Utf8." + "Inserting query schema mismatch: Expected table field 'a' with type Float16, but got 'a' with type Binary." ); // Testing case2: diff --git a/datafusion/core/tests/datasource/object_store_access.rs b/datafusion/core/tests/datasource/object_store_access.rs index 561de21520394..30654c687f8d2 100644 --- a/datafusion/core/tests/datasource/object_store_access.rs +++ b/datafusion/core/tests/datasource/object_store_access.rs @@ -36,8 +36,9 @@ use insta::assert_snapshot; use object_store::memory::InMemory; use object_store::path::Path; use object_store::{ - GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, + CopyOptions, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, + ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, + PutResult, }; use parking_lot::Mutex; use std::fmt; @@ -54,8 +55,8 @@ async fn create_single_csv_file() { @r" RequestCountingObjectStore() Total Requests: 2 - - HEAD path=csv_table.csv - - GET path=csv_table.csv + - GET (opts) path=csv_table.csv head=true + - GET (opts) path=csv_table.csv " ); } @@ -76,7 +77,7 @@ async fn query_single_csv_file() { ------- Object Store Request Summary ------- RequestCountingObjectStore() Total Requests: 2 - - HEAD path=csv_table.csv + - GET (opts) path=csv_table.csv head=true - GET (opts) path=csv_table.csv " ); @@ -91,9 +92,9 @@ async fn create_multi_file_csv_file() { RequestCountingObjectStore() Total Requests: 4 - LIST prefix=data - - GET path=data/file_0.csv - - GET path=data/file_1.csv - - GET path=data/file_2.csv + - GET (opts) path=data/file_0.csv + - GET (opts) path=data/file_1.csv + - GET (opts) path=data/file_2.csv " ); } @@ -351,8 +352,8 @@ async fn create_single_parquet_file_default() { @r" RequestCountingObjectStore() Total Requests: 2 - - HEAD path=parquet_table.parquet - - GET (range) range=0-2994 path=parquet_table.parquet + - GET (opts) path=parquet_table.parquet head=true + - GET (opts) path=parquet_table.parquet range=0-2994 " ); } @@ -370,8 +371,8 @@ async fn create_single_parquet_file_prefetch() { @r" RequestCountingObjectStore() Total Requests: 2 - - HEAD path=parquet_table.parquet - - GET (range) range=1994-2994 path=parquet_table.parquet + - GET (opts) path=parquet_table.parquet head=true + - GET (opts) path=parquet_table.parquet range=1994-2994 " ); } @@ -399,10 +400,10 @@ async fn create_single_parquet_file_too_small_prefetch() { @r" RequestCountingObjectStore() Total Requests: 4 - - HEAD path=parquet_table.parquet - - GET (range) range=2494-2994 path=parquet_table.parquet - - GET (range) range=2264-2986 path=parquet_table.parquet - - GET (range) range=2124-2264 path=parquet_table.parquet + - GET (opts) path=parquet_table.parquet head=true + - GET (opts) path=parquet_table.parquet range=2494-2994 + - GET (opts) path=parquet_table.parquet range=2264-2986 + - GET (opts) path=parquet_table.parquet range=2124-2264 " ); } @@ -431,9 +432,9 @@ async fn create_single_parquet_file_small_prefetch() { @r" RequestCountingObjectStore() Total Requests: 3 - - HEAD path=parquet_table.parquet - - GET (range) range=2254-2994 path=parquet_table.parquet - - GET (range) range=2124-2264 path=parquet_table.parquet + - GET (opts) path=parquet_table.parquet head=true + - GET (opts) path=parquet_table.parquet range=2254-2994 + - GET (opts) path=parquet_table.parquet range=2124-2264 " ); } @@ -455,8 +456,8 @@ async fn create_single_parquet_file_no_prefetch() { @r" RequestCountingObjectStore() Total Requests: 2 - - HEAD path=parquet_table.parquet - - GET (range) range=0-2994 path=parquet_table.parquet + - GET (opts) path=parquet_table.parquet head=true + - GET (opts) path=parquet_table.parquet range=0-2994 " ); } @@ -476,7 +477,7 @@ async fn query_single_parquet_file() { ------- Object Store Request Summary ------- RequestCountingObjectStore() Total Requests: 3 - - HEAD path=parquet_table.parquet + - GET (opts) path=parquet_table.parquet head=true - GET (ranges) path=parquet_table.parquet ranges=4-534,534-1064 - GET (ranges) path=parquet_table.parquet ranges=1064-1594,1594-2124 " @@ -500,7 +501,7 @@ async fn query_single_parquet_file_with_single_predicate() { ------- Object Store Request Summary ------- RequestCountingObjectStore() Total Requests: 2 - - HEAD path=parquet_table.parquet + - GET (opts) path=parquet_table.parquet head=true - GET (ranges) path=parquet_table.parquet ranges=1064-1481,1481-1594,1594-2011,2011-2124 " ); @@ -524,7 +525,7 @@ async fn query_single_parquet_file_multi_row_groups_multiple_predicates() { ------- Object Store Request Summary ------- RequestCountingObjectStore() Total Requests: 3 - - HEAD path=parquet_table.parquet + - GET (opts) path=parquet_table.parquet head=true - GET (ranges) path=parquet_table.parquet ranges=4-421,421-534,534-951,951-1064 - GET (ranges) path=parquet_table.parquet ranges=1064-1481,1481-1594,1594-2011,2011-2124 " @@ -701,7 +702,7 @@ impl Test { let mut buffer = vec![]; let props = parquet::file::properties::WriterProperties::builder() - .set_max_row_group_size(100) + .set_max_row_group_row_count(Some(100)) .build(); let mut writer = parquet::arrow::ArrowWriter::try_new( &mut buffer, @@ -752,11 +753,8 @@ impl Test { /// Details of individual requests made through the [`RequestCountingObjectStore`] #[derive(Clone, Debug)] enum RequestDetails { - Get { path: Path }, GetOpts { path: Path, get_options: GetOptions }, GetRanges { path: Path, ranges: Vec> }, - GetRange { path: Path, range: Range }, - Head { path: Path }, List { prefix: Option }, ListWithDelimiter { prefix: Option }, ListWithOffset { prefix: Option, offset: Path }, @@ -774,9 +772,6 @@ fn display_range(range: &Range) -> impl Display + '_ { impl Display for RequestDetails { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - RequestDetails::Get { path } => { - write!(f, "GET path={path}") - } RequestDetails::GetOpts { path, get_options } => { write!(f, "GET (opts) path={path}")?; if let Some(range) = &get_options.range { @@ -814,13 +809,6 @@ impl Display for RequestDetails { } Ok(()) } - RequestDetails::GetRange { path, range } => { - let range = display_range(range); - write!(f, "GET (range) range={range} path={path}") - } - RequestDetails::Head { path } => { - write!(f, "HEAD path={path}") - } RequestDetails::List { prefix } => { write!(f, "LIST")?; if let Some(prefix) = prefix { @@ -893,7 +881,7 @@ impl ObjectStore for RequestCountingObjectStore { _payload: PutPayload, _opts: PutOptions, ) -> object_store::Result { - Err(object_store::Error::NotImplemented) + unimplemented!() } async fn put_multipart_opts( @@ -901,15 +889,7 @@ impl ObjectStore for RequestCountingObjectStore { _location: &Path, _opts: PutMultipartOptions, ) -> object_store::Result> { - Err(object_store::Error::NotImplemented) - } - - async fn get(&self, location: &Path) -> object_store::Result { - let result = self.inner.get(location).await?; - self.requests.lock().push(RequestDetails::Get { - path: location.to_owned(), - }); - Ok(result) + unimplemented!() } async fn get_opts( @@ -925,19 +905,6 @@ impl ObjectStore for RequestCountingObjectStore { Ok(result) } - async fn get_range( - &self, - location: &Path, - range: Range, - ) -> object_store::Result { - let result = self.inner.get_range(location, range.clone()).await?; - self.requests.lock().push(RequestDetails::GetRange { - path: location.to_owned(), - range: range.clone(), - }); - Ok(result) - } - async fn get_ranges( &self, location: &Path, @@ -951,18 +918,6 @@ impl ObjectStore for RequestCountingObjectStore { Ok(result) } - async fn head(&self, location: &Path) -> object_store::Result { - let result = self.inner.head(location).await?; - self.requests.lock().push(RequestDetails::Head { - path: location.to_owned(), - }); - Ok(result) - } - - async fn delete(&self, _location: &Path) -> object_store::Result<()> { - Err(object_store::Error::NotImplemented) - } - fn list( &self, prefix: Option<&Path>, @@ -998,15 +953,19 @@ impl ObjectStore for RequestCountingObjectStore { self.inner.list_with_delimiter(prefix).await } - async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { - Err(object_store::Error::NotImplemented) + fn delete_stream( + &self, + _locations: BoxStream<'static, object_store::Result>, + ) -> BoxStream<'static, object_store::Result> { + unimplemented!() } - async fn copy_if_not_exists( + async fn copy_opts( &self, _from: &Path, _to: &Path, + _options: CopyOptions, ) -> object_store::Result<()> { - Err(object_store::Error::NotImplemented) + unimplemented!() } } diff --git a/datafusion/core/tests/fuzz_cases/pruning.rs b/datafusion/core/tests/fuzz_cases/pruning.rs index 8a84e4c5d1814..8ce5207f91190 100644 --- a/datafusion/core/tests/fuzz_cases/pruning.rs +++ b/datafusion/core/tests/fuzz_cases/pruning.rs @@ -31,7 +31,9 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::{ExecutionPlan, collect, filter::FilterExec}; use itertools::Itertools; -use object_store::{ObjectStore, PutPayload, memory::InMemory, path::Path}; +use object_store::{ + ObjectStore, ObjectStoreExt, PutPayload, memory::InMemory, path::Path, +}; use parquet::{ arrow::ArrowWriter, file::properties::{EnabledStatistics, WriterProperties}, diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs index 7f994daeaa58c..d14afaf1b3267 100644 --- a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -31,7 +31,7 @@ use datafusion_execution::object_store::ObjectStoreUrl; use itertools::Itertools; use object_store::memory::InMemory; use object_store::path::Path; -use object_store::{ObjectStore, PutPayload}; +use object_store::{ObjectStore, ObjectStoreExt, PutPayload}; use parquet::arrow::ArrowWriter; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 25f69d2975eac..ae11fa9a11334 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -43,7 +43,7 @@ use futures::{FutureExt, TryFutureExt}; use insta::assert_snapshot; use object_store::memory::InMemory; use object_store::path::Path; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt}; use parquet::arrow::ArrowWriter; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::AsyncFileReader; diff --git a/datafusion/core/tests/parquet/expr_adapter.rs b/datafusion/core/tests/parquet/expr_adapter.rs index aee37fda1670d..efd492ed27800 100644 --- a/datafusion/core/tests/parquet/expr_adapter.rs +++ b/datafusion/core/tests/parquet/expr_adapter.rs @@ -37,7 +37,7 @@ use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, }; -use object_store::{ObjectStore, memory::InMemory, path::Path}; +use object_store::{ObjectStore, ObjectStoreExt, memory::InMemory, path::Path}; use parquet::arrow::ArrowWriter; async fn write_parquet(batch: RecordBatch, store: Arc, path: &str) { diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs index 0c02c8fe523dc..9ff8137687c95 100644 --- a/datafusion/core/tests/parquet/external_access_plan.rs +++ b/datafusion/core/tests/parquet/external_access_plan.rs @@ -409,7 +409,7 @@ fn get_test_data() -> TestData { .expect("tempfile creation"); let props = WriterProperties::builder() - .set_max_row_group_size(row_per_group) + .set_max_row_group_row_count(Some(row_per_group)) .build(); let batches = create_data_batch(scenario); diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index 1eb8103d3e4d4..e6266b2c088d7 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -63,7 +63,7 @@ async fn single_file() { // Set the row group size smaller so can test with fewer rows let props = WriterProperties::builder() - .set_max_row_group_size(1024) + .set_max_row_group_row_count(Some(1024)) .build(); // Only create the parquet file once as it is fairly large @@ -230,7 +230,7 @@ async fn single_file_small_data_pages() { // Set a low row count limit to improve page filtering let props = WriterProperties::builder() - .set_max_row_group_size(2048) + .set_max_row_group_row_count(Some(2048)) .set_data_page_row_count_limit(512) .set_write_batch_size(512) .build(); diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 5a05718936509..0535ddd9247d4 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -1148,7 +1148,7 @@ async fn make_test_file_rg( .expect("tempfile creation"); let props = WriterProperties::builder() - .set_max_row_group_size(row_per_group) + .set_max_row_group_row_count(Some(row_per_group)) .set_bloom_filter_enabled(true) .set_statistics_enabled(EnabledStatistics::Page) .build(); diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index b717f546dc422..cdfed5011696e 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -49,7 +49,7 @@ use datafusion_physical_plan::{ collect, displayable, ExecutionPlan, Partitioning, }; -use object_store::ObjectStore; +use object_store::ObjectStoreExt; use object_store::memory::InMemory; use rstest::rstest; use url::Url; diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index c6f920584dc2b..a9061849795c7 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -20,7 +20,6 @@ use std::collections::BTreeSet; use std::fs::File; use std::io::{Read, Seek, SeekFrom}; -use std::ops::Range; use std::sync::Arc; use arrow::datatypes::DataType; @@ -43,9 +42,12 @@ use datafusion_execution::config::SessionConfig; use async_trait::async_trait; use bytes::Bytes; use chrono::{TimeZone, Utc}; +use futures::StreamExt; use futures::stream::{self, BoxStream}; use insta::assert_snapshot; -use object_store::{Attributes, MultipartUpload, PutMultipartOptions, PutPayload}; +use object_store::{ + Attributes, CopyOptions, GetRange, MultipartUpload, PutMultipartOptions, PutPayload, +}; use object_store::{ GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, PutResult, path::Path, @@ -620,7 +622,7 @@ async fn create_partitioned_alltypes_parquet_table( } #[derive(Debug)] -/// An object store implem that is mirrors a given file to multiple paths. +/// An object store implem that mirrors a given file to multiple paths. pub struct MirroringObjectStore { /// The `(path,size)` of the files that "exist" in the store files: Vec, @@ -669,12 +671,13 @@ impl ObjectStore for MirroringObjectStore { async fn get_opts( &self, location: &Path, - _options: GetOptions, + options: GetOptions, ) -> object_store::Result { self.files.iter().find(|x| *x == location).unwrap(); let path = std::path::PathBuf::from(&self.mirrored_file); let file = File::open(&path).unwrap(); let metadata = file.metadata().unwrap(); + let meta = ObjectMeta { location: location.clone(), last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), @@ -683,37 +686,35 @@ impl ObjectStore for MirroringObjectStore { version: None, }; + let payload = if options.head { + // no content for head requests + GetResultPayload::Stream(stream::empty().boxed()) + } else if let Some(range) = options.range { + let GetRange::Bounded(range) = range else { + unimplemented!("Unbounded range not supported in MirroringObjectStore"); + }; + let mut file = File::open(path).unwrap(); + file.seek(SeekFrom::Start(range.start)).unwrap(); + + let to_read = range.end - range.start; + let to_read: usize = to_read.try_into().unwrap(); + let mut data = Vec::with_capacity(to_read); + let read = file.take(to_read as u64).read_to_end(&mut data).unwrap(); + assert_eq!(read, to_read); + let stream = stream::once(async move { Ok(Bytes::from(data)) }).boxed(); + GetResultPayload::Stream(stream) + } else { + GetResultPayload::File(file, path) + }; + Ok(GetResult { range: 0..meta.size, - payload: GetResultPayload::File(file, path), + payload, meta, attributes: Attributes::default(), }) } - async fn get_range( - &self, - location: &Path, - range: Range, - ) -> object_store::Result { - self.files.iter().find(|x| *x == location).unwrap(); - let path = std::path::PathBuf::from(&self.mirrored_file); - let mut file = File::open(path).unwrap(); - file.seek(SeekFrom::Start(range.start)).unwrap(); - - let to_read = range.end - range.start; - let to_read: usize = to_read.try_into().unwrap(); - let mut data = Vec::with_capacity(to_read); - let read = file.take(to_read as u64).read_to_end(&mut data).unwrap(); - assert_eq!(read, to_read); - - Ok(data.into()) - } - - async fn delete(&self, _location: &Path) -> object_store::Result<()> { - unimplemented!() - } - fn list( &self, prefix: Option<&Path>, @@ -783,14 +784,18 @@ impl ObjectStore for MirroringObjectStore { }) } - async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { + fn delete_stream( + &self, + _locations: BoxStream<'static, object_store::Result>, + ) -> BoxStream<'static, object_store::Result> { unimplemented!() } - async fn copy_if_not_exists( + async fn copy_opts( &self, _from: &Path, _to: &Path, + _options: CopyOptions, ) -> object_store::Result<()> { unimplemented!() } diff --git a/datafusion/core/tests/tracing/traceable_object_store.rs b/datafusion/core/tests/tracing/traceable_object_store.rs index 00aa4ea3f36d9..71a61dbf8772a 100644 --- a/datafusion/core/tests/tracing/traceable_object_store.rs +++ b/datafusion/core/tests/tracing/traceable_object_store.rs @@ -18,10 +18,11 @@ //! Object store implementation used for testing use crate::tracing::asserting_tracer::assert_traceability; +use futures::StreamExt; use futures::stream::BoxStream; use object_store::{ - GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOptions, PutOptions, PutPayload, PutResult, path::Path, + CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, path::Path, }; use std::fmt::{Debug, Display, Formatter}; use std::sync::Arc; @@ -83,14 +84,17 @@ impl ObjectStore for TraceableObjectStore { self.inner.get_opts(location, options).await } - async fn head(&self, location: &Path) -> object_store::Result { - assert_traceability().await; - self.inner.head(location).await - } - - async fn delete(&self, location: &Path) -> object_store::Result<()> { - assert_traceability().await; - self.inner.delete(location).await + fn delete_stream( + &self, + locations: BoxStream<'static, object_store::Result>, + ) -> BoxStream<'static, object_store::Result> { + self.inner + .delete_stream(locations) + .then(|res| async { + futures::executor::block_on(assert_traceability()); + res + }) + .boxed() } fn list( @@ -109,17 +113,13 @@ impl ObjectStore for TraceableObjectStore { self.inner.list_with_delimiter(prefix).await } - async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { - assert_traceability().await; - self.inner.copy(from, to).await - } - - async fn copy_if_not_exists( + async fn copy_opts( &self, from: &Path, to: &Path, + options: CopyOptions, ) -> object_store::Result<()> { assert_traceability().await; - self.inner.copy_if_not_exists(from, to).await + self.inner.copy_opts(from, to, options).await } } diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index 9997d23d4c61f..f60bce3249935 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -63,7 +63,8 @@ use datafusion_session::Session; use futures::StreamExt; use futures::stream::BoxStream; use object_store::{ - GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore, path::Path, + GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore, ObjectStoreExt, + path::Path, }; use tokio::io::AsyncWriteExt; diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index 4c8fd5b3407be..99446cb876230 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -52,7 +52,7 @@ use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::file_stream::FileOpener; use futures::StreamExt; use itertools::Itertools; -use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; +use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore, ObjectStoreExt}; /// Enum indicating which Arrow IPC format to use #[derive(Clone, Copy, Debug)] diff --git a/datafusion/datasource-avro/src/file_format.rs b/datafusion/datasource-avro/src/file_format.rs index 2447c032e700d..c4960dbcc99bb 100644 --- a/datafusion/datasource-avro/src/file_format.rs +++ b/datafusion/datasource-avro/src/file_format.rs @@ -41,7 +41,7 @@ use datafusion_physical_plan::ExecutionPlan; use datafusion_session::Session; use async_trait::async_trait; -use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; +use object_store::{GetResultPayload, ObjectMeta, ObjectStore, ObjectStoreExt}; #[derive(Default)] /// Factory struct used to create [`AvroFormat`] diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index 1c466be266f17..bd9ff2a7a842a 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -147,7 +147,7 @@ mod private { use bytes::Buf; use datafusion_datasource::{PartitionedFile, file_stream::FileOpenFuture}; use futures::StreamExt; - use object_store::{GetResultPayload, ObjectStore}; + use object_store::{GetResultPayload, ObjectStore, ObjectStoreExt}; pub struct AvroOpener { pub config: Arc, diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index efb7829179e07..7a253d81db9f8 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -60,7 +60,9 @@ use bytes::{Buf, Bytes}; use datafusion_datasource::source::DataSourceExec; use futures::stream::BoxStream; use futures::{Stream, StreamExt, TryStreamExt, pin_mut}; -use object_store::{ObjectMeta, ObjectStore, delimited::newline_delimited_stream}; +use object_store::{ + ObjectMeta, ObjectStore, ObjectStoreExt, delimited::newline_delimited_stream, +}; use regex::Regex; #[derive(Default)] diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs index 881e5f3d873e6..8fe445705a21c 100644 --- a/datafusion/datasource-json/src/file_format.rs +++ b/datafusion/datasource-json/src/file_format.rs @@ -61,7 +61,7 @@ use datafusion_session::Session; use crate::utils::JsonArrayToNdjsonReader; use async_trait::async_trait; -use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; +use object_store::{GetResultPayload, ObjectMeta, ObjectStore, ObjectStoreExt}; #[derive(Default)] /// Factory struct used to create [JsonFormat] diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 867cfe0e98fea..52a38f49945cd 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -509,9 +509,9 @@ mod tests { use bytes::Bytes; use datafusion_datasource::FileRange; use futures::TryStreamExt; - use object_store::PutPayload; use object_store::memory::InMemory; use object_store::path::Path; + use object_store::{ObjectStoreExt, PutPayload}; /// Helper to create a test schema fn test_schema() -> SchemaRef { diff --git a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs index ed92031f86c6b..02137b5a1d288 100644 --- a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs +++ b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs @@ -34,9 +34,9 @@ use parquet::arrow::{ArrowWriter, ProjectionMask}; use parquet::file::properties::WriterProperties; use tempfile::TempDir; -const ROW_GROUP_SIZE: usize = 10_000; +const ROW_GROUP_ROW_COUNT: usize = 10_000; const TOTAL_ROW_GROUPS: usize = 10; -const TOTAL_ROWS: usize = ROW_GROUP_SIZE * TOTAL_ROW_GROUPS; +const TOTAL_ROWS: usize = ROW_GROUP_ROW_COUNT * TOTAL_ROW_GROUPS; const TARGET_VALUE: &str = "target_value"; const COLUMN_NAME: &str = "list_col"; const PAYLOAD_COLUMN_NAME: &str = "payload"; @@ -69,7 +69,7 @@ fn parquet_nested_filter_pushdown(c: &mut Criterion) { b.iter(|| { let matched = scan_with_predicate(&dataset_path, &predicate, false) .expect("baseline parquet scan with filter succeeded"); - assert_eq!(matched, ROW_GROUP_SIZE); + assert_eq!(matched, ROW_GROUP_ROW_COUNT); }); }); @@ -79,7 +79,7 @@ fn parquet_nested_filter_pushdown(c: &mut Criterion) { b.iter(|| { let matched = scan_with_predicate(&dataset_path, &predicate, true) .expect("pushdown parquet scan with filter succeeded"); - assert_eq!(matched, ROW_GROUP_SIZE); + assert_eq!(matched, ROW_GROUP_ROW_COUNT); }); }); @@ -170,7 +170,7 @@ fn create_dataset() -> datafusion_common::Result { ])); let writer_props = WriterProperties::builder() - .set_max_row_group_size(ROW_GROUP_SIZE) + .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT)) .build(); let mut writer = ArrowWriter::try_new( @@ -195,7 +195,7 @@ fn create_dataset() -> datafusion_common::Result { ]; for value in sorted_values { - let batch = build_list_batch(&schema, value, ROW_GROUP_SIZE)?; + let batch = build_list_batch(&schema, value, ROW_GROUP_ROW_COUNT)?; writer.write(&batch)?; } diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index d59b42ed15d15..22fe08aa42f6b 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -70,7 +70,7 @@ use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; use object_store::path::Path; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt}; use parquet::arrow::arrow_writer::{ ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory, ArrowWriterOptions, compute_leaves, @@ -82,7 +82,9 @@ use parquet::basic::Type; use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::errors::ParquetError; use parquet::file::metadata::{ParquetMetaData, SortingColumn}; -use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; +use parquet::file::properties::{ + DEFAULT_MAX_ROW_GROUP_ROW_COUNT, WriterProperties, WriterPropertiesBuilder, +}; use parquet::file::writer::SerializedFileWriter; use parquet::schema::types::SchemaDescriptor; use tokio::io::{AsyncWrite, AsyncWriteExt}; @@ -1589,7 +1591,9 @@ fn spawn_parquet_parallel_serialization_task( ) -> SpawnedTask> { SpawnedTask::spawn(async move { let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream; - let max_row_group_rows = writer_props.max_row_group_size(); + let max_row_group_rows = writer_props + .max_row_group_row_count() + .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT); let mut row_group_index = 0; let col_writers = row_group_writer_factory.create_column_writers(row_group_index)?; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f87a30265a17b..108e8c5752017 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -348,7 +348,8 @@ impl FileOpener for ParquetOpener { // unnecessary I/O. We decide later if it is needed to evaluate the // pruning predicates. Thus default to not requesting it from the // underlying reader. - let mut options = ArrowReaderOptions::new().with_page_index(false); + let mut options = + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip); #[cfg(feature = "parquet_encryption")] if let Some(fd_val) = file_decryption_properties { options = options.with_file_decryption_properties(Arc::clone(&fd_val)); @@ -1037,7 +1038,7 @@ mod test { }; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{Stream, StreamExt}; - use object_store::{ObjectStore, memory::InMemory, path::Path}; + use object_store::{ObjectStore, ObjectStoreExt, memory::InMemory, path::Path}; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; @@ -1734,7 +1735,7 @@ mod test { // Write parquet file with multiple row groups // Force small row groups by setting max_row_group_size let props = WriterProperties::builder() - .set_max_row_group_size(3) // Force each batch into its own row group + .set_max_row_group_row_count(Some(3)) // Force each batch into its own row group .build(); let data_len = write_parquet_batches( @@ -1834,7 +1835,7 @@ mod test { .unwrap(); // 4 rows let props = WriterProperties::builder() - .set_max_row_group_size(4) + .set_max_row_group_row_count(Some(4)) .build(); let data_len = write_parquet_batches( @@ -1921,7 +1922,7 @@ mod test { let batch3 = record_batch!(("a", Int32, vec![Some(7), Some(8)])).unwrap(); let props = WriterProperties::builder() - .set_max_row_group_size(2) + .set_max_row_group_row_count(Some(2)) .build(); let data_len = write_parquet_batches( diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 7eea8285ad6b5..932988af051e4 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -662,6 +662,7 @@ mod tests { use datafusion_expr::{Expr, cast, col, lit}; use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; + use object_store::ObjectStoreExt; use parquet::arrow::ArrowSchemaConverter; use parquet::arrow::async_reader::ParquetObjectReader; use parquet::basic::LogicalType; @@ -1752,7 +1753,7 @@ mod tests { pruning_predicate: &PruningPredicate, ) -> Result { use datafusion_datasource::PartitionedFile; - use object_store::{ObjectMeta, ObjectStore}; + use object_store::ObjectMeta; let object_meta = ObjectMeta { location: object_store::path::Path::parse(file_name).expect("creating path"), diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index f80c9cb0b0daa..d19d20ec1ff3d 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -573,7 +573,7 @@ mod tests { use datafusion_execution::object_store::{ DefaultObjectStoreRegistry, ObjectStoreRegistry, }; - use object_store::{local::LocalFileSystem, path::Path}; + use object_store::{ObjectStoreExt, local::LocalFileSystem, path::Path}; use std::{collections::HashMap, ops::Not, sync::Arc}; use url::Url; diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index 0c274806c09c3..39d1047984ff6 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -30,7 +30,7 @@ use itertools::Itertools; use log::debug; use object_store::path::DELIMITER; use object_store::path::Path; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt}; use url::Url; /// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`] @@ -521,8 +521,8 @@ mod tests { use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; use object_store::{ - GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions, - PutPayload, + CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, + PutMultipartOptions, PutPayload, }; use std::any::Any; use std::collections::HashMap; @@ -1108,7 +1108,14 @@ mod tests { location: &Path, options: GetOptions, ) -> object_store::Result { - self.in_mem.get_opts(location, options).await + if options.head && self.forbidden_paths.contains(location) { + Err(object_store::Error::PermissionDenied { + path: location.to_string(), + source: "forbidden".into(), + }) + } else { + self.in_mem.get_opts(location, options).await + } } async fn get_ranges( @@ -1119,19 +1126,11 @@ mod tests { self.in_mem.get_ranges(location, ranges).await } - async fn head(&self, location: &Path) -> object_store::Result { - if self.forbidden_paths.contains(location) { - Err(object_store::Error::PermissionDenied { - path: location.to_string(), - source: "forbidden".into(), - }) - } else { - self.in_mem.head(location).await - } - } - - async fn delete(&self, location: &Path) -> object_store::Result<()> { - self.in_mem.delete(location).await + fn delete_stream( + &self, + locations: BoxStream<'static, object_store::Result>, + ) -> BoxStream<'static, object_store::Result> { + self.in_mem.delete_stream(locations) } fn list( @@ -1148,16 +1147,13 @@ mod tests { self.in_mem.list_with_delimiter(prefix).await } - async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { - self.in_mem.copy(from, to).await - } - - async fn copy_if_not_exists( + async fn copy_opts( &self, from: &Path, to: &Path, + options: CopyOptions, ) -> object_store::Result<()> { - self.in_mem.copy_if_not_exists(from, to).await + self.in_mem.copy_opts(from, to, options).await } } diff --git a/datafusion/functions-nested/src/range.rs b/datafusion/functions-nested/src/range.rs index aae641ceeb358..307067b9c9975 100644 --- a/datafusion/functions-nested/src/range.rs +++ b/datafusion/functions-nested/src/range.rs @@ -392,20 +392,27 @@ impl Range { } let stop = if !self.include_upper_bound { - Date32Type::subtract_month_day_nano(stop, step) + Date32Type::subtract_month_day_nano_opt(stop, step).ok_or_else(|| { + exec_datafusion_err!( + "Cannot generate date range where stop {} - {step:?}) overflows", + date32_to_string(stop) + ) + })? } else { stop }; let neg = months < 0 || days < 0; - let mut new_date = start; + let mut new_date = Some(start); let values = from_fn(|| { - if (neg && new_date < stop) || (!neg && new_date > stop) { + let Some(current_date) = new_date else { + return None; // previous overflow + }; + if (neg && current_date < stop) || (!neg && current_date > stop) { None } else { - let current_date = new_date; - new_date = Date32Type::add_month_day_nano(new_date, step); + new_date = Date32Type::add_month_day_nano_opt(current_date, step); Some(Some(current_date)) } }); @@ -578,3 +585,11 @@ fn parse_tz(tz: &Option<&str>) -> Result { Tz::from_str(tz) .map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op)) } + +fn date32_to_string(value: i32) -> String { + if let Some(d) = Date32Type::to_naive_date_opt(value) { + format!("{value} ({d})") + } else { + format!("{value} (unknown date)") + } +} diff --git a/datafusion/spark/src/function/datetime/last_day.rs b/datafusion/spark/src/function/datetime/last_day.rs index 40834ec345df5..4c6f731db18a6 100644 --- a/datafusion/spark/src/function/datetime/last_day.rs +++ b/datafusion/spark/src/function/datetime/last_day.rs @@ -114,7 +114,11 @@ impl ScalarUDFImpl for SparkLastDay { } fn spark_last_day(days: i32) -> Result { - let date = Date32Type::to_naive_date(days); + let date = Date32Type::to_naive_date_opt(days).ok_or_else(|| { + exec_datafusion_err!( + "Spark `last_day`: Unable to convert days value {days} to date" + ) + })?; let (year, month) = (date.year(), date.month()); let (next_year, next_month) = if month == 12 { diff --git a/datafusion/spark/src/function/datetime/next_day.rs b/datafusion/spark/src/function/datetime/next_day.rs index 2acd295f8f142..b28759cc89e84 100644 --- a/datafusion/spark/src/function/datetime/next_day.rs +++ b/datafusion/spark/src/function/datetime/next_day.rs @@ -216,7 +216,7 @@ where } fn spark_next_day(days: i32, day_of_week: &str) -> Option { - let date = Date32Type::to_naive_date(days); + let date = Date32Type::to_naive_date_opt(days)?; let day_of_week = day_of_week.trim().to_uppercase(); let day_of_week = match day_of_week.as_str() { diff --git a/datafusion/sqllogictest/test_files/datetime/arith_date_time.slt b/datafusion/sqllogictest/test_files/datetime/arith_date_time.slt index bc796a51ff5a4..8e85c8f90580e 100644 --- a/datafusion/sqllogictest/test_files/datetime/arith_date_time.slt +++ b/datafusion/sqllogictest/test_files/datetime/arith_date_time.slt @@ -113,4 +113,3 @@ SELECT '2001-09-28'::date / '03:00'::time query error Invalid timestamp arithmetic operation SELECT '2001-09-28'::date % '03:00'::time - diff --git a/datafusion/sqllogictest/test_files/datetime/arith_timestamp_duration.slt b/datafusion/sqllogictest/test_files/datetime/arith_timestamp_duration.slt index 10381346f8359..aeeebe73db701 100644 --- a/datafusion/sqllogictest/test_files/datetime/arith_timestamp_duration.slt +++ b/datafusion/sqllogictest/test_files/datetime/arith_timestamp_duration.slt @@ -144,4 +144,4 @@ query error Invalid timestamp arithmetic operation SELECT '2001-09-28T01:00:00'::timestamp % arrow_cast(12345, 'Duration(Second)'); query error Invalid timestamp arithmetic operation -SELECT '2001-09-28T01:00:00'::timestamp / arrow_cast(12345, 'Duration(Second)'); \ No newline at end of file +SELECT '2001-09-28T01:00:00'::timestamp / arrow_cast(12345, 'Duration(Second)'); diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 9215ce87e3bef..3a183a7357430 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -268,7 +268,7 @@ physical_plan 06)┌─────────────┴─────────────┐ 07)│ DataSourceExec │ 08)│ -------------------- │ -09)│ bytes: 1040 │ +09)│ bytes: 1024 │ 10)│ format: memory │ 11)│ rows: 2 │ 12)└───────────────────────────┘ @@ -345,7 +345,7 @@ physical_plan 15)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 16)│ DataSourceExec ││ ProjectionExec │ 17)│ -------------------- ││ -------------------- │ -18)│ bytes: 520 ││ date_col: date_col │ +18)│ bytes: 512 ││ date_col: date_col │ 19)│ format: memory ││ int_col: int_col │ 20)│ rows: 1 ││ │ 21)│ ││ string_col: │ @@ -592,7 +592,7 @@ physical_plan 07)┌─────────────┴─────────────┐ 08)│ DataSourceExec │ 09)│ -------------------- │ -10)│ bytes: 520 │ +10)│ bytes: 512 │ 11)│ format: memory │ 12)│ rows: 1 │ 13)└───────────────────────────┘ @@ -954,7 +954,7 @@ physical_plan 13)┌─────────────┴─────────────┐ 14)│ DataSourceExec │ 15)│ -------------------- │ -16)│ bytes: 520 │ +16)│ bytes: 512 │ 17)│ format: memory │ 18)│ rows: 1 │ 19)└───────────────────────────┘ @@ -1305,7 +1305,7 @@ physical_plan 42)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 43)│ DataSourceExec ││ DataSourceExec │ 44)│ -------------------- ││ -------------------- │ -45)│ bytes: 296 ││ bytes: 288 │ +45)│ bytes: 288 ││ bytes: 280 │ 46)│ format: memory ││ format: memory │ 47)│ rows: 1 ││ rows: 1 │ 48)└───────────────────────────┘└───────────────────────────┘ @@ -1324,14 +1324,14 @@ physical_plan 04)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 05)│ DataSourceExec ││ ProjectionExec │ 06)│ -------------------- ││ -------------------- │ -07)│ bytes: 296 ││ id: CAST(id AS Int32) │ +07)│ bytes: 288 ││ id: CAST(id AS Int32) │ 08)│ format: memory ││ name: name │ 09)│ rows: 1 ││ │ 10)└───────────────────────────┘└─────────────┬─────────────┘ 11)-----------------------------┌─────────────┴─────────────┐ 12)-----------------------------│ DataSourceExec │ 13)-----------------------------│ -------------------- │ -14)-----------------------------│ bytes: 288 │ +14)-----------------------------│ bytes: 280 │ 15)-----------------------------│ format: memory │ 16)-----------------------------│ rows: 1 │ 17)-----------------------------└───────────────────────────┘ diff --git a/datafusion/sqllogictest/test_files/spark/datetime/make_interval.slt b/datafusion/sqllogictest/test_files/spark/datetime/make_interval.slt index d6c5199b87b75..a796094979d97 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/make_interval.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/make_interval.slt @@ -90,21 +90,15 @@ SELECT make_interval(0, 0, 0, 0, 2147483647, 1, 0.0); ---- NULL -# Intervals being rendered as empty string, see issue: -# https://github.com/apache/datafusion/issues/17455 -# We expect something like 0.00 secs with query ? query T SELECT make_interval(0, 0, 0, 0, 0, 0, 0.0) || ''; ---- -(empty) +0 secs -# Intervals being rendered as empty string, see issue: -# https://github.com/apache/datafusion/issues/17455 -# We expect something like 0.00 secs with query ? query T SELECT make_interval() || ''; ---- -(empty) +0 secs query ? SELECT INTERVAL '1' SECOND AS iv; diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs index 403509515bf31..f545ccf19306a 100644 --- a/datafusion/wasmtest/src/lib.rs +++ b/datafusion/wasmtest/src/lib.rs @@ -99,7 +99,7 @@ mod test { use datafusion_physical_plan::collect; use datafusion_sql::parser::DFParser; use futures::{StreamExt, TryStreamExt, stream}; - use object_store::{ObjectStore, PutPayload, memory::InMemory, path::Path}; + use object_store::{ObjectStoreExt, PutPayload, memory::InMemory, path::Path}; use url::Url; use wasm_bindgen_test::wasm_bindgen_test;