#!/usr/bin/env python
"""
Libmodbus Protocol Wrapper
------------------------------------------------------------
What follows is an example wrapper of the libmodbus library
(http://libmodbus.org/documentation/) for use with pymodbus.
There are two utilities involved here:
* LibmodbusLevel1Client
This is simply a python wrapper around the c library. It is
mostly a clone of the pylibmodbus implementation, but I plan
on extending it to implement all the available protocol using
the raw execute methods.
* LibmodbusClient
This is just another modbus client that can be used just like
any other client in pymodbus.
For these to work, you must have `cffi` and `libmodbus-dev` installed:
sudo apt-get install libmodbus-dev
pip install cffi
"""
# -------------------------------------------------------------------------- #
# import system libraries
# -------------------------------------------------------------------------- #
from cffi import FFI
# -------------------------------------------------------------------------- #
# import pymodbus libraries
# -------------------------------------------------------------------------- #
from pymodbus.constants import Defaults
from pymodbus.exceptions import ModbusException
from pymodbus.client.common import ModbusClientMixin
from pymodbus.bit_read_message import ReadCoilsResponse, ReadDiscreteInputsResponse
from pymodbus.register_read_message import ReadHoldingRegistersResponse, ReadInputRegistersResponse
from pymodbus.register_read_message import ReadWriteMultipleRegistersResponse
from pymodbus.bit_write_message import WriteSingleCoilResponse, WriteMultipleCoilsResponse
from pymodbus.register_write_message import WriteSingleRegisterResponse, WriteMultipleRegistersResponse
# --------------------------------------------------------------------------- #
# create the C interface
# --------------------------------------------------------------------------- #
# * TODO add the protocol needed for the servers
# --------------------------------------------------------------------------- #
# Single FFI instance shared by the whole module: it parses the C
# declarations below and is reused further down for buffer allocation
# (compiler.new), casts (compiler.cast) and errno/string access.
compiler = FFI()
# Declare the libmodbus C functions that are made available through LIB.
# The text inside the string is C, not python, and is parsed by cffi.
compiler.cdef("""
typedef struct _modbus modbus_t;
int modbus_connect(modbus_t *ctx);
int modbus_flush(modbus_t *ctx);
void modbus_close(modbus_t *ctx);
const char *modbus_strerror(int errnum);
int modbus_set_slave(modbus_t *ctx, int slave);
void modbus_get_response_timeout(modbus_t *ctx, uint32_t *to_sec, uint32_t *to_usec);
void modbus_set_response_timeout(modbus_t *ctx, uint32_t to_sec, uint32_t to_usec);
int modbus_read_bits(modbus_t *ctx, int addr, int nb, uint8_t *dest);
int modbus_read_input_bits(modbus_t *ctx, int addr, int nb, uint8_t *dest);
int modbus_read_registers(modbus_t *ctx, int addr, int nb, uint16_t *dest);
int modbus_read_input_registers(modbus_t *ctx, int addr, int nb, uint16_t *dest);
int modbus_write_bit(modbus_t *ctx, int coil_addr, int status);
int modbus_write_bits(modbus_t *ctx, int addr, int nb, const uint8_t *data);
int modbus_write_register(modbus_t *ctx, int reg_addr, int value);
int modbus_write_registers(modbus_t *ctx, int addr, int nb, const uint16_t *data);
int modbus_write_and_read_registers(modbus_t *ctx, int write_addr, int write_nb, const uint16_t *src, int read_addr, int read_nb, uint16_t *dest);
int modbus_mask_write_register(modbus_t *ctx, int addr, uint16_t and_mask, uint16_t or_mask);
int modbus_send_raw_request(modbus_t *ctx, uint8_t *raw_req, int raw_req_length);
float modbus_get_float(const uint16_t *src);
void modbus_set_float(float f, uint16_t *dest);
modbus_t* modbus_new_tcp(const char *ip_address, int port);
modbus_t* modbus_new_rtu(const char *device, int baud, char parity, int data_bit, int stop_bit);
void modbus_free(modbus_t *ctx);
int modbus_receive(modbus_t *ctx, uint8_t *req);
int modbus_receive_from(modbus_t *ctx, int sockfd, uint8_t *req);
int modbus_receive_confirmation(modbus_t *ctx, uint8_t *rsp);
""")
# Open the `modbus` shared library. This runs at import time, so importing
# this module requires the library to be loadable.
LIB = compiler.dlopen('modbus') # create our bindings
# -------------------------------------------------------------------------- #
# helper utilities
# -------------------------------------------------------------------------- #
def get_float(data):
    """ Read a float out of the supplied register data.

    This simply forwards to libmodbus' `modbus_get_float`.

    :param data: The `uint16_t` register buffer to decode
    :returns: The float produced by libmodbus
    """
    decoded = LIB.modbus_get_float(data)
    return decoded
def set_float(value, data):
    """ Store a float into the supplied register buffer.

    This simply forwards to libmodbus' `modbus_set_float`; the
    result is written into `data` and nothing is returned.

    :param value: The float to store
    :param data: The `uint16_t` register buffer to write into
    """
    store = LIB.modbus_set_float
    store(value, data)
def cast_to_int16(data):
    """ Cast the supplied value to a C `int16_t`.

    :param data: The value to cast
    :returns: The cast value as a python int
    """
    narrowed = compiler.cast('int16_t', data)
    return int(narrowed)
def cast_to_int32(data):
    """ Cast the supplied value to a C `int32_t`.

    :param data: The value to cast
    :returns: The cast value as a python int
    """
    narrowed = compiler.cast('int32_t', data)
    return int(narrowed)
class NotImplementedException(Exception):
    """ Raised when a request type has no libmodbus mapping. """
# -------------------------------------------------------------------------- #
# level1 client
# -------------------------------------------------------------------------- #
class LibmodbusLevel1Client(object):
    """ A raw wrapper around the libmodbus c library. Feel free
    to use it if you want increased performance and don't mind the
    entire protocol not being implemented.
    """

    @staticmethod
    def _to_bytes(value):
        """ Convert text to bytes for cffi `char`/`char *` arguments.

        cffi on python 3 only accepts bytes for these C types, so text
        is encoded while values that are already bytes pass through.

        :param value: The text or bytes value to convert
        :returns: The value as bytes
        """
        if isinstance(value, bytes):
            return value
        return value.encode()

    @staticmethod
    def _last_error():
        """ Build an exception describing the last libmodbus error.

        :returns: A ModbusException carrying the libmodbus message
        """
        message = compiler.string(LIB.modbus_strerror(compiler.errno))
        if not isinstance(message, str):
            # python 3: cffi hands back bytes, make the message readable
            message = message.decode('utf-8', 'replace')
        return ModbusException(message)

    @classmethod
    def _from_context(klass, context):
        """ Wrap a freshly created libmodbus context in a client.

        :param context: The `modbus_t *` returned by `modbus_new_*`
        :returns: A new level1 client
        :raises ModbusException: If libmodbus returned a NULL context
        """
        if context == compiler.NULL:
            raise klass._last_error()
        return klass(context)

    @classmethod
    def create_tcp_client(klass, host='127.0.0.1', port=Defaults.Port):
        """ Create a TCP modbus client for the supplied parameters.

        :param host: The host to connect to
        :param port: The port to connect to on that host
        :returns: A new level1 client
        :raises ModbusException: If the context could not be created
        """
        context = LIB.modbus_new_tcp(klass._to_bytes(host), port)
        return klass._from_context(context)

    @classmethod
    def create_rtu_client(klass, **kwargs):
        """ Create a RTU modbus client for the supplied parameters.

        :param port: The serial port to attach to
        :param stopbits: The number of stop bits to use
        :param bytesize: The bytesize of the serial messages
        :param parity: Which kind of parity to use
        :param baudrate: The baud rate to use for the serial device
                         (the legacy `baud` keyword is still honored)
        :returns: A new level1 client
        :raises ModbusException: If the context could not be created
        """
        port = kwargs.get('port', '/dev/ttyS0')
        # `baud` was the only key read historically, so it keeps priority;
        # the documented `baudrate` key is now honored as well.
        baudrate = kwargs.get('baud',
                              kwargs.get('baudrate', Defaults.Baudrate))
        parity = kwargs.get('parity', Defaults.Parity)
        bytesize = kwargs.get('bytesize', Defaults.Bytesize)
        stopbits = kwargs.get('stopbits', Defaults.Stopbits)
        context = LIB.modbus_new_rtu(klass._to_bytes(port), baudrate,
                                     klass._to_bytes(parity), bytesize,
                                     stopbits)
        return klass._from_context(context)

    def __init__(self, client):
        """ Initialize a new instance of the LibmodbusLevel1Client. This
        method should not be used, instead new instances should be created
        using the two supplied factory methods:

        * LibmodbusLevel1Client.create_rtu_client(...)
        * LibmodbusLevel1Client.create_tcp_client(...)

        :param client: The underlying client instance to operate with.
        """
        self.client = client
        self.slave = Defaults.UnitId

    def set_slave(self, slave):
        """ Set the current slave to operate against.

        :param slave: The new slave to operate against
        :returns: The resulting slave to operate against
        :raises ModbusException: If libmodbus rejects the slave id
        """
        # modbus_set_slave only reports success (0) or failure (-1), so
        # the slave id itself is what has to be remembered.
        self.__execute(LIB.modbus_set_slave, slave)
        self.slave = slave
        return self.slave

    def connect(self):
        """ Attempt to connect to the client target.

        :returns: True if successful, throws otherwise
        """
        return (self.__execute(LIB.modbus_connect) == 0)

    def flush(self):
        """ Discards the existing bytes on the wire.

        :returns: The number of flushed bytes, or throws
        """
        return self.__execute(LIB.modbus_flush)

    def close(self):
        """ Closes and frees the underlying connection
        and context structure. Calling this more than once is safe.

        :returns: Always True
        """
        if self.client is not None:
            LIB.modbus_close(self.client)
            LIB.modbus_free(self.client)
            # drop the dangling pointer so it can never be freed twice
            self.client = None
        return True

    def __execute(self, command, *args):
        """ Run the supplied command against the currently
        instantiated client with the supplied arguments. This
        will make sure to correctly handle resulting errors.

        :param command: The command to execute against the context
        :param *args: The arguments for the given command
        :returns: The result of the operation unless -1 which throws
        :raises ModbusException: If the client has been closed or the
                                 command reports an error
        """
        if self.client is None:
            raise ModbusException("The client has been closed")
        result = command(self.client, *args)
        if result == -1:
            raise self._last_error()
        return result

    def read_bits(self, address, count=1):
        """ Read coils starting at the supplied address.

        :param address: The starting address to read from
        :param count: The number of coils to read
        :returns: The resulting bits
        """
        result = compiler.new("uint8_t[]", count)
        self.__execute(LIB.modbus_read_bits, address, count, result)
        return result

    def read_input_bits(self, address, count=1):
        """ Read discrete inputs starting at the supplied address.

        :param address: The starting address to read from
        :param count: The number of discretes to read
        :returns: The resulting bits
        """
        result = compiler.new("uint8_t[]", count)
        self.__execute(LIB.modbus_read_input_bits, address, count, result)
        return result

    def write_bit(self, address, value):
        """ Write a single coil.

        :param address: The starting address to write to
        :param value: The value to write to the specified address
        :returns: The number of written bits
        """
        return self.__execute(LIB.modbus_write_bit, address, value)

    def write_bits(self, address, values):
        """ Write a sequence of coils.

        :param address: The starting address to write to
        :param values: The values to write to the specified address
        :returns: The number of written bits
        """
        count = len(values)
        return self.__execute(LIB.modbus_write_bits, address, count, values)

    def write_register(self, address, value):
        """ Write a single register.

        :param address: The starting address to write to
        :param value: The value to write to the specified address
        :returns: The number of written registers
        """
        return self.__execute(LIB.modbus_write_register, address, value)

    def write_registers(self, address, values):
        """ Write a sequence of registers.

        :param address: The starting address to write to
        :param values: The values to write to the specified address
        :returns: The number of written registers
        """
        count = len(values)
        return self.__execute(LIB.modbus_write_registers, address, count,
                              values)

    def read_registers(self, address, count=1):
        """ Read holding registers starting at the supplied address.

        :param address: The starting address to read from
        :param count: The number of registers to read
        :returns: The resulting read registers
        """
        result = compiler.new("uint16_t[]", count)
        self.__execute(LIB.modbus_read_registers, address, count, result)
        return result

    def read_input_registers(self, address, count=1):
        """ Read input registers starting at the supplied address.

        :param address: The starting address to read from
        :param count: The number of registers to read
        :returns: The resulting read registers
        """
        result = compiler.new("uint16_t[]", count)
        self.__execute(LIB.modbus_read_input_registers, address, count,
                       result)
        return result

    def read_and_write_registers(self, read_address, read_count,
                                 write_address, write_registers):
        """ Write a sequence of registers and read registers back
        in a single operation.

        :param read_address: The address to start reading from
        :param read_count: The number of registers to read from address
        :param write_address: The address to start writing to
        :param write_registers: The registers to write to the specified address
        :returns: The resulting read registers
        """
        write_count = len(write_registers)
        read_result = compiler.new("uint16_t[]", read_count)
        self.__execute(LIB.modbus_write_and_read_registers,
                       write_address, write_count, write_registers,
                       read_address, read_count, read_result)
        return read_result
# -------------------------------------------------------------------------- #
# level2 client
# -------------------------------------------------------------------------- #
class LibmodbusClient(ModbusClientMixin):
    """ A facade around the raw level 1 libmodbus client
    that implements the pymodbus protocol on top of the lower level
    client.
    """

    # ----------------------------------------------------------------------- #
    # these are used to convert from the pymodbus request types to the
    # libmodbus operations (overloaded operator). Each entry receives the
    # level 1 client and the request and returns the raw libmodbus result.
    # ----------------------------------------------------------------------- #
    __methods = {
        'ReadCoilsRequest':
            lambda c, r: c.read_bits(r.address, r.count),
        'ReadDiscreteInputsRequest':
            lambda c, r: c.read_input_bits(r.address, r.count),
        'WriteSingleCoilRequest':
            lambda c, r: c.write_bit(r.address, r.value),
        'WriteMultipleCoilsRequest':
            lambda c, r: c.write_bits(r.address, r.values),
        'WriteSingleRegisterRequest':
            lambda c, r: c.write_register(r.address, r.value),
        'WriteMultipleRegistersRequest':
            lambda c, r: c.write_registers(r.address, r.values),
        'ReadHoldingRegistersRequest':
            lambda c, r: c.read_registers(r.address, r.count),
        'ReadInputRegistersRequest':
            lambda c, r: c.read_input_registers(r.address, r.count),
        'ReadWriteMultipleRegistersRequest':
            lambda c, r: c.read_and_write_registers(r.read_address,
                                                    r.read_count,
                                                    r.write_address,
                                                    r.write_registers),
    }

    # ----------------------------------------------------------------------- #
    # these are used to convert from the libmodbus result to the
    # pymodbus response type. `tx` is the original request and `rx` is
    # the raw libmodbus result for it.
    # ----------------------------------------------------------------------- #
    __adapters = {
        'ReadCoilsRequest':
            lambda tx, rx: ReadCoilsResponse(list(rx)),
        'ReadDiscreteInputsRequest':
            lambda tx, rx: ReadDiscreteInputsResponse(list(rx)),
        # For the single writes libmodbus only returns the number of
        # written items (1), so the response echoes the requested value
        # instead of reporting that count as the value.
        'WriteSingleCoilRequest':
            lambda tx, rx: WriteSingleCoilResponse(tx.address, tx.value),
        'WriteMultipleCoilsRequest':
            lambda tx, rx: WriteMultipleCoilsResponse(tx.address, rx),
        'WriteSingleRegisterRequest':
            lambda tx, rx: WriteSingleRegisterResponse(tx.address, tx.value),
        'WriteMultipleRegistersRequest':
            lambda tx, rx: WriteMultipleRegistersResponse(tx.address, rx),
        'ReadHoldingRegistersRequest':
            lambda tx, rx: ReadHoldingRegistersResponse(list(rx)),
        'ReadInputRegistersRequest':
            lambda tx, rx: ReadInputRegistersResponse(list(rx)),
        'ReadWriteMultipleRegistersRequest':
            lambda tx, rx: ReadWriteMultipleRegistersResponse(list(rx)),
    }

    def __init__(self, client):
        """ Initialize a new instance of the LibmodbusClient. This should
        be initialized with one of the LibmodbusLevel1Client instances:

        * LibmodbusLevel1Client.create_rtu_client(...)
        * LibmodbusLevel1Client.create_tcp_client(...)

        :param client: The underlying client instance to operate with.
        """
        self.client = client

    # ----------------------------------------------------------------------- #
    # We use the client mixin to implement the api methods which are all
    # forwarded to this method. It is implemented using the previously
    # defined lookup tables. Any method not defined simply throws.
    # ----------------------------------------------------------------------- #
    def execute(self, request):
        """ Execute the supplied request against the server.

        :param request: The request to process
        :returns: The result of the request execution
        :raises NotImplementedException: If the request type has no
                                         libmodbus mapping
        """
        # only switch the slave when the request targets a different unit
        if self.client.slave != request.unit_id:
            self.client.set_slave(request.unit_id)

        method = request.__class__.__name__
        operation = self.__methods.get(method)
        adapter = self.__adapters.get(method)

        if operation is None or adapter is None:
            # report the request name; the looked up operation is None here
            raise NotImplementedException("Method not "
                                          "implemented: " + method)

        response = operation(self.client, request)
        return adapter(request, response)

    # ----------------------------------------------------------------------- #
    # Other methods can simply be forwarded using the decorator pattern
    # ----------------------------------------------------------------------- #
    def connect(self):
        """ Connect the underlying level 1 client.

        :returns: The result of the level 1 client connect
        """
        return self.client.connect()

    def close(self):
        """ Close the underlying level 1 client.

        :returns: The result of the level 1 client close
        """
        return self.client.close()

    # ----------------------------------------------------------------------- #
    # magic methods
    # ----------------------------------------------------------------------- #
    def __enter__(self):
        """ Implement the client with enter block

        :returns: The current instance of the client
        """
        self.client.connect()
        return self

    def __exit__(self, klass, value, traceback):
        """ Implement the client with exit block """
        self.client.close()
# -------------------------------------------------------------------------- #
# main example runner
# -------------------------------------------------------------------------- #
if __name__ == '__main__':
    # build the low level client for a modbus tcp server on localhost
    host, port = '127.0.0.1', 502
    protocol = LibmodbusLevel1Client.create_tcp_client(host, port)

    # drive it through the high level client; the with block connects
    # on entry and closes the connection on exit
    with LibmodbusClient(protocol) as client:
        # write three registers starting at address 0 and show the response
        registers = client.write_registers(0, [13, 12, 11])
        print(registers)

        # read ten holding registers back from address 0 and show the values
        registers = client.read_holding_registers(0, 10)
        print(registers.registers)