Replaced popen usage with uniproc_process

This commit is contained in:
2025-03-06 17:54:53 +11:00
parent fe01a41aac
commit 667151b1b6
2 changed files with 106 additions and 113 deletions

View File

@@ -10,11 +10,11 @@
int luaopen_platform(lua_State* L); int luaopen_platform(lua_State* L);
// Returns the exit code of the command as well as a string of its stdout // Returns the exit code of the command as well as a string of its stdout
// int, string function platform.exec(cmd: string, args: array<string>) // int, string, string function platform.exec(cmd: string, args: array<string>)
int lua_platform_exec(lua_State* L); int lua_platform_exec(lua_State* L);
// Executes a command asynchronosly, returning a promise to the functions return values // Executes a command asynchronosly, returning a promise to the functions return values
// promise<int>, promise<string> function platform.exec_async(cmd: string, args: array<string>) // promise<int>, promise<string>, promise<string> function platform.exec_async(cmd: string, args: array<string>)
int lua_platform_exec_async(lua_State* L); int lua_platform_exec_async(lua_State* L);
// Executes a command asynchronosly, calling a callback on its completion with its error code and // Executes a command asynchronosly, calling a callback on its completion with its error code and
@@ -22,13 +22,14 @@ int lua_platform_exec_async(lua_State* L);
// function platform.exec_async_cb(cmd: string, // function platform.exec_async_cb(cmd: string,
// function on_complete(cmd: string, exit_code: int, overall_output: string), // function on_complete(cmd: string, exit_code: int, overall_output: string),
// function on_stdout(cmd: string, overall_output: string, delta_output: string) // function on_stdout(cmd: string, overall_output: string, delta_output: string)
// function on_stderr(cmd: string, overall_output: string, delta_output: string)
// ) // )
int lua_platform_exec_async_cb(lua_State* L); int lua_platform_exec_async_cb(lua_State* L);
// Executes the same command up to N times in parallel, iterating through the args provided until all arguments have been used // Executes the same command up to N times in parallel, iterating through the args provided until all arguments have been used
// array[promise<int>, promise<string>] function platform.exec_parallel_async(cmd: string, N: int, args: array<array<string>>) // array[promise<int>, promise<string>, promise<string>] function platform.exec_parallel_async(cmd: string, N: int, args: array<array<string>>)
int lua_platform_exec_parallel_async(lua_State* L); int lua_platform_exec_parallel_async(lua_State* L);
// array<int>, array<string> function platform.exec_parallel(cmd: string, N: int, args: array<array<string>>) // array<int>, array<string>, array<string> function platform.exec_parallel(cmd: string, N: int, args: array<array<string>>)
int lua_platform_exec_parallel(lua_State* L); int lua_platform_exec_parallel(lua_State* L);

View File

