Index: nse_nmaplib.cc =================================================================== --- nse_nmaplib.cc (revision 15944) +++ nse_nmaplib.cc (working copy) @@ -325,6 +325,73 @@ return 1; // aux_mutex closure } +static int aux_condvar (lua_State *L) +{ + size_t i, n = 0; + enum {WAIT, SIGNAL, BROADCAST}; + static const char * op[] = {"wait", "signal", "broadcast"}; + switch (luaL_checkoption(L, 1, NULL, op)) + { + case WAIT: + lua_pushthread(L); + lua_rawseti(L, lua_upvalueindex(1), lua_objlen(L, lua_upvalueindex(1))+1); + return nse_yield(L); + case SIGNAL: + n = 1; + break; + case BROADCAST: + n = lua_objlen(L, lua_upvalueindex(1)); + break; + } + lua_pushvalue(L, lua_upvalueindex(1)); + for (i = n; i >= 1; i--) + { + lua_rawgeti(L, -1, i); /* get the thread */ + if (lua_isthread(L, -1)) + nse_restore(lua_tothread(L, -1), 0); + lua_pop(L, 1); /* pop the thread */ + lua_pushnil(L); + lua_rawseti(L, -2, i); + } + return 0; +} + +static int aux_condvar_done (lua_State *L) +{ + lua_State *thread = lua_tothread(L, 1); + lua_pushvalue(L, lua_upvalueindex(1)); // aux_condvar closure + lua_pushliteral(L, "broadcast"); // wake up all threads waiting + luaL_checkstack(thread, 2, "aux_condvar_done"); + lua_xmove(L, thread, 2); + if (lua_pcall(thread, 1, 0, 0) != 0) lua_pop(thread, 1); // pop error msg + return 0; +} + +static int l_condvar (lua_State *L) +{ + int t = lua_type(L, 1); + if (t == LUA_TNONE || t == LUA_TNIL || t == LUA_TBOOLEAN || t == LUA_TNUMBER) + luaL_argerror(L, 1, "object expected"); + lua_pushvalue(L, 1); + lua_gettable(L, lua_upvalueindex(1)); + if (lua_isnil(L, -1)) + { + lua_newtable(L); // waiting threads + lua_pushnil(L); // placeholder for aux_mutex_done + lua_pushcclosure(L, aux_condvar, 2); + lua_pushvalue(L, -1); // aux_condvar closure + lua_pushcclosure(L, aux_condvar_done, 1); + lua_setupvalue(L, -2, 2); // replace nil upvalue with aux_condvar_done + lua_pushvalue(L, 1); // "condition variable object" + lua_pushvalue(L, -2); // condvar function + lua_settable(L, lua_upvalueindex(1)); // Add to condition variable table + } + lua_pushvalue(L, -1); /* aux_condvar closure */ + lua_getupvalue(L, -1, 2); /* aux_mutex_done closure */ + nse_destructor(L, 'a'); + return 1; // condition variable closure +} + Target *get_target (lua_State *L, int index) { int top = lua_gettop(L); @@ -618,6 +685,14 @@ lua_setfield(L, -2, "mutex"); lua_newtable(L); + lua_createtable(L, 0, 1); + lua_pushliteral(L, "v"); + lua_setfield(L, -2, "__mode"); + lua_setmetatable(L, -2); // Allow closures to be collected (see l_condvar) + lua_pushcclosure(L, l_condvar, 1); /* condvar function */ + lua_setfield(L, -2, "condvar"); + + lua_newtable(L); lua_setfield(L, -2, "registry"); lua_pushcclosure(L, luaopen_nsock, 0); Index: nselib/stdnse.lua =================================================================== --- nselib/stdnse.lua (revision 15944) +++ nselib/stdnse.lua (working copy) @@ -259,3 +259,105 @@ end end +--- This function allows you to create worker threads that may perform +-- network tasks in parallel with your script thread. +-- +-- Any network task (e.g. socket:connect(...)) will cause the +-- running thread to yield to NSE. This allows network tasks to appear to be +-- blocking while being able to run multiple network tasks at once. +-- While this is useful for running multiple separate scripts, it is +-- unfortunately difficult for a script itself to perform network tasks in +-- parallel. In order to allow scripts to also have network tasks running in +-- parallel, we provide this function, stdnse.new_thread, to +-- create a new thread that can perform its own network related tasks +-- in parallel with the script. +-- +-- The script launches the worker thread by calling the new_thread +-- function with the parameters: +-- * The main Lua function for the script to execute, similar to the script action function. +-- * The variable number of arguments to be passed to the worker's main function. +-- +-- The stdnse.new_thread function will return two results: +-- * The worker thread's base (main) coroutine (useful for tracking status). +-- * An status query function (described below). +-- +-- The status query function shall return two values: +-- * The result of coroutine.status using the worker thread base coroutine. +-- * The error object thrown that ended the worker thread or nil if no error was thrown. This is typically a string, like most Lua errors. +-- +-- Note that NSE discards all return values of the worker's main function. You +-- must use function upvalues or environments to communicate results. +-- +-- You should use the condition variable (nmap.condvar) +-- and mutex (nmap.mutex) facilities to coordinate with your +-- worker threads. Keep in mind that Nmap is single threaded so there are +-- no (memory) synchrony issues to worry about; however, there is +-- resource contention. Your resources are usually network bandwidth, network +-- sockets, etc. You will need condition variables if the work for any single +-- thread is dynamic. For example, a web server spider script with a pool +-- of workers will initially have a single root html document. Following the +-- retrieval of the root document, the set of resources to be retrieved +-- (the worker's work) will become very large (an html document adds many +-- new hyperlinks (resources) to fetch). +--@name new_thread +--@class function +--@param main The main function of the worker thread. +--@param ... The arguments passed to the main worker thread. +--@return co The base coroutine of the worker thread. +--@return info A query function used to obtain status information of the worker. +--@usage +--local requests = {"/", "/index.html", --[[ long list of objects ]]} +-- +--function thread_main (host, port, responses, ...) +-- local condvar = nmap.condvar(responses); +-- local what = {n = select("#", ...), ...}; +-- local allReqs = nil; +-- for i = 1, what.n do +-- allReqs = http.pGet(host, port, what[i], nil, nil, allReqs); +-- end +-- local p = assert(http.pipeline(host, port, allReqs)); +-- for i, response in ipairs(p) do responses[#responses+1] = response end +-- condvar "signal"; +--end +-- +--function many_requests (host, port) +-- local threads = {}; +-- local responses = {}; +-- local condvar = nmap.condvar(responses); +-- local i = 1; +-- repeat +-- local j = math.min(i+10, #requests); +-- local co = stdnse.new_thread(thread_main, host, port, responses, +-- unpack(requests, i, j)); +-- threads[co] = true; +-- i = j+1; +-- until i > #requests; +-- repeat +-- condvar "wait"; +-- for thread in pairs(threads) do +-- if coroutine.status(thread) == "dead" then threads[thread] = nil end +-- end +-- until next(threads) == nil; +-- return responses; +--end +do end -- no function here, see nse_main.lua + +--- Returns the base coroutine of the running script. +-- +-- A script may be resuming multiple coroutines to facilitate its own +-- collaborative multithreading design. Because there is a "root" or "base" +-- coroutine that lets us determine whether the script is still active +-- (that is, the script did not end, possibly due to an error), we provide +-- this stdnse.base function that will retrieve the base +-- coroutine of the script. This base coroutine is the coroutine that runs +-- the action function. +-- +-- The base coroutine is useful for many reasons but here are some common +-- uses: +-- * We want to attribute the ownership of an object (perhaps a network socket) to a script. +-- * We want to identify if the script is still alive. +--@name base +--@class function +--@return coroutine Returns the base coroutine of the running script. +-- +do end -- no function here, see nse_main.lua Index: nselib/nmap.luadoc =================================================================== --- nselib/nmap.luadoc (revision 15944) +++ nselib/nmap.luadoc (working copy) @@ -166,6 +166,43 @@ -- end function mutex(object) +--- Create a condition variable for an object. +-- +-- This function returns a function that works as a condition variable for +-- the given object parameter. The object can be any Lua data type except +-- nil, Booleans, and Numbers. The returned function allows you +-- wait, signal, and broadcast on the condition variable. The returned +-- function takes only one argument, which must be one of +-- * "wait": Wait on the condition variable until another thread wakes us. +-- * "signal": Wake up a single thread from the waiting set of threads for this condition variable. +-- * "broadcast": Wake up all threads in the waiting set of threads for this condition variable. +-- In NSE, Condition Variables are typically used to coordinate with threads +-- created using the stdnse.new_thread facility. The worker threads must +-- wait until work is available that the master thread (the actual running +-- script) will provide. Once work is created, the master thread will awaken +-- one or more workers so that the work can be done. +-- +-- It is important to check the predicate (the test to see if your worker +-- thread should "wait" or not) BEFORE and AFTER the call to wait. You are +-- not guaranteed spurious wakeups will not occur (that is, there is no +-- guarantee your thread will not be awakened when no thread called +-- "signal" or "broadcast" on the condition variable). +-- One important check for your worker threads, before and after waiting, +-- should be to check that the master script thread is still alive. +-- (To check that the master script thread is alive, obtain the "base" thread +-- using stdnse.base). You do not want your worker threads to continue when +-- the script has ended for reasons unknown to your worker thread. +-- You are guaranteed that all threads waiting on a condition variable +-- will be awakened if any thread that has accessed the condition variable +-- via nmap.condvar ends for any reason. This is essential +-- to prevent deadlock with threads waiting for another thread to awaken +-- them that has ended unexpectedly. +-- @see stdnse.new_thread +-- @see stdnse.base +-- @param object Object to create a condition variable for. +-- @return ConditionVariable Condition variable function. +function condvar(object) + --- Creates a new exception handler. -- -- This function returns an exception handler function. The exception handler is Index: nse_main.lua =================================================================== --- nse_main.lua (revision 15944) +++ nse_main.lua (working copy) @@ -51,6 +51,7 @@ local next = next; local pairs = pairs; local rawget = rawget; +local rawset = rawset; local select = select; local setfenv = setfenv; local setmetatable = setmetatable; @@ -88,6 +89,8 @@ package.path = package.path..";"..path.."?.lua"; end +local stdnse = require "stdnse"; + (require "strict")() -- strict global checking -- NSE_YIELD_VALUE @@ -536,6 +539,34 @@ _R[SELECTED_BY_NAME] = function() return current and current.selected_by_name; end + rawset(stdnse, "new_thread", function (main, ...) + assert(type(main) == "function", "function expected"); + local co = create(function(...) main(...) end); -- do not return results + print_debug(2, "%s spawning new thread (%s).", + current.parent.info, tostring(co)); + local thread = { + co = co, + args = {n = select("#", ...), ...}, + host = current.host, + port = current.port, + parent = current.parent, + info = format("'%s' worker (%s)", current.short_basename, tostring(co)); + -- d = function(...) end, -- output no debug information + }; + local thread_mt = { + __metatable = Thread, + __index = current, + }; + setmetatable(thread, thread_mt); + total, all[co], pending[co] = total+1, thread, thread; + local function info () + return status(co), rawget(thread, "error"); + end + return co, info; + end); + rawset(stdnse, "base", function () + return current.co; + end); -- Loop while any thread is running or waiting. while next(running) or next(waiting) do @@ -581,6 +612,7 @@ thread:d("%THREAD against %s%s threw an error!\n%s\n", thread.host.ip, thread.port and ":"..thread.port.number or "", traceback(co, tostring(result))); + thread.error = result; thread:close(); elseif status(co) == "suspended" then if result == NSE_YIELD_VALUE then