-
Notifications
You must be signed in to change notification settings - Fork 149
Expand file tree
/
Copy pathcopy_function.cpp
More file actions
155 lines (123 loc) · 5.67 KB
/
copy_function.cpp
File metadata and controls
155 lines (123 loc) · 5.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
#include "duckdb_vx.h"
#include "duckdb_vx/data.hpp"
#include "duckdb/function/copy_function.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/parser/parsed_data/create_copy_function_info.hpp"
#include "duckdb_vx/error.hpp"
using namespace duckdb;
namespace vortex {
struct CCopyBindData final : TableFunctionData {
CCopyBindData(const duckdb_vx_copy_func_vtab_t vtab_p, unique_ptr<CData> ffi_data_p)
: vtab(vtab_p), ffi_data(std::move(ffi_data_p)) {
}
const duckdb_vx_copy_func_vtab_t vtab;
unique_ptr<CData> ffi_data;
};
/// Global (per-copy) state: a thin owner around the opaque data object returned by
/// the FFI `init_global` callback.
struct CCopyGlobalData final : GlobalFunctionData {
    /// Owned FFI-side global data; released when this object is destroyed.
    unique_ptr<CData> ffi_data;

    explicit CCopyGlobalData(unique_ptr<CData> global_payload) : ffi_data(std::move(global_payload)) {
    }
};
/// Local state: a thin owner around the opaque data object returned by the FFI
/// `init_local` callback.
struct CCopyLocalData final : LocalFunctionData {
    /// Owned FFI-side local data; released when this object is destroyed.
    unique_ptr<CData> ffi_data;

