#include "lua_platform.h"
#include "platform_agnostic.h"
#include "dynarray.hpp"
#include "mutex"
#include "simple_string.h"
#include "uniproc.h"

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <exception>
#include <string>
#include <vector>

namespace
{

/// One inner vector of owned argument strings per job.
using JobList = std::vector<std::vector<std::string>>;

/// Per-job results of a parallel run; all three vectors are indexed by job.
struct ParallelResults
{
    std::vector<int> retcodes;
    std::vector<std::string> std_outs;
    std::vector<std::string> std_errs;
};

/// Appends everything readable from `stream` to `dest`.
/// Uses fread with an explicit length so embedded NUL bytes are preserved
/// (the results are later pushed with lua_pushlstring, which is length-aware).
/// A null stream is treated as empty.
void drain_stream(FILE* stream, std::string& dest)
{
    if (stream == nullptr)
        return;

    char chunk[256];
    std::size_t got = 0;
    while ((got = std::fread(chunk, 1, sizeof(chunk), stream)) > 0)
        dest.append(chunk, got);
}

/// Copies the job argument lists out of the Lua table at `table_index`.
///
/// Each entry must be either a table of strings (numbers are accepted and
/// converted, as lua_isstring allows) or a single string. The strings are
/// COPIED: lua_tostring on a number converts only the temporary stack slot,
/// so a borrowed pointer would dangle as soon as that slot is popped.
///
/// @return true on success; false after writing a message into `error_text`.
///         Reported indices are 1-based, matching Lua and lua_platform_exec.
bool collect_job_arguments(lua_State* L, int table_index, std::size_t num_cmds,
                           JobList& jobs, char* error_text, std::size_t error_size)
{
    jobs.clear();
    jobs.resize(num_cmds);

    for (std::size_t i = 0; i < num_cmds; ++i)
    {
        lua_rawgeti(L, table_index, static_cast<int>(i + 1));
        const int entry_type = lua_type(L, -1);

        if (entry_type == LUA_TTABLE)
        {
            const std::size_t count = lua_objlen(L, -1);
            jobs[i].reserve(count);
            for (std::size_t j = 0; j < count; ++j)
            {
                lua_rawgeti(L, -1, static_cast<int>(j + 1));
                if (!lua_isstring(L, -1))
                {
                    std::snprintf(error_text, error_size,
                                  "Expected program argument to be a string. args[%d][%d] was a %s, not a string!\n",
                                  static_cast<int>(i + 1), static_cast<int>(j + 1),
                                  lua_typename(L, lua_type(L, -1)));
                    lua_pop(L, 2);
                    return false;
                }
                jobs[i].emplace_back(lua_tostring(L, -1));
                lua_pop(L, 1);
            }
        }
        else if (entry_type == LUA_TSTRING)
        {
            // A bare string is shorthand for a one-argument job.
            jobs[i].emplace_back(lua_tostring(L, -1));
        }
        else
        {
            std::snprintf(error_text, error_size,
                          "Expected a table of program arguments. args[%d] was a %s, not a table!\n",
                          static_cast<int>(i + 1), lua_typename(L, entry_type));
            lua_pop(L, 1);
            return false;
        }

        lua_pop(L, 1);
    }
    return true;
}

/// Runs `cmd` once per entry of `jobs`, keeping at most `max_parallel`
/// processes alive at a time, and stores each job's return code and captured
/// output in `results`.
///
/// A job whose process cannot be created gets return code -1 and the text
/// "uniproc: failed to create process" as both its stdout and stderr.
///
/// NOTE(review): if an exception (e.g. std::bad_alloc) escapes while workers
/// are still running, those processes are not closed here — confirm whether
/// uniproc_close_process is safe on a running process before adding cleanup.
void run_jobs(const char* cmd, std::size_t max_parallel, const JobList& jobs, ParallelResults& results)
{
    const std::size_t num_cmds = jobs.size();
    results.retcodes.assign(num_cmds, 0);
    results.std_outs.assign(num_cmds, std::string());
    results.std_errs.assign(num_cmds, std::string());

    if (num_cmds == 0 || max_parallel == 0)
        return;

    // Pointer views over the owned strings, in the shape uniproc expects.
    std::vector<std::vector<const char*>> argvs(num_cmds);
    for (std::size_t i = 0; i < num_cmds; ++i)
    {
        argvs[i].reserve(jobs[i].size());
        for (const std::string& arg : jobs[i])
            argvs[i].push_back(arg.c_str());
    }

    std::vector<uniproc_process> workers(max_parallel);
    uniproc_nullify_processes(workers.data(), max_parallel);

    std::size_t next_job = 0;
    for (;;)
    {
        bool any_active = false;

        for (uniproc_process& worker : workers)
        {
            // Harvest a worker that has completed.
            if (!uniproc_is_process_null(&worker) && uniproc_are_processes_finished(&worker, 1))
            {
                const std::size_t job = worker.userdata;
                drain_stream(worker.out, results.std_outs[job]);
                drain_stream(worker.err, results.std_errs[job]);
                uniproc_await_processes(&worker, &results.retcodes[job], 1);
                // Closing leaves the slot null so it can be reused below.
                uniproc_close_process(&worker);
            }

            // Refill a free slot. This is a loop because creation can fail,
            // in which case the slot stays null and the next job is tried.
            while (uniproc_is_process_null(&worker) && next_job < num_cmds)
            {
                const std::size_t job = next_job++;
                worker = uniproc_create_process(cmd, argvs[job].size(), argvs[job].data());
                worker.userdata = job;

                if (uniproc_is_process_null(&worker))
                {
                    results.std_outs[job] = "uniproc: failed to create process";
                    results.std_errs[job] = "uniproc: failed to create process";
                    results.retcodes[job] = -1;
                }
            }

            if (!uniproc_is_process_null(&worker))
                any_active = true;
        }

        // Every slot is null only once all jobs have been dispatched AND
        // harvested. Terminating on this (rather than on "all finished")
        // guarantees the last completed worker is still read and closed.
        if (!any_active)
            break;

        // Block until some process completes instead of spinning the CPU.
        uniproc_await_any_processes(workers.data(), max_parallel);
    }
}

/// Pushes three array tables (return codes, stdout texts, stderr texts),
/// in that order, onto the Lua stack.
void push_results(lua_State* L, const ParallelResults& results)
{
    lua_newtable(L); // rc
    lua_newtable(L); // stdout
    lua_newtable(L); // stderr

    for (std::size_t i = 0; i < results.retcodes.size(); ++i)
    {
        const int slot = static_cast<int>(i + 1);

        lua_pushinteger(L, results.retcodes[i]);
        lua_rawseti(L, -4, slot);

        lua_pushlstring(L, results.std_outs[i].c_str(), results.std_outs[i].size());
        lua_rawseti(L, -3, slot);

        lua_pushlstring(L, results.std_errs[i].c_str(), results.std_errs[i].size());
        lua_rawseti(L, -2, slot);
    }
}

} // namespace

