Entry
Rules system
Jul 5th, 2000 10:03
Nathan Wallace, Hans Nowak, Snippet 320, Geir Bjarte Terum
"""
Packages: miscellaneous
"""
# Author Geir Bjarte Terum 1997-1999 (c) All GPL rights reserved
#-----------------------------------------------------------------------------
# Rules - A Python Language Extention module.
#
# Version: 1.0
#
# Public entities:
# class rules: make support for rules applied to class data by inheritance.
# def init_rules(stack_oldvalues, rec_depth=10).
#
#
# class rules:
# Rules can be used in many ways. E.g. state machines, AI engines, data
# trigger systems, controlling/monitoring data invariants, etc.
# Several rules may be added to a member. The rules are called in a
# dept first manner. It is not possible to overload a rule virtually.
# In other words, you may name your rule equally in chield classes. It
# will work as if they were differently named.
# If a chield class implements the __setattr__(), that class must call
# rules.__setattr__() initially enabling the callback of rules.
#
# Example class rules:
# # Simple example of usage of rules, monitor data invariants.
# from rules import rules # Standard usage.
# class X(rules): # Must inherit from rules.
# def __init__(self, v):
# rules.__init__(self) # Call rules ctor.
# self.x = v # Initiate variable x.
# self.monitor('x', self.rule_x) # Attach our rule to variable x.
# def rule_x(self): # Example rule.
# if self.x < 5:
# self.__dict__['x'] = getoldval(5) # Retrive old value (default to 5)
#
# x = X(39)
# x.x = 3 # The implicitly called rule will not accept this value.
# # The value will still be 39.
#
#-----------------------------------------------------------------------------
import types
# Default initially value assignment:
__RULES_VERSION__ = 1.0
__OLDVALS__ = 1 # Stack old values as default.
__RECDEPTH__ = 10 # Recursive level of dept.
def init_rules(stack_oldvalues, rec_depth=__RECDEPTH__):
global __OLDVALS__, __RECDEPTH__
__OLDVALS__ = stack_oldvalues # Boolean.
__RECDEPTH__ = rec_depth
if __OLDVALS__:
# A specialized support class for class rules.
class _stack: # A rather quick stack.
def __init__(self):
self.stack = None # Stack termination symbol.
self.len = 0 # Number of stack elements.
def push(self, node): # Grow tree 'up/left'.
self.len = self.len + 1
assert(self.len < __RECDEPTH__) # Moderate the recursive depth.
self.stack = node, self.stack # New root tuple: (node, tree).
def pop(self):
self.len = self.len - 1
node, self.stack = self.stack # Remove root tuple.
assert(node) # Check for underflow.
#return node # TypeError if empty.
def top(self):
return self.stack[0] # Peek at stack top value.
def __len__(self): # on: len, not
return self.len
def __repr__(self): return '[FastStack:' + `self.stack` + ']'
_oldval_stack = _stack() # A stacked list of old values.
class rules:
#---------------------------------------------------------------------------
# Contract:
# 1. This ctor must be called initially by chield class.
#---------------------------------------------------------------------------
def __init__(self):
self.__dict__['__rule__'] = {} # Dictionary of lists with callback rules.
#---------------------------------------------------------------------------
# Synopsis: monitor('foo', ... , self.my_rule [, 1])
#
# Contract:
# 1. Call this method for the member data to attach rules to.
# 2. Last arg specifies whether the rule should be called initially(default)
# 3. The member data must be defined before invoking monitor().
# 4. The specified rule is called initially (optional).
# 5. Rules attached to a specific variable "should" update the variable
# through the dictionary explicitly to avoid indefinite recursion.
# 6. By assigning a value to a monitored member, the attached rule(s) will
# be called in a dept first manner.
# 7. Rules are not called if variable is accessed through the dictionary.
#
# Exceptions:
# * RuntimeError
#
# See method rules.__setattr__() for further details.
#---------------------------------------------------------------------------
def monitor(self, *args): # <args>: (<name>,)* <rule> [,<0 or 1>]
nargs = len(args) # Do not count self arg.
assert(nargs > 1)
call_rule = 1 # Call attached rule by default.
if nargs == 2: # Optimized for speed.
_rule = self.monitor_2(args[0], args[1])
else:
_rule, call_rule = self.monitor_N(args)
if call_rule:
_rule() # Initial check.
def getoldval(self):
if __OLDVALS__:
return _oldval_stack.top()
else:
return None # Force TypeError.
#---------------------------------------------------------------------------
# Synopsis: example: self.attr = data # implicitly call __setattr__.
#
# Contract:
# 1. Implicitly called upon updating class member data.
# 2. Non monitored data members are assigned new values without any tests.
# 3. Monitored data will have it's rules called in a depth first manner.
# 4. The monitored data is updated before any rules are called.
# 5. If chield classes define the __setattr__, then rules.__setattr__() must
# be called initially.
#---------------------------------------------------------------------------
def __setattr__(self, name, newval):
dict = self.__dict__
assert(self.__dict__.has_key('__rule__')) # Forgotten to call ctor?
_rule = dict['__rule__']
if not _rule.has_key(name):
dict[name] = newval # Update/create variable.
return newval
assert(dict.has_key(name))
if __OLDVALS__:
_oldval_stack.push(dict[name])
dict[name] = newval
_rule = _rule[name] # Get list of rules for this variable.
for _rule in _rule[:]: # For each rule:
_rule() # call the rule.
if __OLDVALS__:
_oldval_stack.pop()
return dict[name] # Return verified value.
def __delattr__(self, name, kill=1):
if self.__rule__.has_key(name):
del self.__rule__[name]
if kill:
del self.__dict__[name]
#---------------------------------------------------------------------------
# Methods mainly for internal usage:
#---------------------------------------------------------------------------
def isattr(self, name):
return type(name) == types.StringType and self.__dict__.has_key(name)
def ismutable(self, name):
x = self.__dict__[name]
tx = type(x)
return not ( tx == types.IntType or
tx == types.FloatType or
tx == types.StringType or
tx == types.LongType or
tx == types.ComplexType or
( tx == types.InstanceType and # User defined type?
x.__dict__.has_key('immutable') and # Initially immutable?
x.immutable # Still immutable?
)
)
def monitor_2(self, name, _rule):
assert(type(name) == types.StringType)
assert(type(_rule) == types.MethodType)
self.attach_rule(name, _rule)
return _rule
def monitor_N(self, args):
name = []
call_rule = 1
for arg in args: # Sort out the arguments.
arg_type = type(arg)
if arg_type == types.StringType:
name.append(arg)
elif arg_type == types.MethodType:
_rule = arg
elif arg_type == types.IntType:
call_rule = arg
else:
msg = 'Unexpected arg ' + repr(arg_type) + \
' applied to rule.monitor(..)'
raise RuntimeError, msg
rmap = self.__rule__
for name in name:
self.attach_rule(name, _rule)
return _rule, call_rule
def attach_rule(self, name, _rule):
if __debug__: # Stricter verifications in debug mode.
if not self.isattr(name): # Verify membership.
msg = "Monitored member '" + self.__class__.__name__ + '.'+ name +\
"' must be instanciated."
raise RuntimeError, msg
if self.ismutable(name): # Verify immutability.
msg = "Monitored member '" + self.__class__.__name__ + '.'+ name +\
"', of " + repr(type(self.__dict__[name])) + ", must be immutable."
raise RuntimeError, msg
rmap = self.__rule__
if not rmap.has_key(name): # Member not monitored yet?
rmap[name] = [_rule]
else:
rmap[name].append(_rule) # Add rule to the existing list.