Skip to content

Commit 2f725ff

Browse files
Implement remote evaluation and distributed garbage collection (#46)
Co-authored-by: José Valim <jose.valim@gmail.com>
1 parent 4a2a7f8 commit 2f725ff

File tree

14 files changed

+912
-109
lines changed

14 files changed

+912
-109
lines changed

c_src/python.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ DEF_SYMBOL(PyDict_Next)
2727
DEF_SYMBOL(PyDict_SetItem)
2828
DEF_SYMBOL(PyDict_SetItemString)
2929
DEF_SYMBOL(PyDict_Size)
30+
DEF_SYMBOL(PyErr_Clear)
3031
DEF_SYMBOL(PyErr_Fetch)
3132
DEF_SYMBOL(PyErr_Occurred)
3233
DEF_SYMBOL(PyEval_GetBuiltins)
@@ -101,6 +102,7 @@ void load_python_library(std::string path) {
101102
LOAD_SYMBOL(python_library, PyDict_SetItem)
102103
LOAD_SYMBOL(python_library, PyDict_SetItemString)
103104
LOAD_SYMBOL(python_library, PyDict_Size)
105+
LOAD_SYMBOL(python_library, PyErr_Clear)
104106
LOAD_SYMBOL(python_library, PyErr_Fetch)
105107
LOAD_SYMBOL(python_library, PyErr_Occurred)
106108
LOAD_SYMBOL(python_library, PyEval_GetBuiltins)

c_src/python.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ extern int (*PyDict_Next)(PyObjectPtr, Py_ssize_t *, PyObjectPtr *,
8181
extern int (*PyDict_SetItem)(PyObjectPtr, PyObjectPtr, PyObjectPtr);
8282
extern int (*PyDict_SetItemString)(PyObjectPtr, const char *, PyObjectPtr);
8383
extern Py_ssize_t (*PyDict_Size)(PyObjectPtr);
84+
extern void (*PyErr_Clear)();
8485
extern void (*PyErr_Fetch)(PyObjectPtr *, PyObjectPtr *, PyObjectPtr *);
8586
extern PyObjectPtr (*PyErr_Occurred)();
8687
extern PyObjectPtr (*PyEval_GetBuiltins)();

c_src/pythonx.cpp

Lines changed: 196 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <string>
1010
#include <thread>
1111
#include <tuple>
12+
#include <variant>
1213

1314
#include "python.hpp"
1415

@@ -138,15 +139,14 @@ auto ElixirPythonxJanitor = fine::Atom("Elixir.Pythonx.Janitor");
138139
auto ElixirPythonxObject = fine::Atom("Elixir.Pythonx.Object");
139140
auto decref = fine::Atom("decref");
140141
auto integer = fine::Atom("integer");
142+
auto lines = fine::Atom("lines");
141143
auto list = fine::Atom("list");
142144
auto map = fine::Atom("map");
143145
auto map_set = fine::Atom("map_set");
144146
auto output = fine::Atom("output");
147+
auto remote_info = fine::Atom("remote_info");
145148
auto resource = fine::Atom("resource");
146-
auto traceback = fine::Atom("traceback");
147149
auto tuple = fine::Atom("tuple");
148-
auto type = fine::Atom("type");
149-
auto value = fine::Atom("value");
150150
} // namespace atoms
151151

152152
struct PyObjectResource {
@@ -186,8 +186,26 @@ struct PyObjectResource {
186186

187187
FINE_RESOURCE(PyObjectResource);
188188

189+
// A resource that notifies the given process upon garbage collection.
190+
struct GCNotifier {
191+
ErlNifPid pid;
192+
ErlNifEnv *message_env;
193+
ERL_NIF_TERM message_term;
194+
195+
GCNotifier(ErlNifPid pid, ErlNifEnv *message_env, ERL_NIF_TERM message_term)
196+
: pid(pid), message_env(message_env), message_term(message_term) {}
197+
198+
void destructor(ErlNifEnv *env) {
199+
enif_send(env, &pid, message_env, message_term);
200+
enif_free_env(message_env);
201+
}
202+
};
203+
204+
FINE_RESOURCE(GCNotifier);
205+
189206
struct ExObject {
190207
fine::ResourcePtr<PyObjectResource> resource;
208+
std::optional<fine::Term> remote_info;
191209

192210
ExObject() {}
193211
ExObject(fine::ResourcePtr<PyObjectResource> resource) : resource(resource) {}
@@ -196,26 +214,21 @@ struct ExObject {
196214

197215
static constexpr auto fields() {
198216
return std::make_tuple(
199-
std::make_tuple(&ExObject::resource, &atoms::resource));
217+
std::make_tuple(&ExObject::resource, &atoms::resource),
218+
std::make_tuple(&ExObject::remote_info, &atoms::remote_info));
200219
}
201220
};
202221

203222
struct ExError {
204-
ExObject type;
205-
ExObject value;
206-
ExObject traceback;
223+
std::vector<fine::Term> lines;
207224

208225
ExError() {}
209-
ExError(ExObject type, ExObject value, ExObject traceback)
210-
: type(type), value(value), traceback(traceback) {}
226+
ExError(std::vector<fine::Term> lines) : lines(lines) {}
211227

212228
static constexpr auto module = &atoms::ElixirPythonxError;
213229

214230
static constexpr auto fields() {
215-
return std::make_tuple(
216-
std::make_tuple(&ExError::type, &atoms::type),
217-
std::make_tuple(&ExError::value, &atoms::value),
218-
std::make_tuple(&ExError::traceback, &atoms::traceback));
231+
return std::make_tuple(std::make_tuple(&ExError::lines, &atoms::lines));
219232
}
220233

221234
static constexpr auto is_exception = true;
@@ -228,30 +241,91 @@ struct EvalInfo {
228241
std::thread::id thread_id;
229242
};
230243

231-
void raise_py_error(ErlNifEnv *env) {
244+
void raise_formatting_error_if_failed(PyObjectPtr py_object) {
245+
if (py_object == NULL) {
246+
throw std::runtime_error("failed while formatting a python error");
247+
}
248+
}
249+
250+
void raise_formatting_error_if_failed(const char *buffer) {
251+
if (buffer == NULL) {
252+
throw std::runtime_error("failed while formatting a python error");
253+
}
254+
}
255+
256+
void raise_formatting_error_if_failed(Py_ssize_t size) {
257+
if (size == -1) {
258+
throw std::runtime_error("failed while formatting a python error");
259+
}
260+
}
261+
262+
// Builds an ExError out of the current Python error indicator.
//
// The error is fetched with PyErr_Fetch and formatted by calling
// traceback.format_exception(type, value, traceback). Every formatted
// line becomes a binary term in the returned ExError.
//
// Throws std::runtime_error when the error indicator is not set, or
// when any step of the formatting fails.
ExError build_py_error_from_current(ErlNifEnv *env) {
  PyObjectPtr py_type, py_value, py_traceback;
  PyErr_Fetch(&py_type, &py_value, &py_traceback);

  // If the error indicator was set, type should not be NULL, but value
  // and traceback might.
  if (py_type == NULL) {
    throw std::runtime_error("build_py_error_from_current should only be "
                             "called when the error indicator is set");
  }

  // PyErr_Fetch gives us an owned reference to each object, and the
  // arguments tuple built below takes its own references, so we need
  // to release ours once we are done, otherwise they leak.
  auto py_type_guard = PyDecRefGuard(py_type);

  // Default value and traceback to None object.
  py_value = py_value == NULL ? Py_BuildValue("") : py_value;
  raise_formatting_error_if_failed(py_value);
  auto py_value_guard = PyDecRefGuard(py_value);

  py_traceback = py_traceback == NULL ? Py_BuildValue("") : py_traceback;
  raise_formatting_error_if_failed(py_traceback);
  auto py_traceback_guard = PyDecRefGuard(py_traceback);

  // Format the exception. Note that if anything raises an error here,
  // we throw a runtime exception, instead of a Python one, otherwise
  // we could go into an infinite loop.

  auto py_traceback_module = PyImport_ImportModule("traceback");
  raise_formatting_error_if_failed(py_traceback_module);
  auto py_traceback_module_guard = PyDecRefGuard(py_traceback_module);

  auto format_exception =
      PyObject_GetAttrString(py_traceback_module, "format_exception");
  raise_formatting_error_if_failed(format_exception);
  auto format_exception_guard = PyDecRefGuard(format_exception);

  auto format_exception_args = PyTuple_Pack(3, py_type, py_value, py_traceback);
  raise_formatting_error_if_failed(format_exception_args);
  auto format_exception_args_guard = PyDecRefGuard(format_exception_args);

  auto py_lines = PyObject_Call(format_exception, format_exception_args, NULL);
  raise_formatting_error_if_failed(py_lines);
  auto py_lines_guard = PyDecRefGuard(py_lines);

  auto size = PyList_Size(py_lines);
  raise_formatting_error_if_failed(size);

  auto terms = std::vector<fine::Term>();
  terms.reserve(size);

  for (Py_ssize_t i = 0; i < size; i++) {
    // The list item is a borrowed reference, see Py_IncRef below.
    auto py_line = PyList_GetItem(py_lines, i);
    raise_formatting_error_if_failed(py_line);

    // Byte size of this line, distinct from the list size above.
    Py_ssize_t line_size;
    auto buffer = PyUnicode_AsUTF8AndSize(py_line, &line_size);
    raise_formatting_error_if_failed(buffer);

    // The buffer is immutable and lives as long as the Python object,
    // so we create the term as a resource binary to make it zero-copy.
    // The resource needs its own reference to the line, so we add one.
    Py_IncRef(py_line);
    auto ex_object_resource = fine::make_resource<PyObjectResource>(py_line);
    auto binary_term =
        fine::make_resource_binary(env, ex_object_resource, buffer, line_size);

    terms.push_back(binary_term);
  }

  return ExError(std::move(terms));
}
326+
327+
// Builds an ExError from the current Python error indicator and raises
// it through fine.
void raise_py_error(ErlNifEnv *env) {
  auto error = build_py_error_from_current(env);
  fine::raise(env, error);
}
256330

257331
void raise_if_failed(ErlNifEnv *env, PyObjectPtr py_object) {
@@ -284,6 +358,19 @@ ERL_NIF_TERM py_str_to_binary_term(ErlNifEnv *env, PyObjectPtr py_object) {
284358
return fine::make_resource_binary(env, ex_object_resource, buffer, size);
285359
}
286360

361+
// Builds a binary term backed by the contents of the given Python
// bytes object. Raises if the contents cannot be accessed.
ERL_NIF_TERM py_bytes_to_binary_term(ErlNifEnv *env, PyObjectPtr py_object) {
  char *data;
  Py_ssize_t length;
  raise_if_failed(env, PyBytes_AsStringAndSize(py_object, &data, &length));

  // The buffer is immutable and lives as long as the Python object,
  // so the term is a resource binary pointing at it (zero-copy). The
  // resource holds its own reference to the object.
  Py_IncRef(py_object);
  return fine::make_resource_binary(
      env, fine::make_resource<PyObjectResource>(py_object), data, length);
}
373+
287374
fine::Ok<> init(ErlNifEnv *env, std::string python_dl_path,
288375
ErlNifBinary python_home_path,
289376
ErlNifBinary python_executable_path,
@@ -785,50 +872,6 @@ ExObject object_repr(ErlNifEnv *env, ExObject ex_object) {
785872

786873
FINE_NIF(object_repr, ERL_NIF_DIRTY_JOB_CPU_BOUND);
787874

788-
fine::Term format_exception(ErlNifEnv *env, ExError error) {
789-
ensure_initialized();
790-
auto gil_guard = PyGILGuard();
791-
792-
auto py_traceback_module = PyImport_ImportModule("traceback");
793-
raise_if_failed(env, py_traceback_module);
794-
auto py_traceback_module_guard = PyDecRefGuard(py_traceback_module);
795-
796-
auto format_exception =
797-
PyObject_GetAttrString(py_traceback_module, "format_exception");
798-
raise_if_failed(env, format_exception);
799-
auto format_exception_guard = PyDecRefGuard(format_exception);
800-
801-
auto py_type = error.type.resource->py_object;
802-
auto py_value = error.value.resource->py_object;
803-
auto py_traceback = error.traceback.resource->py_object;
804-
805-
auto format_exception_args = PyTuple_Pack(3, py_type, py_value, py_traceback);
806-
raise_if_failed(env, format_exception_args);
807-
auto format_exception_args_guard = PyDecRefGuard(format_exception_args);
808-
809-
auto py_lines = PyObject_Call(format_exception, format_exception_args, NULL);
810-
raise_if_failed(env, py_lines);
811-
auto py_lines_guard = PyDecRefGuard(py_lines);
812-
813-
auto size = PyList_Size(py_lines);
814-
raise_if_failed(env, size);
815-
816-
auto terms = std::vector<ERL_NIF_TERM>();
817-
terms.reserve(size);
818-
819-
for (Py_ssize_t i = 0; i < size; i++) {
820-
auto py_line = PyList_GetItem(py_lines, i);
821-
raise_if_failed(env, py_line);
822-
823-
terms.push_back(py_str_to_binary_term(env, py_line));
824-
}
825-
826-
return enif_make_list_from_array(env, terms.data(),
827-
static_cast<unsigned int>(size));
828-
}
829-
830-
FINE_NIF(format_exception, ERL_NIF_DIRTY_JOB_CPU_BOUND);
831-
832875
fine::Term decode_once(ErlNifEnv *env, ExObject ex_object) {
833876
ensure_initialized();
834877
auto gil_guard = PyGILGuard();
@@ -987,16 +1030,7 @@ fine::Term decode_once(ErlNifEnv *env, ExObject ex_object) {
9871030
auto is_bytes = PyObject_IsInstance(py_object, py_bytes_type);
9881031
raise_if_failed(env, is_bytes);
9891032
if (is_bytes) {
990-
Py_ssize_t size;
991-
char *buffer;
992-
auto result = PyBytes_AsStringAndSize(py_object, &buffer, &size);
993-
raise_if_failed(env, result);
994-
995-
// The buffer is immutable and lives as long as the Python object,
996-
// so we create the term as a resource binary to make it zero-copy.
997-
Py_IncRef(py_object);
998-
auto ex_object_resource = fine::make_resource<PyObjectResource>(py_object);
999-
return fine::make_resource_binary(env, ex_object_resource, buffer, size);
1033+
return py_bytes_to_binary_term(env, py_object);
10001034
}
10011035

10021036
auto py_set_type = PyDict_GetItemString(py_builtins, "set");
@@ -1461,6 +1495,86 @@ eval(ErlNifEnv *env, ErlNifBinary code, std::string code_md5,
14611495

14621496
FINE_NIF(eval, ERL_NIF_DIRTY_JOB_CPU_BOUND);
14631497

1498+
std::variant<fine::Ok<fine::Term>, fine::Error<std::string, ExError>>
1499+
dump_object(ErlNifEnv *env, ExObject ex_object) {
1500+
ensure_initialized();
1501+
auto gil_guard = PyGILGuard();
1502+
1503+
std::string pickle_module_name;
1504+
PyObjectPtr py_pickle;
1505+
1506+
py_pickle = PyImport_ImportModule("cloudpickle");
1507+
if (py_pickle != NULL) {
1508+
pickle_module_name = "cloudpickle";
1509+
} else {
1510+
// If importing fails, we ignore the error and fallback to the pickle
1511+
// module.
1512+
PyErr_Clear();
1513+
py_pickle = PyImport_ImportModule("pickle");
1514+
raise_if_failed(env, py_pickle);
1515+
pickle_module_name = "pickle";
1516+
}
1517+
auto py_pickle_guard = PyDecRefGuard(py_pickle);
1518+
1519+
auto py_dumps = PyObject_GetAttrString(py_pickle, "dumps");
1520+
raise_if_failed(env, py_dumps);
1521+
auto py_dumps_guard = PyDecRefGuard(py_dumps);
1522+
1523+
auto py_dumps_args = PyTuple_Pack(1, ex_object.resource->py_object);
1524+
raise_if_failed(env, py_dumps_args);
1525+
auto py_dumps_args_guard = PyDecRefGuard(py_dumps_args);
1526+
1527+
auto py_dump_bytes = PyObject_Call(py_dumps, py_dumps_args, NULL);
1528+
if (py_dump_bytes == NULL) {
1529+
return fine::Error<std::string, ExError>(pickle_module_name,
1530+
build_py_error_from_current(env));
1531+
}
1532+
raise_if_failed(env, py_dump_bytes);
1533+
auto py_bytes_guard = PyDecRefGuard(py_dump_bytes);
1534+
1535+
return fine::Ok<fine::Term>(py_bytes_to_binary_term(env, py_dump_bytes));
1536+
}
1537+
1538+
FINE_NIF(dump_object, ERL_NIF_DIRTY_JOB_CPU_BOUND);
1539+
1540+
ExObject load_object(ErlNifEnv *env, ErlNifBinary binary) {
1541+
ensure_initialized();
1542+
auto gil_guard = PyGILGuard();
1543+
1544+
auto py_pickle = PyImport_ImportModule("pickle");
1545+
raise_if_failed(env, py_pickle);
1546+
auto py_pickle_guard = PyDecRefGuard(py_pickle);
1547+
1548+
auto py_loads = PyObject_GetAttrString(py_pickle, "loads");
1549+
raise_if_failed(env, py_loads);
1550+
auto py_loads_guard = PyDecRefGuard(py_loads);
1551+
1552+
auto py_bytes = PyBytes_FromStringAndSize(
1553+
reinterpret_cast<const char *>(binary.data), binary.size);
1554+
raise_if_failed(env, py_bytes);
1555+
auto py_bytes_guard = PyDecRefGuard(py_bytes);
1556+
1557+
auto py_loads_args = PyTuple_Pack(1, py_bytes);
1558+
raise_if_failed(env, py_loads_args);
1559+
auto py_loads_args_guard = PyDecRefGuard(py_loads_args);
1560+
1561+
auto py_object = PyObject_Call(py_loads, py_loads_args, NULL);
1562+
raise_if_failed(env, py_object);
1563+
1564+
return ExObject(fine::make_resource<PyObjectResource>(py_object));
1565+
}
1566+
1567+
FINE_NIF(load_object, ERL_NIF_DIRTY_JOB_CPU_BOUND);
1568+
1569+
fine::ResourcePtr<GCNotifier> create_gc_notifier(ErlNifEnv *env, ErlNifPid pid,
1570+
fine::Term term) {
1571+
auto message_env = enif_alloc_env();
1572+
auto message_term = enif_make_copy(message_env, term);
1573+
return fine::make_resource<GCNotifier>(pid, message_env, message_term);
1574+
}
1575+
1576+
FINE_NIF(create_gc_notifier, 0);
1577+
14641578
} // namespace pythonx
14651579

14661580
FINE_INIT("Elixir.Pythonx.NIF");
@@ -1505,6 +1619,9 @@ extern "C" void pythonx_handle_io_write(const char *message,
15051619
ErlNifPid janitor_pid;
15061620
if (enif_whereis_pid(caller_env, janitor_name, &janitor_pid)) {
15071621
auto device = type == 0 ? eval_info.stdout_device : eval_info.stderr_device;
1622+
// The device term is from a different env, so we copy it into
1623+
// the message env, otherwise we may run into unexpected behaviour.
1624+
device = enif_make_copy(env, device);
15081625

15091626
auto msg = fine::encode(env, std::make_tuple(pythonx::atoms::output,
15101627
std::string(message), device));

0 commit comments

Comments
 (0)