/// Module entry point: builds a table with the fields `exec` and
/// `exec_parallel`, stores it in the global `platform`, and returns it.
int luaopen_platform(lua_State* L)
{
    lua_newtable(L);

    lua_pushcfunction(L, lua_platform_exec);
    lua_setfield(L, -2, "exec");

    lua_pushcfunction(L, lua_platform_exec_parallel);
    lua_setfield(L, -2, "exec_parallel");

    // Duplicate the table so one reference becomes the global and the other
    // remains on the stack as the return value.
    lua_pushvalue(L, -1);
    lua_setfield(L, LUA_GLOBALSINDEX, "platform");
    return 1;
}

/// Lua: platform.exec(cmd, args) -> retcode, stdout, stderr
///                               -> nil, message   (process creation failed)
///
/// @param 1 command string
/// @param 2 array table whose values must all be strings
///
/// Raises a Lua error if an argument has the wrong type or a C++ exception
/// occurs while running the command.
int lua_platform_exec(lua_State* L)
{
    luaL_argcheck(L, lua_isstring(L, 1), 1, "Expected first argument to be a string");
    luaL_argcheck(L, lua_istable(L, 2), 2, "Expected second argument to be a table of strings");

    const std::size_t nargs = lua_objlen(L, 2);

    // Validate before acquiring any resource: luaL_error does not return, so
    // nothing that needs releasing may be alive when it is called.
    for (std::size_t i = 0; i < nargs; ++i)
    {
        lua_rawgeti(L, 2, static_cast<int>(i + 1));
        const int value_type = lua_type(L, -1);
        lua_pop(L, 1);
        if (value_type != LUA_TSTRING)
            return luaL_error(L, "Expected all values in argument table to be a string. Index %d was a %s",
                              static_cast<int>(i + 1), lua_typename(L, value_type));
    }

    char error_text[256] = "";
    {
        bool spawned = false;
        int retcode = 0;
        std::string std_out;
        std::string std_err;
        uniproc_process p{};
        uniproc_nullify_processes(&p, 1);

        try
        {
            std::vector<const char*> argv(nargs);
            for (std::size_t i = 0; i < nargs; ++i)
            {
                lua_rawgeti(L, 2, static_cast<int>(i + 1));
                // Safe to pop right away: every value is a genuine string
                // (checked above) and stays referenced by the table at index 2.
                argv[i] = lua_tostring(L, -1);
                lua_pop(L, 1);
            }

            p = uniproc_create_process(lua_tostring(L, 1), nargs, argv.data());
            if (!uniproc_is_process_null(&p))
            {
                spawned = true;
                // NOTE(review): waiting before reading could stall if the
                // child blocks on a full output pipe — confirm how uniproc
                // buffers p.out / p.err.
                uniproc_await_processes(&p, &retcode, 1);
                drain_stream(p.out, std_out);
                drain_stream(p.err, std_err);
            }
        }
        catch (const std::exception& e)
        {
            std::snprintf(error_text, sizeof(error_text),
                          "%s: Exception occurred in platform.exec(...)\n\t%s\n", __FUNCTION__, e.what());
        }

        if (!uniproc_is_process_null(&p))
            uniproc_close_process(&p);

        if (error_text[0] == '\0')
        {
            if (!spawned)
            {
                lua_pushnil(L);
                lua_pushstring(L, "uniproc: failed to create process");
                return 2;
            }

            lua_pushinteger(L, retcode);
            lua_pushlstring(L, std_out.c_str(), std_out.size());
            lua_pushlstring(L, std_err.c_str(), std_err.size());
            return 3;
        }
    }

    // All C++ objects above are destroyed before the error is raised.
    return luaL_error(L, "%s", error_text);
}

