diff --git a/tools/rpi/hoymiles/decoders/__init__.py b/tools/rpi/hoymiles/decoders/__init__.py index 8e4e91bb..2b27a2cc 100644 --- a/tools/rpi/hoymiles/decoders/__init__.py +++ b/tools/rpi/hoymiles/decoders/__init__.py @@ -10,6 +10,52 @@ from datetime import datetime, timedelta import crcmod f_crc_m = crcmod.predefined.mkPredefinedCrcFun('modbus') +f_crc8 = crcmod.mkCrcFun(0x101, initCrc=0, xorOut=0) + +def g_unpack(s_fmt, s_buf): + """Chunk unpack helper + + :param s_fmt: struct format string + :type s_fmt: str + :param s_buf: buffer to unpack + :type s_buf: bytes + :return: decoded data iterator + :rtype: generator object + """ + + cs = struct.calcsize(s_fmt) + s_exc = len(s_buf) % cs + + return struct.iter_unpack(s_fmt, s_buf[:len(s_buf) - s_exc]) + +def print_table_unpack(s_fmt, payload, cw=6): + """ + Print table of decoded numbers with different offsets + Helps recognizing values in unknown payloads + + :param s_fmt: struct format string + :type s_fmt: str + :param payload: bytes data + :type payload: bytes + :param cw: cell width + :type cw: int + :return: None + """ + + l_hexlified = [f'{byte:02x}' for byte in payload] + + print(f'{"Pos": <{cw}}', end='') + print(''.join([f'{num: >{cw}}' for num in range(0, len(payload))])) + print(f'{"Hex": <{cw}}', end='') + print(''.join([f'{byte: >{cw}}' for byte in l_hexlified])) + + l_fmt = struct.calcsize(s_fmt) + if len(payload) >= l_fmt: + for offset in range(0, l_fmt): + print(f'{s_fmt: <{cw}}', end='') + print(' ' * cw * offset, end='') + print(''.join( + [f'{num[0]: >{cw*l_fmt}}' for num in g_unpack(s_fmt, payload[offset:])])) class Response: """ All Response Shared methods """ @@ -140,8 +186,18 @@ class UnknownResponse(Response): """ return ' '.join([f'{byte:02x}' for byte in self.response]) - @property - def valid_crc(self): + def validate_crc8(self): + """ + Checks if self.response has valid CRC8 + + :return: if crc is available and correct + :rtype: bool + """ + # check crc + pcrc = struct.unpack('>B', self.response[-1:])[0] + return f_crc8(self.response[:-1]) == pcrc + + def validate_crc_m(self): """ Checks if self.response has valid Modbus CRC @@ -152,113 +208,10 @@ class UnknownResponse(Response): pcrc = struct.unpack('>H', self.response[-2:])[0] return f_crc_m(self.response[:-2]) == pcrc - @property - def dump_longs(self): - """Get all data, interpreted as long""" - if len(self.response) < 3: - return None + def unpack_table(self, *args): + """Access shared debug function""" + print_table_unpack(*args) - res = self.response - - rem = len(res) % 16 - res = res[:rem*-1] - - vals = None - if len(res) % 16 == 0: - rlen = len(res)/4 - vals = struct.unpack(f'>{int(rlen)}L', res) - - return vals - - @property - def dump_longs_pad1(self): - """Get all data, interpreted as long""" - if len(self.response) < 5: - return None - - res = self.response[2:] - - rem = len(res) % 16 - res = res[:rem*-1] - - vals = None - if len(res) % 16 == 0: - rlen = len(res)/4 - vals = struct.unpack(f'>{int(rlen)}L', res) - - return vals - - @property - def dump_longs_pad2(self): - """Get all data, interpreted as long""" - if len(self.response) < 7: - return None - - res = self.response[4:] - - rem = len(res) % 16 - res = res[:rem*-1] - - vals = None - if len(res) % 16 == 0: - rlen = len(res)/4 - vals = struct.unpack(f'>{int(rlen)}L', res) - - return vals - - @property - def dump_longs_pad3(self): - """Get all data, interpreted as long""" - if len(self.response) < 9: - return None - - res = self.response[6:] - - rem = len(res) % 16 - res = res[:rem*-1] - - vals = None - if len(res) % 16 == 0: - rlen = len(res)/4 - vals = struct.unpack(f'>{int(rlen)}L', res) - - return vals - - @property - def dump_shorts(self): - """Get all data, interpreted as short""" - if len(self.response) < 3: - return None - - res = self.response - - rem = len(res) % 4 - res = res[:rem*-1] - - vals = None - if len(res) % 4 == 0: - rlen = len(res)/2 - vals = struct.unpack(f'>{int(rlen)}H', res) - - return vals - - @property - def dump_shorts_pad1(self): - """Get all data, interpreted as short""" - if len(self.response) < 4: - return None - - res = self.response[1:] - - rem = len(res) % 4 - res = res[:rem*-1] - - vals = None - if len(res) % 4 == 0: - rlen = len(res)/2 - vals = struct.unpack(f'>{int(rlen)}H', res) - - return vals class EventsResponse(UnknownResponse): """ Hoymiles micro-inverter event log decode helper """ @@ -337,7 +290,7 @@ class EventsResponse(UnknownResponse): def __init__(self, *args, **params): super().__init__(*args, **params) - crc_valid = self.valid_crc + crc_valid = self.validate_crc_m() if crc_valid: print(' payload has valid modbus crc') self.response = self.response[:-2] @@ -365,7 +318,12 @@ class DebugDecodeAny(UnknownResponse): def __init__(self, *args, **params): super().__init__(*args, **params) - crc_valid = self.valid_crc + crc8_valid = self.validate_crc8() + if crc8_valid: + print(' payload has valid crc8') + self.response = self.response[:-1] + + crc_valid = self.validate_crc_m() if crc_valid: print(' payload has valid modbus crc') self.response = self.response[:-2] @@ -373,41 +331,17 @@ class DebugDecodeAny(UnknownResponse): l_payload = len(self.response) print(f' payload has {l_payload} bytes') - longs = self.dump_longs - if not longs: - print(' type long : unable to decode (len or not mod 4)') - else: - print(' type long : ' + str(longs)) + print() + print('Field view: int') + print_table_unpack('>B', self.response) - longs = self.dump_longs_pad1 - if not longs: - print(' type long pad1 : unable to decode (len or not mod 4)') - else: - print(' type long pad1 : ' + str(longs)) + print() + print('Field view: shorts') + print_table_unpack('>H', self.response) - longs = self.dump_longs_pad2 - if not longs: - print(' type long pad2 : unable to decode (len or not mod 4)') - else: - print(' type long pad2 : ' + str(longs)) - - longs = self.dump_longs_pad3 - if not longs: - print(' type long pad3 : unable to decode (len or not mod 4)') - else: - print(' type long pad3 : ' + str(longs)) - - shorts = self.dump_shorts - if not shorts: - print(' type short : unable to decode (len or not mod 2)') - else: - print(' type short : ' + str(shorts)) - - shorts = self.dump_shorts_pad1 - if not shorts: - print(' type short pad1: unable to decode (len or not mod 2)') - else: - print(' type short pad1: ' + str(shorts)) + print() + print('Field view: longs') + print_table_unpack('>L', self.response) try: if len(self.response) > 2: @@ -423,6 +357,9 @@ class DebugDecodeAny(UnknownResponse): # 1121-Series Intervers, 1 MPPT, 1 Phase +class Hm300Decode02(EventsResponse): + """ Inverter generic events log """ + class Hm300Decode0B(StatusResponse): """ 1121-series mirco-inverters status data """ @@ -476,6 +413,9 @@ class Hm300Decode12(EventsResponse): # 1141-Series Inverters, 2 MPPT, 1 Phase +class Hm600Decode02(EventsResponse): + """ Inverter generic events log """ + class Hm600Decode0B(StatusResponse): """ 1141-series mirco-inverters status data """ @@ -558,6 +498,9 @@ class Hm600Decode12(EventsResponse): # 1161-Series Inverters, 2 MPPT, 1 Phase +class Hm1200Decode02(EventsResponse): + """ Inverter generic events log """ + class Hm1200Decode0B(StatusResponse): """ 1161-series mirco-inverters status data """