MapReduceApiCallSeq
From Grid-Appliance Wiki
Sequence
API calling sequence
Initiating a map/reduce task with the "mapreduce.Start" RPC
- The user initiates a map/reduce computation by calling the "mapreduce.Start" RPC.
- In HandleRpc(), the handler looks up the registered task by its name, creates a new MapReduceComputation object for it, and starts the computation.
public void HandleRpc(ISender caller, string method, IList args, object req_state) {
  if (method == "Start") {
    // task_name and mr_args are obtained from args (elided in this excerpt)
    MapReduceTask task;
    if (_name_to_task.TryGetValue(task_name, out task)) {
      MapReduceComputation mr = new MapReduceComputation(_node, req_state, task, mr_args);
      mr.Start();
    }
  }
}
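The lookup-then-start pattern in HandleRpc() can be sketched without Brunet. This is a minimal sketch that rests on two assumptions. First, TaskRegistry, IMapReduceTask, SumTask and Computation are hypothetical stand-ins, not Brunet classes. Second, an unknown task name simply starts nothing.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for a registered map/reduce task (not Brunet's API).
public interface IMapReduceTask
{
    string Name { get; }
}

// Hypothetical example task registered under the name "sum".
public sealed class SumTask : IMapReduceTask
{
    public string Name { get { return "sum"; } }
}

// Hypothetical stand-in for MapReduceComputation: one object per request.
public sealed class Computation
{
    public IMapReduceTask Task { get; private set; }
    public bool Started { get; private set; }

    public Computation(IMapReduceTask task) { Task = task; }

    public void Start() { Started = true; }
}

public sealed class TaskRegistry
{
    private readonly Dictionary<string, IMapReduceTask> _nameToTask =
        new Dictionary<string, IMapReduceTask>();

    public void Register(IMapReduceTask task) { _nameToTask[task.Name] = task; }

    // Mirrors the "Start" branch of HandleRpc(): look the task up by name and,
    // if it exists, create a fresh computation for this request and start it.
    // Returns null when no task with that name is registered.
    public Computation TryStart(string taskName)
    {
        IMapReduceTask task;
        if (_nameToTask.TryGetValue(taskName, out task))
        {
            var computation = new Computation(task);
            computation.Start();
            return computation;
        }
        return null;
    }
}
```

HandleRpc() creates a new computation object for every request. This keeps the state of concurrent computations of the same task separate.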
- In MapReduceComputation.Start(), it first registers MapHandler() as the CloseEvent handler of a one-result channel and runs the Map task on the local node. Unless the computation is already done, it then registers GenTreeHandler() on a second channel and calls GenerateTree(), which builds the tree used to disseminate the map/reduce task to known nodes. MapHandler() and GenTreeHandler() are explained next.
public void Start() {
  // Run Map locally; MapHandler() fires when its single result arrives.
  Channel map_res = new Channel(1);
  map_res.CloseEvent += this.MapHandler;
  _mr_task.Map(map_res, _mr_args.MapArg);
  if( _state.Done ) { return; }
  // Build the dissemination tree; GenTreeHandler() fires when it arrives.
  Channel gentree_res = new Channel(1);
  gentree_res.CloseEvent += this.GenTreeHandler;
  _mr_task.GenerateTree(gentree_res, _mr_args);
}
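Start() relies on a callback pattern. A channel created with new Channel(1) accepts a single result and then raises CloseEvent, and the handler of that event dequeues the result. The hypothetical OneShotChannel below imitates that behaviour as it is used on this page. It is an assumption-based sketch, not Brunet's Channel implementation, and it is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical imitation of a single-result channel such as new Channel(1)
// or new Channel(1, mri): it accepts one item, closes itself, and raises
// CloseEvent so that the registered handler can dequeue the item.
// Not thread-safe; it only illustrates the callback order.
public sealed class OneShotChannel
{
    private readonly Queue<object> _items = new Queue<object>();

    public object State { get; private set; }   // optional tag, e.g. a child's info
    public bool Closed { get; private set; }
    public event EventHandler CloseEvent;

    public OneShotChannel(object state = null) { State = state; }

    public void Enqueue(object item)
    {
        if (Closed) { throw new InvalidOperationException("channel is closed"); }
        _items.Enqueue(item);
        Closed = true;                      // capacity 1: close after the first item
        EventHandler handler = CloseEvent;
        if (handler != null) { handler(this, EventArgs.Empty); }
    }

    public object Dequeue() { return _items.Dequeue(); }
}
```

As in Start(), the handler is registered before the producer delivers its result. The handler receives the channel as its sender argument and casts it back, as ChildCallback() does with its first parameter.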
- In MapHandler(), it dequeues the result of the local Map call and creates a new state with UpdateMap(). UpdateMap() simply records the MapResult, so it is not described further here. With the new state, MapHandler() calls TryNextReduce() to try to reduce the map result.
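protected void MapHandler(object chan, EventArgs args) {
  Channel map_chan = (Channel)chan;
  object map_res = map_chan.Dequeue();
  // old_state/new_state: computation state before/after the update (elided)
  new_state = old_state.UpdateMap(map_res);
  // A null sender marks the local map result (see ReduceHandler)
  TryNextReduce(new_state, old_state, new RpcResult(null, map_res), false);
}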
- In GenTreeHandler(), it dequeues the generated tree (the list of child nodes) and updates the state with UpdateTree(). If the map/reduce computation is already done, it sends the result back. Otherwise, for each child it creates a result channel with ChildCallback() as its CloseEvent handler and sends the map/reduce task to that child with a "mapreduce.Start" RPC.
protected void GenTreeHandler(object chano, EventArgs args) {
  Channel gtchan = (Channel)chano;
  object gtchan_result = gtchan.Dequeue();
  MapReduceInfo[] children = (MapReduceInfo[])gtchan_result;
  new_state = old_state.UpdateTree(children);
  if( new_state.Done ) {
    SendResult( new_state.ReduceResult );
    return;
  }
  // Send the task to each child; each reply is handled by ChildCallback().
  foreach(MapReduceInfo mri in children) {
    Channel child_q = new Channel(1, mri);
    child_q.CloseEvent += this.ChildCallback;
    _rpc.Invoke(mri.Sender, child_q, "mapreduce.Start", mri.Args.ToHashtable());
  }
}
- UpdateTree() stores the tree and checks whether the map/reduce computation can terminate. If there are no children to send the task to and the local map result has already been reduced, it sets the done flag to true, which terminates the computation. If the local map result has not been reduced yet, completion is detected later in UpdateReduce().
public State UpdateTree(MapReduceInfo[] tree) {
  bool done = false;
  if( ReduceResult != DEFAULT_OBJ ) {
    // Local map result already reduced: done if there are no children.
    done = (tree.Length == 0);
  }
  return new State(MapResult, tree, ChildReductions, ReduceResult, done, Pending, Reducing);
}
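For a leaf node (an empty tree), the completion check is split between UpdateTree() and UpdateReduce(). Two events must happen: the tree becomes known, and the local map result is reduced. Whichever of these comes last detects that the computation is done.

The sketch below writes both checks as pure functions. CompletionRules is hypothetical, and so are its parameters:
- alreadyReduced stands for ReduceResult != DEFAULT_OBJ.
- A null treeLength stands for "GenerateTree() has not returned yet".

```csharp
// Hypothetical helpers mirroring the two completion checks for one node.
public static class CompletionRules
{
    // UpdateTree(): alreadyReduced ~ ReduceResult != DEFAULT_OBJ, childCount ~ tree.Length.
    public static bool DoneWhenTreeArrives(bool alreadyReduced, int childCount)
    {
        return alreadyReduced && childCount == 0;
    }

    // UpdateReduce() with nothing pending: treeLength is null while the tree is unknown.
    public static bool DoneWhenReduceFinishes(int childReductions, int? treeLength)
    {
        return treeLength.HasValue && childReductions == treeLength.Value;
    }
}
```

For a node with children, the tree alone never finishes the computation. Completion is then always decided in UpdateReduce(), because children are only contacted after the tree is known.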
- In ChildCallback(), it dequeues the child's map/reduce result and calls Reduce() on it.
protected void ChildCallback(object cq, EventArgs arg) {
  Channel child_q = (Channel)cq;
  RpcResult child_r = (RpcResult)child_q.Dequeue();
  Reduce(child_r);
}
- In Reduce(), it creates a new state from the newly arrived child result with AddChildResult() and then calls TryNextReduce() with that state.
protected void Reduce(RpcResult child_r) {
  new_state = old_state.AddChildResult(child_r);
  TryNextReduce(new_state, old_state, child_r, false);
}
- AddChildResult() distinguishes three cases:
  - If a reduction is already in progress, it adds the result to the pending list and waits for that reduction to finish.
  - If no reduction is in progress but the local map result has not arrived yet, it also adds the result to the pending list and waits for the local Map task to return.
  - Otherwise (no reduction in progress and the local map result available), it returns a state with the Reducing flag set, so the arriving result is reduced immediately.
public State AddChildResult(RpcResult child) {
  if( Reducing ) {
    // A Reduce is already running: queue the result.
    var pend = new ImmutableList<RpcResult>(child, Pending);
    return new State(MapResult, Tree, ChildReductions, ReduceResult, Done, pend, true);
  }
  else if( MapResult == DEFAULT_OBJ ) {
    // The local map result has not arrived: queue the result.
    var pend = new ImmutableList<RpcResult>(child, Pending);
    return new State(MapResult, Tree, ChildReductions, ReduceResult, Done, pend, false);
  }
  else {
    // Idle with a map result: reduce this result now.
    return new State(MapResult, Tree, ChildReductions, ReduceResult, Done, ImmutableList<RpcResult>.Empty, true);
  }
}
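The three cases of AddChildResult() can be tried out on a stripped-down immutable state. This is a sketch under these assumptions:
- PList is a hypothetical minimal cons list standing in for Brunet's ImmutableList.
- new ImmutableList<RpcResult>(child, Pending) is read as prepending child.
- Child results are plain strings.
- Only the fields that AddChildResult() inspects or changes are modelled.

```csharp
// Hypothetical persistent cons list: Push returns a new list, the old one is unchanged.
public sealed class PList<T>
{
    public static readonly PList<T> Empty = new PList<T>(default(T), null, 0);

    public T Head { get; private set; }
    public PList<T> Tail { get; private set; }
    public int Count { get; private set; }
    public bool IsEmpty { get { return Count == 0; } }

    private PList(T head, PList<T> tail, int count) { Head = head; Tail = tail; Count = count; }

    public PList<T> Push(T item) { return new PList<T>(item, this, Count + 1); }
}

// Hypothetical reduced state: only the fields AddChildResult() looks at.
public sealed class ChildState
{
    public bool HasMapResult { get; private set; }   // ~ MapResult != DEFAULT_OBJ
    public bool Reducing { get; private set; }
    public PList<string> Pending { get; private set; }

    public ChildState(bool hasMapResult, bool reducing, PList<string> pending)
    {
        HasMapResult = hasMapResult; Reducing = reducing; Pending = pending;
    }

    public ChildState AddChildResult(string child)
    {
        if (Reducing)
        {
            // A Reduce is already running: queue the result and keep Reducing set.
            return new ChildState(HasMapResult, true, Pending.Push(child));
        }
        if (!HasMapResult)
        {
            // The local map result is missing: queue the result and stay idle.
            return new ChildState(false, false, Pending.Push(child));
        }
        // Idle with a map result: the caller reduces 'child' right away.
        return new ChildState(true, true, PList<string>.Empty);
    }
}
```

Under the prepend assumption, the most recently queued result sits at the head of the pending list. That head is the value ReduceHandler() passes on next (old_state.Pending.Head).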
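- TryNextReduce() first checks the Done flag. If it is set, the computation is finished and the result is sent back. Otherwise it checks whether a new Reduce can start. The new state must be reducing, and one of two conditions must hold: this call continues from a completed reduction (cont is true), or this call is the one that switched Reducing from false to true. As a result, at most one Reduce runs at a time.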
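protected bool TryNextReduce(State new_s, State old_s, RpcResult v, bool cont) {
  if( new_s.Done ) {
    SendResult( new_s.ReduceResult );
    return false;
  }
  bool start_reduce = new_s.Reducing && (cont || (false == old_s.Reducing));
  if( start_reduce ) {
    Channel r_chan = new Channel(1, v);   // v is read back as r_chan.State in ReduceHandler()
    r_chan.CloseEvent += this.ReduceHandler;
    // startval: the value v is reduced into (computed in code elided here)
    _mr_task.Reduce(r_chan, _mr_args.ReduceArg, startval, v);
  }
  // Return value (omitted in the excerpt): whether a Reduce was started.
  return start_reduce;
}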
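The start condition of TryNextReduce() is what serializes reductions. The sketch below isolates it. ReduceGuard and the hand-written event trace in PeakConcurrentReduces() are hypothetical, and the trace assumes each state transition is applied atomically.

```csharp
using System;

public static class ReduceGuard
{
    // Start condition from TryNextReduce():
    //   new_s.Reducing && (cont || (false == old_s.Reducing))
    public static bool ShouldStartReduce(bool newReducing, bool oldReducing, bool cont)
    {
        return newReducing && (cont || !oldReducing);
    }

    // Hypothetical trace on an idle node that already has its map result:
    // child result 1 arrives, child result 2 arrives, then Reduce #1 finishes
    // with result 2 still pending. Returns the peak number of Reduce calls in flight.
    public static int PeakConcurrentReduces()
    {
        int inFlight = 0, peak = 0;
        bool reducing = false;

        // Child 1: AddChildResult() flips Reducing false -> true, so a Reduce starts.
        bool old = reducing; reducing = true;
        if (ShouldStartReduce(reducing, old, false)) { inFlight++; }
        peak = Math.Max(peak, inFlight);

        // Child 2: Reducing was already true, so the result is only queued.
        old = reducing; reducing = true;
        if (ShouldStartReduce(reducing, old, false)) { inFlight++; }
        peak = Math.Max(peak, inFlight);

        // Reduce #1 finishes; UpdateReduce() keeps Reducing set because work is pending,
        // and ReduceHandler() calls TryNextReduce() with cont = true.
        inFlight--;
        old = reducing; reducing = true;
        if (ShouldStartReduce(reducing, old, true)) { inFlight++; }
        peak = Math.Max(peak, inFlight);

        return peak;
    }
}
```

The cont flag is needed because the continuation call sees Reducing already true in both the old and the new state. Without cont, the pending result would never be picked up.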
- In ReduceHandler(), it dequeues the reduce result and creates a new state with UpdateReduce(). It then passes that state to TryNextReduce() together with the next pending result.
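protected void ReduceHandler(object ro, EventArgs args) {
  Channel reduce_chan = (Channel)ro;
  // The channel state is the value that was just reduced.
  RpcResult value_reduced = (RpcResult)reduce_chan.State;
  object r_o = reduce_chan.Dequeue();
  var retval = (Brunet.Util.Pair<object, bool>)r_o;
  // A non-null sender means the value came from a child, not the local Map.
  new_state = old_state.UpdateReduce(retval, value_reduced.ResultSender != null);
  TryNextReduce(new_state, old_state, old_state.Pending.Head, true);
}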
- UpdateReduce() first increments the child-reduction counter (child_reds) if the reduced value came from a child. The next step depends on the pending list:
  - If the computation is not done and there are pending results, it removes the head of the pending list (the value reduced next) and keeps the Reducing flag set.
  - If there are no pending results and the done flag is still false, it checks whether the whole map/reduce computation has finished. It does this by comparing child_reds with the tree length, i.e. the number of children to which this node sent the map/reduce task.
public State UpdateReduce(Brunet.Util.Pair<object,bool> val, bool child_reduction) {
  int child_reds = ChildReductions;
  if( child_reduction ) { child_reds++; }
  // Assumed: First is the new reduce result, Second a "done" flag from the task.
  object reduce_result = val.First;
  bool done = val.Second;
  ImmutableList<RpcResult> pend;
  bool reduce;
  if( false == Pending.IsEmpty && false == done ) {
    pend = Pending.Tail;   // the head is reduced next
    reduce = true;
  }
  else {
    pend = ImmutableList<RpcResult>.Empty;
    reduce = false;
    if( false == done && Tree != null ) {
      done = (child_reds == Tree.Length);
    }
  }
  return new State(MapResult, Tree, child_reds, reduce_result, done, pend, reduce);
}
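The bookkeeping in UpdateReduce() can be traced for a node with two children. This sketch is hypothetical:
- ReduceBook keeps only counters, a pending count instead of the pending list.
- The tree length is nullable and stays null until GenerateTree() has returned.
- It assumes that the bool in the Pair<object,bool> returned by Reduce is a "done" flag set by the task.

```csharp
// Hypothetical counter-only version of the state touched by UpdateReduce().
public sealed class ReduceBook
{
    public int ChildReductions { get; private set; }
    public int? TreeLength { get; private set; }   // null until the tree is known
    public int PendingCount { get; private set; }
    public bool Reducing { get; private set; }
    public bool Done { get; private set; }

    public ReduceBook(int childReductions, int? treeLength, int pendingCount, bool reducing, bool done)
    {
        ChildReductions = childReductions; TreeLength = treeLength;
        PendingCount = pendingCount; Reducing = reducing; Done = done;
    }

    // taskSaysDone stands for the bool in the Reduce result; fromChild for child_reduction.
    public ReduceBook UpdateReduce(bool taskSaysDone, bool fromChild)
    {
        int childReds = fromChild ? ChildReductions + 1 : ChildReductions;
        if (PendingCount > 0 && !taskSaysDone)
        {
            // More results are waiting: take the next one and keep reducing.
            return new ReduceBook(childReds, TreeLength, PendingCount - 1, true, false);
        }
        bool done = taskSaysDone;
        if (!done && TreeLength.HasValue)
        {
            // Done once every child that was sent the task has been reduced.
            done = childReds == TreeLength.Value;
        }
        return new ReduceBook(childReds, TreeLength, 0, false, done);
    }

    // Hypothetical helper: a child result arrives while idle and is reduced at once.
    public ReduceBook StartReducing()
    {
        return new ReduceBook(ChildReductions, TreeLength, PendingCount, true, Done);
    }
}
```

The local map result never increments ChildReductions. This is why the count can be compared directly with the number of children in the tree.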


