(** [runner create_worker workers_waiting conf logger chooser test_cases]
    schedules [test_cases] on a pool of workers and returns
    [OUnitState.get_results] of the final state.

    - [create_worker] is called as
      [create_worker conf map_test_cases shard_id master_id worker_log_file]
      whenever the scheduler asks for one more worker; this only happens
      while [OUnitState.count_worker state] is below [max (shards conf) 1].
    - [workers_waiting workers timeout] is called with every known worker
      and [timeout state]; exactly one message is then read from each worker
      it returns.
    - [conf], [logger], [chooser] and [test_cases] are forwarded to
      [OUnitState.create] and to the logging helpers.
    - [master_id] handed to the workers is [logger.OUnitLogger.lshard]. *)
let runner
    create_worker workers_waiting
    conf logger chooser test_cases =
  (* Index the test cases by path; the map is handed to every new worker. *)
  let map_test_cases =
    List.fold_left
      (fun mp ((test_path, _, _) as test_case) ->
         MapPath.add test_path test_case mp)
      MapPath.empty
      test_cases
  in
  let state = OUnitState.create conf chooser test_cases in
  (* Never go below one worker, whatever the configuration says. *)
  let shards = max (shards conf) 1 in
  let master_id = logger.OUnitLogger.lshard in
  (* Number used to build the shard id of the next worker; starts at 1, so
     [!worker_idx - 1] is the number of workers created so far. *)
  let worker_idx = ref 1 in
  (* Per-shard counters, only used for the summary logged at the end. *)
  let test_per_worker, incr_tests_per_worker =
    OUnitUtils.make_counter ()
  in
  let health_check_per_worker, incr_health_check_per_worker =
    OUnitUtils.make_counter ()
  in
  let () = infof logger "Using %d workers maximum." shards in
  (* Flag passed to [create_worker]: false (with a warning) when the output
     file is not shard dependent. *)
  let worker_log_file =
    if not (OUnitLoggerStd.is_output_file_shard_dependent conf) then begin
      warningf logger
        "-output-file doesn't include $(shard_id), shards won't have file log.";
      false
    end else begin
      true
    end
  in
  let master_shared = OUnitShared.noscope_create () in

  (* Handle one message [msg] received from [worker]; returns the new state. *)
  let process_message worker msg state =
    match msg with
    | AckExit ->
        (* Log first, then close the worker and report whatever message
           closing it returned. *)
        let msg_opt =
          infof logger "Worker %s has ended." worker.shard_id;
          worker.close_worker ()
        in
        OUnitUtils.opt
          (errorf logger "Worker return status: %s")
          msg_opt;
        remove_idle_worker worker state
    | Log log_ev ->
        (* Replay the event through our logger, tagged with the shard id of
           the worker that sent it. *)
        OUnitLogger.report (set_shard worker.shard_id logger) log_ev;
        state
    | Lock id ->
        (* Answer with the outcome of [try_lock] on the shared state. *)
        worker.channel.send_data
          (AckLock (master_shared.OUnitShared.try_lock id));
        state
    | Unlock id ->
        master_shared.OUnitShared.unlock id;
        state
    | TestDone test_result ->
        OUnitState.test_finished conf test_result worker state
  in

  (* Report [result] followed by the end event for [test_path], record the
     test as finished with that result, then remove [worker] from the
     state. *)
  let declare_dead_worker test_path worker result state =
    let log_pos = position logger in
    report logger (TestEvent (test_path, EResult result));
    report logger (TestEvent (test_path, EEnd));
    remove_idle_worker
      worker
      (test_finished conf
         ((test_path, result, log_pos), [])
         worker state)
  in

  (* For every entry of [get_worker_timed_out state]: log an error, close the
     worker (the message returned by closing is dropped) and declare it dead
     with an [RTimeout] result. *)
  let kill_timeout state =
    List.fold_left
      (fun state (test_path, test_length, worker) ->
         let _msg : string option =
           errorf logger "Worker %s, running test %s has timed out."
             worker.shard_id (string_of_path test_path);
           worker.close_worker ()
         in
         declare_dead_worker test_path worker (RTimeout test_length) state)
      state
      (get_worker_timed_out state)
  in

  (* For every entry of [get_worker_need_health_check state]: count the
     check, then either refresh the test activity when the worker is still
     running, or close it and declare it dead with an [RError] result. *)
  let check_health state =
    List.fold_left
      (fun state (test_path, worker) ->
         incr_health_check_per_worker worker.shard_id;
         if worker.is_running () then begin
           update_test_activity test_path state
         end else begin
           let result_msg =
             errorf logger
               "Worker %s, running test %s is not running anymore."
               worker.shard_id (string_of_path test_path);
             match worker.close_worker () with
             | Some msg ->
                 Printf.sprintf "Worker stops running: %s" msg
             | None ->
                 "Worker stops running for unknown reason."
           in
           declare_dead_worker test_path worker
             (RError (result_msg, None))
             state
         end)
      state
      (get_worker_need_health_check state)
  in

  (* One round of waiting: handle timeouts and health checks first, then, if
     any worker is left, wait on them and process one message from each
     worker returned by [workers_waiting]. Not recursive: callers loop. *)
  let wait_test_done state =
    let state = check_health (kill_timeout state) in
    match get_workers state with
    | [] ->
        state
    | _ :: _ as workers ->
        let workers_waiting_lst =
          infof logger "%d tests running: %s."
            (count_tests_running state)
            (String.concat ", "
               (List.map string_of_path (get_tests_running state)));
          workers_waiting workers (timeout state)
        in
        List.fold_left
          (fun state worker ->
             process_message worker (worker.channel.receive_data ()) state)
          state
          workers_waiting_lst
  in

  (* Keep running wait rounds until the state has no worker left. *)
  let rec wait_stopped state =
    match OUnitState.get_workers state with
    | [] -> state
    | _ :: _ -> wait_stopped (wait_test_done state)
  in

  (* Main scheduling loop, driven by [OUnitState.next_test_case]. *)
  let rec iter state =
    match OUnitState.next_test_case conf logger state with
    | Not_enough_worker, state ->
        if OUnitState.count_worker state < shards then begin
          (* Still below the worker limit: start one more worker. *)
          let shard_id = OUnitUtils.shardf !worker_idx in
          let () = infof logger "Starting worker number %s." shard_id in
          let worker =
            create_worker
              conf map_test_cases shard_id master_id worker_log_file
          in
          let () = infof logger "Worker %s started." worker.shard_id in
          let state = add_worker worker state in
          incr worker_idx;
          iter state
        end else begin
          (* Limit reached: wait for a round of messages instead. *)
          iter (wait_test_done state)
        end
    | Try_again, state ->
        iter (wait_test_done state)
    | Next_test_case (test_path, _, worker), state ->
        incr_tests_per_worker worker.shard_id;
        worker.channel.send_data (RunTest test_path);
        iter state
    | Finished, state ->
        let count_tests_running = OUnitState.count_tests_running state in
        if count_tests_running = 0 then begin
          (* Nothing is running anymore: send [Exit] to every worker and
             wait until the state has none left. *)
          let state =
            List.iter
              (fun worker -> worker.channel.send_data Exit)
              (OUnitState.get_workers state);
            wait_stopped state
          in
          infof logger "Used %d worker during test execution."
            (!worker_idx - 1);
          List.iter
            (fun (shard_id, count) ->
               infof logger "Run %d tests with shard %s."
                 count shard_id)
            (test_per_worker ());
          List.iter
            (fun (shard_id, count) ->
               infof logger "Check health of shard %s, %d times."
                 shard_id count)
            (health_check_per_worker ());
          OUnitState.get_results state
        end else begin
          (* Tests are still running: wait for them before finishing. *)
          infof logger "Still %d tests running : %s." count_tests_running
            (String.concat ", "
               (List.map string_of_path
                  (get_tests_running state)));
          iter (wait_test_done state)
        end
  in
  iter state