    explicit CCopyLocalData(unique_ptr<CData> local_payload) : ffi_data(std::move(local_payload)) {
    }
};
// File-local callback table for the single ("one") copy function this translation unit
// can register. It is read by c_bind_one() (for the `bind` callback and to seed the
// per-bind copy) and by duckdb_vx_copy_func_register_vtab_one() (for `name` and
// `extension`); get_vtab_one() hands out a mutable pointer to it, presumably so the
// FFI caller can fill it in before registering.
// NOTE(review): as an object with static storage duration it starts out zero-initialized
// (assuming the vtab type is a plain C struct), so every field is null until populated.
static duckdb_vx_copy_func_vtab_t copy_vtab_one;
/// DuckDB `copy_to_bind` callback: forwards the bind request to the FFI `bind` entry of
/// the file-level vtab.
///
/// @param context       Client context supplied by DuckDB (not used here).
/// @param info          Bind input; passed through to the FFI side as an opaque handle.
/// @param column_names  Names of the columns being copied.
/// @param column_types  Logical types of the columns being copied.
/// @return Bind data wrapping a copy of the vtab and the object returned by `bind`.
/// @throws BinderException if the FFI callback reports an error.
unique_ptr<FunctionData> c_bind_one(ClientContext &context, CopyFunctionBindInput &info,
                                    const vector<string> &column_names,
                                    const vector<LogicalType> &column_types) {
    // Borrowed pointers into `column_names`; they are only valid for the duration of this call.
    vector<char *> name_ptrs;
    for (const string &name : column_names) {
        name_ptrs.push_back(const_cast<char *>(name.c_str()));
    }

    // Each LogicalType is exposed to the C side as an opaque handle pointing at the original object.
    vector<duckdb_logical_type> type_handles;
    for (const LogicalType &type : column_types) {
        type_handles.push_back(reinterpret_cast<duckdb_logical_type>(const_cast<LogicalType *>(&type)));
    }

    duckdb_vx_error err = nullptr;
    auto bind_input = reinterpret_cast<duckdb_vx_copy_func_bind_input>(&info);
    auto raw_bind_data = copy_vtab_one.bind(bind_input, name_ptrs.data(), name_ptrs.size(), type_handles.data(),
                                            type_handles.size(), &err);
    if (err) {
        throw BinderException(IntoErrString(err));
    }

    // Take ownership of the FFI object, then snapshot the vtab into the bind data.
    // This should only be filled out once
    unique_ptr<CData> owned_bind_data(reinterpret_cast<CData *>(raw_bind_data));
    return make_uniq<CCopyBindData>(copy_vtab_one, std::move(owned_bind_data));
}
/// DuckDB `copy_to_initialize_global` callback: asks the FFI side to create the global
/// state for a copy to `file_path`.
///
/// @param context    Client context; passed to the FFI side as an opaque handle.
/// @param bind_data  Must be the CCopyBindData produced by the bind callback.
/// @param file_path  Destination path given to the copy.
/// @return Global state wrapping the object returned by `init_global`.
/// @throws ExecutorException if the FFI callback reports an error.
unique_ptr<GlobalFunctionData> c_init_global(ClientContext &context, FunctionData &bind_data,
                                             const string &file_path) {
    auto &bind_state = bind_data.Cast<CCopyBindData>();
    auto context_handle = reinterpret_cast<duckdb_vx_client_context>(&context);

    duckdb_vx_error err = nullptr;
    auto raw_global =
        bind_state.vtab.init_global(context_handle, bind_state.ffi_data->DataPtr(), file_path.c_str(), &err);
    if (err) {
        throw ExecutorException(IntoErrString(err));
    }

    unique_ptr<CData> owned_global(reinterpret_cast<CData *>(raw_global));
    return make_uniq<CCopyGlobalData>(std::move(owned_global));
}
/// DuckDB `copy_to_initialize_local` callback: asks the FFI side to create local state.
///
/// @param context    Execution context supplied by DuckDB (not used here).
/// @param bind_data  Must be the CCopyBindData produced by the bind callback.
/// @return Local state wrapping the object returned by `init_local`.
/// @throws ExecutorException if the FFI callback reports an error.
unique_ptr<LocalFunctionData> c_init_local(ExecutionContext &context, FunctionData &bind_data) {
    auto &bind_state = bind_data.Cast<CCopyBindData>();

    duckdb_vx_error err = nullptr;
    auto raw_local = bind_state.vtab.init_local(bind_state.ffi_data->DataPtr(), &err);
    if (err) {
        throw ExecutorException(IntoErrString(err));
    }

    unique_ptr<CData> owned_local(reinterpret_cast<CData *>(raw_local));
    return make_uniq<CCopyLocalData>(std::move(owned_local));
}
/// DuckDB `copy_to_sink` callback: hands one chunk of input to the FFI `copy_to_sink` entry.
///
/// @param context    Execution context supplied by DuckDB (not used here).
/// @param bind_data  Must be the CCopyBindData produced by the bind callback.
/// @param gstate     Must be the CCopyGlobalData produced by c_init_global.
/// @param lstate     Must be the CCopyLocalData produced by c_init_local.
/// @param input      Chunk to write; passed to the FFI side as an opaque handle.
/// @throws ExecutorException if the FFI callback reports an error.
void c_copy_to_sink(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
                    LocalFunctionData &lstate, DataChunk &input) {
    auto &bind_state = bind_data.Cast<CCopyBindData>();
    auto &global_state = gstate.Cast<CCopyGlobalData>();
    auto &local_state = lstate.Cast<CCopyLocalData>();
    auto chunk_handle = reinterpret_cast<duckdb_data_chunk>(&input);

    duckdb_vx_error err = nullptr;
    bind_state.vtab.copy_to_sink(bind_state.ffi_data->DataPtr(), global_state.ffi_data->DataPtr(),
                                 local_state.ffi_data->DataPtr(), chunk_handle, &err);
    if (err) {
        throw ExecutorException(IntoErrString(err));
    }
}
/// DuckDB `copy_to_finalize` callback: lets the FFI side finish the copy.
///
/// @param context    Client context supplied by DuckDB (not used here).
/// @param bind_data  Must be the CCopyBindData produced by the bind callback.
/// @param gstate     Must be the CCopyGlobalData produced by c_init_global.
/// @throws ExecutorException if the FFI callback reports an error.
void copy_to_finalize(ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate) {
    auto &bind_state = bind_data.Cast<CCopyBindData>();
    auto &global_state = gstate.Cast<CCopyGlobalData>();

    duckdb_vx_error err = nullptr;
    bind_state.vtab.copy_to_finalize(bind_state.ffi_data->DataPtr(), global_state.ffi_data->DataPtr(), &err);
    if (err) {
        throw ExecutorException(IntoErrString(err));
    }
}
/// Returns a mutable pointer to the file-level copy-function vtab.
///
/// The pointee has static storage duration, so the pointer stays valid for the lifetime
/// of the program; the caller does not own it and must not free it.
extern "C" duckdb_vx_copy_func_vtab_t *get_vtab_one() {
    return &copy_vtab_one;
}
/// Registers the copy function described by the file-level vtab in the system catalog of
/// the database behind `ffi_conn`.
///
/// @param ffi_conn  Connection handle; reinterpreted as a `duckdb::Connection *`.
/// @return `DuckDBSuccess` on success. `DuckDBError` if `ffi_conn` is null, if the vtab has
///         no name, or if anything throws while building or registering the function.
extern "C" duckdb_state duckdb_vx_copy_func_register_vtab_one(duckdb_connection ffi_conn) {
    if (!ffi_conn) {
        return DuckDBError;
    }
    // The vtab has static storage and starts out zeroed; building a std::string from a null
    // name would be undefined behaviour, so refuse to register an unpopulated vtab.
    // NOTE(review): assumes `name`/`extension` are C string pointers — confirm against the header.
    if (!copy_vtab_one.name) {
        return DuckDBError;
    }
    auto conn = reinterpret_cast<Connection *>(ffi_conn);

    // Everything that may throw (string construction, catalog access) stays inside the try
    // block: an exception must never propagate out of an extern "C" function.
    try {
        auto copy_function = CopyFunction(copy_vtab_one.name);
        copy_function.copy_to_bind = c_bind_one;
        copy_function.copy_to_initialize_global = c_init_global;
        copy_function.copy_to_initialize_local = c_init_local;
        copy_function.copy_to_sink = c_copy_to_sink;
        copy_function.copy_to_finalize = copy_to_finalize;
        // Leave the extension at its default when the vtab does not provide one.
        if (copy_vtab_one.extension) {
            copy_function.extension = copy_vtab_one.extension;
        }
        // TODO(joe): expose this via our C API
        copy_function.execution_mode = [](bool preserve_insertion_order, bool supports_batch_index) {
            return CopyFunctionExecutionMode::REGULAR_COPY_TO_FILE;
        };
        // TODO(joe): handle parameters as in table_function

        CreateCopyFunctionInfo info(std::move(copy_function));
        auto &system_catalog = Catalog::GetSystemCatalog(*conn->context->db);
        auto data = CatalogTransaction::GetSystemTransaction(*conn->context->db);
        system_catalog.CreateCopyFunction(data, info);
    } catch (...) {
        return DuckDBError;
    }
    return DuckDBSuccess;
}
} // namespace vortex