Author: tmckay
Date: 2013-02-18 15:07:28 +0000 (Mon, 18 Feb 2013)
New Revision: 5712
Modified:
branches/tmckay/sage/python/sage/wallaby/wallabyoperations.py
Log:
Intermediate change
Modified: branches/tmckay/sage/python/sage/wallaby/wallabyoperations.py
===================================================================
--- branches/tmckay/sage/python/sage/wallaby/wallabyoperations.py 2013-02-18 15:00:53 UTC (rev 5711)
+++ branches/tmckay/sage/python/sage/wallaby/wallabyoperations.py 2013-02-18 15:07:28 UTC (rev 5712)
@@ -145,44 +145,57 @@
self._condition.wait(retry_secs)
self._condition.release()
- # Init remaining time til next update to 0 for each
- # cached item in case the thread was restarted
- for attr, val in self._cache.iteritems():
- val.remaining = 0
+ last_wallaroo_tag = None
# Okay, now we're ready to retrieve data
while not self._stop:
start_processing = time.time()
- for attr, val in self._cache.iteritems():
- if self._stop:
- break
- # val.remaining is the number of seconds left before
- # the next update of this data item. None is "forever".
- # Synthetic items are not retreived from the store.
- if not val.synthetic and \
- val.remaining is not None and val.remaining <= 0:
- val.get_values(attr, self)
+ # Get the current tag from wallaroo and see if the store
+ # has been changed since we last saw it. If so, refresh
+ # the cache with current values, potentially moving our
+ # branch to current first and following up with a replay
+ # of unactivated changes.
+ # This represents other entities outside of cumin modifying the
+ # store. We want to see those changes.
+ wallaroo_current_tag = self._get_wallaroo_current_tag()
+ if not self._compare_tags(wallaroo_current_tag,
+ last_wallaroo_tag):
- # Now handle the synthetics. val.synthetic generates
- # and stores it's own results.
- for attr, val in self._cache.iteritems():
- if self._stop:
- break
+ # Before we update the cache, change to the current tag
+ # if we are working on a local branch. If we are already
+ # current then we can skip this step.
+ need_replay = False
+ if not self._my_branch_is_current():
+ # Okay, we are moving to the current branch.
+ # After we rebuild the cache, we need to replay
+ # unactivated changes.
+ need_replay = True
+ self._update_branch_to_current()
+
+ for attr, val in self._cache.iteritems():
+ if self._stop:
+ break
+ if not val.synthetic:
+ val.get_values(attr, self)
- if val.synthetic and \
- val.remaining is not None and val.remaining <= 0:
- val.get_values(attr, self)
-
- log.debug("WallabyOperations: total refresh processing time %s" \
- % (time.time() - start_processing))
+ # Now handle the synthetics. val.synthetic generates
+ # and stores it's own results.
+ for attr, val in self._cache.iteritems():
+ if self._stop:
+ break
+ if val.synthetic:
+ val.get_values(attr, self)
- # Find out how long we should sleep for.
- # Based on min remaining times for all items
- # If minimum is 0 because we have items waiting
- # for a retry, we fall back on retry_secs as a minimum.
- sleep_time = self._find_min_remaining(min=retry_secs)
+ last_wallaroo_tag = wallaroo_current_tag
+ log.debug("WallabyOperations: total refresh processing time %s" \
+ % (time.time() - start_processing))
+ # Replay our unactivated changes
+ if need_replay:
+ pass
+
+ sleep_time = 30
self._condition.acquire()
if not self._stop:
# Could be signaled, so track the actual sleep time
@@ -193,39 +206,14 @@
slept = time.time() - bed_time
log.debug("WallabyOperations: cache thread slept for"\
" %s seconds" % slept)
-
- # When we wake up from sleep here, we already
- # have the lock so we might as well check refresh
- # and adjust the "remaining" values
- for attr, val in self._cache.iteritems():
- if val.refresh: # Force an update
- val.remaining = 0
- val.refresh = False
- elif val.remaining is not None:
- val.remaining -= slept
self._condition.release()
# Clear cache if we have been stopped....
for attr in self._cache:
self._set_cache(attr, [])
self._store = None
-
#end maintain_cache
- def get_values(attr, call, *args):
- log.debug("WallabyOperations: refreshing %s" % attr)
- try:
- # Wallaby API uses extensions to __getattr__ on
- # the Store to retrieve objects from the Broker
- # and return a list of proxy objects.
- start = time.time()
- d = call(*args)
- except:
- d = []
- delta = time.time() - start
- log.debug("WallabyOperations: %s seconds to refresh %s" % (delta, attr))
- return d
-
# Wrap the entire cache thread with an exception handler
def wrap_maintain_cache():
try:
@@ -595,6 +583,23 @@
# Super secret private implementation stuff. Don't look!
+ def _get_current_wallaroo_tag(self):
+ return self._store.cm.fetch_json_resource("/tags/current")
+
+ def _compare_tags(self, first, second):
+ try:
+ return first["commit"] == second["commit"]
+ except:
+ pass
+ return False
+
+ def _my_branch_is_current(self):
+ my_branch = self._store.cm.how.to_q()
+ return 'tag' in my_branch and my_branch['tag'] == 'current'
+
+ def _update_branch_to_current(self):
+ self._store.cm.how = wallaroo.client.cmeta.mk_how({"tag": "current"})
+
def _find_min_remaining(self, min):
# None indicates forever, the biggest value
# Note though that None < int is True in Python!
Show replies by date