Changeset 4126

Show
Ignore:
Timestamp:
03/09/08 15:15:13 (10 months ago)
Author:
pmoura
Message:

When executing a threaded/1 call, use the message queue of the calling thread for collecting individual thread results instead of creating a new message queue. In addition, join individual threads as soon as they return their results, freeing resources. Individual thread cancellation is still buggy, at least when using SWI-Prolog on MacOS X 10.5.

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • trunk/compiler/logtalk.pl

    r4125 r4126  
    73367336'$lgt_tr_threaded_and_call'(TGoals, MTCalls, MTExits) :- 
    73377337    '$lgt_tr_threaded_and_call'(TGoals, Queue, MTGoals, Ids, Results), 
    7338     MTCalls = (message_queue_create(Queue), MTGoals, '$lgt_mt_check_threads'(Ids, Queue)), 
    7339     MTExits = ('$lgt_mt_threaded_and_exit'(TGoals, Queue, Results), message_queue_destroy(Queue)). 
     7338    MTCalls = (thread_self(Queue), MTGoals, thread_send_message(Queue, '$lgt_workers'(Ids))), 
     7339    MTExits = ('$lgt_mt_check_threads'(Ids, Queue), '$lgt_mt_threaded_and_exit'(TGoals, Ids, Results)). 
    73407340 
    73417341 
     
    73517351'$lgt_tr_threaded_or_call'(TGoals, MTCalls, MTExits) :- 
    73527352    '$lgt_tr_threaded_or_call'(TGoals, Queue, MTGoals, Ids, Results), 
    7353     MTCalls = (message_queue_create(Queue), MTGoals, '$lgt_mt_check_threads'(Ids, Queue)), 
    7354     MTExits = ('$lgt_mt_threaded_or_exit'(TGoals, Queue, Results), message_queue_destroy(Queue)). 
     7353    MTCalls = (thread_self(Queue), MTGoals, thread_send_message(Queue, '$lgt_workers'(Ids))), 
     7354    MTExits = ('$lgt_mt_check_threads'(Ids, Queue), '$lgt_mt_threaded_or_exit'(TGoals, Ids, Results)). 
    73557355 
    73567356 
     
    1293812938'$lgt_mt_threaded_and_call'(TGoal, Queue) :- 
    1293912939    thread_self(Id), 
    12940     (   catch(TGoal, Error, (thread_send_message(Queue, Id::exception(Error)), throw(Error))) -> 
    12941         thread_send_message(Queue, Id::true(TGoal)) 
    12942     ;   thread_send_message(Queue, Id::fail) 
     12940    (   catch(TGoal, Error, (thread_send_message(Queue, '$lgt_result'(Id, exception(Error))), throw(Error))) -> 
     12941        thread_send_message(Queue, '$lgt_result'(Id, true(TGoal))) 
     12942    ;   thread_send_message(Queue, '$lgt_result'(Id, fail)) 
    1294312943    ). 
    1294412944 
     
    1295512955'$lgt_mt_check_threads'([Id| Ids], Queue) :- 
    1295612956    (   thread_property(Id, status(exception(Error))) -> 
    12957         thread_send_message(Queue, Id::exception(Error)) 
     12957        thread_send_message(Queue, '$lgt_result'(Id, exception(Error))) 
     12958    ;   '$lgt_mt_check_threads'(Ids) 
     12959    ). 
     12960 
     12961 
     12962'$lgt_mt_check_threads'([]). 
     12963 
     12964'$lgt_mt_check_threads'([Id| Ids]) :- 
     12965    (   thread_property(Id, status(exception(Error))) -> 
     12966        thread_join(Id, _) 
    1295812967    ;   true 
    1295912968    ), 
    12960     '$lgt_mt_check_threads'(Ids, Queue). 
    12961  
    12962  
    12963  
    12964 % '$lgt_mt_threaded_and_exit'(+callable, +message_queue_identifier, +list) 
     12969    '$lgt_mt_check_threads'(Ids). 
     12970 
     12971 
     12972 
     12973% '$lgt_mt_threaded_and_exit'(+callable, +list(thread_identifier), +list) 
    1296512974% 
    1296612975% retrieves the result of proving a conjunction of goals using a threaded/1 predicate 
    1296712976% call by collecting the individual thread results posted to the call message queue 
    1296812977 
    12969 '$lgt_mt_threaded_and_exit'(TGoals, Queue, Results) :- 
    12970     thread_get_message(Queue, Id::Result), 
    12971     '$lgt_mt_threaded_and_exit'(Result, Id, TGoals, Queue, Results). 
    12972  
    12973  
    12974 '$lgt_mt_threaded_and_exit'(exception(Error), _, _, Queue, Results) :- 
    12975     '$lgt_mt_threaded_call_abort'(Results), 
    12976     message_queue_destroy(Queue), 
     12978'$lgt_mt_threaded_and_exit'(TGoals, Ids, Results) :- 
     12979    thread_get_message('$lgt_result'(Id, Result)), 
     12980    thread_join(Id, _), 
     12981    '$lgt_mt_threaded_and_exit'(Result, Id, TGoals, Ids, Results). 
     12982 
     12983 
     12984'$lgt_mt_threaded_and_exit'(exception(Error), Id, _, Ids, _) :- 
     12985    '$lgt_mt_threaded_call_abort'(Ids, Id), 
    1297712986    throw(Error). 
    1297812987 
    12979 '$lgt_mt_threaded_and_exit'(true(Tgoal), Id, TGoals, Queue, Results) :- 
     12988'$lgt_mt_threaded_and_exit'(true(Tgoal), Id, TGoals, Ids, Results) :- 
    1298012989    '$lgt_mt_threaded_and_add_result'(Results, Id, Tgoal, Continue), 
    1298112990    (   Continue == false -> 
    12982         '$lgt_mt_threaded_and_clean'(Results, Queue), 
    1298312991        '$lgt_mt_threaded_and_exit_unify'(TGoals, Results) 
    12984     ;   '$lgt_mt_threaded_and_exit'(TGoals, Queue, Results) 
    12985     ). 
    12986  
    12987 '$lgt_mt_threaded_and_exit'(fail, _, _, Queue, Results) :- 
    12988     '$lgt_mt_threaded_call_abort'(Results), 
    12989     message_queue_destroy(Queue), 
     12992    ;   '$lgt_mt_threaded_and_exit'(TGoals, Ids, Results) 
     12993    ). 
     12994 
     12995'$lgt_mt_threaded_and_exit'(fail, Id, _, Ids, _) :- 
     12996    '$lgt_mt_threaded_call_abort'(Ids, Id), 
    1299012997    fail. 
    1299112998 
     
    1299913006 
    1300013007'$lgt_mt_threaded_and_exit_unify'(TGoal, [id(_, TGoal)]). 
    13001  
    13002  
    13003  
    13004 % joins all threads 
    13005  
    13006 '$lgt_mt_threaded_and_clean'([], _). 
    13007  
    13008 '$lgt_mt_threaded_and_clean'([id(Id, _)| Results], Queue) :- 
    13009     thread_join(Id, _), 
    13010     '$lgt_mt_threaded_and_clean'(Results, Queue). 
    1301113008 
    1301213009 
     
    1305113048'$lgt_mt_threaded_or_call'(TGoal, Queue) :- 
    1305213049    thread_self(Id), 
    13053     (   catch(TGoal, Error, (thread_send_message(Queue, Id::exception(Error)), throw(Error))) -> 
    13054         thread_send_message(Queue, Id::true(TGoal)) 
    13055     ;   thread_send_message(Queue, Id::fail) 
    13056     ). 
    13057  
    13058  
    13059  
    13060 % '$lgt_mt_threaded_or_exit'(+callable, +message_queue_identifier, +list) 
     13050    (   catch(TGoal, Error, (thread_send_message(Queue, '$lgt_result'(Id, exception(Error))), throw(Error))) -> 
     13051        thread_send_message(Queue, '$lgt_result'(Id, true(TGoal))) 
     13052    ;   thread_send_message(Queue, '$lgt_result'(Id, fail)) 
     13053    ). 
     13054 
     13055 
     13056 
     13057% '$lgt_mt_threaded_or_exit'(+callable, +list(thread_identifier), +list) 
    1306113058% 
    1306213059% retrieves the result of proving a disjunction of goals using a threaded/1 predicate 
    1306313060% call by collecting the individual thread results posted to the call message queue 
    1306413061 
    13065 '$lgt_mt_threaded_or_exit'(TGoals, Queue, Results) :- 
    13066     thread_get_message(Queue, Id::Result), 
    13067     '$lgt_mt_threaded_or_exit'(Result, Id, TGoals, Queue, Results). 
    13068  
    13069  
    13070 '$lgt_mt_threaded_or_exit'(exception(Error), _, _, Queue, Results) :- 
    13071     '$lgt_mt_threaded_call_abort'(Results), 
    13072     message_queue_destroy(Queue), 
     13062'$lgt_mt_threaded_or_exit'(TGoals, Ids, Results) :- 
     13063    thread_get_message('$lgt_result'(Id, Result)), 
     13064    thread_join(Id, _), 
     13065    '$lgt_mt_threaded_or_exit'(Result, Id, TGoals, Ids, Results). 
     13066 
     13067 
     13068'$lgt_mt_threaded_or_exit'(exception(Error), Id, _, Ids, _) :- 
     13069    '$lgt_mt_threaded_call_abort'(Ids, Id), 
    1307313070    throw(Error). 
    1307413071 
    13075 '$lgt_mt_threaded_or_exit'(true(TGoal), Id, TGoals, _, Results) :- 
    13076     '$lgt_mt_threaded_call_abort'(Results), 
     13072'$lgt_mt_threaded_or_exit'(true(TGoal), Id, TGoals, Ids, Results) :- 
     13073    '$lgt_mt_threaded_call_abort'(Ids, Id), 
    1307713074    '$lgt_mt_threaded_or_exit_unify'(TGoals, Results, Id, TGoal). 
    1307813075 
    13079 '$lgt_mt_threaded_or_exit'(fail, Id, TGoals, Queue, Results) :- 
     13076'$lgt_mt_threaded_or_exit'(fail, Id, TGoals, Ids, Results) :- 
    1308013077    '$lgt_mt_threaded_or_record_failure'(Results, Id, Continue), 
    1308113078    (   Continue == true -> 
    13082         '$lgt_mt_threaded_or_exit'(TGoals, Queue, Results) 
     13079        '$lgt_mt_threaded_or_exit'(TGoals, Ids, Results) 
    1308313080    ;   % all goals failed 
    13084         '$lgt_mt_threaded_call_abort'(Results), 
    13085         message_queue_destroy(Queue), 
     13081        '$lgt_mt_threaded_call_abort'(Ids, Id), 
    1308613082        fail 
    1308713083    ). 
     
    1313613132% we must use catch/3 as some thread may already be terminated 
    1313713133 
     13134'$lgt_mt_threaded_call_abort'(Ids, Id) :- 
     13135    '$lgt_mt_threaded_call_abort_filter'(Ids, Id, RIds), 
     13136    '$lgt_mt_threaded_call_abort'(RIds). 
     13137 
     13138 
     13139'$lgt_mt_threaded_call_abort_filter'([], _, []). 
     13140 
     13141'$lgt_mt_threaded_call_abort_filter'([Id| Ids], Id, Ids) :- 
     13142    !. 
     13143 
     13144'$lgt_mt_threaded_call_abort_filter'([RId| Ids], Id, [RId| RIds]) :- 
     13145    '$lgt_mt_threaded_call_abort_filter'(Ids, Id, RIds). 
     13146 
     13147 
    1313813148'$lgt_mt_threaded_call_abort'([]). 
    1313913149 
    13140 '$lgt_mt_threaded_call_abort'([id(Id, _)| Results]) :- 
    13141     catch(thread_signal(Id, (mutex_unlock_all, thread_exit(aborted))), _, true), 
    13142     thread_join(Id, _), 
    13143     '$lgt_mt_threaded_call_abort'(Results). 
    13144  
     13150'$lgt_mt_threaded_call_abort'([Id| Ids]) :- 
     13151    (   catch(thread_peek_message(Id, '$lgt_workers'(DIds)), _, fail) -> 
     13152        '$lgt_mt_threaded_call_abort'(DIds) 
     13153    ;   true 
     13154    ), 
     13155    catch(thread_signal(Id, thread_exit(aborted)), _, true), 
     13156    catch(thread_join(Id, _), _, true), 
     13157    '$lgt_mt_threaded_call_abort'(Ids). 
     13158     
    1314513159 
    1314613160