Skip to content
Draft
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/scripts/cpp_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ if [ "${ARROW_ENABLE_THREADING:-ON}" = "OFF" ]; then
ARROW_JEMALLOC=OFF
ARROW_MIMALLOC=OFF
ARROW_S3=OFF
ARROW_S3_MODULE=OFF
ARROW_WITH_OPENTELEMETRY=OFF
fi

Expand Down Expand Up @@ -229,6 +230,7 @@ else
-DARROW_PARQUET=${ARROW_PARQUET:-OFF} \
-DARROW_RUNTIME_SIMD_LEVEL=${ARROW_RUNTIME_SIMD_LEVEL:-MAX} \
-DARROW_S3=${ARROW_S3:-OFF} \
-DARROW_S3_MODULE=${ARROW_S3_MODULE:-OFF} \
-DARROW_SIMD_LEVEL=${ARROW_SIMD_LEVEL:-DEFAULT} \
-DARROW_SUBSTRAIT=${ARROW_SUBSTRAIT:-OFF} \
-DARROW_TEST_LINKAGE=${ARROW_TEST_LINKAGE:-shared} \
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ if(ARROW_S3)
endif()
endif()

if(ARROW_S3_MODULE)
if(ARROW_S3_MODULE AND ARROW_BUILD_TESTS)
add_arrow_test(s3fs_module_test
SOURCES
s3fs_module_test.cc
Expand Down
32 changes: 25 additions & 7 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -893,20 +893,24 @@ Status LoadFileSystemFactories(const char* libpath) {

namespace {

Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
const std::string& uri_string,
const io::IOContext& io_context,
std::string* out_path) {
Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(
const Uri& uri, const std::string& uri_string,
const FileSystemFactoryOptions& options, const io::IOContext& io_context,
std::string* out_path) {
const auto scheme = uri.scheme();

{
ARROW_ASSIGN_OR_RAISE(
auto* factory,
FileSystemFactoryRegistry::GetInstance()->FactoryForScheme(scheme));
if (factory != nullptr) {
return factory->function(uri, io_context, out_path);
return factory->function(uri, options, io_context, out_path);
}
}
if (!options.empty()) {
return Status::NotImplemented("Filesystem options are not supported yet for scheme '",
scheme, "', got ", options.size(), " option(s)");
}

if (scheme == "abfs" || scheme == "abfss") {
#ifdef ARROW_AZURE
Expand Down Expand Up @@ -962,14 +966,28 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,

Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
std::string* out_path) {
return FileSystemFromUri(uri_string, io::default_io_context(), out_path);
return FileSystemFromUriAndOptions(uri_string, /*options=*/{}, io::default_io_context(),
out_path);
}

/// Options-aware overload without an explicit IO context: resolves the
/// filesystem for `uri_string` using the default IO context.
Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndOptions(
    const std::string& uri_string, const FileSystemFactoryOptions& options,
    std::string* out_path) {
  // Delegate to the overload that takes an IO context, supplying the default one.
  const auto& default_context = io::default_io_context();
  return FileSystemFromUriAndOptions(uri_string, options, default_context, out_path);
}

/// Overload taking a custom IO context but no backend-specific options.
Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
                                                      const io::IOContext& io_context,
                                                      std::string* out_path) {
  // Hand an empty option list to the options-aware entry point.
  auto fs_result =
      FileSystemFromUriAndOptions(uri_string, /*options=*/{}, io_context, out_path);
  return fs_result;
}

Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndOptions(
const std::string& uri_string, const FileSystemFactoryOptions& options,
const io::IOContext& io_context, std::string* out_path) {
ARROW_ASSIGN_OR_RAISE(auto fsuri, ParseFileSystemUri(uri_string));
return FileSystemFromUriReal(fsuri, uri_string, io_context, out_path);
return FileSystemFromUriReal(fsuri, uri_string, options, io_context, out_path);
}

Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(const std::string& uri_string,
Expand Down
84 changes: 83 additions & 1 deletion cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <any>
#include <chrono>
#include <cstdint>
#include <functional>
Expand All @@ -28,6 +29,7 @@

#include "arrow/filesystem/type_fwd.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/type_fwd.h"
#include "arrow/util/compare.h"
#include "arrow/util/macros.h"
Expand Down Expand Up @@ -357,13 +359,44 @@ class ARROW_EXPORT FileSystem
bool default_async_is_sync_ = true;
};

using FileSystemFactoryOptions = std::vector<std::pair<std::string, std::any>>;

struct FileSystemFactory {
std::function<Result<std::shared_ptr<FileSystem>>(
const Uri& uri, const io::IOContext& io_context, std::string* out_path)>
const Uri& uri, const FileSystemFactoryOptions& options,
const io::IOContext& io_context, std::string* out_path)>
function;
std::string_view file;
int line;

/// Construct from an options-aware factory function.
FileSystemFactory(std::function<Result<std::shared_ptr<FileSystem>>(
const Uri&, const FileSystemFactoryOptions&, const io::IOContext&,
std::string*)>
fn,
std::string_view file, int line)
: function(std::move(fn)), file(file), line(line) {}

/// Construct from a factory function that is not options-aware, maintaining source
/// compatibility with existing factories.
FileSystemFactory(std::function<Result<std::shared_ptr<FileSystem>>(
const Uri&, const io::IOContext&, std::string*)>
fn,
std::string_view file, int line)
: function([fn = std::move(fn)](
const Uri& uri, const FileSystemFactoryOptions& options,
const io::IOContext& ctx,
std::string* out_path) -> Result<std::shared_ptr<FileSystem>> {
if (!options.empty()) {
return Status::NotImplemented(
"Filesystem factory does not support additional options, got ",
options.size(), " option(s)");
}
return fn(uri, ctx, out_path);
}),
file(file),
line(line) {}

bool operator==(const FileSystemFactory& other) const {
// In the case where libarrow is linked statically both to the executable and to a
// dynamically loaded filesystem implementation library, the library contains a
Expand Down Expand Up @@ -547,6 +580,30 @@ ARROW_EXPORT
Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
std::string* out_path = NULLPTR);

/// \brief Create a new FileSystem by URI with extended backend-specific filesystem
/// options
///
/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
/// "gs" and "gcs".
///
/// Support for other schemes can be added using RegisterFileSystemFactory.
///
/// \param[in] uri the URI to give access to
/// \param[in] options a list of backend-specific filesystem options
/// Each option is a (name, value) pair.
/// The expected type is specific to the backend and
/// option name.
/// Options are forwarded to schemes dispatched through a registered
/// FileSystemFactory. Non-empty options return NotImplemented for a registered
/// FileSystemFactory that does not support them or for schemes not handled by
/// a registered factory.
/// \param[out] out_path (optional) Path inside the filesystem.
/// \return out_fs FileSystem instance.
ARROW_EXPORT
Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndOptions(
const std::string& uri, const FileSystemFactoryOptions& options,
std::string* out_path = NULLPTR);

/// \brief Create a new FileSystem by URI with a custom IO context
///
/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
Expand All @@ -563,6 +620,31 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
const io::IOContext& io_context,
std::string* out_path = NULLPTR);

/// \brief Create a new FileSystem by URI with a custom IO context and backend-specific
/// filesystem options
///
/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
/// "gs" and "gcs".
///
/// Support for other schemes can be added using RegisterFileSystemFactory.
///
/// \param[in] uri a URI-based path, ex: file:///some/local/path
/// \param[in] options a list of backend-specific filesystem options
/// Each option is a (name, value) pair.
/// The expected type is specific to the backend and
/// option name.
/// Options are forwarded to schemes dispatched through a registered
/// FileSystemFactory. Non-empty options return NotImplemented for a registered
/// FileSystemFactory that does not support them or for schemes not handled by
/// a registered factory.
/// \param[in] io_context an IOContext which will be associated with the filesystem
/// \param[out] out_path (optional) Path inside the filesystem.
/// \return out_fs FileSystem instance.
ARROW_EXPORT
Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndOptions(
const std::string& uri, const FileSystemFactoryOptions& options,
const io::IOContext& io_context, std::string* out_path = NULLPTR);

/// \brief Create a new FileSystem by URI
///
/// Support for other schemes can be added using RegisterFileSystemFactory.
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/filesystem/filesystem_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <any>
#include <memory>
#include <string>
#include <utility>
Expand Down Expand Up @@ -640,6 +641,11 @@ TEST_F(TestMockFS, FileSystemFromUri) {
Invalid, ::testing::HasSubstr("syntax error at character ' ' (position 12)"),
FileSystemFromUri("mock:/folder name/bar", &path));
CheckDirs({});
FileSystemFactoryOptions options{{"some_option", 1}};
EXPECT_RAISES_WITH_MESSAGE_THAT(
NotImplemented, ::testing::HasSubstr("options are not supported"),
FileSystemFromUriAndOptions("mock:///foo/bar", options, &path));
CheckDirs({});
}

////////////////////////////////////////////////////////////////////////////
Expand Down
68 changes: 63 additions & 5 deletions cpp/src/arrow/filesystem/localfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <any>
#include <memory>
#include <sstream>
#include <string>
Expand All @@ -29,6 +30,7 @@
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/test_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/testing/examplefs.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/util/io_util.h"
Expand Down Expand Up @@ -83,6 +85,12 @@ Result<std::shared_ptr<FileSystem>> FSFromUriOrPath(const std::string& uri,
////////////////////////////////////////////////////////////////////////////
// Registered FileSystemFactory tests

// Test-only implementation of ExampleTypedOption that reports the integer it was
// constructed with.
struct ConcreteTypedOption : ExampleTypedOption {
  explicit ConcreteTypedOption(int value) : value_{value} {}

  // Returns the stored integer unchanged.
  int value() const override { return value_; }

  int value_;
};

class SlowFileSystemPublicProps : public SlowFileSystem {
public:
SlowFileSystemPublicProps(std::shared_ptr<FileSystem> base_fs, double average_latency,
Expand Down Expand Up @@ -144,7 +152,6 @@ TEST(FileSystemFromUri, LoadedRegisteredFactory) {
EXPECT_THAT(FileSystemFromUri("example:///hey/yo", &path), Raises(StatusCode::Invalid));

EXPECT_THAT(LoadFileSystemFactories(ARROW_FILESYSTEM_EXAMPLE_LIBPATH), Ok());

ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("example:///hey/yo", &path));
EXPECT_EQ(path, "/hey/yo");
EXPECT_EQ(fs->type_name(), "local");
Expand All @@ -171,10 +178,21 @@ TEST(FileSystemFromUri, RuntimeRegisteredFactory) {
}

FileSystemRegistrar kSegfaultFileSystemModule[]{
ARROW_REGISTER_FILESYSTEM("segfault", nullptr, {}),
ARROW_REGISTER_FILESYSTEM("segfault", nullptr, {}),
ARROW_REGISTER_FILESYSTEM("segfault", nullptr, {}),
};
ARROW_REGISTER_FILESYSTEM(
"segfault",
std::function<Result<std::shared_ptr<FileSystem>>(
const Uri&, const io::IOContext&, std::string*)>(nullptr),
{}),
ARROW_REGISTER_FILESYSTEM(
"segfault",
std::function<Result<std::shared_ptr<FileSystem>>(
const Uri&, const io::IOContext&, std::string*)>(nullptr),
{}),
ARROW_REGISTER_FILESYSTEM(
"segfault",
std::function<Result<std::shared_ptr<FileSystem>>(
const Uri&, const io::IOContext&, std::string*)>(nullptr),
{})};
TEST(FileSystemFromUri, LinkedRegisteredFactoryNameCollision) {
// Since multiple registrars are defined in this translation unit which all
// register factories for the 'segfault' scheme, using that scheme in FileSystemFromUri
Expand All @@ -185,6 +203,46 @@ TEST(FileSystemFromUri, LinkedRegisteredFactoryNameCollision) {
// other schemes are not affected by the collision
EXPECT_THAT(FileSystemFromUri("slowfile:///hey/yo", &path), Ok());
}

// Loads the example filesystem library and checks that options passed to
// FileSystemFromUriAndOptions reach the "example" factory: the expected output
// path contains the string, int and typed option values.
TEST(FileSystemFromUriAndOptions, LoadedRegisteredFactory) {
#ifdef __EMSCRIPTEN__
  GTEST_SKIP() << "Emscripten dynamic library testing disabled";
#endif
  EXPECT_THAT(LoadFileSystemFactories(ARROW_FILESYSTEM_EXAMPLE_LIBPATH), Ok());

  // The typed option is stored as a shared_ptr to the base type.
  std::shared_ptr<ExampleTypedOption> typed_option =
      std::make_shared<ConcreteTypedOption>(12345);

  // One option per payload kind: std::string, int and the typed option pointer.
  FileSystemFactoryOptions factory_options{
      {"example_option_string", std::string("example_value")},
      {"example_option_int", 42},
      {"example_typed_option", typed_option},
  };

  std::string resolved_path;
  ASSERT_OK_AND_ASSIGN(auto filesystem,
                       FileSystemFromUriAndOptions("example:///hey/yo", factory_options,
                                                   &resolved_path));
  EXPECT_EQ(resolved_path, "/hey/yo/example_value/42/12345");
  EXPECT_EQ(filesystem->type_name(), "local");
}

// Registers a factory for the "slowfile3" scheme at runtime and checks that
// FileSystemFromUriAndOptions dispatches to it with empty options, while non-empty
// options are rejected with NotImplemented.
TEST(FileSystemFromUriAndOptions, RuntimeRegisteredFactory) {
  const std::string slow_uri = "slowfile3:///hey/yo";
  std::string resolved_path;

  // Before registration the scheme is unknown.
  EXPECT_THAT(FileSystemFromUriAndOptions(slow_uri, {}, &resolved_path),
              Raises(StatusCode::Invalid));

  EXPECT_THAT(
      RegisterFileSystemFactory("slowfile3", {SlowFileSystemFactory, __FILE__, __LINE__}),
      Ok());

  // After registration, an empty option list resolves through the new factory.
  ASSERT_OK_AND_ASSIGN(auto filesystem,
                       FileSystemFromUriAndOptions(slow_uri, {}, &resolved_path));
  EXPECT_EQ(resolved_path, "/hey/yo");
  EXPECT_EQ(filesystem->type_name(), "slow");

  // Validate that legacy (3-arg) factories reject non-empty options.
  FileSystemFactoryOptions rejected_options{{"some_option", 1}};
  EXPECT_THAT(FileSystemFromUriAndOptions(slow_uri, rejected_options, &resolved_path),
              Raises(StatusCode::NotImplemented));
}

////////////////////////////////////////////////////////////////////////////
// Misc tests

Expand Down
Loading
Loading