faqts : Computers : Programming : Languages : Python : Snippets

+ Search
Add Entry AlertManage Folder Edit Entry Add page to http://del.icio.us/
Did You Find This Entry Useful?

9 of 14 people (64%) answered Yes
Recently 7 of 10 people (70%) answered Yes

Entry

Rules system

Jul 5th, 2000 10:03
Nathan Wallace, Hans Nowak, Snippet 320, Geir Bjarte Terum


"""
Packages: miscellaneous
"""
# Author Geir Bjarte Terum 1997-1999 (c) All GPL rights reserved
#-----------------------------------------------------------------------------
# Rules - A Python Language Extension module.
#
# Version: 1.0
#
# Public entities:
#   class rules: make support for rules applied to class data by inheritance.
#   def init_rules(stack_oldvalues, rec_depth=10).
#   
#   
# class rules:
#   Rules can be used in many ways. E.g. state machines, AI engines, data
#   trigger systems, controlling/monitoring data invariants, etc.
#   Several rules may be added to a member. The rules are called in a
#   depth-first manner. It is not possible to overload a rule virtually.
#   In other words, you may give a rule the same name in child classes; it
#   will work as if they were named differently.
#   If a child class implements __setattr__(), that class must call
#   rules.__setattr__() first to enable the callback of rules.
#
# Example class rules:
#   # Simple example of usage of rules, monitor data invariants.
#   from rules import rules            # Standard usage.
#   class X(rules):                    # Must inherit from rules.
#     def __init__(self, v):
#       rules.__init__(self)           # Call rules ctor.
#       self.x = v                     # Initiate variable x.
#       self.monitor('x', self.rule_x) # Attach our rule to variable x.
#     def rule_x(self):                # Example rule.
#       if self.x < 5:
#         self.__dict__['x'] = self.getoldval() # Retrieve the old value.
#
#   x = X(39)
#   x.x = 3           # The implicitly called rule will not accept this value.
#                     # The value will still be 39.
# 
#-----------------------------------------------------------------------------
import types
# Module-wide defaults; adjust them at run time through init_rules().
__RULES_VERSION__ = 1.0
__OLDVALS__ = 1                              # Stack old values by default.
__RECDEPTH__ = 10                            # Bound on the old-value stack.


def init_rules(stack_oldvalues, rec_depth=__RECDEPTH__):
  """Reconfigure the module-wide settings of the rules system.

  stack_oldvalues -- true to stack the old value of a monitored member
                     while its rules run, false to skip the stacking.
  rec_depth       -- new bound for the old-value stack; _stack.push()
                     asserts that the stack stays shorter than this.
                     Defaults to the bound in effect when the module was
                     loaded.
  """
  global __OLDVALS__, __RECDEPTH__
  __OLDVALS__ = stack_oldvalues              # Boolean.
  __RECDEPTH__ = rec_depth


if __OLDVALS__:
  # A specialized support class for class rules.
  class _stack:
    """A small, quick stack built from nested tuples.

    The stack is the chain (node, (node, ... None)); None marks the bottom.
    The element count is kept separately so len() does not walk the chain.
    """

    def __init__(self):
      self.stack = None                      # Stack termination symbol.
      self.len = 0                           # Number of stack elements.

    def push(self, node):
      """Put node on top of the stack.

      Raises AssertionError (unless asserts are disabled) when the push
      would make the stack __RECDEPTH__ elements long. The check is done
      before anything is modified, so a refused push leaves the stack
      and its length untouched.
      """
      assert self.len + 1 < __RECDEPTH__, 'rules: recursion depth exceeded'
      self.stack = node, self.stack          # New root tuple: (node, rest).
      self.len = self.len + 1

    def pop(self):
      """Remove the top element and return it.

      Raises TypeError when the stack is empty, as before. Emptiness is
      detected from the termination symbol rather than from the popped
      value, so falsy values such as 0, '' or None pop normally.
      """
      if self.stack is None:                 # Underflow: nothing to pop.
        raise TypeError('pop from an empty _stack')
      node, self.stack = self.stack          # Remove root tuple.
      self.len = self.len - 1
      return node

    def top(self):
      """Return the top element without removing it (TypeError if empty)."""
      return self.stack[0]

    def __len__(self):                       # Used by len() and truth tests.
      return self.len

    def __repr__(self):
      return '[FastStack:' + repr(self.stack) + ']'

  _oldval_stack = _stack()                   # A stacked list of old values.
