From 667151b1b608223523501930bcd896891f4ac5a9 Mon Sep 17 00:00:00 2001 From: Riley King-Saunders Date: Thu, 6 Mar 2025 17:54:53 +1100 Subject: [PATCH] Replaced popen usage with uniproc_process --- include/lua_platform.h | 9 +- src/lua_platform.cpp | 210 ++++++++++++++++++++--------------------- 2 files changed, 106 insertions(+), 113 deletions(-) diff --git a/include/lua_platform.h b/include/lua_platform.h index 65b4d7f..01c4e22 100644 --- a/include/lua_platform.h +++ b/include/lua_platform.h @@ -10,11 +10,11 @@ int luaopen_platform(lua_State* L); // Returns the exit code of the command as well as a string of its stdout -// int, string function platform.exec(cmd: string, args: array) +// int, string, string function platform.exec(cmd: string, args: array) int lua_platform_exec(lua_State* L); // Executes a command asynchronosly, returning a promise to the functions return values -// promise, promise function platform.exec_async(cmd: string, args: array) +// promise, promise, promise function platform.exec_async(cmd: string, args: array) int lua_platform_exec_async(lua_State* L); // Executes a command asynchronosly, calling a callback on its completion with its error code and @@ -22,13 +22,14 @@ int lua_platform_exec_async(lua_State* L); // function platform.exec_async_cb(cmd: string, // function on_complete(cmd: string, exit_code: int, overall_output: string), // function on_stdout(cmd: string, overall_output: string, delta_output: string) +// function on_stderr(cmd: string, overall_output: string, delta_output: string) // ) int lua_platform_exec_async_cb(lua_State* L); // Executes the same command up to N times in parallel, iterating through the args provided until all arguments have been used -// array[promise, promise] function platform.exec_parallel_async(cmd: string, N: int, args: array>) +// array[promise, promise, promise] function platform.exec_parallel_async(cmd: string, N: int, args: array>) int lua_platform_exec_parallel_async(lua_State* L); -// array, array function platform.exec_parallel(cmd: string, N: int, args: array>) +// array, array, array function platform.exec_parallel(cmd: string, N: int, args: array>) int lua_platform_exec_parallel(lua_State* L); \ No newline at end of file diff --git a/src/lua_platform.cpp b/src/lua_platform.cpp index 913fb47..e1dcab1 100644 --- a/src/lua_platform.cpp +++ b/src/lua_platform.cpp @@ -4,6 +4,7 @@ #include "mutex" #include #include "simple_string.h" +#include "uniproc/include/uniproc.h" int luaopen_platform(lua_State* L) { @@ -23,94 +24,43 @@ int lua_platform_exec(lua_State* L) { luaL_argcheck(L, lua_isstring(L, 1), 1, "Expected first argument to be a string"); luaL_argcheck(L, lua_istable(L, 2), 2, "Expected second argument to be a table of strings"); - std::string cmd(lua_tostring(L, 1)); + size_t nargs = lua_objlen(L, 2); + char** argv = malloc(nargs * sizeof(char*)); for (size_t i = 0; i < nargs; ++i) { lua_rawgeti(L, -1, i + 1); if (!lua_isstring(L, -1)) return luaL_error(L, "Expected all values in argument table to be a string. Index %d was a %s", i, lua_typename(L, lua_type(L, -1))); - cmd.append(" ").append(lua_tostring(L, -1)); + argv[i] = lua_tostring(L, -1); } - FILE* f = popen(cmd.c_str(), "r"); + int retcode = 0; + std::string std_out; + std::string std_err; char buf[64]; - std::string result; - while (fgets(buf, sizeof(buf), f) != NULL) - result.append(buf); - - lua_pushinteger(L, pclose(f)); - lua_pushlstring(L, result.c_str(), result.size()); - return 2; -} - - -struct command_stack -{ - std::string base_cmd; - dynarray> commands; - std::recursive_mutex mux; - - size_t num_cmds(void) const noexcept - { return commands.size(); } - bool is_empty(void) const noexcept - { return commands.size() == 0; } - std::pair, size_t> get_command(void) noexcept - { - std::scoped_lock lock(mux); - const size_t sz = commands.size(); - return std::make_pair(std::move(commands.pop_back()),sz-1); - } -}; -struct result_stack -{ - struct result - { - std::string program_output; - int program_ret; - }; - std::recursive_mutex mux; - dynarray results; - - void add_result(const size_t idx, const std::string& out, const int ret) + uniproc_process p = uniproc_create_process(lua_tostring(L, 1), nargs, argv); + if (uniproc_is_process_null(&p)) { - std::scoped_lock lock(mux); - new (results.data() + idx) result(out, ret); + lua_pushnil(L); + lua_pushstring(L, "uniproc: failed to create process"); + return 2; } -}; + uniproc_await_processes(&p, &retcode, 1); + + while (fgets(buf, sizeof(buf), p.out) != NULL) + std_out.append(buf); + while (fgets(buf, sizeof(buf), p.err) != NULL) + std_err.append(buf); + + uniproc_close_process(&p); + free(argv); - -void worker_thread(std::stop_token token, const std::string& base_cmd, command_stack& in, result_stack& out) -{ - char buf[64]; - while (!token.stop_requested()) - { - std::string cmd = base_cmd; - if (in.is_empty()) return; - auto [arg, idx] = in.get_command(); - // Get a command - - for (const std::string& a : arg) - cmd.append(" ").append(a); - - // Run the command - FILE* f = popen(cmd.c_str(), "r"); - - // Capture the commands stdout - std::string pout = ""; - memset(buf, '\0', sizeof(buf)); - while (fgets(buf, sizeof(buf), f) != NULL) - { - using namespace std::chrono_literals; - std::this_thread::sleep_for(10us); - pout.append(buf); - } - - // Await the program to exit - int rc = pclose(f); - out.add_result(idx, pout, rc); - } + lua_pushinteger(L, retcode); + lua_pushlstring(L, std_out.c_str(), std_out.size()); + lua_pushlstring(L, std_err.c_str(), std_err.size()); + return 3; } int lua_platform_exec_parallel(lua_State* L) @@ -118,16 +68,13 @@ int lua_platform_exec_parallel(lua_State* L) luaL_argcheck(L, lua_type(L, 1) == LUA_TSTRING, 1, "Expected first argument to be a string representing a command"); luaL_argcheck(L, lua_type(L, 2) == LUA_TNUMBER, 2, "Expected second argument to be an integer representing the maximum number of parallel jobs to start"); luaL_argcheck(L, lua_type(L, 3) == LUA_TTABLE, 3, "Expected third argument to be a table of table of strings representing the arguments"); - std::string base_cmd(lua_tostring(L, 1)); + const char* cmd = (lua_tostring(L, 1)); const size_t max_parallel = lua_tointeger(L, 2); const size_t num_cmds = lua_objlen(L, 3); - command_stack in; - in.base_cmd = base_cmd; - result_stack out; - for (size_t i = 0; i < num_cmds; ++i) - out.results.push_back(result_stack::result{"Not Executed",-1}); - + const size_t* num_args = (size_t*)malloc(num_cmds * sizeof(size_t)); + const char*** args = (const char***)malloc(num_cmds*sizeof(const char**)); + // Generate the command stack from lua arguments for (size_t i = 0; i < num_cmds; ++i) { @@ -135,57 +82,102 @@ int lua_platform_exec_parallel(lua_State* L) if (lua_istable(L, -1)) { const size_t num_args = lua_objlen(L, -1); - in.commands.push_back(dynarray()); - dynarray& back = in.commands.back(); + args[i] = (const char**)malloc(num_args*sizeof(const char*)); for (size_t j = 0; j < num_args; ++j) { lua_rawgeti(L, -1, j + 1); if (!lua_isstring(L, -1)) return luaL_error(L, "Expected program argument to be a string. args[%d][%d] was a %s, not a string!\n", i, j, lua_typename(L, lua_type(L, -1))); - std::string str(lua_tostring(L, -1), lua_objlen(L, -1)); - back.push_back(std::move(str)); + args[i][j] = lua_tostring(L, -1); lua_pop(L, 1); } } else if (lua_type(L, -1) == LUA_TSTRING) { - in.commands.push_back(dynarray()); - in.commands.back().push_back(lua_tostring(L, -1)); + args[i] = (const char**)malloc(1*sizeof(const char*)); + args[i][0] = lua_tostring(L, -1); } else return luaL_error(L, "Expected a table of program arguments. args[%d] was a %s, not a table!\n", i, lua_typename(L, lua_type(L, -1))); lua_pop(L, 1); } - // Dispatch worker threads - std::vector workers; - for (size_t i = 0; i < max_parallel; ++i) - workers.push_back(std::jthread(worker_thread, - std::ref(base_cmd), - std::ref(in), - std::ref(out) - )); + uniproc_process* workers = (uniproc_process*)malloc(max_parallel*sizeof(uniproc_process)); + uniproc_nullify_processes(workers, max_parallel); + int* retcodes = (int*)malloc(num_cmds*sizeof(int)); + memset(retcodes, 0, num_cmds); + + std::vector std_outs; + std::vector std_errs; + std_outs.resize(num_cmds, ""); + std_errs.resize(num_cmds, ""); + + + size_t cmd_idx = 0; + while (cmd_idx < num_cmds || !uniproc_are_processes_finished(workers, max_parallel)) + { + for (uniproc_process* p = workers; p != (workers + max_parallel); ++p) + { + // Check and handle a worker if it has completed + if (uniproc_are_processes_finished(p, 1)) + { + char buf[64]; + // Read the std. streams into the output vectors + while (fgets(buf, sizeof(buf), p.out) != NULL) + std_outs[p.userdata].append(buf); + while (fgets(buf, sizeof(buf), p.err) != NULL) + std_errs[p.userdata].append(buf); + // Add the process return code to retcode array + uniproc_await_processes(p, retcodes + p.userdata, 1); + // Close the process and zero it + uniproc_close_process(p); + } + // If a worker is null and there is more jobs to complete + // Create a new process + // Is a while loop to handle the possibility that process creation fails + while (uniproc_is_process_null(p) && cmd_idx < num_cmds) + { + // Create process + *p = uniproc_create_process(cmd, num_args[cmd_idx], args[cmd_idx]); + uniproc_set_userdata(p, num_cmds); + cmd_idx++; + + // If we failed to create the process, set the output for it to an error value + if (uniproc_is_process_null(p)) + { + std_outs[p->userdata] = "uniproc: failed to create process"; + std_errs[p->userdata] = "uniproc: failed to create process"; + retcodes[p->userdata] = -1; + } + } + } + // Await a process completing before trying to iterate again + // Prevents spinning the CPU + uniproc_await_any_processes(workers, max_parallel); + } + + for (size_t i = 0; i < num_cmds; ++i) + free(args[i]); + free(num_args); + free(args); + // Don't need to close processes as that is handled in the command loop + free(workers); - using namespace std::chrono_literals; - while (!in.is_empty()) - std::this_thread::sleep_for(1ms); - for (size_t i = 0; i < workers.size(); ++i) - workers[i].request_stop(); - for (size_t i = 0; i < workers.size(); ++i) - workers[i].join(); // Join thread results together and push them to lua stack lua_newtable(L); // rc lua_newtable(L); // stdout - for (size_t i = 0; i < out.results.size(); ++i) + lua_newtable(L); // stderr + for (size_t i = 0; i < num_cmds; ++i) { - const result_stack::result& x = out.results[i]; - lua_pushinteger(L, x.program_ret); + lua_pushinteger(L, retcodes[i]); + lua_rawseti(L, -4, i+1); + lua_pushlstring(L, std_outs[i].c_str(), std_outs[i].size()); lua_rawseti(L, -3, i+1); - lua_pushlstring(L, x.program_output.c_str(), x.program_output.size()); - lua_rawseti(L, -2, i + 1); + lua_pushlstring(L, std_errs[i].c_str(), std_errs[i].size()); + lua_rawseti(L, -2, i+1); } - return 2; + return 3; }