Author: tmckay
Date: 2011-08-17 18:17:46 +0000 (Wed, 17 Aug 2011)
New Revision: 4935
Modified:
trunk/cumin/python/cumin/grid/job.py
trunk/cumin/python/cumin/model.py
trunk/sage/python/sage/aviary/aviaryoperations.py
trunk/sage/python/sage/qmf/qmfoperations.py
Log:
Initial ops using aviary query server (fetch_job_data and get_job_ad)
Modified: trunk/cumin/python/cumin/grid/job.py
===================================================================
--- trunk/cumin/python/cumin/grid/job.py 2011-08-17 15:52:36 UTC (rev 4934)
+++ trunk/cumin/python/cumin/grid/job.py 2011-08-17 18:17:46 UTC (rev 4935)
@@ -864,10 +864,9 @@
id = self.frame.id.get(session)
job_server = self.frame.get_job_server(session, id)
job_id = self.frame.job_id.get(session)
- file, start, end = self.get_file_args(session)
+ state, file, start, end = self.get_file_args(session)
if file:
- result = self.app.remote.fetch_job_data(job_server,
- job_id,
+ result = self.app.remote.fetch_job_data(job_server, job_id, state,
file, start, end, {'Data':
""})
if result.error:
return result.status
@@ -882,8 +881,9 @@
else:
start = 0
end = 2048
- file = self.which_file.get_current_file_name(session)
- return (file, start, end)
+ state = self.which_file.get_current_state(session)
+ file = self.which_file.get_file_name(session, state)
+ return (state, file, start, end)
def render_loading(self, session, *args):
file = self.which_file.get_current_file_name(session)
@@ -1010,6 +1010,9 @@
state = self.get(session)
return self.get_file_name(session, state)
+ def get_current_state(self, session):
+ return self.get(session)
+
def select_first_enabled(self, session):
states = self.get_items(session)
disabled = self.disabled.get(session)
Modified: trunk/cumin/python/cumin/model.py
===================================================================
--- trunk/cumin/python/cumin/model.py 2011-08-17 15:52:36 UTC (rev 4934)
+++ trunk/cumin/python/cumin/model.py 2011-08-17 18:17:46 UTC (rev 4935)
@@ -642,7 +642,9 @@
def render_universe(self, session, value):
try:
- return self.universes[value]
+ # might have a string version of universe
+ # value passed in here...
+ return self.universes[int(value)]
except KeyError:
return "Unknown (%s)" % str(value)
Modified: trunk/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-17 15:52:36 UTC (rev 4934)
+++ trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-17 18:17:46 UTC (rev 4935)
@@ -85,9 +85,10 @@
self.job_client_pool = JobClientPool(job_wsdl, None)
self.query_client_pool = QueryClientPool(query_wsdl, None)
- self.type_map = self._type_map()
+ self.type_to_aviary = self._type_to_aviary()
+ self.aviary_to_type = self._aviary_to_type()
-# scheduler operations
+# job server operations
def set_job_attribute(self, scheduler, job_id, name, value, callback):
assert callback
@@ -184,7 +185,7 @@
extra.type = "EXPRESSION"
else:
try:
- extra.type = self.type_map[type(v)]
+ extra.type = self.type_to_aviary[type(v)]
except KeyError:
extra.type = "UNDEFINED"
extra.value = v
@@ -224,14 +225,130 @@
"removeJob",
callback, default, timeout)
+# query server ops
+
+ def fetch_job_data(self, job_server, job_id, ftype, file, start, end,
+ default=None, timeout=5):
+ # Aviary doesn't use the file name as does QMF, instead it
+ # specifies the file type and lets condor figure out the path.
+
+ client = self.query_client_pool.get_object()
+
+ def my_process_results(result):
+ # Fix up the exception message if necessary
+ result = self._pretty_result(result, job_server.Machine)
+ if isinstance(result, Exception):
+ status = result
+ data = None
+ else:
+ status = AviaryOperations._get_status(result.status)
+ # Match the format expected by Cumin. This is
+ # the format used by the QMF call...
+ data = {'Data': result.content}
+ return (status, data)
+
+ host = self._get_host(job_server.Machine, self.query_servers)
+ if host == "":
+ self._raise_no_host(job_server.Machine)
+
+ service = host + "getJobData"
+
+ # Have to set the URL for the method. This might go away someday...
+ #print "Aviary control job " + url
+ client.set_options(location=service)
+
+ # Make a job data parameter (see query wsdl)
+ jobData = client.factory.create('ns0:JobData')
+ jobData.id.job = job_id
+ jobData.id.pool = job_server.Pool
+
+ # Translate cumin file type to Aviary file type
+ if ftype == "e":
+ jobData.type = "ERR"
+ elif ftype == "o":
+ jobData.type = "OUT"
+ elif ftype == "u":
+ jobData.type = "LOG"
+ else:
+ # We can't translate the type.
+ # Let Aviary throw an error on this instead of us.
+ jobData.type = ftype
+
+ from_end = start < 0
+ max_bytes = abs(end - start)
+
+ res = self._call_sync(my_process_results, client.service.getJobData,
+ jobData, max_bytes, from_end )
+ self.query_client_pool.return_object(client)
+ return res;
+
+ def get_job_ad(self, job_server, job_id, default=None, timeout=5):
+
+ client = self.query_client_pool.get_object()
+
+ def make_tuple(attr):
+ if attr.type in self.aviary_to_type:
+ try:
+ v = self.aviary_to_type[attr.type](attr.value)
+ except:
+ v = attr.value
+ else:
+ v = attr.value
+ return (attr.name, v)
+
+ def make_dict(attrs):
+ return dict([make_tuple(attr) for attr in attrs])
+
+ def my_process_results(result):
+ # Fix up the exception message if necessary
+ result = self._pretty_result(result, job_server.Machine)
+ if isinstance(result, Exception):
+ status = result
+ data = None
+ else:
+ status = AviaryOperations._get_status(result[0].status)
+ # Match the format expected by Cumin. This is
+ # the format used by the QMF call. We have a list
+ # of attributes in attrs that we need to make into
+ # a dictionary
+ ads = make_dict(result[0].details.attrs)
+ data = {'JobAd': ads}
+ return (status, data)
+
+ host = self._get_host(job_server.Machine, self.query_servers)
+ if host == "":
+ self._raise_no_host(job_server.Machine)
+
+ service = host + "getJobDetails"
+
+ # Have to set the URL for the method. This might go away someday...
+ #print "Aviary control job " + url
+ client.set_options(location=service)
+
+ # Make a job id parameter (see job wsdl)
+ jobId = client.factory.create('ns0:JobID')
+ jobId.job = job_id
+ jobId.pool = job_server.Pool
+ # still don't have the scheduler filled in here
+ # or the submission. Ditto with other jobId values in this module
+
+ res = self._call_sync(my_process_results,
+ client.service.getJobDetails, jobId)
+ self.query_client_pool.return_object(client)
+ return res;
+
# Secret private implementation stuff, don't look!
@classmethod
- def _type_map(cls):
+ def _type_to_aviary(cls):
# Need to be able to turn simple Python types into Aviary types for attributes
return {int: "INTEGER", float: "FLOAT", str:
"STRING", bool: "BOOLEAN"}
@classmethod
+ def _aviary_to_type(cls):
+ return {"INTEGER": int, "FLOAT": float, "STRING":
str, "BOOLEAN": bool}
+
+ @classmethod
def _get_host(cls, name, servers):
host = ""
if name in servers:
Modified: trunk/sage/python/sage/qmf/qmfoperations.py
===================================================================
--- trunk/sage/python/sage/qmf/qmfoperations.py 2011-08-17 15:52:36 UTC (rev 4934)
+++ trunk/sage/python/sage/qmf/qmfoperations.py 2011-08-17 18:17:46 UTC (rev 4935)
@@ -119,8 +119,9 @@
def get_job_ad(self, job_server, job_id, default=None, timeout=5):
return self._call(job_server, "GetJobAd", 0, default, timeout, job_id)
- def fetch_job_data(self, job_server, job_id, file, start, end,
+ def fetch_job_data(self, job_server, job_id, ftype, file, start, end,
default=None, timeout=5):
+ # QMF doesn't use the ftype value, it just uses the filename
return self._call(job_server, "FetchJobData", 0, default, timeout,
job_id, file, start, end)