Author: tmckay
Date: 2011-08-30 20:59:44 +0000 (Tue, 30 Aug 2011)
New Revision: 4940
Added:
trunk/sage/python/sage/aviary/https.py
Modified:
trunk/cumin/bin/cumin-web
trunk/cumin/python/cumin/config.py
trunk/cumin/python/cumin/main.py
trunk/sage/python/sage/aviary/aviaryoperations.py
Log:
Add support for ssl connection to aviary.
Modified: trunk/cumin/bin/cumin-web
===================================================================
--- trunk/cumin/bin/cumin-web 2011-08-29 18:00:52 UTC (rev 4939)
+++ trunk/cumin/bin/cumin-web 2011-08-30 20:59:44 UTC (rev 4940)
@@ -18,6 +18,8 @@
cumin.use_aviary = values.use_aviary
cumin.aviary_job_servers = values.aviary_job_servers
cumin.aviary_query_servers = values.aviary_query_servers
+ cumin.aviary_key = values.aviary_key
+ cumin.aviary_cert = values.aviary_cert
def set_wallaby_configs(cumin, values, brokers):
if values.wallaby_broker == "":
Modified: trunk/cumin/python/cumin/config.py
===================================================================
--- trunk/cumin/python/cumin/config.py 2011-08-29 18:00:52 UTC (rev 4939)
+++ trunk/cumin/python/cumin/config.py 2011-08-30 20:59:44 UTC (rev 4940)
@@ -164,6 +164,12 @@
param = ConfigParameter(self, "aviary-query-servers", str)
param.default = "http://localhost:9091"
+ param = ConfigParameter(self, "aviary-key", str)
+ param.default = ""
+
+ param = ConfigParameter(self, "aviary-cert", str)
+ param.default = ""
+
self.log_file = ConfigParameter(self, "log-file", str)
param = ConfigParameter(self, "log-level", str)
Modified: trunk/cumin/python/cumin/main.py
===================================================================
--- trunk/cumin/python/cumin/main.py 2011-08-29 18:00:52 UTC (rev 4939)
+++ trunk/cumin/python/cumin/main.py 2011-08-30 20:59:44 UTC (rev 4940)
@@ -87,6 +87,8 @@
self.use_aviary = True
self.aviary_job_servers = ""
self.aviary_query_servers = ""
+ self.aviary_key = ""
+ self.aviary_cert = ""
self.wallaby = None
self.wallaby_broker = None
@@ -172,7 +174,8 @@
ops.insert(0, AviaryOperations("aviary", aviary_dir,
self.aviary_job_servers,
- self.aviary_query_servers))
+ self.aviary_query_servers,
+ self.aviary_key, self.aviary_cert))
self.remote.add_mechanisms(ops)
# Create RPC interface for Wallaby
Modified: trunk/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-29 18:00:52 UTC (rev 4939)
+++ trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-30 20:59:44 UTC (rev 4940)
@@ -9,8 +9,10 @@
from suds import *
from suds.client import Client
+from suds.transport.https import HttpAuthenticated
from sage.util import CallSync, CallThread, ObjectPool, host_list
from datetime import datetime
+from sage.aviary.https import *
log = logging.getLogger("sage.aviary")
@@ -56,7 +58,8 @@
'''
class AviaryOperations(object):
- def __init__(self, name, datadir, job_servers, query_servers):
+ def __init__(self, name, datadir, job_servers, query_servers,
+ key="", cert=""):
self.name = name
self.datadir = datadir
@@ -87,29 +90,34 @@
self.type_to_aviary = self._type_to_aviary()
self.aviary_to_type = self._aviary_to_type()
+ self.key = key
+ self.cert = cert
+
# job server operations
def set_job_attribute(self, scheduler, job_id, name, value, callback, submission):
assert callback
- job_client = self.job_client_pool.get_object()
-
def my_callback(result):
self.job_client_pool.return_object(job_client)
result = self._pretty_result(result, scheduler.Machine)
# massage results for use by standard callback
cb_args = self._cb_args_dataless(result)
callback(*cb_args)
-
- host = self._get_host(scheduler.Machine, self.job_servers)
- if host == "":
- self._raise_no_host(scheduler.Machine)
-
- service = host + "setJobAttribute"
-
+
+# job_client = self.job_client_pool.get_object()
+# scheme, host = self._get_host(scheduler.Machine, self.job_servers)
+# if host == "":
+# self._raise_no_host(scheduler.Machine)
+# service = host + "setJobAttribute"
# Have to set the URL for the method. This might go away someday...
- job_client.set_options(location=service)
+# job_client.set_options(location=service)
+ job_client = self._setup_client(self.job_client_pool,
+ scheduler.Machine,
+ self.job_servers,
+ "setJobAttribute")
+
# Make a job id parameter (see job wsdl)
jobId = job_client.factory.create('ns0:JobID')
jobId.job = job_id
@@ -130,8 +138,6 @@
def submit_job(self, scheduler, ad, callback):
assert callback
- job_client = self.job_client_pool.get_object()
-
def my_callback(result):
# Turn this back off before we put it back in the pool
# so allow_overrides isn't set for someone else...
@@ -151,15 +157,11 @@
id = None
callback(status, id)
- host = self._get_host(scheduler.Machine, self.job_servers)
- if host == "":
- self._raise_no_host(scheduler.Machine)
+ job_client = self._setup_client(self.job_client_pool,
+ scheduler.Machine,
+ self.job_servers,
+ "submitJob")
- service = host + "submitJob"
-
- # Have to set the URL for the method. This might go away someday...
- job_client.set_options(location=service)
-
# Set basic attributes in the order defined by aviary-job.wsdl.
args = list()
basic_attrs = ("Cmd", "Args", "Owner",
"Iwd", "Submission")
@@ -277,8 +279,6 @@
# Aviary doesn't use the file name as does QMF, instead it
# specifies the file type and lets condor figure out the path.
- client = self.query_client_pool.get_object()
-
def my_process_results(result):
# Fix up the exception message if necessary
result = self._pretty_result(result, job_server.Machine)
@@ -295,16 +295,11 @@
data = None
return (status, data)
- host = self._get_host(job_server.Machine, self.query_servers)
- if host == "":
- self._raise_no_host(job_server.Machine)
+ client = self._setup_client(self.query_client_pool,
+ job_server.Machine,
+ self.query_servers,
+ "getJobData")
- service = host + "getJobData"
-
- # Have to set the URL for the method. This might go away someday...
- #print "Aviary control job " + url
- client.set_options(location=service)
-
# Make a job data parameter (see query wsdl)
jobData = client.factory.create('ns0:JobData')
jobData.id.job = job_id
@@ -345,8 +340,6 @@
default = "default" in kwargs and kwargs["default"] or None
timeout = "timeout" in kwargs and kwargs["timeout"] or 5
- client = self.query_client_pool.get_object()
-
def make_tuple(attr):
# Attempt to cast the value into the specified type
if attr.type in self.aviary_to_type:
@@ -380,16 +373,11 @@
data = None
return (status, data)
- host = self._get_host(job_server.Machine, self.query_servers)
- if host == "":
- self._raise_no_host(job_server.Machine)
+ client = self._setup_client(self.query_client_pool,
+ job_server.Machine,
+ self.query_servers,
+ "getJobDetails")
- service = host + "getJobDetails"
-
- # Have to set the URL for the method. This might go away someday...
- #print "Aviary control job " + url
- client.set_options(location=service)
-
# Make a job id parameter (see job wsdl)
jobId = client.factory.create('ns0:JobID')
jobId.job = job_id
@@ -406,8 +394,6 @@
def get_job_summaries(self, submission, callback, machine_name):
assert callback
- query_client = self.query_client_pool.get_object()
-
def to_int_seconds(dt):
# Change a datetime.datetime into int seconds since epoch
# Note, this works nicely if the datetime happens to include microseconds
@@ -459,16 +445,12 @@
else:
data = {"Jobs": None}
callback(status, data)
-
- host = self._get_host(machine_name, self.query_servers)
- if host == "":
- self._raise_no_host(machine_name)
- service = host + "getSubmissionSummary"
+ query_client = self._setup_client(self.query_client_pool,
+ machine_name,
+ self.query_servers,
+ "getSubmissionSummary")
- # Have to set the URL for the method. This might go away someday...
- query_client.set_options(location=service)
-
# What we really want here is the job summaries from the
# submission summary response. To get those, we have to
# set an extra attribute on the client...
@@ -499,23 +481,51 @@
@classmethod
def _get_host(cls, name, servers):
+ scheme = ""
host = ""
if name in servers:
urls = servers[name]
if len(urls) > 0:
- host = str(random.sample(urls, 1)[0])
+ url = random.sample(urls, 1)[0]
+ scheme = url.scheme
+ host = str(url)
# A particular method name is going to be appended to path,
# so ensure the file "/" here. Since we supply default path
# values when the host list is parsed, we know the last portion
# of the host string has to be a path.
if not host.endswith("/"):
host += "/"
- return host
+ return scheme, host
@classmethod
def _raise_no_host(cls, name):
raise Exception("No aviary job servers specified for %s, check config files" % name)
+ def _setup_client(self, client_pool, host_name, host_list, method):
+ # Get a client object from the client_pool. Use host_name as
+ # a lookup in the host_list to find an initial URL and append the
+ # method name. Set up the client to point at that URL and return.
+ client = client_pool.get_object()
+ scheme, host = self._get_host(host_name, host_list)
+ if host == "":
+ self._raise_no_host(host_name)
+ # Have to set the URL for the method. This might go away someday...
+ client.set_options(location=host+method)
+ # Since we pool the clients and reuse them for different requests
+ # and since its possible to be using servers with different schemes,
+ # we have to always reset the transport here.
+ if scheme == "https":
+ if not os.path.isfile(self.key):
+ raise Exception("File not found for aviary-key, check cumin.conf settings")
+ if not os.path.isfile(self.cert):
+ raise Exception("File not found for aviary-cert, check cumin.conf settings")
+ the_transport = HTTPSClientCertTransport(self.key, self.cert)
+ else:
+ # this is the default transport when none is specified
+ the_transport = HttpAuthenticated()
+ client.set_options(transport=the_transport)
+ return client
+
@classmethod
def _get_status(cls, result):
# For Aviary operations, if the operation
@@ -560,19 +570,13 @@
meth_name,
callback, default, timeout):
- host = self._get_host(scheduler.Machine, self.job_servers)
- if host == "":
- self._raise_no_host(scheduler.Machine)
+ client = self._setup_client(self.job_client_pool,
+ scheduler.Machine,
+ self.job_servers,
+ meth_name)
- service = host + meth_name
-
- client = self.job_client_pool.get_object()
meth = getattr(client.service, meth_name)
- # Have to set the URL for the method. This might go away someday...
- #print "Aviary control job " + url
- client.set_options(location=service)
-
# Make a job id parameter (see job wsdl)
jobId = client.factory.create('ns0:JobID')
jobId.job = job_id
Added: trunk/sage/python/sage/aviary/https.py
===================================================================
--- trunk/sage/python/sage/aviary/https.py (rev 0)
+++ trunk/sage/python/sage/aviary/https.py 2011-08-30 20:59:44 UTC (rev 4940)
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2009-2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# uses Suds - https://fedorahosted.org/suds/
+import urllib2 as u2
+from suds.transport.http import HttpTransport, Reply, TransportError
+import httplib
+
+class HTTPSClientAuthHandler(u2.HTTPSHandler):
+ def __init__(self, key, cert):
+ u2.HTTPSHandler.__init__(self)
+ self.key = key
+ self.cert = cert
+
+ def https_open(self, req):
+ #Rather than pass in a reference to a connection class, we pass in
+ # a reference to a function which, for all intents and purposes,
+ # will behave as a constructor
+ return self.do_open(self.getConnection, req)
+
+ def getConnection(self, host, timeout=300):
+ return httplib.HTTPSConnection(host, key_file=self.key, cert_file=self.cert)
+
+class HTTPSClientCertTransport(HttpTransport):
+ def __init__(self, key, cert, *args, **kwargs):
+ HttpTransport.__init__(self, *args, **kwargs)
+ self.key = key
+ self.cert = cert
+
+ def u2open(self, u2request):
+ """
+ Open a connection.
+ @param u2request: A urllib2 request.
+ @type u2request: urllib2.Request.
+ @return: The opened file-like urllib2 object.
+ @rtype: fp
+ """
+ tm = self.options.timeout
+ url = u2.build_opener(HTTPSClientAuthHandler(self.key, self.cert))
+ if self.u2ver() < 2.6:
+ socket.setdefaulttimeout(tm)
+ return url.open(u2request)
+ else:
+ return url.open(u2request, timeout=tm)