mirror of
https://github.com/lumapu/ahoy.git
synced 2025-08-04 08:58:22 +02:00
Update python decoders improves protocol analysis
This commit is contained in:
parent
d27f0c1148
commit
38346abde2
1 changed files with 122 additions and 10 deletions
|
@ -1,6 +1,9 @@
|
|||
#!/usr/bin/python3
|
||||
# -*- coding: utf-8 -*-
|
||||
import struct
|
||||
import crcmod
|
||||
|
||||
f_crc_m = crcmod.predefined.mkPredefinedCrcFun('modbus')
|
||||
|
||||
class StatusResponse:
|
||||
e_keys = ['voltage','current','power','energy_total','energy_daily']
|
||||
|
@ -58,51 +61,148 @@ class UnknownResponse:
|
|||
def hex_ascii(self):
|
||||
return ' '.join([f'{b:02x}' for b in self.response])
|
||||
|
||||
@property
|
||||
def valid_crc(self):
|
||||
# check crc
|
||||
pcrc = struct.unpack('>H', self.response[-2:])[0]
|
||||
return f_crc_m(self.response[:-2]) == pcrc
|
||||
|
||||
@property
|
||||
def dump_longs(self):
|
||||
if len(self.response) < 5:
|
||||
return None
|
||||
|
||||
res = self.response
|
||||
n = len(res)/4
|
||||
|
||||
r = len(res) % 16
|
||||
res = res[:r*-1]
|
||||
|
||||
vals = None
|
||||
if n % 4 == 0:
|
||||
if len(res) % 16 == 0:
|
||||
n = len(res)/4
|
||||
vals = struct.unpack(f'>{int(n)}L', res)
|
||||
|
||||
return vals
|
||||
|
||||
@property
|
||||
def dump_longs_pad1(self):
|
||||
res = self.response[1:]
|
||||
n = len(res)/4
|
||||
if len(self.response) < 7:
|
||||
return None
|
||||
|
||||
res = self.response[2:]
|
||||
|
||||
r = len(res) % 16
|
||||
res = res[:r*-1]
|
||||
|
||||
vals = None
|
||||
if n % 4 == 0:
|
||||
if len(res) % 16 == 0:
|
||||
n = len(res)/4
|
||||
vals = struct.unpack(f'>{int(n)}L', res)
|
||||
|
||||
return vals
|
||||
|
||||
@property
|
||||
def dump_longs_pad2(self):
|
||||
if len(self.response) < 9:
|
||||
return None
|
||||
|
||||
res = self.response[4:]
|
||||
|
||||
r = len(res) % 16
|
||||
res = res[:r*-1]
|
||||
|
||||
vals = None
|
||||
if len(res) % 16 == 0:
|
||||
n = len(res)/4
|
||||
vals = struct.unpack(f'>{int(n)}L', res)
|
||||
|
||||
return vals
|
||||
|
||||
@property
|
||||
def dump_longs_pad3(self):
|
||||
if len(self.response) < 11:
|
||||
return None
|
||||
|
||||
res = self.response[6:]
|
||||
|
||||
r = len(res) % 16
|
||||
res = res[:r*-1]
|
||||
|
||||
vals = None
|
||||
if len(res) % 16 == 0:
|
||||
n = len(res)/4
|
||||
vals = struct.unpack(f'>{int(n)}L', res)
|
||||
|
||||
return vals
|
||||
|
||||
@property
|
||||
def dump_shorts(self):
|
||||
n = len(self.response)/2
|
||||
if len(self.response) < 5:
|
||||
return None
|
||||
|
||||
res = self.response
|
||||
|
||||
r = len(res) % 4
|
||||
res = res[:r*-1]
|
||||
|
||||
vals = None
|
||||
if n % 2 == 0:
|
||||
vals = struct.unpack(f'>{int(n)}H', self.response)
|
||||
if len(res) % 4 == 0:
|
||||
n = len(res)/2
|
||||
vals = struct.unpack(f'>{int(n)}H', res)
|
||||
|
||||
return vals
|
||||
|
||||
@property
|
||||
def dump_shorts_pad1(self):
|
||||
if len(self.response) < 6:
|
||||
return None
|
||||
|
||||
res = self.response[1:]
|
||||
n = len(res)/2
|
||||
|
||||
r = len(res) % 4
|
||||
res = res[:r*-1]
|
||||
|
||||
vals = None
|
||||
if n % 2 == 0:
|
||||
if len(res) % 4 == 0:
|
||||
n = len(res)/2
|
||||
vals = struct.unpack(f'>{int(n)}H', res)
|
||||
|
||||
return vals
|
||||
|
||||
class HM600_Decode11(UnknownResponse):
|
||||
def __init__(self, response):
|
||||
self.response = response
|
||||
|
||||
crc_valid = self.valid_crc
|
||||
if crc_valid:
|
||||
print(' payload has valid modbus crc')
|
||||
self.response = response[:-2]
|
||||
|
||||
status = self.response[:2]
|
||||
|
||||
chunk_size = 12
|
||||
for c in range(2, len(self.response), chunk_size):
|
||||
chunk = self.response[c:c+chunk_size]
|
||||
print(' '.join([f'{b:02x}' for b in chunk]) + ': ')
|
||||
print(' BBLHl : ' + str(struct.unpack('>BBLHl', chunk)))
|
||||
print()
|
||||
|
||||
class HM600_Decode12(HM600_Decode11):
|
||||
def __init__(self, response):
|
||||
super().__init__(response)
|
||||
|
||||
class DEBUG_DecodeAny(UnknownResponse):
|
||||
def __init__(self, response):
|
||||
self.response = response
|
||||
|
||||
crc_valid = self.valid_crc
|
||||
if crc_valid:
|
||||
print(' payload has valid modbus crc')
|
||||
self.response = response[:-2]
|
||||
|
||||
l_payload = len(self.response)
|
||||
print(f' payload has {l_payload} bytes')
|
||||
|
||||
longs = self.dump_longs
|
||||
if not longs:
|
||||
print(' type long : unable to decode (len or not mod 4)')
|
||||
|
@ -115,6 +215,18 @@ class DEBUG_DecodeAny(UnknownResponse):
|
|||
else:
|
||||
print(' type long pad1 : ' + str(longs))
|
||||
|
||||
longs = self.dump_longs_pad2
|
||||
if not longs:
|
||||
print(' type long pad2 : unable to decode (len or not mod 4)')
|
||||
else:
|
||||
print(' type long pad2 : ' + str(longs))
|
||||
|
||||
longs = self.dump_longs_pad3
|
||||
if not longs:
|
||||
print(' type long pad3 : unable to decode (len or not mod 4)')
|
||||
else:
|
||||
print(' type long pad3 : ' + str(longs))
|
||||
|
||||
shorts = self.dump_shorts
|
||||
if not shorts:
|
||||
print(' type short : unable to decode (len or not mod 2)')
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue