Message Parser ExampleΒΆ

#!/usr/bin/env python
"""
Modbus Message Parser
--------------------------------------------------------------------------

The following is an example of how to parse modbus messages
using the supplied framers for a number of protocols:

* tcp
* ascii
* rtu
* binary
"""
# -------------------------------------------------------------------------- #
# import needed libraries
# -------------------------------------------------------------------------- #
from __future__ import print_function
import collections
import textwrap
from optparse import OptionParser
import codecs as c

from pymodbus.factory import ClientDecoder, ServerDecoder
from pymodbus.transaction import ModbusSocketFramer
from pymodbus.transaction import ModbusBinaryFramer
from pymodbus.transaction import ModbusAsciiFramer
from pymodbus.transaction import ModbusRtuFramer
from pymodbus.compat import  IS_PYTHON3
# -------------------------------------------------------------------------- #
# -------------------------------------------------------------------------- #
import logging
FORMAT = ('%(asctime)-15s %(threadName)-15s'
          ' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s')
logging.basicConfig(format=FORMAT)
log = logging.getLogger()


# -------------------------------------------------------------------------- #
# build a quick wrapper around the framers
# -------------------------------------------------------------------------- #

class Decoder(object):

    def __init__(self, framer, encode=False):
        """ Initialize a new instance of the decoder

        :param framer: The framer to use
        :param encode: If the message needs to be encoded
        """
        self.framer = framer
        self.encode = encode

    def decode(self, message):
        """ Attempt to decode the supplied message

        :param message: The messge to decode
        """
        if IS_PYTHON3:
            value = message if self.encode else c.encode(message, 'hex_codec')
        else:
            value = message if self.encode else message.encode('hex')
        print("="*80)
        print("Decoding Message %s" % value)
        print("="*80)
        decoders = [
            self.framer(ServerDecoder(), client=None),
            self.framer(ClientDecoder(), client=None)
        ]
        for decoder in decoders:
            print("%s" % decoder.decoder.__class__.__name__)
            print("-"*80)
            try:
                decoder.addToFrame(message)
                if decoder.checkFrame():
                    unit = decoder._header.get("uid", 0x00)
                    decoder.advanceFrame()
                    decoder.processIncomingPacket(message, self.report, unit)
                else:
                    self.check_errors(decoder, message)
            except Exception as ex:
                self.check_errors(decoder, message)

    def check_errors(self, decoder, message):
        """ Attempt to find message errors

        :param message: The message to find errors in
        """
        log.error("Unable to parse message - {} with {}".format(message,
                                                                decoder))

    def report(self, message):
        """ The callback to print the message information

        :param message: The message to print
        """
        print("%-15s = %s" % ('name', message.__class__.__name__))
        for (k, v) in message.__dict__.items():
            if isinstance(v, dict):
                print("%-15s =" % k)
                for kk,vv in v.items():
                    print("  %-12s => %s" % (kk, vv))

            elif isinstance(v, collections.Iterable):
                print("%-15s =" % k)
                value = str([int(x) for x  in v])
                for line in textwrap.wrap(value, 60):
                    print("%-15s . %s" % ("", line))
            else:
                print("%-15s = %s" % (k, hex(v)))
        print("%-15s = %s" % ('documentation', message.__doc__))


# -------------------------------------------------------------------------- #
# and decode our message
# -------------------------------------------------------------------------- #
def get_options():
    """ A helper method to parse the command line options

    :returns: The options manager
    """
    parser = OptionParser()

    parser.add_option("-p", "--parser",
                      help="The type of parser to use "
                           "(tcp, rtu, binary, ascii)",
                      dest="parser", default="tcp")

    parser.add_option("-D", "--debug",
                      help="Enable debug tracing",
                      action="store_true", dest="debug", default=False)

    parser.add_option("-m", "--message",
                      help="The message to parse",
                      dest="message", default=None)

    parser.add_option("-a", "--ascii",
                      help="The indicates that the message is ascii",
                      action="store_true", dest="ascii", default=True)

    parser.add_option("-b", "--binary",
                      help="The indicates that the message is binary",
                      action="store_false", dest="ascii")

    parser.add_option("-f", "--file",
                      help="The file containing messages to parse",
                      dest="file", default=None)

    parser.add_option("-t", "--transaction",
                      help="If the incoming message is in hexadecimal format",
                      action="store_true", dest="transaction", default=False)

    (opt, arg) = parser.parse_args()

    if not opt.message and len(arg) > 0:
        opt.message = arg[0]

    return opt


def get_messages(option):
    """ A helper method to generate the messages to parse

    :param options: The option manager
    :returns: The message iterator to parse
    """
    if option.message:
        if option.transaction:
            msg = ""
            for segment in option.message.split():
                segment = segment.replace("0x", "")
                segment = "0" + segment if len(segment) == 1 else segment
                msg = msg + segment
            option.message = msg

        if not option.ascii:
            if not IS_PYTHON3:
                option.message = option.message.decode('hex')
            else:
                option.message = c.decode(option.message.encode(), 'hex_codec')
        yield option.message
    elif option.file:
        with open(option.file, "r") as handle:
            for line in handle:
                if line.startswith('#'): continue
                if not option.ascii:
                    line = line.strip()
                    line = line.decode('hex')
                yield line


def main():
    """ The main runner function
    """
    option = get_options()

    if option.debug:
        try:
            modbus_log.setLevel(logging.DEBUG)
            logging.basicConfig()
        except Exception as e:
            print("Logging is not supported on this system- {}".format(e))

    framer = lookup = {
        'tcp':    ModbusSocketFramer,
        'rtu':    ModbusRtuFramer,
        'binary': ModbusBinaryFramer,
        'ascii':  ModbusAsciiFramer,
    }.get(option.parser, ModbusSocketFramer)

    decoder = Decoder(framer, option.ascii)
    for message in get_messages(option):
        decoder.decode(message)


if __name__ == "__main__":
    main()