class rules:
  """Base class that calls user supplied rules when monitored members change.

  A child class calls rules.__init__() first, creates its members, and then
  attaches rules (bound methods taking no arguments) to members by name with
  monitor(). Assigning to a monitored member stores the new value and then
  calls every rule attached to that member. A rule that wants to change the
  member should write through self.__dict__ to avoid triggering itself
  again. Members accessed through the dictionary never trigger rules.

  If a child class defines __setattr__, it must call rules.__setattr__()
  first.
  """

  def __init__(self):
    """Create the empty rule table. Must be called first by child classes."""
    # Written through __dict__ because __setattr__ needs the table to exist.
    self.__dict__['__rule__'] = {}  # Dictionary of lists with callback rules.

  def monitor(self, *args):         # <args>: (<name>,)* <rule> [,<0 or 1>]
    """Attach a rule to one or more members.

    Synopsis: monitor('foo', ... , self.my_rule [, 1])

    Contract:
    1. The named members must exist before monitor() is called.
    2. The rule must be a bound method.
    3. The optional trailing integer says whether the rule is called once
       right away; the default is to call it.

    Raises RuntimeError for an argument of an unexpected type, for a
    missing rule and, in debug mode, for a member that does not exist or
    holds a mutable value. Wrong usage of the two-argument form trips an
    assertion instead.
    """
    nargs = len(args)               # self is not part of args.
    assert nargs > 1
    call_rule = 1                   # Call attached rule by default.
    if nargs == 2:                  # Common case, kept on a short path.
      _rule = self.monitor_2(args[0], args[1])
    else:
      _rule, call_rule = self.monitor_N(args)
    if call_rule:
      _rule()                       # Initial check.

  def getoldval(self, default=None):
    """Return the value a monitored member had before the current update.

    Meant to be called from inside a rule. Returns default when old values
    are not being stacked or when no update is in progress (for example
    during the initial call made by monitor()).
    """
    if __OLDVALS__ and len(_oldval_stack):
      return _oldval_stack.top()
    return default

  def __setattr__(self, name, newval):
    """Assign newval to member name and call the rules attached to it.

    Contract:
    1. Called implicitly by 'self.attr = data'.
    2. Members without rules are simply assigned.
    3. A monitored member is updated before any of its rules are called.
    4. While the rules run, the previous value is available through
       getoldval() (when old-value stacking is enabled).

    Returns the value stored in the member after the rules have run.
    """
    attrs = self.__dict__
    assert '__rule__' in attrs      # Forgotten to call ctor?
    rule_map = attrs['__rule__']
    if name not in rule_map:
      attrs[name] = newval          # Update/create variable.
      return newval
    assert name in attrs
    # Snapshot the flag: a rule may call init_rules(), and the pop below
    # must match the push made here.
    stacking = __OLDVALS__
    if stacking:
      _oldval_stack.push(attrs[name])
    try:
      attrs[name] = newval
      # Iterate over a copy so rules may attach further rules safely.
      for callback in rule_map[name][:]:
        callback()
    finally:
      # Always unwind, so a rule that raises does not leave a stale old
      # value behind for later updates.
      if stacking:
        _oldval_stack.pop()
    return attrs[name]              # Return verified value.

  def __delattr__(self, name, kill=1):
    """Detach all rules from member name and, if kill is true, delete it.

    Deleting a member that does not exist raises KeyError.
    """
    rule_map = self.__rule__
    if name in rule_map:
      del rule_map[name]
    if kill:
      del self.__dict__[name]

  # Methods mainly for internal usage:

  def isattr(self, name):
    """Return true if name is a string naming an existing member."""
    return isinstance(name, str) and name in self.__dict__

  def ismutable(self, name):
    """Return false if member name holds a value regarded as immutable.

    Numbers and strings are immutable. Any other object is regarded as
    immutable only if its own __dict__ has a true 'immutable' entry.
    """
    value = self.__dict__[name]
    # types.LongType only exists on Python 2; int covers it on Python 3.
    scalar_types = (int, float, complex, str,
                    getattr(types, 'LongType', int))
    if isinstance(value, scalar_types):
      return False
    own_attrs = getattr(value, '__dict__', None)
    if own_attrs is not None and 'immutable' in own_attrs and value.immutable:
      return False                  # Declared, and still, immutable.
    return True

  def monitor_2(self, name, _rule):
    """Attach _rule to the single member name and return the rule."""
    assert isinstance(name, str)
    assert isinstance(_rule, types.MethodType)
    self.attach_rule(name, _rule)
    return _rule

  def monitor_N(self, args):
    """Attach one rule to several members.

    args is the argument tuple given to monitor(): strings are member
    names, a bound method is the rule (the last one given wins) and an
    integer is the call-initially flag. Returns (rule, call_rule).

    Raises RuntimeError for any other kind of argument and when no rule
    is given; in both cases nothing has been attached.
    """
    names = []
    rule = None
    call_rule = 1
    for arg in args:                # Sort out the arguments.
      if isinstance(arg, str):
        names.append(arg)
      elif isinstance(arg, types.MethodType):
        rule = arg
      elif isinstance(arg, int):
        call_rule = arg
      else:
        msg = 'Unexpected arg ' + repr(type(arg)) + \
          ' applied to rule.monitor(..)'
        raise RuntimeError(msg)
    if rule is None:
      raise RuntimeError('No rule method applied to rule.monitor(..)')
    for member in names:
      self.attach_rule(member, rule)
    return rule, call_rule

  def attach_rule(self, name, _rule):
    """Append _rule to the list of rules of member name.

    In debug mode, raises RuntimeError if the member does not exist or if
    it holds a mutable value.
    """
    if __debug__:                   # Stricter verifications in debug mode.
      if not self.isattr(name):     # Verify membership.
        msg = "Monitored member '" + self.__class__.__name__ + '.'+ name +\
          "' must be instanciated."
        raise RuntimeError(msg)
      if self.ismutable(name):      # Verify immutability.
        msg = "Monitored member '" + self.__class__.__name__ + '.'+ name +\
          "', of " + repr(type(self.__dict__[name])) + ", must be immutable."
        raise RuntimeError(msg)
    rule_map = self.__rule__
    if name not in rule_map:        # Member not monitored yet?
      rule_map[name] = [_rule]
    else:
      rule_map[name].append(_rule)  # Add rule to the existing list.