/// Lua: platform.exec_parallel(cmd, max_parallel, jobs) -> retcodes, stdouts, stderrs
///
/// @param 1 command string
/// @param 2 maximum number of processes to run at once (must be >= 1 when
///          there is at least one job)
/// @param 3 array table of jobs; each job is a table of argument strings or a
///          single string
///
/// Returns three array tables indexed like `jobs`. A job that could not be
/// started has return code -1 and an error text as its stdout and stderr.
/// Raises a Lua error on malformed arguments or on a C++ exception (which is
/// also written to stderr).
int lua_platform_exec_parallel(lua_State* L)
{
    luaL_argcheck(L, lua_type(L, 1) == LUA_TSTRING, 1,
                  "Expected first argument to be a string representing a command");
    luaL_argcheck(L, lua_type(L, 2) == LUA_TNUMBER, 2,
                  "Expected second argument to be an integer representing the maximum number of parallel jobs to start");
    luaL_argcheck(L, lua_type(L, 3) == LUA_TTABLE, 3,
                  "Expected third argument to be a table of table of strings representing the arguments");

    const char* cmd = lua_tostring(L, 1);
    const std::size_t num_cmds = lua_objlen(L, 3);
    const lua_Integer requested = lua_tointeger(L, 2);

    // With jobs pending, a limit below 1 could never make progress.
    luaL_argcheck(L, num_cmds == 0 || requested >= 1, 2,
                  "Expected the maximum number of parallel jobs to be at least 1");

    // More worker slots than jobs would only waste memory.
    const std::size_t max_parallel = std::min(static_cast<std::size_t>(requested), num_cmds);

    // Errors are staged in a plain buffer and raised only after every C++
    // container has been destroyed, because luaL_error does not return.
    char error_text[512] = "";
    bool succeeded = false;
    const char* phase = "preparation";

    try
    {
        JobList jobs;
        if (collect_job_arguments(L, 3, num_cmds, jobs, error_text, sizeof(error_text)))
        {
            ParallelResults results;

            phase = "parallel execution";
            run_jobs(cmd, max_parallel, jobs, results);

            phase = "cleanup and result handling";
            push_results(L, results);
            succeeded = true;
        }
    }
    catch (const std::exception& e)
    {
        std::snprintf(error_text, sizeof(error_text),
                      "%s:%d: Exception occurred in %s for platform.exec_parallel(...)\n\t%s\n",
                      __FUNCTION__, __LINE__, phase, e.what());
        std::fprintf(stderr, "%s", error_text);
    }

    if (!succeeded)
        return luaL_error(L, "%s", error_text);

    return 3;
}