Python function to apply pydsfapi patch



  • The pydsfapi permits a subscription connection to transfer just patches of changes to the machine model. What this returns is not a 'classic' json patch, but rather just the changed elements of the nested dict and list structure. The pydsfapi source simply says "new update patches of the object model need to be applied manually".

    The standard dict method .update doesn't work because the connection returns empty dictionaries to mean 'no change', but dict.update then replaces the existing dictionary with that empty one.

    This is my solution, a function 'patch' that splices a subscription connection patch into the machine model. Note that it wants as input two dicts (or two lists, but the DSF machine model is dicts at the highest level), and the pydsfapi connection returns a json string so you need to convert it to a dict (probably with json.loads()) then supply it to the function.

    It's here in a complete example that connects to the DSF and simply prints out the setpoint and current temperature of every defined heater to stdout periodically, but it can obviously be easily modified to do something else - it maintains a copy of the whole machine model.

    #!/usr/bin/python3
    # subscribe to the machine model and report the temperatures of each heater
    
    # this is frequency of logging (seconds)
    interval=3
    
    # this is duration over which to log (seconds)
    duration=600
    
    import json
    import time
    from pydsfapi import pydsfapi
    from pydsfapi.initmessages.clientinitmessages import SubscriptionMode
    
    
    # update dictionary-of-dicts-and-lists a with a merge patch b
    # values in b overwrite values with same key tree in a
    # note this function recurses
    # note pydsfapi patches are just a json string so need to be turned into a dict
    # before feeding to this function
    # if a and b are both lists and b is shorter than a, a will be truncated
    # this behaviour can be changed below - search 'truncated'
    # debug=True draws a diagram
    # debug=True and verbose=True reports input to the function on every entry
    def patch(a, b, path=None, debug=False, verbose=False):
        if path is None: path = []
        if debug and (len(path)==0 or verbose): 
            print('patch: ' + str(b))
            if verbose:
                print(' into: ' + str(a))
                print(' at :  ' + str(path))
    
        # if both a and b are dicts work through the keys
        if isinstance(a,dict) and isinstance(b,dict):
            for key in b:
                if debug:
                    # debug draws a sort of diagram indicating nesting depth
                    print('   ',end='')
                    for i in path: print(' > ', end='')
                    print(key)         
                if key in a:
                    # the element in the patch already exists in the original
                    if ((isinstance(a[key], dict) and isinstance(b[key], dict))
                     or (isinstance(a[key], list) and isinstance(b[key], list))):
                        # a[key] and b[key] are both dicts or both lists, recurse
                        a[key] = patch(a[key], b[key], path + [str(key)], debug, verbose)
                    else:
                        # mixed types, so treat as leaf
                        if debug:
                            for i in path: print('   ',end='')
                            print('    = '+str(b[key]))
                        a[key] = b[key]
                else:
                    # the element in the patch is not in the original
                    # treat as a leaf, but note either could actually be a dict or list
                    # e.g. a[key] could have been a single value and we now overwrite with a new dict
                    # or it could have been a dict and now we've overwritten a single scalar
                    if debug:
                        for i in path: print('   ',end='')
                        print('    = ' + str(b[key]))
                    a[key]=b[key]
    
        # if both a and b are lists we enumerate and work through the values
        elif isinstance(a,list) and isinstance(b,list):
            # if b is shorter than a, should a be truncated or should the trailing values be unaltered?
            # The following line assumes that trailing values nopt found in b are truncated from a
            # remove this line if desired behaviour is that trailing values in a are left in place
            # note that would mean that a list will grow but never shrink
            a=a[0:len(b)]
            # if b is longer than a you could use a try: except IndexError: below
            # but that requires putting an entire recursion of this function inside the try
            # which makes it difficult to track where teh exception occurs, so instead
            # a is padded to be as long as b in advance
            a.extend([None] * (len(b)-len(a)))
            for idx, value in enumerate(b):
                # need a new diagram nesting line
                if debug:
                    print('   ',end='')
                    for i in path: print(' > ',end='')
                    # put [] around index to identify we're in a list
                    print('['+str(idx)+']')
                a[idx] = patch(a[idx], b[idx], path + [str(idx)], debug, verbose)
    
        else:
            # a and b are different types, replace a with b
            if debug:
                for i in path: print('   ',end='')
                print(' = '+str(b))
            a=b
    
        # all done
        return a
    
    
    # establish the connection
    subscribe_connection = pydsfapi.SubscribeConnection(SubscriptionMode.PATCH, debug=False)
    subscribe_connection.connect()
    
    # decide when to stop logging
    endat = time.time() + duration;
    
    try:
        # Get the complete model once
        machine_model = subscribe_connection.get_machine_model()
    
        # Get updates
        while time.time() < endat:
            time.sleep(interval - time.time()%interval)
            # get machine model update as a string
            mm_u_str = subscribe_connection.get_machine_model_patch()
            # convert to a  dict
            mm_update=json.loads(mm_u_str)
            # apply to the saved machine model
            patch(machine_model,mm_update)
            # do something with the machine model
            # in this case just print out heater setpoints and current values
            print (time.strftime('%H:%M:%S'), end='')
            for heater in machine_model['heat']['heaters']:
                print (' {:5.1f} {:5.1f}'.format(heater['active'],heater['current']), end='')
            print()
    finally:
        subscribe_connection.close()
    

    Some notes about the function:

    It's not very 'pythonic', mainly in that it tests the type of variables in advance of processing them, which seems to be frowned upon (i.e. I decide whether it's a duck before trying to make it quack). Also, it checks the lengths of lists in advance rather than rely on a try: except IndexError: construction.

    My first attempt at such a function is in the thread 'Using pydsfapi with very little knowledge', and is more pythonic. It actually patches the machine model correctly but the debug reporting is wrong for lists of lists (which occurs in the machine model in the 'tools' dict). In debugging that I found it difficult to keep track of multiple nested try: constructs (because the function ends up being called recursively inside a try:) and the additional code to get the reporting to work made that funtion longer than this. That's why it's refactored like this. This one is less pythonic, more intelligible, and the debug display works correctly.


Log in to reply