@@ -4,6 +4,7 @@
#include "mutex" #include "mutex"
#include <chrono> #include <chrono>
#include "simple_string.h" #include "simple_string.h"
#include "uniproc/include/uniproc.h"
int luaopen_platform(lua_State* L) int luaopen_platform(lua_State* L)
{ {
@@ -23,94 +24,43 @@ int lua_platform_exec(lua_State* L)
{ {
luaL_argcheck(L, lua_isstring(L, 1), 1, "Expected first argument to be a string"); luaL_argcheck(L, lua_isstring(L, 1), 1, "Expected first argument to be a string");
luaL_argcheck(L, lua_istable(L, 2), 2, "Expected second argument to be a table of strings"); luaL_argcheck(L, lua_istable(L, 2), 2, "Expected second argument to be a table of strings");
std::string cmd(lua_tostring(L, 1));
size_t nargs = lua_objlen(L, 2); size_t nargs = lua_objlen(L, 2);
char** argv = malloc(nargs * sizeof(char*));
for (size_t i = 0; i < nargs; ++i) for (size_t i = 0; i < nargs; ++i)
{ {
lua_rawgeti(L, -1, i + 1); lua_rawgeti(L, -1, i + 1);
if (!lua_isstring(L, -1)) if (!lua_isstring(L, -1))
return luaL_error(L, "Expected all values in argument table to be a string. Index %d was a %s", i, lua_typename(L, lua_type(L, -1))); return luaL_error(L, "Expected all values in argument table to be a string. Index %d was a %s", i, lua_typename(L, lua_type(L, -1)));
cmd.append(" ").append(lua_tostring(L, -1)); argv[i] = lua_tostring(L, -1);
} }
FILE* f = popen(cmd.c_str(), "r"); int retcode = 0;
std::string std_out;
std::string std_err;
char buf[64]; char buf[64];
std::string result;
while (fgets(buf, sizeof(buf), f) != NULL)
result.append(buf);
lua_pushinteger(L, pclose(f));
lua_pushlstring(L, result.c_str(), result.size());
return 2;
}
struct command_stack
{
std::string base_cmd;
dynarray<dynarray<std::string>> commands;
std::recursive_mutex mux;
size_t num_cmds(void) const noexcept
{ return commands.size(); }
bool is_empty(void) const noexcept
{ return commands.size() == 0; }
std::pair<dynarray<std::string>, size_t> get_command(void) noexcept
{
std::scoped_lock lock(mux);
const size_t sz = commands.size();
return std::make_pair(std::move(commands.pop_back()),sz-1);
}
};
struct result_stack
{
struct result
{
std::string program_output;
int program_ret;
};
std::recursive_mutex mux; uniproc_process p = uniproc_create_process(lua_tostring(L, 1), nargs, argv);
dynarray<result> results; if (uniproc_is_process_null(&p))
void add_result(const size_t idx, const std::string& out, const int ret)
{ {
std::scoped_lock lock(mux); lua_pushnil(L);
new (results.data() + idx) result(out, ret); lua_pushstring(L, "uniproc: failed to create process");
return 2;
} }
}; uniproc_await_processes(&p, &retcode, 1);
while (fgets(buf, sizeof(buf), p.out) != NULL)
std_out.append(buf);
while (fgets(buf, sizeof(buf), p.err) != NULL)
std_err.append(buf);
uniproc_close_process(&p);
free(argv);
lua_pushinteger(L, retcode);
void worker_thread(std::stop_token token, const std::string& base_cmd, command_stack& in, result_stack& out) lua_pushlstring(L, std_out.c_str(), std_out.size());
{ lua_pushlstring(L, std_err.c_str(), std_err.size());
char buf[64]; return 3;
while (!token.stop_requested())
{
std::string cmd = base_cmd;
if (in.is_empty()) return;
auto [arg, idx] = in.get_command();
// Get a command
for (const std::string& a : arg)
cmd.append(" ").append(a);
// Run the command
FILE* f = popen(cmd.c_str(), "r");
// Capture the commands stdout
std::string pout = "";
memset(buf, '\0', sizeof(buf));
while (fgets(buf, sizeof(buf), f) != NULL)
{
using namespace std::chrono_literals;
std::this_thread::sleep_for(10us);
pout.append(buf);
}
// Await the program to exit
int rc = pclose(f);
out.add_result(idx, pout, rc);
}
} }
int lua_platform_exec_parallel(lua_State* L) int lua_platform_exec_parallel(lua_State* L)
@@ -118,16 +68,13 @@ int lua_platform_exec_parallel(lua_State* L)
luaL_argcheck(L, lua_type(L, 1) == LUA_TSTRING, 1, "Expected first argument to be a string representing a command"); luaL_argcheck(L, lua_type(L, 1) == LUA_TSTRING, 1, "Expected first argument to be a string representing a command");
luaL_argcheck(L, lua_type(L, 2) == LUA_TNUMBER, 2, "Expected second argument to be an integer representing the maximum number of parallel jobs to start"); luaL_argcheck(L, lua_type(L, 2) == LUA_TNUMBER, 2, "Expected second argument to be an integer representing the maximum number of parallel jobs to start");
luaL_argcheck(L, lua_type(L, 3) == LUA_TTABLE, 3, "Expected third argument to be a table of table of strings representing the arguments"); luaL_argcheck(L, lua_type(L, 3) == LUA_TTABLE, 3, "Expected third argument to be a table of table of strings representing the arguments");
std::string base_cmd(lua_tostring(L, 1)); const char* cmd = (lua_tostring(L, 1));
const size_t max_parallel = lua_tointeger(L, 2); const size_t max_parallel = lua_tointeger(L, 2);
const size_t num_cmds = lua_objlen(L, 3); const size_t num_cmds = lua_objlen(L, 3);
command_stack in; const size_t* num_args = (size_t*)malloc(num_cmds * sizeof(size_t));
in.base_cmd = base_cmd; const char*** args = (const char***)malloc(num_cmds*sizeof(const char**));
result_stack out;
for (size_t i = 0; i < num_cmds; ++i)
out.results.push_back(result_stack::result{"Not Executed",-1});
// Generate the command stack from lua arguments // Generate the command stack from lua arguments
for (size_t i = 0; i < num_cmds; ++i) for (size_t i = 0; i < num_cmds; ++i)
{ {
@@ -135,57 +82,102 @@ int lua_platform_exec_parallel(lua_State* L)
if (lua_istable(L, -1)) if (lua_istable(L, -1))
{ {
const size_t num_args = lua_objlen(L, -1); const size_t num_args = lua_objlen(L, -1);
in.commands.push_back(dynarray<std::string>()); args[i] = (const char**)malloc(num_args*sizeof(const char*));
dynarray<std::string>& back = in.commands.back();
for (size_t j = 0; j < num_args; ++j) for (size_t j = 0; j < num_args; ++j)
{ {
lua_rawgeti(L, -1, j + 1); lua_rawgeti(L, -1, j + 1);
if (!lua_isstring(L, -1)) if (!lua_isstring(L, -1))
return luaL_error(L, "Expected program argument to be a string. args[%d][%d] was a %s, not a string!\n", i, j, lua_typename(L, lua_type(L, -1))); return luaL_error(L, "Expected program argument to be a string. args[%d][%d] was a %s, not a string!\n", i, j, lua_typename(L, lua_type(L, -1)));
std::string str(lua_tostring(L, -1), lua_objlen(L, -1)); args[i][j] = lua_tostring(L, -1);
back.push_back(std::move(str));
lua_pop(L, 1); lua_pop(L, 1);
} }
} }
else if (lua_type(L, -1) == LUA_TSTRING) else if (lua_type(L, -1) == LUA_TSTRING)
{ {
in.commands.push_back(dynarray<std::string>()); args[i] = (const char**)malloc(1*sizeof(const char*));
in.commands.back().push_back(lua_tostring(L, -1)); args[i][0] = lua_tostring(L, -1);
} }
else else
return luaL_error(L, "Expected a table of program arguments. args[%d] was a %s, not a table!\n", i, lua_typename(L, lua_type(L, -1))); return luaL_error(L, "Expected a table of program arguments. args[%d] was a %s, not a table!\n", i, lua_typename(L, lua_type(L, -1)));
lua_pop(L, 1); lua_pop(L, 1);
} }
// Dispatch worker threads uniproc_process* workers = (uniproc_process*)malloc(max_parallel*sizeof(uniproc_process));
std::vector<std::jthread> workers; uniproc_nullify_processes(workers, max_parallel);
for (size_t i = 0; i < max_parallel; ++i) int* retcodes = (int*)malloc(num_cmds*sizeof(int));
workers.push_back(std::jthread(worker_thread, memset(retcodes, 0, num_cmds);
std::ref(base_cmd),
std::ref(in), std::vector<std::string> std_outs;
std::ref(out) std::vector<std::string> std_errs;
)); std_outs.resize(num_cmds, "");
std_errs.resize(num_cmds, "");
size_t cmd_idx = 0;
while (cmd_idx < num_cmds || !uniproc_are_processes_finished(workers, max_parallel))
{
for (uniproc_process* p = workers; p != (workers + max_parallel); ++p)
{
// Check and handle a worker if it has completed
if (uniproc_are_processes_finished(p, 1))
{
char buf[64];
// Read the std. streams into the output vectors
while (fgets(buf, sizeof(buf), p.out) != NULL)
std_outs[p.userdata].append(buf);
while (fgets(buf, sizeof(buf), p.err) != NULL)
std_errs[p.userdata].append(buf);
// Add the process return code to retcode array
uniproc_await_processes(p, retcodes + p.userdata, 1);
// Close the process and zero it
uniproc_close_process(p);
}
// If a worker is null and there is more jobs to complete
// Create a new process
// Is a while loop to handle the possibility that process creation fails
while (uniproc_is_process_null(p) && cmd_idx < num_cmds)
{
// Create process
*p = uniproc_create_process(cmd, num_args[cmd_idx], args[cmd_idx]);
uniproc_set_userdata(p, num_cmds);
cmd_idx++;
// If we failed to create the process, set the output for it to an error value
if (uniproc_is_process_null(p))
{
std_outs[p->userdata] = "uniproc: failed to create process";
std_errs[p->userdata] = "uniproc: failed to create process";
retcodes[p->userdata] = -1;
}
}
}
// Await a process completing before trying to iterate again
// Prevents spinning the CPU
uniproc_await_any_processes(workers, max_parallel);
}
for (size_t i = 0; i < num_cmds; ++i)
free(args[i]);
free(num_args);
free(args);
// Don't need to close processes as that is handled in the command loop
free(workers);
using namespace std::chrono_literals;
while (!in.is_empty())
std::this_thread::sleep_for(1ms);
for (size_t i = 0; i < workers.size(); ++i)
workers[i].request_stop();
for (size_t i = 0; i < workers.size(); ++i)
workers[i].join();
// Join thread results together and push them to lua stack // Join thread results together and push them to lua stack
lua_newtable(L); // rc lua_newtable(L); // rc
lua_newtable(L); // stdout lua_newtable(L); // stdout
for (size_t i = 0; i < out.results.size(); ++i) lua_newtable(L); // stderr
for (size_t i = 0; i < num_cmds; ++i)
{ {
const result_stack::result& x = out.results[i]; lua_pushinteger(L, retcodes[i]);
lua_pushinteger(L, x.program_ret); lua_rawseti(L, -4, i+1);
lua_pushlstring(L, std_outs[i].c_str(), std_outs[i].size());
lua_rawseti(L, -3, i+1); lua_rawseti(L, -3, i+1);
lua_pushlstring(L, x.program_output.c_str(), x.program_output.size()); lua_pushlstring(L, std_errs[i].c_str(), std_errs[i].size());
lua_rawseti(L, -2, i + 1); lua_rawseti(L, -2, i+1);
} }
return 2; return 3;
} }