Author: tmckay
Date: 2013-01-02 18:06:09 +0000 (Wed, 02 Jan 2013)
New Revision: 5626
Modified:
branches/elephant/sage/python/sage/aviary/aviaryoperations.py
Log:
Allow the Hadoop "get" methods to filter their results by owner.
The owner argument may be a single owner or a list (or tuple) of owners;
when it is omitted (None), results are returned unfiltered.
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2013-01-02 16:27:36 UTC (rev 5625)
+++ branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2013-01-02 18:06:09 UTC (rev 5626)
@@ -936,8 +936,8 @@
assert callback
self._operate_on_ids(host, ids, callback, "stopNameNode")
- def get_name_node(self, host, ids, callback=None):
- return self._operate_on_ids(host, ids, callback, "getNameNode")
+ def get_name_node(self, host, ids, owner=None, callback=None):
+ return self._query_ids(host, ids, owner, callback, "getNameNode")
def start_data_node(self, host, nn_id, bin_file, owner, count, callback):
return self._start_node(host, nn_id, bin_file, owner, count,
@@ -946,8 +946,8 @@
def stop_data_node(self, host, ids, callback):
self._operate_on_ids(host, ids, callback, "stopDataNode")
- def get_data_node(self, host, ids, callback=None):
- return self._operate_on_ids(host, ids, callback, "getDataNode")
+ def get_data_node(self, host, ids, owner=None, callback=None):
+ return self._query_ids(host, ids, owner, callback, "getDataNode")
def start_job_tracker(self, host, nn_id, bin_file, owner, count, callback):
return self._start_node(host, nn_id, bin_file, owner, count,
@@ -956,8 +956,8 @@
def stop_job_tracker(self, host, ids, callback):
self._operate_on_ids(host, ids, callback, "stopJobTracker")
- def get_job_tracker(self, host, ids, callback=None):
- return self._operate_on_ids(host, ids, callback, "getJobTracker")
+ def get_job_tracker(self, host, ids, owner=None, callback=None):
+ return self._query_ids(host, ids, owner, callback, "getJobTracker")
def stop_task_tracker(self, host, ids, callback):
self._operate_on_ids(host, ids, callback, "stopTaskTracker")
@@ -966,20 +966,20 @@
return self._start_node(host, nn_id, bin_file, owner, count,
"startTaskTracker", callback)
- def get_task_tracker(self, host, ids, callback=None):
- return self._operate_on_ids(host, ids, callback, "getTaskTracker")
+ def get_task_tracker(self, host, ids, owner=None, callback=None):
+ return self._query_ids(host, ids, owner, callback, "getTaskTracker")
- def get_name_node_list(self, callback=None):
- return self._get_node_list(self.get_name_node, callback)
+ def get_name_node_list(self, owner=None, callback=None):
+ return self._get_node_list(self.get_name_node, owner, callback)
- def get_data_node_list(self, callback=None):
- return self._get_node_list(self.get_data_node, callback)
+ def get_data_node_list(self, owner=None, callback=None):
+ return self._get_node_list(self.get_data_node, owner, callback)
- def get_job_tracker_list(self, callback=None):
- return self._get_node_list(self.get_job_tracker, callback)
+ def get_job_tracker_list(self, owner=None, callback=None):
+ return self._get_node_list(self.get_job_tracker, owner, callback)
- def get_task_tracker_list(self, callback=None):
- return self._get_node_list(self.get_task_tracker, callback)
+ def get_task_tracker_list(self, owner=None, callback=None):
+ return self._get_node_list(self.get_task_tracker, owner, callback)
def _make_id(self, client, val):
@@ -1038,13 +1038,13 @@
client, meth_name, ref, bin_file, owner, count)
t.start()
- def _get_node_list(self, proc, callback):
+ def _get_node_list(self, proc, owner, callback):
try:
status = "OK"
hosts = self.get_hosts(self.resource, self.subtype)
data = []
for h in hosts:
- s, n = proc(h, [])
+ s, n = proc(h, [], owner)
if s == "OK":
data.extend(n)
except Exception, e:
@@ -1054,8 +1054,28 @@
callback(status, data)
else:
return (status, data)
-
- def _operate_on_ids(self, host, ids, callback, meth_name):
+
+ def _query_ids(self, host, ids, owner, callback, meth_name):
+ # Allow query results to be constrained by owner
+ def filter_results(results):
+ if type(owner) not in (list, tuple):
+ o = [owner]
+ else:
+ o = owner
+ res = []
+ for r in results:
+ if r.owner in o:
+ res.append(r)
+ return res
+
+ if owner is None:
+ return self._operate_on_ids(host, ids, callback, meth_name)
+ else:
+ return self._operate_on_ids(host, ids, callback, meth_name,
+ filter_results)
+
+
+ def _operate_on_ids(self, host, ids, callback, meth_name, filter_results=None):
if callback:
assert callable(callback)
@@ -1079,6 +1099,8 @@
status = _AviaryCommon._get_status(result.status)
if status == "OK" and hasattr(result, "results"):
data = result.results
+ if filter_results:
+ data = filter_results(data)
return (status, data)
if callback: